Google I/O にご注目いただきありがとうございます。すべてのセッションをオンデマンドで表示オンデマンドで見る

TensorFlow による分散型トレーニング

TensorFlow.org で表示 Google Colab で実行 GitHubでソースを表示 ノートブックをダウンロード

概要

tf.distribute.Strategy は、複数の GPU、複数のマシン、または TPU でトレーニングを分散する TensorFlow API です。この API を使用すると、最小限のコード変更により、既存のモデルとトレーニングコードを分散することができます。

tf.distribute.Strategy は、次の主な目標を念頭に設計されています。

  • 使いやすく、研究者や ML エンジニアなど、複数のユーザーセグメントをサポートすること。
  • 調整することなく優れたパフォーマンスを提供できること。
  • ストラテジーを簡単に切り替えられること。

tf.distribute.Strategy は、 Keras Model.fit などの高レベル API とともに使用でき、カスタムトレーニングループを分散するためにも使用できます。

TensorFlow 2.x では、プログラムを Eager モードで、または tf.function を使用してグラフ内で実行することができます。tf.distribute.Strategy はこれらの実行モードをサポートするのが狙いですが、tf.function を使用するのが最適です。Eager モードはデバッグ目的にのみ推奨され、TPUStrategy ではサポートされていません。このガイドの大半はトレーニングに関する内容ですが、異なるプラットフォームに評価と予測を分散する上でも、この API を使用することができます。

tf.distribute.Strategy は、コードをほとんど変更せずに使用することができます。TensorFlow の基盤のコンポーネントをストラテジーを認識するように変更されているからです。これには、変数、レイヤー、モデル、オプティマイザー、メトリック、要約、およびチェックポイントが含まれます。

このガイドでは、さまざまなストラテジーと、異なる状況においてそれらのストラテジーをどのように使用するかについて説明します。

注意: 概念の理解を深めるには、こちらの詳しいプレゼンテーションをご覧ください。独自のトレーニングループを記述する予定の方に、特にお勧めです。

TensorFlow をセットアップする

import tensorflow as tf
2022-12-14 21:00:24.325859: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory
2022-12-14 21:00:24.325957: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory
2022-12-14 21:00:24.325966: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Cannot dlopen some TensorRT libraries. If you would like to use Nvidia GPU with TensorRT, please make sure the missing libraries mentioned above are installed properly.

ストラテジーの種類

tf.distribute.Strategy は、さまざまな軸に沿って多数の使用事例をカバーすることを目的としています。これらの組み合わせの中には、現在サポートされているものもあれば、今後の追加が予定されているものもあります。次に一部の軸を示します。

  • 同期と非同期トレーニング: これらは、データの並列処理を使用してトレーニングを分散する 2 つの一般的な方法です。同期トレーニングでは、すべてのワーカーは入力データのさまざまなスライスで同期的にトレーニングし、各ステップで勾配を収集します。一方、非同期トレーニングでは、すべてのワーカーは独立的に入力データでトレーニングし、変数を非同期的に更新します。通常、同期トレーニングは all-reduce を介して、非同期トレーニングはパラメーターサーバーアーキテクチャを介してサポートされています。
  • ハードウェアプラットフォーム: トレーニングを、1台のマシンの複数の GPU またはネットワーク内の複数のマシン(それぞれに 0 個以上の GPU)、さらには Cloud TPU に拡張することができます。

これらのユースケースをサポートするために、TensorFlow では MirroredStrategyTPUStrategyMultiWorkerMirroredStrategyParameterServerStrategyCentralStorageStrategy などのストラテジーを利用できます。次のセクションでは、TensorFlow のどのシナリオでこれらのどれがサポートされているかを説明します。以下は簡単な概要です。

トレーニング API MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Keras API サポート中 サポート中 実験的サポート 実験的サポート 2.3 以降でサポート予定
カスタムトレーニングループ サポート中 サポート中 実験的サポート 実験的サポート 2.3 以降でサポート予定
Estimator API 制限サポート 未サポート 制限サポート 制限サポート 制限サポート

注意:「実験的サポート」とは、API の互換性が保証されていないことを意味します。

警告: Estimator のサポートは制限されています。基本的なトレーニングと評価は実験的なものであり、スキャフォールドなどの高度な機能は実装されていません。ユースケースがサポートされていない場合は、Keras またはカスタムトレーニングループを使用する必要があります。新しいコードには Estimator は推奨されません。Estimator は v1.Session スタイルのコードを実行しますが、これは正しく記述するのはより難しく、特に TF 2 コードと組み合わせると予期しない動作をする可能性があります。Estimator は、互換性保証の対象となりますが、セキュリティの脆弱性以外の修正は行われません。詳細については、移行ガイドを参照してください。

MirroredStrategy

tf.distribute.MirroredStrategy は、1 台のマシンの複数の GPU での同期分散型トレーニングをサポートしています。GPU デバイス当たり 1 つのレプリカを作成します。モデルの各変数はそのレプリカ全体にミラーリングされます。これらの変数を合わせて形成されるのが、MirroredVariable と呼ばれる 1 つの概念的変数です。これらの変数は、同一の更新を適用することで、相互の同期が維持されます。

デバイス全体に変数の更新を伝達するには、有効性の高い all-reduce アルゴリズムが使用されます。all-reduce はすべてのデバイスのテンソルを加算して集計し、各デバイスで使用できるようにします。非常に効率性が高く、同期のオーバーヘッドを著しく軽減できる融合アルゴリズムです。デバイス間で利用できる通信の種類に応じて、さまざまな all-reduce アルゴリズムと実装が用意されています。デフォルトでは、all-reduce 実装として NVIDIA NCCL が使用されます。提供されているいくつかのオプションから選択できます。また、独自に作成することもできます。

次は、最も単純な MirroredStrategy の作成方法です。

mirrored_strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')

これにより、TensorFlow が認識できるすべての GPU を使用する MirroredStrategy インスタンスが作成され、クロスデバイス通信として NCCL が使用されるようになります。

マシン上の一部の GPU のみを使用する場合は、次のように行うことができます。

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')

クロスデバイス通信をオーバーライドする場合は、cross_device_ops 引数に tf.distribute.CrossDeviceOps のインスタンスを提供することで実行できます。現在のところ、デフォルトの tf.distribute.NcclAllReduce のほかに、tf.distribute.HierarchicalCopyAllReducetf.distribute.ReductionToOneDevice の 2 つのオプションを使用できます。

mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')

TPUStrategy

tf.distribute.experimental.TPUStrategy は、Tensor Processing Unit(TPU)で TensorFlow トレーニングを実行できるようにします。TPU は、機械学習ワークロードを劇的に高速化するように設計された Google の特殊 ASIC です。Google Colab の TensorFlow Research CloudCloud TPU で利用できます。

分散型トレーニングアーキテクチャの観点で言えば、TPUStrategyMirroredStrategy と同じで、同期分散型トレーニングを実装します。TPU は複数の TPU コアに、効率的な all-reduce の独自の実装とほかの集合演算を提供しており、TPUStrategy で使用されています。

以下に、TPUStrategy をインスタンス化する方法を示します。

注意: Colab でこのコードを実行するには、Colab ランタイムとして TPU を選択する必要があります。TensorFlow TPU ガイドを参照してください。

cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
    tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)

TPUClusterResolver インスタンスを使って TPU の場所を特定することができます。Colab では、引数を指定する必要はありません。

Cloud TPU でこれを使用する場合は、次の内容に注意してくだい。

  • tpu 引数に、TPU リソースの名前を指定する必要があります。
  • プログラムの開始時点で、TPU システム明示的に初期化する必要があります。これは、TPU が計算に使用される前に必要なことです。TPU システムを初期化すると、TPU メモリも消去されるため、状態を失わないようにするには、この手順を完了しておくことが重要となります。

MultiWorkerMirroredStrategy

tf.distribute.MultiWorkerMirroredStrategy は、MirroredStrategy に非常に似通ったストラテジーです。複数のワーカーにそれぞれ潜在的に複数の GPU を使って同期分散型トレーニングを実装します。MirroredStrategy と同様に、すべてのワーカーの各デバイスのモデルにすべての変数のコピーを作成します。

次に、MultiWorkerMirroredStrategy を作成する最も簡単な方法を示します。

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0', '/device:GPU:1', '/device:GPU:2', '/device:GPU:3'), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy では、現在、2 つの collective 演算の実装を選択することができます。 CollectiveCommunication.RING は、gRPC を使用してリング型の collective を通信レイヤーとして実装します。 CollectiveCommunication.NCCL は、Nvidia の NCCL を使用して集合を実装します。CollectiveCommunication.AUTO は、選択をランタイムに任せます。collective 実装の最適な選択は、GPU の数と種類、およびクラスタ内のネットワーク相互接続によって決まります。これらは、次のようにして指定することができます。

communication_options = tf.distribute.experimental.CommunicationOptions(
    implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)
strategy = tf.distribute.MultiWorkerMirroredStrategy(
    communication_options=communication_options)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0', '/device:GPU:1', '/device:GPU:2', '/device:GPU:3'), communication = CommunicationImplementation.NCCL

マルチワーカートレーニングを進める上でマルチ GPU トレーニングとの主な違いの 1 つに、マルチワーカーのセットアップが挙げられます。TF_CONFIG 環境変数は、TensorFlow においてクラスタ構成をそのクラスタの一部である各ワーカーに指定する標準的な方法です。TF_CONFIG の設定についてさらにお読みください。

MultiWorkerMirroredStrategy についての詳細は、以下のチュートリアルを参照してくだい。

ParameterServerStrategy

パラメータサーバートレーニングは、複数のマシンでモデルトレーニングをスケールアップするための一般的なデータ並列方法です。パラメータサーバートレーニングクラスタは、ワーカーとパラメータサーバーで構成されます。変数はパラメータサーバー上に作成され、各ステップでワーカーにより読み取られ、更新されます。詳細については、パラメータサーバートレーニングチュートリアルを参照してください。

TensorFlow 2 では、パラメータサーバートレーニングは、tf.distribute.experimental.coordinator.ClusterCoordinator クラスを介して中央コーディネーターベースのアーキテクチャを使用します。

この実装では、workerparameter server タスクは、コーディネーターからのタスクをリッスンする tf.distribute.Serverを実行します。コーディネーターは、リソースを作成し、トレーニングタスクをディスパッチし、チェックポイントを書き込み、タスクの失敗に対処します。

コーディネーターで実行されているプログラミングでは、ParameterServerStrategy オブジェクトを使用してトレーニングステップを定義し、ClusterCoordinator を使用してトレーニングステップをリモートワーカーにディスパッチします。これらを作成する最も簡単な方法は次のとおりです。

strategy = tf.distribute.experimental.ParameterServerStrategy(
    tf.distribute.cluster_resolver.TFConfigClusterResolver(),
    variable_partitioner=variable_partitioner)
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(
    strategy)

ParameterServerStrategy の詳細については、 Keras Model.fit を使用したパラメータサーバートレーニングとカスタムトレーニングループチュートリアルを参照してください。

注意: TFConfigClusterResolver を使用する場合は、TF_CONFIG 環境変数を構成する必要があります。TF_CONFIGMultiWorkerMirroredStrategy)に似ていますが、別の注意点があります。

TensorFlow 1 では、ParameterServerStrategy{ code0}は、<code data-md-type="codespan">tf.compat.v1.distribute.experimental.ParameterServerStrategyシンボルを介した Estimator でのみ使用できます。

注意: このストラテジーは現在改善中であり、より多くのシナリオで機能できるようにしていることから、「実験的」に指定されています。この改善プロセスの一環として、API の振る舞いが今後変更される可能性があることにご注意ください。

CentralStorageStrategy

tf.distribute.experimental.CentralStorageStrategy も同期トレーニングを行います。変数はミラーリングされず、代わりに CPU に配置され、演算はすべてのローカル GPU に複製されます。GPU が 1 つしかない場合は、すべての変数と演算はその GPU に配置されます。

次のようにして、CentralStorageStrategy のインスタンスを作成します。

central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3'], variable_device = '/device:CPU:0'

これにより、すべての GPU と CPU を使用する CentralStorageStrategy インスタンスが作成されます。レプリカでの変数の更新は、変数に適用される前に集計されます。

注意: このストラテジーは現在改善中であり、より多くのシナリオで機能できるようにしていることから、「実験的」に指定されています。この改善プロセスの一環として、API の振る舞いが今後変更される可能性があることにご注意ください。

その他のストラテジー

上述のストラテジーのほかに、tf.distribute API を使用する際のプロトタイピングとデバッグに役立つ可能性のあるストラテジーが 2 つあります。

デフォルトストラテジー

明示的な分散ストラテジーがスコープにない場合には、分散ストラテジーとしてデフォルトストラテジーが使用されます。tf.distribute.Strategy インターフェースを実装しますが、パススルーであるため、実際の分散を提供しません。たとえば、strategy.run(fn)fn だけを呼び出します。このストラテジーを使用して書かれたコードは、ストラテジーを指定せずに書かれたコードとまった同じ振る舞いとなります。「演算なし」ストラテジーとして考えるとわかりやすいかもしれません。

デフォルトストラテジーはシングルトンであり、インスタンスを 1 つしか作成できません。明示的なストラテジーのスコープの外で、tf.distribute.get_strategy() を使って取得することができます(明示的なストラテジーのスコープ内で現在のストラテジーを取得するために使用するのと同じ API)。

default_strategy = tf.distribute.get_strategy()

このストラテジーには、主に 2 つの目的があります。

  • 無条件で分散対応のライブラリコードを記述すること。たとえば、tf.keras.optimizers において、tf.distribute.get_strategy を実行し、勾配を減らすためにそのストラテジーを使用します。これは常に、Strategy.reduce API を呼び出せるストラテジーオブジェクトを返します。
# In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None)  # reduce some values
1.0
  • ライブラリコードと同様に、条件論理を必要とせずに、分散ストラテジーの有無に関係なく機能するエンドユーザーのプログラムを記述すること。これを説明したサンプルコードスニペットを、次に示します。
if tf.config.list_physical_devices('GPU'):
  strategy = tf.distribute.MirroredStrategy()
else:  # Use the Default Strategy
  strategy = tf.distribute.get_strategy()

with strategy.scope():
  # Do something interesting
  print(tf.Variable(1.))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')
MirroredVariable:{
  0: <tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>,
  1: <tf.Variable 'Variable/replica_1:0' shape=() dtype=float32, numpy=1.0>,
  2: <tf.Variable 'Variable/replica_2:0' shape=() dtype=float32, numpy=1.0>,
  3: <tf.Variable 'Variable/replica_3:0' shape=() dtype=float32, numpy=1.0>
}

OneDeviceStrategy

tf.distribute.OneDeviceStrategy は、すべての変数と計算を単一の指定デバイスに配置するストラテジーです。

strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")

このストラテジーは、多数の点において、デフォルトストラテジーと異なります。デフォルトストラテジーでは、分散ストラテジーを使用せずに TensorFlow を実行した場合と比べ、変数の配置ロジックに変化はありません。しかし、OneDeviceStrategy を使用した場合、そのスコープで作成されるすべての変数は、指定されたデバイスに明示的に配置されます。さらに、OneDeviceStrategy.run から呼び出される関数も、その指定デバイスに配置されるようになります。

このストラテジーを通じて分散された入力は、指定デバイスにプリフェッチされます。デフォルトストラテジーには、入力の分散はありません。

デフォルトストラテジーと同様に、このストラテジーも、複数のデバイス/マシンに実際に分散する別のストラテジーに切り替える前のコードテストに使用することができます。これにより、分散ストラテジーの仕組みはある程度はデフォルトストラテジーよりもが強化されますが、MirroredStrategyTPUStrategy などを使用した場合ほど最大限には強化されません。ストラテジーが指定されていないかのようなコードの振る舞いを希望する場合は、デフォルトストラテジーを使用してください。

以上、利用できるさまざまなストラテジーとそのインスタンス化の方法を見てきました。以下では、トレーニングを分散化するために使用できるさまざまな方法を見ていきます。

tf.distribute.Strategy を Keras Model.fit と使用する

TensorFlow の Keras API 仕様の実装である tf.kerastf.distribute.Strategy を統合しました。tf.keras は、モデルを構築してトレーニングするための高位 API です。tf.keras バックエンドに統合することによって、model.fit を使用する Keras トレーニングフレームワークに記述されたトレーニングの分散をシームレスに行えるようになっています。

コードを次のように変更してください。

  1. 適切な tf.distribute.Strategy のインスタンスを作成します。
  2. 作成した Keras モデル、オプティマイザ、および指標をstrategy.scope に移動します。モデルの call()train_step()、および test_step() メソッド内のコードは、分散化されてアクセラレータで実行されます。

TensorFlow 分散ストラテジーは、Sequential、Functional、および Subclassed のすべての Keras モデルをサポートしています。

次に、1 つの Dense レイヤーを持つ非常に単純な Keras モデルに対してこれを実行するコードスニペットを示します。

mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])

model.compile(loss='mse', optimizer='sgd')
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).

この例では、複数の GPU を持つマシンで実行できるように、MirroredStrategy を使用しています。strategy.scope() は、トレーニングの分散に度のストラテジーを使用するかを Keras に示しています。このスコープ内にモデル/オプティマイザー/メトリックを作成することで、通常の変数ではなく、分散化された変数を作成することができます。これをセットアップしたら、通常通り、モデルをフィットさせることができます。利用できる GPU に対するモデルのトレーニングの複製や勾配の集計などは、MirroredStrategy によって行われます。

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
2022-12-14 21:00:29.908813: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\017TensorDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
Epoch 1/2
INFO:tensorflow:batch_all_reduce: 2 all-reduces with algorithm = nccl, num_packs = 1
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:batch_all_reduce: 2 all-reduces with algorithm = nccl, num_packs = 1
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
 1/10 [==>...........................] - ETA: 40s - loss: 4.8996INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 5s 5ms/step - loss: 3.4872
Epoch 2/2
10/10 [==============================] - 0s 5ms/step - loss: 1.5414
2022-12-14 21:00:35.294877: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\017TensorDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
10/10 [==============================] - 2s 4ms/step - loss: 0.9572
0.9572181701660156

ここでは、tf.data.Dataset を使用してトレーニングと eval 入力を提供していますが、次のように numpy 配列を使用することもできます。

import numpy as np

inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
Epoch 1/2
10/10 [==============================] - 0s 4ms/step - loss: 0.6813
Epoch 2/2
10/10 [==============================] - 0s 5ms/step - loss: 0.3011
<keras.callbacks.History at 0x7f7cec057e50>

いずれの場合(Dataset または Numpy)でも、指定された入力の各バッチは、複数のレプリカで均等に分割されます。たとえば、2 GPU で MirroredStrategy を使用している場合、サイズが 10 の各バッチは 2 GPU 間で分割されるため、各ステップでそれぞれに 5 つの入力例が提供されます。その後、GPU をさらに追加するにつれ、各エポックはより高速にトレーニングするようになります。一般的に、アクセラレータを増やすたびにバッチサイズを増加することが望ましく、そうすることで、追加の計算パワーを有効活用できるようになります。また、モデルに応じて学習速度を再調整することも必要です。レプリカの数を取得するには、strategy.num_replicas_in_sync を使用できます。

mirrored_strategy.num_replicas_in_sync
4
# Compute a global batch size using a number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15, 20:0.175}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]

現在、何がサポートされていますか?

トレーニング API MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy ParameterServerStrategy CentralStorageStrategy
Keras API サポート中 サポート中 実験的サポート 実験的サポート 2.3 以降でサポート予定

例とチュートリアル

次は、上述した Keras のエンドツーエンド統合を説明するチュートリアルと例の一覧です。

  1. チュートリアル: Model.fitMirroredStrategy でトレーニングする。
  2. チュートリアル: Model.fitMultiWorkerMirroredStrategy でトレーニングする。
  3. ガイド: Model.fitTPUStrategy を使用する例を含むガイド。
  4. チュートリアル: Model.fitParameterServerStrategy でパラメータサーバーをトレーニングする。
  5. チュートリアル: Model.fitTPUStrategy を使用して、GLUE ベンチマークから多くのタスクの BERT を微調整する。
  6. さまざまなストラテジーを使って実装された最新モデルのコレクションが含まれる TensorFlow Model Garden リポジトリ

カスタムトレーニングループで tf.distribute.Strategy
を使用する

これまで見てきたように、Keras の model.fittf.distribute.Strategy を使用するにはコードの数行のみを変更する必要がありました。もう少し手を加えれば、カスタムトレーニングループでも tf.distribute.Strategy を使用できるようになります。

トレーニングループに Estimator や Keras よりも高い柔軟性と制御が必要な場合は、カスタムトレーニングループを作成することができます。たとえば、GANを使用する際に、各ラウンドで異なる数のジェネレータやディスクリミネータのステップを実行することができます。同様に、高度なフレームワークは、強化学習トレーニングにはあまり適していません。

カスタムとレニングループをサポートするために、tf.distribute.Strategy クラスでメソッドのコアセットを提供しています。これらを使用するには、最初にコードをわずかに再構成する必要があるかもしれませんが、それを一度行うと、ストラテジーインスタンスを変更するだけで、GPU、TPU、および複数のマシンを切り替えられるようになります。

ここでは、以前と同じ Keras モデルを使って、単純なトレーニングでの使用事例を表した簡単なスニペットを示します。

まず、ストラテジーのスコープ内にモデルとオプティマイザーを作成します。こうすることで、そのモデルとオプティマイザーを使って作成された変数がミラーリングされた変数であることを保証することができます。

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  optimizer = tf.keras.optimizers.SGD()

次に、入力データセットを作成し、tf.distribute.Strategy.experimental_distribute_dataset を呼び出して、ストラテジーに応じてデータセットを分散します。

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(1000).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
2022-12-14 21:00:37.537268: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\021TensorDataset:126"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}

そして、トレーニングの一ステップを定義します。勾配とオプティマイザーを計算し、その勾配を適用してモデルの変数を更新するために、tf.GradientTape を使用します。このトレーニングステップを分散するには、train_step 関数に入れて、前に作成した dist_dataset から取得するデータセット入力とともに、tf.distrbute.Strategy.run に渡します。

loss_object = tf.keras.losses.BinaryCrossentropy(
  from_logits=True,
  reduction=tf.keras.losses.Reduction.NONE)

def compute_loss(labels, predictions):
  per_example_loss = loss_object(labels, predictions)
  return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

def train_step(inputs):
  features, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(features, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

@tf.function
def distributed_train_step(dist_inputs):
  per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
  return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

上記のコードには、注意すべき点がいくつかあります。

  1. tf.nn.compute_average_loss を使用して損失を計算しました。tf.nn.compute_average_loss は例ごとの損失を加算し、その和を global_batch_size で除算します。これは、各レプリカで勾配が計算された後、その勾配を加算することで、レプリカ全体の勾配が集計されるため重要です。
  2. tf.distribute.Strategy.run が返す結果を集計するために、tf.distribute.Strategy.reduce API を使用しました。tf.distribute.Strategy.run は、ストラテジーの各ローカルレプリカの結果を返し、この結果の消費方法もさまざまです。集計された値を取得するには、結果を reduce することができます。また、tf.distribute.Strategy.experimental_local_results を実行して、結果に含まれる、ローカルレプリカ当たり 1 つの値のリストを取得することもできます。
  3. 分散ストラテジーのスコープ内で apply_gradients が呼び出されると、その振る舞いが変更されます。具体的には、同期トレーニング中に各並列インスタンスに勾配を適用する前に、勾配の sum-over-all-replicas を実行します。

最後に、トレーニングステップの定義が済んだら、dist_dataset をイテレートしてトレーニングをループで実行できます。

for dist_inputs in dist_dataset:
  print(distributed_train_step(dist_inputs))
INFO:tensorflow:batch_all_reduce: 2 all-reduces with algorithm = nccl, num_packs = 1
INFO:tensorflow:batch_all_reduce: 2 all-reduces with algorithm = nccl, num_packs = 1
tf.Tensor(0.21269155, shape=(), dtype=float32)
tf.Tensor(0.21195853, shape=(), dtype=float32)
tf.Tensor(0.21123002, shape=(), dtype=float32)
tf.Tensor(0.210506, shape=(), dtype=float32)
tf.Tensor(0.20978644, shape=(), dtype=float32)
tf.Tensor(0.20907126, shape=(), dtype=float32)
tf.Tensor(0.20836048, shape=(), dtype=float32)
tf.Tensor(0.20765403, shape=(), dtype=float32)
tf.Tensor(0.20695189, shape=(), dtype=float32)
tf.Tensor(0.206254, shape=(), dtype=float32)
tf.Tensor(0.20556036, shape=(), dtype=float32)
tf.Tensor(0.2048709, shape=(), dtype=float32)
tf.Tensor(0.20418559, shape=(), dtype=float32)
tf.Tensor(0.20350443, shape=(), dtype=float32)
tf.Tensor(0.20282733, shape=(), dtype=float32)
tf.Tensor(0.20215431, shape=(), dtype=float32)
tf.Tensor(0.2014853, shape=(), dtype=float32)
tf.Tensor(0.20082025, shape=(), dtype=float32)
tf.Tensor(0.20015922, shape=(), dtype=float32)
tf.Tensor(0.19950207, shape=(), dtype=float32)
tf.Tensor(0.1988488, shape=(), dtype=float32)
tf.Tensor(0.19819942, shape=(), dtype=float32)
tf.Tensor(0.19755386, shape=(), dtype=float32)
tf.Tensor(0.1969121, shape=(), dtype=float32)
tf.Tensor(0.19627407, shape=(), dtype=float32)
tf.Tensor(0.19563983, shape=(), dtype=float32)
tf.Tensor(0.19500923, shape=(), dtype=float32)
tf.Tensor(0.19438234, shape=(), dtype=float32)
tf.Tensor(0.19375908, shape=(), dtype=float32)
tf.Tensor(0.19313946, shape=(), dtype=float32)
tf.Tensor(0.19252343, shape=(), dtype=float32)
tf.Tensor(0.19191094, shape=(), dtype=float32)
tf.Tensor(0.19130194, shape=(), dtype=float32)
tf.Tensor(0.19069648, shape=(), dtype=float32)
tf.Tensor(0.19009452, shape=(), dtype=float32)
tf.Tensor(0.18949597, shape=(), dtype=float32)
tf.Tensor(0.18890083, shape=(), dtype=float32)
tf.Tensor(0.1883091, shape=(), dtype=float32)
tf.Tensor(0.18772076, shape=(), dtype=float32)
tf.Tensor(0.1871357, shape=(), dtype=float32)
tf.Tensor(0.18655394, shape=(), dtype=float32)
tf.Tensor(0.1859755, shape=(), dtype=float32)
tf.Tensor(0.1854003, shape=(), dtype=float32)
tf.Tensor(0.18482836, shape=(), dtype=float32)
tf.Tensor(0.18425961, shape=(), dtype=float32)
tf.Tensor(0.18369403, shape=(), dtype=float32)
tf.Tensor(0.18313164, shape=(), dtype=float32)
tf.Tensor(0.18257232, shape=(), dtype=float32)
tf.Tensor(0.18201615, shape=(), dtype=float32)
tf.Tensor(0.18146305, shape=(), dtype=float32)

上記の例では、dist_dataset をイテレートして、トレーニングに入力を提供しました。また、numpy 入力をサポートするために、tf.distribute.Strategy.make_experimental_numpy_dataset も提供しています。tf.distribute.Strategy.experimental_distribute_dataset を呼び出す前にこの API を使って、データセットを作成することができます。

データをイテレートするための別の方法は、明示的にイテレータを使用することです。データセット全体をイテレートするのに対し、特定のステップ数だけ実行する場合に、これを行うことができます。上記のイテレートは、最初にイテレータを作成し、それに明示的にnext を呼び出して入力データを取得するように変更されます。

iterator = iter(dist_dataset)
for _ in range(10):
  print(distributed_train_step(next(iterator)))
tf.Tensor(0.18091302, shape=(), dtype=float32)
tf.Tensor(0.18036604, shape=(), dtype=float32)
tf.Tensor(0.17982203, shape=(), dtype=float32)
tf.Tensor(0.17928106, shape=(), dtype=float32)
tf.Tensor(0.17874302, shape=(), dtype=float32)
tf.Tensor(0.17820795, shape=(), dtype=float32)
tf.Tensor(0.17767578, shape=(), dtype=float32)
tf.Tensor(0.17714654, shape=(), dtype=float32)
tf.Tensor(0.17662014, shape=(), dtype=float32)
tf.Tensor(0.17609662, shape=(), dtype=float32)

これは、カスタムトレーニングループを分散化する tf.distribute.Strategy API の最も単純な使用ケースを説明しています。

現在、何がサポートされていますか?

トレーニング API MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy ParameterServerStrategy CentralStorageStrategy
カスタムトレーニングループ サポート中 サポート中 実験的サポート 実験的サポート 2.3 以降でサポート予定

例とチュートリアル

次は、カスタムトレーニングループを使って分散ストラテジーを使用するいくつかの例です。

  1. MirroredStrategy で MNIST をトレーニングするチュートリアル
  2. TPUStrategy による MNIST のトレーニングに関するガイド
  3. さまざまなストラテジーを使って実装された最新モデルのコレクションが含まれる TensorFlow Model Garden リポジトリ
  4. カスタムトレーニングループと ParameterServerStrategy でパラメータサーバーをトレーニングするチュートリアル
  5. さまざまなストラテジーを使って実装された最新モデルのコレクションが含まれる TensorFlow Model Garden リポジトリ

その他のトピック

このセクションでは、複数の使用事例に関連のあるトピックをいくつかカバーしています。

TF_CONFIG 環境変数のセットアップ

マルチワーカートレーニングでは、前述のとおり、クラスタで実行されている各バイナリに対して TF_CONFIG 環境変数を設定する必要があります。TF_CONFIG 環境変数は、クラスタを構成するタスク、そのアドレス、およびクラスタ内の各タスクのロールを指定する JSON 文字列です。tensorflow/ecosystem レポジトリには、トレーニングタスクの TF_CONFIG を設定する Kubernotes テンプレートを提供しています。

TF_CONFIG には、cluster と task の 2 つのコンポーネントがあります。

  • cluster はトレーニングクラスタに関する情報を提供します。これは、worker などのさまざまなタイプのジョブで構成されるディクショナリーです。マルチワーカートレーニングでは、通常、一般的なワーカーの作業に加えて、チェックポイントの保存や TensorBoard のサマリーファイルの書き込みなど、ほかよりタスクを担うワーカーが 1 つあります。こういったワーカーは、「チーフ」ワーカーと呼ばれ、index 0 のワーカーがチーフワーカーに指定されるようになっています(実際、このようにして、tf.distribute.Strategy が実装されています)。
  • 一方、task は現在のタスクに関する情報を提供します。最初のコンポーネント cluster はすべてのワーカーで同じであり、2番目のコンポーネント task はワーカーごとに異なり、そのワーカーのタイプとインデックスを指定します。

次は、TF_CONFIG の一例です。

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"]
    },
   "task": {"type": "worker", "index": 1}
})

この TF_CONFIGは、"cluster" 内に 3 つのワーカーと 2 つの "ps" タスク、およびそれらのホストとポートがあることを指定しています。 "task" の部分は、"cluster" の現在のタスクのロールであるワーカー 1(2番目のワーカー)を指定します。クラスタ内の有効なロールは、"chief""worker""ps"、および "evaluator" です。tf.distribute.experimental.ParameterServerStrategyを使用する場合を除いて、"ps" ジョブはありません。

次のステップ

tf.distribute.Strategy は積極的な開発が進められています。ぜひお試しいただき、GitHub 課題 に皆さんの感想をお寄せください。