TensorFlow.org에서 보기 | Google Colab에서 실행 | GitHub에서 소스 보기 | 노트북 다운로드 |
TFF는 매우 큰 모델 각 클라이언트 장치 만 다운로드를 양성하는 데 사용 및 사용 모델의 작은 부분을 업데이트 할 수있는 방법이 튜토리얼을 보여줍니다 tff.federated_select
및 스파 스 집계를. 이 튜토리얼은 상당히 동안, 자기가 포함 된 tff.federated_select
튜토리얼 및 사용자 정의 FL 알고리즘 튜토리얼 일부 기술 여기서 사용에 좋은 소개를 제공합니다.
구체적으로, 이 자습서에서는 단어 모음 기능 표현을 기반으로 텍스트 문자열과 연결된 "태그"를 예측하는 다중 레이블 분류를 위한 로지스틱 회귀를 고려합니다. 중요한 통신 클라이언트 측 연산 비용은 고정 된 상수 (의해 제어된다 MAX_TOKENS_SELECTED_PER_CLIENT
) 및 실제 설정이 매우 클 수 전체 어휘의 크기로 조절되지 않는다.
!pip install --quiet --upgrade tensorflow-federated-nightly
!pip install --quiet --upgrade nest-asyncio
import nest_asyncio
nest_asyncio.apply()
import collections
import itertools
import numpy as np
from typing import Callable, List, Tuple
import tensorflow as tf
import tensorflow_federated as tff
tff.backends.native.set_local_python_execution_context()
각 클라이언트는 것이다 federated_select
최대이 많은 독특한 토큰 모델 가중치의 행. 이것은 클라이언트의 로컬 모델의 크기와 서버의 양을 상단 경계 -> 클라이언트 ( federated_select
) 및 클라이언트 -> 서버 (federated_aggregate
) 통신을 수행.
이 자습서는 이 값을 1로 작게 설정하거나(각 클라이언트의 모든 토큰이 선택되지 않도록) 큰 값으로 설정하더라도 올바르게 실행되어야 하지만 모델 수렴이 영향을 받을 수 있습니다.
MAX_TOKENS_SELECTED_PER_CLIENT = 6
또한 다양한 유형에 대해 몇 가지 상수를 정의합니다. 이 colab 들어, 토큰은 데이터 세트를 파싱 한 후, 특정 단어에 대한 정수 식별자이다.
# There are some constraints on types
# here that will require some explicit type conversions:
# - `tff.federated_select` requires int32
# - `tf.SparseTensor` requires int64 indices.
TOKEN_DTYPE = tf.int64
SELECT_KEY_DTYPE = tf.int32
# Type for counts of token occurences.
TOKEN_COUNT_DTYPE = tf.int32
# A sparse feature vector can be thought of as a map
# from TOKEN_DTYPE to FEATURE_DTYPE.
# Our features are {0, 1} indicators, so we could potentially
# use tf.int8 as an optimization.
FEATURE_DTYPE = tf.int32
문제 설정: 데이터 세트 및 모델
이 튜토리얼에서는 쉽게 실험할 수 있도록 작은 장난감 데이터 세트를 구성합니다. 그러나, 데이터 세트의 형식과 호환되는 연합에 StackOverflow 및 전처리 및 모델 아키텍처 의 StackOverflow의 태그 예측 문제에서 채택 적응 연합 최적화 .
데이터 세트 파싱 및 사전 처리
NUM_OOV_BUCKETS = 1
BatchType = collections.namedtuple('BatchType', ['tokens', 'tags'])
def build_to_ids_fn(word_vocab: List[str],
tag_vocab: List[str]) -> Callable[[tf.Tensor], tf.Tensor]:
"""Constructs a function mapping examples to sequences of token indices."""
word_table_values = np.arange(len(word_vocab), dtype=np.int64)
word_table = tf.lookup.StaticVocabularyTable(
tf.lookup.KeyValueTensorInitializer(word_vocab, word_table_values),
num_oov_buckets=NUM_OOV_BUCKETS)
tag_table_values = np.arange(len(tag_vocab), dtype=np.int64)
tag_table = tf.lookup.StaticVocabularyTable(
tf.lookup.KeyValueTensorInitializer(tag_vocab, tag_table_values),
num_oov_buckets=NUM_OOV_BUCKETS)
def to_ids(example):
"""Converts a Stack Overflow example to a bag-of-words/tags format."""
sentence = tf.strings.join([example['tokens'], example['title']],
separator=' ')
# We represent that label (output tags) densely.
raw_tags = example['tags']
tags = tf.strings.split(raw_tags, sep='|')
tags = tag_table.lookup(tags)
tags, _ = tf.unique(tags)
tags = tf.one_hot(tags, len(tag_vocab) + NUM_OOV_BUCKETS)
tags = tf.reduce_max(tags, axis=0)
# We represent the features as a SparseTensor of {0, 1}s.
words = tf.strings.split(sentence)
tokens = word_table.lookup(words)
tokens, _ = tf.unique(tokens)
# Note: We could choose to use the word counts as the feature vector
# instead of just {0, 1} values (see tf.unique_with_counts).
tokens = tf.reshape(tokens, shape=(tf.size(tokens), 1))
tokens_st = tf.SparseTensor(
tokens,
tf.ones(tf.size(tokens), dtype=FEATURE_DTYPE),
dense_shape=(len(word_vocab) + NUM_OOV_BUCKETS,))
tokens_st = tf.sparse.reorder(tokens_st)
return BatchType(tokens_st, tags)
return to_ids
def build_preprocess_fn(word_vocab, tag_vocab):
@tf.function
def preprocess_fn(dataset):
to_ids = build_to_ids_fn(word_vocab, tag_vocab)
# We *don't* shuffle in order to make this colab deterministic for
# easier testing and reproducibility.
# But real-world training should use `.shuffle()`.
return dataset.map(to_ids, num_parallel_calls=tf.data.experimental.AUTOTUNE)
return preprocess_fn
작은 장난감 데이터 세트
우리는 12개의 단어와 3명의 클라이언트로 구성된 글로벌 어휘로 작은 장난감 데이터 세트를 구성합니다. 이 작은 예제는 가장자리 케이스를 테스트하는 데 유용합니다 (예를 들어, 우리는보다 두 클라이언트가 MAX_TOKENS_SELECTED_PER_CLIENT = 6
개 코드를 개발 별개의 토큰, 그리고 하나 더 있음).
그러나 이 접근 방식의 실제 사용 사례는 수천만 개 이상의 글로벌 어휘가 될 것이며 각 클라이언트에는 아마도 1000개의 고유한 토큰이 나타날 것입니다. 데이터의 형식이 동일하기 때문에,보다 현실적인 테스트 베드의 문제로 확장, 예를 들어 tff.simulation.datasets.stackoverflow.load_data()
데이터 세트, 간단합니다.
먼저 단어와 태그 어휘를 정의합니다.
# Features
FRUIT_WORDS = ['apple', 'orange', 'pear', 'kiwi']
VEGETABLE_WORDS = ['carrot', 'broccoli', 'arugula', 'peas']
FISH_WORDS = ['trout', 'tuna', 'cod', 'salmon']
WORD_VOCAB = FRUIT_WORDS + VEGETABLE_WORDS + FISH_WORDS
# Labels
TAG_VOCAB = ['FRUIT', 'VEGETABLE', 'FISH']
이제 작은 로컬 데이터 세트로 3개의 클라이언트를 생성합니다. colab에서 이 튜토리얼을 실행하는 경우 아래 개발된 기능의 출력을 해석/확인하기 위해 "탭에서 셀 미러링" 기능을 사용하여 이 셀과 해당 출력을 고정하는 것이 유용할 수 있습니다.
preprocess_fn = build_preprocess_fn(WORD_VOCAB, TAG_VOCAB)
def make_dataset(raw):
d = tf.data.Dataset.from_tensor_slices(
# Matches the StackOverflow formatting
collections.OrderedDict(
tokens=tf.constant([t[0] for t in raw]),
tags=tf.constant([t[1] for t in raw]),
title=['' for _ in raw]))
d = preprocess_fn(d)
return d
# 4 distinct tokens
CLIENT1_DATASET = make_dataset([
('apple orange apple orange', 'FRUIT'),
('carrot trout', 'VEGETABLE|FISH'),
('orange apple', 'FRUIT'),
('orange', 'ORANGE|CITRUS') # 2 OOV tag
])
# 6 distinct tokens
CLIENT2_DATASET = make_dataset([
('pear cod', 'FRUIT|FISH'),
('arugula peas', 'VEGETABLE'),
('kiwi pear', 'FRUIT'),
('sturgeon', 'FISH'), # OOV word
('sturgeon bass', 'FISH') # 2 OOV words
])
# A client with all possible words & tags (13 distinct tokens).
# With MAX_TOKENS_SELECTED_PER_CLIENT = 6, we won't download the model
# slices for all tokens that occur on this client.
CLIENT3_DATASET = make_dataset([
(' '.join(WORD_VOCAB + ['oovword']), '|'.join(TAG_VOCAB)),
# Mathe the OOV token and 'salmon' occur in the largest number
# of examples on this client:
('salmon oovword', 'FISH|OOVTAG')
])
print('Word vocab')
for i, word in enumerate(WORD_VOCAB):
print(f'{i:2d} {word}')
print('\nTag vocab')
for i, tag in enumerate(TAG_VOCAB):
print(f'{i:2d} {tag}')
Word vocab 0 apple 1 orange 2 pear 3 kiwi 4 carrot 5 broccoli 6 arugula 7 peas 8 trout 9 tuna 10 cod 11 salmon Tag vocab 0 FRUIT 1 VEGETABLE 2 FISH
입력 기능(토큰/단어) 및 레이블(게시 태그)의 원시 번호에 대한 상수를 정의합니다. 우리 실제 입 / 출력 공간은 NUM_OOV_BUCKETS = 1
우리는 OOV 토큰 / 태그를 추가하기 때문에 크다.
NUM_WORDS = len(WORD_VOCAB)
NUM_TAGS = len(TAG_VOCAB)
WORD_VOCAB_SIZE = NUM_WORDS + NUM_OOV_BUCKETS
TAG_VOCAB_SIZE = NUM_TAGS + NUM_OOV_BUCKETS
데이터 세트의 일괄 처리 버전과 개별 일괄 처리를 생성하면 코드를 테스트하는 데 유용할 것입니다.
batched_dataset1 = CLIENT1_DATASET.batch(2)
batched_dataset2 = CLIENT2_DATASET.batch(3)
batched_dataset3 = CLIENT3_DATASET.batch(2)
batch1 = next(iter(batched_dataset1))
batch2 = next(iter(batched_dataset2))
batch3 = next(iter(batched_dataset3))
희소 입력으로 모델 정의
각 태그에 대해 간단한 독립 로지스틱 회귀 모델을 사용합니다.
def create_logistic_model(word_vocab_size: int, vocab_tags_size: int):
model = tf.keras.models.Sequential([
tf.keras.layers.InputLayer(input_shape=(word_vocab_size,), sparse=True),
tf.keras.layers.Dense(
vocab_tags_size,
activation='sigmoid',
kernel_initializer=tf.keras.initializers.zeros,
# For simplicity, don't use a bias vector; this means the model
# is a single tensor, and we only need sparse aggregation of
# the per-token slices of the model. Generalizing to also handle
# other model weights that are fully updated
# (non-dense broadcast and aggregate) would be a good exercise.
use_bias=False),
])
return model
먼저 예측을 수행하여 작동하는지 확인합시다.
model = create_logistic_model(WORD_VOCAB_SIZE, TAG_VOCAB_SIZE)
p = model.predict(batch1.tokens)
print(p)
[[0.5 0.5 0.5 0.5] [0.5 0.5 0.5 0.5]]
그리고 몇 가지 간단한 중앙 집중식 교육:
model.compile(optimizer=tf.keras.optimizers.Adagrad(learning_rate=0.001),
loss=tf.keras.losses.BinaryCrossentropy())
model.train_on_batch(batch1.tokens, batch1.tags)
연합 계산을 위한 빌딩 블록
우리는 간단한 버전을 구현할 연합 평균화의 각 장치는 모델의 관련 부분 집합을 다운로드하는 키 차이 알고리즘을하고, 부분 집합만을 업데이트를 기여한다.
우리가 사용하는 M
위한 속기로 MAX_TOKENS_SELECTED_PER_CLIENT
. 높은 수준에서 교육의 한 라운드에는 다음 단계가 포함됩니다.
참여하는 각 클라이언트는 로컬 데이터 세트를 검색하여 입력 문자열을 구문 분석하고 올바른 토큰(int 인덱스)에 매핑합니다. 이는 글로벌 (대형) 사전에 액세스 (이 잠재적으로 사용하여 피할 수 필요 기능 해싱 기술). 그런 다음 각 토큰이 발생하는 횟수를 드물게 계산합니다. 경우
U
고유 한 토큰 장치에서 발생, 우리는 선택num_actual_tokens = min(U, M)
기차에 가장 자주 토큰을.클라이언트는 사용
federated_select
의 모델 계수를 검색 할num_actual_tokens
서버에서 토큰을 선택합니다. 각 모델은 조각 형상의 텐서(TAG_VOCAB_SIZE, )
클라이언트로 송신 된 전체 데이터의 크기가 가장 위에 오도록,TAG_VOCAB_SIZE * M
(아래의 설명을 보라).클라이언트가 매핑 구성
global_token -> local_token
로컬 토큰 (INT 지수)를 선택 토큰 목록에서 글로벌 토큰의 인덱스입니다.클라이언트는 단지 가장에 대한 계수를 가지고있는 글로벌 모델의 "작은"버전을 사용하는
M
범위에서, 토큰을[0, num_actual_tokens)
.global -> local
매핑은 선택된 모델 조각에서이 모델의 조밀 한 매개 변수를 초기화하는 데 사용됩니다.클라이언트는 함께 전처리 된 데이터에 SGD를 사용하여 자신의 로컬 모델을 학습
global -> local
매핑입니다.클라이언트에 로컬 모델의 매개 변수를 설정
IndexedSlices
사용하여 업데이트local -> global
인덱스에 행을 매핑합니다. 서버는 희소 합계 집계를 사용하여 이러한 업데이트를 집계합니다.서버는 위 집계의 (dense) 결과를 가져 와서 참여하는 클라이언트 수로 나누고 결과 평균 업데이트를 전역 모델에 적용합니다.
이 섹션에서는 다음 마지막에 결합됩니다 다음 단계를위한 빌딩 블록 구성 federated_computation
캡처 하나 훈련 라운드의 전체 논리.
클라이언트 토큰을 카운트하고있는 모델 조각을 결정 federated_select
각 장치는 로컬 훈련 데이터 세트와 관련된 모델의 "슬라이스"를 결정해야 합니다. 우리 문제의 경우 클라이언트 훈련 데이터 세트에 각 토큰이 포함된 예제의 수를 세어 (간소하게!) 이 작업을 수행합니다.
@tf.function
def token_count_fn(token_counts, batch):
"""Adds counts from `batch` to the running `token_counts` sum."""
# Sum across the batch dimension.
flat_tokens = tf.sparse.reduce_sum(
batch.tokens, axis=0, output_is_sparse=True)
flat_tokens = tf.cast(flat_tokens, dtype=TOKEN_COUNT_DTYPE)
return tf.sparse.add(token_counts, flat_tokens)
# Simple tests
# Create the initial zero token counts using empty tensors.
initial_token_counts = tf.SparseTensor(
indices=tf.zeros(shape=(0, 1), dtype=TOKEN_DTYPE),
values=tf.zeros(shape=(0,), dtype=TOKEN_COUNT_DTYPE),
dense_shape=(WORD_VOCAB_SIZE,))
client_token_counts = batched_dataset1.reduce(initial_token_counts,
token_count_fn)
tokens = tf.reshape(client_token_counts.indices, (-1,)).numpy()
print('tokens:', tokens)
np.testing.assert_array_equal(tokens, [0, 1, 4, 8])
# The count is the number of *examples* in which the token/word
# occurs, not the total number of occurences, since we still featurize
# multiple occurences in the same example as a "1".
counts = client_token_counts.values.numpy()
print('counts:', counts)
np.testing.assert_array_equal(counts, [2, 3, 1, 1])
tokens: [0 1 4 8] counts: [2 3 1 1]
우리는에 해당하는 모델 매개 변수를 선택합니다 MAX_TOKENS_SELECTED_PER_CLIENT
가장 자주 장치에 토큰을 발생하는합니다. 이 많은 토큰보다 적은 장치에 발생하는 경우, 우리는 패드 목록의 사용을 가능하게 federated_select
.
예를 들어 토큰을 무작위로 선택하는 것과 같은 다른 전략이 더 나을 수 있다는 점에 유의하십시오(발생 확률에 따라). 이렇게 하면 클라이언트에 데이터가 있는 모델의 모든 조각이 업데이트될 가능성이 있습니다.
@tf.function
def keys_for_client(client_dataset, max_tokens_per_client):
"""Computes a set of max_tokens_per_client keys."""
initial_token_counts = tf.SparseTensor(
indices=tf.zeros((0, 1), dtype=TOKEN_DTYPE),
values=tf.zeros((0,), dtype=TOKEN_COUNT_DTYPE),
dense_shape=(WORD_VOCAB_SIZE,))
client_token_counts = client_dataset.reduce(initial_token_counts,
token_count_fn)
# Find the most-frequently occuring tokens
tokens = tf.reshape(client_token_counts.indices, shape=(-1,))
counts = client_token_counts.values
perm = tf.argsort(counts, direction='DESCENDING')
tokens = tf.gather(tokens, perm)
counts = tf.gather(counts, perm)
num_raw_tokens = tf.shape(tokens)[0]
actual_num_tokens = tf.minimum(max_tokens_per_client, num_raw_tokens)
selected_tokens = tokens[:actual_num_tokens]
paddings = [[0, max_tokens_per_client - tf.shape(selected_tokens)[0]]]
padded_tokens = tf.pad(selected_tokens, paddings=paddings)
# Make sure the type is statically determined
padded_tokens = tf.reshape(padded_tokens, shape=(max_tokens_per_client,))
# We will pass these tokens as keys into `federated_select`, which
# requires SELECT_KEY_DTYPE=tf.int32 keys.
padded_tokens = tf.cast(padded_tokens, dtype=SELECT_KEY_DTYPE)
return padded_tokens, actual_num_tokens
# Simple test
# Case 1: actual_num_tokens > max_tokens_per_client
selected_tokens, actual_num_tokens = keys_for_client(batched_dataset1, 3)
assert tf.size(selected_tokens) == 3
assert actual_num_tokens == 3
# Case 2: actual_num_tokens < max_tokens_per_client
selected_tokens, actual_num_tokens = keys_for_client(batched_dataset1, 10)
assert tf.size(selected_tokens) == 10
assert actual_num_tokens == 4
글로벌 토큰을 로컬 토큰에 매핑
위의 선택은 우리에게 범위의 토큰의 조밀 한 세트 제공 [0, actual_num_tokens)
우리가 온 - 디바이스 모델에 사용하는 것이다. 그러나 우리가 읽은 데이터 세트는 훨씬 더 큰 글로벌 어휘 범위에서 토큰을 가지고 [0, WORD_VOCAB_SIZE)
.
따라서 글로벌 토큰을 해당 로컬 토큰에 매핑해야 합니다. 로컬 토큰 식별자는 단순히 인덱스로 주어진다 selected_tokens
이전 단계에서 계산 텐서.
@tf.function
def map_to_local_token_ids(client_data, client_keys):
global_to_local = tf.lookup.StaticHashTable(
# Note int32 -> int64 maps are not supported
tf.lookup.KeyValueTensorInitializer(
keys=tf.cast(client_keys, dtype=TOKEN_DTYPE),
# Note we need to use tf.shape, not the static
# shape client_keys.shape[0]
values=tf.range(0, limit=tf.shape(client_keys)[0],
dtype=TOKEN_DTYPE)),
# We use -1 for tokens that were not selected, which can occur for clients
# with more than MAX_TOKENS_SELECTED_PER_CLIENT distinct tokens.
# We will simply remove these invalid indices from the batch below.
default_value=-1)
def to_local_ids(sparse_tokens):
indices_t = tf.transpose(sparse_tokens.indices)
batch_indices = indices_t[0] # First column
tokens = indices_t[1] # Second column
tokens = tf.map_fn(
lambda global_token_id: global_to_local.lookup(global_token_id), tokens)
# Remove tokens that aren't actually available (looked up as -1):
available_tokens = tokens >= 0
tokens = tokens[available_tokens]
batch_indices = batch_indices[available_tokens]
updated_indices = tf.transpose(
tf.concat([[batch_indices], [tokens]], axis=0))
st = tf.sparse.SparseTensor(
updated_indices,
tf.ones(tf.size(tokens), dtype=FEATURE_DTYPE),
dense_shape=sparse_tokens.dense_shape)
st = tf.sparse.reorder(st)
return st
return client_data.map(lambda b: BatchType(to_local_ids(b.tokens), b.tags))
# Simple test
client_keys, actual_num_tokens = keys_for_client(
batched_dataset3, MAX_TOKENS_SELECTED_PER_CLIENT)
client_keys = client_keys[:actual_num_tokens]
d = map_to_local_token_ids(batched_dataset3, client_keys)
batch = next(iter(d))
all_tokens = tf.gather(batch.tokens.indices, indices=1, axis=1)
# Confirm we have local indices in the range [0, MAX):
assert tf.math.reduce_max(all_tokens) < MAX_TOKENS_SELECTED_PER_CLIENT
assert tf.math.reduce_max(all_tokens) >= 0
각 클라이언트에서 로컬(하위) 모델 학습
참고 federated_select
A와 선택한 조각을 반환합니다 tf.data.Dataset
선택 키와 같은 순서로한다. 따라서 우리는 먼저 이러한 데이터 세트를 가져 와서 클라이언트 모델의 모델 가중치로 사용할 수 있는 단일 밀도 텐서로 변환하는 유틸리티 함수를 정의합니다.
@tf.function
def slices_dataset_to_tensor(slices_dataset):
"""Convert a dataset of slices to a tensor."""
# Use batching to gather all of the slices into a single tensor.
d = slices_dataset.batch(MAX_TOKENS_SELECTED_PER_CLIENT,
drop_remainder=False)
iter_d = iter(d)
tensor = next(iter_d)
# Make sure we have consumed everything
opt = iter_d.get_next_as_optional()
tf.Assert(tf.logical_not(opt.has_value()), data=[''], name='CHECK_EMPTY')
return tensor
# Simple test
weights = np.random.random(
size=(MAX_TOKENS_SELECTED_PER_CLIENT, TAG_VOCAB_SIZE)).astype(np.float32)
model_slices_as_dataset = tf.data.Dataset.from_tensor_slices(weights)
weights2 = slices_dataset_to_tensor(model_slices_as_dataset)
np.testing.assert_array_equal(weights, weights2)
이제 각 클라이언트에서 실행할 간단한 로컬 훈련 루프를 정의하는 데 필요한 모든 구성 요소가 있습니다.
@tf.function
def client_train_fn(model, client_optimizer,
model_slices_as_dataset, client_data,
client_keys, actual_num_tokens):
initial_model_weights = slices_dataset_to_tensor(model_slices_as_dataset)
assert len(model.trainable_variables) == 1
model.trainable_variables[0].assign(initial_model_weights)
# Only keep the "real" (unpadded) keys.
client_keys = client_keys[:actual_num_tokens]
client_data = map_to_local_token_ids(client_data, client_keys)
loss_fn = tf.keras.losses.BinaryCrossentropy()
for features, labels in client_data:
with tf.GradientTape() as tape:
predictions = model(features)
loss = loss_fn(labels, predictions)
grads = tape.gradient(loss, model.trainable_variables)
client_optimizer.apply_gradients(zip(grads, model.trainable_variables))
model_weights_delta = model.trainable_weights[0] - initial_model_weights
model_weights_delta = tf.slice(model_weights_delta, begin=[0, 0],
size=[actual_num_tokens, -1])
return client_keys, model_weights_delta
# Simple test
# Note if you execute this cell a second time, you need to also re-execute
# the preceeding cell to avoid "tf.function-decorated function tried to
# create variables on non-first call" errors.
on_device_model = create_logistic_model(MAX_TOKENS_SELECTED_PER_CLIENT,
TAG_VOCAB_SIZE)
client_optimizer = tf.keras.optimizers.SGD(learning_rate=0.001)
client_keys, actual_num_tokens = keys_for_client(
batched_dataset2, MAX_TOKENS_SELECTED_PER_CLIENT)
model_slices_as_dataset = tf.data.Dataset.from_tensor_slices(
np.zeros((MAX_TOKENS_SELECTED_PER_CLIENT, TAG_VOCAB_SIZE),
dtype=np.float32))
keys, delta = client_train_fn(
on_device_model,
client_optimizer,
model_slices_as_dataset,
client_data=batched_dataset3,
client_keys=client_keys,
actual_num_tokens=actual_num_tokens)
print(delta)
IndexedSlice 집계
우리는 사용 tff.federated_aggregate
에 대한 연합 스파 스 합을 구성 할 IndexedSlices
. 이 간단한 구현은 그 제약이 dense_shape
사전에 정적으로 알려져있다. 참고이 합계 점에서 단지 반 희소가 있음이 클라이언트 -> 서버 통신 희소이지만 서버의 합 치밀한 표현 유지 accumulate
과 merge
하고,이 치밀한 표시를 출력한다.
def federated_indexed_slices_sum(slice_indices, slice_values, dense_shape):
"""
Sumes IndexedSlices@CLIENTS to a dense @SERVER Tensor.
Intermediate aggregation is performed by converting to a dense representation,
which may not be suitable for all applications.
Args:
slice_indices: An IndexedSlices.indices tensor @CLIENTS.
slice_values: An IndexedSlices.values tensor @CLIENTS.
dense_shape: A statically known dense shape.
Returns:
A dense tensor placed @SERVER representing the sum of the client's
IndexedSclies.
"""
slices_dtype = slice_values.type_signature.member.dtype
zero = tff.tf_computation(
lambda: tf.zeros(dense_shape, dtype=slices_dtype))()
@tf.function
def accumulate_slices(dense, client_value):
indices, slices = client_value
# There is no built-in way to add `IndexedSlices`, but
# tf.convert_to_tensor is a quick way to convert to a dense representation
# so we can add them.
return dense + tf.convert_to_tensor(
tf.IndexedSlices(slices, indices, dense_shape))
return tff.federated_aggregate(
(slice_indices, slice_values),
zero=zero,
accumulate=tff.tf_computation(accumulate_slices),
merge=tff.tf_computation(lambda d1, d2: tf.add(d1, d2, name='merge')),
report=tff.tf_computation(lambda d: d))
최소한의 구축 federated_computation
테스트로
dense_shape = (6, 2)
indices_type = tff.TensorType(tf.int64, (None,))
values_type = tff.TensorType(tf.float32, (None, 2))
client_slice_type = tff.type_at_clients(
(indices_type, values_type))
@tff.federated_computation(client_slice_type)
def test_sum_indexed_slices(indices_values_at_client):
indices, values = indices_values_at_client
return federated_indexed_slices_sum(indices, values, dense_shape)
print(test_sum_indexed_slices.type_signature)
({<int64[?],float32[?,2]>}@CLIENTS -> float32[6,2]@SERVER)
x = tf.IndexedSlices(
values=np.array([[2., 2.1], [0., 0.1], [1., 1.1], [5., 5.1]],
dtype=np.float32),
indices=[2, 0, 1, 5],
dense_shape=dense_shape)
y = tf.IndexedSlices(
values=np.array([[0., 0.3], [3.1, 3.2]], dtype=np.float32),
indices=[1, 3],
dense_shape=dense_shape)
# Sum one.
result = test_sum_indexed_slices([(x.indices, x.values)])
np.testing.assert_array_equal(tf.convert_to_tensor(x), result)
# Sum two.
expected = [[0., 0.1], [1., 1.4], [2., 2.1], [3.1, 3.2], [0., 0.], [5., 5.1]]
result = test_sum_indexed_slices([(x.indices, x.values), (y.indices, y.values)])
np.testing.assert_array_almost_equal(expected, result)
A의 모든 함께 퍼팅 federated_computation
우리는 지금에 구성 요소를 함께 결합하는 TFF를 사용 tff.federated_computation
.
DENSE_MODEL_SHAPE = (WORD_VOCAB_SIZE, TAG_VOCAB_SIZE)
client_data_type = tff.SequenceType(batched_dataset1.element_spec)
model_type = tff.TensorType(tf.float32, shape=DENSE_MODEL_SHAPE)
Federated Averaging을 기반으로 하는 기본 서버 훈련 기능을 사용하여 서버 학습률 1.0으로 업데이트를 적용합니다. 클라이언트가 제공한 모델을 단순히 평균화하는 대신 모델에 업데이트(델타)를 적용하는 것이 중요합니다. 밖.
@tff.tf_computation
def server_update(current_model_weights, update_sum, num_clients):
average_update = update_sum / num_clients
return current_model_weights + average_update
우리는 몇 가지 더 필요 tff.tf_computation
구성 요소 :
# Function to select slices from the model weights in federated_select:
select_fn = tff.tf_computation(
lambda model_weights, index: tf.gather(model_weights, index))
# We need to wrap `client_train_fn` as a `tff.tf_computation`, making
# sure we do any operations that might construct `tf.Variable`s outside
# of the `tf.function` we are wrapping.
@tff.tf_computation
def client_train_fn_tff(model_slices_as_dataset, client_data, client_keys,
actual_num_tokens):
# Note this is amaller than the global model, using
# MAX_TOKENS_SELECTED_PER_CLIENT which is much smaller than WORD_VOCAB_SIZE.
# W7e would like a model of size `actual_num_tokens`, but we
# can't build the model dynamically, so we will slice off the padded
# weights at the end.
client_model = create_logistic_model(MAX_TOKENS_SELECTED_PER_CLIENT,
TAG_VOCAB_SIZE)
client_optimizer = tf.keras.optimizers.SGD(learning_rate=0.1)
return client_train_fn(client_model, client_optimizer,
model_slices_as_dataset, client_data, client_keys,
actual_num_tokens)
@tff.tf_computation
def keys_for_client_tff(client_data):
return keys_for_client(client_data, MAX_TOKENS_SELECTED_PER_CLIENT)
이제 모든 조각을 결합할 준비가 되었습니다!
@tff.federated_computation(
tff.type_at_server(model_type), tff.type_at_clients(client_data_type))
def sparse_model_update(server_model, client_data):
max_tokens = tff.federated_value(MAX_TOKENS_SELECTED_PER_CLIENT, tff.SERVER)
keys_at_clients, actual_num_tokens = tff.federated_map(
keys_for_client_tff, client_data)
model_slices = tff.federated_select(keys_at_clients, max_tokens, server_model,
select_fn)
update_keys, update_slices = tff.federated_map(
client_train_fn_tff,
(model_slices, client_data, keys_at_clients, actual_num_tokens))
dense_update_sum = federated_indexed_slices_sum(update_keys, update_slices,
DENSE_MODEL_SHAPE)
num_clients = tff.federated_sum(tff.federated_value(1.0, tff.CLIENTS))
updated_server_model = tff.federated_map(
server_update, (server_model, dense_update_sum, num_clients))
return updated_server_model
print(sparse_model_update.type_signature)
(<server_model=float32[13,4]@SERVER,client_data={<tokens=<indices=int64[?,2],values=int32[?],dense_shape=int64[2]>,tags=float32[?,4]>*}@CLIENTS> -> float32[13,4]@SERVER)
모델을 훈련시키자!
이제 훈련 기능이 생겼으니 사용해 보겠습니다.
server_model = create_logistic_model(WORD_VOCAB_SIZE, TAG_VOCAB_SIZE)
server_model.compile( # Compile to make evaluation easy.
optimizer=tf.keras.optimizers.Adagrad(learning_rate=0.0), # Unused
loss=tf.keras.losses.BinaryCrossentropy(),
metrics=[
tf.keras.metrics.Precision(name='precision'),
tf.keras.metrics.AUC(name='auc'),
tf.keras.metrics.Recall(top_k=2, name='recall_at_2'),
])
def evaluate(model, dataset, name):
metrics = model.evaluate(dataset, verbose=0)
metrics_str = ', '.join([f'{k}={v:.2f}' for k, v in
(zip(server_model.metrics_names, metrics))])
print(f'{name}: {metrics_str}')
print('Before training')
evaluate(server_model, batched_dataset1, 'Client 1')
evaluate(server_model, batched_dataset2, 'Client 2')
evaluate(server_model, batched_dataset3, 'Client 3')
model_weights = server_model.trainable_weights[0]
client_datasets = [batched_dataset1, batched_dataset2, batched_dataset3]
for _ in range(10): # Run 10 rounds of FedAvg
# We train on 1, 2, or 3 clients per round, selecting
# randomly.
cohort_size = np.random.randint(1, 4)
clients = np.random.choice([0, 1, 2], cohort_size, replace=False)
print('Training on clients', clients)
model_weights = sparse_model_update(
model_weights, [client_datasets[i] for i in clients])
server_model.set_weights([model_weights])
print('After training')
evaluate(server_model, batched_dataset1, 'Client 1')
evaluate(server_model, batched_dataset2, 'Client 2')
evaluate(server_model, batched_dataset3, 'Client 3')
Before training Client 1: loss=0.69, precision=0.00, auc=0.50, recall_at_2=0.60 Client 2: loss=0.69, precision=0.00, auc=0.50, recall_at_2=0.50 Client 3: loss=0.69, precision=0.00, auc=0.50, recall_at_2=0.40 Training on clients [0 1] Training on clients [0 2 1] Training on clients [2 0] Training on clients [1 0 2] Training on clients [2] Training on clients [2 0] Training on clients [1 2 0] Training on clients [0] Training on clients [2] Training on clients [1 2] After training Client 1: loss=0.67, precision=0.80, auc=0.91, recall_at_2=0.80 Client 2: loss=0.68, precision=0.67, auc=0.96, recall_at_2=1.00 Client 3: loss=0.65, precision=1.00, auc=0.93, recall_at_2=0.80