Bessere Performance mit der tf.data API

Auf TensorFlow.org ansehen Quelle auf GitHub anzeigen Notizbuch herunterladen

Überblick

GPUs und TPUs können die für die Ausführung eines einzelnen Trainingsschritts erforderliche Zeit radikal reduzieren. Um Spitzenleistungen zu erzielen, ist eine effiziente Eingabepipeline erforderlich, die Daten für den nächsten Schritt liefert, bevor der aktuelle Schritt abgeschlossen ist. Die tf.data API hilft flexible und effiziente Eingangsleitungen zu bauen. Dieses Dokument zeigt , wie die verwenden tf.data API hochperformante TensorFlow Eingangsleitungen zu bauen.

Bevor Sie fortfahren, überprüfen Sie die Build - tf.data TensorFlow Eingangsleitungen führen zu lernen , wie die verwenden tf.data API.

Ressourcen

Aufstellen

import tensorflow as tf

import time

In diesem Handbuch durchlaufen Sie ein Dataset und messen die Leistung. Reproduzierbare Leistungsbenchmarks zu erstellen kann schwierig sein. Verschiedene Faktoren, die die Reproduzierbarkeit beeinflussen, sind:

  • Die aktuelle CPU-Last
  • Der Netzwerkverkehr
  • Komplexe Mechanismen wie Cache

Um einen reproduzierbaren Benchmark zu erhalten, bauen Sie ein künstliches Beispiel.

Der Datensatz

Starten Sie eine Klasse mit der Definition von Vererbungs tf.data.Dataset genannt ArtificialDataset . Dieser Datensatz:

  • Erzeugt num_samples Proben (Standard : 3)
  • Schläft einige Zeit vor dem ersten Element, um das Öffnen einer Datei zu simulieren
  • Schläft einige Zeit, bevor jedes Element produziert wird, um das Lesen von Daten aus einer Datei zu simulieren
class ArtificialDataset(tf.data.Dataset):
    def _generator(num_samples):
        # Opening the file
        time.sleep(0.03)

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            time.sleep(0.015)

            yield (sample_idx,)

    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_signature = tf.TensorSpec(shape = (1,), dtype = tf.int64),
            args=(num_samples,)
        )

Dieser Datensatz ist ähnlich den tf.data.Dataset.range eines, eine feste Verzögerung zu Beginn und zwischen jeder Probe hinzugefügt wird .

Die Trainingsschleife

Schreiben Sie als Nächstes eine Dummy-Trainingsschleife, die misst, wie lange es dauert, ein Dataset zu durchlaufen. Die Trainingszeit wird simuliert.

def benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        for sample in dataset:
            # Performing a training step
            time.sleep(0.01)
    print("Execution time:", time.perf_counter() - start_time)

Leistung optimieren

Zu zeigen , wie die Leistung optimiert werden kann, werden Sie die Leistung der Verbesserung ArtificialDataset .

Der naive Ansatz

Beginnen Sie mit einer naiven Pipeline ohne Tricks und iterieren Sie den Datensatz unverändert.

benchmark(ArtificialDataset())
Execution time: 0.2588925869999912

Unter der Haube wurde Ihre Ausführungszeit so verbracht:

Datenausführungszeitdiagramm - eine naive Methode

Das Diagramm zeigt, dass die Durchführung eines Trainingsschritts Folgendes umfasst:

  • Öffnen einer Datei, wenn sie noch nicht geöffnet wurde
  • Abrufen eines Dateneintrags aus der Datei
  • Nutzung der Daten für das Training

In einer naiven synchronen Implementierung wie hier befindet sich Ihr Modell jedoch im Leerlauf, während Ihre Pipeline die Daten abruft. Umgekehrt befindet sich die Eingabepipeline während des Trainings Ihres Modells im Leerlauf. Die Trainingsschrittzeit ist somit die Summe aus Öffnungs-, Lese- und Trainingszeit.

Die nächsten Abschnitte bauen auf dieser Eingabepipeline auf und veranschaulichen Best Practices für den Entwurf performanter TensorFlow-Eingabepipelines.

Vorabrufen

Das Vorabrufen überlappt die Vorverarbeitung und die Modellausführung eines Trainingsschritts. Während der Modelltrainingsschritt ausführt s wird die Eingangsleitung die Daten für Leseschritt s+1 . Dies reduziert die Schrittzeit auf das Maximum (im Gegensatz zur Summe) des Trainings und die Zeit, die zum Extrahieren der Daten benötigt wird.

Die tf.data API stellt die tf.data.Dataset.prefetch Transformation. Es kann verwendet werden, um die Zeit, in der Daten produziert werden, von der Zeit, in der Daten konsumiert werden, zu entkoppeln. Insbesondere verwendet die Transformation einen Hintergrund-Thread und einen internen Puffer, um Elemente aus dem Eingabe-Dataset vorab abzurufen, bevor sie angefordert werden. Die Anzahl der vorab abzurufenden Elemente sollte gleich (oder möglicherweise größer als) der Anzahl der Batches sein, die von einem einzelnen Trainingsschritt verbraucht werden. Sie können entweder diesen Wert manuell einstellen, oder setzen Sie ihn auf tf.data.AUTOTUNE , was die fordert tf.data den Wert Laufzeit abzustimmen dynamisch zur Laufzeit.

Beachten Sie, dass die Vorabrufumwandlung immer dann Vorteile bietet, wenn die Möglichkeit besteht, die Arbeit eines "Produzenten" mit der Arbeit eines "Verbrauchers" zu überschneiden.

benchmark(
    ArtificialDataset()
    .prefetch(tf.data.AUTOTUNE)
)
Execution time: 0.21371877499996117

Darstellung der Datenausführungszeit - Vorabrufmethode

Wie das Diagramm der Datenausführungszeit zeigt, liest die Eingabepipeline nun, während der Trainingsschritt für Beispiel 0 ausgeführt wird, die Daten für Beispiel 1 und so weiter.

Parallelisieren der Datenextraktion

In einer realen Umgebung können die Eingabedaten remote gespeichert werden (z. B. in Google Cloud Storage oder HDFS). Eine Dataset-Pipeline, die beim lokalen Lesen von Daten gut funktioniert, kann beim Remote-Lesen von Daten aufgrund der folgenden Unterschiede zwischen lokalem und Remote-Speicher zu Engpässen bei der E/A führen:

  • Time-to-first-Byte: das erste Byte einer Datei von Remote - Speicher zu lesen können Größenordnung länger dauern , als aus dem lokalen Speicher.
  • Der Lesedurchsatz: Während Remote - Speicher typischerweise große aggregierte Bandbreite bieten, um eine einzelne Datei zu lesen nur in der Lage sein könnte , einen kleinen Bruchteil dieser Bandbreite zu nutzen.

Darüber hinaus, wenn das Roh - Bytes in dem Speicher geladen wird, kann es auch notwendig sein , die Daten (beispielsweise deserialisieren und / oder Entschlüsseln protobuf ), die zusätzliche Berechnung erfordert. Dieser Overhead ist unabhängig davon vorhanden, ob die Daten lokal oder entfernt gespeichert werden, kann jedoch im entfernten Fall schlimmer sein, wenn die Daten nicht effektiv vorab abgerufen werden.

Um die Auswirkungen der verschieden Datenextraktionsgemeinkosten zu mindern, die tf.data.Dataset.interleave kann die Transformation verwendet werden , um den Datenladeschritt parallelisieren, Verschachteln die Inhalte anderer Datensätze (wie Datendateileser). Die Anzahl von Datensätzen zu überlappen können durch die angegeben werden cycle_length Argument, während der Grad der Parallelität kann durch die angegeben werden num_parallel_calls Argument. Ähnlich wie bei der prefetch - Transformation, die interleave unterstützt Transformation tf.data.AUTOTUNE , die die Entscheidung über das, was hohe Niveau zu halten Gebrauch auf die delegiert tf.data Laufzeit.

Sequentielles Interleave

Die Standardargumente der tf.data.Dataset.interleave Transformation machen es nacheinander einzelne Proben aus zwei Datensätze verschachteln.

benchmark(
    tf.data.Dataset.range(2)
    .interleave(lambda _: ArtificialDataset())
)
Execution time: 0.4863386449999325

Zeitdiagramm der Datenausführung - sequentielles Interleave

Diese Datenausführungszeit Grundstück ermöglicht , das Verhalten der zeigen interleave Transformation zur Verfügung stehenden Proben abwechselnd von den beiden Datensätze abgerufen werden . Allerdings ist hier keine Leistungssteigerung im Spiel.

Parallele Verschachtelung

Verwenden Sie nun die num_parallel_calls Argument der interleave Transformation. Dadurch werden mehrere Datensätze parallel geladen, wodurch die Wartezeit auf das Öffnen der Dateien verkürzt wird.

benchmark(
    tf.data.Dataset.range(2)
    .interleave(
        lambda _: ArtificialDataset(),
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.2785680570000295

Zeitdiagramm der Datenausführung - Parallel-Interleave-Methode

Wie das Diagramm der Datenausführungszeit zeigt, wird diesmal das Lesen der beiden Datensätze parallelisiert, was die globale Datenverarbeitungszeit verkürzt.

Parallelisierende Datentransformation

Beim Vorbereiten von Daten müssen möglicherweise Eingabeelemente vorverarbeitet werden. Zu diesem Zweck sind die tf.data API bietet die tf.data.Dataset.map Transformation, das eine benutzerdefinierte Funktion , die jedem Element des Eingangsdatensatzes gilt. Da Eingabeelemente voneinander unabhängig sind, kann die Vorverarbeitung über mehrere CPU-Kerne hinweg parallelisiert werden. Um dies möglich zu machen, ähnlich wie die prefetch und interleave Transformationen, die map bietet Transformation das num_parallel_calls Argument den Grad der Parallelität zu spezifizieren.

Die Auswahl des besten Wertes für das num_parallel_calls Argument hängt von Ihrer Hardware, Eigenschaften Ihrer Trainingsdaten (wie seine Größe und Form), die Kosten Ihrer Map - Funktion, und was andere Verarbeitung zur gleichen Zeit auf der CPU geschieht. Eine einfache Heuristik besteht darin, die Anzahl der verfügbaren CPU-Kerne zu verwenden. Wie jedoch für den prefetch und interleave Transformation, die map Transformation unterstützt tf.data.AUTOTUNE , die die Entscheidung über das, was hohe Niveau zu halte Verwendung zur delegieren tf.data Laufzeit.

def mapped_function(s):
    # Do some hard pre-processing
    tf.py_function(lambda: time.sleep(0.03), [], ())
    return s

Sequentielles Mapping

Beginnen Sie mit der Verwendung von map Transformation ohne Parallelität als Basis Beispiel.

benchmark(
    ArtificialDataset()
    .map(mapped_function)
)
Execution time: 0.44197996000002604

Darstellung der Datenausführungszeit - sequentielles Mapping-Verfahren

Wie für den naiven Ansatz , hier, wie die Handlung zeigt, sind die Zeiten für das Öffnen verbrachte, Lesen, Vorverarbeitung (Mapping) und Stufen Ausbildung summieren zusammen für eine einzelne Iteration.

Paralleles Mapping

Verwenden Sie nun dieselbe Vorverarbeitungsfunktion, wenden Sie sie jedoch parallel auf mehrere Samples an.

benchmark(
    ArtificialDataset()
    .map(
        mapped_function,
        num_parallel_calls=tf.data.AUTOTUNE
    )
)
Execution time: 0.2774802769999951

Datenausführungszeit - paralleles Mapping

Wie das Diagramm zeigt, überlappen sich die Vorverarbeitungsschritte, wodurch sich die Gesamtzeit für eine einzelne Iteration verringert.

Caching

Die tf.data.Dataset.cache Transformation kann eine Datenmenge zwischenzuspeichern, entweder im Speicher oder auf lokalem Speicher. Dadurch werden einige Operationen (wie das Öffnen von Dateien und das Lesen von Daten) vor der Ausführung während jeder Epoche bewahrt.

benchmark(
    ArtificialDataset()
    .map(  # Apply time consuming operations before cache
        mapped_function
    ).cache(
    ),
    5
)
Execution time: 0.37056952400007503

Datenausführungszeit - Cached-Dataset-Methode

Dabei werden die Datenausführungszeitverlauf zeigt , dass , wenn Sie einen Datensatz zwischenzuspeichern, die Transformationen vor dem cache ein (wie die Datei Öffnen und Lesen von Daten) nur während der ersten Epoche ausgeführt werden. Die nächsten Epochen werden die Daten durch die im Cache gespeicherten Wiederverwendung cache - Transformation.

Wenn die benutzerdefinierte Funktion in die übergebenen map Transformation teuer ist, gilt die cache - Transformation nach der map Transformation solange die resultierende Datenmenge kann nach wie vor in dem Speicher oder lokale Speicher passen. Wenn die benutzerdefinierte Funktion der Raum erhöht erforderlich , um die Datenmenge über die Cache - Kapazität zu speichern, entweder gilt es nach der cache - Transformation oder betrachtet Vorverarbeitung Ihre Daten vor dem Training Job Ressourcenverbrauch zu reduzieren.

Vektorisierendes Mapping

Aufrufen einer benutzerdefinierten Funktion in den übergebenen map Transformation Overhead im Zusammenhang mit der Ausführung der Zeitplanung und benutzerdefinierte Funktion. Vektorisieren die benutzerdefinierte Funktion (das heißt, müssen sie über eine Charge von Eingängen auf einmal arbeiten) und wendet die batch - Transformation vor der map Transformation.

Um diese bewährte Vorgehensweise zu veranschaulichen, ist Ihr künstlicher Datensatz nicht geeignet. Die Einteilungsverzögerung beträgt etwa 10 Mikrosekunden (10e-6 Sekunden), weit weniger als die zehn Millisekunden in dem verwendeten ArtificialDataset und damit seine Wirkung ist schwer zu sehen.

Für dieses Beispiel verwenden , um die Basis tf.data.Dataset.range Funktion und vereinfachen zu seiner einfachsten Form die Ausbildung Schleife.

fast_dataset = tf.data.Dataset.range(10000)

def fast_benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for _ in tf.data.Dataset.range(num_epochs):
        for _ in dataset:
            pass
    tf.print("Execution time:", time.perf_counter() - start_time)

def increment(x):
    return x+1

Skalare Abbildung

fast_benchmark(
    fast_dataset
    # Apply function one item at a time
    .map(increment)
    # Batch
    .batch(256)
)
Execution time: 0.23767012700000123

Datenausführungszeit - Skalarkartenmethode

Das obige Diagramm veranschaulicht, was (mit weniger Stichproben) mit der skalaren Mapping-Methode vor sich geht. Es zeigt, dass die abgebildete Funktion für jede Probe angewendet wird. Diese Funktion ist zwar sehr schnell, hat jedoch einen gewissen Overhead, der sich auf die Zeitleistung auswirkt.

Vektorisiertes Mapping

fast_benchmark(
    fast_dataset
    .batch(256)
    # Apply function on a batch of items
    # The tf.Tensor.__add__ method already handle batches
    .map(increment)
)
Execution time: 0.023475749999988693

Datenausführungszeit - vektorisierte Kartenmethode

Diesmal wird die zugeordnete Funktion einmal aufgerufen und auf einen Probenstapel angewendet. Wie das Diagramm der Datenausführungszeit zeigt, kann die Ausführung der Funktion zwar länger dauern, der Overhead wird jedoch nur einmal angezeigt, wodurch die Gesamtzeitleistung verbessert wird.

Reduzierung des Speicherbedarfs

Eine Reihe von Transformationen, einschließlich interleave , prefetch , und shuffle , einen internen Puffer der Elemente beizubehalten. Wenn die benutzerdefinierte Funktion in der übergebenen map Transformation ändert die Größe der Elemente, dann die Bestellung von der Karte Transformation und die Veränderungen, die Pufferelemente die Speichernutzung beeinflusst. Wählen Sie im Allgemeinen die Reihenfolge, die zu einem geringeren Speicherbedarf führt, es sei denn, aus Gründen der Leistung ist eine andere Reihenfolge wünschenswert.

Zwischenspeichern von Teilberechnungen

Es wird empfohlen , den Datensatz nach der zwischenzuspeichern map außer Transformation , wenn diese Transformation macht die Daten zu groß in dem Speicher zu passen. Ein Kompromiss kann erzielt werden, wenn Ihre zugeordnete Funktion in zwei Teile aufgeteilt werden kann: einen zeitaufwendigen und einen speicherverbrauchenden Teil. In diesem Fall können Sie Ihre Transformationen wie folgt verketten:

dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)

Auf diese Weise wird der zeitaufwendige Teil nur während der ersten Epoche ausgeführt und Sie vermeiden zu viel Cache-Speicherplatz.

Zusammenfassung der bewährten Verfahren

Hier ist eine Zusammenfassung der Best Practices zum Entwerfen leistungsfähiger TensorFlow-Eingabepipelines:

Reproduktion der Figuren

Um tiefer in dem tf.data.Dataset API Verständnis, können Sie mit Ihrer eigenen Pipeline spielen. Unten ist der Code, der zum Plotten der Bilder aus diesem Handbuch verwendet wird. Es kann ein guter Ausgangspunkt sein, um einige Problemumgehungen für häufig auftretende Probleme aufzuzeigen, wie zum Beispiel:

  • Reproduzierbarkeit der Ausführungszeit
  • Zugeordnete Funktionen eifrige Ausführung
  • interleave Transformation aufrufbar
import itertools
from collections import defaultdict

import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt

Der Datensatz

Ähnlich wie bei der ArtificialDataset können Sie einen Datensatz erstellen , die Zeit in jedem Schritt verbrachte zurück.

class TimeMeasuredDataset(tf.data.Dataset):
    # OUTPUT: (steps, timings, counters)
    OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)
    OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))

    _INSTANCES_COUNTER = itertools.count()  # Number of datasets generated
    _EPOCHS_COUNTER = defaultdict(itertools.count)  # Number of epochs done for each dataset

    def _generator(instance_idx, num_samples):
        epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])

        # Opening the file
        open_enter = time.perf_counter()
        time.sleep(0.03)
        open_elapsed = time.perf_counter() - open_enter

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            read_enter = time.perf_counter()
            time.sleep(0.015)
            read_elapsed = time.perf_counter() - read_enter

            yield (
                [("Open",), ("Read",)],
                [(open_enter, open_elapsed), (read_enter, read_elapsed)],
                [(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]
            )
            open_enter, open_elapsed = -1., -1.  # Negative values will be filtered


    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=cls.OUTPUT_TYPES,
            output_shapes=cls.OUTPUT_SHAPES,
            args=(next(cls._INSTANCES_COUNTER), num_samples)
        )

Dieser Datensatz stellt Proben von Form [[2, 1], [2, 2], [2, 3]] und des Typs [tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32] . Jede Probe ist:

(
  [("Open"), ("Read")],
  [(t0, d), (t0, d)],
  [(i, e, -1), (i, e, s)]
)

Woher:

  • Open und Read sind Schritte Identifikatoren
  • t0 ist der Zeitstempel , wenn der entsprechende Schritt gestartet
  • d ist die Zeit , in der entsprechenden Stufe ausgegeben
  • i ist der Index - Instanz
  • e ist die Epoche Index (Anzahl , wie oft die Datenmenge iteriert wurde)
  • s ist der Abtastindex

Die Iterationsschleife

Machen Sie die Iterationsschleife etwas komplizierter, um alle Timings zu aggregieren. Dies funktioniert nur mit Datensätzen, die wie oben beschrieben Stichproben generieren.

def timelined_benchmark(dataset, num_epochs=2):
    # Initialize accumulators
    steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)
    times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)
    values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)

    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        epoch_enter = time.perf_counter()
        for (steps, times, values) in dataset:
            # Record dataset preparation informations
            steps_acc = tf.concat((steps_acc, steps), axis=0)
            times_acc = tf.concat((times_acc, times), axis=0)
            values_acc = tf.concat((values_acc, values), axis=0)

            # Simulate training time
            train_enter = time.perf_counter()
            time.sleep(0.01)
            train_elapsed = time.perf_counter() - train_enter

            # Record training informations
            steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0)
            times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)
            values_acc = tf.concat((values_acc, [values[-1]]), axis=0)

        epoch_elapsed = time.perf_counter() - epoch_enter
        # Record epoch informations
        steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0)
        times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)
        values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)
        time.sleep(0.001)

    tf.print("Execution time:", time.perf_counter() - start_time)
    return {"steps": steps_acc, "times": times_acc, "values": values_acc}

Die Plotmethode

Schließlich definiert eine Funktion der Lage , eine Zeitleiste der durch die zurückgegebenen Werte gegeben plotten timelined_benchmark Funktion.

def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):
    # Remove invalid entries (negative times, or empty steps) from the timelines
    invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0]
    steps = timeline['steps'][invalid_mask].numpy()
    times = timeline['times'][invalid_mask].numpy()
    values = timeline['values'][invalid_mask].numpy()

    # Get a set of different steps, ordered by the first time they are encountered
    step_ids, indices = np.stack(np.unique(steps, return_index=True))
    step_ids = step_ids[np.argsort(indices)]

    # Shift the starting time to 0 and compute the maximal time value
    min_time = times[:,0].min()
    times[:,0] = (times[:,0] - min_time)
    end = max(width, (times[:,0]+times[:,1]).max() + 0.01)

    cmap = mpl.cm.get_cmap("plasma")
    plt.close()
    fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})
    fig.suptitle(title)
    fig.set_size_inches(17.0, len(step_ids))
    plt.xlim(-0.01, end)

    for i, step in enumerate(step_ids):
        step_name = step.decode()
        ax = axs[i]
        ax.set_ylabel(step_name)
        ax.set_ylim(0, 1)
        ax.set_yticks([])
        ax.set_xlabel("time (s)")
        ax.set_xticklabels([])
        ax.grid(which="both", axis="x", color="k", linestyle=":")

        # Get timings and annotation for the given step
        entries_mask = np.squeeze(steps==step)
        serie = np.unique(times[entries_mask], axis=0)
        annotations = values[entries_mask]

        ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)
        if annotate:
            for j, (start, width) in enumerate(serie):
                annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])])
                ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,
                        horizontalalignment='left', verticalalignment='center')
    if save:
        plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")

Verwenden Sie Wrapper für zugeordnete Funktionen

Um abgebildet Funktion in einem eifrigen Kontext ausführen, müssen Sie sie in einem wickeln tf.py_function Anruf.

def map_decorator(func):
    def wrapper(steps, times, values):
        # Use a tf.py_function to prevent auto-graph from compiling the method
        return tf.py_function(
            func,
            inp=(steps, times, values),
            Tout=(steps.dtype, times.dtype, values.dtype)
        )
    return wrapper

Rohrleitungsvergleich

_batch_map_num_items = 50

def dataset_generator_fun(*args):
    return TimeMeasuredDataset(num_samples=_batch_map_num_items)

Naiv

@map_decorator
def naive_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001)  # Time consuming step
    time.sleep(0.0001)  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, [["Map"]]), axis=0),
        tf.concat((times, [[map_enter, map_elapsed]]), axis=0),
        tf.concat((values, [values[-1]]), axis=0)
    )

naive_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .flat_map(dataset_generator_fun)
    .map(naive_map)
    .batch(_batch_map_num_items, drop_remainder=True)
    .unbatch(),
    5
)
WARNING:tensorflow:From /tmp/ipykernel_31283/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_types is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
WARNING:tensorflow:From /tmp/ipykernel_31283/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_shapes is deprecated and will be removed in a future version.
Instructions for updating:
Use output_signature instead
Execution time: 12.536449214999948

Optimiert

@map_decorator
def time_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.001 * values.shape[0])  # Time consuming step
    map_elapsed = time.perf_counter() - map_enter

    return (
        tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


@map_decorator
def memory_consuming_map(steps, times, values):
    map_enter = time.perf_counter()
    time.sleep(0.0001 * values.shape[0])  # Memory consuming step
    map_elapsed = time.perf_counter() - map_enter

    # Use tf.tile to handle batch dimension
    return (
        tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1),
        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
    )


optimized_timeline = timelined_benchmark(
    tf.data.Dataset.range(2)
    .interleave(  # Parallelize data reading
        dataset_generator_fun,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .batch(  # Vectorize your mapped function
        _batch_map_num_items,
        drop_remainder=True)
    .map(  # Parallelize map transformation
        time_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .cache()  # Cache data
    .map(  # Reduce memory usage
        memory_consuming_map,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    .prefetch(  # Overlap producer and consumer works
        tf.data.AUTOTUNE
    )
    .unbatch(),
    5
)
Execution time: 6.391495143999919
draw_timeline(naive_timeline, "Naive", 15)

png

draw_timeline(optimized_timeline, "Optimized", 15)

png