Pomoc chronić Wielkiej Rafy Koralowej z TensorFlow na Kaggle Dołącz Wyzwanie

Lepsza wydajność dzięki API tf.data

Zobacz na TensorFlow.org Uruchom w Google Colab Wyświetl źródło na GitHub Pobierz notatnik

Przegląd

Układy GPU i TPU mogą radykalnie skrócić czas potrzebny na wykonanie pojedynczego etapu uczenia. Osiągnięcie szczytowej wydajności wymaga wydajnego potoku wejściowego, który dostarcza dane do następnego kroku przed zakończeniem bieżącego kroku. Interfejs API tf.data pomaga w budowaniu elastycznych i wydajnych potoków wejściowych. W tym dokumencie pokazano, jak używać interfejsu API tf.data do tworzenia wysoce wydajnych potoków wejściowych TensorFlow.

Zanim przejdziesz dalej, zapoznaj się z przewodnikiem kompilowania potoków wejściowych TensorFlow, aby dowiedzieć się, jak korzystać z interfejsu API tf.data .

Zasoby

Ustawiać

import tensorflow as tf

import time

W tym przewodniku będziesz iterować po zbiorze danych i mierzyć wydajność. Tworzenie powtarzalnych testów wydajności może być trudne. Różne czynniki wpływające na odtwarzalność obejmują:

  • Aktualne obciążenie procesora
  • Ruch sieciowy
  • Złożone mechanizmy, takie jak pamięć podręczna

Aby uzyskać powtarzalny test porównawczy, zbudujesz sztuczny przykład.

Zbiór danych

Zacznij od zdefiniowania klasy dziedziczącej z tf.data.Dataset o nazwie ArtificialDataset . Ten zbiór danych:

  • Generuje próbki num_samples (domyślnie 3)
  • Uśpiony przez jakiś czas przed pierwszym elementem, aby zasymulować otwarcie pliku
  • Uśpiony przez pewien czas przed wyprodukowaniem każdego elementu w celu symulacji odczytu danych z pliku
class ArtificialDataset(tf.data.Dataset):
    def _generator(num_samples):
        # Opening the file
        time.sleep(0.03)

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            time.sleep(0.015)

            yield (sample_idx,)

    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_signature = tf.TensorSpec(shape = (1,), dtype = tf.int64),
            args=(num_samples,)
        )

Ten zestaw danych jest podobny do zestawu tf.data.Dataset.range , dodając stałe opóźnienie na początku i pomiędzy każdą próbką.

Pętla treningowa

Następnie napisz fikcyjną pętlę treningową, która mierzy, ile czasu zajmuje iteracja zestawu danych. Symulowany jest czas treningu.

def benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        for sample in dataset:
            # Performing a training step
            time.sleep(0.01)
    print("Execution time:", time.perf_counter() - start_time)

Optymalizacja wydajności

Aby pokazać, jak można zoptymalizować wydajność, poprawisz wydajność ArtificialDataset .

Naiwne podejście

Zacznij od naiwnego potoku bez żadnych sztuczek, iterując po zbiorze danych bez zmian.

benchmark(ArtificialDataset())
Execution time: 0.26497629899995445

Pod maską tak spędziliśmy czas egzekucji:

Wykres czasu wykonania danych - metoda naiwna

Z wykresu wynika, że ​​wykonanie etapu treningu obejmuje:

  • Otwieranie pliku, jeśli nie został jeszcze otwarty
  • Pobieranie wpisu danych z pliku
  • Wykorzystanie danych do treningu

Jednak w naiwnej implementacji synchronicznej, takiej jak tutaj, podczas gdy potok pobiera dane, model jest bezczynny. I odwrotnie, podczas gdy Twój model jest trenowany, potok wejściowy jest bezczynny. Czas kroku treningowego jest więc sumą czasów otwierania, czytania i treningu.

Następne sekcje opierają się na tym potoku wejściowym, ilustrując najlepsze praktyki projektowania wydajnych potoków wejściowych TensorFlow.

Pobieranie z wyprzedzeniem

Pobieranie wstępne nakłada się na przetwarzanie wstępne i wykonanie modelu etapu uczenia. Podczas gdy model wykonuje kroki uczenia s , potok wejściowy odczytuje dane dla kroku s+1 . Spowoduje to skrócenie czasu kroku do maksimum (w przeciwieństwie do sumy) szkolenia i czasu potrzebnego na wyodrębnienie danych.

Interfejs API tf.data udostępnia transformację tf.data.Dataset.prefetch . Może służyć do oddzielenia czasu generowania danych od czasu, w którym dane są zużywane. W szczególności transformacja wykorzystuje wątek działający w tle i wewnętrzny bufor do wstępnego pobierania elementów z wejściowego zestawu danych przed zażądaniem ich. Liczba elementów do pobrania z wyprzedzeniem powinna być równa (lub być może większa niż) liczbie partii zużywanych przez pojedynczy krok uczenia. Można albo ręcznie dostroić tę wartość, albo ustawić ją na tf.data.AUTOTUNE , co spowoduje, że środowisko wykonawcze tf.data będzie dostroić wartość dynamicznie w czasie wykonywania.

Należy zauważyć, że transformacja pobierania z wyprzedzeniem zapewnia korzyści za każdym razem, gdy istnieje możliwość nałożenia pracy „producenta” z pracą „konsumenta”.

benchmark(
    ArtificialDataset()
    .prefetch(tf.data.AUTOTUNE)
)
Execution time: 0.21731788600027357

Wykres czasu wykonania danych - metoda pobierania wstępnego

Teraz, jak pokazuje wykres czasu wykonania danych, podczas gdy krok uczenia jest uruchomiony dla próbki 0, potok wejściowy odczytuje dane dla próbki 1 i tak dalej.

Równoległa ekstrakcja danych

W warunkach rzeczywistych dane wejściowe mogą być przechowywane zdalnie (na przykład w Google Cloud Storage lub HDFS). Potok zestawu danych, który działa dobrze podczas lokalnego odczytu danych, może powodować wąskie gardło we/wy podczas zdalnego odczytu danych z powodu następujących różnic między magazynem lokalnym i zdalnym:

  • Czas do pierwszego bajtu : odczytanie pierwszego bajtu pliku z pamięci zdalnej może trwać o rząd wielkości dłużej niż z pamięci lokalnej.
  • Przepustowość odczytu : chociaż magazyn zdalny zazwyczaj oferuje dużą łączną przepustowość, odczyt pojedynczego pliku może być w stanie wykorzystać tylko niewielką część tej przepustowości.

Ponadto po załadowaniu surowych bajtów do pamięci może być również konieczna deserializacja i/lub odszyfrowanie danych (np. protobuf ), co wymaga dodatkowych obliczeń. Ten narzut jest obecny niezależnie od tego, czy dane są przechowywane lokalnie czy zdalnie, ale może być gorszy w przypadku zdalnym, jeśli dane nie są skutecznie pobierane z wyprzedzeniem.

Aby złagodzić wpływ różnych kosztów ogólnych ekstrakcji danych, można użyć transformacji tf.data.Dataset.interleave do zrównoleglenia etapu ładowania danych, przeplatając zawartość innych zestawów danych (takich jak czytniki plików danych). Liczbę zestawów danych, które mają się nakładać, można określić za pomocą argumentu cycle_length , natomiast poziom równoległości można określić za pomocą argumentu num_parallel_calls . Podobnie jak w przypadku transformacji prefetch , transformacja z interleave obsługuje funkcję tf.data.AUTOTUNE , która deleguje decyzję o tym, jaki poziom równoległości ma być używany do środowiska uruchomieniowego tf.data .

Przeplatanie sekwencyjne

Domyślne argumenty transformacji tf.data.Dataset.interleave sprawiają, że przeplata ona kolejno pojedyncze próbki z dwóch zestawów danych.

benchmark(
    tf.data.Dataset.range(2)
    .interleave(lambda _: ArtificialDataset())
)
Execution time: 0.4987426460002098

Wykres czasu wykonania danych - przeplatanie sekwencyjne

Ten wykres czasu wykonania danych pozwala pokazać zachowanie transformacji z interleave , pobierając próbki alternatywnie z dwóch dostępnych zestawów danych. Jednak nie jest to związane z poprawą wydajności.

Przeplatanie równoległe

Teraz użyj argumentu num_parallel_calls transformacji z interleave . To ładuje wiele zestawów danych równolegle, skracając czas oczekiwania na otwarcie plików.

benchmark(
    tf.data.Dataset.range(2)
    .interleave(
        lambda _: ArtificialDataset(),
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.283668874000341

Wykres czasu wykonania danych - metoda przeplatania równoległego

Tym razem, jak pokazuje wykres czasu wykonania danych, odczyt dwóch zestawów danych jest zrównoleglony, co skraca globalny czas przetwarzania danych.

Równoległa transformacja danych

Podczas przygotowywania danych elementy wejściowe mogą wymagać wstępnego przetworzenia. W tym celu interfejs API tf.data oferuje transformację tf.data.Dataset.map , która stosuje funkcję zdefiniowaną przez użytkownika do każdego elementu wejściowego zestawu danych. Ponieważ elementy wejściowe są niezależne od siebie, wstępne przetwarzanie może być zrównoleglone w wielu rdzeniach procesora. Aby było to możliwe, podobnie do przekształceń prefetch i interleave , transformacja map dostarcza argument num_parallel_calls do określenia poziomu równoległości.

Wybór najlepszej wartości dla argumentu num_parallel_calls zależy od sprzętu, właściwości danych treningowych (takich jak rozmiar i kształt), kosztu funkcji mapy i innych procesów przetwarzania, które w tym samym czasie są wykonywane na procesorze. Prosta heurystyka polega na wykorzystaniu liczby dostępnych rdzeni procesora. Jednak, jeśli chodzi o transformację prefetch z wyprzedzeniem i interleave , transformacja map obsługuje funkcję tf.data.AUTOTUNE , która deleguje decyzję o tym, jakiego poziomu równoległości użyć do środowiska wykonawczego tf.data .

def mapped_function(s):
    # Do some hard pre-processing
    tf.py_function(lambda: time.sleep(0.03), [], ())
    return s

Mapowanie sekwencyjne

Zacznij od użycia transformacji map bez równoległości jako przykładu bazowego.

benchmark(
    ArtificialDataset()
    .map(mapped_function)
)
Execution time: 0.4505277170001136

Wykres czasu wykonania danych - metoda mapowania sekwencyjnego

Jeśli chodzi o naiwne podejście , tutaj, jak pokazuje wykres, czas spędzony na otwieraniu, czytaniu, wstępnym przetwarzaniu (mapowaniu) i krokach szkoleniowych sumuje się w jednej iteracji.

Mapowanie równoległe

Teraz użyj tej samej funkcji przetwarzania wstępnego, ale zastosuj ją równolegle do wielu próbek.

benchmark(
    ArtificialDataset()
    .map(
        mapped_function,
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.2839677860001757

Czas wykonania danych - mapowanie równoległe

Jak pokazuje wykres danych, etapy przetwarzania wstępnego nakładają się, co skraca całkowity czas pojedynczej iteracji.

Buforowanie

Transformacja tf.data.Dataset.cache może buforować zestaw danych w pamięci lub w magazynie lokalnym. Pozwoli to zaoszczędzić niektóre operacje (takie jak otwieranie plików i odczytywanie danych) przed wykonaniem w każdej epoce.

benchmark(
    ArtificialDataset()
    .map(  # Apply time consuming operations before cache
        mapped_function
    ).cache(
    ),
    5
)
Execution time: 0.3848854380003104

Czas wykonania danych - metoda buforowanego zbioru danych

Tutaj wykres czasu wykonania danych pokazuje, że podczas buforowania zestawu danych przekształcenia przed cache (takie jak otwieranie pliku i odczyt danych) są wykonywane tylko w pierwszej epoce. Kolejne epoki będą ponownie wykorzystywać dane z pamięci podręcznej przez transformację cache .

Jeśli funkcja zdefiniowana przez użytkownika przekazana do transformacji map jest kosztowna, zastosuj transformację cache po transformacji map , o ile wynikowy zestaw danych nadal mieści się w pamięci lub magazynie lokalnym. Jeśli funkcja zdefiniowana przez użytkownika zwiększa przestrzeń wymaganą do przechowywania zestawu danych poza pojemność pamięci podręcznej, zastosuj ją po transformacji cache lub rozważ wstępne przetworzenie danych przed zadaniem treningowym, aby zmniejszyć zużycie zasobów.

Mapowanie wektoryzujące

Wywołanie funkcji zdefiniowanej przez użytkownika przekazanej do transformacji map wiąże się z narzutem związanym z planowaniem i wykonywaniem funkcji zdefiniowanej przez użytkownika. Zwektoryzuj funkcję zdefiniowaną przez użytkownika (to znaczy, aby działała na partii danych wejściowych jednocześnie) i zastosuj transformację batch przed transformacją map .

Aby zilustrować tę dobrą praktykę, Twój sztuczny zbiór danych nie jest odpowiedni. Opóźnienie planowania wynosi około 10 mikrosekund (10e-6 sekund), znacznie mniej niż dziesiątki milisekund używanych w zestawie ArtificialDataset , a zatem jego wpływ jest trudny do zauważenia.

W tym przykładzie użyj podstawowej funkcji tf.data.Dataset.range i uprość pętlę treningową do najprostszej postaci.

fast_dataset = tf.data.Dataset.range(10000)

def fast_benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for _ in tf.data.Dataset.range(num_epochs):
        for _ in dataset:
            pass
    tf.print("Execution time:", time.perf_counter() - start_time)

def increment(x):
    return x+1

Mapowanie skalarne

fast_benchmark(
    fast_dataset
    # Apply function one item at a time
    .map(increment)
    # Batch
    .batch(256)
)
Execution time: 0.2712608739998359

Czas wykonania danych - metoda mapy skalarnej

Powyższy wykres ilustruje, co się dzieje (przy mniejszej liczbie próbek) przy użyciu metody mapowania skalarnego. Pokazuje, że mapowana funkcja jest stosowana dla każdej próbki. Chociaż ta funkcja jest bardzo szybka, ma pewne narzuty, które wpływają na wydajność czasu.

Mapowanie wektorowe

fast_benchmark(
    fast_dataset
    .batch(256)
    # Apply function on a batch of items
    # The tf.Tensor.__add__ method already handle batches
    .map(increment)
)
Execution time: 0.02737950600021577

Czas wykonania danych - metoda zwektoryzowanej mapy

Tym razem mapowana funkcja jest wywoływana raz i dotyczy partii próbki. Jak pokazuje wykres czasu wykonania danych, chociaż wykonanie funkcji może zająć więcej czasu, narzut pojawia się tylko raz, poprawiając ogólną wydajność czasu.

Zmniejszenie zużycia pamięci

Szereg przekształceń, w tym interleave , prefetch i shuffle , utrzymuje wewnętrzny bufor elementów. Jeśli funkcja zdefiniowana przez użytkownika przekazana do transformacji map zmienia rozmiar elementów, kolejność transformacji mapy i przekształcenia elementów buforujących wpływają na użycie pamięci. Ogólnie rzecz biorąc, wybierz kolejność, która skutkuje mniejszym zużyciem pamięci, chyba że inna kolejność jest pożądana ze względu na wydajność.

Buforowanie częściowych obliczeń

Zaleca się buforowanie zestawu danych po transformacji map , chyba że ta transformacja powoduje, że dane są zbyt duże, aby zmieściły się w pamięci. Kompromis można osiągnąć, jeśli mapowaną funkcję można podzielić na dwie części: część pochłaniającą czas i część pochłaniającą pamięć. W takim przypadku możesz połączyć swoje transformacje jak poniżej:

dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)

W ten sposób część czasochłonna jest wykonywana tylko w pierwszej epoce i unikasz używania zbyt dużej ilości pamięci podręcznej.

Podsumowanie najlepszych praktyk

Oto podsumowanie najlepszych praktyk projektowania wydajnych potoków wejściowych TensorFlow:

Odwzorowanie postaci

Aby głębiej zrozumieć interfejs API tf.data.Dataset , możesz pobawić się własnymi potokami. Poniżej znajduje się kod użyty do wykreślenia obrazów z tego przewodnika. Może to być dobry punkt wyjścia, pokazujący niektóre obejścia typowych problemów, takich jak:

  • Odtwarzalność czasu wykonania
  • Mapowane funkcje chętne do wykonania
  • wywoływalna transformacja z interleave
import itertools
from collections import defaultdict

import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt

Zbiór danych

Podobnie jak w przypadku ArtificialDataset , możesz zbudować zestaw danych zwracający czas spędzony na każdym kroku.

class TimeMeasuredDataset(tf.data.Dataset):
    # OUTPUT: (steps, timings, counters)
    OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)
    OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))

    _INSTANCES_COUNTER = itertools.count()  # Number of datasets generated
    _EPOCHS_COUNTER = defaultdict(itertools.count)  # Number of epochs done for each dataset

    def _generator(instance_idx, num_samples):
        epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])

        # Opening the file
        open_enter = time.perf_counter()
        time.sleep(0.03)
        open_elapsed = time.perf_counter() - open_enter

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            read_enter = time.perf_counter()
            time.sleep(0.015)
            read_elapsed = time.perf_counter() - read_enter

            yield (
                [("Open",), ("Read",)],
                [(open_enter, open_elapsed), (read_enter, read_elapsed)],
                [(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]
            )
            open_enter, open_elapsed = -1., -1.  # Negative values will be filtered


    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=cls.OUTPUT_TYPES,
            output_shapes=cls.OUTPUT_SHAPES,
            args=(next(cls._INSTANCES_COUNTER), num_samples)
        )

Ten zestaw danych zawiera próbki o kształcie [[2, 1], [2, 2], [2, 3]] i typu [tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32] . Każda próbka to:

(
  [("Open"), ("Read")],
  [(t0, d), (t0, d)],
  [(i, e, -1), (i, e, s)]
)

Gdzie:

  • Open i Read to identyfikatory kroków
  • t0 to sygnatura czasowa rozpoczęcia odpowiedniego kroku
  • d to czas spędzony w odpowiednim kroku
  • i jest indeksem instancji
  • e to indeks epoki (liczba iteracji zbioru danych)
  • s jest indeksem próbki

Pętla iteracyjna

Spraw, aby pętla iteracji była nieco bardziej skomplikowana, aby agregować wszystkie czasy. Będzie to działać tylko z zestawami danych generującymi próbki, jak opisano powyżej.

def timelined_benchmark(dataset, num_epochs=2):
    # Initialize accumulators
    steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)
    times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)
    values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)

    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        epoch_enter = time.perf_counter()
        for (steps, times, values) in dataset:
            # Record dataset preparation informations
            steps_acc = tf.concat((steps_acc, steps), axis=0)
            times_acc = tf.concat((times_acc, times), axis=0)
            values_acc = tf.concat((values_acc, values), axis=0)

            # Simulate training time
            train_enter = time.perf_counter()
            time.sleep(0.01)
            train_elapsed = time.perf_counter() - train_enter

            # Record training informations
            steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0)
            times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)
            values_acc = tf.concat((values_acc, [values[-1]]), axis=0)

        epoch_elapsed = time.perf_counter() - epoch_enter
        # Record epoch informations
        steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0)
        times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)
        values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)
        time.sleep(0.001)

    tf.print("Execution time:", time.perf_counter() - start_time)
    return {"steps": steps_acc, "times": times_acc, "values": values_acc}

Metoda kreślenia

Na koniec zdefiniuj funkcję zdolną do wykreślenia osi czasu na podstawie wartości zwracanych przez funkcję timelined_benchmark .

def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):
    # Remove invalid entries (negative times, or empty steps) from the timelines
    invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0]
    steps = timeline['steps'][invalid_mask].numpy()
    times = timeline['times'][invalid_mask].numpy()
    values = timeline['values'][invalid_mask].numpy()

    # Get a set of different steps, ordered by the first time they are encountered
    step_ids, indices = np.stack(np.unique(steps, return_index=True))
    step_ids = step_ids[np.argsort(indices)]

    # Shift the starting time to 0 and compute the maximal time value
    min_time = times[:,0].min()
    times[:,0] = (times[:,0] - min_time)
    end = max(width, (times[:,0]+times[:,1]).max() + 0.01)

    cmap = mpl.cm.get_cmap("plasma")
    plt.close()
    fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})
    fig.suptitle(title)
    fig.set_size_inches(17.0, len(step_ids))
    plt.xlim(-0.01, end)

    for i, step in enumerate(step_ids):
        step_name = step.decode()
        ax = axs[i]
        ax.set_ylabel(step_name)
        ax.set_ylim(0, 1)
        ax.set_yticks([])
        ax.set_xlabel("time (s)")
        ax.set_xticklabels([])
        ax.grid(which="both", axis="x", color="k", linestyle=":")

        # Get timings and annotation for the given step
        entries_mask = np.squeeze(steps==step)
        serie = np.unique(times[entries_mask], axis=0)
        annotations = values[entries_mask]

        ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)
        if annotate:
            for j, (start, width) in enumerate(serie):
                annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])])
                ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,
                        horizontalalignment='left', verticalalignment='center')
    if save:
        plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")

Użyj wrapperów dla funkcji mapowanej

Aby uruchomić mapowaną funkcję w gorliwym kontekście, musisz umieścić je wewnątrz wywołania tf.py_function .

def map_decorator(func):
    def wrapper(steps, times, values):
        # Use a tf.py_function to prevent auto-graph from compiling the method
        return tf.py_function(
            func,
            inp=(steps, times, values),
            Tout=(steps.dtype, times.dtype, values.dtype)
        )
    return wrapper

Porównanie rurociągów

_batch_map_num_items = 50

def dataset_generator_fun(*args):
    return TimeMeasuredDataset(num_samples=_batch_map_num_items)

Naiwny

@map_decorator
def naive_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001)  # Time consuming step
    time.sleep(0.0001)  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, [["Map"]]), axis=0),
        tf.concat((times, [[map_enter, map_elapsed]]), axis=0),
        tf.concat((values, [values[-1]]), axis=0)
    )

naive_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .flat_map(dataset_generator_fun)
    .map(naive_map)
    .batch(_batch_map_num_items, drop_remainder=True)
    .unbatch(),
    5
)
WARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_types is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
WARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_shapes is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
Execution time: 13.13538893499981

Zoptymalizowany

@map_decorator
def time_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001 * values.shape[0])  # Time consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


@map_decorator
def memory_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.0001 * values.shape[0])  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    # Use tf.tile to handle batch dimension
    return (
        tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


optimized_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .interleave(  # Parallelize data reading
        dataset_generator_fun,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .batch(  # Vectorize your mapped function
        _batch_map_num_items,
        drop_remainder=True)
    .map(  # Parallelize map transformation
        time_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .cache()  # Cache data
    .map(  # Reduce memory usage
        memory_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .prefetch(  # Overlap producer and consumer works
        tf.data.AUTOTUNE
    )
    .unbatch(),
    5
)
Execution time: 6.723691489999965
draw_timeline(naive_timeline, "Naive", 15)

png

draw_timeline(optimized_timeline, "Optimized", 15)

png