Envision VM (часть 4), Распределённое выполнение
Эта статья — четвёртая в серии из четырёх частей, посвящённых внутренним механизмам виртуальной машины Envision: программному обеспечению, которое выполняет скрипты Envision. См. часть 1, часть 2 и часть 3. В этой серии не рассматривается компилятор Envision (возможно, в другой раз), так что будем считать, что скрипт каким-то образом был преобразован в байт-код, который принимает на вход виртуальная машина Envision.
Предыдущие статьи в основном рассматривали, как отдельные воркеры выполняли скрипты Envision. Однако как для устойчивости так и для производительности, Envision фактически выполняется на кластере машин.
Каждый уровень в воркере взаимодействует с тем же уровнем других воркеров или с другими уровнями в том же воркере. Это гарантирует, что сетевое взаимодействие остаётся закрытой внутренней деталью реализации каждого уровня.
На низком уровне каждый воркер устанавливает два TLS-соединения с каждой другой машиной в кластере, и сообщения от различных уровней мультиплексируются через эти два соединения (одно используется для коротких сообщений, другое — для передачи больших объёмов данных).

Уровень управления
Этот уровень используется планировщиком для назначения и отмены миссий на воркеры и не предполагает взаимодействия между воркерами. Основные сообщения этого уровня:
- Планировщик просит воркер начать выполнение миссии.
- Планировщик просит воркер прекратить выполнение миссии.
- Воркер сообщает планировщику, что во время выполнения миссии произошла катастрофическая ошибка (обычно недетерминированная проблема, такая как “NVMe drive caught on fire”, что означает, что эту же миссию можно будет попытаться выполнить снова в будущем или на другом воркере).
- Воркер предоставляет планировщику статистику о своём текущем состоянии: список миссий, размер фронтера DAG каждой миссии, общее число остающихся к выполнению thunk-ов в DAG каждой миссии.
Планировщик использует эту статистику для принятия решения о переназначении миссий. Фактические правила этого процесса достаточно сложны, поскольку они зависят от приоритетов, справедливости между несколькими арендаторами и между скриптами одного арендатора, а также от общей загрузки кластера в данный момент, но общий тренд таков: миссии с достаточно большим фронтером могут распределяться между несколькими воркерами, при условии что те не перегружены. При равном объёме работы эффективнее запускать каждую из четырёх миссий на отдельном воркере, чем распределять все миссии между всеми воркерами.
Уровень выполнения
Каждый воркер отслеживает, какие thunk-ы он в данный момент выполняет, и транслирует этот список другим воркерам каждый раз, когда планирует новый thunk1. Это гарантирует, что за исключением очень короткого промежутка, связанного с сетевой задержкой, два воркера не начнут выполнять один и тот же thunk.
Разумеется, если воркер перестанет отправлять эти обновления (например, из-за сбоя или отключения от кластера), его коллеги будут считать любой список старше нескольких секунд устаревшим и позволят себе выполнить эти thunk-ы.
Уровень метаданных
Каждый воркер пытается хранить полную копию метаданных, но фактически не синхронизируется. Мы решили не гарантировать, что все воркеры будут иметь идентичные метаданные, а вместо этого работать на основании eventual consistency. Это делает распределение уровня метаданных самым сложным с точки зрения проектирования2.
Принцип eventual consistency этого уровня основывается на трёх основных правилах:
- Каждое локальное изменение уровня метаданных немедленно транслируется всем другим воркерам. Эта трансляция может завершиться неудачей и не будет предпринята повторно.
- Изменения, полученные от других воркеров, интегрируются в локальный уровень метаданных на основе монотонного прогрессирования3: значение “no result” для thunk может быть перезаписано значением “checkpoint” (означающим, что выполнение thunk началось, но не завершилось), которое может быть перезаписано значением “alias” (означающим, что thunk вернул DAG, который должен быть выполнен вместо него), которое, в свою очередь, может быть перезаписано значением “result” (которое может быть как успешным результатом с сопутствующими атомами, так и фатальной ошибкой).
- Каждый раз, когда другой уровень отправляет сетевой ответ на основе значения из уровня метаданных, этот уровень повторно транслирует это значение.
Третье правило призвано обеспечить синхронизацию в тех случаях, когда это действительно важно. Например, рассмотрим следующую последовательность событий:
- Планировщик просит воркер выполнить миссию (через уровень управления)
- Воркер выполняет миссию и транслирует результат (через уровень метаданных), но сообщение теряется по пути к планировщику.
- Планировщик замечает, что воркер больше не выполняет миссию (через уровень управления), и просит его запустить её снова.
- Воркер замечает, что у thunk миссии уже есть результат в уровне метаданных, и ничего не делает, поскольку ничего делать не нужно.
Это приводит к взаимоблокировке, когда планировщик и воркер расходятся во взглядах на состояние thunk в уровне метаданных (воркер считает его завершённым, а планировщик — нет). Третье правило разрешает эту ситуацию, определяя, что поскольку ответ воркера “I no longer works on this mission” основан на его наблюдении, что у thunk имеется результат, уровень метаданных должен повторно транслировать эту информацию. Таким образом, взаимоблокировка разрешается:
- Уровень метаданных воркера повторно транслирует результат thunk, и он принимается планировщиком.
- Планировщик реагирует на появление результата у thunk миссии, помечая эту миссию как завершённую и уведомляя клиента, который запросил эту миссию.
Уровень атомов
Воркеры объединяют свои уровни атомов для создания распределённого хранилища blob, где каждый атом можно запросить по его идентификатору — 128-битному хэшу его содержимого, созданному с помощью SpookyHash. Это не распределённая хеш-таблица (DHT), поскольку она имеет неверный набор компромиссов: в DHT поиск атома осуществляется быстро (зная его хэш, можно вычислить идентификатор воркера, который его хранит, с помощью простой функции), однако запись атома будет медленной (его потребуется передать с машины, где он вычислен, на машину, которая должна его хранить согласно текущей раскладке DHT). Учитывая, что большинство атомов, как ожидается, будут использованы на той же машине, где они были созданы, это неэффективно.
Вместо этого, когда воркер запрашивает атом из своего уровня атомов, он сначала ищет его на своих NVMe-дисках. Если атом не найден, осуществляется запрос к другим воркерам о его наличии. Это является крупнейшей проблемой с точки зрения производительности в распределённом дизайне Envision, поскольку эти запросы должны выполняться как можно быстрее, а также требуется сложная стратегия таймаутов для работы с неотвечающими воркерами: если ждать слишком долго, будут потеряны секунды в ожидании ответа, который так и не поступил; если сдаться слишком рано, придётся пересчитывать атом, который можно было бы загрузить с другого воркера.
Для решения этой проблемы уровень атомов также объединяет несколько запросов, чтобы гарантировать, что все другие воркеры поддерживают полную очередь запросов, на которые им необходимо ответить, и чтобы проще обнаруживать внезапное увеличение времени отклика воркера.
Как только хотя бы один другой воркер подтвердит наличие атома на своём диске, отправляется второй запрос на его загрузку. Такие запросы на загрузку, как правило, бывают очень резкими, поскольку многие thunk-ы сначала запрашивают свои атомы, а затем начинают обрабатывать их содержимое. Из-за этого уровень атомов знает, что для каждой пары воркеров существует единая очередь загрузки, и не паникует, если для запроса какого-либо атома первая порция данных не поступает в течение нескольких секунд (если очередь заполнена, а остальные атомы получают свои данные, поводов для беспокойства нет). Можно сказать, что таймаут устанавливается не на уровне запроса атома, а на уровне всего слоя.
Кроме того, к очереди передачи применяются две оптимизации:
- Каждый запрос указывает, какому thunk требуются данные, чтобы отправитель мог группировать запросы от одного и того же thunk (чем быстрее конкретный thunk разблокируется, тем быстрее он сможет начать обработку своих входных данных).
- Когда выполнение thunk отменяется (из-за ошибки, изменения приоритета или потому, что обнаружено, что другой воркер уже завершил его), уровень атомов сообщает об этой отмене, чтобы все запросы для этого thunk могли быть удалены из очереди загрузки.
Обычный воркер передаёт данные порциями с пропускной способностью 1 ГБ/с, обычно охватывая 7 ГБ данных за одну порцию.
Уровень логирования
Этот уровень сохраняет дополнительную информацию о состоянии выполнения, чтобы её можно было проанализировать впоследствии для выяснения причин проблем или оценки производительности. Он очень подробный, содержит сведения о том, какие thunk-ы были выполнены, сколько времени потребовалось на их выполнение и какой результат был получен. Также регистрируются важные события, такие как построение нового DAG (включая сам сериализованный DAG) или обнаружение отсутствующего атома. В итоге ежедневно для каждого воркера генерируется несколько гигабайт логов.
Чтобы минимизировать влияние на производительность, каждый воркер записывает накопленные логи каждые 60 секунд или когда накопится 4 мегабайта данных (что часто происходит при всплеске активности). Эти данные записываются в блоб-блок Azure Blob Storage4, и у каждого воркера есть свой собственный блоб, чтобы избежать необходимости поддержки множественных писателей в одном блобе.
Затем другие машины (вне производственной среды Envision) могут считывать эти лог-блобы впоследствии и составлять подробную статистику о происходящем в кластере.
Беспардонная реклама: мы нанимаем программистов. Возможна удалённая работа.
-
Это может показаться расточением пропускной способности, но учтите, что идентификатор каждого thunk занимает 24 байта, и на воркер может приходиться до 32 thunk-ов, так что каждое обновление составляет всего 768 байт — меньше, чем один TCP-пакет! ↩︎
-
Хотя с точки зрения производительности уровень атомов представляет куда большую сложность. ↩︎
-
Уровень метаданных по сути является огромными векторными часами, где часы ведутся для каждого thunk, а не для каждого воркера. ↩︎
-
А почему не использовать Append Blobs? Дело в том, что и Block Blobs, и Append Blobs имеют серьёзные проблемы с производительностью при чтении файла, составленного из множества небольших записей: скорость чтения падает с ~60MB/s для обычного блоба до менее чем ~2MB/s! Чтение 5-гигабайтного лог-блоба занимает около 40 минут при такой скорости. Мы обратились к Microsoft по этому вопросу, но планы по его исправлению отсутствуют. Чтобы обойти эту проблему, мы полагаемся на то, что Block Blob можно вручную ре-компактировать (взять последние 1000 небольших записей, удалить их из блоба и записать обратно в виде одной большой записи), в то время как Append Blob таким образом не подлежит модификации. ↩︎