TensorFlow.org'da görüntüleyin | Google Colab'da çalıştırın | Kaynağı GitHub'da görüntüleyin | Not defterini indir |
Bu eğitimde, tasarım ardındaki ilkeleri açıklamak tff.aggregators
modülüne ve sunucuya müşterilerinden gelen değerlerin özel toplanmasına ilgili en iyi uygulamaları.
Önkoşullar. Bu eğitimde zaten temel kavramlarına aşina varsayar Federe Çekirdek böyle yerleşimler (aynı tff.SERVER
, tff.CLIENTS
TFF hesaplamaları nasıl temsil), ( tff.tf_computation
, tff.federated_computation
) ve bunların tip imzaları.
!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio
import nest_asyncio
nest_asyncio.apply()
Tasarım özeti
TFF olarak, "birleştirme" ile ilgili bir dizi değer hareketi ifade eder tff.CLIENTS
aynı tipte bir toplam değeri elde etmek için tff.SERVER
. Yani, her bir bireysel müşteri değerinin mevcut olması gerekmez. Örneğin, federe öğrenimde, sunucudaki global modele uygulanacak bir toplu model güncellemesi elde etmek için istemci modeli güncellemelerinin ortalaması alınır.
Gibi bu amacı gerçekleştirmek operatörler ek olarak tff.federated_sum
TFF içerir tff.templates.AggregationProcess
(bir durum bilgisi işlem basit bir miktar daha karmaşık formlar genelleme böylece agregasyon hesaplama tipi imza formalizes).
Ana bileşenleri tff.aggregators
oluşturulması için fabrikaları modülü AggregationProcess
iki yönden TFF genellikle faydalı ve Değişebilir yapı blokları olarak tasarlanmıştır:
- Parametreli hesaplamalar. Toplama ile çalışmak üzere tasarlanmış diğer TFF modüllerine takılabilir bağımsız bir yapı taşıdır
tff.aggregators
onların gerekli toplanmasına parametreleştirmenin.
Örnek:
learning_process = tff.learning.build_federated_averaging_process(
...,
model_update_aggregation_factory=tff.aggregators.MeanFactory())
- Toplama bileşimi. Bir toplama yapı taşı, daha karmaşık bileşik toplamalar oluşturmak için diğer toplama yapı taşlarıyla oluşturulabilir.
Örnek:
secure_mean = tff.aggregators.MeanFactory(
value_sum_factory=tff.aggregators.SecureSumFactory(...))
Bu öğreticinin geri kalanı, bu iki hedefe nasıl ulaşıldığını açıklar.
Toplama süreci
Biz ilk özetlemek tff.templates.AggregationProcess
ve bunun oluşturulması için fabrika desenle izleyin.
tff.templates.AggregationProcess
bir olduğunu tff.templates.MeasuredProcess
toplama için belirtilen tip imzalarla. Özellikle, initialize
ve next
fonksiyonlar şu tip imzaları vardır:
-
( -> state_type@SERVER)
-
(<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)
(Tip durumu state_type
) servis yerleştirilmelidir. next
fonksiyonu, durum ve bir değer (tipi toplu için kullanılan giriş argüman olarak alır value_type
müşteriler yerleştirilir). *
Aracı bir ağırlıklı ortalama olarak, örneğin ağırlıkları için, diğer giriş bağımsız değişkeni isteğe. Güncellenmiş bir durum nesnesi, sunucuya yerleştirilen aynı türden toplu değer ve bazı ölçümler döndürür.
Devlet hem de çalıştırma arasında geçirilecek bu Not next
işlev, ve rapor edilen ölçümler, belirli bir yürütme bağlı olarak herhangi bir bilgi vermek için amaçlanan next
fonksiyonu, boş olabilir. Bununla birlikte, TFF'nin diğer bölümlerinin takip etmesi gereken net bir sözleşmeye sahip olmaları için açıkça belirtilmeleri gerekir.
Diğer TFF modülleri, örnek model güncellemelerini için tff.learning
, kullanması beklenir tff.templates.AggregationProcess
değerleri toplanır nasıl parametreleştirmenin. Ancak, toplanan değerlerin tam olarak ne olduğu ve tür imzalarının ne olduğu, eğitilen modelin diğer ayrıntılarına ve bunu yapmak için kullanılan öğrenme algoritmasına bağlıdır.
Hesaplamaların diğer yönlerinin toplanma bağımsız hale getirmek için, fabrika deseni kullanmak - biz uygun oluşturmak tff.templates.AggregationProcess
nesnelerin ilgili tip imzaları çağırarak, toplanan mevcuttur edilecek bir kez create
fabrikanın yöntemini. Bu nedenle, toplama işleminin doğrudan ele alınması, yalnızca bu oluşturmadan sorumlu olan kütüphane yazarları için gereklidir.
Toplama proses fabrikaları
Ağırlıksız ve ağırlıklı toplama için iki soyut temel fabrika sınıfı vardır. Bunların create
yöntem toplu gereken değerin tipi imzalarını alır ve bir döner tff.templates.AggregationProcess
bu değerlerin toplanması için.
Tarafından oluşturulan yöntem tff.aggregators.UnweightedAggregationFactory
sunucusunda (1) durum ve belirtilen tipi (2) değeri: iki giriş argüman alır value_type
.
Bir örnek uygulamasıdır tff.aggregators.SumFactory
.
Tarafından oluşturulan yöntem tff.aggregators.WeightedAggregationFactory
sunucusunda (1) durum, belirli bir tip (2) değeri: üç giriş argüman alır value_type
ve tipi (3) ağırlığı weight_type
fabrikanın kullanıcı tarafından belirtilen şekilde onun yürütmesini zaman, create
yöntemi.
Bir örnek uygulamasıdır tff.aggregators.MeanFactory
ağırlıklı ortalamasını hesaplar.
Fabrika modeli, yukarıda belirtilen ilk hedefe nasıl ulaştığımızdır; bu toplama bağımsız bir yapı taşıdır. Örneğin, hangi model değişkenlerinin eğitilebilir olduğunu değiştirirken, karmaşık bir toplamanın mutlaka değişmesi gerekmez; Bu tür bir yöntem ile kullanıldığında, bu temsil fabrika farklı bir türü imza ile çağrılır tff.learning.build_federated_averaging_process
.
Kompozisyonlar
Genel bir toplama işleminin, (a) istemcilerdeki değerlerin bir miktar ön işlemesini, (b) değerlerin istemciden sunucuya hareketini ve (c) sunucuda toplanmış değerin bazı sonradan işlenmesini kapsadığını hatırlayın. İçindeki agregasyon bileşimin, yukarıda belirtilen ikinci amacı, gerçekleştirilen tff.aggregators
yapılanma kısım (b) başka bir agregasyon fabrikasına temsilci şekildedir agregasyon fabrikaları uygulanmasını modülü.
Tüm gerekli mantığı tek bir fabrika sınıfı içinde uygulamak yerine, uygulamalar varsayılan olarak toplamayla ilgili tek bir yöne odaklanır. Gerektiğinde, bu model daha sonra yapı taşlarını birer birer değiştirmemizi sağlar.
Bir örnek ağırlıklı olan tff.aggregators.MeanFactory
. Uygulaması, istemcilerde sağlanan değerleri ve ağırlıkları çarpar, ardından hem ağırlıklı değerleri hem de ağırlıkları bağımsız olarak toplar ve ardından ağırlıklı değerlerin toplamını sunucudaki ağırlıkların toplamına böler. Bunun yerine doğrudan kullanarak toplamları uygulanması tff.federated_sum
operatörü, toplam iki örneğine delege tff.aggregators.SumFactory
.
Bu yapı, iki varsayılan toplamın, toplamı farklı şekilde gerçekleştiren farklı fabrikalar tarafından değiştirilmesini mümkün kılar. Örneğin, bir tff.aggregators.SecureSumFactory
veya özel bir uygulama tff.aggregators.UnweightedAggregationFactory
. Bunun aksine, zaman, tff.aggregators.MeanFactory
kendisi gibi bir başka fabrikanın bir iç agregasyon olabilir tff.aggregators.clipping_factory
değerler ortalama alma öncesinde kırpılmasına ise.
Önceki Bkz Ayarlama öğrenme toplamalardan önerilen mevcut fabrikaları kullanarak kompozisyon mekanizmasının receommended kullanımlar için öğretici tff.aggregators
modülü.
Örnek olarak en iyi uygulamalar
Biz göstermek için gidiyoruz tff.aggregators
basit bir örnek görevi uygulayarak ayrıntılı olarak kavram ve giderek daha genel olun. Öğrenmenin başka bir yolu da mevcut fabrikaların uygulanmasına bakmaktır.
import collections
import tensorflow as tf
import tensorflow_federated as tff
Bunun yerine toplanmasının value
, örneğin görev toplanmasından ibarettir value * 2.0
ve o zamana kadar toplam ikiye bölünür 2.0
. Agregasyon sonucu böylece doğrudan toplanmasıyla için matematiksel olarak denk olan value
, ve üç bölümden oluşan olarak düşünülebilir: sunucusunda unscaling istemciler (3) boyunca toplanmasıyla istemciler de (1) ölçekleme (2).
Tasarım Yukarıda açıklanan takiben, mantık bir alt sınıfı olarak uygulanacaktır tff.aggregators.UnweightedAggregationFactory
uygun oluşturur tff.templates.AggregationProcess
verilen bir value_type
agrega için:
Minimum uygulama
Örnek görev için gerekli hesaplamalar her zaman aynıdır, bu nedenle durumu kullanmaya gerek yoktur. Böylece boş ve olarak temsil edilir tff.federated_value((), tff.SERVER)
. Şimdilik ölçümler için de aynı şey geçerli.
Görevin minimum uygulaması aşağıdaki gibidir:
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value((), tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
scaled_value = tff.federated_map(
tff.tf_computation(lambda x: x * 2.0), value)
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(
tff.tf_computation(lambda x: x / 2.0), summed_value)
measurements = tff.federated_value((), tff.SERVER)
return tff.templates.MeasuredProcessOutput(
state=state, result=unscaled_value, measurements=measurements)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
Her şeyin beklendiği gibi çalışıp çalışmadığı aşağıdaki kodla doğrulanabilir:
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result} (expected 8.0)')
Type signatures of the created aggregation process: - initialize: ( -> <>@SERVER) - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>) Aggregation result: 8.0 (expected 8.0)
Durumsallık ve ölçümler
Durum bilgisi, TFF'de, yinelemeli olarak yürütülmesi ve her yinelemede değişmesi beklenen hesaplamaları temsil etmek için yaygın olarak kullanılır. Örneğin, bir öğrenme hesaplamasının durumu, öğrenilen modelin ağırlıklarını içerir.
Bir toplama hesaplamasında durumun nasıl kullanılacağını göstermek için örnek görevi değiştiriyoruz. Bunun yerine çarpımının value
ile 2.0
, biz çarpın o yineleme endeksine göre - kaç kez toplama idam edilmiştir.
Bunu yapmak için, durum kavramı aracılığıyla elde edilen yineleme indeksini takip etmenin bir yoluna ihtiyacımız var. In initialize_fn
, bunun yerine boş devlet yaratma, bir skaler sıfır olması devlet başlatmak. Daha sonra, durum kullanılabilecek next_fn
(1) tarafından artışı: üç adımda 1.0
, çok-katlı (2) 'kullanım value
ve yeni güncel durum olarak (3) döner.
Bu yapıldıktan sonra, şuna dikkat çekebilir: Ama tam yukarıdaki gibi aynı kod beklendiği gibi tüm çalışmaları doğrulamak için kullanılabilir. Bir şeyin gerçekten değiştiğini nasıl bilebilirim?
İyi soru! Ölçüm kavramının yararlı olduğu yer burasıdır. Genel olarak, ölçümler tek uygulanmasına ilişkin herhangi bir değer bildirebilirsiniz next
izlenmesi için kullanılabilir fonksiyonu. Bu durumda, bu olabilir summed_value
önceki örnekten. Diğer bir deyişle, yineleme dizinine bağlı olması gereken "ölçeklendirmeyi kaldırma" adımından önceki değer. Yine, bu pratikte mutlaka yararlı değildir, ancak ilgili mekanizmayı gösterir.
Göreve verilen durumsal cevap şu şekilde görünür:
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value(0.0, tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
new_state = tff.federated_map(
tff.tf_computation(lambda x: x + 1.0), state)
state_at_clients = tff.federated_broadcast(new_state)
scaled_value = tff.federated_map(
tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(
tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=summed_value)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
O Not state
girer next_fn
girdi olarak sunucuya yerleştirilir. İstemciler de kullanmak için, öncelikle kullanılarak elde edilir, iletilmesi gereken tff.federated_broadcast
operatör.
Beklendiği gibi tüm çalışmaları doğrulamak için, artık bildirilen bakabilirsiniz measurements
, yürütme her turda farklı olmalı bile aynı olan çalışma client_data
.
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 1)')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 2)')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 3)')
Type signatures of the created aggregation process: - initialize: ( -> float32@SERVER) - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>) | Round #1 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 8.0 (expected 8.0 * 1) | Round #2 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 16.0 (expected 8.0 * 2) | Round #3 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 24.0 (expected 8.0 * 3)
Yapılandırılmış tipler
Birleşik öğrenmede eğitilmiş bir modelin model ağırlıkları, genellikle tek bir tensörden ziyade bir tensör koleksiyonu olarak temsil edilir. TFF, bu şekilde temsil edilen tff.StructType
ve genel olarak faydalı agregasyon fabrikaları yapılandırılmış türlerini kabul mümkün olması gerekmektedir.
Bununla birlikte, yukarıdaki örneklerde, sadece bir çalıştı tff.TensorType
nesne. Biz ile toplama işlemini oluşturmak için önceki fabrika kullanmayı denerseniz tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))])
, garip bir hata nedeniyle olsun TensorFlow bir çoğalmaya çalışacağız tf.Tensor
ve bir list
.
Sorun yerine sabiti tarafından tensörlerinin yapısını çarpılması nedeniyle, bir sabiti tarafından yapısında her tensörünü çarpmak gerekir olmasıdır. Bu sorunun olağan çözümü kullanmaktır tf.nest
oluşturulan modül içini tff.tf_computation
s.
Önceki sürümü ExampleTaskFactory
aşağıdaki gibi yapılandırılmış türleriyle uyumlu böylece görünür:
@tff.tf_computation()
def scale(value, factor):
return tf.nest.map_structure(lambda x: x * factor, value)
@tff.tf_computation()
def unscale(value, factor):
return tf.nest.map_structure(lambda x: x / factor, value)
@tff.tf_computation()
def add_one(value):
return value + 1.0
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value(0.0, tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
new_state = tff.federated_map(add_one, state)
state_at_clients = tff.federated_broadcast(new_state)
scaled_value = tff.federated_map(scale, (value, state_at_clients))
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=summed_value)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
Bu örnek, TFF kodunu yapılandırırken izlenmesi yararlı olabilecek bir modeli vurgulamaktadır. Çok basit operasyonlar ile ilgili değil zaman zaman, kod daha okunaklı hale tff.tf_computation
bir iç yapı taşları olarak kullanılacak ler tff.federated_computation
ayrı yerde oluşturulur. İçinde bir tff.federated_computation
, bu yapı taşları yalnızca içsel operatörlerini kullanarak bağlanır.
Beklendiği gibi çalıştığını doğrulamak için:
client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
[[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
f' Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process: - initialize: ( -> float32@SERVER) - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>) Aggregation result: [[2. 3.], [6. 4. 0.]] Expected: [[2. 3.], [6. 4. 0.]]
İç toplamalar
Son adım, farklı toplama tekniklerinin kolay bir şekilde oluşturulmasına izin vermek için fiili toplamanın diğer fabrikalara devredilmesini isteğe bağlı olarak etkinleştirmektir.
Bu isteğe bağlı bir oluşturularak gerçekleştirilir inner_factory
bizim kurucuları içindedir argüman ExampleTaskFactory
. Belirtilen Değilse, tff.aggregators.SumFactory
uygulanacağı, kullanılan tff.federated_sum
önceki bölümde doğrudan kullanılan operatöre.
Ne zaman create
denir, öncelikle arayabilecek create
ait inner_factory
aynı iç toplama işlemini oluşturmak için value_type
.
Tarafından döndürülen Sürecimizin devlet initialize_fn
"Bu" işlemiyle oluşturulan devlet ve daha sonra oluşturulan iç sürecin devlet: iki bölümden bir bileşimdir.
Uygulanması next_fn
bu gerçek birleşimine de farklılık için temsilci next
iç işleminin fonksiyonu, ve son çıkış oluşmaktadır nasıl. Durum yine, "bu" ve "iç" durumuna oluşmaktadır ve ölçümler ile benzer bir şekilde oluşan OrderedDict
.
Aşağıdaki, böyle bir modelin bir uygulamasıdır.
@tff.tf_computation()
def scale(value, factor):
return tf.nest.map_structure(lambda x: x * factor, value)
@tff.tf_computation()
def unscale(value, factor):
return tf.nest.map_structure(lambda x: x / factor, value)
@tff.tf_computation()
def add_one(value):
return value + 1.0
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def __init__(self, inner_factory=None):
if inner_factory is None:
inner_factory = tff.aggregators.SumFactory()
self._inner_factory = inner_factory
def create(self, value_type):
inner_process = self._inner_factory.create(value_type)
@tff.federated_computation()
def initialize_fn():
my_state = tff.federated_value(0.0, tff.SERVER)
inner_state = inner_process.initialize()
return tff.federated_zip((my_state, inner_state))
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
my_state, inner_state = state
my_new_state = tff.federated_map(add_one, my_state)
my_state_at_clients = tff.federated_broadcast(my_new_state)
scaled_value = tff.federated_map(scale, (value, my_state_at_clients))
# Delegation to an inner factory, returning values placed at SERVER.
inner_output = inner_process.next(inner_state, scaled_value)
unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))
new_state = tff.federated_zip((my_new_state, inner_output.state))
measurements = tff.federated_zip(
collections.OrderedDict(
scaled_value=inner_output.result,
example_task=inner_output.measurements))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=measurements)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
İçin devredilirken inner_process.next
fonksiyonu, aldığımız dönüş yapısı olan tff.templates.MeasuredProcessOutput
- aynı üç alanlarla, state
, result
ve measurements
. Oluşan toplanma işleminin genel dönüş yapısı oluştururken, state
ve measurements
alanları genellikle oluşan ve birlikte döndürülmelidir. Buna karşılık, result
değerine alan karşılık birleştirilmiş olan ve bunun yerine oluşan agregasyonu "akar".
state
nesne fabrikasının bir uygulama detaylı olarak görülmelidir ve dolayısıyla bileşim, herhangi bir yapı olabilir. Ancak, measurements
değerleri bir noktada kullanıcıya raporlanacak karşılık gelmektedir. Bu nedenle, kullanım tavsiye OrderedDict
bir kompozisyonda bir gelir metrik bildirilen gelmez yerler net olacağını böyle adlandırma oluşur ile.
Ayrıca kullanımına dikkat tff.federated_zip
operatörü. state
oluşturulmuş yöntem ile contolled amacı olmalıdır tff.FederatedType
. Bunun yerine geri dönmüş olsaydı (this_state, inner_state)
içinde initialize_fn
, dönüş türü imzası olacağını tff.StructType
bir 2-tuple içeren tff.FederatedType
s. Kullanımı tff.federated_zip
"asansörleri" tff.FederatedType
en üst seviyeye. Bu benzer şekilde kullanılır next_fn
durumu ve ölçümler döndürülecek hazırlanırken.
Son olarak, bunun varsayılan iç toplama ile nasıl kullanılabileceğini görebiliriz:
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 8.0 | measurements['example_task']: () | Round #2 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 16.0 | measurements['example_task']: ()
... ve farklı bir iç toplama ile. Örneğin, bir ExampleTaskFactory
:
client_data = [1.0, 2.0, 5.0]
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 8.0 | measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())]) | Round #2 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 16.0 | measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])
Özet
Bu öğreticide, bir toplama fabrikası olarak temsil edilen genel amaçlı bir toplama yapı taşı oluşturmak için izlenecek en iyi uygulamaları açıkladık. Genellik, tasarım amacından iki şekilde gelir:
- Parametreli hesaplamalar. Toplama ile çalışmak üzere tasarlanmış diğer TFF modüllerine takılabilir bağımsız bir yapı taşıdır
tff.aggregators
gibi onların gerekli toplanmasına parametreleştirmenin,tff.learning.build_federated_averaging_process
. - Toplama bileşimi. Bir toplama yapı taşı, daha karmaşık bileşik toplamalar oluşturmak için diğer toplama yapı taşlarıyla oluşturulabilir.