Remote partitioning with Spring Batch

In a previous article, we discussed how staging the data to process with a sequence number helps us build partitions that can be executed concurrently. As explained in the documentation, the PartitionHandler is the component that knows how to transport StepExecution requests to the actual worker(s). It sends one StepExecution request per partition to a set of workers and waits for the results. Once every worker has contributed a result back to the handler, it returns the Collection of updated StepExecution instances. This open design leads to a very simple PartitionHandler contract to fulfill.

Spring Batch provides a default PartitionHandler implementation that delegates the processing to a TaskExecutor (see TaskExecutorPartitionHandler). It wraps the execution of each partition in a FutureTask and delegates its processing to the underlying TaskExecutor (typically a thread pool). Once every FutureTask has completed, the implementation simply returns the StepExecution instances that the tasks have returned. Simple!
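To make the idea concrete, here is a minimal sketch of the same pattern using only the JDK. It is not the code of TaskExecutorPartitionHandler: PartitionResult and LocalPartitionSketch are hypothetical names introduced for illustration, and a plain ExecutorService stands in for the Spring TaskExecutor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

// Hypothetical stand-in for the result of one partition (not Spring Batch's StepExecution).
final class PartitionResult {
    final String partitionName;
    final int itemsProcessed;

    PartitionResult(String partitionName, int itemsProcessed) {
        this.partitionName = partitionName;
        this.itemsProcessed = itemsProcessed;
    }
}

final class LocalPartitionSketch {

    // Wraps each partition in a FutureTask, hands it to the executor,
    // then blocks until every task has produced its result.
    static List<PartitionResult> handle(List<Callable<PartitionResult>> partitions,
                                        ExecutorService executor)
            throws InterruptedException, ExecutionException {
        List<FutureTask<PartitionResult>> tasks = new ArrayList<>();
        for (Callable<PartitionResult> partition : partitions) {
            FutureTask<PartitionResult> task = new FutureTask<>(partition);
            tasks.add(task);
            executor.execute(task);
        }
        List<PartitionResult> results = new ArrayList<>();
        for (FutureTask<PartitionResult> task : tasks) {
            results.add(task.get()); // waits for this partition to finish
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<Callable<PartitionResult>> partitions = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                final int index = i;
                partitions.add(() -> new PartitionResult("partition" + index, 10 * (index + 1)));
            }
            for (PartitionResult result : handle(partitions, executor)) {
                System.out.println(result.partitionName + " processed " + result.itemsProcessed);
            }
        } finally {
            executor.shutdown();
        }
    }
}
```

Results come back in submission order because the tasks are awaited in the order they were created, regardless of which one finishes first.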

Our approach is to take those simple principles and see how they can be applied to a remote execution where JMS would be used to transport the requests and responses. Executing the partition remotely brings new challenges:

  1. The remote worker does not have the actual Step instance so it needs to be able to retrieve it based on a reference
  2. Compared to the default PartitionHandler, we need to wait for responses coming from different JVM(s)

Retrieving the Step instance

To isolate each batch completely, Spring Batch provides the ability to declare each job in a dedicated child application context. When the application starts with this feature, a component scans all the files matching a configurable expression and creates a child context for each of them (we specify one job per file). For instance, the following configuration scans (and therefore registers) all jobs declared in XML files found in the config root package whose names start with ‘job’.

<bean class="org.springframework.batch.core.configuration.support.AutomaticJobRegistrar">
    <property name="applicationContextFactories">
        <bean class="org.springframework.batch.core.configuration.support.ClasspathXmlApplicationContextsFactoryBean">
            <property name="resources" value="classpath*:/config/job*.xml"/>
        </bean>
    </property>
    <property name="jobLoader">
        <bean class="org.springframework.batch.core.configuration.support.DefaultJobLoader">
            <property name="jobRegistry" ref="jobRegistry"/>
        </bean>
    </property>
</bean>

The JobLoader updates a component called the JobRegistry, which is able to locate a Job by its name. This is the piece we have extended by introducing the concept of a StepRegistry:

/**
 * Registry keeping track of all the {@link Step} defined in a
 * {@link org.springframework.batch.core.Job}.
 */
public interface StepRegistry {

    /**
     * Registers all the steps of the given job. If the job is already registered,
     * the method {@link #unregisterStepsFromJob(String)} is called before registering
     * the given steps.
     *
     * @param jobName the given job name
     * @param steps the job steps
     */
    void register(String jobName, Collection<Step> steps);

    /**
     * Unregisters all the steps of the given job. If the job is not registered,
     * nothing happens.
     *
     * @param jobName the given job name
     */
    void unregisterStepsFromJob(String jobName);

    /**
     * Returns the {@link Step} of the specified job based on its name.
     *
     * @param jobName the name of the job
     * @param stepName the name of the step to retrieve
     * @return the step with the given name belonging to the mentioned job
     * @throws NoSuchJobException no such job with that name exists
     * @throws NoSuchStepException no such step with that name for that job exists
     */
    Step getStep(String jobName, String stepName) throws NoSuchJobException, NoSuchStepException;

}

And of course, our JobLoader now updates the content of this registry.
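A map-backed implementation is probably the simplest way to satisfy this contract. The sketch below is an assumption about how such a registry could look, not the implementation from the fork: MapStepRegistrySketch is a hypothetical name, and Step, NoSuchJobException and NoSuchStepException are local stand-ins so that the example compiles without Spring Batch on the classpath.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for org.springframework.batch.core.Step, reduced to what the sketch needs.
interface Step {
    String getName();
}

// Stand-ins for the exceptions declared by the StepRegistry contract.
class NoSuchJobException extends Exception {
    NoSuchJobException(String message) {
        super(message);
    }
}

class NoSuchStepException extends Exception {
    NoSuchStepException(String message) {
        super(message);
    }
}

// Hypothetical registry: job name -> (step name -> step).
final class MapStepRegistrySketch {

    private final Map<String, Map<String, Step>> stepsByJob = new ConcurrentHashMap<>();

    void register(String jobName, Collection<Step> steps) {
        Map<String, Step> byName = new HashMap<>();
        for (Step step : steps) {
            byName.put(step.getName(), step);
        }
        // put() replaces any earlier registration of the same job in one operation.
        stepsByJob.put(jobName, byName);
    }

    void unregisterStepsFromJob(String jobName) {
        stepsByJob.remove(jobName); // nothing happens when the job is unknown
    }

    Step getStep(String jobName, String stepName)
            throws NoSuchJobException, NoSuchStepException {
        Map<String, Step> steps = stepsByJob.get(jobName);
        if (steps == null) {
            throw new NoSuchJobException("No job registered with name '" + jobName + "'");
        }
        Step step = steps.get(stepName);
        if (step == null) {
            throw new NoSuchStepException(
                    "No step '" + stepName + "' registered for job '" + jobName + "'");
        }
        return step;
    }
}
```

Replacing the whole inner map on registration, rather than clearing and refilling it, means a concurrent lookup sees either the old set of steps or the new one, never a partially filled map.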

Aggregating the result

Our partition handler now needs to send one StepExecutionRequest per partition to run, as a JMS message. Let’s ignore for now what happens on the remote end; we still need to deal with the aggregation of the results. In Spring Batch, one thread (the master) manages the state of the entire job, so this thread must wait (i.e. block) until all responses have been received from the workers. In a cluster, we never know where the master thread will be, so we had to create a mechanism where each job has a temporary queue associated with it. Using a temporary queue ensures that only the master thread can listen for the responses. And because we set the reference to this temporary queue as the JMSReplyTo header of each request, our remote workers know where to send their responses.

A classic problem in asynchronous communication remains: timeouts. To deal with it, we have built a JmsAggregationService that is driven by the following interfaces:

  • AggregationItemListener: receives an event when a response is received by the aggregation service.
  • AggregationCompletionPolicy: determines when the aggregation is done, regardless of a potential timeout. In our case, this policy is straightforward: we expect one response per request, so a default implementation that counts the number of responses is enough.
  • AggregationTimeoutPolicy: determines if a timeout should occur based on the time at which the aggregation started

While we could have built a straightforward policy that times out after a fixed delay, we decided to build an implementation that benefits from the metadata that Spring Batch stores in its internal model. Every time a chunk completes, the thread contributes some metadata to its StepExecutionContext and flushes it to the database. This is very handy because we can easily find out whether a given partition is still running and when it last contributed something.
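The following sketch illustrates the idea of such an activity-based timeout. It is an assumption about the approach, not the actual policy from the fork: StalePartitionTimeoutSketch is a hypothetical name, and an in-memory map stands in for the timestamps that would really be read from the Spring Batch metadata tables.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical policy: time out only when a running partition has been silent for too long.
final class StalePartitionTimeoutSketch {

    private final Duration maxSilence;

    // Partition name -> last time it flushed metadata. In a real setup this would be
    // looked up in the job repository; the map is an assumption made for the example.
    private final Map<String, Instant> lastContribution = new ConcurrentHashMap<>();

    StalePartitionTimeoutSketch(Duration maxSilence) {
        this.maxSilence = maxSilence;
    }

    // Called whenever a running partition is known to have committed a chunk.
    void recordContribution(String partitionName, Instant when) {
        lastContribution.put(partitionName, when);
    }

    // A finished partition can no longer be stale, so it is dropped from the check.
    void markFinished(String partitionName) {
        lastContribution.remove(partitionName);
    }

    // Returns true when at least one running partition has exceeded the allowed silence.
    boolean shouldTimeout(Instant now) {
        for (Instant last : lastContribution.values()) {
            if (Duration.between(last, now).compareTo(maxSilence) > 0) {
                return true;
            }
        }
        return false;
    }
}
```

With this kind of policy, a long-running but healthy partition never triggers a timeout, while a partition whose worker has died is detected after roughly maxSilence.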

Based on these facilities, the aggregation service performs the following:

  1. Creates a JMS session
  2. Creates a temporary JMS queue that will be used for the aggregation
  3. Sends one message per request. Each message holds a reference to the temporary JMS queue
  4. Listens for incoming responses
    • When no response has been received, it asks the timeout policy whether it should time out. Based on the result, it either throws an exception or listens again for a response
    • When a response has been received, it invokes all registered listeners and asks the completion policy whether it should complete. Based on the result, it either returns the result or listens again for a new response
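The listening loop of step 4 can be sketched as follows. This is a simplified illustration under stated assumptions rather than the JmsAggregationService itself: a BlockingQueue stands in for the temporary JMS queue, the requests are assumed to have been sent already, and CompletionPolicy, TimeoutPolicy, ItemListener and AggregationLoopSketch are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Decides whether enough responses have been received.
interface CompletionPolicy<T> {
    boolean isComplete(List<T> received);
}

// Decides whether to give up, given the time at which the aggregation started.
interface TimeoutPolicy {
    boolean shouldTimeout(long startedAtMillis);
}

// Notified each time a response arrives.
interface ItemListener<T> {
    void onItem(T item);
}

final class AggregationLoopSketch {

    // Polls the reply queue until the completion policy is satisfied
    // or the timeout policy asks to give up.
    static <T> List<T> aggregate(BlockingQueue<T> replyQueue,
                                 long pollMillis,
                                 CompletionPolicy<T> completion,
                                 TimeoutPolicy timeout,
                                 List<ItemListener<T>> listeners)
            throws InterruptedException, TimeoutException {
        long startedAt = System.currentTimeMillis();
        List<T> received = new ArrayList<>();
        while (true) {
            T response = replyQueue.poll(pollMillis, TimeUnit.MILLISECONDS);
            if (response == null) {
                // Nothing arrived during this poll: ask the timeout policy.
                if (timeout.shouldTimeout(startedAt)) {
                    throw new TimeoutException(
                            "Aggregation timed out after " + received.size() + " response(s)");
                }
                continue; // listen again
            }
            received.add(response);
            for (ItemListener<T> listener : listeners) {
                listener.onItem(response);
            }
            if (completion.isComplete(received)) {
                return received;
            }
        }
    }
}
```

A counting completion policy for n partitions is then a one-liner such as received -> received.size() >= n. The short poll interval matters: it is what gives the timeout policy a regular chance to run while no response is arriving.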

Executing the partition on the remote worker

Executing the actual partition is now easy. Upon reception of a StepExecutionRequest, the worker retrieves the actual Step instance using the StepRegistry and executes it. Once the partition completes, or if an error occurs, a response is sent to the destination that was set in the JMSReplyTo header of the request.
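The worker side can be sketched along the same lines. Again this is an illustration under assumptions, not the code from the fork: WorkerRequest, WorkerResponse and WorkerSketch are hypothetical names, a BlockingQueue plays the role of the JMSReplyTo destination, and a lookup function returning a Runnable stands in for the StepRegistry and the Step.

```java
import java.util.concurrent.BlockingQueue;
import java.util.function.BiFunction;

// Hypothetical response: reports whether the partition succeeded.
final class WorkerResponse {
    final String stepName;
    final boolean success;
    final String error;

    WorkerResponse(String stepName, boolean success, String error) {
        this.stepName = stepName;
        this.success = success;
        this.error = error;
    }
}

// Hypothetical request: names the step to run and carries the reply destination.
final class WorkerRequest {
    final String jobName;
    final String stepName;
    final BlockingQueue<WorkerResponse> replyTo;

    WorkerRequest(String jobName, String stepName, BlockingQueue<WorkerResponse> replyTo) {
        this.jobName = jobName;
        this.stepName = stepName;
        this.replyTo = replyTo;
    }
}

final class WorkerSketch {

    // Stand-in for the StepRegistry: (job name, step name) -> executable step, or null.
    private final BiFunction<String, String, Runnable> stepLookup;

    WorkerSketch(BiFunction<String, String, Runnable> stepLookup) {
        this.stepLookup = stepLookup;
    }

    // Runs the requested step and always replies, whether it succeeded or failed.
    void onRequest(WorkerRequest request) {
        WorkerResponse response;
        try {
            Runnable step = stepLookup.apply(request.jobName, request.stepName);
            if (step == null) {
                throw new IllegalStateException("Unknown step '" + request.stepName
                        + "' for job '" + request.jobName + "'");
            }
            step.run();
            response = new WorkerResponse(request.stepName, true, null);
        } catch (RuntimeException e) {
            response = new WorkerResponse(request.stepName, false, e.getMessage());
        }
        request.replyTo.add(response);
    }
}
```

Replying on failure as well as on success is the important design point: without it, the master would only learn about a failed partition through the timeout policy.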

The diagram below summarizes the global solution

Note that even though the master seems to be separate from the actual slaves, it does not have to be that way in practice. This video demonstrates how an additional node can join a running instance. It also showcases what happens when a node is killed in the middle of a job!


Conclusion

These developments make it possible to support a remote partitioning use case for Spring Batch. The good news is that BSB has open sourced them (see the spring-batch and spring-batch-admin forks on GitHub!). Note that very little of the code is actually JMS specific: most of the infrastructure can be reused to implement use cases where the transport protocol is different. Only the aggregation service and the remote invoker would need to change.

3 comments to Remote partitioning with Spring Batch

  • Juan

    This is very nice, thanks.
    How does it compare to org.springframework.batch.integration.partition.MessageChannelPartitionHandler in your experience?

  • Hi Juan,

    MessageChannelPartitionHandler was actually created after we started to work on this. We believe our implementation has several advantages over it:

    1. You don’t need to set a “realistic receive timeout” (see the Javadoc of #handle) as our implementation knows about Spring Batch. We make sure to time out if a running partition has not contributed for quite some time. This is far easier than setting a large timeout to give all partitions a chance to complete (and having to wait for that timeout if an error occurs so that the Job is flagged properly)
    2. Our infrastructure does not require you to change anything in your job configuration. We have built a StepRegistry that is able to locate a particular step name based on a job name, even if it has been registered in a child application context. That way, you can deploy your jobs just like you did for single machine processing and we’ll figure that out ourselves. The other implementation requires extra configuration (such as the use of BeanFactoryStepLocator)

    Now the big advantage of the Spring Integration solution is that you can use pretty much any transport protocol. Note that since this blog post, we have implemented the same with AMQP and RabbitMQ and we have a showcase of a batch running in the cloud on Cloud Foundry. We should write a blog post about that as well.

    HTH,
    S.

