כתיבת לולאת אימונים מאפס

הצג באתר TensorFlow.org הפעל בגוגל קולאב צפה במקור ב-GitHub הורד מחברת

להכין

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np

מבוא

Keras מספק הכשרה מחדל ולולאות הערכה, fit() ו evaluate() . השימוש שלהם מכוסה במדריך הדרכה והערכה עם שיטות מובנות .

אם אתה רוצה להתאים אישית את אלגוריתם הלמידה של המודל שלך ועדיין מינוף לנוחיות fit() (למשל, כדי להכשיר גן באמצעות fit() ), אתה יכול מכל מחלקה Model בכיתה וליישם משלך train_step() שיטה, אשר נקרא שוב ושוב במהלך fit() . זה מכוסה במדריך התאמה אישית מה שקורה fit() .

כעת, אם אתה רוצה שליטה ברמה נמוכה מאוד על אימון והערכה, עליך לכתוב לולאות אימון והערכה משלך מאפס. על זה עוסק המדריך הזה.

שימוש GradientTape : דוגמה מקצה לקצה ראשונה

קורא מודל בתוך GradientTape היקף מאפשר לכם לאחזר את הדרגתיים של משקולות שאפשר לאלף של השכבה ביחס לערך הפסד. באמצעות מופע האופטימיזציה, אתה יכול להשתמש הדרגתי אלה כדי לעדכן משתנים אלה (אשר ניתן לאחזר באמצעות model.trainable_weights ).

בואו נשקול מודל פשוט של MNIST:

inputs = keras.Input(shape=(784,), name="digits")
x1 = layers.Dense(64, activation="relu")(inputs)
x2 = layers.Dense(64, activation="relu")(x1)
outputs = layers.Dense(10, name="predictions")(x2)
model = keras.Model(inputs=inputs, outputs=outputs)

בואו לאמן אותו באמצעות שיפוע מיני-אצווה עם לולאת אימון מותאמת אישית.

ראשית, נצטרך מייעל, פונקציית אובדן ומערך נתונים:

# Instantiate an optimizer.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
# Instantiate a loss function.
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Prepare the training dataset.
batch_size = 64
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = np.reshape(x_train, (-1, 784))
x_test = np.reshape(x_test, (-1, 784))

# Reserve 10,000 samples for validation.
x_val = x_train[-10000:]
y_val = y_train[-10000:]
x_train = x_train[:-10000]
y_train = y_train[:-10000]

# Prepare the training dataset.
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)

# Prepare the validation dataset.
val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(batch_size)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 1s 0us/step
11501568/11490434 [==============================] - 1s 0us/step

הנה לולאת האימון שלנו:

  • אנחנו פותחים for לולאה כי סובב במשך עידנים
  • במשך כל תקופה, אנו פותחים for לולאה כי סובבת על הנתונים, בקבוצות
  • עבור כל אצווה, אנו פותחים GradientTape() היקף
  • בתוך ההיקף הזה, אנו קוראים למודל (העברה קדימה) ומחשבים את ההפסד
  • מחוץ לתחום, אנו שואבים את שיפוע המשקולות של המודל ביחס לאובדן
  • לבסוף, אנו משתמשים במייעל כדי לעדכן את משקלי המודל בהתבסס על ההדרגות
epochs = 2
for epoch in range(epochs):
    print("\nStart of epoch %d" % (epoch,))

    # Iterate over the batches of the dataset.
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):

        # Open a GradientTape to record the operations run
        # during the forward pass, which enables auto-differentiation.
        with tf.GradientTape() as tape:

            # Run the forward pass of the layer.
            # The operations that the layer applies
            # to its inputs are going to be recorded
            # on the GradientTape.
            logits = model(x_batch_train, training=True)  # Logits for this minibatch

            # Compute the loss value for this minibatch.
            loss_value = loss_fn(y_batch_train, logits)

        # Use the gradient tape to automatically retrieve
        # the gradients of the trainable variables with respect to the loss.
        grads = tape.gradient(loss_value, model.trainable_weights)

        # Run one step of gradient descent by updating
        # the value of the variables to minimize the loss.
        optimizer.apply_gradients(zip(grads, model.trainable_weights))

        # Log every 200 batches.
        if step % 200 == 0:
            print(
                "Training loss (for one batch) at step %d: %.4f"
                % (step, float(loss_value))
            )
            print("Seen so far: %s samples" % ((step + 1) * batch_size))
Start of epoch 0
Training loss (for one batch) at step 0: 68.7478
Seen so far: 64 samples
Training loss (for one batch) at step 200: 1.9448
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 1.1859
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.6914
Seen so far: 38464 samples

Start of epoch 1
Training loss (for one batch) at step 0: 0.9113
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.9550
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.5139
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.7227
Seen so far: 38464 samples

טיפול במדדים ברמה נמוכה

בואו נוסיף ניטור מדדים ללולאה הבסיסית הזו.

אתה יכול בקלות לעשות שימוש חוזר במדדים המובנים (או באלה המותאמים אישית שכתבת) בלולאות אימון כאלה שנכתבו מאפס. הנה הזרימה:

  • הצג את המדד בתחילת הלולאה
  • התקשר metric.update_state() לאחר כל אצווה
  • התקשר metric.result() כאשר אתה צריך להציג את הערך הנוכחי של המדד
  • התקשר metric.reset_states() כאשר אתה צריך לנקות את המדינה של הערך (בדרך כלל בסוף עידן)

בואו להשתמש בידע הזה כדי לחשב SparseCategoricalAccuracy על נתוני אימות בסוף כל תקופה:

# Get model
inputs = keras.Input(shape=(784,), name="digits")
x = layers.Dense(64, activation="relu", name="dense_1")(inputs)
x = layers.Dense(64, activation="relu", name="dense_2")(x)
outputs = layers.Dense(10, name="predictions")(x)
model = keras.Model(inputs=inputs, outputs=outputs)

# Instantiate an optimizer to train the model.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
# Instantiate a loss function.
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Prepare the metrics.
train_acc_metric = keras.metrics.SparseCategoricalAccuracy()
val_acc_metric = keras.metrics.SparseCategoricalAccuracy()

להלן לולאת ההדרכה וההערכה שלנו:

import time

epochs = 2
for epoch in range(epochs):
    print("\nStart of epoch %d" % (epoch,))
    start_time = time.time()

    # Iterate over the batches of the dataset.
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            logits = model(x_batch_train, training=True)
            loss_value = loss_fn(y_batch_train, logits)
        grads = tape.gradient(loss_value, model.trainable_weights)
        optimizer.apply_gradients(zip(grads, model.trainable_weights))

        # Update training metric.
        train_acc_metric.update_state(y_batch_train, logits)

        # Log every 200 batches.
        if step % 200 == 0:
            print(
                "Training loss (for one batch) at step %d: %.4f"
                % (step, float(loss_value))
            )
            print("Seen so far: %d samples" % ((step + 1) * batch_size))

    # Display metrics at the end of each epoch.
    train_acc = train_acc_metric.result()
    print("Training acc over epoch: %.4f" % (float(train_acc),))

    # Reset training metrics at the end of each epoch
    train_acc_metric.reset_states()

    # Run a validation loop at the end of each epoch.
    for x_batch_val, y_batch_val in val_dataset:
        val_logits = model(x_batch_val, training=False)
        # Update val metrics
        val_acc_metric.update_state(y_batch_val, val_logits)
    val_acc = val_acc_metric.result()
    val_acc_metric.reset_states()
    print("Validation acc: %.4f" % (float(val_acc),))
    print("Time taken: %.2fs" % (time.time() - start_time))
Start of epoch 0
Training loss (for one batch) at step 0: 88.9958
Seen so far: 64 samples
Training loss (for one batch) at step 200: 2.2214
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 1.3083
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.8282
Seen so far: 38464 samples
Training acc over epoch: 0.7406
Validation acc: 0.8201
Time taken: 6.31s

Start of epoch 1
Training loss (for one batch) at step 0: 0.3276
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.4819
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.5971
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.5862
Seen so far: 38464 samples
Training acc over epoch: 0.8474
Validation acc: 0.8676
Time taken: 5.98s

להאצת צעד למעלה האימונים שלך עם tf.function

זמן ריצת ברירת מחדל TensorFlow 2 הוא ביצוע להוט . ככזה, לולאת האימון שלנו למעלה יוצאת לפועל בשקיקה.

זה נהדר עבור ניפוי באגים, אבל להידור גרפים יש יתרון ביצועים מובהק. תיאור החישוב שלך כגרף סטטי מאפשר למסגרת ליישם מיטוב ביצועים גלובלי. זה בלתי אפשרי כאשר המסגרת מוגבלת לבצע בתאוותנות פעולה אחת אחרי השנייה, ללא ידיעה מה מגיע אחר כך.

אתה יכול להרכיב לגרף סטטי כל פונקציה שלוקחת טנסורים כקלט. רק להוסיף @tf.function מעצב על זה, כמו זה:

@tf.function
def train_step(x, y):
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss_value = loss_fn(y, logits)
    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
    train_acc_metric.update_state(y, logits)
    return loss_value

בואו נעשה את אותו הדבר עם שלב ההערכה:

@tf.function
def test_step(x, y):
    val_logits = model(x, training=False)
    val_acc_metric.update_state(y, val_logits)

כעת, בואו נריץ מחדש את לולאת האימון שלנו עם שלב האימון המהודר הזה:

import time

epochs = 2
for epoch in range(epochs):
    print("\nStart of epoch %d" % (epoch,))
    start_time = time.time()

    # Iterate over the batches of the dataset.
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        loss_value = train_step(x_batch_train, y_batch_train)

        # Log every 200 batches.
        if step % 200 == 0:
            print(
                "Training loss (for one batch) at step %d: %.4f"
                % (step, float(loss_value))
            )
            print("Seen so far: %d samples" % ((step + 1) * batch_size))

    # Display metrics at the end of each epoch.
    train_acc = train_acc_metric.result()
    print("Training acc over epoch: %.4f" % (float(train_acc),))

    # Reset training metrics at the end of each epoch
    train_acc_metric.reset_states()

    # Run a validation loop at the end of each epoch.
    for x_batch_val, y_batch_val in val_dataset:
        test_step(x_batch_val, y_batch_val)

    val_acc = val_acc_metric.result()
    val_acc_metric.reset_states()
    print("Validation acc: %.4f" % (float(val_acc),))
    print("Time taken: %.2fs" % (time.time() - start_time))
Start of epoch 0
Training loss (for one batch) at step 0: 0.7921
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.7755
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.1564
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.3181
Seen so far: 38464 samples
Training acc over epoch: 0.8788
Validation acc: 0.8866
Time taken: 1.59s

Start of epoch 1
Training loss (for one batch) at step 0: 0.5222
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.4574
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.4035
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.7561
Seen so far: 38464 samples
Training acc over epoch: 0.8959
Validation acc: 0.9028
Time taken: 1.27s

הרבה יותר מהר, לא?

טיפול ברמה נמוכה בהפסדים שעוקבים אחר המודל

שכבות & מודלים רקורסיבי לעקוב אחר כל ההפסדים שנוצרו במהלך לעבור קדימה על ידי שכבות כי השיחה self.add_loss(value) . הרשימה וכתוצאה מכך ערכי פסד סקלר זמינה דרך רכוש model.losses בסוף המעבר קדימה.

אם אתה רוצה להשתמש ברכיבי ההפסד האלה, עליך לסכם אותם ולהוסיף אותם להפסד העיקרי בשלב האימון שלך.

קחו בחשבון את השכבה הזו, שיוצרת אובדן הסדרת פעילות:

class ActivityRegularizationLayer(layers.Layer):
    def call(self, inputs):
        self.add_loss(1e-2 * tf.reduce_sum(inputs))
        return inputs

בואו נבנה מודל ממש פשוט שמשתמש בו:

inputs = keras.Input(shape=(784,), name="digits")
x = layers.Dense(64, activation="relu")(inputs)
# Insert activity regularization as a layer
x = ActivityRegularizationLayer()(x)
x = layers.Dense(64, activation="relu")(x)
outputs = layers.Dense(10, name="predictions")(x)

model = keras.Model(inputs=inputs, outputs=outputs)

כך אמור להיראות שלב האימון שלנו כעת:

@tf.function
def train_step(x, y):
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss_value = loss_fn(y, logits)
        # Add any extra losses created during the forward pass.
        loss_value += sum(model.losses)
    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
    train_acc_metric.update_state(y, logits)
    return loss_value

סיכום

עכשיו אתה יודע כל מה שצריך לדעת על שימוש בלולאות אימון מובנות וכתיבת משלך מאפס.

לסיום, הנה דוגמה פשוטה מקצה לקצה הקושרת את כל מה שלמדת במדריך זה: DCGAN מאומן על ספרות MNIST.

דוגמה מקצה לקצה: לולאת אימון של GAN מאפס

יתכן שאתה מכיר את רשתות יריביות יצירתיות (GANs). GANs יכולים ליצור תמונות חדשות שנראות כמעט אמיתיות, על ידי לימוד ההפצה הסמויה של מערך אימון של תמונות ("המרחב הסמוי" של התמונות).

GAN מורכב משני חלקים: מודל "מחולל" הממפה נקודות במרחב הסמוי לנקודות במרחב התמונה, מודל "מפלה", סיווג שיכול להבדיל בין תמונות אמיתיות (מתוך מערך האימון) לבין מזויפות. תמונות (הפלט של רשת המחוללים).

לולאת אימון GAN נראית כך:

1) להכשיר את המפלה. - דגום אצווה של נקודות אקראיות במרחב הסמוי. - הפוך את הנקודות לתמונות מזויפות באמצעות מודל ה"גנרטור". - קבל אצווה של תמונות אמיתיות ושלב אותן עם התמונות שנוצרו. - אמן את מודל "המפלה" לסיווג תמונות שנוצרו לעומת תמונות אמיתיות.

2) אימון הגנרטור. - דגימת נקודות אקראיות במרחב הסמוי. - הפכו את הנקודות לתמונות מזויפות באמצעות רשת "מחולל". - קבל אצווה של תמונות אמיתיות ושלב אותן עם התמונות שנוצרו. - אמן את מודל ה"מחולל" ל"שטות" במאבחן ולסווג את התמונות המזויפות כאמיתיות.

לסקירה הרבה יותר מפורט של עבודות איך גנס, לראות למידה עמוק עם Python .

בואו ליישם את לולאת ההדרכה הזו. ראשית, צור את המאבחן שנועד לסווג ספרות מזויפות לעומת ספרות אמיתיות:

discriminator = keras.Sequential(
    [
        keras.Input(shape=(28, 28, 1)),
        layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.GlobalMaxPooling2D(),
        layers.Dense(1),
    ],
    name="discriminator",
)
discriminator.summary()
Model: "discriminator"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
conv2d (Conv2D)              (None, 14, 14, 64)        640       
_________________________________________________________________
leaky_re_lu (LeakyReLU)      (None, 14, 14, 64)        0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 7, 7, 128)         73856     
_________________________________________________________________
leaky_re_lu_1 (LeakyReLU)    (None, 7, 7, 128)         0         
_________________________________________________________________
global_max_pooling2d (Global (None, 128)               0         
_________________________________________________________________
dense_4 (Dense)              (None, 1)                 129       
=================================================================
Total params: 74,625
Trainable params: 74,625
Non-trainable params: 0
_________________________________________________________________

אז בואו ליצור רשת גנרטור, שהופך וקטורים סמויים לתפוקות של צורה (28, 28, 1) (המייצג ספרות MNIST):

latent_dim = 128

generator = keras.Sequential(
    [
        keras.Input(shape=(latent_dim,)),
        # We want to generate 128 coefficients to reshape into a 7x7x128 map
        layers.Dense(7 * 7 * 128),
        layers.LeakyReLU(alpha=0.2),
        layers.Reshape((7, 7, 128)),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
    ],
    name="generator",
)

הנה עיקר המפתח: לולאת האימון. כפי שאתה יכול לראות זה די פשוט. פונקציית צעדי האימון לוקחת רק 17 שורות.

# Instantiate one optimizer for the discriminator and another for the generator.
d_optimizer = keras.optimizers.Adam(learning_rate=0.0003)
g_optimizer = keras.optimizers.Adam(learning_rate=0.0004)

# Instantiate a loss function.
loss_fn = keras.losses.BinaryCrossentropy(from_logits=True)


@tf.function
def train_step(real_images):
    # Sample random points in the latent space
    random_latent_vectors = tf.random.normal(shape=(batch_size, latent_dim))
    # Decode them to fake images
    generated_images = generator(random_latent_vectors)
    # Combine them with real images
    combined_images = tf.concat([generated_images, real_images], axis=0)

    # Assemble labels discriminating real from fake images
    labels = tf.concat(
        [tf.ones((batch_size, 1)), tf.zeros((real_images.shape[0], 1))], axis=0
    )
    # Add random noise to the labels - important trick!
    labels += 0.05 * tf.random.uniform(labels.shape)

    # Train the discriminator
    with tf.GradientTape() as tape:
        predictions = discriminator(combined_images)
        d_loss = loss_fn(labels, predictions)
    grads = tape.gradient(d_loss, discriminator.trainable_weights)
    d_optimizer.apply_gradients(zip(grads, discriminator.trainable_weights))

    # Sample random points in the latent space
    random_latent_vectors = tf.random.normal(shape=(batch_size, latent_dim))
    # Assemble labels that say "all real images"
    misleading_labels = tf.zeros((batch_size, 1))

    # Train the generator (note that we should *not* update the weights
    # of the discriminator)!
    with tf.GradientTape() as tape:
        predictions = discriminator(generator(random_latent_vectors))
        g_loss = loss_fn(misleading_labels, predictions)
    grads = tape.gradient(g_loss, generator.trainable_weights)
    g_optimizer.apply_gradients(zip(grads, generator.trainable_weights))
    return d_loss, g_loss, generated_images

בואו נאמן הגן שלנו, על ידי התקשרות שוב ושוב train_step על קבוצות של תמונות.

מכיוון שהמאבחן והמחולל שלנו הם קונוונטים, אתה תרצה להפעיל את הקוד הזה על GPU.

import os

# Prepare the dataset. We use both the training & test MNIST digits.
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)

epochs = 1  # In practice you need at least 20 epochs to generate nice digits.
save_dir = "./"

for epoch in range(epochs):
    print("\nStart epoch", epoch)

    for step, real_images in enumerate(dataset):
        # Train the discriminator & generator on one batch of real images.
        d_loss, g_loss, generated_images = train_step(real_images)

        # Logging.
        if step % 200 == 0:
            # Print metrics
            print("discriminator loss at step %d: %.2f" % (step, d_loss))
            print("adversarial loss at step %d: %.2f" % (step, g_loss))

            # Save one generated image
            img = tf.keras.preprocessing.image.array_to_img(
                generated_images[0] * 255.0, scale=False
            )
            img.save(os.path.join(save_dir, "generated_img" + str(step) + ".png"))

        # To limit execution time we stop after 10 steps.
        # Remove the lines below to actually train the model!
        if step > 10:
            break
Start epoch 0
discriminator loss at step 0: 0.69
adversarial loss at step 0: 0.69

זהו זה! תקבל ספרות MNIST מזויפות למראה יפה לאחר רק כ-30 שנים של אימון על ה-Colab GPU.