Эта статья является второй частью четырехчастной серии о внутреннем устройстве виртуальной машины Envision: программном обеспечении, которое выполняет скрипты Envision. См. часть 1, часть 3 и часть 4. В этой серии не рассматривается компилятор Envision (возможно, в другой раз), поэтому давайте просто предположим, что скрипт каким-то образом был преобразован в байт-код, который принимает виртуальная машина Envision в качестве входных данных.

Как и большинство других систем параллельного выполнения, Envision создает направленный ациклический граф (DAG), где каждый узел представляет операцию, которую необходимо выполнить, а каждое ребро представляет зависимость данных, где узел внизу требует вывода узла вверху для выполнения.

Beyond time-series

Узлы называются thunks, по аналогии с очень похожим концептом из Haskell и других языков с ленивым вычислением.

Примеры thunks, которые могут быть найдены в типичном скрипте Envision:

  • Разбор входного файла в формате .xlsx, .csv или .csv.gz и преобразование его в колоночное представление, которое будет использоваться остальной частью скрипта.
  • Загрузка диапазона строк $M..N$ из отдельного столбца; этот столбец может быть взят либо из результата разбора входного файла (см. выше), либо из собственного формата файла Lokad .ion, оптимизированного для хранения в Microsoft Azure Blob Storage.
  • Учитывая диапазон строк $M..N$ из очень большого вектора $A$, меньший вектор $B$, проектор $\pi$, который связывает каждую строку в $A$ с строкой в $B$, и функцию $f$, вычислить $f(A[l], B[\pi(l)])$. Это называется объединением на стороне отображения.
  • Использование метода Монте-Карло для оценки среднего значения, дисперсии или распределения результата случайного процесса. Результат нескольких параллельно выполняемых Monte Carlo thunks затем может быть объединен финальным thunk’ом.

В общем случае ожидается, что thunk займет от нескольких сотен миллисекунд (для манипуляции данными малого масштаба) до нескольких минут (для Монте-Карло симуляций или градиентного спуска). Это сильное предположение: виртуальная машина Envision может иметь значительные накладные расходы на вычисление каждого thunk, порядка миллисекунд. Скрипт должен создавать небольшое количество thunks (от 1 000 до 100 000), при этом каждый thunk выполняет достаточно большую единицу работы.

Референциальная прозрачность

Thunks являются чистыми функциями: они детерминированы и не могут иметь побочных эффектов. Они работают, считывая свои неизменяемые входные данные и возвращая одно и то же значение при каждом выполнении. Это важное свойство помогает во многих отношениях:

  1. Поскольку вычисление thunk не имеет побочных эффектов, оно не будет вмешиваться в вычисление другого thunk, и поэтому все thunks могут выполняться параллельно (при условии, что их входные данные доступны) на нескольких ядрах процессора или даже распределены на нескольких рабочих. Виртуальная машина Envision отслеживает фронтир каждого скрипта (набор thunks, которые могут быть выполнены, потому что все их входные данные доступны) и выбирает новый thunk из него, когда процессор становится доступным.
  2. Напротив, возможно вычислить thunks один за другим и получить тот же результат. Например, когда кластер перегружен, когда рабочие кластера недоступны или когда воспроизводится вычисление скрипта на рабочей станции разработчика для решения проблемы.
  3. Два рабочих, выполняющих один и тот же thunk, не является ошибкой, это просто пустая трата времени. Поэтому это нечто, что должно быть избегаемо (со всеми трудностями синхронизации в распределенной системе), достаточно обеспечить, чтобы это не происходило слишком часто1.
  4. Если результат thunk потерян (из-за сбоя рабочего или недоступности сети), его можно запустить снова. Даже если несколько thunks потеряны, исходный DAG остается доступным и может быть использован в качестве линии данных для повторного вычисления необходимых значений.

Однако это также означает, что thunks не могут взаимодействовать друг с другом (например, открывать канал и передавать данные между ними). Это ограничивает доступные стратегии параллелизма и параллелизма.

Производство thunk

Во многих распределенных вычислительных фреймворках DAG выполнения создается вне кластера (например, на машине планировщика), а затем части графа передаются отдельным рабочим для выполнения. Очень часто DAG должен быть создан в несколько этапов: например, операция объединения может быть оптимизирована по-разному в зависимости от размера таблиц2, и не всегда возможно знать размер таблицы до фактического вычисления ее содержимого, поэтому стоит подождать, пока размеры таблиц не станут известными, прежде чем генерировать часть DAG, выполняющую объединение. Это означает, что между планировщиком и рабочими будет происходить обратная связь, где планировщик будет создавать дополнительные задачи на основе результатов от рабочих.

Это превращает планировщик в единую точку отказа, и позволяет использовать несколько активных планировщиков или схему аварийного переключения между активным и пассивным планировщиками, что добавляет довольно много сложностей. Для Envision нашей целью устойчивости было обеспечить, чтобы один рабочий мог вычислить всю миссию без участия планировщика. Таким образом, даже если планировщик будет недоступен в течение десяти минут, новые миссии не будут приниматься, но уже запущенные миссии будут завершены. Однако это означает, что рабочие должны быть способны генерировать новые части DAG без помощи планировщика.

Мы достигаем этого, позволяя thunk возвращать новый thunk вместо значения - чтобы использовать больше терминов Haskell, построение DAG включает монады, а не только функторы. У этого нового thunk есть свои собственные родители, которые также могут быть новыми thunks, и так далее, образуя полностью новый DAG. На практике новый DAG часто содержит множество thunks из старого DAG, потому что он нуждается в результатах этих вычислений.

При отправке новой миссии в кластер отправляется только один thunk (содержащий скрипт для компиляции и выполнения и ссылки на все входные файлы). Этот thunk затем создает начальный DAG выполнения, который будет несколько раз увеличиваться, пока не станет завершенным.

Граф Меркля

Для передачи по сети thunks также сериализуются с использованием специального бинарного формата, разработанного для минимизации размера. В DAG с 100 000 thunks бюджет в 10 МиБ может поддерживать только 104 байта на каждый thunk!

Поддержка бинарной сериализации позволила нам превратить DAG в граф Меркля, где каждый thunk имеет идентификатор, определяемый бинарным содержимым этого thunk и всех предков thunk3. Мы называем этот идентификатор хешем thunk.

Использование графа Меркля имеет два основных преимущества. Во-первых, thunks, выполняющие одну и ту же операцию, автоматически объединяются, потому что, имея одинаковое содержимое и предков, они также имеют одинаковый идентификатор.

Во-вторых, два скрипта могут использовать некоторые из своих thunks — возможно, они читают одни и те же входные файлы и применяют к ним одни и те же операции, или, возможно, специалист по цепям поставок работает над скриптом, постепенно изменяя несколько строк между выполнениями. Когда это происходит, результаты общих thunks могут быть повторно использованы, если они все еще находятся в памяти, что существенно сокращает время выполнения скрипта. Возможность редактирования и повторного выполнения скрипта создает короткий цикл обратной связи, который способствует продуктивности специалистов по цепям поставок.

Локальное планирование thunk

Мы подробнее рассмотрим распределение выполнения thunk по нескольким машинам в кластере в будущей статье. Пока просто учтите, что каждый рабочий узел хранит копию всего DAG, знает, какие thunks уже выполнены (и где найти их результаты), знает, какие thunks в настоящее время выполняются кластером и отвечает за планирование дополнительных thunks для выполнения на своих 32 ядрах. Это локальное планирование выполняется однопоточным сервисом, называемым ядром (которое не следует путать с ядром Linux). Ядро, а также рабочие потоки, которые фактически выполняют thunks, все работают в одном процессе .NET, чтобы иметь возможность обмениваться управляемыми объектами друг с другом.

Поиск нового thunk практически мгновенный, поскольку ядро хранит фронтенд готовых к выполнению thunks для каждого DAG и нужно только выбрать один случайным образом. Большую часть времени ядро тратит на обновление фронтенда, когда thunk начинает выполняться (он должен покинуть фронтенд), заканчивает выполнение (его потомки могут присоединиться к фронтенду, в зависимости от того, остались ли у него невыполненные родители) или теряется из-за недоступности рабочего узла, который хранит его результат (его потомки должны покинуть фронтенд, но сам thunk может быть добавлен обратно в фронтенд, если его собственные родители все еще доступны).

Работа с фронтендами имеет очень высокую изменчивость и может занимать от микросекунд до нескольких секунд—более миллиона раз дольше! Например, шаг перемешивания имеет слой из $N$ thunks, которые читают результаты другого слоя из $M$ thunks. Каждый нижестоящий thunk читает результаты всех $M$ вышестоящих thunks, что приводит к $M\cdot N$ ребрам в DAG. Для $M = N = 1000$ (очень вероятная степень параллелизации при работе с миллиардами строк) это миллион ребер. Если не контролировать этот феномен, ядро может приостановиться на несколько секунд, во время которых не запланировано выполнение новых thunks, и до 32 ядер останутся без работы4.

Мы решаем эту проблему, вводя виртуальные узлы в DAG, чтобы представлять этот тип связи между слоями. Виртуальный узел имеет $M$ входов (по одному для каждого thunk в верхнем слое) и $N$ выходов (по одному для каждого thunk в нижнем слое). Это уменьшает количество ребер до $M + N$, что значительно более управляемо!

Кодогенерация с низкой гранулярностью

Первые версии Envision, в 2013 и 2014 годах, работали на основе того, что каждая векторная операция выполняется одним thunk. При выполнении T.A / (T.B + 1) был бы thunk один для широковещательного 1 в таблицу T, thunk два для добавления T.B к результату thunk один и thunk три для деления T.A на результат thunk два. Это имело преимущество в том, что мы могли легко реализовать каждую операцию в виде функции C#, выполняемой как один thunk, что является отличной идеей во время ранней реализации DSL. Конечно, это имеет недостаток в том, что потребляются ненужные объемы памяти (thunk один производит вектор из миллионов копий значения 1), и память требует времени для записи и чтения обратно.

Было необходимо иметь thunks, которые вычисляют несколько операций последовательно, а не иметь один thunk для каждой операции.

Многие SQL-базы данных работают с вариациями модели вулкана, где запрос преобразуется в дерево итераторов. Каждый итератор действует как нечистая функция, которая возвращает следующее значение при каждом вызове и может рекурсивно вызывать другие итераторы. В этой модели широковещание скаляра в таблицу будет являться итератором с постоянным возвращаемым значением, добавление или деление двух векторов будет иметь ссылки на два итератора, и чтение из вектора будет инкрементироваться через него:

Func<float> BroadcastScalar(float x) = () => x;
Func<float> Add(Func<float> x, Func<float> y) = () => x() + y();
Func<float> Div(Func<float> x, Func<float> y) = () => x() / y();
Func<float> Read(float[] v) { var i = 0; return () => v[i++]; }

Компиляция запроса в модель вулкана состоит в построении дерева итераторов:

Div(Read(A), Div(Read(B), BroadcastScalar(1)))

Это имеет преимущество в том, что не выполняются выделения памяти для промежуточных векторов. Однако накладные расходы на вызов функций превосходят простые арифметические операции, которые выполняют эти функции.

Поэтому в 2015 году Envision перешел к генерации кода в режиме Just-in-Time. Принцип довольно похож на исполнительный движок Tungsten в Apache Spark: скомпилировать операцию T.A / (T.B + 1) в функцию на императивном языке.

float[] GeneratedFunction(float[] a, float[] b) {
    var result = new float[a.Length];
    for (var i = 0; i < a.Length; ++i)
        result[i] = a[i] / (b[i] + 1);
    return result;
}

Целью, которую мы используем для этой компиляции, является .NET IL, язык байт-кода, используемый .NET для своих сборок. Это позволяет нам использовать .NET JIT-компилятор для создания оптимизированного машинного кода из нашего сгенерированного IL.

Этот код, генерирующийся во время выполнения, оказался самым большим препятствием при переходе Envision с .NET Framework на .NET Core в 2017 году. Действительно, хотя .NET Core поддерживает те же API System.Reflection, что и .NET Framework, для создания и выполнения IL во время выполнения, он не поддерживает сохранение этого IL на диск в виде DLL. Хотя это не является требованием для запуска Envision, это, безусловно, требование для разработки компилятора Envision! System.Reflection ничего не делает, чтобы предотвратить создание недопустимого IL, и сообщает только о бесполезном исключении InvalidProgramException, когда выполняется метод, содержащий недопустимый IL. Единственный разумный подход к исследованию таких проблем - сохранить файл сборки и использовать ILVerify или ILSpy. Из-за этого требования мы фактически продолжали работать с .NET Framework и .NET Core в течение двух лет - продакшн работал на .NET Core, а отладка IL выполнялась на .NET Framework. Наконец, в 2019 году мы опубликовали нашу собственную библиотеку Lokad.ILPack в качестве замены для этой функции и перешли от .NET Framework.

Это завершает сегодняшний анализ того, как Envision выполняет скрипты. В следующей статье мы обсудим, как хранятся промежуточные результаты.

Бесстыдная реклама: мы нанимаем инженеров по программному обеспечению. Возможна удаленная работа.


  1. Рабочие передают сообщение всему кластеру, когда они запускают новый thunk, и избегают выполнения thunks, которые уже были взяты другими рабочими. Остается редкий случай, когда два рабочих запускают один и тот же thunk почти одновременно; мы избегаем этого, позволяя каждому рабочему выбирать случайный thunk из фронтира, и позволяя планировщику уменьшать количество рабочих, когда фронтир слишком сокращается. Это означает, что дублированное выполнение не является невозможным, но очень маловероятным. ↩︎

  2. Для двух больших таблиц используется дорогостоящее объединение с перемешиванием, а для одной из таблиц используется более дешевое объединение на стороне отображения, когда одна из таблиц достаточно мала, чтобы поместиться в память. ↩︎

  3. Для thunks без предков, таких как те, которые читают из входных файлов, мы включаем хеш содержимого этих входных файлов в тело thunk. Это гарантирует, что если два thunks читают один и тот же входной файл, у них будет одинаковый хеш, а если они читают два разных входных файла, включая две разные версии файла по заданному пути, то у них будут разные хеши. ↩︎

  4. Это также влияет на размер сериализации. Действительно, если все ребра представлены в сериализованном DAG, даже с двумя байтами на ребро, это уже представляет собой 2 МБ данных! ↩︎