Sfederowane uczenie się klasyfikacji obrazów

Zobacz na TensorFlow.org Uruchom w Google Colab Wyświetl źródło na GitHub Pobierz notatnik

W tym tutorialu użyjemy klasyczny przykład szkolenia MNIST przedstawić stowarzyszonej Learning (Floryda) warstwę API TFF, tff.learning - zestaw interfejsów wyższego poziomu, które mogą być używane do wykonywania typowych zadań federacyjnych uczenia się, takich jak szkolenie sfederowane, przeciwko modelom dostarczonym przez użytkownika zaimplementowanym w TensorFlow.

Ten samouczek oraz Federated Learning API są przeznaczone głównie dla użytkowników, którzy chcą podłączyć własne modele TensorFlow do TFF, traktując ten ostatni głównie jako czarną skrzynkę. Dla zrozumienia bardziej dogłębnej TFF i jak zaimplementować własne algorytmy uczenia federacyjnych, zobacz samouczki na API FC core - Niestandardowe Federalne Algorytmy część 1 i część 2 .

Więcej informacji na temat tff.learning , kontynuować Federalne Learning do generowania tekstu , samouczka, który oprócz obejmujące modele nawracające, pokazuje również ładowanie wstępnie przeszkolony odcinkach modelu Keras dla wyrafinowania ze stowarzyszonym nauki połączonej z oceny za pomocą Keras.

Zanim zaczniemy

Zanim zaczniemy, wykonaj następujące czynności, aby upewnić się, że Twoje środowisko jest poprawnie skonfigurowane. Jeśli nie pojawi się powitanie, proszę odnieść się do montażu prowadnicy do instrukcji.

# tensorflow_federated_nightly also bring in tf_nightly, which
# can causes a duplicate tensorboard install, leading to errors.
!pip uninstall --yes tensorboard tb-nightly

!pip install --quiet --upgrade tensorflow-federated-nightly
!pip install --quiet --upgrade nest-asyncio
!pip install --quiet --upgrade tb-nightly  # or tensorboard, but not both

import nest_asyncio
nest_asyncio.apply()
%load_ext tensorboard
import collections

import numpy as np
import tensorflow as tf
import tensorflow_federated as tff

np.random.seed(0)

tff.federated_computation(lambda: 'Hello, World!')()
b'Hello, World!'

Przygotowanie danych wejściowych

Zacznijmy od danych. Nauczanie sfederowane wymaga sfederowanego zestawu danych, tj. zbioru danych od wielu użytkowników. Federalne danych jest zazwyczaj nie- IID , który stanowi unikalny zestaw wyzwań.

W celu ułatwienia eksperymentowania, my zaszczepiono repozytorium TFF z kilku zestawów danych, w tym stowarzyszonym wersji MNIST który zawiera wersję oryginalną NIST zbiorze danych , które zostało poddane obróbce z zastosowaniem liści tak, że dane jest osadzone przez oryginalnego pisarza cyfry. Ponieważ każdy piszący ma unikalny styl, ten zestaw danych wykazuje zachowanie inne niż iid, jakiego oczekuje się od sfederowanych zestawów danych.

Oto jak możemy go załadować.

emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()

Zestawy danych zwróconych przez load_data() są instancjami tff.simulation.ClientData , interfejs, który pozwala wyliczyć zbiór użytkowników, aby skonstruować tf.data.Dataset reprezentującą dane konkretnego użytkownika, a do kwerendy struktura poszczególnych elementów. Oto, w jaki sposób możesz użyć tego interfejsu do eksploracji zawartości zestawu danych. Należy pamiętać, że chociaż ten interfejs umożliwia iterację identyfikatorów klientów, jest to tylko funkcja danych symulacji. Jak wkrótce zobaczysz, tożsamości klientów nie są używane przez platformę sfederowanego uczenia się — ich jedynym celem jest umożliwienie wybrania podzbiorów danych do symulacji.

len(emnist_train.client_ids)
3383
emnist_train.element_type_structure
OrderedDict([('label', TensorSpec(shape=(), dtype=tf.int32, name=None)), ('pixels', TensorSpec(shape=(28, 28), dtype=tf.float32, name=None))])
example_dataset = emnist_train.create_tf_dataset_for_client(
    emnist_train.client_ids[0])

example_element = next(iter(example_dataset))

example_element['label'].numpy()
1
from matplotlib import pyplot as plt
plt.imshow(example_element['pixels'].numpy(), cmap='gray', aspect='equal')
plt.grid(False)
_ = plt.show()

png

Badanie heterogeniczności danych federacyjnych

Federalne danych jest zazwyczaj nie- iid , użytkownicy zwykle mają różne dystrybucje danych w zależności od sposobu użytkowania. Niektórzy klienci mogą mieć mniej przykładów szkoleniowych na urządzeniu, cierpiąc z powodu lokalnego niedoboru danych, podczas gdy niektórzy klienci będą mieli więcej niż wystarczającą liczbę przykładów szkoleniowych. Przyjrzyjmy się tej koncepcji heterogeniczności danych typowej dla systemu sfederowanego z dostępnymi danymi EMNIST. Należy zauważyć, że ta dogłębna analiza danych klienta jest dostępna tylko dla nas, ponieważ jest to środowisko symulacyjne, w którym wszystkie dane są dla nas dostępne lokalnie. W rzeczywistym, sfederowanym środowisku produkcyjnym nie byłoby możliwe sprawdzenie danych pojedynczego klienta.

Najpierw pobierzmy próbkę danych jednego klienta, aby zapoznać się z przykładami na jednym symulowanym urządzeniu. Ponieważ zestaw danych, którego używamy, jest kluczowany przez unikalny program piszący, dane jednego klienta reprezentują pismo ręczne jednej osoby dla próbki cyfr od 0 do 9, symulując unikalny „wzorzec użycia” jednego użytkownika.

## Example MNIST digits for one client
figure = plt.figure(figsize=(20, 4))
j = 0

for example in example_dataset.take(40):
  plt.subplot(4, 10, j+1)
  plt.imshow(example['pixels'].numpy(), cmap='gray', aspect='equal')
  plt.axis('off')
  j += 1

png

Teraz zwizualizujmy liczbę przykładów na każdym kliencie dla każdej etykiety z cyframi MNIST. W środowisku federacyjnym liczba przykładów na każdym kliencie może się znacznie różnić w zależności od zachowania użytkownika.

# Number of examples per layer for a sample of clients
f = plt.figure(figsize=(12, 7))
f.suptitle('Label Counts for a Sample of Clients')
for i in range(6):
  client_dataset = emnist_train.create_tf_dataset_for_client(
      emnist_train.client_ids[i])
  plot_data = collections.defaultdict(list)
  for example in client_dataset:
    # Append counts individually per label to make plots
    # more colorful instead of one color per plot.
    label = example['label'].numpy()
    plot_data[label].append(label)
  plt.subplot(2, 3, i+1)
  plt.title('Client {}'.format(i))
  for j in range(10):
    plt.hist(
        plot_data[j],
        density=False,
        bins=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

png

Teraz zwizualizujmy średni obraz na klienta dla każdej etykiety MNIST. Ten kod wygeneruje średnią z każdej wartości piksela dla wszystkich przykładów użytkownika dla jednej etykiety. Zobaczymy, że średni obraz cyfry jednego klienta będzie wyglądał inaczej niż średni obraz tej samej cyfry innego klienta, ze względu na unikalny styl pisma każdej osoby. Możemy zastanowić się, w jaki sposób każda lokalna runda szkolenia będzie przesuwać model w innym kierunku na każdym kliencie, ponieważ uczymy się na podstawie własnych unikalnych danych tego użytkownika w tej rundzie lokalnej. W dalszej części samouczka zobaczymy, jak możemy pobrać każdą aktualizację modelu od wszystkich klientów i agregować je razem w nasz nowy globalny model, który nauczył się na podstawie unikalnych danych każdego z naszych klientów.

# Each client has different mean images, meaning each client will be nudging
# the model in their own directions locally.

for i in range(5):
  client_dataset = emnist_train.create_tf_dataset_for_client(
      emnist_train.client_ids[i])
  plot_data = collections.defaultdict(list)
  for example in client_dataset:
    plot_data[example['label'].numpy()].append(example['pixels'].numpy())
  f = plt.figure(i, figsize=(12, 5))
  f.suptitle("Client #{}'s Mean Image Per Label".format(i))
  for j in range(10):
    mean_img = np.mean(plot_data[j], 0)
    plt.subplot(2, 5, j+1)
    plt.imshow(mean_img.reshape((28, 28)))
    plt.axis('off')

png

png

png

png

png

Dane użytkownika mogą być zaszumione i nierzetelnie oznakowane. Na przykład, patrząc na powyższe dane Klienta nr 2, możemy zauważyć, że w przypadku etykiety 2 możliwe było wystąpienie kilku błędnie oznaczonych przykładów tworzących głośniejszy, średni obraz.

Wstępne przetwarzanie danych wejściowych

Ponieważ dane są już tf.data.Dataset , przerób może być dokonane za pomocą przekształceń DataSet. Tutaj spłaszczyć 28x28 obrazy do 784 -elementowe tablic, przetasować poszczególne przykłady, zorganizować je w partii, i zmienić funkcje z pixels i label do x i y do użytku z Keras. Mamy również rzucać w repeat na zbiorze danych, aby uruchomić kilka epok.

NUM_CLIENTS = 10
NUM_EPOCHS = 5
BATCH_SIZE = 20
SHUFFLE_BUFFER = 100
PREFETCH_BUFFER = 10

def preprocess(dataset):

  def batch_format_fn(element):
    """Flatten a batch `pixels` and return the features as an `OrderedDict`."""
    return collections.OrderedDict(
        x=tf.reshape(element['pixels'], [-1, 784]),
        y=tf.reshape(element['label'], [-1, 1]))

  return dataset.repeat(NUM_EPOCHS).shuffle(SHUFFLE_BUFFER, seed=1).batch(
      BATCH_SIZE).map(batch_format_fn).prefetch(PREFETCH_BUFFER)

Sprawdźmy, czy to zadziałało.

preprocessed_example_dataset = preprocess(example_dataset)

sample_batch = tf.nest.map_structure(lambda x: x.numpy(),
                                     next(iter(preprocessed_example_dataset)))

sample_batch
OrderedDict([('x', array([[1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       ...,
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.]], dtype=float32)), ('y', array([[2],
       [1],
       [5],
       [7],
       [1],
       [7],
       [7],
       [1],
       [4],
       [7],
       [4],
       [2],
       [2],
       [5],
       [4],
       [1],
       [1],
       [0],
       [0],
       [9]], dtype=int32))])

Mamy prawie wszystkie bloki konstrukcyjne do tworzenia sfederowanych zestawów danych.

Jednym ze sposobów, aby plik danych stowarzyszony TFF w symulacji jest po prostu jako lista Python, przy czym każdy element listy trzyma dane indywidualnego użytkownika, czy to w postaci listy lub jako tf.data.Dataset . Ponieważ mamy już interfejs, który zapewnia to drugie, skorzystajmy z niego.

Oto prosta funkcja pomocnicza, która utworzy listę zestawów danych z danego zestawu użytkowników jako dane wejściowe do rundy szkolenia lub oceny.

def make_federated_data(client_data, client_ids):
  return [
      preprocess(client_data.create_tf_dataset_for_client(x))
      for x in client_ids
  ]

Jak teraz wybieramy klientów?

W typowym scenariuszu szkolenia sfederowanego mamy do czynienia z potencjalnie bardzo dużą populacją urządzeń użytkowników, z których tylko ułamek może być dostępny do szkolenia w danym momencie. Dzieje się tak na przykład, gdy urządzeniami klienckimi są telefony komórkowe, które uczestniczą w szkoleniu tylko wtedy, gdy są podłączone do źródła zasilania, wyłączone z sieci z pomiarem lub w inny sposób bezczynne.

Oczywiście jesteśmy w środowisku symulacyjnym, a wszystkie dane są dostępne lokalnie. Zazwyczaj wtedy, przeprowadzając symulacje, po prostu wybieramy losowy podzbiór klientów, którzy będą zaangażowani w każdą rundę szkolenia, generalnie różni się w każdej rundzie.

Mimo to, jak można się dowiedzieć studiując papier na uśredniania Federalne algorytmu, osiągnięcie konwergencji w systemie z losowo wybranych podzbiorów klientów w każdej rundzie może trochę potrwać, i byłoby niepraktyczne trzeba uruchomić setki rund w ten interaktywny samouczek.

Zamiast tego zrobimy próbkowanie zestawu klientów raz i ponownie użyjemy tego samego zestawu w kolejnych rundach, aby przyspieszyć konwergencję (celowo nadmiernie dopasowując się do tych kilku danych użytkownika). Zmodyfikowanie tego samouczka w celu symulacji losowego próbkowania pozostawiamy jako ćwiczenie dla czytelnika — jest to dość łatwe do zrobienia (jeśli już to zrobisz, pamiętaj, że uzyskanie zbieżności modelu może trochę potrwać).

sample_clients = emnist_train.client_ids[0:NUM_CLIENTS]

federated_train_data = make_federated_data(emnist_train, sample_clients)

print('Number of client datasets: {l}'.format(l=len(federated_train_data)))
print('First dataset: {d}'.format(d=federated_train_data[0]))
Number of client datasets: 10
First dataset: <DatasetV1Adapter shapes: OrderedDict([(x, (None, 784)), (y, (None, 1))]), types: OrderedDict([(x, tf.float32), (y, tf.int32)])>

Tworzenie modelu z Keras

Jeśli używasz Keras, prawdopodobnie masz już kod, który konstruuje model Keras. Oto przykład prostego modelu, który wystarczy na nasze potrzeby.

def create_keras_model():
  return tf.keras.models.Sequential([
      tf.keras.layers.InputLayer(input_shape=(784,)),
      tf.keras.layers.Dense(10, kernel_initializer='zeros'),
      tf.keras.layers.Softmax(),
  ])

W celu korzystania z dowolnego modelu z TFF, musi być zawinięta w instancji tff.learning.Model interfejs, który naraża metody pieczęć podaniu modelki, właściwości metadanych, itd, podobnie jak Keras, ale również wprowadza dodatkowy elementy, takie jak sposoby kontrolowania procesu obliczania metryk stowarzyszonych. Nie martwmy się tym na razie; jeśli masz model Keras jak ta, którą właśnie zdefiniowanego powyżej, można mieć TFF owinąć go dla Ciebie powołując tff.learning.from_keras_model , przekazując model i na próbie liczącej danych jako argumenty, jak pokazano poniżej.

def model_fn():
  # We _must_ create a new model here, and _not_ capture it from an external
  # scope. TFF will call this within different graph contexts.
  keras_model = create_keras_model()
  return tff.learning.from_keras_model(
      keras_model,
      input_spec=preprocessed_example_dataset.element_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

Szkolenie modelu na danych sfederowanych

Teraz, gdy mamy model owinięty jak tff.learning.Model do użytku z TFF, możemy pozwolić TFF skonstruować algorytm uśredniania Federalne przez wywołanie funkcji pomocnika tff.learning.build_federated_averaging_process , jak następuje.

Należy pamiętać, że argument musi być konstruktora (takie jak model_fn powyżej), nie jest już zbudowany instancji, tak że budowa modelu może się zdarzyć w kontekście kontrolowanym przez TFF (jeśli jesteś ciekawy o przyczynach to zachęcamy do zapoznania się z follow-up tutorial algorytmów niestandardowych ).

Jedna krytyczna uwaga o algorytm uśredniania Federalne poniżej, są 2 optymalizujące: a optymalizator _client i optymalizator _SERVER. _Client optymalizator jest używany tylko do obliczenia lokalnych aktualizacji modelu na każdym kliencie. _SERVER optymalizator stosuje uśrednionego aktualizację światowego modelu na serwerze. W szczególności oznacza to, że wybór używanego optymalizatora i szybkości uczenia się może być inny niż ten, którego użyłeś do uczenia modelu na standardowym zestawie danych iid. Zalecamy rozpoczęcie od regularnego SGD, prawdopodobnie z mniejszą szybkością uczenia się niż zwykle. Szybkość uczenia się, której używamy, nie została dokładnie dostosowana, możesz eksperymentować.

iterative_process = tff.learning.build_federated_averaging_process(
    model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))

Co się stało? TFF skonstruował parę stowarzyszonych obliczeń i pakować je do tff.templates.IterativeProcess w którym te obliczenia są dostępne jako para właściwości initialize i next .

W skrócie, Federalne obliczenia są programy w języku wewnętrznym TFF, że mogą wyrażać różne algorytmy federacyjnych (można znaleźć więcej na ten temat w niestandardowych algorytmów samouczek). W tym przypadku obie obliczenia generowane i zapakowane w iterative_process wdrożyć Federalne Uśrednianie .

Celem TFF jest zdefiniowanie obliczeń w taki sposób, aby można je było wykonywać w rzeczywistych ustawieniach sfederowanego uczenia się, ale obecnie implementowane jest tylko lokalne środowisko wykonawcze symulacji wykonywania. Aby wykonać obliczenia w symulatorze, po prostu wywołujesz je jak funkcję Pythona. To domyślnie interpretowane środowisko nie jest zaprojektowane z myślą o wysokiej wydajności, ale wystarczy w tym samouczku; spodziewamy się, że w przyszłych wydaniach zapewnimy wydajniejsze środowiska wykonawcze symulacji, aby ułatwić badania na większą skalę.

Zacznijmy z initialize obliczeń. Tak jak w przypadku wszystkich obliczeń federacyjnych, możesz myśleć o tym jako o funkcji. Obliczenie nie wymaga argumentów i zwraca jeden wynik — reprezentację stanu procesu uśredniania federacyjnego na serwerze. Chociaż nie chcemy zagłębiać się w szczegóły TFF, pouczające może być zobaczenie, jak wygląda ten stan. Możesz to zwizualizować w następujący sposób.

str(iterative_process.initialize.type_signature)
'( -> <model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER)'

Chociaż powyższy podpis typ może początkowo wydawać się nieco tajemnicze, można uznać, że stan serwera składa się z model (początkowe parametry modelu dla MNIST które zostaną rozesłane do wszystkich urządzeń), a optimizer_state (dodatkowe informacje przechowywane przez serwer, takie jak liczba rund do użycia w harmonogramach hiperparametrów itp.).

Załóżmy, powoływać się na initialize obliczeń skonstruować stan serwera.

state = iterative_process.initialize()

Drugi z pary federacyjnych obliczeń, next , reprezentuje pojedynczą rundę Federalne uśredniania, która składa się z pchania stanu serwera (w tym parametrów modelu) do klientów, szkolenia na urządzeniu na swoich lokalnych danych, gromadzenie i modelowe uśredniania aktualizacje i stworzenie nowego zaktualizowanego modelu na serwerze.

Koncepcyjnie, można myśleć o next jako posiadające funkcjonalne typu podpis, który wygląda następująco.

SERVER_STATE, FEDERATED_DATA -> SERVER_STATE, TRAINING_METRICS

W szczególności, należy pomyśleć o next() nie jako funkcja, która działa na serwerze, lecz bycie deklaratywne funkcjonalny reprezentacja całego zdecentralizowanej obliczeń - niektóre z wejść są dostarczane przez serwer ( SERVER_STATE ), ale każdy uczestniczący urządzenie dostarcza swój własny lokalny zestaw danych.

Przeprowadźmy pojedynczą rundę szkolenia i zwizualizujmy wyniki. Możemy użyć danych federacyjnych, które już wygenerowaliśmy powyżej dla próbki użytkowników.

state, metrics = iterative_process.next(state, federated_train_data)
print('round  1, metrics={}'.format(metrics))
round  1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.12345679), ('loss', 3.1193738)])), ('stat', OrderedDict([('num_examples', 4860)]))])

Przeprowadźmy jeszcze kilka rund. Jak wspomniano wcześniej, zazwyczaj w tym momencie wybierasz podzbiór danych symulacji z nowej losowo wybranej próbki użytkowników dla każdej rundy, aby zasymulować realistyczne wdrożenie, w którym użytkownicy stale przychodzą i odchodzą, ale w tym interaktywnym notatniku, ze względu na demonstrację po prostu ponownie użyjemy tych samych użytkowników, aby system szybko się zbieżny.

NUM_ROUNDS = 11
for round_num in range(2, NUM_ROUNDS):
  state, metrics = iterative_process.next(state, federated_train_data)
  print('round {:2d}, metrics={}'.format(round_num, metrics))
round  2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13518518), ('loss', 2.9834728)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.14382716), ('loss', 2.861665)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  4, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.17407407), ('loss', 2.7957022)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  5, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.19917695), ('loss', 2.6146567)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  6, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.21975309), ('loss', 2.529761)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  7, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.2409465), ('loss', 2.4053504)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  8, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.2611111), ('loss', 2.315389)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  9, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.30823046), ('loss', 2.1240263)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round 10, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.33312756), ('loss', 2.1164262)])), ('stat', OrderedDict([('num_examples', 4860)]))])

Utrata szkolenia zmniejsza się po każdej rundzie szkolenia federacyjnego, co wskazuje, że model jest zbieżny. Istnieją pewne istotne zastrzeżenia z tych metryk szkoleniowych jednak, patrz rozdział na ocenie w dalszej części tego podręcznika.

Wyświetlanie metryk modelu w TensorBoard

Następnie zwizualizujmy metryki z tych obliczeń federacyjnych za pomocą Tensorboard.

Zacznijmy od utworzenia katalogu i odpowiedniego programu do zapisywania podsumowań, w którym będą zapisywane metryki.

logdir = "/tmp/logs/scalars/training/"
summary_writer = tf.summary.create_file_writer(logdir)
state = iterative_process.initialize()

Wykreśl odpowiednie metryki skalarne za pomocą tego samego kreatora podsumowań.

with summary_writer.as_default():
  for round_num in range(1, NUM_ROUNDS):
    state, metrics = iterative_process.next(state, federated_train_data)
    for name, value in metrics['train'].items():
      tf.summary.scalar(name, value, step=round_num)

Uruchom TensorBoard z głównym katalogiem dziennika określonym powyżej. Załadowanie danych może potrwać kilka sekund.

!ls {logdir}
%tensorboard --logdir {logdir} --port=0
events.out.tfevents.1629557449.ebe6e776479e64ea-4903924a278.borgtask.google.com.458912.1.v2
Launching TensorBoard...
Reusing TensorBoard on port 50681 (pid 292785), started 0:30:30 ago. (Use '!kill 292785' to kill it.)
<IPython.core.display.Javascript at 0x7fd6617e02d0>
# Uncomment and run this this cell to clean your directory of old output for
# future graphs from this directory. We don't run it by default so that if 
# you do a "Runtime > Run all" you don't lose your results.

# !rm -R /tmp/logs/scalars/*

Aby wyświetlić metryki oceny w ten sam sposób, możesz utworzyć osobny folder eval, taki jak „logs/scalars/eval”, aby pisać do TensorBoard.

Dostosowanie implementacji modelu

Keras jest zalecana wysokiego poziomu Model API dla TensorFlow i zachęcamy stosując modele Keras (przez tff.learning.from_keras_model ) w TFF w miarę możliwości.

Jednak tff.learning zapewnia interfejs modelu niższego szczebla, tff.learning.Model , że odsłania minimalną funkcjonalność niezbędną dla stosując model dla stowarzyszonym nauki. Bezpośrednio realizacji tego interfejsu (prawdopodobnie nadal używa klocki jak tf.keras.layers ) pozwala na maksymalne dostosowanie bez modyfikowania wnętrzności federacyjnych algorytmów uczenia.

Więc zróbmy to wszystko od nowa.

Definiowanie zmiennych modelu, przejścia do przodu i metryk

Pierwszym krokiem jest zidentyfikowanie zmiennych TensorFlow, z którymi będziemy pracować. Aby poniższy kod był bardziej czytelny, zdefiniujmy strukturę danych reprezentującą cały zestaw. Będzie to obejmować takie zmienne jak weights i bias , że będziemy trenować, a także zmienne, które posiadają różne łączne statystyki i liczniki będziemy aktualizować w trakcie szkolenia, takie jak loss_sum , accuracy_sum i num_examples .

MnistVariables = collections.namedtuple(
    'MnistVariables', 'weights bias num_examples loss_sum accuracy_sum')

Oto metoda, która tworzy zmienne. Dla uproszczenia możemy reprezentować wszystkie statystyki jak tf.float32 , jako że wyeliminuje potrzebę typu konwersji w późniejszym etapie. Zawijania zmienne inicjatorów jako lambdas jest to wymóg nałożony przez zmiennych zasobów .

def create_mnist_variables():
  return MnistVariables(
      weights=tf.Variable(
          lambda: tf.zeros(dtype=tf.float32, shape=(784, 10)),
          name='weights',
          trainable=True),
      bias=tf.Variable(
          lambda: tf.zeros(dtype=tf.float32, shape=(10)),
          name='bias',
          trainable=True),
      num_examples=tf.Variable(0.0, name='num_examples', trainable=False),
      loss_sum=tf.Variable(0.0, name='loss_sum', trainable=False),
      accuracy_sum=tf.Variable(0.0, name='accuracy_sum', trainable=False))

Mając zmienne dla parametrów modelu i skumulowanych statystyk, możemy teraz zdefiniować metodę przekazywania do przodu, która oblicza straty, emituje prognozy i aktualizuje skumulowane statystyki dla pojedynczej partii danych wejściowych w następujący sposób.

def predict_on_batch(variables, x):
  return tf.nn.softmax(tf.matmul(x, variables.weights) + variables.bias)

def mnist_forward_pass(variables, batch):
  y = predict_on_batch(variables, batch['x'])
  predictions = tf.cast(tf.argmax(y, 1), tf.int32)

  flat_labels = tf.reshape(batch['y'], [-1])
  loss = -tf.reduce_mean(
      tf.reduce_sum(tf.one_hot(flat_labels, 10) * tf.math.log(y), axis=[1]))
  accuracy = tf.reduce_mean(
      tf.cast(tf.equal(predictions, flat_labels), tf.float32))

  num_examples = tf.cast(tf.size(batch['y']), tf.float32)

  variables.num_examples.assign_add(num_examples)
  variables.loss_sum.assign_add(loss * num_examples)
  variables.accuracy_sum.assign_add(accuracy * num_examples)

  return loss, predictions

Następnie definiujemy funkcję, która zwraca zestaw lokalnych metryk, ponownie używając TensorFlow. Są to wartości (oprócz aktualizacji modelu, które są obsługiwane automatycznie), które kwalifikują się do agregowania na serwerze w procesie sfederowanego uczenia się lub oceny.

Tutaj, po prostu wrócić średnią loss i accuracy , a także num_examples , które musimy prawidłowo ważyć wkłady różnych użytkowników podczas obliczania agregatów stowarzyszonym.

def get_local_mnist_metrics(variables):
  return collections.OrderedDict(
      num_examples=variables.num_examples,
      loss=variables.loss_sum / variables.num_examples,
      accuracy=variables.accuracy_sum / variables.num_examples)

Wreszcie, musimy ustalić, jak agregowanie metryk lokalnych emitowanych przez poszczególne urządzenia poprzez get_local_mnist_metrics . To jest tylko część kodu, który nie jest napisany w TensorFlow - to stowarzyszonego obliczenia wyrażone w TFF. Jeśli chcesz kopać głębiej, chude nad niestandardowych algorytmów poradnik, ale w większości zastosowań, nie będzie naprawdę trzeba; warianty wzoru pokazanego poniżej powinny wystarczyć. Oto jak to wygląda:

@tff.federated_computation
def aggregate_mnist_metrics_across_clients(metrics):
  return collections.OrderedDict(
      num_examples=tff.federated_sum(metrics.num_examples),
      loss=tff.federated_mean(metrics.loss, metrics.num_examples),
      accuracy=tff.federated_mean(metrics.accuracy, metrics.num_examples))

Do wprowadzania metrics odpowiada argument OrderedDict zwróconej przez get_local_mnist_metrics powyżej, ale krytycznie wartości nie są już tf.Tensors - są „zapakowane”, jak tff.Value s, aby było jasne, nie będzie można nimi manipulować za pomocą TensorFlow, ale tylko stosując federacyjnych operatorów TFF Like tff.federated_mean i tff.federated_sum . Zwrócony słownik globalnych agregatów definiuje zestaw metryk, które będą dostępne na serwerze.

Konstruowanie wystąpienie tff.learning.Model

Mając wszystkie powyższe elementy, jesteśmy gotowi do skonstruowania reprezentacji modelu do użytku z TFF, podobnej do tej, która jest generowana dla Ciebie, gdy pozwolisz TFF pozyskać model Keras.

from typing import Callable, List, OrderedDict

class MnistModel(tff.learning.Model):

  def __init__(self):
    self._variables = create_mnist_variables()

  @property
  def trainable_variables(self):
    return [self._variables.weights, self._variables.bias]

  @property
  def non_trainable_variables(self):
    return []

  @property
  def local_variables(self):
    return [
        self._variables.num_examples, self._variables.loss_sum,
        self._variables.accuracy_sum
    ]

  @property
  def input_spec(self):
    return collections.OrderedDict(
        x=tf.TensorSpec([None, 784], tf.float32),
        y=tf.TensorSpec([None, 1], tf.int32))

  @tf.function
  def predict_on_batch(self, x, training=True):
    del training
    return predict_on_batch(self._variables, x)

  @tf.function
  def forward_pass(self, batch, training=True):
    del training
    loss, predictions = mnist_forward_pass(self._variables, batch)
    num_exmaples = tf.shape(batch['x'])[0]
    return tff.learning.BatchOutput(
        loss=loss, predictions=predictions, num_examples=num_exmaples)

  @tf.function
  def report_local_outputs(self):
    return get_local_mnist_metrics(self._variables)

  @property
  def federated_output_computation(self):
    return aggregate_mnist_metrics_across_clients

  @tf.function
  def report_local_unfinalized_metrics(
      self) -> OrderedDict[str, List[tf.Tensor]]:
    """Creates an `OrderedDict` of metric names to unfinalized values."""
    return collections.OrderedDict(
        num_examples=[self._variables.num_examples],
        loss=[self._variables.loss_sum, self._variables.num_examples],
        accuracy=[self._variables.accuracy_sum, self._variables.num_examples])

  def metric_finalizers(
      self) -> OrderedDict[str, Callable[[List[tf.Tensor]], tf.Tensor]]:
    """Creates an `OrderedDict` of metric names to finalizers."""
    return collections.OrderedDict(
        num_examples=tf.function(func=lambda x: x[0]),
        loss=tf.function(func=lambda x: x[0] / x[1]),
        accuracy=tf.function(func=lambda x: x[0] / x[1]))

Jak widać, abstrakcyjne metody i właściwości zdefiniowane przez tff.learning.Model odpowiada fragmentów kodu w poprzednim rozdziale, że wprowadzone zmienne i zdefiniowała straty i statystyki.

Oto kilka punktów, które warto podkreślić:

  • Wszystko stwierdzają, że dany model będzie korzystać muszą być ujęte jako zmienne TensorFlow, jak TFF nie używa Python przy starcie (pamiętać swój kod powinien być napisany tak, że może on zostać wdrożony do urządzeń mobilnych; patrz niestandardowych algorytmów samouczka na bardziej szczegółowe komentarz do przyczyn).
  • Twój model powinien opisać postać danych akceptuje ( input_spec ), jak w ogóle, TFF jest silnie wpisany środowisko i chce, aby określić typ podpisów dla wszystkich komponentów. Istotną jego częścią jest deklarowanie formatu danych wejściowych modelu.
  • Chociaż technicznie nie jest to wymagane, zalecamy owijanie wszelkiej logice TensorFlow (podaniu, obliczenia metryczne, itd.) Jako tf.function s, jak to pomaga zapewnić TensorFlow mogą być szeregowane i usuwa potrzebę wyraźnych zależności sterujących.

Powyższe jest wystarczające do oceny i algorytmów takich jak Federated SGD. Jednak w przypadku uśredniania federacyjnego musimy określić, w jaki sposób model powinien trenować lokalnie w każdej partii. Lokalny optymalizator określimy podczas budowania algorytmu uśredniania sfederowanego.

Symulacja szkolenia sfederowanego z nowym modelem

Po wdrożeniu wszystkich powyższych elementów pozostała część procesu wygląda tak, jak już widzieliśmy — po prostu zastąp konstruktor modelu konstruktorem naszej nowej klasy modelu i użyj dwóch sfederowanych obliczeń w utworzonym procesie iteracyjnym, aby przejść przez rundy szkoleniowe.

iterative_process = tff.learning.build_federated_averaging_process(
    MnistModel,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02))
state = iterative_process.initialize()
state, metrics = iterative_process.next(state, federated_train_data)
print('round  1, metrics={}'.format(metrics))
round  1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 3.0708053), ('accuracy', 0.12777779)])), ('stat', OrderedDict([('num_examples', 4860)]))])
for round_num in range(2, 11):
  state, metrics = iterative_process.next(state, federated_train_data)
  print('round {:2d}, metrics={}'.format(round_num, metrics))
round  2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 3.011699), ('accuracy', 0.13024691)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.7408307), ('accuracy', 0.15576132)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  4, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.6761012), ('accuracy', 0.17921811)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  5, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.675567), ('accuracy', 0.1855967)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  6, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.5664043), ('accuracy', 0.20329218)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  7, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.4179392), ('accuracy', 0.24382716)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  8, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.3237286), ('accuracy', 0.26687244)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  9, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.1861682), ('accuracy', 0.28209877)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round 10, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.046388), ('accuracy', 0.32037038)])), ('stat', OrderedDict([('num_examples', 4860)]))])

Aby zobaczyć te metryki w TensorBoard, zapoznaj się z krokami wymienionymi powyżej w "Wyświetlanie metryk modelu w TensorBoard".

Ocena

Wszystkie nasze dotychczasowe eksperymenty przedstawiały tylko metryki treningu sfederowanego — średnie metryki ze wszystkich partii danych wytrenowanych dla wszystkich klientów w rundzie. Wprowadza to normalne obawy związane z nadmiernym dopasowaniem, zwłaszcza że dla uproszczenia używaliśmy tego samego zestawu klientów w każdej rundzie, ale istnieje dodatkowe pojęcie o nadmiernym dopasowaniu w metrykach treningowych charakterystycznych dla algorytmu uśredniania federacyjnego. Najłatwiej to zobaczyć, jeśli wyobrazimy sobie, że każdy klient miał pojedynczą partię danych i trenujemy na tej partii przez wiele iteracji (epok). W takim przypadku model lokalny szybko dopasuje się dokładnie do tej jednej partii, a więc uśredniona przez nas lokalna metryka dokładności zbliży się do 1,0. Tak więc te wskaźniki treningowe mogą być traktowane jako znak postępu treningu, ale niewiele więcej.

Aby wykonać ocenę danych federacyjnych, można skonstruować inny stowarzyszonego obliczenia przeznaczone tylko dla tego celu, przy użyciu tff.learning.build_federated_evaluation funkcję i przekazując w konstruktorze modelu jako argumentu. Należy zauważyć, że w przeciwieństwie do Federacji uśredniania gdzie my używane MnistTrainableModel , wystarczy zdać MnistModel . Ocena nie wykonuje spadku gradientu i nie ma potrzeby konstruowania optymalizatorów.

Do eksperymentów i badań, gdy scentralizowane Test zestawu danych jest dostępna, Federalne Nauka dla Tekst Generation pokazuje inną opcję ocena: Biorąc wyszkolonych ciężary z stowarzyszonym nauki, stosując je do standardowego modelu Keras, a następnie po prostu wywołanie tf.keras.models.Model.evaluate() na scentralizowanym zbiorze.

evaluation = tff.learning.build_federated_evaluation(MnistModel)

Możesz sprawdzić podpis typu abstrakcyjnego funkcji oceny w następujący sposób.

str(evaluation.type_signature)
'(<server_model_weights=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>@SERVER,federated_dataset={<x=float32[?,784],y=int32[?,1]>*}@CLIENTS> -> <eval=<num_examples=float32,loss=float32,accuracy=float32>,stat=<num_examples=int64>>@SERVER)'

Nie trzeba być zaniepokojony szczegółów w tym momencie, po prostu mieć świadomość, że przyjmuje następującą postać ogólną, podobny do tff.templates.IterativeProcess.next ale z dwoma istotnymi różnicami. Po pierwsze, nie zwracamy stanu serwera, ponieważ ewaluacja nie modyfikuje modelu ani żadnego innego aspektu stanu - można o nim myśleć jako o bezstanowym. Po drugie, ocena wymaga tylko modelu i nie wymaga żadnej innej części stanu serwera, która może być powiązana z uczeniem, na przykład zmiennych optymalizujących.

SERVER_MODEL, FEDERATED_DATA -> TRAINING_METRICS

Przywołajmy ocenę ostatniego stanu, do którego doszliśmy podczas szkolenia. W celu wydobycia najnowszy model z wyszkolonych stanu serwera, po prostu dostęp do .model zarejestrował się w następujący sposób.

train_metrics = evaluation(state.model, federated_train_data)

Oto, co otrzymujemy. Zwróć uwagę, że liczby wyglądają nieznacznie lepiej niż te, które podano w ostatniej rundzie szkolenia powyżej. Zgodnie z konwencją metryki uczące zgłaszane przez iteracyjny proces uczący na ogół odzwierciedlają wydajność modelu na początku rundy uczącej, więc metryki oceny będą zawsze o jeden krok do przodu.

str(train_metrics)
"OrderedDict([('eval', OrderedDict([('num_examples', 4860.0), ('loss', 1.7510437), ('accuracy', 0.2788066)])), ('stat', OrderedDict([('num_examples', 4860)]))])"

Teraz skompilujmy próbkę testową danych federacyjnych i ponownie uruchom ocenę danych testowych. Dane będą pochodzić z tej samej próbki rzeczywistych użytkowników, ale z odrębnego zestawu danych.

federated_test_data = make_federated_data(emnist_test, sample_clients)

len(federated_test_data), federated_test_data[0]
(10,
 <DatasetV1Adapter shapes: OrderedDict([(x, (None, 784)), (y, (None, 1))]), types: OrderedDict([(x, tf.float32), (y, tf.int32)])>)
test_metrics = evaluation(state.model, federated_test_data)
str(test_metrics)
"OrderedDict([('eval', OrderedDict([('num_examples', 580.0), ('loss', 1.8361608), ('accuracy', 0.2413793)])), ('stat', OrderedDict([('num_examples', 580)]))])"

To kończy samouczek. Zachęcamy do zabawy z parametrami (np. wielkość partii, liczba użytkowników, epoki, tempo uczenia się itp.), modyfikowanie powyższego kodu w celu symulowania treningu na losowych próbkach użytkowników w każdej rundzie oraz zapoznawanie się z innymi samouczkami opracowaliśmy.