Đầu vào phân tán

Xem trên TensorFlow.org Chạy trong Google Colab Xem nguồn trên GitHub Tải xuống sổ ghi chép

Các API tf.distribute cung cấp một cách dễ dàng để người dùng mở rộng quy mô đào tạo của họ từ một máy thành nhiều máy. Khi mở rộng mô hình của họ, người dùng cũng phải phân phối đầu vào của họ trên nhiều thiết bị. tf.distribute cung cấp các API mà bạn có thể tự động phân phối thông tin đầu vào của mình trên các thiết bị.

Hướng dẫn này sẽ chỉ cho bạn các cách khác nhau mà bạn có thể tạo tập dữ liệu phân tán và trình vòng lặp bằng cách sử dụng các API tf.distribute . Ngoài ra, các chủ đề sau sẽ được đề cập:

Hướng dẫn này không đề cập đến việc sử dụng đầu vào phân tán với API Keras.

Tập dữ liệu được phân phối

Để sử dụng các API tf.distribute để mở rộng quy mô, người dùng nên sử dụng tf.data.Dataset để đại diện cho đầu vào của họ. tf.distribute đã được thực hiện để hoạt động hiệu quả với tf.data.Dataset (ví dụ: tự động tìm nạp trước dữ liệu trên mỗi thiết bị tăng tốc) với việc tối ưu hóa hiệu suất thường xuyên được tích hợp vào quá trình triển khai. Nếu bạn có trường hợp sử dụng để sử dụng thứ gì đó khác với tf.data.Dataset , vui lòng tham khảo phần sau của hướng dẫn này. Trong vòng lặp đào tạo không phân tán, trước tiên người dùng tạo một cá thể tf.data.Dataset và sau đó lặp lại các phần tử. Ví dụ:

import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.8.0-rc1
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

Để cho phép người dùng sử dụng chiến lược tf.distribute với những thay đổi tối thiểu đối với mã hiện có của người dùng, hai API đã được giới thiệu sẽ phân phối một tf.data.Dataset và trả về một đối tượng tập dữ liệu phân tán. Sau đó, người dùng có thể lặp lại phiên bản tập dữ liệu phân tán này và đào tạo mô hình của họ như trước. Bây giờ chúng ta hãy xem xét hai API - tf.distribute.Strategy.experimental_distribute_datasettf.distribute.Strategy.distribute_datasets_from_function năng chi tiết hơn:

tf.distribute.Strategy.experimental_distribute_dataset

Cách sử dụng

API này lấy một cá thể tf.data.Dataset làm đầu vào và trả về một cá thể tf.distribute.DistributedDataset . Bạn nên nhập hàng loạt tập dữ liệu đầu vào với giá trị bằng với kích thước lô chung. Kích thước lô toàn cầu này là số lượng mẫu mà bạn muốn xử lý trên tất cả các thiết bị trong 1 bước. Bạn có thể lặp lại tập dữ liệu được phân phối này theo kiểu iter hoặc tạo một trình lặp bằng cách sử dụng nó. Đối tượng được trả về không phải là một tf.data.Dataset và không hỗ trợ bất kỳ API nào khác có thể chuyển đổi hoặc kiểm tra tập dữ liệu theo bất kỳ cách nào. Đây là API được đề xuất nếu bạn không có các cách cụ thể mà bạn muốn phân đoạn đầu vào của mình thành các bản sao khác nhau.

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)
2022-01-26 05:34:05.342660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\017TensorDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}

Tính chất

Hàng loạt

tf.distribute bắt lại cá thể tf.data.Dataset đầu vào với kích thước lô mới bằng kích thước lô chung chia cho số lượng bản sao đồng bộ. Số lượng bản sao được đồng bộ hóa bằng số lượng thiết bị đang tham gia phân bổ gradient trong quá trình đào tạo. Khi người dùng gọi next trên trình lặp phân tán, kích thước lô dữ liệu trên mỗi bản sao sẽ được trả về trên mỗi bản sao. Số lượng bản sao của tập dữ liệu được phục hồi sẽ luôn là bội số của số bản sao. Dưới đây là một số ví dụ:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    • Không có phân phối:
    • Đợt 1: [0, 1, 2, 3]
    • Đợt 2: [4, 5]
    • Với phân phối trên 2 bản sao. Lô cuối cùng ([4, 5]) được chia thành 2 bản sao.

    • Đợt 1:

      • Bản sao 1: [0, 1]
      • Bản sao 2: [2, 3]
    • Đợt 2:

      • Bản sao 2: [4]
      • Bản sao 2: [5]
  • tf.data.Dataset.range(4).batch(4)

    • Không có phân phối:
    • Đợt 1: [[0], [1], [2], [3]]
    • Với phân phối trên 5 bản sao:
    • Đợt 1:
      • Bản sao 1: [0]
      • Bản sao 2: [1]
      • Bản sao 3: [2]
      • Bản sao 4: [3]
      • Bản sao 5: []
  • tf.data.Dataset.range(8).batch(4)

    • Không có phân phối:
    • Đợt 1: [0, 1, 2, 3]
    • Đợt 2: [4, 5, 6, 7]
    • Với phân phối trên 3 bản sao:
    • Đợt 1:
      • Bản sao 1: [0, 1]
      • Bản sao 2: [2, 3]
      • Bản sao 3: []
    • Đợt 2:
      • Bản sao 1: [4, 5]
      • Bản sao 2: [6, 7]
      • Bản sao 3: []

Việc so khớp tập dữ liệu có độ phức tạp về không gian tăng tuyến tính với số lượng bản sao. Điều này có nghĩa là đối với trường hợp sử dụng đào tạo nhiều nhân viên, đường ống đầu vào có thể gặp lỗi OOM.

Mài giũa

tf.distribute cũng tự động cứng bộ dữ liệu đầu vào trong quá trình đào tạo nhiều nhân viên với MultiWorkerMirroredStrategyTPUStrategy . Mỗi tập dữ liệu được tạo trên thiết bị CPU của nhân viên. Tự động tích hợp tập dữ liệu qua một tập hợp công nhân có nghĩa là mỗi công nhân được chỉ định một tập hợp con của toàn bộ tập dữ liệu (nếu đặt đúng tf.data.experimental.AutoShardPolicy ). Điều này là để đảm bảo rằng ở mỗi bước, kích thước lô toàn cầu của các phần tử tập dữ liệu không chồng chéo sẽ được xử lý bởi từng công nhân. Tự động sạc có một số tùy chọn khác nhau có thể được chỉ định bằng cách sử dụng tf.data.experimental.DistributeOptions . Lưu ý rằng không có tự động sạc trong đào tạo nhiều nhân viên với ParameterServerStrategy và bạn có thể tìm thấy thêm thông tin về cách tạo tập dữ liệu với chiến lược này trong hướng dẫn Chiến lược máy chủ tham số .

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)

Có ba tùy chọn khác nhau mà bạn có thể đặt cho tf.data.experimental.AutoShardPolicy :

  • AUTO: Đây là tùy chọn mặc định có nghĩa là một nỗ lực sẽ được thực hiện để phân đoạn bằng FILE. Nỗ lực phân đoạn bằng FILE không thành công nếu tập dữ liệu dựa trên tệp không được phát hiện. tf.distribute sau đó sẽ trở lại sharding bởi DATA. Lưu ý rằng nếu tập dữ liệu đầu vào là dựa trên tệp nhưng số tệp ít hơn số công nhân, một InvalidArgumentError không hợp lệ sẽ được đưa ra. Nếu điều này xảy ra, hãy đặt chính sách thành AutoShardPolicy.DATA một cách rõ ràng hoặc chia nguồn đầu vào của bạn thành các tệp nhỏ hơn sao cho số tệp nhiều hơn số công nhân.
  • FILE: Đây là tùy chọn nếu bạn muốn chia nhỏ các tệp đầu vào trên tất cả các worker. Bạn nên sử dụng tùy chọn này nếu số lượng tệp đầu vào lớn hơn nhiều so với số lượng nhân công và dữ liệu trong tệp được phân bổ đồng đều. Nhược điểm của tùy chọn này là có công nhân nhàn rỗi nếu dữ liệu trong các tệp không được phân phối đồng đều. Nếu số lượng tệp ít hơn số lượng công nhân, một InvalidArgumentError không hợp lệ sẽ xuất hiện. Nếu điều này xảy ra, hãy đặt chính sách thành AutoShardPolicy.DATA một cách rõ ràng. Ví dụ: chúng ta hãy phân phối 2 tệp cho 2 công nhân với mỗi tệp 1 bản sao. Tệp 1 chứa [0, 1, 2, 3, 4, 5] và Tệp 2 chứa [6, 7, 8, 9, 10, 11]. Đặt tổng số bản sao được đồng bộ hóa là 2 và kích thước lô toàn cầu là 4.

    • Công nhân 0:
    • Đợt 1 = Bản sao 1: [0, 1]
    • Batch 2 = Replica 1: [2, 3]
    • Batch 3 = Replica 1: [4]
    • Đợt 4 = Bản sao 1: [5]
    • Công nhân 1:
    • Đợt 1 = Bản sao 2: [6, 7]
    • Batch 2 = Replica 2: [8, 9]
    • Batch 3 = Replica 2: [10]
    • Batch 4 = Replica 2: [11]
  • DỮ LIỆU: Điều này sẽ tự động đóng cứng các phần tử trên tất cả các công nhân. Mỗi công nhân sẽ đọc toàn bộ tập dữ liệu và chỉ xử lý phân đoạn được chỉ định cho nó. Tất cả các phân đoạn khác sẽ bị loại bỏ. Điều này thường được sử dụng nếu số lượng tệp đầu vào ít hơn số lượng nhân viên và bạn muốn phân tích dữ liệu tốt hơn trên tất cả các nhân viên. Nhược điểm là toàn bộ tập dữ liệu sẽ được đọc trên mỗi worker. Ví dụ: hãy để chúng tôi phân phối 1 tệp trên 2 công nhân. Tệp 1 chứa [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Đặt tổng số bản sao đồng bộ là 2.

    • Công nhân 0:
    • Đợt 1 = Bản sao 1: [0, 1]
    • Batch 2 = Replica 1: [4, 5]
    • Batch 3 = Replica 1: [8, 9]
    • Công nhân 1:
    • Đợt 1 = Bản sao 2: [2, 3]
    • Batch 2 = Replica 2: [6, 7]
    • Batch 3 = Replica 2: [10, 11]
  • TẮT: Nếu bạn tắt tính năng tự động sạc, mỗi nhân viên sẽ xử lý tất cả dữ liệu. Ví dụ: hãy để chúng tôi phân phối 1 tệp trên 2 công nhân. Tệp 1 chứa [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Hãy để tổng số bản sao được đồng bộ hóa là 2. Sau đó, mỗi công nhân sẽ thấy phân phối sau:

    • Công nhân 0:
    • Đợt 1 = Bản sao 1: [0, 1]
    • Batch 2 = Replica 1: [2, 3]
    • Đợt 3 = Bản sao 1: [4, 5]
    • Đợt 4 = Bản sao 1: [6, 7]
    • Đợt 5 = Bản sao 1: [8, 9]
    • Đợt 6 = Bản sao 1: [10, 11]

    • Công nhân 1:

    • Đợt 1 = Bản sao 2: [0, 1]

    • Batch 2 = Replica 2: [2, 3]

    • Batch 3 = Replica 2: [4, 5]

    • Batch 4 = Replica 2: [6, 7]

    • Lô 5 = Bản sao 2: [8, 9]

    • Batch 6 = Replica 2: [10, 11]

Tìm nạp trước

Theo mặc định, tf.distribute thêm một chuyển đổi tìm nạp trước vào cuối phiên bản tf.data.Dataset do người dùng cung cấp. Đối số cho phép chuyển đổi tìm nạp trước là buffer_size bằng với số lượng bản sao được đồng bộ hóa.

tf.distribute.Strategy.distribute_datasets_from_function

Cách sử dụng

API này nhận một hàm đầu vào và trả về một cá thể tf.distribute.DistributedDataset . Hàm đầu vào mà người dùng truyền vào có đối số tf.distribute.InputContext và phải trả về một cá thể tf.data.Dataset . Với API này, tf.distribute không thực hiện thêm bất kỳ thay đổi nào đối với cá thể tf.data.Dataset của người dùng được trả về từ hàm đầu vào. Người dùng có trách nhiệm phân lô và chia nhỏ tập dữ liệu. tf.distribute gọi hàm đầu vào trên thiết bị CPU của từng công nhân. Ngoài việc cho phép người dùng chỉ định lô-gic và phân bổ theo lô của riêng họ, API này cũng thể hiện khả năng mở rộng và hiệu suất tốt hơn so với tf.distribute.Strategy.experimental_distribute_dataset khi được sử dụng để đào tạo nhiều nhân viên.

mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Tính chất

Hàng loạt

Ví dụ tf.data.Dataset là giá trị trả về của hàm đầu vào nên được phân lô bằng cách sử dụng kích thước lô bản sao cho mỗi bản sao. Kích thước lô mỗi bản sao là kích thước lô toàn cầu chia cho số bản sao đang tham gia đào tạo đồng bộ. Điều này là do tf.distribute gọi hàm đầu vào trên thiết bị CPU của mỗi công nhân. Tập dữ liệu được tạo trên một worker nhất định phải sẵn sàng để sử dụng bởi tất cả các bản sao trên worker đó.

Mài giũa

Đối tượng tf.distribute.InputContext được truyền ngầm làm đối số cho hàm đầu vào của người dùng được tạo bởi tf.distribute . Nó có thông tin về số lượng công nhân, id công nhân hiện tại, v.v. Hàm đầu vào này có thể xử lý sharding theo chính sách do người dùng đặt bằng cách sử dụng các thuộc tính này là một phần của đối tượng tf.distribute.InputContext .

Tìm nạp trước

tf.distribute không thêm phép chuyển đổi tìm nạp trước vào cuối tf.data.Dataset do hàm đầu vào do người dùng cung cấp trả về.

Trình lặp phân tán

Tương tự như các trường hợp tf.data.Dataset không được phân phối, bạn sẽ cần tạo một trình lặp trên các cá thể tf.distribute.DistributedDataset để lặp lại nó và truy cập các phần tử trong tf.distribute.DistributedDataset . Sau đây là những cách bạn có thể tạo tf.distribute.DistributedIterator và sử dụng nó để đào tạo mô hình của bạn:

Tập quán

Sử dụng cấu trúc vòng lặp cho Pythonic

Bạn có thể sử dụng vòng lặp Pythonic thân thiện với người dùng để lặp qua tf.distribute.DistributedDataset . Các phần tử được trả về từ tf.distribute.DistributedIterator có thể là một tf.Tensor hoặc tf.distribute.DistributedValues chứa một giá trị trên mỗi bản sao. Đặt vòng lặp bên trong tf.function sẽ giúp tăng hiệu suất. Tuy nhiên, breakreturn hiện không được hỗ trợ cho một vòng lặp qua tf.distribute.DistributedDataset được đặt bên trong tf.function .

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
2022-01-26 05:34:05.431113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\020TensorDataset:29"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

Sử dụng iter để tạo một trình lặp rõ ràng

Để lặp lại các phần tử trong cá thể tf.distribute.DistributedDataset , bạn có thể tạo một tf.distribute.DistributedIterator bằng cách sử dụng API iter trên đó. Với một trình lặp rõ ràng, bạn có thể lặp lại cho một số bước cố định. Để lấy phần tử tiếp theo từ một phiên bản tf.distribute.DistributedIterator dist_iterator , bạn có thể gọi next(dist_iterator) , dist_iterator.get_next() hoặc dist_iterator.get_next_as_optional() . Hai cái trước về cơ bản giống nhau:

num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

Với next() hoặc tf.distribute.DistributedIterator.get_next() , nếu tf.distribute.DistributedIterator đã kết thúc, lỗi OutOfRange sẽ được đưa ra. Máy khách có thể bắt lỗi ở phía python và tiếp tục thực hiện các công việc khác như kiểm tra và đánh giá. Tuy nhiên, điều này sẽ không hoạt động nếu bạn đang sử dụng vòng lặp đào tạo máy chủ (tức là chạy nhiều bước cho mỗi tf.function . Chức năng), giống như sau:

@tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))

train_fn chứa nhiều bước bằng cách gói phần thân bước bên trong tf.range . Trong trường hợp này, các lần lặp khác nhau trong vòng lặp không có phụ thuộc có thể bắt đầu song song, do đó, lỗi OutOfRange có thể được kích hoạt trong các lần lặp sau trước khi quá trình tính toán các lần lặp trước kết thúc. Một khi lỗi OutOfRange được tạo ra, tất cả các hoạt động trong hàm sẽ bị chấm dứt ngay lập tức. Nếu đây là một số trường hợp mà bạn muốn tránh, một giải pháp thay thế không gây ra lỗi OutOfRange là tf.distribute.DistributedIterator.get_next_as_optional() . get_next_as_optional trả về một tf.experimental.Optional chứa phần tử tiếp theo hoặc không có giá trị nếu tf.distribute.DistributedIterator đã kết thúc.

# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
2022-01-26 05:34:07.300202: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3"
op: "RangeDataset"
input: "Const/_0"
input: "Const/_1"
input: "Const/_2"
attr {
  key: "_cardinality"
  value {
    i: 9
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\020RangeDataset:104"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-01-26 05:34:07.355301: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
([0 1], [2 3])
([4 5], [6 7])
([8], [])

Sử dụng thuộc tính element_spec

Nếu bạn chuyển các phần tử của một tập dữ liệu phân tán cho một tf.function và muốn đảm bảo tf.TypeSpec , bạn có thể chỉ định đối số input_signature của tf. tf.function . Đầu ra của tập dữ liệu phân tán là tf.distribute.DistributedValues có thể đại diện cho đầu vào cho một thiết bị hoặc nhiều thiết bị. Để nhận tf.TypeSpec tương ứng với giá trị phân tán này, bạn có thể sử dụng thuộc tính element_spec của tập dữ liệu phân tán hoặc đối tượng trình vòng lặp phân tán.

global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs

  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
2022-01-26 05:34:07.611498: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\021TensorDataset:122"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

Hàng loạt một phần

Gặp phải các lô một phần khi các cá thể tf.data.Dataset mà người dùng tạo có thể chứa các kích thước lô không chia hết cho số lượng bản sao hoặc khi số lượng bản sao của cá thể tập dữ liệu không chia hết cho kích thước lô. Điều này có nghĩa là khi tập dữ liệu được phân phối trên nhiều bản sao, lần gọi next trên một số trình vòng lặp sẽ dẫn đến lỗi OutOfRangeError. Để xử lý trường hợp sử dụng này, tf.distribute trả về lô giả có kích thước lô 0 trên các bản sao không có thêm bất kỳ dữ liệu nào để xử lý.

Đối với trường hợp worker đơn lẻ, nếu dữ liệu không được trả về bởi cuộc gọi next trên trình vòng lặp, các lô giả có kích thước lô 0 được tạo và sử dụng cùng với dữ liệu thực trong tập dữ liệu. Trong trường hợp các lô một phần, lô dữ liệu tổng thể cuối cùng sẽ chứa dữ liệu thực cùng với các lô dữ liệu giả. Điều kiện dừng để xử lý dữ liệu bây giờ sẽ kiểm tra xem có bất kỳ bản sao nào có dữ liệu hay không. Nếu không có dữ liệu trên bất kỳ bản sao nào, lỗi OutOfRange sẽ được đưa ra.

Đối với trường hợp multi worker, giá trị boolean đại diện cho sự hiện diện của dữ liệu trên từng worker được tổng hợp bằng cách sử dụng giao tiếp bản sao chéo và giá trị này được sử dụng để xác định xem tất cả các worker đã xử lý xong tập dữ liệu được phân phối hay chưa. Vì điều này liên quan đến việc trao đổi thông tin giữa các nhân viên với nhau nên có một số hình phạt liên quan đến hiệu suất.

Cảnh báo

  • Khi sử dụng API tf.distribute.Strategy.experimental_distribute_dataset với thiết lập nhiều worker, người dùng chuyển một tf.data.Dataset đọc từ các tệp. Nếu tf.data.experimental.AutoShardPolicy được đặt thành AUTO hoặc FILE , kích thước lô thực tế trên mỗi bước có thể nhỏ hơn kích thước lô toàn cầu do người dùng xác định. Điều này có thể xảy ra khi các phần tử còn lại trong tệp nhỏ hơn kích thước lô chung. Người dùng có thể sử dụng hết tập dữ liệu mà không phụ thuộc vào số bước chạy hoặc đặt tf.data.experimental.AutoShardPolicy thành DATA để giải quyết vấn đề đó.

  • Chuyển đổi tập dữ liệu trạng thái hiện không được hỗ trợ với tf.distribute và bất kỳ hoạt động trạng thái nào mà tập dữ liệu có thể có hiện đang bị bỏ qua. Ví dụ: nếu tập dữ liệu của bạn có map_fn sử dụng tf.random.uniform để xoay hình ảnh, thì bạn có biểu đồ tập dữ liệu phụ thuộc vào trạng thái (tức là hạt ngẫu nhiên) trên máy cục bộ nơi quá trình python đang được thực thi.

  • Thử nghiệm tf.data.experimental.OptimizationOptions Các tùy chọn bị tắt theo mặc định có thể trong một số ngữ cảnh nhất định - chẳng hạn như khi được sử dụng cùng với tf.distribute - gây ra sự suy giảm hiệu suất. Bạn chỉ nên bật chúng sau khi xác thực rằng chúng có lợi cho hiệu suất khối lượng công việc của bạn trong cài đặt phân phối.

  • Vui lòng tham khảo hướng dẫn này để biết cách tối ưu hóa đường dẫn đầu vào của bạn với tf.data nói chung. Một số mẹo bổ sung:

    • Nếu bạn có nhiều worker và đang sử dụng tf.data.Dataset.list_files để tạo tập dữ liệu từ tất cả các tệp khớp với một hoặc nhiều mẫu hình cầu, hãy nhớ đặt đối số seed hoặc đặt shuffle=False để mỗi worker phân đoạn tệp một cách nhất quán.

    • Nếu đường dẫn đầu vào của bạn bao gồm cả việc trộn dữ liệu ở mức bản ghi và phân tích cú pháp dữ liệu, trừ khi dữ liệu chưa được phân tích cú pháp lớn hơn đáng kể so với dữ liệu đã phân tích cú pháp (thường không phải như vậy), hãy xáo trộn trước rồi mới phân tích cú pháp, như thể hiện trong ví dụ sau. Điều này có thể có lợi cho việc sử dụng bộ nhớ và hiệu suất.

d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
                 cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
  • tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None) duy trì một bộ đệm bên trong gồm các phần tử buffer_size và do đó việc giảm dung lượng buffer_size có thể giúp giảm bớt sự cố OOM.

  • Thứ tự dữ liệu được xử lý bởi công nhân khi sử dụng tf.distribute.experimental_distribute_dataset hoặc tf.distribute.distribute_datasets_from_function không được đảm bảo. Điều này thường được yêu cầu nếu bạn đang sử dụng tf.distribute để mở rộng dự đoán. Tuy nhiên, bạn có thể chèn một chỉ mục cho từng phần tử trong các đầu ra theo thứ tự và theo thứ tự. Đoạn mã sau đây là một ví dụ về cách đặt hàng đầu ra.

mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}
2022-01-26 05:34:08.978884: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3"
op: "RangeDataset"
input: "Const/_4"
input: "Const/_1"
input: "Const/_2"
attr {
  key: "_cardinality"
  value {
    i: 9223372036854775807
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\020RangeDataset:162"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

Làm cách nào để phân phối dữ liệu của tôi nếu tôi không sử dụng phiên bản tf.data.Dataset chuẩn?

Đôi khi người dùng không thể sử dụng tf.data.Dataset để đại diện cho đầu vào của họ và sau đó là các API được đề cập ở trên để phân phối tập dữ liệu cho nhiều thiết bị. Trong những trường hợp như vậy, bạn có thể sử dụng bộ căng thô hoặc đầu vào từ máy phát điện.

Sử dụng thử nghiệm_distribute_values_from_ Chức năng cho các đầu vào tensor tùy ý

strategy.run lược.run chấp nhận tf.distribute.DistributedValues là đầu ra của next(iterator) . Để chuyển các giá trị tensor, hãy sử dụng experimental_distribute_values_from_function nghiệm_distribute_values_from_ Chức năng để tạo tf.distribute.DistributedValues từ các tensor thô.

mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

Sử dụng tf.data.Dataset.from_generator nếu đầu vào của bạn là từ trình tạo

Nếu bạn có một hàm trình tạo mà bạn muốn sử dụng, bạn có thể tạo một tf.data.Dataset bằng cách sử dụng API from_generator .

mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
2022-01-26 05:34:09.091386: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_2"
op: "FlatMapDataset"
input: "TensorDataset/_1"
attr {
  key: "Targuments"
  value {
    list {
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: -2
  }
}
attr {
  key: "f"
  value {
    func {
      name: "__inference_Dataset_flat_map_flat_map_fn_3980"
    }
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\022FlatMapDataset:178"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 4
        }
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_FLOAT
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
. Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.