Performance Test Tool for Apache Kafka

Towards the end of last year, I developed a performance test tool, available on GitHub, for testing the behaviour of Apache Kafka (0.9 so far). The tool was inspired by the performance tooling in ActiveMQ and informed by some of its limitations; I’m currently retrofitting that tooling with the same mechanisms.

The kafka-perf-test project builds a fat JAR that you can take with you into any environment running Java 8. Through a single JSON or YAML config file, you can configure a range of consumers and producers with differing behaviours, pointing at one or more Kafka installations. All of the standard Kafka properties for producers and consumers are configurable, which also makes it a useful learning tool if you want to work out what sort of behaviour to expect from a Kafka installation, or just to bulk load a bunch of messages.

A sample YAML config can be as simple as:

---
config: {} # applies to all producers and consumers
producers:
  config:  # applies to all producers
    bootstrap.servers: "tcp://localhost:9092"
    request.timeout.ms: "10000"
    key.serializer: "org.apache.kafka.common.serialization.ByteArraySerializer"
    value.serializer: "org.apache.kafka.common.serialization.ByteArraySerializer"
    batch.size: "0"
    acks: "1"
    max.block.ms: "10000"
  instances:
  - config: {} # applies to this producer
    topic: "foo"
    messagesToSend: 2000
    messageSize: 1000
    sendBlocking: false

There is still some work to go in getting decent metrics reporting, and general polish, but for headline numbers it’s good to go. Pull requests/issues/comments most welcome.

Message Distribution and Topic Partitioning in Kafka

When coming over to Apache Kafka from other messaging systems, there’s a conceptual hump that first needs to be crossed, and that is – what is this topic thing that messages get sent to, and how does message distribution inside it work?

Unlike regular brokers, Kafka only has one destination type – a topic (I’ll refer to it as a kTopic here to disambiguate it from JMS topics). Underlying a kTopic is a persisted data structure called a journal (think of it like an array) that can have many pointers addressing (via a numerical offset) an index inside it. Consumers subscribe to a kTopic with a consumer group, denoted by a group ID; you can think of the group ID as a named pointer.

The Kafka documentation talks about consumer groups having “group names”. In the config they are referred to by group.id, so I will run with that convention.

If the group ID is not known by the broker, the consumer can be configured to ask the broker to point its corresponding pointer to the start of the journal (thereby consuming all messages since the broker accepted messages), or the end (consuming all messages starting from the next message to arrive). Consuming a message means moving the pointer to the next place in the journal. Pointers can also be moved backwards to consume messages that have been previously consumed, in contrast with a regular broker where messages get deleted once consumed.

This means that the consumer code may keep track of where it got up to by storing and managing its own offset in the journal. This is another difference from JMS, where state management is left up to the broker.
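The journal-and-pointer idea can be sketched in a few lines of plain Java. This is a toy in-memory model, not Kafka code: the class JournalSketch and its methods (append, subscribe, poll, seek) are hypothetical names chosen for illustration, and the fromStart flag stands in for the consumer's choice between starting at the beginning or the end of the journal.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of one journal with named pointers; not Kafka code. */
final class JournalSketch {
    private final List<String> journal = new ArrayList<>();       // append-only log
    private final Map<String, Integer> offsets = new HashMap<>(); // group ID -> next offset to read

    void append(String message) {
        journal.add(message);
    }

    /** Registers an unknown group ID at the start or the end of the journal. */
    void subscribe(String groupId, boolean fromStart) {
        offsets.putIfAbsent(groupId, fromStart ? 0 : journal.size());
    }

    /** Returns the next message for the group and advances its pointer, or null if it has caught up. */
    String poll(String groupId) {
        int offset = offsets.get(groupId);
        if (offset >= journal.size()) {
            return null;
        }
        offsets.put(groupId, offset + 1);
        return journal.get(offset); // the message itself stays in the journal
    }

    /** Moves the group's pointer, for example backwards to replay messages. */
    void seek(String groupId, int offset) {
        offsets.put(groupId, offset);
    }

    public static void main(String[] args) {
        JournalSketch topic = new JournalSketch();
        topic.append("m0");
        topic.append("m1");
        topic.subscribe("from-start", true);
        topic.subscribe("from-end", false);
        topic.append("m2");
        System.out.println(topic.poll("from-start")); // m0
        System.out.println(topic.poll("from-end"));   // m2
        topic.seek("from-start", 0);
        System.out.println(topic.poll("from-start")); // m0 again: consuming did not delete it
    }
}
```

Note that nothing is ever removed from the list; consuming only moves a pointer, which is why replaying is just a matter of moving it back.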

The best way to describe kTopic message distribution is to contrast it with the behaviour of destinations in regular (non-Kafka) message brokers. Stepping back to a more traditional JMS world view, there are two types of destinations:

  • queues – first-in-first-out
  • topics – publish-subscribe

In queues, messages are dispatched in the same order that they are sent in; if there is more than one consumer, messages get dispatched in a round-robin fashion among these consumers, ensuring fair work sharing. One of the gripes about this is that by default you cannot guarantee that messages will be processed in order, as the order of processing once messages have been dispatched to consumers is non-deterministic (some consumers may run slower than others). You can address this with features such as JMS message groups, where groups of related messages are round-robin assigned to consumers, and messages within those groups are sent to the consumer to which that group is assigned.
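As a rough illustration of the two dispatch modes described above, here is a toy dispatcher in plain Java. It is not a real broker API: QueueDispatchSketch and both dispatch methods are hypothetical, and consumers are represented simply by their index.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of queue dispatch in a JMS-style broker; not a real broker API. */
final class QueueDispatchSketch {
    private final int consumerCount;
    private int next = 0;                                             // next consumer in the rotation
    private final Map<String, Integer> groupOwner = new HashMap<>();  // message group -> consumer index

    QueueDispatchSketch(int consumerCount) {
        this.consumerCount = consumerCount;
    }

    /** Plain round-robin: each message goes to the next consumer in turn. */
    int dispatch() {
        int consumer = next;
        next = (next + 1) % consumerCount;
        return consumer;
    }

    /** Message groups: the first message of a group picks a consumer round-robin; later ones stick to it. */
    int dispatch(String messageGroup) {
        return groupOwner.computeIfAbsent(messageGroup, g -> dispatch());
    }

    public static void main(String[] args) {
        QueueDispatchSketch queue = new QueueDispatchSketch(2);
        System.out.println(queue.dispatch("order-1")); // 0
        System.out.println(queue.dispatch("order-2")); // 1
        System.out.println(queue.dispatch("order-1")); // 0: same group, same consumer
    }
}
```

The single shared counter and map are the coordination point: every message has to pass through them, whichever consumer ends up with it.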

This queueing behaviour is approximated in Kafka through the use of a single group ID for the kTopic. In this scenario a single consumer will receive all of the messages. Where it starts to get interesting is what happens when you have two consumers: here the latest consumer to join receives ALL of the unconsumed messages from the point of joining; the formerly busy consumer is suspended. This is a bit like the exclusive consumer feature in JMS, only that exclusivity is taken away by the last consumer to join; and is in stark contrast to the fair-work sharing of JMS.

One of Kafka’s underlying design goals is to be fast for both production and consumption. This has led to some interesting design trade-offs. The main one is the use of the journal as the fundamental construct – it is a very simple structure that is read and written very quickly to disk. The reason for this is that sequential disk access is extremely efficient.

If you wanted to distribute messages from a journal in a round-robin fashion, as per JMS, you would need a dispatching thread on the broker to coordinate message hand-off to consumers. This gets slower the more consumers you have (though with a sensible number of consumers the slowdown is usually not visible). A goal for Kafka was to scale consumption linearly with the number of consumers, which means that you need to reduce coordination.

Consider, however, the following:

  1. Coordination is an underlying property of concurrency.
  2. Horizontal scaling is achieved by applying parallelism.
  3. Concurrency is the prevention of parallelism.

So. The only way to get horizontal scaling of consumption in a queue distribution scenario is to effectively use multiple journals. Inhale deeply.

Kafka achieves this through the idea of partitioning. On the broker, you define how many partitions exist per kTopic. One partition corresponds to one journal. If you want to distribute messages fairly amongst N consumers, you need N partitions.

So how are messages distributed amongst the partitions? When messages are sent to a kTopic, they are sent through the following method on the Producer class:

    send(ProducerRecord<K,V> record): Future<RecordMetadata>;

A ProducerRecord contains a key (K) and a value (V). The value is the payload of your message (unlike in JMS there is no Message construct that allows you to send additional message metadata). The key is a business value provided by you that is used to shard your messages across the partitions.

It is the client’s responsibility to decide which partition to send the message to, not the broker’s. This is done by configuring the Producer with an implementation of the Partitioner interface. The default behaviour is to hash the key and take the result modulo the number of partitions, conceptually key.hashCode() % numberOfPartitions (the Java client’s default partitioner hashes the serialized key bytes with murmur2 rather than calling hashCode()). This will not give fair work sharing for a small set of keys, due to hashing collisions, which means that if you are partitioning to a small number of partitions (for the same number of parallel consumers) some of those partitions are likely to get more messages than the others. In the case of 2 partitions, one might not get any messages at all – leading to consumer starvation. You can balance this out by providing your own Partitioner implementation with a more business-specific partitioning function.

It is important to note that the Partitioner must consistently place related messages into the same partition, otherwise the entire purpose of this scheme – namely that related messages should be consumed in order – goes out the window.
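The skew problem is easy to reproduce without a broker. The sketch below is plain Java, not the Kafka Partitioner interface: hashPartition and indexPartition are hypothetical helpers, String.hashCode() stands in for whatever hash function the client uses, and the four single-letter keys are an assumed example of a small key set.

```java
import java.util.Arrays;
import java.util.List;

/** Toy illustration of key-based partitioning; the method names are hypothetical, not the Kafka API. */
final class PartitioningSketch {

    /** Hash-modulo partitioning, with String.hashCode() standing in for the client's hash function. */
    static int hashPartition(String key, int numberOfPartitions) {
        return Math.floorMod(key.hashCode(), numberOfPartitions); // floorMod keeps the result non-negative
    }

    /** A business-specific alternative: spread a known, fixed set of keys evenly by their position. */
    static int indexPartition(String key, List<String> knownKeys, int numberOfPartitions) {
        int index = knownKeys.indexOf(key);
        if (index < 0) {
            throw new IllegalArgumentException("Unknown key: " + key);
        }
        return index % numberOfPartitions; // the same key always maps to the same partition
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("A", "C", "E", "G");
        for (String key : keys) {
            System.out.println(key + ": hash -> " + hashPartition(key, 2)
                    + ", index -> " + indexPartition(key, keys, 2));
        }
    }
}
```

With 2 partitions, all four keys hash to partition 1 (their hash codes 65, 67, 69 and 71 are all odd), so partition 0 stays empty, while the index-based function alternates between 0 and 1. Both functions are deterministic, so related messages still land in the same partition every time.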

Partitions are fairly assigned to consumers, and rebalanced when consumers come and go. Assuming 2 partitions:

  • If you have 1 consumer, they will receive messages from both partitions.
  • If you have 2, each will exclusively consume from a single one.
  • If you have 3, one will just hang around and not receive any messages.
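The three cases above can be reproduced with a simplified assignment function. This is only a sketch: Kafka's actual assignment strategies are pluggable and more involved, and AssignmentSketch.assign is a hypothetical helper that just deals partitions out to consumers in turn.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of assigning partitions to the consumers of one group; not Kafka's real assignor. */
final class AssignmentSketch {

    /** Deals partitions 0..partitionCount-1 out to the consumers in turn. */
    static Map<String, List<Integer>> assign(int partitionCount, List<String> consumers) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>()); // every consumer appears, even with no partitions
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assign(2, Arrays.asList("c1")));             // {c1=[0, 1]}
        System.out.println(assign(2, Arrays.asList("c1", "c2")));       // {c1=[0], c2=[1]}
        System.out.println(assign(2, Arrays.asList("c1", "c2", "c3"))); // {c1=[0], c2=[1], c3=[]}
    }
}
```

Rebalancing when a consumer joins or leaves amounts to calling the function again with the new consumer list.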

In JMS topics, messages are distributed to all consumers who may be interested. In kTopics this is implemented by giving each consumer their own group ID. The distribution logic per-consumer is going to look exactly like the queue scenario above. In effect, this means that kTopics behave a lot like ActiveMQ’s virtual destinations or JMS 2.0 durable topics, rather than pure JMS topics – as the latter do not write messages to disk, whereas kTopics do.
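In configuration terms, the difference between queue-like and topic-like consumption comes down to the group.id each consumer is given. The sketch below only builds java.util.Properties objects and does not connect to anything; the helper consumerConfig, the group names and the localhost:9092 broker address are assumptions for illustration, while group.id, bootstrap.servers and auto.offset.reset are standard consumer property names.

```java
import java.util.Properties;

/** Builds consumer configuration only; no Kafka client is created here. */
final class GroupIdSketch {

    static Properties consumerConfig(String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.setProperty("group.id", groupId);                   // the named pointer
        props.setProperty("auto.offset.reset", "earliest");       // start of the journal if the group is unknown
        return props;
    }

    public static void main(String[] args) {
        // Queue-like: both consumers share one group ID, so they share one set of offsets.
        Properties workerA = consumerConfig("order-workers");
        Properties workerB = consumerConfig("order-workers");

        // Topic-like: each consumer has its own group ID, so each sees every message.
        Properties billing = consumerConfig("billing");
        Properties audit = consumerConfig("audit");

        System.out.println(workerA.getProperty("group.id").equals(workerB.getProperty("group.id"))); // true
        System.out.println(billing.getProperty("group.id").equals(audit.getProperty("group.id")));   // false
    }
}
```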

Hopefully this goes a long way to clear up the fundamentals of message distribution in Kafka. It’s a really interesting messaging tech, but some of its behaviour is fundamentally shifted 90 degrees to what you would expect coming from a traditional messaging background. The price for getting great performance and utilisation in clusters has meant some additional interesting trade-offs and complexities (not covered here) in transactionality, durability and state management that are likely to catch out the unwary – more on that in another post.