ZeroMQ et load-balancing : un exemple concret

Il y a 2 mois, j’ai écrit un article au sujet de ZeroMQ. Si vous ne l’avez pas encore lu, je vous le conseille, je pense avoir réussi à expliquer de manière assez simple les concepts de base de cette bibliothèque réseau aux fonctionnalités très puissantes.

Pour joindre l’utile à l’agréable (comprendre : pour faire un peu de R&D, monter mes équipes en compétence sur des projets intéressants pour l’entreprise), j’ai demandé à mon administrateur système de sortir de sa «zone de confort», et de coder un petit projet en PHP utilisant ZeroMQ.

Le projet

L’idée est de mettre en place un serveur de sauvegarde centralisé. Ce serveur contient un gros disque dur (en fait, plusieurs, mais j’y reviendrai), sur lequel est stocké une copie de tous les fichiers qui doivent être sauvegardés sur les postes de travail. Tous les jours, ce serveur doit lancer une série de rsync pour synchroniser le disque dur local avec les machines à sauvegarder. C’est l’étape de “backup”.

Par la suite, nous recopions les données sur un second disque dur. Chaque dimanche, une copie complète est effectuée, alors que tous les autres jours de la semaine on se contente de faire une copie incrémentale pour ajouter les nouveaux fichiers. C’est l’étape d’“archivage”.

(oui, je sais, on pourrait utiliser un logiciel de sauvegarde comme Amanda, mais l’aspect R&D serait vachement moins évident, hein)

La réalisation

Tout cela n’a rien de bien sorcier. On pourrait faire un simple programme, lancé par crontab, qui lanceraient séquentiellement un rsync sur chaque poste de travail, puis qui effectuerait séquentiellement l’archivage de chaque machine. Le truc, c’est qu’en faisant ainsi, on perdrait un temps fou. De nos jours, on a des processeurs multi-cœurs, des réseaux gigabit ou plus… Ce serait quand même idiot de ne pas mener plusieurs sauvegardes en parallèle.

La première possibilité qu’on pourrait envisager serait simplement de lancer autant de programmes qu’il y a de machines à sauvegarder. Mais ce serait peut-être un peu violent et difficile à surveiller efficacement. Et plus le nombre de machine à sauvegarder augmente, moins cette méthode serait efficace.

On a donc décidé de mettre en place une architecture comprenant  un serveur qui coordonne le travail de plusieurs « workers », des programmes qui effectuent le boulot réel. L’idée est de démarrer un nombre fini de workers, ce qui détermine le nombre de tâches effectuées simultanément, et de leur indiquer les machines à sauvegarder.
ZeroMQ excelle dans ce genre de situation. Les workers vont se connecter au serveur, et attendre qu’il leur envoie des ordres. Le serveur, lui se contentera d’envoyer des ordres séquentiels sur sa socket ; ZeroMQ se chargera de les délivrer en les répartissant aux différents clients (c’est la fonctionnalité de load-balancing intégrée à ZeroMQ).

Petit rappel : À la base, ZeroMQ fournit 3 types de communication. Le REQ/REP sert à faire du client-serveur classique (on fait une requête, on reçoit une réponse) ; le PUSH/PULL pour envoyer des données à sens unique ; le PUB/SUB pour envoyer des données à tous ceux qui s’y sont abonnés. La principale différence entre les deux derniers est que le PUB envoie ses paquets de données − en même temps − à tous les SUB qui y sont connectés, alors que le PUSH envoie ses données successivement à chacun des PULL connectés − l’un après l’autre.

Le problème

Mon admin sys. est revenu vers moi avec un comportement étrange. Quelque soit le nombre de workers, un seul d’entre eux recevait tous les paquets de données. Aïe.

Voici, en très simplifié, le code du serveur :

// création de la socket ZMQ
$ctx = new ZMQContext();
$socket = new ZMQSocket($ctx, ZMQ::SOCKET_PUSH);
$socket->bind('tcp://*:1234');

// création des workers en tâche de fond
for ($i = 0; $i < $nbrWorkers; $i++)
    exec('/path/to/worker.php >> /path/to/log 2>&1 &');

// envoi des ordres de sauvegarde
foreach ($machines as $machine)
    $socket->send($machine);

// envoi des ordres de "suicide"
for ($i = 0; $i < $nbrWorkers; $i++)
    $socket->send('KILL');

Le code du client (là encore, très simplifié) :

// création de la socket ZMQ
$ctx = new ZMQContext();
$socket = new ZMQSocket($ctx, ZMQ::SOCKET_PULL);
$socket->connect('tcp://localhost:1234');

// traitement
while (true) {
    // réception des données
    $msg = $socket->recv();

    // gestion du "suicide"
    if ($msg == 'KILL')
        exit(0);

    // sauvegarde de la machine demandée
    backup($msg);
}

Reprenons le déroulement. Le serveur commence par créer sa socket ZMQ. Puis il crée des sous-processus pour instancier autant de workers que prévu. Dans la foulée, il envoie les noms des machines à sauvegarder. Puis il envoie autant d’ordres de « suicide » qu’il a créé de workers (pour leur demander de s’arrêter une fois que le travail est terminé).

Imaginons que nous avons cinq machines à sauvegarder (nommées A, B, C, D et E), et trois workers (nommés Prime, Seconde et Tierce).
Le serveur envoie les messages dans l’ordre suivant : A, B, C, D, E, KILL, KILL, KILL.

On peut imaginer que les réceptions se fassent de la sorte :

  • A => Prime
  • B => Seconde
  • C => Tierce
  • D => Prime
  • E => Seconde
  • KILL => Tierce
  • KILL => Prime
  • KILL => Seconde

Tout irait bien ; on peut voir que toutes les machines seraient sauvegardées en parallélisant les traitements (jusqu’à 3 sauvegardes simultanées), puis que chaque worker recevrait bien une instruction lui demandant de s’arrêter. Et pourtant, ce n’est pas le cas. L’un des workers reçoit tous les messages, et les deux autres rien du tout.

L’explication

ZeroMQ est une bibliothèque dont l’exécution est très rapide. Dans le code présenté ci-dessus, la partie la plus lente de l’exécution tient dans la connexion réseau ; le moment où les socket BSD entrent en jeu pour connecter un programme à un autre.

En fait, au moment où le premier worker se connecte au serveur, ZeroMQ a déjà dans sa file d’attente interne tous les messages qui doivent être envoyés. Donc, dès que cette première connexion est établie, il lui balance tout. Ce qui est normal, car à ce moment-là il n’y a pas encore d’autre connexion avec laquelle faire le load-balancing.

La solution est donc d’attendre que toutes les connexions soient effectuées avant d’envoyer les données. Il y a deux manières d’y arriver ; l’une est rapide mais crado, l’autre est bien plus propre mais un poil plus complexe.

Méthode quick and dirty

Le plus simple, pour bien comprendre où se situait le problème, est d’ajouter une temporisation entre la création des workers et l’envoi des données. Cela afin de laisser aux workers le temps de se connecter au serveur et d’être « enregistrés » dans le load-balancing.

Voici le code du serveur adapté :

// création de la socket ZMQ
$ctx = new ZMQContext();
$socket = new ZMQSocket($ctx, ZMQ::SOCKET_PUSH);
$socket->bind('tcp://*:1234');

// création des workers en tâche de fond
for ($i = 0; $i < $nbrWorkers; $i++)
    exec('/path/to/worker.php >> /path/to/log 2>&1 &');

// temporisation de 3 secondes
sleep(3);

// envoi des ordres de sauvegarde
foreach ($machines as $machine)
    $socket->send($machine);

// envoi des ordres de "suicide"
for ($i = 0; $i < $nbrWorkers; $i++)
    $socket->send('KILL');

Ah oui, je sais, c’est sale. Mais j’avais prévenu, et ça marche. Par contre, si on augmente le nombre de workers à lancer, on risque d’avoir une temporisation insuffisante. Et si la machine est spécialement lente à ce moment-là, on risque encore de rater des workers.

Méthode propre

Pour bien faire les choses, il faut que les workers envoient un message au serveur, pour lui signifier qu’ils sont prêts à recevoir des données. Ainsi, le serveur n’enverra ses ordres qu’après que tous les workers se soient déclarés.

Comme bien souvent avec ZeroMQ, cela impose d’ouvrir un canal de communication supplémentaire. Mais, comme toujours avec ZeroMQ, il ne faut pas avoir peur de le faire, car c’est simple et rapide à mettre en œuvre.

Cela donne donc une infrastructure de la forme suivante :

Le serveur doit donc ouvrir deux sockets, l’une qui lui servira à envoyer ses ordres aux workers, l’autre pour recevoir les messages envoyés par ceux-ci.
Cette seconde socket est d’ailleurs bien utile : elle permettra de remonter d’autres types d’informations, par exemple pour avertir le serveur à chaque fois qu’une sauvegarde est terminée.

Cela nous amène à un serveur qui ressemble à ceci :

// création des sockets ZMQ
$ctx = new ZMQContext();
$output = new ZMQSocket($ctx, ZMQ::SOCKET_PUSH);
$output->bind('tcp://*:1234');
$input = new ZMQSocket($ctx, ZMQ::SOCKET_PULL);
$input->bind('tcp://*:1235');

// création des workers en tâche de fond
for ($i = 0; $i < $nbrWorkers; $i++)
    exec('/path/to/worker.php >> /path/to/log 2>&1 &');

// réception des confirmations des workers
for ($i = 0; $i < $nbrWorkers; $i++)
    $input->recv();

// envoi des ordres de sauvegarde
foreach ($machines as $machine)
    $output->send($machine);

// envoi des ordres de "suicide"
for $i = 0; $i < $nbrWorkers; $i++)
    $output->send('KILL');

Le code du client :

// création des sockets ZMQ
$ctx = new ZMQContext();
$input = new ZMQSocket($ctx, ZMQ::SOCKET_PULL);
$input->connect('tcp://localhost:1234');
$output = new ZMQSocket($ctx, ZMQ::SOCKET_PUSH);
$output->connect('tcp://localhost:1235');

// envoi de la confirmation de connexion au serveur
$output->send(1);

// traitement
while (true) {
    // réception des données
    $msg = $input->recv();

    // gestion du "suicide"
    if ($msg == 'KILL')
        exit(0);

    // sauvegarde de la machine demandée
    backup($msg);
}

Et là, tout fonctionne à la perfection.

13 commentaires pour “ZeroMQ et load-balancing : un exemple concret

  1. Pourquoi ne pas tout simplement utiliser Amazon s3 ?
    Cela est bien moins fastidieux et évite la perte de données. En ce qui concerne le bakcup du backup, un clic ou petit script suffit…
    Les coûts sont augmentés, mais pour mon cas accéder aux données et ne pas les perdre est plus important que la sécurité d’accès de ses dites données puisqu’elles ont une moindre importance privée.

  2. @hiro : Principalement à cause de la volumétrie. Envoyer les données d’une trentaine de postes de travail et de serveurs internes, sur une ligne ADSL, ce n’est pas possible. Sans parler des délais pour récupérer les données (et quand il y a besoin de restaurer des données, c’est toujours dans l’urgence).

    Par contre, nous utilisons Amazon S3 pour dupliquer les sauvegardes de nos serveurs de production, qui sont sauvegardés 2 fois par sécurité (une fois sur un serveur de backup et une fois sur Amazon S3).

    Bref, le but de cet article n’est pas de discuter des meilleures méthodes de sauvegarde, mais bien de parler de ZeroMQ 🙂

  3. Ce système de sauvegarde m’interesse beaucoup.
    Au final, vos worker lancent des rsync ou vous avez optez pour un autre moyen ?

    Dans votre exemple, si un des workers n’arrivent pas a démarrer ou si il s’arrete avant de prévenir le serveur, alors le serveur attendra indéfiniment non ?

    Est il possible de mettre un timeout sur les recv() ?

  4. @Paul : Oui, les workers lancent des rsync. D’un point de vue théorique, ils peuvent effectivement planter, auquel cas le serveur les attendra indéfiniment. Mais concrètement, c’est plutôt le rsync qui risque de planter, auquel cas le worker remontera l’information de fin de tâche au serveur.

    ZMQ offre la possibilité de faire du polling, ce qui permet d’écouter plusieurs sockets simultanément, et dont on peut configurer une durée de timeout. Idéalement, il faudrait qu’on l’utilise pour ne pas risquer d’attendre indéfiniment (et aussi pour éventuellement renvoyer des ordres qui n’auraient pas été traités).

  5. J’ai un comportement étrange.
    J’ai un process (A) PHP qui lit en BD des taches.
    J’ai un process (M) qui se charge de récupérer les taches de A et de lancer les workers (W).

    Un worker ne peut faire qu’une seule tache et doit mourrir à la fin.
    Du coup j’ai fais
    Worker:
    – PULL sur :1236 qu’il a démarrer
    – attent une tache sur :1234
    – PULL sur :1237 le resultat
    – PULL sur :1236 qu’il s’arrete

    Process M:
    – lance X Worker
    – attent X « start »
    – a chaque « finish » sur :1236 relance un worker

    Process A:
    – lit en BD les taches et les envoie sur :1234

    Avec cette facon, le probleme c’est que A envoie toutes les taches sur :1234
    mais si j’ai 3 workers et 4 taches le 4eme worker creer (a la fin d’un autre) ne recoit pas la 4eme taches…

    Y a un truc que j’ai pas compris ? y a une autre facon plus simple de faire ?

    Mon manager fait un pcntl_fork() puis un exec() et de temps en temps j’ai une assert de ZMQ comment l’éviter ?

  6. Il faut bien comprendre que les données sont bufferisées au niveau des sockets qui reçoivent les paquets, pas au niveau de la socket qui les envoie.

    Dans ton cas, M instancie trois workers : W1, W2 et W3. Ensuite, A écrit quatre tâches (T1, T2, T3 et T4) sur sa socket PUSH, sur le port 1234.
    ZeroMQ gère les choses de la manière suivante : Il regarde quelles sont les sockets PULL qui sont connectées à la socket PUSH. Il en trouve trois. Il envoie donc ses messages en load-balancing sur ces trois connexions. T1 est envoyé sur W1, T2 sur W2, T3 sur W3 et T4 sur W1.

    Le problème, c’est que W1 va s’arrêter après avoir traité T1. Il ne traitera donc pas T4. Et le quatrième worker qui sera créé, il ne recevra aucune tâche, puisque toutes les tâches ont déjà été « distribuées ».

    Il existe deux solutions à ton problème.
    Tu peux faire des workers réutilisables. Ainsi, W1 traitera T1, puis en faisant une autre lecture sur sa socket, il trouvera T2 et le traitera. Le problème, c’est alors de dire aux worker de s’arrêter (pour qu’ils ne tournent pas indéfiniment). Si tu relis l’article, j’explique que nous envoyons des messages KILL pour ordonner aux workers de s’arrêter.

    L’autre possibilité, c’est que ton process M reçoive les tâches envoyées par A, et qu’il les transmette aux workers au compte-goutte, au fur et à mesure qu’il les instancie.

    De manière générale, je trouve bizarre d’avoir un process qui instancie les workers, et un autre qui leur donne des ordres. Avoir un seul programme de gestion des workers est plus simple, et unifie les différents traitements.

  7. J’ai voulu faire 2 process, car le process A tournera sur un serveur, alors que le process M touchera sur plusieurs serveurs.

    J’ai un problème de mémoire dans les processus.
    Je dois faire des millions d’insert dans une table, avec le framework Symfony et j’ai beau faire des free/unset/=null, la mémoire utilisée par le worker grossis à fur et à mesure des insert.

    Du coup je suis obligé de faire en sorte qu’un worker n’execute qu’une tache d’insert.

  8. OK, je vois…

    Pour commencer, deux solutions rapides :
    – Tu vires Symfony. Nan, je rigole… Enfin, pense-y…
    – Les workers pourraient à leur tour instancier un programme à chaque fois qu’ils reçoivent une tâche. Ainsi, pas de problème de fuite de mémoire. Un simple exec et c’est bon. Ça semble malpropre, mais c’est loin d’être idiot.

    Et une solution plus propre : En fait, tu veux faire de la distribution de tâches sur plusieurs machines. Ce que je conseillerais, c’est d’avoir sur chaque machine un « programme-pivot » qui gère les workers. Relis ce que je disais à la fin de mon dernier commentaire en ayant ça en tête.
    Imaginons que tu ais 3 serveurs, avec sur chacun un programme de gestion des workers (M1, M2 et M3). Ces trois programmes sont connectés au programme A en PUSH/PULL. Quand A envoie ses tâches, il les distribue sur M1/M2/M3.
    Chacun de ces programmes a ensuite la responsabilité d’instancier ses workers, de leurs transmettre les tâches, puis de vérifier que les workers ont terminé leur boulot. Éventuellement, ils pourront remonter des statistiques à A.

    Ah, je pense à autre chose. Puisque tu veux faire des workers à usage unique, il vaut peut être mieux que les workers demandent la prochaine tâche à effectuer. Pour cela, il suffit de mettre en place une communication REQ-REP, au lieu de faire de la répartition de tâches grâce au load-balancing intégré aux communications PUSH-PULL.
    Tu peux regarder ce que j’écrivais dans mon article suivant consacré à ZeroMQ (la partie intitulée La solution alternative). Je pense que ça correspond bien à ton besoin.

  9. Merci, pour l’instant j’en suis a testé la solution avec gestionnaires de workers.

    Mais la le soucis que j’ai c’est que je me chope un assert 0MQ …
    Assertion failed: ok (mailbox.cpp:79)
    Abort trap: 6

    Les gestionnaires fonctionnent comme ca:
    – M lance X workers qui attendent les ordres.
    – Les workers notifie à M qu’ils ont démarré (comme dans ton premier article)
    – M recoit les taches à faire et les distribue.
    – des qu’un Worker a fini il envoie la reponse a un programme tier et envoie « fini » à M
    – Des que M recoit un « fini » il instancie un nouveau worker.

    Et c’est ici que l’assert apparait…
    Apparemment c’est une histoire de socket partagé mais ce que je comprends pas c’est que je fais:

    protected function launch() {
      $pid = pcntl_fork();
      if ($pid === -1) {
        Logger::getRootLogger()->critical("Fork failed.");
      } else if ($pid) {
        Logger::getRootLogger()->critical("Father." . $pid);
        return $pid;
      } else {
        Logger::getRootLogger()->critical("Child");
        exec("/path/to/worker");
        exit(0);
      }
    }

    Avec un pcntl_exec c’est meme pas la peine au chargement ca plante…

  10. Mmh… Je ne suis pas certain, mais mon instinct me dis que le problème vient dans le fait de forker après avoir ouvert des sockets ZeroMQ.

    Je m’explique : Quand on fait un fork, le processus enfant hérite de tous les descripteurs ouverts du processus parent. Le problème, c’est que ZeroMQ est une librairie assez complexe, qui ouvre des threads pour gérer ses communications. Donc je pense que le fork vient foutre le bordel là-dedans. Ce n’est pas une analyse très scientifique, mais je sens bien un truc dans ce genre-là.

    On peut remarquer que tous les exemples de code concernant ZeroMQ effectuent la création des workers avant d’ouvrir les sockets. Il doit bien y avoir une raison à cela.

    On en revient aux deux solutions possibles :
    – Soit tu fais des workers réutilisables, qui instancient eux-même des sous-programmes (pcntl_fork + pcntl_exec, ou simplement exec), en espérant que cela ne pose pas de soucis au niveau du ZeroMQ des workers.
    – Soit tu as un programme de gestion des workers séparé du programme qui communique avec les workers.

  11. Bonjour,

    Je me permets de proposer une solution (tardivement) qui me semble bien plus simple en se basant sur la commande ‘sem’ du projet ‘Gnu/parallel’.
    J’ai pas la syntaxe exact en tête mais globalement le script bash ressemblerait à ça :

    #!/bin/bash

    postes=’
    poste1
    poste2
    poste3
    poste4
    poste5

    backup_dir=’/var/backups/postes/ »

    for poste in $postes
    do
    sem -3 « rsync $poste:…. $backup_dir${poste}/ »
    done

    exit 0

    La commande sem va s’occuper de lancer 3 rsync en parallèle et d’attendre pour lancer les suivants.

  12. @Guillaume : Oui, on peut trouver plein de solutions techniques permettant d’arriver au même résultat. Faire du backup avec Amanda, passer par Amazon S3, mettre en place un serveur Gearman, utiliser GNU Parallel, etc.

    Là, l’idée étant de faire de la R&D sur ZeroMQ…

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée. Les champs obligatoires sont indiqués avec *

Notifiez-moi des commentaires à venir via email. Vous pouvez aussi vous abonner sans commenter.