Summary: This document provides basic information about the extension providing Message Queues support for Java.
com.castsoftware.mqe
Please see Message Queues 1.2 - Release Notes for more information.
Install this extension when you analyze projects that contain Message Queue applications and you want to view transactions made up of queue call and queue receive objects together with their corresponding links. This version supports Message Queues for:
Technology | Framework | Notes |
---|---|---|
Java | Plain Java | |
Java | Spring | |
Java | JMS | |
Java | AWS-SQS | |
Mainframe | | Supported via the Mainframe analyzer. See Support for IBM MQSeries |
The following table displays the supported versions matrix:
Message Queue | Version | Support |
---|---|---|
ActiveMQ | 5.15.3 | |
IBM MQ | 6.0.0, 8.0.0, 9.0.0 | |
RabbitMQ | 3.6.9 | |
JMS | 1.x, 2.x | |
AWS-SQS | 1.x, 2.x, 3.x | |
KAFKA | 2.6.0 | |
AIP Core release | Supported |
---|---|
8.3.x |
This extension is compatible with the following DBMS servers:
DBMS | Supported |
---|---|
CAST Storage Service / PostgreSQL |
An installation of any compatible release of AIP Core (see table above)
Some CAST extensions require the presence of other CAST extensions in order to function correctly. The Message Queue extension requires that the following other CAST extensions are also installed:
Note that when using the CAST Extension Downloader to download the extension and the Manage Extensions interface in CAST Server Manager to install the extension, any dependent extensions are automatically downloaded and installed for you. You do not need to do anything.
The extension will not be automatically downloaded and installed in CAST Console. If you need to use it, you should manually install the extension using the Application - Extensions interface:
The Message Queues extension does not contain any discoverers or extractors, so no "Message Queue" specific projects will be detected. Your Message Queue source code should be part of a larger Java/JEE project that you are also analyzing, for which JEE Analysis Units will be created. Simply ensure that the path to your Message Queues source code is included in these JEE Analysis Units: browse to the Application - Config panel and expand the JEE Technology option (3):
Once the analysis/snapshot generation has completed, you can view the results in the normal manner:
The following specific objects are displayed in CAST Enlighten:
Icons | Description |
---|---|
| |
| |
| |
|
For IBM MQ, Call link is created between:
For RabbitMQ, Call link is created between:
For JMS, Call link is created between:
For Kafka, Call link is created between:
For AWS-SQS, Call link is created between:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:amq="http://activemq.apache.org/schema/core"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
                               http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

        <!-- JmsTemplate Definition -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="defaultDestination" ref="destinationQueue" />
            <property name="messageConverter" ref="myMessageConverter" />
        </bean>

        <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <constructor-arg index="0" value="tcp://localhost:61616" />
        </bean>

        <!-- ConnectionFactory Definition -->
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
            <constructor-arg ref="amqConnectionFactory" />
        </bean>

        <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg index="0" value="IN_QUEUE" />
        </bean>

        <bean id="SampleJmsMessageSender" class="com.baeldung.spring.jms.SampleJmsMessageSender">
            <property name="queue" ref="destinationQueue" />
            <property name="jmsTemplate" ref="jmsTemplate" />
        </bean>

        <bean id="myMessageConverter" class="com.baeldung.spring.jms.SampleMessageConverter" />

        <!-- this is the Message-Driven POJO (MDP) -->
        <bean id="messageListener" class="com.baeldung.spring.jms.SampleListener">
            <property name="jmsTemplate" ref="jmsTemplate" />
            <property name="queue" ref="destinationQueue" />
        </bean>

        <bean id="errorHandler" class="com.baeldung.spring.jms.SampleJmsErrorHandler" />

        <!-- and this is the message listener container -->
        <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destinationName" value="IN_QUEUE" />
            <property name="messageListener" ref="messageListener" />
            <property name="errorHandler" ref="errorHandler" />
        </bean>
    </beans>
    private JmsTemplate jmsTemplate;
    private Queue queue;

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public void setQueue(Queue queue) {
        this.queue = queue;
    }

    // Sends to the default destination configured on the JmsTemplate
    public void sendMessage(final Employee employee) {
        this.jmsTemplate.convertAndSend(employee);
    }
    private JmsTemplate jmsTemplate;
    private Queue queue;

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public void setQueue(Queue queue) {
        this.queue = queue;
    }

    // Sends to the explicitly supplied queue
    public void simpleSend() {
        jmsTemplate.send(queue, s -> s.createTextMessage("hello queue world"));
    }
    public class OrderConsumer {

        public static final String ORDER_QUEUE = "Queue_Anno";

        private static Logger log = LoggerFactory.getLogger(OrderConsumer.class);

        Order received;
        private CountDownLatch countDownLatch;

        // The queue name comes from the annotation's destination attribute
        @JmsListener(destination = ORDER_QUEUE)
        public void receiveMessage(@Payload Order order,
                                   @Headers MessageHeaders headers,
                                   Message message,
                                   Session session) {
        }
    }
    public QBorrower() throws NamingException, JMSException {
        Context ctx = new InitialContext();
        QueueConnectionFactory connectionFactory =
                (QueueConnectionFactory) ctx.lookup("ConnectionFactory");
        queueConnection = connectionFactory.createQueueConnection();
        // Queue names are resolved through JNDI lookups
        requestQueue = (Queue) ctx.lookup("jms.LoanRequestQueue");
        responseQueue = (Queue) ctx.lookup("jms.LoanResponseQueue");
        queueConnection.start();
        queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    private void sendLoanRequest(double salary, double loanAmount) throws JMSException {
        MapMessage message = queueSession.createMapMessage();
        message.setDoubleProperty("salary", salary);
        message.setDoubleProperty("loanAmount", loanAmount);
        message.setJMSReplyTo(responseQueue);
        QueueSender sender = queueSession.createSender(requestQueue);
        QueueReceiver queueReceiver = queueSession.createReceiver(responseQueue);
        sender.send(message);
    }
To indicate that ActiveMQ has been analyzed, the created objects have the property CAST_RabbitMQ_Queue.exchangeName (for a topic) or CAST_MQE_QueueCall.messengingSystem (for a queue) set to the value ActiveMQ.
The com.ibm.mq.MQDestination.put and com.ibm.mq.MQDestination.get APIs are associated with the com.ibm.mq.MQQueueManager.accessQueue and com.ibm.mq.MQQueueManager.accessTopic APIs. The following example uses the accessQueue API, whose first argument gives the name of the queue to which the message is sent.
    // qMgr (MQQueueManager), br (BufferedReader) and runShow (String) are assumed
    // to be declared elsewhere in the sample; they are not shown here.
    public static void main(String args[]) {
        try {
            int openOptions = MQC.MQOO_INQUIRE + MQC.MQOO_FAIL_IF_QUIESCING + MQC.MQOO_INPUT_SHARED;
            MQQueue q = qMgr.accessQueue("SYSTEM.DEFAULT.LOCAL.QUEUE", openOptions, null, null, null);
            MQMessage mBuf = new MQMessage();
            MQPutMessageOptions pmo = new MQPutMessageOptions();
            do {
                runShow = br.readLine();
                if (runShow.length() > 0) {
                    mBuf.clearMessage();        // reset the buffer
                    mBuf.correlationId = 1;     // set correlationId
                    mBuf.messageId = 1;         // set messageId
                    mBuf.writeString(runShow);  // set actual message
                    System.out.println("--> writing message to queue");
                    q.put(mBuf, pmo);           // put the message out on the queue
                }
            } while (runShow.length() > 0);
            q.close();
            qMgr.disconnect();
        } catch (MQException ex) {
            System.out.println("WMQ exception occurred : Completion code ");
        }
    }
    private void read() throws MQException {
        MQQueue queue = _queueManager.accessQueue(
                inputQName,
                openOptions,
                null,   // default q manager
                null,   // no dynamic q name
                null);  // no alternate user id
        MQGetMessageOptions getOptions = new MQGetMessageOptions();
        getOptions.options = MQC.MQGMO_NO_WAIT + MQC.MQGMO_FAIL_IF_QUIESCING + MQC.MQGMO_CONVERT;
        while (true) {
            MQMessage message = new MQMessage();
            try {
                queue.get(message, getOptions);
                byte[] b = new byte[message.getMessageLength()];
                message.readFully(b);
                System.out.println(new String(b));
                message.clearMessage();
            } catch (IOException e) {
                // Assumption: the original sample omits its catch block; stop reading on I/O errors
                break;
            }
        }
        queue.close();
        _queueManager.disconnect();
    }
    public int sendMessage(int type, String msg) {
        System.out.println("sendMessage type " + type);
        System.out.println("msg = " + msg);
        if (type == TYPE_CAP) {
            port = 1414;
            queueManager = "QM1";
            queueName = "IBM_QUEUE_1";
        } else if (type == TYPE_MEASURE) {
            port = 1415;
            queueManager = "QM2";
            queueName = "IBM_QUEUE_2";
        } else if (type == TYPE_WOOUT) {
            port = 1415;
            queueManager = "QM3";
            queueName = "IBM_QUEUE_3";
        } else {
            return -1;
        }
        System.out.println(port + "," + queueManager + "," + queueName);
        int status = 200;
        MQQueueConnectionFactory cf = null;
        MQQueueConnection connection = null;
        MQQueueSession session = null;
        MQQueue queue = null;
        MQQueueSender sender = null;
        try {
            cf = new MQQueueConnectionFactory();
            cf.setHostName(host);               // host
            cf.setPort(port);                   // port
            cf.setTransportType(1);             // JMSC.MQJMS_TP_CLIENT_MQ_TCPIP
            cf.setQueueManager(queueManager);   // queue
            cf.setChannel(channel);             // channel
            connection = (MQQueueConnection) cf.createQueueConnection();
            session = (MQQueueSession) connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            queue = (MQQueue) session.createQueue(queueName);   // queue name
            sender = (MQQueueSender) session.createSender(queue);
            JMSTextMessage message = (JMSTextMessage) session.createTextMessage(msg);
            // Start the connection
            connection.start();
            // DO NOT MAKE LOOP!!!
            sender.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            try {
                sender.close();
            } catch (Exception e) {
            }
            try {
                session.close();
            } catch (Exception e) {
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
        return status;
    }
    public class SimplePubSub {
        public static void main(String[] args) {
            try {
                MQTopicConnectionFactory cf = new MQTopicConnectionFactory();
                // Config
                cf.setHostName("localhost");
                cf.setPort(1414);
                cf.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
                cf.setQueueManager("QM_thinkpad");
                cf.setChannel("SYSTEM.DEF.SVRCONN");
                MQTopicConnection connection = (MQTopicConnection) cf.createTopicConnection();
                MQTopicSession session =
                        (MQTopicSession) connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
                MQTopic topic = (MQTopic) session.createTopic("topic://foo");
                MQTopicPublisher publisher = (MQTopicPublisher) session.createPublisher(topic);
                long uniqueNumber = System.currentTimeMillis() % 1000;
                JMSTextMessage message =
                        (JMSTextMessage) session.createTextMessage("SimplePubSub " + uniqueNumber);
                // Start the connection
                connection.start();
                publisher.publish(message);
                System.out.println("Sent message:\n" + message);
                publisher.close();
                session.close();
                connection.close();
                System.out.println("\nSUCCESS\n");
            } catch (JMSException jmsex) {
                System.out.println(jmsex);
                System.out.println("\nFAILURE\n");
            } catch (Exception ex) {
                System.out.println(ex);
                System.out.println("\nFAILURE\n");
            }
        }
    }
Supported APIs:
Publisher | Receiver |
---|---|
org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend<br>org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive<br>org.springframework.amqp.rabbit.core.RabbitTemplate.sendAndReceive<br>org.springframework.amqp.core.AmqpTemplate.convertAndSend<br>org.springframework.amqp.rabbit.core.RabbitTemplate.send<br>com.rabbitmq.client.Channel.basicPublish | org.springframework.amqp.rabbit.core.RabbitTemplate.receiveAndConvert<br>org.springframework.amqp.rabbit.core.RabbitTemplate.receiveAndReply<br>org.springframework.amqp.rabbit.core.RabbitTemplate.receive<br>com.rabbitmq.client.Channel.basicConsume |
    @Service
    public class CustomMessageSender {

        private static final Logger log = LoggerFactory.getLogger(CustomMessageSender.class);

        private final RabbitTemplate rabbitTemplate;

        @Autowired
        public CustomMessageSender(final RabbitTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
        }

        @Scheduled(fixedDelay = 3000L)
        public void sendMessage() {
            final CustomMessage message = new CustomMessage("Hello there!", new Random().nextInt(50), false);
            log.info("Sending message...");
            // Publishes to the exchange with the given routing key
            rabbitTemplate.convertAndSend(MessagingApplication.EXCHANGE_NAME,
                                          MessagingApplication.ROUTING_KEY,
                                          message);
        }
    }
    @Service
    public class CustomMessageListener {

        private static final Logger log = LoggerFactory.getLogger(CustomMessageListener.class);

        @RabbitListener(queues = MessagingApplication.QUEUE_GENERIC_NAME)
        public void receiveMessage(final Message message) {
            log.info("Received message as generic: {}", message.toString());
        }

        @RabbitListener(queues = MessagingApplication.QUEUE_SPECIFIC_NAME)
        public void receiveMessageSpecific(final CustomMessage customMessage) {
            log.info("Received message as specific class: {}", customMessage.toString());
        }
    }
    public class MessagingApplication implements RabbitListenerConfigurer {

        public static final String EXCHANGE_NAME = "appExchange";
        public static final String QUEUE_GENERIC_NAME = "appGenericQueue";
        public static final String QUEUE_SPECIFIC_NAME = "appSpecificQueue";
        public static final String ROUTING_KEY = "messages.key";

        public static void main(String[] args) {
            SpringApplication.run(MessagingApplication.class, args);
        }

        @Bean
        public TopicExchange appExchange() {
            return new TopicExchange(EXCHANGE_NAME);
        }

        @Bean
        public Queue appQueueGeneric() {
            return new Queue(QUEUE_GENERIC_NAME);
        }

        @Bean
        public Queue appQueueSpecific() {
            return new Queue(QUEUE_SPECIFIC_NAME);
        }

        // Both queues are bound to the same exchange with the same routing key
        @Bean
        public Binding declareBindingGeneric() {
            return BindingBuilder.bind(appQueueGeneric()).to(appExchange()).with(ROUTING_KEY);
        }

        @Bean
        public Binding declareBindingSpecific() {
            return BindingBuilder.bind(appQueueSpecific()).to(appExchange()).with(ROUTING_KEY);
        }
        ...
    }
One to Many: RabbitMQ Topic Exchange bound to two Queues
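To make the one-to-many routing concrete, here is a minimal plain-Java sketch of an exchange that copies each published message to every queue bound with a matching key. It is only an in-memory illustration: `TopicExchangeSketch`, `bind`, `publish` and `messagesIn` are hypothetical names, not the RabbitMQ client or Spring AMQP API, and the sketch assumes exact key matching (the `*` and `#` wildcards of real topic exchanges are left out).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for a topic exchange; not the RabbitMQ API. */
class TopicExchangeSketch {
    // queue name -> binding key (one binding per queue keeps the sketch short)
    private final Map<String, String> bindings = new LinkedHashMap<>();
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    void bind(String queueName, String bindingKey) {
        bindings.put(queueName, bindingKey);
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** Delivers a copy of the message to every queue whose binding key equals the routing key. */
    void publish(String routingKey, String message) {
        for (Map.Entry<String, String> binding : bindings.entrySet()) {
            if (binding.getValue().equals(routingKey)) {
                queues.get(binding.getKey()).add(message);
            }
        }
    }

    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, Collections.emptyList());
    }

    public static void main(String[] args) {
        TopicExchangeSketch appExchange = new TopicExchangeSketch();
        appExchange.bind("appGenericQueue", "messages.key");
        appExchange.bind("appSpecificQueue", "messages.key");
        appExchange.publish("messages.key", "Hello there!");
        System.out.println(appExchange.messagesIn("appGenericQueue"));
        System.out.println(appExchange.messagesIn("appSpecificQueue"));
    }
}
```

With the two bindings from the sample configuration, a single publish reaches both queues, which is why one publisher object can end up linked to two receivers.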
    public class EmitLogTopic {

        private static final String EXCHANGE_NAME = "topic_logs";

        public static void main(String[] argv) {
            Connection connection = null;
            Channel channel = null;
            try {
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("localhost");
                connection = factory.newConnection();
                channel = connection.createChannel();
                channel.exchangeDeclare(EXCHANGE_NAME, "topic");
                String routingKey = "tp_key";
                String message = getMessage(argv);
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (Exception ignore) {}
                }
            }
        }
        ...
    }
    public class ReceiveLogsTopic {

        private static final String EXCHANGE_NAME = "topic_logs";

        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            String queueName = "topic_queue";
            if (argv.length < 1) {
                System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
                System.exit(1);
            }
            ...
        }
RabbitMQ Exchange object properties:
RabbitMQ Queue object properties:
    import javax.jms.*;

    public class MessageReceiver implements MessageListener {

        public void onMessage(Message message) {
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    String text = textMessage.getText();
                    System.out.println(String.format("Received: %s", text));
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    <!-- Queues -->
    <rabbit:queue id="springQueue" name="spring.queue" auto-delete="true" durable="false"/>

    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="springQueue" ref="messageListener"/>
    </rabbit:listener-container>

    <bean id="messageListener" class="com.ndpar.spring.rabbitmq.MessageHandler"/>

    <!-- Bindings -->
    <rabbit:fanout-exchange name="amq.fanout">
        <rabbit:bindings>
            <rabbit:binding queue="springQueue"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>
RabbitMQ Queue object properties:
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.messaging.handler.annotation.Payload;

    import java.util.Date;

    @RabbitListener(queues = "foo")
    public class Listerner {

        @RabbitHandler
        public void process(@Payload String foo) {
            System.out.println(new Date() + ": " + foo);
        }
    }
    public String transmit(String xmlRequest) throws Throwable {
        String xmlResponse = null;
        // Transmit the message and get a response.
        String requestQueue = "java:comp/env/ServiceRequestQueue";
        String responseQueue = "java:comp/env/ServiceResponseQueue";
        JMSDestination messageDest = new JMSDestination(requestQueue, responseQueue);
        xmlResponse = messageDest.sendAndReceive(xmlRequest);
        return xmlResponse;
    }
The sendAndReceive() method:
    public String sendAndReceive(String message) throws ServiceException {
        String responseXml = null;
        QueueConnection connection = null;
        QueueSession session = null;
        Throwable thrown = null;
        try {
            // Create a connection and start it.
            connection = qcf.createQueueConnection();
            connection.start();
            // Create a session.
            session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            String correlationID = send(message, session);
            responseXml = receive(correlationID, session, message);
        } catch (ServiceException serviceException) {
            throw serviceException;
        } finally {
            // Release resources.
            close(session);
            close(connection);
        }
        // Return the response.
        return responseXml;
    }
The send() method:
    public String send(String message, QueueSession session) throws Throwable {
        QueueSender sender = null;
        try {
            // Create the sender queue.
            sender = session.createSender(requestQueue);
            sender.setTimeToLive(expiry);
            TextMessage outMessage = (TextMessage) session.createTextMessage(message);
            outMessage.setJMSReplyTo(responseQueue);
            outMessage.setJMSCorrelationID(correlationID);
            // Override dead message queue with desired response queue
            outMessage.setBooleanProperty(Constants.PRESERVE_UNDELIVERED, true);
            sender.send(outMessage);
            ...
        }
    }
The receive() method:
    public String receive(String correlationID, QueueSession session, String message) throws Throwable {
        ...
        QueueReceiver receiver = null;
        try {
            receiver = session.createReceiver(responseQueue, ...);
            TextMessage inMessage = (TextMessage) receiver.receive(timeout);
        }
        ...
    }
The XML file where binding is defined:
    <session name="ServiceApplication" simple-binding-name="ejb/com/iwm/example/services/ServiceApplication">
        <resource-ref name="ServiceRequestQueue" binding-name="jms/ServiceRequestQueue" />
JMS with send and receive patterns using JNDI binding for Queue names defined in beans.
JMS with send and receive patterns using JNDI binding for Queue names not defined in beans.
    public class JMSDestination {
        ...
        requestTopic = "pub/jms/topic";

        public String send(String msg, TopicSession session) throws Throwable {
            TopicPublisher publisher = null;
            try {
                ...
                publisher = session.createPublisher(requestTopic);
                publisher.setTimeToLive(expiry);
                TextMessage outMsg = session.createTextMessage(msg);
                publisher.publish(outMsg);
            }
            ...
        }
    }
    private void main() {
        String xmlRq = "messageToSend";
        JMSDestination msgDest = new JMSDestination();
        String xmlRs = msgDest.send(xmlRq);
    }
JMS with publish pattern for Topic
The receive() method of the MessageConsumer class allows messages to be received synchronously: the call returns either a message or nothing (for example, when a timeout expires before a message arrives). The MessageListener interface defines a listener for receiving messages asynchronously: in this case, the onMessage() method is called whenever a new message arrives at the destination. The listener is registered using the setMessageListener() method of the MessageConsumer class.
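The difference between the two styles can be sketched in plain Java without any JMS provider. The class below is a hypothetical in-memory analogy, not the javax.jms API: `ReceiveStylesSketch` and its methods only borrow the names `receive` and `setMessageListener`, and a `BlockingQueue` stands in for the destination.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical stand-in for a destination with one consumer; not the JMS API. */
class ReceiveStylesSketch {
    private final BlockingQueue<String> destination = new LinkedBlockingQueue<>();

    void send(String message) {
        destination.add(message);
    }

    /** Synchronous style: the caller waits, then gets a message or null. */
    String receive(long timeoutMillis) {
        try {
            return destination.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Asynchronous style: a background thread invokes the callback per message. */
    void setMessageListener(Consumer<String> onMessage) {
        Thread dispatcher = new Thread(() -> {
            try {
                while (true) {
                    onMessage.accept(destination.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        dispatcher.setDaemon(true); // do not keep the JVM alive for the sketch
        dispatcher.start();
    }

    static String demoSynchronous() {
        ReceiveStylesSketch consumer = new ReceiveStylesSketch();
        String beforeSend = consumer.receive(10); // nothing sent yet
        consumer.send("hello");
        return beforeSend + "/" + consumer.receive(10);
    }

    static String demoAsynchronous() {
        ReceiveStylesSketch consumer = new ReceiveStylesSketch();
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch delivered = new CountDownLatch(1);
        consumer.setMessageListener(message -> {
            seen.set(message);
            delivered.countDown();
        });
        consumer.send("hello");
        try {
            delivered.await(2, TimeUnit.SECONDS); // wait for the callback to run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(demoSynchronous());
        System.out.println(demoAsynchronous());
    }
}
```

In the synchronous case the calling code decides when to look for a message; in the asynchronous case the registration call is the only place where the consumer and the destination meet, and delivery happens later on another thread.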
    private TopicConnection getTopicConnection()
            throws JMSException, NamingException, FileNotFoundException, IOException, SQLException {
        Properties jmsProperties = SenderUtils.loadPropertiesFromFile("jms.properties");
        String jTopicName = "topicListener";
        final String JMS_FACTORY = "javax.jms.TopicConnectionFactory";
        InitialContext ctx = getInitialContext(url);
        TopicConnectionFactory tconFactory = (TopicConnectionFactory) ctx.lookup(JMS_FACTORY);
        jtcon = tconFactory.createTopicConnection();
        jtsession = jtcon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic jtopic = (Topic) ctx.lookup(jTopicName);
        jtopicPublisher = jtsession.createPublisher(jtopic);
        TopicSubscriber jtopicSubscriber = jtsession.createSubscriber(jtopic, selectorString, false);
        // Register the asynchronous listener on the subscriber
        MsgListener jtopicListener = new MsgListener(service);
        jtopicSubscriber.setMessageListener(jtopicListener);
        jtcon.setExceptionListener(new ExceptionListener() {
            public void onException(JMSException arg0) {
                logger.error("onException invoked for: " + arg0.getMessage());
                restartConnection();
            }
        });
        return jtcon;
    }
JMS with setMessageListener pattern for Topic (asynchronous messaging)
In some cases, the JMS client will want the message consumers to reply to a temporary topic or queue set up by the JMS client. When a JMS message consumer receives a message that includes a JMSReplyTo destination, it can reply using that destination. A JMS consumer is not required to send a reply, but in some JMS applications, clients are programmed to do so.
The JMSReplyTo header indicates which destination, if any, a JMS consumer should reply to. The JMSReplyTo header is set explicitly by the JMS client; its contents will be a javax.jms.Destination object (either Topic or Queue).
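The reply-to convention described above can be sketched without a JMS provider. In the hypothetical example below, `Request`, `consume` and `demo` are illustration-only names, `java.util.Queue` stands in for a JMS destination, and the `replyTo` field plays the role of the JMSReplyTo header; it is an analogy, not the javax.jms API.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical in-memory analogy of the JMSReplyTo convention; not the JMS API. */
class ReplyToSketch {

    /** A message body plus an optional reply destination (the JMSReplyTo analogue). */
    static final class Request {
        final String body;
        final Queue<String> replyTo; // null when the client does not expect a reply

        Request(String body, Queue<String> replyTo) {
            this.body = body;
            this.replyTo = replyTo;
        }
    }

    /** Consumer side: replies only when the message carries a reply destination. */
    static void consume(Queue<Request> requestQueue) {
        Request request;
        while ((request = requestQueue.poll()) != null) {
            if (request.replyTo != null) {
                request.replyTo.add("reply to: " + request.body);
            }
        }
    }

    static String demo() {
        Queue<Request> requests = new ArrayDeque<>();
        Queue<String> temporaryReplies = new ArrayDeque<>(); // set up by the client
        requests.add(new Request("loan request", temporaryReplies));
        requests.add(new Request("fire and forget", null));
        consume(requests);
        return String.join(",", temporaryReplies);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Only the first request produces a reply, because only that one names a destination to reply to; the consumer itself holds no knowledge of where replies should go.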
    @Value("${jms.queue.name}")
    private String queueName;

    private void sendMessages() {
        ...
        try {
            jmsTemplate.convertAndSend(queueName);
        } catch (Exception e) {
            LOG.debug("Error ", e);
        }
    }
    }
    @JmsListener(destination = "${jms.queue.name}", containerFactory = "jmsListenerContainerFactory")
    public void onMessage(final Message message) {
        ...
    }
JMS with request-reply pattern
    mq.hostName=MQ_SERVER_IP
    mq.port=PORT
    mq.queueManager=QUEUE.MANAGER.NAME
    mq.CCSID=437
    mq.username=mqm
    mq.password=
    mq.pubSubDomain=false
    mq.receiveTimeout=20000
    mq.myDestination=QUEUE_NAME
    public class JmsQueueSender {

        private JmsTemplate jmsTemplate;

        // Referring to the value in the property file
        @Value("${mq.myDestination}")
        private String myDestination;

        public void simpleSend(final String message) {
            this.jmsTemplate.send(myDestination, new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage(message);
                }
            });
        }
    }
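In the samples above the destination is named indirectly, through a `${...}` placeholder that points at a key in a properties file. The sketch below shows, in plain Java, how such a placeholder maps back to the literal queue name. It is an illustration under stated assumptions: `PlaceholderSketch` and `resolve` are hypothetical names, and this is neither Spring's resolver nor a description of how the extension itself performs the lookup.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that expands ${key} placeholders from a Properties object. */
class PlaceholderSketch {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces each ${key} with its property value; unknown keys are left untouched. */
    static String resolve(String value, Properties properties) {
        Matcher matcher = PLACEHOLDER.matcher(value);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String replacement = properties.getProperty(matcher.group(1), matcher.group(0));
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    static String demo() {
        Properties properties = new Properties();
        try {
            // Same keys as the sample properties file, loaded from a string for brevity
            properties.load(new StringReader("mq.myDestination=QUEUE_NAME\nmq.pubSubDomain=false\n"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return resolve("${mq.myDestination}", properties);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Leaving unknown keys untouched keeps an unresolved placeholder visible in the result instead of silently producing an empty queue name.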
Session beans allow you to send JMS messages and to receive them. The message-driven bean class must implement the javax.jms.MessageListener interface and the onMessage method.
Example of Message-Driven Beans to receive messages asynchronously, annotation-defined queue:
    @MessageDriven(
        activationConfig = {
            @ActivationConfigProperty(propertyName = "destinationType",
                                      propertyValue = "javax.jms.Queue"),
            @ActivationConfigProperty(propertyName = "connectionFactoryJndiName",
                                      propertyValue = "jms/hConnectionFactory")
        },
        mappedName = "jms/destinationQueue")
    @TransactionManagement(TransactionManagementType.CONTAINER)
    @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
    public class GenHealthMDB implements MessageListener {

        private static final String INSTANCE_COUNT = "instanceCount";
        private static final String MAKE_ACTIVE = "ACTIVE";
        private static final String MAKE_INACTIVE = "INACTIVE";
        private static Logger logger = Logger.getLogger(GenHealthMDB.class);

        @Override
        public void onMessage(Message message) {
            ...
        }
    }
Example of Message-Driven Beans to receive messages asynchronously, XML-defined queue:
    <message-driven>
        <description>Message-Driven configured by using XML.</description>
        <display-name>MDBFilTraitementAsyn</display-name>
        <ejb-name>MDBFilTraitementAsyn</ejb-name>
        <ejb-class>fr.mi.siv.mti.cip.trait.core.fil.mdb.MDBFilTraitementAsyn</ejb-class>
        <message-destination-type>javax.jms.Queue</message-destination-type>
        <activation-config>
            <activation-config-property>
                <activation-config-property-name>destination</activation-config-property-name>
                <activation-config-property-value>queueTraitRequeteASyn</activation-config-property-value>
            </activation-config-property>
            <activation-config-property>
                <activation-config-property-name>destinationType</activation-config-property-name>
                <activation-config-property-value>javax.jms.Queue</activation-config-property-value>
            </activation-config-property>
        </activation-config>
    </message-driven>
    public class MDBFilTraitementAsyn implements MessageListener {
        public void onMessage(final Message message) {
            ...
        }
    }
Example of Message-Driven Beans to receive messages asynchronously:
    mq.myDestination=QUEUE_NAME
    <bean id="jmsQueueListener" class="hu.vanio.jms.spring3.ibmmq.JmsQueueListener" />

    <!-- and this is the message listener container -->
    <jms:listener-container connection-factory="jmsQueueConnectionFactory">
        <jms:listener destination="${mq.myDestination}" ref="jmsQueueListener" />
    </jms:listener-container>
    public class JmsQueueListener implements MessageListener {
        public void onMessage(Message message) {
            ...
        }
    }
Example of Message-Driven Beans with WebLogic:
weblogic-ejb-jar.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <wls:weblogic-ejb-jar xmlns:wls="http://xmlns.oracle.com/weblogic/weblogic-ejb-jar"
                          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                          xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/ejb-jar_3_0.xsd
                                              http://xmlns.oracle.com/weblogic/weblogic-ejb-jar http://xmlns.oracle.com/weblogic/weblogic-ejb-jar/1.2/weblogic-ejb-jar.xsd">
        <wls:weblogic-enterprise-bean>
            <wls:ejb-name>NotifieMDB</wls:ejb-name>
            <wls:message-driven-descriptor>
                <wls:pool>
                    <wls:max-beans-in-free-pool>15</wls:max-beans-in-free-pool>
                    <wls:initial-beans-in-free-pool>15</wls:initial-beans-in-free-pool>
                </wls:pool>
                <wls:destination-jndi-name>Notification_Queue</wls:destination-jndi-name>
            </wls:message-driven-descriptor>
            <wls:enable-call-by-reference>true</wls:enable-call-by-reference>
            <wls:dispatch-policy>IFT.notification</wls:dispatch-policy>
        </wls:weblogic-enterprise-bean>
    </wls:weblogic-ejb-jar>
ejb-jar.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <ejb-jar xmlns="http://java.sun.com/xml/ns/javaee"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/ejb-jar_3_1.xsd"
             version="3.0">
        <enterprise-beans>
            <message-driven>
                <ejb-name>NotifieMDB</ejb-name>
                <ejb-class>com.notification.mdb.NotifieMDB</ejb-class>
                <transaction-type>Bean</transaction-type>
                <message-destination-type>javax.jms.Queue</message-destination-type>
            </message-driven>
        </enterprise-beans>
    </ejb-jar>
    import javax.jms.Message;
    import javax.jms.ObjectMessage;

    public class NotifieMDB {
        public void onMessage(Message msg) {
            ...
        }
    }
To indicate that a Message-Driven Bean has been analyzed, the created objects have the property CAST_RabbitMQ_Queue.exchangeName (for a topic) or CAST_MQE_QueueCall.messengingSystem (for a queue) set to the value MessageDrivenBean.
JMSContext is the main interface in the simplified JMS API; it combines a Connection and a Session in a single object.
    import javax.jms.JMSConsumer;
    import javax.jms.JMSContext;
    import javax.jms.Queue;
    import javax.jms.Topic;
    ...

    public class Vendor {

        @Resource(lookup = "java:comp/DefaultJMSConnectionFactory")
        private static ConnectionFactory connectionFactory;
        @Resource(lookup = "jms/AQueue")
        private static Queue vendorOrderQueue;
        @Resource(lookup = "jms/CQueue")
        private static Queue vendorConfirmQueue;
        @Resource(lookup = "jms/OTopic")
        private static Topic supplierOrderTopic;
        static Random rgen = new Random();
        static int throwException = 1;

        public static void main(String[] args) {
            JMSConsumer vendorOrderReceiver;
            MapMessage orderMessage;
            JMSConsumer vendorConfirmReceiver;
            VendorMessageListener listener;
            Message inMessage;
            MapMessage vendorOrderMessage;
            Message endOfMessageStream;
            Order order;
            int quantity;
            ...
            try (JMSContext context = connectionFactory.createContext(JMSContext.SESSION_TRANSACTED);
                 JMSContext asyncContext = context.createContext(JMSContext.SESSION_TRANSACTED);) {
                vendorOrderReceiver = context.createConsumer(vendorOrderQueue);
                orderMessage = context.createMapMessage();
                vendorConfirmReceiver = asyncContext.createConsumer(vendorConfirmQueue);
                listener = new VendorMessageListener(asyncContext, 2);
                vendorConfirmReceiver.setMessageListener(listener);
                while (true) {
                    try {
                        inMessage = vendorOrderReceiver.receive();
                        if (inMessage instanceof MapMessage) {
                            vendorOrderMessage = (MapMessage) inMessage;
                        } else {
                            endOfMessageStream = context.createMessage();
                            endOfMessageStream.setJMSReplyTo(vendorConfirmQueue);
                            context.createProducer().send(supplierOrderTopic, endOfMessageStream);
                            context.commit();
                            break;
                        }
                        if (rgen.nextInt(4) == throwException) {
                            throw new JMSException(
                                    "Simulated database concurrent access " + "exception");
                        }
                        order = new Order(vendorOrderMessage);
                        orderMessage.setInt("VendorOrderNumber", order.orderNumber);
                        orderMessage.setJMSReplyTo(vendorConfirmQueue);
                        quantity = vendorOrderMessage.getInt("Quantity");
                        System.out.println("Vendor: Retailer ordered " + quantity + " "
                                + vendorOrderMessage.getString("Item"));
                        orderMessage.setString("Item", "");
                        orderMessage.setInt("Quantity", quantity);
                        context.createProducer().send(supplierOrderTopic, orderMessage);
                        System.out.println("Vendor: Ordered " + quantity + " CPU(s) and hard drive(s)");
                        context.commit();
                        System.out.println(" Vendor: Committed transaction 1");
                    } catch (JMSException e) {
                        System.err.println("Vendor: JMSException occurred: " + e.toString());
                        context.rollback();
                        System.err.println(" Vendor: Rolled back transaction 1");
                    }
                }
                listener.monitor.waitTillDone();
            } catch (JMSRuntimeException e) {
                System.err.println("Vendor: Exception occurred: " + e.toString());
            }
        }
    }
Result of Queue creation:
Result of Topic creation:
The SQSConnection class extends javax.jms.Connection. It can be used together with the standard JMS connection methods to create new queues.
    import com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper;
    import com.amazon.sqs.javamessaging.SQSConnection;
    import com.amazon.sqs.javamessaging.SQSConnectionFactory;
    import com.amazon.sqs.javamessaging.ProviderConfiguration;
    import com.amazonaws.auth.AWSCredentials;
    import com.amazonaws.auth.AWSStaticCredentialsProvider;
    import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
    import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
    import javax.jms.*;

    public class App {

        private static String queueName = "ymq_jms_example";

        public static void main(String[] args) throws JMSException {
            SQSConnectionFactory connectionFactory = new SQSConnectionFactory(
                    new ProviderConfiguration(),
                    AmazonSQSClientBuilder.standard()
                            .withRegion("ru-central1")
                            .withEndpointConfiguration(new EndpointConfiguration(
                                    "https://message-queue.api.cloud.yandex.net",
                                    "ru-central1"))
            );
            SQSConnection connection = connectionFactory.createConnection();
            AmazonSQSMessagingClientWrapper client = connection.getWrappedAmazonSQSClient();
            if (!client.queueExists(queueName)) {
                client.createQueue(queueName);
            }
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(queueName);
            MessageProducer producer = session.createProducer(queue);
            Message message = session.createTextMessage("test message");
            producer.send(message);
        }
    }
Supported APIs:
Producer | Consumer |
---|---|
com.amazonaws.services.sqs.AmazonSQS.sendMessage com.amazonaws.services.sqs.AmazonSQSClient.sendMessage com.amazonaws.services.sqs.AmazonSQS.sendMessageBatch com.amazonaws.services.sqs.AmazonSQSAsync.sendMessageBatchAsync com.amazonaws.services.sqs.AmazonSQSAsync.sendMessageAsync software.amazon.awssdk.services.sqs.SqsClient.sendMessage software.amazon.awssdk.services.sqs.SqsClient.sendMessageBatch | com.amazonaws.services.sqs.AmazonSQS.receiveMessage com.amazonaws.services.sqs.AmazonSQSClient.receiveMessage com.amazonaws.services.sqs.AmazonSQSAsync.receiveMessageAsync software.amazon.awssdk.services.sqs.SqsClient.receiveMessage |
public class AWSResources { public static final String SQS_QUEUE_NAME = "reinvent-memes"; } |
import static com.amazonaws.memes.AWSResources.SQS_QUEUE_NAME; public class MemeUtils { public ImageMacro submitJob(String topCaption, String bottomCaption, String imageKey, String createdBy) { String queueUrl = SQS.getQueueUrl(new GetQueueUrlRequest(SQS_QUEUE_NAME)).getQueueUrl(); SQS.sendMessage(new SendMessageRequest(queueUrl, macro.getId())); } } |
import static com.amazonaws.memes.AWSResources.SQS_QUEUE_NAME; public void run() { System.out.println("MemeWorker listening for work"); String queueUrl = SQS.getQueueUrl(new GetQueueUrlRequest(SQS_QUEUE_NAME)).getQueueUrl(); while (true) { try { ReceiveMessageResult result = SQS.receiveMessage( new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(1)); for (Message msg : result.getMessages()) { executorService.submit(new MessageProcessor(queueUrl, msg)); } sleep(1000); } catch (InterruptedException e) { Thread.interrupted(); throw new RuntimeException("Worker interrupted"); } catch (Exception e) { /* ignore and retry */ } } } |
AWS-SQS with sendMessage and receiveMessage APIs
private static final String DEFAULT_QUEUE_NAME = "test-sdk"; @RequestMapping( path = "/sqs/message", method = RequestMethod.POST, consumes = MediaType.APPLICATION_JSON_VALUE) public HttpEntity addMessage(@RequestBody @Valid SimpleMessage simpleMessage){ AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient(); GetQueueUrlResult getQueueUrlResult = sqs.getQueueUrl(DEFAULT_QUEUE_NAME); String queueUrl = getQueueUrlResult.getQueueUrl(); SendMessageRequest sendMessageRequest = new SendMessageRequest(); sendMessageRequest.setQueueUrl(queueUrl); sendMessageRequest.setMessageBody(simpleMessage.getMessage()); SendMessageResult messageResult = sqs.sendMessage(sendMessageRequest); return new ResponseEntity(messageResult, HttpStatus.CREATED); } |
AWS-SQS with sendMessage API
import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; public class Example { public static void main(String[] args) { Properties kafkaProps = new Properties(); kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092"); /* basic serializer class for key */ kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); /* basic serializer class for value */ kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps); ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France"); try { producer.send(record); } catch (Exception e) { e.printStackTrace(); } } } |
public class Example { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "broker1:9092,broker2:9092"); props.put("group.id", "CountryCounter"); props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props); consumer.subscribe(Collections.singletonList("customerCountries")); } } |
@KafkaListener(topics = "${topic.name}", id="id") public void listen(@Payload String message, @Header(KafkaHeaders.OFFSET) int offset, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition){ ... } |
The topic value is defined in the properties file:
topic.name=KAFKA_TOPIC |
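The lookup described above, where the `${topic.name}` placeholder in the annotation corresponds to the `KAFKA_TOPIC` entry in the properties file, can be sketched in plain Java. This is only an illustration of the substitution, not the extension's implementation: the class name `TopicPlaceholderSketch` and the method `resolve` are hypothetical, and leaving an unknown placeholder untouched is an assumption.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: shows how a "${key}" placeholder maps to a properties entry.
final class TopicPlaceholderSketch {
    // Matches "${some.key}" and captures "some.key".
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replaces every known placeholder with its property value.
    // Assumption: placeholders without a matching property are left unchanged.
    static String resolve(String annotationValue, Properties props) {
        Matcher matcher = PLACEHOLDER.matcher(annotationValue);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String value = props.getProperty(matcher.group(1));
            String replacement = (value != null) ? value : matcher.group();
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("topic.name", "KAFKA_TOPIC"); // same entry as the properties file
        System.out.println(resolve("${topic.name}", props)); // prints KAFKA_TOPIC
    }
}
```

With the properties entry shown above, the topic named in the listener annotation resolves to `KAFKA_TOPIC`.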
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service public class KafkaConsumerService { @KafkaListener(topics = {"${kafka.event.contracting.topic}", "${kafka.legacyConsumerTopic}"}) public void listen(String message) { ... } } |
The topic values are defined in the properties file:
kafka.event.contracting.topic={kafka.event.contracting.topic} kafka.legacyConsumerTopic={kafka.legacyConsumerTopic} |
@Service @KafkaListener(topics = "${topic.name}") public class KafkaConsumerService { @KafkaHandler public void listen(String message) { } } |
The topic value is defined in the properties file:
topic.name=KAFKA_TOPIC |
public class Receiver { private static final Logger LOG = LoggerFactory.getLogger(Receiver.class); @SendTo(BAR_TOPIC) @KafkaListener(topics = FOO_TOPIC) public Double calculate(Double data) { LOG.info("calculating square root from='{}'", data); return Math.sqrt(data); } @KafkaListener(topics = BAR_TOPIC) public void result(Double data) { LOG.info("received square root='{}'", data); } } |
@Service public class Sender { private static final Logger LOG = LoggerFactory.getLogger(Sender.class); @Autowired private KafkaTemplate<String, Double> kafkaTemplate; public void send(Double data){ LOG.info("sending data='{}' to topic='{}'", data, FOO_TOPIC); kafkaTemplate.send(FOO_TOPIC, data); } } |
public class Constants { public static final String FOO_TOPIC = "foo.t"; public static final String BAR_TOPIC = "bar.t"; } |
The @SendTo annotation can also be used without a value. In that case the default applies: the reply topic is read from the KafkaHeaders.REPLY_TOPIC header of the incoming message. No object is created in this case.
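The distinction between a @SendTo that names a topic and one without a value can be sketched with plain Java reflection. This is a minimal illustration, not the analyzer's implementation: the nested `SendTo` annotation is a local stand-in for the Spring annotation (so the example needs no Spring dependency), and `SendToSketch`, `Receiver` and `explicitReplyTopic` are hypothetical names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Optional;

final class SendToSketch {
    // Local stand-in for Spring's @SendTo; assumed to have an empty default value.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface SendTo {
        String[] value() default {};
    }

    // Hypothetical listener class with one explicit and one default @SendTo.
    static class Receiver {
        @SendTo("bar.t")
        public Double calculate(Double data) { return Math.sqrt(data); }

        @SendTo
        public Double reply(Double data) { return data; }
    }

    // Returns the topic named in @SendTo, or empty when the annotation has no value
    // (the case in which the reply topic only comes from a message header).
    static Optional<String> explicitReplyTopic(Class<?> type, String methodName) {
        for (Method method : type.getDeclaredMethods()) {
            if (method.getName().equals(methodName)) {
                SendTo sendTo = method.getAnnotation(SendTo.class);
                if (sendTo != null && sendTo.value().length > 0) {
                    return Optional.of(sendTo.value()[0]);
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(explicitReplyTopic(Receiver.class, "calculate")); // prints Optional[bar.t]
        System.out.println(explicitReplyTopic(Receiver.class, "reply"));     // prints Optional.empty
    }
}
```

Only the first method carries a topic name that is visible in the source code. The second has nothing to resolve statically, which matches the case where no object is created.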
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @KafkaListener public @interface MyListener{ @AliasFor(annotation = KafkaListener.class, attribute = "id") String id(); @AliasFor(annotation = KafkaListener.class, attribute = "groupId") String groupId() default ""; @AliasFor(annotation = KafkaListener.class, attribute = "topics") String[] topics() default {}; @AliasFor(annotation = KafkaListener.class, attribute = "concurrency") String concurrency() default "3"; } |
@SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } @MyListener(id = "my.group", topics = "${topic.name}") public void listen(String in) { } } |
The topic value is defined in the properties file:
topic.name=KAFKA_TOPIC |
@Value("${kafka.event.topic}") private String topic; @Override public void send(Map<String, String> requestMap) { String message = constructKafkaMessage(requestMap); kafkaTemplate.send(topic, message); } |
The topic value is defined in the properties file:
kafka.event.topic={kafka.event.topic} |
@Bean public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) { return args -> { ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo"); RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record); SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS); System.out.println("Sent ok: " + sendResult.getRecordMetadata()); ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS); System.out.println("Return value: " + consumerRecord.value()); }; } |
@Bean public KafkaMessageListenerContainer<?, ?> container(ConsumerFactory<?, ?> consumerFactory) { ContainerProperties props = new ContainerProperties("perf"); Map<String, Object> configs = new HashMap<>( ((DefaultKafkaConsumerFactory<?, ?>) consumerFactory).getConfigurationProperties()); configs.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 2 * 1024 * 1024); configs.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024); configs.put(ConsumerConfig.CHECK_CRCS_CONFIG, false); props.setPollTimeout(100); props.setConsumerRebalanceListener(new RebalanceListener()); Listener messageListener = new Listener(); props.setMessageListener(messageListener); KafkaMessageListenerContainer<Object, Object> container = new KafkaMessageListenerContainer<>( new DefaultKafkaConsumerFactory<>(configs), props); messageListener.setContainer(container); return container; } |
For JMS, ActiveMQ, KAFKA and MessageDrivenBean, a JMS object is created and decorated with a property that identifies the vendor. The table below lists these properties and their values:
Vendor | Queue/Topic | Property name | Property value |
---|---|---|---|
JMS | Queue Publisher | CAST_MQE_QueueCall.messengingSystem | JMS |
JMS | Queue Receiver | CAST_MQE_QueueReceive.messengingSystem | JMS |
JMS | Topic Publisher | CAST_RabbitMQ_Exchange.exchangeName | JMS |
JMS | Topic Receiver | CAST_RabbitMQ_Queue.exchangeName | JMS |
ActiveMQ | Queue Publisher | CAST_MQE_QueueCall.messengingSystem | ActiveMQ |
ActiveMQ | Queue Receiver | CAST_MQE_QueueReceive.messengingSystem | ActiveMQ |
ActiveMQ | Topic Publisher | CAST_RabbitMQ_Exchange.exchangeName | ActiveMQ |
ActiveMQ | Topic Receiver | CAST_RabbitMQ_Queue.exchangeName | ActiveMQ |
KAFKA | Topic Publisher | CAST_RabbitMQ_Exchange.exchangeName | KAFKA |
KAFKA | Topic Receiver | CAST_RabbitMQ_Queue.exchangeName | KAFKA |
MessageDrivenBean | Queue Receiver | CAST_MQE_QueueReceive.messengingSystem | MessageDrivenBean |
MessageDrivenBean | Topic Receiver | CAST_RabbitMQ_Queue.exchangeName | MessageDrivenBean |
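For readers who want to work with these values programmatically (for example when filtering objects by vendor), the table can be encoded as a small lookup. This is a sketch that only mirrors the rows listed above. The names `VendorPropertySketch`, `Role` and `describe` are hypothetical and are not part of the extension.

```java
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

final class VendorPropertySketch {
    enum Role { QUEUE_PUBLISHER, QUEUE_RECEIVER, TOPIC_PUBLISHER, TOPIC_RECEIVER }

    // Vendor -> roles that appear for that vendor in the table.
    private static final Map<String, Set<Role>> LISTED = new LinkedHashMap<>();
    static {
        LISTED.put("JMS", EnumSet.allOf(Role.class));
        LISTED.put("ActiveMQ", EnumSet.allOf(Role.class));
        LISTED.put("KAFKA", EnumSet.of(Role.TOPIC_PUBLISHER, Role.TOPIC_RECEIVER));
        LISTED.put("MessageDrivenBean", EnumSet.of(Role.QUEUE_RECEIVER, Role.TOPIC_RECEIVER));
    }

    // Property name used for each role, copied verbatim from the table.
    static String propertyName(Role role) {
        switch (role) {
            case QUEUE_PUBLISHER: return "CAST_MQE_QueueCall.messengingSystem";
            case QUEUE_RECEIVER:  return "CAST_MQE_QueueReceive.messengingSystem";
            case TOPIC_PUBLISHER: return "CAST_RabbitMQ_Exchange.exchangeName";
            case TOPIC_RECEIVER:  return "CAST_RabbitMQ_Queue.exchangeName";
        }
        throw new IllegalArgumentException("Unknown role: " + role);
    }

    // Returns "property = value" for a listed vendor/role pair, or empty otherwise.
    static Optional<String> describe(String vendor, Role role) {
        Set<Role> roles = LISTED.get(vendor);
        if (roles == null || !roles.contains(role)) {
            return Optional.empty();
        }
        return Optional.of(propertyName(role) + " = " + vendor);
    }

    public static void main(String[] args) {
        // prints CAST_RabbitMQ_Exchange.exchangeName = KAFKA
        System.out.println(describe("KAFKA", Role.TOPIC_PUBLISHER).orElse("not listed"));
        // prints not listed
        System.out.println(describe("KAFKA", Role.QUEUE_PUBLISHER).orElse("not listed"));
    }
}
```

The lookup returns an empty result for combinations that have no row in the table, such as a KAFKA queue publisher.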
For IBM MQ, RabbitMQ and AWS-SQS, specific objects are created.
The following cases are not handled: