Questo articolo è il quarto di una serie in quattro parti sul funzionamento interno della macchina virtuale Envision: il software che esegue gli script di Envision. Vedi parte 1, parte 2 e parte 3. Questa serie non tratta il compilatore di Envision (forse in un’altra occasione), quindi assumiamo semplicemente che lo script sia stato in qualche modo convertito nel bytecode che la macchina virtuale Envision accetta in ingresso.

I precedenti articoli hanno esaminato principalmente come i worker eseguivano gli script di Envision. Tuttavia, sia per resilience sia per le prestazioni, Envision viene effettivamente eseguito su un cluster di macchine.

Ogni livello in un worker comunica con lo stesso livello negli altri worker, o con altri livelli nello stesso worker. Ciò garantisce che la comunicazione di rete possa restare un dettaglio di implementazione privato di ogni livello.

A un livello basso, ogni worker apre due connessioni TLS con ogni altra macchina del cluster, e le comunicazioni dai vari livelli vengono multiplexate attraverso queste due connessioni (una connessione viene usata per messaggi brevi, l’altra per trasferimenti di dati di grandi dimensioni).

Esecuzione distribuita astratta

Livello di Controllo

Questo livello viene usato dallo scheduler per assegnare e revocare le missioni ai worker e non comporta comunicazione diretta tra i worker. I messaggi principali di questo livello sono:

  • Lo scheduler chiede al worker di iniziare a lavorare su una missione.
  • Lo scheduler chiede al worker di smettere di lavorare su una missione.
  • Il worker comunica allo scheduler di aver incontrato un errore catastrofico durante l’esecuzione di una missione (solitamente un problema non deterministico, ad esempio “NVMe drive si è incendiato”, il che significa che la stessa missione può essere riprovata in futuro o su un altro worker).
  • Il worker fornisce allo scheduler statistiche sul suo stato attuale: elenco delle missioni, dimensione del fronte di ogni DAG della missione, numero totale di thunks rimasti da eseguire in ogni DAG della missione.

Lo scheduler utilizza queste statistiche per decidere quando riassegnare le missioni. Le regole effettive per farlo sono piuttosto complesse, poiché dipendono da regole di priorità, equità tra diversi tenant e tra script dello stesso tenant, e dal carico complessivo del cluster a quel punto, ma la tendenza generale è che le missioni con un fronte sufficientemente ampio possano essere distribuite su più worker, purché tali worker non siano già sovraccarichi. Dato lo stesso quantitativo di lavoro da svolgere, è più efficiente eseguire quattro missioni, una per ciascun worker, piuttosto che distribuirle tutte su tutti i worker.

Livello di Esecuzione

Ogni worker tiene traccia dei thunks che sta eseguendo attualmente, e trasmette questa lista agli altri worker ogni volta che pianifica un nuovo thunk1. Ciò garantisce che, al di fuori del brevissimo intervallo legato alla latenza di rete, due worker non inizieranno ad eseguire lo stesso thunk.

Naturalmente, se un worker smette di inviare questi aggiornamenti (ad esempio, perché è andato in crash o si è disconnesso dal resto del cluster), i suoi peer considereranno ogni lista vecchia di qualche secondo come obsoleta, e si permetteranno di eseguire quei thunks.

Livello dei Metadati

Ogni worker cerca di mantenere una copia dei metadati completi, ma non li sincronizza effettivamente. Abbiamo scelto di non garantire che tutti i worker concordino sugli stessi metadati esatti, lavorando invece con garanzie di consistenza eventuale. Questo rende la distribuzione del livello dei metadati la più impegnativa in termini di progettazione2.

La consistenza eventuale di questo livello segue tre regole principali:

  1. Ogni modifica locale al livello dei metadati viene trasmessa immediatamente a tutti gli altri worker. Questa trasmissione può fallire e non verrà tentata nuovamente.
  2. Le modifiche remote ricevute dagli altri worker vengono fuse nel livello dei metadati locale, basandosi su una progressione monotona3: un valore “no result” per un thunk può essere sovrascritto da un valore “checkpoint” (che indica che il thunk è iniziato, ma non ha terminato l’esecuzione), il quale può essere sovrascritto da un valore “alias” (che indica che il thunk ha restituito un DAG da eseguire al suo posto), che a sua volta può essere sovrascritto da un valore “result” (che può essere un risultato positivo con i relativi atomi, o un errore fatale).
  3. Ogni volta che un altro livello invia una risposta di rete basata su un valore del livello dei metadati, il livello dei metadati trasmette nuovamente quel valore.

La terza regola è progettata per forzare un livello di sincronizzazione quando è effettivamente rilevante. Ad esempio, consideri la seguente sequenza di eventi:

  • Lo scheduler chiede a un worker di eseguire una missione (attraverso il livello di controllo)
  • Il worker esegue la missione e trasmette il risultato (attraverso il livello dei metadati), ma il messaggio va perso lungo il percorso verso lo scheduler.
  • Lo scheduler nota che il worker non sta più eseguendo la missione (attraverso il livello di controllo), e gli chiede di eseguirla nuovamente.
  • Il worker osserva che il thunk della missione ha già un risultato nel livello dei metadati, e non fa nulla, perché non è necessario intervenire.

Si tratta di un deadlock in cui lo scheduler e il worker non concordano sullo stato di un thunk nel livello dei metadati (il worker ritiene che sia completato, lo scheduler che non lo sia). La terza regola risolve il problema decidendo che, dato che la risposta del worker «Non lavora più su questa missione» si basa sull’osservazione che il thunk ha un risultato, il livello dei metadati dovrebbe trasmettere nuovamente tale informazione. Il deadlock viene così risolto:

  • Il livello dei metadati del worker trasmette nuovamente il risultato del thunk, e questo viene ricevuto dallo scheduler.
  • Lo scheduler reagisce all’apparizione di un risultato per il thunk di una missione, segnando tale missione come completata e notificando il client che l’ha richiesta.

Livello degli Atomi

I worker combinano i loro livelli degli atomi per creare un blob store distribuito, in cui ogni atomo può essere richiesto tramite il suo identificatore—l’hash a 128 bit del suo contenuto, creato con SpookyHash. Questo non è una tabella hash distribuita (DHT), perché fornirebbe compromessi sbagliati: in un DHT, trovare un atomo sarebbe veloce (dato il suo hash, l’identificatore del worker che lo detiene può essere calcolato con una semplice funzione), ma scrivere un atomo sarebbe lento (dovrebbe essere inviato dalla macchina che lo ha calcolato, alla macchina che dovrebbe contenerlo secondo la disposizione attuale del DHT). Visto che la maggior parte degli atomi è destinata ad essere consumata sulla stessa macchina che li ha prodotti, ciò risulta uno spreco.

Invece, ogni volta che un worker richiede un atomo dal proprio livello degli atomi, prima lo cerca sui propri dischi NVMe. Se non viene trovato, allora si interroga gli altri worker sull’esistenza di quell’atomo. Questa è la più grande sfida in termini di prestazioni del design distribuito di Envision, poiché queste richieste devono essere completate il più rapidamente possibile, ed è necessaria una strategia complessa di timeout per gestire i worker non responsivi: se si attende troppo a lungo, si sprecheranno secondi in attesa di una risposta che non arriverà mai; arrendersi troppo presto, e si dovrà ricalcolare un atomo che avrebbe potuto essere scaricato da un altro worker.

Per aiutare in questo, il livello degli atomi raggruppa anche più richieste insieme, per garantire che tutti gli altri worker mantengano una pipeline completa delle richieste a cui devono rispondere, e per rilevare più facilmente quando i tempi di risposta di un worker aumentano improvvisamente.

Una volta che almeno un altro worker ha confermato l’esistenza dell’atomo sul proprio disco, viene inviata una seconda richiesta per scaricare l’atomo. Tali richieste di download tendono a essere molto irregolari, poiché molti thunks richiedono prima i loro atomi, per poi iniziare ad elaborare i loro contenuti. Per questo motivo, il livello degli atomi è consapevole che esiste una singola coda di download per ogni coppia di worker, e non va in panico se una determinata richiesta di atomo non riceve il suo primo byte per diversi secondi (se la coda è piena e altri atomi stanno ricevendo i loro byte, allora non c’è nulla di cui preoccuparsi). In un certo senso, il timeout non è a livello di richiesta dell’atomo, ma a livello dell’intero strato.

Inoltre, sono applicate due ottimizzazioni alla coda di trasferimento:

  1. Ogni richiesta specifica quale thunk necessita dei dati, in modo che il mittente cerchi di raggruppare insieme le richieste dello stesso thunk (più velocemente un dato thunk viene sbloccato, più rapidamente potrà iniziare ad elaborare i suoi input).
  2. Quando l’esecuzione di un thunk viene annullata (a causa di un errore, di una modifica di priorità, o perché si scopre che un altro worker lo ha già completato), il livello degli atomi comunica questa cancellazione affinché tutte le richieste di quel thunk possano essere rimosse dalla coda di download.

Un worker tipico invierà dati a raffica a 1GB/s, coprendo solitamente 7GB di dati per raffica.

Livello di Logging

Questo livello conserva informazioni aggiuntive sullo stato di esecuzione, in modo da poterle esaminare successivamente per indagare su eventuali problemi o misurare le prestazioni. È molto dettagliato, contenente informazioni quali quali thunks sono stati eseguiti, quanto tempo hanno impiegato e che tipo di risultato hanno prodotto. Eventi importanti, come la costruzione di un nuovo DAG (incluso il DAG serializzato stesso), o la scoperta che un atomo manca, vengono anch’essi registrati. In totale, per ogni worker vengono prodotti diversi gigabyte di dati al giorno.

Per minimizzare l’impatto sulle prestazioni, ogni worker scrive i log accumulati ogni 60 secondi, o ogni qualvolta vengano raggiunti 4 megabyte (cosa che spesso accade durante un picco di attività). Questi vengono scritti in un block blob di Azure Blob Storage4, e ogni worker ha il proprio blob dedicato per evitare di dover supportare più scrittori su un singolo blob.

Successivamente, abbiamo altre macchine (al di fuori dell’ambiente di produzione di Envision) che possono leggere questi log blob in seguito, e compilare statistiche dettagliate su ciò che è accaduto nel cluster.

Annuncio senza peli sulla lingua: stiamo cercando software engineers. È possibile lavorare in remoto.


  1. Questo potrebbe sembrare uno spreco in termini di banda, ma consideri che ogni identificatore di thunk pesa 24 byte, e ci sono fino a 32 thunks per worker, per cui ogni aggiornamento occupa solo 768 byte—meno di un pacchetto TCP! ↩︎

  2. Sebbene, in termini di prestazioni, il livello degli atomi sia di gran lunga più impegnativo. ↩︎

  3. Il livello dei metadati è sostanzialmente un enorme orologio vettoriale, dove gli orologi sono mantenuti per thunk piuttosto che per worker. ↩︎

  4. Perché non Append Blobs? Beh, sia i Block Blobs che gli Append Blobs hanno gravi problemi di prestazioni nella lettura di un file composto da molte piccole scritture: la velocità di lettura scende da ~60MB/s per un blob normale, a meno di ~2MB/s! Un blob di log da 5GB impiega circa 40 minuti per essere letto a quella velocità. Abbiamo contattato Microsoft riguardo a questo problema, ma non ci sono piani per risolverlo. Per aggirare questo problema, facciamo affidamento sul fatto che un Block Blob può essere manualmente ricompattato (prendere le ultime 1000 piccole scritture, cancellarle dal blob, e riscriverle come una singola grande scrittura), mentre un Append Blob non può essere modificato in questo modo. ↩︎