.NET NMS - 1.1


Compatibility: v2 v3 Express
What's new? Release Notes
ID: com.castsoftware.dotnet.nms

Description

This extension provides support for Apache NMS .NET APIs which are responsible for consumer and publisher operations in .NET applications. If your C# application utilizes Apache NMS for messaging-related operations and the producer and consumer are handled using the C# language, and you want to modelize the producer and consumer links with appropriate objects and links, then you should install this extension.

Supported libraries

Library Version Supported
Apache NMSexternal link up to: 2.1.0
Apache NMS ActiveMQexternal link up to 2.1.0
Apache NMS AMQPexternal link up to 2.2.0
ArtemisNetClientexternal link up to 3.1.1

Transactions

Transaction support is derived from metamodel concepts used to build CAST Imaging Blueprint and structural transaction flows. Entry Points start transactions; Exit Points include both output/boundary concepts and Data Entities manipulated by transactions.

Role Support Breakdown
Entry Point
  • Message Queue Subscriber
Exit Point
  • Message Queue Publisher

Data version: 1.0.2-funcrel

ISO 5055 Structural Rules

Quality support is based on ISO 5055 structural rules available for the selected extension version.

Reliability Maintainability Security Performance Efficiency

Data version: 1.0.2-funcrel

Dependencies with other extensions

Some CAST extensions require the presence of other CAST extensions in order to function correctly. The .NET NMS extension requires that the following other CAST extensions are also installed (this will be managed automatically by CAST Imaging Console):

  • CAST AIP Internal extension (internal technical extension)
  • Web Services Linker

Download and installation instructions

The extension will be automatically downloaded and installed in CAST Console. You can manage the extension using the Application - Extensions interface.

What results can you expect?

Objects

Icon Description
Apache NMS ActiveMQ publisher
Apache NMS ActiveMQ Receiver
Apache NMS ActiveMQ unknown publisher
Apache NMS ActiveMQ unknown Receiver
Apache NMS AMQP publisher
Apache NMS AMQP Receiver
Apache NMS AMQP unknown publisher
Apache NMS AMQP unknown Receiver
Apache NMS Artemis publisher
Apache NMS Artemis Receiver
Apache NMS Artemis unknown publisher
Apache NMS Artemis unknown Receiver
Link Type Source and Destination of link Supported APIs
callLink callLink between the caller C# method and the Publisher object Apache.NMS.IMessageProducer.Send
Apache.NMS.IMessageProducer.SendAsync
Apache.NMS.ActiveMQ.IMessageProducer.Send
Apache.NMS.ActiveMQ.IMessageProducer.SendAsync
Apache.NMS.AMQP.NmsProducer.Send
Apache.NMS.AMQP.NmsProducer.SendAsync
Apache.NMS.AMQP.NmsMessageProducer.Send
Apache.NMS.AMQP.NmsMessageProducer.SendAsync
ActiveMQ.Artemis.Client.IProducer.Send
ActiveMQ.Artemis.Client.IProducer.SendAsync
ActiveMQ.Artemis.Client.ProducerExtensions.SendAsync
ActiveMQ.Artemis.Client.IAnonymousProducer.Send
ActiveMQ.Artemis.Client.IAnonymousProducer.SendAsync
callLink callLink between the Receiver object and the caller C# method Apache.NMS.IMessageConsumer.Receive
Apache.NMS.IMessageConsumer.ReceiveAsync
Apache.NMS.IMessageConsumer.ReceiveNoWait
Apache.NMS.ActiveMQ.IMessageConsumer.Receive
Apache.NMS.ActiveMQ.IMessageConsumer.ReceiveAsync
Apache.NMS.ActiveMQ.IMessageConsumer.ReceiveNoWait
Apache.NMS.AMQP.NmsConsumer.Receive
Apache.NMS.AMQP.NmsConsumer.ReceiveAsync
Apache.NMS.AMQP.NmsConsumer.ReceiveNoWait
Apache.NMS.AMQP.NmsMessageConsumer.Receive
Apache.NMS.AMQP.NmsMessageConsumer.ReceiveAsync
Apache.NMS.AMQP.NmsMessageConsumer.ReceiveNoWait
ActiveMQ.Artemis.Client.IConsumer.ReceiveAsync

Example code scenarios

ActiveMQ

Publisher APIs

 static void Main()
    {
        string brokerUri = "tcp://localhost:61616"; 
        string queueName = "dest_queue"; 

        using (IConnection connection = new ConnectionFactory(new Uri(brokerUri)).CreateConnection())
        {
            connection.Start();

            using (ISession session = connection.CreateSession())
            {
                IDestination destination = session.GetQueue(queueName);

                // Produce message
                ProduceMessage(session, destination);

                // Consume message
                ConsumeMessage(session, destination);

                connection.Stop();
            }
        }
    }

    static void ProduceMessage(ISession session, IDestination destination)
    {
        using (IMessageProducer producer = session.CreateProducer(destination))
        using (ITextMessage message = session.CreateTextMessage("Hello, ActiveMQ!"))
        {
            producer.Send(message);
            Console.WriteLine("Message sent: " + message.Text);
        }
    }

Receiver APIs

 static void ConsumeMessage(ISession session, IDestination destination)
    {
        using (IMessageConsumer consumer = session.CreateConsumer(destination))
        {
            ITextMessage receivedMessage = consumer.Receive() as ITextMessage;
            if (receivedMessage != null)
            {
                Console.WriteLine("Received message: " + receivedMessage.Text);
            }
            else
            {
                Console.WriteLine("No messages in the queue.");
            }
        }

Dotnet Publisher and Receiver

using Apache.NMS;
using Apache.NMS.ActiveMQ;
using System;

class Program
{
    static void Main()
    {
        string brokerUri = "tcp://localhost:61616"; 
        string queueName = "dest_queue"; 

        using (IConnection connection = new ConnectionFactory(new Uri(brokerUri)).CreateConnection())
        {
            connection.Start();

            using (ISession session = connection.CreateSession())
            {
                IDestination destination = session.GetQueue(queueName);

                // Produce message
                ProduceMessage(session, destination);

                // Consume message
                ConsumeMessage(session, destination);

                connection.Stop();
            }
        }
    }

    static void ProduceMessage(ISession session, IDestination destination)
    {
        using (IMessageProducer producer = session.CreateProducer(destination))
        using (ITextMessage message = session.CreateTextMessage("Hello, ActiveMQ!"))
        {
            producer.Send(message);
            Console.WriteLine("Message sent: " + message.Text);
        }
    }

    static void ConsumeMessage(ISession session, IDestination destination)
    {
        using (IMessageConsumer consumer = session.CreateConsumer(destination))
        {
            ITextMessage receivedMessage = consumer.Receive() as ITextMessage;
            if (receivedMessage != null)
            {
                Console.WriteLine("Received message: " + receivedMessage.Text);
            }
            else
            {
                Console.WriteLine("No messages in the queue.");
            }
        }
    }
}

AMQP

Publisher APIs

private void SendMessage(string message)
        {
            var response = _session.CreateTextMessage();
            response.Text = message;
            _producer.Send(response);
        }

Receiver APIs

private ITextMessage ReceiveMessage()
        {
            return _consumer.Receive() as ITextMessage;
        }

Linking code

using System;
using System.Threading.Tasks;
using Apache.NMS;
using Apache.NMS.Util;

namespace Tutorial.ProfanityFilter.Svc
{
    class Program
    {
        private IMessageProducer _producer;
        private IMessageConsumer _consumer;
        private ISession _session;
        private const string Queue = "queue://App.Message.Processing.Queue";

        static void Main(string[] args)
        {
            new Program().Start();
        }

        private void Start()
        {
            while (true)
            {
                var request = ReceiveMessage();
                var requestText = request?.Text;
                if (string.IsNullOrWhiteSpace(requestText)) continue;

                Task.Factory.StartNew(async () =>
                {
                    var messageToPublish = await ProfanityFilterService.Filter(requestText);
                    SendMessage(messageToPublish);
                });
            }
        }

        private ITextMessage ReceiveMessage()
        {
            return _consumer.Receive() as ITextMessage;
        }

        private void SendMessage(string message)
        {
            var response = _session.CreateTextMessage();
            response.Text = message;
            _producer.Send(response);
        }

        private Program()
        {
            InitializeCommunication();
        }

        private void InitializeCommunication()
        {
            const string userName = "admin";
            const string password = "admin";
            const string uri = "amqp:tcp://localhost:61616";
            var connecturi = new Uri(uri);
            var factory = new NMSConnectionFactory(connecturi);
            var connection = factory.CreateConnection(userName, password);
            connection.Start();
            _session = connection.CreateSession();
            var queueDestination = SessionUtil.GetDestination(_session, Queue);
            _consumer = _session.CreateConsumer(queueDestination);
            _producer = _session.CreateProducer(queueDestination);
        }
    }
}

Artemis

Publisher APIs

private const string OrdersQueue = "orders.incoming";
private IProducer _orderProducer;

public async Task InitializeAsync()
{
    var connectionFactory = new ConnectionFactory();
    var endpoint = Endpoint.Create("localhost", 5672, "guest", "guest");
    var connection = await connectionFactory.CreateAsync(endpoint);
    _orderProducer = await connection.CreateProducerAsync(OrdersQueue, RoutingType.Anycast);
}

public async Task PublishOrderAsync(string orderId, string product, int quantity)
{
    var payload = $"ORDER|{orderId}|{product}|{quantity}";
    await _orderProducer.SendAsync(new Message(payload), CancellationToken.None);
}

public void PublishOrderSync(string orderId, string product)
{
    var payload = $"ORDER_SYNC|{orderId}|{product}";
    _orderProducer.Send(new Message(payload), CancellationToken.None);
}

Receiver APIs

private const string OrdersQueue = "orders.incoming";
private IConsumer _orderConsumer;

public async Task InitializeAsync()
{
    var connectionFactory = new ConnectionFactory();
    var endpoint = Endpoint.Create("localhost", 5672, "guest", "guest");
    var connection = await connectionFactory.CreateAsync(endpoint);
    _orderConsumer = await connection.CreateConsumerAsync(OrdersQueue, RoutingType.Anycast);
}

public async Task<Message> ConsumeOrderAsync()
{
    var message = await _orderConsumer.ReceiveAsync(CancellationToken.None);
    await _orderConsumer.AcceptAsync(message);
    return message;
}

Publisher and Receiver

using System;
using System.Threading;
using System.Threading.Tasks;
using ActiveMQ.Artemis.Client;

namespace ArtemisOrderProcessing
{
    class ArtemisOrderService
    {
        private IProducer _orderProducer;
        private IProducer _paymentProducer;
        private IConsumer _orderConsumer;
        private IConsumer _paymentConsumer;
        private IAnonymousProducer _alertProducer;

        private const string OrdersQueue    = "orders.incoming";
        private const string PaymentsQueue  = "payments.processing";
        private const string ConfirmedQueue = "payments.confirmed";
        private const string AlertsQueue    = "alerts.operations";

        public async Task InitializeAsync()
        {
            var connectionFactory = new ConnectionFactory();
            var endpoint = Endpoint.Create("localhost", 5672, "guest", "guest");
            var connection = await connectionFactory.CreateAsync(endpoint);

            _orderProducer   = await connection.CreateProducerAsync(OrdersQueue,    RoutingType.Anycast);
            _paymentProducer = await connection.CreateProducerAsync(PaymentsQueue,  RoutingType.Anycast);
            _orderConsumer   = await connection.CreateConsumerAsync(OrdersQueue,    RoutingType.Anycast);
            _paymentConsumer = await connection.CreateConsumerAsync(ConfirmedQueue, RoutingType.Anycast);
            _alertProducer   = await connection.CreateAnonymousProducerAsync();
        }

        public async Task PublishOrderAsync(string orderId, string product, int quantity)
        {
            var payload = $"ORDER|{orderId}|{product}|{quantity}";
            await _orderProducer.SendAsync(new Message(payload), CancellationToken.None);
        }

        public void PublishOrderSync(string orderId, string product)
        {
            var payload = $"ORDER_SYNC|{orderId}|{product}";
            _orderProducer.Send(new Message(payload), CancellationToken.None);
        }

        public async Task<Message> ConsumeOrderAsync()
        {
            var message = await _orderConsumer.ReceiveAsync(CancellationToken.None);
            await _orderConsumer.AcceptAsync(message);
            return message;
        }

        public async Task SendAlertAsync(string severity, string text)
        {
            await _alertProducer.SendAsync(AlertsQueue, RoutingType.Anycast, new Message($"ALERT|{severity}|{text}"), CancellationToken.None);
        }

        public void SendUrgentAlert(string text)
        {
            _alertProducer.Send(AlertsQueue, RoutingType.Anycast, new Message($"URGENT|{text}"), CancellationToken.None);
        }
    }

    class Program
    {
        static async Task Main(string[] args)
        {
            var service = new ArtemisOrderService();
            await service.InitializeAsync();

            await service.PublishOrderAsync("ORD-1001", "Mechanical Keyboard", 2);
            service.PublishOrderSync("ORD-1002", "USB Hub");

            await service.PublishPaymentRequestAsync("ORD-1001", 189.99m, "USD");

            await service.SendAlertAsync("INFO", "Order pipeline started");
            service.SendUrgentAlert("Payment gateway latency high");

            var order = await service.ConsumeOrderAsync();
            Console.WriteLine($"Processing order: {order.GetBody<string>()}");
        }
    }
}

Known limitations

  • Unknown Publisher/Receiver Objects will be created if the evaluation fails to resolve the necessary parameter.