Este artículo es el cuarto de una serie de cuatro partes sobre el funcionamiento interno de la máquina virtual de Envision: el software que ejecuta scripts de Envision. Véase parte 1, parte 2 y parte 3. Esta serie no cubre el compilador de Envision (tal vez en otra ocasión), así que asumamos que el script ha sido de alguna manera convertido al bytecode que la máquina virtual de Envision toma como entrada.

Los artículos anteriores examinaban en su mayor parte cómo los trabajadores individuales ejecutaban scripts de Envision. Sin embargo, tanto para resiliencia como para el rendimiento, Envision se ejecuta en realidad a través de un clúster de máquinas.

Cada capa en un trabajador se comunica con la misma capa en los otros trabajadores, o con otras capas en el mismo trabajador. Esto asegura que la comunicación de red pueda mantenerse como un detalle de implementación privado de cada capa.

A un nivel bajo, cada trabajador abre dos conexiones TLS a cada otra máquina en el clúster, y las comunicaciones de las diversas capas se multiplexan a través de estas dos conexiones (una conexión se utiliza para mensajes cortos, la otra para transferencias de datos grandes).

Abstract distributed execution

Capa de Control

Esta capa es utilizada por el planificador para asignar y desasignar misiones a los trabajadores e involucra ninguna comunicación entre estos. Los mensajes principales de esta capa son:

  • El planificador le pide al trabajador que comience a trabajar en una misión.
  • El planificador le pide al trabajador que deje de trabajar en una misión.
  • El trabajador le informa al planificador que ha encontrado un error catastrófico durante la ejecución de una misión (usualmente un problema no determinístico, como “NVMe drive caught on fire”, lo que significa que la misma misión puede ser intentada nuevamente en el futuro o en otro trabajador).
  • El trabajador le proporciona al planificador estadísticas sobre su estado actual: lista de misiones, tamaño del frente de cada DAG de misión, número total de thunks que quedan por ejecutarse en el DAG de cada misión.

El planificador utiliza estas estadísticas para decidir cuándo reasignar misiones. Las reglas reales para hacerlo son bastante complejas, ya que dependen de reglas de prioridad, equidad entre múltiples inquilinos y entre scripts del mismo inquilino, y la carga general del clúster en ese momento, pero la tendencia general es que las misiones con un frente lo suficientemente grande pueden ser distribuidas a múltiples trabajadores, siempre y cuando esos trabajadores no estén ya sobrecargados. Dada la misma cantidad de trabajo a realizar, es más eficiente ejecutar cuatro misiones en un solo trabajador cada una, que distribuirlas todas entre todos los trabajadores.

Capa de Ejecución

Cada trabajador lleva un registro de qué thunks está ejecutando actualmente, y transmite esta lista a los otros trabajadores cada vez que programa un nuevo thunk1. Esto asegura que, fuera de la ventana muy corta relacionada a la latencia de red, dos trabajadores no comenzarán a ejecutar el mismo thunk.

Por supuesto, si un trabajador deja de enviar estas actualizaciones (por ejemplo, porque se ha estrellado o ha quedado desconectado del resto del clúster), sus pares considerarán cualquier lista de más de unos pocos segundos como obsoleta, y se permitirán ejecutar esos thunks.

Capa de Metadatos

Cada trabajador intenta mantener una copia de los metadatos completos, pero no los sincroniza realmente. Elegimos no proporcionar ninguna garantía de que todos los trabajadores estén de acuerdo en los metadatos exactos, y en su lugar trabajamos con garantías de consistencia eventual. Esto hace que distribuir la capa de metadatos sea lo más desafiante en términos de diseño2.

La consistencia eventual de esta capa sigue tres reglas principales:

  1. Cada cambio local a la capa de metadatos se transmite inmediatamente a todos los otros trabajadores. Esta transmisión puede fallar, y no se intentará de nuevo.
  2. Los cambios remotos recibidos de otros trabajadores se fusionan en la capa de metadatos local, basándose en una progresión monótona3: un valor de “sin resultado” para un thunk puede ser sobrescrito por un valor de “checkpoint” (lo que significa que el thunk ha comenzado, pero no ha terminado, su ejecución), que puede ser sobrescrito por un valor de “alias” (lo que significa que el thunk ha retornado un DAG para ser ejecutado en su lugar), que puede ser sobrescrito por un valor de “result” (el cual puede ser o un resultado exitoso con sus átomos asociados, o un error fatal).
  3. Siempre que otra capa envíe una respuesta de red basada en un valor en la capa de metadatos, la capa de metadatos también vuelve a transmitir ese valor.

La tercera regla está diseñada para forzar un nivel de sincronización cuando realmente es relevante. Por ejemplo, considera la siguiente secuencia de eventos:

  • El planificador le pide a un trabajador ejecutar una misión (a través de la capa de control)
  • El trabajador ejecuta la misión y transmite el resultado (a través de la capa de metadatos), pero el mensaje se pierde en el camino hacia el planificador.
  • El planificador se da cuenta de que el trabajador ya no está ejecutando la misión (a través de la capa de control), y le pide que la ejecute nuevamente.
  • El trabajador observa que el thunk de la misión ya tiene un resultado en la capa de metadatos, y no hace nada, porque no hay nada que hacer.

Esto es un bloqueo muerto donde el planificador y el trabajador no se ponen de acuerdo sobre el estado de un thunk en la capa de metadatos (el trabajador cree que está terminado, el planificador cree que no lo está). La tercera regla resuelve esto decidiendo que, dado que la respuesta del trabajador de «Ya no trabaja en esta misión» y se basa en la observación de que el thunk tiene un resultado, entonces la capa de metadatos debe transmitir esta información nuevamente. El bloqueo muerto se resuelve entonces:

  • La capa de metadatos del trabajador transmite nuevamente el resultado del thunk, y el planificador lo recibe.
  • El planificador reacciona ante la aparición de un resultado para el thunk de una misión, marcando esa misión como completa, y notificando al cliente que solicitó esa misión.

Capa de Átomos

Los trabajadores combinan sus capas de átomos para crear un almacén distribuido de blobs, donde cada átomo puede ser solicitado por su identificador—el hash de 128 bits de su contenido, creado con SpookyHash. Esto no es una tabla hash distribuida (DHT), porque eso proporcionaría los compromisos incorrectos: en un DHT, encontrar un átomo sería rápido (dado su hash, el identificador del trabajador que lo posee puede ser calculado con una función simple), pero escribir un átomo sería lento (tendría que ser enviado desde la máquina que lo calculó, a la máquina que se espera lo posea dado el layout actual del DHT). Dado que se espera que la mayoría de los átomos sean consumidos en la misma máquina que los produjo, esto es un despilfarro.

En cambio, cada vez que un trabajador solicita un átomo de su propia capa de átomos, primero busca ese átomo en sus propios discos NVMe. Si no se encuentra, entonces se consulta a los otros trabajadores sobre la existencia de ese átomo. Este es el mayor desafío de rendimiento del diseño distribuido de Envision, ya que estas consultas deben completarse lo más rápido posible, y se necesita una estrategia de timeout compleja para lidiar con trabajadores que no responden: espera demasiado, y habrás desperdiciado segundos esperando una respuesta que nunca llegó; renunciar demasiado pronto, y tendrás que recomputar un átomo que podría haberse descargado de otro trabajador.

Para ayudar con esto, la capa de átomos también agrupa múltiples solicitudes, para asegurar que todos los otros trabajadores mantengan una línea completa de solicitudes que deben contestar, y para detectar más fácilmente cuando los tiempos de respuesta de un trabajador aumentan repentinamente.

Una vez que al menos otro trabajador ha confirmado la existencia del átomo en su disco, se envía una segunda solicitud para descargar el átomo. Estas solicitudes de descarga tienden a ser muy irregulares, ya que muchos thunks solicitan sus átomos primero, y luego comienzan a procesar su contenido. Debido a esto, la capa de átomos es consciente de que existe una única cola de descargas para cada par de trabajadores, y no entra en pánico si una solicitud de átomo dada no recibe su primer byte durante varios segundos (si la cola está llena y otros átomos están recibiendo sus bytes, entonces no hay nada de qué preocuparse). En cierto sentido, el timeout no se aplica al nivel de la solicitud de átomo, sino al nivel de la capa entera.

Además, hay dos optimizaciones aplicadas a la cola de transferencias:

  1. Cada solicitud especifica qué thunk necesita los datos, de modo que el emisor intentará agrupar juntas las solicitudes del mismo thunk (cuanto más rápido se desbloquee un determinado thunk, más rápido podrá comenzar a procesar sus entradas).
  2. Cuando se cancela la ejecución de un thunk (ya sea por un error, por un cambio en la prioridad, o porque se descubre que otro trabajador ya lo ha terminado), la capa de átomos comunica esta cancelación para que todas las solicitudes de ese thunk puedan ser eliminadas de la cola de descargas.

Un trabajador típico estará enviando datos en ráfagas de 1GB/s, cubriendo usualmente 7GB de datos por ráfaga.

Capa de Logging

Esta capa preserva información adicional sobre el estado de la ejecución, de modo que se pueda revisar posteriormente para investigar problemas o medir el rendimiento. Es muy detallada, conteniendo información como qué thunks fueron ejecutados, cuánto tiempo tardaron en ejecutarse y qué tipo de resultado produjeron. Eventos importantes, como la construcción de un nuevo DAG (incluyendo el propio DAG serializado), o el descubrimiento de que falta un átomo, también se registran. En total, se producen varios gigabytes cada día para cada trabajador.

Para minimizar el impacto en el rendimiento, cada trabajador escribe los logs acumulados cada 60 segundos, o cada vez que se acumulan 4 megabytes (lo cual suele ocurrir cuando hay una ráfaga de actividad). Esto se escribe en un blob de bloque de Azure Blob Storage4, y cada trabajador tiene su propio blob dedicado para evitar tener que soportar múltiples escritores en un solo blob.

Luego tenemos otras máquinas (fuera del entorno de producción de Envision) que pueden leer estos blobs de log después del hecho, y compilar estadísticas detalladas sobre lo ocurrido en el clúster.

Una mención sin vergüenza: estamos contratando ingenieros de software. Se permite el trabajo remoto.


  1. Esto podría parecer un despilfarro en términos de ancho de banda, pero considera que cada identificador de thunk pesa 24 bytes, y hay hasta 32 thunks por trabajador, así que cada actualización solo toma 768 bytes—¡menos que un paquete TCP! ↩︎

  2. Aunque, en términos de rendimiento, la capa de átomos es mucho más desafiante. ↩︎

  3. La capa de metadatos es esencialmente un enorme reloj vectorial, donde los relojes se mantienen por thunk en lugar de por trabajador. ↩︎

  4. ¿Por qué no Append Blobs? Bueno, tanto Block Blobs como Append Blobs tienen importantes problemas de rendimiento al leer un archivo compuesto por muchas escrituras pequeñas: el rendimiento de lectura cae de ~60MB/s para un blob normal, a menos de ~2MB/s! Un blob de log de 5GB toma alrededor de 40 minutos para leerse a esa velocidad. Nos hemos puesto en contacto con Microsoft acerca de este problema, pero no hay planes para solucionarlo. Para sortear este problema, confiamos en que un Block Blob puede ser manualmente recompuesto (tomar las últimas 1000 escrituras pequeñas, borrarlas del blob, y escribirlas nuevamente como una sola escritura grande), mientras que un Append Blob no puede ser modificado de esta manera. ↩︎