Journal SeqTools 1.0.0: la programmation concurrente, c'est dur!

Posté par  . Licence CC By‑SA.
31
28
déc.
2019

Sommaire

J'ai profité des vacances pour améliorer ma librairie en python SeqTools, déjà présentée sur linuxfr.

Pour rappel, cette librairie permet le traitement "paresseux" de séquences, c'est-à-dire de tout conteneur qui permet l'accès à ses éléments par indexation, en gros des listes, des tableaux ou un objet qui implémente __getitem__.
La librairie se destine plutôt à des transformations éléments par éléments ou à la réorganisation/combinaison d'une ou de plusieurs séquences entre elles. Pour les pythonistes: c'est un peu le pendant de itertools pour les séquences.
L'intérêt du traitement paresseux, c'est que l'on peut travailler aisément sur des jeux de données qui ne rentreraient pas en mémoire ou prendraient beaucoup de temps si on devait appliquer les transformations pour tous les éléments, alors que l'on ne souhaite parfois en évaluer que quelques-uns (cf le journal précédent).
Pour avoir une meilleure idée de ce dont il retourne, vous pouvez jeter un œil aux exemples de la doc.

Au menu des nouveautés pour cette v1, c'est surtout une refonte de la partie évaluation des éléments avec des processus/threads séparés, qui permet d'utiliser plusieurs cœurs quand on veut lire beaucoup d'éléments successifs.
À noter que j'ai retiré le support pour python 2.

Le multi-processing/multi-threading c'est difficile!

Une des fonctionnalités les plus avancées de seqtools, c'est la fonction prefetch qui calcule la valeur des éléments de manière asynchrone en utilisant des processus ou des threads. Concrètement, si on demande l'élément 10, alors 11, 12, 13, etc. sont aussi envoyés dans la file des tâches et seront immédiatement/plus rapidement disponibles ensuite.

En pratique, c'est transparent pour l'utilisateur:

def f1(x):
sleep(0.005 * (1 + random.random()))
return x

x = list(range(1000))
y = seqtools.smap(f1, x) # applique f1 à tous les éléments de x
y = seqtools.prefetch(y, nworkers=4, max_buffered=16, method="thread")

# notez l'utilisation de l'index ci-dessous: on peut toujours accéder
# à des éléments aléatoirement, on perdra simplement l'accès plus rapide
# aux éléments précalculés
for i in range(1000):
    print(y[i])

La difficulté principale ne vient étonnement pas du caractère asynchrone des réponses qu'il faut réorganiser (ça se résout en une vingtaine de lignes), le problème c'est les problèmes! Ou comment gérer les plantages des workers, du processus parent, de la communication, etc.
Précédemment, les workers n'étaient pas en mode démon, donc le processus parent attendait que les workers quittent pour s'arrêter à son tour. Côté workers, il y avait un timeout pour quitter automatiquement en cas d'inactivité ce qui évite de bloquer indéfiniment à la fin du script. J'ai choisi cette approche car je ne voulais pas prendre le risque de laisser traîner des processus de worker zombies.

À l'usage, cette approche s'est révélée une fausse bonne idée:

  • Il faut relancer les processus qui ont atteint leur timeout (par exemple si le programme prend du temps avant de lire une nouvelle valeur de la séquence). Ça ajoute un délai et le fork qui re-crée un worker à un endroit imprévisible.
  • Si le processus parent plante (segfault, sigbus, sigterm…) les enfants peuvent rester bloqués sur la communication avec le parent (ça devrait pas être le cas, mais la bibliothèque multiprocessing n'est pas spécialement endurcie contre les plantages).
  • Si le timeout est grand, ça retarde d'autant la fin du script quand on fait un ctrl-c pour l'interrompre.

La nouvelle version de SeqTools utilise donc la stratégie inverse où les workers vérifient ponctuellement si le parent est toujours vivant.
Ils finiront donc par quitter naturellement après la fin du script principal. Par ailleurs, j'utilise désormais un Pipe plutôt que multiprocessing.Manager pour communiquer les valeurs, ce qui s'avère à la fois plus rapide et plus robuste aux plantages.

Extension en C pour des transferts sans copies

Pour communiquer au parent une valeur de la séquence calculée par le worker, il faut la sérialiser, l'envoyer dans le tuyau, la réceptionner et la décoder. Ces opérations ajoutent un surcoût assez important alors que l'on manipule souvent de simples tableaux avec une taille fixe bien connue. Pour accélérer ce cas d'usage, seqtools permet désormais d'utiliser de la mémoire partagée où les valeurs sont écrites par les workers et lues par le processus principal.

Comme à l’accoutumée, quelques écueils ont séparé l'idée de l'implémentation:

  • Comme la mémoire partagé ne s'alloue pas à la volée, on peut seulement créer un buffer partagé unique qui stockera à tout instant un nombre limité d'éléments de la séquence.
  • Pour s’accommoder de l'espace limité, il suffit de mettre en place du recyclage: quand une entrée de la mémoire partagée n'est plus utilisée par le script principal, elle est remise en service côté workers pour recevoir une nouvelle valeur de la séquence.
  • Bonne nouvelle: Python est un si bon langage qu'il dispose d'un destructeur, il suffit donc de surveiller quand une ancienne entrée est détruite pour déclencher son recyclage.
  • Mauvaise nouvelle: À partir du tableau en mémoire partagée, un utilisateur peut créer une vue (ex: memoryview ou l'équivalent numpy) qui pointe directement vers la mémoire sous-jacente; le tableau que j'ai renvoyé peut donc être supprimé alors que la mémoire est toujours en service. C'est une fonctionnalité des objets implémentant l'interface buffers, laquelle est largement exploitée par les librairies de manipulation de tableaux comme numpy.
  • Bonne nouvelle: on peut aussi suivre la création et la suppression des vues!
  • Mauvaise nouvelle: c'est seulement au niveau de l'API des extensions en C (plus précisément ici), du coup j'ai dû coder un peu en C.

Pour la suite

J'ai décidé de passer le projet en v1 car il contient tout ce dont j'ai besoin pour mon travail mais je peux très bien ajouter des fonctionnalités. Il y a une assez une bonne couverture du code par des tests, ce qui m'a bien aidé pour chasser les bugs, mais je vous invite à me contacter si vous en trouvez encore.

  • # Projets liés

    Posté par  . Évalué à 10.

    J'ai oublié de mentionner d'autres projets similaires qui existent:

    • dask est orienté pour le fouille de données, l’exécution à la demande n'est pas le comportement par défaut mais peut être explicitement demandée.
    • joblib propose une version plus élaborée de ma fonction seqtools.prefetch avec une API fonctionnelle. Si c'est la seule fonction qui vous intéresse dans seqtools, pensez à y jeter un œil.
    • Appache arrow est plus puissant mais franchement plus lourd et compliqué de mon point de vue.
    • pytorch.data a moins de fonctionnalités.
    • tensorflow.data est très lié au reste de l'écosystème tensorflow.

    Dans l'ensemble, je pense que seqtools se distingue par le fait que ce soit très simple, léger et transparent: une dépendance, peu de code, son API qui prend et retourne de simples objets semblables à des listes et une gestion des erreurs pour aider l'utilisateur au maximum.

  • # Oui c'est compliqué

    Posté par  . Évalué à 5. Dernière modification le 31 décembre 2019 à 09:58.

    Merci pour ce journal et la librairie qui est vraiment intéressante.

    En effet c'est ces détails d'implémentations qui sont complexes et qui font aussi qu'une implémentation naïve est parfois complètement inutile. De mon coté je suis déjà tombé dans les écueils suivant:

    • le temps de création des processus est trop long (typiquement parce qu'on fork alors que le processus principal à déjà un gros objet en mémoire). Utiliser la bonne méthode de démarrage, dont forkserver peut aider.
    • la communication entre les processus est plus long que le traitement ! (parcequ'on pickle des gros objets ou plein de petits objets). Là la mémoire partagée aide, mais comme tu disais, il faut gérer manuellement, donc c'est bien plus complexe.
    • l’implémentation de imap_unordered est en fait greedy (gloutonne) du coté de la production des tâches (le stock de tâches est créé immédiatement et non pas au fur et à mesure de la consommation) ce qui peut conduire à une explosion de la mémoire… le prefetch de SeqTools répond bien à ça.

    Coté création/suppression des processus, a tu regardé (plus pour inspiration) ipyparallel ?

    • [^] # Re: Oui c'est compliqué

      Posté par  . Évalué à 4.

      Merci pour ton commentaire, je vais jeter un œil au forkserver et aussi à ipyparallel qui pourrait me servir au boulot.
      Je n'ai pas voulu trop rentrer dans les détails pour le journal, mais ton premier point est aussi une des raisons pour lesquelles j'ai retiré le timeout sur les processus inactifs. Pour un de mes scripts, j'avais désactivé l'overcommit car je ne voulais pas me retrouver bloqué à l'extérieur de mon serveur en cas de fuites mémoire. Comme les forks pour redémarrer des workers avaient lieu pendant que je travaillais sur un gros tableau, j'avais des OutOfMemory rapidement.
      Au sujet de ton deuxième point, c'est vrai mais ce n'est pas aussi terrible que je le craignais initialement. cPickle est pas mal optimisé donc la latence est négligeable dans beaucoup de mes cas d'utilisation. En revanche, j'ai fait quelques essais avec l'interpréteur PyPy qui implémente pickle en pur python et c'est souvent trop lent. Je ré-essaierai PyPy avec la mémoire partagée quand ils auront corrigé un bug dans le support des extensions.

  • # Merci

    Posté par  . Évalué à 1.

    Merci pour le partage. Je fais très peu de python et je n'ai pas besoin de ton outil, mais je suis toujours intéressé par les commentaire d'autres développeurs :)

    La difficulté principale ne vient étonnement pas du caractère asynchrone des réponses qu'il faut réorganiser (ça se résout en une vingtaine de lignes), le problème c'est les problèmes!

    Ça m'étonne. Ta bibliothèque1 semble considérer l'ordre comme importante (le nom et cette citation). Mais l'API incite à avoir un accès aléatoire aux donnée.

    Tu as regardé du coté de RxPython ? Ça ne fait pas du tout la même chose (le modèle d'exécution n'a rien à voir), mais leur API est plutôt complète pour permettre à l'utilisateur de gérer les cas d'erreur (entre autre). Par contre Rx va fortement teinter le code qui l'utilise (ce sont des types à eux qui sont utilisé et non des simili tableau).


    1. je trouve bien plus sympa de parler de bibliothèque qui amha véhicule une notion de partage plutôt que le faux ami librairie qui fait plus référence à une relation de clientélisme ;) 

    https://linuxfr.org/users/barmic/journaux/y-en-a-marre-de-ce-gros-troll

    • [^] # Re: Merci

      Posté par  . Évalué à 1. Dernière modification le 06 janvier 2020 à 10:47.

      La bibliothèque prend et manipule des données indexées de 0 à N, donc des séquences dans le jargon de python. On peut donc notamment faire de l'accès aléatoire au données, c'est à dire demander le i-ème élément directement à la différence d'une API à base d'itérateurs par exemple.
      L'objet renvoyé par prefetch ne fait pas exception, mais du fait qu'il récupère les valeurs de manière asynchrone il faut réordonner les réponses, renvoyer l'élément réclamé et mettre dans un tampon les autres.
      Certes, prefetch ne donne un gain de vitesse que quand on itère sur la séquence, mais il y a un intérêt à garder la possibilité d'accéder à un élément aléatoire: on ne casse pas l'api même si on place le prefetch au milieu d'un série de transformations et on peut plus facilement accéder à une valeur quelconque durant le prototypage ou le débogage.
      J'ai regardé RxPython à un moment, mais je voulais une API plus générique, plus pythonique et plus transparente à l'utilisation (c'est pas une critique, c'est juste un autre cas d'usage AMHA).

Suivre le flux des commentaires

Note : les commentaires appartiennent à celles et ceux qui les ont postés. Nous n’en sommes pas responsables.