Configuring ActiveMQ transactions in Spring

It’s easy to configure Message Driven POJOs over ActiveMQ with Spring, but slightly more involved once you want to get transactions working correctly; here’s how to do it.

Start with a basic DefaultMessageListenerContainer config.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- setup an embedded JMS Broker for testing purposes -->
    <amq:broker id="my-broker" useJmx="false" persistent="false" brokerName="localhost">
        <amq:transportConnectors>
            <amq:transportConnector uri="tcp://localhost:61616" />
        </amq:transportConnectors>
    </amq:broker>

    <bean id="amq.connectionFactory" 
            class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" 
                value="tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1" />
    </bean>

    <bean id="consumer.connectionFactory" 
            class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amq.connectionFactory" />
    </bean>

    <bean id="consumer.messageListener" 
            class="net.jakubkorab.amq.CountingMessageListener" />

    <bean id="consumer.jmsContainer" 
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="consumer.connectionFactory" />
        <property name="destinationName" value="sample.messages" />
        <property name="messageListener" ref="consumer.messageListener" />
    </bean>
</beans>

An embedded broker is started, and a MessageListener (CountingMessageListener) is set up to pick up messages from a queue (sample.messages). MessageListenerContainers should always sit over a CachingConnectionFactory (see the Spring docs for details). So what happens if the listener throws an exception?

The redelivery setting in the connection factory’s broker URI expresses the intent: the broker should attempt to redeliver the message once more, and if that also fails the message should end up in a dead letter queue (ActiveMQ.DLQ). However, if you test it, that’s not what happens at all.

You will not see any further redelivery attempts, and there’s nothing in the dead letter queue. Why? Because you haven’t configured a TransactionManager, Spring’s DefaultMessageListenerContainer picks up the message and immediately acknowledges it. As far as the broker is concerned, its job is done.

So let’s remedy this by adding one.

    <bean id="local.transactionManager" 
            class="org.springframework.jms.connection.JmsTransactionManager">
        <!-- can also refer to amq.connectionFactory -->
        <property name="connectionFactory" ref="consumer.connectionFactory" />
    </bean>

    <bean id="consumer.jmsContainer" 
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="consumer.connectionFactory" />
        <property name="destinationName" value="sample.messages" />
        <property name="messageListener" ref="consumer.messageListener" />
        <!-- add a reference to the transaction manager -->
        <property name="transactionManager" ref="local.transactionManager" />
    </bean>

This uses Spring’s JmsTransactionManager to consume each message inside a local JMS transaction. Any exception thrown by the listener now rolls that transaction back, so the broker redelivers the message according to the redelivery policy specified in the connection URI and, once that is exceeded, sends the offending message to the dead letter queue.
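
The redelivery policy does not have to live in the broker URI. As a minimal sketch, assuming you would rather spell the settings out as properties, the same policy can be set through the connection factory’s redeliveryPolicy property. The bean below would replace the amq.connectionFactory definition from the first config; the one-second initial delay is an illustrative value that is not part of the original setup.

    <!-- sketch: redelivery policy set as a nested bean instead of in the URI -->
    <bean id="amq.connectionFactory" 
            class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
        <property name="redeliveryPolicy">
            <bean class="org.apache.activemq.RedeliveryPolicy">
                <!-- one redelivery attempt, as in the URI version -->
                <property name="maximumRedeliveries" value="1" />
                <!-- assumption: illustrative delay in milliseconds -->
                <property name="initialRedeliveryDelay" value="1000" />
            </bean>
        </property>
    </bean>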


Update 13/09: Thanks to Sue Macey for pointing out this fragment from the Spring docs:

Local resource transactions can simply be activated through the sessionTransacted flag on the listener container definition. Each message listener invocation will then operate within an active JMS transaction, with message reception rolled back in case of listener execution failure. Sending a response message (via SessionAwareMessageListener) will be part of the same local transaction, but any other resource operations (such as database access) will operate independently. This usually requires duplicate message detection in the listener implementation, covering the case where database processing has committed but message processing failed to commit.
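
In config terms, that amounts to something like the following sketch, which leaves out the JmsTransactionManager and sets the flag directly on the container from the first example:

    <bean id="consumer.jmsContainer" 
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="consumer.connectionFactory" />
        <property name="destinationName" value="sample.messages" />
        <property name="messageListener" ref="consumer.messageListener" />
        <!-- local JMS transaction per message; no transactionManager property is set -->
        <property name="sessionTransacted" value="true" />
    </bean>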


This is sufficient for most cases. However, let’s say you want to update a database via your listener. Transactions ought to be atomic, and the JmsTransactionManager only covers the JMS session, so it won’t do the job for you here; you need to bring out the big guns.

Once you start managing more than one resource (broker, database etc) in a transaction, you bring in a sizeable overhead. Distributed transactions will involve some sort of two-phase commit protocol, and the chatter adds processing time to each operation. Should you decide that it’s definitely something you want to do, JTA is the tool for the job.

If you’re running in a full JEE container and using a DataSource that supports XA transactions (some do, some don’t), you’re in luck:

    <bean id="consumer.messageListener" 
            class="net.jakubkorab.amq.DBAwareMessageListener" >
        <!-- listener uses a JdbcTemplate with a DataSource defined elsewhere -->
        <property name="dataSource" ref="xa.dataSource" />
    </bean>

    <!-- use the XA-specific version of the connection factory -->
    <bean id="amq.connectionFactory" 
            class="org.apache.activemq.ActiveMQXAConnectionFactory" 
            depends-on="my-broker">
        <property name="brokerURL" 
                value="tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1" />
    </bean>

    <bean id="jta.transactionManager" 
            class="org.springframework.transaction.jta.JtaTransactionManager" 
            depends-on="my-broker"/>

    <bean id="consumer.jmsContainer" 
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="consumer.connectionFactory" />
        <property name="destinationName" value="sample.messages" />
        <property name="messageListener" ref="consumer.messageListener" />
        <!-- change the transaction manager reference -->
        <property name="transactionManager" ref="jta.transactionManager" />
        <!-- 
            now tell the broker that it's involved in a transaction; this has the same effect
            as creating a transacted session using the JMS API
        -->
        <property name="sessionTransacted" value="true" />
    </bean>

The JtaTransactionManager will pick up and manage transactional resources using the JTA implementation from the server. However, you need a full JEE server for this. Tomcat, for example, does not come with JTA, and neither does a plain JUnit test run, which makes testing your transaction config problematic. In these cases, you need to make use of an external JTA implementation, such as Atomikos or JOTM.

Atomikos’ approach to transaction management outside of the server is to place their own wrappers around your resource definitions. Their transaction manager will then scan the Spring ApplicationContext to find these. To set it up takes a final step:

    <!-- implementation-specific wrapper used by the transaction manager -->
    <bean id="xa.connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean">
        <property name="uniqueResourceName" value="amq1" />
        <property name="xaConnectionFactory" ref="amq.connectionFactory" />
    </bean>

    <bean id="jta.transactionManager" 
            class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="transactionManager">
            <bean class="com.atomikos.icatch.jta.UserTransactionManager" 
                    init-method="init" 
                    destroy-method="close">
                <property name="forceShutdown" value="false" />
            </bean>
        </property>
        <property name="userTransaction">
            <bean class="com.atomikos.icatch.jta.UserTransactionImp">
                <property name="transactionTimeout" value="300" />
            </bean>
        </property>
    </bean>

The JtaTransactionManager is configured to use an external underlying TransactionManager implementation, and wrappers are provided around the resources.
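
The listener container then has to consume through the wrapped factory rather than the raw one. The config above does not show this step, so treat the following as a sketch under that assumption: it is the JTA container definition from earlier with only the connectionFactory reference changed to xa.connectionFactory.

    <bean id="consumer.jmsContainer" 
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- assumption: point the container at the Atomikos wrapper -->
        <property name="connectionFactory" ref="xa.connectionFactory" />
        <property name="destinationName" value="sample.messages" />
        <property name="messageListener" ref="consumer.messageListener" />
        <property name="transactionManager" ref="jta.transactionManager" />
        <property name="sessionTransacted" value="true" />
    </bean>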

Assuming interaction with an HSQLDB instance, which doesn’t support XA transactions, you would add a further:

    <!-- HSQLDB is not XA compatible so we wrap that using a special Atomikos NonXA to XA DataSource -->
    <bean id="xa.dataSource" class="com.atomikos.jdbc.nonxa.AtomikosNonXADataSourceBean">
        <property name="uniqueResourceName" value="hsqldb" />
        <property name="driverClassName" value="org.hsqldb.jdbcDriver" />
        <property name="url" value="jdbc:hsqldb:mem:myTestDb" />
        <property name="user" value="sa" />
        <property name="password" value="" />
        <property name="poolSize" value="3" />
    </bean>

When using a database that supports XA, you would use a com.atomikos.jdbc.AtomikosDataSourceBean instead.
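
As a rough sketch of that variant, assuming a PostgreSQL database and its PGXADataSource driver class: the database choice, host, names and credentials below are placeholders rather than part of the original setup, and the keys under xaProperties depend on the setters your driver version exposes.

    <!-- sketch: XA-capable DataSource wrapped by Atomikos -->
    <bean id="xa.dataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean"
            init-method="init" 
            destroy-method="close">
        <property name="uniqueResourceName" value="postgres" />
        <property name="xaDataSourceClassName" value="org.postgresql.xa.PGXADataSource" />
        <!-- assumption: placeholder connection details, passed to the driver's setters -->
        <property name="xaProperties">
            <props>
                <prop key="serverName">localhost</prop>
                <prop key="portNumber">5432</prop>
                <prop key="databaseName">mydb</prop>
                <prop key="user">myuser</prop>
                <prop key="password">secret</prop>
            </props>
        </property>
        <property name="poolSize" value="3" />
    </bean>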

The above should get you going, but remember that it’s important with any kind of transactional config to test that it actually works.