New Book – Understanding Message Brokers

If this blog has been unusually quiet over the last few months, it is with good reason – I wrote another book! Understanding Message Brokers has just been released for FREE download by O’Reilly. It is a 70-page concentrated brain dump of how to reason about messaging, based on a presentation I gave at a number of conferences around Europe last year – The Myth of The Magical Messaging Fabric.


The book is in O’Reilly’s report format, and takes about a weekend to read through – perfect for the busy developer, architect, or CTO.

It takes you through two different messaging products – ActiveMQ and Kafka – to not just explain how these technologies work, but also to give you a sense of the decisions their authors made and the constraints that they were working with. The idea is that not only will you walk away with a better understanding of these products, but you will know what to look at the next time you approach any messaging technology. As a bonus, I included a chapter about some of the key patterns used to build reliable messaging applications, because it’s not just a case of having great tools – you have to know how to use them.

The book will also be available in paper form at O’Reilly conferences.



Born early morning on Monday the 21st of November. Mum and baby doing very well. The girls are massively excited to have a little brother!

A Milestone

Last week I completed a consulting engagement with my 100th client since starting work in this field 5.5 years ago. Looking at these last few years in hindsight, it feels like an unfathomable point to be at. I am struck by how much I have learned and how my perspective has developed.

Raspberry cream meringue, ooh yeah.

If you’re going to celebrate, do it with cake :)

It has been one incredible ride from start to finish, with more countries, industries, and challenges than I would have ever thought possible in an entire career. The variety of work has been incredibly broad – architecture, troubleshooting, training, development and operations; all the while dealing with those complicated squishy things called people. Two things this work could never be called are mundane and repetitive.

I want to thank everyone who has helped me along the way. Without your friendship and support, I would not have made it this far. Individual achievements are often made possible only through team effort.

I have no idea what the future holds. The times seem more uncertain than usual, but I know it’s going to be one heck of a trip. Onwards and upwards!

Beyond the Hype Cycle; Co-operative Open Source

At some point in the lifetime of an open-source project, having survived rewrites, deployments in hundreds of organisations, battle-testing… it becomes boring. The sheen goes off it, and people move on to the newer and shinier. When this happens, the number of maintainers drops off, releases slow down to a trickle, and issues or improvements that users would like to get addressed gather dust in long-forgotten JIRAs. Eventually the level of inactivity causes companies to move their platforms onto other better-supported tech, not because the older is no longer useful (the ultimate measure of worth in software), but because the lack of support becomes a risk.

Over time, even companies that support open source commercially are likely to feel this pressure from within, as the teams that work on these stalwart, often highly complex, pieces of infrastructure drift apart.

This is not specific to any one piece of software, but applies to entire categories of products. What happens off to the very right hand side of the Gartner Hype Cycle? Where does the curve of enlightenment lead to? What does the long end game of a successful product look like?

Perhaps what is needed is a rethink of the relationship between the users and the maintainers of a piece of OSS. Companies often will not go into production without some sort of commercial support, which forms a type of insurance. A subscription or a license is the equivalent of an insurance premium. Over time, the lack of a major production incident causes them to question the value of that policy. This is a bit like thinking “I have been paying for maintenance of this bridge for 5 years, and it hasn’t once fallen down. What’s the point?”

From a commercial standpoint, once the custom starts to drop off and demand rises in other sectors, it makes sense to ramp down the support on the older products and divert resources elsewhere. But this is only a rational conclusion if the purpose of your company is to maximise profit, rather than to support its clients in the long term. Given that this is by definition how commercial companies work, perhaps the problem is the nature of the company itself.

I have been thinking about this area for some time, having recently read Wired Differently – the 50-year story of NISC, a company providing software to electricity providers. NISC is set up as a co-operative, a model typically applied in areas such as agriculture, but that has much wider applicability; there are nearly 7,000 co-ops in the UK – banks, retailers, funeral homes, accountancies…

Under this scheme, the co-operative behaves like any other commercial company, acquiring clients, selling services etc. The big difference is structural, in that the shareholders are the staff and customers of the co-operative; clients take a stake in the company through membership. The purpose of the co-op is principally to serve its members as well as possible, while operating at a lower cost than would be possible for a commercial organisation. At the end of the year, the treatment of profits is voted upon democratically by members, to either reinvest them or distribute them back in accordance with usage of its services. The model works particularly well in industries where specialization and stability are the most highly sought-after attributes.

Co-operatives fill a niche in the market by providing long-term stability to the specialists working in those areas, as well as to members who rely on the co-op’s services as a key component of their infrastructure. The service would be the ongoing development and support of these pieces of infrastructure, as well as elements that would be impossible for a commercial entity to justify, such as components that proactively monitor the infrastructure and prevent service callouts.

Perhaps it is time to apply the co-operative model to open source; it seems a natural fit. A co-operative would be a way of sharing the costs of maintenance: half-way between an external vendor and a dedicated in-house team.

You can read more about the mechanics of co-operatives at Co-operatives UK.

Performance Test Tool for Apache Kafka

Towards the end of last year, I developed a performance test tool which is available at GitHub for testing the behaviour of Apache Kafka (0.9 so far). The tool was inspired by, and informed by some of the limitations of, the performance tooling in ActiveMQ, which I’m currently retrofitting with the same mechanisms.

The kafka-perf-test project builds a Fat JAR that you can take with you into any environment running Java 8, and through the use of a single JSON or YAML config file configure a range of consumers and producers with differing behaviours pointing at one or more Kafka installations. All of the standard Kafka properties for producers and consumers are configurable, which also makes it a useful learning tool if you want to work out what sort of behaviour to expect from a Kafka installation, or just to bulk load a bunch of messages in.

A sample YAML config can be as simple as:

config: {} # applies to all producers and consumers
  config:  # applies to all producers
    bootstrap.servers: "tcp://localhost:9092" "10000"
    key.serializer: "org.apache.kafka.common.serialization.ByteArraySerializer"
    value.serializer: "org.apache.kafka.common.serialization.ByteArraySerializer"
    batch.size: "0"
    acks: "1" "10000"
  - config: {} # applies to this producer
    topic: "foo"
    messagesToSend: 2000
    messageSize: 1000
    sendBlocking: false

There is still some work to go in getting decent metrics reporting, and general polish, but for headline numbers it’s good to go. Pull requests/issues/comments most welcome.

Message Distribution and Topic Partitioning in Kafka

When coming over to Apache Kafka from other messaging systems, there’s a conceptual hump that needs to first be crossed, and that is – what is this topic thing that messages get sent to, and how does message distribution inside it work?

Unlike regular brokers, Kafka only has one destination type – a topic (I’ll refer to it as a kTopic here to disambiguate it from JMS topics). Underlying a kTopic is a persisted data structure called a journal (think of it like an array) that can have many pointers addressing (via a numerical offset) an index inside it. Consumers subscribe to a kTopic with a consumer group, denoted by a group ID; you can think of the group ID as a named pointer.

The Kafka documentation talks about consumer groups having “group names”. In the consumer config they are referred to by group.id, so I will run with that convention.

If the group ID is not known by the broker, the consumer can be configured to ask the broker to point its corresponding pointer to the start of the journal (thereby consuming all messages since the broker accepted messages), or the end (consuming all messages starting from the next message to arrive). Consuming a message means moving the pointer to the next place in the journal. Pointers can also be moved backwards to consume messages that have been previously consumed, in contrast with a regular broker where messages get deleted once consumed.

This means that the consumer code may keep track of where it got up to by storing and managing its own offset in the journal. This is another difference from JMS, where state management is left up to the broker.
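To make the “named pointer” idea concrete, here is a toy in-memory model in Java of a single-partition kTopic journal. It is a sketch under stated assumptions, not Kafka code: ToyJournal, ResetPolicy, poll and seek are hypothetical names, the EARLIEST/LATEST choice only loosely mirrors the consumer’s auto.offset.reset setting, and offsets are simply kept in a map rather than stored anywhere durable.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only (not Kafka code): an append-only journal where each consumer
// group is nothing more than a named offset (pointer) into the list.
class ToyJournal {
    // Hypothetical stand-in for the start/end choice made for unknown groups
    enum ResetPolicy { EARLIEST, LATEST }

    private final List<String> entries = new ArrayList<>();
    private final Map<String, Integer> groupOffsets = new HashMap<>();

    void append(String message) {
        entries.add(message); // messages are never deleted by consumption
    }

    // Returns up to max messages for the group and advances its pointer.
    List<String> poll(String groupId, ResetPolicy reset, int max) {
        // Unknown group: point it at the start or at the end of the journal.
        int offset = groupOffsets.computeIfAbsent(groupId,
                g -> reset == ResetPolicy.EARLIEST ? 0 : entries.size());
        int start = Math.max(0, Math.min(offset, entries.size()));
        int end = Math.min(entries.size(), start + max);
        List<String> batch = new ArrayList<>(entries.subList(start, end));
        groupOffsets.put(groupId, end); // "consuming" = moving the pointer forward
        return batch;
    }

    // Moves a group's pointer, e.g. backwards to re-consume old messages.
    void seek(String groupId, int offset) {
        groupOffsets.put(groupId, offset);
    }

    public static void main(String[] args) {
        ToyJournal journal = new ToyJournal();
        journal.append("m1");
        journal.append("m2");
        System.out.println(journal.poll("audit", ResetPolicy.EARLIEST, 10)); // [m1, m2]
        System.out.println(journal.poll("live", ResetPolicy.LATEST, 10));    // []
        journal.append("m3");
        System.out.println(journal.poll("live", ResetPolicy.LATEST, 10));    // [m3]
        journal.seek("audit", 0);
        System.out.println(journal.poll("audit", ResetPolicy.EARLIEST, 10)); // [m1, m2, m3]
    }
}
```

Note that the two groups move independently: each has its own pointer, and rewinding one has no effect on the other.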

The best way to describe kTopic message distribution is to contrast it with the behaviour of destinations in regular (non-Kafka) message brokers. Stepping back to a more traditional JMS world view, there are two types of destinations:

  • queues – first-in-first-out
  • topics – publish-subscribe

In queues, messages are dispatched in the same order that they are sent in; if there is more than one consumer, messages get dispatched in a round-robin fashion among these consumers, ensuring fair work sharing. One of the gripes about this is that by default you cannot guarantee that messages will be processed in order, as the order of processing once messages have been dispatched to consumers is non-deterministic (some consumers may run slower than others). You can address this with features such as JMS message groups, where groups of related messages are round-robin assigned to consumers, and all messages within a group are sent to the consumer to which that group is assigned.
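For contrast with what follows, here is a toy Java model of the JMS-style dispatch just described: plain round-robin, plus message groups, where the first message of a group picks a consumer round-robin and later messages of that group stick to it. QueueDispatcher and its methods are hypothetical names; this is not ActiveMQ’s dispatch code, which also has to deal with prefetch, acknowledgements and consumers leaving.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (not ActiveMQ code) of broker-side queue dispatch.
class QueueDispatcher {
    private final List<String> consumers;
    private final Map<String, String> groupOwners = new HashMap<>();
    private int next = 0; // round-robin cursor

    QueueDispatcher(List<String> consumers) {
        this.consumers = new ArrayList<>(consumers);
    }

    // Plain queue: each message goes to the next consumer in turn.
    String dispatch() {
        String consumer = consumers.get(next);
        next = (next + 1) % consumers.size();
        return consumer;
    }

    // Message groups: a new group is assigned round-robin, then stays put,
    // so all messages in one group are processed in order by one consumer.
    String dispatch(String groupId) {
        String owner = groupOwners.get(groupId);
        if (owner == null) {
            owner = dispatch();
            groupOwners.put(groupId, owner);
        }
        return owner;
    }

    public static void main(String[] args) {
        QueueDispatcher dispatcher = new QueueDispatcher(Arrays.asList("c1", "c2"));
        for (String group : new String[] {"orderA", "orderB", "orderA", "orderA"}) {
            System.out.println(group + " -> " + dispatcher.dispatch(group));
        }
    }
}
```

The key point for what follows is that the broker holds the coordination state here (the cursor and the group map).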

This queueing behaviour is approximated in Kafka through the use of a single group ID for the kTopic. In this scenario a single consumer will receive all of the messages. Where it starts to get interesting is what happens when you have two consumers: here the latest consumer to join receives ALL of the unconsumed messages from the point of joining; the formerly busy consumer is suspended. This is a bit like the exclusive consumer feature in JMS, only that exclusivity is taken away by the last consumer to join; and is in stark contrast to the fair-work sharing of JMS.

One of Kafka’s underlying design goals is to be fast for both production and consumption. This has led to some interesting design trade-offs. The main one is the use of the journal as the fundamental construct – it is a very simple structure that is read and written very quickly to disk. The reason for this is that sequential disk access is extremely efficient.

If you wanted to distribute messages from a journal in a round-robin fashion, as per JMS, you would need a dispatching thread on the broker to coordinate message hand-off to consumers. This gets slower the more consumers you have (though with a sensible number of consumers it is usually not noticeable). A goal for Kafka was to scale consumption linearly with the number of consumers, which means that you need to reduce coordination.

Consider, however, the following:

  1. Coordination is an underlying property of concurrency.
  2. Horizontal scaling is achieved by applying parallelism.
  3. Concurrency is the prevention of parallelism.

So. The only way to get horizontal scaling of consumption in a queue distribution scenario is to effectively use multiple journals. Inhale deeply.

Kafka achieves this through the idea of partitioning. On the broker, you define how many partitions exist per kTopic. One partition corresponds to one journal. If you want to distribute messages fairly amongst N consumers, you need N partitions.

So how are messages distributed amongst the partitions? When messages are sent to a kTopic, they are sent through the following method on the Producer class:

    Future<RecordMetadata> send(ProducerRecord<K, V> record);

A ProducerRecord contains a key (K) and a value (V). The value is the payload of your message (unlike in JMS there is no Message construct that allows you to send additional message metadata). The key is a business value provided by you that is used to shard your messages across the partitions.

It is the client’s responsibility to decide which partition to send the message to, not the broker’s. This is done by configuring the Producer with an implementation of the Partitioner interface. The default behaviour is to determine the partition via key.hashCode() % numberOfPartitions. By default this will not give fair work sharing for a small set of keys, due to hashing collisions, which means that if you are partitioning to a small number of partitions (for the same number of parallel consumers) some of those partitions are likely to get more messages than the others. In the case of 2 partitions, one might not get any messages at all – leading to consumer starvation. You can balance this out by providing your own Partitioner implementation with a more business-specific partitioning function.

It is important to note that the Partitioner must consistently place related messages into the same partition, otherwise the entire purpose of this scheme – namely that related messages should be consumed in order – goes out the window.
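Here is a small Java sketch of the two approaches just described. hashPartition follows the key.hashCode() % numberOfPartitions rule from the text, using Math.floorMod so that negative hash codes still land in range (a plain % would return a negative number for them). ExplicitPartitioner is a hypothetical, business-specific alternative that spreads a known set of keys evenly and always maps the same key to the same partition; it is not a Kafka class, and in a real producer this logic would sit behind the Partitioner interface. As far as I know, the Java producer’s default partitioner hashes the serialized key bytes (murmur2) rather than calling hashCode(), so real placements will differ, but the collision argument is the same.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PartitionSketch {

    // hashCode-based choice, as described in the text; floorMod keeps
    // negative hash codes in the range [0, numberOfPartitions).
    static int hashPartition(Object key, int numberOfPartitions) {
        return Math.floorMod(key.hashCode(), numberOfPartitions);
    }

    // Hypothetical business-specific partitioner: known keys are spread
    // round-robin, and the mapping never changes, so per-key ordering holds.
    static final class ExplicitPartitioner {
        private final Map<String, Integer> assignments = new HashMap<>();
        private final int numberOfPartitions;

        ExplicitPartitioner(List<String> knownKeys, int numberOfPartitions) {
            this.numberOfPartitions = numberOfPartitions;
            for (int i = 0; i < knownKeys.size(); i++) {
                assignments.put(knownKeys.get(i), i % numberOfPartitions);
            }
        }

        int partition(String key) {
            Integer assigned = assignments.get(key);
            // Unknown keys fall back to hashing, which is still consistent per key.
            return assigned != null ? assigned : hashPartition(key, numberOfPartitions);
        }
    }

    public static void main(String[] args) {
        // Single-character strings hash to their char code: 65, 67, 69, 71 (all odd).
        List<String> keys = Arrays.asList("A", "C", "E", "G");
        ExplicitPartitioner explicit = new ExplicitPartitioner(keys, 2);
        for (String key : keys) {
            System.out.println(key + ": hash -> " + hashPartition(key, 2)
                    + ", explicit -> " + explicit.partition(key));
        }
        // With hashing, every key lands on partition 1 and partition 0 starves.
    }
}
```

The four keys illustrate the starvation case from the text: with two partitions, hashing sends all of them to partition 1, while the explicit mapping alternates them between 0 and 1.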

Partitions are fairly assigned to consumers, and rebalanced when consumers come and go. Assuming 2 partitions:

  • If you have 1 consumer, they will receive messages from both partitions.
  • If you have 2, each will exclusively consume from a single one.
  • If you have 3, one will just hang around and not receive any messages.
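The bullets above can be reproduced with a simplified, range-style assignment, sketched here in Java. AssignmentSketch is a hypothetical name and this is not Kafka’s assignor code (the real assignment logic also has to order group members and handle multiple topics); in this model, a rebalance is simply the assignment being recomputed whenever group membership changes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified range-style assignment of one kTopic's partitions to the
// consumers in a single group (toy model, not Kafka's assignor).
class AssignmentSketch {

    static Map<String, List<Integer>> assign(List<String> consumers, int partitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (consumers.isEmpty()) {
            return result; // nobody to assign to
        }
        int perConsumer = partitions / consumers.size();
        int extra = partitions % consumers.size(); // the first 'extra' consumers get one more
        int nextPartition = 0;
        for (int i = 0; i < consumers.size(); i++) {
            List<Integer> owned = new ArrayList<>();
            int count = perConsumer + (i < extra ? 1 : 0);
            for (int k = 0; k < count; k++) {
                owned.add(nextPartition++);
            }
            result.put(consumers.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList("c1", "c2", "c3");
        for (int n = 1; n <= 3; n++) {
            // "Rebalance": recompute the assignment as each consumer joins.
            System.out.println(assign(all.subList(0, n), 2));
        }
        // Prints {c1=[0, 1]}, then {c1=[0], c2=[1]}, then {c1=[0], c2=[1], c3=[]}
    }
}
```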

In JMS topics, messages are distributed to all consumers who may be interested. In kTopics this is implemented by giving each consumer their own group ID. The distribution logic per-consumer is going to look exactly like the queue scenario above. In effect, this means that kTopics behave a lot like ActiveMQ’s virtual destinations or JMS 2.0 durable topics, rather than pure JMS topics – as the latter do not write messages to disk, whereas kTopics do.

Hopefully this goes a long way towards clearing up the fundamentals of message distribution in Kafka. It’s a really interesting messaging tech, but some of its behaviour is shifted a full 90 degrees from what you would expect coming from a traditional messaging background. The price of getting great performance and utilisation in clusters has been some additional interesting trade-offs and complexities (not covered here) in transactionality, durability and state management that are likely to catch out the unwary – more on that in another post.

How to reason about queue depth graphs

Occasionally I come across something that seems really obvious to me, but actually isn’t obvious from an outsider’s perspective. A few weeks ago I spent some time with a client (and their client) trying to understand the behaviour of a message-driven platform that made heavy use of ActiveMQ. Without going into too many details, they had a number of production incidents where the queues started behaving erratically as far as they could see from time-series charts of queue depths. In reasoning about what was going on, we put together a visual vocabulary to help them analyse the behaviour not only of the broker but of the systems around it. Here is that vocabulary.

Basic Curves

Consumption matches production.
Flat line. Here the rate of messages being placed onto a queue matches the rate of their consumption. This should be considered the natural order of things where both producers and consumers exist on a system.

Angle up. Here the production rate is higher than the consumption rate. While this is not a problem in the short term, over a long period of time a message store may fill up. This is typically seen when producers temporarily increase their message rate and the consumers, given the time it takes to process each message, cannot keep up. To get around this you need to increase the consumption rate by adding more consumers or speeding them up.

Angle down. The consumption rate is exceeding the production rate. This normally indicates a queue draining down after a burst of producer activity.
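The three basic shapes fall straight out of a simple balance: each interval, depth changes by the number of messages produced minus the number consumed. The Java sketch below models that and classifies a window of depth samples by its average slope. DepthCurves, the sample values and the tolerance parameter (how small a slope still counts as flat) are all hypothetical; real monitoring data is noisier than this.

```java
// Toy model of the basic queue depth curves (hypothetical names throughout).
class DepthCurves {
    enum Shape { FLAT, ANGLE_UP, ANGLE_DOWN }

    // Depth after one interval: grows with production, shrinks with consumption.
    static long nextDepth(long depth, long produced, long consumed) {
        return Math.max(0, depth + produced - consumed); // a queue can't go negative
    }

    // Classifies a window of samples by its average change per interval.
    static Shape classify(long[] depths, double tolerance) {
        if (depths.length < 2) {
            throw new IllegalArgumentException("need at least two samples");
        }
        double slope = (double) (depths[depths.length - 1] - depths[0]) / (depths.length - 1);
        if (slope > tolerance) return Shape.ANGLE_UP;    // production > consumption
        if (slope < -tolerance) return Shape.ANGLE_DOWN; // consumption > production
        return Shape.FLAT;                               // rates roughly match
    }

    public static void main(String[] args) {
        long depth = 0;
        long[] samples = new long[5];
        for (int t = 0; t < samples.length; t++) {
            samples[t] = depth;
            depth = nextDepth(depth, 120, 100); // producers 20 messages per interval ahead
        }
        System.out.println(classify(samples, 1.0)); // ANGLE_UP
    }
}
```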

Composite Curves

Rate of change increases. In the curves above, the rate of production has increased relative to the rate of consumption. This means that either:

  • your consumers are working slower than before; or
  • the number of messages being sent by your producers has increased.

Rate of change decreases. In the curves above, the rate of consumption has increased relative to the rate of production. This means that either:

  • your consumers are working faster; or
  • the producers are not putting as many messages onto the queue.

Curve flattens. This is a really interesting one; it indicates the following possibilities:

  • if depth is 0, your queue is empty; otherwise
  • the consumption rate has all of a sudden started matching your production rate exactly (not likely); or
  • producers have stopped producing and there is no consumption going on, i.e. you are seeing a pause. If you are convinced that messages should be being sent (and possibly consumed), then there are a number of external possibilities, including a long garbage collection, or some other process taking over the I/O on the storage used by the broker; in a virtualised or shared environment, this may be another server that has access to the same storage.
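The composite curves are about a change in slope, so one rough way to spot them programmatically is to compare the slope of the first half of a window of depth samples with that of the second half. The Java sketch below does that; CompositeCurves, the Trend names and the tolerance are hypothetical, and a curve that flattens at non-zero depth is only flagged as a possible stall, since the exact-match case can’t be ruled out from depth alone.

```java
// Toy classifier for composite queue depth curves (hypothetical names).
class CompositeCurves {
    enum Trend { RATE_INCREASING, RATE_DECREASING, FLATTENED_EMPTY, POSSIBLE_STALL, STEADY }

    // Average change in depth per interval between two sample indexes.
    static double slope(long[] d, int from, int to) {
        return (double) (d[to] - d[from]) / (to - from);
    }

    static Trend classify(long[] depths, double tolerance) {
        if (depths.length < 3) {
            throw new IllegalArgumentException("need at least three samples");
        }
        int mid = depths.length / 2;
        double before = slope(depths, 0, mid);
        double after = slope(depths, mid, depths.length - 1);
        if (Math.abs(before) > tolerance && Math.abs(after) <= tolerance) {
            // Curve flattens: either the queue is empty, or production and
            // consumption have both stopped (an exact rate match is unlikely).
            return depths[depths.length - 1] == 0 ? Trend.FLATTENED_EMPTY : Trend.POSSIBLE_STALL;
        }
        if (after - before > tolerance) return Trend.RATE_INCREASING; // slower consumers or more production
        if (before - after > tolerance) return Trend.RATE_DECREASING; // faster consumers or less production
        return Trend.STEADY;
    }

    public static void main(String[] args) {
        System.out.println(classify(new long[] {0, 10, 20, 40, 60}, 1.0));    // RATE_INCREASING
        System.out.println(classify(new long[] {0, 50, 100, 100, 100}, 1.0)); // POSSIBLE_STALL
    }
}
```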

In broker-based messaging systems, it is important to understand that the broker is only one part of the story, and that the performance of both the producers and consumers will have an impact on the overall flow of messages. Hopefully this reference will help you to reason about what is going on.

Java DSLs for ActiveMQ config and testing

As part of a larger project that I am working on, I needed to define numerous different ActiveMQ configurations inside non-trivial integration tests. Rather than instantiating brokers by hand using the internal ActiveMQ classes, or writing lots of similar-but-different XML configurations, I wrote a pair of Java DSLs to allow me to quickly spin up brokers and proxy them in JUnit using a style that should be familiar to Camel users, and a syntax that ActiveMQ users should instantly understand.

The ActiveMQ DSL is a general purpose utility for instantiating brokers in-line.

BrokerContext context = new BrokerContext("embeddedBroker").useJmx(false).persistent(false)
            .transportConnector("openwire", "tcp://").end()

// later ...

The Test DSL uses JUnit’s @Rule functionality to manage the lifecycle of these brokers and also allows the creation of proxies that can be used to simulate network interruptions.

public class MyEmbeddedBrokerTest {
    public ProxiedBrokerResource broker = new BrokerResource(
                    .transportConnector("openwire", "tcp://").end()
                    .transportConnector("stomp", "stomp://").end()
            // port to be proxied is looked up by name
            // you can define multiple proxies for the one broker!

    public void testNetworkOutages() {
        ConnectionFactory cf =
            new ActiveMQConnectionFactory(
                "failover:(" + broker.getTcpConnectionUri("openwire") + ")"); 
        // returns proxied port 10000
        // ...
        SocketProxy socketProxy = broker.getProxy("openwire");
        socketProxy.close(); // network goes down
        // ...
        socketProxy.reopen(); // network comes back up

I hope that they might be useful (or at least instructive) to others. The DSLs along with full documentation are available on GitHub at jkorab/activemq-dsls. Feedback + pull requests most welcome.

Digesting Microservices at muCon

On Friday, I had the privilege of presenting at the very first Microservices conference – muCon. In my talk, Engineering Sanity into Microservices, I spoke about the technical issues surrounding state in distributed systems as a whole, how these become a bigger problem as the number of deployed services goes up, and a few suggested patterns that will help you stay sane. The video is now available on the Skills Matter site (registration required).

MuCon was a really enjoyable single-topic conference; the talks ranged from high-level CTO-type overviews all the way to the gory details and war stories. It will be interesting to turn up next year to hear more of the latter.

My biggest takeaway was from Greg Young’s presentation The Future of Microservices, where he spoke about Conway’s Law. As a reminder:

Organizations which design systems … are constrained to produce designs which are copies of the communication structures of these organizations
— M. Conway

The topical corollary to which he explained as (I paraphrase):

Siloed organizations will never be able to get the benefits of a microservice architecture, as it does not correspond to their communication structures.

Read that again, and really let it sink in.

I will put a layer of interpretation on this: SOA is absolutely not dead. It is a useful tool for highly compartmentalized organizations. Microservices is not its replacement. They are two different tools for different organizational types.

That insight alone was worth turning up for.

Running ActiveMQ Replicated LevelDB on VirtualBox

I have wanted to spend some more time recently playing with Replicated LevelDB in ActiveMQ. Not wanting to hit a cloud environment such as EC2, I set up 3 VirtualBox nodes on my workstation running Fedora. The host system needs to be fairly beefy – you need at least a dedicated CPU core per image + RAM. The setup itself was relatively straightforward except for a couple of gotchas. I didn’t use Docker or similar simply because I haven’t played with it; as a rule I try not to introduce too many new pieces into a setup, otherwise it turns into a yak-shaving exercise. What follows is an outline of how I got it running, and some of the things I bumped my head against.

My approach was to get the software set up on a single disk image, and then replicate that over to another couple of VMs, tweaking to get the clustering working correctly. Turns out, that works pretty well. When creating VirtualBox images, set Network Adapter 1 to be a Bridged Adapter on your ethernet port, with Promiscuous Mode set to “Allow All”.

VirtualBox network adapter settings

As a starting point, I downloaded Zookeeper 3.4.6 and ActiveMQ 5.10, unzipped them into /opt, and created the symlinks /opt/zk and /opt/activemq respectively.

If you follow the setup instructions from the ActiveMQ site, the first thing you’ll notice is that the Zookeeper (ZK) setup isn’t covered. Thankfully, the ZK website itself has a good breakdown of how to do this in the Clustered (Multi-Server) Setup section of the Administrator’s guide. The standard config file I used was /opt/zk/conf/zk.conf, configured with the following:


You have to create the /var/zk/data directory yourself. This is where ZK keeps its logs, as well as a file called myid, which contains a number that defines which node in the cluster this particular machine is.

Later, once you know the IP addresses of the machines you add the following to the end of the zk.conf file (here I’m using a 3 node cluster):


The number that follows server. corresponds to the contents of the myid file on that particular server.

For Zookeeper to work correctly, 3 ports have to be opened on the server’s firewall:

  • 2181 – the port that clients will use to connect to the ZK ensemble
  • 2888 – port used by ZK servers for peer communication (followers connect to the leader on it)
  • 3888 – port used by ZK for leader election

The firewall changes that you’ll need for ActiveMQ are:

  • 61616 – default Openwire port
  • 8161 – Jetty port for web console
  • 61619 – LevelDB peer-to-peer replication port

On Fedora, you access the firewall through the Firewall program (oddly enough). I set up ZK and ActiveMQ as Services in the Permanent configuration (not Runtime), and turned those two services on in the Public zone (default used).

Fedora firewall settings

The ActiveMQ configuration changes are pretty straightforward. This is the minimum config change that was needed in conf/activemq.xml:

  <replicatedLevelDB zkAddress=",,"

The zkAddress is pretty clear – these are the ZK nodes and client ports of the ZK ensemble. The data directory is defined by directory. I’ll get onto the hostname attribute in a little while.

Once I had everything set up, it was time to start playing with the cluster. Starting the first ActiveMQ node was problem free – it connected to ZK, and waited patiently until one other ActiveMQ node connected to it (quorum-based replication requires numNodesInCluster/2 + 1 instances). When a second ActiveMQ node was started, the master exploded:

 INFO | Using the pure java LevelDB implementation.
 INFO | No IOExceptionHandler registered, ignoring IO exception;Ljava/lang/Object;)Ljava/lang/Object;
	at org.apache.activemq.util.IOExceptionSupport.create([activemq-client-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.LevelDBClient.might_fail(LevelDBClient.scala:552)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.LevelDBClient.replay_init(LevelDBClient.scala:657)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.LevelDBClient.start(LevelDBClient.scala:558)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.DBManager.start(DBManager.scala:648)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.LevelDBStore.doStart(LevelDBStore.scala:235)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.replicated.MasterLevelDBStore.doStart(MasterLevelDBStore.scala:110)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.apache.activemq.util.ServiceSupport.start([activemq-client-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.replicated.ElectingLevelDBStore$$anonfun$start_master$1.apply$mcV$sp(ElectingLevelDBStore.scala:226)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.fusesource.hawtdispatch.package$$anon$[hawtdispatch-scala-2.11-1.21.jar:1.21]
	at java.util.concurrent.ThreadPoolExecutor.runWorker([:1.7.0_60]
	at java.util.concurrent.ThreadPoolExecutor$[:1.7.0_60]
 INFO | Stopped LevelDB[/opt/activemq/data/LevelDB]

After a bit of scratching, I happened upon AMQ-5225 in the ActiveMQ JIRA. It seems that the problem is with a classpath conflict in 5.10 (readers from the future – if you’re using a newer version, you won’t see this problem :) ). To get around it, follow these instructions:

1. Remove pax-url-aether-1.5.2.jar from the lib directory.
2. Comment out the log query section:

<!--
<bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery"
      lazy-init="false" scope="singleton"
      init-method="start" destroy-method="stop">
</bean>
-->

When I made the changes, everything started to work without this exception. Once the second ActiveMQ instance was started, the master sprang to life and started logging the following to the console:

WARN | Store update waiting on 1 replica(s) to catch up to log position 0.
WARN | Store update waiting on 1 replica(s) to catch up to log position 0.
WARN | Store update waiting on 1 replica(s) to catch up to log position 0.

The slave on the other hand had the following text output:

INFO | Using the pure java LevelDB implementation.
INFO | Attaching to master: tcp://localhost.localdomain:61619
WARN | Unexpected session error: Connection refused
INFO | Using the pure java LevelDB implementation.
INFO | Attaching to master: tcp://localhost.localdomain:61619
WARN | Unexpected session error: Connection refused

Ad infinitum.

After a bunch of fruitless searching, I realised that the answer was right in front of me in the slave output (Attaching to master: tcp://localhost.localdomain:61619). It seems that the master was registering itself into ZK with its hostname (localhost.localdomain).

Once that clicked, the documentation led me to the hostname attribute on the replicatedLevelDB tag in the persistenceAdapter. This value is used by the broker to advertise its location through ZK so that slaves can connect to its LevelDB replication port. The default behaviour sees ActiveMQ trying to automatically work out the hostname.

That value was coming from /etc/hosts; here are the default contents of that file:

127.0.0.1      localhost.localdomain localhost
::1            localhost6.localdomain6 localhost6

I would imagine that in a proper network setup this wouldn’t typically happen, as the box name would be worked out through DHCP + internal DNS. Simply overriding the default behaviour by setting the hostname attribute to the IP address of the virtual machine worked a treat.

With a test client on the host machine running the 3 VirtualBox instances I was able to connect to all 3 nodes and fail them over without problems.

ZK uses a list to work out which ActiveMQ slave node is going to take control when the master broker dies; the ordering is defined by who connected to the ensemble first. If fewer than a quorum of ActiveMQ nodes are running, no ActiveMQ node in the cluster will accept connections (this is managed by the replicated LevelDB store).

Since ZK also maintains a quorum, when fewer than the required number of servers are available, the remaining nodes will shut down their client port (2181). This disconnects all the brokers from the ensemble and causes the master broker to shut down its transportConnectors.
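The quorum rules above can be summarised in a few lines of Java. This is a toy model under stated assumptions, not ActiveMQ or ZK code: ElectionSketch is a hypothetical name, a list in registration order stands in for the ZK list described above, and the first live node in that list is treated as master. Quorum is a strict majority, numNodesInCluster/2 + 1 with integer division, so a 3-node cluster tolerates the loss of one node.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Toy model of master selection for replicated LevelDB brokers.
class ElectionSketch {

    // Strict majority: numNodesInCluster / 2 + 1 (integer division).
    static int quorum(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    // Master is the first live node in registration order, but only if a
    // quorum of nodes is alive; otherwise nobody accepts connections.
    static Optional<String> master(List<String> registrationOrder, Set<String> alive, int clusterSize) {
        List<String> live = new ArrayList<>();
        for (String node : registrationOrder) {
            if (alive.contains(node)) {
                live.add(node);
            }
        }
        if (live.size() < quorum(clusterSize)) {
            return Optional.empty();
        }
        return Optional.of(live.get(0));
    }

    public static void main(String[] args) {
        List<String> order = Arrays.asList("amq1", "amq2", "amq3");
        System.out.println(master(order, new HashSet<>(order), 3));                          // Optional[amq1]
        System.out.println(master(order, new HashSet<>(Arrays.asList("amq2", "amq3")), 3)); // Optional[amq2]
        System.out.println(master(order, new HashSet<>(Arrays.asList("amq3")), 3));         // Optional.empty
    }
}
```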

Happy replicated LevelDB-ing!