Spring Batch is a powerful Java batch framework that integrates closely with the Spring framework. One core feature for us is the ability to stage the data to process for a particular batch instance. There are several reasons for this:
- We have multiple ways to handle an error. Some issues should force the batch execution to fail; others can be ignored and handled specifically by a later attempt
- We need to be able to restart where we left off
- Processing should be distributed (partitioning)
Therefore such a job starts with a first staging step whose purpose is to execute a query on the domain model and feed the staging table with the items to process. Each row in this staging table represents a staging item and has the following information:
- a technical identifier (primary key)
- a reference to the job instance (jobInstanceId)
- a sequence number: the sequence starts at 0 and is incremented for every new item
- the name of the last step that handled the item
- the current status of the item, which is one of PENDING, COMPLETED, ERROR or SKIPPED
- the actual item
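To make the row layout concrete, here is a minimal in-memory sketch of a staging item. It is only an illustration: the class and field names (StagingItem, StagingTable, objectId and so on) are assumptions chosen to mirror the list above, not the framework's actual types, and it assumes that a freshly staged row starts as PENDING.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** The four statuses listed above. */
enum StagingStatus { PENDING, COMPLETED, ERROR, SKIPPED }

/** Hypothetical in-memory view of one row of the staging table. */
final class StagingItem {
    final long objectId;       // technical identifier (primary key)
    final long jobInstanceId;  // reference to the job instance
    final long sequence;       // starts at 0, incremented for every new item
    String stepName;           // name of the last step that handled the item
    StagingStatus status;      // current status of the item
    final Serializable item;   // the actual item, for instance a primary key

    StagingItem(long objectId, long jobInstanceId, long sequence, Serializable item) {
        this.objectId = objectId;
        this.jobInstanceId = jobInstanceId;
        this.sequence = sequence;
        this.item = item;
        this.status = StagingStatus.PENDING; // assumption: new rows start as PENDING
    }
}

/** Stands in for the staging table of a single job instance (illustrative only). */
final class StagingTable {
    private final long jobInstanceId;
    private final List<StagingItem> rows = new ArrayList<>();
    private long nextObjectId = 1;
    private long nextSequence = 0;

    StagingTable(long jobInstanceId) {
        this.jobInstanceId = jobInstanceId;
    }

    /** Adds a row and assigns it the next sequence number. */
    StagingItem add(Serializable item) {
        StagingItem row = new StagingItem(nextObjectId++, jobInstanceId, nextSequence++, item);
        rows.add(row);
        return row;
    }

    int size() {
        return rows.size();
    }
}
```

Staging two items for job instance 1234 with this sketch yields sequences 0 and 1, both in PENDING status.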
The staging step is a regular, chunk-oriented, Spring Batch step. It has an ItemReader, an ItemProcessor and a no-op ItemWriter. The item reader can be any item reader, really. For the explanation, say that our reader is reading items from a database. Our item processor might look like the following:
    /**
     * A default {@link StagingItemProcessor} that returns a {@link StagingItemProvider}
     * instance with no custom staging information by default. Sub-classes can override
     * the {@link #getStagingValue(Object)} or {@link #getCustomStagingItemProvider(Object)}
     * methods to override the default behavior.
     */
    public class DefaultStagingItemProcessor<I> implements StagingItemProcessor<I> {

        private StagingValueProvider<I> stagingValueProvider;

        public StagingItemProvider process(I item) {
            return new StagingItemProvider(getStagingValue(item), getCustomStagingItemProvider(item));
        }

        /**
         * Returns the {@link StagingValue} of the specified item.
         *
         * @param item the item
         * @return the staging value
         */
        protected StagingValue getStagingValue(I item) {
            return stagingValueProvider.getStagingValue(item);
        }

        protected CustomStagingItemProvider getCustomStagingItemProvider(I item) {
            return null;
        }

        /**
         * Sets the {@link StagingValueProvider} to use.
         *
         * @param stagingValueProvider the staging value provider
         */
        public void setStagingValueProvider(StagingValueProvider<I> stagingValueProvider) {
            this.stagingValueProvider = stagingValueProvider;
        }
    }
What this code shows is that the staging item processor is responsible for processing an incoming item and returning a StagingItemProvider instance based on a StagingValueProvider. The StagingValueProvider is responsible for providing the staging value for a given item, this value being the item field in the staging table. Say, for instance, that a set of policies has been read from the database: an instance of StagingValueProvider would return a StagingValue holding the primary key of the policy. We use a typed object (StagingValue) and not the raw Serializable item because it contains additional metadata, such as how to actually store it. Indeed, the staging table has three columns to hold the item (a LONG, a VARCHAR and a CLOB), and the one that is used depends on the serialization strategy that was chosen. The CustomStagingItemProvider, which is null in this case, is used for business chunking, which we will cover later.
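As an illustration of the policy example, the sketch below shows what a StagingValueProvider returning the primary key of a policy might look like. The names StagingValue and StagingValueProvider come from the text above, but their contents here are assumptions: the StorageType enum, the factory methods and the Policy class are hypothetical and only serve the example.

```java
import java.io.Serializable;

/** Hypothetical marker for the column that holds the value (LONG, VARCHAR or CLOB). */
enum StorageType { LONG, VARCHAR, CLOB }

/** Assumed shape of a staging value: the raw value plus how to store it. */
final class StagingValue {
    private final Serializable value;
    private final StorageType storageType;

    private StagingValue(Serializable value, StorageType storageType) {
        this.value = value;
        this.storageType = storageType;
    }

    static StagingValue ofLong(long value) {
        return new StagingValue(Long.valueOf(value), StorageType.LONG);
    }

    static StagingValue ofString(String value) {
        return new StagingValue(value, StorageType.VARCHAR);
    }

    Serializable getValue() {
        return value;
    }

    StorageType getStorageType() {
        return storageType;
    }
}

/** Assumed single-method contract, matching the call made by the processor above. */
interface StagingValueProvider<I> {
    StagingValue getStagingValue(I item);
}

/** Hypothetical domain object used for the example. */
final class Policy {
    final long id;
    final String customerId;

    Policy(long id, String customerId) {
        this.id = id;
        this.customerId = customerId;
    }
}

/** Stages only the primary key of the policy, stored in the LONG column. */
final class PolicyStagingValueProvider implements StagingValueProvider<Policy> {
    @Override
    public StagingValue getStagingValue(Policy policy) {
        return StagingValue.ofLong(policy.id);
    }
}
```

Staging only the key keeps the staging rows small; the price is one lookup per item when the staged step later reads the policy back.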
The framework uses an internal ItemProcessor delegating to this one to actually create the staging items in the database. When the transaction commits for the chunk, one staging item per item to process is added to the staging table (which explains why the writer is a no-op).
Once the staging step has completed, our staging item table is filled for a particular job instance and the actual processing can start. A step that relies on this mechanism is called a staged step: it reads the staging value and processes the actual item. Before processing, the staging value needs to be translated back to the actual item. This is done through an implementation of the StagingValueReader. In the previous example, such an instance might retrieve the actual policy from the database based on the primary key. In practice, our developers pick one of the implementations we provide, which cover most of our use cases.
Since every item in the job has a status and a step name, it is possible to skip an item and move along with the rest of the data to process. In this case, the job ends with a COMPLETED WITH ERRORS status. Restarting the job restarts the processing of the failed items at the step mentioned in the staging table, giving us the ability to restart exactly where we left off.
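A minimal sketch of the reverse translation, assuming the staging value is the primary key of a policy. The method name read and its signature are assumptions (only the name StagingValueReader appears in this article), and a map stands in for the database lookup.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** Assumed contract: turns a staged value back into the actual item. */
interface StagingValueReader<I> {
    I read(Serializable stagingValue);
}

/** Hypothetical domain object used for the example. */
final class Policy {
    final long id;
    final String customerId;

    Policy(long id, String customerId) {
        this.id = id;
        this.customerId = customerId;
    }
}

/** Resolves a policy from its primary key; the map stands in for a database query. */
final class PolicyStagingValueReader implements StagingValueReader<Policy> {
    private final Map<Long, Policy> policiesById = new HashMap<>();

    void register(Policy policy) {
        policiesById.put(policy.id, policy);
    }

    @Override
    public Policy read(Serializable stagingValue) {
        Long id = (Long) stagingValue; // the staging step stored the primary key
        Policy policy = policiesById.get(id);
        if (policy == null) {
            throw new IllegalStateException("No policy found for staged id " + id);
        }
        return policy;
    }
}
```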
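The skip-and-restart behaviour can be sketched roughly as follows. This is a simplified, single-threaded illustration under several assumptions: a failing item is flagged ERROR, items already COMPLETED are ignored on the next run, and the final status is a plain string. The names StagedItem and StagedStepSketch are hypothetical.

```java
import java.util.List;
import java.util.function.Consumer;

enum StagingStatus { PENDING, COMPLETED, ERROR, SKIPPED }

/** Hypothetical, trimmed-down staging row: only what this sketch needs. */
final class StagedItem {
    final long sequence;
    StagingStatus status = StagingStatus.PENDING;
    String stepName;

    StagedItem(long sequence) {
        this.sequence = sequence;
    }
}

final class StagedStepSketch {

    /**
     * Processes every item that is not COMPLETED yet. A failing item is flagged
     * and the loop moves on, so one bad item does not stop the rest of the data.
     */
    static String run(String stepName, List<StagedItem> items, Consumer<StagedItem> work) {
        boolean errors = false;
        for (StagedItem item : items) {
            if (item.status == StagingStatus.COMPLETED) {
                continue; // handled by a previous execution, nothing to redo
            }
            item.stepName = stepName; // remember which step last handled the item
            try {
                work.accept(item);
                item.status = StagingStatus.COMPLETED;
            } catch (RuntimeException e) {
                item.status = StagingStatus.ERROR; // flagged for the next attempt
                errors = true;
            }
        }
        return errors ? "COMPLETED WITH ERRORS" : "COMPLETED";
    }
}
```

Running the sketch a second time over the same list only touches the items that are still flagged, which is the "restart where we left off" behaviour in miniature.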
Partitioning
Moreover, the sequence number makes it easy to identify the boundaries of partitions, so the items to process can be distributed to several workers. Our Partitioner simply puts the min and max sequence for each partition in the execution context. Our reader gets this information from the execution context and performs a SQL query that looks like the following (simplified):
    SELECT OBJECTID, STATUS, SEQUENCE
    FROM STAGING_ITEM
    WHERE JOBINSTANCEID = 1234 AND SEQUENCE >= 0 AND SEQUENCE <= 999 AND (STATUS = 'PENDING' AND STEPNAME = 'step1')
    ORDER BY SEQUENCE;
where 1234 is the id of the job instance, step1 is the name of the staged step that is running and dealing with a partition of 1000 elements (in this case the first partition). Using this procedure, each partition reads the data it has to process independently (and also updates the status of each individual item independently).
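The boundary computation itself is simple arithmetic. The sketch below is a plain Java illustration, not our Partitioner: it returns the min and max sequence per partition in a map, whereas a real Spring Batch Partitioner would return a Map<String, ExecutionContext> from partition(int gridSize) and store the two values there. The class name and the partition naming scheme are assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SequenceRangePartitioner {

    /**
     * Splits the sequences 0..itemCount-1 into consecutive ranges of at most
     * partitionSize items. Each entry maps a partition name to {min, max},
     * both bounds inclusive, matching the ">= min AND <= max" query above.
     */
    static Map<String, long[]> partition(long itemCount, int partitionSize) {
        if (partitionSize <= 0) {
            throw new IllegalArgumentException("partitionSize must be positive");
        }
        Map<String, long[]> partitions = new LinkedHashMap<>();
        int index = 0;
        for (long min = 0; min < itemCount; min += partitionSize) {
            long max = Math.min(min + partitionSize, itemCount) - 1;
            partitions.put("partition" + index++, new long[] {min, max});
        }
        return partitions;
    }
}
```

With 2500 staged items and a partition size of 1000, this gives three partitions: 0 to 999, 1000 to 1999 and 2000 to 2499.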
Business chunking
Simple partitioning might not be enough in more complex use cases. Often the data to process (for instance policies) is linked to another business concept (for instance the customer) and it is important to process all the items related to that concept in the same thread. One obvious reason is data aggregation: the aggregated result should be stored once all the policies for a particular customer have been handled. To support this scenario, each partition must contain a set of complete business chunks and we need to be able to identify them. The way we have chosen to fulfill this need is to let developers store arbitrary data in a custom table that is linked to the staging item with a foreign key.
Back to our example: a batch needs to process all the policies of the system and we want to perform some aggregation per customer.
- Item type to process: the policy
- Business chunk: all the policies of a customer
During the staging process, we store additional information in a table called policy_custom_item. The developer can provide an implementation of the CustomStagingItemProvider to store the custom data at the same time as the default one. Again, we provide a default implementation based on conventions: the developer sets the name of the custom table and a map of column names to values, and the framework deals with the primary key of the row and the reference to the item in the staging_item table (column stagingitemid).
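To give an idea of what such a convention could look like, the sketch below builds the insert statement for a custom table from its name and a map of columns. It is an assumption-laden illustration: the class name, the objectid primary key column of the custom table and the generated SQL are hypothetical; only the stagingitemid column and the policy_custom_item table come from the text above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical convention-based description of one custom staging row. */
final class ConventionBasedCustomItem {
    private final String tableName;
    private final Map<String, Object> columns = new LinkedHashMap<>();

    ConventionBasedCustomItem(String tableName) {
        this.tableName = tableName;
    }

    /** Adds a developer-provided column; names are trusted, not user input. */
    ConventionBasedCustomItem with(String column, Object value) {
        columns.put(column, value);
        return this;
    }

    /**
     * Builds a parameterized insert. The first two columns are the ones the
     * framework would manage: the primary key (assumed to be called objectid)
     * and the reference to the staging item (stagingitemid).
     */
    String toInsertSql() {
        StringJoiner names = new StringJoiner(", ");
        StringJoiner marks = new StringJoiner(", ");
        names.add("objectid");
        marks.add("?");
        names.add("stagingitemid");
        marks.add("?");
        for (String column : columns.keySet()) {
            names.add(column);
            marks.add("?");
        }
        return "INSERT INTO " + tableName + " (" + names + ") VALUES (" + marks + ")";
    }
}
```

For the policy example, new ConventionBasedCustomItem("policy_custom_item").with("clientid", "C1").toInsertSql() produces an insert with the three columns objectid, stagingitemid and clientid.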
We then have to provide the list of business chunks (one business chunk here means all the policies of a given customer), with a query such as:
    SELECT MIN(S.sequence) as minSeq, MAX(S.sequence) as maxSeq
    FROM staging_item S, policy_custom_item C
    WHERE S.jobinstanceId = ? AND C.stagingitemid = S.objectid
    GROUP BY C.clientid
    ORDER BY maxSeq
Our custom Partitioner is now responsible for rebuilding the partitions based on these business chunks. In this case, the size of the partitions might differ, especially if the number of policies per customer varies a lot. Notice that the only thing the developer had to do in this case is provide the SQL query used to build the business chunk boundaries!
The processor of such a step is also able to detect that the business chunk has changed and offers the ability to implement callbacks defined by the BusinessChunkListener interface (beforeBusinessChunk, afterBusinessChunk).
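One possible way to rebuild partitions from the chunk boundaries is sketched below. It assumes that the rows returned by the query are contiguous sequence ranges (that is, the policies of a customer were staged next to each other) and that partitions aim for a target size without ever splitting a chunk. Both the grouping strategy and the class name are assumptions, not a description of our Partitioner.

```java
import java.util.ArrayList;
import java.util.List;

final class BusinessChunkPartitioner {

    /**
     * Groups business chunks into partitions. Each chunk and each resulting
     * partition is a {minSeq, maxSeq} pair with inclusive bounds. Chunks must
     * be ordered by maxSeq, as in the query above.
     */
    static List<long[]> partition(List<long[]> chunks, long targetSize) {
        List<long[]> partitions = new ArrayList<>();
        long[] current = null;
        for (long[] chunk : chunks) {
            if (current == null) {
                current = new long[] {chunk[0], chunk[1]};
            } else if (chunk[1] - current[0] + 1 <= targetSize) {
                current[1] = chunk[1]; // the chunk still fits: extend the partition
            } else {
                partitions.add(current); // close the partition at a chunk boundary
                current = new long[] {chunk[0], chunk[1]};
            }
        }
        if (current != null) {
            partitions.add(current);
        }
        return partitions;
    }
}
```

A chunk larger than the target size simply becomes a partition of its own, which is why partition sizes can differ a lot when the number of policies per customer does.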
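The chunk-change detection can be sketched as follows. Only the interface name and the two callback names come from this article; the generic key parameter, the BusinessChunkTracker class and the RecordingListener used for the demonstration are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Callback names from the article; the key parameter is an assumption. */
interface BusinessChunkListener<K> {
    void beforeBusinessChunk(K chunkKey);
    void afterBusinessChunk(K chunkKey);
}

/** Fires the callbacks whenever the business key (e.g. the customer) changes. */
final class BusinessChunkTracker<K> {
    private final BusinessChunkListener<K> listener;
    private K currentKey;
    private boolean open;

    BusinessChunkTracker(BusinessChunkListener<K> listener) {
        this.listener = listener;
    }

    /** To be called for every item, in sequence order, before it is processed. */
    void onItem(K key) {
        if (open && Objects.equals(currentKey, key)) {
            return; // still inside the same business chunk
        }
        if (open) {
            listener.afterBusinessChunk(currentKey);
        }
        currentKey = key;
        open = true;
        listener.beforeBusinessChunk(key);
    }

    /** To be called once the partition is exhausted, to close the last chunk. */
    void onEndOfPartition() {
        if (open) {
            listener.afterBusinessChunk(currentKey);
            open = false;
        }
    }
}

/** Example listener that just records the order of the callbacks. */
final class RecordingListener implements BusinessChunkListener<String> {
    final List<String> events = new ArrayList<>();

    @Override
    public void beforeBusinessChunk(String chunkKey) {
        events.add("before " + chunkKey);
    }

    @Override
    public void afterBusinessChunk(String chunkKey) {
        events.add("after " + chunkKey);
    }
}
```

An aggregation per customer would typically reset its accumulator in beforeBusinessChunk and store the result in afterBusinessChunk.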
Conclusion
Spring Batch offers an open contract that allowed us to build an infrastructure that suits our needs. There is no demo available at this time, as the features described in this article are part of other features that we have built on top of Spring Batch. In a future article we will cover how partitions can be invoked on several machines, offering distributed partitioning support for Spring Batch.
Very interesting post, thank you!!!
Your approach looks pretty neat even if I’d like to have more info.
- You don’t talk about the way you deal with the restart… Let’s say an error/exception occurs. You restart the job and then somewhere in the staging step you have to deal with existing items in the staging table (through the reader or the staging processor)?
- What about the cleaning/purge of the items in the staging table especially when the step is ok?
Thanks again
Ben
Hi Benjamin,
Thanks for your comment. Here are the answers to your questions:
Errors management:
* If an error occurred in the staged step (so the *real* step once the staging step has completed), the item is flagged in the staging table. Our reader deals with those flags properly and only re-processes items that have failed
* If the error occurred in the staging step (which I believe is your question), the ExecutionContext retains where the job stopped. Once the job is restarted, the sequence is restored where we left off. This assumes that the set of items to process for a job does not change on restart.
Cleaning:
We have a post-processor that cleans the content of the table if the job is successful. We basically check that all items are successful. You could also, like we did, create a job that does that cleaning and is invoked regularly with a scheduler.
HTH,
S.