

Bootstrap Projects for Getting Started with ServiceMix 4

Over the Christmas break I cleaned up and published to GitHub a set of Maven projects for getting started with ServiceMix 4.X. I found myself reusing the same code for a number of activities, and figured it may be of broader use to others. You can find it under jkorab/smx-bootstraps.

smx-bootstraps contains a set of OSGi Blueprint bundles (Blueprint being a dependency injection framework based on the ideas behind Spring) that exercise some of the core things that you will want to do with ServiceMix:

  • defining services in bundles that can be reused in other bundles
  • using Camel for writing integration code
  • using ActiveMQ to send persistent messages between bundles, regardless of whether they are in the same container or in different ones
  • request-response over messaging
  • externalising your environment configuration
  • grouping bundles into features

The README document at the project root explains how to get started with ServiceMix, deploy bundles, change code and play with config.

Why would you want to use them?

ServiceMix went through a massive generational change between versions 3.X and 4.X, moving from a JBI-based to an OSGi-based model. While development work on it is proceeding at a huge rate, the documentation hasn’t kept up, although it is being brought up to date in the background. smx-bootstraps contains small artifacts that are hopefully easy to understand and play with, along with instructions on how to use them in the container.

The project may also be of use as a starting point for further development. It provides a fairly clean project layout that you can use as a reference, supplementing the publicly available Maven archetypes such as:

  • org.apache.camel.archetypes:camel-archetype-blueprint for generating Blueprint bundles to run Camel routes; similar to the smx-pinger and smx-ponger bundles in smx-bootstraps
  • org.apache.karaf.archetypes:karaf-blueprint-archetype for generating simple Blueprint bundles, such as the smx-ponger-service bundle

I have found these bundles to be a really handy way of exercising ServiceMix features and working with various configurations. Not having to code up something new each time is a huge time saver; hopefully you will find the same.

I expect to expand this little project as time goes on and I find myself recreating other use cases, such as exposing web services (next on my “todo list”). Please drop me a line at “jakub dot korab at gmail” if you find this useful or have any ideas that would fit in well. Of course, this being GitHub, feel free to fork it or contribute back changes.

Understanding ActiveMQ Broker Networks

Networks of message brokers in ActiveMQ work quite differently to more familiar models such as that of physical networks. They are not any harder to understand or reason about, but we need an appreciation of what exactly each piece of the puzzle does by itself in order to understand the whole. I will try to explain each component piece progressively, moving up in complexity from a single broker through to a full-blown network. By the end you should have a feel for how these networks behave and be able to reason about the interactions across different topologies.

One of the key things in understanding broker-based messaging is that the production, or sending, of a message is disconnected from the consumption of that message. The broker acts as an intermediary, making both the method by which a message is consumed and the route that the message has travelled orthogonal to its production. You shouldn’t need to understand the entire postal system to know that you post a letter in the big red box and eventually it will arrive in the little box at the front of the recipient’s house. The same idea applies here.

Producer and consumer are unaware of each other; each knows only the broker it is connected to.

Connections are shown in the direction of where they were established from (i.e. Consumer connects to Broker).

Out of the box, when a producer sends a standard message to a queue, it goes to the broker, which persists it in its message store. By default this is KahaDB, but the broker can be configured to store messages in memory, which buys performance at the cost of reliability. Once the broker has confirmation that the message has been persisted in the journal (the terms journal and message store are often used interchangeably), it responds with an acknowledgement back to the producer. The producer’s sending thread is blocked until that acknowledgement arrives.

On the consumption side, when a message listener is registered or a call to receive() is made, the broker creates a subscription to that queue. Messages are fetched from the message store and passed to the consumer; this is usually done in batches, and the fetching is a lot more complex than simply reading from disk, but that’s the general idea. The consumer will usually at this stage process the message and subsequently acknowledge that it has been consumed. The broker then updates the message store, either marking that message as consumed or deleting it outright (this depends on the persistence mechanism).

So what happens when there is more than one consumer on a queue? All things being equal, and ignoring consumer priorities, the broker will hand out incoming messages to each subscriber in a round-robin manner.
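A toy, in-memory model can make the dispatch rule concrete. This is a sketch under simplifying assumptions, not ActiveMQ’s implementation: `RoundRobinQueue` and its methods are hypothetical, and prefetch, acknowledgements and consumer priorities are all ignored. Messages sit in the store until a subscription exists and are then handed to subscribers in turn.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of a queue on a single broker (hypothetical, not the ActiveMQ implementation).
class RoundRobinQueue {
    private final Deque<String> store = new ArrayDeque<>();          // stands in for the message store
    private final List<List<String>> subscriptions = new ArrayList<>();
    private int next = 0;                                              // round-robin cursor

    /** Registers a subscriber and returns its index; any stored messages are dispatched at once. */
    int subscribe() {
        subscriptions.add(new ArrayList<>());
        dispatch();
        return subscriptions.size() - 1;
    }

    /** A producer send: the message is stored first, then dispatched if anyone is subscribed. */
    void send(String message) {
        store.addLast(message);
        dispatch();
    }

    List<String> received(int subscriber) {
        return subscriptions.get(subscriber);
    }

    private void dispatch() {
        while (!store.isEmpty() && !subscriptions.isEmpty()) {
            String message = store.pollFirst();
            subscriptions.get(next % subscriptions.size()).add(message);
            next++;
        }
    }

    public static void main(String[] args) {
        RoundRobinQueue queue = new RoundRobinQueue();
        int a = queue.subscribe();
        int b = queue.subscribe();
        int c = queue.subscribe();
        for (int i = 0; i < 9; i++) {
            queue.send("m" + i);
        }
        System.out.println(queue.received(a) + " " + queue.received(b) + " " + queue.received(c));
    }
}
```

With three subscribers and nine messages, each subscriber ends up with three: the first gets m0, m3 and m6, and so on.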

Store-and-forward

Now let’s scale this up to two brokers, Broker1 and Broker2. In ActiveMQ a network of brokers is set up by connecting a networkConnector on one broker to a transportConnector (think of it as a socket listening on a port) on another. A networkConnector is an outbound connection from one broker to another.

When a subscription is made to a queue on Broker2, that broker tells the other brokers that it knows about (in our case, just Broker1) that it is interested in that queue; another subscription is now made on Broker1 with Broker2 as the consumer. As far as an ActiveMQ broker is concerned there is no difference between a standard client consuming messages, or another broker acting on behalf of a client. They are treated in the exact same manner.

So now that Broker1 sees a subscription from Broker2, what happens? The result is a hybrid of the producer and consumer behaviours: Broker1 is the producer, and Broker2 the consumer. Messages are fetched from Broker1’s message store and passed to Broker2. Broker2 processes each message by storing it in its journal, and acknowledges consumption of that message. Broker1 then marks the message as consumed.

The simple consume case then applies as Broker2 forwards the message to its consumers, as though the message had been produced directly into it. Neither the producer nor the consumer is aware that any network of brokers exists; it is orthogonal to their functionality, a key driver of this style of messaging.

Local and remote consumers

It has already been noted that as far as a broker is concerned, all subscriptions are equal. To it there is no difference between a local “real” consumer and another broker that is going to forward those messages on, so incoming messages are handed out round-robin as usual. If we have two consumers, Consumer1 on Broker1 and Consumer2 on Broker2, and messages are produced to Broker1, both consumers will receive the same number of messages.
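The same toy approach extends to two brokers. In this sketch (all class names hypothetical, no real persistence or failure handling) a `Broker` dispatches round-robin to anything implementing `Subscriber`, so a local consumer and a networked broker are indistinguishable to it, and a forwarded message is stored and dispatched exactly as if a producer had sent it.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical names): a broker treats a local consumer and a networked broker identically.
class LocalAndRemoteConsumers {
    interface Subscriber {
        void deliver(String message);
    }

    static final class Consumer implements Subscriber {
        final List<String> received = new ArrayList<>();

        @Override
        public void deliver(String message) {
            received.add(message);
        }
    }

    static final class Broker implements Subscriber {
        final List<String> journal = new ArrayList<>();   // stands in for the message store
        private final List<Subscriber> subscriptions = new ArrayList<>();
        private int next = 0;

        void subscribe(Subscriber subscriber) {
            subscriptions.add(subscriber);
        }

        // Used both for producer sends and for messages forwarded by another broker.
        @Override
        public void deliver(String message) {
            journal.add(message);                            // store ...
            if (subscriptions.isEmpty()) {
                return;                                      // no subscribers: it just stays in the journal
            }
            Subscriber target = subscriptions.get(next % subscriptions.size());
            next++;
            target.deliver(message);                         // ... then forward round-robin
            journal.remove(message);                         // subscriber acknowledged: mark consumed
        }
    }

    public static void main(String[] args) {
        Broker broker1 = new Broker();
        Broker broker2 = new Broker();
        Consumer consumer1 = new Consumer();
        Consumer consumer2 = new Consumer();

        broker1.subscribe(consumer1);   // local consumer on Broker1
        broker2.subscribe(consumer2);   // local consumer on Broker2
        broker1.subscribe(broker2);     // Broker2 subscribes to Broker1 on behalf of Consumer2

        for (int i = 0; i < 10; i++) {
            broker1.deliver("m" + i);   // producer connected to Broker1
        }
        System.out.println("Consumer1: " + consumer1.received.size()
                + ", Consumer2: " + consumer2.received.size());
    }
}
```

Running `main` reports five messages for each consumer, since Broker1 simply alternates between Consumer1 and Broker2.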

A networkConnector is unidirectional by default, which means that the broker initiating the connection acts as a client, forwarding its subscriptions. In this case Broker2 subscribes to Broker1 on behalf of its consumers, but Broker2 will not be made aware of subscriptions on Broker1. networkConnectors can, however, be made duplex, so that subscriptions are passed in both directions.

So let’s take it one step further with a network that demonstrates why it is a bad idea to connect brokers to each other in an ad-hoc manner. Let’s add Broker3 into the mix such that it connects into Broker1, and Broker2 sets up a second networkConnector into Broker3. All networkConnectors are set up as duplex.

This is a common approach people take when they first encounter broker networks and want to connect a number of brokers to each other, as they are naturally used to the internet model of network behaviour where traffic is routed down the shortest path. If we think about it from first principles, it quickly becomes apparent that this is not the best approach. Let’s examine what happens when a consumer connects to Broker2.

  1. Broker2 echoes the subscription to the brokers it knows about: Broker1 and Broker3.
  2. Broker3 echoes the subscription down all networkConnectors other than the one from which the request came; it subscribes to Broker1.
  3. A producer sends messages into Broker1.
  4. Broker1 stores and forwards messages to the active subscriptions on its transportConnector; half to Broker2, and half to Broker3.
  5. Broker2 stores and forwards to its consumer.
  6. Broker3 stores and forwards to Broker2.

Eventually everything ends up at the consumer, but some messages ended up needlessly travelling Broker1->Broker3->Broker2, while the others went by the more direct route Broker1->Broker2. Add more brokers into the mix, and the store-and-forward traffic increases exponentially as messages flow through any number of weird and wonderful routes.

Very bad! Lots of unnecessary store-and-forward.
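To count the detours, the sketch below hard-codes the subscription tables that result from steps 1 and 2 above and follows each message from Broker1 to the consumer. It is an illustration only: it does not model how ActiveMQ actually propagates or suppresses subscriptions, and `TriangleForwarding` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of the three-broker example; subscription tables are hard-coded from steps 1 and 2.
class TriangleForwarding {
    static Map<String, Integer> routes(int messages) {
        // Who each broker dispatches to, in round-robin order.
        Map<String, List<String>> subscriptions = Map.of(
                "Broker1", List.of("Broker2", "Broker3"),
                "Broker3", List.of("Broker2"),
                "Broker2", List.of("Consumer"));
        Map<String, Integer> cursors = new HashMap<>();
        Map<String, Integer> routeCounts = new TreeMap<>();

        for (int i = 0; i < messages; i++) {
            String at = "Broker1";                          // the producer is connected to Broker1
            StringBuilder route = new StringBuilder(at);
            while (!at.equals("Consumer")) {
                List<String> targets = subscriptions.get(at);
                int cursor = cursors.getOrDefault(at, 0);
                cursors.put(at, cursor + 1);
                at = targets.get(cursor % targets.size());  // store, then forward round-robin
                route.append("->").append(at);
            }
            routeCounts.merge(route.toString(), 1, Integer::sum);
        }
        return routeCounts;
    }

    public static void main(String[] args) {
        routes(10).forEach((route, count) -> System.out.println(count + " x " + route));
    }
}
```

With 10 messages, five take the direct route and five take Broker1->Broker3->Broker2; every message on the second route is stored by one more broker than it needs to be.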

Fortunately, it is possible to avoid this by employing other topologies, such as hub and spoke.

Better. A message can flow between any of the numbered brokers via the hub and a maximum of 3 hops.
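A quick way to check the hop count for a hub and spoke layout is a breadth-first search over the topology. The helper below is hypothetical and counts every broker that stores the message, including the first and the last, which is one way of reading the figure of 3 above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical helper: counts the brokers a message is stored in along the shortest path.
class HubAndSpoke {
    /** Builds an undirected hub-and-spoke topology: every spoke is connected only to the hub. */
    static Map<String, List<String>> star(String hub, int spokes) {
        Map<String, List<String>> graph = new HashMap<>();
        graph.put(hub, new ArrayList<>());
        for (int i = 1; i <= spokes; i++) {
            String spoke = "Broker" + i;
            graph.put(spoke, new ArrayList<>(List.of(hub)));
            graph.get(hub).add(spoke);
        }
        return graph;
    }

    /** Breadth-first search; returns the number of brokers on the shortest path, endpoints included. */
    static int brokersOnPath(Map<String, List<String>> graph, String from, String to) {
        Map<String, Integer> depth = new HashMap<>();
        Queue<String> frontier = new ArrayDeque<>();
        depth.put(from, 1);
        frontier.add(from);
        while (!frontier.isEmpty()) {
            String current = frontier.remove();
            if (current.equals(to)) {
                return depth.get(current);
            }
            for (String neighbour : graph.get(current)) {
                if (!depth.containsKey(neighbour)) {
                    depth.put(neighbour, depth.get(current) + 1);
                    frontier.add(neighbour);
                }
            }
        }
        throw new IllegalArgumentException(to + " is not reachable from " + from);
    }

    /** Worst case over every ordered pair of distinct brokers. */
    static int worstCase(Map<String, List<String>> graph) {
        int worst = 0;
        for (String from : graph.keySet()) {
            for (String to : graph.keySet()) {
                if (!from.equals(to)) {
                    worst = Math.max(worst, brokersOnPath(graph, from, to));
                }
            }
        }
        return worst;
    }

    public static void main(String[] args) {
        System.out.println(worstCase(star("Hub", 4)));
    }
}
```

Adding more spokes does not change the result: the worst case stays at 3 however many brokers hang off the hub.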

You can also use a more nuanced approach that includes considerations such as unidirectional networkConnectors that pass only certain subscriptions, or reducing consumer priority so that more distant consumers have a lower priority than closer ones.
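The priority idea can be sketched as “give each message to the highest-priority subscription that still has room”. ActiveMQ’s networkConnector offers a decreaseNetworkConsumerPriority option along these lines, but the model here is my simplification: the `Sub` class, the fixed prefetch capacity and tie-breaking by list order are assumptions, not the broker’s actual dispatch algorithm.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified priority dispatch (hypothetical model): highest priority with free prefetch wins.
class PriorityDispatch {
    static final class Sub {
        final String name;
        final int priority;          // higher is preferred
        final int prefetch;          // how many unacknowledged messages it may hold
        final List<Integer> inFlight = new ArrayList<>();

        Sub(String name, int priority, int prefetch) {
            this.name = name;
            this.priority = priority;
            this.prefetch = prefetch;
        }

        boolean hasRoom() {
            return inFlight.size() < prefetch;
        }
    }

    /** Dispatches message ids 0..messages-1 and returns how many could not be handed out. */
    static int dispatch(List<Sub> subs, int messages) {
        List<Sub> byPriority = new ArrayList<>(subs);
        // Stable sort: equal priorities keep their original order (a simplification).
        byPriority.sort(Comparator.comparingInt((Sub s) -> s.priority).reversed());
        int undelivered = 0;
        for (int id = 0; id < messages; id++) {
            Sub target = null;
            for (Sub s : byPriority) {
                if (s.hasRoom()) {
                    target = s;
                    break;
                }
            }
            if (target == null) {
                undelivered++;       // everyone is full: the message waits on the broker
            } else {
                target.inFlight.add(id);
            }
        }
        return undelivered;
    }

    public static void main(String[] args) {
        Sub local = new Sub("local consumer", 0, 5);
        Sub network = new Sub("network bridge", -1, 5);
        int waiting = dispatch(List.of(network, local), 8);
        System.out.println(local.name + ": " + local.inFlight.size()
                + ", " + network.name + ": " + network.inFlight.size() + ", waiting: " + waiting);
    }
}
```

With a local subscription at priority 0 and a network subscription at priority -1, both with room for five messages, eight messages split 5/3: the remote broker only gets work once the closer consumer is saturated.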

Each network design needs to be considered separately and trades off considerations such as message load, the amount of hardware at your disposal, latency (number of hops) and reliability. When you understand how all the parts fit and think about the overall topology from first principles, it’s much easier to work through.

Batching JMS messages for performance; not so fast

Recently an idea crossed my radar for speeding up messaging producers: batching messages into a single transaction. The idea is that there is overhead in the standard behaviour of a message bus that can be optimised out if you group messages into a single transaction.

My initial thought was that this was unlikely; at best the performance would be the same as an untransacted client. The comments were framed in the context of Spring’s JMSTemplate, which continually walks the object tree ConnectionFactory -> Connection -> Session -> MessageProducer and back, closing them off. Each close causes some network traffic between the library and the bus. My thought was that perhaps the discussion hadn’t accounted for caching these objects via a CachingConnectionFactory. I resolved to find out.

My other concern was that while it could be done in the general case, it didn’t make a lot of sense.

Generally speaking there are two categories of messages that will concern an application developer. Sending throwaway messages that aren’t critical is one thing: you can choose not to persist them on the broker side and save 90% of the overhead. You can also do asynchronous sends, which won’t persist the messages and save further time by not giving your app confirmation that the broker received them. If you lose them, it won’t be a big deal.

The other type is the one you really care about: messages that reflect or affect a change in the real world, such as orders and instructions. The value of a broker in this case comes from knowing that once it has the message, it will be dealt with.

In ActiveMQ, this guarantee comes via the message store. When you send a message the broker will not acknowledge receipt until the message has been persisted. If the broker goes down, it can be restarted and any persisted messages consumed as though nothing happened, or another broker can take over transparently using the same message store (HA). As you can imagine, there is a natural overhead: two network traversals and some disk I/O for every message. Keep this in mind.

I tested a couple of different scenarios.

  • JMSTemplate with a CachingConnectionFactory. This is the baseline.
  • An untransacted, persistent session. Each message gets sent off to the broker as it comes in. The broker places the message in a persistent store. This is like the logic of the JMSTemplate, but where you control the lifecycles of JMS objects.
  • An untransacted, unpersisted session. Each message gets sent off to the broker as it comes in. The broker stores the message in memory only for delivery.
  • Transacted sessions. Messages get bundled together into a transaction and committed.

Messages were posted to a queue with no consumers on a single broker with default settings.

The results surprised me.

Average message send time


Average message throughput

Timings should be considered indicative only, and are only there for a relative comparison. Initial spikes should be considered part of a warm-up, and ignored.

Firstly, getting down to the JMS API was consistently faster than using JMSTemplate (not surprising in hindsight given its object walk). There was also indeed a boost in performance from batching messages together for delivery in a transaction. The more messages there were in a batch, the better the performance (I repeated this with 10, 100 and 1000 messages per transaction). The results approached those of the non-persistent delivery mode, but never quite reached them. What accounts for this?

I did some digging, and came up with this description of transactions in ActiveMQ. It appears that there is a separate store in memory for uncommitted transactions. While a transaction is open messages are placed here, and are then copied over to the main store on commit.

So what to make of this? Should you forgo JMSTemplate and batch everything? Well, as with pretty much anything, it depends.

JMSTemplate is simple to configure and use; it also ties in nicely with the various Spring abstractions (TransactionManagers come to mind). There are certainly some upsides to getting down to the JMS API, however they come at the cost of coding time and of losing those hooks, which you might regret later.

Batching itself is a funny one, in that it’s one of those things that could easily be misused. Transactions are not there for improving performance; they exist as a logical construct for making atomic changes. If you batch up unrelated messages in memory to send to the broker for a performance boost, and your container goes down before you’ve had a chance to commit, you have violated the contract of “this business action has been completed”, and you have lost your messages. Not something you want for important, change-the-world messages.

If you have messages that are relatively unimportant that won’t affect the correct operation of your system, there are better options available to you for improving performance such as dropping persistence.

If, however, you have related messages tied to a single operation, then perhaps getting down to the JMS API and batching these together under a single transaction is something you might want to consider. The key here is ensuring that you don’t lose the real meaning of transaction.


Update 1 (13/11): Replaced graphs with log scale to make the series clearer.

Update 2 (13/11): Thanks to James Strachan for an extended explanation:

The reason transactions are so much faster on the client side can be explained by comparing it to the normal case, where the client blocks until the message has definitely gone to disk on the broker.

In a transaction the JMS client doesn’t need to wait until the broker confirms the message because it’s going straight into the transactional store (memory). Because the broker will complain on a commit if something has gone awry, the client can afford to not block at all doing a send or acknowledge (not even blocking for it to be sent over the socket), so it behaves like a true async operation. This means it’s very fast.

This explains why transactional performance approaches that of async operations as the batch size grows: for all intents and purposes it is nearly asynchronous. The only cost is that of the synchronous commit(), which is a flush from the transactional store to the journal.
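The explanation can be turned into a back-of-the-envelope cost model. Every number below is a made-up illustration, not a measurement from the tests above: a persistent untransacted send pays a round trip plus a disk sync per message, while a transacted batch pays a near-asynchronous socket write per message plus one synchronous commit shared by the whole batch.

```java
// Hypothetical per-message cost model; all constants are illustrative, not measured.
class SendCostModel {
    static final double ROUND_TRIP_MS = 0.5;   // client -> broker -> client
    static final double DISK_SYNC_MS = 2.0;    // broker forcing the journal to disk
    static final double ASYNC_SEND_MS = 0.05;  // writing to the socket without waiting for a reply

    /** Untransacted persistent send: block for a round trip plus a disk sync on every message. */
    static double persistentSync() {
        return ROUND_TRIP_MS + DISK_SYNC_MS;
    }

    /** Transacted batch: sends are effectively async, and one synchronous commit is shared by the batch. */
    static double transactedPerMessage(int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        double commit = ROUND_TRIP_MS + DISK_SYNC_MS;
        return ASYNC_SEND_MS + commit / batchSize;
    }

    public static void main(String[] args) {
        System.out.printf("persistent, untransacted: %.4f ms/msg%n", persistentSync());
        for (int n : new int[] {1, 10, 100, 1000}) {
            System.out.printf("transacted, batch of %4d: %.4f ms/msg%n", n, transactedPerMessage(n));
        }
    }
}
```

Per-message cost falls as `ASYNC_SEND_MS + commit / batchSize`, so it approaches the asynchronous cost as batches grow but never reaches it, which matches the shape of the results described above.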

You could argue that this doesn’t really affect the actual throughput, as the difference between batching and no transactions is just the client sitting around blocking. The benefit of transactions is that they make each client much more effective; i.e. to get equivalent throughput without them you would need more clients running concurrently. In the case of Camel, this could be achieved by configuring a larger thread pool.
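The “more clients” point is Little’s law: the number of senders needed equals the target throughput multiplied by how long each sender is blocked per message. `SendersNeeded` is a hypothetical helper, and the figures in `main` are made-up inputs chosen for easy arithmetic.

```java
// Little's law sketch: concurrent senders needed = target throughput x time each sender blocks per message.
class SendersNeeded {
    /**
     * @param targetPerSecond     desired aggregate messages per second
     * @param blockedMsPerMessage how long one sender is tied up per message (hypothetical input)
     */
    static int concurrentSenders(double targetPerSecond, double blockedMsPerMessage) {
        if (targetPerSecond <= 0 || blockedMsPerMessage <= 0) {
            throw new IllegalArgumentException("inputs must be positive");
        }
        return (int) Math.ceil(targetPerSecond * blockedMsPerMessage / 1000.0);
    }

    public static void main(String[] args) {
        // Illustrative numbers only: 2.5 ms blocking sends vs 0.25 ms amortised transacted sends.
        System.out.println(concurrentSenders(4000, 2.5));   // untransacted, blocking
        System.out.println(concurrentSenders(4000, 0.25));  // batched in transactions
    }
}
```

With those inputs, 10 blocking senders are needed to match what a single batching sender manages.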

A brief outline of when to use transactions.

Configuring ActiveMQ transactions in Spring

It’s easy to configure Message Driven POJOs over ActiveMQ with Spring, but slightly more involved once you want to get transactions working correctly; here’s how to do it.

Start with a basic DefaultMessageListenerContainer config.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- setup an embedded JMS Broker for testing purposes -->
    <amq:broker id="my-broker" useJmx="false" persistent="false" brokerName="localhost">
        <amq:transportConnectors>
            <amq:transportConnector uri="tcp://localhost:61616" />
        </amq:transportConnectors>
    </amq:broker>

    <bean id="amq.connectionFactory"
            class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL"
                value="tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1" />
    </bean>

    <bean id="consumer.connectionFactory"
            class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amq.connectionFactory" />
    </bean>

    <bean id="consumer.messageListener"
            class="net.jakubkorab.amq.CountingMessageListener" />

    <bean id="consumer.jmsContainer"
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="consumer.connectionFactory" />
        <property name="destinationName" value="sample.messages" />
        <property name="messageListener" ref="consumer.messageListener" />
    </bean>
</beans>

An embedded broker is started, and a MessageListener (CountingMessageListener) is set up to pick up messages from a queue (sample.messages). MessageListenerContainers should always sit over a CachingConnectionFactory (see the Spring docs for details). So what happens if the listener throws an exception?

According to the intent of the URI configuration on the connection factory, we’d like to see the broker attempt to redeliver the message, and if that fails the message should end up in a dead letter queue (ActiveMQ.DLQ). However, if you test it, that’s not what happens at all.

You will not see any further redelivery attempts, and there’s nothing in the dead letter queue. Why? Because you haven’t configured a TransactionManager, Spring’s DefaultMessageListenerContainer picks up the message and immediately acknowledges it. As far as the broker is concerned, its job is done.

So let’s remedy this by adding one.

    <bean id="local.transactionManager"
            class="org.springframework.jms.connection.JmsTransactionManager">
        <!-- can also refer to amq.connectionFactory -->
        <property name="connectionFactory" ref="consumer.connectionFactory" />
    </bean>

    <bean id="consumer.jmsContainer"
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="consumer.connectionFactory" />
        <property name="destinationName" value="sample.messages" />
        <property name="messageListener" ref="consumer.messageListener" />
        <!-- add a reference to the transaction manager -->
        <property name="transactionManager" ref="local.transactionManager" />
    </bean>

This uses Spring’s own JmsTransactionManager so that each message is consumed within a local JMS transaction. Any exceptions thrown will now cause the broker to redeliver the message according to the redelivery policy specified in the connection URI and, once that is exceeded, to send the offending message to the dead letter queue.
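The difference between the two configurations can be sketched as a small simulation. This is not Spring or ActiveMQ code: `RedeliverySimulation` is hypothetical, it assumes acknowledgement happens before the listener runs when no transaction manager is configured (as described above), and it reads `maximumRedeliveries=1` as one redelivery after the original delivery.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model (not Spring/ActiveMQ code) of acknowledge-first vs transacted consumption.
class RedeliverySimulation {
    static final class Result {
        final int deliveries;              // times the listener saw the message
        final List<String> deadLetters;    // stands in for ActiveMQ.DLQ

        Result(int deliveries, List<String> deadLetters) {
            this.deliveries = deliveries;
            this.deadLetters = deadLetters;
        }
    }

    static Result consume(String message, Consumer<String> listener,
                          boolean transacted, int maximumRedeliveries) {
        List<String> dlq = new ArrayList<>();
        int deliveries = 0;
        while (true) {
            deliveries++;
            try {
                listener.accept(message);
                return new Result(deliveries, dlq);          // success: ack or commit
            } catch (RuntimeException e) {
                if (!transacted) {
                    return new Result(deliveries, dlq);      // already acknowledged: message is gone
                }
                int redeliveries = deliveries - 1;           // rollback: may the broker try again?
                if (redeliveries >= maximumRedeliveries) {
                    dlq.add(message);                        // policy exhausted: dead-letter it
                    return new Result(deliveries, dlq);
                }
            }
        }
    }

    public static void main(String[] args) {
        Consumer<String> failing = m -> { throw new IllegalStateException("boom: " + m); };
        Result autoAck = consume("order-1", failing, false, 1);
        Result transacted = consume("order-1", failing, true, 1);
        System.out.println("no tx manager: deliveries=" + autoAck.deliveries + ", DLQ=" + autoAck.deadLetters);
        System.out.println("transacted:    deliveries=" + transacted.deliveries + ", DLQ=" + transacted.deadLetters);
    }
}
```

For a listener that always throws, the non-transacted run delivers once and loses the message, while the transacted run delivers twice and leaves the message in the stand-in dead letter queue.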


Update 13/09: Thanks to Sue Macey for pointing out this fragment from the Spring docs:

Local resource transactions can simply be activated through the sessionTransacted flag on the listener container definition. Each message listener invocation will then operate within an active JMS transaction, with message reception rolled back in case of listener execution failure. Sending a response message (via SessionAwareMessageListener) will be part of the same local transaction, but any other resource operations (such as database access) will operate independently. This usually requires duplicate message detection in the listener implementation, covering the case where database processing has committed but message processing failed to commit.


This is sufficient for most cases. However, let’s say you want to update a database via your listener. Transactions ought to be atomic, and the JmsTransactionManager won’t do the job for you here; you need to bring out the big guns.

Once you start managing more than one resource (broker, database etc) in a transaction, you bring in a sizeable overhead. Distributed transactions will involve some sort of two-phase commit protocol, and the chatter adds processing time to each operation. Should you decide that it’s definitely something you want to do, JTA is the tool for the job.
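To show where the extra chatter comes from, here is a toy two-phase commit coordinator. It is illustrative only: `TwoPhaseCommit`, `Resource` and `Outcome` are hypothetical, messages are counted rather than sent, and real JTA transaction managers also write their decisions to a recovery log.

```java
import java.util.List;

// Toy two-phase commit coordinator (illustration only, no recovery log or timeouts).
class TwoPhaseCommit {
    static final class Resource {
        final String name;
        final boolean canCommit;       // how this resource will vote in phase 1
        String state = "ACTIVE";

        Resource(String name, boolean canCommit) {
            this.name = name;
            this.canCommit = canCommit;
        }

        boolean prepare() {
            state = canCommit ? "PREPARED" : "FAILED";
            return canCommit;
        }

        void commit() {
            state = "COMMITTED";
        }

        void rollback() {
            state = "ROLLED_BACK";
        }
    }

    static final class Outcome {
        final boolean committed;
        final int messages;            // one-way messages exchanged with the resources

        Outcome(boolean committed, int messages) {
            this.committed = committed;
            this.messages = messages;
        }
    }

    static Outcome complete(List<Resource> resources) {
        int messages = 0;
        boolean allPrepared = true;
        // Phase 1: ask every resource to prepare and collect the votes.
        for (Resource r : resources) {
            allPrepared &= r.prepare();
            messages += 2;             // prepare request + vote
        }
        // Phase 2: broadcast the single decision to everyone.
        for (Resource r : resources) {
            if (allPrepared) {
                r.commit();
            } else {
                r.rollback();
            }
            messages += 2;             // commit/rollback request + acknowledgement
        }
        return new Outcome(allPrepared, messages);
    }

    public static void main(String[] args) {
        Resource broker = new Resource("amq1", true);
        Resource database = new Resource("hsqldb", true);
        Outcome outcome = complete(List.of(broker, database));
        System.out.println("committed=" + outcome.committed + ", messages=" + outcome.messages);
    }
}
```

Committing across a broker and a database costs eight one-way messages here, versus a single commit request and reply for a local transaction on one resource, and any resource voting no rolls everything back.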

If you’re running in a full JEE container and using a DataSource that supports XA transactions (some do, some don’t), you’re in luck:

    <bean id="consumer.messageListener"
            class="net.jakubkorab.amq.DBAwareMessageListener" >
        <!-- listener uses a JdbcTemplate with a DataSource defined elsewhere -->
        <property name="dataSource" ref="xa.dataSource" />
    </bean>

    <!-- use the XA-specific version of the connection factory -->
    <bean id="amq.connectionFactory"
            class="org.apache.activemq.ActiveMQXAConnectionFactory"
            depends-on="my-broker">
        <property name="brokerURL"
                value="tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1" />
    </bean>

    <bean id="jta.transactionManager"
            class="org.springframework.transaction.jta.JtaTransactionManager"
            depends-on="my-broker"/>

    <bean id="consumer.jmsContainer"
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="consumer.connectionFactory" />
        <property name="destinationName" value="sample.messages" />
        <property name="messageListener" ref="consumer.messageListener" />
        <!-- change the transaction manager reference -->
        <property name="transactionManager" ref="jta.transactionManager" />
        <!--
            now tell the broker that it's involved in a transaction; this has the same effect
            as creating a transacted session using the JMS API
        -->
        <property name="sessionTransacted" value="true" />
    </bean>

The JtaTransactionManager will pick up and manage transactional resources using the JTA implementation provided by the server. However, you need a full JEE server for this. Tomcat, for example, does not come with JTA; neither, for that matter, does JUnit, which makes testing your transaction config problematic. In these cases you need to make use of an external JTA implementation, such as Atomikos or JOTM.

Atomikos’ approach to transaction management outside of the server is to place its own wrappers around your resource definitions. Their transaction manager will then scan the Spring ApplicationContext to find these. Setting it up takes one final step:

    <!-- implementation-specific wrapper used by the transaction manager -->
    <bean id="xa.connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean">
        <property name="uniqueResourceName" value="amq1" />
        <property name="xaConnectionFactory" ref="amq.connectionFactory" />
    </bean>

    <bean id="jta.transactionManager"
            class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="transactionManager">
            <bean class="com.atomikos.icatch.jta.UserTransactionManager"
                    init-method="init"
                    destroy-method="close">
                <property name="forceShutdown" value="false" />
            </bean>
        </property>
        <property name="userTransaction">
            <bean class="com.atomikos.icatch.jta.UserTransactionImp">
                <property name="transactionTimeout" value="300" />
            </bean>
        </property>
    </bean>

The JtaTransactionManager is configured to use an external underlying TransactionManager implementation, and wrappers are provided around the resources.

Assuming interaction with an HSQLDB instance, which doesn’t support XA transactions, you would add a further wrapper:

    <!-- HSQLDB is not XA compatible so we wrap that using a special Atomikos NonXA to XA DataSource -->
    <bean id="xa.dataSource" class="com.atomikos.jdbc.nonxa.AtomikosNonXADataSourceBean">
        <property name="uniqueResourceName" value="hsqldb" />
        <property name="driverClassName" value="org.hsqldb.jdbcDriver" />
        <property name="url" value="jdbc:hsqldb:mem:myTestDb" />
        <property name="user" value="sa" />
        <property name="password" value="" />
        <property name="poolSize" value="3" />
    </bean>

When using a database that supports XA, you would use a com.atomikos.jdbc.AtomikosDataSourceBean instead.

The above should get you going, but remember that it’s important with any kind of transactional config to test that it actually works.

Give your DB the love it deserves

Pity the poor database. As critical to most apps as a foundation to a building. And as interesting as an accounting seminar at a nudist colony.

Not sexy enough for the attentions of the senior dev, or considered to be “well understood”, DB work frequently ends up getting handed off to the junior guys on the team, who promptly make all the mistakes the senior guys have learned not to make. Mistakes which end up with massive hunks of sub-optimal compensating code in the layers above. Then more code gets written off the back of that. Voilà! Instant technical debt.

Cue the self-perpetuating myths: “relational databases aren’t web scale”, “normalised schemas aren’t performant”, “you don’t have these problems with NoSQL”.

Senior guys often don’t have the time to deal with it. DBAs aren’t seen as being responsive enough for JFDI/iterative development. Peer review at the end is too late.

So what’s the fix? Just enough design. Back of a napkin. Whack together a schema, and talk it through with someone (else) who knows what they’re doing. Then code. It’s not rocket science.

Fashion!

Read this and chuckled.

“Our industry, the global programming community, is fashion-driven to a degree that would embarrass haute couture designers from New York to Paris. We’re slaves to fashion. Fashion dictates the programming languages people study in school, the languages employers hire for, the languages that get to be in books on shelves. A naive outsider might wonder if the quality of a language matters a little, just a teeny bit at least, but in the real world fashion trumps all.”

From the Foreword to The Joy of Clojure.

Goldilocks actors: not too many, not too few

After my last post on parallelism with Scala actors, I had a thought: when doing a calculation like this, am I actually making the most of my resources? If as in the Pi example, more cycles will generally lead to a better result, surely if I have a limited amount of time to get the best value I want to squeeze every last bit of juice from my hardware. It may seem a trivial question for a small problem, but this has real-world implications in domains like banking where a few more cycles in a pricing or risk calculation means a difference in income.

There’s a bit of lore that says that the optimal number of threads to perform an operation is equal to the number of processors. Likewise, there’s a generally accepted idea that you will get a progressive deterioration in performance as more threads are run concurrently, due to context-switching.

In other words, you can have too many actors.

I wasn’t convinced, and decided to try it out for myself. My thinking was that the one-thread-per-processor idea was a bit too clean. With processor hyperthreading, I/O overhead, JVM magic and other factors, it was pretty likely that at least (number of processors + 1) actors would be able to run before performance degraded.

Stand back! I’m about to try Science!

So, I rewrote the example to test throughput (check it out if you’re interested; it’s more or less the same as the first, but with more jiggery-pokery), and ran it on my dual-core laptop:

1 actor (output cleaned up):

...
PiEvaluator:Shutting down generators
PointGenerator[1]:exiting having generated 47040000 points
PiEvaluator:Shutting down aggregator
Aggregator:Considered 47040000 points
Aggregator:Pi is approx: 3.1415372448979593


2 actors:

...
PiEvaluator:Shutting down generators
Aggregator:Pi is approx: 3.141657176679491
PointGenerator[2]:exiting having generated 34000000 points
PointGenerator[1]:exiting having generated 33690000 points
PiEvaluator:Shutting down aggregator
Aggregator:Considered 67690000 points
Aggregator:Pi is approx: 3.1416520608657112


3 actors:

...
PiEvaluator:Shutting down generators
PointGenerator[1]:exiting having generated 20470000 points
PointGenerator[2]:exiting having generated 27680000 points
PointGenerator[3]:exiting having generated 20410000 points
PiEvaluator:Shutting down aggregator
Aggregator:Considered 68560000 points
Aggregator:Pi is approx: 3.141365577596266


4 actors:

...
PiEvaluator:Shutting down generators
Aggregator:Pi is approx: 3.141612733060482
PointGenerator[2]:exiting having generated 27660000 points
PointGenerator[1]:exiting having generated 19670000 points
PointGenerator[3]:exiting having generated 18790000 points
PointGenerator[4]:exiting having generated 10000 points
PiEvaluator:Shutting down aggregator
Aggregator:Considered 66130000 points
Aggregator:Pi is approx: 3.1416107061847875


5 actors:

...
PiEvaluator:Shutting down generators
Aggregator:Pi is approx: 3.141523427529626
PointGenerator[2]:exiting having generated 28220000 points
PointGenerator[4]:exiting having generated 10000 points
PointGenerator[5]:exiting having generated 10000 points
PointGenerator[1]:exiting having generated 18810000 points
PointGenerator[3]:exiting having generated 18850000 points
PiEvaluator:Shutting down aggregator
Aggregator:Considered 65900000 points
Aggregator:Pi is approx: 3.141531047040971


I ran this quite a few times to verify that I wasn’t getting inconsistent results due to GC or other one-off factors. The results were pretty consistent:

  • The jump from 1 to 2 actors gave a huge boost in performance as the second processor joined the fray. Interestingly, it wasn’t a 100% boost, more like 45-50% (47.04 million points versus 67.69 million in the runs above). Other programs, notably Spotify, were running on my PC at the same time as the test, so that may account for some of the shortfall.
  • When a third actor was introduced, there was a proportionately small but repeatable increase in the number of calculations that could be run in the same time, equivalent to 2-3%.
  • Beyond 3 actors, the Scala actors library itself looks to have kicked in (I may be wrong here, so please correct me if you know otherwise). Rather than degrading the performance of the first three, as I had expected, the 4th and 5th actors were starved of work. It wasn’t until the first three actors started shutting down that the ones created later got to finish their first batch of work.
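To put numbers on the comparison, here is a small sketch that takes the "Aggregator:Considered" totals from the runs logged above and works out each run’s gain over the single-actor baseline. The object name is just for illustration, and it only covers the one set of runs shown, not the repeated runs mentioned above.

```scala
object ThroughputComparison {
  // Total points considered per run, copied from the "Aggregator:Considered" log lines above
  val pointsByActors: Map[Int, Long] = Map(
    1 -> 47040000L, 2 -> 67690000L, 3 -> 68560000L, 4 -> 66130000L, 5 -> 65900000L)

  // Percentage gain (or loss) of a run relative to the single-actor baseline
  def gainOverBaseline(actors: Int): Double =
    (pointsByActors(actors).toDouble / pointsByActors(1) - 1) * 100

  def main(args: Array[String]): Unit =
    for (n <- pointsByActors.keys.toSeq.sorted)
      println(f"$n actor(s): ${pointsByActors(n)}%,d points, ${gainOverBaseline(n)}%+.1f%% vs 1 actor")
}
```

For the runs shown, this reports roughly +43.9% for two actors and +45.7% for three, dropping back to around +40% with four and five.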

So, don’t take any known lore for granted. Much like anywhere else on the JVM, tuning performance is down to experimentation.

As an aside, it was interesting to note that beyond a point, my evaluation of Pi wasn’t getting any better. I put it down to the imprecision of floating-point arithmetic. Key take-away point: when programming a moon launch, don’t use doubles.
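Before blaming doubles, it may be worth checking how much error the random sampling alone would produce. Each sample lands in the circle with probability p = pi/4, so the hit count is binomial and the estimate 4 * hits / n has a standard error of about 4 * sqrt(p(1 - p) / n). The sketch below (MonteCarloError is an illustrative name) computes that for some of the sample counts logged above. It comes out at around 0.0002, the same order as the scatter between the runs, whereas double-precision rounding is many orders of magnitude smaller, so statistical noise looks like a likely contributor.

```scala
object MonteCarloError {
  // Standard error of the estimate 4 * hits / samples, where each sample is a hit
  // with probability p = pi / 4. It shrinks only as 1 / sqrt(samples).
  def standardError(samples: Long): Double = {
    val p = math.Pi / 4
    4 * math.sqrt(p * (1 - p) / samples)
  }

  def main(args: Array[String]): Unit =
    for (n <- Seq(47040000L, 65900000L, 68560000L))
      println(f"$n%,d samples: expected error of roughly ${standardError(n)}%.5f")
}
```

Quadrupling the number of samples only halves the expected error, which is why the estimate improves so slowly beyond a point.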

Better living through parallelism

Judging by the interest in my last actors post, I thought I’d throw up a piece of code that uses actors anonymously to parallelise a long-running operation. Not every operation can be parallelised; most things we work on tend to be fairly sequential. However, if you can split up the work needed to perform an operation, you can get some serious bang for your buck.

Let’s take a pretty interesting little problem as an example: working out pi. I first encountered this problem in a job interview a while back (yeah, I thought it was out there too). It’s an example of what’s known as a Monte Carlo simulation. The general way to work this out is described in detail in this Google Code University tutorial on parallel programming (worth taking a look).

The general approach is based on a circle inscribed in a square, as in this diagram.

The steps are as follows:

  1. generate points randomly in the square
  2. work out whether each point falls inside the circle (its distance from the centre is less than the radius; use Pythagoras here)
  3. the ratio of points in the circle to points in the square is approximately the same as the ratio of the two shapes’ areas; rearrange the area formulas (I’ll show it in the code) to work out an approximate value of pi
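Written out, with N points generated, N_in of them inside the circle, and a circle of radius r inscribed in a square of side 2r, step 3 works out as:

```latex
\frac{N_{\text{in}}}{N} \approx \frac{\pi r^2}{(2r)^2} = \frac{\pi}{4}
\quad\Longrightarrow\quad
\pi \approx 4\,\frac{N_{\text{in}}}{N}
```

With the code’s sideLength of 1 (so r = 0.5), the first run below gives 4 × 7883 / 10000 = 3.1532.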

It’s a nice little problem, because the more points you generate, the better the estimate gets. It’s also easily parallelisable, as the first two steps can be performed independently by any number of workers. Let’s do a first pass in a single thread:

package net.jakubkorab.pi_evaluator

import scala.actors._
import scala.actors.Actor._
import scala.math._

object PiEvaluator {
	val sideLength : Double = 1
	val radius : Double = sideLength / 2
	val totalPointsToSample : Long = 10000

	def isRandomPointWithinCircleRadius() : Boolean = {
		val x = (random * sideLength) - radius
		val y = (random * sideLength) - radius
		val hypotenuse = sqrt(x * x + y * y) // Pythagoras; no need for abs() when squaring
		hypotenuse <= radius
	}

	def pointsInCircle(pointsToSample : Long) : Long = {
		(1L to pointsToSample)
			.map((i : Long) => isRandomPointWithinCircleRadius())
			.foldLeft(0L) { (pointCount : Long, pointInCircle : Boolean) =>
				if (pointInCircle) pointCount + 1 else pointCount
			}
	}

	def approximatePi(pointsEvaluated : Long, pointsInCircle : Long) = {
		println("After " + pointsEvaluated
			+ " samples, the number of points in a circle was " + pointsInCircle)

		val areaOfSquare = sideLength * sideLength
		// areaOfCircle/areaOfSquare =~ pointsInCircle/pointsEvaluated, so
		val areaOfCircle = pointsInCircle * areaOfSquare / pointsEvaluated
		// areaOfCircle = pi * r^2, so
		val approximatePi = areaOfCircle / pow(radius, 2)
		println("Pi is approx: " + approximatePi)
	}

	def main(args : Array[String]) = {
		approximatePi(totalPointsToSample, pointsInCircle(totalPointsToSample))
	}
}

In pointsInCircle(), there’s a nice little bit of map-reduce action going on (foldLeft plays the role of reduce here). For each number in the range, we generate a random point and determine whether it falls within the circle. Then we fold/reduce the list of booleans returned by the map function to give a count of all the points that were generated within the radius of the circle. It’s a super useful little programming construct.
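As an aside, the same tally can be written with count, which avoids building an intermediate collection of booleans (something that can get expensive at 100 million samples). The sketch below compares the two styles. It uses a seeded scala.util.Random rather than scala.math.random so that both versions see exactly the same points, and the names FoldVersusCount, inCircle, viaFold and viaCount are illustrative only.

```scala
import scala.util.Random

object FoldVersusCount {
  val radius = 0.5

  // Hypothetical helper: the same circle test as isRandomPointWithinCircleRadius,
  // but drawing from an explicit Random so runs are repeatable
  def inCircle(rng: Random): Boolean = {
    val x = rng.nextDouble() - radius
    val y = rng.nextDouble() - radius
    x * x + y * y <= radius * radius
  }

  // The map/foldLeft style used in pointsInCircle: builds a collection of n booleans first
  def viaFold(n: Int, seed: Long): Long = {
    val rng = new Random(seed)
    (1 to n).map(_ => inCircle(rng)).foldLeft(0L) { (acc, hit) => if (hit) acc + 1 else acc }
  }

  // The same tally with count over an Iterator, so no intermediate collection is kept
  def viaCount(n: Int, seed: Long): Long = {
    val rng = new Random(seed)
    Iterator.continually(inCircle(rng)).take(n).count(hit => hit).toLong
  }
}
```

Given the same seed, viaFold and viaCount draw the same points in the same order, so they return the same tally.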

We then call approximatePi() to do our maths for us.

Running this (I’m doing it through SBT, hence the [info] lines), we get:

After 10000 samples, the number of points in a circle was 7883
Pi is approx: 3.1532
[info] == run ==
[success] Successful.
[info]
[info] Total time: 5 s, completed 30-Jan-2011 13:45:46


Not bad, but we need more points to get a better result. We know that pi should be around 3.14159, but we’re not quite there yet. Scaling up our numbers:

Samples        Time (s)   Pi
10,000         5          3.1532
100,000        5          3.13676
1,000,000      7          3.141188
10,000,000     20         3.1424864
100,000,000    189        3.14139356


Ok, we’re approaching our known value, but this approach is showing the strain. 100 million samples take over three minutes, yet my processor (Intel Core2 Duo T9300 @ 2.5GHz) is showing only ~60% usage, since a single thread can only keep one of its two cores busy. If we could break down the point generation among a number of threads, and then tally up the result at the end, we could make better use of our hardware. Enter fork-join.

Fork-join is probably one of the most common things that you’d want to do with concurrency. It’s a bit of a pain in the butt in Java (the executor framework, futures etc.), and it really shouldn’t be. There’s some excellent work going on around a framework for this, and for map-reduce too, for future versions of the language. In the meantime, you need to jump through hoops: cue fishing through Java Concurrency in Practice. In Scala, though, it’s really easy using actors. So we rewrite main:

	val workers = 10
	def main(args : Array[String]) = {
		// break the job down between workers; the last worker also picks up any remainder
		val samplesPerWorker = totalPointsToSample / workers
		val remainder = totalPointsToSample % workers
		1 to workers foreach { (workerNumber : Int) =>
			val worker = actor {
				receive {
					case pointsToEvaluate : Long => {
						sender ! (pointsToEvaluate, pointsInCircle(pointsToEvaluate)) // reply with a tuple
					}
				}
			}
			worker ! (if (workerNumber == workers) samplesPerWorker + remainder else samplesPerWorker)
		}

		var workersReplied = 0
		var totalPointsEvaluated = 0L
		var totalPointsInCircle = 0L
		while (workersReplied < workers) {
			receive {
				case (pointsEvaluated : Long, pointsInCircle : Long) => {
					workersReplied += 1
					totalPointsEvaluated += pointsEvaluated
					totalPointsInCircle += pointsInCircle
					if (workersReplied % 100 == 0) {
						println(workersReplied + " workers replied")
					}
				}
			}
		}
		approximatePi(totalPointsEvaluated, totalPointsInCircle)
	}

Samples        Workers   Time (s)   Pi
10,000,000     10        18         3.1427624
100,000,000    10        64         3.1418042


64s is a lot better than our original 189s! And this effect just gets more pronounced the more processors you have (the optimal number of actors should match the number of processors, at least in theory; feel free to play around with this).

In the second version of main, the actor keyword/block is actually a method on the Actor object that creates an anonymous actor instance and starts it immediately. We send each worker a message with its share of the samples, and then wait for the replies using receive().

Easy, and you can apply it for a ton of different tasks.
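For comparison, scala.actors was later deprecated (in Scala 2.10) in favour of Akka, and the same fork-join can also be expressed with scala.concurrent.Future, which has been in the standard library since 2.10. The following is a minimal sketch under that assumption, not a drop-in replacement for the code above: FuturePiEvaluator is a hypothetical name, it inlines the circle test, and it uses ThreadLocalRandom so that workers don’t contend on the single shared generator behind scala.math.random.

```scala
import java.util.concurrent.ThreadLocalRandom
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FuturePiEvaluator {
  // Counts how many of `samples` random points in the unit square fall
  // within the circle of radius 0.5 centred in it
  def pointsInCircle(samples: Long): Long = {
    val rng = ThreadLocalRandom.current() // per-thread generator, no shared state
    var hits = 0L
    var i = 0L
    while (i < samples) {
      val x = rng.nextDouble() - 0.5
      val y = rng.nextDouble() - 0.5
      if (x * x + y * y <= 0.25) hits += 1
      i += 1
    }
    hits
  }

  // Fork: one Future per worker, the last one also taking the remainder.
  // Join: wait for all of them and add up the (pointsEvaluated, pointsInCircle) pairs.
  def approximatePi(totalSamples: Long, workers: Int): Double = {
    val perWorker = totalSamples / workers
    val shares = Seq.fill(workers - 1)(perWorker) :+ (perWorker + totalSamples % workers)
    val results = Future.sequence(shares.map(n => Future((n, pointsInCircle(n)))))
    val tallies = Await.result(results, 10.minutes)
    val evaluated = tallies.map(_._1).sum
    val inCircle = tallies.map(_._2).sum
    4.0 * inCircle / evaluated
  }

  def main(args: Array[String]): Unit =
    println("Pi is approx: " + approximatePi(10000000L, Runtime.getRuntime.availableProcessors))
}
```

Each Future plays the part of one anonymous actor, and Future.sequence plus Await.result stands in for the receive loop that counts replies.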

Why I dig Scala: Concurrency and the Dining Philosophers

I am occasionally asked what the big deal is about Scala. For me, whether a programming language is worthwhile comes down to two practical questions: does it aid comprehension, and does it reduce code? The two are not the same thing; terseness, after all, does nothing to aid comprehension. Scala scores points on both counts. It also has a sweet spot that I haven’t encountered elsewhere: it lends itself to concurrent programming in a way that is easy to reason about, and therefore to get right. It does this through a set of supporting language features that, combined, allow us to code at a higher level of abstraction: a leaning towards immutability, functional constructs (such as closures), and a familiar way to model a domain via object orientation.

Put together, it’s massively powerful. As Scala runs on the JVM, you could code concurrency the same way as in Java, with the bog-standard tools such as wait/notify/notifyAll or the JDK 5+ concurrency libraries. You might get some shorter code, but that misses the point. Scala comes with an implementation of the Erlang-inspired actor model out of the box, which lets you deal with the problems of concurrency in a manner that is much easier to reason about. Actors aren’t a language construct, but a library that makes use of the underlying platform (the JVM) and Scala’s language features to provide a much simpler mental model for us to deal with. Actors are “like” threads (not really, but close enough for a starting point) that send and receive messages to and from other actors. How does this aid comprehension? Synchronous and asynchronous messages are very simple to reason about, and IMHO much more straightforward than the Java concurrency libraries (compare the actors documentation to the Trains Book).

Consider the classic computer science concurrency problem, the Dining Philosophers. A number of philosophers sit down at a round table to do some eating and thinking. Each philosopher brings with him a single chopstick that he places on his right-hand side, so you have X philosophers and X chopsticks. To eat, a philosopher must pick up the chopsticks on both his left and right sides. Leaving aside hygiene issues, it’s a cool toy problem around resource contention. So, how would you do this in Scala? The mental leap to be made is that “everything is an actor”. Given a number of philosophers dining at a table, the problem is quite nicely modelled if you think about both the philosophers and the table as actors that pass messages between each other. If you want to see the whole file (~150 lines), it’s available here.

Firstly the messages that we’re going to be passing around:

package net.jakubkorab.philosophers

import messages._
import scala.actors._
import scala.actors.Actor._
import scala.math._

package messages {
	class Chopstick(val position : Int)

	object Side extends Enumeration {
		type Side = Value
		val Left, Right = Value

		def randomSide() = { Side(floor(Side.values.size * random).intValue) }
		def otherSide(side : Side.Value) = { Side.values.find{_ != side}.get }
	}

	sealed abstract class Message

	abstract class TableMessage() extends Message
	case object AllFinished extends TableMessage // an object, since it is sent and matched as AllFinished

	abstract class ChopstickResponse() extends TableMessage
	case class ChopstickAvailable(val chopstick : Chopstick) extends ChopstickResponse
	case class ChopstickUnavailable() extends ChopstickResponse 

	abstract class DinerMessage() extends Message
	case class RequestChopstick(val philosopher : Philosopher, val side : Side.Value) extends DinerMessage
	case class ReplaceChopstick(val chopstick : Chopstick) extends DinerMessage
	case class CouldNotEatAnotherBite(val guest : String) extends DinerMessage
}

Messages don’t actually need to be of any particular type; I just like thinking of that sort of thing as a hierarchy. All of the messages that I’ll pass around are subclasses of Message.
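One practical benefit of a sealed hierarchy is exhaustiveness checking: when the static type being matched is a sealed type, the compiler can warn about a match that misses a case. In the code above only Message itself is sealed, and replies from !? arrive typed as Option[Any], so this doesn’t kick in there. The cut-down, hypothetical sketch below shows the idea with a sealed ChopstickResponse.

```scala
object MessageMatching {
  // A cut-down, hypothetical version of the chopstick responses above
  sealed trait ChopstickResponse
  final case class ChopstickAvailable(position: Int) extends ChopstickResponse
  case object ChopstickUnavailable extends ChopstickResponse

  // Because ChopstickResponse is sealed, leaving out either case here
  // produces a "match may not be exhaustive" warning at compile time
  def describe(response: ChopstickResponse): String = response match {
    case ChopstickAvailable(position) => "Got chopstick " + position
    case ChopstickUnavailable => "No chopstick"
  }
}
```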

Now for our philosophers:

class Philosopher(val name : String, val wordsOfWisdom : String) extends Actor {
	var table : Actor = null
	var seatedAt : Int = -1

	private var timesLeftToEat = 3
	override def act() = {
		while (timesLeftToEat > 0) {
			think()
			val side = Side.randomSide // pick a chopstick to use first
			say("Requesting chopstick 1 on " + side)
			table !? (1000, RequestChopstick(this, side)) match {
				case Some(ChopstickAvailable(chopstick1 : Chopstick)) => {
					pause() // put in a delay so we can see actors switching
					val otherSide = Side.otherSide(side) // request the other
					say("Requesting chopstick 2 on " + otherSide)
					table !? (100, RequestChopstick(this, otherSide)) match {
						case Some(ChopstickAvailable(chopstick2 : Chopstick)) => {
								eat()
								pause()
								table ! ReplaceChopstick(chopstick1) // return chopsticks
								pause()
								table ! ReplaceChopstick(chopstick2)
							}
						case Some(ChopstickUnavailable()) => {
							say("No " + otherSide + " chopstick");
							table ! ReplaceChopstick(chopstick1);
						}
						case None => { say("None"); table ! ReplaceChopstick(chopstick1) }
					}
				}
				case Some(ChopstickUnavailable()) => { say("No " + side + " chopstick") } // no luck getting a chopstick
				case None => say("None")
			}
		}
		react {
			case AllFinished => { say(wordsOfWisdom); exit }
		}
	}

	private def think() = { say("Hmm"); pause() }
	private def eat() = {
		say("Nom nom");
		timesLeftToEat -= 1
		if (timesLeftToEat == 0) {
			table ! CouldNotEatAnotherBite(name)
		}
	}
	private def say(s : String) = { println(name + ": " + s) }
	private def pause() = { Thread.sleep(ceil(random * 1000).intValue) }
}
object Philosopher {
	def apply(name : String, wordsOfWisdom : String) = new Philosopher(name, wordsOfWisdom)
}

As I said, it’s pretty straightforward if you think of an actor as a Thread. Think of act() as the equivalent of Runnable#run(). Philosophers are instantiated with a name and some words of wisdom they’ll come out with. Once they’re seated at a table, they’re given a reference to the table to communicate with and the position where they’re sitting. Messages are sent to the table either asynchronously using the ! method, or synchronously using !? (in which case the number that follows is a timeout in milliseconds, and the reply comes back as an Option that is None if the timeout expires). The syntax may be unfamiliar, but I think it reads pretty easily even to those new to Scala. I won’t go through it in detail. A philosopher sends a chopstick request to the table and gets a response, either that a chopstick is available, or that it’s unavailable. Pretty straightforward.
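The timeout behaviour of !? can be modelled without actors. In this sketch, a hypothetical ReplyChannel wraps java.util.concurrent.LinkedBlockingQueue, whose poll(timeout, unit) returns null if nothing arrives in time; wrapping that in Option gives the same Some/None shape the philosopher matches on. It is an analogy for the semantics, not how scala.actors implements it.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Hypothetical stand-in for the channel a !? reply comes back on
class ReplyChannel[T] {
  private val queue = new LinkedBlockingQueue[T]()

  // Called by the replying side (the table, in the example)
  def reply(message: T): Unit = { queue.offer(message); () }

  // Wait up to timeoutMillis for a reply: Some(reply) if one arrives, None on timeout
  def awaitReply(timeoutMillis: Long): Option[T] =
    Option(queue.poll(timeoutMillis, TimeUnit.MILLISECONDS))
}
```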

So, now the table.

class Table(val philosophers : Set[Philosopher]) extends Actor {
	if (philosophers.size < 2) throw new IllegalArgumentException("At least 2 philosophers must dine together")
	var chopsticks = new Array[Chopstick](philosophers.size)
	var location = 0
	philosophers.foreach { philosopher =>
		chopsticks(location) = new Chopstick(location)  // lay the cutlery
		philosopher.seatedAt = location
		location += 1
	}
	var guestsEating = philosophers.size

	override def act() = {
		println("Starting the meal")
		philosophers.foreach { philosopher => philosopher.table = self; philosopher.start  } // let's go
		while (true) {
			receive {
				case RequestChopstick(philosopher : Philosopher, side : Side.Value) => giveChopstickIfAvailable(philosopher, side)
				case ReplaceChopstick(chopstick : Chopstick) => replaceChopstick(chopstick)
				case CouldNotEatAnotherBite(guest : String) => guestFinished(guest)
			}
		}
	}

	private def giveChopstickIfAvailable(philosopher : Philosopher, side : Side.Value) = {
		var index = if (side == Side.Right) philosopher.seatedAt else philosopher.seatedAt - 1
		if (index < 0) { index = philosophers.size - 1 } // get the one on the end of the array

		val chopstick = chopsticks(index)
		if (chopstick == null) {
			println("No chopstick available at " + index)
			sender ! ChopstickUnavailable() // sender, not philosopher!
		} else {
			chopsticks(index) = null
			sender ! ChopstickAvailable(chopstick)
		}
	}

	private def replaceChopstick(chopstick : Chopstick) = {
		chopsticks(chopstick.position) = chopstick
	}

	private def guestFinished(guest : String ) = {
		println(guest + " is done")
		guestsEating -= 1
		if (guestsEating == 0) {
			philosophers.foreach {_ ! AllFinished}
			println("All done")
			exit
		}
	}
}

The role of the table is to manage the resources, in this case the chopsticks. Calling start() on an actor is analogous to Thread#start().
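The index lookup in giveChopstickIfAvailable can also be written with modular arithmetic. In this sketch (a hypothetical Seating helper that takes a Boolean instead of Side.Value to stay self-contained), chopstick i lies to the right of seat i, so the left-hand chopstick of seat i is (i - 1 + n) % n, which wraps seat 0 round to the last chopstick just as the table’s if-statement does.

```scala
object Seating {
  // Right-hand chopstick of a seat has the same index as the seat;
  // the left-hand one belongs to the previous seat, wrapping at 0
  def chopstickIndex(seatedAt: Int, rightSide: Boolean, tableSize: Int): Int =
    if (rightSide) seatedAt else (seatedAt - 1 + tableSize) % tableSize
}
```

Every chopstick ends up shared by exactly two neighbours, which is where the contention comes from.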

And now, to kick it all off, let’s seat some philosophers at a table. I have chosen the Greco-Roman Stoics for their easy-going approach to life, but any school of thought will do. Chinese philosophers may have been more appropriate to the cutlery. My example, my choice.

object PhilosophersLauncher {
	def main(args : Array[String]) = {
		val table = new Table(
			Set(Philosopher("Seneca the Younger", "The point is, not how long you live, but how nobly you live."),
				Philosopher("Epictetus", "Freedom is secured not by the fulfilling of men's desires, but by the removal of desire." ),
				Philosopher("Marcus Aurelius", "Everything is right for me, which is right for you, O Universe."),
				Philosopher("Zeno of Citium", "Shit happens.")) // one of his lesser known ones
			).start
	}
}

So, does it work?

Starting the meal
Seneca the Younger: Hmm
Epictetus: Hmm
Marcus Aurelius: Hmm
Seneca the Younger: Requesting chopstick 1 on Right
Marcus Aurelius: Requesting chopstick 1 on Right
Epictetus: Requesting chopstick 1 on Right
Marcus Aurelius: Requesting chopstick 2 on Left
No chopstick available at 1
Marcus Aurelius: No Left chopstick
Marcus Aurelius: Hmm
Epictetus: Requesting chopstick 2 on Left
No chopstick available at 0
Epictetus: No Left chopstick
Epictetus: Hmm
Marcus Aurelius: Requesting chopstick 1 on Left
Seneca the Younger: Requesting chopstick 2 on Left
Seneca the Younger: Nom nom
Marcus Aurelius: Requesting chopstick 2 on Right
Marcus Aurelius: Nom nom
Epictetus: Requesting chopstick 1 on Left
No chopstick available at 0
Epictetus: No Left chopstick
Epictetus: Hmm
...
Epictetus: Requesting chopstick 1 on Left
Zeno of Citium: Requesting chopstick 1 on Left
Epictetus: Requesting chopstick 2 on Right
Epictetus: Nom nom
Epictetus is done
Zeno of Citium: Requesting chopstick 2 on Right
Zeno of Citium: Nom nom
Zeno of Citium: Hmm
Zeno of Citium: Requesting chopstick 1 on Right
Zeno of Citium: Requesting chopstick 2 on Left
Zeno of Citium: Nom nom
Zeno of Citium is done
All done
Marcus Aurelius: Everything is right for me, which is right for you, O Universe.
Epictetus: Freedom is secured not by the fulfilling of men's desires, but by the removal of desire.
Seneca the Younger: The point is, not how long you live, but how nobly you live.
Zeno of Citium: Shit happens.


Yup. I think it’s pretty easy to make sense of all this. You can easily reason about what happens when, just by drawing a sequence diagram. Consider the back-off strategy when a philosopher can’t get hold of a chopstick:

Zeno->Table: RequestChopstick(Left)
activate Table
Table-->Zeno: ChopstickAvailable(C0)
deactivate Table

Epictetus->Table: RequestChopstick(Left)
activate Table
Table-->Epictetus: ChopstickAvailable(C1)
deactivate Table

Zeno->Table: RequestChopstick(Right)
activate Table
Table-->Zeno: ChopstickUnavailable()
deactivate Table
Zeno->Table: ReplaceChopstick(C0)

note over Zeno: Sleeps for a bit before trying again

Epictetus->Table: RequestChopstick(Right)
activate Table
Table-->Epictetus: ChopstickAvailable(C0)
deactivate Table
Epictetus->Epictetus: Eat
Epictetus->Table: ReplaceChopstick(C1)
Epictetus->Table: ReplaceChopstick(C0)
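The back-off in the diagram (take one chopstick, and if the other is unavailable, put the first back rather than hanging on to it) is what prevents the classic deadlock where every philosopher holds one chopstick and waits forever for the next. For comparison, here is a minimal sketch of the same idea outside the actor model, using java.util.concurrent.atomic.AtomicBoolean flags; the Chopsticks class and its methods are hypothetical.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical lock-free model of the chopsticks on the table
class Chopsticks(count: Int) {
  private val available = Array.fill(count)(new AtomicBoolean(true))

  // Atomically take a chopstick if nobody else has it
  def tryTake(i: Int): Boolean = available(i).compareAndSet(true, false)
  def replace(i: Int): Unit = available(i).set(true)
  def isAvailable(i: Int): Boolean = available(i).get

  // Back-off: take the first chopstick, then try the second; if the second
  // is taken, put the first back instead of holding on to it
  def tryTakeBoth(first: Int, second: Int): Boolean =
    if (!tryTake(first)) false
    else if (tryTake(second)) true
    else { replace(first); false }
}
```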


So, actors are cool, and in Scala they are easy to reason about due to the syntax and ability to mix in OO concepts. For a comparison, check out the equivalent in Java using semaphores. The Scala version is far less code and much easier to comprehend. And if you think that’s cool, check out Akka.

Bored with software?

What’s interesting right now in software isn’t the new shiny thing. We already have the tools to do most of what we want. What’s interesting is scale and change.

You build a system. Then you realize you need to break out and share functionality via modules. Then you want to manage them independently in live environments. And not take the system down. And have the old transactions finish on the old code while the new work hits the new code.

You build logic. It grows to the point where your original hand-crafted solution is too unwieldy. You need a rules engine, or workflow. Your code needs to keep running. A rewrite is not an option. Rework, refactor, augment, migrate. But don’t break what’s there.

You just wanted to integrate with that one external system. Web services behind a facade. Now another, this time via messaging. All of a sudden it’s 12. Integration framework? ESB? You’re in a cluster, shared network memory, processes that can only run in one place at a time. What’s the last straw, the tipping point to your next upgrade? Where to from here?

That’s what’s interesting.