Blog Zenika



Failover with RabbitMQ, the sender's story

When a client connects to a RabbitMQ instance, it must be able to handle the failure of that instance. The failure can be an abrupt crash or a network glitch. These things happen, especially in cloud environments, where anything can die at any time. Applications can’t rely blindly on the middleware: the software must be able to recover from this kind of failure instead of crashing miserably. Let’s see how to make a message sender more robust and reliable, first with the RabbitMQ Java binding, then with the Spring AMQP project.

Context

Here is the context:

The sender measures the temperature every second and sends it in a message.

A consumer is consuming from a temperature queue and displays the temperature on the console (we won’t show the consumer code in this post). Don’t forget to create the temperature queue if you’re running the sample.

There are 3 nodes in the cluster; they all run locally, on different ports. You wouldn’t do that in production, but it’s handy for testing the failure of a cluster node. Note that the sample also works if you’re not running a cluster.

When it detects a failure, the client tries to reconnect to another node. It picks the node randomly.
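The senders below call two helpers that the post doesn’t show: letsWait() and measureTemperature(). Here is a hypothetical version of both, assuming the temperature is simply faked with random values between 15 and 25 °C; a real sender would read an actual sensor. The SenderHelpers class name is an assumption.

```java
import java.util.Locale;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

// Hypothetical helpers: the post calls letsWait() and measureTemperature()
// but doesn't show them, so this is just one possible implementation.
final class SenderHelpers {

    private SenderHelpers() {
        // static helpers only
    }

    // Pauses the calling thread for one second, so the main loop sends about one measure per second.
    static void letsWait() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // Restores the interrupt flag instead of swallowing it, so callers can react.
            Thread.currentThread().interrupt();
        }
    }

    // Fake sensor: returns a reading in degrees Celsius with one decimal.
    // Locale.ROOT guarantees a '.' decimal separator whatever the default locale is.
    static String measureTemperature() {
        double celsius = ThreadLocalRandom.current().nextDouble(15.0, 25.0);
        return String.format(Locale.ROOT, "%.1f", celsius);
    }
}
```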

Sending messages with RabbitMQ Java binding

RabbitMQ comes with bindings (that is, client drivers) for different languages. The Java binding works great and can be used directly in applications, even if it can be considered a bit “low-level”. Let’s see how to implement our bulletproof sender with the Java binding.

The main sending loop

The main loop does the following:

It asks for a first channel on a specific port (this makes it easier to choose which node to crash first).

It starts looping and sends the temperature to the default exchange, with temperature as the routing key. The default exchange routes each message to the queue whose name matches the routing key, so the messages end up in the temperature queue (if it exists).

If sending fails, the sender asks for a new channel.

Note that the channel methods can’t fail: they loop until they get a channel. We’ll see their code later.
Here is the main loop:

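public static void main(String[] args) {
  // starts on a known node, so it's easy to choose which node to crash first
  Channel channel = channel(FIRST_CONNECTION_PORT);
  while (true) {
    letsWait();  // sends a measure every second
    String temperature = measureTemperature();
    try {
      // default exchange (""), routing key "temperature": ends up in the temperature queue
      channel.basicPublish("", "temperature", null, temperature.getBytes());
    } catch (Exception e) {
      // this measure is lost; the next one goes through a new channel
      System.err.println("Message sending failed, trying to reconnect");
      channel = channel();
    }
  }
}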
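As written, the loop drops a measure whose publication fails: the catch block only fetches a new channel. If losing a reading matters, one variation is to retry the same measure once on the fresh channel. The sketch below runs without a broker: the MeasurePublisher interface and the RecoveringSender class are hypothetical stand-ins for the RabbitMQ Channel and for the loop body, and the Supplier plays the role of the channel() method.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

// Hypothetical stand-in for the publishing part of a RabbitMQ Channel.
interface MeasurePublisher {
    void publish(String routingKey, byte[] body) throws Exception;
}

final class RecoveringSender {

    // Plays the role of channel(): returns a working publisher, however long it takes.
    private final Supplier<MeasurePublisher> publisherFactory;
    private MeasurePublisher publisher;

    RecoveringSender(Supplier<MeasurePublisher> publisherFactory) {
        this.publisherFactory = publisherFactory;
        this.publisher = publisherFactory.get();
    }

    // Sends one measure. On failure, switches to a fresh publisher and retries the
    // same measure once. Returns false only if both attempts failed.
    boolean send(String temperature) {
        byte[] body = temperature.getBytes(StandardCharsets.UTF_8);
        for (int attempt = 1; attempt <= 2; attempt++) {
            try {
                publisher.publish("temperature", body);
                return true;
            } catch (Exception e) {
                System.err.println("Message sending failed, trying to reconnect");
                publisher = publisherFactory.get();
            }
        }
        return false;
    }
}
```

Retrying only once keeps the one-measure-per-second rhythm roughly intact; if the fresh channel fails too, the measure is dropped, exactly as in the original loop.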

Channel creation

This is where things get interesting. The channel creation hides the failure and retry logic from the main loop. The channel method loops until it gets a channel, connecting on the port passed in as a parameter (or on a random node when -1 is passed). Note that it delegates the connection creation to another method (more on this later).
Here is the code of the channel creation:

protected static Channel channel(int portSuggestion) {
  while (true) {  // never gives up: loops until it gets a channel
    try {
      Thread.sleep(500);  // short pause before each attempt, so a dead node isn't hammered
      return connection(portSuggestion).createChannel();
    } catch (Exception e) {
      System.err.println("Connection to broker failed, trying again");
    }
  }
}

private static Channel channel() {
  // no port suggestion: connection() picks a node randomly
  return channel(-1);
}
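The channel method retries with a fixed 500 ms pause. A common alternative, swapped in here rather than taken from the post, is capped exponential backoff: the pause doubles after each failure, up to a ceiling, so a node that stays down for a while isn’t hammered twice a second. The hypothetical Retry.untilSuccess helper below accepts any Callable, so channel creation could become something like Retry.untilSuccess(() -> connection(portSuggestion).createChannel(), 500, 10_000). Unlike channel(), it makes the first attempt right away.

```java
import java.util.concurrent.Callable;

// Hypothetical helper: calls an action until it succeeds, with capped exponential backoff.
final class Retry {

    private Retry() {
    }

    static <T> T untilSuccess(Callable<T> action, long initialDelayMillis, long maxDelayMillis) {
        long delay = initialDelayMillis;
        while (true) {
            try {
                return action.call();  // the first attempt happens immediately
            } catch (InterruptedException e) {
                // Gives up on interruption, restoring the flag for the caller.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while retrying", e);
            } catch (Exception e) {
                System.err.println("Attempt failed (" + e.getMessage() + "), retrying in " + delay + " ms");
            }
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while retrying", e);
            }
            delay = Math.min(delay * 2, maxDelayMillis);  // doubles the pause, up to the ceiling
        }
    }
}
```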

Connection creation

Finally, the creation of the connection. In this method, the client doesn’t endlessly try to reconnect to the same node: when no port is suggested, it picks a node randomly. This is useful if the failure is caused by a crash rather than a transient network glitch.
Here is the code that creates the connection:

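protected static Connection connection(int portSuggestion) throws IOException, TimeoutException {
  ConnectionFactory factory = new ConnectionFactory();
  factory.setUsername("guest");
  factory.setPassword("guest");
  factory.setVirtualHost("/");
  factory.setHost("localhost");
  // -1 means "no suggestion": pick one of the cluster nodes at random
  int serverPort = portSuggestion == -1 ? pickPortRandomly() : portSuggestion;
  System.out.println("Connecting on port " + serverPort);
  factory.setPort(serverPort);
  // recent versions of the Java binding declare TimeoutException on newConnection()
  return factory.newConnection();
}

// Assumption: ports of the 3 local nodes, adjust them to your own cluster
private static final int[] SERVER_PORTS = {5672, 5673, 5674};
private static final Random RANDOM = new Random();

private static int pickPortRandomly() {
  // may return the node that just failed: channel() then simply tries again
  return SERVER_PORTS[RANDOM.nextInt(SERVER_PORTS.length)];
}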
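pickPortRandomly() can return the port of the node that just failed, in which case the sender simply goes through another round of the retry loop. A variation, assuming the caller remembers the port of the connection that died (the post’s code doesn’t keep track of it), is to draw only among the other nodes. NodePicker is a hypothetical name; the ports in the test are the same assumed local ports as above.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical node picker: draws a random port, avoiding the one that just failed.
final class NodePicker {

    private final int[] ports;
    private final Random random;

    NodePicker(int[] ports, Random random) {
        if (ports.length == 0) {
            throw new IllegalArgumentException("At least one port is required");
        }
        this.ports = ports.clone();  // defensive copy
        this.random = random;
    }

    int pickAvoiding(int failedPort) {
        int[] candidates = Arrays.stream(ports).filter(port -> port != failedPort).toArray();
        if (candidates.length == 0) {
            // The failed node is the only one known: try it again.
            return ports[random.nextInt(ports.length)];
        }
        return candidates[random.nextInt(candidates.length)];
    }
}
```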

Testing

You can run the sender and the consumer classes. To simulate a failure, you can stop the RabbitMQ application on a node with rabbitmqctl (the Erlang node itself keeps running).
If you’re running a standalone RabbitMQ instance (not a cluster):

rabbitmqctl stop_app

If you’re running a cluster, to stop the server1 node:

rabbitmqctl -n server1 stop_app

To restart the node, use the same command, but with start_app instead of stop_app.
When simulating a crash, you should see the sender trying to reconnect (and the consumer as well, if you stopped its node).

So what?

Well, we built a robust and reliable message sender. The thing is, this code is a little cumbersome, especially if we need to rewrite it every time we want to send messages. Fortunately, higher-level libraries like Spring AMQP can help.

Sending messages with Spring AMQP

Spring AMQP is part of the Spring portfolio and provides an abstraction over the RabbitMQ Java binding. People familiar with Spring and/or its JMS support will feel at ease with Spring AMQP. Note that Spring AMQP also provides support for Spring Integration.

Configuration

The main class in Spring AMQP is the RabbitTemplate. Here is how to configure it in a Spring application context:

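<!-- assumes xmlns:rabbit="http://www.springframework.org/schema/rabbit" is declared on the enclosing <beans> element -->
<rabbit:connection-factory id="connectionFactory" />
<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
  <property name="connectionFactory" ref="connectionFactory" />
</bean>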

Using the RabbitTemplate

Here is our sender program, now using the RabbitTemplate (note that the RabbitTemplate class implements the RabbitOperations interface):

public static void main(String[] args) {
  ApplicationContext ctx = new ClassPathXmlApplicationContext("/spring-amqp-sender.xml");
  RabbitOperations tpl = ctx.getBean(RabbitOperations.class);
  while (true) {
    letsWait();  // sends a measure every second
    String temperature = measureTemperature();
    try {
      // default exchange, "temperature" as the routing key
      tpl.convertAndSend("temperature", temperature);
    } catch (AmqpException e) {
      // the measure is lost, but the template reconnects on its next use
      System.err.println("Error while sending message, still looping...");
    }
  }
}

Is that all? Where is the error-handling code? Well, it’s the catch block! In case of error, the RabbitTemplate throws an AmqpException. It doesn’t hide that something went wrong, but it reconnects transparently the next time we use it. As with the Java binding version, the measure whose sending failed is lost.
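This is not Spring AMQP’s code, but the behavior described above can be sketched in plain Java: the failure reaches the caller as an unchecked exception, and the broken connection is discarded so that the next call opens a fresh one. BrokerLink, SendFailedException and ReconnectingTemplate are hypothetical names.

```java
import java.util.function.Supplier;

// Hypothetical minimal connection abstraction, standing in for a broker connection.
interface BrokerLink {
    void send(String routingKey, String message) throws Exception;

    void close();
}

// Unchecked exception handed to callers, in the spirit of Spring's AmqpException.
class SendFailedException extends RuntimeException {
    SendFailedException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical template: opens a link lazily and drops it after a failure,
// so the next call reconnects without any extra code on the caller's side.
final class ReconnectingTemplate {

    private final Supplier<BrokerLink> linkFactory;
    private BrokerLink link;  // null until first use, and again after a failure

    ReconnectingTemplate(Supplier<BrokerLink> linkFactory) {
        this.linkFactory = linkFactory;
    }

    synchronized void send(String routingKey, String message) {
        if (link == null) {
            link = linkFactory.get();  // (re)connects on demand
        }
        try {
            link.send(routingKey, message);
        } catch (Exception e) {
            BrokerLink broken = link;
            link = null;  // the next send opens a fresh link
            try {
                broken.close();
            } catch (RuntimeException ignored) {
                // the link is already broken; a failing close changes nothing
            }
            throw new SendFailedException("Error while sending message", e);
        }
    }
}
```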
There’s a drawback: we can’t give the template several nodes to reconnect to. If the node is really dead, the template will keep trying to reconnect to it endlessly. This can easily be avoided by putting a load balancer in front of the cluster; HAProxy works well with RabbitMQ. The Spring configuration would point to the load balancer, and the load balancer would round-robin the nodes when a connection dies.
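Round-robin simply means handing out the nodes in turn. HAProxy applies it on the server side; purely to illustrate the policy, here is a hypothetical client-side RoundRobin class (not HAProxy configuration, and not part of Spring AMQP). The node addresses in the test are assumptions.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical client-side round-robin: each call hands out the next node in turn.
final class RoundRobin<T> {

    private final List<T> nodes;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobin(List<T> nodes) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("At least one node is required");
        }
        this.nodes = List.copyOf(nodes);  // immutable snapshot (Java 10+)
    }

    T next() {
        // floorMod keeps the index in range even once the counter overflows and goes negative
        int index = Math.floorMod(counter.getAndIncrement(), nodes.size());
        return nodes.get(index);
    }
}
```

With a load balancer, the same rotation happens in front of the cluster, so the sender only needs the balancer’s address.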

What about failover for the consumer?

Failover for a consumer can be tricky, especially for asynchronous consumption. This is another story that would need another blog post (you can check the samples for a solution based on the Java binding). The good news is that RabbitMQ provides the necessary callbacks to notify async consumers when something goes wrong, and Spring AMQP also supports failover for async consumers.

Conclusion

Handling failover is complex, yet it is a must-have for any application that claims to be robust. We have everything we need in the RabbitMQ Java binding, but the error-handling code can be tricky and doesn’t bring much value to the end user. The Java binding is great to get started, but higher-level libraries like Spring AMQP are more than welcome when it comes to writing day-to-day application code. Spring AMQP comes with many features beyond failover, like declarative configuration, POJO-based async consumption, etc. Don’t hesitate to take a look at it!

One thought on “Failover with RabbitMQ, the sender's story”

  • nbedard

    If you remove the while(true) loop, you will see that the application never ends because you do not close the Connection.
