Эта статья является четвертой из серии из четырех частей о внутренней работе виртуальной машины Envision: программного обеспечения, которое выполняет скрипты Envision. См. часть 1, часть 2 и часть 3. В этой серии не рассматривается компилятор Envision (возможно, в другой раз), поэтому давайте просто предположим, что скрипт каким-то образом был преобразован в байт-код, который принимает виртуальная машина Envision в качестве входных данных.

Предыдущие статьи в основном рассматривали, как отдельные рабочие выполняют скрипты Envision. Однако, как для устойчивости, так и для производительности, Envision фактически выполняется на кластере машин.

Каждый уровень в рабочем процессе обменивается данными с тем же уровнем в других рабочих процессах или с другими уровнями в том же рабочем процессе. Это гарантирует, что сетевое взаимодействие может оставаться частным деталем реализации каждого уровня.

На низком уровне каждый рабочий процесс открывает два TLS-соединения с каждой другой машиной в кластере, и связь между различными уровнями мультиплексируется через эти два соединения (одно соединение используется для коротких сообщений, другое - для передачи больших объемов данных).

Абстрактное распределенное выполнение

Уровень управления

Этот уровень используется планировщиком для назначения и отмены заданий рабочим и не включает взаимодействие между рабочими. Основные сообщения этого уровня:

  • Планировщик просит рабочего начать работу над заданием.
  • Планировщик просит рабочего прекратить работу над заданием.
  • Рабочий сообщает планировщику, что в процессе выполнения задания произошла катастрофическая ошибка (обычно недетерминированная проблема, например, “накрылся NVMe-диск”), что означает, что то же самое задание может быть попытано выполнить снова в будущем или на другом рабочем процессе).
  • Рабочий предоставляет планировщику статистику о своем текущем состоянии: список заданий, размер фронта каждого DAG задания, общее количество оставшихся для выполнения thunk в каждом DAG задания.

Планировщик использует эти статистические данные для принятия решения о повторном назначении заданий. Фактические правила для этого довольно сложны, так как они зависят от правил приоритета, справедливости между несколькими арендаторами и между скриптами одного арендатора, а также от общей нагрузки на кластер в данный момент, но общая тенденция состоит в том, что задания с достаточно большим фронтом могут быть распределены по нескольким рабочим, при условии, что эти рабочие не перегружены. При выполнении одинакового объема работы более эффективно запустить четыре задания на каждом рабочем процессе, чем распределить их по всем рабочим процессам.

Уровень выполнения

Каждый рабочий процесс отслеживает, какие thunk в настоящее время он выполняет, и передает этот список другим рабочим процессам каждый раз, когда он планирует новый thunk1. Это гарантирует, что за исключением очень короткого окна, связанного с задержкой сети, два рабочих процесса не начнут выполнять один и тот же thunk.

Конечно, если рабочий процесс перестает отправлять эти обновления (например, потому что он вышел из строя или отключился от остального кластера), его коллеги будут считать любой список, старше нескольких секунд, устаревшим и позволят себе выполнять эти thunk.

Уровень метаданных

Каждый рабочий процесс пытается хранить копию полных метаданных, но фактически не синхронизируется. Мы решили не давать гарантии, что все рабочие процессы согласуются по поводу точно таких же метаданных, а вместо этого работаем с гарантиями последовательной согласованности. Это делает распределение уровня метаданных наиболее сложным с точки зрения проектирования2.

Последовательная согласованность этого уровня следует трем основным правилам:

  1. Каждое локальное изменение уровня метаданных немедленно передается всем остальным рабочим процессам. Эта передача может не удалиться и не будет повторена.
  2. Удаленные изменения, полученные от других рабочих процессов, объединяются с локальным уровнем метаданных на основе монотонного прогресса3: значение “нет результата” для thunk может быть перезаписано значением “контрольной точки” (означающим, что thunk начал, но не закончил выполнение), которое может быть перезаписано значением “псевдонима” (означающим, что thunk вернул DAG для выполнения вместо него), которое может быть перезаписано значением “результата” (который может быть успешным результатом с соответствующими атомами или фатальной ошибкой).
  3. Когда другой уровень отправляет сетевой ответ на основе значения в уровне метаданных, уровень метаданных также повторно передает это значение.

Третье правило разработано для принудительной синхронизации, когда это действительно необходимо. Например, рассмотрим следующую последовательность событий:

  • Планировщик просит рабочий процесс выполнить миссию (через уровень управления)
  • Рабочий процесс выполняет миссию и передает результат (через уровень метаданных), но сообщение потеряно по пути к планировщику.
  • Планировщик замечает, что рабочий процесс больше не выполняет миссию (через уровень управления) и просит его выполнить ее снова.
  • Рабочий процесс замечает, что у thunk миссии уже есть результат в уровне метаданных и ничего не делает, потому что ничего делать не нужно.

Это тупиковая ситуация, когда планировщик и рабочий процесс не согласны относительно состояния thunk в уровне метаданных (рабочий процесс считает, что он завершен, а планировщик считает, что он не завершен). Третье правило разрешает эту ситуацию, решив, что поскольку ответ рабочего процесса «Я больше не работаю над этой миссией» и основан на наблюдении рабочего процесса, что у thunk есть результат, то уровень метаданных должен повторно передать эту информацию. Тупиковая ситуация тогда разрешается:

  • Уровень метаданных рабочего процесса повторно передает результат thunk, и он принимается планировщиком.
  • Планировщик реагирует на появление результата для thunk миссии, помечая эту миссию как завершенную и уведомляя клиента, который запросил эту миссию.

Уровень атомов

Рабочие процессы объединяют свои уровни атомов, чтобы создать распределенное хранилище блобов, где каждый атом может быть запрошен по его идентификатору - 128-битному хешу его содержимого, созданному с помощью SpookyHash. Это не распределенная хеш-таблица (DHT), потому что это дало бы неправильные компромиссы: в DHT поиск атома был бы быстрым (по заданному хешу можно было бы вычислить идентификатор рабочего процесса, удерживающего его, с помощью простой функции), но запись атома была бы медленной (он должен был бы быть отправлен с машины, которая его вычислила, на машину, которая ожидает его, исходя из текущей структуры DHT). Учитывая, что большинство атомов ожидается использовать на той же машине, которая их произвела, это неэффективно.

Вместо этого, когда рабочий запрашивает атом из своего собственного слоя атомов, он сначала ищет этот атом на своих собственных накопителях NVMe. Если атом не найден, то запрашивается у других рабочих наличие этого атома. Это самая большая проблема производительности в распределенной конструкции Envision, поскольку эти запросы должны быть выполнены как можно быстрее, и для этого требуется сложная стратегия тайм-аута для работы с неотзывчивыми рабочими: если ждать слишком долго, то вы потратите секунды, ожидая ответа, который никогда не придет; если сдаться слишком рано, то вам придется пересчитать атом, который можно было загрузить с другого рабочего.

Чтобы помочь в этом, слой атомов также пакетирует несколько запросов вместе, чтобы обеспечить, чтобы все остальные рабочие имели полный поток запросов, на которые им нужно ответить, и чтобы легче обнаружить, когда время ответа рабочего внезапно возрастает.

Как только хотя бы еще один рабочий подтвердит наличие атома на своем диске, отправляется второй запрос для загрузки атома. Такие запросы на загрузку обычно имеют резкий характер, поскольку многие thunks сначала запрашивают свои атомы, а затем начинают обрабатывать их содержимое. Из-за этого слой атомов знает, что для каждой пары рабочих существует единственная очередь загрузки, и не паникует, если данный запрос атома не получает свой первый байт в течение нескольких секунд (если очередь заполнена и другие атомы получают свои байты, то нет ничего страшного). В некотором смысле, тайм-аут не на уровне запроса атома, а на уровне всего слоя.

Кроме того, к очереди передачи применяются две оптимизации:

  1. Каждый запрос указывает, какому thunk требуются данные, чтобы отправитель попытался группировать запросы от одного и того же thunk (чем быстрее данный thunk разблокируется, тем быстрее он сможет начать обрабатывать свои входные данные).
  2. Когда выполнение thunk отменяется (из-за ошибки, из-за изменения приоритета или потому что обнаружено, что другой рабочий уже его завершил), слой атомов сообщает об этой отмене, чтобы все запросы этого thunk могли быть удалены из очереди загрузки.

Обычный рабочий будет отправлять данные порциями по 1 ГБ/с, обычно охватывая 7 ГБ данных за порцию.

Слой журналирования

Этот слой сохраняет дополнительную информацию о состоянии выполнения, чтобы ее можно было просмотреть позже для расследования проблем или измерения производительности. Он очень подробный и содержит информацию, такую как, какие thunks были выполнены, сколько времени они занимали и какой результат они дали. Важные события, такие как создание нового DAG (включая сериализованный DAG самого себя) или обнаружение отсутствия атома, также регистрируются. Всего каждый день для каждого рабочего производится несколько гигабайт.

Чтобы минимизировать влияние на производительность, каждый рабочий записывает накопленные журналы каждые 60 секунд или при накоплении 4 мегабайт (что часто происходит при всплеске активности). Это записывается в блоковый блоб Azure Blob Storage4, и у каждого рабочего есть свой собственный блоб, чтобы избежать необходимости поддержки нескольких писателей в одном блобе.

Затем у нас есть другие машины (вне рабочей среды Envision), которые могут читать эти блобы журналов после факта и составлять подробную статистику о том, что произошло в кластере.

Бесстыдная реклама: мы нанимаем инженеров по программному обеспечению. Возможна удаленная работа.


  1. Это может показаться неэффективным с точки зрения пропускной способности, но учтите, что каждый идентификатор thunk весит 24 байта, и на каждого рабочего процесса приходится до 32 thunk, поэтому каждое обновление занимает всего 768 байтов - меньше, чем пакет TCP! ↩︎

  2. Хотя с точки зрения производительности уровень атомов гораздо более сложный. ↩︎

  3. Уровень метаданных по сути является огромным векторным часами, где часы хранятся для каждого thunk, а не для каждого рабочего процесса. ↩︎

  4. Почему не использовать блобы для добавления? Ну, и блоковые блобы, и блобы для добавления имеют серьезные проблемы производительности при чтении файла, состоящего из множества маленьких записей: производительность чтения падает с ~60 МБ/с для обычного блоба до менее ~2 МБ/с! Чтение 5-гигабайтного блоба журнала занимает примерно 40 минут при такой скорости. Мы связались с Microsoft по этому вопросу, но планов по его исправлению нет. Чтобы обойти эту проблему, мы полагаемся на то, что блоковый блоб может быть вручную рекомпактным (взять последние 1000 маленьких записей, удалить их из блоба и записать их обратно как одну большую запись), в то время как блоб для добавления не может быть изменен таким образом. ↩︎