Archive for the ‘activemq’ Category


Bootstrap Projects for Getting Started with ServiceMix 4

Over the Christmas break I cleaned up and published a set of Maven projects for getting started with ServiceMix 4.4.1+ into GitHub. I found myself reusing the same code for a number of activities, and figured it may be of broader use to others. You can find it under FuseByExample/smx-bootstraps.

smx-bootstraps contains a set of OSGi Blueprint bundles (Blueprint is a dependency injection framework based on the ideas behind Spring) that exercise some of the core things that you will want to do with ServiceMix:

  • defining services in bundles that can be reused in other bundles
  • using Camel for writing integration code
  • using ActiveMQ for sending persistent messages between bundles, regardless of whether they are in the same container or in different ones
  • performing request-response over messaging
  • Update 20/02/2012: RESTful web services!!!
  • externalising your environment configuration
  • grouping bundles into features

The README document at the project root explains how to get started with ServiceMix, deploy bundles, change code and play with config.

Why would you want to use them?

ServiceMix went through a massive generational change between versions 3.X and 4.X, moving from JBI to an OSGi based model. While development work on it is proceeding at a huge rate, the documentation hasn’t kept up – although it is being brought up to date in the background. smx-bootstraps contains small artifacts that are hopefully easy to understand and play with, along with instructions on how to use them in the container.

The project may also be of use as a starting point for further development. It has a fairly clean project layout that you can use as a reference point, which acts as a supplement to the Maven archetypes that are publicly available, such as:

  • org.apache.camel.archetypes:camel-archetype-blueprint for generating Blueprint bundles to run Camel routes; similar to smx-pinger and smx-ponger bundles in smx-bootstraps
  • org.apache.karaf.archetypes:karaf-blueprint-archetype for generating simple Blueprint bundles; such as the smx-ponger-service bundle

I have found these bundles to be a really handy way of exercising ServiceMix features, and working with various configurations. Not having to code up something new each time is a huge time saver. Hopefully you should find this as well.

I expect to expand this little project as time goes on and I find myself recreating other use cases, such as exposing web services – next on my “todo list”. Please drop me a line at “jakub dot korab at gmail” if you find this useful or have any ideas that would fit in well. Of course being GitHub, feel free to fork it or contribute back changes.

Understanding ActiveMQ Broker Networks

Networks of message brokers in ActiveMQ work quite differently to more familiar models such as that of physical networks. They are not any harder to understand or reason about, but we need to have an appreciation of what exactly each of the pieces in the puzzle does by itself in order to understand them in the large. I will try to explain each component piece, progressively moving up in complexity from a single broker through to a full-blown network. At the end you should have a feel for how these networks behave and be able to reason about the interactions across different topologies.

One of the key things in understanding broker-based messaging is that the production, or sending of a message, is disconnected from the consumption of that message. The broker acts as an intermediary, serving to make the method by which a message is consumed, as well as the route that the message has travelled, orthogonal to its production. You shouldn’t need to understand the entire postal system to know that you post a letter in the big red box and eventually it will arrive in the little box at the front of the recipient’s house. The same idea applies here.

Producer and consumer are unaware of each other; only the broker they are connected to

Connections are shown in the direction of where they were established from (i.e. Consumer connects to Broker).

Out of the box when a standard message is sent to a queue from a producer, it is sent to the broker, which persists it in its message store. By default this is in KahaDB, but it can be configured to be stored in memory, which buys performance at the cost of reliability. Once the broker has confirmation that the message has been persisted in the journal (the terms journal and message store are often used interchangeably), it responds with an acknowledgement back to the producer. The thread sending the message from the producer is blocked during this time.
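
As a rough sketch of that choice, here is how the two options might look when the broker is embedded in a Spring context using the amq namespace (http://activemq.apache.org/schema/core). The broker ids, names, ports and the KahaDB directory are made up for illustration, and you would normally define only one of the two brokers.

```xml
<!-- Option 1: persistent messages are written to a KahaDB journal on disk.
     The directory is a hypothetical path. -->
<amq:broker id="durable-broker" brokerName="durable" persistent="true" useJmx="false">
    <amq:persistenceAdapter>
        <amq:kahaDB directory="target/activemq-data/kahadb" />
    </amq:persistenceAdapter>
    <amq:transportConnectors>
        <amq:transportConnector uri="tcp://localhost:61616" />
    </amq:transportConnectors>
</amq:broker>

<!-- Option 2: nothing is written to disk; messages live in memory only,
     so they are lost if the broker goes down. -->
<amq:broker id="memory-broker" brokerName="volatile" persistent="false" useJmx="false">
    <amq:transportConnectors>
        <amq:transportConnector uri="tcp://localhost:61617" />
    </amq:transportConnectors>
</amq:broker>
```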

On the consumption side, when a message listener is registered or a call to receive() is made, the broker creates a subscription to that queue. Messages are fetched from the message store and passed to the consumer; it’s usually done in batches, and the fetching is a lot more complex than simply reading from disk, but that’s the general idea. With the default behaviour, assuming Session.AUTO_ACKNOWLEDGE is being used, the consumer will acknowledge that the message has been received before processing it. On receiving the acknowledgement, the broker updates the message store marking that message as consumed, or just deletes it (this depends on the persistence mechanism).

Consuming messages

If you want the consumer to acknowledge the message after it has been successfully consumed, you need to set up transaction management, or handle it manually using Session.CLIENT_ACKNOWLEDGE.

So what happens when there is more than one consumer on a queue? All things being equal, and ignoring consumer priorities, the broker will in this case hand out incoming messages in a round-robin manner to each subscriber.

Store-and-forward

Now to scale this up to two brokers, Broker1 and Broker2. In ActiveMQ a network of brokers is set up by connecting a networkConnector to a transportConnector (think of it as a socket listening on a port). A networkConnector is an outbound connection from one broker to another.

When a subscription is made to a queue on Broker2, that broker tells the other brokers that it knows about (in our case, just Broker1) that it is interested in that queue; another subscription is now made on Broker1 with Broker2 as the consumer. As far as an ActiveMQ broker is concerned there is no difference between a standard client consuming messages, or another broker acting on behalf of a client. They are treated in the exact same manner.

So now that Broker1 sees a subscription from Broker2, what happens? The result is a hybrid of the producer and consumer behaviours. Broker1 is the producer, and Broker2 the consumer. Messages are fetched from Broker1’s message store and passed to Broker2. Broker2 processes each message by storing it in its journal, and acknowledges consumption of that message. Broker1 then marks the message as consumed.

The simple consume case then applies as Broker2 forwards the message to its consumers, as though the message was produced directly into it. Neither the producer nor the consumer is aware that any network of brokers exists; it is orthogonal to their functionality – a key driver of this style of messaging.

Local and remote consumers

It has already been noted that as far as a broker is concerned, all subscriptions are equal. To it there is no difference between a local “real” consumer, and another broker that is going to forward those messages on. Hence incoming messages will be handed out round-robin as usual. If we have two consumers, Consumer1 on Broker1 and Consumer2 on Broker2, and messages are produced to Broker1, each consumer will receive the same number of messages.

A networkConnector is unidirectional by default, which means that the broker initiating the connector acts as a client, forwarding its subscriptions. Broker2 in this case subscribes on behalf of its consumers to Broker1. Broker2 however will not be made aware of subscriptions on Broker1. networkConnectors can however be made duplex, such that subscriptions are passed in both directions.
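
To make the wiring concrete, here is a minimal sketch of two brokers embedded in a Spring context using the amq namespace (http://activemq.apache.org/schema/core). The bean ids, ports and the connector name are assumptions for illustration; in practice each broker would usually live in its own process and configuration file.

```xml
<!-- Broker1 exposes a transportConnector: a socket listening on a port. -->
<amq:broker id="broker1" brokerName="Broker1" persistent="false" useJmx="false">
    <amq:transportConnectors>
        <amq:transportConnector uri="tcp://localhost:61616" />
    </amq:transportConnectors>
</amq:broker>

<!-- Broker2 opens an outbound networkConnector to Broker1's transportConnector. -->
<amq:broker id="broker2" brokerName="Broker2" persistent="false" useJmx="false">
    <amq:networkConnectors>
        <!-- duplex="true" lets this one connection carry subscriptions in both
             directions; leave the attribute out for the unidirectional default -->
        <amq:networkConnector name="broker2-to-broker1"
                uri="static:(tcp://localhost:61616)"
                duplex="true" />
    </amq:networkConnectors>
    <amq:transportConnectors>
        <amq:transportConnector uri="tcp://localhost:61617" />
    </amq:transportConnectors>
</amq:broker>
```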

So let’s take it one step further with a network that demonstrates why it is a bad idea to connect brokers to each other in an ad-hoc manner. Let’s add Broker3 into the mix such that it connects into Broker1, and Broker2 sets up a second networkConnector into Broker3. All networkConnectors are set up as duplex.

This is a common approach people take when they first encounter broker networks and want to connect a number of brokers to each other, as they are naturally used to the internet model of network behaviour where traffic is routed down the shortest path. If we think about it from first principles, it quickly becomes apparent that is not the best approach. Let's examine what happens when a consumer connects to Broker2.

  1. Broker2 echoes the subscription to the brokers it knows about - Broker1 and Broker3.
  2. Broker3 echoes the subscription down all networkConnectors other than the one from which the request came; it subscribes to Broker1.
  3. A producer sends messages into Broker1.
  4. Broker1 stores and forwards messages to the active subscriptions on its transportConnector; half to Broker2, and half to Broker3.
  5. Broker2 stores and forwards to its consumer.
  6. Broker3 stores and forwards to Broker2.

Eventually everything ends up at the consumer, but some messages ended up needlessly travelling Broker1->Broker3->Broker2, while the others went by the more direct route Broker1->Broker2. Add more brokers into the mix, and the store-and-forward traffic increases exponentially as messages flow through any number of weird and wonderful routes.

Very bad! Lots of unnecessary store-and-forward.

Fortunately, it is possible to avoid this by employing other topologies, such as hub and spoke.

Better. A message can flow between any of the numbered brokers via the hub in a maximum of 3 hops (though this puts a lot of load onto the hub broker).

You can also use a more nuanced approach that includes considerations such as unidirectional networkConnectors that pass only certain subscriptions, or reducing consumer priority such that more distant consumers have a lower priority than closer ones.
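
A sketch of what such a tuned connector might look like, written with the amq namespace prefix as a fragment that sits inside a broker definition. The host name, connector name and the orders.> destination pattern are hypothetical; the attributes and elements themselves are standard networkConnector options.

```xml
<amq:networkConnectors>
    <!-- unidirectional by default, since duplex is not set -->
    <amq:networkConnector name="spoke-to-hub"
            uri="static:(tcp://hub-host:61616)"
            decreaseNetworkConsumerPriority="true">
        <!-- only subscriptions for queues matching this pattern are passed on -->
        <amq:dynamicallyIncludedDestinations>
            <amq:queue physicalName="orders.>" />
        </amq:dynamicallyIncludedDestinations>
    </amq:networkConnector>
</amq:networkConnectors>
```

With decreaseNetworkConsumerPriority enabled, subscriptions made on behalf of other brokers get a lower priority than local consumers, so the broker prefers a local consumer when one is available rather than dispatching strictly round-robin.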

Each network design needs to be considered separately, and trades off considerations such as message load, amount of hardware at your disposal, latency (number of hops) and reliability. When you understand how all the parts fit and think about the overall topology from first principles, it's much easier to work through.

Batching JMS messages for performance; not so fast

Recently an idea had crossed my radar around speeding up performance of messaging producers; batching messages into a single transaction. The idea being that there is overhead in the standard behaviour of a message bus that can be optimised out if you group messages into a single transaction.

My initial thoughts about this were that it was unlikely; at best the performance would be the same as an untransacted client. The comments were framed in conversations about Spring’s JmsTemplate, which continually walks the object tree ConnectionFactory -> Connection -> Session -> MessageProducer, and back again, closing them off. Each close causes some network traffic between the library and the bus. My thoughts were that perhaps the discussion hadn’t accounted for caching these objects via a CachingConnectionFactory. I resolved to find out.

My other concern was that while it could be done in the general case, it didn’t make a lot of sense.

Generally speaking there are two categories of messages that will concern an application developer. Sending throwaway messages that aren’t critical is one thing – you can choose not to persist them on the broker side and save 90% of the overhead. You can also do asynchronous sends of these unpersisted messages, which save further time by not giving your app confirmation that the broker received them. If you lose them, it won’t be a big deal.
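
For the throwaway case, a configuration along these lines is one way to switch off persistence and blocking sends from Spring. It is only a sketch: the bean ids and the sample.throwaway queue are invented for illustration, and the broker URL assumes a broker listening locally on port 61616.

```xml
<bean id="throwaway.amqConnectionFactory"
        class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616" />
    <!-- do not block the sending thread waiting for the broker's acknowledgement -->
    <property name="useAsyncSend" value="true" />
</bean>

<bean id="throwaway.connectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
    <constructor-arg ref="throwaway.amqConnectionFactory" />
</bean>

<bean id="throwaway.jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="throwaway.connectionFactory" />
    <property name="defaultDestinationName" value="sample.throwaway" />
    <!-- JmsTemplate ignores its delivery mode setting unless explicit QoS is enabled -->
    <property name="explicitQosEnabled" value="true" />
    <!-- send messages as non-persistent, so the broker does not journal them -->
    <property name="deliveryPersistent" value="false" />
</bean>
```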

The other type are the ones that you really care about. Things that reflect or affect a change in the real world – orders, instructions, etc. The value of a broker in this case comes from knowing that once it has the message, it will be dealt with.

In ActiveMQ, this guarantee comes via the message store. When you send a message the broker will not acknowledge receipt until the message has been persisted. If the broker goes down, it can be restarted, and any persisted messages may be consumed as though nothing happened, or another broker takes over transparently using the same message store (HA). As you can imagine, there is natural overhead. Two network traversals, and some I/O to disk. Keep this in mind.

I tested a couple of different scenarios.

  • JmsTemplate with a CachingConnectionFactory. This is the baseline.
  • An untransacted, persistent session. Each message gets sent off to the broker as it comes in. The broker places the message in a persistent store. This is like the logic of the JmsTemplate, but where you control the lifecycles of JMS objects.
  • An untransacted, unpersisted session. Each message gets sent off to the broker as it comes in. The broker stores the message in memory only for delivery.
  • Transacted sessions. Messages get bundled together into a transaction and committed.

Messages were posted to a queue with no consumers on a single broker with default settings.

The results surprised me.

Average message send time


Average message throughput

Timings should be considered indicative only, and are only there for a relative comparison. Initial spikes should be considered part of a warm-up, and ignored.

Firstly, getting down to the JMS API was consistently faster than using JmsTemplate (not surprising in hindsight given its object walk). There was also indeed an improvement in performance from batching messages together for delivery in a transaction. The more messages there were in a batch, the better the performance (I repeated this with 10, 100 and 1000 messages per transaction). The results approached those of the non-persistent delivery mode, but never quite reached them. What accounts for this?

I did some digging, and came up with this description of transactions in ActiveMQ. It appears that there is a separate store in memory for uncommitted transactions. While a transaction is open messages are placed here, and are then copied over to the main store on commit.

So what to make of this? Should you forgo JmsTemplate and batch everything? Well, as with pretty much anything, it depends.

JmsTemplate is simple to configure and use – it also ties in nicely to the various Spring abstractions (TransactionManagers come to mind). There are certainly some upsides to getting down to the JMS API; however, they come at the cost of time spent coding and of losing those hooks, which you might regret later.

Batching itself is a funny one, in that it’s one of those things that could be easily misused. Transactions are not there for improving performance. They exist as a logical construct for making atomic changes. If you batch up unrelated messages in memory to send to the broker for a performance gain, and your container goes down before you’ve had a chance to commit, you have violated the contract of “this business action has been completed”, and you have lost your messages. Not something you want for important, change-the-world messages.

If you have messages that are relatively unimportant that won’t affect the correct operation of your system, there are better options available to you for improving performance such as dropping persistence.

If, however, you have related messages tied to a single operation, then perhaps getting down to the JMS API and batching these together under a single transaction is something you might want to consider. The key here is ensuring that you don’t lose the real meaning of transaction.


Update 1 (13/11): Replaced graphs with log scale to make the series clearer.

Update 2 (13/11): Thanks to James Strachan for an extended explanation:

The reason transactions are so much faster on the client side can be explained by comparing it to the normal case, where the client blocks until the message has definitely gone to disk on the broker.

In a transaction the JMS client doesn’t need to wait until the broker confirms the message because it’s going straight into the transactional store (memory). Because the broker will complain on a commit if something has gone awry, the client can afford to not block at all doing a send or acknowledge (not even blocking for it to be sent over the socket), so it behaves like a true async operation. This means it’s very fast.

This explains why transactional performance approaches that of async operations as the size of the batch grows – because for all intents and purposes it is nearly asynchronous. The only cost is that of the synchronous commit(), which is a flush from the transactional store to the journal.

You could argue that this doesn’t really affect the actual throughput – as the difference between batching & no transactions is just the client sitting around blocking. The benefit of the transactions is that it’s making each client much more effective. I.e. to get equivalent throughput you would need more clients running concurrently. In the case of Camel, this could be achieved by configuring a larger thread pool.

A brief outline of when to use transactions.

Configuring ActiveMQ transactions in Spring

It’s easy to configure Message Driven POJOs over ActiveMQ with Spring, but slightly more involved once you want to get transactions working correctly; here’s how to do it.

Start with a basic DefaultMessageListenerContainer config.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- setup an embedded JMS Broker for testing purposes -->
    <amq:broker id="my-broker" useJmx="false" persistent="false" brokerName="localhost">
        <amq:transportConnectors>
            <amq:transportConnector uri="tcp://localhost:61616" />
        </amq:transportConnectors>
    </amq:broker>

    <bean id="amq.connectionFactory"
            class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL"
                value="tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1" />
    </bean>

    <bean id="consumer.connectionFactory"
            class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amq.connectionFactory" />
    </bean>

    <bean id="consumer.messageListener"
            class="net.jakubkorab.amq.CountingMessageListener" />

    <bean id="consumer.jmsContainer"
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="consumer.connectionFactory" />
        <property name="destinationName" value="sample.messages" />
        <property name="messageListener" ref="consumer.messageListener" />
    </bean>
</beans>

An embedded broker is started, and a MessageListener (CountingMessageListener) is set up to pick up messages from a queue (sample.messages). MessageListenerContainers should always sit over a CachingConnectionFactory (see the Spring docs for details). So what happens if the listener throws an exception?

According to the intent of the URI configuration on the connection factory, we’d like to see the broker attempt to redeliver the message, and if that fails the message should end up in a dead letter queue (ActiveMQ.DLQ). However, if you test it, that’s not what happens at all.

You will not see any further redelivery attempts and there’s nothing in the dead letter queue. Why? Because you haven’t configured a TransactionManager, Spring’s DefaultMessageListenerContainer picks up the message and immediately acknowledges it. As far as the broker is concerned, its job is done.

So let’s remedy this by adding one.

    <bean id="local.transactionManager"
            class="org.springframework.jms.connection.JmsTransactionManager">
        <!-- can also refer to amq.connectionFactory -->
        <property name="connectionFactory" ref="consumer.connectionFactory" />
    </bean>

    <bean id="consumer.jmsContainer"
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="consumer.connectionFactory" />
        <property name="destinationName" value="sample.messages" />
        <property name="messageListener" ref="consumer.messageListener" />
        <!-- add a reference to the transaction manager -->
        <property name="transactionManager" ref="local.transactionManager" />
    </bean>

This uses Spring’s own TransactionManager instance to now make sure that the message is consumed with no issues. Any exceptions thrown will now cause the broker to redeliver the message according to the redelivery policy specified in the connection URI, and once that is exceeded to send the offending message to the dead letter queue.
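
If the URI gets unwieldy, the same policy can probably be expressed as a bean instead. This is a sketch rather than the configuration used above: the amq.redeliveryPolicy id and the one-second initial delay are assumptions, and the bean would replace the jms.redeliveryPolicy option in the broker URL.

```xml
<bean id="amq.redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
    <!-- one redelivery attempt after the initial failure, then off to the DLQ -->
    <property name="maximumRedeliveries" value="1" />
    <!-- wait one second (value in milliseconds) before redelivering -->
    <property name="initialRedeliveryDelay" value="1000" />
</bean>

<bean id="amq.connectionFactory"
        class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616" />
    <property name="redeliveryPolicy" ref="amq.redeliveryPolicy" />
</bean>
```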


Update 13/09: Thanks to Sue Macey for pointing out this fragment from the Spring docs:

Local resource transactions can simply be activated through the sessionTransacted flag on the listener container definition. Each message listener invocation will then operate within an active JMS transaction, with message reception rolled back in case of listener execution failure. Sending a response message (via SessionAwareMessageListener) will be part of the same local transaction, but any other resource operations (such as database access) will operate independently. This usually requires duplicate message detection in the listener implementation, covering the case where database processing has committed but message processing failed to commit.
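
Going by that fragment, the container definition could be reduced to something like the following, with no transaction manager bean at all. Treat it as a sketch that reuses the bean names from the earlier listing.

```xml
<bean id="consumer.jmsContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="consumer.connectionFactory" />
    <property name="destinationName" value="sample.messages" />
    <property name="messageListener" ref="consumer.messageListener" />
    <!-- each listener invocation runs in a local JMS transaction;
         no transactionManager reference is needed -->
    <property name="sessionTransacted" value="true" />
</bean>
```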


This is sufficient for most cases. However let’s say you want to update a database via your listener. Transactions ought to be atomic, and the JmsTransactionManager won’t do the job for you here – you need to bring out the big guns.

Once you start managing more than one resource (broker, database etc) in a transaction, you bring in a sizeable overhead. Distributed transactions will involve some sort of two-phase commit protocol, and the chatter adds processing time to each operation. Should you decide that it’s definitely something you want to do, JTA is the tool for the job.

If you’re running in a full JEE container and using a DataSource that supports XA transactions (some do, some don’t), you’re in luck:

    <bean id="consumer.messagerListener"
            class="net.jakubkorab.amq.DBAwareMessageListener" >
        <!-- listener uses a JdbcTemplate with a DataSource defined elsewhere -->
        <property name="dataSource" ref="xa.dataSource" />
    </bean>

    <!-- use the XA-specific version of the connection factory -->
    <bean id="amq.connectionFactory"
            class="org.apache.activemq.ActiveMQXAConnectionFactory"
            depends-on="my-broker">
        <property name="brokerURL"
                value="tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1" />
    </bean>

    <bean id="jta.transactionManager"
            class="org.springframework.transaction.jta.JtaTransactionManager"
            depends-on="my-broker"/>

    <bean id="consumer.jmsContainer"
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="consumer.connectionFactory" />
        <property name="destinationName" value="sample.messages" />
        <property name="messageListener" ref="consumer.messagerListener" />
        <!-- change the transaction manager reference -->
        <property name="transactionManager" ref="jta.transactionManager" />
        <!--
            now tell the broker that it's involved in a transaction; this has the same effect
            as creating a transacted session using the JMS API
        -->
        <property name="sessionTransacted" value="true" />
    </bean>

The JtaTransactionManager will pick up and manage transactional resources using the JTA implementation from the server. However, you need a full JEE server for this. Tomcat, for example, does not come with JTA. Neither, for that matter, does JUnit, which makes testing your transaction config problematic. In these cases, you need to make use of an external JTA implementation, such as Atomikos or JOTM.

Atomikos’ approach to transaction management outside of the server is to place their own wrappers around your resource definitions. Their transaction manager will then scan the Spring ApplicationContext to find these. To set it up takes a final step:

    <!-- implementation-specific wrapper used by the transaction manager -->
    <bean id="xa.connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean">
        <property name="uniqueResourceName" value="amq1" />
        <property name="xaConnectionFactory" ref="amq.connectionFactory" />
    </bean>

    <bean id="jta.transactionManager"
            class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="transactionManager">
            <bean class="com.atomikos.icatch.jta.UserTransactionManager"
                    init-method="init"
                    destroy-method="close">
                <property name="forceShutdown" value="false" />
            </bean>
        </property>
        <property name="userTransaction">
            <bean class="com.atomikos.icatch.jta.UserTransactionImp">
                <property name="transactionTimeout" value="300" />
            </bean>
        </property>
    </bean>

The JtaTransactionManager is configured to use an external underlying TransactionManager implementation, and wrappers are provided around the resources.

Assuming interaction with an HSQLDB instance, which doesn’t support XA transactions, you would add a further:

    <!-- HSQLDB is not XA compatible so we wrap that using a special Atomikos NonXA to XA DataSource -->
    <bean id="xa.dataSource" class="com.atomikos.jdbc.nonxa.AtomikosNonXADataSourceBean">
        <property name="uniqueResourceName" value="hsqldb" />
        <property name="driverClassName" value="org.hsqldb.jdbcDriver" />
        <property name="url" value="jdbc:hsqldb:mem:myTestDb" />
        <property name="user" value="sa" />
        <property name="password" value="" />
        <property name="poolSize" value="3" />
    </bean>

When using a database that supports XA, you would use a com.atomikos.jdbc.AtomikosDataSourceBean instead.
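
As an illustration only, an XA-capable setup might look like this for PostgreSQL. The database, host, credentials and resource name are all made up, and the xaProperties keys are assumptions that must match the setters on whichever XADataSource class your driver provides.

```xml
<!-- XA-capable database: Atomikos wraps the driver's own XADataSource -->
<bean id="xa.dataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean"
        init-method="init" destroy-method="close">
    <property name="uniqueResourceName" value="postgres" />
    <property name="xaDataSourceClassName" value="org.postgresql.xa.PGXADataSource" />
    <!-- passed through to the XADataSource; hypothetical connection details -->
    <property name="xaProperties">
        <props>
            <prop key="serverName">localhost</prop>
            <prop key="databaseName">mytestdb</prop>
            <prop key="user">test</prop>
            <prop key="password">test</prop>
        </props>
    </property>
    <property name="poolSize" value="3" />
</bean>
```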

The above should get you going, but remember that it’s important with any kind of transactional config to test that it actually works.