Szkolenie rozproszone z TensorFlow

Zobacz na TensorFlow.org Uruchom w Google Colab Wyświetl źródło na GitHub Pobierz notatnik

Przegląd

tf.distribute.Strategy to interfejs API TensorFlow do dystrybucji szkoleń na wiele procesorów graficznych, wiele komputerów lub TPU. Korzystając z tego interfejsu API, możesz rozpowszechniać istniejące modele i kod szkoleniowy przy minimalnych zmianach w kodzie.

tf.distribute.Strategy został zaprojektowany z myślą o następujących kluczowych celach:

  • Łatwy w użyciu i obsługujący wiele segmentów użytkowników, w tym badaczy, inżynierów uczenia maszynowego itp.
  • Zapewnij dobrą wydajność po wyjęciu z pudełka.
  • Łatwe przełączanie między strategiami.

Trening można dystrybuować za pomocą tf.distribute.Strategy z interfejsem API wysokiego poziomu, takim jak Keras Model.fit , a także niestandardowymi pętlami treningowymi (i ogólnie dowolnymi obliczeniami przy użyciu TensorFlow).

W TensorFlow 2.x możesz wykonywać swoje programy z zapałem lub na wykresie za pomocą tf.function . tf.distribute.Strategy zamierza obsługiwać oba te tryby wykonywania, ale działa najlepiej z tf.function . Tryb Eager jest zalecany tylko do celów debugowania i nie jest obsługiwany przez tf.distribute.TPUStrategy . Chociaż szkolenie jest głównym tematem tego przewodnika, ten interfejs API może być również używany do rozpowszechniania oceny i prognozowania na różnych platformach.

Możesz użyć tf.distribute.Strategy z bardzo niewielkimi zmianami w kodzie, ponieważ podstawowe komponenty TensorFlow zostały zmienione tak, aby były zgodne ze strategią. Obejmuje to zmienne, warstwy, modele, optymalizatory, metryki, podsumowania i punkty kontrolne.

W tym przewodniku dowiesz się o różnych rodzajach strategii oraz o tym, jak możesz z nich korzystać w różnych sytuacjach. Aby dowiedzieć się, jak debugować problemy z wydajnością, zapoznaj się z przewodnikiem po optymalizacji wydajności GPU TensorFlow .

Skonfiguruj TensorFlow

import tensorflow as tf

Rodzaje strategii

tf.distribute.Strategy ma na celu objęcie szeregu przypadków użycia na różnych osiach. Niektóre z tych kombinacji są obecnie obsługiwane, a inne zostaną dodane w przyszłości. Niektóre z tych osi to:

  • Trening synchroniczny vs asynchroniczny: są to dwa popularne sposoby dystrybucji treningu z równoległością danych. Podczas szkolenia synchronizacji wszyscy pracownicy szkolą się na różnych wycinkach danych wejściowych w synchronizacji i agregują gradienty na każdym kroku. W szkoleniu asynchronicznym wszyscy pracownicy niezależnie trenują dane wejściowe i asynchronicznie aktualizują zmienne. Zazwyczaj uczenie synchronizacji jest obsługiwane za pośrednictwem architektury all-reduce i async za pomocą architektury serwera parametrów.
  • Platforma sprzętowa: możesz chcieć przeskalować szkolenie na wielu procesorach GPU na jednej maszynie lub na wielu maszynach w sieci (każda z 0 lub większą liczbą procesorów GPU) lub na jednostkach Cloud TPU.

Aby obsługiwać te przypadki użycia, TensorFlow ma MirroredStrategy , TPUStrategy , MultiWorkerMirroredStrategy , ParameterServerStrategy , CentralStorageStrategy , a także inne dostępne strategie. W następnej sekcji wyjaśniono, które z nich są obsługiwane w jakich scenariuszach w TensorFlow. Oto krótki przegląd:

Szkolenia API MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Keras Model.fit Utrzymany Utrzymany Utrzymany Wsparcie eksperymentalne Wsparcie eksperymentalne
Niestandardowa pętla treningowa Utrzymany Utrzymany Utrzymany Wsparcie eksperymentalne Wsparcie eksperymentalne
Interfejs API estymatora Ograniczone wsparcie Nieobsługiwany Ograniczone wsparcie Ograniczone wsparcie Ograniczone wsparcie

Strategia lustrzana

tf.distribute.MirroredStrategy obsługuje synchroniczne szkolenie rozproszone na wielu procesorach graficznych na jednej maszynie. Tworzy jedną replikę na urządzenie GPU. Każda zmienna w modelu jest odzwierciedlona we wszystkich replikach. Razem te zmienne tworzą jedną zmienną koncepcyjną o nazwie MirroredVariable . Zmienne te są ze sobą zsynchronizowane dzięki zastosowaniu identycznych aktualizacji.

Wydajne algorytmy all-reduce są używane do przekazywania aktualizacji zmiennych na urządzeniach. All-reduce agreguje tensory na wszystkich urządzeniach, sumując je i udostępnia na każdym urządzeniu. Jest to połączony algorytm, który jest bardzo wydajny i może znacznie zmniejszyć obciążenie synchronizacji. Dostępnych jest wiele algorytmów all-reduce i implementacji, w zależności od rodzaju komunikacji dostępnej między urządzeniami. Domyślnie używa biblioteki zbiorowej komunikacji NVIDIA ( NCCL ) jako implementacji all-reduce. Możesz wybrać jedną z kilku innych opcji lub napisać własną.

Oto najprostszy sposób tworzenia MirroredStrategy :

mirrored_strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Spowoduje to utworzenie instancji MirroredStrategy , która będzie wykorzystywać wszystkie procesory graficzne widoczne dla TensorFlow oraz NCCL — jako komunikację między urządzeniami.

Jeśli chcesz używać tylko niektórych procesorów graficznych na swoim komputerze, możesz to zrobić w następujący sposób:

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
WARNING:tensorflow:Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:localhost/replica:0/task:0/device:GPU:1,/job:localhost/replica:0/task:0/device:GPU:0
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')

Jeśli chcesz przesłonić komunikację między urządzeniami, możesz to zrobić za pomocą argumentu cross_device_ops , dostarczając wystąpienie tf.distribute.CrossDeviceOps . Obecnie tf.distribute.HierarchicalCopyAllReduce i tf.distribute.ReductionToOneDevice to dwie opcje inne niż tf.distribute.NcclAllReduce , który jest wartością domyślną.

mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Strategia TPU

tf.distribute.TPUStrategy umożliwia prowadzenie szkolenia TensorFlow na jednostkach przetwarzania Tensor (TPU) . TPU to wyspecjalizowane układy ASIC firmy Google zaprojektowane w celu znacznego przyspieszenia obciążeń związanych z uczeniem maszynowym. Są one dostępne w Google Colab , TPU Research Cloud i Cloud TPU .

Pod względem rozproszonej architektury szkoleniowej TPUStrategy jest tą samą MirroredStrategy — implementuje synchroniczne rozproszone szkolenie. Jednostki TPU zapewniają własną implementację wydajnych operacji all-reduce i innych zbiorowych operacji na wielu rdzeniach TPU, które są używane w TPUStrategy .

Oto jak utworzyć instancję TPUStrategy :

cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
    tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)

Instancja TPUClusterResolver pomaga zlokalizować TPU. W Colab nie musisz podawać żadnych argumentów.

Jeśli chcesz użyć tego w przypadku Cloud TPU:

  • Musisz podać nazwę zasobu TPU w argumencie tpu .
  • Musisz jawnie zainicjować system TPU na początku programu. Jest to wymagane przed użyciem jednostek TPU do obliczeń. Inicjowanie systemu TPU powoduje również wymazanie pamięci TPU, dlatego ważne jest, aby najpierw wykonać ten krok, aby uniknąć utraty stanu.

MultiWorkerMirroredStrategy

tf.distribute.MultiWorkerMirroredStrategy jest bardzo podobny do MirroredStrategy . Implementuje synchroniczne, rozproszone szkolenia dla wielu pracowników, z których każdy ma potencjalnie wiele procesorów graficznych. Podobnie jak tf.distribute.MirroredStrategy , tworzy kopie wszystkich zmiennych w modelu na każdym urządzeniu we wszystkich procesach roboczych.

Oto najprostszy sposób tworzenia MultiWorkerMirroredStrategy :

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy ma dwie implementacje do komunikacji między urządzeniami. CommunicationImplementation.RING jest oparty na RPC i obsługuje zarówno procesory CPU, jak i GPU. CommunicationImplementation.NCCL używa NCCL i zapewnia najwyższą wydajność na procesorach graficznych, ale nie obsługuje procesorów. CollectiveCommunication.AUTO odkłada wybór na Tensorflow. Możesz je określić w następujący sposób:

communication_options = tf.distribute.experimental.CommunicationOptions(
    implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)
strategy = tf.distribute.MultiWorkerMirroredStrategy(
    communication_options=communication_options)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.NCCL

Jedną z kluczowych różnic w szkoleniu z wieloma pracownikami w porównaniu ze szkoleniem z wykorzystaniem wielu procesorów graficznych jest konfiguracja z wieloma pracownikami. Zmienna środowiskowa 'TF_CONFIG' jest standardowym sposobem w TensorFlow określania konfiguracji klastra dla każdego procesu roboczego, który jest częścią klastra. Dowiedz się więcej w sekcji dotyczącej konfiguracji TF_CONFIG tego dokumentu.

Aby uzyskać więcej informacji na temat MultiWorkerMirroredStrategy , zapoznaj się z następującymi samouczkami:

Strategia serwera parametrów

Uczenie serwera parametrów to powszechna metoda równoległości danych służąca do skalowania uczenia modelu w górę na wielu komputerach. Klaster uczący serwera parametrów składa się z procesów roboczych i serwerów parametrów. Zmienne są tworzone na serwerach parametrów i na każdym kroku są odczytywane i aktualizowane przez pracowników. Zapoznaj się z samouczkiem dotyczącym szkolenia serwera parametrów, aby uzyskać szczegółowe informacje.

W TensorFlow 2 uczenie serwera parametrów wykorzystuje architekturę opartą na centralnym koordynatorze za pośrednictwem klasy tf.distribute.experimental.coordinator.ClusterCoordinator .

W tej implementacji zadania tf.distribute.Server i parameter server uruchamiają serwery worker , które nasłuchują zadań od koordynatora. Koordynator tworzy zasoby, wysyła zadania szkoleniowe, zapisuje punkty kontrolne i zajmuje się niepowodzeniem zadań.

W programowaniu działającym na koordynatorze użyjesz obiektu ParameterServerStrategy , aby zdefiniować krok szkolenia i użyjesz ClusterCoordinator , aby rozesłać kroki szkolenia do pracowników zdalnych. Oto najprostszy sposób ich tworzenia:

strategy = tf.distribute.experimental.ParameterServerStrategy(
    tf.distribute.cluster_resolver.TFConfigClusterResolver(),
    variable_partitioner=variable_partitioner)
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(
    strategy)

Aby dowiedzieć się więcej o ParameterServerStrategy , zapoznaj się ze szkoleniem dotyczącym serwera parametrów za pomocą Keras Model.fit i samouczkiem dotyczącym niestandardowej pętli szkoleniowej.

W TensorFlow 1 ParameterServerStrategy jest dostępny tylko z estymatorem za pośrednictwem symbolu tf.compat.v1.distribute.experimental.ParameterServerStrategy .

Centralna strategia magazynowania

tf.distribute.experimental.CentralStorageStrategy również wykonuje treningi synchroniczne. Zmienne nie są dublowane, zamiast tego są umieszczane na procesorze, a operacje są replikowane na wszystkich lokalnych procesorach graficznych. Jeśli jest tylko jeden GPU, wszystkie zmienne i operacje zostaną umieszczone na tym GPU.

Utwórz instancję CentralStorageStrategy poprzez:

central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'

Spowoduje to utworzenie instancji CentralStorageStrategy , która będzie wykorzystywać wszystkie widoczne procesory graficzne i procesory. Aktualizacja zmiennych w replikach zostanie zagregowana przed zastosowaniem do zmiennych.

Inne strategie

Oprócz powyższych strategii istnieją dwie inne strategie, które mogą być przydatne do prototypowania i debugowania podczas korzystania z interfejsów API tf.distribute .

Domyślna strategia

Strategia Domyślna jest strategią dystrybucji, która jest obecna, gdy w zakresie nie ma wyraźnej strategii dystrybucji. Implementuje interfejs tf.distribute.Strategy , ale jest tranzytem i nie zapewnia rzeczywistej dystrybucji. Na przykład Strategy.run(fn) po prostu wywoła fn . Kod napisany przy użyciu tej strategii powinien zachowywać się dokładnie tak, jak kod napisany bez żadnej strategii. Możesz myśleć o tym jako o strategii „bez operacji”.

Strategia domyślna jest singletonem – i nie można stworzyć jej więcej. Można go uzyskać za pomocą tf.distribute.get_strategy poza zakresem jawnej strategii (tego samego interfejsu API, którego można użyć, aby uzyskać bieżącą strategię w zakresie jawnej strategii).

default_strategy = tf.distribute.get_strategy()

Ta strategia służy dwóm głównym celom:

  • Umożliwia bezwarunkowe pisanie kodu biblioteki zależnej od dystrybucji. Na przykład w tf.optimizer s możesz użyć tf.distribute.get_strategy i użyć tej strategii do redukcji gradientów — zawsze zwróci ona obiekt strategii, na którym możesz wywołać API Strategy.reduce .
# In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None)  # reduce some values
1.0
  • Podobnie jak kod biblioteczny, może być używany do pisania programów użytkowników końcowych do pracy ze strategią dystrybucji i bez niej, bez konieczności stosowania logiki warunkowej. Oto przykładowy fragment kodu ilustrujący to:
if tf.config.list_physical_devices('GPU'):
  strategy = tf.distribute.MirroredStrategy()
else:  # Use the Default Strategy
  strategy = tf.distribute.get_strategy()

with strategy.scope():
  # Do something interesting
  print(tf.Variable(1.))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
MirroredVariable:{
  0: <tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>
}

Strategia OneDevice

tf.distribute.OneDeviceStrategy to strategia umieszczania wszystkich zmiennych i obliczeń na jednym określonym urządzeniu.

strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")

Strategia ta różni się od Strategii Domyślnej na wiele sposobów. W strategii domyślnej logika rozmieszczania zmiennych pozostaje niezmieniona w porównaniu z uruchamianiem TensorFlow bez żadnej strategii dystrybucji. Jednak podczas korzystania z OneDeviceStrategy wszystkie zmienne utworzone w jego zakresie są jawnie umieszczane na określonym urządzeniu. Ponadto wszelkie funkcje wywoływane za pośrednictwem OneDeviceStrategy.run również zostaną umieszczone na określonym urządzeniu.

Dane wejściowe dystrybuowane za pośrednictwem tej strategii będą wstępnie pobierane do określonego urządzenia. W Strategii Domyślnej nie ma dystrybucji danych wejściowych.

Podobnie jak w przypadku strategii domyślnej, strategia ta może być również używana do testowania kodu przed przejściem na inne strategie, które faktycznie dystrybuują do wielu urządzeń/maszyn. Spowoduje to ćwiczenie maszynerii strategii dystrybucji nieco bardziej niż Strategii Domyślnej, ale nie w pełnym zakresie wykorzystania na przykład MirroredStrategy lub TPUStrategy . Jeśli chcesz, aby kod zachowywał się tak, jakby nie było żadnej strategii, użyj strategii domyślnej.

Do tej pory poznałeś różne strategie i sposoby ich tworzenia. W kilku następnych sekcjach przedstawiono różne sposoby wykorzystania ich do dystrybucji treningu.

Użyj tf.distribute.Strategy z Keras Model.fit

tf.distribute.Strategy jest zintegrowany z tf.keras , który jest implementacją specyfikacji Keras API w TensorFlow . tf.keras to wysokopoziomowe API do budowania i trenowania modeli. Integrując się z tf.keras , możesz bezproblemowo dystrybuować swoje szkolenia napisane w środowisku szkoleniowym Keras przy użyciu Model.fit .

Oto, co musisz zmienić w swoim kodzie:

  1. Utwórz instancję odpowiedniego tf.distribute.Strategy .
  2. Przenieś tworzenie modelu Keras, optymalizatora i metryk do pliku strategy.scope .

Strategie dystrybucji TensorFlow obsługują wszystkie typy modeli Keras — Sequential , Functional i subclassed .

Oto fragment kodu, który to zrobi dla bardzo prostego modelu Keras z jedną warstwą Dense :

mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])

model.compile(loss='mse', optimizer='sgd')
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).

W tym przykładzie użyto MirroredStrategy , więc możesz uruchomić to na komputerze z wieloma procesorami graficznymi. strategy.scope() wskazuje Kerasowi, której strategii użyć do dystrybucji szkolenia. Tworzenie modeli/optymalizatorów/metryk w tym zakresie umożliwia tworzenie zmiennych rozproszonych zamiast zwykłych zmiennych. Po skonfigurowaniu możesz dopasować swój model tak, jak zwykle. MirroredStrategy zajmuje się replikacją treningu modelu na dostępnych procesorach graficznych, agregacją gradientów i nie tylko.

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
Epoch 1/2
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
2021-10-26 01:27:56.527729: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 3s 2ms/step - loss: 2.2552
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 0.9968
2021-10-26 01:27:59.372113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 [==============================] - 1s 2ms/step - loss: 0.6190
0.6190494298934937

Tutaj tf.data.Dataset zapewnia dane wejściowe szkolenia i oceny. Możesz także użyć tablic NumPy:

import numpy as np

inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
Epoch 1/2
2021-10-26 01:28:00.609977: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_9"
op: "FlatMapDataset"
input: "PrefetchDataset/_8"
attr {
  key: "Targuments"
  value {
    list {
    }
  }
}
attr {
  key: "f"
  value {
    func {
      name: "__inference_Dataset_flat_map_slice_batch_indices_997"
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 10
        }
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
. Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.
10/10 [==============================] - 1s 2ms/step - loss: 0.4406
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 0.1947
<keras.callbacks.History at 0x7fb81813d2d0>

W obu przypadkach — z Dataset lub NumPy — każda partia danych wejściowych jest dzielona równo między wiele replik. Na przykład, jeśli używasz MirroredStrategy z 2 procesorami graficznymi, każda partia o rozmiarze 10 zostanie podzielona między 2 procesory graficzne, z których każdy otrzyma 5 przykładów wejściowych w każdym kroku. Każda epoka będzie trenować szybciej, gdy dodasz więcej procesorów graficznych. Zazwyczaj chciałbyś zwiększyć rozmiar partii, dodając więcej akceleratorów, aby efektywnie wykorzystać dodatkową moc obliczeniową. W zależności od modelu konieczne będzie również ponowne dostrojenie szybkości uczenia się. Aby uzyskać liczbę replik, możesz użyć strategy.num_replicas_in_sync .

# Compute a global batch size using a number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]

Co jest teraz obsługiwane?

Szkolenia API MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy ParameterServerStrategy CentralStorageStrategy
Keras Model.fit Utrzymany Utrzymany Utrzymany Wsparcie eksperymentalne Wsparcie eksperymentalne

Przykłady i tutoriale

Oto lista samouczków i przykładów ilustrujących powyższą integrację end-to-end z Keras Model.fit :

  1. Samouczek : Trening z Model.fit i MirroredStrategy .
  2. Samouczek : Trening z Model.fit i MultiWorkerMirroredStrategy .
  3. Przewodnik : zawiera przykład użycia Model.fit i TPUStrategy .
  4. Samouczek : Trening serwera parametrów za pomocą Model.fit i ParameterServerStrategy .
  5. Samouczek : Dostrajanie BERT do wielu zadań z benchmarku GLUE za pomocą Model.fit i TPUStrategy .
  6. Repozytorium TensorFlow Model Garden zawierające kolekcje najnowocześniejszych modeli zaimplementowanych przy użyciu różnych strategii.

Użyj tf.distribute.Strategy z niestandardowymi pętlami treningowymi

Jak pokazano powyżej, użycie tf.distribute.Strategy z Keras Model.fit wymaga zmiany tylko kilku linijek kodu. Przy odrobinie wysiłku możesz również użyć tf.distribute.Strategy z niestandardowymi pętlami treningowymi .

Jeśli potrzebujesz większej elastyczności i kontroli nad pętlami treningowymi niż jest to możliwe w Estimatorze lub Keras, możesz napisać własne pętle treningowe. Na przykład, używając GAN, możesz chcieć wykonać inną liczbę kroków generatora lub dyskryminatora w każdej rundzie. Podobnie, ramy wysokiego poziomu nie są zbyt odpowiednie do szkolenia w zakresie uczenia się przez wzmacnianie.

Klasy tf.distribute.Strategy zapewniają podstawowy zestaw metod do obsługi niestandardowych pętli szkoleniowych. Korzystanie z nich może początkowo wymagać niewielkiej przebudowy kodu, ale gdy to zrobisz, powinieneś być w stanie przełączać się między procesorami graficznymi, jednostkami TPU i wieloma maszynami, po prostu zmieniając instancję strategii.

Poniżej znajduje się krótki fragment ilustrujący ten przypadek użycia dla prostego przykładu szkoleniowego przy użyciu tego samego modelu Keras co poprzednio.

Najpierw utwórz model i optymalizator w zakresie strategii. Zapewnia to, że wszelkie zmienne utworzone za pomocą modelu i optymalizatora są zmiennymi lustrzanymi.

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  optimizer = tf.keras.optimizers.SGD()

Następnie utwórz wejściowy zestaw danych i wywołaj tf.distribute.Strategy.experimental_distribute_dataset , aby dystrybuować zestaw danych na podstawie strategii.

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
2021-10-26 01:28:01.831942: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

Następnie zdefiniuj jeden krok szkolenia. Użyj tf.GradientTape do obliczenia gradientów i optymalizacji, aby zastosować te gradienty w celu aktualizacji zmiennych modelu. Aby rozpowszechnić ten krok uczenia, umieść go w funkcji train_step i przekaż do tf.distribute.Strategy.run wraz z danymi wejściowymi zestawu danych otrzymanymi z dist_dataset utworzonego wcześniej:

loss_object = tf.keras.losses.BinaryCrossentropy(
  from_logits=True,
  reduction=tf.keras.losses.Reduction.NONE)

def compute_loss(labels, predictions):
  per_example_loss = loss_object(labels, predictions)
  return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

def train_step(inputs):
  features, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(features, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

@tf.function
def distributed_train_step(dist_inputs):
  per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
  return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

Kilka innych rzeczy do zapamiętania w powyższym kodzie:

  1. Do obliczenia straty użyto tf.nn.compute_average_loss . tf.nn.compute_average_loss sumuje straty na przykład i dzieli sumę przez global_batch_size . Jest to ważne, ponieważ później po obliczeniu gradientów dla każdej repliki są one agregowane w replikach poprzez ich sumowanie .
  2. Użyłeś również interfejsu API tf.distribute.Strategy.reduce do agregacji wyników zwróconych przez tf.distribute.Strategy.run . tf.distribute.Strategy.run zwraca wyniki z każdej lokalnej repliki w strategii i istnieje wiele sposobów wykorzystania tego wyniku. Możesz je reduce , aby uzyskać zagregowaną wartość. Możesz również wykonać tf.distribute.Strategy.experimental_local_results , aby uzyskać listę wartości zawartych w wyniku, po jednej na replikę lokalną.
  3. Gdy wywołujesz apply_gradients w zakresie strategii dystrybucji, jego zachowanie jest modyfikowane. W szczególności przed zastosowaniem gradientów na każdym równoległym wystąpieniu podczas treningu synchronicznego wykonuje sumowanie wszystkich replik gradientów.

Wreszcie, po zdefiniowaniu kroku uczenia, możesz iterować przez dist_dataset i uruchomić szkolenie w pętli:

for dist_inputs in dist_dataset:
  print(distributed_train_step(dist_inputs))
tf.Tensor(0.18686396, shape=(), dtype=float32)
tf.Tensor(0.18628375, shape=(), dtype=float32)
tf.Tensor(0.18570684, shape=(), dtype=float32)
tf.Tensor(0.18513316, shape=(), dtype=float32)
tf.Tensor(0.1845627, shape=(), dtype=float32)
tf.Tensor(0.18399543, shape=(), dtype=float32)
tf.Tensor(0.18343134, shape=(), dtype=float32)
tf.Tensor(0.18287037, shape=(), dtype=float32)
tf.Tensor(0.18231256, shape=(), dtype=float32)
tf.Tensor(0.18175781, shape=(), dtype=float32)
tf.Tensor(0.18120615, shape=(), dtype=float32)
tf.Tensor(0.18065754, shape=(), dtype=float32)
tf.Tensor(0.18011193, shape=(), dtype=float32)
tf.Tensor(0.17956935, shape=(), dtype=float32)
tf.Tensor(0.17902976, shape=(), dtype=float32)
tf.Tensor(0.17849308, shape=(), dtype=float32)
tf.Tensor(0.17795937, shape=(), dtype=float32)
tf.Tensor(0.17742859, shape=(), dtype=float32)
tf.Tensor(0.17690066, shape=(), dtype=float32)
tf.Tensor(0.17637561, shape=(), dtype=float32)

W powyższym przykładzie wykonałeś iterację po zestawie danych dist_dataset , aby wprowadzić dane wejściowe do treningu. Otrzymasz również tf.distribute.Strategy.make_experimental_numpy_dataset do obsługi danych wejściowych NumPy. Możesz użyć tego interfejsu API do utworzenia zestawu danych przed wywołaniem tf.distribute.Strategy.experimental_distribute_dataset .

Innym sposobem iteracji po danych jest jawne użycie iteratorów. Możesz to zrobić, jeśli chcesz uruchomić określoną liczbę kroków, w przeciwieństwie do iteracji całego zestawu danych. Powyższa iteracja zostałaby teraz zmodyfikowana, aby najpierw utworzyć iterator, a next jawnie wywołać go w celu uzyskania danych wejściowych.

iterator = iter(dist_dataset)
for _ in range(10):
  print(distributed_train_step(next(iterator)))
tf.Tensor(0.17585339, shape=(), dtype=float32)
tf.Tensor(0.17533402, shape=(), dtype=float32)
tf.Tensor(0.17481743, shape=(), dtype=float32)
tf.Tensor(0.17430364, shape=(), dtype=float32)
tf.Tensor(0.17379259, shape=(), dtype=float32)
tf.Tensor(0.17328428, shape=(), dtype=float32)
tf.Tensor(0.17277871, shape=(), dtype=float32)
tf.Tensor(0.17227581, shape=(), dtype=float32)
tf.Tensor(0.17177561, shape=(), dtype=float32)
tf.Tensor(0.17127804, shape=(), dtype=float32)

Obejmuje to najprostszy przypadek użycia tf.distribute.Strategy API do dystrybucji niestandardowych pętli treningowych.

Co jest teraz obsługiwane?

Szkolenia API MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy ParameterServerStrategy CentralStorageStrategy
Niestandardowa pętla treningowa Utrzymany Utrzymany Utrzymany Wsparcie eksperymentalne Wsparcie eksperymentalne

Przykłady i tutoriale

Oto kilka przykładów wykorzystania strategii dystrybucji z niestandardowymi pętlami treningowymi:

  1. Samouczek : Trening z wykorzystaniem niestandardowej pętli treningowej i MirroredStrategy .
  2. Samouczek : Trening z wykorzystaniem niestandardowej pętli szkoleniowej i MultiWorkerMirroredStrategy .
  3. Przewodnik : zawiera przykład niestandardowej pętli treningowej z TPUStrategy .
  4. Samouczek : Trening serwera parametrów z niestandardową pętlą treningową i ParameterServerStrategy .
  5. Repozytorium TensorFlow Model Garden zawierające kolekcje najnowocześniejszych modeli zaimplementowanych przy użyciu różnych strategii.

Inne tematy

W tej sekcji omówiono niektóre tematy, które dotyczą wielu przypadków użycia.

Konfigurowanie zmiennej środowiskowej TF_CONFIG

W przypadku szkolenia z wieloma pracownikami, jak wspomniano wcześniej, należy skonfigurować zmienną środowiskową 'TF_CONFIG' dla każdego pliku binarnego działającego w klastrze. Zmienna środowiskowa 'TF_CONFIG' to ciąg znaków JSON, który określa, jakie zadania tworzą klaster, ich adresy i rolę każdego zadania w klastrze. tensorflow/ecosystem udostępnia szablon Kubernetes, który konfiguruje 'TF_CONFIG' dla Twoich zadań szkoleniowych.

'TF_CONFIG' się z dwóch elementów: klastra i zadania.

  • Klaster dostarcza informacji o klastrze szkoleniowym, który jest dyktatem składającym się z różnych rodzajów zawodów, takich jak pracownicy. W szkoleniu dla wielu pracowników zazwyczaj jeden pracownik bierze na siebie nieco większą odpowiedzialność, jak zapisywanie punktu kontrolnego i pisanie pliku podsumowującego dla TensorBoard, oprócz tego, co robi zwykły pracownik. Taki pracownik jest określany jako „główny” i zwyczajowo pracownik z indeksem 0 jest wyznaczany jako główny pracownik (w rzeczywistości tak realizuje się tf.distribute.Strategy ).
  • Zadanie natomiast dostarcza informacji o bieżącym zadaniu. Pierwszy klaster komponentów jest taki sam dla wszystkich pracowników, a drugie zadanie komponentu jest inne dla każdego pracownika i określa typ i indeks tego pracownika.

Jednym z przykładów 'TF_CONFIG' jest:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"]
    },
   "task": {"type": "worker", "index": 1}
})

To 'TF_CONFIG' określa, że ​​w "cluster" znajdują się trzy procesy robocze i dwa zadania "ps" wraz z ich hostami i portami. Część "task" określa rolę bieżącego zadania w "cluster" pracownik 1 (drugi pracownik). Prawidłowe role w klastrze to "chief" , "worker" , "ps" i "evaluator" . Nie powinno być żadnego zadania "ps" , chyba że używa tf.distribute.experimental.ParameterServerStrategy .

Co dalej?

tf.distribute.Strategy jest aktywnie rozwijany. Wypróbuj go i przekaż swoją opinię, korzystając z problemów z GitHub .