Configuring ActiveMQ transactions in Spring

It’s easy to configure Message Driven POJOs over ActiveMQ with Spring, but slightly more involved once you want to get transactions working correctly; here’s how to do it.

Start with a basic DefaultMessageListenerContainer config.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- setup an embedded JMS Broker for testing purposes -->
    <amq:broker id="my-broker" useJmx="false" persistent="false" brokerName="localhost">
        <amq:transportConnectors>
            <amq:transportConnector uri="tcp://localhost:61616" />
        </amq:transportConnectors>
    </amq:broker>

    <bean id="amq.connectionFactory" 
            class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" 
                value="tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1" />
    </bean>

    <bean id="consumer.connectionFactory" 
            class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amq.connectionFactory" />
    </bean>

    <bean id="consumer.messageListener" 
            class="net.jakubkorab.amq.CountingMessageListener" />

    <bean id="consumer.jmsContainer" 
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="consumer.connectionFactory" />
        <property name="destinationName" value="sample.messages" />
        <property name="messageListener" ref="consumer.messageListener" />
    </bean>
</beans>

An embedded broker is started, and a MessageListener (CountingMessageListener) is set up to pick up messages from a queue (sample.messages). MessageListenerContainers should always sit over a CachingConnectionFactory (see the Spring docs for details). So what happens if the listener throws an exception?

According to the intent of the URI configuration on the connection factory, we’d like to see the message redelivered once, and if that attempt also fails, the message should end up in the dead letter queue (ActiveMQ.DLQ). However, if you test it, that’s not what happens at all.

You will not see any redelivery attempts, and there’s nothing in the dead letter queue. Why? Because you haven’t configured a TransactionManager, Spring’s DefaultMessageListenerContainer picks up the message and immediately acknowledges it. As far as the broker is concerned, its job is done.

So let’s remedy this by adding one.

    <bean id="local.transactionManager" 
            class="org.springframework.jms.connection.JmsTransactionManager">
        <!-- can also refer to amq.connectionFactory -->
        <property name="connectionFactory" ref="consumer.connectionFactory" />
    </bean>

    <bean id="consumer.jmsContainer" 
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="consumer.connectionFactory" />
        <property name="destinationName" value="sample.messages" />
        <property name="messageListener" ref="consumer.messageListener" />
        <!-- add a reference to the transaction manager -->
        <property name="transactionManager" ref="local.transactionManager" />
    </bean>

This uses Spring’s JmsTransactionManager to make sure that each message is consumed within a local JMS transaction. Any exception thrown by the listener now rolls that transaction back, so the message is redelivered according to the redelivery policy specified in the connection URI and, once that limit is exceeded, the offending message is sent to the dead letter queue.
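
The redelivery policy does not have to live in the broker URL. As a rough sketch, assuming ActiveMQ’s org.apache.activemq.RedeliveryPolicy class, a hypothetical bean id, and illustrative delay values that are not part of the original config, the same setting can be expressed as a bean, which also leaves room for a delay between attempts:

    <!-- sketch only: the bean id and the delay value are illustrative assumptions -->
    <bean id="amq.redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
        <property name="maximumRedeliveries" value="1" />
        <property name="initialRedeliveryDelay" value="1000" />
        <property name="useExponentialBackOff" value="false" />
    </bean>

    <bean id="amq.connectionFactory"
            class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
        <property name="redeliveryPolicy" ref="amq.redeliveryPolicy" />
    </bean>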


Update 13/09: Thanks to Sue Macey for pointing out this fragment from the Spring docs:

Local resource transactions can simply be activated through the sessionTransacted flag on the listener container definition. Each message listener invocation will then operate within an active JMS transaction, with message reception rolled back in case of listener execution failure. Sending a response message (via SessionAwareMessageListener) will be part of the same local transaction, but any other resource operations (such as database access) will operate independently. This usually requires duplicate message detection in the listener implementation, covering the case where database processing has committed but message processing failed to commit.
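
In configuration terms, the approach from that fragment could look roughly like the following: the same container as before, with no transactionManager property and the sessionTransacted flag switched on. This is a sketch based on the quoted docs rather than a tested config:

    <bean id="consumer.jmsContainer"
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="consumer.connectionFactory" />
        <property name="destinationName" value="sample.messages" />
        <property name="messageListener" ref="consumer.messageListener" />
        <!-- local JMS transaction per message; no external transaction manager -->
        <property name="sessionTransacted" value="true" />
    </bean>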


This is sufficient for most cases. However, let’s say you want to update a database via your listener. The message consumption and the database update ought to succeed or fail together, and the JmsTransactionManager won’t do the job for you here – you need to bring out the big guns.

Once you start managing more than one resource (broker, database, etc.) in a transaction, you bring in a sizeable overhead. Distributed transactions involve some sort of two-phase commit protocol, and the chatter adds processing time to each operation. Should you decide that it’s definitely something you want to do, JTA is the tool for the job.

If you’re running in a full JEE container and using a DataSource that supports XA transactions (some do, some don’t), you’re in luck:

    <bean id="consumer.messagerListener" 
            class="net.jakubkorab.amq.DBAwareMessageListener" >
        <!-- listener uses a JdbcTemplate with a DataSource defined elsewhere -->
        <property name="dataSource" ref="xa.dataSource" />
    </bean>

    <!-- use the XA-specific version of the connection factory -->
    <bean id="amq.connectionFactory" 
            class="org.apache.activemq.ActiveMQXAConnectionFactory" 
            depends-on="my-broker">
        <property name="brokerURL" 
                value="tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1" />
    </bean>

    <bean id="jta.transactionManager" 
            class="org.springframework.transaction.jta.JtaTransactionManager" 
            depends-on="my-broker"/>

    <bean id="consumer.jmsContainer" 
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="consumer.connectionFactory" />
        <property name="destinationName" value="sample.messages" />
        <property name="messageListener" ref="consumer.messagerListener" />
        <!-- change the transaction manager reference -->
        <property name="transactionManager" ref="jta.transactionManager" />
        <!-- 
            now tell the broker that it's involved in a transaction; this has the same effect
            as creating a transacted session using the JMS API
        -->
        <property name="sessionTransacted" value="true" />
    </bean>
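
The xa.dataSource bean itself is not shown above. In a full JEE container it would typically be looked up from JNDI; a minimal sketch, assuming a hypothetical JNDI name that your server would need to expose:

    <!-- the JNDI name is an assumption; use whatever your server registers -->
    <bean id="xa.dataSource" class="org.springframework.jndi.JndiObjectFactoryBean">
        <property name="jndiName" value="java:comp/env/jdbc/myXaDataSource" />
    </bean>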

The JtaTransactionManager will pick up and manage transactional resources using the JTA implementation from the server. However, you need a full JEE server for this. Tomcat, for example, does not come with JTA, and neither does JUnit, which makes testing your transaction config problematic. In these cases, you need to make use of an external JTA implementation, such as Atomikos or JOTM.

Atomikos’ approach to transaction management outside of the server is to place their own wrappers around your resource definitions. Their transaction manager will then scan the Spring ApplicationContext to find these. Setting it up takes a final step:

    <!-- implementation-specific wrapper used by the transaction manager -->
    <bean id="xa.connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean">
        <property name="uniqueResourceName" value="amq1" />
        <property name="xaConnectionFactory" ref="amq.connectionFactory" />
    </bean>

    <bean id="jta.transactionManager" 
            class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="transactionManager">
            <bean class="com.atomikos.icatch.jta.UserTransactionManager" 
                    init-method="init" 
                    destroy-method="close">
                <property name="forceShutdown" value="false" />
            </bean>
        </property>
        <property name="userTransaction">
            <bean class="com.atomikos.icatch.jta.UserTransactionImp">
                <property name="transactionTimeout" value="300" />
            </bean>
        </property>
    </bean>

The JtaTransactionManager is configured to use an external underlying TransactionManager implementation, and wrappers are provided around the resources.
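
One detail the fragments above leave implicit is which connection factory the listener container should use. A plausible wiring, offered as an assumption rather than as the original config, is to point the container at the Atomikos wrapper so that its sessions are enlisted in the JTA transaction:

    <bean id="consumer.jmsContainer"
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- assumption: use the Atomikos-wrapped factory instead of consumer.connectionFactory -->
        <property name="connectionFactory" ref="xa.connectionFactory" />
        <property name="destinationName" value="sample.messages" />
        <property name="messageListener" ref="consumer.messagerListener" />
        <property name="transactionManager" ref="jta.transactionManager" />
        <property name="sessionTransacted" value="true" />
    </bean>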

Assuming interaction with an HSQLDB instance, which doesn’t support XA transactions, you would add a further:

    <!-- HSQLDB is not XA compatible so we wrap that using a special Atomikos NonXA to XA DataSource -->
    <bean id="xa.dataSource" class="com.atomikos.jdbc.nonxa.AtomikosNonXADataSourceBean">
        <property name="uniqueResourceName" value="hsqldb" />
        <property name="driverClassName" value="org.hsqldb.jdbcDriver" />
        <property name="url" value="jdbc:hsqldb:mem:myTestDb" />
        <property name="user" value="sa" />
        <property name="password" value="" />
        <property name="poolSize" value="3" />
    </bean>

When using a database that supports XA, you would use a com.atomikos.jdbc.AtomikosDataSourceBean instead.
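
As a sketch of that alternative, assuming PostgreSQL and its org.postgresql.xa.PGXADataSource class (the database, host, and credentials are placeholders, not part of the original setup):

    <!-- sketch only: PostgreSQL and every value under xaProperties are assumptions -->
    <bean id="xa.dataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean">
        <property name="uniqueResourceName" value="postgres" />
        <property name="xaDataSourceClassName" value="org.postgresql.xa.PGXADataSource" />
        <property name="xaProperties">
            <props>
                <prop key="serverName">localhost</prop>
                <prop key="databaseName">mydb</prop>
                <prop key="user">myuser</prop>
                <prop key="password">secret</prop>
            </props>
        </property>
        <property name="poolSize" value="3" />
    </bean>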

The above should get you going, but remember that it’s important with any kind of transactional config to test that it actually works.
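
One way to check the dead-letter behaviour, sketched here as an assumption rather than as part of the original setup, is to attach a second listener container to ActiveMQ.DLQ and send a message that makes the main listener throw. The bean ids are hypothetical, and the CountingMessageListener class is reused on the assumption that it simply counts what it receives:

    <!-- hypothetical beans for verifying that failed messages reach the DLQ -->
    <bean id="dlq.messageListener"
            class="net.jakubkorab.amq.CountingMessageListener" />

    <bean id="dlq.jmsContainer"
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="consumer.connectionFactory" />
        <property name="destinationName" value="ActiveMQ.DLQ" />
        <property name="messageListener" ref="dlq.messageListener" />
    </bean>

If the transaction config works, the DLQ listener should see the failed message once the redelivery limit is exceeded. As far as I’m aware, ActiveMQ does not dead-letter non-persistent messages by default, so the test message should be sent with persistent delivery mode.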

5 Comments

  • Preben says:

    Real nice – Have been looking for this a long time. No wonder it’s hard to come by when you see the verbosity of the configuration needed to support XA. Frankly, that’s one of the reasons why we are still stuck with JEE appservers :-(

  • Michael says:

    We don’t have a mission-critical app, so in the end just gave up maintaining clean transaction support. Once the codebase became large, it was difficult to keep up with unit tests that required verifying that transaction support was still intact.

  • Ramin says:

    Nice article.

    Is it possible to use Spring Transaction annotation (@Transactional) as a transaction wrapper for the above example ?

    I mean commit database transaction inside a method annotated as transactional. (Also set sessionTransacted property as true).

  • Jesper says:

    Hi Ramin,
    I believe you can use @Transactional with, e.g., requires new and commit inside a transacted session. However, if a RuntimeException is thrown inside the requires-new method, then you have to catch it if you don’t want the whole transacted session to roll back.

    Another question. Has anyone had luck with creating a redelivery delay in the transacted session?

    Cheers,
    Jesper

  • Jack says:

    “MessageListenerContainers should always sit over a CachingConnectionFactory (see the Spring docs for details).”

    Please give me a quote, because I didn’t find it in the docs. In fact I found the opposite in the DefaultMessageListenerContainer javadoc:

    “Note: Don’t use Spring’s CachingConnectionFactory in combination with dynamic scaling. Ideally, don’t use it with a message listener container at all, since it is generally preferable to let the listener container itself handle appropriate caching within its lifecycle. Also, stopping and restarting a listener container will only work with an independent, locally cached Connection – not with an externally cached one.” (http://docs.spring.io/spring/docs/3.2.x/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html).
