Coherence Part VI : traitement de données distribuées, concurrence et in-place processing

La gestion de la concurrence et la distribution des calculs sont primordiaux pour atteindre la scalabilité que promettent les in memory datagrids. Coherence propose plusieurs mécanismes pour cela, dans cet article nous allons gérer les accès concurrents aux données dans un premier temps avec des locks explicites puis nous verrons comment Coherence permet de faire des traitements lock-free et in place, pour que le noeud qui porte la donnée soit celui qui effectue les opérations afin d’atteindre une scalabilité supérieure.

Locks

Le locking consiste à verrouiller une entrée du cache pour éviter qu’un autre accès à cette entrée puisse la modifier en même temps. Dans PetClinic, nous sommes exposés à cette problématique pour tout ce qui concerne la modification des données, puisqu’il faut à chaque fois faire un get(), modifier l’entrée puis faire un put() avec l’entrée modifiée. Si un autre utilisateur de l’application modifie la même entrée, avec des get et puts entrelacés, on peut perdre la donnée, le dernier appel à put() écrasant le précédant avec les données récupérées lors de son propre get().
get_put_entrelaces.png
Dans ce cas, les modifications apportées par Client 1 sont écrasées. Coherence supporte le locking avec une granularité au niveau d’une entrée du cache. Leur utilisation est simple, il suffit d’appeler le couple de méthodes lock() et unlock() du NamedCache, qui vont blocker et débloquer la clé passé en argument. La méthode lock() demande un timeout en millisecondes comme second paramètre, une valeur de -1 garderas le lock indéfiniment, tandis que 0 le libérera immédiatement. Seul le nœud qui a posé un lock peut le libérer, donc n’oubliez pas de le faire même en cas d’exception.
Pour PetClinic, nous allons améliorer la méthode storePet() qui pour rappel n’était pas thread safe, puisqu’entre le get() et le put() dans le cache, un autre client de la grille pouvait avoir modifié l’entrée.

try {
    boolean lock = getOwnersCache().lock(pet.getOwner().getId());
    if (!lock) {
        throw new ConcurrentModificationException();
    }
    Owner owner = (Owner) getOwnersCache().get(pet.getOwner().getId());
    // store the pet in owner omitted for clarity
    ...
    getOwnersCache().put(owner.getId(), owner);
} finally {
    getOwnersCache().unlock(pet.getOwner().getId());
}

Les locks ne bloquent pas réellement l’entrée, il est toujours possible d’y accéder en lecture, et dans le cas des caches partitionnés en écriture, si vous n’utilisez pas les locks dans toute l’application. En revanche un seul noeud peut posséder le lock, il faut donc penser à le faire avant d’accéder en écriture au cache si vous utilisez cette API.

EntryProcessor

L’utilisation des locks n’est pas très performante sur des caches répliqués ou partitionnés puisque le lock, le get(), le put() et l’unlock font chacun un appel réseau. Pour optimiser ce type d’opération, Coherence propose un mécanisme pour effectuer ce traitement du coté du noeud qui porte l’entrée en une seule opération et donc un seul appel réseau, même si le traitement est effectué sur plusieurs entrées. Ce traitement qui ne nécessite pas de lock explicite est un processor. Ces processors ont aussi pour avantage de tirer naturellement partie de la structure du cluster pour distribuer les traitements. A la place de l’exécution du pattern lock, get, put, unlock en série pour toutes les entrées à traiter, le processor ne nécessitera qu’un appel pour traiter l’ensemble des entrées ciblées en parallèle sur les différents noeuds.
Bilan: réduction des échanges réseau et distribution de la charge de calcul.
Utiliser un EntryProcessor revient globalement à envoyer le code vers les données plutôt que faire venir les données vers le code de traitement. Cela offre plusieurs avantages:

  • diminution du volume de données qui transite sur le réseau, le processor et sa valeur de retour sont envoyés sur le réseau plutôt que la donnée dans les deux sens.
  • répartition la charge sur les différents nœuds du cluster.
  • réalisation d’opérations concurrentes sans lock explicite, les processor sont traités en série sur une entrée. Par contre il est toujours possible qu’avec deux processeurs successifs le second écrase les modifications du premier.
  • modifications des entrées dans le cache atomiques.
  • garantie d’exécution grâce aux fonctionnalités de réplication des caches.

A propos de la garantie d’exécution, celà signifie que si le nœud qui porte la donnée sort du cluster pendant le traitement, c’est la partition de sauvegarde qui est promue partition principale, qui va exécuter ce traitement.
Attention, un processeur ne réalise pas une transaction, si le processeur à des effets de bords et qu’une exception est levée, il n’y a pas de rollback.
L’interface EntryProcessor expose deux méthodes : process(Entry) et processAll(Set). Pour vous simplifier la vie vous pouvez étendre AbstractProcessor qui propose une implémentation par défaut de processAll() qui va simplement itérer sur le Set passé en paramètre et appliquer process() sur chaque entrée.

Utilisation d’un processeur

Nous allons à nouveau modifier la méthode storePet() pour utiliser un processeur qui va enregistrer le Pet . Voici le code du processeur

public class StorePetProcessor extends AbstractProcessor {
    private Pet pet;
    public StorePetProcessor(Pet pet) {
        this.pet = pet;
    }
    public Object process(Entry entry) {
        Owner owner = (Owner) entry.getValue();
        if (pet.getId() == null) {
            pet.setId(RandomUtils.nextInt());
            owner.addPet(pet);
        } else {
            Pet existingPet = owner.getPet(pet.getName());
            existingPet.setBirthDate(pet.getBirthDate());
            existingPet.setName(pet.getName());
            existingPet.setType(pet.getType());
        }
        entry.setValue(owner);
        return null;
    }
}

Pensez à appeler setValue() après avoir modifier l’entrée, pour la sérialiser et l’insérer dans le cache, sinon vous n’aurez fait que modifier la copie de-sérialisée de l’entrée.
Pour exécuter le processeur, on utilise la méthode invoke() de l’interface InvocableMap, implémentée par NamedCache. Les paramètres sont la clé de l’entrée sur laquelle on veut envoyer le processeur et le processeur lui même:

public void storePet(Pet pet) throws DataAccessException {
    EntryProcessor processor = new StorePetProcessor(pet);
    getOwnersCache().invoke(pet.getOwner().getId(), processor);
}

L’exécution du processeur est synchrone et le thread appelant recevra la valeur retournée par le processeur. Attention, Coherence possède un mécanisme de détection des threads bloqués : si votre traitement est long, il risque d’être tué par le GuardSupport. Pour éviter cela, il faut appeler la méthode GuardSupport.heartbeat(), qui va signaler que le thread n’est pas bloqué.
Notez qu’il est possible de lancer un processeur sur une clé qui n’existe pas, il est dans ce cas possible de crée l’entrée avec setValue(). Il est donc possible de coupler une action supplémentaire, comme la vérification des droits.
J’ai utilisé plus haut l’image de l’envoie de code vers les données, pour être plus précis, le processeur va être sérialisé puis envoyé vers les autres nœuds du cluster qui vont le de sérialiser. Le processeur doit donc exister dans le classpath des autres nœuds.

Exécution parallèle de processeurs

Pour exécuter le processeur sur plusieurs entrées en parallèle, vous pouvez utiliser la méthode invokeAll(). Il y a deux variantes, la première prend un Set des clés à traiter et la seconde prend un filtre et qui s’appliquera sur toutes les entrées qui correspondent au filtre. Cette approche est semblable à du Map/Reduce mais au lieu de réduire les résultats, on les traite indépendamment coté serveur. Dans les deux cas, la valeur de retour est une Map avec en clé celle de l’entrée du cache et en valeur le retour de la méthode process() pour cette entrée. L’exécution est distribuée et parallélisée sur le cluster, chaque noeud traitant les données qu’il porte localement. Pour ajouter une nouvelle spécialité à tous les vétérinaires, nous a
urions besoin d’un processeur comme celui ci :

public class AddSpecialtyToVetsProcessor extends AbstractProcessor {
    private Specialty specialty;
    public AddSpecialtyToVetsProcessor(Specialty specialty) {
        this.specialty = specialty;
    }
    public Object process(InvocableMap.Entry entry) {
        Vet vet = (Vet) entry.getValue();
        if (vet.getSpecialties().contains(specialty)) {
            return false;
        }
        vet.addSpecialty(specialty);
        return true;
    }
}

pour utiliser ce processeur sur toutes les entrée du cache, on utilise le AlwaysFilter, qui retourne toujours true.

Specialty specialty = new Specialty();
specialty.setName("radiology");
EntryProcessor processor = new AddSpecialtyToVetsProcessor(specialty);
getVetsCache().invokeAll(AlwaysFilter.INSTANCE, processor);

Ré-entrance des services

Il est impossible d’effectuer un appel réentrant depuis un processeur , c’est à dire accéder à une entrée du cache autre que celle qui est traitée actuellement. En effet, les accès au cache sont traités par un pool de thread et interdire les appels réentrant permet de ne pas bloquer le pool. Les caches Coherence sont groupées dans des services, qui ont chacun un pool de thread et une queue de messages. Tous les appels à un cache sont transmis au service qui le gère par la queue de message. Les services utilisent un modèle threadé, donc chaque message est traité par un thread du pool, un excès de message pourrait saturer le pool et provoquer un deadlock, avec les threads bloqués par en attente des appels réentrants qui ne peuvent êtres traités, faute de thread libre disponible. Pour prévenir ce type de problème, Coherence vérifie que le thread accédant au cache n’est pas un thread de son propre pool et le cas échéant déclenchera une exception avec comme message :

Assertion failed: poll() is a blocking call and cannot be called on the Service thread

Conclusion

Les processeurs sont des outils très simple et puissants à utiliser pour gérer la concurrence et paralléliser de façon performante le traitement des données sur le cluster. En effet, le processeur est exécuté par le noeud qui porte la donnée, le traitement est atomique et sans lock, elle est pas belle la vie ? On peut aussi faire un parallèle avec le select for update du SQL, qui verrouille les données le temps de la mise à jour, les autres accès à cette données attendant simplement que le précédent termine.
Les sources sont téléchargeables : Sur GitHub

git clone git://github.com/obourgain/petclinic-coherence.git
git checkout article6-end

En zip
Index des articles de la série Coherence :

Laisser un commentaire

Ce site utilise Akismet pour réduire les indésirables. En savoir plus sur comment les données de vos commentaires sont utilisées.

%d blogueurs aiment cette page :