Archive for the ‘ActiveMQ’ Category


New Book – Understanding Message Brokers

If this blog has been unusually quiet over the last few months, it is with good reason – I wrote another book! Understanding Message Brokers has just been released for FREE download by O’Reilly. It is a 70-page concentrated brain dump of how to reason about messaging, based on a presentation I gave at a number of conferences around Europe last year – The Myth of The Magical Messaging Fabric.


The book is in O’Reilly’s report format, and takes about a weekend to read through – perfect for the busy developer, architect, or CTO.

It takes you through two different messaging products – ActiveMQ and Kafka – to not just explain how these technologies work, but also to give you a sense of the decisions their authors made and the constraints that they were working with. The idea is that not only will you walk away with a better understanding of these products, but you will know what to look at the next time you approach any messaging technology. As a bonus, I included a chapter about some of the key patterns used to build reliable messaging applications, because it’s not just a case of having great tools – you have to know how to use them.

The book will also be available in paper form at O’Reilly conferences.

Enjoy!

How to reason about queue depth graphs

Occasionally I come across something that seems really obvious to me, but actually isn’t obvious from an outsider’s perspective. A few weeks ago I spent some time with a client (and their client) trying to understand the behaviour of a message-driven platform that made heavy use of ActiveMQ. Without going into too many details, they had a number of production incidents where, as far as they could see from time-series charts of queue depths, the queues started behaving erratically. In reasoning about what was going on, we put together a visual vocabulary to help them analyse the behaviour not only of the broker but also of the systems around it. Here is that vocabulary.

Basic Curves

[Figure: flat line – consumption matches production]
Flat line. Here the rate of messages being placed onto a queue matches the rate of their consumption. This should be considered the natural order of things when both producers and consumers are active on a system.

[Figure: queue depth angling up]
Angle up. Here the production rate is higher than the consumption rate. While this is not a problem in the short term, over a long period of time the message store may fill up. This is typically seen when producers temporarily send messages at a higher rate, and the time it takes to process each message means that the consumers cannot keep up. To get around this you need to increase the consumption rate by adding more consumers or speeding them up.
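
As a back-of-envelope sketch, you can estimate how long you have before the store fills up. All the numbers here are hypothetical; you would substitute your own measured rates and store size:

// hypothetical measured values – substitute your own
long freeStoreBytes   = 10L * 1024 * 1024 * 1024; // 10 GB free in the message store
long avgMessageBytes  = 2 * 1024;                 // average message size of 2 KB
long productionRate   = 500;                      // messages per second arriving
long consumptionRate  = 350;                      // messages per second leaving

long backlogPerSecond = productionRate - consumptionRate; // net messages accumulating each second
long secondsToFull    = freeStoreBytes / (avgMessageBytes * backlogPerSecond);
System.out.println("Store full in roughly " + (secondsToFull / 3600) + " hours");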

[Figure: queue depth angling down]
Angle down. The consumption rate is exceeding the production rate. This normally indicates a queue draining down after a burst of producer activity.

Composite Curves

[Figures: curves where the rate of production increases relative to consumption]
Rate of change increases. In the curves above, the rate of production has increased relative to the rate of consumption. This means that either:

  • your consumers are working slower than before; or
  • the number of messages being sent by your producers has increased.

[Figures: curves where the rate of consumption increases relative to production]
Rate of change decreases. In the curves above, the rate of consumption has increased relative to the rate of production. This means that either:

  • your consumers are working faster; or
  • the producers are not putting as many messages onto the queue.

[Figures: curves flattening out]
Curve flattens. This is a really interesting one; it indicates the following possibilities:

  • if depth is 0, your queue is empty; otherwise
  • the consumption rate has suddenly started matching your production rate exactly (not likely); or
  • producers have stopped producing and there is no consumption going on, i.e. you are seeing a pause. If you are convinced that messages should be being sent (and possibly consumed), then there are a number of external possibilities, including a long garbage collection, or another process taking over the I/O on the storage used by the broker; in a virtualised or shared environment this may be another server that has access to that same storage.

In broker-based messaging systems, it is important to understand that the broker is only one part of the story, and that the performance of both the producers and consumers will have an impact on the overall flow of messages. Hopefully this reference will help you to reason about what is going on.

Java DSLs for ActiveMQ config and testing

As part of a larger project that I am working on, I needed to define numerous different ActiveMQ configurations inside non-trivial integration tests. Rather than instantiating brokers by hand using the internal ActiveMQ classes, or writing lots of similar-but-different XML configurations, I wrote a pair of Java DSLs to allow me to quickly spin up brokers and proxy them in JUnit using a style that should be familiar to Camel users, and a syntax that ActiveMQ users should instantly understand.

The ActiveMQ DSL is a general purpose utility for instantiating brokers in-line.

BrokerContext context = new BrokerContext(
    ActiveMQBrokers.broker("embeddedBroker").useJmx(false).persistent(false)
        .transportConnectors()
            .transportConnector("openwire", "tcp://0.0.0.0:61616").end()
        .end());

context.start();
// later ...
context.end();

The Test DSL uses JUnit’s @Rule functionality to manage the lifecycle of these brokers, and also allows the creation of proxies that can be used to simulate network interruptions.

public class MyEmbeddedBrokerTest {
    @Rule
    public ProxiedBrokerResource broker = new BrokerResource(
            ActiveMQBrokers.broker("embeddedBroker").useJmx(false).persistent(false)
                .transportConnectors()
                    .transportConnector("openwire", "tcp://0.0.0.0:61616").end()
                    .transportConnector("stomp", "stomp://0.0.0.0:61618").end()
                .end())
            .withProxied("openwire").port(10000); 
            // port to be proxied is looked up by name
            // you can define multiple proxies for the one broker!

    @Test
    public void testNetworkOutages() {
        ConnectionFactory cf =
            new ActiveMQConnectionFactory(
                "failover:(" + broker.getTcpConnectionUri("openwire") + ")"); 
        // returns proxied port 10000
        
        // ...
        SocketProxy socketProxy = broker.getProxy("openwire");
        socketProxy.close(); // network goes down
        // ...
        socketProxy.reopen(); // network comes back up
    }
}

I hope that they might be useful (or at least instructive) to others. The DSLs along with full documentation are available on Github at jkorab/activemq-dsls. Feedback + pull requests most welcome.

Running ActiveMQ Replicated LevelDB on VirtualBox

I have wanted to spend some more time recently playing with Replicated LevelDB in ActiveMQ. Not wanting to hit a cloud environment such as EC2, I set up 3 VirtualBox nodes on my workstation running Fedora. The host system needs to be fairly beefy – you need at least a dedicated CPU core per image, plus RAM. The setup itself was relatively straightforward except for a couple of gotchas. I didn’t use Docker or similar simply because I haven’t played with it; as a rule I try not to introduce too many new pieces into a setup, otherwise it turns into a yak shaving exercise. What follows is an outline of how I got it running, and some of the things I bumped my head against.

My approach was to get the software set up on a single disk image, and then replicate that over to another couple of VMs, tweaking to get the clustering working correctly. Turns out, that works pretty well. When creating VirtualBox images, set Network Adapter1 as a Bridged Adapter to your ethernet port, with Promiscuous Mode set to “Allow All”.

[Figure: VirtualBox network adapter settings]

As a starting point, I downloaded Zookeeper 3.4.6 and ActiveMQ 5.10, unzipped them into /opt, and created symlinks as /opt/zk and /opt/activemq respectively.

If you follow the setup instructions from the ActiveMQ site, the first thing you’ll notice is that the Zookeeper (ZK) setup isn’t covered. Thankfully, the ZK website itself has a good breakdown of how to do this in the Clustered (Multi-Server) Setup section of the Administrator’s Guide. The standard config file I used was /opt/zk/conf/zk.conf, configured with the following:

tickTime=2000
dataDir=/var/zk/data
clientPort=2181
initLimit=5
syncLimit=2

You have to create the /var/zk/data directory yourself. This is where ZK keeps its logs, as well as a file called myid, which contains a number that defines which node in the cluster this particular machine is.

Later, once you know the IP addresses of the machines you add the following to the end of the zk.conf file (here I’m using a 3 node cluster):

server.1=192.168.0.8:2888:3888
server.2=192.168.0.13:2888:3888
server.3=192.168.0.12:2888:3888

The number that follows server. corresponds to the contents of the myid file on that particular server.
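
So on the first node, for example, the setup looks like this (adjust the number for each machine):

mkdir -p /var/zk/data
echo 1 > /var/zk/data/myid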

For Zookeeper to work correctly, 3 ports have to be opened on the server’s firewall:

  • 2181 – the port that clients will use to connect to the ZK ensemble
  • 2888 – the port used by ZK peers to communicate with each other (followers connect to the leader on this port)
  • 3888 – the port used by ZK for leader election

The firewall changes that you’ll need for ActiveMQ are:

  • 61616 – default Openwire port
  • 8161 – Jetty port for web console
  • 61619 – LevelDB peer-to-peer replication port

On Fedora, you access the firewall through the Firewall program (oddly enough). I set up ZK and ActiveMQ as Services in the Permanent configuration (not Runtime), and turned those two services on in the Public zone (default used).

[Figure: Fedora firewall settings]
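
If you prefer the command line, a roughly equivalent set of port-based rules can be applied with firewall-cmd (I used the GUI, so treat this as a sketch):

firewall-cmd --permanent --zone=public --add-port=2181/tcp
firewall-cmd --permanent --zone=public --add-port=2888/tcp
firewall-cmd --permanent --zone=public --add-port=3888/tcp
firewall-cmd --permanent --zone=public --add-port=61616/tcp
firewall-cmd --permanent --zone=public --add-port=8161/tcp
firewall-cmd --permanent --zone=public --add-port=61619/tcp
firewall-cmd --reload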

The ActiveMQ configuration changes are pretty straightforward. This is the minimum config change that was needed in conf/activemq.xml:

<persistenceAdapter>
  <replicatedLevelDB zkAddress="192.168.0.8:2181,192.168.0.13:2181,192.168.0.12:2181"
      directory="${activemq.data}/LevelDB"
      hostname="192.168.0.8"/>
</persistenceAdapter>

The zkAddress is pretty clear – these are the ZK nodes and client ports of the ZK ensemble. The data directory is defined by directory. I’ll get onto the hostname attribute in a little while.

Once I had everything set up, it was time to start playing with the cluster. Starting the first ActiveMQ node was problem free – it connected to ZK, and waited patiently until one other ActiveMQ node connected (quorum-based replication requires numNodesInCluster/2 + 1 instances; with integer division, that means 2 out of 3 here). When a second ActiveMQ node was started, the master exploded:

 INFO | Using the pure java LevelDB implementation.
 INFO | No IOExceptionHandler registered, ignoring IO exception
java.io.IOException: com.google.common.base.Objects.firstNonNull(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
	at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:39)[activemq-client-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.LevelDBClient.might_fail(LevelDBClient.scala:552)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.LevelDBClient.replay_init(LevelDBClient.scala:657)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.LevelDBClient.start(LevelDBClient.scala:558)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.DBManager.start(DBManager.scala:648)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.LevelDBStore.doStart(LevelDBStore.scala:235)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.replicated.MasterLevelDBStore.doStart(MasterLevelDBStore.scala:110)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.apache.activemq.util.ServiceSupport.start(ServiceSupport.java:55)[activemq-client-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.replicated.ElectingLevelDBStore$$anonfun$start_master$1.apply$mcV$sp(ElectingLevelDBStore.scala:226)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.fusesource.hawtdispatch.package$$anon$4.run(hawtdispatch.scala:330)[hawtdispatch-scala-2.11-1.21.jar:1.21]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)[:1.7.0_60]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)[:1.7.0_60]
	at java.lang.Thread.run(Thread.java:745)[:1.7.0_60]
 INFO | Stopped LevelDB[/opt/activemq/data/LevelDB]

After a bit of head scratching, I happened upon AMQ-5225 in the ActiveMQ JIRA. It seems that the problem is a classpath conflict in 5.10 (readers from the future – if you’re using a newer version, you won’t see this problem :) ). To get around it, follow these instructions:

1. remove pax-url-aether-1.5.2.jar from the lib directory
2. comment out the logQuery section in conf/activemq.xml:

<bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
</bean>

When I made the changes, everything started to work without this exception. Once the second ActiveMQ instance was started, the master sprung to life and started logging the following to the console:

WARN | Store update waiting on 1 replica(s) to catch up to log position 0.
WARN | Store update waiting on 1 replica(s) to catch up to log position 0.
WARN | Store update waiting on 1 replica(s) to catch up to log position 0.
...

The slave on the other hand had the following text output:

INFO | Using the pure java LevelDB implementation.
INFO | Attaching to master: tcp://localhost.localdomain:61619
WARN | Unexpected session error: java.net.ConnectException: Connection refused
INFO | Using the pure java LevelDB implementation.
INFO | Attaching to master: tcp://localhost.localdomain:61619
WARN | Unexpected session error: java.net.ConnectException: Connection refused
...

Ad infinitum.

After a bunch of fruitless searching, I realised that the answer was right in front of me in the slave output:
(Attaching to master: tcp://localhost.localdomain:61619). It seems that the master was registering itself into ZK with its hostname (localhost.localdomain).

Once that clicked, the documentation led me to the hostname attribute on the replicatedLevelDB tag in the persistenceAdapter. This value is used by the broker to advertise its location through ZK so that slaves can connect to its LevelDB replication port. The default behaviour sees ActiveMQ trying to automatically work out the hostname.

The value being used was coming from /etc/hosts – here are the default contents of that file:

127.0.0.1            localhost.localdomain localhost
::1            localhost6.localdomain6 localhost6

I would imagine that in a proper network setup this wouldn’t typically happen, as the box name would be worked out through DHCP + internal DNS. Simply overriding the default behaviour by setting the hostname attribute to the IP address of the virtual machine worked a treat.

With a test client on the host machine running the 3 VirtualBox instances I was able to connect to all 3 nodes and fail them over without problems.

ZK uses a list to work out which ActiveMQ slave node is going to take control when the master broker dies; the ordering is defined by who connected to the ensemble first. If you have fewer than a quorum of ActiveMQ nodes running, no ActiveMQ nodes in the cluster will accept connections (this is managed by the replicated LevelDB store).

Since ZK also maintains a quorum, when fewer than the required number of ZK servers are available, the remaining nodes will shut down their client port (2181). This will disconnect all the brokers from the ensemble, causing the master broker to shut down its transportConnectors.

Happy replicated LevelDB-ing!

ActiveMQ Network Connectors Demystified

When trying to get your head around how ActiveMQ’s networks of brokers work, it helps to have an understanding of the underlying mechanisms involved. I’ve always thought of it as the “Lego brick model” – once you understand the smaller pieces, you can reason about how they can be combined together.

When a regular client connects to a broker and sets up a subscription on a destination, ActiveMQ sees it as a consumer and dispatches messages to it. On the consumer, the thread that receives messages is generally not the same one that triggers the event listener – there is an intermediate holding area for messages that have been dispatched from the broker but not yet picked up for processing – the prefetch buffer – and it is here that received messages are placed for consumption. The thread responsible for triggering the main business logic consumes messages from this buffer one by one, either through MessageConsumer.receive() or through MessageListener.onMessage(Message).
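
As an aside, the size of the prefetch buffer can be tuned through the connection URI; here’s a quick sketch (the broker address and buffer size are arbitrary):

ConnectionFactory cf = new ActiveMQConnectionFactory(
    "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=100"); // dispatch up to 100 messages ahead of processing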

ActiveMQ dispatches messages out to interested consumers uniformly. In the context of queues, this means in a round-robin fashion. It does not make a distinction between whether that consumer is actually a piece of client code that triggers some business logic, or whether it is another broker.

Now, when you think about it, this implies that when two brokers (let’s call them A1 and A2) are connected in a network A1 -> A2, somewhere there must be a consumer with a prefetch buffer. A network is established by connecting A1‘s networkConnector to a transportConnector on A2.

I have always found it useful to think of a transportConnector as simply an inbound socket that understands a particular wire protocol. When you send a message into it as a client, the broker will take it, write it to a store, and send you a reply that all’s good, or send you a NACK if there’s a problem. So really it’s a straight-through mechanism; the message gets reacted to instantly – there are no prefetch buffers here.
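
In configuration terms, a transportConnector is just a named listening address in conf/activemq.xml; something like:

<transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613"/>
</transportConnectors>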

So if there’s no prefetch buffer on a transportConnector, for a network to work it must actually be a part of the networkConnector on the sending broker (A1). In fact, this is exactly the case.

When a network is established from A1 to A2, a proxy consumer (aka local consumer; I’m using Gary Tully’s terminology here) is created as part of the networkConnector for each destination on which there is demand in A2, or which is configured inside the networkConnector‘s staticallyIncludedDestinations block. By default, each destination being pushed has one consumer, with its own prefetch buffer.

The thread that consumes from this buffer takes each message one by one and (in the case of persistent messages) synchronously sends it to the remote broker A2. A2 then takes that message, persists it, and replies that the message has been consumed. The proxy consumer then marks the message as having been processed, at which point it is deemed to have been consumed from A1, is marked for deletion, and will not be redelivered to any other consumers. That’s the general principle behind store and forward.

Now, because the send from A1 to A2 is being performed by a single thread, which is sending the messages one-by-one for each destination, you will see that the throughput across the networkConnector can be quite slow relative to the rate at which messages are being placed onto A1 in the first place. This is the cost of two guarantees that ActiveMQ gives you when it accepts messages:

  1. messages will not be lost when they are marked as persistent, and
  2. messages will be delivered in the order in which they are sent

If you have messages that need to be sent over the network at a higher rate than that which the networkConnector can send given these constraints, you can relax one of these two guarantees:

  1. By marking the messages as non-persistent when you send them, the send will be asynchronous (and so, super fast), but if you lose a broker any messages that are in-flight are gone forever. This is usually not appropriate, as in most use cases reliability trumps performance.
  2. If you can handle messages out-of-order, you can define N networkConnectors from A1 to A2 (see the sketch after this list). Each connector will have a proxy consumer created for the destinations being pushed over the network – N connectors means N consumers, each pushing messages. Keep in mind that message distribution in a broker is performed in a round-robin fashion, so you lose all guarantee of ordering before the messages arrive on A2. Of course, you can opt to group related messages together using message groups, which will be honoured both on A1 (between proxy consumers) and between the actual consumers on A2.
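
A minimal sketch of the second option, assuming A2 is reachable at the hypothetical host a2 (each connector needs its own name):

<networkConnectors>
    <networkConnector name="a1-to-a2-1" uri="static:(tcp://a2:61616)"/>
    <networkConnector name="a1-to-a2-2" uri="static:(tcp://a2:61616)"/>
    <networkConnector name="a1-to-a2-3" uri="static:(tcp://a2:61616)"/>
</networkConnectors>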

Hopefully, this helps you in developing a mental model of how the underlying mechanics of ActiveMQ’s broker networks work. Of course there’s loads more functionality available – many more Lego bricks of different shapes – to help you build up your broker infrastructure.

Monitoring ActiveMQ via HTTP

Update 29/4/2014: The following information is a little bit stale. Since version 5.9.1 of ActiveMQ, Hawt.io is no longer part of the ActiveMQ distribution from Apache, but Jolokia is. Jolokia runs on http://localhost:8161/api/jolokia by default, so if you keep this in mind while reading this post, the remaining instructions are still correct. Dejan Bosanac has written about how to install Hawt.io on ActiveMQ post 5.9.1 if you want to set it up (which of course you do – because it’s awesome).

It’s been a while since I blogged, since my year has been consumed by a not-so-secret-if-you-follow-me-on-twitter project, but I thought I’d take a break to write about something pretty significant in the ActiveMQ landscape. Unusually enough it’s not the addition of a new feature, but rather an enabling technology.

Unless you have been hiding under a rock this year, you will probably have heard about the “One Console to Rule Them All” – Hawt.io. In short it’s a console for building consoles, with built-in plugins for all the shiny things in the Apache integration toolchain that I work with – ActiveMQ, Camel, Karaf etc. – as well as a bunch of other stuff including Fabric, JBoss, Tomcat, Infinispan, OpenEJB etc. You can also build your own plugins. Hawt.io is pretty damn cool – you can deploy it into a Karaf or web container, it automatically detects what tech pieces it knows about, and exposes a super-shiny console.

[Figure: Hawt.io login page. Ooo shiny!]

The killer feature for me, since I use it on site with clients all the time, is that it gets all this info through JMX – from a JavaScript console written in AngularJS, running in your browser! Which for me means no installing jvisualvm, and no opening firewall ports to access JMX (port 1099) on the host machine.

[Figure: Hawt.io on ActiveMQ. Look mum, no JMX client!]

As neat as Hawt.io is, it’s what’s inside that’s really exciting. The secret sauce here is the framework that exposes all the JMX stats to the front-end – a tiny tool called Jolokia. Jolokia is a web app that exposes the JMX stats of the JVM it runs inside over HTTP, via REST/JSON. It has also been included in ActiveMQ since version 5.8.

I’ll let this sink in for a bit.

Ready? What this means is that in ActiveMQ 5.9 you can do this:

$ curl -u admin http://localhost:8161/hawtio/jolokia/ && echo ""

In ActiveMQ 5.8 you replace hawtio/jolokia in the above URI with api/jolokia.
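
That is:

$ curl -u admin http://localhost:8161/api/jolokia/ && echo ""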

Which after prompting you for the default JMX password (admin/admin) gives you this:

{
    "timestamp":1384809685,
    "status":200,
    "request":{
        "type":"version"
    },
    "value":{
        "protocol":"7.0",
        "agent":"1.1.4",
        "info":{
            "product":"jetty",
            "vendor":"Eclipse",
            "version":"7.6.9.v20130131"
        }
    }
}

This means that you can now access any JMX stats exposed by ActiveMQ from any environment that can make a web request – shell scripts, admin GUIs, anything!

You can access specific parts of the JMX tree by passing in a JSON request that acts as a query. For example, to access the heap memory usage of the JVM you would pass in:

{
    "type":"read",
    "mbean":"java.lang:type=Memory",
    "attribute":"HeapMemoryUsage",
    "path":"used"
}

Here’s the escaped curl request:

$ curl -u admin -d "{\"type\":\"read\",\"mbean\":\"java.lang:type=Memory\",\"attribute\":\"HeapMemoryUsage\",\"path\":\"used\"}" http://localhost:8161/hawtio/jolokia/ && echo ""

And here’s the response:

{
    "timestamp":1384811291,
    "status":200,
    "request":{
        "mbean":"java.lang:type=Memory",
        "path":"used",
        "attribute":"HeapMemoryUsage",
        "type":"read"
    },
    "value":224135568
}

We can also use this to access the really juicy, interesting ActiveMQ stats. Here we’ll grab the broker’s MemoryPercentUsage. You can locate the path you are interested in via jvisualvm/jconsole:

[Figure: ActiveMQ JMX MemoryPercentUsage. Hunting through the JMX tree…]


Here’s our formatted JSON payload for the query:

{
    "type":"read",
    "mbean":"org.apache.activemq:type=Broker,brokerName=localhost",
    "attribute":"MemoryPercentUsage"
}

Here’s the escaped curl request:

$ curl -u admin -d "{\"type\":\"read\",\"mbean\":\"org.apache.activemq:type=Broker,brokerName=localhost\",\"attribute\":\"MemoryPercentUsage\"}" http://localhost:8161/hawtio/jolokia/ && echo ""

And here’s the response:

{
    "timestamp":1384811228,
    "status":200,
    "request":{
        "mbean":"org.apache.activemq:brokerName=localhost,type=Broker",
        "attribute":"MemoryPercentUsage",
        "type":"read"
    },
    "value":0
}

Plenty of room for non-persisted queue messages! 😉

It’s yet another tool that adds to an already rich set of features for monitoring ActiveMQ.

Combine it with Camel, and you could even periodically send SNMP traps. Open source FTW!
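
As a sketch of that idea (hedged – the timer period, credentials and log step are all placeholder choices, and the payload is the MemoryPercentUsage query from above), a Camel route could poll Jolokia once a minute and hand the result off to whatever alerting channel you like:

from("timer:memoryCheck?period=60000")
    .setBody(constant("{\"type\":\"read\",\"mbean\":\"org.apache.activemq:type=Broker,brokerName=localhost\",\"attribute\":\"MemoryPercentUsage\"}"))
    .to("http://localhost:8161/hawtio/jolokia/?authUsername=admin&authPassword=admin")
    .log("Jolokia says: ${body}");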

Bootstrap Projects for Getting Started with ServiceMix 4

Over the Christmas break I cleaned up and published to GitHub a set of Maven projects for getting started with ServiceMix 4.4.1+. I found myself reusing the same code for a number of activities, and figured it may be of broader use to others. You can find it under FuseByExample/smx-bootstraps.

smx-bootstraps contains a set of OSGi Blueprint bundles (Blueprint is a dependency injection framework based on the ideas behind Spring) that exercise some of the core things that you will want to do with ServiceMix:

  • defining services in bundles that can be reused in other bundles
  • using Camel to write integration code
  • using ActiveMQ to send persistent messages between bundles, regardless of whether they are in the same container or in others
  • performing request-response over messaging
  • Update 20/02/2012: RESTful web services!!!
  • externalising your environment configuration
  • grouping bundles into features

The README document at the project root explains how to get started with ServiceMix, deploy bundles, change code and play with config.

Why would you want to use them?

ServiceMix went through a massive generational change between versions 3.X and 4.X, moving from JBI to an OSGi based model. While development work on it is proceeding at a huge rate, the documentation hasn’t kept up – although it is being brought up to date in the background. smx-bootstraps contains small artifacts that are hopefully easy to understand and play with, along with instructions on how to use them in the container.

The project may also be of use as a starting point for further development. A fairly clean project layout exists that you can use as a reference point, which acts as a supplement to the publicly available Maven archetypes, such as:

  • org.apache.camel.archetypes:camel-archetype-blueprint for generating Blueprint bundles to run Camel routes; similar to smx-pinger and smx-ponger bundles in smx-bootstraps
  • org.apache.karaf.archetypes:karaf-blueprint-archetype for generating simple Blueprint bundles; such as the smx-ponger-service bundle
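
For example, a project can be generated from the first of these with something like the following (you may want to pin an explicit -DarchetypeVersion):

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.camel.archetypes \
    -DarchetypeArtifactId=camel-archetype-blueprint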

I have found these bundles to be a really handy way of exercising ServiceMix features and working with various configurations. Not having to code up something new each time is a huge time saver. Hopefully you will find this to be the case as well.

I expect to expand this little project as time goes on and I find myself recreating other use cases, such as exposing web services – next on my “todo list”. Please drop me a line at “jakub dot korab at gmail” if you find this useful or have any ideas that would fit in well. Of course being GitHub, feel free to fork it or contribute back changes.

Understanding ActiveMQ Broker Networks

Networks of message brokers in ActiveMQ work quite differently to more familiar models such as that of physical networks. They are not any harder to understand or reason about, but we need an appreciation of what exactly each of the pieces in the puzzle does by itself in order to understand them in the large. I will try to explain each component piece, progressively moving up in complexity from a single broker through to a full-blown network. At the end you should have a feel for how these networks behave, and be able to reason about the interactions across different topologies.

One of the key things in understanding broker-based messaging is that the production, or sending of a message, is disconnected from the consumption of that message. The broker acts as an intermediary, serving to make the method by which a message is consumed, as well as the route that the message has travelled, orthogonal to its production. You shouldn’t need to understand the entire postal system to know that you post a letter in the big red box and eventually it will arrive in the little box at the front of the recipient’s house. The same idea applies here.

[Figure: producer and consumer are unaware of each other; each knows only the broker it is connected to]

Connections are shown in the direction of where they were established from (i.e. Consumer connects to Broker).

Out of the box, when a standard message is sent to a queue from a producer, it is sent to the broker, which persists it in its message store. By default this is in KahaDB, but it can be configured to be stored in memory, which buys performance at the cost of reliability. Once the broker has confirmation that the message has been persisted in the journal (the terms journal and message store are often used interchangeably), it responds with an acknowledgement back to the producer. The thread sending the message from the producer is blocked at this time.

On the consumption side, when a message listener is registered or a call to receive() is made, the broker creates a subscription to that queue. Messages are fetched from the message store and passed to the consumer; this is usually done in batches, and the fetching is a lot more complex than a simple read from disk, but that’s the general idea. With the default behaviour, assuming Session.AUTO_ACKNOWLEDGE is being used, the consumer will acknowledge that the message has been received before processing it. On receiving the acknowledgement, the broker updates the message store, marking that message as consumed, or just deletes it (this depends on the persistence mechanism).

[Figure: consuming messages]

If you want the consumer to acknowledge the message only after it has been successfully processed, you need to set up transaction management, or handle it manually using Session.CLIENT_ACKNOWLEDGE.
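
A minimal sketch of the manual approach (assuming an existing, started connection; process() is a stand-in for your business logic):

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue("sample.messages"));
Message message = consumer.receive();
process(message);      // hypothetical business logic
message.acknowledge(); // only now does the broker consider the message consumed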

So what happens when there is more than one consumer on a queue? All things being equal, and ignoring consumer priorities, the broker will in this case hand out incoming messages in a round-robin manner to each subscriber.

Store-and-forward

Now to scale this up to two brokers, Broker1 and Broker2. In ActiveMQ a network of brokers is set up by connecting a networkConnector to a transportConnector (think of it as a socket listening on a port). A networkConnector is an outbound connection from one broker to another.
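
In configuration terms, the initiating broker’s conf/activemq.xml gains a fragment like this sketch (the address is a placeholder):

<networkConnectors>
    <networkConnector uri="static:(tcp://broker1:61616)"/>
</networkConnectors>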

When a subscription is made to a queue on Broker2, that broker tells the other brokers that it knows about (in our case, just Broker1) that it is interested in that queue; another subscription is now made on Broker1 with Broker2 as the consumer. As far as an ActiveMQ broker is concerned there is no difference between a standard client consuming messages, or another broker acting on behalf of a client. They are treated in the exact same manner.

So now that Broker1 sees a subscription from Broker2, what happens? The result is a hybrid of the producer and consumer behaviours. Broker1 acts as the producer, and Broker2 the consumer. Messages are fetched from Broker1’s message store and passed to Broker2. Broker2 processes each message by storing it in its journal, and acknowledges consumption of that message. Broker1 then marks the message as consumed.

The simple consume case then applies as Broker2 forwards the message to its consumers, as though the message had been produced directly into it. Neither the producer nor the consumer is aware that a network of brokers exists; it is orthogonal to their functionality – a key driver of this style of messaging.

Local and remote consumers

It has already been noted that as far as a broker is concerned, all subscriptions are equal. To it there is no difference between a local “real” consumer, and another broker that is going to forward those messages on. Hence incoming messages will be handed out round-robin as usual. If we have two consumers – Consumer1 on Broker1, and Consumer2 on Broker2 – and messages are produced to Broker1, both consumers will receive the same number of messages.

A networkConnector is unidirectional by default, which means that the broker initiating the connector acts as a client, forwarding its subscriptions. Broker2 in this case subscribes on behalf of its consumers to Broker1. Broker2, however, will not be made aware of subscriptions made on Broker1. networkConnectors can be made duplex, such that subscriptions are passed in both directions.
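
Making the connector duplex is a single attribute on the same element (again a sketch):

<networkConnector uri="static:(tcp://broker1:61616)" duplex="true"/>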

So let’s take it one step further with a network that demonstrates why it is a bad idea to connect brokers to each other in an ad-hoc manner. Let’s add Broker3 into the mix such that it connects into Broker1, and Broker2 sets up a second networkConnector into Broker3. All networkConnectors are set up as duplex.

This is a common approach people take when they first encounter broker networks and want to connect a number of brokers to each other, as they are naturally used to the internet model of network behaviour, where traffic is routed down the shortest path. If we think about it from first principles, it quickly becomes apparent that this is not the best approach. Let’s examine what happens when a consumer connects to Broker2.

  1. Broker2 echoes the subscription to the brokers it knows about – Broker1 and Broker3.
  2. Broker3 echoes the subscription down all networkConnectors other than the one from which the request came; it subscribes to Broker1.
  3. A producer sends messages into Broker1.
  4. Broker1 stores and forwards messages to the active subscriptions on its transportConnector; half to Broker2, and half to Broker3.
  5. Broker2 stores and forwards to its consumer.
  6. Broker3 stores and forwards to Broker2.

Eventually everything ends up at the consumer, but some messages ended up needlessly travelling Broker1->Broker3->Broker2, while the others went by the more direct route Broker1->Broker2. Add more brokers into the mix, and the store-and-forward traffic increases exponentially as messages flow through any number of weird and wonderful routes.

[Figure: ad-hoc network. Very bad! Lots of unnecessary store-and-forward.]

Fortunately, it is possible to avoid this by employing other topologies, such as hub and spoke.

[Figure: hub and spoke. Better. A message can flow between any of the numbered brokers via the hub in a maximum of 3 hops (though it puts a lot of load onto the hub broker).]

You can also use a more nuanced approach that includes considerations such as unidirectional networkConnectors that pass only certain subscriptions, or reducing consumer priority such that more distant consumers have a lower priority than closer ones.

Each network design needs to be considered separately, and trades off considerations such as message load, the amount of hardware at your disposal, latency (number of hops) and reliability. When you understand how all the parts fit, and think about the overall topology from first principles, it’s much easier to work through.

Update (July 2014): To get a better understanding of what’s going on under the covers, check out the details of how network connectors work.

Batching JMS messages for performance; not so fast

Recently an idea crossed my radar around speeding up message producers: batching messages into a single transaction. The idea is that there is overhead in the standard behaviour of a message bus that can be optimised away if you group messages into a single transaction.

My initial thoughts were that it was unlikely; at best the performance would be the same as an untransacted client. The comments were framed in conversations around Spring’s JMSTemplate, which continually walks the object tree ConnectionFactory -> Connection -> Session -> MessageProducer, and back again closing them off. Each close causes some network traffic between the library and the bus. My thought was that perhaps the discussion hadn’t accounted for caching these objects via a CachingConnectionFactory. I resolved to find out.

My other concern was that while it could be done in the general case, it didn’t make a lot of sense.

Generally speaking there are two categories of messages that will concern an application developer. Sending throwaway messages that aren’t critical is one thing – you can choose not to persist them on the broker side and save 90% of the overhead. You can also do asynchronous sends, which save further time by not giving your app confirmation that the broker received them. If you lose them, it won’t be a big deal.

The other type are the ones that you really care about. Things that reflect or affect a change in the real world – orders, instructions, etc. The value of a broker in this case comes from knowing that once it has the message, it will be dealt with.

In ActiveMQ, this guarantee comes via the message store. When you send a message the broker will not acknowledge receipt until the message has been persisted. If the broker goes down, it can be restarted, and any persisted messages may be consumed as though nothing happened, or another broker takes over transparently using the same message store (HA). As you can imagine, there is natural overhead. Two network traversals, and some I/O to disk. Keep this in mind.

I tested a couple of different scenarios.

  • JMSTemplate with a CachingConnectionFactory. This is the baseline.
  • An untransacted, persistent session. Each message gets sent off to the broker as they come in. The broker places the message in a persistent store. This is like the logic of the JMSTemplate, but where you control the lifecycles of JMS objects.
  • An untransacted, unpersisted session. Each message gets sent off to the broker as they come in. The broker stores the message in memory only for delivery.
  • Transacted sessions. Messages get bundled together into a transaction and committed.

Messages were posted to a queue with no consumers on a single broker with default settings.
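
For reference, the transacted scenario looks roughly like this (a minimal sketch; the broker URL, queue name and batch size are arbitrary placeholders):

ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session.createQueue("test.batched"));

for (int i = 0; i < 1000; i++) {
    producer.send(session.createTextMessage("payload " + i));
    if ((i + 1) % 100 == 0) {
        session.commit(); // each commit flushes a batch of 100 messages to the broker
    }
}
connection.close();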

The results surprised me.

[Graph: average message send time]

[Graph: average message throughput]

Timings should be considered indicative only, and are only there for a relative comparison. Initial spikes should be considered part of a warm-up, and ignored.

Firstly, getting down to the JMS API was consistently faster than going through JMSTemplate (not surprising in hindsight, given its object walk). There was indeed a boost in performance from batching messages together for delivery in a transaction. The more messages there were in a batch, the better the performance (I repeated this with 10, 100 and 1000 messages per transaction). The results approached those of the non-persistent delivery mode, but never quite reached it. What accounts for this?

I did some digging, and came up with this description of transactions in ActiveMQ. It appears that there is a separate store in memory for uncommitted transactions. While a transaction is open messages are placed here, and are then copied over to the main store on commit.

So what to make of this? Should you forgo JMSTemplate and batch everything? Well, as with pretty much anything, it depends.

JMSTemplate is simple to configure and use – it also ties in nicely to the various Spring abstractions (TransactionManagers come to mind). There are certainly some upsides to getting down to the JMS API, however they come at the cost of coding time and losing those hooks, which you might regret later.

Batching itself is a funny one, in that it’s one of those things that could be easily misused. Transactions are not there for improving performance. They exist as a logical construct for making atomic changes. If, for a performance boost, you batch up unrelated messages in memory to send to the broker, and your container goes down before you’ve had a chance to commit, you have violated the contract of “this business action has been completed”, and you have lost your messages. Not something you want for important, change-the-world messages.

If you have messages that are relatively unimportant that won’t affect the correct operation of your system, there are better options available to you for improving performance such as dropping persistence.

If, however, you have related messages tied to a single operation, then perhaps getting down to the JMS API and batching these together under a single transaction is something you might want to consider. The key here is ensuring that you don’t lose the real meaning of transaction.


Update 1 (13/11): Replaced graphs with log scale to make the series clearer.

Update 2 (13/11):Thanks to James Strachan for an extended explanation:

The reason transactions are so much faster on the client side can be explained by comparing it to the normal case, where the client blocks until the message has definitely gone to disk on the broker.

In a transaction the JMS client doesn’t need to wait until the broker confirms the message because it’s going straight into the transactional store (memory). Because the broker will complain on a commit if something has gone awry, the client can afford to not block at all doing a send or acknowledge (not even blocking for it to be sent over the socket), so it behaves like a true async operation. This means it’s very fast.

This explains why transactional performance approaches that of async operations with the size of the batch – because for all intents and purposes it is nearly asynchronous. The only cost is that of the synchronous commit(), which is a flush from the transactional store to the journal.

You could argue that this doesn’t really affect the actual throughput – the difference between batching and no transactions is just the client sitting around blocking. The benefit of transactions is that each client becomes much more effective; i.e. to get equivalent throughput without them you would need more clients running concurrently. In the case of Camel, this could be achieved by configuring a larger thread pool.

A brief outline of when to use transactions.

Configuring ActiveMQ transactions in Spring

It’s easy to configure Message Driven POJOs over ActiveMQ with Spring, but slightly more involved once you want to get transactions working correctly; here’s how to do it.

Start with a basic DefaultMessageListenerContainer config.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-3.0.xsd
       http://activemq.apache.org/schema/core 
       http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- setup an embedded JMS Broker for testing purposes -->
    <amq:broker id="my-broker" useJmx="false" persistent="false" brokerName="localhost">
        <amq:transportConnectors>
            <amq:transportConnector uri="tcp://localhost:61616" />
        </amq:transportConnectors>
    </amq:broker>

    <bean id="amq.connectionFactory" 
            class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" 
                value="tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1" />
    </bean>

    <bean id="consumer.connectionFactory" 
            class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amq.connectionFactory" />
    </bean>

    <bean id="consumer.messageListener" 
            class="net.jakubkorab.amq.CountingMessageListener" />

    <bean id="consumer.jmsContainer" 
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="consumer.connectionFactory" />
        <property name="destinationName" value="sample.messages" />
        <property name="messageListener" ref="consumer.messageListener" />
    </bean>
</beans>

An embedded broker is started, and a MessageListener (CountingMessageListener) is set up to pick up messages from a queue (sample.messages). MessageListenerContainers should always sit over a CachingConnectionFactory (see the Spring docs for details). So what happens if the listener throws an exception?

According to the intent of the URI configuration on the connection factory, we’d like to see the broker attempt to redeliver the message again, and if that fails, the message should end up in a dead letter queue (ActiveMQ.DLQ). However, if you test it, that’s not what happens at all.

You will not see any further redelivery attempts, and there’s nothing in the dead letter queue. Why? Because you haven’t configured a TransactionManager, Spring’s DefaultMessageListenerContainer picks up the message and immediately acknowledges it. As far as the broker is concerned, its job is done.

So let’s remedy this by adding one.

    <bean id="local.transactionManager" 
            class="org.springframework.jms.connection.JmsTransactionManager">
        <!-- can also refer to amq.connectionFactory -->
        <property name="connectionFactory" ref="consumer.connectionFactory" />
    </bean>

    <bean id="consumer.jmsContainer" 
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="consumer.connectionFactory" />
        <property name="destinationName" value="topic:sample.messages" />
        <property name="messageListener" ref="consumer.messageListener" />
        <!-- add a reference to the transaction manager -->
        <property name="transactionManager" ref="local.transactionManager" />
    </bean>

This uses Spring’s own TransactionManager instance to make sure that the message is consumed with no issues. Any exceptions thrown will now cause the broker to redeliver the message according to the redelivery policy specified in the connection URI, and once that is exceeded, to send the offending message to the dead letter queue.


Update 13/09: Thanks to Sue Macey for pointing out this fragment from the Spring docs:

Local resource transactions can simply be activated through the sessionTransacted flag on the listener container definition. Each message listener invocation will then operate within an active JMS transaction, with message reception rolled back in case of listener execution failure. Sending a response message (via SessionAwareMessageListener) will be part of the same local transaction, but any other resource operations (such as database access) will operate independently. This usually requires duplicate message detection in the listener implementation, covering the case where database processing has committed but message processing failed to commit.
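
In container terms that flag is a one-line addition to the earlier configuration (a sketch based on the config above):

    <bean id="consumer.jmsContainer" 
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="consumer.connectionFactory" />
        <property name="destinationName" value="sample.messages" />
        <property name="messageListener" ref="consumer.messageListener" />
        <!-- local resource transactions, without a separate TransactionManager bean -->
        <property name="sessionTransacted" value="true" />
    </bean>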


This is sufficient for most cases. However let’s say you want to update a database via your listener. Transactions ought to be atomic, and the JmsTransactionManager won’t do the job for you here – you need to bring out the big guns.

Once you start managing more than one resource (broker, database etc) in a transaction, you bring in a sizeable overhead. Distributed transactions will involve some sort of two-phase commit protocol, and the chatter adds processing time to each operation. Should you decide that it’s definitely something you want to do, JTA is the tool for the job.

If you’re running in a full JEE container and using a DataSource that supports XA transactions (some do, some don’t), you’re in luck:

    <bean id="consumer.messagerListener" 
            class="net.jakubkorab.amq.DBAwareMessageListener" >
        <!-- listener uses a JdbcTemplate with a DataSource defined elsewhere -->
        <property name="dataSource" ref="xa.dataSource" />
    </bean>

    <!-- use the XA-specific version of the connection factory -->
    <bean id="amq.connectionFactory" 
            class="org.apache.activemq.ActiveMQXAConnectionFactory" 
            depends-on="my-broker">
        <property name="brokerURL" 
                value="tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1" />
    </bean>

    <bean id="jta.transactionManager" 
            class="org.springframework.transaction.jta.JtaTransactionManager" 
            depends-on="my-broker"/>

    <bean id="consumer.jmsContainer" 
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="consumer.connectionFactory" />
        <property name="destinationName" value="sample.messages" />
        <property name="messageListener" ref="consumer.messagerListener" />
        <!-- change the transaction manager reference -->
        <property name="transactionManager" ref="jta.transactionManager" />
        <!-- 
            now tell the broker that it's involved in a transaction; this has the same effect
            as creating a transacted session using the JMS API
        -->
        <property name="sessionTransacted" value="true" />
    </bean>

The JtaTransactionManager will pick up and manage transactional resources using the JTA implementation from the server. However, you need a full JEE server for this. Tomcat, for example, does not come with JTA. Neither, for that matter, does JUnit, which makes testing your transaction config problematic. In these cases, you need to make use of an external JTA implementation, such as Atomikos or JOTM.

Atomikos’ approach to transaction management outside of the server is to place their own wrappers around your resource definitions. Their transaction manager will then scan the Spring ApplicationContext to find these. To set it up takes a final step:

    <!-- implementation-specific wrapper used by the transaction manager -->
    <bean id="xa.connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean">
        <property name="uniqueResourceName" value="amq1" />
        <property name="xaConnectionFactory" ref="amq.connectionFactory" />
    </bean>

    <bean id="jta.transactionManager" 
            class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="transactionManager">
            <bean class="com.atomikos.icatch.jta.UserTransactionManager" 
                    init-method="init" 
                    destroy-method="close">
                <property name="forceShutdown" value="false" />
            </bean>
        </property>
        <property name="userTransaction">
            <bean class="com.atomikos.icatch.jta.UserTransactionImp">
                <property name="transactionTimeout" value="300" />
            </bean>
        </property>
    </bean>

The JtaTransactionManager is configured to use an external underlying TransactionManager implementation, and wrappers are provided around the resources.

Assuming interaction with an HSQLDB instance, which doesn’t support XA transactions, you would additionally add:

    <!-- HSQLDB is not XA compatible so we wrap that using a special Atomikos NonXA to XA DataSource -->
    <bean id="xa.dataSource" class="com.atomikos.jdbc.nonxa.AtomikosNonXADataSourceBean">
        <property name="uniqueResourceName" value="hsqldb" />
        <property name="driverClassName" value="org.hsqldb.jdbcDriver" />
        <property name="url" value="jdbc:hsqldb:mem:myTestDb" />
        <property name="user" value="sa" />
        <property name="password" value="" />
        <property name="poolSize" value="3" />
    </bean>

When using a database that supports XA, you would use a com.atomikos.jdbc.AtomikosDataSourceBean instead.

The above should get you going, but remember that it’s important with any kind of transactional config to test that it actually works.