I have started to create a server Process that sends a status update message that is consumed by remote clients via an activeMQ VirtualTopic.

* note in the code examples, many comments and log statements have been removed for clarity here, but may bee seen in the log stack trace sections.

mdpWhat I want to be able to do, is verify that I can get desired messages to multiple clients, and run various assertions. I thought to so this with a wiretap via AOP into a client Mock. This is what I was going toward

Server

The server was fairly simple, where I wanted to have a service that sends a Map message to a VirtualTopic:

@EndpointInject(uri="seda:pushStatusUpdate")
 ProducerTemplate producer;
public boolean sendStatusUpdate(String body) {
 log.info("sendStatusUpdate: " + body);

 Map<String, Object> gameState = new HashMap<String, Object>();
 gameState.put("tableId", "123");
 gameState.put("player1", "player-status");

 producer.sendBodyAndHeader("seda:pushStatusUpdate", gameState, "tableId", "123");
 return true;
 }
I then setup the following server side route to direct messages to my VirtualTopic
@Override
 public void configure() throws Exception {
 // add tracer as an interceptor so it will log the exchange executions at runtime
 // this can aid us to understand/see how the exchanges is routed etc.
 getContext().addInterceptStrategy(new Tracer());

 from("seda:pushStatusUpdate")
 .process(new VirtualTopicAuditor())
 .to("activemq:topic:VirtualTopic.Table.1");
 }
Now I run the server via the Maven Camel Run Plugin in my pom.xml as follows:
<plugin>
 <groupId>org.apache.camel</groupId>
 <artifactId>camel-maven-plugin</artifactId>
 <version>${camel.version}</version>
 <configuration>
 <fileApplicationContextUri>src/test/resources/camel-server-test.xml</fileApplicationContextUri>
 </configuration>
 </plugin>
* NOTE: The default way to use the camel plugin is to us:
<strong></strong>META-INF/spring/*.xml;YOUR_FILE_NAME_IN_THE_CLASS_PATH.xml
However, I wanted to use a separate server for testing, and wanted to add it to src/test/resources, thus I have to use the <fileApplicationContextUri> tag instead.

Then by opening a separate command prompt, then running mvn clean compile camel:run -e as follows:

[cmd image here]

JUnit Test

What I wanted to do was to create a unit test that would prove I could multiple consumers subscribing to my VirtualTopic, and validate that message that each client gets.

@ContextConfiguration(locations = { "classpath:applicationContext-test.xml" })
public class TableServiceTest extends AbstractJUnit4SpringContextTests {

@Autowired
protected SpringCamelContext camel;

@EndpointInject(uri = "mock://resultClient1")
protected MockEndpoint resultClient1;
@EndpointInject(uri = "mock://resultClient2")
... *up to 11 clients here *

@Before

public void initTests(){

    resultClient1.reset();
    resultClient2.reset();
    ... *up to 11 clients here *

}

@Test
public void testSendBetMessage() throws Exception {
    resultClient1.expectedMessageCount(1);
    resultClient2.expectedMessageCount(1);
    ... *up to 11 clients here *

    // Send the test message to make Server Service create our Status Message
    producerTemplate.sendBody("jms:queue:bets",
    ExchangePattern.InOnly, 22);

    // now lets assert that the mock endpoint received messages
    resultClient1.assertIsSatisfied();
    resultClient2.assertIsSatisfied();
    ... *up to 11 clients here *
}
So what I am trying to do, is determine if each client gets the appropriate number of messages.
&lt;?xml version="1.0" encoding="UTF-8"?&gt;
&lt;beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:broker="http://activemq.apache.org/schema/core"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
"&gt;

&lt;bean id="propertyConfigurer"
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"&gt;
    &lt;property name="locations"&gt;
    &lt;list&gt;
        &lt;value&gt;classpath:test.properties&lt;/value&gt;
    &lt;/list&gt;
    &lt;/property&gt;
&lt;/bean&gt;

&lt;bean id="clientRoutes" class="com.wiredducks.test.routes.ClientRoutes" /&gt;

&lt;!--
Camel JMSProducer to be able to send messages to a remote Active MQ
server
--&gt;
&lt;bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent"&gt;
    &lt;property name="brokerURL" value="tcp://localhost:61616" /&gt;
&lt;/bean&gt;

&lt;bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"&gt;
    &lt;property name="brokerURL" value="tcp://localhost:61616" /&gt;
&lt;/bean&gt;

&lt;bean id="transactionManager"
class="org.springframework.jms.connection.JmsTransactionManager"&gt;
    &lt;property name="connectionFactory" ref="connectionFactory" /&gt;
&lt;/bean&gt;

&lt;import resource="classpath:camel-client.xml" /&gt;

&lt;!--
we need to create test clients to validate multiple recipients will
get the status update message.
--&gt;
&lt;import resource="classpath:clients/test-client1.xml" /&gt;
&lt;import resource="classpath:clients/test-client2.xml" /&gt;
...

&lt;import resource="classpath:clients/test-changeDestination.xml" /&gt;

&lt;/beans&gt;
This is the camel-client.xml for this test
&lt;context:component-scan
base-package="com.wiredducks.routes.test" /&gt;

&lt;camel:camelContext id="camel"&gt;
&lt;camel:package&gt;com.wiredducks.routes.test&lt;/camel:package&gt;

&lt;camel:consumerTemplate id="consumer" /&gt;
&lt;camel:routeBuilder ref="clientRoutes" /&gt;
&lt;/camel:camelContext&gt;

&lt;bean id="clientRoutes"/&gt;

&lt;camel:template id="producer" /&gt;

&lt;bean id="jms"&gt;
    &lt;property name="brokerURL" value="tcp://localhost:61616" /&gt;
&lt;/bean&gt;

&lt;bean id="connectionFactory"&gt;
    &lt;property name="brokerURL" value="tcp://localhost:61616" /&gt;
&lt;/bean&gt;

&lt;bean id="transactionManager"&gt;
    &lt;property name="connectionFactory" ref="connectionFactory" /&gt;
&lt;/bean&gt;

&lt;import resource="clients/test-client1.xml" /&gt;
&lt;import resource="clients/test-client2.xml" /&gt;
... *up to 11 clients here *
Then I setup a client side route for my tests.
@Override
public void configure() throws Exception {
    from("seda://resultClient1")
         .process(new ClientRouteAuditor())
             .to("mock://resultClient1");

    from("seda:resultClient2").to("mock:resultClient2");
        ... *up to 11 clients here *
}
...
class ClientRouteAuditor implements Processor {

 public void process(Exchange exchange) throws Exception {
     DefaultMessage message = (DefaultMessage) exchange.getIn();

     log.info("//--- start ClientRouteAuditor ---//");

     log.info("tableId " + message.getHeader("tableId"));
     log.info(message.getBody());
     log.info("//--- end ClientRouteAuditor ---//");
  }
}

Camel Client (s)

I wanted to simulate multiple clients

&lt;?xml version="1.0" encoding="UTF-8"?&gt;
&lt;beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:aop="http://www.springframework.org/schema/aop"
  xmlns:jms="http://www.springframework.org/schema/jms"
  xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd
        http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-2.5.xsd
        "&gt;

  &lt;jms:listener-container client-id="jmsContainer1" transaction-manager="transactionManager"&gt;
    &lt;jms:listener id="jmsListener1"
                  destination="Consumer.1.VirtualTopic.Table.1"
                ref="testClient1"
                method="onMessage" /&gt;
  &lt;/jms:listener-container&gt;

  &lt;bean id="testClient1"
          class="com.wiredducks.client.StatusUpdateService"
          scope="prototype"&gt;
    &lt;property name="consumerId" value="1" /&gt;
    &lt;property name="tableId" value="1" /&gt;
  &lt;/bean&gt;

  &lt;!-- Aspect that tracks all the invocations of the business service --&gt;
  &lt;bean id="messageDrivenMockWiretapClient1" class="com.baselogic.test.MessageDrivenMockWiretap"&gt;
    &lt;property name="destinationEndpoint" ref="resultClient1" /&gt;
  &lt;/bean&gt;

  &lt;aop:config&gt;
    &lt;aop:pointcut id="onMessageCall"
      expression="execution(* com.wiredducks.client.StatusUpdateService.onMessage(..)) &amp;amp;&amp;amp; args(message)" /&gt;
    &lt;aop:aspect id="aspectMessageDrivenMockWiretap" ref="messageDrivenMockWiretapClient1"&gt;
      &lt;aop:before method="tap" pointcut-ref="onMessageCall" /&gt;
    &lt;/aop:aspect&gt;
  &lt;/aop:config&gt;
&lt;/beans&gt;

Message Driven Pojo (MDP)

I wanted to create an MDP on each client so that each remote client could subscribe to my VirtualTopic.

public class StatusUpdateService implements MessageListener {
...
    public void onMessage(Message message) {
        log.info("//----- StatusUpdateService.onMessage -----------------------------//");
  log.info("//----- " + consumerId + " -----------------------------//");

        if(message instanceof ActiveMQMapMessage){
            log.info("//----- ActiveMQMapMessage -------------------//");
            ActiveMQMapMessage mapMessage = (ActiveMQMapMessage) message;
            log.info(mapMessage.toString());
            // Set Map via Spring.
        }
        else if(message instanceof ActiveMQTextMessage){
            log.info("//----- ActiveMQTextMessage -------------------//");
            ActiveMQTextMessage mapMessage = (ActiveMQTextMessage) message;
            log.info(mapMessage.toString());
        }
    }
here…

Message Driven Wiretap Audit (AOP)

Now that I have multiple MPD’s consuming from a Virtual topic, I wanted to be able to perform more unit test functions with the Camel Mock framework. In the above camel-client.xml, there was an AOP declaration for a before point cut:

&lt;!-- Aspect that tracks all the invocations of the business service --&gt;
&lt;bean id="messageDrivenMockWiretapClient1"&gt;
&lt;property name="destinationEndpoint"&gt;
&lt;camel:endpoint uri="seda:resultClient1"/&gt;
&lt;/property&gt;
&lt;/bean&gt;

&lt;aop:config&gt;
    &lt;aop:pointcut   id="onMessageCall"
        expression="<strong>execution(* com.wiredducks.service.impl.StatusUpdateService.onMessage(..))
                      &amp;amp;&amp;amp; args(message)</strong>"/&gt;
    &lt;aop:aspect     id="aspectMessageDrivenMockWiretap"
                    ref="messageDrivenMockWiretapClient1"&gt;
        &lt;aop:before method="tap"
                    pointcut-ref="onMessageCall"/&gt;
    &lt;/aop:aspect&gt;
&lt;/aop:config&gt;
This is the Audit bean, the is only suppose to forward additional message to the set destination. In my case, to a mock endpoint.
@Aspect
public class MessageDrivenMockWiretap {
...
private Endpoint destinationEndpoint;

@Required
public void setDestinationEndpoint(Endpoint destinationEndpoint) {
    this.destinationEndpoint = destinationEndpoint;
}

public void tap(Object message) {

    if(message instanceof ActiveMQMapMessage){
        log.info("//----- ActiveMQMapMessage -------------------//");
        ActiveMQMapMessage mapMessage = (ActiveMQMapMessage) message;
        log.info(mapMessage.toString());
    }
    else if(message instanceof ActiveMQTextMessage){
        log.info("//----- ActiveMQTextMessage -------------------//");
        ActiveMQTextMessage mapMessage = (ActiveMQTextMessage) message;
        log.info(mapMessage.toString());
    } else{
        log.info("//----- Other Message -------------------//");
    }

    String msg = "MessageDrivenMockWiretap:  " + message;

    // now send the message to the backup store using the Camel Message Endpoint pattern
    Exchange exchange = destinationEndpoint.createExchange();
    exchange.getIn().setBody(msg);

    try{
        destinationEndpoint.createProducer().process(exchange);
    }catch(Exception e){
        e.printStackTrace();
    }
}// tap
[more…]

Running the Unit Tests

Now I open  another command prompt to run the Unit Tests using mvn clean verify -e to run the unit tests. Now at this time, there should be a client MDP connecting to a VirtualTopic, and with AOP, a wiretap sends the message to a SEDA queue, then to a Mock for Testing.

On the command window with the server side Camel running. I see the Message getting created, and sent to the VirtualTopic:

INFO: //--- SendStatusUpdate.process() ---//
Sep 6, 2009 11:39:55 AM org.apache.camel.processor.Logger process
INFO: ID:mick-knutsons-macbook.local-63225-1252262382326-0:73:1:1:2 &gt;&gt;&gt; com.wiredducks.processors.SendStatusUpdate@4064397c --&gt; ref:tableService method: sendStatusUpdate, Pattern:InOnly, Headers:{JMSRedelivered=false, JMSXGroupID=null, JMSPriority=4, JMSExpiration=0, JMSDestination=queue://sendStatusUpdate, JMSCorrelationID=null, JMSDeliveryMode=2, JMSReplyTo=null, JMSMessageID=ID:mick-knutsons-macbook.local-63225-1252262382326-0:73:1:1:2, JMSType=null, JMSTimestamp=1252262395046}, BodyType:Integer, Body:1
Sep 6, 2009 11:39:55 AM com.wiredducks.service.impl.TableServiceImpl sendStatusUpdate
INFO: //----------------------------------------------------------------//
Sep 6, 2009 11:39:55 AM com.wiredducks.service.impl.TableServiceImpl sendStatusUpdate
INFO: sendStatusUpdate Message: 1
Sep 6, 2009 11:39:55 AM com.wiredducks.service.impl.TableServiceImpl sendStatusUpdate
INFO: //----- Messge Sent ----------------------------------------------//
Sep 6, 2009 11:39:55 AM org.apache.camel.processor.Logger process
INFO: ID-mick-knutsons-macbook-local-60727-1252260656775-0-39 &gt;&gt;&gt; from(seda://pushStatusUpdate) --&gt; log://com.wiredducks.processors.VirtualTopicAuditor?level=DEBUG, Pattern:InOnly, Headers:{tableId=1}, BodyType:java.util.HashMap, Body:{tableId=1, player1=raise}
Sep 6, 2009 11:39:55 AM org.apache.camel.processor.Logger process
INFO: ID-mick-knutsons-macbook-local-60727-1252260656775-0-39 &gt;&gt;&gt; log://com.wiredducks.processors.VirtualTopicAuditor?level=DEBUG --&gt; com.wiredducks.processors.VirtualTopicAuditor@603bb54b, Pattern:InOnly, Headers:{tableId=1}, BodyType:java.util.HashMap, Body:{tableId=1, player1=raise}
Sep 6, 2009 11:39:55 AM com.wiredducks.processors.VirtualTopicAuditor process
INFO: //--- VirtualTopicAuditor ---//
Sep 6, 2009 11:39:55 AM com.wiredducks.processors.VirtualTopicAuditor process
INFO: tableId 1
Sep 6, 2009 11:39:55 AM com.wiredducks.processors.VirtualTopicAuditor process
INFO: {tableId=1, player1=raise}
Sep 6, 2009 11:39:55 AM com.wiredducks.processors.VirtualTopicAuditor process
INFO: //--- VirtualTopicAuditor ---//
I noticed that the MDP gets called with the correct Map Message, as well as the MessageDrivenWiretap which leads me to conclude that the wiretap works from the MDP to the Processor I created.
252260655336-2:12:1:1:3, originalDestination = null, originalTransactionId = null, producerId = ID:mick-knutsons-macbook.local-60717-1252260655336-2:12:1:1, destination = topic://VirtualTopic.Table.1, transactionId = null, expiration = 0, timestamp = 1252262710167, arrival = 0, brokerInTime = 1252262710169, brokerOutTime = 1252262710189, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@5c62f8df, marshalledProperties = org.apache.activemq.util.ByteSequence@65c127db, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {tableId=1}, readOnlyProperties = true, readOnlyBody = true, droppable = false} ActiveMQMapMessage{ theTable = {} }
MessageDriveMockWiretap Message ActiveMQMapMessage {commandId = 7, responseRequired = true, messageId = ID:mick-knutsons-macbook.local-60717-1252260655336-2:12:1:1:3, originalDestination = null, originalTransactionId = null, producerId = ID:mick-knutsons-macbook.local-60717-1252260655336-2:12:1:1, destination = topic://VirtualTopic.Table.1, transactionId = null, expiration = 0, timestamp = 1252262710167, arrival = 0, brokerInTime = 1252262710169, brokerOutTime = 1252262710189, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@5c62f8df, marshalledProperties = org.apache.activemq.util.ByteSequence@65c127db, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {tableId=1}, readOnlyProperties = true, readOnlyBody = true, droppable = false} ActiveMQMapMessage{ theTable = {} }
[                jmsListener1-1] StatusUpdateService            INFO  //----- StatusUpdateService.onMessage -----------------------------//
[                jmsListener1-1] StatusUpdateService            INFO  //----- 1 -----------------------------//
[                jmsListener1-1] StatusUpdateService            INFO  //----- ActiveMQMapMessage -------------------//
[                jmsListener1-1] StatusUpdateService            INFO  ActiveMQMapMessage {commandId = 6, responseRequired = true, messageId = ID:mick-knutsons-macbook.local-60717-1252260655336-2:12:1:1:2, originalDestination = null, originalTransactionId = null, producerId = ID:mick-knutsons-macbook.local-60717-1252260655336-2:12:1:1, destination = topic://VirtualTopic.Table.1, transactionId = null, expiration = 0, timestamp = 1252262710157, arrival = 0, brokerInTime = 1252262710157, brokerOutTime = 1252262710167, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@69017455, marshalledProperties = org.apache.activemq.util.ByteSequence@5e7cae4e, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {tableId=1}, readOnlyProperties = true, readOnlyBody = true, droppable = false} ActiveMQMapMessage{ theTable = {} }

[cp://localhost/127.0.0.1:61616] WireFormatNegotiator           DEBUG tcp://localhost/127.0.0.1:61616 after negotiation: OpenWireFormat{version=3, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false}
[               jmsContainer4-1] MessageDrivenMockWiretap       INFO  //&gt;&gt;&gt;&gt;&gt; tap(Object message) &gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;//
[               jmsContainer4-1] MessageDrivenMockWiretap       INFO  //----- ActiveMQMapMessage -------------------//
MessageDriveMockWiretap Message ActiveMQMapMessage {commandId = 7, responseRequired = true, messageId = ID:mick-knutsons-macbook.local-60717-1252260655336-2:12:1:1:3, originalDestination = null, originalTransactionId = null, producerId = ID:mick-knutsons-macbook.local-60717-1252260655336-2:12:1:1,
destination = <strong>topic://VirtualTopic.Table.1</strong>, transactionId = null, expiration = 0, timestamp = 1252262710167, arrival = 0, brokerInTime = 1252262710169, brokerOutTime = 1252262710178, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0,
targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@15de4ebf, marshalledProperties = org.apache.activemq.util.ByteSequence@3603e8d0, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {tableId=1}, readOnlyProperties = true,
readOnlyBody = true, droppable = false} ActiveMQMapMessage{ theTable = {} }
Then the message seems like it would get to the mock://resultClient1 at this point. but in my unit test, this is not the case.

Current Issue with the above approach

The current issue is random at least to my eyes. It appears, that sometimes, I do not get any message sent to the mock endpoint.

testSendBetMessage(com.wiredducks.service.test.TableServiceTest)  Time elapsed: 20.016 sec  &lt;&lt;&lt; FAILURE!
java.lang.AssertionError: mock://resultClient1 Received message count. Expected: <strong>&lt;1&gt;</strong> but was: <strong>&lt;0&gt;</strong>
Thus, we have already gotten the message, we just need to forward the message to the Mock.

This does work sometimes, but there is no pattern to make it work

Unresolved issues

There are a few issues I still have left to solve in this solution.

  1. Changing MDP destination dynamically, and testing I can receive a message on the new Destination.
[more…]

Conclusion

Conclusion to ensue…

[more…]
<span class="code-tag">&lt;applicationContextUri&gt;</span>META-INF/spring/*.xml;YOUR_FILE_NAME_IN_THE_CLASS_PATH.xml<span class="code-tag">&lt;/applicationContextUri&gt;</span>

Mick Knutson

Java, JavaEE, J2EE, WebLogic, WebSphere, JBoss, Tomcat, Oracle, Spring, Maven, Architecture, Design, Mentoring, Instructor and Agile Consulting. http://www.baselogic.com/blog/resume

View all posts

Java / JavaEE / SpringFramework Channel

BLiNC Supporters

BLiNC Adsense