Load CSV data


This tutorial provides examples of how to use CSV data with TensorFlow.

There are two main parts to this:

  1. Loading the data off disk
  2. Pre-processing it into a form suitable for training.

This tutorial focuses on the loading, and gives some quick examples of preprocessing. To learn more about the preprocessing aspect, check out the Working with preprocessing layers guide and the Classify structured data using Keras preprocessing layers tutorial.

Setup

import pandas as pd
import numpy as np

# Make numpy values easier to read.
np.set_printoptions(precision=3, suppress=True)

import tensorflow as tf
from tensorflow.keras import layers

In-memory data

For any small CSV dataset, the simplest way to train a TensorFlow model on it is to load it into memory as a pandas DataFrame or a NumPy array.

A relatively simple example is the abalone dataset.

  • The dataset is small.
  • All the input features are limited-range floating point values.

Here is how to download the data into a DataFrame:

abalone_train = pd.read_csv(
    "https://storage.googleapis.com/download.tensorflow.org/data/abalone_train.csv",
    names=["Length", "Diameter", "Height", "Whole weight", "Shucked weight",
           "Viscera weight", "Shell weight", "Age"])

abalone_train.head()

The dataset contains a set of measurements of abalone, a type of sea snail.

an abalone shell

“Abalone shell” (by Nicki Dugan Pogue, CC BY-SA 2.0)

The nominal task for this dataset is to predict the age from the other measurements, so separate the features and labels for training:

abalone_features = abalone_train.copy()
abalone_labels = abalone_features.pop('Age')

For this dataset you will treat all features identically. Pack the features into a single NumPy array:

abalone_features = np.array(abalone_features)
abalone_features
array([[0.435, 0.335, 0.11 , ..., 0.136, 0.077, 0.097],
       [0.585, 0.45 , 0.125, ..., 0.354, 0.207, 0.225],
       [0.655, 0.51 , 0.16 , ..., 0.396, 0.282, 0.37 ],
       ...,
       [0.53 , 0.42 , 0.13 , ..., 0.374, 0.167, 0.249],
       [0.395, 0.315, 0.105, ..., 0.118, 0.091, 0.119],
       [0.45 , 0.355, 0.12 , ..., 0.115, 0.067, 0.16 ]])
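The same load, split, and pack steps can be tried without a download. The sketch below applies them to a small made-up CSV held in memory; the column names and values are hypothetical stand-ins, not rows from the abalone data.

```python
import io

import numpy as np
import pandas as pd

# Hypothetical three-row CSV standing in for a real file (no header row).
csv_text = "0.435,0.335,7\n0.585,0.450,6\n0.655,0.510,14\n"

frame = pd.read_csv(io.StringIO(csv_text), names=["Length", "Diameter", "Age"])

# Separate the label column from the features.
features = frame.copy()
labels = features.pop("Age")

# Pack the remaining columns into a single 2-D array.
features = np.array(features)

print(features.shape, labels.shape)
```

Because every remaining column is numeric, the packed array has a single floating point dtype, which is what allows one Dense layer to consume all features at once.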

Next, make a regression model to predict the age. Since there is only a single input tensor, a tf.keras.Sequential model is sufficient here.

abalone_model = tf.keras.Sequential([
  layers.Dense(64, activation='relu'),
  layers.Dense(1)
])

abalone_model.compile(loss = tf.keras.losses.MeanSquaredError(),
                      optimizer = tf.keras.optimizers.Adam())

To train that model, pass the features and labels to Model.fit:

abalone_model.fit(abalone_features, abalone_labels, epochs=10)
Epoch 1/10
104/104 ━━━━━━━━━━━━━━━━━━━━ 2s 4ms/step - loss: 94.9891
Epoch 2/10
104/104 ━━━━━━━━━━━━━━━━━━━━ 0s 1ms/step - loss: 31.2786
Epoch 3/10
104/104 ━━━━━━━━━━━━━━━━━━━━ 0s 1ms/step - loss: 8.3721
Epoch 4/10
104/104 ━━━━━━━━━━━━━━━━━━━━ 0s 1ms/step - loss: 8.1522
Epoch 5/10
104/104 ━━━━━━━━━━━━━━━━━━━━ 0s 1ms/step - loss: 7.7770
Epoch 6/10
104/104 ━━━━━━━━━━━━━━━━━━━━ 0s 1ms/step - loss: 7.0686
Epoch 7/10
104/104 ━━━━━━━━━━━━━━━━━━━━ 0s 1ms/step - loss: 6.6384
Epoch 8/10
104/104 ━━━━━━━━━━━━━━━━━━━━ 0s 1ms/step - loss: 6.5113
Epoch 9/10
104/104 ━━━━━━━━━━━━━━━━━━━━ 0s 1ms/step - loss: 7.2517
Epoch 10/10
104/104 ━━━━━━━━━━━━━━━━━━━━ 0s 1ms/step - loss: 6.0659
<keras.src.callbacks.history.History at 0x7fa03002be80>

You have just seen the most basic way to train a model using CSV data. Next, you will learn how to apply preprocessing to normalize numeric columns.

Basic preprocessing

It's good practice to normalize the inputs to your model. The Keras preprocessing layers provide a convenient way to build this normalization into your model.

The tf.keras.layers.Normalization layer precomputes the mean and variance of each column, and uses these to normalize the data.
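As a rough illustration of what "mean and variance of each column" means, the NumPy sketch below standardizes a made-up feature matrix by hand. It is an approximation of the idea only: the layer's own numerical details, such as how it guards against zero variance, are not reproduced here.

```python
import numpy as np

# Hypothetical feature matrix: 4 examples, 2 columns on very different scales.
features = np.array([[1.0, 100.0],
                     [2.0, 200.0],
                     [3.0, 300.0],
                     [4.0, 400.0]])

# Per-column statistics, computed once from the data.
mean = features.mean(axis=0)
variance = features.var(axis=0)

# Shift and scale each column using the stored statistics.
normalized = (features - mean) / np.sqrt(variance)

print(normalized)
```

After this transformation each column has zero mean and unit variance, so no single feature dominates simply because of its units.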

First, create the layer:

normalize = layers.Normalization()

Then, use the Normalization.adapt method to adapt the normalization layer to your data.

normalize.adapt(abalone_features)

Then, use the normalization layer in your model:

norm_abalone_model = tf.keras.Sequential([
  normalize,
  layers.Dense(64, activation='relu'),
  layers.Dense(1)
])

norm_abalone_model.compile(loss = tf.keras.losses.MeanSquaredError(),
                           optimizer = tf.keras.optimizers.Adam())

norm_abalone_model.fit(abalone_features, abalone_labels, epochs=10)
Epoch 1/10
104/104 ━━━━━━━━━━━━━━━━━━━━ 1s 3ms/step - loss: 93.6417
Epoch 2/10
104/104 ━━━━━━━━━━━━━━━━━━━━ 0s 1ms/step - loss: 48.7377
Epoch 3/10
104/104 ━━━━━━━━━━━━━━━━━━━━ 0s 1ms/step - loss: 26.9893
Epoch 4/10
104/104 ━━━━━━━━━━━━━━━━━━━━ 0s 1ms/step - loss: 18.4746
Epoch 5/10
104/104 ━━━━━━━━━━━━━━━━━━━━ 0s 1ms/step - loss: 11.3473
Epoch 6/10
104/104 ━━━━━━━━━━━━━━━━━━━━ 0s 1ms/step - loss: 7.5720
Epoch 7/10
104/104 ━━━━━━━━━━━━━━━━━━━━ 0s 1ms/step - loss: 6.3920
Epoch 8/10
104/104 ━━━━━━━━━━━━━━━━━━━━ 0s 1ms/step - loss: 6.4452
Epoch 9/10
104/104 ━━━━━━━━━━━━━━━━━━━━ 0s 1ms/step - loss: 5.8708
Epoch 10/10
104/104 ━━━━━━━━━━━━━━━━━━━━ 0s 1ms/step - loss: 5.8409
<keras.src.callbacks.history.History at 0x7fa0280efee0>

Mixed data types

In the previous sections, you worked with a dataset where all the features were limited-range floating point values. But not all datasets are limited to a single data type.

The "Titanic" dataset contains information about the passengers on the Titanic. The nominal task on this dataset is to predict who survived.

The Titanic

Image from Wikimedia

The raw data can easily be loaded as a pandas DataFrame, but it is not immediately usable as input to a TensorFlow model.

titanic = pd.read_csv("https://storage.googleapis.com/tf-datasets/titanic/train.csv")
titanic.head()
titanic_features = titanic.copy()
titanic_labels = titanic_features.pop('survived')

Because of the different data types and ranges, you can't simply stack the features into a NumPy array and pass it to a tf.keras.Sequential model. Each column needs to be handled individually.

As one option, you could preprocess your data offline (using any tool you like) to convert categorical columns to numeric columns, then pass the processed output to your TensorFlow model. The disadvantage of that approach is that if you save and export your model, the preprocessing is not saved with it. The Keras preprocessing layers avoid this problem because they're part of the model.

In this example, you'll build a model that implements the preprocessing logic using the Keras functional API. You could also do it by subclassing.

The functional API operates on "symbolic" tensors. Normal "eager" tensors have a value. In contrast, these "symbolic" tensors do not. Instead, they keep track of which operations are run on them and build a representation of the calculation that you can run later. Here's a quick example:

# Create a symbolic input
input = tf.keras.Input(shape=(), dtype=tf.float32)

# Perform a calculation using the input
result = 2*input + 1

# the result doesn't have a value
result
<KerasTensor shape=(None,), dtype=float32, sparse=False, name=keras_tensor_9>
calc = tf.keras.Model(inputs=input, outputs=result)
print(calc(np.array([1])).numpy())
print(calc(np.array([2])).numpy())
[3.]
[5.]

To build the preprocessing model, start by building a set of symbolic tf.keras.Input objects, matching the names and data-types of the CSV columns.

inputs = {}

for name, column in titanic_features.items():
  dtype = column.dtype
  if dtype == object:
    dtype = tf.string
  else:
    dtype = tf.float32

  inputs[name] = tf.keras.Input(shape=(1,), name=name, dtype=dtype)

inputs
{'sex': <KerasTensor shape=(None, 1), dtype=string, sparse=None, name=sex>,
 'age': <KerasTensor shape=(None, 1), dtype=float32, sparse=None, name=age>,
 'n_siblings_spouses': <KerasTensor shape=(None, 1), dtype=float32, sparse=None, name=n_siblings_spouses>,
 'parch': <KerasTensor shape=(None, 1), dtype=float32, sparse=None, name=parch>,
 'fare': <KerasTensor shape=(None, 1), dtype=float32, sparse=None, name=fare>,
 'class': <KerasTensor shape=(None, 1), dtype=string, sparse=None, name=class>,
 'deck': <KerasTensor shape=(None, 1), dtype=string, sparse=None, name=deck>,
 'embark_town': <KerasTensor shape=(None, 1), dtype=string, sparse=None, name=embark_town>,
 'alone': <KerasTensor shape=(None, 1), dtype=string, sparse=None, name=alone>}
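The `dtype == object` test above relies on pandas storing string columns with the `object` dtype. pandas can also be configured to use a dedicated string dtype, in which case that comparison would not match. As a hedged alternative, the sketch below classifies columns with `pandas.api.types.is_numeric_dtype` instead; the small DataFrame and the `kinds` dictionary are made up for illustration.

```python
import pandas as pd
from pandas.api.types import is_numeric_dtype

# Hypothetical frame with one float, one string, and one integer column.
frame = pd.DataFrame({
    "age": [22.0, 38.0],
    "sex": ["male", "female"],
    "parch": [0, 1],
})

# Numeric columns would become float32 inputs; everything else is treated as a string.
kinds = {
    name: "float32" if is_numeric_dtype(column) else "string"
    for name, column in frame.items()
}

print(kinds)
```

The resulting names could then be mapped to `tf.float32` and `tf.string` when creating the `tf.keras.Input` objects.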

The first step in your preprocessing logic is to concatenate the numeric inputs together, and run them through a normalization layer:

numeric_inputs = {name:input for name,input in inputs.items()
                  if input.dtype==tf.float32}

x = layers.Concatenate()(list(numeric_inputs.values()))
norm = layers.Normalization()
norm.adapt(np.array(titanic[numeric_inputs.keys()]))
all_numeric_inputs = norm(x)

all_numeric_inputs
<KerasTensor shape=(None, 4), dtype=float32, sparse=False, name=keras_tensor_11>

Collect all the symbolic preprocessing results, to concatenate them later:

preprocessed_inputs = [all_numeric_inputs]

For the string inputs, use the tf.keras.layers.StringLookup layer to map from strings to integer indices in a vocabulary. Next, use tf.keras.layers.CategoryEncoding to convert the indices into float32 data appropriate for the model.

The default settings for the tf.keras.layers.CategoryEncoding layer create a one-hot vector for each input. A tf.keras.layers.Embedding would also work. Check out the Working with preprocessing layers guide and the Classify structured data using Keras preprocessing layers tutorial for more on this topic.

for name, input in inputs.items():
  if input.dtype == tf.float32:
    continue

  lookup = layers.StringLookup(vocabulary=np.unique(titanic_features[name]))
  one_hot = layers.CategoryEncoding(num_tokens=lookup.vocabulary_size())

  x = lookup(input)
  x = one_hot(x)
  preprocessed_inputs.append(x)
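To see the two layers in isolation, the sketch below runs them eagerly on a tiny made-up vocabulary. It assumes the default `StringLookup` settings, which reserve index 0 for out-of-vocabulary strings; `"Steerage"` is a hypothetical value that is deliberately absent from the vocabulary.

```python
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers

# Hypothetical vocabulary for a single categorical column.
vocabulary = ["First", "Second", "Third"]

lookup = layers.StringLookup(vocabulary=vocabulary)
one_hot = layers.CategoryEncoding(num_tokens=lookup.vocabulary_size())

# Shape (3, 1): one string per example, matching the Input shape used above.
values = tf.constant([["Second"], ["Third"], ["Steerage"]])

indices = lookup(values)               # strings -> integer indices
encoded = np.array(one_hot(indices))   # indices -> one vector per example

print(indices.numpy())
print(encoded)
```

Each row of `encoded` contains a single 1, and the unknown string lands in the reserved out-of-vocabulary slot rather than raising an error.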

With the collection of inputs and preprocessed_inputs, you can concatenate all the preprocessed inputs together, and build a model that handles the preprocessing:

preprocessed_inputs_cat = layers.Concatenate()(preprocessed_inputs)

titanic_preprocessing = tf.keras.Model(inputs, preprocessed_inputs_cat)

tf.keras.utils.plot_model(model = titanic_preprocessing , rankdir="LR", dpi=72, show_shapes=True)


This model contains only the input preprocessing. You can run it to see what it does to your data. Keras models don't automatically convert pandas DataFrames because it's not clear whether a DataFrame should be converted to one tensor or to a dictionary of tensors. So, convert it to a dictionary of tensors:

titanic_features_dict = {name: np.array(value) 
                         for name, value in titanic_features.items()}

If you slice out the first training example and pass it to this preprocessing model, you see the numeric features and string one-hots all concatenated together:

features_dict = {name:values[:1] for name, values in titanic_features_dict.items()}
titanic_preprocessing(features_dict)
<tf.Tensor: shape=(1, 28), dtype=float32, numpy=
array([[-0.61 ,  0.395, -0.479, -0.497,  0.   ,  0.   ,  1.   ,  0.   ,
         0.   ,  0.   ,  1.   ,  0.   ,  0.   ,  0.   ,  0.   ,  0.   ,
         0.   ,  0.   ,  0.   ,  1.   ,  0.   ,  0.   ,  0.   ,  1.   ,
         0.   ,  0.   ,  1.   ,  0.   ]], dtype=float32)>

Now, build the model on top of this:

def build_titanic_model(preprocessing_head, inputs):
  body = tf.keras.Sequential([
    layers.Dense(64, activation='relu'),
    layers.Dense(1)
  ])

  preprocessed_inputs = preprocessing_head(inputs)
  result = body(preprocessed_inputs)
  model = tf.keras.Model(inputs, result)

  model.compile(loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
                optimizer=tf.keras.optimizers.Adam())
  return model

titanic_model = build_titanic_model(titanic_preprocessing, inputs)

When you train the model, pass the dictionary of features as x, and the label as y.

titanic_model.fit(x=titanic_features_dict, y=titanic_labels, epochs=10)
Epoch 1/10
20/20 ━━━━━━━━━━━━━━━━━━━━ 2s 3ms/step - loss: 0.6619
Epoch 2/10
20/20 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - loss: 0.5967
Epoch 3/10
20/20 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - loss: 0.5366
Epoch 4/10
20/20 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - loss: 0.4809
Epoch 5/10
20/20 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - loss: 0.5004
Epoch 6/10
20/20 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - loss: 0.4659
Epoch 7/10
20/20 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - loss: 0.4474
Epoch 8/10
20/20 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - loss: 0.4304
Epoch 9/10
20/20 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - loss: 0.3952
Epoch 10/10
20/20 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - loss: 0.4256
<keras.src.callbacks.history.History at 0x7fa153fe5280>

Since the preprocessing is part of the model, you can save the model and reload it somewhere else and get identical results:

titanic_model.save('test.keras')
reloaded = tf.keras.models.load_model('test.keras')
features_dict = {name:values[:1] for name, values in titanic_features_dict.items()}

before = titanic_model(features_dict)
after = reloaded(features_dict)
assert tf.reduce_all(tf.abs(before - after) < 1e-3)
print(before)
print(after)
tf.Tensor([[-1.932]], shape=(1, 1), dtype=float32)
tf.Tensor([[-1.932]], shape=(1, 1), dtype=float32)
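The model was compiled with BinaryCrossentropy(from_logits=True), so the values printed above are logits rather than probabilities. Assuming the usual sigmoid link between a logit and a probability, a minimal sketch of the conversion looks like this; the `sigmoid` helper is defined here for illustration and is not part of the tutorial's code.

```python
import numpy as np


def sigmoid(logit):
    """Map a real-valued logit to a probability in (0, 1)."""
    return 1.0 / (1.0 + np.exp(-logit))


# Logit copied from the printed model output above.
logit = -1.932
probability = sigmoid(logit)

print(f"{probability:.3f}")
```

A negative logit corresponds to a predicted survival probability below 0.5 for that passenger.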

Using tf.data

In the previous section you relied on the model's built-in data shuffling and batching while training the model.

If you need more control over the input data pipeline, or need to use data that doesn't easily fit into memory, use tf.data.

For more examples, refer to the tf.data: Build TensorFlow input pipelines guide.

On in-memory data

As a first example of applying tf.data to CSV data, consider the following code to manually slice up the dictionary of features from the previous section. For each index, it takes that index for each feature:

import itertools

def slices(features):
  for i in itertools.count():
    # For each feature take index `i`
    example = {name:values[i] for name, values in features.items()}
    yield example

Run this and print the first example:

for example in slices(titanic_features_dict):
  for name, value in example.items():
    print(f"{name:19s}: {value}")
  break
sex                : male
age                : 22.0
n_siblings_spouses : 1
parch              : 0
fare               : 7.25
class              : Third
deck               : unknown
embark_town        : Southampton
alone              : n

The most basic in-memory data loader for tf.data.Dataset is the Dataset.from_tensor_slices constructor. This returns a tf.data.Dataset that implements a generalized version of the above slices function, in TensorFlow.

features_ds = tf.data.Dataset.from_tensor_slices(titanic_features_dict)

You can iterate over a tf.data.Dataset like any other Python iterable:

for example in features_ds:
  for name, value in example.items():
    print(f"{name:19s}: {value}")
  break
sex                : b'male'
age                : 22.0
n_siblings_spouses : 1
parch              : 0
fare               : 7.25
class              : b'Third'
deck               : b'unknown'
embark_town        : b'Southampton'
alone              : b'n'

The from_tensor_slices function can handle any structure of nested dictionaries or tuples. The following code makes a dataset of (features_dict, labels) pairs:

titanic_ds = tf.data.Dataset.from_tensor_slices((titanic_features_dict, titanic_labels))

To train a model using this Dataset, you'll need to at least shuffle and batch the data.

titanic_batches = titanic_ds.shuffle(len(titanic_labels)).batch(32)
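The effect of slicing, shuffling, and batching is easier to inspect on a handful of rows. The sketch below uses a made-up five-row feature dictionary (not the Titanic data) and records the size of each batch.

```python
import numpy as np
import tensorflow as tf

# Hypothetical features and labels; object arrays mirror pandas string columns.
features = {
    "age": np.array([22.0, 38.0, 26.0, 35.0, 28.0]),
    "sex": np.array(["male", "female", "female", "female", "male"], dtype=object),
}
labels = np.array([0, 1, 1, 1, 0])

# Slice along the first axis into (features_dict, label) pairs.
pairs = tf.data.Dataset.from_tensor_slices((features, labels))

# Shuffle over the whole dataset, then group into batches of two.
batches = pairs.shuffle(len(labels)).batch(2)

batch_sizes = [int(label_batch.shape[0]) for _, label_batch in batches]
print(batch_sizes)
```

With five examples and a batch size of two, the final batch is smaller than the others. Passing `drop_remainder=True` to `batch` would discard it instead.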

Instead of passing features and labels to Model.fit, you pass the dataset:

titanic_model.fit(titanic_batches, epochs=5)
Epoch 1/5
20/20 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - loss: 0.4309
Epoch 2/5
20/20 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - loss: 0.4024
Epoch 3/5
20/20 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - loss: 0.4184
Epoch 4/5
20/20 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - loss: 0.3813
Epoch 5/5
20/20 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - loss: 0.3939
<keras.src.callbacks.history.History at 0x7fa153fe73d0>

From a single file

So far this tutorial has worked with in-memory data. tf.data is a highly scalable toolkit for building data pipelines, and provides a few functions for loading CSV files.

titanic_file_path = tf.keras.utils.get_file("train.csv", "https://storage.googleapis.com/tf-datasets/titanic/train.csv")
Downloading data from https://storage.googleapis.com/tf-datasets/titanic/train.csv
30874/30874 ━━━━━━━━━━━━━━━━━━━━ 0s 0us/step

Now read the CSV data from the file and create a tf.data.Dataset.

(For the full documentation, see tf.data.experimental.make_csv_dataset)

titanic_csv_ds = tf.data.experimental.make_csv_dataset(
    titanic_file_path,
    batch_size=5, # Artificially small to make examples easier to show.
    label_name='survived',
    num_epochs=1,
    ignore_errors=True,)
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.9/site-packages/tensorflow/python/data/experimental/ops/readers.py:572: ignore_errors (from tensorflow.python.data.experimental.ops.error_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Dataset.ignore_errors` instead.

This function includes many convenient features, so the data is easy to work with. This includes:

  • Using the column headers as dictionary keys.
  • Automatically determining the type of each column.

for batch, label in titanic_csv_ds.take(1):
  for key, value in batch.items():
    print(f"{key:20s}: {value}")
  print()
  print(f"{'label':20s}: {label}")
sex                 : [b'female' b'male' b'male' b'male' b'male']
age                 : [ 2. 28. 18. 32. 28.]
n_siblings_spouses  : [0 0 1 0 0]
parch               : [1 0 0 0 0]
fare                : [ 12.288   7.896 108.9    30.5    13.   ]
class               : [b'Third' b'Third' b'First' b'First' b'Second']
deck                : [b'unknown' b'unknown' b'C' b'B' b'unknown']
embark_town         : [b'Southampton' b'Southampton' b'Cherbourg' b'Cherbourg' b'Southampton']
alone               : [b'n' b'y' b'n' b'y' b'y']

label               : [1 0 0 1 1]
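Both conveniences listed above can be checked on a small file. The sketch below writes a made-up three-row CSV to a temporary directory and reads it back; the file name and contents are hypothetical, and `shuffle=False` is set only so the result is deterministic.

```python
import os
import tempfile

import tensorflow as tf

# Hypothetical CSV with a header row and mixed column types.
csv_text = "survived,age,sex\n0,22.0,male\n1,38.0,female\n1,26.0,female\n"
csv_path = os.path.join(tempfile.mkdtemp(), "tiny.csv")
with open(csv_path, "w") as f:
    f.write(csv_text)

tiny_ds = tf.data.experimental.make_csv_dataset(
    csv_path,
    batch_size=3,
    label_name="survived",
    num_epochs=1,
    shuffle=False)

features, label = next(iter(tiny_ds))

# Column names come from the header; dtypes are inferred from the values.
column_types = {name: value.dtype.name for name, value in features.items()}
print(column_types)
print(label.numpy())
```

The label column is removed from the feature dictionary and returned separately, which matches the `(features, label)` structure that Model.fit expects.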

It can also decompress the data on the fly. Here's a gzipped CSV file containing the Metro Interstate Traffic Volume dataset.

A traffic jam.

Image from Wikimedia

traffic_volume_csv_gz = tf.keras.utils.get_file(
    'Metro_Interstate_Traffic_Volume.csv.gz', 
    "https://archive.ics.uci.edu/ml/machine-learning-databases/00492/Metro_Interstate_Traffic_Volume.csv.gz",
    cache_dir='.', cache_subdir='traffic')
Downloading data from https://archive.ics.uci.edu/ml/machine-learning-databases/00492/Metro_Interstate_Traffic_Volume.csv.gz
 335872/Unknown 0s 1us/step

Set the compression_type argument to read directly from the compressed file:

traffic_volume_csv_gz_ds = tf.data.experimental.make_csv_dataset(
    traffic_volume_csv_gz,
    batch_size=256,
    label_name='traffic_volume',
    num_epochs=1,
    compression_type="GZIP")

for batch, label in traffic_volume_csv_gz_ds.take(1):
  for key, value in batch.items():
    print(f"{key:20s}: {value[:5]}")
  print()
  print(f"{'label':20s}: {label[:5]}")
holiday             : [b'None' b'None' b'None' b'None' b'None']
temp                : [288.99 294.07 262.2  284.58 295.89]
rain_1h             : [0. 0. 0. 0. 0.]
snow_1h             : [0. 0. 0. 0. 0.]
clouds_all          : [ 0 56 90 90  1]
weather_main        : [b'Clear' b'Clouds' b'Clouds' b'Rain' b'Clear']
weather_description : [b'Sky is Clear' b'broken clouds' b'overcast clouds' b'light rain'
 b'sky is clear']
date_time           : [b'2013-08-08 07:00:00' b'2013-07-15 04:00:00' b'2013-02-21 19:00:00'
 b'2013-05-10 13:00:00' b'2013-04-28 16:00:00']

label               : [6771  828 3431 5457 4475]
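The same behavior can be reproduced offline. The sketch below, which assumes only the standard library `gzip` module and a made-up two-row file, writes a compressed CSV and reads it back with `compression_type="GZIP"`.

```python
import gzip
import os
import tempfile

import tensorflow as tf

# Write a hypothetical gzipped CSV; the rows are invented for illustration.
csv_gz_path = os.path.join(tempfile.mkdtemp(), "tiny.csv.gz")
with gzip.open(csv_gz_path, "wt") as f:
    f.write("temp,traffic_volume\n288.5,5545\n289.4,4516\n")

gz_ds = tf.data.experimental.make_csv_dataset(
    csv_gz_path,
    batch_size=2,
    label_name="traffic_volume",
    num_epochs=1,
    shuffle=False,
    compression_type="GZIP")

gz_features, gz_label = next(iter(gz_ds))
print(gz_features["temp"].numpy(), gz_label.numpy())
```

No separate decompression step or temporary uncompressed copy is needed.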

Caching

There is some overhead to parsing the CSV data. For small models this can be the bottleneck in training.

Depending on your use case, it may be a good idea to use Dataset.cache or tf.data.Dataset.snapshot, so that the CSV data is only parsed on the first epoch.

The main difference between the cache and snapshot methods is that cache files can only be used by the TensorFlow process that created them, but snapshot files can be read by other processes.
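A toy pipeline makes the effect of Dataset.cache visible. In the sketch below, a Python generator stands in for an expensive source such as CSV parsing, and a counter (a hypothetical helper, not part of tf.data) records how many times the source is actually opened.

```python
import tensorflow as tf

# Counts how often the "expensive" source is started.
calls = {"count": 0}


def expensive_source():
    calls["count"] += 1
    for i in range(5):
        yield i


source_ds = tf.data.Dataset.from_generator(
    expensive_source,
    output_signature=tf.TensorSpec(shape=(), dtype=tf.int64))

cached_ds = source_ds.cache()

# The first pass fills the in-memory cache; the second pass reads from it.
first_pass = [int(x) for x in cached_ds]
second_pass = [int(x) for x in cached_ds]

print(first_pass, second_pass, calls["count"])
```

The source runs once even though the data is consumed twice. Note that the cache is only complete after one full pass over the dataset.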

For example, iterating over the traffic_volume_csv_gz_ds 20 times may take around 15 seconds without caching, or about two seconds with caching.

%%time
for i, (batch, label) in enumerate(traffic_volume_csv_gz_ds.repeat(20)):
  if i % 40 == 0:
    print('.', end='')
print()
...............................................................................................
CPU times: user 13.3 s, sys: 2.27 s, total: 15.6 s
Wall time: 9.8 s
%%time
caching = traffic_volume_csv_gz_ds.cache().shuffle(1000)

for i, (batch, label) in enumerate(caching.repeat(20)):
  if i % 40 == 0:
    print('.', end='')
print()
...............................................................................................
CPU times: user 1.85 s, sys: 200 ms, total: 2.05 s
Wall time: 1.75 s
%%time
snapshotting = traffic_volume_csv_gz_ds.snapshot('titanic.tfsnap').shuffle(1000)

for i, (batch, label) in enumerate(snapshotting.repeat(20)):
  if i % 40 == 0:
    print('.', end='')
print()
...............................................................................................
CPU times: user 2.75 s, sys: 583 ms, total: 3.33 s
Wall time: 2.05 s

If your data loading is slowed by loading CSV files, and Dataset.cache and tf.data.Dataset.snapshot are insufficient for your use case, consider re-encoding your data into a more streamlined format.

Multiple files

All the examples so far in this section could easily be done without tf.data. One place where tf.data can really simplify things is when dealing with collections of files.

For example, the character font images dataset is distributed as a collection of CSV files, one per font.

Fonts

Image by Willi Heidelbach from Pixabay

Download the dataset, and review the files inside:

fonts_zip = tf.keras.utils.get_file(
    'fonts.zip',  "https://archive.ics.uci.edu/ml/machine-learning-databases/00417/fonts.zip",
    cache_dir='.', cache_subdir='fonts',
    extract=True)
Downloading data from https://archive.ics.uci.edu/ml/machine-learning-databases/00417/fonts.zip
159481856/Unknown 4s 0us/step
import pathlib
font_csvs =  sorted(str(p) for p in pathlib.Path('fonts').glob("*.csv"))

font_csvs[:10]
['fonts/AGENCY.csv',
 'fonts/ARIAL.csv',
 'fonts/BAITI.csv',
 'fonts/BANKGOTHIC.csv',
 'fonts/BASKERVILLE.csv',
 'fonts/BAUHAUS.csv',
 'fonts/BELL.csv',
 'fonts/BERLIN.csv',
 'fonts/BERNARD.csv',
 'fonts/BITSTREAMVERA.csv']
len(font_csvs)
153

When dealing with a bunch of files, you can pass a glob-style file_pattern to the tf.data.experimental.make_csv_dataset function. The order of the files is shuffled each iteration.

Use the num_parallel_reads argument to set how many files are read in parallel and interleaved together.

fonts_ds = tf.data.experimental.make_csv_dataset(
    file_pattern = "fonts/*.csv",
    batch_size=10, num_epochs=1,
    num_parallel_reads=20,
    shuffle_buffer_size=10000)
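The glob pattern can be exercised without the full download. The sketch below creates three tiny made-up CSV files in a temporary directory (the names and pixel values are placeholders) and checks that a single dataset draws rows from all of them.

```python
import os
import tempfile

import tensorflow as tf

# Create three hypothetical per-font CSV files with identical headers.
data_dir = tempfile.mkdtemp()
for font in ["AGENCY", "ARIAL", "BAITI"]:
    with open(os.path.join(data_dir, f"{font}.csv"), "w") as f:
        f.write("font,r0c0\n")
        f.write(f"{font},1\n{font},255\n")

multi_ds = tf.data.experimental.make_csv_dataset(
    file_pattern=os.path.join(data_dir, "*.csv"),
    batch_size=2,
    num_epochs=1,
    num_parallel_reads=3,
    shuffle_buffer_size=100)

# Without label_name, each element is just the dictionary of features.
fonts_seen = set()
rows = 0
for batch in multi_ds:
    fonts_seen.update(name.decode() for name in batch["font"].numpy())
    rows += int(batch["font"].shape[0])

print(sorted(fonts_seen), rows)
```

All files matching the pattern must share the same columns. The fonts dataset above follows this pattern at a much larger scale.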

These CSV files have the images flattened out into a single row. The column names are formatted r{row}c{column}. Here's the first batch:

for features in fonts_ds.take(1):
  for i, (name, value) in enumerate(features.items()):
    if i>15:
      break
    print(f"{name:20s}: {value}")
print('...')
print(f"[total: {len(features)} features]")
font                : [b'BODONI' b'PHAGSPA' b'PLAYBILL' b'EDWARDIAN' b'EDWARDIAN' b'BOOK'
 b'HAETTENSCHWEILER' b'CENTAUR' b'CENTAUR' b'LEELAWADEE']
fontVariant         : [b'BODONI MT POSTER COMPRESSED' b'MICROSOFT PHAGSPA' b'PLAYBILL'
 b'EDWARDIAN SCRIPT ITC' b'EDWARDIAN SCRIPT ITC' b'BOOK ANTIQUA'
 b'HAETTENSCHWEILER' b'CENTAUR' b'CENTAUR' b'LEELAWADEE UI SEMILIGHT']
m_label             : [ 114 8254   93  100  710 1084 1025 8224  338 9723]
strength            : [0.4 0.4 0.4 0.4 0.4 0.4 0.4 0.4 0.4 0.4]
italic              : [1 0 1 0 1 0 0 0 1 0]
orientation         : [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
m_top               : [47 40 30 44 49 53 27 33 34 50]
m_left              : [20 20 20 23 46 21 21 21 28 29]
originalH           : [35  4 56 34  8 31 55 53 44 39]
originalW           : [27 28 28 32 16 44 20 19 68 39]
h                   : [20 20 20 20 20 20 20 20 20 20]
w                   : [20 20 20 20 20 20 20 20 20 20]
r0c0                : [  1 255   1   1   0 255   1   1   1 255]
r0c1                : [  1 255   1   1   0 255   1   1   1 255]
r0c2                : [  1 255   1   1   0 255   1   1   1 255]
r0c3                : [  1 255   1   1   0 255 163   1   1 255]
...
[total: 412 features]

Optional: Packing fields

You probably don't want to work with each pixel in a separate column like this. Before trying to use this dataset, be sure to pack the pixels into an image tensor.

Here is code that parses the column names to build images for each example:

import re

def make_images(features):
  image = [None]*400
  new_feats = {}

  for name, value in features.items():
    match = re.match(r'r(\d+)c(\d+)', name)
    if match:
      image[int(match.group(1))*20+int(match.group(2))] = value
    else:
      new_feats[name] = value

  image = tf.stack(image, axis=0)
  image = tf.reshape(image, [20, 20, -1])
  new_feats['image'] = image

  return new_feats
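The index arithmetic in that function can be checked with plain NumPy on a smaller grid. The sketch below is an illustration under stated assumptions: it uses a hypothetical 3x3 image and a batch of two, with each pixel value chosen so that its row and column can be read back from the number.

```python
import re

import numpy as np

side = 3        # small grid for illustration; the fonts data uses 20
batch_size = 2

# Hypothetical flattened batch: one vector of length batch_size per pixel column.
features = {"font": np.array(["A", "B"])}
for row in range(side):
    for col in range(side):
        features[f"r{row}c{col}"] = np.full(batch_size, 10 * row + col)

pixels = [None] * (side * side)
other = {}
for name, value in features.items():
    match = re.match(r"r(\d+)c(\d+)", name)
    if match:
        # Row-major position of this pixel in the flattened image.
        pixels[int(match.group(1)) * side + int(match.group(2))] = value
    else:
        other[name] = value

# Stack to (side*side, batch), then fold into (side, side, batch).
image = np.stack(pixels, axis=0).reshape(side, side, -1)
print(image[..., 0])
```

The batch dimension ends up last, which is why the plotting code indexes the images with `[..., n]`.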

Apply that function to each batch in the dataset:

fonts_image_ds = fonts_ds.map(make_images)

for features in fonts_image_ds.take(1):
  break

Plot the resulting images:

from matplotlib import pyplot as plt

plt.figure(figsize=(6,6), dpi=120)

for n in range(9):
  plt.subplot(3,3,n+1)
  plt.imshow(features['image'][..., n])
  plt.title(chr(features['m_label'][n]))
  plt.axis('off')


Lower level functions

So far this tutorial has focused on the highest-level utilities for reading CSV data. Two other APIs, tf.io.decode_csv and tf.data.experimental.CsvDataset, may be helpful for advanced users whose use case doesn't fit the basic patterns.

This section recreates functionality provided by tf.data.experimental.make_csv_dataset, to demonstrate how this lower-level functionality can be used.

tf.io.decode_csv

This function decodes a string, or a list of strings, into a list of columns.

Unlike tf.data.experimental.make_csv_dataset this function does not try to guess column data-types. You specify the column types by providing a list of record_defaults containing a value of the correct type, for each column.

To read the Titanic data as strings using tf.io.decode_csv, you could write:

text = pathlib.Path(titanic_file_path).read_text()
lines = text.split('\n')[1:-1]

all_strings = [str()]*10
all_strings
['', '', '', '', '', '', '', '', '', '']
features = tf.io.decode_csv(lines, record_defaults=all_strings) 

for f in features:
  print(f"type: {f.dtype.name}, shape: {f.shape}")
type: string, shape: (627,)
type: string, shape: (627,)
type: string, shape: (627,)
type: string, shape: (627,)
type: string, shape: (627,)
type: string, shape: (627,)
type: string, shape: (627,)
type: string, shape: (627,)
type: string, shape: (627,)
type: string, shape: (627,)

To parse them with their actual types, create a list of record_defaults of the corresponding types:

print(lines[0])
0,male,22.0,1,0,7.25,Third,unknown,Southampton,n
titanic_types = [int(), str(), float(), int(), int(), float(), str(), str(), str(), str()]
titanic_types
[0, '', 0.0, 0, 0, 0.0, '', '', '', '']
features = tf.io.decode_csv(lines, record_defaults=titanic_types) 

for f in features:
  print(f"type: {f.dtype.name}, shape: {f.shape}")
type: int32, shape: (627,)
type: string, shape: (627,)
type: float32, shape: (627,)
type: int32, shape: (627,)
type: int32, shape: (627,)
type: float32, shape: (627,)
type: string, shape: (627,)
type: string, shape: (627,)
type: string, shape: (627,)
type: string, shape: (627,)
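Besides fixing the column types, each entry in record_defaults also supplies the value used when a field is empty. A minimal sketch on two made-up lines, where the second line leaves two fields blank:

```python
import tensorflow as tf

# Hypothetical records; the second has empty string and float fields.
lines = ["1,a,2.5", "2,,"]

# One default per column: it sets the dtype and fills in empty fields.
defaults = [int(), "unknown", float()]

columns = tf.io.decode_csv(lines, record_defaults=defaults)

for column in columns:
    print(column.dtype.name, column.numpy())
```

Choosing a distinctive default such as `"unknown"` keeps missing values recognizable later in the pipeline.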

tf.data.experimental.CsvDataset

The tf.data.experimental.CsvDataset class provides a minimal CSV Dataset interface without the convenience features of the tf.data.experimental.make_csv_dataset function: column header parsing, column type-inference, automatic shuffling, file interleaving.

This constructor uses record_defaults the same way as tf.io.decode_csv:

simple_titanic = tf.data.experimental.CsvDataset(titanic_file_path, record_defaults=titanic_types, header=True)

for example in simple_titanic.take(1):
  print([e.numpy() for e in example])
[0, b'male', 22.0, 1, 0, 7.25, b'Third', b'unknown', b'Southampton', b'n']

The above code is basically equivalent to:

def decode_titanic_line(line):
  return tf.io.decode_csv(line, titanic_types)

manual_titanic = (
    # Load the lines of text
    tf.data.TextLineDataset(titanic_file_path)
    # Skip the header row.
    .skip(1)
    # Decode the line.
    .map(decode_titanic_line)
)

for example in manual_titanic.take(1):
  print([e.numpy() for e in example])
[0, b'male', 22.0, 1, 0, 7.25, b'Third', b'unknown', b'Southampton', b'n']

Multiple files

To parse the fonts dataset using tf.data.experimental.CsvDataset, you first need to determine the column types for the record_defaults. Start by inspecting the first row of one file:

font_line = pathlib.Path(font_csvs[0]).read_text().splitlines()[1]
print(font_line)
AGENCY,AGENCY FB,64258,0.400000,0,0.000000,35,21,51,22,20,20,1,1,1,21,101,210,255,255,255,255,255,255,255,255,255,255,255,255,255,255,1,1,1,93,255,255,255,176,146,146,146,146,146,146,146,146,216,255,255,255,1,1,1,93,255,255,255,70,1,1,1,1,1,1,1,1,163,255,255,255,1,1,1,93,255,255,255,70,1,1,1,1,1,1,1,1,163,255,255,255,1,1,1,93,255,255,255,70,1,1,1,1,1,1,1,1,163,255,255,255,1,1,1,93,255,255,255,70,1,1,1,1,1,1,1,1,163,255,255,255,1,1,1,93,255,255,255,70,1,1,1,1,1,1,1,1,163,255,255,255,141,141,141,182,255,255,255,172,141,141,141,115,1,1,1,1,163,255,255,255,255,255,255,255,255,255,255,255,255,255,255,209,1,1,1,1,163,255,255,255,6,6,6,96,255,255,255,74,6,6,6,5,1,1,1,1,163,255,255,255,1,1,1,93,255,255,255,70,1,1,1,1,1,1,1,1,163,255,255,255,1,1,1,93,255,255,255,70,1,1,1,1,1,1,1,1,163,255,255,255,1,1,1,93,255,255,255,70,1,1,1,1,1,1,1,1,163,255,255,255,1,1,1,93,255,255,255,70,1,1,1,1,1,1,1,1,163,255,255,255,1,1,1,93,255,255,255,70,1,1,1,1,1,1,1,1,163,255,255,255,1,1,1,93,255,255,255,70,1,1,1,1,1,1,1,1,163,255,255,255,1,1,1,93,255,255,255,70,1,1,1,1,1,1,1,1,163,255,255,255,1,1,1,93,255,255,255,70,1,1,1,1,1,1,1,1,163,255,255,255,1,1,1,93,255,255,255,70,1,1,1,1,1,1,1,1,163,255,255,255,1,1,1,93,255,255,255,70,1,1,1,1,1,1,1,1,163,255,255,255

Only the first two fields are strings; the rest are integers or floats. You can get the total number of features by counting the commas:

num_font_features = font_line.count(',')+1
font_column_types = [str(), str()] + [float()]*(num_font_features-2)
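
Counting commas assumes that no field contains a quoted comma. As a minimal sketch for files where that assumption might not hold, the standard-library csv module can count the fields instead. The helper name infer_record_defaults, its num_string_columns parameter, and the sample line are hypothetical and not part of this tutorial:

```python
import csv
import io


def infer_record_defaults(line, num_string_columns=2):
    """Build a record_defaults-style list from a single CSV line.

    Assumes the first `num_string_columns` fields are strings and
    every remaining field is numeric.
    """
    # csv.reader respects quoting, so a comma inside quotes is not a separator.
    fields = next(csv.reader(io.StringIO(line)))
    num_numeric = len(fields) - num_string_columns
    return [str()] * num_string_columns + [float()] * num_numeric


# Hypothetical line whose second field contains a quoted comma.
sample_line = 'AGENCY,"AGENCY FB, Bold",64258,0.4,0,35'

defaults = infer_record_defaults(sample_line)
print("fields by comma count:", sample_line.count(',') + 1)
print("fields by csv.reader: ", len(defaults))
```

If the two counts agree on a few sampled lines, the simpler comma count is probably safe for that file.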

The tf.data.experimental.CsvDataset constructor can take a list of input files, but reads them sequentially. The first file in the list of CSVs is AGENCY.csv:

font_csvs[0]
'fonts/AGENCY.csv'

So, when you pass the list of files to CsvDataset, the records from AGENCY.csv are read first:

simple_font_ds = tf.data.experimental.CsvDataset(
    font_csvs, 
    record_defaults=font_column_types, 
    header=True)
for row in simple_font_ds.take(10):
  print(row[0].numpy())
b'AGENCY'
b'AGENCY'
b'AGENCY'
b'AGENCY'
b'AGENCY'
b'AGENCY'
b'AGENCY'
b'AGENCY'
b'AGENCY'
b'AGENCY'

To interleave multiple files, use Dataset.interleave.

Here's an initial dataset that contains the CSV file names:

font_files = tf.data.Dataset.list_files("fonts/*.csv")

By default, list_files shuffles the file names each epoch:

print('Epoch 1:')
for f in list(font_files)[:5]:
  print("    ", f.numpy())
print('    ...')
print()

print('Epoch 2:')
for f in list(font_files)[:5]:
  print("    ", f.numpy())
print('    ...')
Epoch 1:
     b'fonts/GIGI.csv'
     b'fonts/NUMERICS.csv'
     b'fonts/COPPERPLATE.csv'
     b'fonts/PHAGSPA.csv'
     b'fonts/ROMAN.csv'
    ...

Epoch 2:
     b'fonts/MV_BOLI.csv'
     b'fonts/PROXY.csv'
     b'fonts/MONOTXT.csv'
     b'fonts/MAIANDRA.csv'
     b'fonts/PALACE.csv'
    ...

The interleave method takes a map_func that creates a child-Dataset for each element of the parent-Dataset.

Here, you want to create a tf.data.experimental.CsvDataset from each element of the dataset of files:

def make_font_csv_ds(path):
  return tf.data.experimental.CsvDataset(
    path, 
    record_defaults=font_column_types, 
    header=True)

The Dataset returned by interleave returns elements by cycling over a number of the child-Datasets. Note below how, with cycle_length=3, the dataset cycles over three font files:

font_rows = font_files.interleave(make_font_csv_ds,
                                  cycle_length=3)
fonts_dict = {'font_name':[], 'character':[]}

for row in font_rows.take(10):
  fonts_dict['font_name'].append(row[0].numpy().decode())
  fonts_dict['character'].append(chr(int(row[2].numpy())))

pd.DataFrame(fonts_dict)
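
To make the cycling order concrete, here is a simplified pure-Python model of round-robin interleaving. It is only a sketch of the behavior described above, not TensorFlow's implementation. The function interleave_model and the toy "files" are hypothetical stand-ins for Dataset.interleave and the font CSVs:

```python
import itertools

_NO_MORE_INPUT = object()  # sentinel marking the end of the parent sequence


def interleave_model(sources, cycle_length, block_length=1):
    """Yield items by cycling over up to `cycle_length` open sources."""
    pending = iter(sources)
    slots = [None] * cycle_length  # one open child iterator per slot
    more_input = True
    index = 0
    while more_input or any(slot is not None for slot in slots):
        # Refill an empty slot with the next source, if any remain.
        if slots[index] is None and more_input:
            source = next(pending, _NO_MORE_INPUT)
            if source is _NO_MORE_INPUT:
                more_input = False
            else:
                slots[index] = iter(source)
        # Pull up to `block_length` items from the child in this slot.
        if slots[index] is not None:
            block = list(itertools.islice(slots[index], block_length))
            yield from block
            if len(block) < block_length:
                slots[index] = None  # child exhausted; refill on a later visit
        index = (index + 1) % cycle_length


# Toy stand-ins for four CSV files with a few rows each.
toy_files = [
    ["A0", "A1", "A2"],
    ["B0", "B1", "B2"],
    ["C0", "C1", "C2"],
    ["D0", "D1"],
]

rows = list(interleave_model(toy_files, cycle_length=3))
print(rows)
```

In this model the fourth source is only opened once one of the first three slots runs dry, which is why the first rows come from exactly three sources.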

Performance

Earlier, it was noted that tf.io.decode_csv is more efficient when run on a batch of strings.

It is possible to take advantage of this fact, when using large batch sizes, to improve CSV loading performance (but try caching first).
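
As a small illustration of what "a batch of strings" means here, tf.io.decode_csv accepts a vector of lines and returns one tensor per column, each holding a value for every line. The three-column records below are made up for the example and are not taken from the fonts files:

```python
import tensorflow as tf

# Hypothetical records: one string column followed by two float columns.
lines = tf.constant(["a,1.0,2.0", "b,3.0,4.0", "c,5.0,6.0"])
record_defaults = [str(), float(), float()]

# A single call decodes the whole batch of lines.
columns = tf.io.decode_csv(lines, record_defaults=record_defaults)

for column in columns:
    print(column.shape, column.dtype)
```

Each returned tensor has one entry per input line, so the per-call overhead is shared across the whole batch.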

With the built-in loader, 20 batches of 2048 examples each take about 20s in the run shown below.

BATCH_SIZE=2048
fonts_ds = tf.data.experimental.make_csv_dataset(
    file_pattern = "fonts/*.csv",
    batch_size=BATCH_SIZE, num_epochs=1,
    num_parallel_reads=100)

%%time
for i,batch in enumerate(fonts_ds.take(20)):
  print('.',end='')

print()
....................
CPU times: user 44.6 s, sys: 4.21 s, total: 48.9 s
Wall time: 20.6 s

Passing batches of text lines to decode_csv runs faster, in under 1s in the run shown below:

fonts_files = tf.data.Dataset.list_files("fonts/*.csv")
fonts_lines = fonts_files.interleave(
    lambda fname:tf.data.TextLineDataset(fname).skip(1), 
    cycle_length=100).batch(BATCH_SIZE)

fonts_fast = fonts_lines.map(lambda x: tf.io.decode_csv(x, record_defaults=font_column_types))

%%time
for i,batch in enumerate(fonts_fast.take(20)):
  print('.',end='')

print()
....................
CPU times: user 3.35 s, sys: 111 ms, total: 3.47 s
Wall time: 752 ms

For another example of increasing CSV performance by using large batches, refer to the Overfit and underfit tutorial.

This sort of approach may work, but consider other options like Dataset.cache and tf.data.Dataset.snapshot, or re-encoding your data into a more streamlined format.