Maîtriser le paramétrage des producers Kafka
Dans l’écosystème Apache Kafka, un Producer désigne un système qui va publier des messages vers un topic du cluster. Kafka a été pensé pour que cette publication de messages soit fiable et performante à la fois.
Lorsqu’on crée un Producer, il est possible de spécifier de nombreuses configurations, et il est facile de s’y perdre. Cet article a pour but d’éclaircir les différentes configurations et leurs rôles pour un échange de données fiable et performant pour les producers bénéficiant de la librairie kafka-clients.
Il est nécessaire de connaître les bases de Kafka pour bénéficier au mieux de cet article : comprendre le partitionnement, les brokers leaders et les replicas permet de bénéficier au mieux de ces configurations.
Pour un rappel sur Kafka, je vous conseille de lire l’article Kafka Fundamentals.
Déclaration des configurations
Les configurations sont à spécifier à la création du KafkaProducer. Elles peuvent être déclarées sous la forme d’un objet java.util.Properties, ou d’une java.util.Map<String, Object>.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.LongSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8084");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
KafkaProducer<String, MyDomainClass> producer = new KafkaProducer<>(props);
Valeur par défaut des configurations
Kafka a beaucoup évolué au cours de ces dernières années. Des nouvelles fonctionnalités sont apparues, avec des nouvelles valeurs de configuration. Les configurations de cet article se basent sur la version 2.6.x de Kafka. Les valeurs par défaut ont changé au fur et à mesure des versions de Kafka, et pour cette raison, il est important de vérifier que ces configurations sont adaptées à la version de Kafka que vous utilisez en production. Il est en général une bonne pratique d’être explicite sur la configuration – cela facilitera également les montées de version des producers.
Confluent, l’entreprise qui commercialise des services autour de l’écosystème Kafka, propose une matrice très utile qui liste les différentes versions de Kafka.
Aller dans le détail : les Kafka Improvement Proposals
Kafka est open-source, ce qui signifie que n’importe qui peut faire des propositions pour l’améliorer. De telles propositions sont inscrites dans les KIPs (ou Kafka Improvement Proposals). Les KIPs décrivent de manière exhaustive les détails des fonctionnalités de Kafka et sont une très bonne source de connaissance pour apprendre comment fonctionnent les brokers.
La base
La configuration bootstrap.servers définit le ou les serveurs kafka à adresser lors de la création du producer. Il n’est pas nécessaire ici de déclarer l’ensemble des brokers du cluster, mais la bonne pratique est d’en déclarer 3 – si l’un des brokers est indisponible, le producer pourra toujours essayer de se connecter au cluster par un autre broker.
La configuration key.serializer définit la classe à utiliser pour sérialiser la clé lors de l’envoi d’un message au cluster. A noter que cette configuration est obligatoire même si on utilise une clé null. Ce paramètre doit être le fully-classified class name de la classe qui va effectuer la sérialisation des clés (classe qui doit implémenter l’interface org.apache.kafka.common.serialization.Serializer).
Kafka dispose de classes par défaut dans le package org.apache.kafka.commons.serialization (correspondant à peu près aux types java primitifs).
Il est également possible d’utiliser des serializers plus complexes avec d’autres dépendances. Confluent propose ainsi plusieurs serializers disponibles librements.
Par exemple, le serializer Json est disponible sur le repository de Confluent (ne pas oublier de configurer le repository Confluent dans votre fichier de build). D’autres serializers sont fournis nativement par Confluent.
La configuration value.serializer définit la classe à utiliser pour sérialiser la valeur du message. Ce paramètre doit être le fully-classified class name de la classe qui va effectuer la sérialisation des valeurs. Voir le key.serializer ci-dessus pour d’avantages d’explications sur les paramètres de sérialisation.
Le partitionner
C’est au niveau du producer qu’on configure le partitionner. Le partitionner définit dans quel partition du topic vont être envoyés les messages. Ce producer peur être spécifié avec la clé partitioner.class.
Le partitioner par défaut gère deux cas:
- Si la clé du message est null, alors le sticky partitionner (nouveau depuis la version 2.4) est utilisé.
- Sinon, on répartit les messages par partition avec la formule suivante: hash(clé) modulo nombre de partitions.
Si les messages ont une clé non-nulle, le partitionner par défaut va répartir les messages entre les différentes partitions en fonction de leur clé et va garantir que des messages d’une même clé seront envoyés vers la même partition.
Si la clé est null alors le sticky partitionner va répartir ces messages entre différentes partitions de manière équilibrée tout en privilégiant les performances du producer.
Le partitionner par défaut est souvent suffisant pour assurer que des messages de même clé sont bien envoyés aux mêmes partitions. Néanmoins il est utile de savoir qu’il peut être surchargé si le besoin s’en fait sentir.
Fiabilité de données
Il est possible de configurer le cluster en utilisant uniquement les paramètres définis ci-dessus. Cependant, d’autres configurations sont à connaître si on souhaite fiabiliser les échanges entre le producer et le cluster kafka.
Acks
Il est possible de spécifier quel type d’acquittement le producer attend de la part des brokers lors de l’envoi de messages à Kafka. Le paramétrage des acks (par défaut 1) se configure de trois manières possibles : 0, 1 ou ‘all’.
Pas d’acquittement : mettre ack à 0
Dans le cas où on configure ack à 0, le producer n’attend pas la confirmation du leader que les messages du batch ont été bien reçus. Il n’y a donc aucune garantie que le batch a bien été reçu par le broker. Le batch pourrait être perdu, à la suite d’un problème réseau par exemple.
Acquittement uniquement de la part du leader
Avec ack = 1, le producer attend que les messages du batch soient bien écrits dans la partition par le leader. Les messages ont donc correctement été intégrés dans le cluster. Pourtant, cela ne garantit pas forcément que les messages du batch ne puissent pas être perdus. Pour bien comprendre cela, il est nécessaire de rappeler comment Kafka assure la réplication des données.
Une histoire de replicas
Le cluster kafka est un système fault tolerant. Cela signifie que le cluster supporte l’arrêt inopiné d’un broker (et ce quelque soit la raison de l’arrêt: maintenance, problème au niveau hardware entraînant un crash…). Pour des raisons de fiabilité, il faut donc que les données des partitions soient répliquées sur plusieurs brokers. Cela permet à un broker disposant d’une copie des données (aussi appelé replica dans le jargon kafka) de pouvoir prendre le relais en cas de besoin. Pour répliquer les données entre les différents brokers, il faut paramétrer à la création du topic le replication.factor (à 1 par défaut) qui indique combien de broker doivent être des replicas d’une partition donnée.
Chaque partition a un broker leader, vers lequel les producers écrivent et les consumers lisent, et des brokers followers qui vont périodiquement faire un fetch pour récupérer les données du leader et se mettre à jour. Si le leader devient indisponible pour une raison ou une autre, des brokers replicas seront sélectionnés pour devenir les leaders des nouvelles partitions.
Une perte potentielle
En paramétrant ack à 1, il existe donc un risque que le broker leader d’une partition, après réception d’un batch, devienne offline avant que les replicas n’aient eu le temps de se mettre à jour mais après avoir envoyé l’acquittement au producer. Il y a dans ce cas une perte des messages du batch.
Acquittement de tous
Le paramétrage de ack à all (ou -1) spécifie à kafka que le producer souhaite attendre que tous les replicas (c’est à dire le leader et les followers d’une partition) aient reçu le batch avant acquittement. Le producer sait, à réception de l’acquittement, que les messages ont été répliqués au moins une fois dans le cluster.
Être sync ou pas sync telle est la question
Kafka est fault tolerant. Donc, un broker du cluster peut devenir offline à n’importe quel instant, sans que cela n’entraîne d’interruption de service. En gardant en tête cette philosophie, comment se déroule l’acquittement avec ack à all et un follower offline ? Dans ce cas, kafka considère le follower comme étant out-of-sync (voir cet excellent article pour comprendre cette notion) et n’attend pas le fetch de ce follower avant d’envoyer l’acquittement.
En suivant cette logique, il est facile d’imaginer un cas où configurer ack à all serait équivalent à configurer ack à 1: si tous les brokers followers sont out-of-sync.
Nombre minimum de replicas in-sync
Pour empêcher ce cas, il est possible de paramétrer le min.insync.replicas. Cette configuration spécifie le nombre minimum de replicas qui doivent être in-sync pour envoyer un acquittement au producer. Cette configuration agit comme garde-fou pour éviter que le broker leader ne soit le seul replica in-sync.
Configurer l’acquittement : récapitulatif
La valeur par défaut de ack est ‘1’, on s’assure donc que seul le leader a bien commité les différents messages dans le commit log avant de rendre la main au producer. En laissant la valeur par défaut, on s’expose à un risque de perte de données si le broker devient offline juste après avoir acquitté les messages.
Passer ack à 0 sera très performant du point de vue du producer, mais des données peuvent être perdues en cas de problème réseau ou de crash de broker leader.
Configurer ack sur all (ou -1) permet de garantir une non-perte de données à condition que le min.insync.replicas soit paramétré à une valeur acceptable (typiquement supérieur à 1). Le producer doit attendre le fetch des données par les followers, donc ce scénario est moins performant que ack à 1.
La tolérance de perte d’un message est à identifier en fonction du type de message. Pour un message qui représente les relevés d’un capteur de température, il est possible de perdre des messages dans le cas où un broker devient offline (suite à une maintenance par exemple). Pour une commande client, il est inacceptable de perdre ce message.
Retries
Les producers vont envoyer les messages par batch au broker leader de la partition concernée par le batch. Potentiellement, à cause d’un problème réseau, de la taille du batch ou d’un leader surchargé, il est possible que le producer ne reçoive pas l’acquittement à temps de la part du leader (pour rappel, ce paramètre correspond à la valeur configurée dans request.timeout.ms, soit 30 secondes par défaut).
Dans ce cas, selon le paramétrage de retries (2147483647 par défaut… soit l’équivalent de Integer.MAX_VALUE), le producer va réessayer d’envoyer le batch après avoir attendu une période correspondante à la valeur configurée dans retry.backoff.ms (100 ms par défaut).
Il peut parfois être difficile de savoir quel nombre de “retry” paramétrer avant de considérer que l’envoi du batch est un échec. La valeur par défaut, Integer.MAX_VALUE, semble indiquer que le producer va faire des “retries” à l’infini, hors le paramètre delivery.timeout.ms (2 minutes par défaut) empêche cela. Ce dernier contrôle la durée maximum d’envoi d’un message à kafka – si le message n’est toujours pas livré après un appel à send() dans le temps imparti, on considère l’envoi comme un échec, peu importe le nombre de retry qui ont été effectués auparavant. La KIP-91 détaille le fonctionnement de ce paramètre plus en détail.
Etant donné que delivery.timeout.ms agit comme limite au temps maximum d’attente une fois un message envoyé à Kafka, il est recommandé de laisser le nombre de retries à la valeur maximum (soit Integer.MAX_VALUE). Cela permettra au producer de réessayer tant qu’il ne dépasse pas le temps maximum alloué au delivery avant de renvoyer une exception côté producer.
Idempotence
Un serveur est idempotent si, lorsqu’on lui envoie la même requête une ou plusieurs fois, il ne la traite qu’une fois. On parle aussi de dé-duplication ou de dédoublonnage. Cette propriété est très utile, notamment dans le cas des “retries”, car elle permet de s’assurer que si un retry a été joué suite à un timeout du producer, mais que le message a bien été reçu (les paquets d’acquittement du broker ont été perdus à cause d’un problème réseau, par exemple), alors lorsque le producer va réessayer, le broker leader ne va pas insérer deux fois les mêmes messages dans le commit log.
Ce KIP explique comment fonctionne l’idempotence côté producer. Chaque producer tague ses messages avec un ID unique incrémental appelé “PID” et le broker garde en mémoire les derniers PID insérés pour chaque commit log. S’il reçoit un message avec le même PID, alors il l’ignore. Dans le cas de retries, le broker ignore donc les messages qui ont été déjà insérés dans le commit log.
Il est possible d’activer l’idempotence en mettant un flag dans la configuration côté producer : enable.idempotence. Ce paramètre suffit pour garantir que les messages ne seront jamais reçus en double dans le broker. Par contre, comme le spécifie la documentation de cette configuration, l’idempotence s’accompagne de deux prérequis : le paramètre max.in.flight.requests.per.connection doit être <= 5 (5 est la valeur défaut) et ack doit être positionné sur all.
Performance
Les producers n’envoient pas les messages un à un vers kafka, mais envoient des batchs de messages vers les différents brokers leaders de chaque partition. Quand le producer effectue un send, le message n’est donc pas immédiatement transmis sur le réseau mais accumulé dans un buffer interne. Les batchs sont envoyés dès que l’une des deux conditions suivantes se remplit : soit le timer démarré à la création du batch expire, soit le batch dépasse une certaine taille.
Batch Linger
La configuration linger.ms définit le temps minimum que le producer va attendre avant d’envoyer un batch. Cette propriété laisse donc le temps aux messages de s’accumuler côté producer avant envoi, pour limiter le nombre d’appels réseau vers le broker et gagner en performance globale. Tant que la durée définie dans linger.ms ne s’est pas écoulée, le producer n’enverra pas le batch, à condition que le batch ne dépasse pas la taille maximale autorisée (voir batch.size ci-dessous). Par défaut, cette propriété est configurée à 0 (on n’attend pas que les batchs se remplissent avant envoi). Pour connaître la valeur à configurer, des tests de performance doivent être effectués avec la volumétrie et la fréquence cible des données qui vont transiter par ce topic. Ces tests, en combinaison avec le batch.size, permettront d’assurer un taux de traitement maximum. Kafka fournit des outils natifs pour tester les performances des producers via l’outil en ligne de commande kafka-producer-perf-tool. Ce talk explique comment l’utiliser pour tester différents scénari de performance (les slides sont très bien faites).
Si on laisse le linger.ms à sa valeur par défaut, c’est-à-dire 0, cela ne signifie pas forcément que les producers enverront obligatoirement leurs messages un par un aux différents brokers. Le fonctionnement interne des producers fait que les messages seront envoyés par batch quand l’opportunité se présente côté producer, mais la latence restera très faible.
Batch Size
La configuration batch.size paramètre la taille maximum d’un batch avant envoi. Le batch sera déclenché si sa taille dépasse le batch.size, ou si sa durée de vie dépasse le linger.ms. Comme pour le linger.ms, il est nécessaire de faire des tests pour déterminer une valeur appropriée du batch.size.
Compression Type
Les producers peuvent compresser leurs données avant envoi aux brokers à l’aide de la propriété compression.type. Cette propriété permet au producer de compresser le batch et réduire considérablement sa taille. Charge au consumer de décompresser le batch à la réception. On peut noter ici que la compression est de bout en bout: le broker stocke le batch de messages au format compressé et l’envoi tel quel au consumer pour maximiser les performances.
La compression peut réduire la taille des batchs de 30 à 40% en fonction du format de sérialisation. Un format de sérialisation verbeux comme le json ou le XML possède de nombreux tokens qu’il est facile de compresser comme <, “, :. Même sur des formats de sérialisation autre comme l’avro et le protobuf, on peut avoir des gains de performance avec la compression.
Conclusion
Maîtriser les producers Kafka, c’est comprendre les différents paramètres qui entrent en jeux pour s’assurer que les messages produits sont fiables et performants. Kafka est distribué et résilient, mais charge aux développeurs de tirer parti de ces caractéristiques pour bâtir un système robuste.
Zenika est un partenaire Confluent. À ce titre, si l’article vous a plu et que vous souhaitez en savoir plus sur Kafka, nous vous invitons à vous inscrire à nos formations Kafka Confluent Developer, Kafka Confluent Administration, Kafka avancé et optimisation, et Kafka Streams et Conflent KSQL. Contactez-nous si vous souhaitez de l’accompagnement sur ces sujets Kafka.
Découvrez nos formations officielles Confluent Kafka