Cet article est le deuxième d’une série en quatre parties sur le fonctionnement interne de la machine virtuelle Envision : le logiciel qui exécute les scripts Envision. Voir part 1, part 3 et part 4. Cette série ne couvre pas le compilateur Envision (peut-être une autre fois), nous allons donc supposer que le script a d’une manière ou d’une autre été converti en bytecode, qui constitue l’entrée de la machine virtuelle Envision.

Comme la plupart des autres systèmes d’exécution parallèles, Envision produit un graphe orienté acyclique (DAG) dans lequel chaque nœud représente une opération à réaliser, et chaque arête représente une dépendance de données, où le nœud en aval a besoin du résultat du nœud en amont pour s’exécuter.

Au-delà des séries temporelles

Les nœuds sont appelés thunks, en référence au concept très similaire issu de Haskell et d’autres langages à évaluation paresseuse.

Exemples de thunks que l’on peut trouver dans un script Envision typique :

  • Analyser un fichier d’entrée au format .xlsx, .csv ou .csv.gz, et le convertir en une représentation par colonnes qui sera utilisée par le reste du script.
  • Charger une plage de lignes $M..N$ à partir d’une colonne individuelle ; cette colonne peut provenir soit du résultat de l’analyse d’un fichier d’entrée (voir ci-dessus), soit du propre format de fichier columnar .ion de Lokad, optimisé pour le stockage dans Microsoft Azure Blob Storage.
  • Étant donnée une plage de lignes $M..N$ d’un très grand vecteur $A$, un vecteur plus petit $B$, un projecteur $\pi$ qui associe chaque ligne de $A$ à une ligne de $B$, et une fonction $f$, calculer $f(A[l], B[\pi(l)])$. Ceci est appelé un join côté map.
  • Utiliser la simulation de Monte Carlo pour estimer la moyenne, la variance ou la distribution du résultat d’un processus aléatoire. Le résultat de plusieurs thunks de Monte Carlo, exécutés en parallèle, peut ensuite être combiné par un thunk final.

En général, on s’attend à ce qu’un thunk prenne entre quelques centaines de millisecondes (pour des manipulations de données de petite envergure) et quelques minutes (pour des simulations de Monte Carlo ou une descente de gradient). C’est une hypothèse forte : la machine virtuelle Envision est autorisée à avoir un surcoût significatif pour l’évaluation de chaque thunk, de l’ordre de la milliseconde. Un script devrait produire un petit nombre de thunks (entre 1 000 et 100 000), chaque thunk effectuant une unité de travail assez importante.

Transparence référentielle

Les thunks sont des fonctions pures : ils sont déterministes et ne peuvent pas avoir d’effets de bord. Ils opèrent en lisant leurs entrées immuables et en renvoyant la même valeur à chaque exécution. Cette propriété importante aide de plusieurs manières :

  1. Puisque l’évaluation d’un thunk n’a aucun effet de bord, elle n’interférera pas avec l’évaluation d’un autre thunk, et ainsi tous les thunks peuvent être exécutés simultanément (tant que leurs entrées sont disponibles) sur plusieurs cœurs CPU, ou même distribués sur plusieurs workers. La machine virtuelle Envision suit le frontier de chaque script (l’ensemble des thunks qui peuvent être exécutés car toutes leurs entrées sont disponibles), et en sélectionne un nouveau dès qu’un CPU se libère.
  2. Inversement, il est possible d’évaluer les thunks un par un et d’obtenir le même résultat. Par exemple, lorsque le cluster est fortement sollicité, que ses workers sont indisponibles, ou lorsqu’il s’agit de reproduire l’évaluation d’un script sur le poste de travail d’un développeur afin d’en investiguer un problème.
  3. Deux workers exécutant le même thunk ne constituent pas une erreur, mais simplement une perte de temps. En tant que tel, cela n’est pas quelque chose qui doit être évité (compte tenu de toutes les difficultés inhérentes à la synchronisation dans un système distribué), il suffit de s’assurer que cela ne se produit pas trop fréquemment1.
  4. Si le résultat d’un thunk est perdu (en raison d’un crash d’un worker ou d’une indisponibilité du réseau), il est possible de le relancer. Même si plusieurs thunks sont perdus, le DAG original reste disponible, et peut servir de filiation des données pour recalculer les valeurs nécessaires.

Cependant, cela signifie également que les thunks ne peuvent pas communiquer entre eux (par exemple, en ouvrant un canal et en transmettant des données entre eux). Cela restreint les stratégies possibles en matière de concurrence et de parallélisme.

Production de thunks

Dans de nombreux frameworks de calcul distribué, le DAG d’exécution est produit en dehors du cluster (par exemple, sur une machine de scheduler), puis des portions du graphe sont envoyées aux workers individuels pour exécution. Très souvent, le DAG doit être produit en plusieurs étapes : par exemple, une opération de jointure peut être optimisée différemment selon la taille des tables2, et il n’est pas toujours possible de connaître la taille d’une table avant d’évaluer son contenu, il vaut donc la peine d’attendre que la taille des tables soit connue avant de générer la portion du DAG qui effectue la jointure. Cela signifie qu’il y aura des allers-retours entre le scheduler et les workers, où le scheduler produira des tâches supplémentaires basées sur les résultats des workers.

Cela fait du scheduler un point de défaillance unique, et permettre plusieurs schedulers actifs, ou un système de basculement entre un scheduler actif et un passif, ajouterait une complexité considérable. Pour Envision, notre objectif resilience était plutôt de garantir qu’un seul worker puisse calculer une mission entière, sans impliquer le scheduler. Ainsi, même si une panne de scheduler de dix minutes empêcherait la soumission de nouvelles missions, elle n’interromprait pas l’exécution des missions déjà lancées. Cependant, cela signifie que les workers devraient être capables de générer de nouvelles portions du DAG sans l’aide du scheduler.

Nous y parvenons en laissant un thunk retourner un nouveau thunk au lieu d’une valeur - pour emprunter davantage de termes à Haskell, construire le DAG implique des monades plutôt que de simples foncteurs. Ce nouveau thunk a ses propres parents, qui peuvent également être de nouveaux thunks, et ainsi de suite, formant ainsi un tout nouveau DAG complet. En pratique, le nouveau DAG partage souvent de nombreux thunks avec l’ancien DAG, car il a besoin des résultats de ces calculs.

Lors de la soumission d’une nouvelle mission au cluster, un seul thunk est soumis (contenant le script à compiler et exécuter, ainsi que les références à tous les fichiers d’entrée). Ce thunk produit ensuite le DAG d’exécution initial, qui s’agrandira quelques fois de plus jusqu’à devenir complet.

Graphe de Merkle

Afin d’être transmis sur le réseau, les thunks sont également sérialisables, en utilisant un format binaire personnalisé conçu pour avoir une faible empreinte. Sur un DAG contenant 100 000 thunks, un budget de 10MiB ne peut supporter que 104 octets par thunk !

Le support de la sérialisation binaire nous a permis de transformer le DAG en un Merkle DAG, où chaque thunk possède un identifiant déterminé par le contenu binaire de ce thunk et de tous ses ancêtres3. Nous appelons cet identifiant le hash du thunk.

L’utilisation d’un Merkle DAG présente deux avantages principaux. Tout d’abord, les thunks qui réalisent la même opération sont automatiquement fusionnés car, ayant le même contenu et les mêmes ancêtres, ils possèdent également le même identifiant.

Deuxièmement, il est possible que deux scripts partagent certains de leurs thunks — peut-être lisent-ils les mêmes fichiers d’entrée et appliquent les mêmes opérations, ou peut-être qu’un Supply Chain Scientist travaille sur le script, modifiant quelques lignes à chaque exécution. Dans ce cas, les résultats des thunks partagés peuvent être réutilisés s’ils sont encore disponibles en mémoire, réduisant ainsi considérablement le temps d’exécution du script. La possibilité de modifier et de réexécuter un script crée une boucle de rétroaction courte qui améliore la productivité des Supply Chain Scientists.

Ordonnancement local des thunks

Nous détaillerons davantage dans un futur article comment l’exécution des thunks est répartie sur plusieurs machines dans un cluster. Pour l’instant, considérez simplement que chaque worker détient une copie du DAG complet, sait quels thunks ont déjà été exécutés (et où trouver leurs résultats), sait quels thunks sont en cours d’exécution par le cluster, et est responsable de l’ordonnancement d’autres thunks à exécuter sur ses 32 cœurs. Cet ordonnancement local est réalisé par un service à thread unique appelé le kernel (à ne pas confondre avec le noyau Linux). Le kernel, ainsi que les threads workers qui exécuteront réellement les thunks, s’exécutent tous dans le même processus .NET afin de partager les objets gérés entre eux.

La recherche d’un nouveau thunk est presque instantanée, puisque le kernel maintient un frontier de thunks prêts à être exécutés pour chaque DAG, et n’a qu’à en choisir un au hasard. La majeure partie du temps du kernel est consacrée à la mise à jour du frontier chaque fois qu’un thunk démarre son exécution (il doit en être retiré), termine son exécution (ses descendants peuvent rejoindre le frontier, en fonction du fait qu’il reste des parents non exécutés), ou se perd en raison de l’indisponibilité du worker détenant son résultat (ses descendants doivent quitter le frontier, mais le thunk lui-même peut être remis dans le frontier si ses propres parents sont encore disponibles).

La gestion des frontiers est un travail à très forte variabilité, pouvant prendre entre une microseconde et plusieurs secondes—plus d’un million de fois plus longtemps ! Par exemple, une étape de shuffle comporte une couche de $N$ thunks qui lisent les résultats d’une autre couche de $M$ thunks. Chaque thunk en aval lit les résultats de l’ensemble des $M$ thunks en amont, ce qui engendre $M\cdot N$ arêtes dans le DAG. Pour $M = N = 1000$ (un degré de parallélisation très probable lorsqu’il s’agit de milliards de lignes), cela représente un million d’arêtes. Si cela n’est pas contrôlé, ce phénomène peut obliger le kernel à s’arrêter pendant plusieurs secondes, période durant laquelle aucun nouveau thunk n’est programmé, et ainsi jusqu’à 32 cœurs restent inactifs4.

Nous résolvons ce problème en introduisant des nœuds virtuels dans le DAG pour représenter ce type de connexion entre les couches. Le nœud virtuel possède $M$ entrées (une pour chaque thunk de la couche en amont) et $N$ sorties (une pour chaque thunk de la couche en aval). Cela réduit le nombre d’arêtes à $M + N$, ce qui est bien plus gérable !

Génération de code de granularité faible

Les premières versions d’Envision, en 2013 et 2014, fonctionnaient sur le principe que chaque opération vectorielle était effectuée par un seul thunk. Lors de l’exécution de T.A / (T.B + 1), il y avait un premier thunk pour diffuser 1 dans la table T, un deuxième thunk pour ajouter T.B au résultat du premier thunk, et un troisième thunk pour diviser T.A par le résultat du deuxième thunk. Cela avait l’avantage de pouvoir facilement implémenter chaque opération comme une fonction C#, exécutée en un seul thunk, ce qui est une excellente idée lors de l’implémentation initiale d’un DSL. Cela comporte, bien sûr, l’inconvénient qu’une quantité inutile de mémoire est consommée (le premier thunk produirait un vecteur de millions de copies de la valeur 1), et que la mémoire prend du temps à être écrite et relue.

Il était impératif d’avoir des thunks qui évaluent plusieurs opérations successivement, plutôt que d’avoir un thunk pour chaque opération.

De nombreuses bases de données SQL fonctionnent selon des variantes du modèle volcan, où la requête est transformée en un arbre d’itérateurs. Chaque itérateur agit comme une fonction impure qui renvoie la valeur suivante à chaque appel, et peut appeler récursivement d’autres itérateurs. Dans ce modèle, la diffusion d’un scalaire dans une table serait un itérateur renvoyant une constante, l’addition ou la division de deux vecteurs contiendrait des références à deux itérateurs, et la lecture d’un vecteur consisterait à en incrémenter l’index :

Func<float> BroadcastScalar(float x) = () => x;
Func<float> Add(Func<float> x, Func<float> y) = () => x() + y();
Func<float> Div(Func<float> x, Func<float> y) = () => x() / y();
Func<float> Read(float[] v) { var i = 0; return () => v[i++]; }

Compiler une requête vers le modèle volcan consiste à construire l’arbre des itérateurs :

Div(Read(A), Div(Read(B), BroadcastScalar(1)))

Cela a l’avantage qu’aucune allocation de mémoire n’est effectuée pour les vecteurs intermédiaires. Cependant, le coût des appels de fonction domine les opérations arithmétiques simples réalisées par ces fonctions.

C’est pour cette raison qu’en 2015, Envision est passé à la génération de code juste-à-temps. Le principe est assez similaire à celui du moteur d’exécution Tungsten d’Apache Spark : compiler l’opération T.A / (T.B + 1) en une fonction dans un langage impératif.

float[] GeneratedFunction(float[] a, float[] b) {
    var result = new float[a.Length];
    for (var i = 0; i < a.Length; ++i)
        result[i] = a[i] / (b[i] + 1);
    return result;
}

La cible que nous utilisons pour cette compilation est .NET IL, le langage bytecode utilisé par .NET pour ses assemblies. Cela nous permet de tirer parti du compilateur JIT de .NET pour produire du code machine optimisé à partir de notre IL généré.

Cette génération de code à l’exécution a constitué le plus grand obstacle lors de la migration d’Envision de .NET Framework vers .NET Core en 2017. En effet, bien que .NET Core prenne en charge les mêmes API System.Reflection que .NET Framework pour produire et exécuter du IL à l’exécution, il ne prend pas en charge la sauvegarde de ce IL sur le disque sous forme de DLL. Bien que cela ne soit pas une exigence pour exécuter Envision, c’est certainement une condition nécessaire pour développer le compilateur d’Envision! System.Reflection ne fait rien pour empêcher la création d’un IL invalide, et ne signale qu’une InvalidProgramException plutôt inutile lorsqu’une méthode contenant un IL invalide est exécutée. La seule approche raisonnable pour étudier de tels problèmes est d’enregistrer un fichier d’assemblage et d’utiliser ILVerify ou ILSpy. En raison de cette exigence, nous avons en réalité continué à cibler à la fois .NET Framework et .NET Core pendant deux ans—la production tournait sur .NET Core, et le débogage IL était effectué sur .NET Framework. Enfin, en 2019, nous avons publié notre propre bibliothèque Lokad.ILPack en remplacement de cette fonctionnalité, et avons migré loin de .NET Framework.

Cela conclut l’analyse d’aujourd’hui sur la manière dont Envision exécute les scripts. Dans l’article suivant, nous aborderons la manière dont sont stockés les résultats intermédiaires.

Promotion sans gêne : nous recrutons des ingénieurs logiciels. Le travail à distance est possible.


  1. Les workers diffusent dans le cluster dès qu’ils démarrent un nouveau thunk, et évitent d’exécuter des thunks que d’autres workers ont déjà revendiqués. Il reste le cas rare où deux workers démarrent le même thunk presque simultanément ; nous évitons cela en faisant choisir à chaque worker un thunk au hasard dans le frontier, et en demandant au scheduler de réduire le nombre de workers lorsque le frontier se contracte trop. Cela signifie que l’exécution en double n’est pas impossible, mais très peu probable. ↩︎

  2. Une jointure shuffle coûteuse est utilisée pour deux grandes tables, et la jointure map-side, moins onéreuse, est utilisée lorsqu’une des tables est suffisamment petite pour tenir en mémoire. ↩︎

  3. Pour les thunks sans ancêtres, tels que ceux qui lisent à partir de fichiers d’entrée, nous incluons le hash du contenu de ces fichiers d’entrée dans le corps du thunk. Cela garantit que si deux thunks lisent le même fichier d’entrée, ils auront le même hash, et si ils lisent deux fichiers d’entrée différents, incluant deux versions différentes du fichier à un chemin donné, alors ils auront des hashes différents. ↩︎

  4. Cela a également un effet sur la taille de la sérialisation. En effet, si toutes les arêtes sont représentées dans le DAG sérialisé, même à raison de seulement deux octets par arête, cela représente déjà 2MB de données ! ↩︎