.NET NMS - 1.0

Extension ID

com.castsoftware.dotnet.nms

What’s new?

See .NET NMS - 1.0 - Release Notes .

Description

This extension provides support for Apache NMS .NET APIs which are responsible for consumer and publisher operations in .NET applications. If your C# application utilizes Apache NMS for messaging-related operations and the producer and consumer are handled using the C# language, and you want to modelize the producer and consumer links with appropriate objects and links, then you should install this extension.

Supported libraries

Library Version Supported
Apache NMS up to: 2.1.0

(tick)

Apache NMS ActiveMQ up to 2.1.0

(tick)

Apache NMS ActiveMQ up to 2.1.0

(tick)

Compatibility

CAST Imaging Core release Supported
 ≥ 8.3.x ✔️

Dependencies with other extensions

Some CAST extensions require the presence of other CAST extensions in order to function correctly. The .NET NMS extension requires that the following other CAST extensions are also installed (this will be managed automatically by CAST Imaging Console):

  • CAST AIP Internal extension (internal technical extension)
  • Web Services Linker

Download and installation instructions

The extension will not be automatically downloaded and installed in CAST Imaging Console. If you need to use it, you should manually install the extension using the Application - Extensions interface.

What results can you expect?

Objects

Icon Description
Apache NMS ActiveMQ publisher
Apache NMS ActiveMQ Receiver
Apache NMS ActiveMQ unknown publisher
Apache NMS ActiveMQ unknown Receiver
Apache NMS AMQP publisher
Apache NMS AMQP Receiver
Apache NMS AMQP unknown publisher
Apache NMS AMQP unknown Receiver

Link Type

Source and Destination of link

Supported APIs

callLink callLink between the caller C# method and the  Publisher object

Apache.NMS.IMessageProducer.Send

Apache.NMS.IMessageProducer.SendAsync

Apache.NMS.ActiveMQ.IMessageProducer.Send

Apache.NMS.ActiveMQ.IMessageProducer.SendAsync

Apache.NMS.AMQP.NmsProducer.Send

Apache.NMS.AMQP.NmsProducer.SendAsync

callLink

callLink between the  Receiver object and the caller  C# method 

Apache.NMS.IMessageConsumer.Receive

Apache.NMS.IMessageConsumer.ReceiveAsync

Apache.NMS.ActiveMQ.IMessageReceiver.Send

Apache.NMS.ActiveMQ.IMessageReceiver.SendAsync

Apache.NMS.AMQP.NmsConsumer.Receive

Apache.NMS.AMQP.NmsConsumer.ReceiveAsync

Example code scenarios

ActiveMQ

Publisher APIs

 static void Main()
    {
        string brokerUri = "tcp://localhost:61616"; 
        string queueName = "dest_queue"; 

        using (IConnection connection = new ConnectionFactory(new Uri(brokerUri)).CreateConnection())
        {
            connection.Start();

            using (ISession session = connection.CreateSession())
            {
                IDestination destination = session.GetQueue(queueName);

                // Produce message
                ProduceMessage(session, destination);

                // Consume message
                ConsumeMessage(session, destination);

                connection.Stop();
            }
        }
    }

    static void ProduceMessage(ISession session, IDestination destination)
    {
        using (IMessageProducer producer = session.CreateProducer(destination))
        using (ITextMessage message = session.CreateTextMessage("Hello, ActiveMQ!"))
        {
            producer.Send(message);
            Console.WriteLine("Message sent: " + message.Text);
        }
    }

Receiver APIs

 static void ConsumeMessage(ISession session, IDestination destination)
    {
        using (IMessageConsumer consumer = session.CreateConsumer(destination))
        {
            ITextMessage receivedMessage = consumer.Receive() as ITextMessage;
            if (receivedMessage != null)
            {
                Console.WriteLine("Received message: " + receivedMessage.Text);
            }
            else
            {
                Console.WriteLine("No messages in the queue.");
            }
        }

Dotnet Publisher and Receiver

using Apache.NMS;
using Apache.NMS.ActiveMQ;
using System;

class Program
{
    static void Main()
    {
        string brokerUri = "tcp://localhost:61616"; 
        string queueName = "dest_queue"; 

        using (IConnection connection = new ConnectionFactory(new Uri(brokerUri)).CreateConnection())
        {
            connection.Start();

            using (ISession session = connection.CreateSession())
            {
                IDestination destination = session.GetQueue(queueName);

                // Produce message
                ProduceMessage(session, destination);

                // Consume message
                ConsumeMessage(session, destination);

                connection.Stop();
            }
        }
    }

    static void ProduceMessage(ISession session, IDestination destination)
    {
        using (IMessageProducer producer = session.CreateProducer(destination))
        using (ITextMessage message = session.CreateTextMessage("Hello, ActiveMQ!"))
        {
            producer.Send(message);
            Console.WriteLine("Message sent: " + message.Text);
        }
    }

    static void ConsumeMessage(ISession session, IDestination destination)
    {
        using (IMessageConsumer consumer = session.CreateConsumer(destination))
        {
            ITextMessage receivedMessage = consumer.Receive() as ITextMessage;
            if (receivedMessage != null)
            {
                Console.WriteLine("Received message: " + receivedMessage.Text);
            }
            else
            {
                Console.WriteLine("No messages in the queue.");
            }
        }
    }
}

AMQP

Publisher APIs

private void SendMessage(string message)
        {
            var response = _session.CreateTextMessage();
            response.Text = message;
            _producer.Send(response);
        }

Receiver APIs

private ITextMessage ReceiveMessage()
        {
            return _consumer.Receive() as ITextMessage;
        }

Linking code

using System;
using System.Threading.Tasks;
using Apache.NMS;
using Apache.NMS.Util;

namespace Tutorial.ProfanityFilter.Svc
{
    class Program
    {
        private IMessageProducer _producer;
        private IMessageConsumer _consumer;
        private ISession _session;
        private const string Queue = "queue://App.Message.Processing.Queue";

        static void Main(string[] args)
        {
            new Program().Start();
        }

        private void Start()
        {
            while (true)
            {
                var request = ReceiveMessage();
                var requestText = request?.Text;
                if (string.IsNullOrWhiteSpace(requestText)) continue;

                Task.Factory.StartNew(async () =>
                {
                    var messageToPublish = await ProfanityFilterService.Filter(requestText);
                    SendMessage(messageToPublish);
                });
            }
        }

        private ITextMessage ReceiveMessage()
        {
            return _consumer.Receive() as ITextMessage;
        }

        private void SendMessage(string message)
        {
            var response = _session.CreateTextMessage();
            response.Text = message;
            _producer.Send(response);
        }

        private Program()
        {
            InitializeCommunication();
        }

        private void InitializeCommunication()
        {
            const string userName = "admin";
            const string password = "admin";
            const string uri = "amqp:tcp://localhost:61616";
            var connecturi = new Uri(uri);
            var factory = new NMSConnectionFactory(connecturi);
            var connection = factory.CreateConnection(userName, password);
            connection.Start();
            _session = connection.CreateSession();
            var queueDestination = SessionUtil.GetDestination(_session, Queue);
            _consumer = _session.CreateConsumer(queueDestination);
            _producer = _session.CreateProducer(queueDestination);
        }
    }
}

Known limitations

  • Unknown Publisher/Receiver Objects will be created if the evaluation fails to resolve the necessary parameter.