TensorFlow.org पर देखें | Google Colab में चलाएं | GitHub पर स्रोत देखें | नोटबुक डाउनलोड करें |
tf.distribute API उपयोगकर्ताओं को एक मशीन से कई मशीनों तक अपने प्रशिक्षण को स्केल करने का एक आसान तरीका प्रदान करता है। अपने मॉडल को स्केल करते समय, उपयोगकर्ताओं को अपने इनपुट को कई उपकरणों में वितरित करना होता है। tf.distribute एपीआई प्रदान करता है जिसके उपयोग से आप अपने इनपुट को स्वचालित रूप से सभी उपकरणों में वितरित कर सकते हैं।
यह मार्गदर्शिका आपको tf.distribute APIs का उपयोग करके वितरित डेटासेट और इटरेटर बनाने के विभिन्न तरीकों को दिखाएगी। इसके अतिरिक्त, निम्नलिखित विषयों को कवर किया जाएगा:
- tf.distribute.Strategy.experimental_distribute_dataset और
tf.distribute.Strategy.distribute_datasets_from_functionका उपयोग करते समय उपयोग,tf.distribute.Strategy.experimental_distribute_datasetऔर बैचिंग विकल्प। - विभिन्न तरीकों से आप वितरित डेटासेट पर पुनरावृति कर सकते हैं।
-
tf.distribute.Strategy.experimental_distribute_dataset/tf.distribute.Strategy.distribute_datasets_from_functionAPIs औरtf.dataAPIs के बीच अंतर और साथ ही ऐसी कोई भी सीमाएं जो उपयोगकर्ताओं को उनके उपयोग में आ सकती हैं।
यह मार्गदर्शिका केरस एपीआई के साथ वितरित इनपुट के उपयोग को कवर नहीं करती है।
वितरित डेटासेट
बड़े पैमाने पर tf.वितरित API का उपयोग करने के लिए, यह अनुशंसा की जाती है कि उपयोगकर्ता अपने इनपुट का प्रतिनिधित्व करने के लिए tf.distribute का उपयोग tf.data.Dataset । tf.distribute के साथ कुशलतापूर्वक कार्य करने के लिए tf.data.Dataset .वितरण किया गया है। यदि आपके पास tf.data.Dataset के अलावा किसी अन्य चीज़ का उपयोग करने के लिए उपयोग का मामला है, तो कृपया इस गाइड में बाद का अनुभाग देखें। एक गैर-वितरित प्रशिक्षण लूप में, उपयोगकर्ता पहले एक tf.data.Dataset उदाहरण बनाते हैं और फिर तत्वों पर पुनरावृति करते हैं। उदाहरण के लिए:
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.8.0-rc1
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
# Iterate over the dataset using the for..in construct.
for inputs in dataset:
print(train_step(inputs))
tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
उपयोगकर्ताओं को उपयोगकर्ता के मौजूदा कोड में न्यूनतम परिवर्तनों के साथ tf.distribute रणनीति का उपयोग करने की अनुमति देने के लिए, दो एपीआई पेश किए गए थे जो एक tf.data.Dataset उदाहरण वितरित करेंगे और एक वितरित डेटासेट ऑब्जेक्ट लौटाएंगे। एक उपयोगकर्ता तब इस वितरित डेटासेट उदाहरण पर पुनरावृति कर सकता है और अपने मॉडल को पहले की तरह प्रशिक्षित कर सकता है। आइए अब हम दो एपीआई - tf.distribute.Strategy.experimental_distribute_dataset और tf.distribute.Strategy.distribute_datasets_from_function को अधिक विस्तार से देखें:
tf.distribute.Strategy.experimental_distribute_dataset
प्रयोग
यह एपीआई इनपुट के रूप में एक tf.data.Dataset उदाहरण लेता है और एक tf.distribute.DistributedDataset उदाहरण देता है। आपको इनपुट डेटासेट को वैश्विक बैच आकार के बराबर मान के साथ बैच करना चाहिए। यह वैश्विक बैच आकार उन नमूनों की संख्या है, जिन्हें आप 1 चरण में सभी उपकरणों पर संसाधित करना चाहते हैं। आप इस वितरित डेटासेट पर पाइथोनिक फैशन में पुनरावृति कर सकते हैं या iter का उपयोग करके एक पुनरावर्तक बना सकते हैं। लौटाई गई वस्तु एक tf.data.Dataset उदाहरण नहीं है और किसी भी अन्य API का समर्थन नहीं करती है जो किसी भी तरह से डेटासेट को रूपांतरित या निरीक्षण करता है। यह अनुशंसित एपीआई है यदि आपके पास विशिष्ट तरीके नहीं हैं जिसमें आप विभिन्न प्रतिकृतियों पर अपने इनपुट को शार्प करना चाहते हैं।
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
[1.],
[1.],
[1.],
[1.],
[1.],
[1.],
[1.],
[1.],
[1.],
[1.],
[1.],
[1.],
[1.],
[1.],
[1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
[1.],
[1.],
[1.],
[1.],
[1.],
[1.],
[1.],
[1.],
[1.],
[1.],
[1.],
[1.],
[1.],
[1.],
[1.]], dtype=float32)>)
2022-01-26 05:34:05.342660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
key: "Toutput_types"
value {
list {
type: DT_FLOAT
type: DT_FLOAT
}
}
}
attr {
key: "_cardinality"
value {
i: 1
}
}
attr {
key: "metadata"
value {
s: "\n\017TensorDataset:4"
}
}
attr {
key: "output_shapes"
value {
list {
shape {
dim {
size: 1
}
}
shape {
dim {
size: 1
}
}
}
}
}
experimental_type {
type_id: TFT_PRODUCT
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
}
}
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
}
}
}
गुण
बैचिंग
tf.वितरित इनपुट tf.distribute उदाहरण को एक नए बैच आकार के साथ tf.data.Dataset करता है जो सिंक में प्रतिकृतियों की संख्या से विभाजित वैश्विक बैच आकार के बराबर है। सिंक में प्रतिकृतियों की संख्या उन उपकरणों की संख्या के बराबर है जो प्रशिक्षण के दौरान ग्रेडिएंट सभी में भाग ले रहे हैं। जब कोई उपयोगकर्ता वितरित इटरेटर पर next कॉल करता है, तो प्रत्येक प्रतिकृति पर डेटा का प्रति प्रतिकृति बैच आकार वापस कर दिया जाता है। रीबैच्ड डेटासेट कार्डिनैलिटी हमेशा प्रतिकृतियों की संख्या का गुणक होगी। यहां कुछ उदाहरण दिए गए हैं:
tf.data.Dataset.range(6).batch(4, drop_remainder=False)- वितरण के बिना:
- बैच 1: [0, 1, 2, 3]
- बैच 2: [4, 5]
2 प्रतिकृतियों पर वितरण के साथ। अंतिम बैच ([4, 5]) 2 प्रतिकृतियों के बीच विभाजित है।
बैच 1:
- प्रतिकृति 1:[0, 1]
- प्रतिकृति 2:[2, 3]
बैच 2:
- प्रतिकृति 2: [4]
- प्रतिकृति 2: [5]
tf.data.Dataset.range(4).batch(4)- वितरण के बिना:
- बैच 1: [[0], [1], [2], [3]]
- 5 से अधिक प्रतिकृतियों के वितरण के साथ:
- बैच 1:
- प्रतिकृति 1: [0]
- प्रतिकृति 2: [1]
- प्रतिकृति 3: [2]
- प्रतिकृति 4: [3]
- प्रतिकृति 5: []
tf.data.Dataset.range(8).batch(4)- वितरण के बिना:
- बैच 1: [0, 1, 2, 3]
- बैच 2: [4, 5, 6, 7]
- 3 प्रतिकृतियों पर वितरण के साथ:
- बैच 1:
- प्रतिकृति 1: [0, 1]
- प्रतिकृति 2: [2, 3]
- प्रतिकृति 3: []
- बैच 2:
- प्रतिकृति 1: [4, 5]
- प्रतिकृति 2: [6, 7]
- प्रतिकृति 3: []
डेटासेट को रीबैच करने में एक स्थान जटिलता होती है जो प्रतिकृतियों की संख्या के साथ रैखिक रूप से बढ़ती है। इसका मतलब है कि बहु-कार्यकर्ता प्रशिक्षण उपयोग के मामले में इनपुट पाइपलाइन OOM त्रुटियों में चल सकती है।
शेयरिंग
tf.distribute MultiWorkerMirroredStrategy और TPUStrategy के साथ मल्टी वर्कर ट्रेनिंग में इनपुट डेटासेट को ऑटोशर्ड भी करता है। प्रत्येक डेटासेट कार्यकर्ता के सीपीयू डिवाइस पर बनाया जाता है। श्रमिकों के एक समूह पर एक डेटासेट को ऑटोशर्डिंग करने का अर्थ है कि प्रत्येक कार्यकर्ता को संपूर्ण डेटासेट का एक सबसेट सौंपा गया है (यदि सही tf.data.experimental.AutoShardPolicy सेट है)। यह सुनिश्चित करने के लिए है कि प्रत्येक चरण में, गैर-अतिव्यापी डेटासेट तत्वों का एक वैश्विक बैच आकार प्रत्येक कार्यकर्ता द्वारा संसाधित किया जाएगा। ऑटोशर्डिंग में कुछ अलग विकल्प होते हैं जिन्हें tf.data.experimental.DistributeOptions का उपयोग करके निर्दिष्ट किया जा सकता है। ध्यान दें कि ParameterServerStrategy के साथ बहु-कार्यकर्ता प्रशिक्षण में कोई ऑटोशेयरिंग नहीं है, और इस रणनीति के साथ डेटासेट निर्माण पर अधिक जानकारी पैरामीटर सर्वर रणनीति ट्यूटोरियल में पाई जा सकती है।
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
आप tf.data.experimental.AutoShardPolicy के लिए तीन अलग-अलग विकल्प सेट कर सकते हैं:
- AUTO: यह डिफ़ॉल्ट विकल्प है जिसका अर्थ है कि FILE द्वारा शार्प करने का प्रयास किया जाएगा। फ़ाइल-आधारित डेटासेट का पता नहीं चलने पर FILE द्वारा शार्प करने का प्रयास विफल हो जाता है।
tf.distributeफिर DATA द्वारा शार्डिंग पर वापस आ जाएगा। ध्यान दें कि यदि इनपुट डेटासेट फ़ाइल-आधारित है, लेकिन फ़ाइलों की संख्या श्रमिकों की संख्या से कम है, तो एकInvalidArgumentErrorउठाया जाएगा। यदि ऐसा होता है, तो नीति को स्पष्ट रूप सेAutoShardPolicy.DATAपर सेट करें, या अपने इनपुट स्रोत को छोटी फ़ाइलों में विभाजित करें ताकि फाइलों की संख्या श्रमिकों की संख्या से अधिक हो। FILE: यह विकल्प है यदि आप सभी श्रमिकों पर इनपुट फ़ाइलों को शार्प करना चाहते हैं। आपको इस विकल्प का उपयोग करना चाहिए यदि इनपुट फाइलों की संख्या श्रमिकों की संख्या से बहुत अधिक है और फाइलों में डेटा समान रूप से वितरित किया गया है। यदि फाइलों में डेटा समान रूप से वितरित नहीं किया जाता है, तो इस विकल्प का नकारात्मक पक्ष निष्क्रिय श्रमिकों का होना है। यदि फाइलों की संख्या श्रमिकों की संख्या से कम है, तो एक
InvalidArgumentErrorहो जाएगी। यदि ऐसा होता है, तो नीति को स्पष्ट रूप सेAutoShardPolicy.DATAपर सेट करें। उदाहरण के लिए, आइए हम 2 फाइलों को 2 कर्मचारियों पर वितरित करें जिनमें से प्रत्येक में 1 प्रतिकृति है। फ़ाइल 1 में [0, 1, 2, 3, 4, 5] और फ़ाइल 2 में [6, 7, 8, 9, 10, 11] हैं। बता दें कि सिंक में प्रतिकृतियों की कुल संख्या 2 है और वैश्विक बैच का आकार 4 है।- कार्यकर्ता 0:
- बैच 1 = प्रतिकृति 1: [0, 1]
- बैच 2 = प्रतिकृति 1: [2, 3]
- बैच 3 = प्रतिकृति 1: [4]
- बैच 4 = प्रतिकृति 1: [5]
- कार्यकर्ता 1:
- बैच 1 = प्रतिकृति 2: [6, 7]
- बैच 2 = प्रतिकृति 2: [8, 9]
- बैच 3 = प्रतिकृति 2: [10]
- बैच 4 = प्रतिकृति 2: [11]
डेटा: यह सभी कर्मचारियों के तत्वों को ऑटोशर्ड करेगा। प्रत्येक कार्यकर्ता पूरे डेटासेट को पढ़ेगा और केवल उसे सौंपे गए शार्प को प्रोसेस करेगा। अन्य सभी शार्क को त्याग दिया जाएगा। यह आमतौर पर तब उपयोग किया जाता है जब इनपुट फाइलों की संख्या श्रमिकों की संख्या से कम होती है और आप सभी श्रमिकों के डेटा को बेहतर तरीके से साझा करना चाहते हैं। नकारात्मक पक्ष यह है कि प्रत्येक कार्यकर्ता पर संपूर्ण डेटासेट पढ़ा जाएगा। उदाहरण के लिए, आइए हम 2 कर्मचारियों पर 1 फाइल वितरित करें। फ़ाइल 1 में [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] शामिल हैं। बता दें कि सिंक में प्रतिकृतियों की कुल संख्या 2 है।
- कार्यकर्ता 0:
- बैच 1 = प्रतिकृति 1: [0, 1]
- बैच 2 = प्रतिकृति 1: [4, 5]
- बैच 3 = प्रतिकृति 1: [8, 9]
- कार्यकर्ता 1:
- बैच 1 = प्रतिकृति 2: [2, 3]
- बैच 2 = प्रतिकृति 2: [6, 7]
- बैच 3 = प्रतिकृति 2: [10, 11]
बंद: यदि आप ऑटोशेयरिंग को बंद करते हैं, तो प्रत्येक कार्यकर्ता सभी डेटा को संसाधित करेगा। उदाहरण के लिए, आइए हम 2 कर्मचारियों पर 1 फाइल वितरित करें। फ़ाइल 1 में [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] शामिल हैं। सिंक में प्रतिकृतियों की कुल संख्या 2 होने दें। फिर प्रत्येक कार्यकर्ता को निम्नलिखित वितरण दिखाई देगा:
- कार्यकर्ता 0:
- बैच 1 = प्रतिकृति 1: [0, 1]
- बैच 2 = प्रतिकृति 1: [2, 3]
- बैच 3 = प्रतिकृति 1: [4, 5]
- बैच 4 = प्रतिकृति 1: [6, 7]
- बैच 5 = प्रतिकृति 1: [8, 9]
बैच 6 = प्रतिकृति 1: [10, 11]
कार्यकर्ता 1:
बैच 1 = प्रतिकृति 2: [0, 1]
बैच 2 = प्रतिकृति 2: [2, 3]
बैच 3 = प्रतिकृति 2: [4, 5]
बैच 4 = प्रतिकृति 2: [6, 7]
बैच 5 = प्रतिकृति 2: [8, 9]
बैच 6 = प्रतिकृति 2: [10, 11]
प्रीफेचिंग
डिफ़ॉल्ट रूप से, tf.distribute उपयोगकर्ता द्वारा प्रदान किए गए tf.data.Dataset उदाहरण के अंत में एक प्रीफ़ेच परिवर्तन जोड़ता है। प्रीफ़ेच ट्रांसफ़ॉर्मेशन का तर्क जो कि buffer_size है, सिंक में प्रतिकृतियों की संख्या के बराबर है।
tf.distribute.Strategy.distribute_datasets_from_function
प्रयोग
यह एपीआई एक इनपुट फ़ंक्शन लेता है और एक tf.distribute.DistributedDataset उदाहरण देता है। उपयोगकर्ता द्वारा पास किए जाने वाले इनपुट फ़ंक्शन में tf.distribute.InputContext तर्क होता है और उसे tf.data.Dataset उदाहरण वापस करना चाहिए। इस एपीआई के साथ, tf.distribute उपयोगकर्ता के tf.data.Dataset में कोई और बदलाव नहीं करता है। इनपुट फ़ंक्शन से लौटाए गए डेटासेट इंस्टेंस। डेटासेट को बैच और शार्प करना उपयोगकर्ता की जिम्मेदारी है। tf.distribute प्रत्येक कार्यकर्ता के CPU डिवाइस पर इनपुट फ़ंक्शन को कॉल करता है। उपयोगकर्ताओं को अपने स्वयं के बैचिंग और शार्डिंग लॉजिक को निर्दिष्ट करने की अनुमति देने के अलावा, यह एपीआई बहु-कार्यकर्ता प्रशिक्षण के लिए उपयोग किए जाने पर tf.distribute.Strategy.experimental_distribute_dataset की तुलना में बेहतर मापनीयता और प्रदर्शन भी प्रदर्शित करता है।
mirrored_strategy = tf.distribute.MirroredStrategy()
def dataset_fn(input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
dataset = dataset.shard(
input_context.num_input_pipelines, input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
return dataset
dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
गुण
बैचिंग
tf.data.Dataset उदाहरण जो कि इनपुट फ़ंक्शन का रिटर्न मान है, प्रति प्रतिकृति बैच आकार का उपयोग करके बैच किया जाना चाहिए। प्रति प्रतिकृति बैच आकार वैश्विक बैच आकार है जो सिंक प्रशिक्षण में भाग लेने वाले प्रतिकृतियों की संख्या से विभाजित होता है। ऐसा इसलिए है क्योंकि tf.distribute प्रत्येक कार्यकर्ता के CPU डिवाइस पर इनपुट फ़ंक्शन को कॉल करता है। किसी दिए गए कार्यकर्ता पर बनाया गया डेटासेट उस कार्यकर्ता पर सभी प्रतिकृतियों द्वारा उपयोग के लिए तैयार होना चाहिए।
शेयरिंग
tf.distribute.InputContext ऑब्जेक्ट जो परोक्ष रूप से उपयोगकर्ता के इनपुट फ़ंक्शन के लिए एक तर्क के रूप में पारित किया जाता है, हुड के तहत tf.distribute .वितरण द्वारा बनाया जाता है। इसमें श्रमिकों की संख्या, वर्तमान कार्यकर्ता आईडी आदि के बारे में जानकारी है। यह इनपुट फ़ंक्शन इन गुणों का उपयोग करके उपयोगकर्ता द्वारा निर्धारित नीतियों के अनुसार शार्डिंग को संभाल सकता है जो tf.distribute.InputContext ऑब्जेक्ट का हिस्सा हैं।
प्रीफेचिंग
tf. tf.distribute के अंत में एक प्रीफ़ेच परिवर्तन नहीं जोड़ता है। उपयोगकर्ता द्वारा प्रदान किए गए इनपुट फ़ंक्शन द्वारा लौटाया गया tf.data.Dataset ।
वितरित इटरेटर्स
गैर-वितरित tf.data.Dataset उदाहरणों के समान, आपको tf.distribute.DistributedDataset उदाहरणों पर पुनरावृति करने और tf.distribute.DistributedDataset में तत्वों तक पहुंचने के लिए एक पुनरावर्तक बनाने की आवश्यकता होगी। निम्नलिखित तरीके हैं जिनसे आप एक tf.distribute.DistributedIterator बना सकते हैं और अपने मॉडल को प्रशिक्षित करने के लिए इसका उपयोग कर सकते हैं:
उपयोगों
लूप निर्माण के लिए पाइथोनिक का उपयोग करें
आप tf.distribute.DistributedDataset पर पुनरावृति करने के लिए एक उपयोगकर्ता के अनुकूल tf.distribute.DistributedDataset लूप का उपयोग कर सकते हैं। tf.distribute.DistributedIterator से लौटाए गए तत्व एकल tf.Tensor या tf.distribute.DistributedValues हो सकते हैं जिसमें प्रति प्रतिकृति एक मान होता है। लूप को tf.function के अंदर रखने से प्रदर्शन को बढ़ावा मिलेगा। हालांकि, break और return वर्तमान में tf.distribute.DistributedDataset tf.function अंदर रखा गया है।
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
for x in dist_dataset:
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(x,))
print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
2022-01-26 05:34:05.431113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
key: "Toutput_types"
value {
list {
type: DT_FLOAT
type: DT_FLOAT
}
}
}
attr {
key: "_cardinality"
value {
i: 1
}
}
attr {
key: "metadata"
value {
s: "\n\020TensorDataset:29"
}
}
attr {
key: "output_shapes"
value {
list {
shape {
dim {
size: 1
}
}
shape {
dim {
size: 1
}
}
}
}
}
experimental_type {
type_id: TFT_PRODUCT
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
}
}
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
}
}
}
Loss is tf.Tensor(
[[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]], shape=(16, 1), dtype=float32)
Loss is tf.Tensor(
[[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]], shape=(16, 1), dtype=float32)
Loss is tf.Tensor(
[[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]], shape=(16, 1), dtype=float32)
Loss is tf.Tensor(
[[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]], shape=(16, 1), dtype=float32)
Loss is tf.Tensor(
[[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]], shape=(16, 1), dtype=float32)
Loss is tf.Tensor(
[[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]
[0.7]], shape=(16, 1), dtype=float32)
Loss is tf.Tensor(
[[0.7]
[0.7]
[0.7]
[0.7]], shape=(4, 1), dtype=float32)
एक स्पष्ट इटरेटर बनाने के लिए iter का उपयोग करें
tf.distribute.DistributedDataset इंस्टेंस में तत्वों पर पुनरावृति करने के लिए, आप उस पर iter API का उपयोग करके tf.distribute.DistributedIterator बना सकते हैं। एक स्पष्ट पुनरावर्तक के साथ, आप निश्चित चरणों के लिए पुनरावृति कर सकते हैं। tf.distribute.DistributedIterator उदाहरण dist_iterator से अगला तत्व प्राप्त करने के लिए, आप next(dist_iterator) , dist_iterator.get_next() , या dist_iterator.get_next_as_optional() पर कॉल कर सकते हैं। पूर्व दो अनिवार्य रूप से समान हैं:
num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
dist_iterator = iter(dist_dataset)
for step in range(steps_per_epoch):
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
# which is the same as
# loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
print("Loss is ", loss)
Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32)
next() या tf.distribute.DistributedIterator.get_next() के साथ, यदि tf.distribute.DistributedIterator अपने अंत तक पहुँच गया है, तो एक OutOfRange त्रुटि फेंक दी जाएगी। क्लाइंट अजगर की तरफ से त्रुटि पकड़ सकता है और अन्य काम जैसे कि चेकपॉइंटिंग और मूल्यांकन करना जारी रख सकता है। हालांकि, यह काम नहीं करेगा यदि आप एक मेजबान प्रशिक्षण लूप का उपयोग कर रहे हैं (यानी, प्रति tf.function कई चरणों को चलाएं), जो इस तरह दिखता है:
@tf.function
def train_fn(iterator):
for _ in tf.range(steps_per_loop):
strategy.run(step_fn, args=(next(iterator),))
train_fn में tf.range के अंदर स्टेप बॉडी को लपेटकर कई चरण होते हैं। इस मामले में, बिना किसी निर्भरता के लूप में अलग-अलग पुनरावृत्तियों समानांतर में शुरू हो सकते हैं, इसलिए पिछले पुनरावृत्तियों की गणना समाप्त होने से पहले बाद के पुनरावृत्तियों में एक आउटऑफरेंज त्रुटि ट्रिगर की जा सकती है। एक बार OutOfRange त्रुटि होने के बाद, फ़ंक्शन के सभी ऑप्स तुरंत समाप्त कर दिए जाएंगे। यदि यह कुछ ऐसा मामला है जिससे आप बचना चाहते हैं, तो एक विकल्प जो आउटऑफरेंज त्रुटि नहीं फेंकता है वह है tf.distribute.DistributedIterator.get_next_as_optional() । get_next_as_optional एक tf.experimental.Optional देता है जिसमें tf.distribute.DistributedIterator समाप्त होने पर अगला तत्व या कोई मान नहीं होता है।
# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])
dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))
@tf.function
def train_fn(distributed_iterator):
for _ in tf.range(steps_per_loop):
optional_data = distributed_iterator.get_next_as_optional()
if not optional_data.has_value():
break
per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
2022-01-26 05:34:07.300202: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3"
op: "RangeDataset"
input: "Const/_0"
input: "Const/_1"
input: "Const/_2"
attr {
key: "_cardinality"
value {
i: 9
}
}
attr {
key: "metadata"
value {
s: "\n\020RangeDataset:104"
}
}
attr {
key: "output_shapes"
value {
list {
shape {
}
}
}
}
attr {
key: "output_types"
value {
list {
type: DT_INT64
}
}
}
experimental_type {
type_id: TFT_PRODUCT
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_INT64
}
}
}
}
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_INT64
}
}
}
}
}
2022-01-26 05:34:07.355301: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
([0 1], [2 3])
([4 5], [6 7])
([8], [])
element_spec संपत्ति का उपयोग करना
यदि आप एक वितरित डेटासेट के तत्वों को tf.function में पास करते हैं और एक tf.TypeSpec गारंटी चाहते हैं, तो आप tf.function के input_signature तर्क को निर्दिष्ट कर सकते हैं। एक वितरित डेटासेट का आउटपुट tf.distribute.DistributedValues है जो एक डिवाइस या कई डिवाइस के इनपुट का प्रतिनिधित्व कर सकता है। इस वितरित मूल्य के अनुरूप tf.TypeSpec प्राप्त करने के लिए आप वितरित डेटासेट या वितरित इटरेटर ऑब्जेक्ट की element_spec संपत्ति का उपयोग कर सकते हैं।
global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
def step_fn(inputs):
return 2 * inputs
return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))
for _ in range(epochs):
iterator = iter(dist_dataset)
for _ in range(steps_per_epoch):
output = train_step(next(iterator))
tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
2022-01-26 05:34:07.611498: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
key: "Toutput_types"
value {
list {
type: DT_FLOAT
type: DT_FLOAT
}
}
}
attr {
key: "_cardinality"
value {
i: 1
}
}
attr {
key: "metadata"
value {
s: "\n\021TensorDataset:122"
}
}
attr {
key: "output_shapes"
value {
list {
shape {
dim {
size: 1
}
}
shape {
dim {
size: 1
}
}
}
}
}
experimental_type {
type_id: TFT_PRODUCT
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
}
}
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
}
}
}
([[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]])
([[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]])
([[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]])
([[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]])
([[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]])
([[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]])
([[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]])
([[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]])
([[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]])
([[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]])
([[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]])
([[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]])
([[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]])
([[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]])
([[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]])
([[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]])
([[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]])
([[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]])
([[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]])
([[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]])
([[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]])
([[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]])
([[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]])
([[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]])
([[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]], [[1]
[1]
[1]
...
[1]
[1]
[1]])
आंशिक बैच
आंशिक बैचों का सामना तब होता है जब tf.data.Dataset उदाहरण जो उपयोगकर्ता बनाते हैं उनमें बैच आकार हो सकते हैं जो प्रतिकृतियों की संख्या से समान रूप से विभाज्य नहीं होते हैं या जब डेटासेट आवृत्ति की कार्डिनैलिटी बैच आकार से विभाज्य नहीं होती है। इसका मतलब यह है कि जब डेटासेट को कई प्रतियों में वितरित किया जाता है, तो कुछ पुनरावृत्तियों पर next कॉल के परिणामस्वरूप OutOfRangeError होगा। इस उपयोग के मामले को संभालने के लिए, tf.distribute उन प्रतिकृतियों पर बैच आकार 0 के डमी बैच देता है जिनके पास संसाधित करने के लिए कोई और डेटा नहीं है।
सिंगल वर्कर केस के लिए, यदि डेटा इटरेटर पर next कॉल द्वारा वापस नहीं किया जाता है, तो 0 बैच आकार के डमी बैच बनाए जाते हैं और डेटासेट में वास्तविक डेटा के साथ उपयोग किए जाते हैं। आंशिक बैचों के मामले में, डेटा के अंतिम वैश्विक बैच में डेटा के डमी बैचों के साथ वास्तविक डेटा होगा। डेटा को संसाधित करने के लिए रुकने की स्थिति अब जाँचती है कि क्या किसी भी प्रतिकृति में डेटा है। यदि किसी भी प्रतिकृति पर कोई डेटा नहीं है, तो आउटऑफरेंज त्रुटि फेंक दी जाती है।
बहु कार्यकर्ता मामले के लिए, प्रत्येक कार्यकर्ता पर डेटा की उपस्थिति का प्रतिनिधित्व करने वाला बूलियन मान क्रॉस प्रतिकृति संचार का उपयोग करके एकत्रित किया जाता है और इसका उपयोग यह पहचानने के लिए किया जाता है कि सभी श्रमिकों ने वितरित डेटासेट को संसाधित करना समाप्त कर दिया है या नहीं। चूंकि इसमें क्रॉस वर्कर संचार शामिल है, इसमें कुछ प्रदर्शन दंड शामिल है।
चेतावनियां
एकाधिक कार्यकर्ता सेटअप के साथ
tf.distribute.Strategy.experimental_distribute_datasetAPI का उपयोग करते समय, उपयोगकर्ता एकtf.data.Datasetपास करते हैं जो फ़ाइलों से पढ़ता है। यदिtf.data.experimental.AutoShardPolicyकोAUTOयाFILEपर सेट किया जाता है, तो वास्तविक प्रति चरण बैच आकार उपयोगकर्ता द्वारा परिभाषित वैश्विक बैच आकार से छोटा हो सकता है। यह तब हो सकता है जब फ़ाइल में शेष तत्व वैश्विक बैच आकार से कम हों। उपयोगकर्ता या तो डेटासेट को चलाने के चरणों की संख्या के आधार पर समाप्त कर सकते हैं या इसके आसपास काम करने के लिएtf.data.experimental.AutoShardPolicyकोDATAपर सेट कर सकते हैं।स्टेटफुल डेटासेट ट्रांसफॉर्मेशन वर्तमान में
tf.distributeके साथ समर्थित नहीं हैं और डेटासेट के किसी भी स्टेटफुल ऑप्स को वर्तमान में अनदेखा किया जा सकता है। उदाहरण के लिए, यदि आपके डेटासेट में एकmap_fnहै जो किसी छवि को घुमाने के लिएtf.random.uniformका उपयोग करता है, तो आपके पास एक डेटासेट ग्राफ़ है जो स्थानीय मशीन पर राज्य (यानी यादृच्छिक बीज) पर निर्भर करता है जहां पायथन प्रक्रिया निष्पादित की जा रही है।प्रायोगिक
tf.data.experimental.OptimizationOptionsजो डिफ़ॉल्ट रूप से अक्षम होते हैं, कुछ संदर्भों में - जैसे किtf.distribute.वितरण के साथ उपयोग किए जाने पर - प्रदर्शन में गिरावट का कारण बन सकते हैं। आपको उन्हें तभी सक्षम करना चाहिए जब आप यह सत्यापित कर लें कि वे वितरण सेटिंग में आपके कार्यभार के प्रदर्शन को लाभान्वित करते हैं।सामान्य रूप से
tf.dataके साथ अपनी इनपुट पाइपलाइन को कैसे अनुकूलित करें, इसके लिए कृपया इस गाइड को देखें। कुछ अतिरिक्त टिप्स:यदि आपके पास एक से अधिक कर्मचारी हैं और एक या अधिक ग्लोब पैटर्न से मेल खाने वाली सभी फ़ाइलों से डेटासेट बनाने के लिए
tf.data.Dataset.list_filesका उपयोग कर रहे हैं, तो याद रखें किseedतर्क सेट करें याshuffle=Falseसेट करें ताकि प्रत्येक कार्यकर्ता फ़ाइल को लगातार शार्प करे।यदि आपकी इनपुट पाइपलाइन में रिकॉर्ड स्तर पर डेटा को शफ़ल करना और डेटा को पार्स करना दोनों शामिल हैं, जब तक कि अनपेक्षित डेटा पार्स किए गए डेटा (जो आमतौर पर ऐसा नहीं है) से काफी बड़ा है, पहले फेरबदल करें और फिर पार्स करें, जैसा कि निम्नलिखित उदाहरण में दिखाया गया है। यह स्मृति उपयोग और प्रदर्शन को लाभ पहुंचा सकता है।
d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None)buffer_sizeतत्वों के आंतरिक बफर को बनाए रखता है, और इस प्रकारbuffer_sizeको कम करने से OOM समस्या कम हो सकती है।tf.distribute.experimental_distribute_datasetयाtf.distribute.distribute_datasets_from_functionका उपयोग करते समय श्रमिकों द्वारा डेटा को संसाधित करने के क्रम की गारंटी नहीं है। यह आमतौर पर तब आवश्यक होता है जब आपtf.distributeसे स्केल भविष्यवाणी का उपयोग कर रहे हों। हालांकि आप बैच में प्रत्येक तत्व के लिए एक इंडेक्स सम्मिलित कर सकते हैं और तदनुसार आउटपुट ऑर्डर कर सकते हैं। निम्न स्निपेट आउटपुट ऑर्डर करने का एक उदाहरण है।
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
def predict(index, inputs):
outputs = 2 * inputs
return index, outputs
result = {}
for index, inputs in dist_dataset:
output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
indices = list(mirrored_strategy.experimental_local_results(output_index))
rindices = []
for a in indices:
rindices.extend(a.numpy())
outputs = list(mirrored_strategy.experimental_local_results(outputs))
routputs = []
for a in outputs:
routputs.extend(a.numpy())
for i, value in zip(rindices, routputs):
result[i] = value
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}
2022-01-26 05:34:08.978884: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3"
op: "RangeDataset"
input: "Const/_4"
input: "Const/_1"
input: "Const/_2"
attr {
key: "_cardinality"
value {
i: 9223372036854775807
}
}
attr {
key: "metadata"
value {
s: "\n\020RangeDataset:162"
}
}
attr {
key: "output_shapes"
value {
list {
shape {
}
}
}
}
attr {
key: "output_types"
value {
list {
type: DT_INT64
}
}
}
experimental_type {
type_id: TFT_PRODUCT
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_INT64
}
}
}
}
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_INT64
}
}
}
}
}
यदि मैं विहित tf.data.Dataset उदाहरण का उपयोग नहीं कर रहा हूँ, तो मैं अपना डेटा कैसे वितरित करूँ?
कभी-कभी उपयोगकर्ता अपने इनपुट का प्रतिनिधित्व करने के लिए tf.data.Dataset का उपयोग नहीं कर सकते हैं और बाद में उपर्युक्त एपीआई का उपयोग डेटासेट को कई उपकरणों में वितरित करने के लिए कर सकते हैं। ऐसे मामलों में आप जनरेटर से कच्चे टेंसर या इनपुट का उपयोग कर सकते हैं।
मनमाना टेंसर इनपुट के लिए प्रयोग_वितरण_वैल्यू_फ्रॉम_फंक्शन का उपयोग करें
strategy.run tf.distribute.DistributedValues स्वीकार करता है जो next(iterator) का आउटपुट है। टेंसर मानों को पास करने के लिए, tf.वितरण का निर्माण करने के लिए experimental_distribute_values_from_function tf.distribute.DistributedValues का उपयोग करें। कच्चे टेंसर से वितरित मान।
mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices
def value_fn(ctx):
return tf.constant(1.0)
distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
यदि आपका इनपुट जनरेटर से है तो tf.data.Dataset.from_generator का उपयोग करें
यदि आपके पास एक जनरेटर फ़ंक्शन है जिसका आप उपयोग करना चाहते हैं, तो आप from_generator API का उपयोग करके एक tf.data.Dataset उदाहरण बना सकते हैं।
mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
while True:
yield np.random.rand(4)
# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
2022-01-26 05:34:09.091386: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_2"
op: "FlatMapDataset"
input: "TensorDataset/_1"
attr {
key: "Targuments"
value {
list {
}
}
}
attr {
key: "_cardinality"
value {
i: -2
}
}
attr {
key: "f"
value {
func {
name: "__inference_Dataset_flat_map_flat_map_fn_3980"
}
}
}
attr {
key: "metadata"
value {
s: "\n\022FlatMapDataset:178"
}
}
attr {
key: "output_shapes"
value {
list {
shape {
dim {
size: 4
}
}
}
}
}
attr {
key: "output_types"
value {
list {
type: DT_FLOAT
}
}
}
experimental_type {
type_id: TFT_PRODUCT
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
}
}
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
}
}
}
. Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.
TensorFlow.org पर देखें
Google Colab में चलाएं
GitHub पर स्रोत देखें
नोटबुक डाउनलोड करें