Hiệu suất tốt hơn với API tf.data

Xem trên TensorFlow.org Chạy trong Google Colab Xem nguồn trên GitHub Tải xuống sổ ghi chép

Tổng quat

GPU và TPU có thể giảm đáng kể thời gian cần thiết để thực hiện một bước đào tạo duy nhất. Để đạt được hiệu suất cao nhất yêu cầu một đường dẫn đầu vào hiệu quả cung cấp dữ liệu cho bước tiếp theo trước khi bước hiện tại kết thúc. API tf.data giúp xây dựng các đường ống đầu vào linh hoạt và hiệu quả. Tài liệu này trình bày cách sử dụng API tf.data để xây dựng các đường ống đầu vào TensorFlow có hiệu suất cao.

Trước khi bạn tiếp tục, hãy xem hướng dẫn Xây dựng đường ống đầu vào TensorFlow để tìm hiểu cách sử dụng API tf.data .

Tài nguyên

Thành lập

import tensorflow as tf

import time

Trong suốt hướng dẫn này, bạn sẽ lặp lại trên một tập dữ liệu và đo lường hiệu suất. Việc tạo ra các điểm chuẩn hiệu suất có thể tái tạo có thể khó khăn. Các yếu tố khác nhau ảnh hưởng đến khả năng tái lập bao gồm:

  • Tải CPU hiện tại
  • Lưu lượng mạng
  • Các cơ chế phức tạp, chẳng hạn như bộ nhớ cache

Để có được một điểm chuẩn có thể tái tạo, bạn sẽ xây dựng một ví dụ nhân tạo.

Bộ dữ liệu

Bắt đầu với việc xác định một lớp kế thừa từ tf.data.Dataset được gọi là ArtificialDataset . Tập dữ liệu này:

  • Tạo mẫu num_samples (mặc định là 3)
  • Ngủ một lúc trước khi mục đầu tiên mô phỏng việc mở tệp
  • Ngủ một lúc trước khi tạo từng mục để mô phỏng việc đọc dữ liệu từ một tệp
class ArtificialDataset(tf.data.Dataset):
    def _generator(num_samples):
        # Opening the file
        time.sleep(0.03)

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            time.sleep(0.015)

            yield (sample_idx,)

    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_signature = tf.TensorSpec(shape = (1,), dtype = tf.int64),
            args=(num_samples,)
        )

Tập dữ liệu này tương tự như tập dữ liệu tf.data.Dataset.range , thêm độ trễ cố định ở đầu và giữa mỗi mẫu.

Vòng lặp đào tạo

Tiếp theo, viết một vòng lặp huấn luyện giả đo lường thời gian cần thiết để lặp qua một tập dữ liệu. Thời gian đào tạo được mô phỏng.

def benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        for sample in dataset:
            # Performing a training step
            time.sleep(0.01)
    print("Execution time:", time.perf_counter() - start_time)

Tối ưu hóa hiệu suất

Để thể hiện cách tối ưu hóa hiệu suất, bạn sẽ cải thiện hiệu suất của ArtificialDataset .

Cách tiếp cận ngây thơ

Bắt đầu với một đường dẫn đơn giản không sử dụng thủ thuật, lặp lại tập dữ liệu như hiện tại.

benchmark(ArtificialDataset())
Execution time: 0.26497629899995445

Dưới đây là cách bạn đã sử dụng thời gian thực hiện:

Biểu đồ thời gian thực thi dữ liệu - một phương pháp ngây thơ

Cốt truyện cho thấy rằng thực hiện một bước đào tạo bao gồm:

  • Mở tệp nếu nó chưa được mở
  • Tìm nạp một mục nhập dữ liệu từ tệp
  • Sử dụng dữ liệu để đào tạo

Tuy nhiên, trong một triển khai đồng bộ ngây thơ như ở đây, trong khi đường ống của bạn đang tìm nạp dữ liệu, thì mô hình của bạn đang ở chế độ chờ. Ngược lại, trong khi mô hình của bạn đang được đào tạo, đường dẫn đầu vào sẽ không hoạt động. Do đó, thời gian của bước đào tạo là tổng thời gian mở, đọc và đào tạo.

Các phần tiếp theo xây dựng trên đường ống đầu vào này, minh họa các phương pháp hay nhất để thiết kế đường ống đầu vào TensorFlow hiệu quả.

Tìm nạp trước

Tìm nạp trước chồng chéo quá trình tiền xử lý và thực thi mô hình của một bước đào tạo. Trong khi mô hình đang thực hiện bước huấn luyện s , thì đường dẫn đầu vào đang đọc dữ liệu cho bước s+1 . Làm như vậy sẽ giảm thời gian từng bước xuống mức tối đa (trái ngược với tổng) của quá trình đào tạo và thời gian cần thiết để trích xuất dữ liệu.

API tf.data cung cấp chuyển đổi tf.data.Dataset.prefetch . Nó có thể được sử dụng để tách thời gian dữ liệu được tạo ra từ thời điểm dữ liệu được sử dụng. Đặc biệt, quá trình chuyển đổi sử dụng một chuỗi nền và một bộ đệm nội bộ để tìm nạp trước các phần tử từ tập dữ liệu đầu vào trước thời điểm chúng được yêu cầu. Số phần tử cần tìm nạp trước phải bằng (hoặc có thể lớn hơn) số lô được tiêu thụ bởi một bước đào tạo duy nhất. Bạn có thể điều chỉnh thủ công giá trị này hoặc đặt nó thành tf.data.AUTOTUNE , điều này sẽ nhắc thời gian chạy tf.data điều chỉnh giá trị động trong thời gian chạy.

Lưu ý rằng chuyển đổi tìm nạp trước mang lại lợi ích bất cứ khi nào có cơ hội trùng lặp công việc của "nhà sản xuất" với công việc của "người tiêu dùng".

benchmark(
    ArtificialDataset()
    .prefetch(tf.data.AUTOTUNE)
)
Execution time: 0.21731788600027357

Biểu đồ thời gian thực thi dữ liệu - phương pháp tìm nạp trước

Bây giờ, khi biểu đồ thời gian thực thi dữ liệu hiển thị, trong khi bước đào tạo đang chạy cho mẫu 0, thì đường dẫn đầu vào đang đọc dữ liệu cho mẫu 1, v.v.

Khai thác dữ liệu song song

Trong cài đặt thế giới thực, dữ liệu đầu vào có thể được lưu trữ từ xa (ví dụ: trên Google Cloud Storage hoặc HDFS). Đường dẫn tập dữ liệu hoạt động tốt khi đọc dữ liệu cục bộ có thể bị tắc nghẽn trên I / O khi đọc dữ liệu từ xa do sự khác biệt sau giữa lưu trữ cục bộ và lưu trữ từ xa:

  • Time-to-first-byte : Đọc byte đầu tiên của tệp từ bộ lưu trữ từ xa có thể mất nhiều thời gian hơn so với từ bộ lưu trữ cục bộ.
  • Thông lượng đọc : Mặc dù lưu trữ từ xa thường cung cấp băng thông tổng hợp lớn, nhưng việc đọc một tệp đơn lẻ có thể chỉ sử dụng được một phần nhỏ băng thông này.

Ngoài ra, khi các byte thô được tải vào bộ nhớ, cũng có thể cần giải mã hóa và / hoặc giải mã dữ liệu (ví dụ: protobuf ), việc này yêu cầu tính toán bổ sung. Chi phí này hiện hữu bất kể dữ liệu được lưu trữ cục bộ hay từ xa, nhưng có thể tồi tệ hơn trong trường hợp từ xa nếu dữ liệu không được tìm nạp trước một cách hiệu quả.

Để giảm thiểu tác động của các chi phí khai thác dữ liệu khác nhau, phép chuyển đổi tf.data.Dataset.interleave có thể được sử dụng để song song hóa bước tải dữ liệu, xen kẽ nội dung của các bộ dữ liệu khác (chẳng hạn như trình đọc tệp dữ liệu). Số lượng bộ dữ liệu chồng chéo có thể được chỉ định bằng đối số cycle_length , trong khi mức độ song song có thể được chỉ định bằng đối số num_parallel_calls . Tương tự như phép chuyển đổi prefetch , phép chuyển đổi interleave hỗ trợ tf.data.AUTOTUNE , sẽ ủy quyền quyết định về mức độ song song sẽ sử dụng cho thời gian chạy tf.data .

Xen kẽ tuần tự

Các đối số mặc định của biến đổi tf.data.Dataset.interleave làm cho nó xen kẽ các mẫu đơn từ hai tập dữ liệu một cách tuần tự.

benchmark(
    tf.data.Dataset.range(2)
    .interleave(lambda _: ArtificialDataset())
)
Execution time: 0.4987426460002098

Biểu đồ thời gian thực thi dữ liệu - xen kẽ tuần tự

Biểu đồ thời gian thực thi dữ liệu này cho phép thể hiện hành vi của phép chuyển đổi interleave , tìm nạp các mẫu thay thế từ hai tập dữ liệu có sẵn. Tuy nhiên, không có cải tiến hiệu suất nào được tham gia ở đây.

Song song xen kẽ

Bây giờ, sử dụng đối số num_parallel_calls của phép biến đổi interleave . Thao tác này tải song song nhiều bộ dữ liệu, giảm thời gian chờ đợi các tệp được mở.

benchmark(
    tf.data.Dataset.range(2)
    .interleave(
        lambda _: ArtificialDataset(),
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.283668874000341

Biểu đồ thời gian thực thi dữ liệu - phương pháp xen kẽ song song

Lần này, như biểu đồ thời gian thực thi dữ liệu hiển thị, việc đọc hai tập dữ liệu được song song hóa, giảm thời gian xử lý dữ liệu chung.

Chuyển đổi dữ liệu song song

Khi chuẩn bị dữ liệu, các yếu tố đầu vào có thể cần được xử lý trước. Để đạt được điều này, API tf.data cung cấp phép chuyển đổi tf.data.Dataset.map , áp dụng một chức năng do người dùng xác định cho từng phần tử của tập dữ liệu đầu vào. Bởi vì các phần tử đầu vào độc lập với nhau, quá trình tiền xử lý có thể được thực hiện song song trên nhiều lõi CPU. Để thực hiện điều này, tương tự như phép biến đổi prefetch và phép biến đổi interleave , phép biến đổi map cung cấp đối số num_parallel_calls để chỉ định mức độ song song.

Việc chọn giá trị tốt nhất cho đối số num_parallel_calls phụ thuộc vào phần cứng của bạn, đặc điểm của dữ liệu đào tạo của bạn (chẳng hạn như kích thước và hình dạng của nó), chi phí của chức năng bản đồ của bạn và những xử lý khác đang diễn ra trên CPU cùng lúc. Một phương pháp đơn giản là sử dụng số lượng lõi CPU có sẵn. Tuy nhiên, đối với chuyển đổi prefetch và chuyển đổi interleave , chuyển đổi map hỗ trợ tf.data.AUTOTUNE sẽ ủy quyền quyết định về mức độ song song sẽ sử dụng cho thời gian chạy tf.data .

def mapped_function(s):
    # Do some hard pre-processing
    tf.py_function(lambda: time.sleep(0.03), [], ())
    return s

Ánh xạ tuần tự

Bắt đầu bằng cách sử dụng phép biến đổi map không có song song làm ví dụ về đường cơ sở.

benchmark(
    ArtificialDataset()
    .map(mapped_function)
)
Execution time: 0.4505277170001136

Biểu đồ thời gian thực thi dữ liệu - phương pháp ánh xạ tuần tự

Đối với cách tiếp cận đơn giản, ở đây, như cốt truyện cho thấy, thời gian dành cho các bước mở, đọc, xử lý trước (ánh xạ) và đào tạo cộng lại với nhau cho một lần lặp.

Ánh xạ song song

Bây giờ, sử dụng cùng một chức năng tiền xử lý nhưng áp dụng nó song song trên nhiều mẫu.

benchmark(
    ArtificialDataset()
    .map(
        mapped_function,
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.2839677860001757

Thời gian thực thi dữ liệu - ánh xạ song song

Như biểu đồ dữ liệu minh họa, các bước xử lý trước chồng chéo lên nhau, giảm thời gian tổng thể cho một lần lặp.

Bộ nhớ đệm

Phép biến đổi tf.data.Dataset.cache có thể lưu vào bộ nhớ cache một tập dữ liệu, trong bộ nhớ hoặc trên bộ nhớ cục bộ. Điều này sẽ tiết kiệm một số hoạt động (như mở tệp và đọc dữ liệu) khỏi được thực thi trong mỗi kỷ nguyên.

benchmark(
    ArtificialDataset()
    .map(  # Apply time consuming operations before cache
        mapped_function
    ).cache(
    ),
    5
)
Execution time: 0.3848854380003104

Thời gian thực thi dữ liệu - phương pháp tập dữ liệu đã lưu trong bộ nhớ cache

Ở đây, biểu đồ thời gian thực thi dữ liệu cho thấy rằng khi bạn lưu vào bộ nhớ cache một tập dữ liệu, các chuyển đổi trước tập dữ liệu trong cache (như mở tệp và đọc dữ liệu) chỉ được thực thi trong kỷ nguyên đầu tiên. Các kỷ nguyên tiếp theo sẽ sử dụng lại dữ liệu được lưu trong bộ nhớ cache bằng cách chuyển đổi cache .

Nếu hàm do người dùng xác định được truyền vào quá trình chuyển đổi map là tốn kém, hãy áp dụng chuyển đổi cache sau khi chuyển đổi map miễn là tập dữ liệu kết quả vẫn có thể vừa với bộ nhớ hoặc bộ nhớ cục bộ. Nếu chức năng do người dùng xác định làm tăng không gian cần thiết để lưu trữ tập dữ liệu vượt quá dung lượng bộ đệm, hãy áp dụng nó sau khi chuyển đổi cache hoặc xem xét xử lý trước dữ liệu của bạn trước công việc đào tạo để giảm mức sử dụng tài nguyên.

Lập bản đồ véc tơ hóa

Việc gọi một chức năng do người dùng xác định được chuyển vào chuyển đổi map có chi phí liên quan đến việc lập lịch và thực thi chức năng do người dùng xác định. Vectơ hóa chức năng do người dùng xác định (nghĩa là để nó hoạt động trên một loạt đầu vào cùng một lúc) và áp dụng chuyển đổi batch trước khi chuyển đổi map .

Để minh họa cho thực tiễn tốt này, tập dữ liệu nhân tạo của bạn không phù hợp. Độ trễ lập lịch là khoảng 10 micro giây (10e-6 giây), ít hơn nhiều so với hàng chục mili giây được sử dụng trong ArtificialDataset , và do đó khó có thể nhìn thấy tác động của nó.

Đối với ví dụ này, hãy sử dụng hàm tf.data.Dataset.range cơ sở và đơn giản hóa vòng lặp huấn luyện thành dạng đơn giản nhất.

fast_dataset = tf.data.Dataset.range(10000)

def fast_benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for _ in tf.data.Dataset.range(num_epochs):
        for _ in dataset:
            pass
    tf.print("Execution time:", time.perf_counter() - start_time)

def increment(x):
    return x+1

Ánh xạ vô hướng

fast_benchmark(
    fast_dataset
    # Apply function one item at a time
    .map(increment)
    # Batch
    .batch(256)
)
Execution time: 0.2712608739998359

Thời gian thực thi dữ liệu - phương pháp bản đồ vô hướng

Biểu đồ trên minh họa những gì đang diễn ra (với ít mẫu hơn) bằng cách sử dụng phương pháp ánh xạ vô hướng. Nó cho thấy rằng chức năng được ánh xạ được áp dụng cho mỗi mẫu. Mặc dù chức năng này rất nhanh, nhưng nó có một số chi phí ảnh hưởng đến hiệu suất thời gian.

Ánh xạ vector

fast_benchmark(
    fast_dataset
    .batch(256)
    # Apply function on a batch of items
    # The tf.Tensor.__add__ method already handle batches
    .map(increment)
)
Execution time: 0.02737950600021577

Thời gian thực thi dữ liệu - phương pháp bản đồ vector hóa

Lần này, hàm được ánh xạ được gọi một lần và áp dụng cho một lô mẫu. Khi biểu đồ thời gian thực thi dữ liệu hiển thị, trong khi hàm có thể mất nhiều thời gian hơn để thực thi, chi phí chỉ xuất hiện một lần, cải thiện hiệu suất thời gian tổng thể.

Giảm dấu chân bộ nhớ

Một số phép biến đổi, bao gồm interleave , prefetchshuffle , duy trì bộ đệm bên trong của các phần tử. Nếu hàm do người dùng định nghĩa được truyền vào phép biến đổi map làm thay đổi kích thước của các phần tử, thì thứ tự của phép biến đổi bản đồ và các phép biến đổi mà các phần tử đệm ảnh hưởng đến việc sử dụng bộ nhớ. Nói chung, hãy chọn thứ tự dẫn đến dung lượng bộ nhớ thấp hơn, trừ khi thứ tự khác nhau là mong muốn cho hiệu suất.

Lưu vào bộ đệm các tính toán từng phần

Bạn nên lưu vào bộ nhớ cache tập dữ liệu sau khi chuyển đổi map , ngoại trừ trường hợp chuyển đổi này làm cho dữ liệu quá lớn để vừa trong bộ nhớ. Có thể đạt được sự cân bằng nếu chức năng được ánh xạ của bạn có thể được chia thành hai phần: một phần tốn thời gian và một phần tốn bộ nhớ. Trong trường hợp này, bạn có thể xâu chuỗi các phép biến đổi của mình như sau:

dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)

Bằng cách này, phần tốn thời gian chỉ được thực thi trong kỷ nguyên đầu tiên và bạn tránh sử dụng quá nhiều dung lượng bộ nhớ cache.

Tóm tắt phương pháp hay nhất

Dưới đây là tóm tắt về các phương pháp hay nhất để thiết kế đường ống đầu vào TensorFlow hiệu quả:

Sao chép các số liệu

Để hiểu sâu hơn về tf.data.Dataset API, bạn có thể sử dụng các đường ống dẫn của riêng mình. Dưới đây là mã được sử dụng để vẽ các hình ảnh từ hướng dẫn này. Nó có thể là một điểm khởi đầu tốt, cho thấy một số cách giải quyết cho những khó khăn chung như:

  • Khả năng tái tạo thời gian thực thi
  • Các hàm được ánh xạ háo hức thực thi
  • chuyển đổi interleave có thể gọi
import itertools
from collections import defaultdict

import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt

Bộ dữ liệu

Tương tự như ArtificialDataset , bạn có thể xây dựng một tập dữ liệu trả về thời gian dành cho mỗi bước.

class TimeMeasuredDataset(tf.data.Dataset):
    # OUTPUT: (steps, timings, counters)
    OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)
    OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))

    _INSTANCES_COUNTER = itertools.count()  # Number of datasets generated
    _EPOCHS_COUNTER = defaultdict(itertools.count)  # Number of epochs done for each dataset

    def _generator(instance_idx, num_samples):
        epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])

        # Opening the file
        open_enter = time.perf_counter()
        time.sleep(0.03)
        open_elapsed = time.perf_counter() - open_enter

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            read_enter = time.perf_counter()
            time.sleep(0.015)
            read_elapsed = time.perf_counter() - read_enter

            yield (
                [("Open",), ("Read",)],
                [(open_enter, open_elapsed), (read_enter, read_elapsed)],
                [(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]
            )
            open_enter, open_elapsed = -1., -1.  # Negative values will be filtered


    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=cls.OUTPUT_TYPES,
            output_shapes=cls.OUTPUT_SHAPES,
            args=(next(cls._INSTANCES_COUNTER), num_samples)
        )

Tập dữ liệu này cung cấp các mẫu hình dạng [[2, 1], [2, 2], [2, 3]] và kiểu [tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32] . Mỗi mẫu là:

(
  [("Open"), ("Read")],
  [(t0, d), (t0, d)],
  [(i, e, -1), (i, e, s)]
)

Ở đâu:

  • OpenRead là số nhận dạng các bước
  • t0 là dấu thời gian khi bước tương ứng bắt đầu
  • d là thời gian dành cho bước tương ứng
  • i là chỉ mục cá thể
  • e là chỉ số kỷ nguyên (số lần tập dữ liệu đã được lặp lại)
  • s là chỉ số mẫu

Vòng lặp lặp lại

Làm cho vòng lặp phức tạp hơn một chút để tổng hợp tất cả các thời gian. Điều này sẽ chỉ hoạt động với các tập dữ liệu tạo mẫu như đã nêu chi tiết ở trên.

def timelined_benchmark(dataset, num_epochs=2):
    # Initialize accumulators
    steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)
    times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)
    values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)

    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        epoch_enter = time.perf_counter()
        for (steps, times, values) in dataset:
            # Record dataset preparation informations
            steps_acc = tf.concat((steps_acc, steps), axis=0)
            times_acc = tf.concat((times_acc, times), axis=0)
            values_acc = tf.concat((values_acc, values), axis=0)

            # Simulate training time
            train_enter = time.perf_counter()
            time.sleep(0.01)
            train_elapsed = time.perf_counter() - train_enter

            # Record training informations
            steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0)
            times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)
            values_acc = tf.concat((values_acc, [values[-1]]), axis=0)

        epoch_elapsed = time.perf_counter() - epoch_enter
        # Record epoch informations
        steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0)
        times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)
        values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)
        time.sleep(0.001)

    tf.print("Execution time:", time.perf_counter() - start_time)
    return {"steps": steps_acc, "times": times_acc, "values": values_acc}

Phương pháp lập kế hoạch

Cuối cùng, xác định một hàm có thể vẽ một dòng thời gian với các giá trị được trả về bởi hàm timelined_benchmark .

def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):
    # Remove invalid entries (negative times, or empty steps) from the timelines
    invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0]
    steps = timeline['steps'][invalid_mask].numpy()
    times = timeline['times'][invalid_mask].numpy()
    values = timeline['values'][invalid_mask].numpy()

    # Get a set of different steps, ordered by the first time they are encountered
    step_ids, indices = np.stack(np.unique(steps, return_index=True))
    step_ids = step_ids[np.argsort(indices)]

    # Shift the starting time to 0 and compute the maximal time value
    min_time = times[:,0].min()
    times[:,0] = (times[:,0] - min_time)
    end = max(width, (times[:,0]+times[:,1]).max() + 0.01)

    cmap = mpl.cm.get_cmap("plasma")
    plt.close()
    fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})
    fig.suptitle(title)
    fig.set_size_inches(17.0, len(step_ids))
    plt.xlim(-0.01, end)

    for i, step in enumerate(step_ids):
        step_name = step.decode()
        ax = axs[i]
        ax.set_ylabel(step_name)
        ax.set_ylim(0, 1)
        ax.set_yticks([])
        ax.set_xlabel("time (s)")
        ax.set_xticklabels([])
        ax.grid(which="both", axis="x", color="k", linestyle=":")

        # Get timings and annotation for the given step
        entries_mask = np.squeeze(steps==step)
        serie = np.unique(times[entries_mask], axis=0)
        annotations = values[entries_mask]

        ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)
        if annotate:
            for j, (start, width) in enumerate(serie):
                annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])])
                ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,
                        horizontalalignment='left', verticalalignment='center')
    if save:
        plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")

Sử dụng trình bao bọc cho chức năng được ánh xạ

Để chạy hàm được ánh xạ trong một ngữ cảnh háo hức, bạn phải bọc chúng bên trong một lệnh gọi tf.py_function .

def map_decorator(func):
    def wrapper(steps, times, values):
        # Use a tf.py_function to prevent auto-graph from compiling the method
        return tf.py_function(
            func,
            inp=(steps, times, values),
            Tout=(steps.dtype, times.dtype, values.dtype)
        )
    return wrapper

So sánh đường ống

_batch_map_num_items = 50

def dataset_generator_fun(*args):
    return TimeMeasuredDataset(num_samples=_batch_map_num_items)

Ngây thơ

@map_decorator
def naive_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001)  # Time consuming step
    time.sleep(0.0001)  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, [["Map"]]), axis=0),
        tf.concat((times, [[map_enter, map_elapsed]]), axis=0),
        tf.concat((values, [values[-1]]), axis=0)
    )

naive_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .flat_map(dataset_generator_fun)
    .map(naive_map)
    .batch(_batch_map_num_items, drop_remainder=True)
    .unbatch(),
    5
)
WARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_types is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
WARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_shapes is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
Execution time: 13.13538893499981

Tối ưu hóa

@map_decorator
def time_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001 * values.shape[0])  # Time consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


@map_decorator
def memory_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.0001 * values.shape[0])  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    # Use tf.tile to handle batch dimension
    return (
        tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


optimized_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .interleave(  # Parallelize data reading
        dataset_generator_fun,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .batch(  # Vectorize your mapped function
        _batch_map_num_items,
        drop_remainder=True)
    .map(  # Parallelize map transformation
        time_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .cache()  # Cache data
    .map(  # Reduce memory usage
        memory_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .prefetch(  # Overlap producer and consumer works
        tf.data.AUTOTUNE
    )
    .unbatch(),
    5
)
Execution time: 6.723691489999965
draw_timeline(naive_timeline, "Naive", 15)

png

draw_timeline(optimized_timeline, "Optimized", 15)

png