この記事は、Envision仮想マシンの内部動作に関する4部作のうちの4番目です。Envisionスクリプトを実行するソフトウェアです。part 1part 2part 3を参照してください。このシリーズではEnvisionコンパイラはカバーしていません(別の機会にお願いします)。したがって、スクリプトがどのようにバイトコードに変換され、Envision仮想マシンが入力として受け取るかは、とりあえず想定してください。

前の記事では、個々のワーカーがEnvisionスクリプトを実行する方法について主に調査しました。ただし、信頼性とパフォーマンスのために、Envisionは実際には複数のマシンのクラスタ上で実行されます。

ワーカー内の各レイヤーは、他のワーカーの同じレイヤーまたは同じワーカー内の他のレイヤーと通信します。これにより、ネットワーク通信は各レイヤーのプライベートな実装の詳細として保持されます。

低レベルでは、各ワーカーはクラスタ内の他のマシンに対して2つのTLS接続を開き、さまざまなレイヤーの通信はこれらの2つの接続を介して多重化されます(1つの接続は短いメッセージ用、もう1つは大容量データ転送用です)。

抽象的な分散実行

コントロールレイヤー

このレイヤーは、スケジューラがワーカーにミッションを割り当てたり解除したりするために使用され、ワーカー間の通信は行われません。このレイヤーの主なメッセージは次のとおりです:

  • スケジューラがワーカーにミッションの作業を開始するように要求します。
  • スケジューラがワーカーにミッションの作業を停止するように要求します。
  • ワーカーがミッションの実行中に致命的なエラー(通常は「NVMeドライブが火事になった」などの非決定的な問題)に遭遇したことをスケジューラに通知します。これにより、同じミッションは将来または別のワーカーで再試行できます。
  • ワーカーは、現在の状態に関するスケジューラに対する統計情報を提供します:ミッションのリスト、各ミッションのDAGのフロンティアのサイズ、各ミッションのDAGで実行する必要のあるスロットの総数。

スケジューラはこれらの統計情報を使用してミッションの再割り当てのタイミングを決定します。これを行うための実際のルールは非常に複雑です。優先ルール、複数のテナントと同じテナントのスクリプト間の公平性、およびその時点でのクラスタの全体的な負荷に依存するためですが、一般的な傾向としては、十分なフロンティアを持つミッションは複数のワーカーに分散できますが、それらのワーカーがすでに過負荷でない限りです。同じ作業量を実行する場合、1つのワーカーごとに4つのミッションを実行する方が、それらをすべてのワーカーに分散させるよりも効率的です。

実行レイヤー

各ワーカーは現在実行中のスロットを追跡し、新しいスロットをスケジュールするたびにこのリストを他のワーカーにブロードキャストします1。これにより、ネットワークの遅延に関連する非常に短いウィンドウを除いて、2つのワーカーが同じスロットを実行し始めることはありません。

もちろん、ワーカーがこれらの更新を送信しなくなった場合(たとえば、クラッシュしたか、クラスタの残りから切断された場合)、そのピアは数秒前のリストを古く見なし、それらのスロットを実行することを許可します。

メタデータレイヤー

各ワーカーは完全なメタデータのコピーを保持しようとしますが、実際には同期しません。すべてのワーカーが正確に同じメタデータに同意する保証は提供しないことを選択し、最終的な整合性の保証で作業します。これにより、メタデータレイヤーの配布は設計上最も難しいものになります2

このレイヤーの最終的な整合性は、次の3つの主なルールに従います:

  1. メタデータレイヤーへのすべてのローカルな変更は、すぐにすべての他のワーカーにブロードキャストされます。このブロードキャストは失敗する可能性があり、再試行されません。
  2. 他のワーカーから受け取ったリモートの変更は、モノトニックな進行3に基づいてローカルのメタデータレイヤーにマージされます。スロットの「結果なし」値は、「チェックポイント」値(スロットが実行を開始したが、完了していないことを意味する)によって上書きされ、これは「エイリアス」値(スロットが代わりに実行するためにDAGを返したことを意味する)によって上書きされ、これは「結果」値(関連するアトムを持つ成功した結果または致命的なエラー)によって上書きされることができます。
  3. 他のレイヤーがメタデータレイヤーの値に基づいてネットワーク応答を送信するとき、メタデータレイヤーもその値を再度ブロードキャストします。

第3のルールは、実際に関連する場合に同期レベルを強制するために設計されています。たとえば、次のイベントのシーケンスを考えてみてください:

  • スケジューラがワーカーにミッションを実行するように要求します(制御レイヤーを介して)
  • ワーカーはミッションを実行し、結果をブロードキャストします(メタデータレイヤーを介して)、しかし、メッセージはスケジューラに到達する前に失われます。
  • スケジューラは、ワーカーがミッションを実行していないことに気付きます(制御レイヤーを介して)、そしてそれを再実行するように要求します。
  • ワーカーは、ミッションのスロットに既にメタデータレイヤーに結果があることに気付き、何もしません。何もする必要がないからです。

これは、スケジューラとワーカーがメタデータレイヤーのスロットの状態について意見が一致しないデッドロックです(ワーカーは終了したと信じており、スケジューラは終了していないと信じています)。第3のルールは、ワーカーの「このミッションにはもう取り組んでいない」という応答が、スロットに結果があるというワーカーの観察に基づいているため、メタデータレイヤーがこの情報を再度ブロードキャストすることを決定します。その結果、デッドロックは解決されます:

  • ワーカーのメタデータレイヤーがスロットの結果を再度ブロードキャストし、スケジューラがそれを受信します。
  • スケジューラは、ミッションのスロットに結果が表示されたことに対応して、そのミッションを完了としてフラグを立て、そのミッションを要求したクライアントに通知します。

アトムレイヤー

ワーカーはアトムレイヤーを組み合わせて分散ブロブストアを作成します。ここでは、各アトムはその内容の128ビットハッシュである識別子でリクエストできます。このハッシュはSpookyHashで作成されます。これは分散ハッシュテーブル(DHT)ではありません。なぜなら、DHTではアトムを見つけることが速くなります(ハッシュが与えられた場合、それを保持するワーカーの識別子は単純な関数で計算できます)、しかし、アトムを書き込むことは遅くなります(それは計算したマシンから、DHTの現在のレイアウトに基づいて保持するマシンに送信する必要があります)。ほとんどのアトムは、それを生成した同じマシンで消費されることが予想されるため、これは無駄です。

代わりに、ワーカーが自分自身のアトムレイヤーからアトムをリクエストする際には、まず自分自身のNVMeドライブでそのアトムを探します。見つからない場合、他のワーカーにそのアトムの存在を問い合わせます。これはEnvisionの分散設計の中で最も大きなパフォーマンスの課題であり、これらのクエリはできるだけ迅速に完了する必要があります。また、応答がないワーカーに対処するために複雑なタイムアウト戦略が必要です。長すぎる間を待てば、応答が来なかったために数秒を無駄に待ってしまいます。途中で諦めすぎると、他のワーカーからダウンロードできたアトムを再計算する必要があります。

これを支援するために、アトムレイヤーは複数のリクエストをまとめてバッチ処理し、他のワーカーが応答する必要のあるリクエストのパイプラインを保持しやすくします。また、ワーカーの応答時間が急激に上昇した場合にも容易に検出できます。

少なくとも別のワーカーがそのアトムをディスク上で確認した場合、2回目のリクエストが送信され、アトムをダウンロードします。このようなダウンロードリクエストは非常にスパイキーです。なぜなら、多くのスラムクが最初にアトムをリクエストし、その内容を処理し始めるからです。そのため、アトムレイヤーは各ワーカーペアごとに単一のダウンロードキューがあることを認識しており、特定のアトムリクエストが最初のバイトを数秒間受信できなかった場合でも(キューがいっぱいで他のアトムはバイトを受信している場合は心配する必要はありません)、パニックになりません。ある意味では、タイムアウトはアトムリクエストのレベルではなく、レイヤー全体のレベルで行われます。

さらに、転送キューには2つの最適化が適用されています。

  1. 各リクエストはデータが必要なスラムを指定するため、送信者は同じスラムからのリクエストをまとめようとします(特定のスラムがブロックされている時間が短ければ短いほど、そのスラムは入力の処理を開始できる速くなります)。
  2. スラムの実行がキャンセルされた場合(エラーのため、優先度の変更のため、または別のワーカーが既に完了していることが判明したため)、アトムレイヤーはこのキャンセルを通知し、そのスラムのすべてのリクエストをダウンロードキューから削除します。

典型的なワーカーは1GB/sのバーストでデータを送信し、通常はバーストごとに7GBのデータをカバーします。

ロギングレイヤー

このレイヤーは実行の状態に関する追加情報を保持し、後で問題を調査したりパフォーマンスを測定したりするために使用されます。これは非常に詳細であり、実行されたスラム、実行にかかった時間、生成された結果の種類などの情報を含んでいます。新しいDAGの構築(シリアライズされたDAG自体を含む)やアトムの欠落の発見など、重要なイベントもログに記録されます。合計して、1日あたりの各ワーカーで数ギガバイトのログが生成されます。

パフォーマンスへの影響を最小限に抑えるために、各ワーカーは蓄積されたログを60秒ごとに書き出すか、4メガバイトが蓄積された場合に書き出します(アクティビティのバーストがあるとしばしば発生します)。これはAzure Blob Storageのブロックブロブに書き込まれ、各ワーカーは複数のライターをサポートする必要がないように、独自の専用ブロブを持っています。

その後、Envisionの本番環境外の他のマシンで、クラスター上で何が起こったかについての詳細な統計情報を読み取ることができます。

自己紹介: 私たちはソフトウェアエンジニアを募集しています。リモートワークも可能です。


  1. 帯域幅の観点からは無駄に見えるかもしれませんが、各スロット識別子の重さは24バイトであり、ワーカーごとに最大32スロットありますので、各更新は768バイトしかかかりません。これはTCPパケットよりも少ないです! ↩︎

  2. パフォーマンスの観点からは、アトムレイヤーの方がはるかに難しいです。 ↩︎

  3. メタデータレイヤーは、ワーカーごとではなく、スロットごとに保持される巨大なベクトルクロックです。 ↩︎