Questo articolo è il secondo di una serie di quattro parti sulle dinamiche interne della macchina virtuale Envision: il software che esegue gli script Envision. Vedi parte 1, parte 3 e parte 4. Questa serie non copre il compilatore Envision (forse in futuro), quindi supponiamo semplicemente che lo script sia stato convertito in bytecode che la macchina virtuale Envision prende in input.

Come la maggior parte degli altri sistemi di esecuzione parallela, Envision produce un grafo aciclico diretto (DAG) in cui ogni nodo rappresenta un’operazione da eseguire e ogni arco rappresenta una dipendenza dei dati in cui il nodo a valle ha bisogno dell’output del nodo a monte per poter essere eseguito.

Oltre le serie temporali

I nodi vengono chiamati thunks, dal concetto molto simile di Haskell e altri linguaggi con valutazione pigra.

Esempi di thunks che possono essere trovati in uno script Envision tipico:

  • Analizzare un file di input in formato .xlsx, .csv o .csv.gz e convertirlo in una rappresentazione a colonne che verrà utilizzata dal resto dello script.
  • Caricare un intervallo di righe $M..N$ da una singola colonna; questa colonna può essere presa sia dal risultato dell’analisi di un file di input (vedi sopra), sia dal formato di file colonnare .ion di Lokad ottimizzato per l’archiviazione in Microsoft Azure Blob Storage.
  • Dato un intervallo di righe $M..N$ da un vettore molto grande $A$, un vettore più piccolo $B$, un proiettore $\pi$ che associa ogni riga in $A$ con una riga in $B$ e una funzione $f$, calcolare $f(A[l], B[\pi(l)])$. Questo viene chiamato join lato mappa.
  • Utilizzare la simulazione di Monte Carlo per stimare la media, la varianza o la distribuzione del risultato di un processo casuale. Il risultato di più thunks di Monte Carlo, eseguiti in parallelo, può quindi essere combinato da un thunk finale.

In generale, ci si aspetta che un thunk richieda tra qualche centinaio di millisecondi (per la manipolazione di dati su piccola scala) e alcuni minuti (per simulazioni di Monte Carlo o discesa del gradiente). Questa è un’assunzione forte: la macchina virtuale Envision può avere un notevole overhead per la valutazione di ogni thunk, nell’ordine dei millisecondi. Uno script dovrebbe produrre un numero ridotto di thunks (tra 1 000 e 100 000), con ogni thunk che esegue un’unità di lavoro piuttosto grande.

Trasparenza referenziale

I thunks sono funzioni pure: sono deterministiche e non possono avere effetti collaterali. Operano leggendo i loro input immutabili e restituendo lo stesso valore ad ogni esecuzione. Questa proprietà importante aiuta in molti modi:

  1. Poiché la valutazione di un thunk non ha effetti collaterali, non interferirà con la valutazione di un altro thunk e quindi tutti i thunks possono essere eseguiti contemporaneamente (a condizione che i loro input siano disponibili) su diversi core della CPU o addirittura distribuiti su diversi worker. La macchina virtuale Envision tiene traccia della frontiera di ogni script (l’insieme di thunks che possono essere eseguiti perché tutti i loro input sono disponibili) e ne seleziona uno nuovo ogni volta che una CPU diventa disponibile.
  2. Al contrario, è possibile valutare i thunks uno per uno e ottenere lo stesso risultato. Ad esempio, quando il cluster è sotto carico elevato, quando i worker del cluster non sono disponibili o quando si riproduce la valutazione di uno script sulla postazione di lavoro di uno sviluppatore per indagare su un problema.
  3. Due worker che eseguono lo stesso thunk non è un errore, ma solo uno spreco di tempo. Pertanto, non è qualcosa che deve essere evitato (con tutte le difficoltà legate alla sincronizzazione in un sistema distribuito), è sufficiente assicurarsi che non accada troppo spesso1.
  4. Se il risultato di un thunk viene perso (a causa di un crash del worker o di un’indisponibilità di rete), è possibile eseguirlo nuovamente. Anche se diversi thunks vengono persi, il DAG originale rimane disponibile e può essere utilizzato come lineage dei dati per ricalcolare i valori necessari.

Tuttavia, ciò significa anche che i thunks non possono comunicare tra loro (ad esempio, aprendo un canale e trasmettendo dati tra di loro). Ciò limita le strategie disponibili per la concorrenza e il parallelismo.

Produzione di thunks

In molti framework di calcolo distribuito, il DAG di esecuzione viene prodotto al di fuori del cluster (ad esempio, su una macchina scheduler) e quindi porzioni del grafo vengono inviate ai singoli worker per l’esecuzione. Molto spesso, il DAG deve essere prodotto in più fasi: ad esempio, un’operazione di join può essere ottimizzata in modo diverso a seconda delle dimensioni delle tabelle2 e non è sempre possibile conoscere la dimensione di una tabella prima di valutarne effettivamente i contenuti, quindi vale la pena aspettare che le dimensioni delle tabelle siano note prima di generare la porzione del DAG che esegue il join. Ciò significa che si verificherà un andirivieni tra lo scheduler e i worker, in cui lo scheduler produrrà attività aggiuntive basate sui risultati dei worker.

Ciò trasforma lo scheduler in un singolo punto di errore e consentire più scheduler attivi o uno schema di failover tra uno scheduler attivo e uno passivo aggiungerebbe una complessità considerevole. Per Envision, il nostro obiettivo di resilienza era invece quello di garantire che un singolo worker sia in grado di calcolare un’intera missione senza coinvolgere lo scheduler. Pertanto, anche se un’interruzione di dieci minuti dello scheduler impedirebbe la presentazione di nuove missioni, non interromperebbe le missioni già avviate. Tuttavia, ciò significa che i worker dovrebbero essere in grado di generare nuove porzioni del DAG senza l’aiuto dello scheduler.

Raggiungiamo questo obiettivo consentendo a un thunk di restituire un nuovo thunk anziché un valore, per riutilizzare più termini di Haskell, la costruzione del DAG coinvolge monadi anziché solo funtori. Questo nuovo thunk ha i suoi genitori, che possono essere nuovi thunks, e così via, formando un nuovo DAG completo. In pratica, il nuovo DAG spesso condivide molti dei suoi thunks con il vecchio DAG, perché ha bisogno dei risultati di quei calcoli.

Quando si presenta una nuova missione al cluster, viene inviato solo un singolo thunk (contenente lo script da compilare ed eseguire e i riferimenti a tutti i file di input). Questo thunk produce quindi il DAG di esecuzione iniziale, che crescerà ancora alcune volte fino a diventare completo.

Grafo di Merkle

Per poter essere trasmessi sulla rete, i thunks sono anche serializzabili, utilizzando un formato binario personalizzato progettato per avere un basso impatto. Su un DAG con 100 000 thunks, un budget di 10MiB può supportare solo 104 byte per thunk!

Il supporto alla serializzazione binaria ci ha permesso di trasformare il DAG in un DAG di Merkle, in cui ogni thunk ha un identificatore determinato dal contenuto binario di quel thunk e di tutti gli antenati del thunk3. Chiamiamo questo identificatore l’hash del thunk.

Utilizzare un DAG di Merkle ha due vantaggi principali. In primo luogo, i thunks che eseguono la stessa operazione vengono automaticamente uniti perché, avendo lo stesso contenuto e gli stessi antenati, hanno anche lo stesso identificatore.

In secondo luogo, è possibile che due script condividano alcuni dei loro thunks - forse leggono gli stessi file di input e applicano le stesse operazioni su di essi, o forse un Supply Chain Scientist sta lavorando sullo script, modificando alcune righe alla volta tra le esecuzioni. Quando ciò accade, gli output dei thunks condivisi possono essere riutilizzati se sono ancora disponibili in memoria, riducendo notevolmente il tempo di esecuzione dello script. La possibilità di modificare e rieseguire uno script crea un breve ciclo di feedback che aiuta la produttività degli scienziati della supply chain.

Pianificazione locale dei thunks

Approfondiremo in un futuro articolo come l’esecuzione dei thunks viene distribuita su più macchine in un cluster. Per ora, considera semplicemente che ogni worker ha una copia dell’intero DAG, sa quali thunks sono già stati eseguiti (e dove trovare i loro risultati), sa quali thunks vengono attualmente eseguiti dal cluster ed è responsabile della pianificazione di ulteriori thunks da eseguire sui suoi 32 core. Questa pianificazione locale viene effettuata da un servizio single-threaded chiamato kernel (che non va confuso con il kernel Linux). Il kernel, così come i thread worker che effettueranno effettivamente i thunks, vengono eseguiti tutti nello stesso processo .NET per condividere oggetti gestiti tra di loro.

Trovare un nuovo thunk è quasi istantaneo, poiché il kernel tiene traccia di un frontiera di thunks pronti all’esecuzione per ogni DAG e deve solo sceglierne uno a caso. La maggior parte del tempo del kernel viene invece trascorsa nell’aggiornamento della frontiera ogni volta che un thunk inizia l’esecuzione (deve lasciare la frontiera), finisce l’esecuzione (i suoi discendenti potrebbero unirsi alla frontiera, a seconda se ha ancora genitori non eseguiti) o viene perso a causa del worker che detiene il suo risultato che diventa non disponibile (i suoi discendenti devono lasciare la frontiera, ma il thunk stesso potrebbe essere aggiunto nuovamente alla frontiera se i suoi genitori sono ancora disponibili).

Gestire le frontiere è un lavoro con una variabilità molto alta, può richiedere da un microsecondo a diversi secondi, ovvero oltre un milione di volte più lungo! Ad esempio, una fase di shuffle ha uno strato di $N$ thunks che leggono gli output di un altro strato di $M$ thunks. Ogni thunk a valle legge gli output di tutti i $M$ thunks a monte, risultando in $M\cdot N$ archi nel DAG. Per $M = N = 1000$ (un grado di parallelizzazione molto probabile, quando si lavora con miliardi di righe), ciò significa un milione di archi. Se non controllato, questo fenomeno può causare una pausa del kernel per diversi secondi, durante i quali non vengono pianificati nuovi thunks da eseguire e quindi fino a 32 core rimangono inattivi4.

Risolviamo questo problema introducendo nodi virtuali nel DAG per rappresentare questo tipo di connessione tra gli strati. Il nodo virtuale ha $M$ input (uno per ogni thunk nello strato a monte) e $N$ output (uno per ogni thunk nello strato a valle). Ciò riduce il numero di archi a $M + N$, che è significativamente più gestibile!

Generazione di codice a bassa granularità

Le prime versioni di Envision, nel 2013 e nel 2014, operavano sulla base che ogni operazione vettoriale venisse eseguita da un singolo thunk. Quando si esegue T.A / (T.B + 1) ci sarebbe stato un thunk per la trasmissione di 1 nella tabella T, un thunk per l’aggiunta di T.B al risultato del primo thunk e un thunk per la divisione di T.A per il risultato del secondo thunk. Questo aveva il vantaggio che potevamo facilmente implementare ogni operazione come una funzione C#, eseguita come un singolo thunk, che è un’ottima idea durante l’implementazione iniziale di un DSL. Ha, naturalmente, il lato negativo che vengono consumate quantità non necessarie di memoria (il primo thunk produrrebbe un vettore di milioni di copie del valore 1) e la memoria richiede tempo per essere scritta e letta nuovamente.

Era imperativo avere thunks che valutassero diverse operazioni in successione, invece di avere un thunk per ogni operazione.

Molti database SQL operano su variazioni del modello vulcano, in cui la query viene trasformata in un albero di iteratori. Ogni iteratore agisce come una funzione impura che restituisce il prossimo valore nell’iterazione ogni volta che viene chiamato e può chiamare ricorsivamente altri iteratori. In questo modello, la trasmissione di uno scalare in una tabella sarebbe un iteratore a ritorno costante, l’aggiunta o la divisione di due vettori avrebbe riferimenti a due iteratori e la lettura da un vettore incrementerebbe attraverso di esso:

Func<float> BroadcastScalar(float x) = () => x;
Func<float> Add(Func<float> x, Func<float> y) = () => x() + y();
Func<float> Div(Func<float> x, Func<float> y) = () => x() / y();
Func<float> Read(float[] v) { var i = 0; return () => v[i++]; }

Compilare una query nel modello vulcano consiste nel costruire l’albero di iteratori:

Div(Read(A), Div(Read(B), BroadcastScalar(1)))

Questo ha il vantaggio che non vengono effettuate allocazioni di memoria per i vettori intermedi. Tuttavia, il costo delle chiamate alle funzioni domina le semplici operazioni aritmetiche che tali funzioni eseguono.

Per questo motivo, nel 2015 Envision è passato alla generazione di codice just-in-time. Il principio è molto simile al motore di esecuzione Tungsten di Apache Spark: compilare l’operazione T.A / (T.B + 1) in una funzione in un linguaggio imperativo.

float[] GeneratedFunction(float[] a, float[] b) {
    var result = new float[a.Length];
    for (var i = 0; i < a.Length; ++i)
        result[i] = a[i] / (b[i] + 1);
    return result;
}

Il target che utilizziamo per questa compilazione è .NET IL, il linguaggio bytecode utilizzato da .NET per le sue assembly. Ciò ci consente di sfruttare il compilatore JIT .NET per produrre codice macchina ottimizzato dal nostro IL generato.

Questa generazione di codice in fase di esecuzione si è rivelata l’ostacolo più grande durante la migrazione di Envision da .NET Framework a .NET Core nel 2017. Infatti, mentre .NET Core supporta le stesse API System.Reflection di .NET Framework per la produzione ed esecuzione di IL in fase di esecuzione, non supporta il salvataggio di tale IL su disco come DLL. Sebbene ciò non sia un requisito per l’esecuzione di Envision, è certamente un requisito per lo sviluppo del compilatore Envision! System.Reflection non fa nulla per impedire la creazione di IL non valida e segnala solo una InvalidProgramException piuttosto inutile quando viene eseguito un metodo contenente IL non valido. L’unico approccio ragionevole per indagare su tali problemi è salvare un file di assembly e utilizzare ILVerify o ILSpy. A causa di questo requisito, abbiamo effettivamente continuato a puntare sia a .NET Framework che a .NET Core per due anni: la produzione sarebbe stata eseguita su .NET Core e il debug di IL sarebbe stato eseguito su .NET Framework. Infine, nel 2019 abbiamo pubblicato la nostra libreria Lokad.ILPack come sostituto di questa funzionalità e ci siamo allontanati da .NET Framework.

Questo conclude l’analisi odierna su come Envision esegue gli script. Nel prossimo articolo, discuteremo come vengono memorizzati i risultati intermedi.

Pubblicità sfacciata: stiamo assumendo ingegneri del software. È possibile lavorare in remoto.


  1. I worker trasmettono al cluster ogni volta che avviano un nuovo thunk e evitano di eseguire thunks che altri worker hanno reclamato. Rimane il caso raro in cui due worker avviano lo stesso thunk quasi contemporaneamente; evitiamo ciò facendo sì che ogni worker scelga un thunk casuale dalla frontiera e facendo sì che lo scheduler riduca il numero di worker quando la frontiera si riduce troppo. Ciò significa che l’esecuzione duplicata non è impossibile, ma molto improbabile. ↩︎

  2. Viene utilizzato un join di shuffle costoso per due tabelle di grandi dimensioni e viene utilizzato un join lato mappa più economico quando una delle tabelle è sufficientemente piccola da entrare in memoria. ↩︎

  3. Per i thunks senza antenati, come quelli che leggono da file di input, includiamo l’hash del contenuto di quei file di input all’interno del corpo del thunk. Ciò garantisce che se due thunks leggono lo stesso file di input, avranno lo stesso hash, e se leggono due file di input diversi, inclusa due diverse versioni del file in un determinato percorso, avranno hash diversi. ↩︎

  4. Questo ha anche un effetto sulla dimensione di serializzazione. Infatti, se tutti gli archi sono rappresentati nel DAG serializzato, anche con soli due byte per arco, rappresenta già 2MB di dati! ↩︎