Xem trên TensorFlow.org | Chạy trong Google Colab | Xem nguồn trên GitHub | Tải xuống sổ ghi chép |
Các API tf.distribute cung cấp một cách dễ dàng để người dùng mở rộng quy mô đào tạo của họ từ một máy thành nhiều máy. Khi mở rộng mô hình của họ, người dùng cũng phải phân phối đầu vào của họ trên nhiều thiết bị. tf.distribute
cung cấp các API mà bạn có thể tự động phân phối thông tin đầu vào của mình trên các thiết bị.
Hướng dẫn này sẽ chỉ cho bạn các cách khác nhau mà bạn có thể tạo tập dữ liệu phân tán và trình vòng lặp bằng cách sử dụng các API tf.distribute
. Ngoài ra, các chủ đề sau sẽ được đề cập:
- Các tùy chọn sử dụng, sharding và theo lô khi sử dụng
tf.distribute.Strategy.experimental_distribute_dataset
vàtf.distribute.Strategy.distribute_datasets_from_function
. - Các cách khác nhau mà bạn có thể lặp qua tập dữ liệu phân tán.
- Sự khác biệt giữa các API
tf.distribute.Strategy.experimental_distribute_dataset
/tf.distribute.Strategy.distribute_datasets_from_function
và APItf.data
cũng như bất kỳ hạn chế nào mà người dùng có thể gặp phải khi sử dụng.
Hướng dẫn này không đề cập đến việc sử dụng đầu vào phân tán với API Keras.
Tập dữ liệu được phân phối
Để sử dụng các API tf.distribute
để mở rộng quy mô, người dùng nên sử dụng tf.data.Dataset
để đại diện cho đầu vào của họ. tf.distribute
đã được thực hiện để hoạt động hiệu quả với tf.data.Dataset
(ví dụ: tự động tìm nạp trước dữ liệu trên mỗi thiết bị tăng tốc) với việc tối ưu hóa hiệu suất thường xuyên được tích hợp vào quá trình triển khai. Nếu bạn có trường hợp sử dụng để sử dụng thứ gì đó khác với tf.data.Dataset
, vui lòng tham khảo phần sau của hướng dẫn này. Trong vòng lặp đào tạo không phân tán, trước tiên người dùng tạo một cá thể tf.data.Dataset
và sau đó lặp lại các phần tử. Ví dụ:
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.8.0-rc1
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
# Iterate over the dataset using the for..in construct.
for inputs in dataset:
print(train_step(inputs))
tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
Để cho phép người dùng sử dụng chiến lược tf.distribute
với những thay đổi tối thiểu đối với mã hiện có của người dùng, hai API đã được giới thiệu sẽ phân phối một tf.data.Dataset
và trả về một đối tượng tập dữ liệu phân tán. Sau đó, người dùng có thể lặp lại phiên bản tập dữ liệu phân tán này và đào tạo mô hình của họ như trước. Bây giờ chúng ta hãy xem xét hai API - tf.distribute.Strategy.experimental_distribute_dataset
và tf.distribute.Strategy.distribute_datasets_from_function
năng chi tiết hơn:
tf.distribute.Strategy.experimental_distribute_dataset
Cách sử dụng
API này lấy một cá thể tf.data.Dataset
làm đầu vào và trả về một cá thể tf.distribute.DistributedDataset
. Bạn nên nhập hàng loạt tập dữ liệu đầu vào với giá trị bằng với kích thước lô chung. Kích thước lô toàn cầu này là số lượng mẫu mà bạn muốn xử lý trên tất cả các thiết bị trong 1 bước. Bạn có thể lặp lại tập dữ liệu được phân phối này theo kiểu iter
hoặc tạo một trình lặp bằng cách sử dụng nó. Đối tượng được trả về không phải là một tf.data.Dataset
và không hỗ trợ bất kỳ API nào khác có thể chuyển đổi hoặc kiểm tra tập dữ liệu theo bất kỳ cách nào. Đây là API được đề xuất nếu bạn không có các cách cụ thể mà bạn muốn phân đoạn đầu vào của mình thành các bản sao khác nhau.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) (<tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>) 2022-01-26 05:34:05.342660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\017TensorDataset:4" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } }
Tính chất
Hàng loạt
tf.distribute
bắt lại cá thể tf.data.Dataset
đầu vào với kích thước lô mới bằng kích thước lô chung chia cho số lượng bản sao đồng bộ. Số lượng bản sao được đồng bộ hóa bằng số lượng thiết bị đang tham gia phân bổ gradient trong quá trình đào tạo. Khi người dùng gọi next
trên trình lặp phân tán, kích thước lô dữ liệu trên mỗi bản sao sẽ được trả về trên mỗi bản sao. Số lượng bản sao của tập dữ liệu được phục hồi sẽ luôn là bội số của số bản sao. Dưới đây là một số ví dụ:
tf.data.Dataset.range(6).batch(4, drop_remainder=False)
- Không có phân phối:
- Đợt 1: [0, 1, 2, 3]
- Đợt 2: [4, 5]
Với phân phối trên 2 bản sao. Lô cuối cùng ([4, 5]) được chia thành 2 bản sao.
Đợt 1:
- Bản sao 1: [0, 1]
- Bản sao 2: [2, 3]
Đợt 2:
- Bản sao 2: [4]
- Bản sao 2: [5]
tf.data.Dataset.range(4).batch(4)
- Không có phân phối:
- Đợt 1: [[0], [1], [2], [3]]
- Với phân phối trên 5 bản sao:
- Đợt 1:
- Bản sao 1: [0]
- Bản sao 2: [1]
- Bản sao 3: [2]
- Bản sao 4: [3]
- Bản sao 5: []
tf.data.Dataset.range(8).batch(4)
- Không có phân phối:
- Đợt 1: [0, 1, 2, 3]
- Đợt 2: [4, 5, 6, 7]
- Với phân phối trên 3 bản sao:
- Đợt 1:
- Bản sao 1: [0, 1]
- Bản sao 2: [2, 3]
- Bản sao 3: []
- Đợt 2:
- Bản sao 1: [4, 5]
- Bản sao 2: [6, 7]
- Bản sao 3: []
Việc so khớp tập dữ liệu có độ phức tạp về không gian tăng tuyến tính với số lượng bản sao. Điều này có nghĩa là đối với trường hợp sử dụng đào tạo nhiều nhân viên, đường ống đầu vào có thể gặp lỗi OOM.
Mài giũa
tf.distribute
cũng tự động cứng bộ dữ liệu đầu vào trong quá trình đào tạo nhiều nhân viên với MultiWorkerMirroredStrategy
và TPUStrategy
. Mỗi tập dữ liệu được tạo trên thiết bị CPU của nhân viên. Tự động tích hợp tập dữ liệu qua một tập hợp công nhân có nghĩa là mỗi công nhân được chỉ định một tập hợp con của toàn bộ tập dữ liệu (nếu đặt đúng tf.data.experimental.AutoShardPolicy
). Điều này là để đảm bảo rằng ở mỗi bước, kích thước lô toàn cầu của các phần tử tập dữ liệu không chồng chéo sẽ được xử lý bởi từng công nhân. Tự động sạc có một số tùy chọn khác nhau có thể được chỉ định bằng cách sử dụng tf.data.experimental.DistributeOptions
. Lưu ý rằng không có tự động sạc trong đào tạo nhiều nhân viên với ParameterServerStrategy
và bạn có thể tìm thấy thêm thông tin về cách tạo tập dữ liệu với chiến lược này trong hướng dẫn Chiến lược máy chủ tham số .
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
Có ba tùy chọn khác nhau mà bạn có thể đặt cho tf.data.experimental.AutoShardPolicy
:
- AUTO: Đây là tùy chọn mặc định có nghĩa là một nỗ lực sẽ được thực hiện để phân đoạn bằng FILE. Nỗ lực phân đoạn bằng FILE không thành công nếu tập dữ liệu dựa trên tệp không được phát hiện.
tf.distribute
sau đó sẽ trở lại sharding bởi DATA. Lưu ý rằng nếu tập dữ liệu đầu vào là dựa trên tệp nhưng số tệp ít hơn số công nhân, mộtInvalidArgumentError
không hợp lệ sẽ được đưa ra. Nếu điều này xảy ra, hãy đặt chính sách thànhAutoShardPolicy.DATA
một cách rõ ràng hoặc chia nguồn đầu vào của bạn thành các tệp nhỏ hơn sao cho số tệp nhiều hơn số công nhân. FILE: Đây là tùy chọn nếu bạn muốn chia nhỏ các tệp đầu vào trên tất cả các worker. Bạn nên sử dụng tùy chọn này nếu số lượng tệp đầu vào lớn hơn nhiều so với số lượng nhân công và dữ liệu trong tệp được phân bổ đồng đều. Nhược điểm của tùy chọn này là có công nhân nhàn rỗi nếu dữ liệu trong các tệp không được phân phối đồng đều. Nếu số lượng tệp ít hơn số lượng công nhân, một
InvalidArgumentError
không hợp lệ sẽ xuất hiện. Nếu điều này xảy ra, hãy đặt chính sách thànhAutoShardPolicy.DATA
một cách rõ ràng. Ví dụ: chúng ta hãy phân phối 2 tệp cho 2 công nhân với mỗi tệp 1 bản sao. Tệp 1 chứa [0, 1, 2, 3, 4, 5] và Tệp 2 chứa [6, 7, 8, 9, 10, 11]. Đặt tổng số bản sao được đồng bộ hóa là 2 và kích thước lô toàn cầu là 4.- Công nhân 0:
- Đợt 1 = Bản sao 1: [0, 1]
- Batch 2 = Replica 1: [2, 3]
- Batch 3 = Replica 1: [4]
- Đợt 4 = Bản sao 1: [5]
- Công nhân 1:
- Đợt 1 = Bản sao 2: [6, 7]
- Batch 2 = Replica 2: [8, 9]
- Batch 3 = Replica 2: [10]
- Batch 4 = Replica 2: [11]
DỮ LIỆU: Điều này sẽ tự động đóng cứng các phần tử trên tất cả các công nhân. Mỗi công nhân sẽ đọc toàn bộ tập dữ liệu và chỉ xử lý phân đoạn được chỉ định cho nó. Tất cả các phân đoạn khác sẽ bị loại bỏ. Điều này thường được sử dụng nếu số lượng tệp đầu vào ít hơn số lượng nhân viên và bạn muốn phân tích dữ liệu tốt hơn trên tất cả các nhân viên. Nhược điểm là toàn bộ tập dữ liệu sẽ được đọc trên mỗi worker. Ví dụ: hãy để chúng tôi phân phối 1 tệp trên 2 công nhân. Tệp 1 chứa [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Đặt tổng số bản sao đồng bộ là 2.
- Công nhân 0:
- Đợt 1 = Bản sao 1: [0, 1]
- Batch 2 = Replica 1: [4, 5]
- Batch 3 = Replica 1: [8, 9]
- Công nhân 1:
- Đợt 1 = Bản sao 2: [2, 3]
- Batch 2 = Replica 2: [6, 7]
- Batch 3 = Replica 2: [10, 11]
TẮT: Nếu bạn tắt tính năng tự động sạc, mỗi nhân viên sẽ xử lý tất cả dữ liệu. Ví dụ: hãy để chúng tôi phân phối 1 tệp trên 2 công nhân. Tệp 1 chứa [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Hãy để tổng số bản sao được đồng bộ hóa là 2. Sau đó, mỗi công nhân sẽ thấy phân phối sau:
- Công nhân 0:
- Đợt 1 = Bản sao 1: [0, 1]
- Batch 2 = Replica 1: [2, 3]
- Đợt 3 = Bản sao 1: [4, 5]
- Đợt 4 = Bản sao 1: [6, 7]
- Đợt 5 = Bản sao 1: [8, 9]
Đợt 6 = Bản sao 1: [10, 11]
Công nhân 1:
Đợt 1 = Bản sao 2: [0, 1]
Batch 2 = Replica 2: [2, 3]
Batch 3 = Replica 2: [4, 5]
Batch 4 = Replica 2: [6, 7]
Lô 5 = Bản sao 2: [8, 9]
Batch 6 = Replica 2: [10, 11]
Tìm nạp trước
Theo mặc định, tf.distribute
thêm một chuyển đổi tìm nạp trước vào cuối phiên bản tf.data.Dataset
do người dùng cung cấp. Đối số cho phép chuyển đổi tìm nạp trước là buffer_size
bằng với số lượng bản sao được đồng bộ hóa.
tf.distribute.Strategy.distribute_datasets_from_function
Cách sử dụng
API này nhận một hàm đầu vào và trả về một cá thể tf.distribute.DistributedDataset
. Hàm đầu vào mà người dùng truyền vào có đối số tf.distribute.InputContext
và phải trả về một cá thể tf.data.Dataset
. Với API này, tf.distribute
không thực hiện thêm bất kỳ thay đổi nào đối với cá thể tf.data.Dataset
của người dùng được trả về từ hàm đầu vào. Người dùng có trách nhiệm phân lô và chia nhỏ tập dữ liệu. tf.distribute
gọi hàm đầu vào trên thiết bị CPU của từng công nhân. Ngoài việc cho phép người dùng chỉ định lô-gic và phân bổ theo lô của riêng họ, API này cũng thể hiện khả năng mở rộng và hiệu suất tốt hơn so với tf.distribute.Strategy.experimental_distribute_dataset
khi được sử dụng để đào tạo nhiều nhân viên.
mirrored_strategy = tf.distribute.MirroredStrategy()
def dataset_fn(input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
dataset = dataset.shard(
input_context.num_input_pipelines, input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
return dataset
dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Tính chất
Hàng loạt
Ví dụ tf.data.Dataset
là giá trị trả về của hàm đầu vào nên được phân lô bằng cách sử dụng kích thước lô bản sao cho mỗi bản sao. Kích thước lô mỗi bản sao là kích thước lô toàn cầu chia cho số bản sao đang tham gia đào tạo đồng bộ. Điều này là do tf.distribute
gọi hàm đầu vào trên thiết bị CPU của mỗi công nhân. Tập dữ liệu được tạo trên một worker nhất định phải sẵn sàng để sử dụng bởi tất cả các bản sao trên worker đó.
Mài giũa
Đối tượng tf.distribute.InputContext
được truyền ngầm làm đối số cho hàm đầu vào của người dùng được tạo bởi tf.distribute
. Nó có thông tin về số lượng công nhân, id công nhân hiện tại, v.v. Hàm đầu vào này có thể xử lý sharding theo chính sách do người dùng đặt bằng cách sử dụng các thuộc tính này là một phần của đối tượng tf.distribute.InputContext
.
Tìm nạp trước
tf.distribute
không thêm phép chuyển đổi tìm nạp trước vào cuối tf.data.Dataset
do hàm đầu vào do người dùng cung cấp trả về.
Trình lặp phân tán
Tương tự như các trường hợp tf.data.Dataset
không được phân phối, bạn sẽ cần tạo một trình lặp trên các cá thể tf.distribute.DistributedDataset
để lặp lại nó và truy cập các phần tử trong tf.distribute.DistributedDataset
. Sau đây là những cách bạn có thể tạo tf.distribute.DistributedIterator
và sử dụng nó để đào tạo mô hình của bạn:
Tập quán
Sử dụng cấu trúc vòng lặp cho Pythonic
Bạn có thể sử dụng vòng lặp Pythonic thân thiện với người dùng để lặp qua tf.distribute.DistributedDataset
. Các phần tử được trả về từ tf.distribute.DistributedIterator
có thể là một tf.Tensor
hoặc tf.distribute.DistributedValues
chứa một giá trị trên mỗi bản sao. Đặt vòng lặp bên trong tf.function
sẽ giúp tăng hiệu suất. Tuy nhiên, break
và return
hiện không được hỗ trợ cho một vòng lặp qua tf.distribute.DistributedDataset
được đặt bên trong tf.function
.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
for x in dist_dataset:
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(x,))
print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:05.431113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\020TensorDataset:29" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
Sử dụng iter
để tạo một trình lặp rõ ràng
Để lặp lại các phần tử trong cá thể tf.distribute.DistributedDataset
, bạn có thể tạo một tf.distribute.DistributedIterator
bằng cách sử dụng API iter
trên đó. Với một trình lặp rõ ràng, bạn có thể lặp lại cho một số bước cố định. Để lấy phần tử tiếp theo từ một phiên bản tf.distribute.DistributedIterator
dist_iterator
, bạn có thể gọi next(dist_iterator)
, dist_iterator.get_next()
hoặc dist_iterator.get_next_as_optional()
. Hai cái trước về cơ bản giống nhau:
num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
dist_iterator = iter(dist_dataset)
for step in range(steps_per_epoch):
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
# which is the same as
# loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
print("Loss is ", loss)
Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32)
Với next()
hoặc tf.distribute.DistributedIterator.get_next()
, nếu tf.distribute.DistributedIterator
đã kết thúc, lỗi OutOfRange sẽ được đưa ra. Máy khách có thể bắt lỗi ở phía python và tiếp tục thực hiện các công việc khác như kiểm tra và đánh giá. Tuy nhiên, điều này sẽ không hoạt động nếu bạn đang sử dụng vòng lặp đào tạo máy chủ (tức là chạy nhiều bước cho mỗi tf.function
. Chức năng), giống như sau:
@tf.function
def train_fn(iterator):
for _ in tf.range(steps_per_loop):
strategy.run(step_fn, args=(next(iterator),))
train_fn
chứa nhiều bước bằng cách gói phần thân bước bên trong tf.range
. Trong trường hợp này, các lần lặp khác nhau trong vòng lặp không có phụ thuộc có thể bắt đầu song song, do đó, lỗi OutOfRange có thể được kích hoạt trong các lần lặp sau trước khi quá trình tính toán các lần lặp trước kết thúc. Một khi lỗi OutOfRange được tạo ra, tất cả các hoạt động trong hàm sẽ bị chấm dứt ngay lập tức. Nếu đây là một số trường hợp mà bạn muốn tránh, một giải pháp thay thế không gây ra lỗi OutOfRange là tf.distribute.DistributedIterator.get_next_as_optional()
. get_next_as_optional
trả về một tf.experimental.Optional
chứa phần tử tiếp theo hoặc không có giá trị nếu tf.distribute.DistributedIterator
đã kết thúc.
# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])
dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))
@tf.function
def train_fn(distributed_iterator):
for _ in tf.range(steps_per_loop):
optional_data = distributed_iterator.get_next_as_optional()
if not optional_data.has_value():
break
per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce. INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0') 2022-01-26 05:34:07.300202: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_0" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9 } } attr { key: "metadata" value { s: "\n\020RangeDataset:104" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-01-26 05:34:07.355301: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. ([0 1], [2 3]) ([4 5], [6 7]) ([8], [])
Sử dụng thuộc tính element_spec
Nếu bạn chuyển các phần tử của một tập dữ liệu phân tán cho một tf.function
và muốn đảm bảo tf.TypeSpec
, bạn có thể chỉ định đối số input_signature
của tf. tf.function
. Đầu ra của tập dữ liệu phân tán là tf.distribute.DistributedValues
có thể đại diện cho đầu vào cho một thiết bị hoặc nhiều thiết bị. Để nhận tf.TypeSpec
tương ứng với giá trị phân tán này, bạn có thể sử dụng thuộc tính element_spec
của tập dữ liệu phân tán hoặc đối tượng trình vòng lặp phân tán.
global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
def step_fn(inputs):
return 2 * inputs
return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))
for _ in range(epochs):
iterator = iter(dist_dataset)
for _ in range(steps_per_epoch):
output = train_step(next(iterator))
tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:07.611498: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\021TensorDataset:122" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]])
Hàng loạt một phần
Gặp phải các lô một phần khi các cá thể tf.data.Dataset
mà người dùng tạo có thể chứa các kích thước lô không chia hết cho số lượng bản sao hoặc khi số lượng bản sao của cá thể tập dữ liệu không chia hết cho kích thước lô. Điều này có nghĩa là khi tập dữ liệu được phân phối trên nhiều bản sao, lần gọi next
trên một số trình vòng lặp sẽ dẫn đến lỗi OutOfRangeError. Để xử lý trường hợp sử dụng này, tf.distribute
trả về lô giả có kích thước lô 0 trên các bản sao không có thêm bất kỳ dữ liệu nào để xử lý.
Đối với trường hợp worker đơn lẻ, nếu dữ liệu không được trả về bởi cuộc gọi next
trên trình vòng lặp, các lô giả có kích thước lô 0 được tạo và sử dụng cùng với dữ liệu thực trong tập dữ liệu. Trong trường hợp các lô một phần, lô dữ liệu tổng thể cuối cùng sẽ chứa dữ liệu thực cùng với các lô dữ liệu giả. Điều kiện dừng để xử lý dữ liệu bây giờ sẽ kiểm tra xem có bất kỳ bản sao nào có dữ liệu hay không. Nếu không có dữ liệu trên bất kỳ bản sao nào, lỗi OutOfRange sẽ được đưa ra.
Đối với trường hợp multi worker, giá trị boolean đại diện cho sự hiện diện của dữ liệu trên từng worker được tổng hợp bằng cách sử dụng giao tiếp bản sao chéo và giá trị này được sử dụng để xác định xem tất cả các worker đã xử lý xong tập dữ liệu được phân phối hay chưa. Vì điều này liên quan đến việc trao đổi thông tin giữa các nhân viên với nhau nên có một số hình phạt liên quan đến hiệu suất.
Cảnh báo
Khi sử dụng API
tf.distribute.Strategy.experimental_distribute_dataset
với thiết lập nhiều worker, người dùng chuyển mộttf.data.Dataset
đọc từ các tệp. Nếutf.data.experimental.AutoShardPolicy
được đặt thànhAUTO
hoặcFILE
, kích thước lô thực tế trên mỗi bước có thể nhỏ hơn kích thước lô toàn cầu do người dùng xác định. Điều này có thể xảy ra khi các phần tử còn lại trong tệp nhỏ hơn kích thước lô chung. Người dùng có thể sử dụng hết tập dữ liệu mà không phụ thuộc vào số bước chạy hoặc đặttf.data.experimental.AutoShardPolicy
thànhDATA
để giải quyết vấn đề đó.Chuyển đổi tập dữ liệu trạng thái hiện không được hỗ trợ với
tf.distribute
và bất kỳ hoạt động trạng thái nào mà tập dữ liệu có thể có hiện đang bị bỏ qua. Ví dụ: nếu tập dữ liệu của bạn cómap_fn
sử dụngtf.random.uniform
để xoay hình ảnh, thì bạn có biểu đồ tập dữ liệu phụ thuộc vào trạng thái (tức là hạt ngẫu nhiên) trên máy cục bộ nơi quá trình python đang được thực thi.Thử nghiệm
tf.data.experimental.OptimizationOptions
Các tùy chọn bị tắt theo mặc định có thể trong một số ngữ cảnh nhất định - chẳng hạn như khi được sử dụng cùng vớitf.distribute
- gây ra sự suy giảm hiệu suất. Bạn chỉ nên bật chúng sau khi xác thực rằng chúng có lợi cho hiệu suất khối lượng công việc của bạn trong cài đặt phân phối.Vui lòng tham khảo hướng dẫn này để biết cách tối ưu hóa đường dẫn đầu vào của bạn với
tf.data
nói chung. Một số mẹo bổ sung:Nếu bạn có nhiều worker và đang sử dụng
tf.data.Dataset.list_files
để tạo tập dữ liệu từ tất cả các tệp khớp với một hoặc nhiều mẫu hình cầu, hãy nhớ đặt đối sốseed
hoặc đặtshuffle=False
để mỗi worker phân đoạn tệp một cách nhất quán.Nếu đường dẫn đầu vào của bạn bao gồm cả việc trộn dữ liệu ở mức bản ghi và phân tích cú pháp dữ liệu, trừ khi dữ liệu chưa được phân tích cú pháp lớn hơn đáng kể so với dữ liệu đã phân tích cú pháp (thường không phải như vậy), hãy xáo trộn trước rồi mới phân tích cú pháp, như thể hiện trong ví dụ sau. Điều này có thể có lợi cho việc sử dụng bộ nhớ và hiệu suất.
d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None)
duy trì một bộ đệm bên trong gồm các phần tửbuffer_size
và do đó việc giảm dung lượngbuffer_size
có thể giúp giảm bớt sự cố OOM.Thứ tự dữ liệu được xử lý bởi công nhân khi sử dụng
tf.distribute.experimental_distribute_dataset
hoặctf.distribute.distribute_datasets_from_function
không được đảm bảo. Điều này thường được yêu cầu nếu bạn đang sử dụngtf.distribute
để mở rộng dự đoán. Tuy nhiên, bạn có thể chèn một chỉ mục cho từng phần tử trong các đầu ra theo thứ tự và theo thứ tự. Đoạn mã sau đây là một ví dụ về cách đặt hàng đầu ra.
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
def predict(index, inputs):
outputs = 2 * inputs
return index, outputs
result = {}
for index, inputs in dist_dataset:
output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
indices = list(mirrored_strategy.experimental_local_results(output_index))
rindices = []
for a in indices:
rindices.extend(a.numpy())
outputs = list(mirrored_strategy.experimental_local_results(outputs))
routputs = []
for a in outputs:
routputs.extend(a.numpy())
for i, value in zip(rindices, routputs):
result[i] = value
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46} 2022-01-26 05:34:08.978884: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_4" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9223372036854775807 } } attr { key: "metadata" value { s: "\n\020RangeDataset:162" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } }
Làm cách nào để phân phối dữ liệu của tôi nếu tôi không sử dụng phiên bản tf.data.Dataset chuẩn?
Đôi khi người dùng không thể sử dụng tf.data.Dataset
để đại diện cho đầu vào của họ và sau đó là các API được đề cập ở trên để phân phối tập dữ liệu cho nhiều thiết bị. Trong những trường hợp như vậy, bạn có thể sử dụng bộ căng thô hoặc đầu vào từ máy phát điện.
Sử dụng thử nghiệm_distribute_values_from_ Chức năng cho các đầu vào tensor tùy ý
strategy.run
lược.run chấp nhận tf.distribute.DistributedValues
là đầu ra của next(iterator)
. Để chuyển các giá trị tensor, hãy sử dụng experimental_distribute_values_from_function
nghiệm_distribute_values_from_ Chức năng để tạo tf.distribute.DistributedValues
từ các tensor thô.
mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices
def value_fn(ctx):
return tf.constant(1.0)
distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32)
Sử dụng tf.data.Dataset.from_generator nếu đầu vào của bạn là từ trình tạo
Nếu bạn có một hàm trình tạo mà bạn muốn sử dụng, bạn có thể tạo một tf.data.Dataset
bằng cách sử dụng API from_generator
.
mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
while True:
yield np.random.rand(4)
# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:09.091386: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_2" op: "FlatMapDataset" input: "TensorDataset/_1" attr { key: "Targuments" value { list { } } } attr { key: "_cardinality" value { i: -2 } } attr { key: "f" value { func { name: "__inference_Dataset_flat_map_flat_map_fn_3980" } } } attr { key: "metadata" value { s: "\n\022FlatMapDataset:178" } } attr { key: "output_shapes" value { list { shape { dim { size: 4 } } } } } attr { key: "output_types" value { list { type: DT_FLOAT } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } . Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.