Dieser Artikel ist der zweite Teil einer vierteiligen Serie über die Funktionsweise der Envision-Virtual Machine: der Software, die Envision-Skripte ausführt. Siehe Teil 1, Teil 3 und Teil 4. Diese Serie behandelt nicht den Envision-Compiler (vielleicht ein anderes Mal), daher nehmen wir einfach an, dass das Skript irgendwie in den Bytecode umgewandelt wurde, den die Envision-Virtual Machine als Eingabe akzeptiert.

Wie die meisten anderen parallelen Ausführungssysteme erzeugt Envision einen gerichteten azyklischen Graphen (DAG), bei dem jeder Knoten eine auszuführende Operation darstellt und jede Kante eine Datenabhängigkeit darstellt, bei der der nachgelagerte Knoten die Ausgabe des vorgelagerten Knotens benötigt, um ausgeführt zu werden.

Beyond time-series

Die Knoten werden Thunks genannt, nach dem sehr ähnlichen Konzept aus Haskell und anderen Sprachen mit Lazy Evaluation.

Beispiele für Thunks, die in einem typischen Envision-Skript gefunden werden können:

  • Parsen einer Eingabedatei im Format .xlsx, .csv oder .csv.gz und Konvertieren in eine spaltenbasierte Darstellung, die vom Rest des Skripts verwendet wird.
  • Laden eines Bereichs von Zeilen $M..N$ aus einer einzelnen Spalte; diese Spalte kann entweder aus dem Ergebnis des Parsens einer Eingabedatei stammen (siehe oben) oder aus Lokads eigenem .ion-spaltenbasierten Dateiformat, das für die Speicherung in Microsoft Azure Blob Storage optimiert ist.
  • Gegeben ein Bereich von Zeilen $M..N$ aus einem sehr großen Vektor $A$, einem kleineren Vektor $B$, einem Projektor $\pi$, der jeder Zeile in $A$ eine Zeile in $B$ zuordnet, und einer Funktion $f$, berechne $f(A[l], B[\pi(l)])$. Dies wird als Map-Side-Join bezeichnet.
  • Verwenden Sie die Monte-Carlo-Simulation, um den Durchschnitt, die Varianz oder die Verteilung des Ergebnisses eines Zufallsprozesses abzuschätzen. Das Ergebnis mehrerer parallel ausgeführter Monte-Carlo-Thunks kann dann von einem abschließenden Thunk kombiniert werden.

Im Allgemeinen wird erwartet, dass ein Thunk zwischen einigen hundert Millisekunden (für die Datenmanipulation im kleinen Maßstab) und einigen Minuten (für Monte-Carlo-Simulationen oder Gradientenabstiege) dauert. Dies ist eine starke Annahme: Die Envision-Virtual Machine darf eine erhebliche Overhead für die Auswertung jedes Thunks haben, im Bereich von Millisekunden. Ein Skript sollte nur eine geringe Anzahl von Thunks erzeugen (zwischen 1 000 und 100 000), wobei jeder Thunk eine ziemlich große Arbeitseinheit ausführt.

Referentielle Transparenz

Thunks sind reine Funktionen: Sie sind deterministisch und können keine Seiteneffekte haben. Sie arbeiten, indem sie ihre unveränderlichen Eingaben lesen und bei jeder Ausführung denselben Wert zurückgeben. Diese wichtige Eigenschaft hat viele Vorteile:

  1. Da die Auswertung eines Thunks keine Seiteneffekte hat, beeinflusst sie nicht die Auswertung eines anderen Thunks, und daher können alle Thunks (sofern ihre Eingaben verfügbar sind) gleichzeitig auf mehreren CPU-Kernen oder sogar auf mehreren Workern ausgeführt werden. Die Envision-Virtual Machine verfolgt den Frontier jedes Skripts (die Menge der Thunks, die ausgeführt werden können, weil alle ihre Eingaben verfügbar sind) und wählt einen neuen Thunk daraus aus, sobald eine CPU verfügbar wird.
  2. Umgekehrt ist es möglich, Thunks nacheinander auszuwerten und zum gleichen Ergebnis zu gelangen. Zum Beispiel, wenn der Cluster stark ausgelastet ist, wenn Cluster-Worker nicht verfügbar sind oder wenn die Auswertung eines Skripts auf der Entwicklungs-Workstation eines Entwicklers reproduziert wird, um ein Problem zu untersuchen.
  3. Zwei Worker, die denselben Thunk ausführen, sind kein Fehler, sondern nur eine Zeitverschwendung. Es ist daher nicht etwas, das unbedingt vermieden werden muss (mit all den Schwierigkeiten der Synchronisierung in einem verteilten System), es reicht aus sicherzustellen, dass es nicht zu oft vorkommt1.
  4. Wenn das Ergebnis eines Thunks verloren geht (aufgrund eines Worker-Absturzes oder einer Netzwerkstörung), ist es möglich, ihn erneut auszuführen. Selbst wenn mehrere Thunks verloren gehen, bleibt der ursprüngliche DAG verfügbar und kann als Datenherkunft verwendet werden, um die benötigten Werte erneut zu berechnen.

Allerdings bedeutet dies auch, dass Thunks nicht miteinander kommunizieren können (zum Beispiel, indem sie einen Kanal öffnen und Daten zwischen ihnen übertragen). Dies schränkt die verfügbaren Strategien für Parallelität und Nebenläufigkeit ein.

Thunk-Produktion

In vielen verteilten Berechnungsframeworks wird der Ausführungs-DAG außerhalb des Clusters erstellt (zum Beispiel auf einer Scheduler-Maschine) und dann werden Teile des Graphen an einzelne Worker zur Ausführung übergeben. Oft muss der DAG in mehreren Schritten erstellt werden: Zum Beispiel kann eine Join-Operation je nach Größe der Tabellen unterschiedlich optimiert werden2, und es ist nicht immer möglich, die Größe einer Tabelle zu kennen, bevor ihr Inhalt tatsächlich ausgewertet wird. Daher lohnt es sich, auf die Kenntnis der Tabellengrößen zu warten, bevor der DAG-Teil generiert wird, der den Join durchführt. Dies bedeutet, dass es zu einem Hin und Her zwischen dem Scheduler und den Workern kommt, bei dem der Scheduler zusätzliche Aufgaben basierend auf den Ergebnissen der Worker erstellt.

Dadurch wird der Scheduler zu einem Single Point of Failure, und die Zulassung mehrerer aktiver Scheduler oder ein Failover-Schema zwischen einem aktiven und einem passiven Scheduler würde eine ziemliche Komplexität hinzufügen. Für Envision war unser Resilienz-Ziel stattdessen sicherzustellen, dass ein einzelner Worker eine gesamte Mission berechnen kann, ohne den Scheduler einzubeziehen. Daher würde selbst eine zehnminütige Ausfallzeit des Schedulers verhindern, dass neue Missionen eingereicht werden, aber bereits gestartete Missionen würden nicht unterbrochen. Dies bedeutet jedoch, dass die Worker in der Lage sein sollten, neue Teile des DAG ohne Hilfe des Schedulers zu generieren.

Wir erreichen dies, indem wir einen Thunk einen neuen Thunk anstelle eines Werts zurückgeben lassen - um mehr Haskell-Begriffe zu verwenden, der Aufbau des DAG erfolgt mit Monaden anstelle von nur Funktoren. Dieser neue Thunk hat seine eigenen Eltern, die ebenfalls neue Thunks sein können, und so weiter, wodurch ein vollständig neuer DAG entsteht. In der Praxis teilt der neue DAG oft viele seiner Thunks mit dem alten DAG, weil er die Ergebnisse dieser Berechnungen benötigt.

Beim Einreichen einer neuen Mission an den Cluster wird nur ein einziger Thunk eingereicht (der das zu kompilierende und auszuführende Skript sowie die Verweise auf alle Eingabedateien enthält). Dieser Thunk erzeugt dann den anfänglichen Ausführungs-DAG, der sich noch einige Male erweitern wird, bis er vollständig ist.

Merkle-Graph

Um über das Netzwerk übertragen werden zu können, sind Thunks auch serialisierbar und verwenden ein benutzerdefiniertes binäres Format, das eine geringe Speicherplatzbelegung ermöglicht. Bei einem DAG mit 100 000 Thunks kann ein Budget von 10 MiB nur 104 Bytes pro Thunk unterstützen!

Die Unterstützung für die binäre Serialisierung ermöglichte es uns, den DAG in einen Merkle-DAG umzuwandeln, bei dem jeder Thunk eine Kennung hat, die durch den binären Inhalt dieses Thunks und aller Vorfahren des Thunks bestimmt wird3. Wir nennen diese Kennung den Hash des Thunks.

Die Verwendung eines Merkle-DAG hat zwei Hauptvorteile. Erstens werden Thunks, die dieselbe Operation ausführen, automatisch zusammengeführt, da sie denselben Inhalt und dieselben Vorfahren haben und daher auch dieselbe Kennung haben.

Zweitens ist es möglich, dass zwei Skripte einige ihrer Thunks gemeinsam nutzen - vielleicht lesen sie dieselben Eingabedateien und wenden dieselben Operationen auf sie an, oder vielleicht arbeitet ein Supply Chain Scientist an dem Skript und ändert dabei einige Zeilen zwischen den Ausführungen. Wenn dies geschieht, können die Ausgaben der gemeinsam genutzten Thunks wiederverwendet werden, wenn sie noch im Speicher verfügbar sind, was die Ausführungszeit des Skripts erheblich reduziert. Die Möglichkeit, ein Skript zu bearbeiten und erneut auszuführen, schafft eine kurze Rückkopplungsschleife, die die Produktivität der Supply Chain Scientists fördert.

Lokale Thunk-Planung

In einem zukünftigen Artikel werden wir genauer darauf eingehen, wie die Ausführung von Thunks auf mehreren Maschinen in einem Cluster verteilt wird. Im Moment sollten Sie nur beachten, dass jeder Worker eine Kopie des gesamten DAGs besitzt, weiß, welche Thunks bereits ausgeführt wurden (und wo ihre Ergebnisse zu finden sind), weiß, welche Thunks derzeit vom Cluster ausgeführt werden, und dafür verantwortlich ist, zusätzliche Thunks auf seinen 32 Kernen zu planen. Diese lokale Planung erfolgt durch einen Single-Threaded-Dienst namens Kernel (der nicht mit dem Linux-Kernel verwechselt werden darf). Der Kernel sowie die Worker-Threads, die die Thunks tatsächlich ausführen, laufen alle im selben .NET-Prozess, um verwaltete Objekte miteinander zu teilen.

Das Finden eines neuen Thunks ist nahezu augenblicklich, da der Kernel eine Front von bereiten Thunks für jeden DAG führt und nur einen zufällig auswählen muss. Die meiste Zeit des Kernels wird jedoch damit verbracht, die Front zu aktualisieren, wenn ein Thunk mit der Ausführung beginnt (er muss die Front verlassen), mit der Ausführung fertig ist (seine Nachkommen können der Front beitreten, je nachdem, ob er noch nicht ausgeführte Eltern hat) oder verloren geht, weil der Worker, der sein Ergebnis hält, nicht verfügbar ist (seine Nachkommen müssen die Front verlassen, aber der Thunk selbst kann zur Front hinzugefügt werden, wenn seine eigenen Eltern noch verfügbar sind).

Die Betreuung der Fronten ist eine Arbeit mit sehr hoher Variabilität, sie kann zwischen einer Mikrosekunde und mehreren Sekunden dauern - über eine Million Mal länger! Zum Beispiel hat ein Shuffle-Schritt eine Schicht von $N$ Thunks, die die Ausgaben einer anderen Schicht von $M$ Thunks lesen. Jeder nachgelagerte Thunk liest die Ausgaben aller $M$ vorgelagerten Thunks, was zu $M\cdot N$ Kanten im DAG führt. Für $M = N = 1000$ (ein sehr wahrscheinlicher Grad der Parallelisierung bei der Verarbeitung von Milliarden von Zeilen) sind das eine Million Kanten. Wenn dies nicht kontrolliert wird, kann dieses Phänomen dazu führen, dass der Kernel für Sekunden pausiert, währenddessen keine neuen Thunks geplant werden, und so bis zu 32 Kerne ungenutzt bleiben4.

Wir lösen dieses Problem, indem wir virtuelle Knoten in den DAG einführen, um diese Art von Verbindung zwischen den Schichten darzustellen. Der virtuelle Knoten hat $M$ Eingänge (einen für jeden Thunk in der vorgelagerten Schicht) und $N$ Ausgänge (einen für jeden Thunk in der nachgelagerten Schicht). Dadurch verringert sich die Anzahl der Kanten auf $M + N$, was deutlich besser handhabbar ist!

Niedriggranulare Codegenerierung

Die ersten Versionen von Envision, in den Jahren 2013 und 2014, basierten darauf, dass jede Vektoroperation von einem einzigen Thunk ausgeführt wird. Bei der Ausführung von T.A / (T.B + 1) würde es einen Thunk für das Broadcasting von 1 in die Tabelle T geben, einen Thunk für das Hinzufügen von T.B zum Ergebnis des ersten Thunks und einen Thunk für die Division von T.A durch das Ergebnis des zweiten Thunks. Dies hatte den Vorteil, dass wir jede Operation leicht als C#-Funktion implementieren konnten, die als einzelner Thunk ausgeführt wird, was eine ausgezeichnete Idee bei der frühen Implementierung einer DSL ist. Es hat natürlich den Nachteil, dass unnötig große Mengen an Speicher verbraucht werden (Thunk eins würde einen Vektor von Millionen von Kopien des Werts 1 erzeugen) und das Schreiben und Lesen von Speicher Zeit in Anspruch nimmt.

Es war unerlässlich, Thunks zu haben, die mehrere Operationen nacheinander auswerten, anstatt für jede Operation einen eigenen Thunk zu haben.

Viele SQL-Datenbanken arbeiten mit Variationen des Volcano-Modells, bei dem die Abfrage in einen Baum von Iteratoren umgewandelt wird. Jeder Iterator fungiert als eine unreine Funktion, die bei jedem Aufruf den nächsten Wert in der Iteration zurückgibt und rekursiv andere Iteratoren aufrufen kann. In diesem Modell würde das Broadcasting eines Skalars in eine Tabelle ein konstantenrückgebenden Iterator sein, das Hinzufügen oder Teilen von zwei Vektoren würde Verweise auf zwei Iteratoren halten und das Lesen aus einem Vektor würde durch ihn inkrementiert werden:

Func<float> BroadcastScalar(float x) = () => x;
Func<float> Add(Func<float> x, Func<float> y) = () => x() + y();
Func<float> Div(Func<float> x, Func<float> y) = () => x() / y();
Func<float> Read(float[] v) { var i = 0; return () => v[i++]; }

Das Kompilieren einer Abfrage zum Volcano-Modell besteht darin, den Baum der Iteratoren aufzubauen:

Div(Read(A), Div(Read(B), BroadcastScalar(1)))

Dies hat den Vorteil, dass keine Speicherzuweisungen für die Zwischenvektoren durchgeführt werden. Allerdings überwiegt der Overhead des Aufrufs von Funktionen die einfachen arithmetischen Operationen, die diese Funktionen ausführen.

Aus diesem Grund wechselte Envision im Jahr 2015 zur Just-in-Time-Codegenerierung. Das Prinzip ist dem Tungsten-Ausführungsmotor von Apache Spark recht ähnlich: Kompilieren der Operation T.A / (T.B + 1) zu einer Funktion in einer imperativen Sprache.

float[] GeneratedFunction(float[] a, float[] b) {
    var result = new float[a.Length];
    for (var i = 0; i < a.Length; ++i)
        result[i] = a[i] / (b[i] + 1);
    return result;
}

Das Ziel, das wir für diese Kompilierung verwenden, ist .NET IL, die Bytecode-Sprache, die von .NET für seine Assemblys verwendet wird. Dadurch können wir den .NET JIT-Compiler nutzen, um optimierten Maschinencode aus unserem generierten IL zu erzeugen.

Diese Laufzeit-Codegenerierung erwies sich beim Wechsel von .NET Framework zu .NET Core im Jahr 2017 als größte Hürde für die Migration von Envision. Obwohl .NET Core die gleichen System.Reflection-APIs wie .NET Framework zum Erzeugen und Ausführen von IL zur Laufzeit unterstützt, unterstützt es nicht das Speichern dieses IL auf der Festplatte als DLL. Obwohl dies keine Anforderung für das Ausführen von Envision ist, ist es sicherlich eine Anforderung für die Entwicklung des Envision-Compilers! System.Reflection verhindert nicht, dass ungültiges IL erstellt wird, und meldet nur eine eher nutzlose InvalidProgramException, wenn eine Methode mit ungültigem IL ausgeführt wird. Der einzige vernünftige Ansatz zur Untersuchung solcher Probleme besteht darin, eine Assembly-Datei zu speichern und ILVerify oder ILSpy zu verwenden. Aufgrund dieser Anforderung haben wir tatsächlich zwei Jahre lang sowohl .NET Framework als auch .NET Core als Ziel verwendet - die Produktion würde auf .NET Core laufen und das IL-Debugging würde auf .NET Framework durchgeführt werden. Schließlich haben wir im Jahr 2019 unsere eigene Bibliothek Lokad.ILPack als Ersatz für diese Funktion veröffentlicht und sind von .NET Framework weggezogen.

Damit schließen wir die heutige Analyse darüber ab, wie Envision Skripte ausführt. Im nächsten Artikel werden wir besprechen, wie Zwischenergebnisse gespeichert werden.

Unverfrorene Werbung: Wir suchen Softwareingenieure. Remote-Arbeit ist möglich.


  1. Die Worker senden an den Cluster, wann immer sie einen neuen Thunk starten, und vermeiden die Ausführung von Thunks, die von anderen Workern beansprucht wurden. Es bleibt der seltene Fall, dass zwei Worker den gleichen Thunk fast gleichzeitig starten; wir vermeiden dies, indem jeder Worker einen zufälligen Thunk aus dem Frontier auswählt und der Scheduler die Anzahl der Worker reduziert, wenn der Frontier zu stark schrumpft. Das bedeutet, dass doppelte Ausführung nicht unmöglich ist, aber sehr unwahrscheinlich. ↩︎

  2. Ein kostspieliger Shuffle-Join wird für zwei große Tabellen verwendet, und der günstigere Map-Side-Join wird verwendet, wenn eine der Tabellen klein genug ist, um in den Speicher zu passen. ↩︎

  3. Für Thunks ohne Vorfahren, wie solche, die aus Eingabedateien lesen, nehmen wir den Hash des Inhalts dieser Eingabedateien in den Körper des Thunks auf. Dadurch wird sichergestellt, dass zwei Thunks, die dieselbe Eingabedatei lesen, denselben Hash haben, und dass sie, wenn sie zwei verschiedene Eingabedateien lesen, einschließlich zwei verschiedener Versionen der Datei unter einem bestimmten Pfad, unterschiedliche Hashes haben. ↩︎

  4. Dies hat auch Auswirkungen auf die Größe der Serialisierung. Tatsächlich repräsentiert bereits die Darstellung aller Kanten im serialisierten DAG, selbst mit nur zwei Bytes pro Kante, 2 MB Daten! ↩︎