JDBC Persistence for Camel Aggregator

Apache Camel is an open-source integration framework that implements many of the well-known Enterprise Integration Patterns. Its components can be combined like LEGO blocks to build complex workflows.

Camel's pluggable architecture makes it easy to create new components. Out of the box, Camel provides more than a hundred connectors and supported data formats. Axel Irriger wrote an interesting article about Camel that gives a nice overview of the provided functionality.

Most of the configuration is done with a dedicated domain-specific language (DSL), available in either XML or Java. Thanks to convention over configuration, little needs to be configured as long as the default settings are used.

Example (1)

Here is a simple route that moves files from the target/in folder to the target/out folder, shown in both DSLs.

Java

XML

The DSL and convention over configuration greatly reduce the time required to start using Camel. However, the default configuration is not always sufficient. Some components, like the Content Enricher or the Aggregator, need user-defined strategies adapted to the use case.

In version 2.3, Camel introduced a new Aggregator implementation. Despite being one of the more complex components, it is still easy to use. Let’s take a closer look at it.

Aggregator2 Component

The Aggregator pattern concept goes like this (Source: Apache Camel website):

The Aggregator from the EIP patterns allows you to combine a number of messages together into a single message.

A correlation Expression is used to determine the messages which should be aggregated together.

An output message is created upon completion of one of the triggers which can be related to time, quantity or user defined.
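To make the quoted definition concrete, here is a minimal plain-Java model of the pattern. It is not Camel code: the class name SizeBasedAggregator and its add method are hypothetical, messages are reduced to String bodies, and only the quantity trigger is modelled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Plain-Java model of the Aggregator pattern (hypothetical names, not the Camel API).
class SizeBasedAggregator {
    private final int completionSize;
    // Pending groups, keyed by correlation id.
    private final Map<String, List<String>> pending = new HashMap<>();

    SizeBasedAggregator(int completionSize) {
        this.completionSize = completionSize;
    }

    // Adds a message to the group selected by its correlation id.
    // Returns the completed group once the quantity trigger fires,
    // otherwise an empty Optional.
    Optional<List<String>> add(String correlationId, String body) {
        List<String> group = pending.computeIfAbsent(correlationId, k -> new ArrayList<>());
        group.add(body);
        if (group.size() < completionSize) {
            return Optional.empty();
        }
        pending.remove(correlationId);
        return Optional.of(group);
    }

    public static void main(String[] args) {
        SizeBasedAggregator aggregator = new SizeBasedAggregator(2);
        System.out.println(aggregator.add("a.txt", "first"));  // Optional.empty
        System.out.println(aggregator.add("b.txt", "other"));  // Optional.empty
        System.out.println(aggregator.add("a.txt", "second")); // Optional[[first, second]]
    }
}
```

Messages with different correlation ids never end up in the same group, which is why several aggregations can be in progress at the same time.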

Depending on the use case, the algorithm required to combine the different messages may differ. The old aggregator implementation had a simple default strategy: append the content of each received message to the output message. No default strategy is defined for the new aggregator. It must be provided with a bean implementing org.apache.camel.processor.aggregate.AggregationStrategy.

The old default behavior can be reproduced with the following class. A logger has been added to be able to easily inspect the current aggregation state:
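As a rough illustration of the append behavior described above, here is a plain-Java stand-in. It deliberately does not implement the Camel interface: message bodies are plain String values, and the class name AppendStrategySketch is hypothetical. It only mirrors the contract of AggregationStrategy.aggregate(oldExchange, newExchange), where the first argument is null for the first message of a group.

```java
import java.util.Arrays;
import java.util.logging.Logger;

// Plain-Java stand-in for an "append" aggregation strategy (hypothetical class).
class AppendStrategySketch {
    private static final Logger LOG = Logger.getLogger(AppendStrategySketch.class.getName());

    // 'current' plays the role of oldExchange: it is null for the first
    // message of a group, in which case the incoming body is kept as-is.
    static String aggregate(String current, String incoming) {
        String result = (current == null) ? incoming : current + incoming;
        LOG.info("Aggregation state: " + result);
        return result;
    }

    public static void main(String[] args) {
        String state = null;
        for (String body : Arrays.asList("A", "B", "C")) {
            state = aggregate(state, body);
        }
        System.out.println(state); // prints ABC
    }
}
```

In a real strategy the same null check applies to the old exchange, and the combined body is set on the exchange that is returned.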

Example (2)

An aggregator can be added to the simple route as shown below:

Java

XML

Aggregation is a stateful process. The default persistence mechanism uses an in-memory repository.
This means messages are lost in case of failure, which did not fit our use case.
Camel provides an alternative, file-based persistence strategy that uses HawtDB. Unfortunately, in our Camel version that implementation still had issues in an OSGi environment.

Aggregation repository internals

The aggregation repository operates in two phases. During the first phase, messages having the same correlation id are combined using the provided aggregation strategy. Only the result is kept in the store, along with its correlation id. This result is stored in the aggregation location.

Upon completion (time, quantity, or user-defined), the result message is moved to the completed location and sent to the next component on the route. If the message is successfully delivered, it is removed from that location. Otherwise, an aggregator background process scans the location at regular intervals and retries delivery until the redelivery attempts are exhausted. The message is then moved to the Dead Letter Channel.
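The life cycle of a result across the two locations can be sketched with two in-memory maps. This is only a model under simplifying assumptions: the class TwoPhaseStoreSketch and its method names are hypothetical, both locations are keyed by correlation id, and redelivery limits and the Dead Letter Channel are left out.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// In-memory model of the two locations (hypothetical class, not the Camel repository API).
class TwoPhaseStoreSketch {
    // Aggregation location: correlation id -> result still being aggregated.
    private final Map<String, String> aggregation = new LinkedHashMap<>();
    // Completed location: correlation id -> result awaiting confirmed delivery.
    private final Map<String, String> completed = new LinkedHashMap<>();

    // Phase 1: keep only the latest aggregated result for a correlation id.
    void add(String correlationId, String aggregated) {
        aggregation.put(correlationId, aggregated);
    }

    String get(String correlationId) {
        return aggregation.get(correlationId);
    }

    // Completion: move the result from the aggregation to the completed location.
    void complete(String correlationId) {
        String result = aggregation.remove(correlationId);
        if (result != null) {
            completed.put(correlationId, result);
        }
    }

    // Successful delivery: the result can be forgotten.
    void confirm(String correlationId) {
        completed.remove(correlationId);
    }

    // What a background recovery scan would find: completed but unconfirmed results.
    Set<String> scan() {
        return new LinkedHashSet<>(completed.keySet());
    }

    // Reads a result back for redelivery.
    String recover(String correlationId) {
        return completed.get(correlationId);
    }

    public static void main(String[] args) {
        TwoPhaseStoreSketch store = new TwoPhaseStoreSketch();
        store.add("a.txt", "A");
        store.add("a.txt", "AB");
        store.complete("a.txt");
        System.out.println(store.scan()); // [a.txt]
        store.confirm("a.txt");
        System.out.println(store.scan()); // []
    }
}
```

If the process stops between complete and confirm, the result is still in the completed location, which is what lets a persistent implementation redeliver it after a restart.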

Of course, all of this is transparent to the user. If you are interested in the component internals, I suggest browsing the source code and taking a look at Camel in Action, chapter 8.2.

JDBC persistent store

JDBC looked like a better fit for repository persistence because it supports global transactions and clustered deployments. To that end, the org.apache.camel.spi.RecoverableAggregationRepository interface had to be implemented.

Implementation

Fortunately, most of the boilerplate code had already been written for the HawtDB aggregation repository. The JDBC implementation simply changes the location and format of the persisted data.
Since a Camel message body may not be serializable, it is the component's responsibility to convert it to and from the format used by the persistent store. HawtDB uses its own Buffer object type to serialize the data. For JDBC, we had to define an adequate format.

In both locations, ids are String values. VARCHAR was chosen for them because it makes the database easy to query. Since the message body can hold any type of data (object, binary, XML …), a BLOB field is the obvious fit.
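To give an idea of the kind of conversion involved, here is a minimal round trip between an object and the byte array that would go into a BLOB column. It uses standard Java serialization purely as an assumption for illustration; the article does not show the format the component actually writes, and the class name BlobCodecSketch is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Round trip between an object and BLOB-ready bytes (hypothetical helper).
class BlobCodecSketch {
    // Serializes the body into a byte array suitable for a BLOB column.
    static byte[] toBytes(Serializable body) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(body);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Rebuilds the body from the bytes read back from the database.
    static Object fromBytes(byte[] blob) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(blob))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class of stored body not found", e);
        }
    }

    public static void main(String[] args) {
        byte[] blob = toBytes("hello");
        System.out.println(fromBytes(blob)); // prints hello
    }
}
```

A body that does not implement Serializable cannot go through this path, which is why the conversion has to be handled by the component rather than left to the caller.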

To operate correctly, the aggregator needs access to the database tables corresponding to the aggregation and completed locations. Creating these tables on the fly is not suitable for a production environment. Therefore, two tables must be created with the following structure, or the aggregator will not start.
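Based on the description above (a VARCHAR id and a BLOB body per table), the DDL could look like what this small helper generates. Treat it as a sketch: the column names id and exchange, the VARCHAR(255) length, and the helper class AggregationTablesSketch are assumptions, and the exact SQL types depend on your database.

```java
// Builds hypothetical DDL for the two tables backing a repository name.
class AggregationTablesSketch {
    // Returns the statements for the aggregation table and its "_completed" twin.
    static String[] ddl(String repositoryName) {
        return new String[] {
            createTable(repositoryName),
            createTable(repositoryName + "_completed")
        };
    }

    // Column names and sizes are assumptions, not taken from the article.
    private static String createTable(String table) {
        return "CREATE TABLE " + table + " ("
            + "id VARCHAR(255) NOT NULL, "
            + "exchange BLOB NOT NULL, "
            + "CONSTRAINT " + table + "_pk PRIMARY KEY (id))";
    }

    public static void main(String[] args) {
        for (String statement : ddl("aggregation")) {
            System.out.println(statement);
        }
    }
}
```

Running it with "aggregation" yields one statement for the aggregation table and one for aggregation_completed, matching the default table names.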

The table names can be changed if required: simply replace aggregation with the name of your choice in both table names. That name is then passed to the component at initialization (see the repositoryName property).
More details about the implementation can be found in the associated mailing list thread or the corresponding JIRA ticket.

Configuration

The repository is a Java object that requires a transaction manager and a datasource. Unless specified otherwise, the table names are aggregation and aggregation_completed. Depending on your database, you might need to provide a specific LOB handler, as is done in this example for Oracle. As you can see, Spring makes it really easy.

To keep things simple, the sample for this article uses an H2 database. H2 can run as an in-memory database and does not require any specific LOB handler. Therefore, we can configure the repository like this:

Take a look at the sample source code if you want to see the datasource and transaction manager configuration.

Example (3)

With this last piece, we can now modify our simple route to be crash-resistant:

Java

XML

Maven dependencies

To start using this component, a single additional dependency needs to be added to your project's pom.xml file.

In the latest version, the JDBC aggregator has been moved into the camel-sql module.

Sources

Source code for the sample can be downloaded here. It uses Camel 2.7 and is self-sufficient.
You can see the aggregation repository in action either by running the unit test or by playing with it using the Camel Maven plugin. The plugin starts the route so you can experiment with it. Use mvn camel:run to start the route.

In this case, the correlation id is the file name of the messages put in the target/in directory. To get an aggregation output, you need to drop a file with the same name into the folder at least five times. Of course, you can have concurrent aggregations if you use different file names. The status is shown in the console. Be aware that the default polling delay for file endpoints is 500 ms.

The trace attribute has been set to true in the CamelContext to display all the information about message routing.

Conclusion

The aggregator is a powerful component in the Camel toolbox. Since version 2.6, it has been enriched with a new persistence capability. As shown in this article, it is quite easy to use. Feel free to try it out and tell us what you think about it.
