Envision VM (パート2)、サンクと実行モデル
この記事は、Envisionスクリプトを実行するソフトウェアであるEnvision仮想マシンの内部処理についての全4部構成の第2部です。参照:パート1、パート3およびパート4。このシリーズではEnvisionコンパイラについては扱っていません(もしかしたら別の機会に)、なのでスクリプトが何らかの形でEnvision仮想マシンが入力として受け取るバイトコードに変換されたと仮定しましょう.
ほとんどの並列実行システムと同様に、Envisionは実行すべき操作を表す各ノードと、実行のために上流ノードの出力を必要とする各エッジによって、データ依存性を持つ有向非巡回グラフ(DAG)を生成する。

これらのノードは、Haskellやその他の遅延評価を持つ言語における非常に類似した概念に因んで、サンクと呼ばれる。
一般的なEnvisionスクリプトで見られるサンクの例:
- 入力ファイル(
.xlsx
、.csv
、または.csv.gz
形式)を解析し、スクリプトの残り部分で使用されるカラム形式のデータに変換する。 - 個々のカラムから$M..N$行の範囲を読み込む;このカラムは、入力ファイルの解析結果(上記参照)から取得するか、またはMicrosoft Azure Blob Storage向けに最適化されたLokad独自の
.ion
カラム形式ファイルから取得することができる。 - 非常に大きなベクトル$A$の$M..N$行の範囲、より小さいベクトル$B$、$A$の各行を$B$の行と対応付けるプロジェクター$\pi$、および関数$f$が与えられた場合、$f(A[l], B[\pi(l)])$を計算する。これはマップサイド・ジョインと呼ばれる。
- モンテカルロシミュレーションを用いて、ランダムプロセスの結果の平均、分散、または分布を推定する。複数のモンテカルロ・サンクを並列実行した結果は、最終的なサンクによって結合することができる。
一般的に、サンクは(小規模なデータ操作で)数百ミリ秒から(モンテカルロシミュレーションや勾配降下などで)数分を要すると想定される。これは厳しい前提であり、Envision仮想マシンは各サンクの評価に対してミリ秒単位の大きなオーバーヘッドを許容している。スクリプトは、1 000から100 000の間の少数のサンクを生成し、それぞれがかなり大きな作業単位を実行することが求められる。
参照透過性
サンクは純粋関数である:すなわち、決定論的で副作用を持たない。サンクは不変の入力を読み取り、実行のたびに同じ値を返す。この重要な性質は様々な面で役立つ:
- サンクの評価が副作用を伴わないため、別のサンクの評価に干渉せず、入力が利用可能であれば複数のCPUコア上や複数のワーカーに分散して同時に実行できる。Envision仮想マシンは各スクリプトのフロンティア(すべての入力が利用可能なために実行可能なサンクの集合)を追跡し、CPUが利用可能になるたびに新たなサンクを選ぶ。
- 逆に、一つずつサンクを評価して同じ結果に到達することも可能である。例えば、クラスターに過大な負荷がかかっている場合、クラスターのワーカーが利用不可能な場合、または問題解決のために開発者のワークステーション上でスクリプトの評価を再現する場合などである。
- 同じサンクを二人のワーカーが実行することはエラーではなく、ただの時間の浪費である。したがって、分散システムにおける同期の困難さから完全に回避する必要はなく、あまり頻繁に起こらないようにすれば十分である1。
- サンクの結果が失われた場合(ワーカーのクラッシュやネットワークの利用不可能などによって)、再度実行することが可能である。仮に複数のサンクが失われたとしても、元のDAGは保持され、必要な値を再計算するためのデータリネージとして利用できる。
しかし、これによりサンク同士が互いに_通信_すること(例えば、チャネルを開いてデータを送受信すること)ができなくなり、並行性や並列性を実現するための戦略が制限される。
サンクの生成
多くの分散計算フレームワークでは、実行DAGはクラスター外(例えばスケジューラマシン上)で生成され、その後グラフの一部が各ワーカーにプッシュされて実行される。しばしば、DAGは複数のステップで生成される必要がある。例えば、結合操作はテーブルのサイズ2に応じて異なる最適化が施され得るが、テーブルの内容を実際に評価する前にサイズが分かるとは限らないため、結合を実行するDAG部分を生成する前にテーブルサイズが判明するのを待つ価値がある。このため、スケジューラとワーカー間で結果に基づいてスケジューラが追加タスクを生成するという往復が発生する。
これによりスケジューラが単一障害点となり、複数のアクティブなスケジューラや、アクティブとパッシブなスケジューラ間のフェイルオーバー方式を採用すると非常に複雑になる。Envisionでは、私たちのレジリエンス目標として、スケジューラを介さず単一のワーカーで全ミッションを計算できるようにすることを重視した。したがって、たとえスケジューラが10分間ダウンしても新たなミッションの送信は阻止されるが、既に開始されたミッションの完了は妨げられない。しかし、これはワーカーがスケジューラの助けを借りずにDAGの新たな部分を生成できる必要があることを意味する。
これを実現するために、サンクが値の代わりに_新たなサンク_を返すことを許容する― Haskellの用語を借れば、DAGの構築は単なるファンクターではなくモナドを用いる。新しいサンクは自身の親を持ち、それらもまた新たなサンクである可能性があり、こうして完全な新しいDAGが形成される。実際、新しいDAGは以前の計算結果を必要とするため、旧DAGの多くのサンクを共有することが多い。
クラスターに新たなミッションを送信する際は、コンパイルおよび実行されるスクリプトとすべての入力ファイルへの参照を含む単一のサンクのみが送信される。このサンクが初期の実行DAGを生成し、完全なものになるまで数回拡大される。
マークルグラフ
ネットワーク上で送信可能にするため、サンクは低フットプリントを実現するために設計されたカスタムバイナリ形式を使用してシリアライズ可能である。たとえば、100 000のサンクを持つDAGでは、10MiBの予算でサンクあたりわずか104バイトしか確保できない!
バイナリシリアライゼーションのサポートにより、各サンクがその自身およびすべての先祖のバイナリ内容に基づいて決定される識別子を持つマークルDAGに変換することが可能となった。この識別子をサンクのハッシュと呼ぶ。
マークルDAGを使用することには大きく2つの利点がある。第一に、同じ操作を実行するサンクは、同一の内容と先祖を共有するため、自動的に統合され、同じ識別子を持つ。
第二に、2つのスクリプトが一部のサンクを共有することが可能である―例えば、同じ入力ファイルを読み込み同じ操作を実行する場合や、サプライチェーン・サイエンティストがスクリプトに取り組み、実行間に数行ずつ変更を加える場合などである。このような場合、共有されたサンクの出力がメモリ上に存在すれば再利用でき、スクリプトの実行時間を大幅に短縮する。スクリプトを編集して再実行できる短いフィードバックループは、サプライチェーン・サイエンティストの生産性向上につながる。
ローカルサンクスケジューリング
今後の記事で、クラスター内の複数マシンにわたってサンクの実行がどのように分散されるかについてさらに詳しく説明する。現時点では、各ワーカーがDAG全体のコピーを保持し、既に実行されたサンク(およびその結果の位置)を把握し、クラスター内で現在実行中のサンクを認識するとともに、自身の32コア上で追加のサンクのスケジューリングを担当していると考えてほしい。このローカルスケジューリングは、カーネルと呼ばれるシングルスレッドのサービスによって行われる(これはLinuxカーネルと混同してはならない)。カーネルと、実際にサンクを実行するワーカースレッドは、相互に管理されたオブジェクトを共有するために同一の.NETプロセス内で動作する。
新たなサンクの探索はほぼ即時に行われる。なぜならカーネルは各DAGごとに実行可能なサンクのフロンティアを保持し、ランダムに1つを選ぶだけで済むからである。大部分の時間は、サンクが実行開始(フロンティアから除外)または実行完了(未実行の親の有無に応じてその子孫がフロンティアに参加する可能性がある)したり、ワーカーがその結果を保持できなくなったためにサンクが失われたりした際のフロンティアの更新に費やされる。
フロンティアの管理作業は非常に変動が大きく、マイクロ秒から数秒―場合によっては_百万_倍以上―かかることがある。例えば、シャッフルステップでは、$M$個のサンクの出力を読み取る$N$個のサンクの層が存在する。各下流のサンクはすべての$M$個の上流サンクの出力を読み取るため、DAGには$M\cdot N$本のエッジが生じる。$M = N = 1000$(数十億行を扱う際の並列化として十分想定される規模)の場合、これは100万本のエッジに相当する。これが放置されると、カーネルが数秒間停止し、その間に新たなサンクがスケジュールされず、最大32コアがアイドル状態になる可能性がある3。
この問題は、層間の接続を表すためにDAGに仮想ノードを導入することで解決する。仮想ノードは、上流層の各サンクに対して1つずつの$M$個の入力と、下流層の各サンクに対して1つずつの$N$個の出力を持つ。これにより、エッジの数は$M + N$に減少し、はるかに管理しやすくなる!
低粒度のコード生成
2013年と2014年の初期バージョンのEnvisionでは、各ベクトル演算が単一のサンクによって実行されるという前提で動作していた。例えば、T.A / (T.B + 1)
を実行する際、テーブルT
に1
をブロードキャストするサンク1、サンク1の結果にT.B
を加算するサンク2、そしてサンク2の結果でT.A
を除算するサンク3が存在した。これにより、各演算を単一のサンクとしてC#関数で容易に実装できるという利点があったが、その反面、不要なメモリ消費が発生する(サンク1では値1
の何百万ものコピーからなるベクトルが生成される)という欠点もあり、メモリの書き込みや読み出しには時間がかかる。
各演算ごとにサンクを用いるのではなく、連続した複数の演算を評価するサンクが必要であった。
多くのSQLデータベースは、クエリをイテレーターの木構造に変換するボルケーノモデルの変種で動作する。各イテレーターは、呼び出されるたびに次の値を返す不純な関数のように振る舞い、他のイテレーターを再帰的に呼び出すことができる。このモデルでは、テーブルへのスカラーのブロードキャストは定数を返すイテレーターとなり、2つのベクトルの加算または除算は2つのイテレーターへの参照を保持し、ベクトルからの読み出しはそのインクリメントを伴う:
Func<float> BroadcastScalar(float x) = () => x;
Func<float> Add(Func<float> x, Func<float> y) = () => x() + y();
Func<float> Div(Func<float> x, Func<float> y) = () => x() / y();
Func<float> Read(float[] v) { var i = 0; return () => v[i++]; }
ボルケーノモデルへのクエリのコンパイルは、イテレーターの木構造を構築することを意味する:
Div(Read(A), Div(Read(B), BroadcastScalar(1)))
これにより、中間ベクトルに対してメモリ割り当てが行われないという利点がある。しかし、これらの関数呼び出しのオーバーヘッドが、実際に行われる単純な算術演算を支配してしまう。
このため、2015年にEnvisionはジャストインタイムコード生成に移行した。その原理は、Apache SparkのTungsten実行エンジンに非常に類似しており、T.A / (T.B + 1)
という演算を命令型言語の関数にコンパイルするというものである。
float[] GeneratedFunction(float[] a, float[] b) {
var result = new float[a.Length];
for (var i = 0; i < a.Length; ++i)
result[i] = a[i] / (b[i] + 1);
return result;
}
このコンパイルで使用するターゲットは、.NETのアセンブリで使用されるバイトコード言語である.NET ILである。これにより、.NET JITコンパイラを活用して、生成されたILから最適化されたマシンコードを作成できる。
このランタイムコード生成は、2017年にEnvisionを.NET Frameworkから.NET Coreへ移行する際の最大の障害となりました。実際、.NET Coreは、ランタイムでILを生成・実行するために.NET Frameworkと同じSystem.Reflection
APIをサポートしていますが、そのILをDLLとしてディスクに保存する機能はサポートされていません。これはEnvisionを_実行_するための要件ではありませんが、Envisionコンパイラーを開発するためには確かに必要な機能です! System.Reflection
は無効なILの生成を防ぐ処置を何も行わず、無効なILが含まれるメソッドが実行された際に、ほとんど役に立たないInvalidProgramException
を報告するだけです。このような問題を調査するための唯一の合理的な方法は、アセンブリファイルを保存してILVerifyやILSpyを使用することです。この要件のため、実際には2年間、.NET Frameworkと.NET Coreの両方をターゲットにし続けました — 本番環境は.NET Core上で動作し、ILデバッグは.NET Framework上で行われました。最終的に、2019年にこの機能の代替として私たち自身のライブラリLokad.ILPackを公開し、.NET Frameworkから移行しました。
これにて、Envisionがスクリプトを実行する仕組みに関する本日の解析を終了します。次回の記事では、中間結果の保存方法について解説します。
恥を忍んで宣伝します:私たちはソフトウェアエンジニアを募集しています。リモートワークも可能です。
-
ワーカーは新たなサンクを開始する際にクラスター全体へブロードキャストし、他のワーカーが既に取得しているサンクの実行を避ける。しかし、二人のワーカーがほぼ同時に同じサンクを開始するという稀なケースが存在する。各ワーカーがフロンティアからランダムにサンクを選び、フロンティアが大幅に縮小した場合にスケジューラがワーカー数を減らすことで、重複実行は不可能ではないが非常に起こりにくい状態を実現している。 ↩︎
-
2つの大きなテーブルの場合、高コストのシャッフル・ジョインが用いられ、片方のテーブルがメモリに収まるほど小さい場合はより安価なマップサイド・ジョインが使用される。 ↩︎
-
これはシリアライズサイズにも影響を与える。実際、すべてのエッジがシリアライズされたDAGに表現される場合、エッジ1本あたりたった2バイトであっても、既に2MBのデータ量となる! ↩︎