Cet article est le quatrième d’une série en quatre parties sur le fonctionnement interne de la machine virtuelle Envision : le logiciel qui exécute les scripts Envision. Consultez la partie 1, la partie 2 et la partie 3. Cette série ne couvre pas le compilateur Envision (peut-être une autre fois), donc supposons simplement que le script a été converti en bytecode que la machine virtuelle Envision utilise en entrée.

Les articles précédents ont principalement examiné comment les travailleurs individuels exécutent les scripts Envision. Cependant, pour la résilience et les performances, Envision est en réalité exécuté sur un cluster de machines.

Chaque couche d’un travailleur communique avec la même couche des autres travailleurs, ou avec d’autres couches du même travailleur. Cela garantit que la communication réseau peut rester un détail d’implémentation privé de chaque couche.

À un niveau inférieur, chaque travailleur ouvre deux connexions TLS vers chaque autre machine du cluster, et les communications des différentes couches sont multiplexées à travers ces deux connexions (une connexion est utilisée pour les messages courts, l’autre pour les transferts de données volumineux).

Exécution distribuée abstraite

Couche de Contrôle

Cette couche est utilisée par le planificateur pour attribuer et désattribuer des missions aux travailleurs et n’implique aucune communication entre les travailleurs. Les principaux messages de cette couche sont les suivants :

  • Le planificateur demande au travailleur de commencer à travailler sur une mission.
  • Le planificateur demande au travailleur de cesser de travailler sur une mission.
  • Le travailleur informe le planificateur qu’il a rencontré une erreur catastrophique lors de l’exécution d’une mission (généralement un problème non déterministe, tel que “le lecteur NVMe a pris feu”, ce qui signifie que la même mission peut être tentée à nouveau à l’avenir ou sur un autre travailleur).
  • Le travailleur fournit au planificateur des statistiques sur son état actuel : liste des missions, taille de la frontière de chaque DAG de mission, nombre total de thunks restant à exécuter dans chaque DAG de mission.

Le planificateur utilise ces statistiques pour décider quand réattribuer des missions. Les règles réelles pour le faire sont assez complexes, car elles dépendent de règles de priorité, d’équité entre plusieurs locataires et entre les scripts du même locataire, et de la charge globale du cluster à ce moment-là, mais la tendance générale est que les missions avec une frontière suffisamment grande peuvent être réparties sur plusieurs travailleurs, tant que ces travailleurs ne sont pas déjà surchargés. Avec la même quantité de travail à effectuer, il est plus efficace d’exécuter quatre missions sur un seul travailleur que de les répartir sur tous les travailleurs.

Couche d’Exécution

Chaque travailleur garde une trace des thunks qu’il exécute actuellement et diffuse cette liste aux autres travailleurs chaque fois qu’il planifie un nouveau thunk1. Cela garantit que, en dehors de la très courte fenêtre liée à la latence du réseau, deux travailleurs ne commenceront pas à exécuter le même thunk.

Bien sûr, si un travailleur cesse d’envoyer ces mises à jour (par exemple, parce qu’il a planté ou qu’il est déconnecté du reste du cluster), ses pairs considéreront toute liste plus ancienne que quelques secondes comme obsolète et se permettront d’exécuter ces thunks.

Couche de Métadonnées

Chaque travailleur essaie de conserver une copie complète des métadonnées, mais ne se synchronise pas réellement. Nous avons choisi de ne pas garantir que tous les travailleurs sont d’accord sur les mêmes métadonnées exactes, et travaillons plutôt avec des garanties de cohérence éventuelle. Cela rend la distribution de la couche de métadonnées la plus difficile en termes de conception2.

La cohérence éventuelle de cette couche suit trois règles principales :

  1. Chaque modification locale de la couche de métadonnées est immédiatement diffusée à tous les autres travailleurs. Cette diffusion peut échouer et ne sera pas réessayée.
  2. Les modifications distantes reçues des autres travailleurs sont fusionnées dans la couche de métadonnées locale, en fonction d’une progression monotone3 : une valeur “pas de résultat” pour un thunk peut être écrasée par une valeur “checkpoint” (ce qui signifie que le thunk a commencé, mais n’a pas terminé, son exécution), qui peut être écrasée par une valeur “alias” (ce qui signifie que le thunk a renvoyé un DAG à exécuter à sa place), qui peut être écrasée par une valeur “résultat” (qui peut être soit un résultat réussi avec ses atomes associés, soit une erreur fatale).
  3. Chaque fois qu’une autre couche envoie une réponse réseau basée sur une valeur de la couche de métadonnées, la couche de métadonnées diffuse également cette valeur.

La troisième règle est conçue pour forcer un niveau de synchronisation lorsqu’il est réellement pertinent. Par exemple, considérez la séquence d’événements suivante :

  • Le planificateur demande à un travailleur d’exécuter une mission (par le biais de la couche de contrôle)
  • Le travailleur exécute la mission et diffuse le résultat (par le biais de la couche de métadonnées), mais le message est perdu en route vers le planificateur.
  • Le planificateur remarque que le travailleur n’exécute plus la mission (par le biais de la couche de contrôle) et lui demande de l’exécuter à nouveau.
  • Le travailleur constate que le thunk de la mission a déjà un résultat dans la couche de métadonnées et ne fait rien, car rien n’a besoin d’être fait.

Il s’agit d’une impasse où le planificateur et le travailleur ne sont pas d’accord sur l’état d’un thunk dans la couche de métadonnées (le travailleur pense qu’il est terminé, le planificateur pense qu’il ne l’est pas). La troisième règle résout cela en décidant que puisque la réponse du travailleur est “Je ne travaille plus sur cette mission” et est basée sur l’observation du travailleur selon laquelle le thunk a un résultat, alors la couche de métadonnées devrait diffuser à nouveau cette information. L’impasse est alors résolue :

  • La couche de métadonnées du travailleur diffuse à nouveau le résultat du thunk, et il est reçu par le planificateur.
  • Le planificateur réagit à l’apparition d’un résultat pour le thunk d’une mission, en marquant cette mission comme terminée et en informant le client qui a demandé cette mission.

Couche Atom

Les travailleurs combinent leurs couches atomiques pour créer un magasin de blobs distribué, où chaque atome peut être demandé par son identifiant - le hachage de 128 bits de son contenu, créé avec SpookyHash. Il ne s’agit pas d’une table de hachage distribuée (DHT), car cela fournirait les mauvais compromis : dans une DHT, trouver un atome serait rapide (étant donné son hachage, l’identifiant du travailleur qui le détient peut être calculé avec une fonction simple), mais écrire un atome serait lent (il devrait être envoyé de la machine qui l’a calculé à la machine qui est censée le détenir compte tenu de la disposition actuelle de la DHT). Étant donné que la plupart des atomes sont censés être consommés sur la même machine qui les a produits, cela est gaspilleur.

Au lieu de cela, chaque fois qu’un travailleur demande un atome de sa propre couche atomique, il recherche d’abord cet atome sur ses propres disques NVMe. S’il n’est pas trouvé, les autres travailleurs sont interrogés sur l’existence de cet atome. Il s’agit du plus grand défi de performance de la conception distribuée d’Envision, car ces requêtes doivent être terminées le plus rapidement possible, et une stratégie de délai d’attente complexe est nécessaire pour traiter les travailleurs non réactifs : attendre trop longtemps et vous avez perdu des secondes à attendre une réponse qui n’est jamais venue ; abandonner trop tôt et vous devrez recomputer un atome qui aurait pu être téléchargé depuis un autre travailleur.

Pour aider à cela, la couche atomique regroupe également plusieurs demandes ensemble, afin de s’assurer que tous les autres travailleurs conservent un pipeline complet de demandes auxquelles ils doivent répondre, et pour détecter plus facilement lorsque les temps de réponse d’un travailleur augmentent soudainement.

Une fois qu’au moins un autre travailleur a confirmé l’existence de l’atome sur son disque, une deuxième demande est envoyée pour télécharger l’atome. De telles demandes de téléchargement ont tendance à être très irrégulières, car de nombreux thunks demandent d’abord leurs atomes, puis commencent à traiter leur contenu. Pour cette raison, la couche atomique est consciente qu’il existe une seule file d’attente de téléchargement pour chaque paire de travailleurs, et ne panique pas si une demande d’atome donnée ne reçoit pas son premier octet pendant plusieurs secondes (si la file d’attente est pleine et que d’autres atomes reçoivent leurs octets, alors il n’y a rien à craindre). En un sens, le délai d’attente n’est pas au niveau de la demande d’atome, mais au niveau de l’ensemble de la couche.

De plus, deux optimisations sont appliquées à la file d’attente de transfert :

  1. Chaque demande spécifie quel thunk a besoin des données, de sorte que l’expéditeur essaie de regrouper les demandes du même thunk (plus rapidement un thunk donné est débloqué, plus rapidement il pourra commencer à traiter ses entrées).
  2. Lorsqu’une exécution de thunk est annulée (en raison d’une erreur, d’un changement de priorité ou parce qu’il est découvert qu’un autre worker l’a déjà terminée), la couche atomique communique cette annulation afin que toutes les demandes de ce thunk puissent être purgées de la file de téléchargement.

Un worker typique envoie des données par rafales de 1 Go/s, couvrant généralement 7 Go de données par rafale.

Couche de journalisation

Cette couche conserve des informations supplémentaires sur l’état de l’exécution, afin qu’elles puissent être examinées ultérieurement pour enquêter sur les problèmes ou mesurer les performances. Elle est très détaillée, contenant des informations telles que les thunks qui ont été exécutés, combien de temps ils ont mis à s’exécuter et quel type de résultat ils ont produit. Des événements importants, tels que la construction d’un nouveau DAG (y compris le DAG sérialisé lui-même) ou la découverte qu’un atome est manquant, sont également enregistrés. Au total, plusieurs gigaoctets sont produits chaque jour pour chaque worker.

Pour minimiser l’impact sur les performances, chaque worker écrit les journaux accumulés toutes les 60 secondes, ou chaque fois que 4 mégaoctets sont accumulés (ce qui se produit souvent lorsqu’il y a une rafale d’activité). Cela est écrit dans un bloc de stockage Azure Blob4, et chaque worker a son propre blob dédié afin d’éviter d’avoir à prendre en charge plusieurs écrivains sur un seul blob.

Nous avons ensuite d’autres machines (en dehors de l’environnement de production d’Envision) qui peuvent lire ces blobs de journal après coup et compiler des statistiques détaillées sur ce qui s’est passé sur le cluster.

Petite publicité : nous recrutons des ingénieurs logiciels. Le travail à distance est possible.


  1. Cela peut sembler gaspiller de la bande passante, mais considérez que chaque identifiant de thunk pèse 24 octets et qu’il y a jusqu’à 32 thunks par travailleur, donc chaque mise à jour ne prend que 768 octets - moins qu’un paquet TCP ! ↩︎

  2. Bien que, en termes de performance, la couche atomique soit bien plus difficile. ↩︎

  3. La couche de métadonnées est essentiellement un énorme vecteur d’horloge, où les horloges sont conservées par thunk plutôt que par travailleur. ↩︎

  4. Pourquoi pas des Append Blobs ? Eh bien, les blocs de stockage et les Append Blobs ont tous deux des problèmes de performances majeurs lors de la lecture d’un fichier composé de nombreuses petites écritures : les performances de lecture passent d’environ 60 Mo/s pour un blob normal à moins de 2 Mo/s ! Un blob de journal de 5 Go prend environ 40 minutes à lire à ce rythme. Nous avons contacté Microsoft à ce sujet, mais il n’y a pas de plans pour le résoudre. Pour contourner ce problème, nous nous appuyons sur le fait qu’un bloc de stockage peut être recompacté manuellement (prendre les 1000 dernières petites écritures, les effacer du blob et les réécrire en une seule grande écriture), tandis qu’un Append Blob ne peut pas être modifié de cette manière. ↩︎