Cet article est le deuxième d’une série en quatre parties sur le fonctionnement interne de la machine virtuelle Envision : le logiciel qui exécute les scripts Envision. Voir partie 1, partie 3 et partie 4. Cette série ne couvre pas le compilateur Envision (peut-être une autre fois), donc supposons simplement que le script a été converti d’une manière ou d’une autre en bytecode, que la machine virtuelle Envision prend en entrée.

Comme la plupart des autres systèmes d’exécution parallèle, Envision produit un graphe acyclique dirigé (DAG) où chaque nœud représente une opération à effectuer, et chaque arête représente une dépendance de données où le nœud aval a besoin de la sortie du nœud amont pour s’exécuter.

Au-delà des séries temporelles

Les nœuds sont appelés des thunks, d’après le concept très similaire de Haskell et d’autres langages avec une évaluation paresseuse.

Exemples de thunks que l’on peut trouver dans un script Envision typique :

  • Analyser un fichier d’entrée au format .xlsx, .csv ou .csv.gz, et le convertir en une représentation en colonnes qui sera utilisée par le reste du script.
  • Charger une plage de lignes $M..N$ à partir d’une colonne individuelle ; cette colonne peut être extraite soit du résultat de l’analyse d’un fichier d’entrée (voir ci-dessus), soit du format de fichier en colonnes .ion propre à Lokad, optimisé pour le stockage dans Microsoft Azure Blob Storage.
  • Étant donné une plage de lignes $M..N$ d’un très grand vecteur $A$, un vecteur plus petit $B$, un projecteur $\pi$ qui associe chaque ligne de $A$ à une ligne de $B$, et une fonction $f$, calculer $f(A[l], B[\pi(l)])$. Cela s’appelle une jointure côté map.
  • Utiliser la simulation de Monte Carlo pour estimer la moyenne, la variance ou la distribution du résultat d’un processus aléatoire. Le résultat de plusieurs thunks de Monte Carlo, exécutés en parallèle, peut ensuite être combiné par un thunk final.

En général, on s’attend à ce qu’un thunk prenne entre quelques centaines de millisecondes (pour la manipulation de données à petite échelle) et quelques minutes (pour les simulations de Monte Carlo ou la descente de gradient). C’est une hypothèse forte : la machine virtuelle Envision peut avoir des frais généraux importants pour l’évaluation de chaque thunk, de l’ordre de millisecondes. Un script devrait produire un petit nombre de thunks (entre 1 000 et 100 000), chaque thunk effectuant une unité de travail assez importante.

Transparence référentielle

Les thunks sont des fonctions pures : elles sont déterministes et ne peuvent pas avoir d’effets secondaires. Elles fonctionnent en lisant leurs entrées immuables et en renvoyant la même valeur à chaque exécution. Cette propriété importante présente de nombreux avantages :

  1. Étant donné que l’évaluation d’un thunk n’a pas d’effets secondaires, elle n’interférera pas avec l’évaluation d’un autre thunk, et donc tous les thunks peuvent être exécutés simultanément (tant que leurs entrées sont disponibles) sur plusieurs cœurs de CPU, voire répartis sur plusieurs travailleurs. La machine virtuelle Envision suit la frontière de chaque script (l’ensemble des thunks qui peuvent être exécutés car toutes leurs entrées sont disponibles) et choisit un nouveau thunk à partir de celle-ci chaque fois qu’un CPU devient disponible.
  2. À l’inverse, il est possible d’évaluer les thunks un par un et d’obtenir le même résultat. Par exemple, lorsque le cluster est soumis à une charge importante, lorsque les travailleurs du cluster ne sont pas disponibles, ou lors de la reproduction de l’évaluation d’un script sur la station de travail d’un développeur afin d’investiguer un problème.
  3. Deux travailleurs exécutant le même thunk n’est pas une erreur, juste une perte de temps. Ainsi, ce n’est pas quelque chose qui doit être évité (avec toutes les difficultés liées à la synchronisation sur un système distribué), il suffit de s’assurer que cela n’arrive pas trop souvent1.
  4. Si le résultat d’un thunk est perdu (en raison d’un crash du travailleur ou d’une indisponibilité du réseau), il est possible de l’exécuter à nouveau. Même si plusieurs thunks sont perdus, le DAG d’origine reste disponible et peut être utilisé comme une lignée de données pour recalculer les valeurs nécessaires.

Cependant, cela signifie également que les thunks ne peuvent pas se communiquer entre eux (par exemple, en ouvrant un canal et en transmettant des données entre eux). Cela limite les stratégies disponibles pour la concurrence et le parallélisme.

Production de thunks

Dans de nombreux frameworks de calcul distribué, le DAG d’exécution est produit en dehors du cluster (par exemple, sur une machine de planification) et des portions du graphe sont ensuite poussées vers des travailleurs individuels pour l’exécution. Très souvent, le DAG doit être produit en plusieurs étapes : par exemple, une opération de jointure peut être optimisée différemment en fonction de la taille des tables2, et il n’est pas toujours possible de connaître la taille d’une table avant d’évaluer réellement son contenu, il vaut donc la peine d’attendre que les tailles des tables soient connues avant de générer la portion du DAG qui effectue la jointure. Cela signifie qu’il y aura un va-et-vient entre le planificateur et les travailleurs, où le planificateur produira des tâches supplémentaires en fonction des résultats des travailleurs.

Cela transforme le planificateur en un point de défaillance unique, et permettre plusieurs planificateurs actifs, ou un schéma de basculement entre un planificateur actif et un planificateur passif, ajouterait une complexité considérable. Pour Envision, notre objectif de résilience était plutôt de garantir qu’un seul travailleur est capable de calculer une mission entière, sans impliquer le planificateur. Ainsi, même une panne de dix minutes du planificateur empêcherait la soumission de nouvelles missions, mais n’interromprait pas les missions déjà démarrées. Cependant, cela signifie que les travailleurs doivent être capables de générer de nouvelles portions du DAG sans l’aide du planificateur.

Nous y parvenons en permettant à une fonction de retourner une nouvelle fonction plutôt qu’une valeur - pour utiliser davantage les termes de Haskell, la construction du DAG implique des monades plutôt que simplement des foncteurs. Cette nouvelle fonction a ses propres parents, qui peuvent également être de nouvelles fonctions, et ainsi de suite, formant un nouveau DAG complet. En pratique, le nouveau DAG partage souvent de nombreuses fonctions avec l’ancien DAG, car il a besoin des résultats de ces calculs.

Lors de la soumission d’une nouvelle mission au cluster, une seule fonction est soumise (contenant le script à compiler et à exécuter, ainsi que les références à tous les fichiers d’entrée). Cette fonction produit ensuite le DAG d’exécution initial, qui va croître encore quelques fois jusqu’à ce qu’il soit complet.

Graphe de Merkle

Afin d’être transmises sur le réseau, les fonctions sont également sérialisables, en utilisant un format binaire personnalisé conçu pour avoir une empreinte mémoire réduite. Sur un DAG avec 100 000 fonctions, un budget de 10 MiB ne peut prendre en charge que 104 octets par fonction !

Le support de la sérialisation binaire nous a permis de transformer le DAG en un DAG de Merkle, où chaque fonction a un identifiant déterminé par le contenu binaire de cette fonction et de tous les ancêtres de la fonction3. Nous appelons cet identifiant le hash de la fonction.

L’utilisation d’un DAG de Merkle présente deux avantages principaux. Premièrement, les fonctions qui effectuent la même opération sont automatiquement fusionnées car, ayant le même contenu et les mêmes ancêtres, elles ont également le même identifiant.

Deuxièmement, il est possible que deux scripts partagent certaines de leurs fonctions - peut-être qu’ils lisent les mêmes fichiers d’entrée et leur appliquent les mêmes opérations, ou peut-être qu’un Supply Chain Scientist travaille sur le script, en changeant quelques lignes à chaque exécution. Lorsque cela se produit, les sorties des fonctions partagées peuvent être réutilisées si elles sont toujours disponibles en mémoire, ce qui réduit considérablement le temps d’exécution du script. Pouvoir modifier et réexécuter un script crée une boucle de rétroaction courte qui aide à la productivité des Supply Chain Scientists.

Planification locale des fonctions

Nous entrerons dans plus de détails dans un article futur sur la façon dont l’exécution des fonctions est répartie sur plusieurs machines dans un cluster. Pour l’instant, considérez simplement que chaque travailleur détient une copie de l’ensemble du DAG, sait quelles fonctions ont déjà été exécutées (et où trouver leurs résultats), sait quelles fonctions sont actuellement en cours d’exécution par le cluster, et est responsable de la planification de fonctions supplémentaires à exécuter sur ses 32 cœurs. Cette planification locale est effectuée par un service à thread unique appelé le noyau (qui ne doit pas être confondu avec le noyau Linux). Le noyau, ainsi que les threads des travailleurs qui exécuteront réellement les fonctions, s’exécutent tous dans le même processus .NET afin de partager des objets gérés les uns avec les autres.

Trouver une nouvelle fonction est presque instantané, car le noyau conserve une frontière de fonctions prêtes à être exécutées pour chaque DAG et n’a besoin d’en choisir une au hasard. La majeure partie du temps du noyau est plutôt consacrée à la mise à jour de la frontière chaque fois qu’une fonction commence à s’exécuter (elle doit quitter la frontière), termine son exécution (ses descendants peuvent rejoindre la frontière, selon qu’elle a encore des parents non exécutés) ou devient perdue en raison de l’indisponibilité du travailleur qui détient son résultat (ses descendants doivent quitter la frontière, mais la fonction elle-même peut être ajoutée à nouveau à la frontière si ses propres parents sont toujours disponibles).

S’occuper des frontières est un travail avec une très grande variabilité, cela peut prendre entre une microseconde et plusieurs secondes - plus d’un million de fois plus longtemps ! Par exemple, une étape de mélange a une couche de $N$ fonctions qui lisent les sorties d’une autre couche de $M$ fonctions. Chaque fonction en aval lit les sorties de toutes les $M$ fonctions en amont, ce qui donne $M\cdot N$ arêtes dans le DAG. Pour $M = N = 1000$ (un degré de parallélisation très probable, lorsqu’il s’agit de traiter des milliards de lignes), cela fait un million d’arêtes. Si cela n’est pas contrôlé, ce phénomène peut amener le noyau à faire une pause de plusieurs secondes, pendant laquelle aucune nouvelle fonction n’est planifiée pour s’exécuter, et donc jusqu’à 32 cœurs restent inactifs4.

Nous résolvons ce problème en introduisant des nœuds virtuels dans le DAG pour représenter ce type de connexion entre les couches. Le nœud virtuel a $M$ entrées (une pour chaque fonction de la couche en amont) et $N$ sorties (une pour chaque fonction de la couche en aval). Cela réduit le nombre d’arêtes à $M + N$, ce qui est beaucoup plus gérable !

Génération de code à faible granularité

Les premières versions d’Envision, en 2013 et 2014, fonctionnaient sur la base que chaque opération vectorielle est effectuée par une seule fonction. Lors de l’exécution de T.A / (T.B + 1), il y aurait une fonction pour diffuser 1 dans la table T, une autre fonction pour ajouter T.B au résultat de la première fonction, et une troisième fonction pour diviser T.A par le résultat de la deuxième fonction. Cela avait l’avantage que nous pouvions facilement implémenter chaque opération en tant que fonction C#, exécutée en tant que seule fonction, ce qui est une excellente idée lors de la première implémentation d’un DSL. Cela a bien sûr pour inconvénient que des quantités inutiles de mémoire sont consommées (la première fonction produirait un vecteur de millions de copies de la valeur 1), et la mémoire prend du temps à être écrite et lue.

Il était impératif d’avoir des fonctions qui évaluent plusieurs opérations successives, au lieu d’avoir une fonction pour chaque opération.

De nombreuses bases de données SQL fonctionnent sur des variations du modèle volcano, où la requête est transformée en un arbre d’itérateurs. Chaque itérateur agit comme une fonction impure qui renvoie la prochaine valeur de l’itération à chaque appel, et peut appeler récursivement d’autres itérateurs. Dans ce modèle, diffuser un scalaire dans une table serait un itérateur renvoyant une constante, ajouter ou diviser deux vecteurs tiendrait des références à deux itérateurs, et lire à partir d’un vecteur incrémenterait à travers celui-ci :

Func<float> BroadcastScalar(float x) = () => x;
Func<float> Add(Func<float> x, Func<float> y) = () => x() + y();
Func<float> Div(Func<float> x, Func<float> y) = () => x() / y();
Func<float> Read(float[] v) { var i = 0; return () => v[i++]; }

Compiler une requête vers le modèle volcano consiste à construire l’arbre des itérateurs :

Div(Read(A), Div(Read(B), BroadcastScalar(1)))

Cela a l’avantage qu’aucune allocation de mémoire n’est effectuée pour les vecteurs intermédiaires. Cependant, le surcoût des appels de fonctions domine les opérations arithmétiques simples que ces fonctions effectuent.

Pour cette raison, en 2015, Envision est passé à la génération de code à la volée. Le principe est assez similaire au moteur d’exécution Tungsten d’Apache Spark : compiler l’opération T.A / (T.B + 1) en une fonction dans un langage impératif.

float[] GeneratedFunction(float[] a, float[] b) {
    var result = new float[a.Length];
    for (var i = 0; i < a.Length; ++i)
        result[i] = a[i] / (b[i] + 1);
    return result;
}
L18 

La cible que nous utilisons pour cette compilation est le .NET IL, le langage bytecode utilisé par .NET pour ses assemblies. Cela nous permet de tirer parti du compilateur .NET JIT pour produire du code machine optimisé à partir de notre IL généré.

Cette génération de code à l'exécution s'est avérée être le plus grand obstacle lors de la migration d'Envision de .NET Framework à .NET Core en 2017. En effet, bien que .NET Core prenne en charge les mêmes API `System.Reflection` que .NET Framework pour produire et exécuter de l'IL à l'exécution, [il ne prend pas en charge l'enregistrement de cet IL sur le disque en tant que DLL](https://github.com/dotnet/runtime/issues/15704). Bien que cela ne soit pas une exigence pour _exécuter_ Envision, cela constitue certainement une exigence pour développer le compilateur Envision ! `System.Reflection` n'empêche pas la création d'IL invalide et ne signale qu'une `InvalidProgramException` plutôt inutile lorsqu'une méthode contenant de l'IL invalide est exécutée. La seule approche raisonnable pour enquêter sur de tels problèmes est d'enregistrer un fichier assembly et d'utiliser ILVerify ou ILSpy. En raison de cette exigence, nous avons en fait continué à cibler à la fois .NET Framework et .NET Core pendant deux ans&mdash;la production s'exécutait sur .NET Core et le débogage de l'IL était effectué sur .NET Framework. Enfin, en 2019, nous avons publié notre propre bibliothèque [Lokad.ILPack](https://github.com/Lokad/ILPack) en remplacement de cette fonctionnalité et nous nous sommes éloignés de .NET Framework.

Cela conclut l'analyse d'aujourd'hui sur la façon dont Envision exécute les scripts. Dans le prochain article, nous discuterons de la façon dont les résultats intermédiaires sont stockés.

_Petite publicité : nous recrutons des [ingénieurs logiciels](/fr/ingenierie-logicielle/). Le travail à distance est possible._

  1. Les travailleurs diffusent dans le cluster chaque fois qu’ils commencent un nouveau thunk et évitent d’exécuter des thunks que d’autres travailleurs ont revendiqués. Il reste le cas rare où deux travailleurs commencent le même thunk presque en même temps ; nous évitons cela en faisant en sorte que chaque travailleur choisisse un thunk aléatoire dans la frontière et en faisant en sorte que le planificateur réduise le nombre de travailleurs lorsque la frontière diminue trop. Cela signifie que l’exécution en double n’est pas impossible, mais très improbable. ↩︎

  2. Une jointure coûteuse par redistribution est utilisée pour deux grandes tables, et la jointure côté mappage moins chère est utilisée lorsque l’une des tables est suffisamment petite pour tenir en mémoire. ↩︎

  3. Pour les fonctions sans ancêtres, telles que celles qui lisent à partir de fichiers d’entrée, nous incluons le hash du contenu de ces fichiers d’entrée à l’intérieur du corps de la fonction. Cela garantit que si deux fonctions lisent le même fichier d’entrée, elles auront le même hash, et si elles lisent deux fichiers d’entrée différents, y compris deux versions différentes du fichier à un chemin donné, alors elles auront des hashes différents. ↩︎

  4. Cela a également un effet sur la taille de la sérialisation. En effet, si toutes les arêtes sont représentées dans le DAG sérialisé, même avec seulement deux octets par arête, cela représente déjà 2 Mo de données ! ↩︎