Remote partitioning with Spring Batch

In a previous article, we discussed how staging the data to process with a sequence number could help us build partitions that can be executed concurrently. As explained in the documentation, the PartitionHandler is the component that knows about the means of transporting StepExecution requests to the actual worker(s). It basically sends one StepExecution request per partition to a set of workers and waits for the results. Once each worker has contributed a result back to the handler, it returns the Collection of updated StepExecution instances. This simple and open design leads to a very simple PartitionHandler contract to fulfill.
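For reference, the contract looks like this (paraphrased from the Spring Batch sources):

public interface PartitionHandler {

    Collection<StepExecution> handle(StepExecutionSplitter stepSplitter,
            StepExecution masterStepExecution) throws Exception;
}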

Spring Batch provides a default PartitionHandler implementation that delegates the processing to a TaskExecutor (see TaskExecutorPartitionHandler). It basically wraps the execution of each partition in a FutureTask and delegates its processing to the underlying TaskExecutor (a thread pool). Once each FutureTask has completed, the implementation simply returns the StepExecution instances that the tasks have returned. Simple!
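A simplified sketch of that pattern (not the actual TaskExecutorPartitionHandler source; step, taskExecutor and gridSize are assumed to be fields of the handler):

public Collection<StepExecution> handle(StepExecutionSplitter splitter,
        StepExecution masterStepExecution) throws Exception {
    // Split the master execution into one StepExecution per partition
    Set<StepExecution> partitions = splitter.split(masterStepExecution, gridSize);
    List<FutureTask<StepExecution>> tasks = new ArrayList<FutureTask<StepExecution>>();
    for (final StepExecution partition : partitions) {
        FutureTask<StepExecution> task = new FutureTask<StepExecution>(
                new Callable<StepExecution>() {
                    public StepExecution call() throws Exception {
                        step.execute(partition);  // run the worker step on this partition
                        return partition;
                    }
                });
        tasks.add(task);
        taskExecutor.execute(task);  // delegate to the thread pool
    }
    List<StepExecution> results = new ArrayList<StepExecution>();
    for (FutureTask<StepExecution> task : tasks) {
        results.add(task.get());  // block until each partition has completed
    }
    return results;
}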

Our approach is to take these simple principles and see how they can be applied to remote execution, using JMS to transport the requests and responses. Executing the partition remotely brings new challenges:

  1. The remote worker does not have the actual Step instance, so it needs to be able to retrieve it based on a reference
  2. Compared to the default PartitionHandler, we need to wait for responses coming from different JVMs

Retrieving the Step instance

To provide complete isolation for each batch, Spring Batch provides the ability to declare each job in a dedicated child application context. When the application starts with this feature, a component scans for all the job definitions matching a configurable expression and creates a child context for each matching file (we declare one job per file). For instance, the following configuration scans (and therefore registers) all jobs declared in XML files found in the config root package whose names start with ‘job’.
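Something along these lines, using Spring Batch’s AutomaticJobRegistrar (a sketch only; the resource pattern and the Java-config wiring are assumptions, not the exact configuration we used):

@Bean
public AutomaticJobRegistrar jobRegistrar(JobRegistry jobRegistry) throws Exception {
    // One dedicated child application context per job*.xml file in the config package
    ClasspathXmlApplicationContextsFactoryBean contexts =
            new ClasspathXmlApplicationContextsFactoryBean();
    contexts.setResources(new PathMatchingResourcePatternResolver()
            .getResources("classpath*:config/job*.xml"));

    AutomaticJobRegistrar registrar = new AutomaticJobRegistrar();
    registrar.setApplicationContextFactories(contexts.getObject());
    registrar.setJobLoader(new DefaultJobLoader(jobRegistry));  // feeds the JobRegistry
    return registrar;
}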

The JobLoader updates a component called the JobRegistry, which is able to locate a Job by name. This is the piece we have extended by introducing the concept of a StepRegistry:
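A sketch of the contract (close to what we contributed, but the exact signatures may differ):

public interface StepRegistry {

    // Registers all the steps of the given job
    void register(String jobName, Collection<Step> steps) throws DuplicateJobException;

    // Unregisters all the steps of the given job
    void unregisterStepsFromJob(String jobName);

    // Returns the Step of the given job with the given name
    Step getStep(String jobName, String stepName) throws NoSuchJobException;
}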

And of course, our JobLoader now updates the content of this registry.

Aggregating the result

Our partition handler now needs to send one StepExecutionRequest per partition, as a JMS message. Let’s ignore for now what happens on the remote end; we still need to deal with the aggregation of the results. In Spring Batch, one thread (the master) manages the state of the entire job, so this thread must wait (i.e. block) until all responses have been received from the workers. In a cluster, we never know where the master thread will run, so we created a mechanism where each job has a temporary queue associated with it. Having a temporary queue makes sure that only the master thread can listen for messages. And because we set the reference to this temporary queue as the JMSReplyTo header of each request, our remote workers are able to send their responses to that queue as well.
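A minimal sketch of the sending side in plain JMS (method and variable names are ours, for illustration):

private TemporaryQueue sendRequests(Session session, Queue requestQueue,
        List<StepExecutionRequest> requests) throws JMSException {
    // Only the master thread knows about (and listens to) this temporary queue
    TemporaryQueue replyQueue = session.createTemporaryQueue();
    MessageProducer producer = session.createProducer(requestQueue);
    for (StepExecutionRequest request : requests) {
        ObjectMessage message = session.createObjectMessage(request);
        message.setJMSReplyTo(replyQueue);  // workers send their response here
        producer.send(message);
    }
    return replyQueue;
}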

A classic problem in asynchronous communication remains: timeouts. To deal with it, we have built a JmsAggregationService that is driven by the following interfaces (sketched in code after the list):

  • AggregationItemListener: notified each time a response is received by the aggregation service.
  • AggregationCompletionPolicy: determines when the aggregation is done, regardless of a potential timeout. In our case, this policy is straightforward since we expect exactly one response per request, so a default implementation that counts the number of responses is enough.
  • AggregationTimeoutPolicy: determines whether a timeout should occur, based on the time at which the aggregation started.
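A sketch of what these contracts could look like (the method names here are illustrative, not the exact API):

public interface AggregationItemListener<T> {
    void onItemAggregation(T item);  // called for each response received
}

public interface AggregationCompletionPolicy<T> {
    void onItemReceived(T item);     // record a received response
    boolean shouldComplete();        // true once all expected responses are in
}

public interface AggregationTimeoutPolicy {
    boolean shouldTimeout(long startTime);  // true if the aggregation should give up
}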

While we could have built a straightforward policy that simply times out after a fixed delay, we decided to build an implementation that benefits from the metadata that Spring Batch stores in its internal model. Every time a chunk completes, the thread contributes some metadata to its StepExecutionContext and flushes it to the database. This is very handy because we can easily find out whether a given partition is still running and when it last contributed something.
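As an illustration, a timeout policy along those lines could use the JobExplorer to check when each partition last contributed (again a sketch of the idea, not the actual implementation):

public class LastUpdateTimeoutPolicy implements AggregationTimeoutPolicy {

    private final JobExplorer jobExplorer;
    private final Collection<StepExecutionRequest> requests;
    private final long maxInactivity;  // max time (ms) without any contribution

    public LastUpdateTimeoutPolicy(JobExplorer jobExplorer,
            Collection<StepExecutionRequest> requests, long maxInactivity) {
        this.jobExplorer = jobExplorer;
        this.requests = requests;
        this.maxInactivity = maxInactivity;
    }

    public boolean shouldTimeout(long startTime) {
        long now = System.currentTimeMillis();
        for (StepExecutionRequest request : requests) {
            StepExecution execution = jobExplorer.getStepExecution(
                    request.getJobExecutionId(), request.getStepExecutionId());
            if (execution == null || !execution.getStatus().isRunning()) {
                continue;  // finished (or not visible yet): the completion policy handles it
            }
            Date lastUpdated = execution.getLastUpdated();
            long lastActivity = (lastUpdated != null) ? lastUpdated.getTime() : startTime;
            if (now - lastActivity < maxInactivity) {
                return false;  // this partition contributed recently, keep waiting
            }
        }
        // No running partition has flushed anything for a while: give up
        return now - startTime > maxInactivity;
    }
}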

Based on these facilities, the aggregation service performs the following (the receive loop is sketched in code after the list):

  1. Creates a JMS session
  2. Creates a temporary JMS queue that will be used for the aggregation
  3. Sends one message per request. Each message holds a reference to the temporary JMS queue
  4. Listens for incoming responses
    • When no response has been received, it asks the timeout policy whether it should time out. Based on the result, it either throws an exception or listens again for a response
    • When a response has been received, it invokes all registered listeners and asks the completion policy whether it should complete. Based on the result, it either returns the results or listens again for a new response
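Put together, the master-side receive loop could look like this (a sketch reusing the contracts sketched above; the one-second poll interval is arbitrary):

private List<StepExecution> aggregate(Session session, TemporaryQueue replyQueue,
        AggregationCompletionPolicy<StepExecution> completionPolicy,
        AggregationTimeoutPolicy timeoutPolicy,
        List<AggregationItemListener<StepExecution>> listeners) throws Exception {

    List<StepExecution> results = new ArrayList<StepExecution>();
    MessageConsumer consumer = session.createConsumer(replyQueue);
    long startTime = System.currentTimeMillis();
    while (!completionPolicy.shouldComplete()) {
        ObjectMessage reply = (ObjectMessage) consumer.receive(1000);  // poll for one second
        if (reply == null) {
            // No response yet: ask the timeout policy whether to give up
            if (timeoutPolicy.shouldTimeout(startTime)) {
                throw new TimeoutException("Gave up waiting for partition results");
            }
            continue;
        }
        StepExecution stepExecution = (StepExecution) reply.getObject();
        results.add(stepExecution);
        for (AggregationItemListener<StepExecution> listener : listeners) {
            listener.onItemAggregation(stepExecution);  // notify registered listeners
        }
        completionPolicy.onItemReceived(stepExecution);  // record the response
    }
    return results;
}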

Executing the partition on the remote worker

Executing the actual partition is now easy. Upon reception of a StepExecutionRequest, the worker retrieves the actual Step instance using the StepRegistry. Once the partition completes, or if an error occurs, a response is sent to the destination that was set in the JMSReplyTo header of the request.
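On the worker side, the listener could look roughly like this (a sketch; it reuses the StepRegistry contract above and the StepExecutionRequest accessors from spring-batch-integration, and omits persisting the StepExecution for brevity):

public class PartitionRequestListener implements SessionAwareMessageListener<ObjectMessage> {

    private final StepRegistry stepRegistry;
    private final JobExplorer jobExplorer;

    public PartitionRequestListener(StepRegistry stepRegistry, JobExplorer jobExplorer) {
        this.stepRegistry = stepRegistry;
        this.jobExplorer = jobExplorer;
    }

    public void onMessage(ObjectMessage message, Session session) throws JMSException {
        StepExecutionRequest request = (StepExecutionRequest) message.getObject();
        // Rebuild the StepExecution from the shared job repository metadata
        StepExecution stepExecution = jobExplorer.getStepExecution(
                request.getJobExecutionId(), request.getStepExecutionId());
        String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
        try {
            // Locate the actual Step, even if it lives in a child application context
            Step step = stepRegistry.getStep(jobName, request.getStepName());
            step.execute(stepExecution);
        }
        catch (Exception e) {
            stepExecution.setStatus(BatchStatus.FAILED);
            stepExecution.addFailureException(e);
        }
        // Send the updated StepExecution back to the master's temporary queue
        session.createProducer(message.getJMSReplyTo())
                .send(session.createObjectMessage(stepExecution));
    }
}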

The diagram below summarizes the overall solution.

Note that even though the master seems to be separate from the actual slaves, it does not have to be that way in practice. This video demonstrates how an additional node can join a running instance. It also showcases what happens when a node is killed in the middle of a job!


Conclusion

These developments add support for a remote partitioning use case to Spring Batch. And the good news is that BSB has open sourced them (see the spring-batch and spring-batch-admin forks on GitHub!). It should be noted that not a lot of the code is actually JMS-specific: most of the infrastructure can be reused to implement use cases where the transport protocol is different. Only the aggregation service and the remote invoker would need to change.

Comments

  • Juan

    This is very nice, thanks.
    How does it compare to org.springframework.batch.integration.partition.MessageChannelPartitionHandler in your experience?

  • Hi Juan,

    MessageChannelPartitionHandler was actually created after we started to work on this. We believe our implementation has several advantages over it:

    1. You don’t need to set a “realistic receive timeout” (see the Javadoc of #handle) as our implementation knows about Spring Batch. We make sure to time out if a running partition has not contributed for quite some time. This is far easier than setting a large timeout to give every partition a chance to complete (and having to wait for that timeout if an error occurs so that the Job is flagged properly)
    2. Our infrastructure does not require you to change anything in your job configuration. We have built a StepRegistry that is able to locate a particular step by name, given a job name, even if it has been registered in a child application context. That way, you can deploy your jobs just like you did for single-machine processing and we’ll figure that out ourselves. The other implementation requires extra configuration from you (such as the use of BeanFactoryStepLocator)

    Now the big advantage of the Spring Integration solution is that you can use pretty much any transport protocol. Note that since this blog post, we have implemented the same with AMQP and RabbitMQ, and we have a showcase of a batch running in the cloud on cloudfoundry. We should write a blog post about that as well.

    HTH,
    S.


  • Do you have any sample code in any repository (like GitHub) that shows how to do what you are showing?

  • Hi Marcel,

    Yes, the fork has integration tests that showcase how you can change your regular Spring Batch Job into a Job that runs in a cluster/cloud.

    Have a look at:
    https://github.com/snicoll/spring-batch-admin/tree/a64b3435773e1177c8589a70710b6d54e93a8d19/spring-batch-integration/src/test/resources/org/springframework/batch/integration/partition/support

    Some tips:

    * Look at generic-partition-test-jobs.xml, where the job defines a partition handler (the regular Spring Batch way, of course) with a reference to a “requestAggregator” (notice that the partition handler is our GenericPartitionHandler, which does not exist in Spring Batch)
    * The “requestAggregator” is the main piece that distributes the partitions and collects the results. You can find an example for AMQP (that would work in CloudFoundry) in GenericPartitionHandlerWithAmqpTests-context.xml. This creates the destination to store the partitions and the message listener to listen for partitions and execute them

    In the same package you have the same for JMS, so if you have a regular cluster with JMS support you could use that too.

    If you want to deploy your application to cloudfoundry, you just need to adapt your Spring configuration so that the AMQP support is provisioned for you using the special cloud namespace.

    HTH,
    S.

    • Did you run any tests with the slave following one of these strategies:
      1. Slave partitioner
      2. Slave multiprocessor
      3. Slave chunk (this is probably the normal way)

      Based on the strategies above, could you point out the pros and cons of each strategy?

  • I don’t understand what you mean by slave partitioner.

  • Indeed a nice article; I had heard about Spring Batch but didn’t know we could use it like this.

