Cet article est le quatrième d’une série en quatre volets sur le fonctionnement interne de la machine virtuelle Envision : le logiciel qui exécute les scripts Envision. Voir partie 1, partie 2 et partie 3. Cette série ne traite pas du compilateur Envision (peut-être une autre fois), alors supposons simplement que le script a d’une manière ou d’une autre été converti en bytecode que la machine virtuelle Envision prend en entrée.

Les articles précédents examinaient principalement comment les travailleurs individuels exécutaient les scripts Envision. Cependant, tant pour la résilience que pour la performance, Envision est en réalité exécuté sur un cluster de machines.

Chaque couche d’un travailleur communique avec la même couche chez les autres travailleurs, ou avec d’autres couches dans le même travailleur. Cela garantit que la communication réseau peut rester un détail d’implémentation privé de chaque couche.

À un niveau bas, chaque travailleur ouvre deux connexions TLS vers chaque autre machine du cluster, et les communications des différentes couches sont multiplexées via ces deux connexions (une connexion est utilisée pour les messages courts, l’autre pour les transferts de données volumineux).

Exécution distribuée abstraite

Couche de contrôle

Cette couche est utilisée par le planificateur pour assigner et désassigner des missions aux travailleurs et n’implique aucune communication entre travailleurs. Les messages principaux de cette couche sont :

  • Le planificateur demande au travailleur de commencer à travailler sur une mission.
  • Le planificateur demande au travailleur d’arrêter de travailler sur une mission.
  • Le travailleur informe le planificateur qu’il a rencontré une erreur catastrophique lors de l’exécution d’une mission (généralement un problème non déterministe, tel qu’un disque NVMe a pris feu, ce qui signifie que la même mission peut être réessayée dans le futur ou sur un autre travailleur).
  • Le travailleur fournit au planificateur des statistiques sur son état actuel : la liste des missions, la taille de la frontière du DAG de chaque mission, le nombre total de thunks restant à être exécutés dans le DAG de chaque mission.

Le planificateur utilise ces statistiques pour décider quand réaffecter les missions. Les règles réelles pour ce faire sont assez complexes, car elles dépendent des règles de priorité, de l’équité entre plusieurs locataires et entre les scripts d’un même locataire, ainsi que de la charge globale du cluster à ce moment-là, mais la tendance générale est que les missions dont la frontière est suffisamment grande peuvent être réparties sur plusieurs travailleurs, à condition que ceux-ci ne soient pas déjà surchargés. Étant donné la même quantité de travail à effectuer, il est plus efficace d’exécuter quatre missions sur un seul travailleur chacune, que de les répartir sur tous les travailleurs.

Couche d’exécution

Chaque travailleur suit quels thunks il est en train d’exécuter, et il diffuse cette liste aux autres travailleurs à chaque fois qu’il planifie un nouveau thunk1. Cela garantit que, en dehors de la très courte fenêtre liée à la latence réseau, deux travailleurs ne vont pas commencer à exécuter le même thunk.

Bien sûr, si un travailleur cesse d’envoyer ces mises à jour (par exemple, parce qu’il s’est planté ou qu’il est devenu déconnecté du reste du cluster), ses homologues considéreront toute liste de plus de quelques secondes comme périmée, et se permettront d’exécuter ces thunks.

Couche de métadonnées

Chaque travailleur essaie de garder une copie complète des métadonnées, mais ne les synchronise pas réellement. Nous avons choisi de ne fournir aucune garantie que tous les travailleurs soient en accord sur l’intégralité des métadonnées, et travaillons plutôt avec des garanties de cohérence éventuelle. Cela fait de la distribution de la couche de métadonnées l’un des aspects les plus difficiles en termes de conception2.

La cohérence éventuelle de cette couche suit trois règles principales :

  1. Chaque modification locale de la couche de métadonnées est immédiatement diffusée à tous les autres travailleurs. Cette diffusion peut échouer, et ne sera pas tentée de nouveau.
  2. Les modifications distantes reçues des autres travailleurs sont fusionnées dans la couche de métadonnées locale, sur la base d’une progression monotone3 : une valeur “no result” pour un thunk peut être écrasée par une valeur “checkpoint” (signifiant que le thunk a commencé, mais n’a pas terminé son exécution), laquelle peut être écrasée par une valeur “alias” (signifiant que le thunk a renvoyé un DAG à exécuter à sa place), laquelle peut être écrasée par une valeur “result” (qui peut être soit un résultat réussi avec ses atomes associés, soit une erreur fatale).
  3. Chaque fois qu’une autre couche envoie une réponse réseau basée sur une valeur de la couche de métadonnées, celle-ci diffuse également à nouveau cette valeur.

La troisième règle est conçue pour imposer un niveau de synchronisation lorsqu’il est réellement pertinent. Par exemple, considérez la séquence d’événements suivante :

  • Le planificateur demande à un travailleur d’exécuter une mission (via la couche de contrôle)
  • Le travailleur exécute la mission et diffuse le résultat (via la couche de métadonnées), mais le message est perdu en route vers le planificateur.
  • Le planificateur constate que le travailleur n’exécute plus la mission (via la couche de contrôle) et lui demande de la relancer.
  • Le travailleur remarque que le thunk de la mission a déjà un résultat dans la couche de métadonnées, et ne fait rien, car rien n’est à faire.

C’est un interblocage où le planificateur et le travailleur ne sont pas d’accord sur l’état d’un thunk dans la couche de métadonnées (le travailleur pense qu’il est terminé, le planificateur pense le contraire). La troisième règle résout ce problème en décidant que, puisque la réponse du travailleur de “je ne travaille plus sur cette mission” est basée sur son observation que le thunk a un résultat, la couche de métadonnées doit diffuser à nouveau cette information. L’interblocage est alors résolu :

  • La couche de métadonnées du travailleur diffuse à nouveau le résultat du thunk, et il est reçu par le planificateur.
  • Le planificateur réagit à l’apparition d’un résultat pour le thunk d’une mission, en signalant que la mission est terminée, et en notifiant le client qui avait demandé cette mission.

Couche d’atomes

Les travailleurs combinent leurs couches d’atomes pour créer un magasin de blobs distribué, où chaque atome peut être demandé via son identifiant—le hash 128 bits de son contenu, créé avec SpookyHash. Ce n’est pas une table de hachage distribuée (DHT), car cela offrirait des compromis inadaptés : dans une DHT, trouver un atome serait rapide (étant donné son hash, l’identifiant du travailleur le détenant peut être calculé par une fonction simple), mais écrire un atome serait lent (il devrait être envoyé de la machine qui l’a calculé à la machine censée le détenir selon la configuration actuelle de la DHT). Étant donné que la plupart des atomes sont destinés à être consommés sur la même machine qui les a produits, cela est contre-productif.

À la place, chaque fois qu’un travailleur demande un atome à partir de sa propre couche d’atomes, il recherche d’abord cet atome sur ses propres disques NVMe. S’il n’est pas trouvé, alors les autres travailleurs sont sollicités pour vérifier l’existence de cet atome. C’est le plus grand défi de performance de la conception distribuée d’Envision, puisque ces requêtes doivent être achevées aussi rapidement que possible, et une stratégie complexe de délai d’attente est nécessaire pour gérer les travailleurs non réactifs : attendre trop longtemps et vous aurez gaspillé des secondes à attendre une réponse qui ne viendra jamais ; abandonner trop tôt, et vous devrez recalculer un atome qui aurait pu être téléchargé depuis un autre travailleur.

Pour faciliter cela, la couche d’atomes regroupe également plusieurs requêtes ensemble, afin de s’assurer que tous les autres travailleurs maintiennent une file complète de requêtes auxquelles ils doivent répondre, et pour détecter plus facilement lorsque les temps de réponse d’un travailleur montent subitement en flèche.

Une fois qu’au moins un autre travailleur a confirmé l’existence de l’atome sur son disque, une deuxième requête est envoyée pour télécharger l’atome. Ces requêtes de téléchargement ont tendance à être très irrégulières, puisque de nombreux thunks demandent d’abord leurs atomes, puis commencent à traiter leur contenu. Pour cette raison, la couche d’atomes sait qu’il existe une seule file de téléchargement pour chaque paire de travailleurs, et ne panique pas si une requête d’atome donnée ne reçoit pas son premier octet pendant plusieurs secondes (si la file est pleine et que d’autres atomes reçoivent leurs octets, alors il n’y a aucune raison de s’inquiéter). En un sens, le délai d’attente ne se situe pas au niveau de la requête d’atome, mais au niveau de l’ensemble de la couche.

De plus, deux optimisations sont appliquées à la file de transfert :

  1. Chaque requête spécifie quel thunk a besoin des données, de sorte que l’émetteur tentera de regrouper les requêtes provenant du même thunk (plus un thunk est débloqué rapidement, plus vite il pourra commencer à traiter ses entrées).
  2. Lorsque l’exécution d’un thunk est annulée (en raison d’une erreur, d’un changement de priorité, ou parce qu’il est découvert qu’un autre travailleur l’a déjà terminé), la couche d’atomes communique cette annulation afin que toutes les requêtes de ce thunk puissent être supprimées de la file de téléchargement.

Un travailleur typique enverra des données par rafales de 1 Go/s, couvrant généralement 7 Go de données par rafale.

Couche de journalisation

Cette couche conserve des informations supplémentaires sur l’état de l’exécution, afin de pouvoir être examinée par la suite pour enquêter sur les problèmes ou mesurer la performance. Elle est très détaillée, contenant des informations telles que quels thunks ont été exécutés, combien de temps ils ont mis à s’exécuter et quel type de résultat ils ont produit. Des événements importants, tels que la construction d’un nouveau DAG (y compris le DAG sérialisé lui-même), ou la découverte qu’un atome manque, sont également consignés. Au total, plusieurs gigaoctets sont générés chaque jour pour chaque travailleur.

Pour minimiser l’impact sur la performance, chaque travailleur écrit les journaux accumulés toutes les 60 secondes, ou chaque fois que 4 mégaoctets sont accumulés (ce qui se produit souvent lors d’une rafale d’activité). Cela est écrit sur un blob de type Block Blob d’Azure Blob Storage4, et chaque travailleur possède son propre blob dédié afin d’éviter de devoir supporter plusieurs écrivains sur un seul blob.

Nous disposons ensuite d’autres machines (en dehors de l’environnement de production Envision) qui peuvent lire ces blobs de journaux après coup, et compiler des statistiques détaillées sur ce qui s’est passé dans le cluster.

Petit coup de pub sans complexe : nous recrutons des ingénieurs logiciels. Le travail à distance est possible.


  1. Cela peut sembler être un gaspillage en termes de bande passante, mais considérez que chaque identifiant de thunk pèse 24 octets, et qu’il y a jusqu’à 32 thunks par travailleur, donc chaque mise à jour ne prend que 768 octets—moins qu’un paquet TCP! ↩︎

  2. Bien que, en termes de performance, la couche d’atomes soit bien plus complexe. ↩︎

  3. La couche de métadonnées est essentiellement une énorme horloge vectorielle, où les horloges sont maintenues par thunk plutôt que par travailleur. ↩︎

  4. Pourquoi ne pas utiliser Append Blobs ? Eh bien, aussi bien les Block Blobs que les Append Blobs présentent d’importants problèmes de performance lors de la lecture d’un fichier composé de nombreux petits écrits : la performance de lecture chute d’environ ~60MB/s pour un blob normal, à moins de ~2MB/s ! Un blob de journalisation de 5GB prend environ 40 minutes à être lu à ce rythme. Nous avons contacté Microsoft à propos de ce problème, mais il n’est pas prévu de le corriger. Pour contourner ce problème, nous nous appuyons sur le fait qu’un Block Blob peut être recompilé manuellement (prendre les 1000 derniers petits écrits, les effacer du blob, et les réécrire en une seule grande écriture), alors qu’un Append Blob ne peut pas être modifié de cette manière. ↩︎