
Summary: This document provides basic information about the extension providing Message Queues support for Java.

Extension ID

com.castsoftware.mqe

What's new?

Please see Message Queues 1.2 - Release Notes for more information.

Description

Install this extension when you analyze projects that contain Message Queue applications and you want to view transactions consisting of queue call and queue receive objects with their corresponding links. This version supports Message Queues for:

Java
  • Plain Java: supported
  • Spring: supported
  • JMS: supported
  • AWS-SQS: supported
Mainframe
  • Supported via the Mainframe analyzer. See Support for IBM MQSeries.

Supported Message Queue versions

The following table displays the supported versions matrix:

Message Queue (Version): Support

ActiveMQ (5.15.3)
  • OpenWire + JMS
  • Spring + JMS with XML based configuration
  • JMS with SpringBoot
IBM MQ (6.0.0, 8.0.0, 9.0.0)
  • Spring + JMS with XML and Annotation based configuration
  • SpringBoot (when queue is autowired in different file)
  • Plain Java
RabbitMQ (3.6.9)
  • AMQP + SLF4J
  • Spring AMQP + Spring Rabbit with XML based configuration
  • Spring AMQP with SpringBoot
JMS (1.0, 1.1)
  • JMS Queue
  • JMS Topic
AWS-SQS (1.x)
  • Simple Queue Service
KAFKA (2.6.0)
  • Apache Kafka Patterns: send/subscribe
  • Spring Kafka

CAST AIP Compatibility

This extension is compatible with:

CAST AIP release: Supported

  • 8.3.x: Yes
  • 8.2.x: Yes
  • 8.1.x: Yes
  • 8.0.x: Yes
  • 7.3.4 and all higher 7.3.x releases: Yes

Supported DBMS servers

This extension is compatible with the following DBMS servers:

DBMS: Supported

  • CSS: Yes
  • Oracle: Yes
  • Microsoft SQL Server: No

Prerequisites

  • An installation of any compatible release of CAST AIP (see table above)

Dependencies with other extensions

Note that when using the CAST Extension Downloader to download the extension and the Manage Extensions interface in CAST Server Manager to install the extension, any dependent extensions are automatically downloaded and installed for you. You do not need to do anything.

Download and installation instructions

Please see:

The latest release status of this extension can be seen when downloading it from the CAST Extend server.

Packaging, delivering and analyzing your source code

Once the extension is installed, no further configuration changes are required before you can package your source code and run an analysis. The process of packaging, delivering and analyzing your source code is as follows:

 Packaging and delivery

Note that the Message Queues extension does not contain any CAST Delivery Manager Tool discoverers or extractors, so the DMT will not detect any "Message Queue" projects. You therefore need to create an Analysis Unit manually in the CAST Management Studio, as explained below.

Using the CAST Delivery Manager Tool:

  • Create a new Version
  • Create a new Package for your Message Queue source code using the Files on your file system option:

  • Define a name for the package and the root folder of your Application source code:

  • Run the Package action: the CAST Delivery Manager Tool will not find any "projects" related to the Message Queue application source code - this is the expected behavior. However, if your Java related source code is part of a larger application, then other projects may be found during the package action.


  • Deliver the Version

Analyzing

Using the CAST Management Studio:

  • Accept and deploy the Version in the CAST Management Studio. No Analysis Units will be created automatically relating to the Java source code - this is the expected behavior. However, if your Message Queue related source code is part of a larger application, then other Analysis Units may be created automatically:

  • In the Current Version tab, add a new Analysis Unit specifically for your Java source code containing Message Queues, selecting the Add new J2EE Analysis Unit option:

 



  • Run a test analysis on the Analysis Unit before you generate a new snapshot.

Objects 

The following specific objects are displayed in CAST Enlighten:

Icons / Description

  • IBM MQ Java Queue Publisher
  • IBM MQ Java Topic Publisher
  • RabbitMQ Exchange
  • JMS Java Queue Publisher
  • JMS Java Topic Publisher
  • Java AWS Simple Queue Service Publisher

  • IBM MQ Java Queue Receiver
  • IBM MQ Java Topic Receiver
  • RabbitMQ Queue
  • JMS Java Queue Receiver
  • JMS Java Topic Receiver
  • Java AWS Simple Queue Service Receiver

  • IBM MQ Java Unknown Queue Publisher
  • IBM MQ Java Unknown Topic Publisher
  • JMS Java Unknown Queue Publisher
  • JMS Java Unknown Topic Publisher
  • Java AWS Simple Queue Service Unknown Publisher

  • IBM MQ Java Unknown Queue Receiver
  • IBM MQ Java Unknown Topic Receiver
  • JMS Java Unknown Queue Receiver
  • JMS Java Unknown Topic Receiver
  • Java AWS Simple Queue Service Unknown Receiver

What results can you expect?

Once the analysis/snapshot generation has completed, you can view the results in the normal manner:

    

Links

For IBM MQ, a Call link is created between:

  • Producer method object and Queue Call object, at the analyzer level
  • Consumer method object and Queue Receive object, at the analyzer level
  • Queue Call object and Queue Receive object, at the application level by the Web Services Linker

For RabbitMQ, a Call link is created between:

  • Producer method object and RabbitMQ Exchange object, at the analyzer level
  • Consumer method object and RabbitMQ Queue object, at the analyzer level or the application level
  • RabbitMQ Exchange object and RabbitMQ Queue object, at the application level by the Web Services Linker

For JMS, a Call link is created between:

  • Producer method object and JMS Java Queue Call object, at the analyzer level
  • Consumer method object and JMS Java Queue Receive object, at the analyzer level
  • Queue Call object and Queue Receive object, at the application level by the Web Services Linker
  • Producer method object and JMS Java Topic Call object, at the analyzer level
  • Consumer method object and JMS Java Topic Receive object, at the analyzer level
  • Topic Call object and Topic Receive object, at the application level by the Web Services Linker

For AWS-SQS, a Call link is created between:

  • Producer method object and JMS Java Queue Call object, at the analyzer level
  • Consumer method object and JMS Java Queue Receive object, at the analyzer level
  • Queue Call object and Queue Receive object, at the application level by the Web Services Linker
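
The application-level step listed above pairs a publisher-side object with a receiver-side object. This page does not say how the Web Services Linker decides which objects belong together; the sketch below assumes, purely for illustration, that two sides are paired when they carry exactly the same queue name. The class QueueLinkSketch and its method are hypothetical and are not part of any CAST API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only: not a CAST API.
final class QueueLinkSketch {

    /**
     * Each map goes from a method name to the queue name that method uses.
     * Returns one "producer -> queue -> consumer" entry per matching pair.
     * Assumption: matching is an exact comparison of queue names.
     */
    static List<String> link(Map<String, String> publishers, Map<String, String> receivers) {
        List<String> links = new ArrayList<>();
        // TreeMap copies give a stable, sorted iteration order.
        for (Map.Entry<String, String> pub : new TreeMap<>(publishers).entrySet()) {
            for (Map.Entry<String, String> rec : new TreeMap<>(receivers).entrySet()) {
                if (pub.getValue().equals(rec.getValue())) {
                    links.add(pub.getKey() + " -> " + pub.getValue() + " -> " + rec.getKey());
                }
            }
        }
        return links;
    }
}
```

Calling link with a publisher map containing sendMessage=IN_QUEUE and a receiver map containing onMessage=IN_QUEUE yields a single chain through IN_QUEUE; receivers on other queues are ignored.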

JMS with ActiveMQ

Example of JMS with ActiveMQ (Spring-XML)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- JmsTemplate Definition -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="defaultDestination" ref="destinationQueue" />
        <property name="messageConverter" ref="myMessageConverter" />
    </bean>

    <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <constructor-arg index="0" value="tcp://localhost:61616" />
    </bean>

    <!-- ConnectionFactory Definition -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <constructor-arg ref="amqConnectionFactory" />
    </bean>

    <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="IN_QUEUE" />
    </bean>

    <bean id="SampleJmsMessageSender" class="com.baeldung.spring.jms.SampleJmsMessageSender">
        <property name="queue" ref="destinationQueue" />
        <property name="jmsTemplate" ref="jmsTemplate" />
    </bean>

    <bean id="myMessageConverter" class="com.baeldung.spring.jms.SampleMessageConverter" />

    <!-- this is the Message-Driven POJO (MDP) -->
    <bean id="messageListener" class="com.baeldung.spring.jms.SampleListener">
        <property name="jmsTemplate" ref="jmsTemplate" />
        <property name="queue" ref="destinationQueue" />
    </bean>

    <bean id="errorHandler" class="com.baeldung.spring.jms.SampleJmsErrorHandler" />

    <!-- and this is the message listener container -->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destinationName" value="IN_QUEUE" />
        <property name="messageListener" ref="messageListener" />
        <property name="errorHandler" ref="errorHandler" />
    </bean>
</beans>

Example of JMS with ActiveMQ Publisher convertAndSend API - Queue is stored in XML file

private JmsTemplate jmsTemplate;
private Queue queue;
public void setJmsTemplate(JmsTemplate jmsTemplate) {
    this.jmsTemplate = jmsTemplate;
}

public void setQueue(Queue queue) {
    this.queue = queue;
}
public void sendMessage(final Employee employee) {
     this.jmsTemplate.convertAndSend(employee);
}

Example of JMS with ActiveMQ Publisher send API - Queue is stored in XML file

private JmsTemplate jmsTemplate;
private Queue queue;
public void setJmsTemplate(JmsTemplate jmsTemplate) {
    this.jmsTemplate = jmsTemplate;
}

public void setQueue(Queue queue) {
    this.queue = queue;
}
public void simpleSend() {
    jmsTemplate.send(queue, s -> s.createTextMessage("hello queue world"));
}

Example of JMS with ActiveMQ Receiver (Springboot)

public class OrderConsumer {
	public static final String ORDER_QUEUE = "Queue_Anno";
    private static Logger log = LoggerFactory.getLogger(OrderConsumer.class);

    Order received;
    private CountDownLatch countDownLatch;


    @JmsListener(destination = ORDER_QUEUE)
    public void receiveMessage(@Payload Order order,
                               @Headers MessageHeaders headers,
                               Message message, Session session) {
	}
}

Example of JMS with ActiveMQ - JNDI is used to store Queue

  public QBorrower() throws NamingException, JMSException {
  	Context ctx=new InitialContext();
  	QueueConnectionFactory connectionFactory=(QueueConnectionFactory)ctx.lookup("ConnectionFactory");
  	queueConnection=connectionFactory.createQueueConnection();
  	requestQueue=(Queue)ctx.lookup("jms.LoanRequestQueue");
  	responseQueue=(Queue)ctx.lookup("jms.LoanResponseQueue");
  	queueConnection.start();
  	queueSession=queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
 	}
 
 private void sendLoanRequest(double salary,double loanAmount) throws JMSException {
  	MapMessage message=queueSession.createMapMessage();
  	message.setDoubleProperty("salary", salary);
  	message.setDoubleProperty("loanAmount", loanAmount);
  	message.setJMSReplyTo(responseQueue);
  	QueueSender sender=queueSession.createSender(requestQueue);
      	QueueReceiver queueReceiver=queueSession.createReceiver(responseQueue);
  	sender.send(message);
	}

IBM MQ

Example of IBM MQ Producer and Consumer (Plain Java)

The com.ibm.mq.MQDestination.put and com.ibm.mq.MQDestination.get APIs are associated with the com.ibm.mq.MQQueueManager.accessQueue and com.ibm.mq.MQQueueManager.accessTopic APIs. The following example uses the accessQueue API, whose first argument names the queue to which the message is sent.

public static void main(String[] args) {
		try {
			// qMgr (MQQueueManager), br (BufferedReader) and runShow (String) are declared elsewhere
			int openOptions = MQC.MQOO_INQUIRE + MQC.MQOO_FAIL_IF_QUIESCING + MQC.MQOO_INPUT_SHARED;
			MQQueue q = qMgr.accessQueue("SYSTEM.DEFAULT.LOCAL.QUEUE", openOptions, null, null, null);
			MQMessage mBuf = new MQMessage();
			MQPutMessageOptions pmo = new MQPutMessageOptions();
			do {
				runShow = br.readLine();
				if (runShow.length() > 0) {
					mBuf.clearMessage();                // reset the buffer
					mBuf.correlationId = MQC.MQCI_NONE; // correlationId is a byte[]; no correlation id
					mBuf.messageId = MQC.MQMI_NONE;     // messageId is a byte[]; let the queue manager generate it
					mBuf.writeString(runShow);          // set actual message
					System.out.println("--> writing message to queue");
					q.put(mBuf, pmo);                   // put the message out on the queue
				}
			} while (runShow.length() > 0);
			q.close();
			qMgr.disconnect();
		} catch (MQException ex) {
			System.out.println("WMQ exception occurred : Completion code " + ex.completionCode);
		} catch (IOException ex) {
			System.out.println("I/O error while reading input or writing the message: " + ex);
		}
	}
 private void read() throws MQException, IOException
 {
   // inputQName and openOptions are assumed to be fields set elsewhere
   MQQueue queue = _queueManager.accessQueue( inputQName,
                                   openOptions,
                                   null,           // default q manager
                                   null,           // no dynamic q name
                                   null );         // no alternate user id

   MQGetMessageOptions getOptions = new MQGetMessageOptions();
   getOptions.options = MQC.MQGMO_NO_WAIT + MQC.MQGMO_FAIL_IF_QUIESCING + MQC.MQGMO_CONVERT;
   while(true)
   {
     MQMessage message = new MQMessage();
     try
     {
      queue.get(message, getOptions);
      byte[] b = new byte[message.getMessageLength()];
      message.readFully(b);
      System.out.println(new String(b));
      message.clearMessage();
     }
     catch (MQException e)
     {
      // with MQGMO_NO_WAIT, reason code 2033 (MQRC_NO_MSG_AVAILABLE) means the queue is empty
      if (e.reasonCode == 2033) break;
      throw e;
     }
   }
   queue.close();
   _queueManager.disconnect();
 }

Example of IBM MQ Publisher (JMS Interface)

public int sendMessage(int type, String msg) { 
		System.out.println("sendMessage type "+type);
		System.out.println("msg = "+msg);
			
		if(type == TYPE_CAP)
		{
			port=1414;
			queueManager="QM1";
			queueName="IBM_QUEUE_1";
		}
		else if(type == TYPE_MEASURE)
		{
			port=1415;
			queueManager="QM2";
			queueName="IBM_QUEUE_2";
		}
		else if(type == TYPE_WOOUT)
		{
			port=1415;
			queueManager="QM3";
			queueName="IBM_QUEUE_3";
		}
		else
			return -1;
		
			System.out.println(port+","+queueManager+","+queueName);

		  int status = 200;
		  MQQueueConnectionFactory cf = null;
		  MQQueueConnection connection = null;
		  MQQueueSession session = null;
		  MQQueue queue = null;
		  MQQueueSender sender = null;

		  try {
		   cf = new MQQueueConnectionFactory();
		   cf.setHostName(host);// host
		   cf.setPort(port);// port
		   cf.setTransportType(1);// JMSC.MQJMS_TP_CLIENT_MQ_TCPIP
		   cf.setQueueManager(queueManager);// queue
		   cf.setChannel(channel);// channel

		   connection = (MQQueueConnection) cf.createQueueConnection();
		   session = (MQQueueSession) connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
		   queue = (MQQueue) session.createQueue(queueName);// queue
		                          // name
		   sender = (MQQueueSender) session.createSender(queue);
		   JMSTextMessage message = (JMSTextMessage) session.createTextMessage(msg);
		   
		   // Start the connection
		   connection.start();
		
		   // DO NOT MAKE LOOP!!!
		   sender.send(message);  
		   
		  } catch (JMSException e){
			  e.printStackTrace();
		  } finally {
			   try {
			    sender.close();
			   } catch (Exception e) {
			   }
			   try {
			    session.close();
			   } catch (Exception e) {
			   }
			   if(connection != null){
				   try {
				    connection.close();
				   } catch (JMSException e) {
					   e.printStackTrace();
				   }
			   }
		  }
		  
		  return status; 
}

Example of IBM MQ Topic Publisher (JMS Interface)

public class SimplePubSub {   
  
	public static void main(String[] args) {     
		try {       
			MQTopicConnectionFactory cf = new MQTopicConnectionFactory(); 
			// Config       
			cf.setHostName("localhost");       
			cf.setPort(1414);       
			cf.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);       
			cf.setQueueManager("QM_thinkpad");       
			cf.setChannel("SYSTEM.DEF.SVRCONN");       
			
			MQTopicConnection connection = (MQTopicConnection) cf.createTopicConnection();       
			MQTopicSession session = (MQTopicSession) connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);       
			MQTopic topic = (MQTopic) session.createTopic("topic://foo");       
			MQTopicPublisher publisher =  (MQTopicPublisher) session.createPublisher(topic);                   
			long uniqueNumber = System.currentTimeMillis() % 1000;       
			
			JMSTextMessage message = (JMSTextMessage) session.createTextMessage("SimplePubSub "+ uniqueNumber);            
			// Start the connection       
			connection.start();       
			publisher.publish(message);       
			System.out.println("Sent message:\n" + message);
			
			publisher.close();           
			session.close();       
			connection.close();       
			System.out.println("\nSUCCESS\n");
		}   
		catch (JMSException jmsex) {       
			System.out.println(jmsex);       
			System.out.println("\nFAILURE\n");
		}     
		catch (Exception ex) {       
			System.out.println(ex);       
			System.out.println("\nFAILURE\n");
		}   
	} 
}

RabbitMQ

Example of Spring AMQP RabbitMQ Producer

@Service
public class CustomMessageSender {
    private static final Logger log = LoggerFactory.getLogger(CustomMessageSender.class);
    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public CustomMessageSender(final RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @Scheduled(fixedDelay = 3000L)
    public void sendMessage() {
        final CustomMessage message = new CustomMessage("Hello there!", new Random().nextInt(50), false);
        log.info("Sending message...");
        rabbitTemplate.convertAndSend(MessagingApplication.EXCHANGE_NAME, MessagingApplication.ROUTING_KEY, message);
    }
}

Example of Spring AMQP RabbitMQ Consumer

@Service
public class CustomMessageListener {
    private static final Logger log = LoggerFactory.getLogger(CustomMessageListener.class);

    @RabbitListener(queues = MessagingApplication.QUEUE_GENERIC_NAME)
    public void receiveMessage(final Message message) {
        log.info("Received message as generic: {}", message.toString());
    }

    @RabbitListener(queues = MessagingApplication.QUEUE_SPECIFIC_NAME)
    public void receiveMessageSpecific(final CustomMessage customMessage) {
        log.info("Received message as specific class: {}", customMessage.toString());
    }
}

Example of SpringBoot RabbitMQ Exchange-Queue Binding configuration

public class MessagingApplication implements RabbitListenerConfigurer{
     public static final String EXCHANGE_NAME = "appExchange";
     public static final String QUEUE_GENERIC_NAME = "appGenericQueue";
     public static final String QUEUE_SPECIFIC_NAME = "appSpecificQueue";
     public static final String ROUTING_KEY = "messages.key";
     public static void main(String[] args) {
          SpringApplication.run(MessagingApplication.class, args);
     }
     @Bean
      public TopicExchange appExchange() {
          return new TopicExchange(EXCHANGE_NAME);
     }
     @Bean
     public Queue appQueueGeneric() {
          return new Queue(QUEUE_GENERIC_NAME);
     }
     @Bean
     public Queue appQueueSpecific() {
          return new Queue(QUEUE_SPECIFIC_NAME);
     }
     @Bean
     public Binding declareBindingGeneric() {
          return BindingBuilder.bind (appQueueGeneric()).to(appExchange()).with(ROUTING_KEY);
     }
     @Bean
     public Binding declareBindingSpecific() {
          return BindingBuilder.bind(appQueueSpecific()).to(appExchange()).with(ROUTING_KEY);
     }
}

Example of Spring AMQP RabbitMQ XML based configuration

<rabbit:template id="amqpTemplate" exchange="myExchange" routing-key="foo.bar" />
<rabbit:queue name="myQueue" />
<rabbit:topic-exchange name="myExchange">
   <rabbit:bindings>
           <rabbit:binding queue="myQueue" pattern="foo.*" />
   </rabbit:bindings>
</rabbit:topic-exchange>
<rabbit:listener-container connection-factory="connectionFactory">
   <rabbit:listener ref="consumer" method="listen" queue-names="myQueue" />
</rabbit:listener-container>
<bean id="consumer" class="com.baeldung.springamqp.consumer.Consumer" />
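
In the XML above, the template publishes with the routing key foo.bar and the binding on myExchange uses the pattern foo.*. In AMQP topic exchanges, routing keys are dot-separated words, * matches exactly one word and # matches zero or more words. The following is a minimal sketch of that matching rule in plain Java; the class TopicPatternSketch is a hypothetical name introduced here for illustration, and it is not how RabbitMQ or the extension implements matching.

```java
// Hypothetical illustration of AMQP topic-exchange pattern matching.
final class TopicPatternSketch {

    /** Returns true when the routing key satisfies the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: the key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible continuation point.
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1); // '*' or a literal word consumes one word
        }
        return false;
    }
}
```

With this rule, a message published with foo.bar reaches myQueue through the foo.* binding, while a key such as foo.bar.baz would not.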


One to Many: RabbitMQ Topic Exchange bound to two Queues 


RabbitMQ : Sender using Default Exchange to send message to Queue

JMS

Example of JMS Queue with send and receive patterns using JNDI binding for Queue names defined in beans

public String transmit(String xmlRequest) throws Throwable {
   // Transmit the message and get a response.
   String requestQueue = "java:comp/env/ServiceRequestQueue";
   String responseQueue = "java:comp/env/ServiceResponseQueue";
   JMSDestination messageDest = new JMSDestination(requestQueue, responseQueue);
   String xmlResponse = messageDest.sendAndReceive(xmlRequest);
   return xmlResponse;
}

The sendAndReceive() method:

public String sendAndReceive(String message) throws ServiceException {

		String responseXml = null;

		QueueConnection connection = null;
		QueueSession session = null;
		Throwable thrown = null;

		try {
			// Create a connection and start it.

			connection = qcf.createQueueConnection();
			connection.start();

			// Create a session.
			session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

			String correlationID = send(message, session);

			responseXml = receive(correlationID, session, message);
		} catch (ServiceException serviceException ) {
			throw serviceException ;
		} finally {
			// Release resources.
			close(session);
			close(connection);
		}
		// Return the response.
		return responseXml;
	}

The send() method:

public String send(String message, QueueSession session) throws Throwable {
		QueueSender sender = null;

		try {
			// Create the sender queue.
			sender = session.createSender(requestQueue);
			sender.setTimeToLive(expiry);

			TextMessage outMessage = (TextMessage) session.createTextMessage(message);

			outMessage.setJMSReplyTo(responseQueue);
			outMessage.setJMSCorrelationID(correlationID);

			// Override dead message queue with desired response queue
			outMessage.setBooleanProperty(Constants.PRESERVE_UNDELIVERED, true);

			sender.send(outMessage);
            ...
        }
}

The receive() method:

public String receive(String correlationID, QueueSession session, String message) throws Throwable {
		...
		QueueReceiver receiver = null;
		try {
			receiver = session.createReceiver(responseQueue, ...);

			TextMessage inMessage = (TextMessage) receiver.receive(timeout);

		}
		...
	}

The XML file where binding is defined:

<session name="ServiceApplication" simple-binding-name="ejb/com/iwm/example/services/ServiceApplication">
    <resource-ref name="ServiceRequestQueue" binding-name="jms/ServiceRequestQueue" />

JMS with send and receive patterns using JNDI binding for Queue names defined in beans.

JMS with send and receive patterns using JNDI binding for Queue names not defined in beans.
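
The transmit() code looks up java:comp/env/ServiceRequestQueue, while the XML binding maps the resource-ref name ServiceRequestQueue to jms/ServiceRequestQueue. The sketch below shows one way such a lookup name could be translated to its bound name. It is an assumption made for illustration: the class JndiBindingSketch and the map of bindings are hypothetical, and this page does not describe how the extension itself resolves these names.

```java
import java.util.Map;

// Hypothetical illustration only: not part of the extension or of any JNDI API.
final class JndiBindingSketch {

    private static final String ENV_PREFIX = "java:comp/env/";

    /**
     * Translates a component-environment lookup name into the bound name
     * declared for the matching resource-ref.
     * Returns null when no binding is declared for the reference.
     */
    static String resolve(String lookupName, Map<String, String> resourceRefBindings) {
        if (!lookupName.startsWith(ENV_PREFIX)) {
            return lookupName; // not an environment reference, nothing to translate
        }
        String refName = lookupName.substring(ENV_PREFIX.length());
        return resourceRefBindings.get(refName);
    }
}
```

In this sketch, a reference with a declared binding resolves to a concrete queue name, and a reference without one resolves to null, which loosely mirrors the two cases named in the captions above (queue names defined or not defined in beans).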

Example of JMS Topic with publish pattern

public class JMSDestination {
	...
	requestTopic = "pub/jms/topic";
	public String send(String msg, TopicSession session) throws Throwable
	{
		TopicPublisher publisher = null;
		try
		{
			...
			publisher = session.createPublisher(requestTopic);
			publisher.setTimeToLive(expiry);
			TextMessage outMsg = session.createTextMessage(msg);
			publisher.publish(outMsg);

		}
		...
	}
}
	private void main() {
		String xmlRq = "messageToSend";
		JMSDestination msgDest = new JMSDestination();
		String xmlRs = msgDest.send(xmlRq);
						
	}

JMS with publish pattern for Topic


Example of JMS asynchronous messaging

The receive() method of the MessageConsumer interface receives messages synchronously: the call blocks until a message arrives or, when a timeout is given, returns without a message once the timeout expires. The MessageListener interface defines a listener for receiving messages asynchronously: its onMessage() method is called whenever a new message arrives at the destination. The listener is registered using the setMessageListener() method of MessageConsumer.

   private TopicConnection getTopicConnection() throws JMSException, NamingException, FileNotFoundException,
      IOException, SQLException
   {
      try
      {
         Properties jmsProperties = SenderUtils.loadPropertiesFromFile("jms.properties");
         String jTopicName = "topicListener";
         final String JMS_FACTORY = "javax.jms.TopicConnectionFactory";

         InitialContext ctx = getInitialContext(url);
         TopicConnectionFactory tconFactory = (TopicConnectionFactory) ctx.lookup(JMS_FACTORY);
         jtcon = tconFactory.createTopicConnection();
         jtsession = jtcon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
         Topic jtopic = (Topic) ctx.lookup(jTopicName);
         jtopicPublisher = jtsession.createPublisher(jtopic);

         TopicSubscriber jtopicSubscriber = jtsession.createSubscriber(jtopic, selectorString, false);
         MsgListener jtopicListener = new MsgListener(service);
         jtopicSubscriber.setMessageListener(jtopicListener);
         jtcon.setExceptionListener(new ExceptionListener()
         {
            public void onException(JMSException arg0)
            {
               logger.error("onException invoked for: " + arg0.getMessage());
               restartConnection();
            }
         });
         return jtcon;
      }
   }

JMS with setMessageListener pattern for Topic (asynchronous messaging)
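
The difference between a blocking receive and listener-based delivery described in this section can be sketched without a JMS provider. The example below uses a java.util.concurrent.BlockingQueue as a stand-in for a destination; the class ReceiveStylesSketch and its two methods are hypothetical names that only mimic the shape of MessageConsumer.receive(timeout) and setMessageListener(), and they are not JMS APIs.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for a JMS consumer, built on an in-memory queue.
final class ReceiveStylesSketch {

    /** Synchronous style: the caller blocks until a message arrives or the timeout expires (null). */
    static String receive(BlockingQueue<String> destination, long timeoutMillis) {
        try {
            return destination.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    /** Asynchronous style: a background thread hands each message to the listener callback. */
    static Thread setMessageListener(BlockingQueue<String> destination, Consumer<String> onMessage) {
        Thread dispatcher = new Thread(() -> {
            try {
                while (true) {
                    onMessage.accept(destination.take()); // blocks only the dispatcher thread
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop dispatching when interrupted
            }
        });
        dispatcher.setDaemon(true); // do not keep the JVM alive for the sketch
        dispatcher.start();
        return dispatcher;
    }
}
```

With receive, the calling thread waits for the message itself. With setMessageListener, the caller returns immediately and the callback runs later on the dispatcher thread, which is the pattern the jtopicSubscriber.setMessageListener(jtopicListener) call above relies on.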


Example of JMS request-reply pattern

In some cases, the JMS client will want the message consumers to reply to a temporary topic or queue set up by the JMS client. When a JMS message consumer receives a message that includes a JMSReplyTo destination, it can reply using that destination. A JMS consumer is not required to send a reply, but in some JMS applications, clients are programmed to do so.

The JMSReplyTo header indicates which destination, if any, a JMS consumer should reply to. The JMSReplyTo header is set explicitly by the JMS client; its contents will be a javax.jms.Destination object (either Topic or Queue).

@Value("${jms.queue.name}")
private String queueName;

private void sendMessages() {
        ...
            try {
                jmsTemplate.convertAndSend(queueName);
            } catch (Exception e) {
                LOG.debug("Error ", e);
            }
        }
    }
@JmsListener(destination = "${jms.queue.name}", containerFactory = "jmsListenerContainerFactory")
public void onMessage(final Message message) {
     ...
}

JMS with request-reply pattern
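
The role of the JMSReplyTo header described in this section can be illustrated with in-memory queues. In the sketch below, the class RequestReplySketch, the nested Msg type, the fixed correlation id and the "echo:" reply body are all hypothetical and exist only for illustration. A real JMS client would typically use a temporary queue from the session and set the header with setJMSReplyTo, as in the send() example earlier on this page.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory model of request-reply; not a JMS API.
final class RequestReplySketch {

    /** Stand-in for a JMS message: a body plus the two headers used by request-reply. */
    static final class Msg {
        final String body;
        final String correlationId;
        final BlockingQueue<Msg> replyTo; // plays the role of JMSReplyTo; may be null

        Msg(String body, String correlationId, BlockingQueue<Msg> replyTo) {
            this.body = body;
            this.correlationId = correlationId;
            this.replyTo = replyTo;
        }
    }

    /** Consumer side: handle one request and reply only if a reply destination was supplied. */
    static void serveOne(BlockingQueue<Msg> requests) {
        Msg request = requests.poll();
        if (request != null && request.replyTo != null) {
            request.replyTo.offer(new Msg("echo:" + request.body, request.correlationId, null));
        }
    }

    /** Client side: send a request that names a temporary reply queue, then read the reply. */
    static String request(BlockingQueue<Msg> requests, String body) {
        BlockingQueue<Msg> temporaryReplyQueue = new LinkedBlockingQueue<>();
        requests.offer(new Msg(body, "id-1", temporaryReplyQueue));
        serveOne(requests); // in a real system the consumer runs in another process
        Msg reply = temporaryReplyQueue.poll();
        return reply == null ? null : reply.body;
    }
}
```

The consumer never needs to know the reply destination in advance: it reads it from the incoming message, which is why a consumer that receives a message without a reply destination simply sends nothing back.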


Example of JMS with JmsTemplate send API

application.properties
mq.hostName=MQ_SERVER_IP
mq.port=PORT
mq.queueManager=QUEUE.MANAGER.NAME
mq.CCSID=437
mq.username=mqm

mq.password= 
mq.pubSubDomain=false
mq.receiveTimeout=20000

mq.myDestination=QUEUE_NAME
public class JmsQueueSender {

    private JmsTemplate jmsTemplate;
    //Referring to the value in the property file
    @Value("${mq.myDestination}")
    private String myDestination;

    public void simpleSend(final String message) {
        this.jmsTemplate.send(myDestination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }
}

JMS with Message-Driven Bean Class

Session beans allow you to send JMS messages and to receive them. To receive messages through a message-driven bean, the bean class must implement the javax.jms.MessageListener interface and its onMessage method.

Example of Message-Driven Beans to receive messages synchronously:

@MessageDriven(
		activationConfig = {
				@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
				@ActivationConfigProperty(propertyName = "connectionFactoryJndiName", propertyValue = "jms/hConnectionFactory")
		},
		mappedName = "jms/destinationQueue")
@TransactionManagement(TransactionManagementType.CONTAINER)
@TransactionAttribute (TransactionAttributeType.NOT_SUPPORTED)
public class GenHealthMDB implements MessageListener {
	private static final String INSTANCE_COUNT = "instanceCount";
	private static final String MAKE_ACTIVE = "ACTIVE";
	private static final String MAKE_INACTIVE = "INACTIVE";
	
	private static Logger logger = Logger.getLogger(GenHealthMDB.class);

	@Override
	public void onMessage(Message message) {
		...
	}
}

Example of Message-Driven Beans to receive messages asynchronously:

application.properties
mq.myDestination=QUEUE_NAME
Spring.XML
<bean id="jmsQueueListener" class="hu.vanio.jms.spring3.ibmmq.JmsQueueListener" />

    <!-- and this is the message listener container -->
    <jms:listener-container connection-factory="jmsQueueConnectionFactory">
        <jms:listener destination="${mq.myDestination}" ref="jmsQueueListener" />
    </jms:listener-container>
public class JmsQueueListener implements MessageListener {

    public void onMessage(Message message) {
    	...
    }

}

AWS-SQS

Supported APIs:

Producer:

  • com.amazonaws.services.sqs.AmazonSQS.sendMessage
  • com.amazonaws.services.sqs.AmazonSQSClient.sendMessage
  • com.amazonaws.services.sqs.AmazonSQS.sendMessageBatch
  • com.amazonaws.services.sqs.AmazonSQSAsync.sendMessageBatchAsync
  • com.amazonaws.services.sqs.AmazonSQSAsync.sendMessageAsync

Consumer:

  • com.amazonaws.services.sqs.AmazonSQS.receiveMessage
  • com.amazonaws.services.sqs.AmazonSQSClient.receiveMessage
  • com.amazonaws.services.sqs.AmazonSQSAsync.receiveMessageAsync


AWS-SQS with sendMessage and receiveMessage APIs

public class AWSResources {

    public static final String SQS_QUEUE_NAME = "reinvent-memes";
}
import static com.amazonaws.memes.AWSResources.SQS_QUEUE_NAME;
public class MemeUtils {

    public ImageMacro submitJob(String topCaption, String bottomCaption, String imageKey, String createdBy) {
        String queueUrl = SQS.getQueueUrl(new GetQueueUrlRequest(SQS_QUEUE_NAME)).getQueueUrl();
        SQS.sendMessage(new SendMessageRequest(queueUrl, macro.getId()));  
    }
}
import static com.amazonaws.memes.AWSResources.SQS_QUEUE_NAME;

public void run() {
        System.out.println("MemeWorker listening for work");
        String queueUrl = SQS.getQueueUrl(new GetQueueUrlRequest(SQS_QUEUE_NAME)).getQueueUrl();
		
		while (true) {
            try {
                ReceiveMessageResult result = SQS.receiveMessage(
                        new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(1));
                for (Message msg : result.getMessages()) {
                    executorService.submit(new MessageProcessor(queueUrl, msg));
                }
                sleep(1000);
            } catch (InterruptedException e) {
                Thread.interrupted();
                throw new RuntimeException("Worker interrupted");
            } catch (Exception e) {
                // ignore and retry
            }
        }
}

AWS-SQS with sendMessage and receiveMessage APIs

AWS-SQS with sendMessage API; cross technologies linking with JmsListener API

	private static final String DEFAULT_QUEUE_NAME = "test-sdk";

	@RequestMapping(
            path = "/sqs/message",
            method = RequestMethod.POST,
    consumes = MediaType.APPLICATION_JSON_VALUE)
    public HttpEntity addMessage(@RequestBody  @Valid SimpleMessage simpleMessage){

        AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
        GetQueueUrlResult getQueueUrlResult = sqs.getQueueUrl(DEFAULT_QUEUE_NAME);
        String queueUrl = getQueueUrlResult.getQueueUrl();

        SendMessageRequest sendMessageRequest = new SendMessageRequest();
        sendMessageRequest.setQueueUrl(queueUrl);
        sendMessageRequest.setMessageBody(simpleMessage.getMessage());

        SendMessageResult messageResult = sqs.sendMessage(sendMessageRequest);

        return new ResponseEntity(messageResult, HttpStatus.CREATED);
    }

AWS-SQS with sendMessage API

Apache Kafka Patterns

KafkaProducer send() API example

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Example {
	public static void main(String[] args) {
		Properties kafkaProps = new Properties();
		kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
		kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); // basic serializer class for key
		kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); // basic serializer class for value
		 
		KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);
		// topic, key, value
		ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products","France");
		try {
			producer.send(record);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

KafkaConsumer subscribe() API example

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Example {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "broker1:9092,broker2:9092");
		props.put("group.id", "CountryCounter");
		props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
		 
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		// Subscribe to a single topic
		consumer.subscribe(Collections.singletonList("customerCountries"));
	}
}

Spring Kafka

@KafkaListener annotation on method example

@KafkaListener(topics = "${topic.name}", id="id")
public void listen(@Payload String message,
                       @Header(KafkaHeaders.OFFSET) int offset,
                       @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition){
    ...
}

Topic value is present in properties file:

topic.name=KAFKA_TOPIC
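
The link between the ${topic.name} placeholder in the annotation and the entry in the properties file can be pictured with a small plain-Java sketch. The class and method names below are hypothetical, and this is neither Spring's placeholder resolver nor the extension's implementation; it only shows the kind of lookup that yields the topic name KAFKA_TOPIC.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative only: resolves ${key} placeholders against a Properties object. */
final class TopicPlaceholderSketch {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces every ${key} with its property value; unknown keys are left untouched. */
    static String resolve(String text, Properties props) {
        Matcher m = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            // Fall back to the placeholder itself when the key is missing
            String value = props.getProperty(m.group(1), m.group(0));
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("topic.name", "KAFKA_TOPIC"); // same entry as the properties file
        System.out.println(resolve("${topic.name}", props)); // prints KAFKA_TOPIC
    }
}
```

A placeholder whose key is missing from the properties stays unresolved in this sketch, which is why a properties entry is needed for the topic name to be known.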

@KafkaListener annotation on method example (with list of topics)

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

	@KafkaListener(topics = {"${kafka.event.contracting.topic}", "${kafka.legacyConsumerTopic}"})
	public void listen(String message) {
		...
	}
}

Topic values are present in properties file:

kafka.event.contracting.topic={kafka.event.contracting.topic}
kafka.legacyConsumerTopic={kafka.legacyConsumerTopic}

@KafkaListener annotation on class with @KafkaHandler annotation example

@Service
@KafkaListener(topics = "${topic.name}")
public class KafkaConsumerService {

	@KafkaHandler
	public void listen(String message) {
	
	}
}

Topic value is present in properties file:

topic.name=KAFKA_TOPIC

@KafkaListener and @SendTo annotations on method example

public class Receiver {

    private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);

    @SendTo(BAR_TOPIC)
    @KafkaListener(topics = FOO_TOPIC)
    public Double calculate(Double data) {
        LOG.info("calculating square root from='{}'", data);
        return Math.sqrt(data);
    }

    @KafkaListener(topics = BAR_TOPIC)
    public void result(Double data) {
        LOG.info("received square root='{}'", data);
    }

}

@Service
public class Sender {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    @Autowired
    private KafkaTemplate<String, Double> kafkaTemplate;

    public void send(Double data){
        LOG.info("sending data='{}' to topic='{}'", data, FOO_TOPIC);
        kafkaTemplate.send(FOO_TOPIC, data);
    }
}

public class Constants {

    public static final String FOO_TOPIC = "foo.t";
    public static final String BAR_TOPIC = "bar.t";

}

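The @SendTo annotation can also be used without a value. In that case the default applies: the reply topic is taken from the KafkaHeaders.REPLY_TOPIC header of the incoming message, so no topic name is available in the code and the extension does not create any object.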
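
The difference between a @SendTo annotation with a value and one without can be modelled in plain Java. This is a simplified sketch, not Spring Kafka code: the class and method names are hypothetical, the topic "replies.t" is invented for illustration, and "kafka_replyTopic" is assumed to be the string behind the KafkaHeaders.REPLY_TOPIC constant.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of reply-topic selection; not Spring Kafka code. */
final class ReplyTopicSketch {

    /** Assumed string value of the KafkaHeaders.REPLY_TOPIC constant. */
    static final String REPLY_TOPIC_HEADER = "kafka_replyTopic";

    /**
     * Returns the literal @SendTo value when there is one; otherwise falls back
     * to the reply-topic header carried by the incoming message.
     */
    static String replyTopic(String sendToValue, Map<String, String> headers) {
        if (sendToValue != null && !sendToValue.isEmpty()) {
            return sendToValue; // visible in the source code
        }
        return headers.get(REPLY_TOPIC_HEADER); // known only when a message arrives
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put(REPLY_TOPIC_HEADER, "replies.t"); // hypothetical topic set by the sender

        System.out.println(replyTopic("bar.t", headers)); // models @SendTo(BAR_TOPIC)
        System.out.println(replyTopic("", headers));      // models @SendTo with no value
    }
}
```

In the second call the topic comes from message data rather than from the source code, which is consistent with no object being created for that case.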

@KafkaListener meta annotation example

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@KafkaListener
public @interface MyListener{

    @AliasFor(annotation = KafkaListener.class, attribute = "id")
    String id();

    @AliasFor(annotation = KafkaListener.class, attribute = "groupId")
    String groupId() default "";

    @AliasFor(annotation = KafkaListener.class, attribute = "topics")
    String[] topics() default {};

    @AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
    String concurrency() default "3";
}

@SpringBootApplication
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@MyListener(id = "my.group", topics = "${topic.name}")
	public void listen(String in) {
	
	}

}

Topic value is present in properties file:

topic.name=KAFKA_TOPIC

KafkaTemplate send API example

@Value("${kafka.event.topic}")
private String topic;
 
@Override
public void send(Map<String, String> requestMap) {
    String message = constructKafkaMessage(requestMap);
    kafkaTemplate.send(topic, message);
}

Topic value is present in properties file:

kafka.event.topic={kafka.event.topic}

ReplyingKafkaTemplate sendAndReceive API example

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
        return args -> {
            ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
            RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
            SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
            System.out.println("Sent ok: " + sendResult.getRecordMetadata());
            ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
            System.out.println("Return value: " + consumerRecord.value());
        };
    }

ContainerProperties setMessageListener API example (with KafkaMessageListenerContainer)

	@Bean
	public KafkaMessageListenerContainer<?, ?> container(ConsumerFactory<?, ?> consumerFactory) {
		ContainerProperties props = new ContainerProperties("perf");
		Map<String, Object> configs = new HashMap<>(
				((DefaultKafkaConsumerFactory<?, ?>) consumerFactory).getConfigurationProperties());
		configs.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 2 * 1024 * 1024);
		configs.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024);
		configs.put(ConsumerConfig.CHECK_CRCS_CONFIG, false);
		props.setPollTimeout(100);
		props.setConsumerRebalanceListener(new RebalanceListener());
		Listener messageListener = new Listener();
		props.setMessageListener(messageListener);
		KafkaMessageListenerContainer<Object, Object> container = new KafkaMessageListenerContainer<>(
				new DefaultKafkaConsumerFactory<>(configs), props);
		messageListener.setContainer(container);
		return container;
	}

Limitations

The following cases are not handled:

  • When the queue name is only supplied at runtime, i.e. it is not initialized anywhere in the code and is provided dynamically during the session/connection
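
The limitation above can be illustrated with a short plain-Java sketch that contrasts a queue name initialized in the code with one supplied at runtime. The class, the method and the queue name "orders.queue" are hypothetical and are not taken from any analyzed project.

```java
/** Hypothetical illustration of the limitation; names are not from any real project. */
final class QueueNameSketch {

    /** Initialized in the code, so the value can be read from the source. */
    static final String STATIC_QUEUE_NAME = "orders.queue";

    /** Supplied when the program starts, so the source holds no value to read. */
    static String runtimeQueueName(String[] args) {
        return args.length > 0 ? args[0] : "";
    }

    public static void main(String[] args) {
        System.out.println("static queue:  " + STATIC_QUEUE_NAME);
        System.out.println("runtime queue: " + runtimeQueueName(args));
    }
}
```

Only the first form leaves a queue name in the source code; the second depends entirely on what is passed in when the program runs.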