Questo articolo è il terzo di una serie in quattro parti riguardante i meccanismi interni della macchina virtuale Envision: il software che esegue gli script di Envision. Vedi parte 1, parte 2 e parte 4. Questa serie non copre il compilatore Envision (forse in un’altra occasione), quindi assumiamo semplicemente che lo script sia stato in qualche modo convertito nel bytecode che la macchina virtuale Envision utilizza come input.

Durante l’esecuzione, i thunk leggono i dati di input e scrivono i dati di output, spesso in grandi quantità.

  • Un miliardo di booleani (un bit per valore) occupa 125MB.
  • Un miliardo di numeri in virgola mobile (precisione a 32 bit) occupa 4GB.
  • Un miliardo di linee di vendita minimali (data, località, EAN-13, quantità) occupa tra i 14GB e i 33GB (o più!) a seconda di come vengono codificati i valori.

Questo crea due sfide: come preservare questi dati dal momento in cui vengono creati fino al loro utilizzo (parte della risposta risiede nell’utilizzo di NVMe drives distribuiti su diverse macchine), e come minimizzare la quantità di dati che transitano attraverso canali più lenti della RAM (rete e archiviazione persistente).

Atomi e Archiviazione dei Dati

Livello dei Metadati

Una parte della soluzione consiste nell’avere due livelli di dati separati, in cui i dati vengono spinti in uno dei due livelli in base alla loro natura. Il livello dei metadati contiene informazioni sui dati effettivi e sugli script in esecuzione:

  • Quando un thunk ha restituito con successo dei dati, l’identificatore univoco di quei dati viene conservato in questo livello.
  • Quando un thunk fallisce, i messaggi di errore prodotti dal thunk vengono conservati in questo livello.
  • Quando un thunk ha restituito un nuovo thunk (e il DAG dei suoi genitori), il DAG serializzato viene conservato in questo livello.
  • Un thunk può salvare checkpoint nel livello dei metadati (solitamente costituito da un blocco dell’identificatore dei dati); se un thunk viene interrotto prima del completamento, può quindi caricare il checkpoint dal livello dei metadati e riprendere il lavoro da quella posizione.

In altre parole, il livello dei metadati può essere visto come un dizionario che mappa i thunk ai risultati, dove la natura esatta del risultato dipende da ciò che il thunk ha effettivamente restituito.

Il livello dei metadati può anche contenere informazioni aggiuntive sulla struttura dei dati a cui si fa riferimento. Per esempio, se un thunk ha restituito una coppia di vettori, allora i metadati conterranno l’identificatore univoco di ciascun vettore. Questo consente ai consumatori di accedere a un vettore senza dover caricare entrambi.

Ci sono due limiti sui valori memorizzati nel livello dei metadati: una voce non può superare i 10MB (quindi anche un DAG serializzato non è autorizzato a superare questa quantità!), e lo spazio totale di memorizzazione per il livello dei metadati è di 1,5GB. Di solito, ci sono circa un milione di valori in questo livello, per una dimensione media della voce di 1,5KB.

Il livello dei metadati risiede sempre nella RAM per garantire un accesso rapido. Funziona come fonte di verità per l’esecuzione dei thunk: un thunk è stato eseguito se e solo se esiste un risultato associato a quel thunk nel livello dei metadati—anche se ciò non garantisce che i dati a cui si fa riferimento in quel risultato siano disponibili.

Ogni worker nel cluster mantiene una propria copia del livello dei metadati. Il worker trasmette ogni modifica a questo livello (causata dall’esecuzione dei thunk locali) a tutti gli altri worker del cluster, e anche allo scheduler. Questo viene fatto con il massimo impegno: se un messaggio broadcast non raggiunge la sua destinazione, viene scartato1 senza ulteriori tentativi.

Ogni secondo, il livello dei metadati viene salvato su disco, in maniera incrementale. In caso di crash o riavvio, il worker impiegherà un paio di secondi per ricaricare l’intero livello da disco per ricordare cosa stava facendo.

Mantenere grandi database in memoria

Come accennato sopra, il livello dei metadati può contenere un milione di voci. Ogni singolo DAG può contenere centinaia di migliaia di nodi. Tutti questi hanno una durata lunga — da minuti a ore. Mantenere in memoria milioni di oggetti a lunga durata mette a dura prova il garbage collector di .NET.

La garbage collection in .NET è un argomento complesso (anche se esiste una eccellente serie di Konrad Kokosa per approfondire i dettagli a basso livello), ma il problema complessivo è una combinazione di tre fatti:

  • Il costo in termini di prestazioni di un passaggio di garbage collection è proporzionale al numero di oggetti in vita nell’area di memoria oggetto di garbage collection. Processare milioni di oggetti, spesso con miliardi di riferimenti da seguire, richiederà al garbage collector diversi secondi per essere completato.
  • Per evitare questo costo, il garbage collector di .NET opera con aree di memoria separate, chiamate generazioni, a seconda dell’età degli oggetti in esse contenuti. La generazione più giovane, Gen0, subisce la garbage collection frequentemente ma contiene solo gli oggetti allocati dall’ultimo passaggio (quindi, solo pochi). La generazione più vecchia, Gen2, viene raccolta solo se sia Gen1 che Gen0 sono state raccolte ma non hanno liberato abbastanza memoria. Questo sarà abbastanza raro finché la maggior parte delle allocazioni di oggetti è piccola e di breve durata.
  • Tuttavia, un’operazione normale di un thunk coinvolge grandi array di valori, che sono allocati nel Large Object Heap, un’area separata da Gen0, Gen1 e Gen2. Quando il Large Object Heap esaurisce lo spazio, viene eseguita una garbage collection completa, che include anche la raccolta di Gen2.

E Gen2 è il luogo in cui risiedono i milioni di oggetti provenienti dai DAG e dal livello dei metadati.

Per evitare ciò, abbiamo progettato sia i DAG che il livello dei metadati in modo da utilizzare pochissimi oggetti.

Ogni DAG consiste in sole due allocazioni—un array di nodi e un array di archi, entrambi sono tipi di valore non gestiti, in modo che il GC non debba nemmeno attraversare i loro contenuti per seguire eventuali riferimenti in essi contenuti. Quando un thunk è necessario per essere eseguito, esso viene deserializzato dalla rappresentazione binaria del DAG2, che è presente nel livello dei metadati.

Il livello dei metadati ha contenuti a lunghezza variabile, per cui viene costruito estraendo pezzi da un grande byte[], utilizzando ref struct e MemoryMarshal.Cast per manipolare i dati senza copiarli.

Spazio Temporaneo

Un cluster dispone di una RAM compresa tra 512GiB e 1.5TiB, e di uno spazio NVMe compreso tra 15.36TB e 46.08TB. La maggior parte di questo spazio è destinata a memorizzare i risultati intermedi dell’evaluazione dei thunk.

La RAM è un bene prezioso: rappresenta solo il 3% dello spazio di archiviazione disponibile, ma è tra 100× e 1000× più veloce in lettura e scrittura. Esiste un notevole vantaggio nel garantire che i dati che stanno per essere letti da un thunk siano già presenti in memoria (o non siano mai usciti dalla memoria fin dall’inizio).

Inoltre, è praticamente impossibile utilizzare il 100% della RAM disponibile in .NET—il sistema operativo ha esigenze di memoria variabili e non dispone di un metodo affidabile per comunicare al processo .NET che debba cedere parte della memoria, con il rischio che il processo venga terminato per esaurimento della memoria (OOM).

Envision risolve questo problema delegando la gestione dei trasferimenti da RAM a NVMe al sistema operativo. Abbiamo reso open source questo codice come Lokad.ScratchSpace. Questa libreria mappa in memoria tutto lo spazio di archiviazione disponibile sulle unità NVMe, ed lo espone come un blob store che l’applicazione può utilizzare per:

  1. scrivere blocchi di dati (fino a 2GB ciascuno) nello scratch space, direttamente o serializzando da un oggetto gestito. Questa operazione restituisce un identificatore di blocco.
  2. leggere blocchi di dati utilizzando i loro identificatori. Questa operazione fissa il blocco e lo espone all’applicazione come un ReadOnlySpan<byte>, che poi l’applicazione dovrà copiare (o deserializzare) in memoria gestita.

Una volta che lo scratch space è pieno, i blocchi più vecchi vengono rimossi per fare spazio a nuovi dati. Ciò significa che è possibile che un’operazione di lettura fallisca, se l’identificatore punta a un blocco che è stato eliminato, ma questo è un caso raro durante l’esecuzione di uno script Envision—raramente una singola esecuzione produce decine di terabyte. D’altra parte, ciò potrebbe impedire a una nuova esecuzione di riutilizzare i risultati di una precedente.

La chiave per utilizzare uno scratch space mappato in memoria è che la RAM disponibile è distribuita tra tre tipi di pagine3: memoria che appartiene ai processi (come il processo .NET di Envision), memoria che è una copia esatta byte-per-byte di una porzione di file su disco, e memoria destinata ad essere scritta su un file su disco.

La memoria che è una copia di un file su disco può, in qualsiasi momento, essere rilasciata dal sistema operativo e utilizzata per un altro scopo—essere assegnata a un processo per uso personale, o diventare una copia di un’altra porzione di un file su disco. Pur non essendo istantanee, queste pagine agiscono come un buffer di memoria che può essere rapidamente riassegnato a un altro uso. E finché non vengono riassegnate, il sistema operativo sa che contengono una copia di una specifica regione di memoria persistente, pertanto ogni richiesta di lettura per quella regione verrà reindirizzata alla pagina esistente, evitando così del tutto un caricamento da disco.

La memoria destinata ad essere scritta su disco, verrà infine scritta e diventerà una copia della regione in cui è stata scritta. Questa conversione è limitata dalla velocità di scrittura delle unità NVMe (nell’ordine di 1GB/s).

La memoria assegnata al processo non può essere riconvertita nei due altri tipi senza essere rilasciata dal processo (cosa che il GC di .NET a volte farà, dopo che una raccolta ha liberato una grande quantità di memoria). Tutta la memoria allocata attraverso .NET, inclusi tutti gli oggetti gestiti e tutto ciò che il GC supervisiona, deve appartenere a questo tipo di memoria.

In un worker tipico, il 25% della memoria è assegnato direttamente al processo .NET, il 70% è una copia in sola lettura delle regioni del file, e il 5% è in fase di scrittura.

Livello degli Atomi

Il principio generale è che ogni thunk scrive il proprio output nello scratch space sotto forma di uno o più atomi, quindi memorizza gli identificatori di quegli atomi nel livello dei metadati. I thunk successivi caricano quindi questi identificatori dal livello dei metadati e li utilizzano per interrogare lo scratch space per ottenere gli atomi di cui hanno bisogno.

Il nome «Atomo» è stato scelto perché non è possibile leggere solo una parte di un atomo: possono essere recuperati soltanto nella loro interezza. Se una struttura dati necessita di supportare la richiesta solo di una parte del suo contenuto, la salviamo invece come più atomi, che possono essere recuperati indipendentemente.

Alcuni atomi sono compressi; per esempio, la maggior parte dei vettori booleani non è rappresentata come bool[], che consuma un byte per elemento, ma viene invece compattata fino a 1 bit per valore, per poi essere compressa per eliminare lunghe sequenze di valori identici.

È possibile che alcuni atomi scompaiano, anche se ciò è un caso raro. Le due principali situazioni in cui ciò può accadere sono quando il livello dei metadati ricorda un risultato da un’esecuzione precedente, ma l’atomo corrispondente è stato espulso dallo scratch space nel frattempo, e quando l’atomo è stato memorizzato su un worker diverso che non risponde più alle richieste. Meno frequentemente, un errore di checksum rivela che i dati memorizzati non sono più validi e devono essere scartati.

Quando un atomo scompare, il thunk che lo ha richiesto viene interrotto ed entra in modalità di recupero:

  1. Il sistema verifica la presenza (ma non i checksum) di tutti gli altri atomi referenziati dagli input del thunk. Ciò perché gli atomi tendono a essere generati allo stesso tempo e sullo stesso worker, e la scomparsa di un atomo è correlata alla scomparsa di altri atomi generati nello stesso periodo e luogo.
  2. Il sistema esamina il livello dei metadati per trovare riferimenti a qualsiasi atomo risultato mancante durante il passaggio precedente. Ciò farà sì che alcuni thunk passino da “eseguito” a “non ancora eseguito” perché il loro risultato è stato scartato. Il kernel rileverà quindi questa situazione e li programmerà nuovamente.

I thunk ri-eseguiti produrranno nuovamente l’atomo, e l’esecuzione potrà riprendere.

Array di Atomi

Un aspetto particolare del livello degli atomi è il modo in cui vengono effettuate le mescolanze—una prima serie di $M$ thunk ciascuno produce diversi milioni di righe di dati, e poi una seconda serie di $N$ thunk legge l’output della serie precedente per eseguire un’altra operazione (di solito, una forma di riduzione), ma ogni singola riga della prima serie viene letta una sola volta da un thunk della seconda serie.

Sarebbe molto inefficiente se ogni thunk della seconda serie dovesse leggere tutti i dati della prima serie (ogni riga verrebbe letta $N$ volte, di cui $N-1$ sarebbero inutili), ma ciò è esattamente ciò che accadrebbe se ogni thunk della prima serie producesse esattamente un atomo.

D’altra parte, se ogni thunk nel primo livello produce un atomo per ogni thunk nel secondo livello, l’operazione di shuffle coinvolgerà in totale $M\cdot N$ atomi — un milione di atomi per $M = N = 1000$. Sebbene l’overhead sugli atomi non sia eccessivo, sommando un identificatore di atomo, un identificatore di tenant, il tipo di dato dell’atomo, la dimensione e un po’ di contabilità, può comunque raggiungere qualche centinaio di byte per atomo. Mentre 100MB possono sembrare un prezzo contenuto da pagare per spostare 4GB di dati effettivi, tali dati effettivi risiedono nello strato degli atomi (che è progettato per grandi quantità di dati), mentre 100MB rappresentano una fetta considerevole del budget totale di 1,5GB dello strato dei metadati.

Per ovviare a questo, Envision supporta array di atomi:

  • Tutti gli atomi in un array di atomi vengono scritti contemporaneamente e sono tenuti insieme sia in memoria che sul disco.
  • Dato l’identificatore dell’array di atomi, è facile derivare l’identificatore del i-esimo atomo nell’array.

Grazie a ciò, un array di atomi ha lo stesso overhead di un singolo atomo. In uno shuffle, i thunk del primo livello produrrebbero $M$ array di $N$ atomi ciascuno. I thunk del secondo livello richiederebbero ciascuno $M$ atomi, uno da ciascun array, nella posizione corrispondente al rango di quel thunk nello shuffle.

In conclusione, alcune statistiche di produzione! In un’ora, un lavoratore tipico eseguirà 150 000 thunk e scriverà 200 000 atomi (gli array di atomi vengono contati solo una volta) che rappresentano 750GiB di dati intermedi.

Nel prossimo e ultimo articolo di questa serie, discuteremo degli strati che permettono l’esecuzione distribuita.

Shameless plug: stiamo assumendo ingegneri del software. Il lavoro da remoto è possibile.


  1. I messaggi vengono scartati molto raramente, e anche se è meglio per le prestazioni che nessun messaggio venga scartato, ciò non è necessario per la correttezza. Si presume che il livello dei metadati di ogni worker sarà leggermente fuori sincronia con gli altri, e sebbene questo ne ostacoli la capacità di cooperare in missioni specifiche, ogni worker rimane in grado di portare a termine ogni missione autonomamente. Questo ci permette di evitare la complessità di impostare una consegna almeno una volta. ↩︎

  2. Questa deserializzazione comporta anche un notevole livello di decompressione, poiché applichiamo diverse tecniche complesse per mantenere al minimo la dimensione totale di un DAG serializzato. ↩︎

  3. In realtà esistono altri tipi di pagine, e questo articolo fornisce solo una panoramica molto limitata per quanto riguarda Envision. ↩︎