Message Queue | Version | Support

ActiveMQ | 5.15.3
  • OpenWire + JMS
  • Spring + JMS with XML based configuration
  • JMS with SpringBoot

IBM MQ | 6.0.0, 8.0.0, 9.0.0
  • Spring + JMS with XML and Annotation based configuration
  • SpringBoot (when queue is autowired in different file)
  • Plain Java

RabbitMQ | 3.6.9
  • AMQP + SLF4J
  • Spring AMQP + Spring Rabbit with XML based configuration
  • Spring AMQP with SpringBoot

JMS | 1.x, 2.x
  • JMS Queue
  • JMS Topic
  • JMSContext

AWS-SQS | 1.x, 2.x, 3.x
  • Simple Queue Service

KAFKA | 2.6.0
  • Apache Kafka Patterns : send/subscribe
  • Spring Kafka

...

AIP

...

Core compatibility

This extension is compatible with:

...

This extension is compatible with the following DBMS servers:

DBMS | Supported
CAST Storage Service (CSS) / PostgreSQL | Yes

Prerequisites

...

Icons | Description

  • IBM MQ Java Queue Publisher
  • IBM MQ Java Topic Publisher
  • RabbitMQ Java Queue Publisher
  • JMS Java Queue Publisher
  • JMS Java Topic Publisher
  • Java AWS Simple Queue Service Publisher

  • IBM MQ Java Queue Receiver
  • IBM MQ Java Topic Receiver
  • RabbitMQ Java Queue Receiver
  • JMS Java Queue Receiver
  • JMS Java Topic Receiver
  • Java AWS Simple Queue Service Receiver

  • IBM MQ Java Unknown Queue Publisher
  • IBM MQ Java Unknown Topic Publisher
  • RabbitMQ Unknown Java Queue Publisher
  • JMS Java Unknown Queue Publisher
  • JMS Java Unknown Topic Publisher
  • Java AWS Simple Queue Service Unknown Publisher

  • IBM MQ Java Unknown Queue Receiver
  • IBM MQ Java Unknown Topic Receiver
  • RabbitMQ Unknown Java Queue Receiver
  • JMS Java Unknown Queue Receiver
  • JMS Java Unknown Topic Receiver
  • Java AWS Simple Queue Service Unknown Receiver

...

For IBM MQ, Call link is created between:

  • Producer method object and IBM MQ Java Queue Publisher object, at the analyzer level
  • IBM MQ Java Queue Receiver object and Consumer method object, at the analyzer level
  • IBM MQ Java Queue Publisher object and IBM MQ Java Queue Receiver object, at the Application level by Web Services Linker

...

  • Producer method object and IBM MQ Java Topic Publisher object, at the analyzer level
  • IBM MQ Java Topic Receiver object and Consumer method object, at the analyzer level
  • IBM MQ Java Topic Publisher object and IBM MQ Java Topic Receiver object, at the Application level by Web Services Linker

For RabbitMQ, Call link is created between:

  • Producer method object and RabbitMQ Java Queue Publisher object, at the analyzer level
  • RabbitMQ Java Queue Receiver object and Consumer method object, at the analyzer level
  • RabbitMQ Java Queue Publisher object and RabbitMQ Java Queue Receiver object, at the Application level by Web Services Linker

For JMS, Call link is created between:

  • Producer method object and JMS Java Queue Publisher object, at analyzer level
  • JMS Java Queue Receiver object and Consumer method object, at analyzer level
  • JMS Java Queue Publisher object and JMS Java Queue Receiver object, at the Application level by Web Services Linker
  • Producer method object and JMS Java Topic Publisher object, at analyzer level
  • JMS Java Topic Receiver object and Consumer method object, at analyzer level
  • JMS Java Topic Publisher object and JMS Java Topic Receiver object, at the Application level by Web Services Linker

For Kafka, Call link is created between:

  • Producer method object and JMS Java Topic Publisher object, at analyzer level
  • JMS Java Topic Receiver object and Consumer method object, at analyzer level
  • JMS Java Topic Publisher object and JMS Java Topic Receiver object, at the Application level by Web Services Linker

For AWS-SQS, Call link is created between:

  • Producer method object and Java AWS Simple Queue Service Publisher object, at analyzer level
  • Java AWS Simple Queue Service Receiver object and Consumer method object, at analyzer level
  • Java AWS Simple Queue Service Publisher object and Java AWS Simple Queue Service Receiver object, at the Application level by Web Services Linker
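
The application-level links listed above can be pictured as pairing each publisher object with the receiver objects that refer to the same destination. The following is a minimal sketch of that idea in plain Java. The `LinkSketch` and `Endpoint` classes, the `link` method, and the rule of matching on an exact destination name are all assumptions made for illustration; they do not describe how the Web Services Linker is actually implemented.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of publisher/receiver objects; not the CAST implementation. */
final class LinkSketch {

    /** A publisher or receiver object, identified by the destination it refers to. */
    static final class Endpoint {
        final String objectName;  // name of the producer or consumer side object
        final String destination; // queue or topic name

        Endpoint(String objectName, String destination) {
            this.objectName = objectName;
            this.destination = destination;
        }
    }

    /** Pairs every publisher with every receiver that names the same destination. */
    static List<String> link(List<Endpoint> publishers, List<Endpoint> receivers) {
        List<String> links = new ArrayList<>();
        for (Endpoint publisher : publishers) {
            for (Endpoint receiver : receivers) {
                // Assumption: an exact name match is what connects the two sides.
                if (publisher.destination.equals(receiver.destination)) {
                    links.add(publisher.objectName + " -> " + receiver.objectName);
                }
            }
        }
        return links;
    }
}
```

In this model a publisher on `jms.LoanRequestQueue` is linked only to receivers that also name `jms.LoanRequestQueue`; receivers on other destinations produce no link.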

...

Code Block
languagejava
public QBorrower() throws NamingException, JMSException {
    Context ctx = new InitialContext();
    QueueConnectionFactory connectionFactory = (QueueConnectionFactory) ctx.lookup("ConnectionFactory");
    queueConnection = connectionFactory.createQueueConnection();
    requestQueue = (Queue) ctx.lookup("jms.LoanRequestQueue");
    responseQueue = (Queue) ctx.lookup("jms.LoanResponseQueue");
    queueConnection.start();
    queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
}

private void sendLoanRequest(double salary, double loanAmount) throws JMSException {
    MapMessage message = queueSession.createMapMessage();
    message.setDoubleProperty("salary", salary);
    message.setDoubleProperty("loanAmount", loanAmount);
    message.setJMSReplyTo(responseQueue);
    QueueSender sender = queueSession.createSender(requestQueue);
    QueueReceiver queueReceiver = queueSession.createReceiver(responseQueue);
    sender.send(message);
}

To indicate that the analyzed messaging system is ActiveMQ, the created objects carry a property set to the value ActiveMQ: CAST_RabbitMQ_Queue.exchangeName for topics and CAST_MQE_QueueCall.messengingSystem for queues.

IBM MQ

Example of IBM MQ Producer and Consumer (Plain Java)

...

Code Block
languagejava
public class SimplePubSub {   
  
	public static void main(String[] args) {     
		try {       
			MQTopicConnectionFactory cf = new MQTopicConnectionFactory(); 
			// Config       
			cf.setHostName("localhost");       
			cf.setPort(1414);       
			cf.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);       
			cf.setQueueManager("QM_thinkpad");       
			cf.setChannel("SYSTEM.DEF.SVRCONN");       
			
			MQTopicConnection connection = (MQTopicConnection) cf.createTopicConnection();       
			MQTopicSession session = (MQTopicSession) connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);       
			MQTopic topic = (MQTopic) session.createTopic("topic://foo");       
			MQTopicPublisher publisher =  (MQTopicPublisher) session.createPublisher(topic);                   
			long uniqueNumber = System.currentTimeMillis() % 1000;       
			
			JMSTextMessage message = (JMSTextMessage) session.createTextMessage("SimplePubSub "+ uniqueNumber);            
			// Start the connection       
			connection.start();       
			publisher.publish(message);       
			System.out.println("Sent message:\n" + message);

			publisher.close();
			session.close();
			connection.close();
			System.out.println("\nSUCCESS\n");
		}
		catch (JMSException jmsex) {
			System.out.println(jmsex);
			System.out.println("\nFAILURE\n");
		}
		catch (Exception ex) {
			System.out.println(ex);
			System.out.println("\nFAILURE\n");
		}   
	} 
}

RabbitMQ


Supported APIs:

Publisher:

  • org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend
  • org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive
  • org.springframework.amqp.rabbit.core.RabbitTemplate.sendAndReceive
  • org.springframework.amqp.core.AmqpTemplate.convertAndSend
  • org.springframework.amqp.rabbit.core.RabbitTemplate.send
  • com.rabbitmq.client.Channel.basicPublish

Receiver:

  • org.springframework.amqp.rabbit.core.RabbitTemplate.receiveAndConvert
  • org.springframework.amqp.rabbit.core.RabbitTemplate.receiveAndReply
  • org.springframework.amqp.rabbit.core.RabbitTemplate.receive
  • com.rabbitmq.client.Channel.basicConsume


Example of Spring AMQP RabbitMQ Producer

Code Block
languagejava
@Service
public class CustomMessageSender {
    private static final Logger log = LoggerFactory.getLogger(CustomMessageSender.class);
    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public CustomMessageSender(final RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @Scheduled(fixedDelay = 3000L)
    public void sendMessage() {
        final CustomMessage message = new CustomMessage("Hello there!", new Random().nextInt(50), false);
        log.info("Sending message...");
        rabbitTemplate.convertAndSend(MessagingApplication.EXCHANGE_NAME, MessagingApplication.ROUTING_KEY, message);
    }
}

...

Code Block
languagejava
public class MessagingApplication implements RabbitListenerConfigurer{
     public static final String EXCHANGE_NAME = "appExchange";
     public static final String QUEUE_GENERIC_NAME = "appGenericQueue";
     public static final String QUEUE_SPECIFIC_NAME = "appSpecificQueue";
     public static final String ROUTING_KEY = "messages.key";
     public static void main(String[] args) {
          SpringApplication.run(MessagingApplication.class, args);
     }
     @Bean
     public TopicExchange appExchange() {
          return new TopicExchange(EXCHANGE_NAME);
     }
     @Bean
     public Queue appQueueGeneric() {
          return new Queue(QUEUE_GENERIC_NAME);
     }
     @Bean
     public Queue appQueueSpecific() {
          return new Queue(QUEUE_SPECIFIC_NAME);
     }
     @Bean
     public Binding declareBindingGeneric() {
          return BindingBuilder.bind(appQueueGeneric()).to(appExchange()).with(ROUTING_KEY);
     }
     @Bean
     public Binding declareBindingSpecific() {
          return BindingBuilder.bind(appQueueSpecific()).to(appExchange()).with(ROUTING_KEY);
     }

...


One to Many: RabbitMQ Topic Exchange bound to two Queues 

Image Added
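
To illustrate why one published message reaches both queues, here is a minimal plain-Java sketch of topic-exchange routing. It follows the general AMQP topic rules (words separated by dots, `*` standing for exactly one word, `#` for zero or more words). The `TopicPatternSketch` class and its `matches` and `route` methods are hypothetical names for this example; they are not part of the RabbitMQ client or of Spring AMQP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Illustrative model of topic-exchange routing; not RabbitMQ code. */
final class TopicPatternSketch {

    /** Returns true if the binding pattern accepts the routing key. */
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern consumed: key must be consumed too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words of the routing key
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1); // '*' or a literal word matched
        }
        return false;
    }

    /** Returns the names of all queues whose binding pattern accepts the key. */
    static List<String> route(Map<String, String> bindingsByQueue, String routingKey) {
        List<String> queues = new ArrayList<>();
        for (Map.Entry<String, String> binding : bindingsByQueue.entrySet()) {
            if (matches(binding.getValue(), routingKey)) {
                queues.add(binding.getKey());
            }
        }
        return queues;
    }
}
```

With `appGenericQueue` and `appSpecificQueue` both bound using the key `messages.key`, routing a message published with `messages.key` selects both queues, which is the one-to-many case described above.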

RabbitMQ basicPublish and exchangeDeclare example with topic-exchange

Code Block
languagejava
public class EmitLogTopic {

  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) {
    Connection connection = null;
    Channel channel = null;
    try {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");

      connection = factory.newConnection();
      channel = connection.createChannel();

      channel.exchangeDeclare(EXCHANGE_NAME, "topic");

      String routingKey = "tp_key";
      String message = getMessage(argv);

      channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
      System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

    }
    catch  (Exception e) {
      e.printStackTrace();
    }
    finally {
      if (connection != null) {
        try {
          connection.close();
        }
        catch (Exception ignore) {}
      }
    }
  }
...
}


Code Block
languagejava
public class ReceiveLogsTopic {

  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    String queueName = "topic_queue";

    if (argv.length < 1) {
      System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
      System.exit(1);
    }
...
}

Image Added

RabbitMQ Exchange object properties:

Image Added

RabbitMQ Queue object properties:

Image Added

RabbitMQ MessageListener with spring xml queue declaration

Code Block
languagejava
import javax.jms.*;

public class MessageReceiver implements MessageListener {
    public void onMessage(Message message) {
        if(message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            try {
                String text = textMessage.getText();
                System.out.println(String.format("Received: %s",text));
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}


Code Block
languagexml
    <!-- Queues -->

    <rabbit:queue id="springQueue" name="spring.queue" auto-delete="true" durable="false"/>

    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="springQueue" ref="messageListener"/>
    </rabbit:listener-container>

    <bean id="messageListener" class="com.ndpar.spring.rabbitmq.MessageHandler"/>

    <!-- Bindings -->

    <rabbit:fanout-exchange name="amq.fanout">
        <rabbit:bindings>
            <rabbit:binding queue="springQueue"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

Image Added

RabbitMQ Queue object properties:

Image Added

@RabbitListener with @RabbitHandler

Code Block
languagejava
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;

import java.util.Date;

@RabbitListener(queues = "foo")
public class Listerner {

    @RabbitHandler
    public void process(@Payload String foo) {
        System.out.println(new Date() + ": " + foo);
    }
}

Image Added

JMS

Example of JMS Queue with send and receive patterns using JNDI binding for Queue names defined in beans

Code Block
languagejava
public String transmit(String xmlRequest) throws Throwable {
   String xmlResponse = null; // Transmit the message and get a response.
   String requestQueue = "java:comp/env/ServiceRequestQueue";
   String responseQueue = "java:comp/env/ServiceResponseQueue";
   JMSDestination messageDest = new JMSDestination(requestQueue, responseQueue);
   //19.1 Queue changes end
   xmlResponse = messageDest.sendAndReceive(xmlRequest);
   return xmlResponse;
}

The sendAndReceive() method:

Code Block
languagejava
public String sendAndReceive(String message) throws ServiceException {

		String responseXml = null;

		QueueConnection connection = null;
		QueueSession session = null;
		Throwable thrown = null;

		try {
			// Create a connection and start it.

			connection = qcf.createQueueConnection();
			connection.start();

			// Create a session.
			session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

			String correlationID = send(message, session);

			responseXml = receive(correlationID, session, message);
		} catch (ServiceException serviceException ) {
			throw serviceException ;
		} finally {
			// Release resources.
			close(session);
			close(connection);
		}
		// Return the response.
		return responseXml;
	}

The send() method:

Code Block
languagejava
public String send(String message, QueueSession session) throws Throwable {
		QueueSender sender = null;

		try {
			// Create the sender queue.
			sender = session.createSender(requestQueue);
			sender.setTimeToLive(expiry);

			TextMessage outMessage = (TextMessage) session.createTextMessage(message);

			outMessage.setJMSReplyTo(responseQueue);
			outMessage.setJMSCorrelationID(correlationID);

			// Override dead message queue with desired response queue
			outMessage.setBooleanProperty(Constants.PRESERVE_UNDELIVERED, true);

			sender.send(outMessage);
            ...
        }
}

The receive() method:

Code Block
languagejava
public String receive(String correlationID, QueueSession session, String message) throws Throwable {
		...
		QueueReceiver receiver = null;
		try {
			receiver = session.createReceiver(responseQueue, ...);

			TextMessage inMessage = (TextMessage) receiver.receive(timeout);

		}
		...
	}

The XML file where binding is defined:

Code Block
languagexml
<session name="ServiceApplication" simple-binding-name="ejb/com/iwm/example/services/ServiceApplication">
    <resource-ref name="ServiceRequestQueue" binding-name="jms/ServiceRequestQueue" />

JMS with send and receive patterns using JNDI binding for Queue names defined in beans.

Image Added

JMS with send and receive patterns using JNDI binding for Queue names not defined in beans.

Image Added

Example of JMS Topic with publish pattern

Code Block
languagejava
public class JMSDestination {
	...
	requestTopic = "pub/jms/topic";
	public String send(String msg, TopicSession session) throws Throwable
	{
		TopicPublisher publisher = null;
		try
		{
			...
			publisher = session.createPublisher(requestTopic);
			publisher.setTimeToLive(expiry);
			TextMessage outMsg = session.createTextMessage(msg);
			publisher.publish(outMsg);

		}
		...
	}
}


Code Block
languagejava
	private void main() {
		String xmlRq = "messageToSend";
		JMSDestination msgDest = new JMSDestination();
		String xmlRs = msgDest.send(xmlRq);
						
	}

JMS with publish pattern for Topic

Image Added

Example of JMS asynchronous messaging

The receive() method of the MessageConsumer class receives messages synchronously: the call returns a message if one is available and, when a timeout or no-wait variant is used, returns null otherwise. The MessageListener interface defines a listener for receiving messages asynchronously: its onMessage() method is called whenever a new message arrives at the destination. The listener is registered with the setMessageListener() method of the MessageConsumer class.
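
The difference between the two receiving styles can be shown without a JMS provider. The sketch below is a plain-Java analogy built on `java.util.concurrent`; the `ReceiveStylesSketch` class and its methods are hypothetical and only borrow the JMS method names for readability. One deliberate simplification: the listener callback runs on the sender's thread, whereas a real JMS provider delivers on its own thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Plain-Java analogy of synchronous vs asynchronous receiving; not JMS code. */
final class ReceiveStylesSketch {

    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>();
    private Consumer<String> listener; // plays the role of a MessageListener

    /** Asynchronous style: register a callback that is invoked for each new message. */
    synchronized void setMessageListener(Consumer<String> listener) {
        this.listener = listener;
    }

    /** Delivers to the listener if one is registered, otherwise stores the message. */
    synchronized void send(String message) {
        if (listener != null) {
            listener.accept(message); // simplification: runs on the sender's thread
        } else {
            pending.add(message);
        }
    }

    /** Synchronous style: wait up to timeoutMillis; null means no message arrived. */
    String receive(long timeoutMillis) {
        try {
            return pending.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
    }
}
```

With no listener registered, the caller has to ask for each message through `receive`; once a listener is set, messages are pushed to it as they are sent.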

Code Block
languagejava
collapsetrue
   private TopicConnection getTopicConnection() throws JMSException, NamingException, FileNotFoundException,
      IOException, SQLException
   {
      try
      {
         Properties jmsProperties = SenderUtils.loadPropertiesFromFile("jms.properties");
         String jTopicName = "topicListener";
         final String JMS_FACTORY = "javax.jms.TopicConnectionFactory";

         InitialContext ctx = getInitialContext(url);
         TopicConnectionFactory tconFactory = (TopicConnectionFactory) ctx.lookup(JMS_FACTORY);
         jtcon = tconFactory.createTopicConnection();
         jtsession = jtcon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
         Topic jtopic = (Topic) ctx.lookup(jTopicName);
         jtopicPublisher = jtsession.createPublisher(jtopic);

         TopicSubscriber jtopicSubscriber = jtsession.createSubscriber(jtopic, selectorString, false);
         MsgListener jtopicListener = new MsgListener(service);
         jtopicSubscriber.setMessageListener(jtopicListener);
         jtcon.setExceptionListener(new ExceptionListener()
         {
            public void onException(JMSException arg0)
            {
               logger.error("onException invoked for: " + arg0.getMessage());
               restartConnection();
            }
         });
         return jtcon;
      }
   }

JMS with setMessageListener pattern for Topic (asynchronous messaging)

Image Added

Example of JMS request-reply pattern

In some cases, the JMS client will want the message consumers to reply to a temporary topic or queue set up by the JMS client. When a JMS message consumer receives a message that includes a JMSReplyTo destination, it can reply using that destination. A JMS consumer is not required to send a reply, but in some JMS applications, clients are programmed to do so.

The JMSReplyTo header indicates which destination, if any, a JMS consumer should reply to. The JMSReplyTo header is set explicitly by the JMS client; its contents will be a javax.jms.Destination object (either Topic or Queue).
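
The reply-to mechanism described above can be sketched in plain Java, independent of any JMS provider. In this illustration the `RequestReplySketch` class, its `Message` type, and the `consumeOne` and `newTemporaryQueue` helpers are hypothetical; the `replyTo` field merely stands in for the JMSReplyTo header, and an in-memory queue stands in for a temporary destination.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Plain-Java illustration of the request-reply idea; not JMS code. */
final class RequestReplySketch {

    /** A message with an optional reply destination (the JMSReplyTo analogue). */
    static final class Message {
        final String body;
        final Queue<Message> replyTo; // null when the client expects no reply

        Message(String body, Queue<Message> replyTo) {
            this.body = body;
            this.replyTo = replyTo;
        }
    }

    /** Stands in for a temporary queue created by the client. */
    static Queue<Message> newTemporaryQueue() {
        return new ArrayDeque<>();
    }

    /**
     * Consumer side: takes one request and replies only if a reply
     * destination was supplied. Returns true if a reply was sent.
     */
    static boolean consumeOne(Queue<Message> requests) {
        Message request = requests.poll();
        if (request == null || request.replyTo == null) {
            return false; // nothing to process, or the client did not ask for a reply
        }
        request.replyTo.add(new Message("reply to: " + request.body, null));
        return true;
    }
}
```

The client creates its reply queue, attaches it to the request, and then reads the reply from that same queue; a request without a reply destination is processed without any reply being sent.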

Code Block
languagejava
collapsetrue
@Value("${jms.queue.name}")
private String queueName;

private void sendMessages() {
        ...
            try {
                jmsTemplate.convertAndSend(queueName);
            } catch (Exception e) {
                LOG.debug("Error ", e);
            }
        }
    }


Code Block
languagejava
collapsetrue
@JmsListener(destination = "${jms.queue.name}", containerFactory = "jmsListenerContainerFactory")
public void onMessage(final Message message) {
     ...
}

JMS with request-reply pattern

Image Added

Example of JMS with JmsTemplate send API

Code Block
languagetext
titleApplication. Properties
mq.hostName=MQ_SERVER_IP
mq.port=PORT
mq.queueManager=QUEUE.MANAGER.NAME
mq.CCSID=437
mq.username=mqm

mq.password= 
mq.pubSubDomain=false
mq.receiveTimeout=20000

mq.myDestination=QUEUE_NAME


Code Block
languagejava
public class JmsQueueSender {

    private JmsTemplate jmsTemplate;
    //Referring to the value in the property file
    @Value("${mq.myDestination}")
    private String myDestination;

    public void simpleSend(final String message) {
        this.jmsTemplate.send(myDestination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }
}

Image Added

JMS with Message Driven Bean Class

Session beans can send JMS messages and receive them synchronously, whereas message-driven beans receive messages asynchronously. A message-driven bean class must implement the javax.jms.MessageListener interface and its onMessage method.

Example of a Message Driven Bean receiving messages asynchronously, with the destination defined by annotations:

Code Block
languagejava
@MessageDriven(
		activationConfig = {
				@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
				@ActivationConfigProperty(propertyName = "connectionFactoryJndiName", propertyValue = "jms/hConnectionFactory")
		},
		mappedName = "jms/destinationQueue")
@TransactionManagement(TransactionManagementType.CONTAINER)
@TransactionAttribute (TransactionAttributeType.NOT_SUPPORTED)
public class GenHealthMDB implements MessageListener {
	private static final String INSTANCE_COUNT = "instanceCount";
	private static final String MAKE_ACTIVE = "ACTIVE";
	private static final String MAKE_INACTIVE = "INACTIVE";
	
	private static Logger logger = Logger.getLogger(GenHealthMDB.class);

	@Override
	public void onMessage(Message message) {
		...
	}
}

Image Added


Example of Message Driven Beans to receive messages asynchronously, xml defined queue:

Code Block
languagexml
titleSpring.XML
		<message-driven>
			<description>Message-Driven configured by using XML.</description>
			<display-name>MDBFilTraitementAsyn</display-name>
			<ejb-name>MDBFilTraitementAsyn</ejb-name>
			<ejb-class>fr.mi.siv.mti.cip.trait.core.fil.mdb.MDBFilTraitementAsyn</ejb-class>
			<message-destination-type>javax.jms.Queue</message-destination-type>
			<activation-config>
				<activation-config-property>
					<activation-config-property-name>destination</activation-config-property-name>
					<activation-config-property-value>queueTraitRequeteASyn</activation-config-property-value>
				</activation-config-property>
				<activation-config-property>
					<activation-config-property-name>destinationType</activation-config-property-name>
					<activation-config-property-value>javax.jms.Queue</activation-config-property-value>
				</activation-config-property>
			</activation-config>
		</message-driven>


Code Block
languagejava
public class MDBFilTraitementAsyn implements MessageListener {
	public void onMessage(final Message message)
   	{
		...
	}
}

Image Added

Example of Message-Driven Beans to receive messages asynchronously:

Code Block
languagetext
titleApplication. Properties
mq.myDestination=QUEUE_NAME


Code Block
languagexml
titleSpring.XML
<bean id="jmsQueueListener" class="hu.vanio.jms.spring3.ibmmq.JmsQueueListener" />

    <!-- and this is the message listener container -->
    <jms:listener-container connection-factory="jmsQueueConnectionFactory">
        <jms:listener destination="${mq.myDestination}" ref="jmsQueueListener" />
    </jms:listener-container>


Code Block
languagejava
public class JmsQueueListener implements MessageListener {

    public void onMessage(Message message) {
    	...
    }

}

Image Added

Example of Message Driven Beans with weblogic:

weblogic-ejb-jar.xml

Code Block
languagexml
titleweblogic-ejb-jar.xml
<?xml version="1.0" encoding="UTF-8"?>
<wls:weblogic-ejb-jar xmlns:wls="http://xmlns.oracle.com/weblogic/weblogic-ejb-jar" 
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/ejb-jar_3_0.xsd http://xmlns.oracle.com/weblogic/weblogic-ejb-jar http://xmlns.oracle.com/weblogic/weblogic-ejb-jar/1.2/weblogic-ejb-jar.xsd">
	<wls:weblogic-enterprise-bean>
		<wls:ejb-name>NotifieMDB</wls:ejb-name>
		<wls:message-driven-descriptor>
			<wls:pool>
				<wls:max-beans-in-free-pool>15</wls:max-beans-in-free-pool>
				<wls:initial-beans-in-free-pool>15</wls:initial-beans-in-free-pool>
			</wls:pool>
			<wls:destination-jndi-name>Notification_Queue</wls:destination-jndi-name>
		</wls:message-driven-descriptor>
		<wls:enable-call-by-reference>true</wls:enable-call-by-reference>
		<wls:dispatch-policy>IFT.notification</wls:dispatch-policy>
	</wls:weblogic-enterprise-bean>
</wls:weblogic-ejb-jar>

ejb-jar.xml

Code Block
languagexml
titleejb-jar.xml
<?xml version="1.0" encoding="UTF-8"?>
<ejb-jar xmlns="http://java.sun.com/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/ejb-jar_3_1.xsd"
         version="3.0">
	<enterprise-beans>
		<message-driven>
			<ejb-name>NotifieMDB</ejb-name>
			<ejb-class>com.notification.mdb.NotifieMDB</ejb-class>
			<transaction-type>Bean</transaction-type>
			<message-destination-type>javax.jms.Queue</message-destination-type>
		</message-driven>
	</enterprise-beans>
</ejb-jar>


Code Block
languagejava
import javax.jms.Message;
import javax.jms.ObjectMessage;
public class NotifieMDB
{
	public void onMessage(Message msg)
    {
		...
	}
}

Image Added


To indicate that a Message-Driven Bean was analyzed, the created objects have the property CAST_RabbitMQ_Queue.exchangeName (for a topic) or CAST_MQE_QueueCall.messengingSystem (for a queue) set to the value MessageDrivenBean.

JMS with JMSContext

JMSContext is the main interface of the simplified JMS API. It combines a Connection and a Session in a single object.

Code Block
languagejava
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.Queue;
import javax.jms.Topic;
...
public class Vendor {

    @Resource(lookup = "java:comp/DefaultJMSConnectionFactory")
    private static ConnectionFactory connectionFactory;
    @Resource(lookup = "jms/AQueue")
    private static Queue vendorOrderQueue;
    @Resource(lookup = "jms/CQueue")
    private static Queue vendorConfirmQueue;
    @Resource(lookup = "jms/OTopic")
    private static Topic supplierOrderTopic;
    static Random rgen = new Random();
    static int throwException = 1;

    public static void main(String[] args) {
        JMSConsumer vendorOrderReceiver;
        MapMessage orderMessage;
        JMSConsumer vendorConfirmReceiver;
        VendorMessageListener listener;
        Message inMessage;
        MapMessage vendorOrderMessage;
        Message endOfMessageStream;
        Order order;
        int quantity;
		...
		try (JMSContext context =
                connectionFactory.createContext(JMSContext.SESSION_TRANSACTED);
                JMSContext asyncContext =
                context.createContext(JMSContext.SESSION_TRANSACTED);) {
			vendorOrderReceiver = context.createConsumer(vendorOrderQueue);
            orderMessage = context.createMapMessage();
			vendorConfirmReceiver = asyncContext.createConsumer(
                    vendorConfirmQueue);
            listener = new VendorMessageListener(asyncContext, 2);
            vendorConfirmReceiver.setMessageListener(listener);
			while (true) {
                try {
					inMessage = vendorOrderReceiver.receive();

                    if (inMessage instanceof MapMessage) {
                        vendorOrderMessage = (MapMessage) inMessage;
                    } else {
                        endOfMessageStream = context.createMessage();
                        endOfMessageStream.setJMSReplyTo(
                                vendorConfirmQueue);
                        context.createProducer().send(supplierOrderTopic,
                                endOfMessageStream);
                        context.commit();

                        break;
                    }
                    if (rgen.nextInt(4) == throwException) {
                        throw new JMSException(
                                "Simulated database concurrent access "
                                + "exception");
                    }
                    order = new Order(vendorOrderMessage);
                    orderMessage.setInt(
                            "VendorOrderNumber",
                            order.orderNumber);
                    orderMessage.setJMSReplyTo(vendorConfirmQueue);
                    quantity = vendorOrderMessage.getInt("Quantity");
                    System.out.println(
                            "Vendor: Retailer ordered " + quantity
                            + " " + vendorOrderMessage.getString("Item"));
                    orderMessage.setString("Item", "");
                    orderMessage.setInt("Quantity", quantity);
                    context.createProducer().send(supplierOrderTopic,
                            orderMessage);
                    System.out.println(
                            "Vendor: Ordered " + quantity
                            + " CPU(s) and hard drive(s)");
                    context.commit();
                    System.out.println(
                            "  Vendor: Committed transaction 1");
                } catch (JMSException e) {
                    System.err.println(
                            "Vendor: JMSException occurred: "
                            + e.toString());
                    context.rollback();
                    System.err.println(
                            "  Vendor: Rolled back transaction 1");
                }
            }
            listener.monitor.waitTillDone();
        } catch (JMSRuntimeException e) {
            System.err.println(
                    "Vendor: Exception occurred: " + e.toString());
        }
    }
}

Result of Queue creation:

Image Added

Result of Topic creation:

Image Added

JMS with AWS-SQS

The SQSConnection class implements the javax.jms.Connection interface. It can be used together with the standard JMS connection methods to create new queues.

Code Block
languagejava
import com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper;
import com.amazon.sqs.javamessaging.SQSConnection;
import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazon.sqs.javamessaging.ProviderConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import javax.jms.*;

public class App
{
    private static String queueName = "ymq_jms_example";
    public static void main( String[] args ) throws JMSException
    {
        SQSConnectionFactory connectionFactory = new SQSConnectionFactory(
                new ProviderConfiguration(),
                AmazonSQSClientBuilder.standard()
                        .withRegion("ru-central1")
                        .withEndpointConfiguration(new EndpointConfiguration(
                            "https://message-queue.api.cloud.yandex.net",
                            "ru-central1"
                        ))
        );

        SQSConnection connection = connectionFactory.createConnection();
        AmazonSQSMessagingClientWrapper client = connection.getWrappedAmazonSQSClient();
        if( !client.queueExists(queueName) ) {
            client.createQueue( queueName );
        }
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(queueName);
        MessageProducer producer = session.createProducer(queue);
        Message message = session.createTextMessage("test message");
        producer.send(message);
    }

}

Image Added

AWS-SQS

...

Producer

com.amazonaws.services.sqs.AmazonSQS.sendMessage

com.amazonaws.services.sqs.AmazonSQSClient.sendMessage

com.amazonaws.services.sqs.AmazonSQS.sendMessageBatch

com.amazonaws.services.sqs.AmazonSQSAsync.sendMessageBatchAsync

com.amazonaws.services.sqs.AmazonSQSAsync.sendMessageAsync

software.amazon.awssdk.services.sqs.SqsClient.sendMessage

software.amazon.awssdk.services.sqs.SqsClient.sendMessageBatch

Consumer

com.amazonaws.services.sqs.AmazonSQS.receiveMessage

com.amazonaws.services.sqs.AmazonSQSClient.receiveMessage

com.amazonaws.services.sqs.AmazonSQSAsync.receiveMessageAsync

software.amazon.awssdk.services.sqs.SqsClient.receiveMessage

AWS-SQS with sendMessage and receiveMessage APIs

...

Code Block
	private static final String DEFAULT_QUEUE_NAME = "test-sdk";

	@RequestMapping(
            path = "/sqs/message",
            method = RequestMethod.POST,
            consumes = MediaType.APPLICATION_JSON_VALUE)
    public HttpEntity addMessage(@RequestBody  @Valid SimpleMessage simpleMessage){

        AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
        GetQueueUrlResult getQueueUrlResult = sqs.getQueueUrl(DEFAULT_QUEUE_NAME);
        String queueUrl = getQueueUrlResult.getQueueUrl();

        SendMessageRequest sendMessageRequest = new SendMessageRequest();
        sendMessageRequest.setQueueUrl(queueUrl);
        sendMessageRequest.setMessageBody(simpleMessage.getMessage());

        SendMessageResult messageResult = sqs.sendMessage(sendMessageRequest);

        return new ResponseEntity(messageResult, HttpStatus.CREATED);
    }

AWS-SQS with sendMessage API

Image Added

Apache Kafka Patterns

KafkaProducer send() API example

Code Block
public class Example {
	public static void main(String[] args) {
		Properties kafkaProps = new Properties();
		kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
		kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); // basic serializer class for key
		kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); // basic serializer class for value
		 
		KafkaProducer<String, String> producer = new KafkaProducer<String, String>(kafkaProps);
		ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products","France");
		try {
			producer.send(record);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

Image Added

KafkaConsumer subscribe() API example

Code Block
public class Example {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "broker1:9092,broker2:9092");
		props.put("group.id", "CountryCounter");
		props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
		 
		KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
		consumer.subscribe(Collections.singletonList("customerCountries"));
	}
}

Image Added

Spring Kafka

@KafkaListener annotation on method example

Code Block
@KafkaListener(topics = "${topic.name}", id="id")
public void listen(@Payload String message,
                       @Header(KafkaHeaders.OFFSET) int offset,
                       @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition){
    ...
}

Topic value is present in properties file:

Code Block
topic.name=KAFKA_TOPIC

Image Added
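
The topic in the listener above is given as a `${topic.name}` placeholder whose value comes from the properties file. The following is a minimal sketch of how such a placeholder can be resolved using only the Java standard library. The class name `PlaceholderSketch` and the method `resolve` are hypothetical names chosen for illustration. This is not the extension's or Spring's actual resolution logic, and Spring features such as `${key:default}` are not handled.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, for illustration only.
class PlaceholderSketch {

    // Matches ${some.key}; group 1 captures the key between the braces.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces each ${key} with its value from props; unknown keys are left as-is. */
    static String resolve(String text, Properties props) {
        Matcher m = PLACEHOLDER.matcher(text);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String value = props.getProperty(m.group(1));
            String replacement = (value != null) ? value : m.group();
            // quoteReplacement keeps '$' and '\' in the replacement literal.
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("topic.name", "KAFKA_TOPIC");
        System.out.println(resolve("${topic.name}", props)); // prints KAFKA_TOPIC
    }
}
```

A placeholder with no matching key is returned unchanged, so an unresolved topic stays visible as `${...}` rather than silently becoming empty.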

@KafkaListener annotation on method example (with list of topics)

Code Block
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

	@KafkaListener(topics = {"${kafka.event.contracting.topic}", "${kafka.legacyConsumerTopic}"})
	public void listen(String message) {
		...
	}
}

Topic values are present in properties file:

Code Block
kafka.event.contracting.topic={kafka.event.contracting.topic}
kafka.legacyConsumerTopic={kafka.legacyConsumerTopic}

Image Added

@KafkaListener annotation on class with @KafkaHandler annotation example

Code Block
@Service
@KafkaListener(topics = "${topic.name}", id="id")
public class KafkaConsumerService {

	@KafkaHandler
	public void listen(String message) {
	
	}
}

Topic value is present in properties file:

Code Block
topic.name=KAFKA_TOPIC

Image Added

@KafkaListener and @SendTo annotations on method example

Code Block
public class Receiver {

    private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);

    @SendTo(BAR_TOPIC)
    @KafkaListener(topics = FOO_TOPIC)
    public Double calculate(Double data) {
        LOG.info("calculating square root from='{}'", data);
        return Math.sqrt(data);
    }

    @KafkaListener(topics = BAR_TOPIC)
    public void result(Double data) {
        LOG.info("received square root='{}'", data);
    }

}


Code Block
@Service
public class Sender {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    @Autowired
    private KafkaTemplate<String, Double> kafkaTemplate;

    public void send(Double data){
        LOG.info("sending data='{}' to topic='{}'", data, FOO_TOPIC);
        kafkaTemplate.send(FOO_TOPIC, data);
    }
}


Code Block
public class Constants {

    public static final String FOO_TOPIC = "foo.t";
    public static final String BAR_TOPIC = "bar.t";

}

Image Added

When the @SendTo annotation has no value, the default value KafkaHeaders.REPLY_TOPIC is used. In this case no object is created.

@KafkaListener meta annotation example

Code Block
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@KafkaListener
public @interface MyListener{

    @AliasFor(annotation = KafkaListener.class, attribute = "id")
    String id();

    @AliasFor(annotation = KafkaListener.class, attribute = "groupId")
    String groupId() default "";

    @AliasFor(annotation = KafkaListener.class, attribute = "topics")
    String[] value() default {};

    @AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
    String concurrency() default "3";
}


Code Block
@SpringBootApplication
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@MyListener(id = "my.group", topics = "${topic.name}")
	public void listen(String in) {
	
	}

}

Topic value is present in properties file:

Code Block
topic.name=KAFKA_TOPIC

Image Added

KafkaTemplate send API example

Code Block
@Value("${kafka.event.topic}")
private String topic;
 
@Override
public void send(Map<String, String> requestMap) {
    String message = constructKafkaMessage(requestMap);
    kafkaTemplate.send(topic, message);
}

Topic value is present in properties file:

Code Block
kafka.event.topic={kafka.event.topic}

Image Added

ReplyingKafkaTemplate sendAndReceive API example

Code Block
	@Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
        return args -> {
            ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
            RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
            SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
            System.out.println("Sent ok: " + sendResult.getRecordMetadata());
            ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
            System.out.println("Return value: " + consumerRecord.value());
        };
    }

Image Added

ContainerProperties setMessageListener API example (with KafkaMessageListenerContainer)

Code Block
	@Bean
	public KafkaMessageListenerContainer<?, ?> container(ConsumerFactory<?, ?> consumerFactory) {
		ContainerProperties props = new ContainerProperties("perf");
		Map<String, Object> configs = new HashMap<>(
				((DefaultKafkaConsumerFactory<?, ?>) consumerFactory).getConfigurationProperties());
		configs.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 2 * 1024 * 1024);
		configs.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024);
		configs.put(ConsumerConfig.CHECK_CRCS_CONFIG, false);
		props.setPollTimeout(100);
		props.setConsumerRebalanceListener(new RebalanceListener());
		Listener messageListener = new Listener();
		props.setMessageListener(messageListener);
		KafkaMessageListenerContainer<Object, Object> container = new KafkaMessageListenerContainer<>(
				new DefaultKafkaConsumerFactory<>(configs), props);
		messageListener.setContainer(container);
		return container;
	}

Image Added

Messaging service type

For JMS, ActiveMQ, KAFKA and MessageDrivenBean, a JMS object is created, decorated with a property which specifies the vendor. Below is a table listing these properties and their values:

| Vendor | Queue/Topic | Property name | Property value |
| --- | --- | --- | --- |
| JMS | Queue Publisher | CAST_MQE_QueueCall.messengingSystem | JMS |
| JMS | Queue Receiver | CAST_MQE_QueueReceive.messengingSystem | JMS |
| JMS | Topic Publisher | CAST_RabbitMQ_Exchange.exchangeName | JMS |
| JMS | Topic Receiver | CAST_RabbitMQ_Queue.exchangeName | JMS |
| ActiveMQ | Queue Publisher | CAST_MQE_QueueCall.messengingSystem | ActiveMQ |
| ActiveMQ | Queue Receiver | CAST_MQE_QueueReceive.messengingSystem | ActiveMQ |
| ActiveMQ | Topic Publisher | CAST_RabbitMQ_Exchange.exchangeName | ActiveMQ |
| ActiveMQ | Topic Receiver | CAST_RabbitMQ_Queue.exchangeName | ActiveMQ |
| KAFKA | Topic Publisher | CAST_RabbitMQ_Exchange.exchangeName | KAFKA |
| KAFKA | Topic Receiver | CAST_RabbitMQ_Queue.exchangeName | KAFKA |
| MessageDrivenBean | Queue Receiver | CAST_MQE_QueueReceive.messengingSystem | MessageDrivenBean |
| MessageDrivenBean | Topic Receiver | CAST_RabbitMQ_Queue.exchangeName | MessageDrivenBean |

For IBMMQ, RabbitMQ and AWS-SQS, specific objects are created.
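
As a reading aid, the vendor table above can be expressed as a small lookup. The sketch below transcribes the rows into two maps and returns the property expected for a given vendor and object kind. The class `MessagingSystemPropertySketch` and the method `expectedProperty` are hypothetical names chosen for illustration. This is not how the extension itself is implemented.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical lookup, transcribed from the vendor table; for illustration only.
class MessagingSystemPropertySketch {

    // Object kind -> property name carrying the vendor value.
    static final Map<String, String> PROPERTY_BY_KIND = Map.of(
            "Queue Publisher", "CAST_MQE_QueueCall.messengingSystem",
            "Queue Receiver", "CAST_MQE_QueueReceive.messengingSystem",
            "Topic Publisher", "CAST_RabbitMQ_Exchange.exchangeName",
            "Topic Receiver", "CAST_RabbitMQ_Queue.exchangeName");

    // Vendor -> object kinds for which the table lists a row.
    static final Map<String, Set<String>> KINDS_BY_VENDOR = Map.of(
            "JMS", PROPERTY_BY_KIND.keySet(),
            "ActiveMQ", PROPERTY_BY_KIND.keySet(),
            "KAFKA", Set.of("Topic Publisher", "Topic Receiver"),
            "MessageDrivenBean", Set.of("Queue Receiver", "Topic Receiver"));

    /** Returns "propertyName=vendor" for a table row, or null if the table has no such row. */
    static String expectedProperty(String vendor, String kind) {
        Set<String> kinds = KINDS_BY_VENDOR.get(vendor);
        if (kinds == null || !kinds.contains(kind)) {
            return null;
        }
        return PROPERTY_BY_KIND.get(kind) + "=" + vendor;
    }

    public static void main(String[] args) {
        System.out.println(expectedProperty("KAFKA", "Topic Receiver"));
        System.out.println(expectedProperty("KAFKA", "Queue Publisher"));
    }
}
```

The second call returns null because the table lists only topic rows for KAFKA.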

Limitations

The following cases are not handled:

...