Work queues with MongoDB and Spring Integration

A work queue is a common pattern for decoupling the producer of a request from the worker that actually processes it. The pattern is especially useful when the producer doesn’t expect a response: because the processing is asynchronous, the producer isn’t held up and can do something else instead of waiting for the worker to complete its job.
In this post, we’ll see how to quickly implement work queues with MongoDB and Spring Integration.

Work queue implementations

There are many ways to implement work queues, and some people around here have already found out that MongoDB can do the job. But these custom MongoDB-based work queue solutions are way too… custom. (Constructive) laziness is a virtue (at least for programmers), so why not reuse what already exists? The good news is that Spring Integration provides everything we need to implement work queues on top of MongoDB.

The use case

Imagine that an HTTP request to a web application triggers the sending of an email. The email sending API would look like this:

public interface EmailGateway {
  void send(EmailNotification email);
}

The web controller would do the following to send the email:

emailGateway.send(new EmailNotification("","Message from Minnie "+i,"Minnie is waiting for you!"));

A dedicated service would handle the actual sending:

public class EmailService {
  public void send(EmailNotification email) throws Exception {
    // actual sending...
    // ... takes time and holds up caller
  }
}

The usual default solution would be fully synchronous. The problem with such a solution is that the caller (the web controller) is held up while the email is being sent, which can take a couple of seconds. This means the sending monopolizes a container thread. Let several users call the web controller at the same time and they will starve the container of threads.
A better solution is to make the sending asynchronous. This can easily be done by setting up a Spring Integration pipeline. The good news is that this solution does not affect the web controller: it still uses the exact same EmailGateway interface but no longer has to wait for the processing.
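
To make the hand-off idea concrete without any framework, here is a minimal plain-Java sketch. It is only an analogy for what a pipeline does on our behalf: QueueingEmailGateway, its next() and pending() methods, and the field names of EmailNotification are assumptions made for illustration, and the in-memory queue is not durable.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified stand-in for the post's EmailNotification (field names are assumptions).
class EmailNotification {
  final String to;
  final String subject;
  final String body;

  EmailNotification(String to, String subject, String body) {
    this.to = to;
    this.subject = subject;
    this.body = body;
  }
}

// Same shape as the gateway interface used by the web controller.
interface EmailGateway {
  void send(EmailNotification email);
}

// Hypothetical gateway: send() only enqueues the request, it does not send anything.
class QueueingEmailGateway implements EmailGateway {
  private final BlockingQueue<EmailNotification> tasks = new LinkedBlockingQueue<>();

  @Override
  public void send(EmailNotification email) {
    tasks.add(email); // unbounded queue, so the caller returns without waiting
  }

  // Meant to be called from a worker thread; returns null when nothing is waiting.
  EmailNotification next() {
    return tasks.poll();
  }

  // Number of requests that have been submitted but not yet picked up.
  int pending() {
    return tasks.size();
  }
}
```

The controller code stays the same because it only sees EmailGateway. A worker running on its own thread would call next() and do the slow sending there. Pending requests in this sketch are lost if the process dies.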

Submission to a Spring Integration pipeline

The web controller just needs a reference to the EmailGateway Spring bean. Spring Integration routes every call to this gateway to a channel. Here is the configuration to declare the gateway and the channel:

<int:gateway service-interface="com.zenika.EmailGateway" default-request-channel="tasks" />
<int:channel id="tasks">
  <int:queue message-store="messageStore"/>
</int:channel>

The channel is a queue, which means it buffers messages. Spring Integration’s queue channels buffer messages in memory by default, but as we plugged in a message store, our messages will be stored permanently. We won’t lose any messages if the application crashes.
But what is this message store? This is where MongoDB comes in.

MongoDB comes in as the message store

A MongoDB message store has been available since Spring Integration 2.1. It builds on top of Spring Data MongoDB.
The MongoDB message store is straightforward to configure:

<mongo:db-factory id="mongoDbFactory" dbname="spring-integration" />
<bean id="messageStore" class="">
  <constructor-arg ref="mongoDbFactory" />
  <constructor-arg value="emailtasks" /> <!-- the name of the collection -->
</bean>

That’s it: our email sending requests are now durable. Note that a Spring Integration message store is also useful for implementing the aggregator or the claim check patterns.
Let’s now see how to plug in the actual sender.

A service activator to do the job

How do we dequeue the email sending requests from the MongoDB queue? It’s quite simple, as Spring Integration handles the plumbing. We just have to configure a service activator and a poller (as the channel is a pollable channel):

<bean id="emailService" class="com.zenika.EmailService" />
<int:service-activator input-channel="tasks" ref="emailService" method="send">
  <int:poller fixed-rate="5000" time-unit="MILLISECONDS" />
</int:service-activator>

Spring Integration will pull messages every 5 seconds and send them, one after the other, to the email service.
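
The polling behaviour can be pictured with plain JDK classes. The sketch below is an analogy, not how Spring Integration implements its poller: FixedRatePoller and its methods are hypothetical names, and it assumes that each cycle hands over everything waiting in the queue.

```java
import java.util.Queue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper that polls a queue at a fixed rate on a single thread.
class FixedRatePoller<T> {
  private final Queue<T> queue;
  private final Consumer<T> handler;
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();

  FixedRatePoller(Queue<T> queue, Consumer<T> handler) {
    this.queue = queue;
    this.handler = handler;
  }

  // One polling cycle: pass each waiting item to the handler, one after the other.
  // Returns how many items were handled in this cycle.
  int pollOnce() {
    int handled = 0;
    T item;
    while ((item = queue.poll()) != null) {
      handler.accept(item);
      handled++;
    }
    return handled;
  }

  // Runs pollOnce() every periodMillis milliseconds.
  // If the handler throws, the scheduler cancels all further runs.
  void start(long periodMillis) {
    scheduler.scheduleAtFixedRate(this::pollOnce, 0, periodMillis, TimeUnit.MILLISECONDS);
  }

  void stop() {
    scheduler.shutdownNow();
  }
}
```

Calling start(5000) on such a poller would mirror the fixed-rate="5000" setting: a request can wait up to five seconds before it is picked up, and a slow handler delays the requests queued behind it.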
The processing is single-threaded by default. If you want to process several requests in parallel, you just need to plug a task executor into the poller:

<bean id="emailService" class="com.zenika.EmailService" />
<int:service-activator input-channel="tasks" ref="emailService" method="send">
  <int:poller fixed-rate="5000" time-unit="MILLISECONDS"
              task-executor="emailTaskExecutor" />
</int:service-activator>
<task:executor id="emailTaskExecutor" pool-size="10" />
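
As a rough picture of what a pool of ten workers buys us, here is a plain-Java sketch that hands each task to a fixed thread pool. It is an illustration under assumptions, not the Spring task executor itself: PooledDispatcher and dispatch() are hypothetical names, and the timeout parameter exists only to keep the example bounded.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper that processes tasks concurrently on a fixed-size pool.
class PooledDispatcher {

  // Submits every task to a pool of poolSize threads, then waits for completion.
  // Returns true if all tasks finished before the timeout elapsed.
  static <T> boolean dispatch(List<T> tasks, Consumer<T> handler,
                              int poolSize, long timeoutMillis) {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    for (T task : tasks) {
      pool.execute(() -> handler.accept(task)); // may run in parallel with other tasks
    }
    pool.shutdown(); // accept no new tasks, let the submitted ones finish
    try {
      return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt flag for the caller
      return false;
    }
  }
}
```

With concurrent workers, the handler (here, the equivalent of EmailService.send) can be called from several threads at once, so it has to be thread-safe, and requests are no longer guaranteed to be processed in submission order.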

Conclusion

This post covered how to implement a MongoDB-backed work queue with Spring Integration. This works great if you already have MongoDB in your architecture and don’t want to install any additional component. Note that Spring Integration also provides message store implementations for relational databases, Redis, and Gemfire.
Our solution works but remains simple. If you need a more robust and feature-complete messaging solution (with features like routing or distributed request-reply), solutions like RabbitMQ are usually more appropriate.
Source code
