TFF для исследования федеративного обучения: сжатие моделей и обновлений

В этом руководстве мы используем EMNIST набор данные , чтобы продемонстрировать , как включить алгоритмы сжатия с потерями , чтобы уменьшить стоимость связи в алгоритме усреднения Федеративного с использованием tff.learning.build_federated_averaging_process API и tensor_encoding API. Для получения более подробной информации об алгоритме усреднения Федеративных см бумаги Коммуникационного-эффективном заучивание глубоких сетей от децентрализованных данных .

Прежде, чем мы начнем

Прежде чем мы начнем, выполните следующее, чтобы убедиться, что ваша среда правильно настроена. Если вы не видите приветствие, пожалуйста , обратитесь к установке руководству для получения инструкций.

!pip install --quiet --upgrade tensorflow-federated-nightly
!pip install --quiet --upgrade tensorflow-model-optimization
!pip install --quiet --upgrade nest-asyncio

import nest_asyncio
nest_asyncio
.apply()
%load_ext tensorboard

import functools

import numpy as np
import tensorflow as tf
import tensorflow_federated as tff

from tensorflow_model_optimization.python.core.internal import tensor_encoding as te

Убедитесь, что TFF работает.

@tff.federated_computation
def hello_world():
 
return 'Hello, World!'

hello_world
()
b'Hello, World!'

Подготовка входных данных

В этом разделе мы загружаем и предварительно обрабатываем набор данных EMNIST, включенный в TFF. Пожалуйста , ознакомьтесь с федеративным Обучением для изображения Классификации учебника для более подробной информации о EMNIST данных.

# This value only applies to EMNIST dataset, consider choosing appropriate
# values if switching to other datasets.
MAX_CLIENT_DATASET_SIZE
= 418

CLIENT_EPOCHS_PER_ROUND
= 1
CLIENT_BATCH_SIZE
= 20
TEST_BATCH_SIZE
= 500

emnist_train
, emnist_test = tff.simulation.datasets.emnist.load_data(
    only_digits
=True)

def reshape_emnist_element(element):
 
return (tf.expand_dims(element['pixels'], axis=-1), element['label'])

def preprocess_train_dataset(dataset):
 
"""Preprocessing function for the EMNIST training dataset."""
 
return (dataset
         
# Shuffle according to the largest client dataset
         
.shuffle(buffer_size=MAX_CLIENT_DATASET_SIZE)
         
# Repeat to do multiple local epochs
         
.repeat(CLIENT_EPOCHS_PER_ROUND)
         
# Batch to a fixed client batch size
         
.batch(CLIENT_BATCH_SIZE, drop_remainder=False)
         
# Preprocessing step
         
.map(reshape_emnist_element))

emnist_train
= emnist_train.preprocess(preprocess_train_dataset)

Определение модели

Здесь мы определяем модель keras на основе orginial FedAvg CNN, а затем обернуть модель keras в экземпляре tff.learning.Model так , что она может потребляться TFF.

Обратите внимание , что нам нужна функция , которая производит модель вместо просто моделей напрямую. Кроме того, эта функция не может просто захватить предварительно построенной модели, она должна создать модель в контексте того, что она называется. Причина в том, что TFF предназначен для передачи на устройства и требует контроля над созданием ресурсов, чтобы их можно было захватить и упаковать.

def create_original_fedavg_cnn_model(only_digits=True):
 
"""The CNN model used in https://arxiv.org/abs/1602.05629."""
  data_format
= 'channels_last'

  max_pool
= functools.partial(
      tf
.keras.layers.MaxPooling2D,
      pool_size
=(2, 2),
      padding
='same',
      data_format
=data_format)
  conv2d
= functools.partial(
      tf
.keras.layers.Conv2D,
      kernel_size
=5,
      padding
='same',
      data_format
=data_format,
      activation
=tf.nn.relu)

  model
= tf.keras.models.Sequential([
      tf
.keras.layers.InputLayer(input_shape=(28, 28, 1)),
      conv2d
(filters=32),
      max_pool
(),
      conv2d
(filters=64),
      max_pool
(),
      tf
.keras.layers.Flatten(),
      tf
.keras.layers.Dense(512, activation=tf.nn.relu),
      tf
.keras.layers.Dense(10 if only_digits else 62),
      tf
.keras.layers.Softmax(),
 
])

 
return model

# Gets the type information of the input data. TFF is a strongly typed
# functional programming framework, and needs type information about inputs to
# the model.
input_spec
= emnist_train.create_tf_dataset_for_client(
    emnist_train
.client_ids[0]).element_spec

def tff_model_fn():
  keras_model
= create_original_fedavg_cnn_model()
 
return tff.learning.from_keras_model(
      keras_model
=keras_model,
      input_spec
=input_spec,
      loss
=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics
=[tf.keras.metrics.SparseCategoricalAccuracy()])

Обучение модели и вывод показателей обучения

Теперь мы готовы построить алгоритм федеративного усреднения и обучить заданную модель на наборе данных EMNIST.

Во- первых , мы должны построить алгоритм усреднения Федеративные с использованием tff.learning.build_federated_averaging_process API.

federated_averaging = tff.learning.build_federated_averaging_process(
    model_fn
=tff_model_fn,
    client_optimizer_fn
=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn
=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))

Теперь запустим алгоритм федеративного усреднения. Выполнение алгоритма федеративного обучения с точки зрения TFF выглядит так:

  1. Инициализируйте алгоритм и получите исходное состояние сервера. Состояние сервера содержит информацию, необходимую для выполнения алгоритма. Напомним, поскольку TFF является функциональным, это состояние включает в себя любое состояние оптимизатора, которое использует алгоритм (например, параметры импульса), а также сами параметры модели - они будут переданы как аргументы и возвращены как результаты вычислений TFF.
  2. Выполните алгоритм по очереди. В каждом раунде будет возвращаться новое состояние сервера в результате обучения модели каждым клиентом на своих данных. Обычно за один раунд:
    1. Сервер рассылает модель всем участвующим клиентам.
    2. Каждый клиент выполняет работу на основе модели и собственных данных.
    3. Сервер объединяет всю модель для создания состояния сервера, содержащего новую модель.

Для получения более подробной информации, пожалуйста , см Выборочная Федеративные Алгоритмы, Часть 2: Реализация федеративного Усреднение учебник.

Метрики обучения записываются в каталог Tensorboard для отображения после обучения.

Загрузить служебные функции

def format_size(size):
 
"""A helper function for creating a human-readable size."""
  size
= float(size)
 
for unit in ['bit','Kibit','Mibit','Gibit']:
   
if size < 1024.0:
     
return "{size:3.2f}{unit}".format(size=size, unit=unit)
    size
/= 1024.0
 
return "{size:.2f}{unit}".format(size=size, unit='TiB')

def set_sizing_environment():
 
"""Creates an environment that contains sizing information."""
 
# Creates a sizing executor factory to output communication cost
 
# after the training finishes. Note that sizing executor only provides an
 
# estimate (not exact) of communication cost, and doesn't capture cases like
 
# compression of over-the-wire representations. However, it's perfect for
 
# demonstrating the effect of compression in this tutorial.
  sizing_factory
= tff.framework.sizing_executor_factory()

 
# TFF has a modular runtime you can configure yourself for various
 
# environments and purposes, and this example just shows how to configure one
 
# part of it to report the size of things.
  context
= tff.framework.ExecutionContext(executor_fn=sizing_factory)
  tff
.framework.set_default_context(context)

 
return sizing_factory

def train(federated_averaging_process, num_rounds, num_clients_per_round, summary_writer):
 
"""Trains the federated averaging process and output metrics."""
 
# Create a environment to get communication cost.
  environment
= set_sizing_environment()

 
# Initialize the Federated Averaging algorithm to get the initial server state.
  state
= federated_averaging_process.initialize()

 
with summary_writer.as_default():
   
for round_num in range(num_rounds):
     
# Sample the clients parcitipated in this round.
      sampled_clients
= np.random.choice(
          emnist_train
.client_ids,
          size
=num_clients_per_round,
          replace
=False)
     
# Create a list of `tf.Dataset` instances from the data of sampled clients.
      sampled_train_data
= [
          emnist_train
.create_tf_dataset_for_client(client)
         
for client in sampled_clients
     
]
     
# Round one round of the algorithm based on the server state and client data
     
# and output the new state and metrics.
      state
, metrics = federated_averaging_process.next(state, sampled_train_data)

     
# For more about size_info, please see https://www.tensorflow.org/federated/api_docs/python/tff/framework/SizeInfo
      size_info
= environment.get_size_info()
      broadcasted_bits
= size_info.broadcast_bits[-1]
      aggregated_bits
= size_info.aggregate_bits[-1]

     
print('round {:2d}, metrics={}, broadcasted_bits={}, aggregated_bits={}'.format(round_num, metrics, format_size(broadcasted_bits), format_size(aggregated_bits)))

     
# Add metrics to Tensorboard.
     
for name, value in metrics['train'].items():
          tf
.summary.scalar(name, value, step=round_num)

     
# Add broadcasted and aggregated data size to Tensorboard.
      tf
.summary.scalar('cumulative_broadcasted_bits', broadcasted_bits, step=round_num)
      tf
.summary.scalar('cumulative_aggregated_bits', aggregated_bits, step=round_num)
      summary_writer
.flush()
# Clean the log directory to avoid conflicts.
try:
  tf
.io.gfile.rmtree('/tmp/logs/scalars')
except tf.errors.OpError as e:
 
pass  # Path doesn't exist

# Set up the log directory and writer for Tensorboard.
logdir
= "/tmp/logs/scalars/original/"
summary_writer
= tf.summary.create_file_writer(logdir)

train
(federated_averaging_process=federated_averaging, num_rounds=10,
      num_clients_per_round
=10, summary_writer=summary_writer)
round  0, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.07383774), ('loss', 2.3276227)])), ('stat', OrderedDict([('num_examples', 1097)]))]), broadcasted_bits=507.62Mibit, aggregated_bits=507.62Mibit
round  1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.099585064), ('loss', 2.3152695)])), ('stat', OrderedDict([('num_examples', 964)]))]), broadcasted_bits=1015.24Mibit, aggregated_bits=1015.24Mibit
round  2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09760766), ('loss', 2.3077576)])), ('stat', OrderedDict([('num_examples', 1045)]))]), broadcasted_bits=1.49Gibit, aggregated_bits=1.49Gibit
round  3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.0963035), ('loss', 2.3066626)])), ('stat', OrderedDict([('num_examples', 1028)]))]), broadcasted_bits=1.98Gibit, aggregated_bits=1.98Gibit
round  4, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.10694184), ('loss', 2.3033001)])), ('stat', OrderedDict([('num_examples', 1066)]))]), broadcasted_bits=2.48Gibit, aggregated_bits=2.48Gibit
round  5, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.1185567), ('loss', 2.2999184)])), ('stat', OrderedDict([('num_examples', 970)]))]), broadcasted_bits=2.97Gibit, aggregated_bits=2.97Gibit
round  6, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.11751663), ('loss', 2.296883)])), ('stat', OrderedDict([('num_examples', 902)]))]), broadcasted_bits=3.47Gibit, aggregated_bits=3.47Gibit
round  7, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13063477), ('loss', 2.2990246)])), ('stat', OrderedDict([('num_examples', 1087)]))]), broadcasted_bits=3.97Gibit, aggregated_bits=3.97Gibit
round  8, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.12742382), ('loss', 2.2971866)])), ('stat', OrderedDict([('num_examples', 1083)]))]), broadcasted_bits=4.46Gibit, aggregated_bits=4.46Gibit
round  9, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13555992), ('loss', 2.2934425)])), ('stat', OrderedDict([('num_examples', 1018)]))]), broadcasted_bits=4.96Gibit, aggregated_bits=4.96Gibit

Запустите TensorBoard с указанным выше корневым каталогом журналов, чтобы отобразить показатели обучения. Загрузка данных может занять несколько секунд. Кроме потерь и точности, мы также выводим количество переданных и агрегированных данных. Широковещательные данные относятся к тензорам, которые сервер отправляет каждому клиенту, в то время как агрегированные данные относятся к тензорам, которые каждый клиент возвращает серверу.

%tensorboard --logdir /tmp/logs/scalars/ --port=0
Launching TensorBoard...
Reusing TensorBoard on port 34445 (pid 579503), started 1:53:14 ago. (Use '!kill 579503' to kill it.)
<IPython.core.display.Javascript at 0x7f9135ef1630>

Создайте настраиваемую широковещательную и агрегатную функцию

Теперь давайте реализуем функцию использовать алгоритмы сжатия с потерями на транслируемых данных и агрегированных данных с использованием tensor_encoding API.

Сначала мы определяем две функции:

  • broadcast_encoder_fn , который создает экземпляр te.core.SimpleEncoder для кодирования тензоров или переменного сервер клиента связи (данные Broadcast).
  • mean_encoder_fn , который создает экземпляр te.core.GatherEncoder для кодирования тензоров или переменных клиента к серверу communicaiton (данные Aggregation).

Важно отметить, что мы не применяем метод сжатия ко всей модели сразу. Вместо этого мы решаем, как (и нужно ли) сжимать каждую переменную модели независимо. Причина в том, что обычно небольшие переменные, такие как смещения, более чувствительны к неточности, и, будучи относительно небольшими, потенциальная экономия средств связи также относительно невелика. Следовательно, по умолчанию мы не сжимаем небольшие переменные. В этом примере мы применяем равномерное квантование к 8 битам (256 сегментов) для каждой переменной с более чем 10000 элементов и применяем идентичность только к другим переменным.

def broadcast_encoder_fn(value):
 
"""Function for building encoded broadcast."""
  spec
= tf.TensorSpec(value.shape, value.dtype)
 
if value.shape.num_elements() > 10000:
   
return te.encoders.as_simple_encoder(
        te
.encoders.uniform_quantization(bits=8), spec)
 
else:
   
return te.encoders.as_simple_encoder(te.encoders.identity(), spec)


def mean_encoder_fn(tensor_spec):
 
"""Function for building a GatherEncoder."""
  spec
= tf.TensorSpec(tensor_spec.shape, tensor_spec.dtype)
 
if tensor_spec.shape.num_elements() > 10000:
   
return te.encoders.as_gather_encoder(
        te
.encoders.uniform_quantization(bits=8), spec)
 
else:
   
return te.encoders.as_gather_encoder(te.encoders.identity(), spec)

ПТФ предоставляет API - интерфейсы для преобразования функции кодера в формат , который tff.learning.build_federated_averaging_process API может потреблять. Используя tff.learning.framework.build_encoded_broadcast_from_model и tff.aggregators.MeanFactory , мы можем создать два объекта , которые могут быть переданы в broadcast_process и model_update_aggregation_factory agruments из tff.learning.build_federated_averaging_process для создания федеративного Алгоритмы осреднения с алгоритмом сжатия с потерями.

encoded_broadcast_process = (
    tff
.learning.framework.build_encoded_broadcast_process_from_model(
        tff_model_fn
, broadcast_encoder_fn))

mean_factory
= tff.aggregators.MeanFactory(
    tff
.aggregators.EncodedSumFactory(mean_encoder_fn), # numerator
    tff
.aggregators.EncodedSumFactory(mean_encoder_fn), # denominator
)

federated_averaging_with_compression
= tff.learning.build_federated_averaging_process(
    tff_model_fn
,
    client_optimizer_fn
=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn
=lambda: tf.keras.optimizers.SGD(learning_rate=1.0),
    broadcast_process
=encoded_broadcast_process,
    model_update_aggregation_factory
=mean_factory)

Снова тренируем модель

Теперь запустим новый алгоритм федеративного усреднения.

logdir_for_compression = "/tmp/logs/scalars/compression/"
summary_writer_for_compression
= tf.summary.create_file_writer(
    logdir_for_compression
)

train
(federated_averaging_process=federated_averaging_with_compression,
      num_rounds
=10,
      num_clients_per_round
=10,
      summary_writer
=summary_writer_for_compression)
round  0, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.093), ('loss', 2.3194966)])), ('stat', OrderedDict([('num_examples', 1000)]))]), broadcasted_bits=146.46Mibit, aggregated_bits=146.46Mibit
round  1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.10432034), ('loss', 2.3079953)])), ('stat', OrderedDict([('num_examples', 949)]))]), broadcasted_bits=292.92Mibit, aggregated_bits=292.93Mibit
round  2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.07886754), ('loss', 2.3101337)])), ('stat', OrderedDict([('num_examples', 989)]))]), broadcasted_bits=439.38Mibit, aggregated_bits=439.39Mibit
round  3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09774436), ('loss', 2.305069)])), ('stat', OrderedDict([('num_examples', 1064)]))]), broadcasted_bits=585.84Mibit, aggregated_bits=585.85Mibit
round  4, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09404097), ('loss', 2.302943)])), ('stat', OrderedDict([('num_examples', 1074)]))]), broadcasted_bits=732.30Mibit, aggregated_bits=732.32Mibit
round  5, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09), ('loss', 2.304385)])), ('stat', OrderedDict([('num_examples', 1000)]))]), broadcasted_bits=878.77Mibit, aggregated_bits=878.78Mibit
round  6, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.14368932), ('loss', 2.2973824)])), ('stat', OrderedDict([('num_examples', 1030)]))]), broadcasted_bits=1.00Gibit, aggregated_bits=1.00Gibit
round  7, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.12140871), ('loss', 2.2993405)])), ('stat', OrderedDict([('num_examples', 1079)]))]), broadcasted_bits=1.14Gibit, aggregated_bits=1.14Gibit
round  8, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13600783), ('loss', 2.2953267)])), ('stat', OrderedDict([('num_examples', 1022)]))]), broadcasted_bits=1.29Gibit, aggregated_bits=1.29Gibit
round  9, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13844621), ('loss', 2.295768)])), ('stat', OrderedDict([('num_examples', 1004)]))]), broadcasted_bits=1.43Gibit, aggregated_bits=1.43Gibit

Снова запустите TensorBoard, чтобы сравнить показатели обучения между двумя запусками.

Как вы можете видеть в Tensorboard, существует значительное сокращение между orginial и compression кривыми в broadcasted_bits и aggregated_bits участках , а в loss и sparse_categorical_accuracy участке две кривые довольно похожи.

В заключение, мы реализовали алгоритм сжатия, который может достигать производительности, аналогичной исходному алгоритму федеративного усреднения, при этом значительно снижается стоимость коммутации.

%tensorboard --logdir /tmp/logs/scalars/ --port=0
Launching TensorBoard...
Reusing TensorBoard on port 34445 (pid 579503), started 1:54:12 ago. (Use '!kill 579503' to kill it.)
<IPython.core.display.Javascript at 0x7f9140eb5ef0>

Упражнения

Чтобы реализовать собственный алгоритм сжатия и применить его к циклу обучения, вы можете:

  1. Реализовать новый алгоритм сжатия как подкласс EncodingStageInterface или его более общего вариант, AdaptiveEncodingStageInterface следующего данного примера .
  2. Построить свой новый Encoder и специализироваться его для модели вещания или модель усреднения обновлений .
  3. Используйте эти объекты , чтобы построить весь расчет обучения .

К потенциально ценным открытым вопросам исследования относятся: неравномерное квантование, сжатие без потерь, такое как кодирование Хаффмана, и механизмы адаптации сжатия на основе информации из предыдущих циклов обучения.

Рекомендуемые материалы для чтения: