この記事は、Envision仮想マシンの内部動作に関する4部作の2番目の記事です。Envisionスクリプトを実行するソフトウェアです。パート1パート3パート4を参照してください。このシリーズではEnvisionコンパイラはカバーしていません(別の機会にでも)。したがって、スクリプトはどのようにしてEnvision仮想マシンが入力として受け取るバイトコードに変換されたかを想定してください。

ほとんどの他の並列実行システムと同様に、Envisionは有向非巡回グラフ(DAG)を生成します。各ノードは実行する必要がある操作を表し、各エッジは下流のノードが上流のノードの出力を実行するために必要とするデータの依存関係を表します。

Beyond time-series

ノードは、Haskellや他の遅延評価を持つ言語と非常に似た概念であるサンクと呼ばれます。

典型的なEnvisionスクリプトで見つかるサンクの例:

  • .xlsx.csv.csv.gz形式の入力ファイルを解析し、スクリプトの残りの部分で使用される列指向の表現に変換します。
  • 個々の列から行の範囲$M..N$をロードします。この列は、入力ファイルの解析結果(上記参照)またはMicrosoft Azure Blob Storageに格納するために最適化されたLokad独自の.ion列指向ファイル形式から取得できます。
  • 非常に大きなベクトル$A$から行の範囲$M..N$、より小さなベクトル$B$、プロジェクタ$\pi$($A$の各行を$B$の行に関連付ける)および関数$f$が与えられた場合、$f(A[l], B[\pi(l)])$を計算します。これはマップサイドジョインと呼ばれます。
  • モンテカルロシミュレーションを使用して、ランダムプロセスの平均、分散、または分布を推定します。並列に実行される複数のモンテカルロサンクの結果は、最終的なサンクによって組み合わされることができます。

一般的に、サンクは数百ミリ秒(小規模なデータ操作の場合)から数分(モンテカルロシミュレーションや勾配降下の場合)かかることが期待されています。これは強い仮定です。Envision仮想マシンは、各サンクの評価において、ミリ秒のオーダーでかなりのオーバーヘッドを持つことが許可されています。スクリプトは少数のサンク(1,000〜100,000個)を生成し、各サンクはかなり大きな作業単位を実行する必要があります。

参照透過性

サンクは純粋関数です。つまり、決定論的であり、副作用を持つことはありません。サンクは不変の入力を読み取り、実行ごとに同じ値を返します。この重要な特性は、多くの方法で役立ちます:

  1. サンクの評価に副作用がないため、他のサンクの評価と干渉しません。そのため、すべてのサンクは(入力が利用可能である限り)複数のCPUコアで並行して実行されるか、さらには複数のワーカーで分散されることができます。Envision仮想マシンは、各スクリプトのフロンティア(すべての入力が利用可能なために実行できるサンクのセット)を追跡し、CPUが利用可能になるたびに新しいサンクを選択します。
  2. 逆に、サンクを1つずつ評価して同じ結果に到達することも可能です。たとえば、クラスタが重い負荷下にある場合、クラスタワーカーが利用できない場合、または問題を調査するためにスクリプトの評価を開発者のワークステーションで再現する場合などです。
  3. 同じサンクを実行する2つのワーカーはエラーではありませんが、時間の無駄です。そのため、これを頻繁に起こさないようにする必要はありません(分散システムでの同期に関連するすべての困難を伴うもの)。1
  4. サンクの結果が失われた場合(ワーカーのクラッシュやネットワークの利用不可など)、それを再実行することが可能です。複数のサンクが失われても、元のDAGは利用可能であり、必要な値を再計算するためのデータの系譜として使用することができます。

ただし、これはサンクがお互いに「通信」することができないことを意味します(たとえば、チャネルを開いてデータを送信することなど)。これにより、並行性と並列性のための利用可能な戦略が制限されます。

サンクの生成

多くの分散計算フレームワークでは、実行DAGはクラスタの外部(たとえば、スケジューラマシン上)で生成され、グラフの一部が個々のワーカーにプッシュされて実行されます。しばしば、DAGは複数のステップで生成する必要があります。たとえば、結合操作はテーブルのサイズに応じて異なる最適化が行われることがあります2。テーブルのサイズを実際に評価する前には、テーブルのサイズを知ることはできないため、結合を実行するDAGの一部を生成する前にテーブルのサイズがわかるまで待つ価値があります。これにより、スケジューラとワーカーの間で往復が発生し、スケジューラはワーカーの結果に基づいて追加のタスクを生成します。

これにより、スケジューラは単一の障害点となり、複数のアクティブなスケジューラやアクティブとパッシブのスケジューラ間のフェイルオーバースキームを許可すると、かなりの複雑さが追加されます。Envisionでは、レジリエンスのターゲットは、スケジューラを介さずに単一のワーカーがミッション全体を計算できるようにすることでした。そのため、10分間のスケジューラのダウンタイムでは新しいミッションの提出ができなくなりますが、既に開始されたミッションは完了することはありません。ただし、これはワーカーがスケジューラの助けなしでDAGの新しい部分を生成できる必要があることを意味します。

これを実現するために、サンクは値ではなく、_新しいサンク_を返すようにします。より具体的には、DAGの構築にはファンクターだけでなくモナドも使用されます。この新しいサンクには、それ自体も新しいサンクである可能性のある親があります。実際には、新しいDAGはしばしば古いDAGと多くのサンクを共有しています。なぜなら、それらの計算の結果が必要だからです。

クラスタに新しいミッションを提出する際には、単一のサンクのみが提出されます(コンパイルと実行するスクリプト、およびすべての入力ファイルへの参照を含む)。このサンクは初期の実行DAGを生成し、それが完全になるまで数回成長します。

Merkleグラフ

サンクはネットワーク上で転送するためにもシリアライズ可能である必要があります。これには、低いフットプリントを持つために設計されたカスタムバイナリ形式が使用されます。100,000個のサンクを持つDAGでは、10MiBの予算では1つのサンクあたり104バイトしかサポートできません!

バイナリシリアル化のサポートにより、DAGをMerkle DAGに変換することができました。各サンクは、そのサンクとすべてのサンクの祖先のバイナリコンテンツによって決定される識別子を持っています3。この識別子をサンクのハッシュと呼びます。

Merkle DAGを使用することには2つの主な利点があります。まず、同じ操作を実行するサンクは自動的にマージされます。同じ内容と祖先を持つため、同じ識別子を持つからです。

そして、2つのスクリプトがいくつかのサンクを共有することができます。たとえば、同じ入力ファイルを読み取り、それらに同じ操作を適用するかもしれません。または、サプライチェーンサイエンティストがスクリプトで作業し、実行の間に数行を変更しているかもしれません。これが起こると、共有サンクの出力はメモリにまだ存在している場合に再利用することができ、スクリプトの実行時間を大幅に短縮することができます。スクリプトを編集して再実行できることは、サプライチェーンサイエンティストの生産性を向上させる短いフィードバックループを作成します。

ローカルサンクスケジューリング

サンクの実行がクラスタ内の複数のマシンに分散される方法については、将来の記事で詳しく説明します。今のところ、各ワーカーは完全なDAGのコピーを保持し、すでに実行されたサンク(およびその結果の場所)を知っており、クラスタで現在実行中のサンクを知っており、32個のコアで実行するための追加のサンクのスケジューリングを担当しています。このローカルスケジューリングは、カーネルと呼ばれるシングルスレッドのサービスによって行われます(これはLinuxカーネルとは異なるものです)。カーネルと実際にサンクを実行するワーカースレッドは、互いに管理されたオブジェクトを共有するために同じ.NETプロセスで実行されます。

新しいサンクを見つけることはほぼ瞬時です。カーネルは各DAGの実行準備が整ったサンクのフロンティアを保持しており、ランダムに1つを選ぶだけです。カーネルのほとんどの時間は、サンクの実行が開始されるとフロンティアを更新するために費やされます(フロンティアから離れる必要があります)。実行が終了すると(実行されていない親が残っているかどうかに応じて)、その子孫がフロンティアに参加する可能性があります。また、ワーカーが結果を保持しているサンクが利用できなくなるために失われた場合、その子孫はフロンティアから離れなければなりませんが、サンク自体は自身の親がまだ利用可能であればフロンティアに追加される場合があります。

フロンティアへの対応は非常に高い変動性を持つ作業であり、マイクロ秒から数秒までかかることがあります。たとえば、シャッフルステップには、別の層の$N$個のサンクがあり、それらは別の層の$M$個のサンクの出力を読み取ります。各下流サンクは、すべての$M$個の上流サンクの出力を読み取り、DAGには$M\cdot N$個のエッジがあります。$M = N = 1000$の場合(数十億行を扱う場合に非常に一般的な並列化の度合い)、それは100万個のエッジです。この現象が放置されると、カーネルは数秒間一時停止し、その間に新しいサンクがスケジュールされず、最大32個のコアがアイドル状態になる可能性があります4

私たちは、このような層間の接続を表すために、DAGに仮想ノードを導入することで、この問題を解決します。仮想ノードには$M$個の入力(上流層の各サンクに1つずつ)と$N$個の出力(下流層の各サンクに1つずつ)があります。これにより、エッジの数が$M + N$に減少し、管理が容易になります!

低粒度のコード生成

2013年と2014年のEnvisionの最初のバージョンでは、各ベクトル演算が単一のサンクによって実行されるという基準で動作していました。T.A / (T.B + 1)を実行する場合、サンク1つは1をテーブルTにブロードキャストするためのものであり、サンク2つはサンク1つの結果にT.Bを追加するためのものであり、サンク3つはサンク2つの結果でT.Aを除算するためのものでした。これにより、各操作をC#関数として実装し、単一のサンクとして実行することが容易になりました。これはDSLの初期実装時には優れたアイデアです。もちろん、不必要な量のメモリが消費される(サンク1つは値1の数百万のコピーのベクトルを生成します)ため、メモリの書き込みと読み取りに時間がかかります。

各操作ごとに1つのサンクではなく、連続して複数の操作を評価するサンクが必要でした。

多くのSQLデータベースは、volcanoモデルのバリエーションで動作します。このモデルでは、クエリがイテレータのツリーに変換されます。各イテレータは、呼び出されるたびにイテレーションの次の値を返す副作用のある関数として機能し、他のイテレータを再帰的に呼び出すことができます。このモデルでは、スカラーをテーブルにブロードキャストする場合は定数を返すイテレータ、2つのベクトルを加算または除算する場合は2つのイテレータへの参照を保持し、ベクトルから読み取る場合はそれを通過します:

Func<float> BroadcastScalar(float x) = () => x;
Func<float> Add(Func<float> x, Func<float> y) = () => x() + y();
Func<float> Div(Func<float> x, Func<float> y) = () => x() / y();
Func<float> Read(float[] v) { var i = 0; return () => v[i++]; }

クエリをvolcanoモデルにコンパイルすることは、イテレータのツリーを構築することを意味します:

Div(Read(A), Div(Read(B), BroadcastScalar(1)))

これにより、中間ベクトルのメモリ割り当てが行われないという利点があります。ただし、関数の呼び出しのオーバーヘッドが、それらの関数が実行する単純な算術演算よりも支配的です。

このため、2015年にEnvisionはジャストインタイムコード生成に移行しました。この原則は、Apache SparkのTungsten実行エンジンと非常に似ています。操作T.A / (T.B + 1)を命令型言語の関数にコンパイルします。

float[] GeneratedFunction(float[] a, float[] b) {
    var result = new float[a.Length];
    for (var i = 0; i < a.Length; ++i)
        result[i] = a[i] / (b[i] + 1);
    return result;
}

このコンパイルのターゲットは.NETのアセンブリで使用されるバイトコード言語である.NET ILです。これにより、生成されたILから最適化されたマシンコードを生成するために.NET JITコンパイラを活用することができます。

このランタイムコード生成は、2017年にEnvisionを.NET Frameworkから.NET Coreに移行する際の最大の障害となりました。実際、.NET CoreはランタイムでILを生成および実行するための.NET Frameworkと同じSystem.Reflection APIをサポートしていますが、そのILをDLLとしてディスクに保存することはサポートしていません。これはEnvisionを_実行する_ための要件ではありませんが、Envisionコンパイラを開発するための要件です!System.Reflectionは、無効なILが作成されるのを防ぐことはできず、無効なILを含むメソッドが実行されると、あまり役に立たないInvalidProgramExceptionのみを報告します。このような問題を調査するための唯一の合理的なアプローチは、アセンブリファイルを保存してILVerifyまたはILSpyを使用することです。この要件のために、実際には2年間にわたって.NET Frameworkと.NET Coreの両方をターゲットにしました。本番環境では.NET Coreで実行し、ILデバッグは.NET Frameworkで行われました。最終的に、2019年に独自のライブラリLokad.ILPackを公開し、.NET Frameworkから移行しました。

これにより、今日のEnvisionのスクリプトの実行方法の分析は終了です。次の記事では、中間結果の保存方法について説明します。

自己紹介: ソフトウェアエンジニアを募集しています。リモートワークも可能です。


  1. ワーカーは新しいサンクを開始するたびにクラスタにブロードキャストし、他のワーカーが宣言したサンクを実行しないようにします。ほとんど同時に2つのワーカーが同じサンクを開始するという稀なケースが残ります。これを避けるために、各ワーカーはフロンティアからランダムにサンクを選択し、スケジューラはフロンティアがあまりにも縮小した場合にワーカーの数を減らすことでこれを回避します。これにより、重複した実行は不可能ではありませんが、非常に起こりにくくなります。 ↩︎

  2. コストのかかるシャッフル結合は2つの大きなテーブルに使用され、片方のテーブルがメモリに収まるほど小さい場合には、より安価なマップサイド結合が使用されます。 ↩︎

  3. 入力ファイルから読み取るなど、祖先を持たないサンクの場合、その入力ファイルの_内容_のハッシュをサンクの本体に含めます。これにより、2つのサンクが同じ入力ファイルを読み取る場合、同じハッシュを持ち、2つの異なる入力ファイルを読み取る場合、特定のパスの2つの異なるバージョンを含む場合、異なるハッシュを持つことが保証されます。 ↩︎

  4. これはシリアル化サイズにも影響を与えます。実際、すべてのエッジがシリアル化されたDAGに表される場合、エッジごとにわずか2バイトでも、すでに2MBのデータを表します! ↩︎