Este artículo es el segundo de una serie de cuatro partes sobre el funcionamiento interno de la máquina virtual Envision: el software que ejecuta los scripts de Envision. Consulta parte 1, parte 3 y parte 4. Esta serie no cubre el compilador de Envision (quizás en otro momento), así que supongamos que el script se ha convertido de alguna manera en el bytecode que la máquina virtual de Envision toma como entrada.

Al igual que otros sistemas de ejecución paralela, Envision produce un grafo acíclico dirigido (DAG) donde cada nodo representa una operación que debe realizarse, y cada borde representa una dependencia de datos donde el nodo aguas abajo necesita la salida del nodo aguas arriba para ejecutarse.

Más allá de las series de tiempo

Los nodos se llaman thunks, en referencia al concepto muy similar de Haskell y otros lenguajes con evaluación perezosa.

Ejemplos de thunks que se pueden encontrar en un script típico de Envision:

  • Analizar un archivo de entrada en formato .xlsx, .csv o .csv.gz, y convertirlo en una representación columnar que se utilizará en el resto del script.
  • Cargar un rango de líneas $M..N$ de una columna individual; esta columna puede ser tomada tanto del resultado de analizar un archivo de entrada (ver arriba), como del formato columnar .ion propio de Lokad optimizado para el almacenamiento en Microsoft Azure Blob Storage.
  • Dado un rango de líneas $M..N$ de un vector muy grande $A$, un vector más pequeño $B$, un proyector $\pi$ que asocia cada línea en $A$ con una línea en $B$, y una función $f$, calcular $f(A[l], B[\pi(l)])$. Esto se llama un join del lado del mapa.
  • Utilizar la simulación de Monte Carlo para estimar el promedio, la varianza o la distribución del resultado de un proceso aleatorio. El resultado de múltiples thunks de Monte Carlo, ejecutados en paralelo, puede combinarse mediante un thunk final.

En general, se espera que un thunk tarde entre unos pocos cientos de milisegundos (para manipulación de datos a pequeña escala) y unos pocos minutos (para simulaciones de Monte Carlo o descenso de gradiente). Esta es una suposición fuerte: la máquina virtual de Envision puede tener una sobrecarga significativa para la evaluación de cada thunk, del orden de milisegundos. Un script debe producir un número pequeño de thunks (entre 1 000 y 100 000), y cada thunk debe realizar una unidad de trabajo bastante grande.

Transparencia referencial

Los thunks son funciones puras: son deterministas y no pueden tener efectos secundarios. Operan leyendo sus entradas inmutables y devolviendo el mismo valor en cada ejecución. Esta propiedad importante ayuda de muchas maneras:

  1. Dado que la evaluación de un thunk no tiene efectos secundarios, no interferirá con la evaluación de otro thunk, por lo que todos los thunks se pueden ejecutar en paralelo (siempre que sus entradas estén disponibles) en varios núcleos de CPU, o incluso distribuidos en varios trabajadores. La máquina virtual de Envision realiza un seguimiento de la frontera de cada script (el conjunto de thunks que se pueden ejecutar porque todas sus entradas están disponibles) y elige un nuevo thunk de ella cada vez que un CPU está disponible.
  2. A la inversa, es posible evaluar los thunks uno por uno y llegar al mismo resultado. Por ejemplo, cuando el clúster está bajo una carga pesada, cuando los trabajadores del clúster no están disponibles o cuando se reproduce la evaluación de un script en la estación de trabajo de un desarrollador para investigar un problema.
  3. Dos trabajadores ejecutando el mismo thunk no es un error, solo una pérdida de tiempo. Como tal, no es algo que debamos evitar (con toda la dificultad que implica la sincronización en un sistema distribuido), es suficiente con asegurarse de que no ocurra con demasiada frecuencia1.
  4. Si se pierde el resultado de un thunk (debido a un fallo del trabajador o a la falta de disponibilidad de la red), es posible ejecutarlo nuevamente. Incluso si se pierden varios thunks, el DAG original sigue estando disponible y se puede utilizar como una línea de datos para volver a calcular los valores necesarios.

Sin embargo, esto también significa que los thunks no pueden comunicarse entre sí (por ejemplo, abriendo un canal y transmitiendo datos entre ellos). Esto restringe las estrategias disponibles para la concurrencia y la paralelización.

Producción de thunks

En muchos marcos de trabajo de computación distribuida, el DAG de ejecución se produce fuera del clúster (por ejemplo, en una máquina planificadora) y luego se empujan porciones del gráfico a los trabajadores individuales para su ejecución. Muy a menudo, el DAG debe producirse en varias etapas: por ejemplo, una operación de unión puede optimizarse de manera diferente según el tamaño de las tablas2, y no siempre es posible conocer el tamaño de una tabla antes de evaluar realmente su contenido, por lo que vale la pena esperar a que se conozcan los tamaños de las tablas antes de generar la porción del DAG que realiza la unión. Esto significa que habrá un intercambio de información entre el planificador y los trabajadores, donde el planificador producirá tareas adicionales basadas en los resultados de los trabajadores.

Esto convierte al planificador en un único punto de fallo y permitir múltiples planificadores activos o un esquema de conmutación por error entre un planificador activo y uno pasivo agregaría bastante complejidad. Para Envision, nuestro objetivo de resiliencia fue asegurarnos de que un solo trabajador pueda calcular una misión completa sin involucrar al planificador. Como tal, aunque una interrupción de diez minutos del planificador evitaría que se envíen nuevas misiones, no interrumpiría las misiones que ya se hayan iniciado. Sin embargo, esto significa que los trabajadores deben ser capaces de generar nuevas porciones del DAG sin ayuda del planificador.

Logramos esto permitiendo que un thunk devuelva un nuevo thunk en lugar de un valor, para reutilizar más términos de Haskell, la construcción del DAG implica mónadas en lugar de solo funtores. Este nuevo thunk tiene sus propios padres, que también pueden ser nuevos thunks, y así sucesivamente, formando un nuevo DAG completo. En la práctica, el nuevo DAG a menudo comparte muchos de sus thunks con el antiguo DAG, porque necesita los resultados de esos cálculos.

Al enviar una nueva misión al clúster, solo se envía un único thunk (que contiene el script para compilar y ejecutar, y las referencias a todos los archivos de entrada). Este thunk luego produce el DAG de ejecución inicial, que crecerá algunas veces más hasta que esté completo.

Grafo de Merkle

Para poder transmitirse a través de la red, los thunks también son serializables, utilizando un formato binario personalizado diseñado para tener un bajo consumo de recursos. ¡En un DAG con 100 000 thunks, un presupuesto de 10 MiB solo puede admitir 104 bytes por thunk!

El soporte para la serialización binaria nos permitió convertir el DAG en un DAG de Merkle, donde cada thunk tiene un identificador determinado por el contenido binario de ese thunk y todos los ancestros del thunk3. A este identificador lo llamamos el hash del thunk.

El uso de un DAG de Merkle tiene dos beneficios principales. Primero, los thunks que realizan la misma operación se fusionan automáticamente porque, al tener el mismo contenido y ancestros, también tienen el mismo identificador.

Segundo, es posible que dos scripts compartan algunos de sus thunks, tal vez leen los mismos archivos de entrada y aplican las mismas operaciones a ellos, o tal vez un Supply Chain Scientist está trabajando en el script, cambiando algunas líneas a la vez entre ejecuciones. Cuando esto sucede, los resultados de los thunks compartidos se pueden reutilizar si todavía están disponibles en memoria, lo que reduce en gran medida el tiempo de ejecución del script. Poder editar y volver a ejecutar un script crea un ciclo de retroalimentación corto que ayuda a la productividad de los Supply Chain Scientists.

Programación local de thunks

Entraremos en más detalles en un artículo futuro sobre cómo se distribuye la ejecución de los thunks en varias máquinas en un clúster. Por ahora, simplemente considere que cada trabajador tiene una copia de todo el DAG, sabe qué thunks ya se han ejecutado (y dónde encontrar sus resultados), sabe qué thunks se están ejecutando actualmente en el clúster y es responsable de programar thunks adicionales para que se ejecuten en sus 32 núcleos. Esta programación local la realiza un servicio de un solo hilo llamado el kernel (que no debe confundirse con el kernel de Linux). El kernel, así como los hilos de trabajador que realmente ejecutarán los thunks, se ejecutan todos en el mismo proceso .NET para compartir objetos administrados entre sí.

Encontrar un nuevo thunk es casi instantáneo, ya que el kernel mantiene una frontera de thunks listos para ejecutarse para cada DAG y solo necesita elegir uno al azar. La mayor parte del tiempo del kernel se gasta en actualizar la frontera cada vez que un thunk comienza a ejecutarse (necesita abandonar la frontera), termina de ejecutarse (sus descendientes pueden unirse a la frontera, dependiendo de si tiene algún padre no ejecutado) o se pierde debido a que el trabajador que tiene su resultado no está disponible (sus descendientes deben abandonar la frontera, pero el thunk en sí puede volver a agregarse a la frontera si sus propios padres todavía están disponibles).

Atender a las fronteras es un trabajo con una variabilidad muy alta, puede llevar entre una microsegundo y varios segundos, ¡más de un millón de veces más! Por ejemplo, un paso de mezcla tiene una capa de $N$ thunks que leen las salidas de otra capa de $M$ thunks. Cada thunk aguas abajo lee las salidas de los $M$ thunks aguas arriba, lo que resulta en $M\cdot N$ aristas en el DAG. Para $M = N = 1000$ (un grado de paralelización muy probable, cuando se trata de miles de millones de líneas), eso es un millón de aristas. Si no se controla, este fenómeno puede hacer que el kernel se detenga durante segundos a la vez, durante los cuales no se programan nuevos thunks para que se ejecuten, y así hasta 32 núcleos permanecen inactivos4.

Resolvemos este problema introduciendo nodos virtuales en el DAG para representar este tipo de conexión entre capas. El nodo virtual tiene $M$ entradas (una para cada thunk en la capa aguas arriba) y $N$ salidas (una para cada thunk en la capa aguas abajo). ¡Esto reduce el número de aristas a $M + N$, lo cual es significativamente más manejable!

Generación de código de baja granularidad

Las primeras versiones de Envision, en 2013 y 2014, operaban bajo la premisa de que cada operación de vector se realiza mediante un solo thunk. Al ejecutar T.A / (T.B + 1) habría un thunk para transmitir 1 a la tabla T, un thunk para sumar T.B al resultado del thunk uno y un thunk para dividir T.A por el resultado del thunk dos. Esto tenía la ventaja de que podíamos implementar cada operación como una función de C#, ejecutada como un solo thunk, lo cual es una excelente idea durante la implementación inicial de un DSL. Tiene, por supuesto, la desventaja de que se consumen cantidades innecesarias de memoria (el thunk uno produciría un vector de millones de copias del valor 1) y la memoria tarda tiempo en escribirse y leerse de nuevo.

Era imperativo tener thunks que evaluaran varias operaciones en sucesión, en lugar de tener un thunk para cada operación.

Muchas bases de datos SQL operan en variaciones del modelo volcánico, donde la consulta se convierte en un árbol de iteradores. Cada iterador actúa como una función impura que devuelve el siguiente valor en la iteración cada vez que se llama, y puede llamar recursivamente a otros iteradores. En este modelo, transmitir un escalar a una tabla sería un iterador que devuelve constantemente un valor, sumar o dividir dos vectores tendría referencias a dos iteradores, y leer de un vector incrementaría a través de él:

Func<float> BroadcastScalar(float x) = () => x;
Func<float> Add(Func<float> x, Func<float> y) = () => x() + y();
Func<float> Div(Func<float> x, Func<float> y) = () => x() / y();
Func<float> Read(float[] v) { var i = 0; return () => v[i++]; }

Compilar una consulta al modelo volcánico consiste en construir el árbol de iteradores:

Div(Read(A), Div(Read(B), BroadcastScalar(1)))

Esto tiene la ventaja de que no se realizan asignaciones de memoria para los vectores intermedios. Sin embargo, el costo de llamar a las funciones domina las operaciones aritméticas simples que realizan esas funciones.

Debido a esto, en 2015 Envision pasó a la generación de código justo a tiempo. El principio es bastante similar al motor de ejecución Tungsten de Apache Spark: compilar la operación T.A / (T.B + 1) a una función en un lenguaje imperativo.

float[] GeneratedFunction(float[] a, float[] b) {
    var result = new float[a.Length];
    for (var i = 0; i < a.Length; ++i)
        result[i] = a[i] / (b[i] + 1);
    return result;
}

El objetivo que utilizamos para esta compilación es .NET IL, el lenguaje de bytecode utilizado por .NET para sus ensamblados. Esto nos permite aprovechar el compilador JIT de .NET para producir código de máquina optimizado a partir de nuestro IL generado.

Esta generación de código en tiempo de ejecución resultó ser el mayor obstáculo al migrar Envision de .NET Framework a .NET Core en 2017. De hecho, si bien .NET Core admite las mismas API de System.Reflection que .NET Framework para producir y ejecutar IL en tiempo de ejecución, no admite guardar ese IL en disco como un DLL. Si bien eso no es un requisito para ejecutar Envision, ¡ciertamente es un requisito para desarrollar el compilador de Envision! System.Reflection no hace nada para evitar que se cree IL no válido y solo informa una InvalidProgramException bastante inútil cuando se ejecuta un método que contiene IL no válido. El único enfoque razonable para investigar tales problemas es guardar un archivo de ensamblado y usar ILVerify o ILSpy. Debido a este requisito, en realidad seguimos apuntando tanto a .NET Framework como a .NET Core durante dos años: la producción se ejecutaría en .NET Core y la depuración de IL se realizaría en .NET Framework. Finalmente, en 2019 publicamos nuestra propia biblioteca Lokad.ILPack como reemplazo de esta función y nos alejamos de .NET Framework.

Esto concluye el análisis de hoy sobre cómo Envision ejecuta scripts. En el próximo artículo, discutiremos cómo se almacenan los resultados intermedios.

Autopromoción descarada: estamos contratando ingenieros de software. El trabajo remoto es posible.


  1. Los trabajadores transmiten al clúster cada vez que inician un nuevo thunk y evitan ejecutar thunks que otros trabajadores hayan reclamado. Queda el caso raro en el que dos trabajadores inician el mismo thunk casi al mismo tiempo; evitamos esto haciendo que cada trabajador elija un thunk aleatorio de la frontera y haciendo que el planificador reduzca el número de trabajadores cuando la frontera se reduce demasiado. Esto significa que la ejecución duplicada no es imposible, pero es muy poco probable. ↩︎

  2. Se utiliza una costosa unión de mezcla para dos tablas grandes y se utiliza una unión del lado del mapa más barata cuando una de las tablas es lo suficientemente pequeña como para caber en memoria. ↩︎

  3. Para los thunks sin ancestros, como aquellos que leen archivos de entrada, incluimos el hash del contenido de esos archivos de entrada dentro del cuerpo del thunk. Esto asegura que si dos thunks leen el mismo archivo de entrada, tendrán el mismo hash, y si leen dos archivos de entrada diferentes, incluidas dos versiones diferentes del archivo en una ruta determinada, entonces tendrán hashes diferentes. ↩︎

  4. Esto también tiene un efecto en el tamaño de la serialización. De hecho, si todas las aristas se representan en el DAG serializado, incluso con solo dos bytes por arista, ¡ya representa 2MB de datos! ↩︎