Dieser Artikel ist der vierte Teil einer vierteiligen Serie über die Funktionsweise der Envision-Virtual Machine: die Software, die Envision-Skripte ausführt. Siehe Teil 1, Teil 2 und Teil 3. Diese Serie behandelt nicht den Envision-Compiler (vielleicht ein anderes Mal), daher nehmen wir einfach an, dass das Skript irgendwie in den Bytecode umgewandelt wurde, den die Envision-Virtual Machine als Eingabe akzeptiert.

Die vorherigen Artikel haben hauptsächlich untersucht, wie einzelne Arbeiter Envision-Skripte ausführen. Envision wird jedoch sowohl aus Gründen der Robustheit als auch der Leistung tatsächlich auf einem Cluster von Maschinen ausgeführt.

Jede Schicht in einem Arbeiter kommuniziert mit derselben Schicht in den anderen Arbeitern oder mit anderen Schichten im selben Arbeiter. Dadurch kann die Netzwerkkommunikation ein privates Implementierungsdetail jeder Schicht bleiben.

Auf niedriger Ebene öffnet jeder Arbeiter zwei TLS-Verbindungen zu jeder anderen Maschine im Cluster, und die Kommunikation der verschiedenen Schichten wird über diese beiden Verbindungen multiplexiert (eine Verbindung wird für kurze Nachrichten verwendet, die andere für große Datentransfers).

Abstrakte verteilte Ausführung

Kontrollschicht

Diese Schicht wird vom Scheduler verwendet, um Missionen den Arbeitern zuzuweisen und sie wieder freizugeben, und beinhaltet keine Kommunikation zwischen den Arbeitern. Die Hauptnachrichten dieser Schicht sind:

  • Der Scheduler fordert den Arbeiter auf, mit der Arbeit an einer Mission zu beginnen.
  • Der Scheduler fordert den Arbeiter auf, die Arbeit an einer Mission zu beenden.
  • Der Arbeiter teilt dem Scheduler mit, dass während der Ausführung einer Mission ein katastrophaler Fehler aufgetreten ist (normalerweise ein nichtdeterministisches Problem wie “NVMe-Laufwerk ist in Brand geraten”), was bedeutet, dass dieselbe Mission in Zukunft oder auf einem anderen Arbeiter erneut versucht werden kann).
  • Der Arbeiter gibt dem Scheduler Statistiken über seinen aktuellen Zustand: Liste der Missionen, Größe der Frontier jedes Mission-DAGs, Gesamtzahl der noch auszuführenden Thunks in jedem Mission-DAG.

Der Scheduler verwendet diese Statistiken, um zu entscheiden, wann Missionen erneut zugewiesen werden sollen. Die tatsächlichen Regeln dafür sind ziemlich komplex, da sie von Prioritätsregeln, Fairness zwischen mehreren Mietern und zwischen Skripten desselben Mieters sowie von der Gesamtlast des Clusters zu diesem Zeitpunkt abhängen. Die allgemeine Tendenz besteht jedoch darin, dass Missionen mit einer ausreichend großen Frontier auf mehrere Arbeiter verteilt werden können, solange diese Arbeiter nicht bereits überlastet sind. Bei gleicher Arbeitsmenge ist es effizienter, vier Missionen jeweils auf einem einzigen Arbeiter auszuführen, als sie auf alle Arbeiter zu verteilen.

Ausführungsschicht

Jeder Arbeiter behält den Überblick darüber, welche Thunks er gerade ausführt, und sendet diese Liste jedes Mal, wenn er einen neuen Thunk plant1. Dadurch starten zwei Arbeiter außerhalb des sehr kurzen Zeitfensters in Bezug auf die Netzwerklatenz nicht mit der Ausführung desselben Thunks.

Natürlich behandeln die Peers, wenn ein Arbeiter aufhört, diese Updates zu senden (zum Beispiel, weil er abgestürzt ist oder von dem Rest des Clusters getrennt ist), jede Liste, die älter als wenige Sekunden ist, als veraltet und erlauben sich, diese Thunks auszuführen.

Metadatenschicht

Jeder Arbeiter versucht, eine Kopie der vollständigen Metadaten zu behalten, synchronisiert sie jedoch nicht tatsächlich. Wir haben uns dafür entschieden, keine Garantie dafür zu geben, dass alle Arbeiter die genau gleichen Metadaten haben, und stattdessen mit Garantien für eine eventuelle Konsistenz zu arbeiten. Dies macht die Verteilung der Metadatenschicht in Bezug auf das Design am herausforderndsten2.

Die eventuelle Konsistenz dieser Schicht folgt drei Hauptregeln:

  1. Jede lokale Änderung an der Metadatenschicht wird sofort an alle anderen Arbeiter übertragen. Diese Übertragung kann fehlschlagen und wird nicht erneut versucht.
  2. Von anderen Arbeitern empfangene Remote-Änderungen werden in die lokale Metadatenschicht zusammengeführt, basierend auf einer monotonen Progression3: Ein “kein Ergebnis”-Wert für einen Thunk kann von einem “Checkpoint”-Wert überschrieben werden (was bedeutet, dass der Thunk gestartet, aber nicht fertig ausgeführt wurde), der von einem “Alias”-Wert überschrieben werden kann (was bedeutet, dass der Thunk einen DAG zurückgegeben hat, der an seiner Stelle ausgeführt werden soll), der von einem “Ergebnis”-Wert überschrieben werden kann (der entweder ein erfolgreiches Ergebnis mit seinen zugehörigen Atomen oder einen schwerwiegenden Fehler sein kann).
  3. Immer wenn eine andere Schicht eine Netzwerkantwort auf der Grundlage eines Werts in der Metadatenschicht sendet, überträgt die Metadatenschicht diesen Wert erneut.

Die dritte Regel ist darauf ausgelegt, eine Synchronisierungsebene zu erzwingen, wenn sie tatsächlich relevant ist. Betrachten Sie zum Beispiel die folgende Ereignissequenz:

  • Der Scheduler fordert einen Arbeiter auf, eine Mission auszuführen (über die Steuerungsschicht).
  • Der Arbeiter führt die Mission aus und sendet das Ergebnis (über die Metadatenschicht), aber die Nachricht geht auf dem Weg zum Scheduler verloren.
  • Der Scheduler bemerkt, dass der Arbeiter die Mission nicht mehr ausführt (über die Steuerungsschicht) und fordert ihn auf, sie erneut auszuführen.
  • Der Arbeiter stellt fest, dass der Thunk der Mission bereits ein Ergebnis in der Metadatenschicht hat und nichts tut, weil nichts getan werden muss.

Hierbei handelt es sich um eine Deadlock-Situation, bei der der Scheduler und der Arbeiter über den Zustand eines Thunks in der Metadatenschicht nicht übereinstimmen (der Arbeiter glaubt, er sei fertig, der Scheduler glaubt, er sei es nicht). Die dritte Regel löst dies auf, indem sie entscheidet, dass die Metadatenschicht diese Information erneut übertragen sollte, da die Antwort des Arbeiters “Ich arbeite nicht mehr an dieser Mission” auf der Beobachtung des Arbeiters beruht, dass der Thunk ein Ergebnis hat. Der Deadlock wird dann aufgelöst:

  • Die Metadatenschicht des Arbeiters überträgt das Thunk-Ergebnis erneut, und es wird vom Scheduler empfangen.
  • Der Scheduler reagiert auf das Auftreten eines Ergebnisses für einen Thunk einer Mission, indem er diese Mission als abgeschlossen markiert und den Client benachrichtigt, der diese Mission angefordert hat.

Atomschicht

Die Arbeiter kombinieren ihre Atomschichten, um einen verteilten Blob-Speicher zu erstellen, in dem jedes Atom anhand seines Identifikators - dem 128-Bit-Hash seines Inhalts, der mit SpookyHash erstellt wurde - angefordert werden kann. Dies ist keine verteilte Hashtabelle (DHT), da dies die falschen Kompromisse bieten würde: In einer DHT wäre das Auffinden eines Atoms schnell (unter Verwendung seines Hashs kann der Identifikator des Arbeiters, der es enthält, mit einer einfachen Funktion berechnet werden), aber das Schreiben eines Atoms wäre langsam (es müsste von der Maschine, die es berechnet hat, an die Maschine gesendet werden, die es gemäß der aktuellen Layout der DHT halten soll). Da die meisten Atome auf der gleichen Maschine verbraucht werden sollen, auf der sie erzeugt wurden, ist dies verschwenderisch.

Stattdessen sucht ein Arbeiter, wenn er ein Atom aus seiner eigenen Atomschicht anfordert, zuerst nach diesem Atom auf seinen eigenen NVMe-Laufwerken. Wenn es nicht gefunden wird, werden die anderen Arbeiter nach dem Vorhandensein dieses Atoms befragt. Dies ist die größte Herausforderung der verteilten Gestaltung von Envision in Bezug auf die Leistung, da diese Abfragen so schnell wie möglich abgeschlossen werden müssen und eine komplexe Timeout-Strategie erforderlich ist, um nicht reagierende Arbeiter zu behandeln: Warten Sie zu lange, und Sie haben Sekunden verschwendet, um auf eine Antwort zu warten, die nie gekommen ist; geben Sie zu früh auf, und Sie müssen ein Atom neu berechnen, das von einem anderen Arbeiter heruntergeladen werden konnte.

Um dies zu erleichtern, gruppiert die Atomschicht auch mehrere Anfragen zusammen, um sicherzustellen, dass alle anderen Arbeiter eine volle Pipeline von Anfragen haben, die sie beantworten müssen, und um leichter zu erkennen, wenn die Antwortzeiten eines Arbeiters plötzlich ansteigen.

Sobald mindestens ein anderer Arbeiter die Existenz des Atoms auf seiner Festplatte bestätigt hat, wird eine zweite Anfrage zum Herunterladen des Atoms gesendet. Solche Download-Anfragen neigen dazu, sehr sprunghaft zu sein, da viele Thunks zuerst ihre Atome anfordern und dann mit der Verarbeitung ihrer Inhalte beginnen. Aus diesem Grund ist sich die Atomschicht bewusst, dass es für jedes Paar von Arbeitern eine einzelne Download-Warteschlange gibt und keine Panik auslöst, wenn eine bestimmte Atom-Anfrage mehrere Sekunden lang kein erstes Byte empfängt (wenn die Warteschlange voll ist und andere Atome ihre Bytes empfangen, gibt es nichts zu befürchten). In gewisser Weise liegt das Timeout nicht auf der Ebene der Atom-Anfrage, sondern auf der Ebene der gesamten Schicht.

Darüber hinaus gibt es zwei Optimierungen, die auf die Übertragungswarteschlange angewendet werden:

  1. Jede Anfrage gibt an, welcher Thunk die Daten benötigt, damit der Sender versucht, Anfragen desselben Thunks zu gruppieren (je schneller ein bestimmter Thunk freigegeben wird, desto schneller kann er mit der Verarbeitung seiner Eingaben beginnen).
  2. Wenn die Ausführung eines Thunks abgebrochen wird (aufgrund eines Fehlers, aufgrund einer Änderung der Priorität oder weil festgestellt wird, dass ein anderer Arbeiter ihn bereits abgeschlossen hat), kommuniziert die Atomschicht diese Stornierung, damit alle Anfragen dieses Thunks aus der Download-Warteschlange entfernt werden können.

Ein typischer Arbeiter sendet Daten in Schüben von 1 GB/s aus und deckt normalerweise 7 GB Daten pro Schub ab.

Protokollierungsschicht

Diese Schicht enthält zusätzliche Informationen zum Zustand der Ausführung, damit sie später zur Untersuchung von Problemen oder zur Leistungsmessung überprüft werden können. Sie ist sehr detailliert und enthält Informationen wie welche Thunks ausgeführt wurden, wie lange sie zur Ausführung benötigt haben und welche Art von Ergebnis sie erzeugt haben. Wichtige Ereignisse wie der Aufbau eines neuen DAG (einschließlich des serialisierten DAG selbst) oder die Feststellung, dass ein Atom fehlt, werden ebenfalls protokolliert. Insgesamt werden für jeden Arbeiter täglich mehrere Gigabyte erzeugt.

Um die Leistungsauswirkungen zu minimieren, schreibt jeder Arbeiter die angesammelten Protokolle alle 60 Sekunden oder immer dann, wenn 4 Megabyte angesammelt sind (was häufig bei einem Aktivitätsschub der Fall ist). Dies wird in einem Azure Blob Storage Block Blob4 geschrieben, und jeder Arbeiter hat seinen eigenen dedizierten Blob, um die Unterstützung mehrerer Schreibvorgänge auf einem einzelnen Blob zu vermeiden.

Anschließend haben wir andere Maschinen (außerhalb der Envision-Produktionsumgebung), die diese Protokoll-Blobs nachträglich lesen und detaillierte Statistiken darüber erstellen können, was im Cluster passiert ist.

Unverschämte Werbung: Wir suchen Softwareingenieure. Remote-Arbeit ist möglich.


  1. Dies mag in Bezug auf die Bandbreite verschwenderisch erscheinen, aber bedenken Sie, dass jeder Thunk-Identifier 24 Bytes wiegt und es bis zu 32 Thunks pro Arbeiter gibt. Daher beträgt jedes Update nur 768 Bytes - weniger als ein TCP-Paket! ↩︎

  2. Obwohl die Atomschicht in Bezug auf die Leistung weit herausfordernder ist. ↩︎

  3. Die Metadatenschicht ist im Wesentlichen ein riesiger Vektoruhr, bei dem die Uhren pro Thunk und nicht pro Arbeiter geführt werden. ↩︎

  4. Warum keine Append Blobs? Nun, sowohl Block Blobs als auch Append Blobs haben erhebliche Leistungsprobleme beim Lesen einer Datei, die aus vielen kleinen Schreibvorgängen besteht: Die Leseleistung sinkt von ~60 MB/s für einen normalen Blob auf unter ~2 MB/s! Ein 5 GB großes Protokoll-Blob dauert bei dieser Geschwindigkeit etwa 40 Minuten zum Lesen. Wir haben Microsoft wegen dieses Problems kontaktiert, aber es gibt keine Pläne, es zu beheben. Um dieses Problem zu umgehen, verlassen wir uns darauf, dass ein Block Blob manuell neu kompaktiert werden kann (die letzten 1000 kleinen Schreibvorgänge nehmen, sie aus dem Blob löschen und sie erneut als einzelnen großen Schreibvorgang schreiben), während ein Append Blob auf diese Weise nicht geändert werden kann. ↩︎