.NET NMS - 1.0

Extension ID

com.castsoftware.dotnet.nms

What’s new?

See .NET NMS - 1.0 - Release Notes.

Description

This extension supports the Apache NMS .NET APIs used for message producer (publisher) and consumer operations in .NET applications. Install it if your C# application uses Apache NMS for messaging, both the producer and the consumer are implemented in C#, and you want them modeled with the appropriate objects and links.

Supported libraries

Library               Version Supported
Apache NMS            up to 2.1.0
Apache NMS ActiveMQ   up to 2.1.0
Apache NMS AMQP       up to 2.1.0

Compatibility

Core release Operating System Supported
8.3.x Microsoft Windows

Dependencies with other extensions

Some CAST extensions require the presence of other CAST extensions in order to function correctly. The .NET NMS extension requires that the following other CAST extensions are also installed (this will be managed automatically by CAST Imaging Console):

  • CAST AIP Internal extension (internal technical extension)
  • Web Services Linker

Download and installation instructions

The extension will not be automatically downloaded and installed in CAST Imaging Console. If you need to use it, you should manually install the extension using the Application - Extensions interface.

What results can you expect?

Objects

  • Apache NMS ActiveMQ publisher
  • Apache NMS ActiveMQ Receiver
  • Apache NMS ActiveMQ unknown publisher
  • Apache NMS ActiveMQ unknown Receiver
  • Apache NMS AMQP publisher
  • Apache NMS AMQP Receiver
  • Apache NMS AMQP unknown publisher
  • Apache NMS AMQP unknown Receiver

Links

Link Type: callLink
Source and Destination of link: callLink between the caller C# method and the Publisher object
Supported APIs:
  • Apache.NMS.IMessageProducer.Send
  • Apache.NMS.IMessageProducer.SendAsync
  • Apache.NMS.ActiveMQ.IMessageProducer.Send
  • Apache.NMS.ActiveMQ.IMessageProducer.SendAsync
  • Apache.NMS.AMQP.NmsProducer.Send
  • Apache.NMS.AMQP.NmsProducer.SendAsync

Link Type: callLink
Source and Destination of link: callLink between the Receiver object and the caller C# method
Supported APIs:
  • Apache.NMS.IMessageConsumer.Receive
  • Apache.NMS.IMessageConsumer.ReceiveAsync
  • Apache.NMS.ActiveMQ.IMessageReceiver.Send
  • Apache.NMS.ActiveMQ.IMessageReceiver.SendAsync
  • Apache.NMS.AMQP.NmsConsumer.Receive
  • Apache.NMS.AMQP.NmsConsumer.ReceiveAsync

Example code scenarios

ActiveMQ

Publisher APIs

static void Main()
{
    string brokerUri = "tcp://localhost:61616";
    string queueName = "dest_queue";

    using (IConnection connection = new ConnectionFactory(new Uri(brokerUri)).CreateConnection())
    {
        connection.Start();

        using (ISession session = connection.CreateSession())
        {
            IDestination destination = session.GetQueue(queueName);

            // Produce message
            ProduceMessage(session, destination);

            // Consume message
            ConsumeMessage(session, destination);

            connection.Stop();
        }
    }
}

static void ProduceMessage(ISession session, IDestination destination)
{
    using (IMessageProducer producer = session.CreateProducer(destination))
    {
        // Create the text message, then publish it to the destination
        ITextMessage message = session.CreateTextMessage("Hello, ActiveMQ!");
        producer.Send(message);
        Console.WriteLine("Message sent: " + message.Text);
    }
}

Receiver APIs

static void ConsumeMessage(ISession session, IDestination destination)
{
    using (IMessageConsumer consumer = session.CreateConsumer(destination))
    {
        // Receive() blocks until a message arrives; the cast yields null for non-text messages
        ITextMessage receivedMessage = consumer.Receive() as ITextMessage;
        if (receivedMessage != null)
        {
            Console.WriteLine("Received message: " + receivedMessage.Text);
        }
        else
        {
            Console.WriteLine("No messages in the queue.");
        }
    }
}

Dotnet Publisher and Receiver

using Apache.NMS;
using Apache.NMS.ActiveMQ;
using System;

class Program
{
    static void Main()
    {
        string brokerUri = "tcp://localhost:61616"; 
        string queueName = "dest_queue"; 

        using (IConnection connection = new ConnectionFactory(new Uri(brokerUri)).CreateConnection())
        {
            connection.Start();

            using (ISession session = connection.CreateSession())
            {
                IDestination destination = session.GetQueue(queueName);

                // Produce message
                ProduceMessage(session, destination);

                // Consume message
                ConsumeMessage(session, destination);

                connection.Stop();
            }
        }
    }

    static void ProduceMessage(ISession session, IDestination destination)
    {
        using (IMessageProducer producer = session.CreateProducer(destination))
        {
            // Create the text message, then publish it to the destination
            ITextMessage message = session.CreateTextMessage("Hello, ActiveMQ!");
            producer.Send(message);
            Console.WriteLine("Message sent: " + message.Text);
        }
    }

    static void ConsumeMessage(ISession session, IDestination destination)
    {
        using (IMessageConsumer consumer = session.CreateConsumer(destination))
        {
            ITextMessage receivedMessage = consumer.Receive() as ITextMessage;
            if (receivedMessage != null)
            {
                Console.WriteLine("Received message: " + receivedMessage.Text);
            }
            else
            {
                Console.WriteLine("No messages in the queue.");
            }
        }
    }
}
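
The supported APIs listed earlier also include SendAsync and ReceiveAsync, which the program above does not exercise. The following is a minimal sketch of the same publish/receive flow using those members, assuming Apache.NMS 2.0 or later (where the asynchronous members are available) and a C# version that allows an async Main. The class name AsyncProgram is hypothetical; the broker URI and queue name are reused from the example above.

```csharp
using System;
using System.Threading.Tasks;
using Apache.NMS;
using Apache.NMS.ActiveMQ;

// Hypothetical class name, used only for this illustration.
class AsyncProgram
{
    static async Task Main()
    {
        // Broker URI and queue name reused from the synchronous example.
        var factory = new ConnectionFactory(new Uri("tcp://localhost:61616"));

        using (IConnection connection = factory.CreateConnection())
        {
            connection.Start();

            using (ISession session = connection.CreateSession())
            {
                IDestination destination = session.GetQueue("dest_queue");

                using (IMessageProducer producer = session.CreateProducer(destination))
                using (IMessageConsumer consumer = session.CreateConsumer(destination))
                {
                    // Publish without blocking the calling thread.
                    ITextMessage message = session.CreateTextMessage("Hello, ActiveMQ!");
                    await producer.SendAsync(message);

                    // Wait asynchronously for the next message on the queue.
                    var received = (await consumer.ReceiveAsync()) as ITextMessage;
                    Console.WriteLine("Received message: " + received?.Text);
                }
            }
        }
    }
}
```

As in the synchronous version, the destination name is a string literal passed to GetQueue, so the publisher and the receiver refer to the same queue.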

AMQP

Publisher APIs

private void SendMessage(string message)
{
    var response = _session.CreateTextMessage();
    response.Text = message;
    _producer.Send(response);
}

Receiver APIs

private ITextMessage ReceiveMessage()
{
    return _consumer.Receive() as ITextMessage;
}

Linking code

using System;
using System.Threading.Tasks;
using Apache.NMS;
using Apache.NMS.Util;

namespace Tutorial.ProfanityFilter.Svc
{
    class Program
    {
        private IMessageProducer _producer;
        private IMessageConsumer _consumer;
        private ISession _session;
        private const string Queue = "queue://App.Message.Processing.Queue";

        static void Main(string[] args)
        {
            new Program().Start();
        }

        private void Start()
        {
            while (true)
            {
                var request = ReceiveMessage();
                var requestText = request?.Text;
                if (string.IsNullOrWhiteSpace(requestText)) continue;

                Task.Factory.StartNew(async () =>
                {
                    var messageToPublish = await ProfanityFilterService.Filter(requestText);
                    SendMessage(messageToPublish);
                });
            }
        }

        private ITextMessage ReceiveMessage()
        {
            return _consumer.Receive() as ITextMessage;
        }

        private void SendMessage(string message)
        {
            var response = _session.CreateTextMessage();
            response.Text = message;
            _producer.Send(response);
        }

        private Program()
        {
            InitializeCommunication();
        }

        private void InitializeCommunication()
        {
            const string userName = "admin";
            const string password = "admin";
            const string uri = "amqp:tcp://localhost:61616";
            var connecturi = new Uri(uri);
            var factory = new NMSConnectionFactory(connecturi);
            var connection = factory.CreateConnection(userName, password);
            connection.Start();
            _session = connection.CreateSession();
            var queueDestination = SessionUtil.GetDestination(_session, Queue);
            _consumer = _session.CreateConsumer(queueDestination);
            _producer = _session.CreateProducer(queueDestination);
        }
    }
}

Known limitations

  • Unknown Publisher/Receiver Objects will be created if the evaluation fails to resolve the necessary parameter.
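
The limitation above can be illustrated with a hypothetical sketch. The documentation does not state which parameter must be resolved; this example assumes it is the destination (queue) name. The class QueueNameExamples, its method names, and the ORDER_QUEUE environment variable are invented for illustration. Whether a given pattern is resolved depends on the analyzer, so the second method shows a case that may lead to an unknown publisher, not one that is guaranteed to.

```csharp
using System;
using Apache.NMS;

// Hypothetical helper class, used only for this illustration.
static class QueueNameExamples
{
    // The queue name is a string literal, so it is visible in the source code.
    public static void SendToLiteralQueue(ISession session)
    {
        IDestination destination = session.GetQueue("dest_queue");
        using (IMessageProducer producer = session.CreateProducer(destination))
        {
            producer.Send(session.CreateTextMessage("Hello"));
        }
    }

    // The queue name only exists at run time (assumed environment variable),
    // so its value cannot be read from the source code alone.
    public static void SendToRuntimeQueue(ISession session)
    {
        string queueName = Environment.GetEnvironmentVariable("ORDER_QUEUE");
        IDestination destination = session.GetQueue(queueName);
        using (IMessageProducer producer = session.CreateProducer(destination))
        {
            producer.Send(session.CreateTextMessage("Hello"));
        }
    }
}
```

Under the stated assumption, keeping destination names as constants or literals close to the call site is the pattern most likely to produce named publisher and receiver objects.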