TFF สำหรับการวิจัยการเรียนรู้แบบสหพันธรัฐ: แบบจำลองและการปรับปรุงการบีบอัด

ในการกวดวิชานี้เราจะใช้ EMNIST ชุดข้อมูลที่แสดงให้เห็นถึงวิธีการเปิดใช้อัลกอริทึมการบีบอัด lossy เพื่อลดค่าใช้จ่ายในการติดต่อสื่อสารในขั้นตอนวิธีการ Averaging สหพันธ์ใช้ tff.learning.build_federated_averaging_process API และ tensor_encoding API สำหรับรายละเอียดเพิ่มเติมเกี่ยวกับขั้นตอนวิธีการ Averaging สหพันธ์ดูกระดาษ การเรียนรู้การสื่อสารที่มีประสิทธิภาพของเครือข่ายการกระจายอำนาจลึกจากข้อมูล

ก่อนที่เราจะเริ่มต้น

ก่อนที่เราจะเริ่ม โปรดเรียกใช้สิ่งต่อไปนี้เพื่อให้แน่ใจว่าสภาพแวดล้อมของคุณได้รับการตั้งค่าอย่างถูกต้อง หากคุณไม่เห็นคำทักทายโปรดดูที่ การติดตั้ง คู่มือสำหรับคำแนะนำ

!pip install --quiet --upgrade tensorflow-federated-nightly
!pip install --quiet --upgrade tensorflow-model-optimization
!pip install --quiet --upgrade nest-asyncio

import nest_asyncio
nest_asyncio
.apply()
%load_ext tensorboard

import functools

import numpy as np
import tensorflow as tf
import tensorflow_federated as tff

from tensorflow_model_optimization.python.core.internal import tensor_encoding as te

ตรวจสอบว่า TFF ทำงานหรือไม่

@tff.federated_computation
def hello_world():
 
return 'Hello, World!'

hello_world
()
b'Hello, World!'

การเตรียมข้อมูลเข้า

ในส่วนนี้ เราจะโหลดและประมวลผลชุดข้อมูล EMNIST ที่รวมอยู่ใน TFF ล่วงหน้า กรุณาตรวจสอบ สหพันธ์เรียนรู้สำหรับภาพการจำแนกประเภท กวดวิชาสำหรับรายละเอียดเพิ่มเติมเกี่ยวกับชุด EMNIST

# This value only applies to EMNIST dataset, consider choosing appropriate
# values if switching to other datasets.
MAX_CLIENT_DATASET_SIZE
= 418

CLIENT_EPOCHS_PER_ROUND
= 1
CLIENT_BATCH_SIZE
= 20
TEST_BATCH_SIZE
= 500

emnist_train
, emnist_test = tff.simulation.datasets.emnist.load_data(
    only_digits
=True)

def reshape_emnist_element(element):
 
return (tf.expand_dims(element['pixels'], axis=-1), element['label'])

def preprocess_train_dataset(dataset):
 
"""Preprocessing function for the EMNIST training dataset."""
 
return (dataset
         
# Shuffle according to the largest client dataset
         
.shuffle(buffer_size=MAX_CLIENT_DATASET_SIZE)
         
# Repeat to do multiple local epochs
         
.repeat(CLIENT_EPOCHS_PER_ROUND)
         
# Batch to a fixed client batch size
         
.batch(CLIENT_BATCH_SIZE, drop_remainder=False)
         
# Preprocessing step
         
.map(reshape_emnist_element))

emnist_train
= emnist_train.preprocess(preprocess_train_dataset)

การกำหนดแบบจำลอง

ที่นี่เรากำหนดรูปแบบ keras บนพื้นฐานของ orginial FedAvg ซีเอ็นเอ็นและแล้วห่อรุ่น keras ในตัวอย่างของ tff.learning.Model เพื่อที่จะสามารถนำมาบริโภคโดยฉิบหาย

โปรดทราบว่าเราจะต้องฟังก์ชั่นซึ่งเป็นผู้ผลิตแบบแทนที่จะรูปแบบโดยตรง นอกจากนี้ฟังก์ชั่นไม่สามารถเพียงแค่จับรุ่นก่อนสร้างขึ้นนั้นจะต้องสร้างแบบจำลองในบริบทที่ว่ามันจะเรียกว่า เหตุผลก็คือ TFF ได้รับการออกแบบมาให้ใช้กับอุปกรณ์ และจำเป็นต้องควบคุมเมื่อทรัพยากรถูกสร้างขึ้นเพื่อให้สามารถดักจับและบรรจุหีบห่อได้

def create_original_fedavg_cnn_model(only_digits=True):
 
"""The CNN model used in https://arxiv.org/abs/1602.05629."""
  data_format
= 'channels_last'

  max_pool
= functools.partial(
      tf
.keras.layers.MaxPooling2D,
      pool_size
=(2, 2),
      padding
='same',
      data_format
=data_format)
  conv2d
= functools.partial(
      tf
.keras.layers.Conv2D,
      kernel_size
=5,
      padding
='same',
      data_format
=data_format,
      activation
=tf.nn.relu)

  model
= tf.keras.models.Sequential([
      tf
.keras.layers.InputLayer(input_shape=(28, 28, 1)),
      conv2d
(filters=32),
      max_pool
(),
      conv2d
(filters=64),
      max_pool
(),
      tf
.keras.layers.Flatten(),
      tf
.keras.layers.Dense(512, activation=tf.nn.relu),
      tf
.keras.layers.Dense(10 if only_digits else 62),
      tf
.keras.layers.Softmax(),
 
])

 
return model

# Gets the type information of the input data. TFF is a strongly typed
# functional programming framework, and needs type information about inputs to
# the model.
input_spec
= emnist_train.create_tf_dataset_for_client(
    emnist_train
.client_ids[0]).element_spec

def tff_model_fn():
  keras_model
= create_original_fedavg_cnn_model()
 
return tff.learning.from_keras_model(
      keras_model
=keras_model,
      input_spec
=input_spec,
      loss
=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics
=[tf.keras.metrics.SparseCategoricalAccuracy()])

การฝึกอบรมแบบจำลองและการส่งออกตัวชี้วัดการฝึกอบรม

ตอนนี้ เราพร้อมที่จะสร้างอัลกอริธึม Federated Averaging และฝึกโมเดลที่กำหนดไว้บนชุดข้อมูล EMNIST

ครั้งแรกที่เราจำเป็นต้องสร้างอัลกอริทึม Averaging สหพันธ์ใช้ tff.learning.build_federated_averaging_process API

federated_averaging = tff.learning.build_federated_averaging_process(
    model_fn
=tff_model_fn,
    client_optimizer_fn
=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn
=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))

ตอนนี้ มาเรียกใช้อัลกอริทึม Federated Averaging กัน การดำเนินการของอัลกอริธึม Federated Learning จากมุมมองของ TFF มีลักษณะดังนี้:

  1. เริ่มต้นอัลกอริทึมและรับสถานะเซิร์ฟเวอร์เริ่มต้น สถานะเซิร์ฟเวอร์มีข้อมูลที่จำเป็นในการดำเนินการอัลกอริทึม โปรดจำไว้ว่า เนื่องจาก TFF ใช้งานได้ สถานะนี้รวมทั้งสถานะของตัวเพิ่มประสิทธิภาพใดๆ ที่อัลกอริธึมใช้ (เช่น เงื่อนไขโมเมนตัม) เช่นเดียวกับพารามิเตอร์ของโมเดลด้วยตัวมันเอง -- สิ่งเหล่านี้จะถูกส่งผ่านเป็นอาร์กิวเมนต์และส่งคืนเป็นผลจากการคำนวณ TFF
  2. ดำเนินการอัลกอริทึมทีละรอบ ในแต่ละรอบ สถานะเซิร์ฟเวอร์ใหม่จะถูกส่งกลับอันเป็นผลมาจากการที่ไคลเอ็นต์แต่ละรายฝึกโมเดลเกี่ยวกับข้อมูลของตน โดยปกติในรอบเดียว:
    1. เซิร์ฟเวอร์เผยแพร่แบบจำลองให้กับลูกค้าที่เข้าร่วมทั้งหมด
    2. ลูกค้าแต่ละรายทำงานตามแบบจำลองและข้อมูลของตัวเอง
    3. เซิร์ฟเวอร์รวมโมเดลทั้งหมดเพื่อสร้างสถานะเซิร์ฟเวอร์ซึ่งมีโมเดลใหม่

สำหรับรายละเอียดเพิ่มเติมโปรดดู ที่กำหนดเองสหพันธ์อัลกอริทึม, ส่วนที่ 2: การใช้สหพันธ์ Averaging กวดวิชา

เมตริกการฝึกอบรมจะถูกเขียนลงในไดเร็กทอรี Tensorboard เพื่อแสดงหลังการฝึกอบรม

โหลดฟังก์ชั่นยูทิลิตี้

def format_size(size):
 
"""A helper function for creating a human-readable size."""
  size
= float(size)
 
for unit in ['bit','Kibit','Mibit','Gibit']:
   
if size < 1024.0:
     
return "{size:3.2f}{unit}".format(size=size, unit=unit)
    size
/= 1024.0
 
return "{size:.2f}{unit}".format(size=size, unit='TiB')

def set_sizing_environment():
 
"""Creates an environment that contains sizing information."""
 
# Creates a sizing executor factory to output communication cost
 
# after the training finishes. Note that sizing executor only provides an
 
# estimate (not exact) of communication cost, and doesn't capture cases like
 
# compression of over-the-wire representations. However, it's perfect for
 
# demonstrating the effect of compression in this tutorial.
  sizing_factory
= tff.framework.sizing_executor_factory()

 
# TFF has a modular runtime you can configure yourself for various
 
# environments and purposes, and this example just shows how to configure one
 
# part of it to report the size of things.
  context
= tff.framework.ExecutionContext(executor_fn=sizing_factory)
  tff
.framework.set_default_context(context)

 
return sizing_factory

def train(federated_averaging_process, num_rounds, num_clients_per_round, summary_writer):
 
"""Trains the federated averaging process and output metrics."""
 
# Create a environment to get communication cost.
  environment
= set_sizing_environment()

 
# Initialize the Federated Averaging algorithm to get the initial server state.
  state
= federated_averaging_process.initialize()

 
with summary_writer.as_default():
   
for round_num in range(num_rounds):
     
# Sample the clients parcitipated in this round.
      sampled_clients
= np.random.choice(
          emnist_train
.client_ids,
          size
=num_clients_per_round,
          replace
=False)
     
# Create a list of `tf.Dataset` instances from the data of sampled clients.
      sampled_train_data
= [
          emnist_train
.create_tf_dataset_for_client(client)
         
for client in sampled_clients
     
]
     
# Round one round of the algorithm based on the server state and client data
     
# and output the new state and metrics.
      state
, metrics = federated_averaging_process.next(state, sampled_train_data)

     
# For more about size_info, please see https://www.tensorflow.org/federated/api_docs/python/tff/framework/SizeInfo
      size_info
= environment.get_size_info()
      broadcasted_bits
= size_info.broadcast_bits[-1]
      aggregated_bits
= size_info.aggregate_bits[-1]

     
print('round {:2d}, metrics={}, broadcasted_bits={}, aggregated_bits={}'.format(round_num, metrics, format_size(broadcasted_bits), format_size(aggregated_bits)))

     
# Add metrics to Tensorboard.
     
for name, value in metrics['train'].items():
          tf
.summary.scalar(name, value, step=round_num)

     
# Add broadcasted and aggregated data size to Tensorboard.
      tf
.summary.scalar('cumulative_broadcasted_bits', broadcasted_bits, step=round_num)
      tf
.summary.scalar('cumulative_aggregated_bits', aggregated_bits, step=round_num)
      summary_writer
.flush()
# Clean the log directory to avoid conflicts.
try:
  tf
.io.gfile.rmtree('/tmp/logs/scalars')
except tf.errors.OpError as e:
 
pass  # Path doesn't exist

# Set up the log directory and writer for Tensorboard.
logdir
= "/tmp/logs/scalars/original/"
summary_writer
= tf.summary.create_file_writer(logdir)

train
(federated_averaging_process=federated_averaging, num_rounds=10,
      num_clients_per_round
=10, summary_writer=summary_writer)
round  0, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.07383774), ('loss', 2.3276227)])), ('stat', OrderedDict([('num_examples', 1097)]))]), broadcasted_bits=507.62Mibit, aggregated_bits=507.62Mibit
round  1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.099585064), ('loss', 2.3152695)])), ('stat', OrderedDict([('num_examples', 964)]))]), broadcasted_bits=1015.24Mibit, aggregated_bits=1015.24Mibit
round  2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09760766), ('loss', 2.3077576)])), ('stat', OrderedDict([('num_examples', 1045)]))]), broadcasted_bits=1.49Gibit, aggregated_bits=1.49Gibit
round  3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.0963035), ('loss', 2.3066626)])), ('stat', OrderedDict([('num_examples', 1028)]))]), broadcasted_bits=1.98Gibit, aggregated_bits=1.98Gibit
round  4, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.10694184), ('loss', 2.3033001)])), ('stat', OrderedDict([('num_examples', 1066)]))]), broadcasted_bits=2.48Gibit, aggregated_bits=2.48Gibit
round  5, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.1185567), ('loss', 2.2999184)])), ('stat', OrderedDict([('num_examples', 970)]))]), broadcasted_bits=2.97Gibit, aggregated_bits=2.97Gibit
round  6, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.11751663), ('loss', 2.296883)])), ('stat', OrderedDict([('num_examples', 902)]))]), broadcasted_bits=3.47Gibit, aggregated_bits=3.47Gibit
round  7, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13063477), ('loss', 2.2990246)])), ('stat', OrderedDict([('num_examples', 1087)]))]), broadcasted_bits=3.97Gibit, aggregated_bits=3.97Gibit
round  8, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.12742382), ('loss', 2.2971866)])), ('stat', OrderedDict([('num_examples', 1083)]))]), broadcasted_bits=4.46Gibit, aggregated_bits=4.46Gibit
round  9, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13555992), ('loss', 2.2934425)])), ('stat', OrderedDict([('num_examples', 1018)]))]), broadcasted_bits=4.96Gibit, aggregated_bits=4.96Gibit

เริ่ม TensorBoard ด้วยไดเร็กทอรีบันทึกของรูทที่ระบุด้านบนเพื่อแสดงเมตริกการฝึกอบรม อาจใช้เวลาสองสามวินาทีในการโหลดข้อมูล ยกเว้นการสูญเสียและความแม่นยำ เรายังส่งออกจำนวนข้อมูลที่ออกอากาศและรวม ข้อมูลที่ออกอากาศหมายถึงเทนเซอร์ที่เซิร์ฟเวอร์ส่งไปยังแต่ละไคลเอ็นต์ ในขณะที่ข้อมูลที่รวมหมายถึงเมตริกซ์ที่ไคลเอ็นต์แต่ละรายส่งคืนไปยังเซิร์ฟเวอร์

%tensorboard --logdir /tmp/logs/scalars/ --port=0
Launching TensorBoard...
Reusing TensorBoard on port 34445 (pid 579503), started 1:53:14 ago. (Use '!kill 579503' to kill it.)
<IPython.core.display.Javascript at 0x7f9135ef1630>

สร้างฟังก์ชันการออกอากาศและการรวมแบบกำหนดเอง

ตอนนี้ขอใช้ฟังก์ชั่นการใช้อัลกอริทึมการบีบอัด lossy กับข้อมูลการแพร่ภาพและข้อมูลที่รวบรวมโดยใช้ tensor_encoding API

ขั้นแรก เรากำหนดสองฟังก์ชัน:

  • broadcast_encoder_fn ซึ่งจะสร้างตัวอย่างของ te.core.SimpleEncoder เพื่อเทนเซอร์เข้ารหัสหรือตัวแปรในเซิร์ฟเวอร์สื่อสารของลูกค้า (ข้อมูล Broadcast)
  • mean_encoder_fn ซึ่งจะสร้างตัวอย่างของ te.core.GatherEncoder เพื่อเทนเซอร์เข้ารหัสหรือตัวแปรในลูกค้า communicaiton เซิร์ฟเวอร์ (ข้อมูล Aggregation)

สิ่งสำคัญคือต้องทราบว่าเราไม่ได้ใช้วิธีการบีบอัดกับโมเดลทั้งหมดในคราวเดียว แต่เราตัดสินใจว่าจะบีบอัดตัวแปรแต่ละตัวของโมเดลอย่างไร (และหรือไม่) อย่างอิสระ เหตุผลก็คือว่าโดยทั่วไป ตัวแปรขนาดเล็ก เช่น ความเอนเอียง มีความอ่อนไหวต่อความไม่ถูกต้องมากกว่า และเมื่อมีขนาดค่อนข้างเล็ก ความสามารถในการประหยัดการสื่อสารก็ค่อนข้างน้อยเช่นกัน ดังนั้นเราจึงไม่บีบอัดตัวแปรขนาดเล็กโดยค่าเริ่มต้น ในตัวอย่างนี้ เราใช้การหาปริมาณแบบสม่ำเสมอกับ 8 บิต (256 บัคเก็ต) กับทุกตัวแปรที่มีองค์ประกอบมากกว่า 10,000 รายการ และใช้ข้อมูลเฉพาะตัวกับตัวแปรอื่นๆ เท่านั้น

def broadcast_encoder_fn(value):
 
"""Function for building encoded broadcast."""
  spec
= tf.TensorSpec(value.shape, value.dtype)
 
if value.shape.num_elements() > 10000:
   
return te.encoders.as_simple_encoder(
        te
.encoders.uniform_quantization(bits=8), spec)
 
else:
   
return te.encoders.as_simple_encoder(te.encoders.identity(), spec)


def mean_encoder_fn(tensor_spec):
 
"""Function for building a GatherEncoder."""
  spec
= tf.TensorSpec(tensor_spec.shape, tensor_spec.dtype)
 
if tensor_spec.shape.num_elements() > 10000:
   
return te.encoders.as_gather_encoder(
        te
.encoders.uniform_quantization(bits=8), spec)
 
else:
   
return te.encoders.as_gather_encoder(te.encoders.identity(), spec)

ฉิบหายให้ API เพื่อแปลงฟังก์ชั่นการเข้ารหัสในรูปแบบที่ tff.learning.build_federated_averaging_process API สามารถใช้ โดยใช้ tff.learning.framework.build_encoded_broadcast_from_model และ tff.aggregators.MeanFactory เราสามารถสร้างวัตถุสองที่สามารถผ่านเข้าสู่ broadcast_process และ model_update_aggregation_factory agruments ของ tff.learning.build_federated_averaging_process การสร้างอัลกอริทึมสหพันธ์เฉลี่ยด้วยวิธีการบีบอัด lossy

encoded_broadcast_process = (
    tff
.learning.framework.build_encoded_broadcast_process_from_model(
        tff_model_fn
, broadcast_encoder_fn))

mean_factory
= tff.aggregators.MeanFactory(
    tff
.aggregators.EncodedSumFactory(mean_encoder_fn), # numerator
    tff
.aggregators.EncodedSumFactory(mean_encoder_fn), # denominator
)

federated_averaging_with_compression
= tff.learning.build_federated_averaging_process(
    tff_model_fn
,
    client_optimizer_fn
=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn
=lambda: tf.keras.optimizers.SGD(learning_rate=1.0),
    broadcast_process
=encoded_broadcast_process,
    model_update_aggregation_factory
=mean_factory)

ฝึกโมเดลอีกแล้ว

ตอนนี้ มาเรียกใช้อัลกอริทึม Federated Averaging ใหม่กัน

logdir_for_compression = "/tmp/logs/scalars/compression/"
summary_writer_for_compression
= tf.summary.create_file_writer(
    logdir_for_compression
)

train
(federated_averaging_process=federated_averaging_with_compression,
      num_rounds
=10,
      num_clients_per_round
=10,
      summary_writer
=summary_writer_for_compression)
round  0, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.093), ('loss', 2.3194966)])), ('stat', OrderedDict([('num_examples', 1000)]))]), broadcasted_bits=146.46Mibit, aggregated_bits=146.46Mibit
round  1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.10432034), ('loss', 2.3079953)])), ('stat', OrderedDict([('num_examples', 949)]))]), broadcasted_bits=292.92Mibit, aggregated_bits=292.93Mibit
round  2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.07886754), ('loss', 2.3101337)])), ('stat', OrderedDict([('num_examples', 989)]))]), broadcasted_bits=439.38Mibit, aggregated_bits=439.39Mibit
round  3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09774436), ('loss', 2.305069)])), ('stat', OrderedDict([('num_examples', 1064)]))]), broadcasted_bits=585.84Mibit, aggregated_bits=585.85Mibit
round  4, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09404097), ('loss', 2.302943)])), ('stat', OrderedDict([('num_examples', 1074)]))]), broadcasted_bits=732.30Mibit, aggregated_bits=732.32Mibit
round  5, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.09), ('loss', 2.304385)])), ('stat', OrderedDict([('num_examples', 1000)]))]), broadcasted_bits=878.77Mibit, aggregated_bits=878.78Mibit
round  6, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.14368932), ('loss', 2.2973824)])), ('stat', OrderedDict([('num_examples', 1030)]))]), broadcasted_bits=1.00Gibit, aggregated_bits=1.00Gibit
round  7, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.12140871), ('loss', 2.2993405)])), ('stat', OrderedDict([('num_examples', 1079)]))]), broadcasted_bits=1.14Gibit, aggregated_bits=1.14Gibit
round  8, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13600783), ('loss', 2.2953267)])), ('stat', OrderedDict([('num_examples', 1022)]))]), broadcasted_bits=1.29Gibit, aggregated_bits=1.29Gibit
round  9, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13844621), ('loss', 2.295768)])), ('stat', OrderedDict([('num_examples', 1004)]))]), broadcasted_bits=1.43Gibit, aggregated_bits=1.43Gibit

เริ่ม TensorBoard อีกครั้งเพื่อเปรียบเทียบเมตริกการฝึกระหว่างการวิ่งสองครั้ง

ที่คุณสามารถดูใน Tensorboard, มีการลดลงอย่างมีนัยสำคัญระหว่าง orginial และ compression โค้งใน broadcasted_bits และ aggregated_bits แปลงในขณะที่ loss และ sparse_categorical_accuracy พล็อตสองโค้งคล้ายสวย

โดยสรุป เราใช้อัลกอริธึมการบีบอัดที่สามารถบรรลุประสิทธิภาพที่คล้ายคลึงกันกับอัลกอริธึม Federated Averaging ดั้งเดิม ในขณะที่ต้นทุนการสื่อสารลดลงอย่างเห็นได้ชัด

%tensorboard --logdir /tmp/logs/scalars/ --port=0
Launching TensorBoard...
Reusing TensorBoard on port 34445 (pid 579503), started 1:54:12 ago. (Use '!kill 579503' to kill it.)
<IPython.core.display.Javascript at 0x7f9140eb5ef0>

การออกกำลังกาย

ในการปรับใช้อัลกอริธึมการบีบอัดแบบกำหนดเองและนำไปใช้กับลูปการฝึก คุณสามารถ:

  1. ใช้วิธีการบีบอัดใหม่เป็น subclass ของ EncodingStageInterface หรือตัวแปรทั่วไปมากขึ้นของ AdaptiveEncodingStageInterface ต่อไปนี้ ตัวอย่างนี้
  2. สร้างใหม่ของคุณ Encoder และเชี่ยวชาญมันสำหรับ รูปแบบการออกอากาศ หรือ รูปแบบการปรับปรุงค่าเฉลี่ย
  3. ใช้วัตถุเหล่านั้นที่จะสร้างทั้ง การคำนวณการฝึกอบรม

คำถามการวิจัยแบบเปิดที่อาจมีประโยชน์ ได้แก่ การหาปริมาณที่ไม่สม่ำเสมอ การบีบอัดแบบไม่สูญเสียข้อมูล เช่น การเข้ารหัส Huffman และกลไกในการปรับการบีบอัดตามข้อมูลจากรอบการฝึกครั้งก่อน

สื่อการอ่านที่แนะนำ: