TFF çok büyük bir modeli her istemci cihazı sadece indirme eğitmek için kullanılan ve kullanan modelin küçük bir kısmını günceller nasıl Bu eğitimde gösterileri tff.federated_select
ve seyrek agregasyonunu. Bu öğretici oldukça iken, kendi kendine yeten tff.federated_select
öğretici ve özel FL algoritmaları öğretici tekniklerden bazılarını burada kullanılan iyi sunumlar sağlar.
Somut olarak, bu derste, çok etiketli sınıflandırma için lojistik regresyonu ele alıyoruz ve bir kelime torbası özellik gösterimine dayalı olarak hangi "etiketlerin" bir metin dizisiyle ilişkili olduğunu tahmin ediyoruz. Önemli olan, iletişim ve istemci tarafı hesaplama maliyetleri sabit bir sabiti (tarafından kontrol edilmektedir MAX_TOKENS_SELECTED_PER_CLIENT
) ve pratik ortamlarda aşırı büyük olabilir genel kelime büyüklüğü ile ölçekli değil.
!pip install --quiet --upgrade tensorflow-federated-nightly
!pip install --quiet --upgrade nest-asyncio
import nest_asyncio
import collections
import itertools
import numpy as np
from typing import Callable, List, Tuple
import tensorflow as tf
import tensorflow_federated as tff
Her istemci olacak federated_select
en fazla bu birçok benzersiz simgeleri için model ağırlıklarında satırları. Bu müşterinin yerel modelinin boyutu ve sunucunun bir miktarının üst sınırları -> müşteri ( federated_select
) ve istemci -> sunucu (federated_aggregate
) iletişimi gerçekleştirilir.
Model yakınsaması etkilenebilse de, bunu 1 olarak küçük (her istemciden tüm belirteçlerin seçilmediğinden emin olarak) veya büyük bir değer olarak ayarlasanız bile bu öğretici yine de doğru şekilde çalışmalıdır.
Ayrıca çeşitli türler için birkaç sabit tanımlıyoruz. Bu CoLab, bir belirteç veri kümesi ayrıştırma sonra belirli bir kelime için bir tamsayıdır tanımlayıcıdır.
# There are some constraints on types
# here that will require some explicit type conversions:
# - `tff.federated_select` requires int32
# - `tf.SparseTensor` requires int64 indices.
TOKEN_DTYPE = tf.int64
# Type for counts of token occurences.
# A sparse feature vector can be thought of as a map
# Our features are {0, 1} indicators, so we could potentially
# use tf.int8 as an optimization.
FEATURE_DTYPE = tf.int32
Sorunu kurma: Veri Kümesi ve Model
Bu eğitimde kolay deneme için küçük bir oyuncak veri seti oluşturuyoruz. Ancak, veri kümesinin formatı ile uyumlu olan Federe StackOverflow'daki ve ön işleme ve model mimari ve StackOverflow etiketi tahmini problemden kabul edilir Adaptif Federe Optimizasyonu .
Veri kümesi ayrıştırma ve ön işleme
BatchType = collections.namedtuple('BatchType', ['tokens', 'tags'])
def build_to_ids_fn(word_vocab: List[str],
tag_vocab: List[str]) -> Callable[[tf.Tensor], tf.Tensor]:
"""Constructs a function mapping examples to sequences of token indices."""
word_table_values = np.arange(len(word_vocab), dtype=np.int64)
word_table = tf.lookup.StaticVocabularyTable(
tf.lookup.KeyValueTensorInitializer(word_vocab, word_table_values),
tag_table_values = np.arange(len(tag_vocab), dtype=np.int64)
tag_table = tf.lookup.StaticVocabularyTable(
tf.lookup.KeyValueTensorInitializer(tag_vocab, tag_table_values),
def to_ids(example):
"""Converts a Stack Overflow example to a bag-of-words/tags format."""
sentence = tf.strings.join([example['tokens'], example['title']],
separator=' ')
# We represent that label (output tags) densely.
raw_tags = example['tags']
tags = tf.strings.split(raw_tags, sep='|')
tags = tag_table.lookup(tags)
tags, _ = tf.unique(tags)
tags = tf.one_hot(tags, len(tag_vocab) + NUM_OOV_BUCKETS)
tags = tf.reduce_max(tags, axis=0)
# We represent the features as a SparseTensor of {0, 1}s.
words = tf.strings.split(sentence)
tokens = word_table.lookup(words)
tokens, _ = tf.unique(tokens)
# Note: We could choose to use the word counts as the feature vector
# instead of just {0, 1} values (see tf.unique_with_counts).
tokens = tf.reshape(tokens, shape=(tf.size(tokens), 1))
tokens_st = tf.SparseTensor(
tf.ones(tf.size(tokens), dtype=FEATURE_DTYPE),
dense_shape=(len(word_vocab) + NUM_OOV_BUCKETS,))
tokens_st = tf.sparse.reorder(tokens_st)
return BatchType(tokens_st, tags)
return to_ids
def build_preprocess_fn(word_vocab, tag_vocab):
def preprocess_fn(dataset):
to_ids = build_to_ids_fn(word_vocab, tag_vocab)
# We *don't* shuffle in order to make this colab deterministic for
# easier testing and reproducibility.
# But real-world training should use `.shuffle()`.
return dataset.map(to_ids, num_parallel_calls=tf.data.experimental.AUTOTUNE)
return preprocess_fn
Küçük bir oyuncak veri seti
12 kelime ve 3 müşteriden oluşan global bir kelime hazinesi ile küçük bir oyuncak veri seti oluşturuyoruz. Bu küçük, örneğin kenar durumlarda test etmek için yararlıdır (örneğin, biz daha az olan iki müşteri bilgisi MAX_TOKENS_SELECTED_PER_CLIENT = 6
ve kod geliştirme farklı jeton ve bir daha fazlası ile).
Bununla birlikte, bu yaklaşımın gerçek dünyadaki kullanım örnekleri, her istemcide görünen belki de 1000'lerce farklı jeton ile 10 milyon veya daha fazla küresel kelime hazinesi olacaktır. Verilerin formatı aynı olduğundan, daha gerçekçi testbed sorunlarına uzatma, örneğin tff.simulation.datasets.stackoverflow.load_data()
veri kümesi, basit olmalıdır.
İlk olarak, kelime ve etiket kelime dağarcığımızı tanımlarız.
# Features
FRUIT_WORDS = ['apple', 'orange', 'pear', 'kiwi']
VEGETABLE_WORDS = ['carrot', 'broccoli', 'arugula', 'peas']
FISH_WORDS = ['trout', 'tuna', 'cod', 'salmon']
# Labels
Şimdi, küçük yerel veri kümeleriyle 3 istemci oluşturuyoruz. Bu öğreticiyi colab'de çalıştırıyorsanız, aşağıda geliştirilen işlevlerin çıktısını yorumlamak/kontrol etmek için bu hücreyi ve çıktısını sabitlemek için "sekmedeki ayna hücresi" özelliğini kullanmak yararlı olabilir.
preprocess_fn = build_preprocess_fn(WORD_VOCAB, TAG_VOCAB)
def make_dataset(raw):
d = tf.data.Dataset.from_tensor_slices(
# Matches the StackOverflow formatting
tokens=tf.constant([t[0] for t in raw]),
tags=tf.constant([t[1] for t in raw]),
title=['' for _ in raw]))
d = preprocess_fn(d)
return d
# 4 distinct tokens
CLIENT1_DATASET = make_dataset([
('apple orange apple orange', 'FRUIT'),
('carrot trout', 'VEGETABLE|FISH'),
('orange apple', 'FRUIT'),
('orange', 'ORANGE|CITRUS') # 2 OOV tag
# 6 distinct tokens
CLIENT2_DATASET = make_dataset([
('pear cod', 'FRUIT|FISH'),
('arugula peas', 'VEGETABLE'),
('kiwi pear', 'FRUIT'),
('sturgeon', 'FISH'), # OOV word
('sturgeon bass', 'FISH') # 2 OOV words
# A client with all possible words & tags (13 distinct tokens).
# With MAX_TOKENS_SELECTED_PER_CLIENT = 6, we won't download the model
# slices for all tokens that occur on this client.
CLIENT3_DATASET = make_dataset([
(' '.join(WORD_VOCAB + ['oovword']), '|'.join(TAG_VOCAB)),
# Mathe the OOV token and 'salmon' occur in the largest number
# of examples on this client:
('salmon oovword', 'FISH|OOVTAG')
print('Word vocab')
for i, word in enumerate(WORD_VOCAB):
print(f'{i:2d} {word}')
print('\nTag vocab')
for i, tag in enumerate(TAG_VOCAB):
print(f'{i:2d} {tag}')
Word vocab 0 apple 1 orange 2 pear 3 kiwi 4 carrot 5 broccoli 6 arugula 7 peas 8 trout 9 tuna 10 cod 11 salmon Tag vocab 0 FRUIT 1 VEGETABLE 2 FISH
Giriş özelliklerinin (belirteçler/kelimeler) ve etiketlerin (yazı etiketleri) ham sayıları için sabitleri tanımlayın. Bizim asıl giriş / çıkış mekanlardır NUM_OOV_BUCKETS = 1
biz oov belirteç / etiket eklemek çünkü daha büyük.
Veri kümelerinin toplu sürümlerini ve ilerledikçe kodu test etmede faydalı olacak ayrı toplu grupları oluşturun.
batched_dataset1 = CLIENT1_DATASET.batch(2)
batched_dataset2 = CLIENT2_DATASET.batch(3)
batched_dataset3 = CLIENT3_DATASET.batch(2)
batch1 = next(iter(batched_dataset1))
batch2 = next(iter(batched_dataset2))
batch3 = next(iter(batched_dataset3))
Seyrek girdileri olan bir model tanımlayın
Her etiket için basit bir bağımsız lojistik regresyon modeli kullanıyoruz.
def create_logistic_model(word_vocab_size: int, vocab_tags_size: int):
model = tf.keras.models.Sequential([
tf.keras.layers.InputLayer(input_shape=(word_vocab_size,), sparse=True),
# For simplicity, don't use a bias vector; this means the model
# is a single tensor, and we only need sparse aggregation of
# the per-token slices of the model. Generalizing to also handle
# other model weights that are fully updated
# (non-dense broadcast and aggregate) would be a good exercise.
return model
Önce tahminlerde bulunarak çalıştığından emin olalım:
model = create_logistic_model(WORD_VOCAB_SIZE, TAG_VOCAB_SIZE)
p = model.predict(batch1.tokens)
[[0.5 0.5 0.5 0.5] [0.5 0.5 0.5 0.5]]
Ve bazı basit merkezi eğitim:
model.train_on_batch(batch1.tokens, batch1.tags)
Birleşik hesaplama için yapı taşları
Biz basit bir versiyonunu uygulayacak Federe Averaging her bir cihaz sadece modelin ilgili bir alt kümesi indirir anahtar farkla algoritması ve sadece o alt kümesine güncellemeler katkıda bulunur.
Biz kullanmak M
. Yüksek düzeyde, bir tur eğitim şu adımları içerir:
Katılan her istemci, giriş dizelerini ayrıştırarak ve bunları doğru belirteçlerle (int dizinleri) eşleştirerek yerel veri kümesini tarar. Bu, küresel (büyük) sözlüğüne erişimi (bu potansiyel kullanılarak önlenebilir gerektirir özelliği karma teknikleri). Daha sonra seyrek olarak her bir belirtecin kaç kez oluştuğunu sayarız. Eğer
eşsiz belirteçleri cihazda meydana biz tercihnum_actual_tokens = min(U, M)
trene en sık belirteçleri.İstemciler kullanmak
model katsayıları almak içinnum_actual_tokens
sunucudan belirteçleri seçilmiş. Her model dilim şeklinde bir tensör olan(TAG_VOCAB_SIZE, )
müşteriye iletilen toplam büyüklüğü en fazla olacak şekilde,TAG_VOCAB_SIZE * M
(aşağıdaki nota bakınız).İstemcilerin bir eşleme inşa
global_token -> local_token
yerel belirteç (int endeksi) seçilen bir simge listesinde küresel belirteci endeksini;.Yalnızca en az katsayıları vardır küresel modelin bir "küçük" sürümünü kullanmak
aralığından, jeton[0, num_actual_tokens)
.global -> local
haritalama seçilen modelin dilimlerinden gelen bu modelin yoğun parametreleri başlatmak için kullanılır.Müşteriler ile ön işlemden verilere SGD kullanarak kendi yerel modelini eğitmek
global -> local
haritalama.Müşteriler kendi lokal modelin parametrelerini çevirmek
kullanarak güncellemelerilocal -> global
dizine satırları eşleştirmesini. Sunucu, bu güncellemeleri seyrek toplam toplama kullanarak toplar.Sunucu, yukarıdaki toplamanın (yoğun) sonucunu alır, katılan istemci sayısına böler ve elde edilen ortalama güncellemeyi global modele uygular.
Bu bölümde daha sonra nihai bir araya getirilecek bu adımlar için yapı taşları inşa federated_computation
yakalamaları bu bir eğitim atışı dolu mantığı.
İstemci belirteçleri sayın ve hangi modelin dilimleri karar federated_select
Her cihazın, modelin hangi "dilimlerinin" yerel eğitim veri seti ile ilgili olduğuna karar vermesi gerekir. Bizim problemimiz için, bunu (nadiren!) müşteri eğitim veri setinde her bir tokenin kaç tane örnek içerdiğini sayarak yapıyoruz.
def token_count_fn(token_counts, batch):
"""Adds counts from `batch` to the running `token_counts` sum."""
# Sum across the batch dimension.
flat_tokens = tf.sparse.reduce_sum(
batch.tokens, axis=0, output_is_sparse=True)
flat_tokens = tf.cast(flat_tokens, dtype=TOKEN_COUNT_DTYPE)
return tf.sparse.add(token_counts, flat_tokens)
# Simple tests
# Create the initial zero token counts using empty tensors.
initial_token_counts = tf.SparseTensor(
indices=tf.zeros(shape=(0, 1), dtype=TOKEN_DTYPE),
values=tf.zeros(shape=(0,), dtype=TOKEN_COUNT_DTYPE),
client_token_counts = batched_dataset1.reduce(initial_token_counts,
tokens = tf.reshape(client_token_counts.indices, (-1,)).numpy()
print('tokens:', tokens)
np.testing.assert_array_equal(tokens, [0, 1, 4, 8])
# The count is the number of *examples* in which the token/word
# occurs, not the total number of occurences, since we still featurize
# multiple occurences in the same example as a "1".
counts = client_token_counts.values.numpy()
print('counts:', counts)
np.testing.assert_array_equal(counts, [2, 3, 1, 1])
tokens: [0 1 4 8] counts: [2 3 1 1]
Biz tekabül modeli parametreleri seçecektir MAX_TOKENS_SELECTED_PER_CLIENT
en sık cihazda belirteçleri ortaya çıkan. Bu birçok jeton daha az cihazda oluşursa, biz ped liste kullanımını etkinleştirmek için federated_select
Diğer stratejilerin muhtemelen daha iyi olduğuna dikkat edin, örneğin jetonları rastgele seçmek (belki de oluşma olasılıklarına göre). Bu, modelin tüm dilimlerinin (istemcinin verilerine sahip olduğu) güncellenme şansına sahip olmasını sağlayacaktır.
def keys_for_client(client_dataset, max_tokens_per_client):
"""Computes a set of max_tokens_per_client keys."""
initial_token_counts = tf.SparseTensor(
indices=tf.zeros((0, 1), dtype=TOKEN_DTYPE),
values=tf.zeros((0,), dtype=TOKEN_COUNT_DTYPE),
client_token_counts = client_dataset.reduce(initial_token_counts,
# Find the most-frequently occuring tokens
tokens = tf.reshape(client_token_counts.indices, shape=(-1,))
counts = client_token_counts.values
perm = tf.argsort(counts, direction='DESCENDING')
tokens = tf.gather(tokens, perm)
counts = tf.gather(counts, perm)
num_raw_tokens = tf.shape(tokens)[0]
actual_num_tokens = tf.minimum(max_tokens_per_client, num_raw_tokens)
selected_tokens = tokens[:actual_num_tokens]
paddings = [[0, max_tokens_per_client - tf.shape(selected_tokens)[0]]]
padded_tokens = tf.pad(selected_tokens, paddings=paddings)
# Make sure the type is statically determined
padded_tokens = tf.reshape(padded_tokens, shape=(max_tokens_per_client,))
# We will pass these tokens as keys into `federated_select`, which
# requires SELECT_KEY_DTYPE=tf.int32 keys.
padded_tokens = tf.cast(padded_tokens, dtype=SELECT_KEY_DTYPE)
return padded_tokens, actual_num_tokens
# Simple test
# Case 1: actual_num_tokens > max_tokens_per_client
selected_tokens, actual_num_tokens = keys_for_client(batched_dataset1, 3)
assert tf.size(selected_tokens) == 3
assert actual_num_tokens == 3
# Case 2: actual_num_tokens < max_tokens_per_client
selected_tokens, actual_num_tokens = keys_for_client(batched_dataset1, 10)
assert tf.size(selected_tokens) == 10
assert actual_num_tokens == 4
Küresel belirteçleri yerel belirteçlerle eşleyin
Yukarıdaki seçim bize aralığında jeton yoğun bir dizi verir [0, actual_num_tokens)
biz cihaz modeli için hangi. Ancak, okumak veri kümesi çok daha büyük küresel kelime aralığından belirteç var [0, WORD_VOCAB_SIZE)
Bu nedenle, küresel belirteçleri karşılık gelen yerel belirteçleriyle eşleştirmemiz gerekir. Yerel belirteç kimlikleri basit bir şekilde dizinler tarafından verilir selected_tokens
önceki adımda hesaplanır tensörü.
def map_to_local_token_ids(client_data, client_keys):
global_to_local = tf.lookup.StaticHashTable(
# Note int32 -> int64 maps are not supported
keys=tf.cast(client_keys, dtype=TOKEN_DTYPE),
# Note we need to use tf.shape, not the static
# shape client_keys.shape[0]
values=tf.range(0, limit=tf.shape(client_keys)[0],
# We use -1 for tokens that were not selected, which can occur for clients
# with more than MAX_TOKENS_SELECTED_PER_CLIENT distinct tokens.
# We will simply remove these invalid indices from the batch below.
def to_local_ids(sparse_tokens):
indices_t = tf.transpose(sparse_tokens.indices)
batch_indices = indices_t[0] # First column
tokens = indices_t[1] # Second column
tokens = tf.map_fn(
lambda global_token_id: global_to_local.lookup(global_token_id), tokens)
# Remove tokens that aren't actually available (looked up as -1):
available_tokens = tokens >= 0
tokens = tokens[available_tokens]
batch_indices = batch_indices[available_tokens]
updated_indices = tf.transpose(
tf.concat([[batch_indices], [tokens]], axis=0))
st = tf.sparse.SparseTensor(
tf.ones(tf.size(tokens), dtype=FEATURE_DTYPE),
st = tf.sparse.reorder(st)
return st
return client_data.map(lambda b: BatchType(to_local_ids(b.tokens), b.tags))
# Simple test
client_keys, actual_num_tokens = keys_for_client(
client_keys = client_keys[:actual_num_tokens]
d = map_to_local_token_ids(batched_dataset3, client_keys)
batch = next(iter(d))
all_tokens = tf.gather(batch.tokens.indices, indices=1, axis=1)
# Confirm we have local indices in the range [0, MAX):
assert tf.math.reduce_max(all_tokens) < MAX_TOKENS_SELECTED_PER_CLIENT
assert tf.math.reduce_max(all_tokens) >= 0
Her istemcide yerel (alt) modeli eğitin
Not federated_select
bir şekilde seçilen dilimleri dönecektir tf.data.Dataset
seçim tuşları aynı sırada. Bu nedenle, ilk önce böyle bir Veri Kümesini almak ve onu müşteri modelinin model ağırlıkları olarak kullanılabilecek tek bir yoğun tensöre dönüştürmek için bir fayda fonksiyonu tanımlarız.
def slices_dataset_to_tensor(slices_dataset):
"""Convert a dataset of slices to a tensor."""
# Use batching to gather all of the slices into a single tensor.
d = slices_dataset.batch(MAX_TOKENS_SELECTED_PER_CLIENT,
iter_d = iter(d)
tensor = next(iter_d)
# Make sure we have consumed everything
opt = iter_d.get_next_as_optional()
tf.Assert(tf.logical_not(opt.has_value()), data=[''], name='CHECK_EMPTY')
return tensor
# Simple test
weights = np.random.random(
model_slices_as_dataset = tf.data.Dataset.from_tensor_slices(weights)
weights2 = slices_dataset_to_tensor(model_slices_as_dataset)
np.testing.assert_array_equal(weights, weights2)
Artık her istemcide çalışacak basit bir yerel eğitim döngüsü tanımlamamız için gereken tüm bileşenlere sahibiz.
def client_train_fn(model, client_optimizer,
model_slices_as_dataset, client_data,
client_keys, actual_num_tokens):
initial_model_weights = slices_dataset_to_tensor(model_slices_as_dataset)
assert len(model.trainable_variables) == 1
# Only keep the "real" (unpadded) keys.
client_keys = client_keys[:actual_num_tokens]
client_data = map_to_local_token_ids(client_data, client_keys)
loss_fn = tf.keras.losses.BinaryCrossentropy()
for features, labels in client_data:
with tf.GradientTape() as tape:
predictions = model(features)
loss = loss_fn(labels, predictions)
grads = tape.gradient(loss, model.trainable_variables)
client_optimizer.apply_gradients(zip(grads, model.trainable_variables))
model_weights_delta = model.trainable_weights[0] - initial_model_weights
model_weights_delta = tf.slice(model_weights_delta, begin=[0, 0],
size=[actual_num_tokens, -1])
return client_keys, model_weights_delta
# Simple test
# Note if you execute this cell a second time, you need to also re-execute
# the preceeding cell to avoid "tf.function-decorated function tried to
# create variables on non-first call" errors.
on_device_model = create_logistic_model(MAX_TOKENS_SELECTED_PER_CLIENT,
client_optimizer = tf.keras.optimizers.SGD(learning_rate=0.001)
client_keys, actual_num_tokens = keys_for_client(
model_slices_as_dataset = tf.data.Dataset.from_tensor_slices(
keys, delta = client_train_fn(
Toplam Dizine Alınmış Dilimler
Biz kullanmak tff.federated_aggregate
için federe seyrek toplamı oluşturmak için IndexedSlices
. Bu basit uygulama olduğunu kısıtlaması vardır dense_shape
önceden statik bilinmektedir. Not Bu toplamı anlamda sadece yarı seyrek olduğunu istemci -> sunucu iletişim seyrek, ancak sunucu içinde toplamı yoğun bir temsilini muhafaza accumulate
ve merge
ve bu yoğun temsilini verir.
def federated_indexed_slices_sum(slice_indices, slice_values, dense_shape):
Sumes IndexedSlices@CLIENTS to a dense @SERVER Tensor.
Intermediate aggregation is performed by converting to a dense representation,
which may not be suitable for all applications.
slice_indices: An IndexedSlices.indices tensor @CLIENTS.
slice_values: An IndexedSlices.values tensor @CLIENTS.
dense_shape: A statically known dense shape.
A dense tensor placed @SERVER representing the sum of the client's
slices_dtype = slice_values.type_signature.member.dtype
zero = tff.tf_computation(
lambda: tf.zeros(dense_shape, dtype=slices_dtype))()
def accumulate_slices(dense, client_value):
indices, slices = client_value
# There is no built-in way to add `IndexedSlices`, but
# tf.convert_to_tensor is a quick way to convert to a dense representation
# so we can add them.
return dense + tf.convert_to_tensor(
tf.IndexedSlices(slices, indices, dense_shape))
return tff.federated_aggregate(
(slice_indices, slice_values),
merge=tff.tf_computation(lambda d1, d2: tf.add(d1, d2, name='merge')),
report=tff.tf_computation(lambda d: d))
En az bir Construct federated_computation
bir test olarak
dense_shape = (6, 2)
indices_type = tff.TensorType(tf.int64, (None,))
values_type = tff.TensorType(tf.float32, (None, 2))
client_slice_type = tff.type_at_clients(
(indices_type, values_type))
def test_sum_indexed_slices(indices_values_at_client):
indices, values = indices_values_at_client
return federated_indexed_slices_sum(indices, values, dense_shape)
({<int64[?],float32[?,2]>}@CLIENTS -> float32[6,2]@SERVER)
x = tf.IndexedSlices(
values=np.array([[2., 2.1], [0., 0.1], [1., 1.1], [5., 5.1]],
indices=[2, 0, 1, 5],
y = tf.IndexedSlices(
values=np.array([[0., 0.3], [3.1, 3.2]], dtype=np.float32),
indices=[1, 3],
# Sum one.
result = test_sum_indexed_slices([(x.indices, x.values)])
np.testing.assert_array_equal(tf.convert_to_tensor(x), result)
# Sum two.
expected = [[0., 0.1], [1., 1.4], [2., 2.1], [3.1, 3.2], [0., 0.], [5., 5.1]]
result = test_sum_indexed_slices([(x.indices, x.values), (y.indices, y.values)])
np.testing.assert_array_almost_equal(expected, result)
Bir de hep birlikte koyarak federated_computation
Biz şimdi bir içine bileşenleri birbirine bağlamak için TFF'ye kullanır tff.federated_computation
client_data_type = tff.SequenceType(batched_dataset1.element_spec)
model_type = tff.TensorType(tf.float32, shape=DENSE_MODEL_SHAPE)
Güncellemeyi 1.0'lık bir sunucu öğrenme oranıyla uygulayarak, Birleşik Ortalamaya dayalı temel bir sunucu eğitimi işlevi kullanıyoruz. Yalnızca müşteri tarafından sağlanan modellerin ortalamasını almak yerine modele bir güncelleme (delta) uygulamamız önemlidir, aksi takdirde modelin belirli bir dilimi belirli bir turda herhangi bir müşteri tarafından eğitilmediyse katsayıları sıfırlanabilir. dışarı.
def server_update(current_model_weights, update_sum, num_clients):
average_update = update_sum / num_clients
return current_model_weights + average_update
Biz bir çift daha ihtiyaç tff.tf_computation
# Function to select slices from the model weights in federated_select:
select_fn = tff.tf_computation(
lambda model_weights, index: tf.gather(model_weights, index))
# We need to wrap `client_train_fn` as a `tff.tf_computation`, making
# sure we do any operations that might construct `tf.Variable`s outside
# of the `tf.function` we are wrapping.
def client_train_fn_tff(model_slices_as_dataset, client_data, client_keys,
# Note this is amaller than the global model, using
# W7e would like a model of size `actual_num_tokens`, but we
# can't build the model dynamically, so we will slice off the padded
# weights at the end.
client_model = create_logistic_model(MAX_TOKENS_SELECTED_PER_CLIENT,
client_optimizer = tf.keras.optimizers.SGD(learning_rate=0.1)
return client_train_fn(client_model, client_optimizer,
model_slices_as_dataset, client_data, client_keys,
def keys_for_client_tff(client_data):
return keys_for_client(client_data, MAX_TOKENS_SELECTED_PER_CLIENT)
Artık tüm parçaları bir araya getirmeye hazırız!
tff.type_at_server(model_type), tff.type_at_clients(client_data_type))
def sparse_model_update(server_model, client_data):
max_tokens = tff.federated_value(MAX_TOKENS_SELECTED_PER_CLIENT, tff.SERVER)
keys_at_clients, actual_num_tokens = tff.federated_map(
keys_for_client_tff, client_data)
model_slices = tff.federated_select(keys_at_clients, max_tokens, server_model,
update_keys, update_slices = tff.federated_map(
(model_slices, client_data, keys_at_clients, actual_num_tokens))
dense_update_sum = federated_indexed_slices_sum(update_keys, update_slices,
num_clients = tff.federated_sum(tff.federated_value(1.0, tff.CLIENTS))
updated_server_model = tff.federated_map(
server_update, (server_model, dense_update_sum, num_clients))
return updated_server_model
(<server_model=float32[13,4]@SERVER,client_data={<tokens=<indices=int64[?,2],values=int32[?],dense_shape=int64[2]>,tags=float32[?,4]>*}@CLIENTS> -> float32[13,4]@SERVER)
Hadi bir model eğitelim!
Artık eğitim fonksiyonumuz olduğuna göre, deneyelim.
server_model = create_logistic_model(WORD_VOCAB_SIZE, TAG_VOCAB_SIZE)
server_model.compile( # Compile to make evaluation easy.
optimizer=tf.keras.optimizers.Adagrad(learning_rate=0.0), # Unused
tf.keras.metrics.Recall(top_k=2, name='recall_at_2'),
def evaluate(model, dataset, name):
metrics = model.evaluate(dataset, verbose=0)
metrics_str = ', '.join([f'{k}={v:.2f}' for k, v in
(zip(server_model.metrics_names, metrics))])
print(f'{name}: {metrics_str}')
print('Before training')
evaluate(server_model, batched_dataset1, 'Client 1')
evaluate(server_model, batched_dataset2, 'Client 2')
evaluate(server_model, batched_dataset3, 'Client 3')
model_weights = server_model.trainable_weights[0]
client_datasets = [batched_dataset1, batched_dataset2, batched_dataset3]
for _ in range(10): # Run 10 rounds of FedAvg
# We train on 1, 2, or 3 clients per round, selecting
# randomly.
cohort_size = np.random.randint(1, 4)
clients = np.random.choice([0, 1, 2], cohort_size, replace=False)
print('Training on clients', clients)
model_weights = sparse_model_update(
model_weights, [client_datasets[i] for i in clients])
print('After training')
evaluate(server_model, batched_dataset1, 'Client 1')
evaluate(server_model, batched_dataset2, 'Client 2')
evaluate(server_model, batched_dataset3, 'Client 3')
Before training Client 1: loss=0.69, precision=0.00, auc=0.50, recall_at_2=0.60 Client 2: loss=0.69, precision=0.00, auc=0.50, recall_at_2=0.50 Client 3: loss=0.69, precision=0.00, auc=0.50, recall_at_2=0.40 Training on clients [0 1] Training on clients [0 2 1] Training on clients [2 0] Training on clients [1 0 2] Training on clients [2] Training on clients [2 0] Training on clients [1 2 0] Training on clients [0] Training on clients [2] Training on clients [1 2] After training Client 1: loss=0.67, precision=0.80, auc=0.91, recall_at_2=0.80 Client 2: loss=0.68, precision=0.67, auc=0.96, recall_at_2=1.00 Client 3: loss=0.65, precision=1.00, auc=0.93, recall_at_2=0.80