Spring Batch and MongoDB: cursor-based item reader


Spring Batch is a popular open source batch framework. It integrates with many relational database technologies like JDBC, Hibernate, or JPA, but doesn't officially support NoSQL datastores yet. This post shows how to integrate Spring Batch and MongoDB to read large datasets from this document-oriented datastore.

What for?

MongoDB is often used to store large sets of data like logs. When it comes to analyzing such data, MongoDB provides simple aggregation operators and even MapReduce, but these techniques are somewhat limited. There's also a new project to integrate MongoDB with Hadoop, but, hey, apart from web-scale applications, who wants to set up a Hadoop cluster to make something out of their last week's logs?

That's where integrating with a lightweight project like Spring Batch is interesting. Spring Batch provides advanced batch features out of the box: transaction management, monitoring, retry, skip, restart after failure, complex flow of steps, and so on, all of this working on any Java-compliant platform.

Let's start with something simple: reading documents from a collection by implementing a restartable Spring Batch ItemReader.

What kind of item reader for MongoDB?

Reading a large dataset isn't as simple as it looks. I considered 3 strategies for a MongoDB item reader:

  • cursor: sending the query, getting a cursor, and iterating over the cursor until it's exhausted. The whole dataset is never in memory, it is streamed thanks to the cursor.
  • paging: sending a query to retrieve one page of data. Once the page is exhausted, retrieving the next one, reading it, and so on until there's no page left. Paging is usually the fallback when cursor-based reading doesn't work well (e.g. memory leaks). Fortunately, cursors behave well with MongoDB.
  • ranges: sending a query to retrieve one page, but using a specific range. This implies a knowledge of the data to be read (e.g. ranges of IDs), but could be the most effective strategy, especially on a restart.

The cursor-based strategy is the simplest and usually works great when correctly implemented. It's the one I chose for this post, but investigating the other two strategies would be interesting too (perhaps in upcoming posts :-) ).

The reader skeleton

Spring Batch's ItemReader interface is very simple:

public interface ItemReader<T> {
 
  T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
 
}

But we won't start from scratch: we'll use the AbstractItemCountingItemStreamItemReader as a base class. It provides a good foundation and handles error-prone, boilerplate code (e.g. for restartability). Here is the skeleton of our item reader:

public class MongoDbCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements InitializingBean {
 
  private Mongo mongo;
 
  private String databaseName;
 
  private DBCollection collection;
 
  private String collectionName;
 
  private DBCursor cursor;
 
  // the document fields to select
  private String[] fields;
 
  // the criteria document 
  private DBObject refDbObject;
 
  @Override
  protected void doOpen() throws Exception {
    // opening cursor
  }
 
  @Override
  protected T doRead() throws Exception {
    // reading!
  }
 
  @Override
  protected void doClose() throws Exception {
    // cleaning up
  }
 
  @Override
  public void afterPropertiesSet() throws Exception {
    // getting things ready
  }
 
  (...) setters
}

We're going to see the implementation of each method.

Getting things ready

Our item reader initializes its resources (server connection, database, collection) in the appropriate callback:

public class MongoDbCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements InitializingBean {
  (...)
  @Override
  public void afterPropertiesSet() throws Exception {
    Assert.notNull(mongo,"Mongo must be specified");
    Assert.notNull(databaseName,"databaseName must be set");
    Assert.notNull(collectionName,"collectionName must be set");
    DB db = mongo.getDB(databaseName);
    collection = db.getCollection(collectionName);
  }
}

Nothing fancy, just validation and resource handling.

Opening and closing the cursor

It's time now to open the cursor we'll iterate on. We'll use the find method of the DBCollection class. This method needs a criteria document (this is the query) and the fields to retrieve. Both parameters are optional.

public class MongoDbCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements InitializingBean {
  (...)
  @Override
  protected void doOpen() throws Exception {
    cursor = collection.find(createDbObjectRef(),createDbObjectKeys());
  }
 
  protected DBObject createDbObjectKeys() {
    if(fields == null) {
      return new BasicDBObject();
    } else {
      BasicDBObjectBuilder builder = BasicDBObjectBuilder.start();
      for(String field : fields) {
        builder.add(field,1);
      }
      return builder.get();
    }
  }
 
  protected DBObject createDbObjectRef() {
    if(refDbObject == null) {
      return new BasicDBObject();
    } else {
      return refDbObject;
    }
  }
 
  @Override
  protected void doClose() throws Exception {
    cursor.close();
  }
}

Note the item reader uses two optional properties, fields and refDbObject, that the developer can inject when configuring the reader.

Just reading...

We can now start reading. After a call to next, the MongoDB cursor returns a DBObject, which behaves pretty much like a map. The reader could return a DBObject for every item it reads, but wouldn't it be better to provide a strategy to convert the DBObject into something more meaningful, like a domain object? No need to create a dedicated interface: we can use Spring's Converter API.

By using this interface, we can easily integrate with Spring Data MongoDB converters and benefit from a full-blown document-object mapping engine.
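To make this concrete, here is a sketch of what such a converter could look like. The LogEntry domain class and its number field are purely hypothetical, chosen to match the "number" field queried later in this post:

```java
import com.mongodb.DBObject;
import org.springframework.core.convert.converter.Converter;

// Hypothetical domain object, for illustration only
class LogEntry {
  private final int number;

  LogEntry(int number) {
    this.number = number;
  }

  int getNumber() {
    return number;
  }
}

// Converts a raw DBObject into a LogEntry; DBObject behaves like a map,
// so we just fetch the field we're interested in
class LogEntryConverter implements Converter<DBObject, LogEntry> {

  @Override
  public LogEntry convert(DBObject dbObject) {
    return new LogEntry(((Number) dbObject.get("number")).intValue());
  }
}
```

Such a converter would then be injected into the reader's dbObjectConverter property.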

Here is how the reader would end up reading and leveraging the converter API:

public class MongoDbCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements InitializingBean {
  (...)
  private Converter<DBObject,T> dbObjectConverter;
 
  @Override
  protected T doRead() throws Exception {
    if(!cursor.hasNext()) {
      return null;
    } else {
      DBObject dbObj = cursor.next();
      return dbObjectConverter.convert(dbObj);
    }
  }
 
}

OK, we're pretty much done, let's polish the reader by implementing restartability.

Restartability

Our reader is already restartable thanks to its parent class. On a restart, the base class knows where the previous execution left off and scrolls up to that point by re-reading the already-read documents. This works but isn't efficient. We can tell the base class how to jump to a specific item more efficiently by implementing the jumpToItem method:

public class MongoDbCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements InitializingBean {
  (...)
  @Override
  protected void jumpToItem(int itemIndex) throws Exception {
    cursor = cursor.skip(itemIndex);
  }
}

Chances are MongoDB still scans the collection sequentially up to this point, but at least the skipped documents aren't sent over the network.

Configuring and launching the job

Our MongoDB item reader can be configured and used just like any other Spring Batch reader:

<batch:job id="job">
  <batch:step id="step">
    <batch:tasklet>
      <batch:chunk reader="reader" writer="writer" commit-interval="50"/>
    </batch:tasklet>
  </batch:step>
</batch:job>
 
<bean id="reader" class="com.zenika.batch.item.database.mongo.MongoDbCursorItemReader">
  <property name="mongo" ref="mongo" />
  <property name="databaseName" value="spring-batch-mongodb" />
  <property name="collectionName" value="dummy" />
  <property name="dbObjectConverter">
    <bean class="com.zenika.batch.item.database.mongo.PassthroughDbObjectConverter" />
  </property>
  <property name="refDbObject">
    <value>#{ 
      T(com.mongodb.BasicDBObjectBuilder)
        .start()
        .push("number").add("$gt",12)
        .pop()
        .get() 
     }</value>
  </property>
  <property name="fields" value="number" />
</bean>
 
(...) writer and infrastructure configuration omitted

Note the configuration of the criteria document (refDbObject property): it uses Spring Expression Language to create a DBObject that defines the query. Here is what the definition would look like in Java:

DBObject refDbObject = BasicDBObjectBuilder.start().push("number").add("$gt",limit).pop().get();
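As for the PassthroughDbObjectConverter referenced in the configuration, its implementation isn't listed in this post, but a minimal version would simply hand the document back unchanged:

```java
import com.mongodb.DBObject;
import org.springframework.core.convert.converter.Converter;

// A converter that performs no conversion: the reader emits raw DBObjects
public class PassthroughDbObjectConverter implements Converter<DBObject, DBObject> {

  @Override
  public DBObject convert(DBObject source) {
    return source;
  }
}
```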

The job can be launched just like any other Spring Batch job:

JobExecution exec = jobLauncher.run(job, jobParameters);
assertThat(exec.getStatus(), is(BatchStatus.COMPLETED));

Conclusion

This post is a first attempt at integrating Spring Batch with MongoDB. Implementing the item reader is straightforward, even without a solid knowledge of Spring Batch internals. Such integration is definitely on the roadmap of the Spring Batch project, but it's not there yet, so this post should be a good start for anyone eager to integrate these two great technologies.

Note a more advanced version of the cursor item reader could expose parameters like the cursor batch size, to have a better control over the memory footprint for very large datasets.
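As a sketch, assuming a batchSize property that the reader above doesn't have, doOpen could forward the value to the cursor via DBCursor's batchSize method:

```java
public class MongoDbCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements InitializingBean {
  (...)
  // hypothetical property, 0 meaning "use the driver's default"
  private int batchSize;
 
  @Override
  protected void doOpen() throws Exception {
    cursor = collection.find(createDbObjectRef(),createDbObjectKeys());
    if(batchSize > 0) {
      // batchSize controls how many documents each round trip to the server fetches
      cursor = cursor.batchSize(batchSize);
    }
  }
}
```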

Stay tuned, as subsequent posts about Spring Batch and MongoDB should come soon!

Source code


Comments

1. Thursday, August 2, 2012 at 17:31, by Michael Bevz

Hi, great reading.

Could you tell me how dbObjectConverter is implemented?

Thanks,
Mike
