Questo articolo è il quarto di una serie di quattro parti sulle dinamiche interne della macchina virtuale Envision: il software che esegue gli script di Envision. Vedi parte 1, parte 2 e parte 3. Questa serie non copre il compilatore di Envision (forse in futuro), quindi supponiamo semplicemente che lo script sia stato convertito in bytecode che la macchina virtuale Envision prende in input.

Gli articoli precedenti hanno principalmente esaminato come i singoli lavoratori eseguono gli script di Envision. Tuttavia, sia per la resilienza che per le prestazioni, Envision viene effettivamente eseguito su un cluster di macchine.

Ogni livello di un lavoratore comunica con lo stesso livello negli altri lavoratori o con altri livelli nello stesso lavoratore. Ciò garantisce che la comunicazione di rete possa rimanere un dettaglio di implementazione privato di ogni livello.

A livello basso, ogni lavoratore apre due connessioni TLS con ogni altra macchina nel cluster, e le comunicazioni dei vari livelli vengono multiplexate attraverso queste due connessioni (una connessione viene utilizzata per i messaggi brevi, l’altra per i trasferimenti di dati di grandi dimensioni).

Esecuzione distribuita astratta

Livello di Controllo

Questo livello viene utilizzato dallo scheduler per assegnare e revocare missioni ai lavoratori e non coinvolge alcuna comunicazione tra lavoratori. I principali messaggi di questo livello sono:

  • Lo scheduler chiede al lavoratore di iniziare a lavorare su una missione.
  • Lo scheduler chiede al lavoratore di smettere di lavorare su una missione.
  • Il lavoratore comunica allo scheduler che ha riscontrato un errore catastrofico durante l’esecuzione di una missione (di solito un problema non deterministico, come “il drive NVMe ha preso fuoco”, il che significa che la stessa missione può essere riprovata in futuro o su un altro lavoratore).
  • Il lavoratore fornisce allo scheduler le statistiche sul suo stato attuale: elenco delle missioni, dimensione della frontiera di ciascun DAG della missione, numero totale di thunks da eseguire in ciascun DAG della missione.

Lo scheduler utilizza queste statistiche per decidere quando riassegnare le missioni. Le regole effettive per farlo sono piuttosto complesse, poiché dipendono da regole di priorità, equità tra più tenant e tra script dello stesso tenant e dal carico complessivo del cluster in quel momento, ma la tendenza generale è che le missioni con una frontiera sufficientemente ampia possano essere distribuite su più lavoratori, purché quei lavoratori non siano già sovraccaricati. Dato lo stesso quantitativo di lavoro da eseguire, è più efficiente eseguire quattro missioni su un singolo lavoratore piuttosto che distribuirle su tutti i lavoratori.

Livello di Esecuzione

Ogni lavoratore tiene traccia dei thunks che sta eseguendo attualmente e invia questa lista agli altri lavoratori ogni volta che pianifica un nuovo thunk1. Ciò garantisce che, al di fuori della finestra temporale molto breve legata alla latenza di rete, due lavoratori non inizieranno ad eseguire lo stesso thunk contemporaneamente.

Ovviamente, se un lavoratore smette di inviare questi aggiornamenti (ad esempio, perché è andato in crash o è stato disconnesso dal resto del cluster), i suoi peer considereranno qualsiasi elenco più vecchio di pochi secondi come obsoleto e si permetteranno di eseguire quei thunks.

Livello dei Metadati

Ogni lavoratore cerca di mantenere una copia completa dei metadati, ma non li sincronizza effettivamente. Abbiamo scelto di non garantire che tutti i lavoratori siano d’accordo sugli stessi metadati esatti e invece lavoriamo con garanzie di coerenza eventuale. Ciò rende la distribuzione del livello dei metadati la più impegnativa in termini di progettazione2.

La coerenza eventuale di questo livello segue tre regole principali:

  1. Ogni modifica locale al livello dei metadati viene immediatamente trasmessa a tutti gli altri lavoratori. Questa trasmissione può fallire e non verrà ripetuta.
  2. Le modifiche remote ricevute dagli altri lavoratori vengono unite al livello locale dei metadati, in base a una progressione monotona3: un valore “nessun risultato” per un thunk può essere sovrascritto da un valore “checkpoint” (che significa che il thunk è iniziato, ma non ha finito di eseguire), che può essere sovrascritto da un valore “alias” (che significa che il thunk ha restituito un DAG da eseguire al suo posto), che può essere sovrascritto da un valore “risultato” (che può essere un risultato di successo con i suoi atomi associati o un errore fatale).
  3. Ogni volta che un altro livello invia una risposta di rete basata su un valore nel livello dei metadati, il livello dei metadati trasmette nuovamente quel valore.

La terza regola è progettata per forzare un livello di sincronizzazione quando è effettivamente rilevante. Ad esempio, considera la seguente sequenza di eventi:

  • Lo scheduler chiede a un lavoratore di eseguire una missione (attraverso il livello di controllo)
  • Il lavoratore esegue la missione e trasmette il risultato (attraverso il livello dei metadati), ma il messaggio si perde durante il tragitto verso lo scheduler.
  • Lo scheduler nota che il lavoratore non sta più eseguendo la missione (attraverso il livello di controllo) e gli chiede di eseguirla di nuovo.
  • Il lavoratore osserva che il thunk della missione ha già un risultato nel livello dei metadati e non fa nulla, perché non c’è bisogno di fare nulla.

Questo è un punto morto in cui lo scheduler e il lavoratore non sono d’accordo sullo stato di un thunk nel livello dei metadati (il lavoratore crede che sia completato, lo scheduler crede che non lo sia). La terza regola risolve questo decidendo che poiché la risposta del lavoratore di «Non lavoro più su questa missione» si basa sull’osservazione del lavoratore che il thunk ha un risultato, allora il livello dei metadati dovrebbe trasmettere nuovamente queste informazioni. Il punto morto viene quindi risolto:

  • Il livello dei metadati del lavoratore trasmette nuovamente il risultato del thunk e viene ricevuto dallo scheduler.
  • Lo scheduler reagisce all’apparizione di un risultato per il thunk di una missione, segnalando quella missione come completata e notificando il client che ha richiesto quella missione.

Livello degli Atom

I lavoratori combinano i loro livelli di atomi per creare uno store di blob distribuito, in cui ogni atomo può essere richiesto dal suo identificatore, l’hash a 128 bit dei suoi contenuti, creato con SpookyHash. Questo non è una tabella hash distribuita (DHT), perché fornirebbe scambi sbagliati: in una DHT, trovare un atomo sarebbe veloce (dato il suo hash, l’identificatore del lavoratore che lo detiene può essere calcolato con una semplice funzione), ma scrivere un atomo sarebbe lento (dovrebbe essere inviato dalla macchina che lo ha calcolato alla macchina che si prevede lo detenga data la disposizione corrente della DHT). Dato che la maggior parte degli atomi si prevede che vengano consumati sulla stessa macchina che li ha prodotti, questo è uno spreco.

Invece, ogni volta che un lavoratore richiede un atomo dal proprio livello di atomi, prima cerca quell’atomo sulle proprie unità NVMe. Se non viene trovato, vengono interrogati gli altri lavoratori per verificare l’esistenza di quell’atomo. Questa è la sfida di prestazione più grande del design distribuito di Envision, poiché queste interrogazioni devono essere completate il più rapidamente possibile e è necessaria una complessa strategia di timeout per gestire i lavoratori non responsivi: aspettare troppo a lungo significa sprecare secondi in attesa di una risposta che non arriva mai; rinunciare troppo presto significa dover ricalcolare un atomo che avrebbe potuto essere scaricato da un altro lavoratore.

Per aiutare in questo, il livello degli atomi raggruppa anche più richieste insieme, per garantire che tutti gli altri lavoratori mantengano una pipeline completa di richieste a cui devono rispondere e per rilevare più facilmente quando i tempi di risposta di un lavoratore aumentano improvvisamente.

Una volta che almeno un altro lavoratore ha confermato l’esistenza dell’atomo sul proprio disco, viene inviata una seconda richiesta per scaricare l’atomo. Queste richieste di download tendono ad essere molto irregolari, poiché molti thunk richiedono prima i loro atomi e poi iniziano a elaborare i loro contenuti. Per questo motivo, il livello degli atomi è consapevole che c’è una singola coda di download per ogni coppia di lavoratori e non entra in panico se una determinata richiesta di atomo non riceve il suo primo byte per diversi secondi (se la coda è piena e altri atomi stanno ricevendo i loro byte, allora non c’è nulla di cui preoccuparsi). In un certo senso, il timeout non è a livello di richiesta di atomo, ma a livello dell’intero livello.

Inoltre, ci sono due ottimizzazioni applicate alla coda di trasferimento:

  1. Ogni richiesta specifica quale thunk ha bisogno dei dati, in modo che il mittente cerchi di raggruppare insieme le richieste dello stesso thunk (più velocemente un determinato thunk viene sbloccato, più velocemente sarà in grado di iniziare a elaborare i suoi input).
  2. Quando l’esecuzione di un thunk viene annullata (a causa di un errore, a causa di un cambio di priorità o perché si scopre che un altro lavoratore lo ha già completato), il livello degli atomi comunica questa cancellazione in modo che tutte le richieste di quel thunk possano essere eliminate dalla coda di download.

Un lavoratore tipico invierà dati a raffiche di 1 GB/s, coprendo di solito 7 GB di dati per raffica.

Livello di Logging

Questo livello conserva informazioni aggiuntive sullo stato dell’esecuzione, in modo che possano essere esaminate successivamente per indagare su problemi o misurare le prestazioni. È molto dettagliato, contenente informazioni come quali thunk sono stati eseguiti, quanto tempo ci hanno messo ad eseguirsi e che tipo di risultato hanno prodotto. Anche eventi importanti, come la costruzione di un nuovo DAG (compreso il DAG serializzato stesso) o la scoperta che manca un atomo, vengono registrati. In totale, vengono prodotti diversi gigabyte ogni giorno per ogni lavoratore.

Per ridurre l’impatto sulle prestazioni, ogni lavoratore scrive i log accumulati ogni 60 secondi o ogni volta che si accumulano 4 megabyte (cosa che accade spesso durante una raffica di attività). Questo viene scritto in un blocco di archiviazione di Azure Blob Storage4 e ogni lavoratore ha il proprio blob dedicato per evitare di dover supportare scrittori multipli su un singolo blob.

Abbiamo quindi altre macchine (al di fuori dell’ambiente di produzione di Envision) che possono leggere questi blob di log in seguito e compilare statistiche dettagliate su ciò che è successo nel cluster.

Pubblicità sfacciata: stiamo assumendo ingegneri del software. Il lavoro da remoto è possibile.


  1. Questo potrebbe sembrare uno spreco di larghezza di banda, ma considera che ogni identificatore di thunk pesa 24 byte e ci sono fino a 32 thunks per lavoratore, quindi ogni aggiornamento richiede solo 768 byte, meno di un pacchetto TCP! ↩︎

  2. Anche se, in termini di prestazioni, il livello degli atomi è molto più impegnativo. ↩︎

  3. Il livello dei metadati è essenzialmente un enorme vettore di orologi, in cui gli orologi sono mantenuti per-thunk anziché per-lavoratore. ↩︎

  4. Perché non Append Blobs? Beh, sia i Block Blobs che gli Append Blobs hanno problemi di prestazioni significativi quando si legge un file composto da molti piccoli scritti: le prestazioni di lettura scendono da ~60MB/s per un blob normale a meno di ~2MB/s! Un blob di log da 5GB richiede circa 40 minuti per essere letto a quella velocità. Abbiamo contattato Microsoft riguardo a questo problema, ma non ci sono piani per risolverlo. Per aggirare questo problema, ci affidiamo al fatto che un Block Blob può essere ricompattato manualmente (prendendo gli ultimi 1000 piccoli scritti, cancellandoli dal blob e riscrivendoli come un singolo grande scritto), mentre un Append Blob non può essere modificato in questo modo. ↩︎