![]() |
![]() |
![]() |
![]() |
概要
tf.distribute.Strategy
は、複数の GPU、複数のマシン、または TPU でトレーニングを分散する TensorFlow API です。この API を使用すると、最小限のコード変更により、既存のモデルとトレーニングコードを分散することができます。
tf.distribute.Strategy
は、次の主な目標を念頭に設計されています。
- 使いやすく、研究者や ML エンジニアなど、複数のユーザーセグメントをサポートすること。
- 調整することなく優れたパフォーマンスを提供できること。
- ストラテジーを簡単に切り替えられること。
tf.distribute.Strategy
は、 Keras Model.fit
などの高レベル API とともに使用でき、カスタムトレーニングループを分散するためにも使用できます。
TensorFlow 2.x では、プログラムを Eager モードで、または tf.function
を使用してグラフ内で実行することができます。tf.distribute.Strategy
はこれらの実行モードをサポートするのが狙いですが、tf.function
を使用するのが最適です。Eager モードはデバッグ目的にのみ推奨され、TPUStrategy
ではサポートされていません。このガイドの大半はトレーニングに関する内容ですが、異なるプラットフォームに評価と予測を分散する上でも、この API を使用することができます。
tf.distribute.Strategy
は、コードをほとんど変更せずに使用することができます。TensorFlow の基盤のコンポーネントをストラテジーを認識するように変更されているからです。これには、変数、レイヤー、モデル、オプティマイザー、メトリック、要約、およびチェックポイントが含まれます。
このガイドでは、さまざまなストラテジーと、異なる状況においてそれらのストラテジーをどのように使用するかについて説明します。
注意: 概念の理解を深めるには、こちらの詳しいプレゼンテーションをご覧ください。独自のトレーニングループを記述する予定の方に、特にお勧めです。
TensorFlow をセットアップする
import tensorflow as tf
2022-12-14 21:00:24.325859: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory 2022-12-14 21:00:24.325957: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory 2022-12-14 21:00:24.325966: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Cannot dlopen some TensorRT libraries. If you would like to use Nvidia GPU with TensorRT, please make sure the missing libraries mentioned above are installed properly.
ストラテジーの種類
tf.distribute.Strategy
は、さまざまな軸に沿って多数の使用事例をカバーすることを目的としています。これらの組み合わせの中には、現在サポートされているものもあれば、今後の追加が予定されているものもあります。次に一部の軸を示します。
- 同期と非同期トレーニング: これらは、データの並列処理を使用してトレーニングを分散する 2 つの一般的な方法です。同期トレーニングでは、すべてのワーカーは入力データのさまざまなスライスで同期的にトレーニングし、各ステップで勾配を収集します。一方、非同期トレーニングでは、すべてのワーカーは独立的に入力データでトレーニングし、変数を非同期的に更新します。通常、同期トレーニングは all-reduce を介して、非同期トレーニングはパラメーターサーバーアーキテクチャを介してサポートされています。
- ハードウェアプラットフォーム: トレーニングを、1台のマシンの複数の GPU またはネットワーク内の複数のマシン(それぞれに 0 個以上の GPU)、さらには Cloud TPU に拡張することができます。
これらのユースケースをサポートするために、TensorFlow では MirroredStrategy
、TPUStrategy
、MultiWorkerMirroredStrategy
、ParameterServerStrategy
、CentralStorageStrategy
などのストラテジーを利用できます。次のセクションでは、TensorFlow のどのシナリオでこれらのどれがサポートされているかを説明します。以下は簡単な概要です。
トレーニング API | MirroredStrategy |
TPUStrategy |
MultiWorkerMirroredStrategy |
CentralStorageStrategy |
ParameterServerStrategy |
---|---|---|---|---|---|
Keras API | サポート中 | サポート中 | 実験的サポート | 実験的サポート | 2.3 以降でサポート予定 |
カスタムトレーニングループ | サポート中 | サポート中 | 実験的サポート | 実験的サポート | 2.3 以降でサポート予定 |
Estimator API | 制限サポート | 未サポート | 制限サポート | 制限サポート | 制限サポート |
注意:「実験的サポート」とは、API の互換性が保証されていないことを意味します。
警告: Estimator のサポートは制限されています。基本的なトレーニングと評価は実験的なものであり、スキャフォールドなどの高度な機能は実装されていません。ユースケースがサポートされていない場合は、Keras またはカスタムトレーニングループを使用する必要があります。新しいコードには Estimator は推奨されません。Estimator は v1.Session
スタイルのコードを実行しますが、これは正しく記述するのはより難しく、特に TF 2 コードと組み合わせると予期しない動作をする可能性があります。Estimator は、互換性保証の対象となりますが、セキュリティの脆弱性以外の修正は行われません。詳細については、移行ガイドを参照してください。
MirroredStrategy
tf.distribute.MirroredStrategy
は、1 台のマシンの複数の GPU での同期分散型トレーニングをサポートしています。GPU デバイス当たり 1 つのレプリカを作成します。モデルの各変数はそのレプリカ全体にミラーリングされます。これらの変数を合わせて形成されるのが、MirroredVariable
と呼ばれる 1 つの概念的変数です。これらの変数は、同一の更新を適用することで、相互の同期が維持されます。
デバイス全体に変数の更新を伝達するには、有効性の高い all-reduce アルゴリズムが使用されます。all-reduce はすべてのデバイスのテンソルを加算して集計し、各デバイスで使用できるようにします。非常に効率性が高く、同期のオーバーヘッドを著しく軽減できる融合アルゴリズムです。デバイス間で利用できる通信の種類に応じて、さまざまな all-reduce アルゴリズムと実装が用意されています。デフォルトでは、all-reduce 実装として NVIDIA NCCL が使用されます。提供されているいくつかのオプションから選択できます。また、独自に作成することもできます。
次は、最も単純な MirroredStrategy
の作成方法です。
mirrored_strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')
これにより、TensorFlow が認識できるすべての GPU を使用する MirroredStrategy
インスタンスが作成され、クロスデバイス通信として NCCL が使用されるようになります。
マシン上の一部の GPU のみを使用する場合は、次のように行うことができます。
mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')
クロスデバイス通信をオーバーライドする場合は、cross_device_ops
引数に tf.distribute.CrossDeviceOps
のインスタンスを提供することで実行できます。現在のところ、デフォルトの tf.distribute.NcclAllReduce
のほかに、tf.distribute.HierarchicalCopyAllReduce
と tf.distribute.ReductionToOneDevice
の 2 つのオプションを使用できます。
mirrored_strategy = tf.distribute.MirroredStrategy(
cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')
TPUStrategy
tf.distribute.experimental.TPUStrategy
は、Tensor Processing Unit(TPU)で TensorFlow トレーニングを実行できるようにします。TPU は、機械学習ワークロードを劇的に高速化するように設計された Google の特殊 ASIC です。Google Colab の TensorFlow Research Cloud と Cloud TPU で利用できます。
分散型トレーニングアーキテクチャの観点で言えば、TPUStrategy
は MirroredStrategy
と同じで、同期分散型トレーニングを実装します。TPU は複数の TPU コアに、効率的な all-reduce の独自の実装とほかの集合演算を提供しており、TPUStrategy
で使用されています。
以下に、TPUStrategy
をインスタンス化する方法を示します。
注意: Colab でこのコードを実行するには、Colab ランタイムとして TPU を選択する必要があります。TensorFlow TPU ガイドを参照してください。
cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)
TPUClusterResolver
インスタンスを使って TPU の場所を特定することができます。Colab では、引数を指定する必要はありません。
Cloud TPU でこれを使用する場合は、次の内容に注意してくだい。
tpu
引数に、TPU リソースの名前を指定する必要があります。- プログラムの開始時点で、TPU システム明示的に初期化する必要があります。これは、TPU が計算に使用される前に必要なことです。TPU システムを初期化すると、TPU メモリも消去されるため、状態を失わないようにするには、この手順を完了しておくことが重要となります。
MultiWorkerMirroredStrategy
tf.distribute.MultiWorkerMirroredStrategy
は、MirroredStrategy
に非常に似通ったストラテジーです。複数のワーカーにそれぞれ潜在的に複数の GPU を使って同期分散型トレーニングを実装します。MirroredStrategy
と同様に、すべてのワーカーの各デバイスのモデルにすべての変数のコピーを作成します。
次に、MultiWorkerMirroredStrategy
を作成する最も簡単な方法を示します。
strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled. INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0', '/device:GPU:1', '/device:GPU:2', '/device:GPU:3'), communication = CommunicationImplementation.AUTO
MultiWorkerMirroredStrategy
では、現在、2 つの collective 演算の実装を選択することができます。 CollectiveCommunication.RING
は、gRPC を使用してリング型の collective を通信レイヤーとして実装します。 CollectiveCommunication.NCCL
は、Nvidia の NCCL を使用して集合を実装します。CollectiveCommunication.AUTO
は、選択をランタイムに任せます。collective 実装の最適な選択は、GPU の数と種類、およびクラスタ内のネットワーク相互接続によって決まります。これらは、次のようにして指定することができます。
communication_options = tf.distribute.experimental.CommunicationOptions(
implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)
strategy = tf.distribute.MultiWorkerMirroredStrategy(
communication_options=communication_options)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled. INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0', '/device:GPU:1', '/device:GPU:2', '/device:GPU:3'), communication = CommunicationImplementation.NCCL
マルチワーカートレーニングを進める上でマルチ GPU トレーニングとの主な違いの 1 つに、マルチワーカーのセットアップが挙げられます。TF_CONFIG
環境変数は、TensorFlow においてクラスタ構成をそのクラスタの一部である各ワーカーに指定する標準的な方法です。TF_CONFIG の設定についてさらにお読みください。
MultiWorkerMirroredStrategy
についての詳細は、以下のチュートリアルを参照してくだい。
ParameterServerStrategy
パラメータサーバートレーニングは、複数のマシンでモデルトレーニングをスケールアップするための一般的なデータ並列方法です。パラメータサーバートレーニングクラスタは、ワーカーとパラメータサーバーで構成されます。変数はパラメータサーバー上に作成され、各ステップでワーカーにより読み取られ、更新されます。詳細については、パラメータサーバートレーニングチュートリアルを参照してください。
TensorFlow 2 では、パラメータサーバートレーニングは、tf.distribute.experimental.coordinator.ClusterCoordinator
クラスを介して中央コーディネーターベースのアーキテクチャを使用します。
この実装では、worker
と parameter server
タスクは、コーディネーターからのタスクをリッスンする tf.distribute.Server
を実行します。コーディネーターは、リソースを作成し、トレーニングタスクをディスパッチし、チェックポイントを書き込み、タスクの失敗に対処します。
コーディネーターで実行されているプログラミングでは、ParameterServerStrategy
オブジェクトを使用してトレーニングステップを定義し、ClusterCoordinator
を使用してトレーニングステップをリモートワーカーにディスパッチします。これらを作成する最も簡単な方法は次のとおりです。
strategy = tf.distribute.experimental.ParameterServerStrategy(
tf.distribute.cluster_resolver.TFConfigClusterResolver(),
variable_partitioner=variable_partitioner)
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(
strategy)
ParameterServerStrategy
の詳細については、 Keras Model.fit を使用したパラメータサーバートレーニングとカスタムトレーニングループチュートリアルを参照してください。
注意: TFConfigClusterResolver
を使用する場合は、TF_CONFIG
環境変数を構成する必要があります。TF_CONFIG
(MultiWorkerMirroredStrategy
)に似ていますが、別の注意点があります。
TensorFlow 1 では、ParameterServerStrategy{ code0}は、<code data-md-type="codespan">tf.compat.v1.distribute.experimental.ParameterServerStrategy
シンボルを介した Estimator でのみ使用できます。
注意: このストラテジーは現在改善中であり、より多くのシナリオで機能できるようにしていることから、「実験的
」に指定されています。この改善プロセスの一環として、API の振る舞いが今後変更される可能性があることにご注意ください。
CentralStorageStrategy
tf.distribute.experimental.CentralStorageStrategy
も同期トレーニングを行います。変数はミラーリングされず、代わりに CPU に配置され、演算はすべてのローカル GPU に複製されます。GPU が 1 つしかない場合は、すべての変数と演算はその GPU に配置されます。
次のようにして、CentralStorageStrategy
のインスタンスを作成します。
central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3'], variable_device = '/device:CPU:0'
これにより、すべての GPU と CPU を使用する CentralStorageStrategy
インスタンスが作成されます。レプリカでの変数の更新は、変数に適用される前に集計されます。
注意: このストラテジーは現在改善中であり、より多くのシナリオで機能できるようにしていることから、「実験的
」に指定されています。この改善プロセスの一環として、API の振る舞いが今後変更される可能性があることにご注意ください。
その他のストラテジー
上述のストラテジーのほかに、tf.distribute
API を使用する際のプロトタイピングとデバッグに役立つ可能性のあるストラテジーが 2 つあります。
デフォルトストラテジー
明示的な分散ストラテジーがスコープにない場合には、分散ストラテジーとしてデフォルトストラテジーが使用されます。tf.distribute.Strategy
インターフェースを実装しますが、パススルーであるため、実際の分散を提供しません。たとえば、strategy.run(fn)
は fn
だけを呼び出します。このストラテジーを使用して書かれたコードは、ストラテジーを指定せずに書かれたコードとまった同じ振る舞いとなります。「演算なし」ストラテジーとして考えるとわかりやすいかもしれません。
デフォルトストラテジーはシングルトンであり、インスタンスを 1 つしか作成できません。明示的なストラテジーのスコープの外で、tf.distribute.get_strategy()
を使って取得することができます(明示的なストラテジーのスコープ内で現在のストラテジーを取得するために使用するのと同じ API)。
default_strategy = tf.distribute.get_strategy()
このストラテジーには、主に 2 つの目的があります。
- 無条件で分散対応のライブラリコードを記述すること。たとえば、
tf.keras.optimizers
において、tf.distribute.get_strategy
を実行し、勾配を減らすためにそのストラテジーを使用します。これは常に、Strategy.reduce
API を呼び出せるストラテジーオブジェクトを返します。
# In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None) # reduce some values
1.0
- ライブラリコードと同様に、条件論理を必要とせずに、分散ストラテジーの有無に関係なく機能するエンドユーザーのプログラムを記述すること。これを説明したサンプルコードスニペットを、次に示します。
if tf.config.list_physical_devices('GPU'):
strategy = tf.distribute.MirroredStrategy()
else: # Use the Default Strategy
strategy = tf.distribute.get_strategy()
with strategy.scope():
# Do something interesting
print(tf.Variable(1.))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3') MirroredVariable:{ 0: <tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>, 1: <tf.Variable 'Variable/replica_1:0' shape=() dtype=float32, numpy=1.0>, 2: <tf.Variable 'Variable/replica_2:0' shape=() dtype=float32, numpy=1.0>, 3: <tf.Variable 'Variable/replica_3:0' shape=() dtype=float32, numpy=1.0> }
OneDeviceStrategy
tf.distribute.OneDeviceStrategy
は、すべての変数と計算を単一の指定デバイスに配置するストラテジーです。
strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
このストラテジーは、多数の点において、デフォルトストラテジーと異なります。デフォルトストラテジーでは、分散ストラテジーを使用せずに TensorFlow を実行した場合と比べ、変数の配置ロジックに変化はありません。しかし、OneDeviceStrategy
を使用した場合、そのスコープで作成されるすべての変数は、指定されたデバイスに明示的に配置されます。さらに、OneDeviceStrategy.run
から呼び出される関数も、その指定デバイスに配置されるようになります。
このストラテジーを通じて分散された入力は、指定デバイスにプリフェッチされます。デフォルトストラテジーには、入力の分散はありません。
デフォルトストラテジーと同様に、このストラテジーも、複数のデバイス/マシンに実際に分散する別のストラテジーに切り替える前のコードテストに使用することができます。これにより、分散ストラテジーの仕組みはある程度はデフォルトストラテジーよりもが強化されますが、MirroredStrategy
や TPUStrategy
などを使用した場合ほど最大限には強化されません。ストラテジーが指定されていないかのようなコードの振る舞いを希望する場合は、デフォルトストラテジーを使用してください。
以上、利用できるさまざまなストラテジーとそのインスタンス化の方法を見てきました。以下では、トレーニングを分散化するために使用できるさまざまな方法を見ていきます。
tf.distribute.Strategy を Keras Model.fit と使用する
TensorFlow の Keras API 仕様の実装である tf.keras
に tf.distribute.Strategy
を統合しました。tf.keras
は、モデルを構築してトレーニングするための高位 API です。tf.keras
バックエンドに統合することによって、model.fit
を使用する Keras トレーニングフレームワークに記述されたトレーニングの分散をシームレスに行えるようになっています。
コードを次のように変更してください。
- 適切な
tf.distribute.Strategy
のインスタンスを作成します。 - 作成した Keras モデル、オプティマイザ、および指標を
strategy.scope
に移動します。モデルのcall()
、train_step()
、およびtest_step()
メソッド内のコードは、分散化されてアクセラレータで実行されます。
TensorFlow 分散ストラテジーは、Sequential、Functional、および Subclassed のすべての Keras モデルをサポートしています。
次に、1 つの Dense
レイヤーを持つ非常に単純な Keras モデルに対してこれを実行するコードスニペットを示します。
mirrored_strategy = tf.distribute.MirroredStrategy()
with mirrored_strategy.scope():
model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
model.compile(loss='mse', optimizer='sgd')
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3') INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
この例では、複数の GPU を持つマシンで実行できるように、MirroredStrategy
を使用しています。strategy.scope()
は、トレーニングの分散に度のストラテジーを使用するかを Keras に示しています。このスコープ内にモデル/オプティマイザー/メトリックを作成することで、通常の変数ではなく、分散化された変数を作成することができます。これをセットアップしたら、通常通り、モデルをフィットさせることができます。利用できる GPU に対するモデルのトレーニングの複製や勾配の集計などは、MirroredStrategy
によって行われます。
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
2022-12-14 21:00:29.908813: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\017TensorDataset:0" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } Epoch 1/2 INFO:tensorflow:batch_all_reduce: 2 all-reduces with algorithm = nccl, num_packs = 1 INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:batch_all_reduce: 2 all-reduces with algorithm = nccl, num_packs = 1 INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). 1/10 [==>...........................] - ETA: 40s - loss: 4.8996INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). 10/10 [==============================] - 5s 5ms/step - loss: 3.4872 Epoch 2/2 10/10 [==============================] - 0s 5ms/step - loss: 1.5414 2022-12-14 21:00:35.294877: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\017TensorDataset:0" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } 10/10 [==============================] - 2s 4ms/step - loss: 0.9572 0.9572181701660156
ここでは、tf.data.Dataset
を使用してトレーニングと eval 入力を提供していますが、次のように numpy 配列を使用することもできます。
import numpy as np
inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
Epoch 1/2 10/10 [==============================] - 0s 4ms/step - loss: 0.6813 Epoch 2/2 10/10 [==============================] - 0s 5ms/step - loss: 0.3011 <keras.callbacks.History at 0x7f7cec057e50>
いずれの場合(Dataset
または Numpy)でも、指定された入力の各バッチは、複数のレプリカで均等に分割されます。たとえば、2 GPU で MirroredStrategy
を使用している場合、サイズが 10 の各バッチは 2 GPU 間で分割されるため、各ステップでそれぞれに 5 つの入力例が提供されます。その後、GPU をさらに追加するにつれ、各エポックはより高速にトレーニングするようになります。一般的に、アクセラレータを増やすたびにバッチサイズを増加することが望ましく、そうすることで、追加の計算パワーを有効活用できるようになります。また、モデルに応じて学習速度を再調整することも必要です。レプリカの数を取得するには、strategy.num_replicas_in_sync
を使用できます。
mirrored_strategy.num_replicas_in_sync
4
# Compute a global batch size using a number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)
LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15, 20:0.175}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]
現在、何がサポートされていますか?
トレーニング API | MirroredStrategy |
TPUStrategy |
MultiWorkerMirroredStrategy |
ParameterServerStrategy |
CentralStorageStrategy |
---|---|---|---|---|---|
Keras API | サポート中 | サポート中 | 実験的サポート | 実験的サポート | 2.3 以降でサポート予定 |
例とチュートリアル
次は、上述した Keras のエンドツーエンド統合を説明するチュートリアルと例の一覧です。
- チュートリアル:
Model.fit
とMirroredStrategy
でトレーニングする。 - チュートリアル:
Model.fit
とMultiWorkerMirroredStrategy
でトレーニングする。 - ガイド:
Model.fit
とTPUStrategy
を使用する例を含むガイド。 - チュートリアル:
Model.fit
とParameterServerStrategy
でパラメータサーバーをトレーニングする。 - チュートリアル:
Model.fit
とTPUStrategy
を使用して、GLUE ベンチマークから多くのタスクの BERT を微調整する。 - さまざまなストラテジーを使って実装された最新モデルのコレクションが含まれる TensorFlow Model Garden リポジトリ
カスタムトレーニングループで tf.distribute.Strategy
を使用する
これまで見てきたように、Keras の model.fit
で tf.distribute.Strategy
を使用するにはコードの数行のみを変更する必要がありました。もう少し手を加えれば、カスタムトレーニングループでも tf.distribute.Strategy
を使用できるようになります。
トレーニングループに Estimator や Keras よりも高い柔軟性と制御が必要な場合は、カスタムトレーニングループを作成することができます。たとえば、GANを使用する際に、各ラウンドで異なる数のジェネレータやディスクリミネータのステップを実行することができます。同様に、高度なフレームワークは、強化学習トレーニングにはあまり適していません。
カスタムとレニングループをサポートするために、tf.distribute.Strategy
クラスでメソッドのコアセットを提供しています。これらを使用するには、最初にコードをわずかに再構成する必要があるかもしれませんが、それを一度行うと、ストラテジーインスタンスを変更するだけで、GPU、TPU、および複数のマシンを切り替えられるようになります。
ここでは、以前と同じ Keras モデルを使って、単純なトレーニングでの使用事例を表した簡単なスニペットを示します。
まず、ストラテジーのスコープ内にモデルとオプティマイザーを作成します。こうすることで、そのモデルとオプティマイザーを使って作成された変数がミラーリングされた変数であることを保証することができます。
with mirrored_strategy.scope():
model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
optimizer = tf.keras.optimizers.SGD()
次に、入力データセットを作成し、tf.distribute.Strategy.experimental_distribute_dataset
を呼び出して、ストラテジーに応じてデータセットを分散します。
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(1000).batch(
global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
2022-12-14 21:00:37.537268: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\021TensorDataset:126" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } }
そして、トレーニングの一ステップを定義します。勾配とオプティマイザーを計算し、その勾配を適用してモデルの変数を更新するために、tf.GradientTape
を使用します。このトレーニングステップを分散するには、train_step
関数に入れて、前に作成した dist_dataset
から取得するデータセット入力とともに、tf.distrbute.Strategy.run
に渡します。
loss_object = tf.keras.losses.BinaryCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)
def compute_loss(labels, predictions):
per_example_loss = loss_object(labels, predictions)
return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)
def train_step(inputs):
features, labels = inputs
with tf.GradientTape() as tape:
predictions = model(features, training=True)
loss = compute_loss(labels, predictions)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
return loss
@tf.function
def distributed_train_step(dist_inputs):
per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
axis=None)
上記のコードには、注意すべき点がいくつかあります。
tf.nn.compute_average_loss
を使用して損失を計算しました。tf.nn.compute_average_loss
は例ごとの損失を加算し、その和を global_batch_size で除算します。これは、各レプリカで勾配が計算された後、その勾配を加算することで、レプリカ全体の勾配が集計されるため重要です。tf.distribute.Strategy.run
が返す結果を集計するために、tf.distribute.Strategy.reduce
API を使用しました。tf.distribute.Strategy.run
は、ストラテジーの各ローカルレプリカの結果を返し、この結果の消費方法もさまざまです。集計された値を取得するには、結果をreduce
することができます。また、tf.distribute.Strategy.experimental_local_results
を実行して、結果に含まれる、ローカルレプリカ当たり 1 つの値のリストを取得することもできます。- 分散ストラテジーのスコープ内で
apply_gradients
が呼び出されると、その振る舞いが変更されます。具体的には、同期トレーニング中に各並列インスタンスに勾配を適用する前に、勾配の sum-over-all-replicas を実行します。
最後に、トレーニングステップの定義が済んだら、dist_dataset
をイテレートしてトレーニングをループで実行できます。
for dist_inputs in dist_dataset:
print(distributed_train_step(dist_inputs))
INFO:tensorflow:batch_all_reduce: 2 all-reduces with algorithm = nccl, num_packs = 1 INFO:tensorflow:batch_all_reduce: 2 all-reduces with algorithm = nccl, num_packs = 1 tf.Tensor(0.21269155, shape=(), dtype=float32) tf.Tensor(0.21195853, shape=(), dtype=float32) tf.Tensor(0.21123002, shape=(), dtype=float32) tf.Tensor(0.210506, shape=(), dtype=float32) tf.Tensor(0.20978644, shape=(), dtype=float32) tf.Tensor(0.20907126, shape=(), dtype=float32) tf.Tensor(0.20836048, shape=(), dtype=float32) tf.Tensor(0.20765403, shape=(), dtype=float32) tf.Tensor(0.20695189, shape=(), dtype=float32) tf.Tensor(0.206254, shape=(), dtype=float32) tf.Tensor(0.20556036, shape=(), dtype=float32) tf.Tensor(0.2048709, shape=(), dtype=float32) tf.Tensor(0.20418559, shape=(), dtype=float32) tf.Tensor(0.20350443, shape=(), dtype=float32) tf.Tensor(0.20282733, shape=(), dtype=float32) tf.Tensor(0.20215431, shape=(), dtype=float32) tf.Tensor(0.2014853, shape=(), dtype=float32) tf.Tensor(0.20082025, shape=(), dtype=float32) tf.Tensor(0.20015922, shape=(), dtype=float32) tf.Tensor(0.19950207, shape=(), dtype=float32) tf.Tensor(0.1988488, shape=(), dtype=float32) tf.Tensor(0.19819942, shape=(), dtype=float32) tf.Tensor(0.19755386, shape=(), dtype=float32) tf.Tensor(0.1969121, shape=(), dtype=float32) tf.Tensor(0.19627407, shape=(), dtype=float32) tf.Tensor(0.19563983, shape=(), dtype=float32) tf.Tensor(0.19500923, shape=(), dtype=float32) tf.Tensor(0.19438234, shape=(), dtype=float32) tf.Tensor(0.19375908, shape=(), dtype=float32) tf.Tensor(0.19313946, shape=(), dtype=float32) tf.Tensor(0.19252343, shape=(), dtype=float32) tf.Tensor(0.19191094, shape=(), dtype=float32) tf.Tensor(0.19130194, shape=(), dtype=float32) tf.Tensor(0.19069648, shape=(), dtype=float32) tf.Tensor(0.19009452, shape=(), dtype=float32) tf.Tensor(0.18949597, shape=(), dtype=float32) tf.Tensor(0.18890083, shape=(), dtype=float32) tf.Tensor(0.1883091, shape=(), dtype=float32) tf.Tensor(0.18772076, shape=(), dtype=float32) tf.Tensor(0.1871357, shape=(), dtype=float32) tf.Tensor(0.18655394, shape=(), dtype=float32) tf.Tensor(0.1859755, shape=(), dtype=float32) tf.Tensor(0.1854003, shape=(), dtype=float32) tf.Tensor(0.18482836, shape=(), dtype=float32) tf.Tensor(0.18425961, shape=(), dtype=float32) tf.Tensor(0.18369403, shape=(), dtype=float32) tf.Tensor(0.18313164, shape=(), dtype=float32) tf.Tensor(0.18257232, shape=(), dtype=float32) tf.Tensor(0.18201615, shape=(), dtype=float32) tf.Tensor(0.18146305, shape=(), dtype=float32)
上記の例では、dist_dataset
をイテレートして、トレーニングに入力を提供しました。また、numpy 入力をサポートするために、tf.distribute.Strategy.make_experimental_numpy_dataset
も提供しています。tf.distribute.Strategy.experimental_distribute_dataset
を呼び出す前にこの API を使って、データセットを作成することができます。
データをイテレートするための別の方法は、明示的にイテレータを使用することです。データセット全体をイテレートするのに対し、特定のステップ数だけ実行する場合に、これを行うことができます。上記のイテレートは、最初にイテレータを作成し、それに明示的にnext
を呼び出して入力データを取得するように変更されます。
iterator = iter(dist_dataset)
for _ in range(10):
print(distributed_train_step(next(iterator)))
tf.Tensor(0.18091302, shape=(), dtype=float32) tf.Tensor(0.18036604, shape=(), dtype=float32) tf.Tensor(0.17982203, shape=(), dtype=float32) tf.Tensor(0.17928106, shape=(), dtype=float32) tf.Tensor(0.17874302, shape=(), dtype=float32) tf.Tensor(0.17820795, shape=(), dtype=float32) tf.Tensor(0.17767578, shape=(), dtype=float32) tf.Tensor(0.17714654, shape=(), dtype=float32) tf.Tensor(0.17662014, shape=(), dtype=float32) tf.Tensor(0.17609662, shape=(), dtype=float32)
これは、カスタムトレーニングループを分散化する tf.distribute.Strategy
API の最も単純な使用ケースを説明しています。
現在、何がサポートされていますか?
トレーニング API | MirroredStrategy |
TPUStrategy |
MultiWorkerMirroredStrategy |
ParameterServerStrategy |
CentralStorageStrategy |
---|---|---|---|---|---|
カスタムトレーニングループ | サポート中 | サポート中 | 実験的サポート | 実験的サポート | 2.3 以降でサポート予定 |
例とチュートリアル
次は、カスタムトレーニングループを使って分散ストラテジーを使用するいくつかの例です。
MirroredStrategy
で MNIST をトレーニングするチュートリアルTPUStrategy
による MNIST のトレーニングに関するガイド- さまざまなストラテジーを使って実装された最新モデルのコレクションが含まれる TensorFlow Model Garden リポジトリ
- カスタムトレーニングループと
ParameterServerStrategy
でパラメータサーバーをトレーニングするチュートリアル 。 - さまざまなストラテジーを使って実装された最新モデルのコレクションが含まれる TensorFlow Model Garden リポジトリ
その他のトピック
このセクションでは、複数の使用事例に関連のあるトピックをいくつかカバーしています。
TF_CONFIG 環境変数のセットアップ
マルチワーカートレーニングでは、前述のとおり、クラスタで実行されている各バイナリに対して TF_CONFIG
環境変数を設定する必要があります。TF_CONFIG
環境変数は、クラスタを構成するタスク、そのアドレス、およびクラスタ内の各タスクのロールを指定する JSON 文字列です。tensorflow/ecosystem レポジトリには、トレーニングタスクの TF_CONFIG
を設定する Kubernotes テンプレートを提供しています。
TF_CONFIG
には、cluster と task の 2 つのコンポーネントがあります。
- cluster はトレーニングクラスタに関する情報を提供します。これは、worker などのさまざまなタイプのジョブで構成されるディクショナリーです。マルチワーカートレーニングでは、通常、一般的なワーカーの作業に加えて、チェックポイントの保存や TensorBoard のサマリーファイルの書き込みなど、ほかよりタスクを担うワーカーが 1 つあります。こういったワーカーは、「チーフ」ワーカーと呼ばれ、index 0 のワーカーがチーフワーカーに指定されるようになっています(実際、このようにして、
tf.distribute.Strategy
が実装されています)。 - 一方、task は現在のタスクに関する情報を提供します。最初のコンポーネント cluster はすべてのワーカーで同じであり、2番目のコンポーネント task はワーカーごとに異なり、そのワーカーのタイプとインデックスを指定します。
次は、TF_CONFIG
の一例です。
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"worker": ["host1:port", "host2:port", "host3:port"],
"ps": ["host4:port", "host5:port"]
},
"task": {"type": "worker", "index": 1}
})
この TF_CONFIG
は、"cluster"
内に 3 つのワーカーと 2 つの "ps"
タスク、およびそれらのホストとポートがあることを指定しています。 "task"
の部分は、"cluster"
の現在のタスクのロールであるワーカー 1
(2番目のワーカー)を指定します。クラスタ内の有効なロールは、"chief"
、"worker"
、"ps"
、および "evaluator"
です。tf.distribute.experimental.ParameterServerStrategy
を使用する場合を除いて、"ps"
ジョブはありません。
次のステップ
tf.distribute.Strategy
は積極的な開発が進められています。ぜひお試しいただき、GitHub 課題 に皆さんの感想をお寄せください。