This documentation is not maintained. Please refer to doc.castsoftware.com/technologies to find the latest updates.

Extension ID

com.castsoftware.dotnet.rabbitmq

What's new?

See RabbitMQ for .NET - 1.0 - Release Notes for more information.

Description

This extension provides support for the RabbitMQ for .NET APIs that publish and receive messages across messaging queues.

In what situation should you install this extension?

If your C# application utilizes RabbitMQ for messaging and you want to view a model that shows the message publisher and receiver objects with their corresponding links, then you should install this extension.

Technology Libraries

Library                 Version      Supported
RabbitMQ.Client         up to 6.5    Yes
EasyNetQ                up to 7.5    Yes
MassTransit.RabbitMQ    up to 8.1    Yes
CAP.RabbitMQ            up to 6.0    Yes
NServiceBus.RabbitMQ    up to 8.0    Yes

AIP Core compatibility

AIP Core release    Supported
8.3.x               Yes

Supported DBMS servers

DBMS              Supported
CSS/PostgreSQL    Yes

Dependencies

The RabbitMQ extension depends on the following extensions to provide a fully functional analysis:

  • CAST AIP Internal extension
  • Web Services Linker

Download and installation instructions

The extension is not automatically downloaded and installed in CAST Console. If you need it, install it manually using the Application - Extensions interface.

What results can you expect?

Once the analysis/snapshot generation has been completed, you can view the results in the normal manner. The following objects and links will be displayed in CAST Imaging.

Objects

  • RabbitMQ DotNet Publisher
  • RabbitMQ DotNet Receiver
  • RabbitMQ Unknown DotNet Publisher
  • RabbitMQ Unknown DotNet Receiver

Config Objects

Description    Supported APIs

RabbitMQ DotNet ExchangeDeclaration
RabbitMQ.Client
  • RabbitMQ.Client.IModel.ExchangeDeclare

  • RabbitMQ.Client.IModel.ExchangeDeclareNoWait

  • RabbitMQ.Client.IModelExensions.ExchangeDeclare

  • RabbitMQ.Client.IModelExensions.ExchangeDeclareNoWait

EasyNetQ
  • EasyNetQ.IAdvancedBus.ExchangeDeclare

  • EasyNetQ.IAdvancedBus.ExchangeDeclareAsync

  • EasyNetQ.Producer.IExchangeDeclareStrategy.DeclareExchangeAsync

  • EasyNetQ.Producer.IPublishExchangeDeclareStrategy.DeclareExchangeAsync

MassTransit.RabbitMQ
  • MassTransit.IRabbitMqBusFactoryConfigurator.Publish

NServiceBus.RabbitMQ
  • NServiceBus.EndpointConfiguration.EndpointConfiguration

RabbitMQ DotNet QueueBind
RabbitMQ.Client
  • RabbitMQ.Client.IModel.QueueBind

  • RabbitMQ.Client.IModel.QueueBindNoWait

  • RabbitMQ.Client.IModelExensions.QueueBind

  • RabbitMQ.Client.IModelExensions.QueueBindNoWait

EasyNetQ
  • EasyNetQ.IAdvancedBus.Bind

  • EasyNetQ.IAdvancedBus.BindAsync

MassTransit.RabbitMQ
  • MassTransit.IRabbitMqReceiveEndpointConfigurator.Bind

RabbitMQ DotNet ExchangeBind
RabbitMQ.Client
  • RabbitMQ.Client.IModel.ExchangeBind

  • RabbitMQ.Client.IModel.ExchangeBindNoWait

  • RabbitMQ.Client.IModelExensions.ExchangeBind

  • RabbitMQ.Client.IModelExensions.ExchangeBindNoWait

EasyNetQ
  • EasyNetQ.IAdvancedBus.Bind

  • EasyNetQ.IAdvancedBus.BindAsync

MassTransit.RabbitMQ
  • MassTransit.IRabbitMqReceiveEndpointConfigurator.Bind

Link Type: callLink
Source and Destination of link: Link between the caller C# method and the DotNet RabbitMQ Publisher object
Supported APIs:
RabbitMQ.Client Publisher APIs
  • RabbitMQ.Client.IModel.BasicPublish

  • RabbitMQ.Client.IModelExensions.BasicPublish

EasyNetQ Publisher APIs
  • EasyNetQ.IAdvancedBus.Publish

  • EasyNetQ.IAdvancedBus.PublishAsync

  • EasyNetQ.IBus.Publish

  • EasyNetQ.IBus.PublishAsync

  • EasyNetQ.IBus.Send

  • EasyNetQ.IBus.SendAsync

  • EasyNetQ.IBus.Request

  • EasyNetQ.IBus.RequestAsync

  • EasyNetQ.IPubSub.PublishAsync

  • EasyNetQ.IRpc.RequestAsync

  • EasyNetQ.ISendReceive.SendAsync

  • EasyNetQ.IScheduler.FuturePublishAsync

  • EasyNetQ.Scheduling.IScheduler.FuturePublishAsync

  • EasyNetQ.Scheduling.IScheduler.FuturePublish

  • EasyNetQ.SchedulerExtensions.FuturePublish

  • EasyNetQ.SchedulerExtensions.FuturePublishAsync

  • EasyNetQ.NonGenericSchedulerExtensions.FuturePublish

  • EasyNetQ.NonGenericSchedulerExtensions.FuturePublishAsync

MassTransit.RabbitMQ Publisher APIs
  • MassTransit.IPublishEndpoint.Publish

  • MassTransit.PublishExecuteExtensions.Publish

  • MassTransit.ISendEndpoint.Send

CAP.RabbitMQ Publisher APIs
  • DotNetCore.CAP.ICapPublisher.Publish

  • DotNetCore.CAP.ICapPublisher.PublishDelayAsync

  • DotNetCore.CAP.ICapPublisher.PublishAsync

  • DotNetCore.CAP.ICapPublisher.PublishDelay

NServiceBus.RabbitMQ Publisher APIs
  • NServiceBus.IMessageSession.Send

  • NServiceBus.MessageSessionExtensions.Send

  • NServiceBus.MessageSessionExtensions.Publish

  • NServiceBus.IMessageSession.Publish

  • NServiceBus.MessageSessionExtensions.SendLocal

  • NServiceBus.IMessageSession.SendLocal

Link Type: callLink
Source and Destination of link: Link between the DotNet RabbitMQ Receiver object and the caller C# method
Supported APIs:
RabbitMQ.Client Receiver APIs
  • RabbitMQ.Client.IModel.BasicConsume

  • RabbitMQ.Client.IModel.BasicGet

  • RabbitMQ.Client.IModelExensions.BasicConsume

EasyNetQ Receiver APIs
  • EasyNetQ.IAdvancedBus.Get

  • EasyNetQ.IAdvancedBus.Consume

  • EasyNetQ.IBus.Subscribe

  • EasyNetQ.IBus.SubscribeAsync

  • EasyNetQ.IBus.Receive

  • EasyNetQ.IBus.Respond

  • EasyNetQ.IBus.RespondAsync

  • EasyNetQ.IPubSub.SubscribeAsync

  • EasyNetQ.IRpc.RespondAsync

  • EasyNetQ.ISendReceive.ReceiveAsync

  • EasyNetQ.AutoSubscribe.IConsumeAsync.ConsumeAsync

  • EasyNetQ.AutoSubscribe.IConsumeAsync.Consume

  • EasyNetQ.AutoSubscribe.IConsume.Consume

MassTransit.RabbitMQ Receiver APIs
  • MassTransit.IReceiveConfigurator.ReceiveEndpoint

  • MassTransit.IRegistrationConfigurator.AddConsumer

CAP.RabbitMQ Receiver APIs
  • DotNetCore.CAP.CapSubscribeAttribute.CapSubscribeAttribute

NServiceBus.RabbitMQ Receiver APIs
  • NServiceBus.IHandleMessages

Code Examples

RabbitMQ.Client

Publisher
.NET Producer
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Routing
{
    class Subs
    {
        static void Main()
        {
            var factory = new ConnectionFactory { HostName = "localhost" };
            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel();

            var queueName1 = channel.QueueDeclare("queueName1").QueueName;
            var queueName2 = channel.QueueDeclare("queueName2").QueueName;

            // ExchangeInit and ReceiveMessage are defined in the ExchangeDeclare
            // and Receiver examples of this section.
            ExchangeInit(channel);

            ReceiveMessage("queueName1", channel);
            ReceiveMessage("queueName2", channel);
            SendMEssage("HelloWOrld", "test2", channel);
        }

        // Publishes the message to the "Sep" exchange with the given routing key.
        private static void SendMEssage(string message, string routingkey, IModel channel)
        {
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "Sep",
                                 routingKey: routingkey,
                                 basicProperties: null,
                                 body: body);
        }
    }
}

Unknown Publisher
.NET Rabbit
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Routing
{
    class Subs
    {
        static void Main()
        {
            var factory = new ConnectionFactory { HostName = "localhost" };
            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel();

            var queueName1 = channel.QueueDeclare("queueName1").QueueName;
            var queueName2 = channel.QueueDeclare("queueName2").QueueName;

            channel.ExchangeDeclare(exchange: "Sep", type: "fanout", true);
            channel.ExchangeDeclare(exchange: "Oct", type: "direct", true);
            channel.ExchangeBind("Oct", "Sep", "");

            var body = Encoding.UTF8.GetBytes("Hello World");

            // unknown_exchange_name is a placeholder for an exchange name that is
            // not resolved statically; it is intentionally left undeclared here.
            channel.BasicPublish(exchange: unknown_exchange_name,
                                 routingKey: "test2",
                                 basicProperties: channel.CreateBasicProperties(),
                                 body: body);

            Console.WriteLine(" [x] Sent {0}", "Hello World");
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

Receiver
.NET Receiver
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Routing
{
    class Subs
    {
        static void Main()
        {
            var factory = new ConnectionFactory { HostName = "localhost" };
            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel();

            var queueName1 = channel.QueueDeclare("queueName1").QueueName;
            var queueName2 = channel.QueueDeclare("queueName2").QueueName;

            ExchangeInit(channel);

            channel.QueueBind(queue: "queueName1",
                              exchange: "Oct",
                              routingKey: "test2");

            channel.QueueBind(queue: "queueName2",
                              exchange: "Sep",
                              routingKey: "");

            Console.WriteLine(" [*] Waiting for logs.");

            ReceiveMessage("queueName1", channel);
            ReceiveMessage("queueName2", channel);
            SendMEssage("HelloWOrld", "test2", channel);
        }

        // Registers a consumer on the given queue and prints each received message.
        private static void ReceiveMessage(string queueName, IModel channel)
        {
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                byte[] body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine($" [x] Received {message}");
            };

            channel.BasicConsume(queue: queueName,
                                 autoAck: true,
                                 consumer: consumer);
        }
    }
}

Unknown Receiver
.NET Rabbit
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Routing
{
    class Subs
    {
        static void Main()
        {
            var factory = new ConnectionFactory { HostName = "localhost" };
            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel();

            var queueName1 = channel.QueueDeclare("queueName1").QueueName;
            var queueName2 = channel.QueueDeclare("queueName2").QueueName;

            Console.WriteLine(" [*] Waiting for logs.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                byte[] body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine($" [x] Received {message}");
            };

            // unknown_queue is a placeholder for a queue name that is not
            // resolved statically; it is intentionally left undeclared here.
            channel.BasicConsume(queue: unknown_queue,
                                 autoAck: true,
                                 consumer: consumer);

            channel.BasicConsume(queue: "queueName2",
                                 autoAck: true,
                                 consumer: consumer);
        }
    }
}

ExchangeDeclare
.NET Rabbit
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Routing
{
    class Subs
    {
        static void Main()
        {
            var factory = new ConnectionFactory { HostName = "localhost" };
            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel();

            var queueName1 = channel.QueueDeclare("queueName1").QueueName;
            var queueName2 = channel.QueueDeclare("queueName2").QueueName;

            ExchangeInit(channel);
        }

        // Declares the "Sep" (fanout) and "Oct" (direct) exchanges, both durable.
        private static void ExchangeInit(IModel channel)
        {
            channel.ExchangeDeclare(exchange: "Sep", type: "fanout", true);
            channel.ExchangeDeclare(exchange: "Oct", type: "direct", true);
            channel.ExchangeBind("Oct", "Sep", "");
        }
    }
}

QueueBind
.NET Rabbit
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Routing
{
    class Subs
    {
        static void Main()
        {
            var factory = new ConnectionFactory { HostName = "localhost" };
            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel();

            var queueName1 = channel.QueueDeclare("queueName1").QueueName;
            var queueName2 = channel.QueueDeclare("queueName2").QueueName;

            ExchangeInit(channel);

            // Bind each queue to its exchange.
            channel.QueueBind(queue: "queueName1",
                              exchange: "Oct",
                              routingKey: "test2");

            channel.QueueBind(queue: "queueName2",
                              exchange: "Sep",
                              routingKey: "");

            Console.WriteLine(" [*] Waiting for logs.");

            ReceiveMessage("queueName1", channel);
            ReceiveMessage("queueName2", channel);
            SendMEssage("HelloWOrld", "test2", channel);
        }
    }
}

ExchangeBind
.NET Rabbit
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Routing
{
    class Subs
    {
        static void Main()
        {
            var factory = new ConnectionFactory { HostName = "localhost" };
            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel();

            var queueName1 = channel.QueueDeclare("queueName1").QueueName;
            var queueName2 = channel.QueueDeclare("queueName2").QueueName;

            ExchangeInit(channel);
        }

        private static void ExchangeInit(IModel channel)
        {
            channel.ExchangeDeclare(exchange: "Sep", type: "fanout", true);
            channel.ExchangeDeclare(exchange: "Oct", type: "direct", true);

            // Exchange-to-exchange binding: destination "Oct", source "Sep".
            channel.ExchangeBind("Oct", "Sep", "");
        }
    }
}

Cross Tech - Java Publisher and .Net Receiver

JAVA Publisher

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {

    private static final String EXCHANGE_NAME = "August";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        String message = argv.length < 1 ? "info: Hello Worldss!" :
                String.join(" ", argv);

        channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
        channel.queueDeclare("queue1", false, false, false, null);
        channel.queueBind("queue1", EXCHANGE_NAME, "test.*");

        channel.basicPublish(EXCHANGE_NAME, "test1", null, message.getBytes("UTF-8"));
    }
}

.NET Receiver

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Routing
{
    class Subs
    {
        static void Main()
        {
            var factory = new ConnectionFactory { HostName = "localhost" };
            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel();

            Console.WriteLine(" [*] Waiting for logs.");

            var q1 = channel.QueueDeclare("queue1", false, false, false, null);
            var q2 = channel.QueueDeclare("queue2", false, false, false, null);

            channel.ExchangeDeclare("August", "topic", true);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                byte[] body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine($" [x] {message}");
            };

            channel.QueueBind("queue2", "August", "test1");

            channel.BasicConsume(queue: "queue1",
                                 autoAck: true,
                                 consumer: consumer);

            channel.BasicConsume(queue: "queue2",
                                 autoAck: true,
                                 consumer: consumer);
        }
    }
}
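
In the cross-technology example above, the Java publisher sends to the topic exchange August with routing key test1, while queue1 is bound with the pattern test.* and queue2 with test1. The following is a minimal, self-contained sketch of the AMQP topic-matching rules (* matches exactly one dot-separated word, # matches zero or more words), useful for checking which of those bindings a broker would route the message through. The TopicMatch class and its Matches method are hypothetical helpers written for this illustration; they are not part of RabbitMQ.Client or of this extension, and the sketch makes no claim about how the extension itself resolves links.

```csharp
using System;

public static class TopicMatch
{
    // Returns true when routingKey satisfies the binding pattern under
    // topic-exchange rules: '*' is exactly one word, '#' is zero or more words.
    public static bool Matches(string pattern, string routingKey)
    {
        return Match(pattern.Split('.'), 0, routingKey.Split('.'), 0);
    }

    private static bool Match(string[] p, int i, string[] k, int j)
    {
        // Pattern exhausted: match only if the key is exhausted too.
        if (i == p.Length) return j == k.Length;

        if (p[i] == "#")
        {
            // '#' may absorb zero or more of the remaining key words.
            for (int n = j; n <= k.Length; n++)
            {
                if (Match(p, i + 1, k, n)) return true;
            }
            return false;
        }

        // Any other pattern word needs a key word to consume.
        if (j == k.Length) return false;

        if (p[i] == "*" || p[i] == k[j]) return Match(p, i + 1, k, j + 1);
        return false;
    }

    public static void Main()
    {
        Console.WriteLine(Matches("test.*", "test1"));  // False: queue1 binding
        Console.WriteLine(Matches("test1", "test1"));   // True: queue2 binding
        Console.WriteLine(Matches("test.*", "test.1")); // True
    }
}
```

Under these rules test.* does not match test1, because the pattern expects a second word after the dot. On a broker, the message from the Java publisher would therefore be routed through the queue2 binding only.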

EasyNetQ

IBus

Publish
using System;
using EasyNetQ;
using EasyNetQMessages;
using EasyNetQ.SystemMessages;
using System.Configuration;

namespace Publish
{
    class Program
    {
        public static void Main(string[] args)
        {
            

            var payment2 = new CardPaymentRequestMessage
            {
                CardNumber = "3456345634563456",
                CardHolderName = "Mr S Claws",
                ExpiryDate = "03/11",
                Amount = 15.00m
            };

       

            using (var bus = RabbitHutch.CreateBus("host=localhost"))
            {
                Console.WriteLine("Publishing messages with publish and subscribe.");
                Console.WriteLine((new EasyNetQ.SystemMessages.Error()).Queue);
                Console.WriteLine();

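                // Only payment2 is declared in this excerpt; payment1, payment3 and
                // payment4 are assumed to be declared the same way.
                bus.Publish(payment1);
                bus.Publish(payment2);
                bus.Publish(payment3);
                bus.Publish(payment4);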
               
            }
        }
    }
}


Subscribe
using System;
using EasyNetQ;
using EasyNetQMessages;

namespace Subscribe
{
    class Program
    {
        public static void Main(string[] args)
        {
            using (var bus = RabbitHutch.CreateBus("host=localhost"))
            {
                bus.Subscribe<CardPaymentRequestMessage>("cardPayment", HandleCardPaymentMessage);

                Console.WriteLine("Listening for messages. Hit <return> to quit.");
                Console.ReadLine();
            }
        }

        static void HandleCardPaymentMessage(CardPaymentRequestMessage paymentMessage)
        {
            Console.WriteLine("Payment = <" +
                              paymentMessage.CardNumber + ", " +
                              paymentMessage.CardHolderName + ", " +
                              paymentMessage.ExpiryDate + ", " +
                              paymentMessage.Amount + ">");
        }
    }
}

Send
using System;
using EasyNetQ;
using EasyNetQMessages;

namespace Send
{
    class Program
    {
        public static void Main(string[] args)
        {
            var payment1 = new CardPaymentRequestMessage
            {
                CardNumber = "1234123412341234",
                CardHolderName = "Mr F Bloggs",
                ExpiryDate = "12/12",
                Amount = 99.00m
            };


            using (var bus = RabbitHutch.CreateBus("host=localhost"))
            {
                Console.WriteLine("Publishing messages with send and receive.");
                Console.WriteLine();

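                // Only payment1 is declared in this excerpt; the other payments and
                // the purchase orders are assumed to be declared elsewhere.
                bus.Send("my.paymentsqueue", payment1);
                bus.Send("my.paymentsqueue", purchaseOrder1);
                bus.Send("my.paymentsqueue", payment2);
                bus.Send("my.paymentsqueue", payment3);
                bus.Send("my.paymentsqueue", purchaseOrder2);
                bus.Send("my.paymentsqueue", payment4);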
            }
          
        }
    }
}


Receive
using System;
using EasyNetQ;
using EasyNetQMessages;

namespace Receive
{
    class Program
    {
        public static void Main(string[] args)
        {
            using (var bus = RabbitHutch.CreateBus("host=localhost"))
            {
                bus.Receive<CardPaymentRequestMessage>("my.paymentsqueue", message => HandleCardPaymentMessage(message));

                Console.WriteLine("Listening for messages. Hit <return> to quit.");
                Console.ReadLine();
            }
        }

        static void HandleCardPaymentMessage(CardPaymentRequestMessage paymentMessage)
        {
            Console.WriteLine("Processing Payment = <" +
                              paymentMessage.CardNumber + ", " +
                              paymentMessage.CardHolderName + ", " +
                              paymentMessage.ExpiryDate + ", " +
                              paymentMessage.Amount + ">");
        }
    }
}



Request
using System;
using EasyNetQ;
using EasyNetQMessages;

namespace RequestAsync
{
    class Program
    {
        static void Main(string[] args)
        {
            var payment = new CardPaymentRequestMessage
            {
                CardNumber = "1234123412341234",
                CardHolderName = "Mr F Bloggs",
                ExpiryDate = "12/12",
                Amount = 99.00m
            };

            using (var bus = RabbitHutch.CreateBus("host=localhost"))
            {
                Console.WriteLine("Publishing messages with request and response.");
                Console.WriteLine();

                var task = bus.RequestAsync<CardPaymentRequestMessage, CardPaymentResponseMessage>(payment);

                task.ContinueWith(response => {
                    Console.WriteLine("Got response: '{0}'", response.Result.AuthCode);
                });

                Console.ReadLine();
            }
        }
    }
}


Respond
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using EasyNetQ;
using EasyNetQMessages;

namespace ResponseAsync
{
    class Program
    {
        public  class MyWorker
        {
            public CardPaymentResponseMessage Execute(CardPaymentRequestMessage request)
            {
                CardPaymentResponseMessage responseMessage = new CardPaymentResponseMessage();
                responseMessage.AuthCode = "1234";
                Console.WriteLine("Worker activated to process response.");

                return responseMessage;
            }
        }

        static void Main(string[] args)
        {
            // Create a group of worker objects
            var workers = new BlockingCollection<MyWorker>();
            for (int i = 0; i < 10; i++)
            {
                workers.Add(new MyWorker());
            }

            using (var bus = RabbitHutch.CreateBus("host=localhost"))
            {
                // Respond to requests
                bus.RespondAsync<CardPaymentRequestMessage, CardPaymentResponseMessage>(request =>
                    Task.Factory.StartNew(() =>
                    {
                        var worker = workers.Take();
                        try
                        {
                            return worker.Execute(request);
                        }
                        finally
                        {
                            workers.Add(worker);
                        }
                    }));

                Console.WriteLine("Listening for messages. Hit <return> to quit.");
                Console.ReadLine();
            }
        }
    }
}


IAdvancedBus

Publisher
using System;
using System.Text;
using EasyNetQ;
using EasyNetQMessages;
using EasyNetQ.Topology;

namespace Publish
{
    class Program
    {
        public static void Main(string[] args)
        {
            using (var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced)
            {
                var exchange = advancedBus.ExchangeDeclare("my.exchange", ExchangeType.Topic);
                var queue = advancedBus.QueueDeclare("my.queue");

                var binding = advancedBus.Bind(exchange, queue, "A.*");

                Console.WriteLine("Publishing messages with publish and subscribe.");
                Console.WriteLine();

                var properties = new MessageProperties();
                var body = Encoding.UTF8.GetBytes("Hello World!");
                advancedBus.Publish(exchange, "A.AA", false, properties, body);
            }
        }
    }
}

Unknown Publisher
using System;
using System.Text;
using EasyNetQ;
using EasyNetQMessages;
using EasyNetQ.Topology;

namespace Publish
{
    class Program
    {
        public static void Main(string[] args)
        {
            using (var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced)
            {
                var exchange = advancedBus.ExchangeDeclare("my.exchange", ExchangeType.Topic);
                var queue = advancedBus.QueueDeclare("my.queue");

                var binding = advancedBus.Bind(exchange, queue, "A.*");

                Console.WriteLine("Publishing messages with publish and subscribe.");
                Console.WriteLine();

                var properties = new MessageProperties();
                var body = Encoding.UTF8.GetBytes("Hello World!");

                // unknown_exchange is a placeholder for an exchange that is not
                // resolved statically; it is intentionally left undeclared here.
                advancedBus.Publish(unknown_exchange, "A.AA", false, properties, body);
            }
        }
    }
}


Receiver
using System;
using System.Text;
using System.Threading.Tasks;
using EasyNetQ;
using EasyNetQMessages;
using EasyNetQ.Topology;

namespace Subscribe
{
    class Program
    {
        public static void Main(string[] args)
        {
            using (var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced)
            {
                var queue = advancedBus.QueueDeclare("my.queue");

                // Consume raw messages from the queue and print their body.
                advancedBus.Consume(queue, (body, properties, info) => Task.Factory.StartNew(() =>
                {
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine("Got message: '{0}'", message);
                }));

                Console.WriteLine("Listening for messages. Hit <return> to quit.");
                Console.ReadLine();
            }
        }
    }
}

Unknown Receiver
using System;
using System.Text;
using System.Threading.Tasks;
using EasyNetQ;
using EasyNetQMessages;
using EasyNetQ.Topology;

namespace Subscribe
{
    class Program
    {
        public static void Main(string[] args)
        {
            using (var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced)
            {
                var queue = advancedBus.QueueDeclare("my.queue");

                // unknown_queue is a placeholder for a queue that is not
                // resolved statically; it is intentionally left undeclared here.
                advancedBus.Consume(unknown_queue, (body, properties, info) => Task.Factory.StartNew(() =>
                {
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine("Got message: '{0}'", message);
                }));

                Console.WriteLine("Listening for messages. Hit <return> to quit.");
                Console.ReadLine();
            }
        }
    }
}

ExchangeDeclare
using System;
using System.Text;
using EasyNetQ;
using EasyNetQMessages;
using EasyNetQ.Topology;

namespace Publish
{
    class Program
    {
        public static void Main(string[] args)
        {
            using (var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced)
            {
                // Declares the topic exchange "my.exchange".
                var exchange = advancedBus.ExchangeDeclare("my.exchange", ExchangeType.Topic);
                var queue = advancedBus.QueueDeclare("my.queue");

                var binding = advancedBus.Bind(exchange, queue, "A.*");

                Console.WriteLine("Publishing messages with publish and subscribe.");
                Console.WriteLine();

                var properties = new MessageProperties();
                var body = Encoding.UTF8.GetBytes("Hello World!");
                advancedBus.Publish(exchange, "A.AA", false, properties, body);
            }
        }
    }
}

Bind
using System;
using System.Text;
using EasyNetQ;
using EasyNetQMessages;
using EasyNetQ.Topology;

namespace Publish
{
    class Program
    {
        public static void Main(string[] args)
        {
            using (var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced)
            {
                var exchange = advancedBus.ExchangeDeclare("my.exchange", ExchangeType.Topic);
                var queue = advancedBus.QueueDeclare("my.queue");

                // Binds "my.queue" to "my.exchange" with the routing pattern "A.*".
                var binding = advancedBus.Bind(exchange, queue, "A.*");

                Console.WriteLine("Publishing messages with publish and subscribe.");
                Console.WriteLine();

                var properties = new MessageProperties();
                var body = Encoding.UTF8.GetBytes("Hello World!");
                advancedBus.Publish(exchange, "A.AA", false, properties, body);
            }
        }
    }
}

MassTransit.RabbitMQ

Publish
.NET Producer
class Program
{
    static async Task Main()
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.Host(new Uri("rabbitmq://localhost"), h =>
            {
                h.Username("guest");
                h.Password("guest");
            });
        });

        await busControl.StartAsync();
        try
        {
            do
            {
                var endpoint = await busControl.GetSendEndpoint(new Uri("rabbitmq://localhost/your_queue_name"));

                // Send and Publish return tasks, so they are awaited here.
                await endpoint.Send(new MyMessage { Text = "Hello MyMessage, World from Send!" });
                await busControl.Publish(new MyMessage { Text = "Hello MyMessage, World from Publish!" }, x => x.SetRoutingKey("high"));
                await busControl.Publish(new MyAnotherMessage { Text = "Hello MyAnotherMessage, World from Publish!" });
                Console.WriteLine("Press [q] to exit");
            } while (Console.ReadKey().KeyChar != 'q');
        }
        finally
        {
            await busControl.StopAsync();
        }
    }
}

Send
.NET Producer
class Program
{
    static async Task Main()
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.Host(new Uri("rabbitmq://localhost"), h =>
            {
                h.Username("guest");
                h.Password("guest");
            });
        });

        await busControl.StartAsync();
        try
        {
            do
            {
                // Send targets a specific endpoint address, unlike Publish.
                var endpoint = await busControl.GetSendEndpoint(new Uri("rabbitmq://localhost/your_queue_name"));

                await endpoint.Send(new MyMessage { Text = "Hello MyMessage, World from Send!" });
                await busControl.Publish(new MyMessage { Text = "Hello MyMessage, World from Publish!" }, x => x.SetRoutingKey("high"));
                await busControl.Publish(new MyAnotherMessage { Text = "Hello MyAnotherMessage, World from Publish!" });
                Console.WriteLine("Press [q] to exit");
            } while (Console.ReadKey().KeyChar != 'q');
        }
        finally
        {
            await busControl.StopAsync();
        }
    }
}

ReceiveEndPoint
.NET Receiver
class Program
{
    static async Task Main()
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.Host(new Uri("rabbitmq://localhost"), h =>
            {
                h.Username("guest");
                h.Password("guest");
            });

            cfg.Publish<MyMessage>(x => x.ExchangeType = "direct");

            cfg.ReceiveEndpoint("your_queue_name", e =>
            {
                e.Consumer<MyMessageConsumer>();
                e.Consumer<MyMessageReceiver>();
            });

            cfg.ReceiveEndpoint("end_point", e =>
            {
                e.Consumer<MyMessageConsumer>();

                e.Bind<MyMessage>(s =>
                {
                    s.RoutingKey = "high";
                    s.ExchangeType = "direct";
                });
            });
        });
    }
}

AddConsumer
.NET Receiver
namespace GettingStarted
{
    public class Program
    {
        public static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }

        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureServices((hostContext, services) =>
                {
                    services.AddMassTransit(x =>
                    {
                        x.AddConsumer<MessageConsumer>();

                        x.UsingRabbitMq((context,cfg) =>
                        {
                            cfg.ConfigureEndpoints(context);
                        });
                    });

                    services.AddHostedService<Worker>();
                });
    }
}

Bind
.NET Program.cs
class Program
{
    static async Task Main()
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.Host(new Uri("rabbitmq://localhost"), h =>
            {
                h.Username("guest");
                h.Password("guest");
            });

            cfg.Publish<MyMessage>(x => x.ExchangeType = "direct");

            cfg.ReceiveEndpoint("end_point", e =>
            {
                e.Consumer<MyMessageConsumer>();

                // Binds the endpoint to the MyMessage exchange with routing key "high".
                e.Bind<MyMessage>(s =>
                {
                    s.RoutingKey = "high";
                    s.ExchangeType = "direct";
                });
            });
        });
    }
}

Unknown Publisher
.NET Producer
using System;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace MassTransitRabbitMQDemo;

class Program
{
    static async Task Main()
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.Host(new Uri("rabbitmq://localhost"), h =>
            {
                h.Username("guest");
                h.Password("guest");
            });
        });

        await busControl.StartAsync();
        await SendMessage(busControl);
    }

    // "unknown" and "Unknown" are placeholders for an address and a message that
    // are not resolved statically; they are intentionally left undeclared here.
    public static async Task SendMessage(IBusControl busControl)
    {
        try
        {
            do
            {
                var endpoint = await busControl.GetSendEndpoint(new Uri(unknown));
                await endpoint.Send(new MyMessage { Text = "Hello MyMessage, World from Send!" });
                await busControl.Publish(Unknown, x => x.SetRoutingKey("high"));
                Console.WriteLine("Press [q] to exit");
            } while (Console.ReadKey().KeyChar != 'q');
        }
        finally
        {
            await busControl.StopAsync();
        }
    }
}

Unknown Receiver
.NET Receiver
using System;
using System.Threading.Tasks;
using MassTransit;

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
namespace MassTransitRabbitMQDemo;

class Program
{
    static async Task Main(string[] args)
    {
        // The queue name is supplied at run time, so it cannot be read from the source code.
        string unknown_variable = args[0];

        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.Host(new Uri("rabbitmq://localhost"), h =>
            {
                h.Username("guest");
                h.Password("guest");
            });

            cfg.Publish<MyMessage>(x => x.ExchangeType = "direct");

            cfg.ReceiveEndpoint(unknown_variable, e =>
            {
                e.Consumer<MyMessageConsumer>();
                e.Consumer<MyMessageReceiver>();
            });
        });

        await busControl.StartAsync();
    }
}

CAP.RabbitMQ

Publish
.NET Producer
using System;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;
using MongoDB.Driver;

namespace Sample.RabbitMQ.MongoDB.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class ValuesController : ControllerBase
    {
        private readonly IMongoClient _client;
        private readonly ICapPublisher _capBus;
        private readonly string _topic;

        public ValuesController(IMongoClient client, ICapPublisher capBus, IConfiguration configuration)
        {
            _client = client;
            _capBus = capBus;
            Configuration = configuration;
            _topic = "sample.rabbitmq.mongodb";
        }

        public IConfiguration Configuration { get; }

        [Route("~/without/transaction")]
        public async Task<IActionResult> WithoutTransaction()
        {
            // Await the publish so that a failure surfaces in the request.
            await _capBus.PublishAsync(_topic, DateTime.Now);

            return Ok();
        }

        [Route("~/delay/{delaySeconds:int}")]
        public async Task<IActionResult> Delay(int delaySeconds)
        {
            await _capBus.PublishDelayAsync(TimeSpan.FromSeconds(delaySeconds), _topic, DateTime.Now);

            return Ok();
        }
    }
}

CapSubscribe
.NET Receiver
using System;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;
using MongoDB.Driver;

namespace Sample.RabbitMQ.MongoDB.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class ValuesController : ControllerBase
    {
        private readonly IMongoClient _client;
        private readonly ICapPublisher _capBus;
        private readonly string _topic;

        public ValuesController(IMongoClient client, ICapPublisher capBus, IConfiguration configuration)
        {
            _client = client;
            _capBus = capBus;
            Configuration = configuration;
            _topic = "sample.rabbitmq.mongodb";
        }

        public IConfiguration Configuration { get; }

        // Invoked when a message arrives on the "sample.rabbitmq.mongodb" topic.
        [CapSubscribe("sample.rabbitmq.mongodb")]
        public void ReceiveMessage(DateTime time)
        {
            Console.WriteLine($@"{DateTime.Now}, Subscriber invoked, Sent time:{time}");
        }
    }
}

NServiceBus.RabbitMQ

Publish
.NET Producer
using System;
using System.Threading.Tasks;
using NServiceBus;

class Program
{
    static async Task Main()
    {
        Console.Title = "Samples.RabbitMQ.SimpleSender";

        #region ConfigureRabbit
        var endpointConfiguration = new EndpointConfiguration("Samples.RabbitMQ.SimpleSender");
        var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
        transport.UseConventionalRoutingTopology(QueueType.Quorum);
        transport.ConnectionString("host=localhost");
        #endregion

        transport.Routing().RouteToEndpoint(typeof(MyCommand), "Samples.RabbitMQ.SimpleReceiver");
        endpointConfiguration.EnableInstallers();

        var endpointInstance = await Endpoint.Start(endpointConfiguration)
            .ConfigureAwait(false);
        await SendMessages(endpointInstance);
        await endpointInstance.Stop()
            .ConfigureAwait(false);
    }

    static async Task SendMessages(IMessageSession messageSession)
    {
        Console.WriteLine("Press [c] to send a command, [l] to send a command locally, or [e] to publish an event. Press [Esc] to exit.");
        while (true)
        {
            var input = Console.ReadKey();
            Console.WriteLine();

            switch (input.Key)
            {
                case ConsoleKey.C:
                    await messageSession.Send(new MyCommand());
                    break;
                case ConsoleKey.L:
                    await messageSession.SendLocal(new MyCommand());
                    break;
                case ConsoleKey.E:
                    await messageSession.Publish(new MyEvent());
                    break;
                case ConsoleKey.Escape:
                    return;
            }
        }
    }
}

Send
.NET Producer
using System;
using System.Threading.Tasks;
using NServiceBus;

class Program
{
    static async Task Main()
    {
        Console.Title = "Samples.RabbitMQ.SimpleSender";

        #region ConfigureRabbit
        var endpointConfiguration = new EndpointConfiguration("Samples.RabbitMQ.SimpleSender");
        var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
        transport.UseConventionalRoutingTopology(QueueType.Quorum);
        transport.ConnectionString("host=localhost");
        #endregion

        transport.Routing().RouteToEndpoint(typeof(MyCommand), "Samples.RabbitMQ.SimpleReceiver");
        endpointConfiguration.EnableInstallers();

        var endpointInstance = await Endpoint.Start(endpointConfiguration)
            .ConfigureAwait(false);
        await SendMessages(endpointInstance);
        await endpointInstance.Stop()
            .ConfigureAwait(false);
    }

    static async Task SendMessages(IMessageSession messageSession)
    {
        Console.WriteLine("Press [c] to send a command, [l] to send a command locally, or [e] to publish an event. Press [Esc] to exit.");
        while (true)
        {
            var input = Console.ReadKey();
            Console.WriteLine();

            switch (input.Key)
            {
                case ConsoleKey.C:
                    await messageSession.Send(new MyCommand());
                    break;
                case ConsoleKey.L:
                    await messageSession.SendLocal(new MyCommand());
                    break;
                case ConsoleKey.E:
                    await messageSession.Publish(new MyEvent());
                    break;
                case ConsoleKey.Escape:
                    return;
            }
        }
    }
}

SendLocal
.NET Producer
using System;
using System.Threading.Tasks;
using NServiceBus;

class Program
{
    static async Task Main()
    {
        Console.Title = "Samples.RabbitMQ.SimpleSender";

        #region ConfigureRabbit
        var endpointConfiguration = new EndpointConfiguration("Samples.RabbitMQ.SimpleSender");
        var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
        transport.UseConventionalRoutingTopology(QueueType.Quorum);
        transport.ConnectionString("host=localhost");
        #endregion

        transport.Routing().RouteToEndpoint(typeof(MyCommand), "Samples.RabbitMQ.SimpleReceiver");
        endpointConfiguration.EnableInstallers();

        var endpointInstance = await Endpoint.Start(endpointConfiguration)
            .ConfigureAwait(false);
        await SendMessages(endpointInstance);
        await endpointInstance.Stop()
            .ConfigureAwait(false);
    }

    static async Task SendMessages(IMessageSession messageSession)
    {
        Console.WriteLine("Press [c] to send a command, [l] to send a command locally, or [e] to publish an event. Press [Esc] to exit.");
        while (true)
        {
            var input = Console.ReadKey();
            Console.WriteLine();

            switch (input.Key)
            {
                case ConsoleKey.C:
                    await messageSession.Send(new MyCommand());
                    break;
                case ConsoleKey.L:
                    await messageSession.SendLocal(new MyCommand());
                    break;
                case ConsoleKey.E:
                    await messageSession.Publish(new MyEvent());
                    break;
                case ConsoleKey.Escape:
                    return;
            }
        }
    }
}

Receiver - IHandleMessages
.NET Receiver
namespace Receiver
{
    using System.Threading.Tasks;
    using NServiceBus;
    using NServiceBus.Logging;

    public class MyEventHandler : IHandleMessages<MyEvent>
    {
        static ILog log = LogManager.GetLogger<MyEventHandler>();

        public Task Handle(MyEvent eventMessage, IMessageHandlerContext context)
        {
            log.Info($"Hello from {nameof(MyEventHandler)}");
            return Task.CompletedTask;
        }
    }
}

Endpoint Configuration
.NET Receiver
using System;

namespace Receiver
{
    using System.Threading.Tasks;
    using NServiceBus;

    class Program
    {
        static async Task Main()
        {
            Console.Title = "Samples.RabbitMQ.SimpleReceiver";
            var endpointConfiguration = new EndpointConfiguration("Samples.RabbitMQ.SimpleReceiver");
            var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
            transport.UseConventionalRoutingTopology(QueueType.Quorum);
            transport.ConnectionString("host=localhost");
            endpointConfiguration.EnableInstallers();

            var endpointInstance = await Endpoint.Start(endpointConfiguration)
                .ConfigureAwait(false);
            Console.WriteLine("Press any key to exit");
            Console.ReadKey();
            await endpointInstance.Stop()
                .ConfigureAwait(false);
        }
    }
}

Limitations

  • An Unknown Publisher object with no properties is created if the exchange name argument of a Publish API cannot be resolved.
  • An Unknown Receiver object with no properties is created if the queue name argument of a Consumer API cannot be resolved.
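
The first limitation can be illustrated with a minimal sketch based on RabbitMQ.Client 6.x. It assumes a broker running on localhost; the exchange name "orders", the EXCHANGE_NAME environment variable and the class name are hypothetical and do not come from the extension itself. The first publish call passes a constant that can be read from the source code, while the second passes a value that is only available at run time, which is the kind of argument that would presumably lead to an Unknown Publisher object.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;

class PublisherSketch
{
    // Hypothetical exchange name, declared as a compile-time constant.
    private const string OrdersExchange = "orders";

    static void Main()
    {
        var factory = new ConnectionFactory { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();

        // Declare the exchange so that publishing to it does not close the channel.
        channel.ExchangeDeclare(exchange: OrdersExchange, type: ExchangeType.Direct);

        byte[] body = Encoding.UTF8.GetBytes("hello");

        // Case 1: the exchange name is a constant visible in the source code.
        channel.BasicPublish(exchange: OrdersExchange,
                             routingKey: "high",
                             basicProperties: null,
                             body: body);

        // Case 2: the exchange name is read at run time (hypothetical variable).
        // If the variable is unset, the default exchange ("") is used;
        // if it names an exchange that does not exist, the broker closes the channel.
        string runtimeExchange =
            Environment.GetEnvironmentVariable("EXCHANGE_NAME") ?? string.Empty;
        channel.BasicPublish(exchange: runtimeExchange,
                             routingKey: "high",
                             basicProperties: null,
                             body: body);
    }
}
```

The same distinction applies on the receiving side: a queue name passed as a literal or constant can be read from the source, whereas one taken from run-time input cannot.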