Dieser Artikel ist der vierte Teil einer vierteiligen Reihe über das Innenleben der Envision Virtual Machine: die Software, die Envision-Skripte ausführt. Siehe Teil 1, Teil 2 und Teil 3. Diese Serie behandelt nicht den Envision-Compiler (vielleicht ein andermal), daher nehmen wir einfach an, dass das Skript irgendwie in den Bytecode umgewandelt wurde, den die Envision Virtual Machine als Eingabe verwendet.

Die vorherigen Artikel untersuchten größtenteils, wie einzelne Worker Envision-Skripte ausführten. Allerdings wird Envision sowohl aus Gründen der Resilienz als auch der Performance tatsächlich auf einem Cluster von Maschinen ausgeführt.

Jede Ebene in einem Worker kommuniziert mit derselben Ebene in den anderen Workern oder mit anderen Ebenen im selben Worker. Dies stellt sicher, dass die Netzkommunikation ein privates Implementierungsdetail jeder Ebene bleibt.

Auf niedriger Ebene öffnet jeder Worker zwei TLS-Verbindungen zu jeder anderen Maschine im Cluster, und die Kommunikation der verschiedenen Ebenen wird über diese beiden Verbindungen multiplexiert (eine Verbindung wird für kurze Nachrichten verwendet, die andere für große Datenübertragungen).

Abstrakte verteilte Ausführung

Steuerungsebene

Diese Ebene wird vom Scheduler verwendet, um den Workern Aufträge zuzuweisen und diese wieder zu entziehen, und beinhaltet keine Kommunikation zwischen den Workern. Die wichtigsten Nachrichten dieser Ebene sind:

  • Der Scheduler fordert einen Worker auf, mit der Bearbeitung eines Auftrags zu beginnen.
  • Der Scheduler fordert einen Worker auf, die Bearbeitung eines Auftrags einzustellen.
  • Ein Worker teilt dem Scheduler mit, dass er während der Ausführung eines Auftrags einen katastrophalen Fehler festgestellt hat (in der Regel ein nicht-deterministisches Problem, wie z. B. “NVMe drive caught on fire”, was bedeutet, dass derselbe Auftrag in Zukunft oder auf einem anderen Worker erneut versucht werden kann).
  • Ein Worker übermittelt dem Scheduler Statistiken über seinen aktuellen Zustand: eine Liste der Aufträge, die Größe der Frontier des DAGs jedes Auftrags und die Gesamtzahl der Thunks, die im DAG jedes Auftrags noch ausgeführt werden müssen.

Der Scheduler nutzt diese Statistiken, um zu entscheiden, wann Aufträge neu zugewiesen werden. Die tatsächlichen Regeln dafür sind recht komplex, da sie von Prioritätsregeln, Fairness zwischen mehreren Mandanten und zwischen Skripten desselben Mandanten sowie von der momentanen Gesamtauslastung des Clusters abhängen. Der allgemeine Trend ist jedoch, dass Aufträge mit einer ausreichend großen Frontier auf mehrere Worker verteilt werden können, solange diese Worker nicht bereits überlastet sind. Bei derselben Menge an Arbeit ist es effizienter, vier Aufträge jeweils auf einem einzelnen Worker auszuführen, als alle über alle Worker zu verteilen.

Ausführungsebene

Jeder Worker behält den Überblick darüber, welche Thunks er aktuell ausführt, und sendet diese Liste an die anderen Worker, jedes Mal, wenn ein neuer Thunk geplant wird1. Dies stellt sicher, dass abgesehen von dem sehr kurzen Zeitfenster, das mit der Netzwerklatenz zusammenhängt, zwei Worker nicht gleichzeitig denselben Thunk ausführen.

Natürlich, wenn ein Worker aufhört, diese Updates zu senden (zum Beispiel, weil er abgestürzt ist oder die Verbindung zum Rest des Clusters verloren hat), werden seine Kollegen jede Liste, die älter als ein paar Sekunden ist, als veraltet behandeln und sich erlauben, diese Thunks auszuführen.

Metadaten-Ebene

Jeder Worker versucht, eine Kopie der vollständigen Metadaten zu behalten, jedoch wird keine echte Synchronisation durchgeführt. Wir haben uns entschieden, keine Garantie dafür zu geben, dass alle Worker über exakt dieselben Metadaten verfügen, sondern stattdessen mit Eventual-Consistency-Garantien zu arbeiten. Dies macht die Verteilung der Metadatenebene zur größten Herausforderung in Bezug auf das Design2.

Die Eventual-Consistency dieser Ebene folgt drei Hauptregeln:

  1. Jede lokale Änderung an der Metadaten-Ebene wird sofort an alle anderen Worker übertragen. Diese Übertragung kann fehlschlagen und wird nicht erneut versucht.
  2. Remote Änderungen, die von anderen Workern empfangen werden, werden in die lokale Metadaten-Ebene integriert, basierend auf einem monotonen Fortschritt3: Ein “no result”-Wert für einen Thunk kann durch einen “checkpoint”-Wert überschrieben werden (was bedeutet, dass der Thunk begonnen hat, aber noch nicht beendet ist), welcher durch einen “alias”-Wert überschrieben werden kann (was bedeutet, dass der Thunk einen DAG zurückgegeben hat, der an seiner Stelle ausgeführt werden soll), welcher durch einen “result”-Wert überschrieben werden kann (welcher entweder ein erfolgreiches Ergebnis mit den zugehörigen Atomen oder einen fatalen Fehler darstellen kann).
  3. Immer wenn eine andere Ebene eine Netzwerkantwort basierend auf einem Wert in der Metadaten-Ebene sendet, überträgt die Metadaten-Ebene diesen Wert ebenfalls erneut.

Die dritte Regel ist darauf ausgelegt, ein Maß an Synchronisierung zu erzwingen, wenn dies tatsächlich relevant ist. Betrachten Sie zum Beispiel die folgende Ereignisfolge:

  • Der Scheduler fordert einen Worker auf, einen Auftrag auszuführen (über die Steuerungsebene)
  • Der Worker führt den Auftrag aus und sendet das Ergebnis (über die Metadaten-Ebene), aber die Nachricht geht auf dem Weg zum Scheduler verloren.
  • Der Scheduler bemerkt, dass der Worker den Auftrag nicht mehr ausführt (über die Steuerungsebene), und fordert ihn auf, ihn erneut auszuführen.
  • Der Worker stellt fest, dass der Thunk des Auftrags bereits ein Ergebnis in der Metadaten-Ebene hat, und unternimmt nichts, da nichts zu tun ist.

Dies ist ein Deadlock, bei dem sich der Scheduler und der Worker über den Zustand eines Thunks in der Metadaten-Ebene uneinig sind (der Worker ist der Ansicht, dass er abgeschlossen ist, der Scheduler hingegen nicht). Die dritte Regel löst dieses Problem, indem entschieden wird, dass, da die Antwort des Workers «Ich arbeite nicht mehr an diesem Auftrag» und dessen Beobachtung, dass der Thunk ein Ergebnis hat, darauf hinweisen, dass die Metadaten-Ebene diese Information erneut übertragen sollte. Der Deadlock wird dann aufgelöst:

  • Die Metadaten-Ebene des Workers überträgt das Thunk-Ergebnis erneut, und es wird vom Scheduler empfangen.
  • Der Scheduler reagiert auf das Erscheinen eines Ergebnisses für den Thunk eines Auftrags, indem er diesen Auftrag als abgeschlossen markiert und den anfragenden Client benachrichtigt.

Atom-Ebene

Die Worker kombinieren ihre Atom-Ebenen, um einen verteilten Blob-Store zu erstellen, in dem jedes Atom anhand seines Identifikators angefordert werden kann – dem 128-Bit-Hash seines Inhalts, erstellt mit SpookyHash. Dies ist kein Distributed Hash Table (DHT), da dies die falschen Kompromisse bieten würde: In einem DHT wäre das Auffinden eines Atoms schnell (anhand des Hashs könnte der Identifikator des es haltenden Workers mit einer einfachen Funktion berechnet werden), aber das Schreiben eines Atoms wäre langsam (es müsste von der Maschine, die es berechnet hat, an die Maschine gesendet werden, die es gemäß der aktuellen DHT-Konfiguration halten soll). Da die meisten Atome voraussichtlich auf derselben Maschine verbraucht werden, die sie produziert hat, ist dies verschwenderisch.

Stattdessen sucht ein Worker, wann immer er ein Atom aus seiner eigenen Atom-Ebene anfordert, zunächst auf seinen eigenen NVMe-Laufwerken danach. Falls es dort nicht gefunden wird, werden die anderen Worker nach der Existenz dieses Atoms befragt. Dies ist die größte Performance-Herausforderung des verteilten Designs von Envision, da diese Anfragen so schnell wie möglich abgeschlossen werden müssen und eine komplexe Timeout-Strategie notwendig ist, um mit nicht reagierenden Workern umzugehen: Wartet man zu lange, hat man Sekunden verschwendet, in denen auf eine Antwort gewartet wurde, die nie kam; gibt man zu früh auf, muss ein Atom neu berechnet werden, das von einem anderen Worker hätte heruntergeladen werden können.

Um dabei zu helfen, fasst die Atom-Ebene auch mehrere Anfragen zusammen, um sicherzustellen, dass alle anderen Worker eine volle Pipeline von Anfragen behalten, die sie beantworten müssen, und um leichter feststellen zu können, wann die Antwortzeiten eines Workers plötzlich ansteigen.

Sobald mindestens ein weiterer Worker die Existenz des Atoms auf seiner Festplatte bestätigt hat, wird eine zweite Anfrage zum Herunterladen des Atoms gesendet. Solche Download-Anfragen neigen dazu, sehr sprunghaft zu sein, da viele Thunks zuerst ihre Atome anfordern und dann mit der Verarbeitung ihres Inhalts beginnen. Deshalb ist sich die Atom-Ebene bewusst, dass es für jedes Worker-Paar eine einzelne Download-Warteschlange gibt, und gerät nicht in Panik, wenn eine bestimmte Atom-Anfrage mehrere Sekunden lang nicht das erste Byte erhält (wenn die Warteschlange voll ist und andere Atome ihre Bytes erhalten, gibt es keinen Grund zur Sorge). In gewissem Sinne erfolgt das Timeout nicht auf der Ebene der Atom-Anfrage, sondern auf der Ebene der gesamten Schicht.

Zusätzlich werden zwei Optimierungen auf die Übertragungswarteschlange angewendet:

  1. Jede Anfrage gibt an, welcher Thunk die Daten benötigt, sodass der Sender versucht, Anfragen desselben Thunks zusammenzufassen (je schneller ein bestimmter Thunk freigegeben wird, desto schneller kann er mit der Verarbeitung seiner Eingaben beginnen).
  2. Wenn die Ausführung eines Thunks abgebrochen wird (aufgrund eines Fehlers, einer Prioritätsänderung oder weil festgestellt wird, dass ein anderer Worker ihn bereits abgeschlossen hat), teilt die Atom-Ebene diesen Abbruch mit, damit alle Anfragen dieses Thunks aus der Download-Warteschlange entfernt werden können.

Ein typischer Worker wird Daten in Bursts von 1GB/s versenden, normalerweise mit etwa 7GB Daten pro Burst.

Logging-Ebene

Diese Ebene speichert zusätzliche Informationen über den Ausführungszustand, sodass diese im Nachhinein zur Problemuntersuchung oder Leistungsüberprüfung herangezogen werden können. Sie ist sehr detailliert und enthält Informationen wie welche Thunks ausgeführt wurden, wie lange sie zur Ausführung benötigten und welche Art von Ergebnis sie lieferten. Wichtige Ereignisse, wie etwa der Aufbau eines neuen DAGs (einschließlich des serialisierten DAGs selbst) oder die Entdeckung, dass ein Atom fehlt, werden ebenfalls protokolliert. Insgesamt werden täglich mehrere Gigabyte an Daten für jeden Worker erzeugt.

Um die Performance-Auswirkungen zu minimieren, schreibt jeder Worker die angesammelten Logs alle 60 Sekunden oder sobald 4 Megabyte angesammelt sind (was häufig bei einem Aktivitätsanstieg der Fall ist). Dies wird in einem Azure Blob Storage Block Blob4 gespeichert, und jeder Worker hat seinen eigenen dedizierten Blob, um mehrere Schreiber auf einem einzigen Blob zu vermeiden.

Anschließend gibt es weitere Maschinen (außerhalb der Envision-Produktionsumgebung), die diese Log-Blobs nachträglich auslesen und detaillierte Statistiken darüber erstellen können, was im Cluster geschehen ist.

Werbung ohne Scheu: Wir stellen Software Engineers ein. Remote-Arbeit ist möglich.


  1. Dies mag in Bezug auf die Bandbreite verschwenderisch erscheinen, aber bedenke, dass jeder Thunk-Identifier 24 Byte wiegt und es bis zu 32 Thunks pro Worker gibt, sodass jedes Update nur 768 Byte benötigt — weniger als ein TCP-Paket! ↩︎

  2. Obwohl in Bezug auf die Performance die Atom-Ebene weitaus herausfordernder ist. ↩︎

  3. Die Metadaten-Ebene ist im Grunde eine riesige Vektor-Uhr, bei der die Uhren nicht pro Worker, sondern pro Thunk geführt werden. ↩︎

  4. Warum keine Append Blobs? Nun, sowohl Block Blobs als auch Append Blobs haben enorme Performance-Probleme beim Lesen einer Datei, die aus vielen kleinen Schreibvorgängen besteht: Die Lesegeschwindigkeit fällt von ca. 60MB/s bei einem normalen Blob auf unter 2MB/s! Ein 5GB Log-Blob benötigt bei dieser Geschwindigkeit etwa 40 Minuten zum Lesen. Wir haben Microsoft bezüglich dieses Problems kontaktiert, jedoch gibt es keine Pläne, es zu beheben. Um dieses Problem zu umgehen, verlassen wir uns darauf, dass ein Block Blob manuell neu kompakteriert werden kann (die letzten 1000 kleinen Schreibvorgänge werden zusammengeführt, aus dem Blob gelöscht und als ein einzelner großer Schreibvorgang wieder geschrieben), während ein Append Blob nicht in dieser Weise modifiziert werden kann. ↩︎