High-performance Simulation with Kubernetes

This tutorial will describe how to set up high-performance simulation using a TFF runtime running on Kubernetes. The model is the same as in the previous tutorial, High-performance simulations with TFF. The only difference is that here we use a worker pool instead of a local executor.

This tutorial refers to Google Cloud's GKE to create the Kubernetes cluster, but all the steps after the cluster is created can be used with any Kubernetes installation.

View on TensorFlow.org Run in Google Colab View source on GitHub

GKE에서 TFF 작업자 시작하기

참고 : 이 가이드에서는 사용자에게 기존 GCP 프로젝트가 있다고 가정합니다.

Kubernetes 클러스터 생성하기

다음 단계는 한 번만 수행하면 됩니다. 클러스터는 향후 워크로드에 재사용할 수 있습니다.

GKE 지침에 따라 컨테이너 클러스터를 만듭니다. 이 가이드의 나머지 부분에서는 클러스터의 이름이 tff-cluster라고 가정하지만, 실제 이름은 중요하지 않습니다. "5단계: 애플리케이션 배포하기"에 도달하면 지침를 따르지 마세요.

TFF 작업자 애플리케이션 배포하기

GCP와 상호 작용하는 명령은 로컬 또는 Google Cloud Shell에서 실행할 수 있습니다. 추가 설정이 필요하지 않으므로 Google Cloud Shell을 사용하는 것이 좋습니다.

  1. 다음 명령을 실행하여 Kubernetes 애플리케이션을 시작합니다.
$ kubectl create deployment tff-workers --image=gcr.io/tensorflow-federated/remote-executor-service:{ {version} }
  1. 애플리케이션에 대한 로드 밸런서를 추가합니다.
$ kubectl expose deployment tff-workers --type=LoadBalancer --port 80 --target-port 8000

참고: 이렇게 하면 배포가 인터넷에 노출되며 데모용으로만 사용됩니다. 운영 용도의 경우, 방화벽과 인증을 강력히 권장합니다.

Google Cloud Console에서 로드 밸런서의 IP 주소를 조회합니다. 나중에 훈련 루프를 작업자 앱에 연결하는 데 필요합니다.

(또는) 로컬로 Docker 컨테이너 시작하기

$ docker run --rm -p 8000:8000 gcr.io/tensorflow-federated/remote_executor_service:{ {version} }

TFF 환경 설정

!pip install --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

훈련할 모델 정의하기

import collections
import time

import tensorflow as tf
import tensorflow_federated as tff

source, _ = tff.simulation.datasets.emnist.load_data()


def map_fn(example):
  return collections.OrderedDict(
      x=tf.reshape(example['pixels'], [-1, 784]), y=example['label'])


def client_data(n):
  ds = source.create_tf_dataset_for_client(source.client_ids[n])
  return ds.repeat(10).batch(20).map(map_fn)


train_data = [client_data(n) for n in range(10)]
input_spec = train_data[0].element_spec


def model_fn():
  model = tf.keras.models.Sequential([
      tf.keras.layers.Input(shape=(784,)),
      tf.keras.layers.Dense(units=10, kernel_initializer='zeros'),
      tf.keras.layers.Softmax(),
  ])
  return tff.learning.from_keras_model(
      model,
      input_spec=input_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])


trainer = tff.learning.build_federated_averaging_process(
    model_fn, client_optimizer_fn=lambda: tf.keras.optimizers.SGD(0.02))


def evaluate(num_rounds=10):
  state = trainer.initialize()
  for round in range(num_rounds):
    t1 = time.time()
    state, metrics = trainer.next(state, train_data)
    t2 = time.time()
    print('Round {}: loss {}, round time {}'.format(round, metrics.loss, t2 - t1))

원격 실행기 설정하기

기본적으로 TFF는 모든 계산을 로컬에서 실행합니다. 이 단계에서는 위에서 설정한 Kubernetes 서비스에 연결하도록 TFF에 지시합니다. 서비스의 IP 주소를 복사해야 합니다.

import grpc

ip_address = '0.0.0.0' 
port = 80 

channels = [grpc.insecure_channel(f'{ip_address}:{port}') for _ in range(10)]

tff.backends.native.set_remote_execution_context(channels, rpc_mode='STREAMING')

훈련 실행하기

evaluate()
Round 0: loss 4.370407581329346, round time 4.201097726821899
Round 1: loss 4.1407670974731445, round time 3.3283166885375977
Round 2: loss 3.865147590637207, round time 3.098310947418213
Round 3: loss 3.534019708633423, round time 3.1565616130828857
Round 4: loss 3.272688388824463, round time 3.175067663192749
Round 5: loss 2.935391664505005, round time 3.008434534072876
Round 6: loss 2.7399251461029053, round time 3.31435227394104
Round 7: loss 2.5054931640625, round time 3.4411356449127197
Round 8: loss 2.290508985519409, round time 3.158798933029175
Round 9: loss 2.1194536685943604, round time 3.1348156929016113