Archive for the ‘thoughts’ Category


Running ActiveMQ Replicated LevelDB on VirtualBox

I have wanted to spend some more time recently playing with Replicated LevelDB in ActiveMQ. Not wanting to hit a cloud environment such as EC2, I set up 3 VirtualBox nodes on my workstation running Fedora. The host system needs to be fairly beefy – you need at least a dedicated CPU core per image + RAM. The setup itself was relatively straightforward except for a couple of gotchas. I didn’t use Docker or similar simply because I haven’t played with it; as a rule I try not to introduce too many new pieces into a setup, otherwise it turns into a yak shaving exercise. What follows is an outline of how I got it running, and some of the things I bumped my head against.

My approach was to get the software set up on a single disk image, and then replicate that over to another couple of VMs, tweaking to get the clustering working correctly. Turns out, that works pretty well. When creating VirtualBox images, set the Network Adapter1 as a Bridged Adapter to your ethernet port, with Promiscuous Mode set to “Allow All”.

VirtualBox network adapter settings


As a starting point, I downloaded Zookeeper 3.4.6 and ActiveMQ 5.10, and unzipped them into /opt, and created symlinks as /opt/zk and /opt/activemq respectively.

If you follow the setup instructions from the ActiveMQ site, the first thing you’ll notice is that the Zookeeper (ZK) setup itself isn’t covered. Thankfully, the ZK website has a good breakdown of how to do this in the Clustered (Multi-Server) Setup section of the Administrator’s Guide. The standard config file I used was /opt/zk/conf/zk.conf, configured with the following:

tickTime=2000
dataDir=/var/zk/data
clientPort=2181
initLimit=5
syncLimit=2

You have to create the /var/zk/data directory yourself. This is where ZK keeps its logs, as well as a file called myid, which contains a number that defines which node in the cluster this particular machine is.
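
For example, for the first node in the cluster (server.1 in the list below), the directory and myid file might be created like this – run as root, with the path matching the dataDir above:

```shell
# create the ZK data directory (matches dataDir in zk.conf)
mkdir -p /var/zk/data

# this machine is server.1 in the ensemble, so its myid file contains "1"
echo 1 > /var/zk/data/myid
```

Repeat on each node, changing the number to match that node’s server.N entry.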

Later, once you know the IP addresses of the machines you add the following to the end of the zk.conf file (here I’m using a 3 node cluster):

server.1=192.168.0.8:2888:3888
server.2=192.168.0.13:2888:3888
server.3=192.168.0.12:2888:3888

The number that follows server. corresponds to the contents of the myid file on that particular server.

For Zookeeper to work correctly, 3 ports have to be opened on the server’s firewall:

  • 2181 – the port that clients will use to connect to the ZK ensemble
  • 2888 – port used by ZK peers to communicate with the quorum leader
  • 3888 – port used by ZK for leader election

The firewall changes that you’ll need for ActiveMQ are:

  • 61616 – default Openwire port
  • 8161 – Jetty port for web console
  • 61619 – LevelDB peer-to-peer replication port

On Fedora, you access the firewall through the Firewall program (oddly enough). I set up ZK and ActiveMQ as Services in the Permanent configuration (not Runtime), and turned those two services on in the Public zone (the default).
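
The GUI steps above can equally be done from the command line with firewall-cmd – a sketch, assuming the default public zone, and opening raw ports rather than defining named services:

```shell
# Zookeeper ports: client, quorum peer, leader election
firewall-cmd --permanent --zone=public --add-port=2181/tcp
firewall-cmd --permanent --zone=public --add-port=2888/tcp
firewall-cmd --permanent --zone=public --add-port=3888/tcp

# ActiveMQ ports: Openwire, web console, LevelDB replication
firewall-cmd --permanent --zone=public --add-port=61616/tcp
firewall-cmd --permanent --zone=public --add-port=8161/tcp
firewall-cmd --permanent --zone=public --add-port=61619/tcp

# apply the permanent configuration to the running firewall
firewall-cmd --reload
```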

Fedora firewall settings

The ActiveMQ configuration changes are pretty straightforward. This is the minimum config change that was needed in conf/activemq.xml:

<persistenceAdapter>
  <replicatedLevelDB zkAddress="192.168.0.8:2181,192.168.0.13:2181,192.168.0.12:2181"
      directory="${activemq.data}/LevelDB"
      hostname="192.168.0.8"/>
</persistenceAdapter>

The zkAddress is pretty clear – these are the ZK nodes and client ports of the ZK ensemble. The data directory is defined by directory. I’ll get onto the hostname attribute in a little while.

Once I had everything set up, it was time to start playing with the cluster. Starting the first ActiveMQ node was problem free – it connected to ZK, and waited patiently until one other ActiveMQ node connected to it (quorum-based replication requires numNodesInCluster/2 + 1 instances). When a second ActiveMQ node was started, the master exploded:

 INFO | Using the pure java LevelDB implementation.
 INFO | No IOExceptionHandler registered, ignoring IO exception
java.io.IOException: com.google.common.base.Objects.firstNonNull(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
	at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:39)[activemq-client-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.LevelDBClient.might_fail(LevelDBClient.scala:552)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.LevelDBClient.replay_init(LevelDBClient.scala:657)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.LevelDBClient.start(LevelDBClient.scala:558)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.DBManager.start(DBManager.scala:648)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.LevelDBStore.doStart(LevelDBStore.scala:235)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.replicated.MasterLevelDBStore.doStart(MasterLevelDBStore.scala:110)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.apache.activemq.util.ServiceSupport.start(ServiceSupport.java:55)[activemq-client-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.replicated.ElectingLevelDBStore$$anonfun$start_master$1.apply$mcV$sp(ElectingLevelDBStore.scala:226)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.fusesource.hawtdispatch.package$$anon$4.run(hawtdispatch.scala:330)[hawtdispatch-scala-2.11-1.21.jar:1.21]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)[:1.7.0_60]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)[:1.7.0_60]
	at java.lang.Thread.run(Thread.java:745)[:1.7.0_60]
 INFO | Stopped LevelDB[/opt/activemq/data/LevelDB]

After a bit of head scratching, I happened upon AMQ-5225 in the ActiveMQ JIRA. It seems that the problem is a classpath conflict in 5.10 (readers from the future – if you’re using a newer version, you won’t see this problem :) ). To get around it, follow these instructions:

1. remove pax-url-aether-1.5.2.jar from the lib directory
2. comment out the logQuery bean in conf/activemq.xml:

<!--
<bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery"
      lazy-init="false" scope="singleton"
      init-method="start" destroy-method="stop">
</bean>
-->

When I made the changes, everything started to work without this exception. Once the second ActiveMQ instance was started, the master sprung to life and started logging the following to the console:

WARN | Store update waiting on 1 replica(s) to catch up to log position 0.
WARN | Store update waiting on 1 replica(s) to catch up to log position 0.
WARN | Store update waiting on 1 replica(s) to catch up to log position 0.
...

The slave on the other hand had the following text output:

INFO | Using the pure java LevelDB implementation.
INFO | Attaching to master: tcp://localhost.localdomain:61619
WARN | Unexpected session error: java.net.ConnectException: Connection refused
INFO | Using the pure java LevelDB implementation.
INFO | Attaching to master: tcp://localhost.localdomain:61619
WARN | Unexpected session error: java.net.ConnectException: Connection refused
...

Ad infinitum.

After a bunch of fruitless searching, I realised that the answer was right in front of me in the slave output:
(Attaching to master: tcp://localhost.localdomain:61619). It seems that the master was registering itself into ZK with its hostname (localhost.localdomain).

Once that clicked, the documentation led me to the hostname attribute on the replicatedLevelDB tag in the persistenceAdapter. This value is used by the broker to advertise its location through ZK so that slaves can connect to its LevelDB replication port. The default behaviour sees ActiveMQ trying to automatically work out the hostname.

The value being used was coming from /etc/hosts – here are the default contents of that file:

127.0.0.1            localhost.localdomain localhost
::1            localhost6.localdomain6 localhost6

I would imagine that in a proper network setup this wouldn’t typically happen, as the box name would be worked out through DHCP + internal DNS. Simply overriding the default behaviour by setting the hostname attribute to the IP address of the virtual machine worked a treat.

With a test client on the host machine running the 3 VirtualBox instances I was able to connect to all 3 nodes and fail them over without problems.
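
The test client itself isn’t shown here; typically a client would use a failover transport URI along these lines (using the example IP addresses from earlier) so that it automatically reconnects to whichever broker becomes master:

```
failover:(tcp://192.168.0.8:61616,tcp://192.168.0.13:61616,tcp://192.168.0.12:61616)
```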

ZK uses a list to work out which ActiveMQ slave node is going to take control when the master broker dies; the ordering is defined by who connected to the ensemble first. If you have fewer than a quorum of ActiveMQ nodes running, no ActiveMQ node in the cluster will accept connections (this is managed by the replicated LevelDB store).

Since ZK also maintains a quorum, when fewer than the required number of servers are available, the remaining nodes will shut down their client port (2181). This disconnects all the brokers from the ensemble, causing the master broker to shut down its transportConnectors.

Happy replicated LevelDB-ing!

Apache Camel Developer’s Cookbook

I am pleased to finally announce the reason that I have fallen off the face of the Earth over the past year – Scott Cranton and I have just finished writing the Apache Camel Developer’s Cookbook. The book will be published by Packt Publishing later this month. It is available for pre-order on the Packt website, and also on Amazon (where it’s currently listed by its working title – Camel Enterprise Integration Cookbook). After 12 months of hard work and late nights (as well as early mornings) at home, in planes, trains, automobiles, hotel rooms, and airport lounges we’re very excited. And relieved – it has been a huge job writing about the ins-and-outs of this incredibly rich integration toolset.

Our intention when writing the book was to develop something that complemented Claus Ibsen’s and Jon Anstey’s awesome Camel in Action. After 3 years Camel in Action continues to be an outstanding reference and a great text for learning Apache Camel inside-out, and has aged incredibly well – a testament to Camel’s solid underlying design. One of the comments that we heard frequently when out on site with clients was that they had bought it, and were going to get around to reading it… eventually.

Our main driver, therefore, was to write a book for the busy system integration developer who just needs to Get Stuff Done, without requiring them to read a whole book and learn the entire framework, while at the same time not glossing over any of the key issues that they need to consider in order to build a complete production-ready integration with Apache Camel – you know, with good performance, error handling, transactions etc. The result is a book with over 100 how-to recipes, each written in both Camel’s Java and XML DSL variants, put together in such a way that people can jump around and get just the info that they need in order to get the job done.

There is a lot of good content there across all difficulty levels. Developers new to Apache Camel will find useful information on how to set up Camel in both regular Java and Spring-based applications, through to the ins-and-outs of the various Enterprise Integration Patterns (EIPs) (how they are affected by multithreading, transactions etc.), payload transformations and testing. There is plenty of good stuff for experienced developers too as we work through parallel and asynchronous processing, error handling and compensation, transactions and idempotency, monitoring and debugging, as well as detailing Camel’s support for security – your company encrypts all traffic, right? No? Well, no more excuses.

The chapters cover:

  • Structuring routes – everything from how to integrate the framework through to route templating
  • Message routing – a coverage of the main routing patterns
  • Routing to your code – how Camel interacts with your Java Code (bean binding, processors etc.)
  • Transformation – moving between XML, JSON, CSVs etc.
  • Splitting and Aggregating – a deep dive into the related Splitter and Aggregator EIPs
  • Parallel Processing – outlines Camel’s support for scaling out processing
  • Error Handling and Compensation – dealing with failure, including capabilities for triggering compensating logic
  • Transactions and Idempotency – how to handle failure of transactional (JDBC, JMS) and non-transactional (web services) resources
  • Testing – how to verify your routes’ behaviour without the need for backend systems
  • Monitoring and Debugging – describes Camel’s support for logging, tracing, and debugging
  • Security – encrypting communication between systems, hiding sensitive configuration information, non-repudiation using certificates, and applying authentication and authorization to your routes
  • Web Services – a deep dive into working with one of Camel’s main use cases: SOAP web services

Some of the juicier recipes include:

  • Working with asynchronous APIs
  • Defining completion actions dynamically
  • Testing routes with fixed endpoints using conditional events
  • Digitally signing and verifying messages
  • Enabling step-by-step tracing in code
  • Monitoring other systems using the Camel JMX Component
  • Idempotency inside transactions
  • Setting up XA transactions over multiple transactional resources (many thanks to the guys at Atomikos for their help on this one)

The book is right up to date, with coverage of Camel 2.12.2 and a bit of eye candy thrown in for the monitoring chapter thanks to the hawtio project.

One of the things that we agreed that bugged us about most technical books was that the code frequently did not work. So right at the start we decided that every single snippet of code had to be taken straight from a fully-functional unit test. Whenever we made a statement in the book about how something behaved, we wrote a test for it (and found lots of interesting undocumented corner cases and gotchas, which we subsequently wrote up). The result is a massive set of examples, with 450 supporting tests – all readily usable (I have used them myself with clients on site). If you are interested in taking a look, all of the code is available on github at CamelCookbook/camel-cookbook-examples.

The feedback that we have received from our reviewers – both formal and informal – is that we hit the mark; something we are really thrilled about. We were really privileged to have a number of Camel committers reviewing the book as well as our own clients who had varying levels of expertise with the framework. Many thanks go out to Claus Ibsen, Christian Posta, Bilgin Ibryam and Phil Wilkins for going through it all with a fine-toothed comb.

We hope that the book proves useful to the entire Apache Camel community, and further popularizes this amazingly productive integration framework.

Now consulting independently

After a long time working as a consultant for others, I felt that the time was right to take off the training wheels and break out on my own. Earlier this month, I re-focused my company Ameliant to independent work, delivering consulting around the Apache suite of integration technologies that I have been blogging about for some time – Camel, ActiveMQ, CXF and ServiceMix.

Today, I’m pleased to launch the company’s new site at ameliant.com. It’s now also reachable from the little leaf motif on the blog. I’m really pleased with the way it turned out.

Big thanks to Luke Burford at Lunamedia for the design and development of the site. He’s also the guy who put this blog design together, which is why the designs are so complementary.

I am looking forward to this new endeavour.

Deep testing of integrations with Camel

One of the things that often comes up in client conversations about developing integration code with Camel is what test support is available, and more to the point appropriate, for testing integrations. There is a spectrum of test types that can be performed, ranging from fully automated unit tests to full-blown multi-system, user-based “click and see the flow through effects” tests. Camel has come with comprehensive test support baked in from its very inception, but the mechanisms that are available can be used to go way beyond the standard unit test.

Unit tests

Without wanting to get academic about it, let’s define a unit test as being one that tests the logic encapsulated within a block of code without external side effects. Unit testing straightforward classes is trivial. If you want to make use of external service classes, these can be mocked using your favourite mocking library and injected into the class under test. Camel routes are a little different, in that what they define isn’t executed directly, but rather builds up a set of instructions that are handed to the Camel runtime for execution.

Camel has extensive support for testing routes defined using both the Java DSL as well as the Spring/Blueprints XML DSL. In general the pattern is:

  1. instantiate a RouteBuilder or Spring context containing the routes with a CamelContext, and start the context (this is handled for you by CamelTestSupport or CamelSpringTestSupport – see Camel testing). These should contain direct: endpoints as the inputs to the routes (consumers) and mock: endpoints as the outputs (producers).
  2. get a hold of the mock endpoints, and outline the expectations. A MockEndpoint itself uses a directed builder DSL to allow you to define a suite of comprehensive expectations, ranging from checking the number of messages received to the details of an individual message. You can make full use of Camel expressions in these tests as well.
  3. create messages that you want to feed in to the route and send them to the direct: endpoint at the top of the route under test using a ProducerTemplate.
  4. assert that the mock endpoints received the expected messages.
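
The four steps above can be sketched roughly as follows – a sketch assuming camel-test (JUnit 4 flavour) on the classpath, with a trivial invented route and endpoint URIs for illustration:

```java
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;

public class SimpleRouteTest extends CamelTestSupport {

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        // step 1: the route under test, with a direct: input and a mock: output
        return new RouteBuilder() {
            @Override
            public void configure() {
                from("direct:in")
                    .transform(simple("Hello ${body}"))
                    .to("mock:out");
            }
        };
    }

    @Test
    public void testGreeting() throws Exception {
        // step 2: get hold of the mock endpoint and outline expectations
        MockEndpoint out = getMockEndpoint("mock:out");
        out.expectedBodiesReceived("Hello World");

        // step 3: feed a message into the route via the inherited ProducerTemplate
        template.sendBody("direct:in", "World");

        // step 4: assert that the expectations were met
        out.assertIsSatisfied();
    }
}
```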

An example of this approach can be seen in the RssConsumerRouteBuilderTest in the horo-app I blogged about yesterday.

There are a couple of things that you need in order to employ this approach successfully. If using Java, the RouteBuilder class that defines your routes should allow the route endpoint URIs, as well as any beans that touch external resources, to be injected into it – see RssConsumerRouteBuilder. The external beans can easily be mocked as in a standard unit test.

Using the Spring DSL, we can still employ the same general approach, but we need to jump through a couple of hoops to do it. Consider what you would need to do to achieve the equivalent. A simple route might be defined via:

    <route id="fileCopyRoute">
        <from uri="file:///some/directory"/>
        <to uri="file:///some/other/directory"/>
    </route>

You can externalise any URIs using Spring’s property support:

    <route id="fileCopyRoute">
        <from uri="${fileCopyRoute.input}"/>
        <to uri="${fileCopyRoute.output}"/>
    </route>

You could then define a PropertyPlaceholderConfigurer with a properties file that defines these properties as:

fileCopyRoute.input=file:///some/directory
fileCopyRoute.output=file:///some/other/directory

The definition of this class should be in a Spring context file separate to that of your route definitions. For testing you would run the routes with another test XML file that defines a PropertyPlaceholderConfigurer that points to a test file with the test URIs:
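
The test context file might then contain something along these lines (the file names here are made up):

```xml
<!-- spring-context-test.xml: swapped in instead of the production/OSGi config -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="location" value="classpath:test-routes.properties"/>
</bean>
```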

fileCopyRoute.input=direct:fileCopyRoute.in
fileCopyRoute.output=mock:fileCopyRoute.out

This is usually why Spring DM/Blueprints based bundle projects split the config across (a minimum of) two context files. One (META-INF/spring/spring-context-osgi.xml) contains all of the beans that touch the OSGi runtime including the properties mechanism, and the other (META-INF/spring/spring-context.xml) contains your physical routes. When testing you can easily switch out the OSGi bits via another config file. This allows you to inject in other bits during a unit test of the XML-based routes, or when using the camel-maven-plugin in order to run those routes off the command line without an OSGi container like ServiceMix.

Embedded integration tests

Sometimes, testing just the route logic isn’t enough. When I was building out the horo-app, I happily coded up my routes, tested them and deployed, only to have them blow up immediately. What happened? The objects that I was expecting to receive from the RSS component didn’t match those the component actually sent out. So I changed tack. To engage the component as part of the route, I needed a web server to serve the file that fed the test.

Integration testing is usually pretty problematic in that you need an external system servicing your tests – and when you are in an environment where the service changes, you can break the code of the other people working against the same system. But there is a solution! Sun’s Java 6 comes with an embeddable web server that you can start up as part of your integration tests.

The approach that I used was to spin up this server at the start of my test, and configure it programmatically to serve up a response suitable for my test when a certain resource was consumed. The server was started on port 0, which means that it’s up to the runtime to assign an available port on the machine when the test runs. This is very important as it enables multiple instances of the same test to run at the same time, as is often the case on CI servers. Without it, tests would trip over each other. Similar approaches are possible using other embeddable server types, such as LDAP via ApacheDS, messaging via ActiveMQ, or databases via H2 or Derby.
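
As a minimal illustration of the technique (no Camel specifics – the feed path and payload here are made up), the JDK’s built-in com.sun.net.httpserver can be started on port 0 and then queried back on whatever port the runtime assigned:

```java
import com.sun.net.httpserver.HttpServer;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.URL;

public class EmbeddedHttpServerExample {

    static final String FEED = "<rss version=\"2.0\"><channel></channel></rss>";

    // starts a server on an OS-assigned port, fetches the feed once, then stops it
    static String fetchOnce() throws Exception {
        // port 0 = let the runtime pick a free port, so parallel CI runs don't clash
        HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
        byte[] body = FEED.getBytes("UTF-8");
        server.createContext("/feed.rss", exchange -> {
            exchange.sendResponseHeaders(200, body.length);
            exchange.getResponseBody().write(body);
            exchange.close();
        });
        server.start();
        try {
            int port = server.getAddress().getPort();
            // the route under test would be configured with this dynamically assigned URI
            InputStream in = new URL("http://localhost:" + port + "/feed.rss").openStream();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            int b;
            while ((b = in.read()) != -1) {
                out.write(b);
            }
            in.close();
            return out.toString("UTF-8");
        } finally {
            server.stop(0);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(fetchOnce());
    }
}
```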

Tests that require an external resource often start failing on large projects without any changes on the programmer’s side due to this exact reason – the underlying system dependencies changing. By embedding the server to test your integration against, you decouple yourself from that dependency.

The routes in your test then inject the URI to the embedded resource. In my case, I whipped up an integration test version of the original unit test (RssConsumerRouteBuilderITCase) to do exactly this. Integration tests can be wired in to a seperate part of the Maven build lifecycle using the maven-failsafe-plugin and use a different naming convention (*ITCase.java as opposed to *Test.java).

Usually the way that you structure your tests to avoid duplicating the lifecycle of these embedded backends ends up relying on a test class hierarchy, which may end up looking like:

  • CamelTestSupport
    • CamelTestSupportWithDatabase
    • CamelTestSupportWithWebserver

which I don’t really like, as you inevitably end up requiring two kinds of resource in a test. A much better option is to manage these extended resources using JUnit’s @Rule annotation. This treats any object that extends the org.junit.rules.ExternalResource base class as an aspect of the test, starting and stopping it as part of the test’s lifecycle. As such, you can compose your test of as many of these dependencies as you like – all without a rigid class hierarchy.
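
A sketch of what that composition might look like with JUnit 4 on the classpath; the actual start/stop logic is left as comments, since it depends entirely on the resource being managed:

```java
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;

public class RouteIntegrationTest {

    // a hypothetical embedded backend managed as a rule;
    // before()/after() are invoked around each test method
    @Rule
    public final ExternalResource server = new ExternalResource() {
        @Override
        protected void before() throws Throwable {
            // start the embedded backend, e.g. a web server on port 0
        }

        @Override
        protected void after() {
            // stop it again
        }
    };

    @Test
    public void testSomethingAgainstTheServer() {
        // several @Rule fields can be declared on one test class,
        // composing as many embedded resources as the test needs
    }
}
```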

This approach allows you to test your integration code against a physical backend, without requiring that backend to be shared between developers. This decouples your development from the rest of the team and allows your integration tests to be run in a CI server. A huge win, as only tests which are deterministic end up being run and maintained in the long term.

#winning!

Machiavelli on software

It must be remembered that there is nothing more difficult to plan, more doubtful of success, nor more dangerous to manage than the creation of a new system. For the initiator has the enmity of all who would profit by the preservation of the old institutions and merely lukewarm defenders in those who would gain by the new ones.

— Machiavelli “The Prince”

IT side-effects at the NHS

My mother has a phrase – professional illness. It’s the moment that she (an environmental engineer) walks into a random building and promptly looks at the air ducts. I suffer from the same thing – only around tech. Before and after the birth of my daughter, I have had more chances than ever to deal with the NHS. In that time, I witnessed a couple of events that made me step back and think about the way that IT in general conducts itself.

I don’t work with end-user business apps these days, but having spent years doing just that, still feel the pain of those that do. While agility and user input are all the rage, the reality is that we as developers are often so disconnected from end users that we just don’t feel that pain, and some things don’t fit in neatly into bug reports. Add to the normal IT project multiple layers of go-betweens, project managers, business analysts, ivory tower architects, and things of concern fall through the cracks.

At a late stage appointment with a midwife, we had the pleasure of arriving on the day of the rollout of a new patient record system. We have all heard about these things, mostly because they’re delivered way over time and budget. It was interesting to see it from an end user perspective. Having spent an hour or so doing what midwives do, we sat down while, for the first time, a mid-50s lady sat down in front of a system she’d presumably only ever seen in an “induction”. Like code handovers, these involve a dreary talk to a group of people with some vague handwaving, all moving too fast for anyone to get a sense of what’s really going on. Then a pat on the back and “off you go”.

It all seemed so straightforward, a menu on the top all looking very Office 2010, a list of appointments in a side pane and forms to fill out. Everything looking uniform, and as a result fairly difficult to navigate without reading it all out. To the developers it must have all seemed so obvious, it’s just a forms application, and of course Checkup B follows Investigation A. It’s easy to think this way when you have been looking non-stop at the same app for weeks/months. A quick usability test with a fresh pair of eyes would have made life so much easier for a new user.

Coming in towards the end of the pregnancy we arrived, as you do, with a folder of paperwork from previous scans and checkups. Presumably this was exactly what this record system was to replace. You carry these collections of paper around the whole time, just in case something happens, so that the relevant health professionals can at a glance get your background details. It seems there was a bit of a hitch. In order to record details of a final scan, you obviously need the details of all the previous appointments. The system wouldn’t let you submit just that form. Cue 45 minutes of a midwife copying paperwork into the system, all while our eyes glazed over and other patients filled up the waiting room. Major own goal, and yet so simple to deal with. Presumably since getting the data imported from paper would be impractical (mums aren’t going to hand over their potentially needed baby notes to get sent to an offshore data entry shop), either use the system for new pregnancies only, or loosen the constraints such that the workflow can be entered into in the middle.

Our next IT speedbump got me thinking about open standards/data, when a paediatric doctor checked Alex before discharge. The doctor had come from another hospital, and had no idea how to use the software at the one we were in. Presumably, both systems had access to the same data, though managed it differently. Open standards are often touted as a “Good Thing”: providers can develop systems that operate against those standards, and consumers (hospitals, GP surgeries and the like) buy the “best” solutions (from an indistinguishable selection). On the face of it, the idea is actually quite good – increased competition yields better prices, and innovation (though I’m skeptical as to how much of that you can have filling in forms). I get the impression that side-effects such as staff moving around and having difficulties are the tip of the iceberg. Centralised procurement often yields sub-optimal results, and massive cost overruns from kitchen sink requirements. Building 95% similar systems over and over seems like it has its own problems. I don’t know what the answer to that one is, or whether one in fact exists, but I have no doubt it’s worth thinking about.

Understanding ActiveMQ Broker Networks

Networks of message brokers in ActiveMQ work quite differently to more familiar models such as that of physical networks. They are not any harder to understand or reason about, but we need to have an appreciation of what exactly each of the pieces in the puzzle does by itself in order to understand them in the large. I will try to explain each component piece, progressively moving up in complexity from a single broker through to a full blown network. At the end you should have a feel for how these networks behave, and be able to reason about the interactions across different topologies.

One of the key things in understanding broker-based messaging is that the production, or sending of a message, is disconnected from the consumption of that message. The broker acts as an intermediary, serving to make the method by which a message is consumed, as well as the route that the message has travelled, orthogonal to its production. You shouldn’t need to understand the entire postal system to know that you post a letter in the big red box and eventually it will arrive in the little box at the front of the recipient’s house. The same idea applies here.

Producer and consumer are unaware of each other; only the broker they are connected to

Connections are shown in the direction of where they were established from (i.e. Consumer connects to Broker).

Out of the box, when a standard message is sent to a queue from a producer, it is sent to the broker, which persists it in its message store. By default this is in KahaDB, but it can be configured to be stored in memory, which buys performance at the cost of reliability. Once the broker has confirmation that the message has been persisted in the journal (the terms journal and message store are often used interchangeably), it responds with an acknowledgement back to the producer. The thread sending the message from the producer is blocked at this time.

On the consumption side, when a message listener is registered or a call to receive() is made, the broker creates a subscription to that queue. Messages are fetched from the message store and passed to the consumer; it’s usually done in batches, and the fetching is a lot more complex than simply read from disk, but that’s the general idea. With the default behaviour, assuming the Session.AUTO_ACKNOWLEDGE is being used, the consumer will acknowledge that the message has been received before processing it. On receiving the acknowledgement, the broker updates the message store marking that message as consumed, or just deletes it (this depends on the persistence mechanism).

Consuming messages

If you want the consumer to acknowledge the message only after it has been successfully consumed, you need to set up transaction management, or handle it manually using Session.CLIENT_ACKNOWLEDGE.

So what happens when there is more than one consumer on a queue? All things being equal, and ignoring consumer priorities, the broker will in this case hand out incoming messages in a round-robin manner to each subscriber.

Store-and-forward

Now to scale this up to two brokers, Broker1 and Broker2. In ActiveMQ a network of brokers is set up by connecting a networkConnector to a transportConnector (think of it as a socket listening on a port). A networkConnector is an outbound connection from one broker to another.
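
For illustration, the outbound connection on Broker2 might be configured in its conf/activemq.xml along these lines (host names are examples):

```xml
<!-- on Broker2: an outbound network connector to Broker1's transportConnector -->
<networkConnectors>
    <networkConnector uri="static:(tcp://broker1:61616)"/>
</networkConnectors>

<!-- on Broker1: the listening socket that both clients and other brokers connect to -->
<transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
</transportConnectors>
```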

When a subscription is made to a queue on Broker2, that broker tells the other brokers that it knows about (in our case, just Broker1) that it is interested in that queue; another subscription is now made on Broker1 with Broker2 as the consumer. As far as an ActiveMQ broker is concerned there is no difference between a standard client consuming messages, or another broker acting on behalf of a client. They are treated in the exact same manner.

So now that Broker1 sees a subscription from Broker2, what happens? The result is a hybrid of the two producer and consumer behaviours. Broker1 is the producer, and Broker2 the consumer. Messages are fetched from Broker1’s message store and passed to Broker2. Broker2 processes the message by storing it in its journal, and acknowledges consumption of that message. Broker1 then marks the message as consumed.

The simple consume case then applies as Broker2 forwards the message to its consumers, as though the message had been produced directly into it. Neither the producer nor consumer is aware that any network of brokers exists; it is orthogonal to their functionality – a key driver of this style of messaging.

Local and remote consumers

It has already been noted that as far as a broker is concerned, all subscriptions are equal. To it there is no difference between a local “real” consumer, and another broker that is going to forward those messages on. Hence incoming messages will be handed out round-robin as usual. If we have two consumers – Consumer1 on Broker1, and Consumer2 on Broker2 – and messages are produced to Broker1, each consumer will receive the same number of messages.

A networkConnector is unidirectional by default, which means that the broker initiating the connector acts as a client, forwarding its subscriptions. Broker2 in this case subscribes on behalf of its consumers to Broker1. Broker2 however will not be made aware of subscriptions on Broker1. networkConnectors can however be made duplex, such that subscriptions are passed in both directions.
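Making a connector duplex is a one-attribute change on the initiating side; something like the following (host name assumed):

```xml
<networkConnectors>
  <networkConnector name="to-broker1"
                    uri="static:(tcp://broker1:61616)"
                    duplex="true"/>
</networkConnectors>
```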

So let’s take it one step further with a network that demonstrates why it is a bad idea to connect brokers to each other in an ad-hoc manner. Let’s add Broker3 into the mix such that it connects into Broker1, and Broker2 sets up a second networkConnector into Broker3. All networkConnectors are set up as duplex.

This is a common approach people take when they first encounter broker networks and want to connect a number of brokers to each other, as they are naturally used to the internet model of network behaviour, where traffic is routed down the shortest path. If we think about it from first principles, it quickly becomes apparent that this is not the best approach. Let’s examine what happens when a consumer connects to Broker2.

  1. Broker2 echoes the subscription to the brokers it knows about – Broker1 and Broker3.
  2. Broker3 echoes the subscription down all networkConnectors other than the one from which the request came; it subscribes to Broker1.
  3. A producer sends messages into Broker1.
  4. Broker1 stores and forwards messages to the active subscriptions on its transportConnector; half to Broker2, and half to Broker3.
  5. Broker2 stores and forwards to its consumer.
  6. Broker3 stores and forwards to Broker2.

Eventually everything ends up at the consumer, but some messages ended up needlessly travelling Broker1->Broker3->Broker2, while the others went by the more direct route Broker1->Broker2. Add more brokers into the mix, and the store-and-forward traffic increases exponentially as messages flow through any number of weird and wonderful routes.

Very bad! Lots of unnecessary store-and-forward.

Fortunately, it is possible to avoid this by employing other topologies, such as hub and spoke.

Better. A message can flow between any of the numbered brokers via the hub and a maximum of 3 hops (though it puts a lot of load onto the hub broker).

You can also use a more nuanced approach that includes considerations such as unidirectional networkConnectors that pass only certain subscriptions, or reducing consumer priority such that further consumers have a lower priority than closer ones.
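As a hedged sketch of such a nuanced connector (the hub host name and queue hierarchy are hypothetical), ActiveMQ lets you lower the priority of networked consumers relative to local ones, and restrict which destinations are forwarded:

```xml
<networkConnector name="to-hub"
                  uri="static:(tcp://hub:61616)"
                  decreaseNetworkConsumerPriority="true">
  <!-- forward only subscriptions for this queue hierarchy -->
  <dynamicallyIncludedDestinations>
    <queue physicalName="orders.>"/>
  </dynamicallyIncludedDestinations>
</networkConnector>
```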

Each network design needs to be considered separately, and trades off considerations such as message load, amount of hardware at your disposal, latency (number of hops) and reliability. When you understand how all the parts fit and think about the overall topology from first principles, it’s much easier to work through.

Update (July 2014): To get a better understanding of what’s going on under the covers, check out the details of how network connectors work.

Batching JMS messages for performance; not so fast

Recently an idea crossed my radar about speeding up the performance of messaging producers: batching messages into a single transaction. The idea being that there is overhead in the standard behaviour of a message bus that can be optimised out if you group messages into a single transaction.

My initial thought was that this was unlikely; at best the performance would be the same as an untransacted client. The comments were framed in conversations about Spring’s JMSTemplate, which continually walks the object tree ConnectionFactory -> Connection -> Session -> MessageProducer, then closes each of them off again. Each close causes some network traffic between the library and the bus. My thought was that perhaps the discussion hadn’t accounted for caching these objects via a CachingConnectionFactory. I resolved to find out.

My other concern was that while it could be done in the general case, it didn’t make a lot of sense.

Generally speaking there are two categories of messages that will concern an application developer. Sending throwaway messages that aren’t critical is one thing – you can choose not to persist them on the broker side and save 90% of the overhead. You can also use asynchronous sends, which save further time by not waiting for confirmation that the broker received the message. If you lose a few, it won’t be a big deal.
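Both knobs can be sketched with the ActiveMQ client API (the vm:// embedded broker and queue name are illustrative only; requires the activemq-broker jar):

```java
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class FireAndForget {
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory factory =
            new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
        factory.setUseAsyncSend(true); // don't block waiting for the broker's receipt

        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(session.createQueue("metrics"));
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // never touches the journal

        producer.send(session.createTextMessage("cpu=42"));
        System.out.println("sent");
        connection.close();
    }
}
```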

The other type are the ones that you really care about. Things that reflect or affect a change in the real world – orders, instructions, etc. The value of a broker in this case comes from knowing that once it has the message, it will be dealt with.

In ActiveMQ, this guarantee comes via the message store. When you send a message the broker will not acknowledge receipt until the message has been persisted. If the broker goes down, it can be restarted, and any persisted messages may be consumed as though nothing happened, or another broker takes over transparently using the same message store (HA). As you can imagine, there is natural overhead. Two network traversals, and some I/O to disk. Keep this in mind.

I tested a couple of different scenarios.

  • JMSTemplate with a CachingConnectionFactory. This is the baseline.
  • An untransacted, persistent session. Each message gets sent off to the broker as they come in. The broker places the message in a persistent store. This is like the logic of the JMSTemplate, but where you control the lifecycles of JMS objects.
  • An untransacted, unpersisted session. Each message gets sent off to the broker as they come in. The broker stores the message in memory only for delivery.
  • Transacted sessions. Messages get bundled together into a transaction and committed.
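The transacted scenario boils down to something like the following sketch (again against an assumed embedded vm:// broker; the queue name and batch size of 100 are arbitrary):

```java
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TransactedBatch {
    public static void main(String[] args) throws JMSException {
        ConnectionFactory factory =
            new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
        Connection connection = factory.createConnection();
        connection.start();

        // Transacted session: sends are buffered until commit()
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        MessageProducer producer = session.createProducer(session.createQueue("batched"));

        for (int i = 0; i < 100; i++) {
            producer.send(session.createTextMessage("msg-" + i)); // no blocking per message
        }
        session.commit(); // one synchronous flush for the whole batch

        System.out.println("committed 100 messages");
        connection.close();
    }
}
```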

Messages were posted to a queue with no consumers on a single broker with default settings.

The results surprised me.

Average message send time


Average message throughput

Timings should be considered indicative only, and are only there for a relative comparison. Initial spikes should be considered part of a warm-up, and ignored.

Firstly, getting down to the JMS API was consistently faster than JMSTemplate (not surprising in hindsight, given its object walk). There was also indeed a boost in performance from batching messages together for delivery in a transaction. The more messages there were in a batch, the better the performance (I repeated this with 10, 100 and 1000 messages per transaction). The results approached those of the non-persistent delivery mode, but never quite reached them. What accounts for this?

I did some digging, and came up with this description of transactions in ActiveMQ. It appears that there is a separate store in memory for uncommitted transactions. While a transaction is open messages are placed here, and are then copied over to the main store on commit.

So what to make of this? Should you forgo JMSTemplate and batch everything? Well, as with pretty much anything, it depends.

JMSTemplate is simple to configure and use – it also ties in nicely to the various Spring abstractions (TransactionManagers come to mind). There are certainly some upsides to getting down to the JMS API, however they come at the cost of coding time and losing those hooks, which you might regret later.

Batching itself is a funny one, in that it’s one of those things that could easily be misused. Transactions are not there for improving performance. They exist as a logical construct for making atomic changes. If you batch up unrelated messages in memory to send to the broker for a performance boost, and your container goes down before you’ve had a chance to commit, you have violated the contract of “this business action has been completed”, and you have lost your messages. Not something you want for important, change-the-world messages.

If you have messages that are relatively unimportant that won’t affect the correct operation of your system, there are better options available to you for improving performance such as dropping persistence.

If, however, you have related messages tied to a single operation, then perhaps getting down to the JMS API and batching these together under a single transaction is something you might want to consider. The key here is ensuring that you don’t lose the real meaning of transaction.


Update 1 (13/11): Replaced graphs with log scale to make the series clearer.

Update 2 (13/11): Thanks to James Strachan for an extended explanation:

The reason transactions are so much faster on the client side can be explained by comparing it to the normal case, where the client blocks until the message has definitely gone to disk on the broker.

In a transaction the JMS client doesn’t need to wait until the broker confirms the message because it’s going straight into the transactional store (memory). Because the broker will complain on a commit if something has gone awry, the client can afford to not block at all doing a send or acknowledge (not even blocking for it to be sent over the socket), so it behaves like a true async operation. This means it’s very fast.

This explains why transactional performance approaches that of async operations with the size of the batch – because for all intents and purposes it is nearly asynchronous. The only cost is that of the synchronous commit(), which is a flush from the transactional store to the journal.

You could argue that this doesn’t really affect the actual throughput – the difference between batching and no transactions is just the client sitting around blocking. The benefit of the transactions is that each client becomes much more effective; i.e. to get equivalent throughput without them you would need more clients running concurrently. In the case of Camel, this could be achieved by configuring a larger thread pool.

A brief outline of when to use transactions.

Give your DB the love it deserves

Pity the poor database. As critical to most apps as a foundation to a building. And as interesting as an accounting seminar at a nudist colony.

Not sexy enough for the attentions of the senior dev, or considered to be “well understood”, DB work frequently ends up getting handed off to the junior guys on the team. Who promptly make all the mistakes the senior guys have learned not to make. Mistakes which end up with massive hunks of sub-optimal compensating code in the layers above. Then more code gets written off the back of those mistakes. Voila! Instant technical debt.

Cue the self-perpetuating “relational databases aren’t web scale“, “normalised schemas aren’t performant”, “you don’t have these problems with NoSQL”.

Senior guys often don’t have the time to deal with it. DBAs aren’t seen as being responsive enough for JFDI/iterative development. Peer review at the end is too late.

So what’s the fix? Just enough design. Back of napkin. Whack together a schema, and talk through with someone (else) who knows what they’re doing. Then code. It’s not rocket science.

Goldilocks actors: not too many, not too few

After my last post on parallelism with Scala actors, I had a thought: when doing a calculation like this, am I actually making the most of my resources? If, as in the Pi example, more cycles will generally lead to a better result, then surely, given a limited amount of time to get the best value, I want to squeeze every last bit of juice from my hardware. It may seem a trivial question for a small problem, but it has real-world implications in domains like banking, where a few more cycles in a pricing or risk calculation mean a difference in income.

There’s a bit of lore that says the optimal number of threads to perform an operation is equal to the number of processors. Likewise, there’s a generally accepted idea that performance progressively deteriorates as more threads than that run concurrently, due to context switching.
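This lore can be probed on the JVM without the actor machinery. A minimal throughput experiment in plain Java (the 200 ms window and the worker counts are arbitrary choices) counts how many random points land inside the quarter circle – the same work as the Pi estimator – for different pool sizes:

```java
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

public class ThroughputProbe {

    // Count quarter-circle hits generated by `workers` threads in a fixed time window
    static long run(int workers) throws InterruptedException {
        LongAdder hits = new LongAdder();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        long deadline = System.nanoTime() + 200_000_000L; // 200 ms window
        for (int i = 0; i < workers; i++) {
            pool.submit(() -> {
                Random r = new Random();
                while (System.nanoTime() < deadline) {
                    double x = r.nextDouble(), y = r.nextDouble();
                    if (x * x + y * y <= 1.0) {
                        hits.increment(); // only points inside the quarter circle count
                    }
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return hits.sum();
    }

    public static void main(String[] args) throws InterruptedException {
        int cores = Runtime.getRuntime().availableProcessors();
        for (int n : new int[] {1, cores, cores + 1, cores * 2}) {
            System.out.println(n + " workers: " + run(n) + " hits");
        }
    }
}
```

If the lore held exactly, throughput would peak at the core count and fall off beyond it; running the probe is the only way to know on a given machine.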

In other words, you can have too many actors. Gratuitous use of images to make tech blog seem more humorous.

I wasn’t convinced, and decided to try it out for myself. My thinking was that the one-thread-per-processor idea was a bit too clean. With processor hyperthreading, I/O overhead, JVM magic and other factors, it seemed pretty likely that at least (number of processors + 1) actors would be able to run before performance degraded.

Stand back! I’m about to try Science!

So, I rewrote the example to test throughput (check it out if you’re interested; it’s more or less the same as the first, but with more jiggery-pokery), and ran it on my dual-core laptop:

1 actor (output cleaned up):

...
PiEvaluator:Shutting down generators
PointGenerator[1]:exiting having generated 47040000 points
PiEvaluator:Shutting down aggregator
Aggregator:Considered 47040000 points
Aggregator:Pi is approx: 3.1415372448979593

2 actors:

...
PiEvaluator:Shutting down generators
Aggregator:Pi is approx: 3.141657176679491
PointGenerator[2]:exiting having generated 34000000 points
PointGenerator[1]:exiting having generated 33690000 points
PiEvaluator:Shutting down aggregator
Aggregator:Considered 67690000 points
Aggregator:Pi is approx: 3.1416520608657112

3 actors:

...
PiEvaluator:Shutting down generators
PointGenerator[1]:exiting having generated 20470000 points
PointGenerator[2]:exiting having generated 27680000 points
PointGenerator[3]:exiting having generated 20410000 points
PiEvaluator:Shutting down aggregator
Aggregator:Considered 68560000 points
Aggregator:Pi is approx: 3.141365577596266

4 actors:

...
PiEvaluator:Shutting down generators
Aggregator:Pi is approx: 3.141612733060482
PointGenerator[2]:exiting having generated 27660000 points
PointGenerator[1]:exiting having generated 19670000 points
PointGenerator[3]:exiting having generated 18790000 points
PointGenerator[4]:exiting having generated 10000 points
PiEvaluator:Shutting down aggregator
Aggregator:Considered 66130000 points
Aggregator:Pi is approx: 3.1416107061847875

5 actors:

...
PiEvaluator:Shutting down generators
Aggregator:Pi is approx: 3.141523427529626
PointGenerator[2]:exiting having generated 28220000 points
PointGenerator[4]:exiting having generated 10000 points
PointGenerator[5]:exiting having generated 10000 points
PointGenerator[1]:exiting having generated 18810000 points
PointGenerator[3]:exiting having generated 18850000 points
PiEvaluator:Shutting down aggregator
Aggregator:Considered 65900000 points
Aggregator:Pi is approx: 3.141531047040971

I ran this quite a few times to verify that I wasn’t getting inconsistent results due to GC or other one-off factors. The results were pretty consistent:

  • The jump from 1 to 2 actors gave a huge boost in performance as the second processor joined the fray. Interestingly, it wasn’t quite a 100% performance boost, more like ~50%. There were programs, notably Spotify, running on my PC at the same time as the test, which may account for it.
  • When a third actor was introduced, there was a proportionately small, but repeatable, increase in the number of calculations that could be run at the same time – around 2-3%.
  • Beyond 3 actors, the Scala actors library itself looks to have kicked in (I may be wrong here, so please correct me if you know otherwise). The 4th and 5th actors were starved of work, rather than degrading the performance of the first three as I had expected. It wasn’t until the first three actors started shutting down that the later ones got to finish their first batch of work.

So, don’t take any known lore for granted. Much like anywhere else on the JVM, tuning performance is down to experimentation.

As an aside, it was interesting to note that beyond a point, my evaluation of Pi wasn’t getting any better. I put it down mostly to the slow convergence of Monte Carlo estimation – the error only shrinks with the square root of the number of samples – with the imprecision of floating-point arithmetic not helping. Key take-away point: when programming a moon launch, don’t use doubles.