Envision VM (parte 2), Thunks y el Modelo de Ejecución
Este artículo es el segundo de una serie de cuatro partes sobre el funcionamiento interno de la máquina virtual de Envision: el software que ejecuta los scripts de Envision. Consulta parte 1, parte 3 y parte 4. Esta serie no cubre el compilador de Envision (quizás en otra ocasión), así que asumamos que el script ha sido de alguna manera convertido al bytecode que la máquina virtual de Envision toma como entrada.
Al igual que la mayoría de los otros sistemas de ejecución paralela, Envision produce un grafo acíclico dirigido (DAG) donde cada nodo representa una operación que debe ejecutarse, y cada arista representa una dependencia de datos en la que el nodo descendente requiere la salida del nodo ascendente para ejecutarse.

Los nodos se llaman thunks, en referencia al concepto muy similar de Haskell y otros lenguajes con evaluación perezosa.
Ejemplos de thunks que se pueden encontrar en un script típico de Envision:
- Analizar un archivo de entrada en formato
.xlsx
,.csv
o.csv.gz
, y convertirlo en una representación columnar que será utilizada por el resto del script. - Cargar un rango de líneas $M..N$ de una columna individual; esta columna puede obtenerse ya sea del resultado de analizar un archivo de entrada (ver arriba), o a partir del formato de archivo columnar
.ion
de Lokad optimizado para el almacenamiento en Microsoft Azure Blob Storage. - Dado un rango de líneas $M..N$ de un vector muy grande $A$, un vector más pequeño $B$, un proyector $\pi$ que asocia cada línea en $A$ con una línea en $B$, y una función $f$, calcular $f(A[l], B[\pi(l)])$. Esto se llama un join en el lado del mapa.
- Utilizar simulación Monte Carlo para estimar el promedio, la varianza o la distribución del resultado de un proceso aleatorio. El resultado de múltiples thunks Monte Carlo, ejecutados en paralelo, puede luego combinarse mediante un thunk final.
En general, se espera que un thunk tome entre unos pocos cientos de milisegundos (para manipulaciones de datos a pequeña escala) y unos pocos minutos (para simulaciones Monte Carlo o descenso del gradiente). Esta es una suposición fuerte: se permite que la máquina virtual de Envision tenga una sobrecarga significativa para la evaluación de cada thunk, del orden de los milisegundos. Un script debería producir un número reducido de thunks (entre 1 000 y 100 000), con cada thunk realizando una unidad de trabajo bastante amplia.
Transparencia referencial
Los thunks son funciones puras: son deterministas y no pueden tener efectos secundarios. Operan leyendo sus entradas inmutables, y retornan el mismo valor en cada ejecución. Esta propiedad importante ayuda de muchas maneras:
- Dado que la evaluación de un thunk no tiene efectos secundarios, no interferirá con la evaluación de otro thunk, y por lo tanto todos los thunks pueden ejecutarse concurrentemente (siempre que sus entradas estén disponibles) en varios núcleos de CPU, o incluso distribuidos en varios workers. La máquina virtual de Envision lleva un registro de la frontier de cada script (el conjunto de thunks que pueden ejecutarse porque todas sus entradas están disponibles), y selecciona un nuevo thunk de ella cada vez que un CPU se libera.
- Por el contrario, es posible evaluar los thunks uno por uno y obtener el mismo resultado. Por ejemplo, cuando el clúster está bajo una carga pesada, cuando los workers del clúster no están disponibles, o al reproducir la evaluación de un script en la estación de trabajo de un desarrollador para investigar un problema.
- Que dos workers ejecuten el mismo thunk no es un error, solo una pérdida de tiempo. Como tal, no es algo que deba evitarse (con toda la dificultad que conlleva la sincronización en un sistema distribuido), basta con asegurarse de que no ocurra con demasiada frecuencia1.
- Si se pierde el resultado de un thunk (debido a un fallo de un worker o a la indisponibilidad de la red), es posible ejecutarlo de nuevo. Incluso si se pierden varios thunks, el DAG original permanece disponible, y puede utilizarse como una línea de datos para recalcular los valores necesarios.
Sin embargo, esto también significa que los thunks no pueden comunicarse entre sí (por ejemplo, abriendo un canal y transmitiendo datos entre ellos). Esto restringe las estrategias disponibles para la concurrencia y el paralelismo.
Producción de thunks
En muchos marcos de computación distribuida, el DAG de ejecución se produce fuera del clúster (por ejemplo, en una máquina scheduler), y luego se envían porciones del grafo a workers individuales para su ejecución. Muy a menudo, el DAG debe producirse en varios pasos: por ejemplo, una operación de unión puede optimizarse de manera diferente dependiendo del tamaño de las tablas2, y no siempre es posible conocer el tamaño de una tabla antes de evaluar su contenido, por lo que vale la pena esperar a que se conozcan los tamaños de las tablas antes de generar la porción del DAG que realiza la unión. Esto significa que habrá idas y vueltas entre el scheduler y los workers, donde el scheduler producirá tareas adicionales basadas en los resultados de los workers.
Esto convierte al scheduler en un único punto de fallo, y permitir múltiples schedulers activos, o un esquema de failover entre un scheduler activo y uno pasivo, añadiría bastante complejidad. Para Envision, nuestro objetivo de resilience fue, en cambio, asegurar que un solo worker sea capaz de calcular una misión completa, sin involucrar al scheduler. Como tal, aunque un tiempo de inactividad del scheduler de diez minutos impida que se envíen nuevas misiones, no interrumpiría la finalización de las misiones ya iniciadas. Sin embargo, esto significa que los workers deberían ser capaces de generar nuevas porciones del DAG sin ayuda del scheduler.
Logramos esto permitiendo que un thunk retorne un nuevo thunk en lugar de un valor - para reutilizar más términos de Haskell, la construcción del DAG involucra monads en lugar de simples functors. Este nuevo thunk tiene sus propios padres, que pueden ser también nuevos thunks, y así sucesivamente, formando un nuevo DAG completo. En la práctica, el nuevo DAG a menudo comparte muchos de sus thunks con el DAG antiguo, porque necesita los resultados de esos cálculos.
Al enviar una nueva misión al clúster, se envía solo un thunk (que contiene el script para compilar y ejecutar, y las referencias a todos los archivos de entrada). Este thunk luego produce el DAG de ejecución inicial, que crecerá unas cuantas veces más hasta completarse.
Grafo Merkle
Para poder transmitirse por la red, los thunks también son serializables, utilizando un formato binario personalizado diseñado para tener una huella baja. En un DAG con 100 000 thunks, un presupuesto de 10MiB solo puede soportar 104 bytes por thunk!
El soporte para la serialización binaria nos permitió convertir el DAG en un Merkle DAG, donde cada thunk tiene un identificador determinado por el contenido binario de ese thunk y todos sus antecesores3. Llamamos a este identificador el hash del thunk.
Utilizar un Merkle DAG tiene dos beneficios principales. Primero, los thunks que realizan la misma operación se fusionan automáticamente porque, al tener el mismo contenido y antecesores, también tienen el mismo identificador.
Segundo, es posible que dos scripts compartan algunos de sus thunks — tal vez lean los mismos archivos de entrada y apliquen las mismas operaciones sobre ellos, o tal vez un Supply Chain Scientist esté trabajando en el script, cambiando unas pocas líneas a la vez entre ejecuciones. Cuando esto sucede, las salidas de los thunks compartidos se pueden reutilizar si aún están disponibles en memoria, reduciendo enormemente el tiempo de ejecución del script. Poder editar y re-ejecutar un script crea un ciclo de retroalimentación corto que ayuda a la productividad de los Supply Chain Scientists.
Programación local de thunks
Entraremos en más detalles en un futuro artículo sobre cómo se distribuye la ejecución de thunks en varias máquinas de un clúster. Por ahora, basta con considerar que cada worker posee una copia de todo el DAG, conoce qué thunks ya han sido ejecutados (y dónde encontrar sus resultados), sabe qué thunks están siendo ejecutados actualmente por el clúster, y es responsable de programar thunks adicionales para que se ejecuten en sus 32 núcleos. Esta programación local se realiza mediante un servicio de un solo hilo llamado kernel (que no debe confundirse con el kernel de Linux). El kernel, así como los hilos de los workers que ejecutarán realmente los thunks, se ejecutan en el mismo proceso .NET para compartir objetos gestionados entre sí.
Encontrar un nuevo thunk es casi instantáneo, ya que el kernel mantiene una frontier de thunks listos para ejecutarse para cada DAG, y solo necesita elegir uno al azar. La mayor parte del tiempo del kernel se dedica, en cambio, a actualizar la frontier cada vez que un thunk comienza a ejecutarse (debe abandonar la frontier), termina de ejecutarse (sus descendientes pueden unirse a la frontier, dependiendo de si le quedan padres no ejecutados), o se pierde debido a que el worker que posee su resultado se vuelve no disponible (sus descendientes deben abandonar la frontier, pero el propio thunk puede ser añadido nuevamente a la frontier si sus propios padres aún están disponibles).
El cuidado de las frontiers es un trabajo con una variabilidad muy alta, puede tomar entre un microsegundo y varios segundos—¡más de un millón de veces más! Por ejemplo, un shuffle step tiene una capa de $N$ thunks que leen las salidas de otra capa de $M$ thunks. Cada thunk descendente lee las salidas de los $M$ thunks ascendentes, resultando en $M\cdot N$ aristas en el DAG. Para $M = N = 1000$ (un grado de paralelización muy probable, cuando se trata de miles de millones de líneas), eso es un millón de aristas. Si no se controla, este fenómeno puede hacer que el kernel se detenga durante segundos, durante los cuales no se programan nuevos thunks para ejecutar, y por lo tanto hasta 32 núcleos permanecen inactivos4.
Resolvemos este problema introduciendo nodos virtuales en el DAG para representar este tipo de conexión entre capas. El nodo virtual tiene $M$ entradas (una por cada thunk en la capa ascendente) y $N$ salidas (una por cada thunk en la capa descendente). Esto disminuye el número de aristas a $M + N$, lo cual es significativamente más manejable!
Generación de código de baja granularidad
Las primeras versiones de Envision, en 2013 y 2014, funcionaban bajo la premisa de que cada operación vectorial se realizaba mediante un solo thunk. Al ejecutar T.A / (T.B + 1)
habría un thunk para transmitir 1
a la tabla T
, un segundo thunk para sumar T.B
al resultado del primer thunk, y un tercer thunk para dividir T.A
por el resultado del segundo thunk. Esto tenía la ventaja de que podíamos implementar fácilmente cada operación como una función en C#, ejecutada como un solo thunk, lo cual era una excelente idea durante la implementación temprana de un DSL. Por supuesto, tenía la desventaja de que se consumían cantidades innecesarias de memoria (el primer thunk produciría un vector de millones de copias del valor 1
), y la memoria tarda en escribirse y leerse.
Era imperativo contar con thunks que evalúen varias operaciones en sucesión, en lugar de tener un thunk por cada operación.
Muchas bases de datos SQL operan con variaciones del modelo volcano, donde la consulta se convierte en un árbol de iteradores. Cada iterador actúa como una función impura que retorna el siguiente valor en la iteración cada vez que se llama, y puede invocar recursivamente a otros iteradores. En este modelo, transmitir un escalar a una tabla sería un iterador que retorna constante, sumar o dividir dos vectores mantendría referencias a dos iteradores, y leer de un vector lo incrementaría:
Func<float> BroadcastScalar(float x) = () => x;
Func<float> Add(Func<float> x, Func<float> y) = () => x() + y();
Func<float> Div(Func<float> x, Func<float> y) = () => x() / y();
Func<float> Read(float[] v) { var i = 0; return () => v[i++]; }
Compilar una consulta al modelo volcano consiste en construir el árbol de iteradores:
Div(Read(A), Div(Read(B), BroadcastScalar(1)))
Esto tiene la ventaja de que no se realizan asignaciones de memoria para los vectores intermedios. Sin embargo, la sobrecarga de llamar a funciones domina las simples operaciones aritméticas que esas funciones realizan.
Debido a esto, en 2015 Envision pasó a la generación de código just-in-time. El principio es bastante similar al motor de ejecución Tungsten de Apache Spark: compilar la operación T.A / (T.B + 1)
a una función en un lenguaje imperativo.
float[] GeneratedFunction(float[] a, float[] b) {
var result = new float[a.Length];
for (var i = 0; i < a.Length; ++i)
result[i] = a[i] / (b[i] + 1);
return result;
}
El objetivo que utilizamos para esta compilación es .NET IL, el lenguaje de bytecode usado por .NET para sus ensamblados. Esto nos permite aprovechar el compilador JIT de .NET para producir código máquina optimizado a partir de nuestro IL generado.
Esta generación de código en tiempo de ejecución resultó ser el obstáculo más grande al migrar Envision de .NET Framework a .NET Core en 2017. De hecho, si bien .NET Core soporta las mismas APIs de System.Reflection
que .NET Framework para producir y ejecutar IL en tiempo de ejecución, no soporta guardar ese IL en disco como un DLL. Si bien eso no es un requisito para ejecutar Envision, ciertamente lo es para desarrollar el compilador de Envision! System.Reflection
no hace nada para prevenir la creación de IL inválido, y solo informa de una InvalidProgramException
bastante inútil cuando se ejecuta un método que contiene IL inválido. El único enfoque razonable para investigar tales problemas es guardar un archivo de ensamblado y usar ILVerify o ILSpy. Debido a este requisito, en realidad continuamos apuntando tanto a .NET Framework como a .NET Core durante dos años—la producción se ejecutaría en .NET Core, y la depuración de IL se realizaría en .NET Framework. Finalmente, en 2019 publicamos nuestra propia biblioteca Lokad.ILPack como un reemplazo para esta funcionalidad, y migramos fuera de .NET Framework.
Esto concluye el análisis de hoy sobre cómo Envision ejecuta scripts. En el próximo artículo, discutiremos cómo se almacenan los resultados intermedios.
Sin tapujos: estamos contratando ingenieros de software. Es posible trabajar de forma remota.
-
Los workers transmiten al clúster cada vez que inician un nuevo thunk, y evitan ejecutar thunks que otros workers hayan reclamado. Permanece el raro caso en que dos workers inician el mismo thunk casi al mismo tiempo; evitamos esto haciendo que cada worker seleccione un thunk al azar de la frontier, y haciendo que el scheduler reduzca el número de workers cuando la frontier se reduce demasiado. Esto significa que la ejecución duplicada no es imposible, pero es muy improbable. ↩︎
-
Se utiliza un costoso shuffle join para dos tablas grandes, y se emplea el más económico map-side join cuando una de las tablas es lo suficientemente pequeña como para caber en memoria. ↩︎
-
Para los thunks sin antecesores, como aquellos que leen de archivos de entrada, incluimos el hash del contenido de esos archivos de entrada dentro del cuerpo del thunk. Esto asegura que, si dos thunks leen el mismo archivo de entrada, tendrán el mismo hash, y si leen dos archivos de entrada diferentes, incluyendo dos versiones diferentes del archivo en una ruta dada, entonces tendrán hashes distintos. ↩︎
-
Esto también tiene un efecto en el tamaño de la serialización. De hecho, si todas las aristas se representan en el DAG serializado, incluso con solo dos bytes por arista, ya representa 2MB de datos! ↩︎