Questo articolo è il secondo di una serie in quattro parti sul funzionamento interno della macchina virtuale Envision: il software che esegue gli script di Envision. Vedi parte 1, parte 3 e parte 4. Questa serie non tratta il compilatore Envision (forse in un’altra occasione), quindi assumiamo semplicemente che lo script sia stato in qualche modo convertito nel bytecode che la macchina virtuale Envision utilizza come input.

Come la maggior parte degli altri sistemi di esecuzione parallela, Envision produce un grafo aciclico diretto (DAG) in cui ogni nodo rappresenta un’operazione da eseguire, e ogni arco rappresenta una dipendenza dei dati in cui il nodo a valle necessita dell’output del nodo a monte per poter funzionare.

Oltre le serie temporali

I nodi sono chiamati thunks, dal concetto molto simile presente in Haskell e in altri linguaggi con valutazione lazy.

Esempi di thunks che si possono trovare in un tipico script Envision:

  • Analizzare un file di input in formato .xlsx, .csv o .csv.gz, e convertirlo in una rappresentazione colonnare che verrà utilizzata dal resto dello script.
  • Caricare un intervallo di righe $M..N$ da una colonna individuale; questa colonna può essere ottenuta sia dal risultato dell’analisi di un file di input (vedi sopra), sia dal formato colonnare .ion di Lokad ottimizzato per l’archiviazione in Microsoft Azure Blob Storage.
  • Dato un intervallo di righe $M..N$ da un vettore molto grande $A$, un vettore più piccolo $B$, un proiettore $\pi$ che associa ogni riga in $A$ a una riga in $B$, e una funzione $f$, calcolare $f(A[l], B[\pi(l)])$. Questo viene chiamato map-side join.
  • Utilizzare la simulazione Monte Carlo per stimare la media, la varianza o la distribuzione del risultato di un processo casuale. Il risultato di più thunks Monte Carlo, eseguiti in parallelo, può quindi essere combinato da un thunk finale.

In generale, si prevede che un thunk richieda da qualche centinaio di millisecondi (per manipolazioni di dati su piccola scala) a pochi minuti (per simulazioni Monte Carlo o discesa del gradiente). Questa è un’ipotesi forte: alla macchina virtuale Envision è consentito un sovraccarico significativo per la valutazione di ogni thunk, dell’ordine di millisecondi. Uno script dovrebbe produrre un numero ridotto di thunks (tra 1 000 e 100 000), con ogni thunk che esegue un’unità di lavoro abbastanza grande.

Trasparenza referenziale

I thunks sono funzioni pure: sono deterministici e non possono avere effetti collaterali. Operano leggendo i loro input immutabili e restituendo lo stesso valore ad ogni esecuzione. Questa proprietà importante aiuta in molti modi:

  1. Poiché la valutazione di un thunk non ha effetti collaterali, non interferirà con la valutazione di un altro thunk, e quindi tutti i thunks possono essere eseguiti in parallelo (purché i loro input siano disponibili) su più core CPU, o addirittura distribuiti su più worker. La macchina virtuale Envision tiene traccia della frontiera di ciascuno script (l’insieme dei thunks che possono essere eseguiti perché tutti i loro input sono disponibili), e ne sceglie uno nuovo ogni volta che una CPU diventa disponibile.
  2. Al contrario, è possibile valutare i thunks uno per uno e ottenere lo stesso risultato. Ad esempio, quando il cluster è sotto carico elevato, quando i worker del cluster non sono disponibili, o quando si riproduce la valutazione di uno script sulla workstation di un sviluppatore per indagare su un problema.
  3. Due worker che eseguono lo stesso thunk non rappresentano un errore, semplicemente una perdita di tempo. Pertanto, non è qualcosa che debba essere evitato (con tutte le difficoltà legate alla sincronizzazione in un sistema distribuito), basta assicurarsi che non accada troppo frequentemente1.
  4. Se il risultato di un thunk viene perso (a causa del crash di un worker o dell’indisponibilità della rete), è possibile eseguirlo nuovamente. Anche se vengono persi diversi thunks, il DAG originale rimane disponibile e può essere utilizzato come tracciato dei dati per ricalcolare i valori necessari.

Tuttavia, questo significa anche che i thunks non possono comunicare tra loro (ad esempio, aprendo un canale e trasmettendo dati). Questo limita le strategie disponibili per la concorrenza e il parallelismo.

Produzione dei thunks

In molti framework di calcolo distribuito, il DAG di esecuzione viene prodotto esternamente al cluster (ad esempio, su una macchina scheduler), e poi porzioni del grafo vengono inviate ai worker individualmente per l’esecuzione. Spesso, il DAG deve essere prodotto in più fasi: ad esempio, un’operazione di join può essere ottimizzata in modo differente a seconda della dimensione delle tabelle2, e non è sempre possibile conoscere la dimensione di una tabella prima di valutarne effettivamente il contenuto, per cui conviene aspettare che le dimensioni siano note prima di generare la porzione di DAG che esegue il join. Questo significa che si verificherà uno scambio tra lo scheduler e i worker, dove lo scheduler produrrà ulteriori task basati sui risultati dei worker.

Ciò trasforma lo scheduler in un punto unico di fallimento, e consentirebbe l’attivazione di scheduler multipli o uno schema di failover tra uno scheduler attivo e uno passivo, il che aggiungerebbe una notevole complessità. Per Envision, il nostro resilience target era invece quello di garantire che un singolo worker sia in grado di calcolare un’intera missione, senza coinvolgere lo scheduler. In tal modo, anche se uno scheduler dovesse essere inattivo per dieci minuti, questo impedirebbe la sottomissione di nuove missioni, ma non interromperebbe il completamento delle missioni già avviate. Tuttavia, ciò significa che i worker dovrebbero essere in grado di generare nuove porzioni del DAG senza l’aiuto dello scheduler.

Otteniamo ciò permettendo a un thunk di restituire un nuovo thunk invece di un valore – per utilizzare ulteriori termini di Haskell, costruire il DAG comporta l’utilizzo di monadi anziché semplici functor. Questo nuovo thunk ha i suoi propri genitori, che possono essere anch’essi nuovi thunks, e così via, formando un nuovo DAG completo. In pratica, il nuovo DAG spesso condivide molti dei suoi thunks con il vecchio DAG, perché necessita dei risultati di quei calcoli.

Nel sottomettere una nuova missione al cluster, viene inviato un unico thunk (contenente lo script da compilare ed eseguire, e i riferimenti a tutti i file di input). Questo thunk poi produce il DAG di esecuzione iniziale, che crescerà di qualche volta fino a diventare completo.

Grafo Merkle

Per essere trasmessi attraverso la rete, i thunks sono anche serializzabili, utilizzando un formato binario personalizzato progettato per avere un’impronta ridotta. Su un DAG con 100 000 thunks, un budget di 10MiB può supportare solo 104 byte per thunk!

Il supporto per la serializzazione binaria ci ha permesso di trasformare il DAG in un Merkle DAG, in cui ogni thunk ha un identificatore determinato dal contenuto binario di quel thunk e di tutti i suoi antenati3. Chiamiamo questo identificatore l’hash del thunk.

L’utilizzo di un Merkle DAG ha due vantaggi principali. Innanzitutto, i thunks che eseguono la stessa operazione vengono fusi automaticamente perché, avendo lo stesso contenuto e gli stessi antenati, possiedono anche lo stesso identificatore.

In secondo luogo, è possibile che due script condividano alcuni dei loro thunks — magari leggono gli stessi file di input e applicano le stesse operazioni, o magari un Supply Chain Scientist sta lavorando sullo script, cambiando poche righe alla volta tra un’esecuzione e l’altra. Quando ciò accade, gli output dei thunks condivisi possono essere riutilizzati se sono ancora disponibili in memoria, riducendo notevolmente il tempo di esecuzione dello script. La possibilità di modificare ed eseguire nuovamente uno script crea un breve ciclo di feedback che aumenta la produttività dei Supply Chain Scientists.

Pianificazione locale dei thunks

Approfondiremo ulteriormente in un futuro articolo come l’esecuzione dei thunks sia distribuita su diverse macchine in un cluster. Per ora, basta considerare che ogni worker detiene una copia dell’intero DAG, sa quali thunks sono già stati eseguiti (e dove trovare i loro risultati), sa quali thunks sono attualmente in esecuzione nel cluster, ed è responsabile della pianificazione di ulteriori thunks da eseguire sui suoi 32 core. Questa pianificazione locale è gestita da un servizio a thread singolo chiamato kernel (che non va confuso con il kernel Linux). Il kernel, così come i thread worker che eseguiranno effettivamente i thunks, girano tutti nello stesso processo .NET per poter condividere oggetti gestiti tra loro.

Trovare un nuovo thunk è quasi istantaneo, poiché il kernel mantiene una frontiera di thunks pronti per l’esecuzione per ogni DAG, e ha solo bisogno di sceglierne uno a caso. La maggior parte del tempo del kernel viene invece speso nell’aggiornamento della frontiera ogni volta che un thunk inizia l’esecuzione (deve essere rimosso dalla frontiera), termina l’esecuzione (i suoi discendenti possono essere aggiunti alla frontiera, a seconda che rimangano genitori non eseguiti), oppure viene perso a causa dell’indisponibilità del worker che ne deteneva il risultato (i suoi discendenti devono essere rimossi dalla frontiera, ma lo stesso thunk può essere riaggiunto se i suoi genitori sono ancora disponibili).

La gestione delle frontiere è un’operazione con una variabilità molto elevata, può richiedere da un microsecondo a diversi secondi — oltre un milione di volte di più! Ad esempio, un shuffle step ha uno strato di $N$ thunks che leggono gli output di un altro strato di $M$ thunks. Ogni thunk a valle legge gli output di tutti i $M$ thunks a monte, risultando in $M\cdot N$ archi nel DAG. Per $M = N = 1000$ (un grado di parallelizzazione molto probabile, quando si devono gestire miliardi di righe), ciò equivale a un milione di archi. Se non controllato, questo fenomeno può far sì che il kernel si arresti per alcuni secondi durante i quali non vengono pianificati nuovi thunks, con conseguenza che fino a 32 core rimangono inattivi4.

Abbiamo risolto questo problema introducendo nodi virtuali nel DAG per rappresentare questo tipo di connessione tra gli strati. Il nodo virtuale ha $M$ input (uno per ogni thunk nello strato a monte) e $N$ output (uno per ogni thunk nello strato a valle). Questo riduce il numero di archi a $M + N$, rendendolo significativamente più gestibile!

Generazione di codice a bassa granularità

Le prime versioni di Envision, nel 2013 e nel 2014, operavano con il presupposto che ogni operazione sui vettori venisse eseguita da un singolo thunk. Durante l’esecuzione di T.A / (T.B + 1) c’era un thunk per il broadcasting del valore 1 nella tabella T, un secondo thunk per sommare T.B al risultato del primo thunk, e un terzo thunk per dividere T.A per il risultato del secondo. Questo aveva il vantaggio di poter implementare facilmente ogni operazione come una funzione C#, eseguita come un singolo thunk, il che era un’ottima idea durante le prime fasi di implementazione di un DSL. Tuttavia, ha lo svantaggio che vengono consumate quantità eccessive di memoria (il primo thunk avrebbe prodotto un vettore di milioni di copie del valore 1), e la memoria richiede tempo per essere scritta e letta nuovamente.

Era imperativo avere thunks che valutassero diverse operazioni in successione, invece di avere un thunk per ogni operazione.

Molti database SQL operano su variazioni del modello volcano, in cui la query viene trasformata in un albero di iteratori. Ogni iteratore agisce come una funzione impura che restituisce il valore successivo dell’iterazione ogni volta che viene chiamato, e può chiamare ricorsivamente altri iteratori. In questo modello, il broadcasting di uno scalare in una tabella sarebbe un iteratore a ritorno costante, mentre l’addizione o la divisione di due vettori manterrebbe riferimenti a due iteratori, e la lettura da un vettore ne incrementerebbe l’indice:

Func<float> BroadcastScalar(float x) = () => x;
Func<float> Add(Func<float> x, Func<float> y) = () => x() + y();
Func<float> Div(Func<float> x, Func<float> y) = () => x() / y();
Func<float> Read(float[] v) { var i = 0; return () => v[i++]; }

Compilare una query nel modello volcano consiste nel costruire l’albero degli iteratori:

Div(Read(A), Div(Read(B), BroadcastScalar(1)))

Questo ha il vantaggio che non vengono effettuate allocazioni di memoria per i vettori intermedi. Tuttavia, il sovraccarico delle chiamate alle funzioni domina le semplici operazioni aritmetiche eseguite da tali funzioni.

Per questo motivo, nel 2015 Envision si è spostato verso la generazione di codice just-in-time. Il principio è abbastanza simile al motore di esecuzione Tungsten di Apache Spark: compilare l’operazione T.A / (T.B + 1) in una funzione in un linguaggio imperativo.

float[] GeneratedFunction(float[] a, float[] b) {
    var result = new float[a.Length];
    for (var i = 0; i < a.Length; ++i)
        result[i] = a[i] / (b[i] + 1);
    return result;
}

Il target che utilizziamo per questa compilazione è il .NET IL, il linguaggio bytecode utilizzato da .NET per i suoi assembly. Questo ci consente di sfruttare il compilatore JIT di .NET per produrre codice macchina ottimizzato a partire dal nostro IL generato.

Questa generazione di codice a runtime si è rivelata il più grande ostacolo durante la migrazione di Envision da .NET Framework a .NET Core nel 2017. Infatti, mentre .NET Core supporta le stesse API System.Reflection del .NET Framework per produrre ed eseguire IL a runtime, non supporta il salvataggio di quell’IL su disco come DLL. Pur non essendo questo un requisito per l’esecuzione di Envision, lo è certamente per lo sviluppo del compilatore di Envision! System.Reflection non interviene per impedire la creazione di IL non valido, segnalando soltanto un piuttosto inutile InvalidProgramException quando viene eseguito un metodo contenente IL non valido. L’unico approccio ragionevole per investigare tali problemi è salvare un file assembly e utilizzare ILVerify o ILSpy. A causa di questo requisito, abbiamo effettivamente continuato a puntare sia su .NET Framework che su .NET Core per due anni—la produzione sarebbe eseguita su .NET Core e il debugging dell’IL sarebbe effettuato su .NET Framework. Infine, nel 2019 abbiamo pubblicato la nostra libreria Lokad.ILPack come sostituto di questa funzionalità, e abbiamo abbandonato il .NET Framework.

Questo conclude l’analisi di oggi su come Envision esegue gli script. Nel prossimo articolo, discuteremo di come vengono memorizzati i risultati intermedi.

Plug senza vergogna: stiamo assumendo software engineers. È possibile lavorare da remoto.


  1. I worker trasmettono al cluster ogni volta che avviano un nuovo thunk, e evitano di eseguire thunks che altri worker hanno già reclamato. Resta il raro caso in cui due worker avviino lo stesso thunk quasi contemporaneamente; evitiamo questo facendo in modo che ogni worker scelga un thunk casuale dalla frontiera, e facendo sì che lo scheduler riduca il numero di worker quando la frontiera si restringe troppo. Ciò significa che l’esecuzione duplicata non è impossibile, ma è molto improbabile. ↩︎

  2. Per due tabelle grandi viene utilizzato un join shuffle costoso, mentre il più economico map-side join viene impiegato quando una delle tabelle è abbastanza piccola da poter essere contenuta in memoria. ↩︎

  3. Per i thunks senza antenati, come quelli che leggono dai file di input, includiamo l’hash del contenuto di tali file all’interno del corpo del thunk. Ciò garantisce che se due thunks leggono lo stesso file di input, avranno lo stesso hash, e se leggono due file diversi, comprese due versioni differenti dello stesso file in un dato percorso, allora avranno hash differenti. ↩︎

  4. Ciò ha anche un effetto sulla dimensione della serializzazione. Infatti, se tutti gli archi sono rappresentati nel DAG serializzato, anche con solo due byte per arco, rappresenterebbero già 2MB di dati! ↩︎