Envision VM (Teil 2), Thunks und das Ausführungsmodell
Dieser Artikel ist der zweite Teil einer vierteiligen Serie über das Innenleben der Envision Virtual Machine – der Software, die Envision-Skripte ausführt. Siehe Teil 1, Teil 3 und Teil 4. Diese Serie behandelt nicht den Envision-Compiler (vielleicht ein andermal), also nehmen wir einfach an, dass das Skript irgendwie in den Bytecode umgewandelt wurde, den die Envision Virtual Machine als Eingabe verwendet.
Wie die meisten anderen parallelen Ausführungssysteme erzeugt Envision einen gerichteten azyklischen Graphen (DAG), in dem jeder Knoten eine Operation repräsentiert, die ausgeführt werden muss, und jede Kante eine Datenabhängigkeit, bei der der nachgelagerte Knoten die Ausgabe des vorgelagerten Knotens benötigt, um ausgeführt zu werden.

Die Knoten werden Thunks genannt, nach dem sehr ähnlichen Konzept aus Haskell und anderen Sprachen mit Lazy Evaluation.
Beispiele für Thunks, die in einem typischen Envision-Skript zu finden sind:
- Eine Eingabedatei im
.xlsx
,.csv
oder.csv.gz
Format parsen und in eine spaltige Darstellung umwandeln, die vom restlichen Skript verwendet wird. - Einen Bereich von Zeilen $M..N$ aus einer einzelnen Spalte laden; diese Spalte kann entweder aus dem Ergebnis des Parsens einer Eingabedatei (siehe oben) oder aus Lokads eigenem
.ion
-Spaltendateiformat stammen, das für die Speicherung in Microsoft Azure Blob Storage optimiert ist. - Gegeben sei ein Bereich von Zeilen $M..N$ aus einem sehr großen Vektor $A$, ein kleinerer Vektor $B$, ein Projektor $\pi$, der jede Zeile in $A$ mit einer Zeile in $B$ assoziiert, und eine Funktion $f$, berechne $f(A[l], B[\pi(l)])$. Dies wird als mapseitiger Join bezeichnet.
- Monte-Carlo-Simulation verwenden, um den Durchschnitt, die Varianz oder die Verteilung des Ergebnisses eines zufälligen Prozesses abzuschätzen. Das Ergebnis mehrerer parallel ausgeführter Monte-Carlo-Thunks kann dann von einem abschließenden Thunk kombiniert werden.
Im Allgemeinen wird erwartet, dass ein Thunk zwischen wenigen Hundert Millisekunden (bei kleinen Datenmanipulationen) und einigen Minuten (bei Monte-Carlo-Simulationen oder Gradientenabstiegsverfahren) benötigt. Dies ist eine starke Annahme: Die Envision Virtual Machine darf einen erheblichen Overhead bei der Auswertung jedes Thunks haben, in der Größenordnung von Millisekunden. Ein Skript sollte nur eine geringe Anzahl von Thunks erzeugen (zwischen 1 000 und 100 000), wobei jeder Thunk eine ziemlich große Arbeitseinheit ausführt.
Referentielle Transparenz
Thunks sind reine Funktionen: Sie sind deterministisch und können keine Nebenwirkungen haben. Sie arbeiten, indem sie ihre unveränderlichen Eingaben lesen und bei jeder Ausführung denselben Wert zurückgeben. Diese wichtige Eigenschaft hilft in vielerlei Hinsicht:
- Da die Auswertung eines Thunks keine Nebenwirkungen hat, wird sie die Auswertung eines anderen Thunks nicht beeinträchtigen, sodass alle Thunks gleichzeitig ausgeführt werden können (sofern ihre Eingaben verfügbar sind) auf mehreren CPU-Kernen oder sogar verteilt auf mehreren Arbeitern. Die Envision Virtual Machine behält den Frontier jedes Skripts im Auge (die Menge der Thunks, die ausgeführt werden können, weil alle ihre Eingaben verfügbar sind), und wählt einen neuen Thunk daraus, sobald ein CPU-Kern verfügbar wird.
- Umgekehrt ist es möglich, Thunks nacheinander auszuwerten und zum gleichen Ergebnis zu kommen. Zum Beispiel, wenn der Cluster stark ausgelastet ist, wenn Cluster-Arbeiter nicht verfügbar sind oder wenn die Auswertung eines Skripts auf der Arbeitsstation eines Entwicklers reproduziert wird, um ein Problem zu untersuchen.
- Dass zwei Arbeiter denselben Thunk ausführen, ist kein Fehler, sondern einfach eine Zeitverschwendung. Daher ist es nicht etwas, das verhindert werden muss (mit all den Schwierigkeiten, die die Synchronisation in einem verteilten System mit sich bringt), es genügt zu gewährleisten, dass dies nicht zu häufig passiert1.
- Wenn das Ergebnis eines Thunks verloren geht (aufgrund eines Ausfalls eines Arbeiters oder wegen Netzwerkausfällen), kann er erneut ausgeführt werden. Selbst wenn mehrere Thunks verloren gehen, bleibt der ursprüngliche DAG verfügbar und kann als Datenherkunft verwendet werden, um die benötigten Werte neu zu berechnen.
Allerdings bedeutet dies auch, dass Thunks nicht miteinander kommunizieren können (zum Beispiel durch Öffnen eines Kanals und Übertragen von Daten zwischen ihnen). Dies schränkt die verfügbaren Strategien für Nebenläufigkeit und Parallelität ein.
Thunk-Produktion
In vielen verteilten Berechnungsrahmenwerken wird der Ausführungs-DAG außerhalb des Clusters erzeugt (zum Beispiel auf einer Scheduler-Maschine) und dann werden Teile des Graphen einzelnen Arbeitern zur Ausführung übermittelt. Häufig muss der DAG in mehreren Schritten erzeugt werden: Zum Beispiel kann eine Join-Operation je nach Größe der Tabellen2 unterschiedlich optimiert werden, und es ist nicht immer möglich, die Größe einer Tabelle zu kennen, bevor man ihren Inhalt tatsächlich auswertet, sodass es sich lohnt, zu warten, bis die Tabellengrößen bekannt sind, bevor der Teil des DAGs, der den Join ausführt, erzeugt wird. Dies bedeutet, dass zwischen dem Scheduler und den Arbeitern hin und her kommuniziert wird, wobei der Scheduler zusätzliche Aufgaben basierend auf den Ergebnissen der Arbeiter erstellt.
Dies verwandelt den Scheduler in einen Single Point of Failure, und die Zulassung mehrerer aktiver Scheduler oder ein Failover-Schema zwischen einem aktiven und einem passiven Scheduler würde erheblich an Komplexität hinzufügen. Für Envision war unser resilience Ziel stattdessen sicherzustellen, dass ein einzelner Arbeiter in der Lage ist, eine gesamte Mission zu berechnen, ohne den Scheduler einzubeziehen. Daher würde beispielsweise ein zehnminütiger Ausfall des Schedulers verhindern, dass neue Missionen eingereicht werden, aber bereits gestartete Missionen würden ungestört zu Ende geführt werden. Dies bedeutet jedoch, dass Arbeiter in der Lage sein sollten, neue Teile des DAGs ohne Hilfe des Schedulers zu erzeugen.
Dies erreichen wir, indem wir einen Thunk einen neuen Thunk anstatt eines Wertes zurückgeben lassen – um mehr Haskell-Begriffe zu verwenden, beinhaltet das Erzeugen des DAGs die Verwendung von Monaden anstatt nur von Funktoren. Dieser neue Thunk hat eigene Eltern, die ebenfalls neue Thunks sein können, und so weiter, wodurch ein vollständig neuer DAG entsteht. In der Praxis teilt sich der neue DAG oft viele seiner Thunks mit dem alten DAG, weil er die Ergebnisse dieser Berechnungen benötigt.
Beim Einreichen einer neuen Mission im Cluster wird nur ein einziger Thunk eingereicht (der das zu kompilierende und auszuführende Skript sowie die Verweise auf alle Eingabedateien enthält). Dieser Thunk erzeugt dann den initialen Ausführungs-DAG, der einige Male weiter wachsen wird, bis er vollständig ist.
Merkle-Graph
Um über das Netzwerk übertragen zu werden, sind Thunks auch serialisierbar, wobei ein benutzerdefiniertes binäres Format verwendet wird, das für einen geringen Fußabdruck ausgelegt ist. Bei einem DAG mit 100 000 Thunks kann ein Budget von 10MiB nur 104 Byte pro Thunk unterstützen!
Die Unterstützung für die binäre Serialisierung ermöglichte es uns, den DAG in einen Merkle-DAG zu verwandeln, bei dem jeder Thunk eine durch den Binärinhalt dieses Thunks und aller seiner Vorfahren3 bestimmte Kennung besitzt. Wir nennen diese Kennung den Hash des Thunks.
Die Verwendung eines Merkle-DAG hat zwei Hauptvorteile. Erstens werden Thunks, die die gleiche Operation durchführen, automatisch zusammengeführt, weil sie aufgrund ihres gleichen Inhalts und ihrer Vorfahren auch dieselbe Kennung besitzen.
Zweitens ist es möglich, dass zwei Skripte einige ihrer Thunks teilen — vielleicht lesen sie dieselben Eingabedateien und wenden dieselben Operationen darauf an, oder vielleicht arbeitet ein Supply Chain Scientist an dem Skript, der bei jeder Ausführung ein paar Zeilen ändert. Wenn dies geschieht, können die Ausgaben der geteilten Thunks wiederverwendet werden, wenn sie noch im Speicher verfügbar sind, was die Ausführungszeit des Skripts erheblich verkürzt. Die Möglichkeit, ein Skript zu bearbeiten und erneut auszuführen, schafft eine kurze Rückkopplungsschleife, die die Produktivität der Supply Chain Scientists unterstützt.
Lokale Thunk-Planung
Wir werden in einem zukünftigen Artikel näher darauf eingehen, wie die Ausführung von Thunks über mehrere Maschinen in einem Cluster verteilt wird. Betrachten Sie vorerst, dass jeder Arbeiter eine Kopie des gesamten DAGs hält, weiß, welche Thunks bereits ausgeführt wurden (und wo deren Ergebnisse zu finden sind), weiß, welche Thunks derzeit vom Cluster ausgeführt werden, und dafür verantwortlich ist, zusätzliche Thunks zur Ausführung auf seinen 32 Kernen zu planen. Diese lokale Planung erfolgt durch einen einzelthreadigen Dienst namens Kernel (der nicht mit dem Linux-Kernel zu verwechseln ist). Der Kernel sowie die Arbeiter-Threads, die die Thunks tatsächlich ausführen, laufen alle im selben .NET-Prozess, um verwaltete Objekte miteinander zu teilen.
Das Finden eines neuen Thunks ist nahezu augenblicklich, da der Kernel für jeden DAG eine Frontier von zur Ausführung bereiten Thunks bereithält und nur einen zufällig auswählen muss. Die meiste Zeit des Kernels wird stattdessen damit verbracht, die Frontier zu aktualisieren, sobald ein Thunk mit der Ausführung beginnt (er muss die Frontier verlassen), die Ausführung eines Thunks beendet ist (seine Nachkommen können der Frontier beitreten, je nachdem, ob noch nicht ausgeführte Eltern vorhanden sind) oder verloren geht, weil der Arbeiter, der sein Ergebnis hält, nicht mehr verfügbar ist (seine Nachkommen müssen die Frontier verlassen, aber der Thunk selbst kann der Frontier wieder hinzugefügt werden, sofern seine eigenen Eltern noch verfügbar sind).
Die Pflege der Frontiers ist Arbeit mit sehr hoher Variabilität; sie kann zwischen einer Mikrosekunde und mehreren Sekunden dauern — über eine Million Mal länger! Zum Beispiel hat ein Shuffle-Schritt eine Schicht von $N$ Thunks, die die Ausgaben einer anderen Schicht von $M$ Thunks lesen. Jeder nachgelagerte Thunk liest die Ausgaben aller $M$ vorgelagerten Thunks, was zu $M\cdot N$ Kanten im DAG führt. Für $M = N = 1000$ (ein sehr wahrscheinlicher Grad der Parallelisierung, wenn Milliarden von Zeilen verarbeitet werden), sind das eine Million Kanten. Wenn dem nicht entgegengewirkt wird, kann dieses Phänomen dazu führen, dass der Kernel für Sekunden pausiert, während keiner neuen Thunk zur Ausführung geplant wird, sodass bis zu 32 Kerne untätig bleiben4.
Wir lösen dieses Problem, indem wir dem DAG virtuelle Knoten hinzufügen, die diese Art von Verbindung zwischen den Schichten repräsentieren. Der virtuelle Knoten hat $M$ Eingaben (je einer für jeden Thunk in der vorgelagerten Schicht) und $N$ Ausgaben (je einer für jeden Thunk in der nachgelagerten Schicht). Dies verringert die Anzahl der Kanten auf $M + N$, was erheblich besser handhabbar ist!
Codegenerierung mit niedriger Granularität
Die ersten Versionen von Envision, in 2013 und 2014, arbeiteten nach dem Prinzip, dass jede Vektoroperation von einem einzelnen Thunk ausgeführt wird. Bei der Ausführung von T.A / (T.B + 1)
gäbe es einen Thunk für das Broadcasting von 1
in die Tabelle T
, einen zweiten Thunk zum Addieren von T.B
zum Ergebnis des ersten Thunks und einen dritten Thunk für die Division von T.A
durch das Ergebnis des zweiten Thunks. Dies hatte den Vorteil, dass wir jede Operation leicht als C#-Funktion implementieren konnten, die als einzelner Thunk ausgeführt wird, was während der frühen Implementierung einer DSL eine ausgezeichnete Idee war. Es hat natürlich den Nachteil, dass unnötig große Mengen an Speicher verbraucht werden (der erste Thunk würde einen Vektor mit Millionen von Kopien des Werts 1
erzeugen), und Speicher braucht Zeit, um geschrieben und wieder ausgelesen zu werden.
Es war zwingend erforderlich, Thunks zu haben, die mehrere Operationen hintereinander auswerten, anstatt einen Thunk für jede Operation zu haben.
Viele SQL-Datenbanken arbeiten mit Variationen des Volcano-Modells, bei dem die Abfrage in einen Baum von Iteratoren umgewandelt wird. Jeder Iterator fungiert als unsaubere Funktion, die bei jedem Aufruf den nächsten Wert der Iteration zurückgibt, und kann rekursiv andere Iteratoren aufrufen. In diesem Modell würde das Broadcasting eines Skalars in eine Tabelle ein Iterator sein, der konstant einen Wert zurückgibt, während das Addieren oder Dividieren zweier Vektoren Referenzen auf zwei Iteratoren hält, und das Auslesen aus einem Vektor würde fortlaufend durch diesen Vektor iterieren:
Func<float> BroadcastScalar(float x) = () => x;
Func<float> Add(Func<float> x, Func<float> y) = () => x() + y();
Func<float> Div(Func<float> x, Func<float> y) = () => x() / y();
Func<float> Read(float[] v) { var i = 0; return () => v[i++]; }
Das Kompilieren einer Abfrage zum Volcano-Modell besteht darin, den Baum der Iteratoren zu erstellen:
Div(Read(A), Div(Read(B), BroadcastScalar(1)))
Dies hat den Vorteil, dass für die Zwischenvektoren keine Speicherallokationen vorgenommen werden. Allerdings dominiert der Overhead von Funktionsaufrufen die einfachen arithmetischen Operationen, die diese Funktionen ausführen.
Aus diesem Grund wechselte Envision 2015 zur Just-in-Time-Codegenerierung. Das Prinzip ist dem des Tungsten-Ausführungs-Engines von Apache Spark recht ähnlich: Die Operation T.A / (T.B + 1)
wird zu einer Funktion in einer imperativen Sprache kompiliert.
float[] GeneratedFunction(float[] a, float[] b) {
var result = new float[a.Length];
for (var i = 0; i < a.Length; ++i)
result[i] = a[i] / (b[i] + 1);
return result;
}
Das Ziel, das wir für diese Kompilierung verwenden, ist .NET IL, die Bytecode-Sprache, die von .NET für seine Assemblies verwendet wird. Dadurch können wir den .NET-JIT-Compiler nutzen, um aus unserem generierten IL optimierten Maschinencode zu erzeugen.
Diese Laufzeit-Codegenerierung erwies sich als das größte Hindernis beim Migrieren von Envision vom .NET Framework auf .NET Core im Jahr 2017. Tatsächlich unterstützt .NET Core dieselben System.Reflection
-APIs wie das .NET Framework zum Erzeugen und Ausführen von IL zur Laufzeit, jedoch unterstützt es nicht, diese IL als DLL auf die Festplatte zu speichern. Auch wenn dies keine Voraussetzung für das Ausführen von Envision ist, so ist es doch eine Voraussetzung für die Entwicklung des Envision-Compilers! System.Reflection
unternimmt nichts, um die Erstellung ungültiger IL zu verhindern, und meldet lediglich eine ziemlich nutzlose InvalidProgramException
, wenn eine Methode mit ungültiger IL ausgeführt wird. Der einzig vernünftige Ansatz zur Untersuchung solcher Probleme besteht darin, eine Assembly-Datei zu speichern und ILVerify oder ILSpy zu verwenden. Aufgrund dieser Anforderung haben wir tatsächlich zwei Jahre lang sowohl auf .NET Framework als auch auf .NET Core gesetzt – in der Produktion lief .NET Core, und das IL-Debugging erfolgte im .NET Framework. Schließlich haben wir 2019 unsere eigene Bibliothek Lokad.ILPack als Ersatz für diese Funktion veröffentlicht und sind vom .NET Framework migriert.
Damit schließt die heutige Analyse darüber ab, wie Envision Skripte ausführt. Im nächsten Artikel werden wir besprechen, wie Zwischenergebnisse gespeichert werden.
Schamloser Hinweis: Wir stellen Software Engineers ein. Remote-Arbeit ist möglich.
-
Arbeiter senden an den Cluster, sobald sie einen neuen Thunk starten, und vermeiden es, Thunks auszuführen, die von anderen Arbeitern beansprucht wurden. Es gibt den seltenen Fall, dass zwei Arbeiter denselben Thunk nahezu gleichzeitig starten; wir vermeiden dies, indem jeder Arbeiter einen zufälligen Thunk aus der Frontier auswählt und der Scheduler die Anzahl der Arbeiter reduziert, wenn die Frontier zu stark schrumpft. Das bedeutet, dass doppelte Ausführungen nicht unmöglich, aber sehr unwahrscheinlich sind. ↩︎
-
Für zwei große Tabellen wird ein kostenintensiver Shuffle-Join verwendet, und der günstigere mapseitige Join wird verwendet, wenn eine der Tabellen klein genug ist, um im Speicher zu passen. ↩︎
-
Für Thunks ohne Vorfahren, wie zum Beispiel solche, die Eingabedateien lesen, fügen wir den Hash des Inhalts dieser Eingabedateien in den Körper des Thunks ein. Dies stellt sicher, dass wenn zwei Thunks dieselbe Eingabedatei lesen, sie denselben Hash haben, und wenn sie zwei verschiedene Eingabedateien lesen, einschließlich zweier verschiedener Versionen derselben Datei an einem bestimmten Pfad, dann haben sie unterschiedliche Hashes. ↩︎
-
Dies wirkt sich auch auf die Größe der Serialisierung aus. Tatsächlich würde, wenn alle Kanten im serialisierten DAG dargestellt würden, selbst bei nur zwei Byte pro Kante bereits 2MB an Daten repräsentiert werden! ↩︎