RabbitMQ for .NET - 1.0

Extension ID

com.castsoftware.dotnet.rabbitmq

What’s new?

See RabbitMQ for .NET - 1.0 - Release Notes for more information.

Description

This extension provides support for RabbitMQ for .NET APIs which are responsible for publishing and receiving messages across messaging queues.

In what situation should you install this extension?

If your C# application utilizes RabbitMQ for messaging and you want to view a model that shows the message publisher and receiver objects with their corresponding links, then you should install this extension.

Technology Libraries

Library Version Supported
RabbitMQ.Client upto 6.5
EasyNetQ upto 7.5
MassTransit.RabbitMQ upto 8.1
CAP.RabbitMQ upto 6.0
NServiceBus.RabbitMQ upto 8.0

Compatibility

Release Operating System Supported
v3/8.4.x Microsoft Windows / Linux
v2/8.3.x Microsoft Windows

Dependencies

The RabbitMQ extension is dependent on following extensions to provide fully functional analysis:

  • CAST AIP Internal extension
  • Web Services Linker

Download and installation instructions

The extension will not be automatically downloaded and installed. If you need to use it, you should manually install the extension:

What results can you expect?

Objects

Icon Description
RabbitMQ DotNet Publisher
RabbitMQ DotNet Consumer
RabbitMQ Unknown DotNet Publisher
RabbitMQ Unknown DotNet Consumer

Config Objects

Icon Description Supported APIs
RabbitMQ DotNet ExchangeDeclaration
RabbitMQ.Client Exchange Creation APIsRabbitMQ.Client.IModel.ExchangeDeclare
RabbitMQ.Client.IModel.ExchangeDeclareNoWait
RabbitMQ.Client.IModelExensions.ExchangeDeclare
RabbitMQ.Client.IModelExensions.ExchangeDeclareNoWait
EasyNetQ Exchange Creation APIs EasyNetQ.IAdvancedBus.ExchangeDeclare
EasyNetQ.IAdvancedBus.ExchangeDeclareAsync
EasyNetQ.Producer.IExchangeDeclareStrategy.DeclareExchangeAsync
EasyNetQ.Producer.IPublishExchangeDeclareStrategy.DeclareExchangeAsync
MassTransit.RabbitMQ Exchange Creation APIsMassTransit.IRabbitMqBusFactoryConfigurator.Publish
NServiceBus.RabbitMQ Exchange Creation APIsNServiceBus.EndpointConfiguration.EndpointConfiguration
RabbitMQ DotNet QueueBind
RabbitMQ.Client QueueBind APIsRabbitMQ.Client.IModel.QueueBind
RabbitMQ.Client.IModel.QueueBindNoWait
RabbitMQ.Client.IModelExensions.QueueBind
RabbitMQ.Client.IModelExensions.QueueBindNoWait
EasyNetQ Bind APIs EasyNetQ.IAdvancedBus.Bind
EasyNetQ.IAdvancedBus.BindAsync
MassTransit.RabbitMQ Bind APIsMassTransit.IRabbitMqReceiveEndpointConfigurator.Bind
RabbitMQ DotNet ExchangeBind
RabbitMQ.Client ExchangeBind APIsRabbitMQ.Client.IModel.ExchangeBind
RabbitMQ.Client.IModel.ExchangeBindNoWait
RabbitMQ.Client.IModelExensions.ExchangeBind
RabbitMQ.Client.IModelExensions.ExchangeBindNoWait
EasyNetQ Bind APIs EasyNetQ.IAdvancedBus.Bind
EasyNetQ.IAdvancedBus.BindAsync
MassTransit.RabbitMQ Bind APIsMassTransit.IRabbitMqReceiveEndpointConfigurator.Bind
Link Type Source and Destination Link Supported APIs
callLink Link between the caller C# method and the DotNet RabbitMQ Publisher object
RabbitMQ Publisher APIsRabbitMQ.Client.IModel.BasicPublish
RabbitMQ.Client.IModelExensions.BasicPublish
EasyNetQ Publisher APIsEasyNetQ.IAdvancedBus.Publish
EasyNetQ.IAdvancedBus.PublishAsync
EasyNetQ.IBus.Publish
EasyNetQ.IBus.PublishAsync
EasyNetQ.IBus.Send
EasyNetQ.IBus.SendAsync
EasyNetQ.IBus.Request
EasyNetQ.IBus.RequestAsync
EasyNetQ.IPubSub.PublishAsync
EasyNetQ.IRpc.RequestAsync
EasyNetQ.ISendReceive.SendAsync
EasyNetQ.IScheduler.FuturePublishAsync
EasyNetQ.Scheduling.IScheduler.FuturePublishAsync
EasyNetQ.Scheduling.IScheduler.FuturePublish
EasyNetQ.SchedulerExtensions.FuturePublish
EasyNetQ.SchedulerExtensions.FuturePublishAsync
EasyNetQ.NonGenericSchedulerExtensions.FuturePublish
EasyNetQ.NonGenericSchedulerExtensions.FuturePublishAsync
MassTransit.RabbitMQ Publisher APIsMassTransit.IPublishEndpoint.Publish
MassTransit.PublishExecuteExtensions.Publish
MassTransit.ISendEndpoint.Send
CAP.RabbitMQ Publisher APIsDotNetCore.CAP.ICapPublisher.Publish
DotNetCore.CAP.ICapPublisher.PublishDelayAsync
DotNetCore.CAP.ICapPublisher.PublishAsync
DotNetCore.CAP.ICapPublisher.PublishDelay
NServiceBus.RabbitMQ Publisher APIsNServiceBus.IMessageSession.Send
NServiceBus.MessageSessionExtensions.Send
NServiceBus.MessageSessionExtensions.Publish
NServiceBus.IMessageSession.Publish
NServiceBus.MessageSessionExtensions.SendLocal
NServiceBus.IMessageSession.SendLocal
callLink Link between the DotNet RabbitMQ Consumer object and the caller C# method
RabbitMQ Consumer APIsRabbitMQ.Client.IModel.BasicConsume
RabbitMQ.Client.IModel.BasicGet
RabbitMQ.Client.IModelExensions.BasicConsume
EasyNetQ Consumer APIsEasyNetQ.IAdvancedBus.Get
EasyNetQ.IAdvancedBus.Consume
EasyNetQ.IBus.Subscribe
EasyNetQ.IBus.SubscribeAsync
EasyNetQ.IBus.Receive
EasyNetQ.IBus.Respond
EasyNetQ.IBus.RespondAsync
EasyNetQ.IPubSub.SubscribeAsync
EasyNetQ.IRpc.RespondAsync
EasyNetQ.ISendReceive.ReceiveAsync
EasyNetQ.AutoSubscribe.IConsumeAsync.ConsumeAsync
EasyNetQ.AutoSubscribe.IConsumeAsync.Consume
EasyNetQ.AutoSubscribe.IConsume.Consume
MassTransit.RabbitMQ Consumer APIsMassTransit.IReceiveConfigurator.ReceiveEndpoint
MassTransit.IRegistrationConfigurator.AddConsumer
CAP.RabbitMQ Consumer APIsDotNetCore.CAP.CapSubscribeAttribute.CapSubscribeAttribute
NServiceBus.RabbitMQ Consumer APIsNServiceBus.IHandleMessages

Code Examples

RabbitMQ.Client

Publisher

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace Routing
{
    class Subs
    {
  
    static void Main()
    {
        var factory = new ConnectionFactory { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();
  
        var queueName1 = channel.QueueDeclare("queueName1").QueueName;
    var queueName2 = channel.QueueDeclare("queueName2").QueueName;

    ExchangeInit(channel);
  
        
    ReceiveMessage("queueName1", channel);
    ReceiveMessage("queueName2", channel);
    SendMEssage("HelloWOrld", "test2", channel);
  
        
}

private static void SendMEssage(string message, string routingkey, IModel channel)
{

var body = Encoding.UTF8.GetBytes(message);
  channel.BasicPublish(exchange: "Sep",
                            routingKey: routingkey,
                            basicProperties: null,
                            body: body);

}

Unknown Publisher

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace Routing
{
    class Subs
    {
  
    static void Main()
    {
        var factory = new ConnectionFactory { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();
  
        var queueName1 = channel.QueueDeclare("queueName1").QueueName;
    var queueName2 = channel.QueueDeclare("queueName2").QueueName;
 
        channel.ExchangeDeclare(exchange: "Sep", type: "fanout", true);
  
        channel.ExchangeDeclare(exchange: "Oct", type: "direct", true);
  
        channel.ExchangeBind("Oct", "Sep", "");
  
  
        var body = Encoding.UTF8.GetBytes("Hello World");
 
            channel.BasicPublish(exchange: unknown_exchange_name,
                            routingKey: "test2",
                            basicProperties: channel.CreateBasicProperties(),
                            body: body);
  
            Console.WriteLine(" [x] Sent {0}", "Hello World");
        Console.WriteLine(" Press [enter] to exit.");
                       Console.ReadLine();
  
  
      
}
}
}

Consumer

using System;
using System.Text;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Consumer
{
    internal class Receive
    {
        const string QueueName = "task_queue3";

        static void Main()
        {
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
            };

            try
            {
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {

                        channel.QueueDeclare(queue: QueueName,
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);

                        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                        var channel1 = channel;
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body);
                            Console.WriteLine(" [x] Received {0}", message);

                            var dots = message.Split('.').Length - 1;
                            Thread.Sleep(dots * 1000);

                            Console.WriteLine(" [x] Done");

                            // Message acknowledgment
                            channel1.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                        };

   
                        channel.BasicConsume(queue: QueueName,
                            autoAck: false,
                            consumer: consumer);

                        channel.BasicConsume(queue: Unknown_QueueName,
                            autoAck: false,
                            consumer: consumer);

                        Console.WriteLine(" Press [enter] to exit.");
                        Console.ReadLine();
                    }
                }

            }
            catch(Exception ex)
            {
                Console.WriteLine(ex.ToString());
                Console.ReadKey();
            }
        }
    }
}

Unknown Consumer

ExchangeDeclare

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace Routing
{
    class Subs
    {
  
    static void Main()
    {
        var factory = new ConnectionFactory { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();
  
        var queueName1 = channel.QueueDeclare("queueName1").QueueName;
    var queueName2 = channel.QueueDeclare("queueName2").QueueName;

    ExchangeInit(channel);
        
}

private static void ExchangeInit(IModel channel)
{
    channel.ExchangeDeclare(exchange: "Sep", type: "fanout", true);
  
    channel.ExchangeDeclare(exchange: "Oct", type: "direct", true);
  
    channel.ExchangeBind("Oct", "Sep", "");

}

QueueBind

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace Routing
{
    class Subs
    {
  
    static void Main()
    {
        var factory = new ConnectionFactory { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();
  
        var queueName1 = channel.QueueDeclare("queueName1").QueueName;
    var queueName2 = channel.QueueDeclare("queueName2").QueueName;

    ExchangeInit(channel);
  
    channel.QueueBind(queue: "queueName1" ,
                  exchange: "Oct",
                  routingKey: "test2");
 
        channel.QueueBind(queue: "queueName2" ,
                  exchange: "Sep",
                   routingKey: "");
  
  
        Console.WriteLine(" [*] Waiting for logs.");
  
        
    ReceiveMessage("queueName1", channel);
    ReceiveMessage("queueName2", channel);
    SendMEssage("HelloWOrld", "test2", channel);
  
  
      
}

ExchangeBind

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace Routing
{
    class Subs
    {
  
    static void Main()
    {
        var factory = new ConnectionFactory { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();
  
        var queueName1 = channel.QueueDeclare("queueName1").QueueName;
    var queueName2 = channel.QueueDeclare("queueName2").QueueName;

    ExchangeInit(channel);
   
}

private static void ExchangeInit(IModel channel)
{
    channel.ExchangeDeclare(exchange: "Sep", type: "fanout", true);
  
    channel.ExchangeDeclare(exchange: "Oct", type: "direct", true);
  
    channel.ExchangeBind("Oct", "Sep", "");

}

EasyNetQ

IBus

Publish
using System;
using EasyNetQ;
using EasyNetQMessages;
using EasyNetQ.SystemMessages;
using System.Configuration;

namespace Publish
{
    class Program
    {
        public static void Main(string[] args)
        {
            
            var payment1 = new CardPaymentRequestMessage
            {
                CardNumber = "415411585",
                CardHolderName = "Mr F Bloggs",
                ExpiryDate = "12/12",
                Amount = 99.00m
            };

            var payment2 = new CardPaymentRequestMessage
            {
                CardNumber = "3456345634563456",
                CardHolderName = "Mr S Claws",
                ExpiryDate = "03/11",
                Amount = 15.00m
            };

            var payment3 = new CardPaymentRequestMessage
            {
                CardNumber = "6789678967896789",
                CardHolderName = "Mrs E Curry",
                ExpiryDate = "01/03",
                Amount = 1250.24m
            };

            var payment4 = new CardPaymentRequestMessage
            {
                CardNumber = "9991999299939994",
                CardHolderName = "Mrs D Parton",
                ExpiryDate = "04/07",
                Amount = 34.87m
            };

            using (var bus = RabbitHutch.CreateBus("host=localhost"))
            {
                Console.WriteLine("Publishing messages with publish and subscribe.");
                Console.WriteLine((new EasyNetQ.SystemMessages.Error()).Queue);
                Console.WriteLine();

                bus.Publish(payment1);
                bus.Publish(payment2);
                bus.Publish(payment3);
                bus.Publish(payment4);
               
            }
        }
    }
}

Subscribe
using System;
using EasyNetQ;
using EasyNetQMessages;

namespace Subscribe
{
    class Program
    {
        public static void Main(string[] args)
        {
            using (var bus = RabbitHutch.CreateBus("host=localhost"))
            {
                bus.Subscribe<CardPaymentRequestMessage>("cardPayment", HandleCardPaymentMessage);

                Console.WriteLine("Listening for messages. Hit <return> to quit.");
                Console.ReadLine();
            }
        }

        static void HandleCardPaymentMessage(CardPaymentRequestMessage paymentMessage)
        {
            Console.WriteLine("Payment = <" +
                              paymentMessage.CardNumber + ", " +
                              paymentMessage.CardHolderName + ", " +
                              paymentMessage.ExpiryDate + ", " +
                              paymentMessage.Amount + ">");
        }
    }
}

Send
using System;
using EasyNetQ;
using EasyNetQMessages;

namespace Send
{
    class Program
    {
        public static void Main(string[] args)
        {
            var payment1 = new CardPaymentRequestMessage
            {
                CardNumber = "1234123412341234",
                CardHolderName = "Mr F Bloggs",
                ExpiryDate = "12/12",
                Amount = 99.00m
            };


            using (var bus = RabbitHutch.CreateBus("host=localhost"))
            {
                Console.WriteLine("Publishing messages with send and receive.");
                Console.WriteLine();

                bus.Send("my.paymentsqueue", payment1);
                bus.Send("my.paymentsqueue", purchaseOrder1);
                bus.Send("my.paymentsqueue", payment2);
                bus.Send("my.paymentsqueue", payment3);
                bus.Send("my.paymentsqueue", purchaseOrder2);
                bus.Send("my.paymentsqueue", payment4);
            }
          
        }
    }
}

Receive
using System;
using EasyNetQ;
using EasyNetQMessages;

namespace Receive
{
    class Program
    {
        public static void Main(string[] args)
        {
            using (var bus = RabbitHutch.CreateBus("host=localhost"))
            {
                bus.Receive<CardPaymentRequestMessage>("my.paymentsqueue", message => HandleCardPaymentMessage(message));

                Console.WriteLine("Listening for messages. Hit <return> to quit.");
                Console.ReadLine();
            }
        }

        static void HandleCardPaymentMessage(CardPaymentRequestMessage paymentMessage)
        {
            Console.WriteLine("Processing Payment = <" +
                              paymentMessage.CardNumber + ", " +
                              paymentMessage.CardHolderName + ", " +
                              paymentMessage.ExpiryDate + ", " +
                              paymentMessage.Amount + ">");
        }
    }
}

Request
using System;
using EasyNetQ;
using EasyNetQMessages;

namespace RequestAsync
{
    class Program
    {
        static void Main(string[] args)
        {
            var payment = new CardPaymentRequestMessage
            {
                CardNumber = "1234123412341234",
                CardHolderName = "Mr F Bloggs",
                ExpiryDate = "12/12",
                Amount = 99.00m
            };

            using (var bus = RabbitHutch.CreateBus("host=localhost"))
            {
                Console.WriteLine("Publishing messages with request and response.");
                Console.WriteLine();

                var task = bus.RequestAsync<CardPaymentRequestMessage, CardPaymentResponseMessage>(payment);

                task.ContinueWith(response => {
                    Console.WriteLine("Got response: '{0}'", response.Result.AuthCode);
                });

                Console.ReadLine();
            }
        }
    }
}

Respond
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using EasyNetQ;
using EasyNetQMessages;

namespace ResponseAsync
{
    class Program
    {
        public  class MyWorker
        {
            public CardPaymentResponseMessage Execute(CardPaymentRequestMessage request)
            {
                CardPaymentResponseMessage responseMessage = new CardPaymentResponseMessage();
                responseMessage.AuthCode = "1234";
                Console.WriteLine("Worker activated to process response.");

                return responseMessage;
            }
        }

        static void Main(string[] args)
        {
            // Create a group of worker objects
            var workers = new BlockingCollection<MyWorker>();
            for (int i = 0; i < 10; i++)
            {
                workers.Add(new MyWorker());
            }

            using (var bus = RabbitHutch.CreateBus("host=localhost"))
            {
                // Respond to requests
                bus.RespondAsync<CardPaymentRequestMessage, CardPaymentResponseMessage>(request =>
                    Task.Factory.StartNew(() =>
                    {
                        var worker = workers.Take();
                        try
                        {
                            return worker.Execute(request);
                        }
                        finally
                        {
                            workers.Add(worker);
                        }
                    }));

                Console.WriteLine("Listening for messages. Hit <return> to quit.");
                Console.ReadLine();
            }
        }
    }
}

IAdvancedBus

Publisher
using System;
using System.Text;
using EasyNetQ;
using EasyNetQMessages;
using EasyNetQ.Topology;

namespace Publish
{
    class Program
    {
        public static void Main(string[] args)
        {
           

            using (var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced)
            {   
        var exchange = advancedBus.ExchangeDeclare("my.exchange", ExchangeType.Topic);
        var queue = advancedBus.QueueDeclare("my.queue");
            
            var binding = advancedBus.Bind(exchange, queue, "A.*");

                Console.WriteLine("Publishing messages with publish and subscribe.");
                Console.WriteLine();
        var properties = new MessageProperties();
            var body = Encoding.UTF8.GetBytes("Hello World!");
                advancedBus.Publish(exchange, "A.AA", false, properties, body);

            }
        }



    }
}

Unknown Publisher
using System;
using System.Text;
using EasyNetQ;
using EasyNetQMessages;
using EasyNetQ.Topology;

namespace Publish
{
    class Program
    {
        public static void Main(string[] args)
        {
           

            using (var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced)
            {   
        var exchange = advancedBus.ExchangeDeclare("my.exchange", ExchangeType.Topic);
        var queue = advancedBus.QueueDeclare("my.queue");
            
            var binding = advancedBus.Bind(exchange, queue, "A.*");

                Console.WriteLine("Publishing messages with publish and subscribe.");
                Console.WriteLine();
        var properties = new MessageProperties();
            var body = Encoding.UTF8.GetBytes("Hello World!");
                advancedBus.Publish(unknwon_exchange, "A.AA", false, properties, body);

            }
        }



    }
}

Consumer
using System;
using System.Text;
using EasyNetQ;
using EasyNetQMessages;
using EasyNetQ.Topology;

namespace Subscribe
{
    class Program
    {
        public static void Main(string[] args)
        {
            using (var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced)
            {   
                    
                 var queue = advancedBus.QueueDeclare("my.queue");

        advancedBus.Consume(queue, (body, properties, info) => Task.Factory.StartNew(() =>
                  {
            var message = Encoding.UTF8.GetString(body);
            Console.WriteLine("Got message: '{0}'", message);
                     }));
    
                Console.WriteLine("Listening for messages. Hit <return> to quit.");
                Console.ReadLine();
            }
        }


    }
}

Unknown Consumer
using System;
using System.Text;
using EasyNetQ;
using EasyNetQMessages;
using EasyNetQ.Topology;

namespace Subscribe
{
    class Program
    {
        public static void Main(string[] args)
        {
            using (var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced)
            {   
                    
                 var queue = advancedBus.QueueDeclare("my.queue");

        advancedBus.Consume(unknown_queue, (body, properties, info) => Task.Factory.StartNew(() =>
                  {
            var message = Encoding.UTF8.GetString(body);
            Console.WriteLine("Got message: '{0}'", message);
                     }));
    
                Console.WriteLine("Listening for messages. Hit <return> to quit.");
                Console.ReadLine();
            }
        }


    }
}

ExchangeDeclare
using System;
using System.Text;
using EasyNetQ;
using EasyNetQMessages;
using EasyNetQ.Topology;

namespace Publish
{
    class Program
    {
        public static void Main(string[] args)
        {
           

            using (var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced)
            {   
        var exchange = advancedBus.ExchangeDeclare("my.exchange", ExchangeType.Topic);
        var queue = advancedBus.QueueDeclare("my.queue");
            
            var binding = advancedBus.Bind(exchange, queue, "A.*");

                Console.WriteLine("Publishing messages with publish and subscribe.");
                Console.WriteLine();
        var properties = new MessageProperties();
            var body = Encoding.UTF8.GetBytes("Hello World!");
                advancedBus.Publish(exchange, "A.AA", false, properties, body);

            }
        }



    }
}

Bind
using System;
using System.Text;
using EasyNetQ;
using EasyNetQMessages;
using EasyNetQ.Topology;

namespace Publish
{
    class Program
    {
        public static void Main(string[] args)
        {
           

            using (var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced)
            {   
        var exchange = advancedBus.ExchangeDeclare("my.exchange", ExchangeType.Topic);
        var queue = advancedBus.QueueDeclare("my.queue");
            
            var binding = advancedBus.Bind(exchange, queue, "A.*");

                Console.WriteLine("Publishing messages with publish and subscribe.");
                Console.WriteLine();
        var properties = new MessageProperties();
            var body = Encoding.UTF8.GetBytes("Hello World!");
                advancedBus.Publish(exchange, "A.AA", false, properties, body);

            }
        }



    }
}

MassTransit.RabbitMQ

Publisher

class Program
{
    static async Task Main()
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.Host(new Uri("rabbitmq://localhost"), h => 
            {
                h.Username("guest");
                h.Password("guest");
            });

        });

        await busControl.StartAsync();
        try
        {
            do
            {   var endpoint = await busControl.GetSendEndpoint(new Uri("rabbitmq://localhost/your_queue_name"));

                endpoint.Send(new MyMessage { Text = "Hello MyMessage, World from Send!" });
                busControl.Publish(new MyMessage { Text = "Hello MyMessage, World from Publish!" }, x=>x.SetRoutingKey("high"));

                Console.WriteLine("Press [q] to exit");
            } while (Console.ReadKey().KeyChar != 'q');
        }
        finally
        {
            await busControl.StopAsync();
        }


    }


}
Publish

Send

Consumer

class Program
{
    static async Task Main()
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.Host(new Uri("rabbitmq://localhost"), h => 
            {
                h.Username("guest");
                h.Password("guest");
            });

           cfg.Publish<MyMessage>(x => x.ExchangeType = "direct");

            cfg.ReceiveEndpoint("end_point", e =>
            {
                e.Consumer<MyMessageConsumer>();

                e.Bind<MyMessage>(s => 
                {
                    s.RoutingKey = "high";
                    s.ExchangeType = "direct";
                });

            });

    });

}
}
ReceiveEndpoint

Bind

AddConsumer
using MassTransit;

namespace Microservice.TodoApp.Consumers
{
    public class TodoConsumer : IConsumer<Todo>
    {
        public async Task Consume(ConsumeContext<Todo> context)
        {
            var data = context.Message;

        }
    }
}
namespace Microservice.TodoApp.Consumers
{
    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        public void ConfigureServices(IServiceCollection services)
        {
            services.AddMassTransit(x =>
            {
                x.AddConsumer<TodoConsumer>();
                x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
                {
                    cfg.Host(new Uri(RabbitMqConsts.RabbitMqRootUri), h =>
                    {
                        h.Username(RabbitMqConsts.UserName);
                        h.Password(RabbitMqConsts.Password);
                    });
                    
                }));
            });
            services.AddMassTransitHostedService();
        }
    }
}

Unknown Publihser

using System;
using System.Threading.Tasks;
using MassTransit;

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
namespace MassTransitRabbitMQDemo;

class Program
{
    static async Task Main()
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.Host(new Uri("rabbitmq://localhost"), h => 
            {
                h.Username("guest");
                h.Password("guest");
            });


public void SendMessage(IBus busControl)

{

try
        {
            do
            {   var endpoint = await busControl.GetSendEndpoint(new Uri(unknown));
                endpoint.Send(new MyMessage { Text = "Hello MyMessage, World from Send!" });
                busControl.Publish(Unknown, x=>x.SetRoutingKey("high"));
                Console.WriteLine("Press [q] to exit");
            } while (Console.ReadKey().KeyChar != 'q');
        }
        finally
        {
            await busControl.StopAsync();
        }
}

}

Unknown Consumer

using System;
using System.Threading.Tasks;
using MassTransit;

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
namespace MassTransitRabbitMQDemo;

class Program
{
    static async Task Main()
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.Host(new Uri("rabbitmq://localhost"), h => 
            {
                h.Username("guest");
                h.Password("guest");
            });

            cfg.Publish<MyMessage>(x => x.ExchangeType = "direct");


             cfg.ReceiveEndpoint(unknown_variable, e =>
            {
                e.Consumer<MyMessageResponder>();

            });

        });

        await busControl.StartAsync();
}

CAP.RabbitMQ

Publish

namespace Sample.RabbitMQ.MongoDB.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class ValuesController : ControllerBase
    {

        private readonly IMongoClient _client;
        private readonly ICapPublisher _capBus;
private readonly string _topic;

        public ValuesController(IMongoClient client, ICapPublisher capBus, IConfiguration configuration)
        {
            _client = client;
            _capBus = capBus;
Configuration = configuration;
_topic = "sample.rabbitmq.mongodb";

        }
    public IConfiguration Configuration { get; }

        [Route("~/without/transaction")]
        public IActionResult WithoutTransaction()

        {
        
            _capBus.PublishAsync(_topic, DateTime.Now);

            return Ok();
        }


        [Route("~/delay/{delaySeconds:int}")]
        public async Task<IActionResult> Delay(int delaySeconds)
        {
            await _capBus.PublishDelayAsync(TimeSpan.FromSeconds(delaySeconds), _topic, DateTime.Now);

            return Ok();
        }

}
}

CapSubscribe

namespace Sample.RabbitMQ.MongoDB.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class ValuesController : ControllerBase
    {

        private readonly IMongoClient _client;
        private readonly ICapPublisher _capBus;
private readonly string _topic;

        public ValuesController(IMongoClient client, ICapPublisher capBus, IConfiguration configuration)
        {
            _client = client;
            _capBus = capBus;
Configuration = configuration;
_topic = "sample.rabbitmq.mongodb";

        }
    public IConfiguration Configuration { get; }

        [CapSubscribe("sample.rabbitmq.mongodb")]
        public void ReceiveMessage(DateTime time)
        {
            Console.WriteLine($@"{DateTime.Now}, Subscriber invoked, Sent time:{time}");
        }

}
}

NServiceBus.RabbitMQ

Publish

using System;
using System.Threading.Tasks;
using NServiceBus;

class Program
{
    static async Task Main()
    {
        Console.Title = "Samples.RabbitMQ.SimpleSender";

        #region ConfigureRabbit
        var endpointConfiguration = new EndpointConfiguration("Samples.RabbitMQ.SimpleSender");
        var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
        transport.UseConventionalRoutingTopology(QueueType.Quorum);
        transport.ConnectionString("host=localhost");
        #endregion

        transport.Routing().RouteToEndpoint(typeof(MyCommand), "Samples.RabbitMQ.SimpleReceiver");
        endpointConfiguration.EnableInstallers();

        var endpointInstance = await Endpoint.Start(endpointConfiguration)
            .ConfigureAwait(false);
        await SendMessages(endpointInstance);
        await endpointInstance.Stop()
            .ConfigureAwait(false);
    }

    static async Task SendMessages(IMessageSession messageSession)
    {
        Console.WriteLine("Press [c] to send a command, or [e] to publish an event. Press [Esc] to exit.");
        while (true)
        {
            var input = Console.ReadKey();
            Console.WriteLine();

            switch (input.Key)
            {
                case ConsoleKey.C:
                    await messageSession.Send(new MyCommand());
                    break;
                case ConsoleKey.L:
                    await messageSession.SendLocal(new MyCommand());
                    break;
                case ConsoleKey.E:
                    await messageSession.Publish(new MyEvent());
                    break;
                case ConsoleKey.Escape:
                    return;
            }
        }
    }
}

Send

using System;
using System.Threading.Tasks;
using NServiceBus;

class Program
{
    static async Task Main()
    {
        Console.Title = "Samples.RabbitMQ.SimpleSender";

        #region ConfigureRabbit
        var endpointConfiguration = new EndpointConfiguration("Samples.RabbitMQ.SimpleSender");
        var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
        transport.UseConventionalRoutingTopology(QueueType.Quorum);
        transport.ConnectionString("host=localhost");
        #endregion

        transport.Routing().RouteToEndpoint(typeof(MyCommand), "Samples.RabbitMQ.SimpleReceiver");
        endpointConfiguration.EnableInstallers();

        var endpointInstance = await Endpoint.Start(endpointConfiguration)
            .ConfigureAwait(false);
        await SendMessages(endpointInstance);
        await endpointInstance.Stop()
            .ConfigureAwait(false);
    }

    static async Task SendMessages(IMessageSession messageSession)
    {
        Console.WriteLine("Press [c] to send a command, or [e] to publish an event. Press [Esc] to exit.");
        while (true)
        {
            var input = Console.ReadKey();
            Console.WriteLine();

            switch (input.Key)
            {
                case ConsoleKey.C:
                    await messageSession.Send(new MyCommand());
                    break;
                case ConsoleKey.L:
                    await messageSession.SendLocal(new MyCommand());
                    break;
                case ConsoleKey.E:
                    await messageSession.Publish(new MyEvent());
                    break;
                case ConsoleKey.Escape:
                    return;
            }
        }
    }
}

SendLocal

using System;
using System.Threading.Tasks;
using NServiceBus;

class Program
{
    static async Task Main()
    {
        Console.Title = "Samples.RabbitMQ.SimpleSender";

        #region ConfigureRabbit
        var endpointConfiguration = new EndpointConfiguration("Samples.RabbitMQ.SimpleSender");
        var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
        transport.UseConventionalRoutingTopology(QueueType.Quorum);
        transport.ConnectionString("host=localhost");
        #endregion

        transport.Routing().RouteToEndpoint(typeof(MyCommand), "Samples.RabbitMQ.SimpleReceiver");
        endpointConfiguration.EnableInstallers();

        var endpointInstance = await Endpoint.Start(endpointConfiguration)
            .ConfigureAwait(false);
        await SendMessages(endpointInstance);
        await endpointInstance.Stop()
            .ConfigureAwait(false);
    }

    static async Task SendMessages(IMessageSession messageSession)
    {
        Console.WriteLine("Press [c] to send a command, or [e] to publish an event. Press [Esc] to exit.");
        while (true)
        {
            var input = Console.ReadKey();
            Console.WriteLine();

            switch (input.Key)
            {
                case ConsoleKey.C:
                    await messageSession.Send(new MyCommand());
                    break;
                case ConsoleKey.L:
                    await messageSession.SendLocal(new MyCommand());
                    break;
                case ConsoleKey.E:
                    await messageSession.Publish(new MyEvent());
                    break;
                case ConsoleKey.Escape:
                    return;
            }
        }
    }
}

Consumer

IHandleMessages
namespace Receiver
{
    using System.Threading.Tasks;
    using NServiceBus;
    using NServiceBus.Logging;

    public class MyEventHandler : IHandleMessages<MyEvent>
    {
        static ILog log = LogManager.GetLogger<MyEventHandler>();

        public Task Handle(MyEvent eventMessage, IMessageHandlerContext context)
        {
            log.Info($"Hello from {nameof(MyEventHandler)}");
            return Task.CompletedTask;
        }
    }
}

Endpoint Configuration - Exchange
using System;

namespace Receiver
{
    using System.Threading.Tasks;
    using NServiceBus;

    class Program
    {
        static async Task Main()
        {
            Console.Title = "Samples.RabbitMQ.SimpleReceiver";
            var endpointConfiguration = new EndpointConfiguration("Samples.RabbitMQ.SimpleReceiver");
            var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
            transport.UseConventionalRoutingTopology(QueueType.Quorum);
            transport.ConnectionString("host=localhost");
            endpointConfiguration.EnableInstallers();

            var endpointInstance = await Endpoint.Start(endpointConfiguration)
                .ConfigureAwait(false);
            Console.WriteLine("Press any key to exit");
            Console.ReadKey();
            await endpointInstance.Stop()
                .ConfigureAwait(false);
        }
    }
}