Envision VM (часть 2), Thunks и Модель Выполнения
Эта статья является второй из серии из четырех частей об устройстве виртуальной машины Envision – программного обеспечения, исполняющего скрипты Envision. См. часть 1, часть 3 и часть 4. В этой серии не рассматривается компилятор Envision (возможно, в другой раз), так что будем считать, что скрипт каким-то образом был преобразован в байт-код, который принимает виртуальная машина Envision.
Как и большинство других систем параллельного выполнения, Envision формирует направленный ациклический граф (DAG), где каждый узел представляет операцию, которую необходимо выполнить, а каждое ребро представляет зависимость данных, когда нижестоящий узел нуждается в результате вышестоящего узла для запуска.

Узлы называются thunk, заимствуя термин из Haskell и других языков с ленивыми вычислениями.
Примеры thunk, которые можно найти в типичном скрипте Envision:
- Разобрать входной файл в формате
.xlsx
,.csv
или.csv.gz
и преобразовать его в колонко-ориентированное представление, которое будет использоваться остальной частью скрипта. - Загрузить диапазон строк от $M$ до $N$ из отдельной колонки; эта колонка может быть получена либо в результате разбора входного файла (см. выше), либо из собственного колонко-ориентированного формата файлов
.ion
от Lokad, оптимизированного для хранения в Microsoft Azure Blob Storage. - Для диапазона строк $M..N$ из очень большого вектора $A$, меньшего вектора $B$, проектора $\pi$, который сопоставляет каждую строку в $A$ со строкой в $B$, и функции $f$, вычислить $f(A[l], B[\pi(l)])$. Это называется объединением на стороне отображения.
- Использовать метод Монте-Карло для оценки среднего, дисперсии или распределения результата случайного процесса. Результаты нескольких thunk, выполненных параллельно, затем могут быть объединены итоговым thunk.
В общем, ожидается, что выполнение одного thunk займет от нескольких сотен миллисекунд (для небольших операций с данными) до нескольких минут (для симуляций Монте-Карло или градиентного спуска). Это важное предположение: виртуальной машине Envision разрешается значительная накладная стоимость на вычисление каждого thunk – порядка миллисекунд. Скрипт должен порождать небольшое количество thunk (от 1 000 до 100 000), при этом каждый thunk выполняет довольно объемную задачу.
Референциальная прозрачность
Thunk являются чистыми функциями: они детерминированы и не могут иметь побочных эффектов. Они работают, читая свои неизменяемые входные данные, и возвращают одно и то же значение при каждом запуске. Это важное свойство помогает во многих отношениях:
- Поскольку выполнение thunk не имеет побочных эффектов, оно не будет мешать выполнению другого thunk, и поэтому все thunk могут выполняться параллельно (при условии, что их входные данные доступны) на нескольких ядрах процессора или даже распределяться между несколькими рабочими узлами. Виртуальная машина Envision отслеживает фронтир каждого скрипта (набор thunk, готовых к выполнению, поскольку все их входные данные доступны), и выбирает новый thunk всякий раз, когда одно из ядер становится свободным.
- И наоборот, можно выполнять thunk последовательно и получать тот же результат. Например, когда кластер сильно загружен, когда рабочие узлы недоступны или при воспроизведении выполнения скрипта на рабочей станции разработчика для расследования проблемы.
- Два рабочих узла, выполняющие один и тот же thunk, не являются ошибкой, а всего лишь тратой ресурсов. Таким образом, это не то, чего обязательно следует избегать (с учетом всей сложности синхронизации в распределенной системе), достаточно лишь обеспечить, чтобы это происходило не слишком часто1.
- Если результат thunk теряется (из-за сбоя рабочего узла или недоступности сети), его можно выполнить заново. Даже если несколько thunk будут потеряны, исходный DAG остается доступным и может использоваться как путь данных для повторного вычисления необходимых значений.
Однако это также означает, что thunk не могут обмениваться информацией друг с другом (например, посредством открытия канала и передачи данных между ними). Это ограничивает доступные стратегии для конкурентного и параллельного выполнения.
Производство thunk
Во многих распределенных вычислительных средах DAG исполнения создается вне кластера (например, на машине-планировщике), а затем его части передаются отдельным рабочим узлам для выполнения. Зачастую DAG создается в несколько этапов: например, операция объединения может оптимизироваться по-разному в зависимости от размера таблиц2, и не всегда можно заранее узнать размер таблицы до фактического вычисления её содержимого, поэтому имеет смысл дождаться известности размеров таблиц перед генерацией части DAG, выполняющей объединение. Это означает, что между планировщиком и рабочими узлами будет происходить обмен информацией, при котором планировщик будет создавать дополнительные задачи на основе результатов от рабочих узлов.
Это превращает планировщика в единственную точку отказа, а разрешение наличия нескольких активных планировщиков или схема резервирования между активным и пассивным планировщиком добавили бы значительную сложность. Для Envision нашей целью resilience было обеспечить, чтобы один рабочий узел мог вычислить всю задачу целиком без участия планировщика. Таким образом, даже если планировщик недоступен в течение десяти минут, что предотвратит подачу новых задач, уже запущенные задачи не будут прерваны.
Мы достигаем этого, позволяя thunk возвращать новый thunk вместо значения – используя терминологию Haskell, построение DAG включает монады, а не только функторы. Этот новый thunk имеет своих родителей, которые могут быть новыми thunk, и так далее, образуя полностью новый DAG. На практике новый DAG часто разделяет многие свои thunk со старым, поскольку ему требуются результаты этих вычислений.
При подаче новой задачи в кластер отправляется только один thunk (содержащий скрипт для компиляции и выполнения, а также ссылки на все входные файлы). Этот thunk затем порождает начальный DAG исполнения, который несколько раз расширяется до завершения.
Граф Меркла
Для передачи по сети thunk также являются сериализуемыми, используя собственный бинарный формат, разработанный для минимизации занимаемой памяти. На DAG с 100 000 thunk бюджет в 10 МиБ позволяет выделить лишь 104 байта на один thunk!
Поддержка бинарной сериализации позволила нам превратить DAG в Merkle DAG, где каждый thunk имеет идентификатор, определяемый бинарным содержимым этого thunk и всех его предков3. Мы называем этот идентификатор хэшем thunk.
Использование Merkle DAG имеет два основных преимущества. Во-первых, thunk, выполняющие одну и ту же операцию, автоматически объединяются, поскольку, имея одинаковое содержимое и предков, они получают одинаковый идентификатор.
Во-вторых, два скрипта могут разделять некоторые из своих thunk — возможно, они читают одни и те же входные файлы и применяют к ним одинаковые операции, или же Специалист по цепям поставок работает над скриптом, изменяя несколько строк за раз при каждом исполнении. В этих случаях результаты общих thunk могут быть переиспользованы, если они еще находятся в памяти, что значительно сокращает время выполнения скрипта. Возможность редактировать и повторно запускать скрипт создает короткую обратную связь, повышающую продуктивность специалистов по цепям поставок.
Локальное планирование thunk
Мы остановимся на подробностях распределения выполнения thunk между несколькими машинами в кластере в будущей статье. Пока же рассмотрим, что каждый рабочий узел имеет копию всего DAG, знает, какие thunk уже выполнены (и где найти их результаты), знает, какие thunk в данный момент выполняются в кластере, и отвечает за планирование дополнительных thunk для работы на своих 32 ядрах. Это локальное планирование осуществляется однопоточным сервисом под названием kernel (не путать с ядром операционной системы Linux). Kernel, а также потоки рабочих узлов, которые фактически выполняют thunk, все работают в одном процессе .NET для обмена управляемыми объектами между собой.
Поиск нового thunk происходит практически мгновенно, поскольку kernel поддерживает фронтир готовых к выполнению thunk для каждого DAG и ему нужно лишь выбрать один случайным образом. Основное время kernel уходит на обновление фронтира, когда thunk начинает выполняться (его требуется удалить из фронтира), завершается выполнение (его потомки могут попасть во фронтир, в зависимости от наличия еще не выполненных родителей), или теряется из-за недоступности рабочего узла, хранящего его результат (его потомки должны быть удалены из фронтира, а сам thunk может быть возвращен в него, если его собственные родители все еще доступны).
Управление фронтирами – задача с чрезвычайно высокой изменчивостью по времени, она может занимать от микросекунды до нескольких секунд – в миллион раз дольше! Например, shuffle step имеет слой из $N$ thunk, которые читают результаты другого слоя из $M$ thunk. Каждый нижестоящий thunk читает результаты всех $M$ вышестоящих thunk, что приводит к $M\cdot N$ ребрам в DAG. При $M = N = 1000$ (что весьма вероятно при обработке миллиардов строк) это составляет миллион ребер. Если не предпринять меры, это может привести к тому, что kernel приостановится на несколько секунд, в течение которых новые thunk не будут запланированы, и до 32 ядра останутся простаивающими4.
Мы решаем эту проблему, вводя виртуальные узлы в DAG для представления такого рода связей между слоями. Виртуальный узел имеет $M$ входов (по одному для каждого thunk из вышестоящего слоя) и $N$ выходов (по одному для каждого thunk из нижестоящего слоя). Это сокращает число ребер до $M + N$, что значительно проще для управления!
Генерация кода низкой гранулярности
Первые версии Envision, в 2013 и 2014 годах, работали на основе того, что каждая векторная операция выполнялась одним thunk. При вычислении выражения T.A / (T.B + 1)
использовался один thunk для трансляции скаляра 1
в таблицу T
, второй thunk для сложения T.B
с результатом первого thunk, и третий thunk для деления T.A
на результат второго thunk. Это имело преимущество, так как позволяло легко реализовать каждую операцию как функцию на C#, выполняемую в одном thunk – что являлось отличной идеей на ранних этапах разработки DSL. Однако это, конечно, имело и недостаток: потреблялось избыточное количество памяти (первый thunk создавал вектор из миллионов копий значения 1
), и затраты времени на запись и чтение памяти были существенными.
Было необходимо, чтобы один thunk выполнял сразу несколько операций, вместо создания отдельного thunk для каждой операции.
Многие SQL базы данных работают по разновидностям volcano model, в которых запрос преобразуется в дерево итераторов. Каждый итератор действует как нечистая функция, возвращающая следующее значение при каждом вызове, и может рекурсивно вызывать другие итераторы.
Func<float> BroadcastScalar(float x) = () => x;
Func<float> Add(Func<float> x, Func<float> y) = () => x() + y();
Func<float> Div(Func<float> x, Func<float> y) = () => x() / y();
Func<float> Read(float[] v) { var i = 0; return () => v[i++]; }
Компиляция запроса в модель volcano заключается в построении дерева итераторов:
Div(Read(A), Div(Read(B), BroadcastScalar(1)))
Это имеет преимущество в том, что не происходит выделения памяти для промежуточных векторов. Однако накладные расходы на вызов функций оказываются доминирующими по сравнению с простыми арифметическими операциями, которые эти функции выполняют.
Из-за этого, в 2015 году Envision перешел к генерации кода в режиме Just-In-Time. Принцип весьма схож с исполнением Tungsten в Apache Spark: операция T.A / (T.B + 1)
компилируется в функцию на императивном языке.
float[] GeneratedFunction(float[] a, float[] b) {
var result = new float[a.Length];
for (var i = 0; i < a.Length; ++i)
result[i] = a[i] / (b[i] + 1);
return result;
}
Цель, которую мы используем для этой компиляции, — .NET IL, язык байт-кода, используемый .NET для своих сборок. Это позволяет нам воспользоваться JIT-компилятором .NET для создания оптимизированного машинного кода из сгенерированного IL.
Эта генерация кода во время выполнения оказалась самой большой проблемой при миграции Envision с .NET Framework на .NET Core в 2017 году. Действительно, хотя .NET Core поддерживает те же API System.Reflection
, что и .NET Framework для создания и выполнения IL во время выполнения, он не поддерживает сохранение этого IL на диск в виде DLL. Хотя это и не требуется для запуска Envision, это, безусловно, необходимо для разработки компилятора Envision! System.Reflection
не предотвращает создание некорректного IL и лишь сообщает о довольно бесполезном InvalidProgramException
при выполнении метода, содержащего некорректный IL. Единственный разумный подход к исследованию подобных проблем — сохранить файл сборки и воспользоваться ILVerify или ILSpy. Из-за этого требования мы фактически два года продолжали поддерживать как .NET Framework, так и .NET Core — продакшн работал на .NET Core, а отладка IL проводилась на .NET Framework. Наконец, в 2019 году мы опубликовали нашу собственную библиотеку Lokad.ILPack в качестве замены этой функции и перешли с .NET Framework.
На этом завершается сегодняшний анализ того, как Envision выполняет скрипты. В следующей статье мы обсудим, как хранятся промежуточные результаты.
Без стеснения: мы нанимаем инженеров-программистов. Возможна удаленная работа.
-
Рабочие узлы транслируют информацию в кластер каждый раз, когда запускают новый thunk, и избегают выполнения thunk, которые уже запущены другими узлами. Случается редкий случай, когда два рабочих узла запускают один и тот же thunk почти одновременно; мы решаем это, позволяя каждому рабочему узлу выбирать случайный thunk из фронтира, а планировщик уменьшает число рабочих узлов, когда фронтир значительно уменьшается. Это означает, что дублирование выполнения не исключено, но крайне маловероятно. ↩︎
-
Для двух больших таблиц применяется затратное объединение с перемешиванием (shuffle join), а если одна из таблиц достаточно мала, чтобы уместиться в памяти, используется более дешевое объединение на стороне отображения (map-side join). ↩︎
-
Для thunk без предков, например, тех, что читают входные файлы, мы включаем хэш содержимого этих входных файлов внутрь тела thunk. Это гарантирует, что если два thunk читают один и тот же файл, они будут иметь идентичный хэш, а если читают два разных файла (включая две разные версии файла по заданному пути), их хэши будут различаться. ↩︎
-
Это также сказывается на размере сериализованных данных. Действительно, если все ребра представлены в сериализованном DAG, даже по два байта на ребро, это уже составляет 2 МБ данных! ↩︎