Beyond the Hype Cycle: Co-operative Open Source

At some point in the lifetime of an open-source project, having survived rewrites, deployments in hundreds of organisations, battle-testing… it becomes boring. The sheen goes off it, and people move on to the newer and shinier. When this happens, the number of maintainers drops off, releases slow to a trickle, and issues or improvements that users would like to see addressed gather dust in long-forgotten JIRAs. Eventually the level of inactivity causes companies to move their platforms onto other, better-supported tech, not because the older technology is no longer useful (the ultimate measure of worth in software), but because the lack of support becomes a risk.

Over time, even companies that support open source commercially are likely to feel this pressure from within, as the teams that work on these stalwart pieces of often highly complex infrastructure drift apart.

This is not specific to any one piece of software, but applies to entire categories of products. What happens off to the far right-hand side of the Gartner Hype Cycle? Where does the slope of enlightenment lead? What does the long end game of a successful product look like?

Perhaps what is needed is a rethink of the relationship between the users and the maintainers of a piece of OSS. Companies often will not go into production without some sort of commercial support, which forms a type of insurance. A subscription or a license is the equivalent of an insurance premium. Over time, the lack of a major production incident causes them to question the value of that policy. This is a bit like thinking “I have been paying for maintenance of this bridge for 5 years, and it hasn’t once fallen down. What’s the point?”

From a commercial standpoint, once the custom starts to drop off and demand rises in other sectors, it makes commercial sense to ramp down the support on the older products and divert resources elsewhere. But this is only a rational conclusion if the purpose of your company is to maximise profit, rather than support its clients in the long term. Given that this by definition is how commercial companies work, perhaps the problem is the nature of the company itself.

I have been thinking about this area for some time, having recently read Wired Differently – the 50 year story of NISC, a company providing software to electricity providers. NISC is set up as a co-operative, a model typically applied in areas such as agriculture, but that has much wider applicability; there are nearly 7,000 co-ops in the UK – banks, retailers, funeral homes, accountancies…

Under this scheme, the co-operative behaves like any other commercial company, acquiring clients, selling services and so on. The big difference is structural, in that the shareholders are the staff and customers of the co-operative; clients take a stake in the company through membership. The purpose of the co-op is principally to serve its members as well as possible, while operating at a lower cost than would be possible for a commercial organisation. At the end of the year, the treatment of profits is voted upon democratically by members: either to reinvest them or to distribute them back in proportion to each member’s usage of the co-op’s services. The model works particularly well in industries where specialization and stability are the most highly sought-after attributes.

Co-operatives fill a niche in the market by providing long-term stability, both to the specialists working in those areas and to members who rely on the co-op’s services as a key component of their infrastructures. The service would be the ongoing development and support of these pieces of infrastructure, as well as elements that would be impossible for a commercial entity to justify, such as components that proactively monitor the infrastructure and prevent service callouts.

Perhaps it is time to apply the co-operative model to open source; it seems a natural fit. A co-operative as a form of sharing the costs of maintenance; half-way between an external vendor and a dedicated in-house team.

You can read more about the mechanics of co-operatives at Co-operatives UK.

Performance Test Tool for Apache Kafka

Towards the end of last year, I developed a performance test tool, available on GitHub, for testing the behaviour of Apache Kafka (0.9 so far). The tool was inspired by, and informed by some of the limitations of, the performance tooling in ActiveMQ, which I’m currently retrofitting with the same mechanisms.

The kafka-perf-test project builds a Fat JAR that you can take with you into any environment running Java 8 and, through a single JSON or YAML config file, configure a range of consumers and producers with differing behaviours pointing at one or more Kafka installations. All of the standard Kafka properties for producers and consumers are configurable, which also makes it a useful learning tool if you want to work out what sort of behaviour to expect from a Kafka installation, or just to bulk load a bunch of messages in.

A sample YAML config can be as simple as:

---
config: {} # applies to all producers and consumers
producers:
  config:  # applies to all producers
    bootstrap.servers: "tcp://localhost:9092"
    request.timeout.ms: "10000"
    key.serializer: "org.apache.kafka.common.serialization.ByteArraySerializer"
    value.serializer: "org.apache.kafka.common.serialization.ByteArraySerializer"
    batch.size: "0"
    acks: "1"
    max.block.ms: "10000"
  instances:
  - config: {} # applies to this producer
    topic: "foo"
    messagesToSend: 2000
    messageSize: 1000
    sendBlocking: false

There is still some work to do on metrics reporting and general polish, but for headline numbers it’s good to go. Pull requests/issues/comments most welcome.

Message Distribution and Topic Partitioning in Kafka

When coming over to Apache Kafka from other messaging systems, there’s a conceptual hump that needs to first be crossed, and that is – what is this topic thing that messages get sent to, and how does message distribution inside it work?

Unlike regular brokers, Kafka only has one destination type – a topic (I’ll refer to it as a kTopic here to disambiguate it from JMS topics). Underlying a kTopic is a persisted data structure called a journal (think of it like an array) that can have many pointers addressing (via a numerical offset) an index inside it. Consumers subscribe to a kTopic with a consumer group, denoted by a group ID; you can think of the group ID as a named pointer.

The Kafka documentation talks about consumer groups having “group names”. In the config they are referred to by group.id, so I will run with that convention.

If the group ID is not known by the broker, the consumer can be configured to ask the broker to point its corresponding pointer to the start of the journal (thereby consuming all messages since the broker accepted messages), or the end (consuming all messages starting from the next message to arrive). Consuming a message means moving the pointer to the next place in the journal. Pointers can also be moved backwards to consume messages that have been previously consumed, in contrast with a regular broker where messages get deleted once consumed.

This means that the consumer code may keep track of where it got up to, by storing/managing its own offset in the journal. This is another difference from JMS, where state management is left up to the broker.
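
As a concrete illustration, here is a minimal sketch of a consumer written against the 0.9 client API (org.apache.kafka.clients.consumer); the topic name, group ID and broker address are made up for illustration:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");  // the named pointer into the journal
props.put("auto.offset.reset", "earliest");  // where an unknown group ID starts: "earliest" or "latest"
props.put("enable.auto.commit", "true");     // let the client store the offset on our behalf
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("foo"));
while (true) {
    ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
    for (ConsumerRecord<byte[], byte[]> record : records) {
        System.out.printf("partition=%d offset=%d%n", record.partition(), record.offset());
    }
}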

The best way to describe kTopic message distribution is to contrast it with the behaviour of destinations in regular (non-Kafka) message brokers. Stepping back to a more traditional JMS world view, there are two types of destinations:

  • queues – first-in-first-out
  • topics – publish-subscribe

In queues, messages are dispatched in the same order that they are sent in; if there is more than one consumer, messages get dispatched in a round-robin fashion among those consumers, ensuring fair work sharing. One of the gripes about this is that by default you cannot guarantee that messages will be processed in order, as the order of processing once messages have been dispatched to consumers is non-deterministic (some consumers may run slower than others). You can address this with features such as JMS message groups, where groups of related messages are round-robin assigned to consumers, and messages within those groups are sent to the consumer to which that group is assigned.
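
For reference, message groups in JMS are driven by a message property; a minimal sketch (assuming an existing session and producer, with a made-up group key):

// Messages carrying the same JMSXGroupID are always dispatched to the same consumer,
// so ordering is preserved within that group.
TextMessage message = session.createTextMessage("order update");
message.setStringProperty("JMSXGroupID", "ORDERS-12345");
producer.send(message);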

This queueing behaviour is approximated in Kafka through the use of a single group ID for the kTopic. In this scenario a single consumer will receive all of the messages. Where it starts to get interesting is what happens when you have two consumers: here the latest consumer to join receives ALL of the unconsumed messages from the point of joining; the formerly busy consumer is suspended. This is a bit like the exclusive consumer feature in JMS, only that exclusivity is taken away by the last consumer to join; and is in stark contrast to the fair-work sharing of JMS.

One of Kafka’s underlying design goals is to be fast for both production and consumption. This has led to some interesting design trade-offs. The main one is the use of the journal as the fundamental construct – it is a very simple structure that is read and written very quickly to disk. The reason for this is that sequential disk access is extremely efficient.

If you wanted to distribute messages from a journal in a round-robin fashion, as per JMS, you would need a dispatching thread on the broker to coordinate message hand-off to consumers. This gets slower the more consumers you have (though it’s usually not visible most of the time, with a sensible number of consumers). A goal for Kafka was to scale consumption linearly with the number of consumers, which means that you need to reduce coordination.

Consider, however, the following:

  1. Coordination is an underlying property of concurrency.
  2. Horizontal scaling is achieved by applying parallelism.
  3. Concurrency is the prevention of parallelism.

So. The only way to get horizontal scaling of consumption in a queue distribution scenario is to effectively use multiple journals. Inhale deeply.

Kafka achieves this through the idea of partitioning. On the broker, you define how many partitions exist per kTopic. One partition corresponds to one journal. If you want to distribute messages fairly amongst N consumers, you need N partitions.

So how are messages distributed amongst the partitions? When messages are sent to a kTopic, they are sent through the following method on the Producer class:

    Future<RecordMetadata> send(ProducerRecord<K,V> record);

A ProducerRecord contains a key (K) and a value (V). The value is the payload of your message (unlike in JMS there is no Message construct that allows you to send additional message metadata). The key is a business value provided by you that is used to shard your messages across the partitions.
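
A minimal sketch of a send using this API (it assumes a KafkaProducer<String, String> named producer has already been constructed; the topic, key and value are made up):

// The key ("customer-42") determines the partition; the value is the payload.
ProducerRecord<String, String> record =
        new ProducerRecord<>("foo", "customer-42", "{\"amount\": 100}");
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get(); // blocks until the broker acknowledges the write
System.out.printf("sent to partition %d at offset %d%n", metadata.partition(), metadata.offset());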

It is the client’s responsibility to decide which partition to send the message to, not the broker’s. This is done by configuring the Producer with an implementation of the Partitioner interface. The default behaviour is to determine the partition by hashing the key and taking the result modulo the number of partitions. By default this will not give fair work sharing for a small set of keys, due to hashing collisions, which means that if you are partitioning to a small number of partitions (for the same number of parallel consumers) some of those partitions are likely to get more messages than the others. In the case of 2 partitions, one might not get any messages at all – leading to consumer starvation. You can balance this out by providing your own Partitioner implementation with a more business-specific partitioning function.

It is important to note that the Partitioner must consistently place related messages into the same partition, otherwise the entire purpose of this scheme – namely that related messages should be consumed in order – goes out the window.
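
Here is a sketch of what such a Partitioner might look like against the 0.9 client API; the idea of keys prefixed with a region code is purely illustrative, and the class would be wired in via the partitioner.class producer property:

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class RegionPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed for this sketch
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // assume keys of the form "region-orderId"; everything for a region stays together
        String region = ((String) key).split("-")[0];
        return (region.hashCode() & 0x7fffffff) % numPartitions;
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}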

Partitions are fairly assigned to consumers, and rebalanced when consumers come and go. Assuming 2 partitions:

  • If you have 1 consumer, they will receive messages from both partitions.
  • If you have 2, each will exclusively consume from a single one.
  • If you have 3, one will just hang around and not receive any messages.

In JMS topics, messages are distributed to all consumers who may be interested. In kTopics this is implemented by giving each consumer their own group ID. The distribution logic per-consumer is going to look exactly like the queue scenario above. In effect, this means that kTopics behave a lot like ActiveMQ’s virtual destinations or JMS 2.0 durable topics, rather than pure JMS topics – as the latter do not write messages to disk, whereas kTopics do.

Hopefully this goes a long way to clear up the fundamentals of message distribution in Kafka. It’s a really interesting messaging tech, but some of its behaviour is fundamentally shifted 90 degrees from what you would expect coming from a traditional messaging background. The price of getting great performance and utilisation in clusters has been some additional interesting trade-offs and complexities (not covered here) in transactionality, durability and state management that are likely to catch out the unwary – more on that in another post.

How to reason about queue depth graphs

Occasionally I come across something that seems really obvious to me, but actually isn’t obvious from an outsider’s perspective. A few weeks ago I spent some time with a client (and their client) trying to understand the behaviour of a message-driven platform that made heavy use of ActiveMQ. Without going into too many details, they had a number of production incidents where the queues started behaving erratically, as far as they could see from time-series charts of queue depths. In reasoning about what was going on, we put together a visual vocabulary to help them analyse the behaviour not only of the broker but also of the systems around it. Here is that vocabulary.

Basic Curves

Flat line (consumption matches production). Here the rate of messages being placed onto a queue matches the rate of their consumption. This should be considered the natural order of things where both producers and consumers exist on a system.

Angle up. Here the production rate is higher than the consumption rate. While this is not a problem in the short term, over a long period of time a message store may fill up. This is typically seen when the rate of production increases temporarily and the time it takes to process each message means that the consumers cannot keep up. To get around this you need to increase the consumption rate by adding more consumers or speeding them up.

Angle down. The consumption rate is exceeding the production rate. This normally indicates a queue draining down after a burst of producer activity.

Composite Curves

Rate of change increases. In the curves above, the rate of production has increased relative to the rate of consumption. This means that either:

  • your consumers are working slower than before; or
  • the number of messages being sent by your producers has increased.

Rate of change decreases. In the curves above, the rate of consumption has increased relative to the rate of production. This means that either:

  • your consumers are working faster; or
  • the producers are not putting as many messages onto the queue.

Curve flattens. This is a really interesting one; it indicates the following possibilities:

  • if depth is 0, your queue is empty; otherwise
  • the consumption rate has all of a sudden started matching your production rate exactly (not likely); or
  • producers have stopped producing and there is no consumption going on, i.e. you are seeing a pause. If you find yourself in a situation where you are convinced that messages should be being sent (and possibly that consumption exists), then there are a number of external-system possibilities, including a long garbage collection, or some other process taking over the I/O on the storage being used by the broker; in the case of a virtualised or shared environment this may be another server that has access to that same storage.

In broker-based messaging systems, it is important to understand that the broker is only one part of the story, and that the performance of both the producers and consumers will have an impact on the overall flow of messages. Hopefully this reference will help you to reason about what is going on.

Java DSLs for ActiveMQ config and testing

As part of a larger project that I am working on, I needed to define numerous different ActiveMQ configurations inside non-trivial integration tests. Rather than instantiating brokers by hand using the internal ActiveMQ classes, or writing lots of similar-but-different XML configurations, I wrote a pair of Java DSLs to allow me to quickly spin up brokers and proxy them in JUnit using a style that should be familiar to Camel users, and a syntax that ActiveMQ users should instantly understand.

The ActiveMQ DSL is a general purpose utility for instantiating brokers in-line.

BrokerContext context = new BrokerContext(
    ActiveMQBrokers.broker("embeddedBroker").useJmx(false).persistent(false)
        .transportConnectors()
            .transportConnector("openwire", "tcp://0.0.0.0:61616").end()
        .end());

context.start();
// later ...
context.end();

The Test DSL uses JUnit’s @Rule functionality to manage the lifecycle of these brokers, and also allows the creation of proxies that can be used to simulate network interruptions.

public class MyEmbeddedBrokerTest {
    @Rule
    public ProxiedBrokerResource broker = new BrokerResource(
            ActiveMQBrokers.broker("embeddedBroker").useJmx(false).persistent(false)
                .transportConnectors()
                    .transportConnector("openwire", "tcp://0.0.0.0:61616").end()
                    .transportConnector("stomp", "stomp://0.0.0.0:61618").end()
                .end())
            .withProxied("openwire").port(10000); 
            // port to be proxied is looked up by name
            // you can define multiple proxies for the one broker!

    @Test
    public void testNetworkOutages() {
        ConnectionFactory cf =
            new ActiveMQConnectionFactory(
                "failover:(" + broker.getTcpConnectionUri("openwire") + ")"); 
        // returns proxied port 10000
        
        // ...
        SocketProxy socketProxy = broker.getProxy("openwire");
        socketProxy.close(); // network goes down
        // ...
        socketProxy.reopen(); // network comes back up
    }
}

I hope that they might be useful (or at least instructive) to others. The DSLs along with full documentation are available on Github at jkorab/activemq-dsls. Feedback + pull requests most welcome.

Digesting Microservices at muCon

On Friday, I had the privilege of presenting at the very first Microservices conference – muCon. In my talk, Engineering Sanity into Microservices, I spoke about the technical issues surrounding state in distributed systems as a whole, how these become a bigger problem as the number of deployed services goes up, and a few suggested patterns that will help you stay sane. The video is now available on the Skillsmatter site (registration required).

MuCon was a really enjoyable single-topic conference; the talks ranged from high-level CTO-type overviews all the way down to the gory details and war stories. It will be interesting to turn up next year to hear more of the latter.

My biggest takeaway was from Greg Young’s presentation The Future of Microservices, where he spoke about Conway’s Law. As a reminder:

Organizations which design systems … are constrained to produce designs which are copies of the communication structures of these organizations
— M. Conway

The topical corollary to which he explained as (I paraphrase):

Siloed organizations will never be able to get the benefits of a microservice architecture, as it does not correspond to their communication structures.

Read that again, and really let it sink in.

I will put a layer of interpretation on this: SOA is absolutely not dead. It is a useful tool for highly compartmentalized organizations. Microservices is not its replacement. They are two different tools for different organizational types.

That insight alone was worth turning up for.

Running ActiveMQ Replicated LevelDB on VirtualBox

I have wanted to spend some more time recently playing with Replicated LevelDB in ActiveMQ. Not wanting to hit a cloud environment such as EC2, I set up 3 VirtualBox nodes on my workstation running Fedora. The host system needs to be fairly beefy – you need at least a dedicated CPU core per image, plus RAM. The setup itself was relatively straightforward except for a couple of gotchas. I didn’t use Docker or similar simply because I haven’t played with it; as a rule I try not to introduce too many new pieces into a setup, otherwise it turns into a yak-shaving exercise. What follows is an outline of how I got it running, and some of the things I bumped my head against.

My approach was to get the software set up on a single disk image, and then replicate that over to another couple of VMs, tweaking to get the clustering working correctly. Turns out, that works pretty well. When creating the VirtualBox images, set Network Adapter 1 as a Bridged Adapter to your Ethernet port, with Promiscuous Mode set to “Allow All”.

VirtualBox network adapter settings

As a starting point, I downloaded ZooKeeper 3.4.6 and ActiveMQ 5.10, unzipped them into /opt, and created symlinks at /opt/zk and /opt/activemq respectively.

If you follow the setup instructions from the ActiveMQ site, the first thing you’ll notice is that the ZooKeeper (ZK) setup is not covered. Thankfully, the ZK website itself has a good breakdown of how to do this in the Clustered (Multi-Server) Setup section of the Administrator’s Guide. The standard config file I used was /opt/zk/conf/zk.conf, configured with the following:

tickTime=2000
dataDir=/var/zk/data
clientPort=2181
initLimit=5
syncLimit=2

You have to create the /var/zk/data directory yourself. This is where ZK keeps its data, as well as a file called myid, which contains a single number that defines which node in the cluster this particular machine is.

Later, once you know the IP addresses of the machines, you add the following to the end of the zk.conf file (here I’m using a 3 node cluster):

server.1=192.168.0.8:2888:3888
server.2=192.168.0.13:2888:3888
server.3=192.168.0.12:2888:3888

The number that follows server. corresponds to the contents of the myid file on that particular server (so the myid file on 192.168.0.8 contains just the number 1).

For Zookeeper to work correctly, 3 ports have to be opened on the server’s firewall:

  • 2181 – the port that clients will use to connect to the ZK ensemble
  • 2888 – port used by ZK peers to connect to the quorum leader
  • 3888 – port used by ZK for leader election

The firewall changes that you’ll need for ActiveMQ are:

  • 61616 – default Openwire port
  • 8161 – Jetty port for web console
  • 61619 – LevelDB peer-to-peer replication port

On Fedora, you access the firewall through the Firewall program (oddly enough). I set up ZK and ActiveMQ as Services in the Permanent configuration (not Runtime), and turned those two services on in the Public zone (default used).

Fedora firewall settings

The ActiveMQ configuration changes are pretty straightforward. This is the minimum config change that was needed in conf/activemq.xml:

<persistenceAdapter>
  <replicatedLevelDB zkAddress="192.168.0.8:2181,192.168.0.13:2181,192.168.0.12:2181"
      directory="${activemq.data}/LevelDB"
      hostname="192.168.0.8"/>
</persistenceAdapter>

The zkAddress is pretty clear – these are the ZK nodes and client ports of the ZK ensemble. The data directory is defined by directory. I’ll get onto the hostname attribute in a little while.

Once I had everything set up, it was time to start playing with the cluster. Starting the first ActiveMQ node was problem free – it connected to ZK, and waited patiently until one other ActiveMQ node connected to it (quorum-based replication requires numNodesInCluster/2 + 1 instances). When a second ActiveMQ node was started, the master exploded:

 INFO | Using the pure java LevelDB implementation.
 INFO | No IOExceptionHandler registered, ignoring IO exception
java.io.IOException: com.google.common.base.Objects.firstNonNull(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
	at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:39)[activemq-client-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.LevelDBClient.might_fail(LevelDBClient.scala:552)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.LevelDBClient.replay_init(LevelDBClient.scala:657)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.LevelDBClient.start(LevelDBClient.scala:558)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.DBManager.start(DBManager.scala:648)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.LevelDBStore.doStart(LevelDBStore.scala:235)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.replicated.MasterLevelDBStore.doStart(MasterLevelDBStore.scala:110)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.apache.activemq.util.ServiceSupport.start(ServiceSupport.java:55)[activemq-client-5.10.0.jar:5.10.0]
	at org.apache.activemq.leveldb.replicated.ElectingLevelDBStore$$anonfun$start_master$1.apply$mcV$sp(ElectingLevelDBStore.scala:226)[activemq-leveldb-store-5.10.0.jar:5.10.0]
	at org.fusesource.hawtdispatch.package$$anon$4.run(hawtdispatch.scala:330)[hawtdispatch-scala-2.11-1.21.jar:1.21]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)[:1.7.0_60]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)[:1.7.0_60]
	at java.lang.Thread.run(Thread.java:745)[:1.7.0_60]
 INFO | Stopped LevelDB[/opt/activemq/data/LevelDB]

After a bit of head-scratching, I happened upon AMQ-5225 in the ActiveMQ JIRA. It seems that the problem is with a classpath conflict in 5.10 (readers from the future – if you’re using a newer version, you won’t see this problem :) ). To get around it, follow these instructions:

  1. Remove pax-url-aether-1.5.2.jar from the lib directory.
  2. Comment out the logQuery section:

<bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery"
      lazy-init="false" scope="singleton"
      init-method="start" destroy-method="stop">
</bean>

When I made the changes, everything started to work without this exception. Once the second ActiveMQ instance was started, the master sprung to life and started logging the following to the console:

WARN | Store update waiting on 1 replica(s) to catch up to log position 0.
WARN | Store update waiting on 1 replica(s) to catch up to log position 0.
WARN | Store update waiting on 1 replica(s) to catch up to log position 0.
...

The slave on the other hand had the following text output:

INFO | Using the pure java LevelDB implementation.
INFO | Attaching to master: tcp://localhost.localdomain:61619
WARN | Unexpected session error: java.net.ConnectException: Connection refused
INFO | Using the pure java LevelDB implementation.
INFO | Attaching to master: tcp://localhost.localdomain:61619
WARN | Unexpected session error: java.net.ConnectException: Connection refused
...

Ad infinitum.

After a bunch of fruitless searching, I realised that the answer was right in front of me in the slave output: Attaching to master: tcp://localhost.localdomain:61619. It seems that the master was registering itself in ZK with its hostname (localhost.localdomain).

Once that clicked, the documentation led me to the hostname attribute on the replicatedLevelDB tag in the persistenceAdapter. This value is used by the broker to advertise its location through ZK so that slaves can connect to its LevelDB replication port. The default behaviour sees ActiveMQ trying to automatically work out the hostname.

The value being used was coming from /etc/hosts – here are the default contents of that file:

127.0.0.1            localhost.localdomain localhost
::1            localhost6.localdomain6 localhost6

I would imagine that in a proper network setup this wouldn’t typically happen, as the box name would be worked out through DHCP + internal DNS. Simply overriding the default behaviour by setting the hostname attribute to the IP address of the virtual machine worked a treat.

With a test client on the host machine running the 3 VirtualBox instances I was able to connect to all 3 nodes and fail them over without problems.
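
A client connecting to the cluster would typically use the failover transport pointed at all three VMs (a sketch, using the IPs from the ZK config above); only the current master accepts connections, and the transport reconnects when the master changes:

ConnectionFactory cf = new ActiveMQConnectionFactory(
        "failover:(tcp://192.168.0.8:61616,tcp://192.168.0.13:61616,tcp://192.168.0.12:61616)");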

ZK uses a list to work out which ActiveMQ slave node is going to take control when the master broker dies; the ordering is defined by who connected to the ensemble first. If you have fewer than a quorum of ActiveMQ nodes running, no ActiveMQ nodes in the cluster will accept connections (this is managed by the replicated LevelDB store).

Since ZK also maintains a quorum, when fewer than the required number of servers are available, the remaining nodes will shut down their client port (2181). This will disconnect all the brokers from the ensemble and cause the master broker to shut down its transportConnectors.

Happy replicated LevelDB-ing!

ActiveMQ Network Connectors Demystified

When trying to get your head around how ActiveMQ’s networks of brokers work, it helps to have an understanding of the underlying mechanisms involved. I’ve always thought of it as the “Lego brick model” – once you understand the smaller pieces, you can reason about how they can be combined together.

When a regular client connects to a broker and sets up a subscription on a destination, ActiveMQ sees it as a consumer and dispatches messages to it. On the consumer, the thread that receives messages is generally not the same one that triggers the event listener – there is an intermediate holding area for messages that have been dispatched from the broker but not yet picked up for processing – the prefetch buffer – and it is here that received messages are placed for consumption. The thread responsible for triggering the main business logic consumes messages from this buffer one by one – either through MessageConsumer.receive() or through a MessageListener.onMessage(Message).
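
The size of that buffer is tunable, most simply through the connection URI; a minimal sketch (the value of 1 just illustrates turning the buffer right down for slow consumers):

// each queue consumer created from this connection factory prefetches at most one message
ConnectionFactory cf = new ActiveMQConnectionFactory(
        "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1");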

ActiveMQ dispatches messages out to interested consumers uniformly. In the context of queues, this means in a round-robin fashion. It does not make a distinction between whether that consumer is actually a piece of client code that triggers some business logic, or whether it is another broker.

Now, when you think about it, this implies that when two brokers (let’s call them A1 and A2) are connected in a network A1 -> A2, somewhere there must be a consumer with a prefetch buffer. A network is established by connecting A1‘s networkConnector to a transportConnector on A2.

I have always found it useful to think of a transportConnector as simply an inbound socket that understands a particular wire protocol. When you send a message into it as a client, the broker will take it, write it to a store, and send you a reply that all’s good, or send you a NACK if there’s a problem. So really it’s a straight-through mechanism; the message gets reacted to instantly – there are no prefetch buffers here.

So if there’s no prefetch buffer on a transportConnector, for a network to work it must actually be a part of the networkConnector on the sending broker (A1). In fact, this is exactly the case.

When a network is established from A1 to A2, a proxy consumer (aka local consumer; I’m using Gary Tully’s terminology here) is created as part of the networkConnector for each destination on which there is demand in A2, or which is configured inside the networkConnector‘s staticallyIncludedDestinations block. By default, each destination being pushed has one consumer, with its own prefetch buffer.
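
In configuration terms, such a networkConnector on A1 might look like the following (the A2 address and queue name are made up for illustration):

<networkConnectors>
  <networkConnector name="A1-to-A2" uri="static:(tcp://a2.example.com:61616)">
    <staticallyIncludedDestinations>
      <!-- a proxy consumer is created for this queue regardless of demand on A2 -->
      <queue physicalName="orders.outbound"/>
    </staticallyIncludedDestinations>
  </networkConnector>
</networkConnectors>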

The thread that consumes from this buffer takes each message one by one and (in the case of persistent messages) synchronously sends it to the remote broker A2. A2 then takes that message, persists it and replies that the message has been consumed. The proxy consumer then marks the message as having been processed, at which point it is deemed as having been consumed from A1, is marked for deletion, and will not be redelivered to any other consumers. That’s the general principle behind store and forward.

Now, because the send from A1 to A2 is being performed by a single thread, which is sending the messages one-by-one for each destination, you will see that the throughput across the networkConnector can be quite slow relative to the rate at which messages are being placed onto A1 in the first place. This is the cost of two guarantees that ActiveMQ gives you when it accepts messages:

  1. messages will not be lost when they are marked as persistent, and
  2. messages will be delivered in the order in which they are sent

If you have messages that need to be sent over the network at a higher rate than that which the networkConnector can send given these constraints, you can relax one of these two guarantees:

  1. By marking the messages as non-persistent when you send them, the send will be asynchronous (and so, super fast), but if you lose a broker any messages that are in-flight are gone forever. This is usually not appropriate, as in most use cases reliability trumps performance.
  2. If you can handle messages out-of-order, you can define N number of networkConnectors from A1 to A2. Each connector will have a proxy consumer created for the destinations being pushed over the network – N connectors means N consumers each pushing messages. Keep in mind that message distribution in a broker is performed in a round-robin fashion, so you lose all guarantee of ordering before the messages arrive on A2. Of course, you can opt to group related messages together using message groups, which will be honored on both A1 (between proxy consumers), and actual consumers on A2.
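
A sketch of what option 2 might look like in A1’s configuration (the connector names and A2 address are illustrative):

<networkConnectors>
  <!-- two connectors to the same broker: two proxy consumers per destination,
       so messages are pushed in parallel but may arrive on A2 out of order -->
  <networkConnector name="A1-to-A2-1" uri="static:(tcp://a2.example.com:61616)"/>
  <networkConnector name="A1-to-A2-2" uri="static:(tcp://a2.example.com:61616)"/>
</networkConnectors>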

Hopefully, this helps you in developing a mental model of how the underlying mechanics of ActiveMQ’s broker networks work. Of course there’s loads more functionality available, in that there are many more Lego bricks of different shapes there, to help you build up your broker infrastructure.

Gabriella

Gabriella
Born early morning on Monday the 17th of March. Mum and baby are back home and doing well. Older sister very excited!

Camel Cookbook Launch at LJC, March 5th

After all the hard work comes the fun. On Wednesday 5th of March, I will be giving a presentation entitled “Effective System Integrations with Apache Camel” at Skillsmatter, in London; hosted by the good folks from the London Java Community. The evening will kick off at 6:15, and I’ll be headlining 😉 at 6:45.

The premise for the talk is going to be a touch different from most of the “Intro to X” talks you may have watched – I will be discussing why most integrations you encounter tend to be half-baked and brittle, and what Apache Camel brings to the table that allows you to address the reasons behind this. The technical ones at least (there are other, non-technical ones too).

The evening is going to be the official London launch of the Apache Camel Developer’s Cookbook. I will be giving away a few copies, and there will also be a chance to buy a copy at the end of the talk for yourself, your workmates, family, friends, pets, etc. All proceeds on the night will be going to Shelter, the housing and homelessness charity. So bring your hard earned cash, grab yourself over 100 recipes for rock-solid integration, and do some good at the same time. Donate an extra £5, and I won’t deface your book with my signature 😉