Este artículo es el cuarto de una serie de cuatro partes sobre el funcionamiento interno de la máquina virtual Envision: el software que ejecuta los scripts de Envision. Consulta parte 1, parte 2 y parte 3. Esta serie no cubre el compilador de Envision (quizás en otro momento), así que supongamos que el script se ha convertido de alguna manera en el código de bytes que la máquina virtual de Envision toma como entrada.

Los artículos anteriores examinaron principalmente cómo los trabajadores individuales ejecutan scripts de Envision. Sin embargo, tanto para la resiliencia como para el rendimiento, Envision se ejecuta en realidad en un clúster de máquinas.

Cada capa en un trabajador se comunica con la misma capa en los demás trabajadores, o con otras capas en el mismo trabajador. Esto asegura que la comunicación de red pueda mantenerse como un detalle de implementación privado de cada capa.

A nivel bajo, cada trabajador abre dos conexiones TLS a cada otra máquina en el clúster, y las comunicaciones de las diversas capas se multiplexan a través de estas dos conexiones (una conexión se utiliza para mensajes cortos y la otra para transferencias de datos grandes).

Ejecución distribuida abstracta

Capa de Control

Esta capa es utilizada por el planificador para asignar y desasignar misiones a los trabajadores y no implica comunicación entre trabajadores. Los mensajes principales de esta capa son:

  • El planificador le pide al trabajador que comience a trabajar en una misión.
  • El planificador le pide al trabajador que deje de trabajar en una misión.
  • El trabajador le informa al planificador que ha encontrado un error catastrófico durante la ejecución de una misión (generalmente un problema no determinista, como “la unidad NVMe se incendió”, lo que significa que la misma misión se puede intentar nuevamente en el futuro o en otro trabajador).
  • El trabajador le proporciona al planificador estadísticas sobre su estado actual: lista de misiones, tamaño de la frontera de cada DAG de misión, número total de thunks que quedan por ejecutar en cada DAG de misión.

El planificador utiliza estas estadísticas para decidir cuándo volver a asignar misiones. Las reglas reales para hacerlo son bastante complejas, ya que dependen de reglas de prioridad, equidad entre múltiples inquilinos y entre scripts del mismo inquilino, y carga general del clúster en ese momento, pero la tendencia general es que las misiones con una frontera lo suficientemente grande se pueden distribuir en varios trabajadores, siempre y cuando esos trabajadores no estén sobrecargados. Dada la misma cantidad de trabajo por realizar, es más eficiente ejecutar cuatro misiones en un solo trabajador que distribuirlas en todos los trabajadores.

Capa de Ejecución

Cada trabajador realiza un seguimiento de los thunks que está ejecutando actualmente y transmite esta lista a los demás trabajadores cada vez que programa un nuevo thunk1. Esto asegura que, fuera de la ventana muy corta relacionada con la latencia de red, dos trabajadores no comenzarán a ejecutar el mismo thunk.

Por supuesto, si un trabajador deja de enviar estas actualizaciones (por ejemplo, porque se ha bloqueado o se ha desconectado del resto del clúster), sus pares considerarán cualquier lista anterior a unos pocos segundos como obsoleta y se permitirán ejecutar esos thunks.

Capa de Metadatos

Cada trabajador intenta mantener una copia completa de los metadatos, pero no se sincroniza realmente. Decidimos no garantizar que todos los trabajadores estén de acuerdo en los mismos metadatos exactos y, en su lugar, trabajar con garantías de consistencia eventual. Esto hace que la distribución de la capa de metadatos sea la más desafiante en términos de diseño2.

La consistencia eventual de esta capa sigue tres reglas principales:

  1. Cada cambio local en la capa de metadatos se transmite inmediatamente a todos los demás trabajadores. Esta transmisión puede fallar y no se intentará nuevamente.
  2. Los cambios remotos recibidos de otros trabajadores se fusionan en la capa de metadatos local, según una progresión monótona3: un valor de “sin resultado” para un thunk puede ser sobrescrito por un valor de “punto de control” (lo que significa que el thunk ha comenzado, pero no ha terminado, la ejecución), que puede ser sobrescrito por un valor de “alias” (lo que significa que el thunk ha devuelto un DAG para ser ejecutado en su lugar), que puede ser sobrescrito por un valor de “resultado” (que puede ser un resultado exitoso con sus átomos asociados o un error fatal).
  3. Cada vez que otra capa envía una respuesta de red basada en un valor de la capa de metadatos, la capa de metadatos también transmite ese valor nuevamente.

La tercera regla está diseñada para forzar un nivel de sincronización cuando realmente es relevante. Por ejemplo, considera la siguiente secuencia de eventos:

  • El planificador le pide a un trabajador que ejecute una misión (a través de la capa de control).
  • El trabajador ejecuta la misión y transmite el resultado (a través de la capa de metadatos), pero el mensaje se pierde en el camino hacia el planificador.
  • El planificador se da cuenta de que el trabajador ya no está ejecutando la misión (a través de la capa de control) y le pide que la ejecute nuevamente.
  • El trabajador observa que el thunk de la misión ya tiene un resultado en la capa de metadatos y no hace nada, porque no es necesario hacer nada.

Esto es un punto muerto donde el planificador y el trabajador no están de acuerdo sobre el estado de un thunk en la capa de metadatos (el trabajador cree que está hecho, el planificador cree que no lo está). La tercera regla resuelve esto al decidir que, dado que la respuesta del trabajador de «Ya no trabajo en esta misión» se basa en la observación del trabajador de que el thunk tiene un resultado, entonces la capa de metadatos debe transmitir esta información nuevamente. El punto muerto se resuelve entonces:

  • La capa de metadatos del trabajador transmite nuevamente el resultado del thunk y el planificador lo recibe.
  • El planificador reacciona a la aparición de un resultado para el thunk de una misión, marcando esa misión como completa y notificando al cliente que solicitó esa misión.

Capa de Átomos

Los trabajadores combinan sus capas de átomos para crear un almacén de bloques distribuido, donde cada átomo puede ser solicitado por su identificador, el hash de 128 bits de su contenido, creado con SpookyHash. Esto no es una tabla hash distribuida (DHT), porque proporcionaría compensaciones incorrectas: en una DHT, encontrar un átomo sería rápido (dado su hash, se puede calcular el identificador del trabajador que lo tiene con una función simple), pero escribir un átomo sería lento (necesitaría ser enviado desde la máquina que lo calculó, a la máquina que se espera que lo tenga según el diseño actual de la DHT). Dado que se espera que la mayoría de los átomos sean consumidos en la misma máquina que los produjo, esto es ineficiente.

En cambio, cuando un trabajador solicita un átomo de su propia capa de átomos, primero busca ese átomo en sus propias unidades NVMe. Si no se encuentra, entonces se consulta a los otros trabajadores si existe ese átomo. Este es el mayor desafío de rendimiento del diseño distribuido de Envision, ya que estas consultas deben completarse lo más rápido posible, y se necesita una estrategia de tiempo de espera compleja para tratar con trabajadores no responsivos: esperar demasiado tiempo significa perder segundos esperando una respuesta que nunca llegó; rendirse demasiado pronto significa que tendrás que volver a calcular un átomo que podría haberse descargado de otro trabajador.

Para ayudar con esto, la capa de átomos también agrupa varias solicitudes juntas, para asegurarse de que todos los demás trabajadores mantengan una tubería completa de solicitudes que necesitan responder, y para detectar más fácilmente cuando los tiempos de respuesta de un trabajador aumentan repentinamente.

Una vez que al menos otro trabajador ha confirmado la existencia del átomo en su disco, se envía una segunda solicitud para descargar el átomo. Estas solicitudes de descarga tienden a ser muy irregulares, ya que muchos thunks solicitan sus átomos primero y luego comienzan a procesar su contenido. Debido a esto, la capa de átomos es consciente de que hay una sola cola de descarga para cada par de trabajadores, y no entra en pánico si una solicitud de átomo determinada no recibe su primer byte durante varios segundos (si la cola está llena y otros átomos están recibiendo sus bytes, entonces no hay de qué preocuparse). En cierto sentido, el tiempo de espera no está a nivel de solicitud de átomo, sino a nivel de toda la capa.

Además, hay dos optimizaciones aplicadas a la cola de transferencia:

  1. Cada solicitud especifica qué thunk necesita los datos, para que el remitente intente agrupar las solicitudes del mismo thunk (cuanto más rápido se desbloquee un thunk dado, más rápido podrá comenzar a procesar sus entradas).
  2. Cuando se cancela la ejecución de un thunk (por un error, por un cambio de prioridad o porque se descubre que otro trabajador ya lo ha terminado), la capa de átomos comunica esta cancelación para que todas las solicitudes de ese thunk se eliminen de la cola de descarga.

Un trabajador típico enviará datos en ráfagas de 1GB/s, cubriendo generalmente 7GB de datos por ráfaga.

Capa de Registro

Esta capa conserva información adicional sobre el estado de la ejecución, para que pueda ser revisada posteriormente para investigar problemas o medir el rendimiento. Es muy detallada, contiene información como qué thunks se ejecutaron, cuánto tiempo tardaron en ejecutarse y qué tipo de resultado produjeron. También se registran eventos importantes, como la construcción de un nuevo DAG (incluido el DAG serializado en sí), o el descubrimiento de que falta un átomo. En total, se producen varios gigabytes cada día para cada trabajador.

Para minimizar el impacto en el rendimiento, cada trabajador escribe los registros acumulados cada 60 segundos, o cada vez que se acumulan 4 megabytes (lo cual ocurre a menudo cuando hay una ráfaga de actividad). Esto se escribe en un bloque de almacenamiento de Azure Blob Storage4, y cada trabajador tiene su propio blob dedicado para evitar tener que admitir múltiples escritores en un solo blob.

Luego tenemos otras máquinas (fuera del entorno de producción de Envision) que pueden leer estos blobs de registro después del hecho y compilar estadísticas detalladas sobre lo que sucedió en el clúster.

Autopromoción descarada: estamos contratando ingenieros de software. El trabajo remoto es posible.


  1. Esto puede parecer un desperdicio en términos de ancho de banda, pero considera que cada identificador de thunk pesa 24 bytes y hay hasta 32 thunks por trabajador, por lo que cada actualización solo ocupa 768 bytes, ¡menos que un paquete TCP! ↩︎

  2. Aunque, en términos de rendimiento, la capa de átomos es mucho más desafiante. ↩︎

  3. La capa de metadatos es esencialmente un gran reloj vectorial, donde los relojes se mantienen por thunk en lugar de por trabajador. ↩︎

  4. ¿Por qué no usar Blobs de anexos? Bueno, tanto los Blobs de bloque como los Blobs de anexos tienen problemas de rendimiento importantes al leer un archivo compuesto por muchas escrituras pequeñas: el rendimiento de lectura cae de ~60MB/s para un blob normal, a menos de ~2MB/s. Un blob de registro de 5GB tarda alrededor de 40 minutos en leerse a esa velocidad. Hemos contactado a Microsoft sobre este problema, pero no hay planes para solucionarlo. Para solucionar este problema, confiamos en el hecho de que se puede recompactar manualmente un Blob de bloque (tomar las últimas 1000 escrituras pequeñas, borrarlas del blob y volver a escribirlas como una sola escritura grande), mientras que un Blob de anexos no se puede modificar de esta manera. ↩︎