Spring Batch and MongoDB: cursor-based item reader
Spring Batch is a popular open source batch framework. It integrates with many relational data access technologies like JDBC, Hibernate, or JPA, but doesn't offer official support for NoSQL datastores yet. This post shows how to integrate Spring Batch and MongoDB to read large datasets from this document-oriented datastore.
What for?
MongoDB is often used to store large sets of data such as logs. When it comes to analyzing such data, MongoDB provides simple aggregation operators and even MapReduce, but these techniques are somewhat limited. There's even a new project to integrate MongoDB with Hadoop, but, hey, apart from web-scale applications, who wants to set up a Hadoop cluster to make something out of their last week's logs?
That's where integrating with a lightweight framework like Spring Batch gets interesting. Spring Batch provides advanced batch features out of the box: transaction management, monitoring, retry, skip, restart after failure, complex flows of steps, and so on, all of this running on any Java-compliant platform.
Let's start with something simple: reading documents from a collection by implementing a restartable Spring Batch ItemReader.
What kind of item reader for MongoDB?
Reading a large dataset isn't as simple as it looks. I considered three strategies for a MongoDB item reader:
- cursor: sending the query, getting a cursor back, and iterating over it until it's exhausted. The whole dataset is never in memory; it is streamed thanks to the cursor.
- paging: sending a query to retrieve one page of data. Once the page is exhausted, retrieving the next one, and so on, until there are no more pages. Paging is usually the fallback when cursor-based reading doesn't work well (e.g. memory leaks). Fortunately, cursors work well with MongoDB.
- ranges: sending a query to retrieve one page, but using a specific range as the criteria. This implies some knowledge of the data to be read (e.g. ranges of IDs), but could be the most efficient strategy, especially on a restart.
The cursor-based strategy is the simplest and usually works great if correctly implemented; it is the one I chose for this post, but investigating the two other strategies should be interesting too (perhaps in upcoming posts 🙂).
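To make the cursor strategy concrete, here is roughly what cursor-based iteration looks like with the plain MongoDB Java driver (a minimal sketch; the connection settings and the logs collection name are made up for the example):

import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.Mongo;

// minimal sketch of the cursor strategy with the plain MongoDB Java driver
// (connection settings and collection name are illustrative)
Mongo mongo = new Mongo("localhost");
DBCollection logs = mongo.getDB("spring-batch-mongodb").getCollection("logs");
DBCursor cursor = logs.find();
try {
    while (cursor.hasNext()) {
        DBObject document = cursor.next();
        // process one document at a time: the driver streams documents
        // in batches, so the whole result set is never held in memory
    }
} finally {
    cursor.close();
}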
The reader skeleton
Spring Batch's ItemReader interface is very simple:
public interface ItemReader<T> {

    T read() throws Exception, UnexpectedInputException,
            ParseException, NonTransientResourceException;

}
But we won't start from scratch: we'll use AbstractItemCountingItemStreamItemReader as a base class. It provides a good foundation and handles error-prone, boilerplate code (e.g. for restartability). Here is the skeleton of our item reader:
public class MongoDbCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T>
        implements InitializingBean {

    private Mongo mongo;
    private String databaseName;
    private DBCollection collection;
    private String collectionName;
    private DBCursor cursor;

    // the document fields to select
    private String[] fields;

    // the criteria document
    private DBObject refDbObject;

    @Override
    protected void doOpen() throws Exception {
        // opening cursor
    }

    @Override
    protected T doRead() throws Exception {
        // reading!
    }

    @Override
    protected void doClose() throws Exception {
        // cleaning up
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // getting things ready
    }

    // (...) setters

}
Let's walk through the implementation of each method.
Getting things ready
Our item reader initializes its resources (server connection, database, collection) in the appropriate callback:
public class MongoDbCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T>
        implements InitializingBean {

    // (...)

    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(mongo, "mongo must be set");
        Assert.notNull(databaseName, "databaseName must be set");
        Assert.notNull(collectionName, "collectionName must be set");
        DB db = mongo.getDB(databaseName);
        collection = db.getCollection(collectionName);
    }

}
Nothing fancy, just validation and resource handling.
Opening and closing the cursor
It's time now to open the cursor we'll iterate over. We'll use the find method of the DBCollection class. This method takes a criteria document (the query itself) and the fields to retrieve; both parameters are optional.
public class MongoDbCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T>
        implements InitializingBean {

    // (...)

    @Override
    protected void doOpen() throws Exception {
        cursor = collection.find(createDbObjectRef(), createDbObjectKeys());
    }

    protected DBObject createDbObjectKeys() {
        if (fields == null) {
            return new BasicDBObject();
        } else {
            BasicDBObjectBuilder builder = BasicDBObjectBuilder.start();
            for (String field : fields) {
                builder.add(field, 1);
            }
            return builder.get();
        }
    }

    protected DBObject createDbObjectRef() {
        if (refDbObject == null) {
            return new BasicDBObject();
        } else {
            return refDbObject;
        }
    }

    @Override
    protected void doClose() throws Exception {
        cursor.close();
    }

}
Note the item reader uses two properties the developer can inject when configuring the reader: fields (the document fields to select) and refDbObject (the criteria document).
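For illustration, these properties could also be set programmatically; a quick sketch, assuming the setters follow the usual JavaBean convention for the fields declared in the skeleton:

// hypothetical programmatic configuration of the reader;
// setter names are assumed to match the fields of the skeleton
MongoDbCursorItemReader<DBObject> reader = new MongoDbCursorItemReader<DBObject>();
reader.setMongo(mongo);
reader.setDatabaseName("spring-batch-mongodb");
reader.setCollectionName("dummy");
// select only the "number" field of each document
reader.setFields(new String[] { "number" });
// criteria document: { "number" : { "$gt" : 12 } }
reader.setRefDbObject(new BasicDBObject("number", new BasicDBObject("$gt", 12)));
reader.afterPropertiesSet();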
Just reading…
We're now ready to read. After a call to next, the MongoDB cursor returns a DBObject, which behaves pretty much like a map. The reader could return a DBObject for every item it reads, but wouldn't it be better to provide a strategy to convert the DBObject into something more meaningful, like a domain object? No need to create a dedicated interface: we can use Spring's Converter API.
By using this interface, we can easily integrate with Spring Data MongoDB converters and benefit from a full-blown document-object mapping engine.
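For example, a converter turning raw documents into domain objects could look like the following sketch (the Log class and its fields are made up for the example):

import org.springframework.core.convert.converter.Converter;
import com.mongodb.DBObject;

// hypothetical converter from a raw document to a made-up Log domain object
public class LogDbObjectConverter implements Converter<DBObject, Log> {

    @Override
    public Log convert(DBObject source) {
        Log log = new Log();
        // the field names below are assumptions for the example
        log.setNumber((Integer) source.get("number"));
        log.setMessage((String) source.get("message"));
        return log;
    }

}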
Here is how the reader would end up reading and leveraging the converter API:
public class MongoDbCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T>
        implements InitializingBean {

    // (...)

    private Converter<DBObject, T> dbObjectConverter;

    @Override
    protected T doRead() throws Exception {
        if (!cursor.hasNext()) {
            return null;
        } else {
            DBObject dbObj = cursor.next();
            return dbObjectConverter.convert(dbObj);
        }
    }

}
OK, we’re pretty much done, let’s polish the reader by implementing restartability.
Restartability
Our reader is already restartable thanks to its parent class. In case of a restart, the base class knows where the previous execution left off and would scroll up to that point by re-reading the already-read documents. This works but isn't efficient. We can tell the base class how to jump to a specific item more efficiently by implementing the jumpToItem method:
public class MongoDbCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T>
        implements InitializingBean {

    // (...)

    @Override
    protected void jumpToItem(int itemIndex) throws Exception {
        cursor = cursor.skip(itemIndex);
    }

}
Chances are MongoDB still scans the collection sequentially up to that point, but at least the skipped documents aren't sent over the network.
Configuring and launching the job
Our MongoDB item reader can be configured and used just like any other Spring Batch reader:
<batch:job id="job">
  <batch:step id="step">
    <batch:tasklet>
      <batch:chunk reader="reader" writer="writer" commit-interval="50"/>
    </batch:tasklet>
  </batch:step>
</batch:job>

<bean id="reader" class="com.zenika.batch.item.database.mongo.MongoDbCursorItemReader">
  <property name="mongo" ref="mongo" />
  <property name="databaseName" value="spring-batch-mongodb" />
  <property name="collectionName" value="dummy" />
  <property name="dbObjectConverter">
    <bean class="com.zenika.batch.item.database.mongo.PassthroughDbObjectConverter" />
  </property>
  <property name="refDbObject">
    <value>#{
      T(com.mongodb.BasicDBObjectBuilder)
        .start()
        .push("number").add("$gt", 12)
        .pop()
        .get()
    }</value>
  </property>
  <property name="fields" value="number" />
</bean>

<!-- (...) writer and infrastructure configuration omitted -->
Note the configuration of the criteria document (the refDbObject property): it uses the Spring Expression Language to create the DBObject that defines the query. Here is what the definition would look like in Java:
DBObject refDbObject = BasicDBObjectBuilder.start()
        .push("number").add("$gt", limit)
        .pop()
        .get();
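The configuration also references a PassthroughDbObjectConverter, which isn't shown in this post; a minimal implementation would simply return the raw document unchanged (a sketch based on the converter contract described above):

import org.springframework.core.convert.converter.Converter;
import com.mongodb.DBObject;

// pass-through converter: returns the raw document as-is, for jobs
// that don't need any document-to-object mapping
public class PassthroughDbObjectConverter implements Converter<DBObject, DBObject> {

    @Override
    public DBObject convert(DBObject source) {
        return source;
    }

}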
The job can be launched just like any other Spring Batch job:
JobExecution exec = jobLauncher.run(job, jobParameters);
assertThat(exec.getStatus(), is(BatchStatus.COMPLETED));
Conclusion
This post is a first attempt at integrating Spring Batch with MongoDB. Implementing the item reader is straightforward, even without a deep knowledge of Spring Batch internals. Such an integration is definitely on the roadmap of the Spring Batch project, but it isn't there yet, so this post should be a good starting point for anyone eager to integrate these two great technologies.
Note that a more advanced version of the cursor item reader could expose parameters like the cursor batch size, to give better control over the memory footprint for very large datasets.
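For instance, doOpen could apply a configurable batch size to the cursor. A sketch: batchSize is an actual method of the driver's DBCursor, while the cursorBatchSize property is an assumed addition to the reader:

// hypothetical extension: a configurable cursor batch size so the driver
// fetches documents from the server in smaller chunks
private int cursorBatchSize = 100; // assumed new property, with a default

@Override
protected void doOpen() throws Exception {
    cursor = collection.find(createDbObjectRef(), createDbObjectKeys())
            .batchSize(cursorBatchSize);
}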
Stay tuned, as subsequent posts about Spring Batch and MongoDB should come soon!