مشاهده در TensorFlow.org | در Google Colab اجرا شود | مشاهده منبع در GitHub | دانلود دفترچه یادداشت |
در این آموزش، ما اصول طراحی پشت توضیح tff.aggregators
ماژول و بهترین روش برای پیاده سازی تجمع سفارشی از ارزش ها از مشتریان به سرور.
پیش نیازها. این آموزش فرض شما در حال حاضر با مفاهیم اولیه آشنا فدرال هسته مانند جایگاه ( tff.SERVER
، tff.CLIENTS
)، چگونه TFF نشان دهنده محاسبات ( tff.tf_computation
، tff.federated_computation
) و امضا نوع خود.
!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio
import nest_asyncio
nest_asyncio.apply()
خلاصه طراحی
در TFF، "تجمع" اشاره به حرکت یک مجموعه ای از ارزش در tff.CLIENTS
برای تولید یک ارزش کل از همان نوع در tff.SERVER
. یعنی نیازی نیست که هر یک از ارزش مشتری جداگانه در دسترس باشد. به عنوان مثال در یادگیری فدرال، بهروزرسانیهای مدل کلاینت برای دریافت بهروزرسانی مدل انبوه برای اعمال به مدل جهانی روی سرور، میانگین میشوند.
علاوه بر اپراتورهای رسیدن به این هدف مانند tff.federated_sum
، TFF فراهم می کند tff.templates.AggregationProcess
(یک فرآیند stateful به ) که رسمی تعریف امضای نوع برای محاسبات تجمع پس از آن می توانید به اشکال پیچیده تر از یک جمع ساده را تعمیم دهند.
اجزای اصلی از tff.aggregators
ماژول کارخانه ایجاد می AggregationProcess
، که طراحی شده اند به بلوک های ساختمان به طور کلی مفید و جایگزین از TFF در دو جنبه:
- محاسبات پارامتری تجمع یک بلوک ساختمان مستقل است که می تواند به دیگر ماژول های TFF به کار طراحی شده با وصل شده است
tff.aggregators
استفاده از پارامترها در تجمع لازم خود را.
مثال:
learning_process = tff.learning.build_federated_averaging_process(
...,
model_update_aggregation_factory=tff.aggregators.MeanFactory())
- ترکیب تجمع. یک بلوک ساختمانی تجمع را می توان با سایر بلوک های ساختمانی تجمیع برای ایجاد تجمعات مرکب پیچیده تر ترکیب کرد.
مثال:
secure_mean = tff.aggregators.MeanFactory(
value_sum_factory=tff.aggregators.SecureSumFactory(...))
ادامه این آموزش نحوه دستیابی به این دو هدف را توضیح می دهد.
فرآیند تجمیع
ما برای اولین بار به طور خلاصه tff.templates.AggregationProcess
، و با الگوی کارخانه برای ایجاد آن دنبال کنید.
tff.templates.AggregationProcess
یک IS tff.templates.MeasuredProcess
با نوع امضا مشخص شده برای تجمع. به طور خاص، initialize
و next
توابع امضا نوع زیر:
-
( -> state_type@SERVER)
-
(<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)
دولت (از نوع state_type
) باید در سرور قرار می گیرد. next
تابع به عنوان آرگومان ورودی دولت و یک مقدار به جمع گردد (از نوع طول می کشد value_type
) قرار داده شده در مشتریان. *
یعنی اختیاری دیگر آرگومان ورودی، وزن مثلا در یک میانگین موزون. یک شی وضعیت به روز شده، مقدار تجمیع شده از همان نوع قرار داده شده در سرور و برخی اندازه گیری ها را برمی گرداند.
توجه داشته باشید که هر دو دولت به بین اعدام منتقل می شود next
تابع، و اندازه گیری گزارش در نظر گرفته شده برای گزارش هر گونه اطلاعات بسته به اعدام خاص از next
تابع، ممکن است خالی باشد. با این وجود، آنها باید به صراحت مشخص شوند تا سایر بخشهای TFF دارای یک قرارداد واضح برای پیروی باشند.
دیگر ماژول های TFF، به عنوان مثال به روز رسانی مدل در tff.learning
، انتظار می رود که با استفاده از tff.templates.AggregationProcess
، استفاده از پارامترها چگونه ارزش جمع می شوند. با این حال، اینکه دقیقاً چه مقادیری جمعآوری شدهاند و نوع امضای آنها چیست، به جزئیات دیگر مدل آموزش داده شده و الگوریتم یادگیری مورد استفاده برای انجام آن بستگی دارد.
به تجمع مستقل از جنبه های دیگری از محاسبات، ما استفاده از الگوی کارخانه - ما مناسب ایجاد tff.templates.AggregationProcess
پس از آن که امضا نوع مربوط به اشیاء به جمع گردد در دسترس هستند، با استناد به create
روش از کارخانه. بنابراین مدیریت مستقیم فرآیند تجمیع فقط برای نویسندگان کتابخانه که مسئول این ایجاد هستند مورد نیاز است.
کارخانه های فرآیند تجمیع
دو کلاس کارخانه پایه انتزاعی برای تجمع وزنی و وزنی وجود دارد. آنها create
روش نوع امضا از ارزش طول می کشد تا جمع شود و یک گرداند tff.templates.AggregationProcess
برای تجمع چنین ارزش.
فرآیند ایجاد شده توسط tff.aggregators.UnweightedAggregationFactory
(1) دولت در سرور و (2) مقدار از نوع مشخص شده: دو آرگومان ورودی طول می کشد value_type
.
یک پیاده سازی مثال است tff.aggregators.SumFactory
.
فرآیند ایجاد شده توسط tff.aggregators.WeightedAggregationFactory
سه آرگومان ورودی طول می کشد: (1) دولت در سرور، (2) ارزش نوع مشخص value_type
و (3) وزن از نوع weight_type
، به عنوان توسط کاربران این کارخانه مشخص شده هنگامی که با استناد به آن create
روش.
یک پیاده سازی مثال است tff.aggregators.MeanFactory
که محاسبه میانگین موزون.
الگوی کارخانه نحوه دستیابی به اولین هدف ذکر شده در بالا است. که تجمع یک بلوک سازنده مستقل است. برای مثال، هنگام تغییر اینکه کدام متغیرهای مدل قابل آموزش هستند، لزوماً نیازی به تغییر یک تجمع پیچیده نیست. کارخانه به نمایندگی از آن خواهد شد با یک نوع امضای مختلف استناد به زمانی با استفاده از روش هایی مانند استفاده tff.learning.build_federated_averaging_process
.
ترکیبات
به یاد داشته باشید که یک فرآیند تجمیع عمومی میتواند (الف) مقداری پیش پردازش مقادیر در کلاینتها، (ب) جابجایی مقادیر از مشتری به سرور، و (ج) برخی پسپردازشهای ارزش انباشته در سرور را در بر بگیرد. هدف دوم که در بالا ذکر، ترکیب تجمع، متوجه شدم داخل tff.aggregators
ماژول های ساختار اجرای کارخانه تجمع به طوری که قسمت (ب) را می توان به کارخانه تجمع دیگر واگذار شده است.
بهجای پیادهسازی تمام منطق لازم در یک کلاس کارخانه، پیادهسازیها بهطور پیشفرض بر روی یک جنبه مرتبط با تجمع متمرکز هستند. در صورت نیاز، این الگو ما را قادر می سازد تا بلوک های ساختمان را یکی یکی جایگزین کنیم.
به عنوان مثال وزن است tff.aggregators.MeanFactory
. اجرای آن مقادیر و وزن های ارائه شده را در مشتریان ضرب می کند، سپس مقادیر وزنی و وزن ها را به طور مستقل جمع می کند، و سپس مجموع مقادیر وزنی را بر مجموع وزن های سرور تقسیم می کند. جای پیاده کردن جمع به طور مستقیم با استفاده از tff.federated_sum
اپراتورها، جمع به دو نمونه از واگذار tff.aggregators.SumFactory
.
چنین ساختاری این امکان را فراهم می کند که دو جمع پیش فرض با کارخانه های مختلف جایگزین شوند، که مجموع را به طور متفاوتی درک می کنند. برای مثال، یک tff.aggregators.SecureSumFactory
، و یا یک اجرای سفارشی از tff.aggregators.UnweightedAggregationFactory
. در مقابل، زمان، tff.aggregators.MeanFactory
می تواند خود را یک تجمع داخلی یکی دیگر از کارخانه مانند tff.aggregators.clipping_factory
، اگر مقادیر به قبل متوسط کوتاه می شود.
مشاهده قبلی تنظیم واحدهای برای یادگیری توصیه می شود آموزش برای استفاده receommended از مکانیسم ترکیب استفاده در کارخانه های موجود در tff.aggregators
ماژول.
بهترین شیوه ها با مثال
ما می رویم به نشان دادن tff.aggregators
مفاهیم را با جزئیات با اجرای یک کار مثال ساده، و آن را به تدریج کلی تر. راه دیگری برای یادگیری این است که به اجرای کارخانه های موجود نگاه کنید.
import collections
import tensorflow as tf
import tensorflow_federated as tff
به جای جمع value
، وظیفه مثال است به طور خلاصه value * 2.0
و سپس با تقسیم مجموع 2.0
. نتیجه تجمع بنابراین ریاضی به طور مستقیم جمع معادل value
، و می تواند به عنوان مشتمل بر سه بخش فکر کردم: (1) پوسته پوسته شدن در مشتریان (2) جمع در سراسر مشتریان (3) unscaling در سرور.
پس از طراحی در بالا توضیح داده، منطق به عنوان یک زیر کلاس از اجرا tff.aggregators.UnweightedAggregationFactory
، که ایجاد مناسب tff.templates.AggregationProcess
هنگامی که یک داده value_type
به مجموع:
اجرای حداقلی
برای کار مثال، محاسبات لازم همیشه یکسان است، بنابراین نیازی به استفاده از حالت نیست. به این ترتیب خالی، و به عنوان نشان tff.federated_value((), tff.SERVER)
. در حال حاضر در مورد اندازه گیری ها نیز همینطور است.
بنابراین حداقل اجرای کار به شرح زیر است:
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value((), tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
scaled_value = tff.federated_map(
tff.tf_computation(lambda x: x * 2.0), value)
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(
tff.tf_computation(lambda x: x / 2.0), summed_value)
measurements = tff.federated_value((), tff.SERVER)
return tff.templates.MeasuredProcessOutput(
state=state, result=unscaled_value, measurements=measurements)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
اینکه آیا همه چیز همانطور که انتظار می رود کار می کند را می توان با کد زیر تأیید کرد:
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result} (expected 8.0)')
Type signatures of the created aggregation process: - initialize: ( -> <>@SERVER) - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>) Aggregation result: 8.0 (expected 8.0)
حالت و اندازه گیری
Statefulness به طور گسترده در TFF برای نشان دادن محاسباتی استفاده می شود که انتظار می رود به صورت تکراری اجرا شوند و با هر تکرار تغییر کنند. به عنوان مثال، حالت یک محاسبات یادگیری شامل وزن های مدلی است که یاد گرفته می شود.
برای نشان دادن نحوه استفاده از حالت در محاسبات تجمیع، وظیفه مثال را اصلاح می کنیم. به جای ضرب value
های 2.0
، ما تکثیر آن توسط شاخص تکرار - تعداد دفعاتی تجمع اجرا شده است.
برای انجام این کار، ما به راهی برای پیگیری شاخص تکرار نیاز داریم که از طریق مفهوم حالت به دست می آید. در initialize_fn
، به جای ایجاد یک دولت خالی، ما مقداردهی اولیه دولت به صفر عددی. سپس، دولت را می توان در استفاده next_fn
(1) افزایش شده توسط: در سه مرحله 1.0
، (2) استفاده کنید و ضرب value
، و (3) بازگشت به دولت به روز شده است.
به محض این که انجام می شود، شما ممکن است توجه داشته باشید: اما دقیقا همان کد بالا را می توان به منظور بررسی همه آثار به عنوان انتظار می رود استفاده می شود. چگونه بفهمم چیزی واقعاً تغییر کرده است؟
سؤال خوبی بود! اینجاست که مفهوم اندازه گیری مفید می شود. به طور کلی، اندازه گیری می توانید هر مقدار مربوط به اعدام تنها از گزارش next
تابع، که می تواند برای نظارت بر استفاده می شود. در این مورد، می توان آن را summed_value
از مثال قبلی است. یعنی مقدار قبل از مرحله "unscaling" که باید به شاخص تکرار بستگی داشته باشد. باز هم، این لزوما در عمل مفید نیست، اما مکانیسم مربوطه را نشان می دهد.
بنابراین پاسخ حالتی به کار به صورت زیر است:
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value(0.0, tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
new_state = tff.federated_map(
tff.tf_computation(lambda x: x + 1.0), state)
state_at_clients = tff.federated_broadcast(new_state)
scaled_value = tff.federated_map(
tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(
tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=summed_value)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
توجه داشته باشید که state
که به می آید next_fn
به عنوان ورودی است که در سرور قرار می گیرد. به منظور استفاده از آن را در مشتریان، آن را برای اولین بار باید منتقل شود، که به دست آورد با استفاده از tff.federated_broadcast
اپراتور.
به منظور بررسی همه آثار به عنوان انتظار می رود، ما در حال حاضر می توانید در گزارش نگاه measurements
، که باید با هر دور از اعدام های مختلف می شود، حتی اگر اجرا با همان client_data
.
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 1)')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 2)')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 3)')
Type signatures of the created aggregation process: - initialize: ( -> float32@SERVER) - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>) | Round #1 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 8.0 (expected 8.0 * 1) | Round #2 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 16.0 (expected 8.0 * 2) | Round #3 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 24.0 (expected 8.0 * 3)
انواع ساختار یافته
وزن مدل یک مدل آموزشدیده در یادگیری فدرال معمولاً بهعنوان مجموعهای از تانسورها نشان داده میشود تا یک تانسور. در TFF، این است که به عنوان نمایندگی tff.StructType
و کارخانه تجمع به طور کلی مفید باید قادر به پذیرفتن انواع ساختار.
با این حال، در مثال های بالا، ما تنها با یک کار tff.TensorType
شی. اگر ما سعی به استفاده از کارخانه قبلی برای ایجاد روند تجمع با tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))])
، ما یک خطای عجیب و غریب به دلیل TensorFlow سعی خواهد کرد تا ضرب tf.Tensor
و یک list
.
مشکل این است که به جای ضرب ساختار تانسورها توسط یک ثابت، ما نیاز به ضرب هر تانسور در ساختار توسط یک ثابت. راه حل معمول برای این مشکل است به استفاده از tf.nest
در داخل ماژول از ایجاد tff.tf_computation
است.
نسخه قبلی ExampleTaskFactory
سازگار با انواع ساختار در نتیجه به نظر می رسد شرح زیر است:
@tff.tf_computation()
def scale(value, factor):
return tf.nest.map_structure(lambda x: x * factor, value)
@tff.tf_computation()
def unscale(value, factor):
return tf.nest.map_structure(lambda x: x / factor, value)
@tff.tf_computation()
def add_one(value):
return value + 1.0
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def create(self, value_type):
@tff.federated_computation()
def initialize_fn():
return tff.federated_value(0.0, tff.SERVER)
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
new_state = tff.federated_map(add_one, state)
state_at_clients = tff.federated_broadcast(new_state)
scaled_value = tff.federated_map(scale, (value, state_at_clients))
summed_value = tff.federated_sum(scaled_value)
unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=summed_value)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
این مثال الگویی را برجسته می کند که ممکن است هنگام ساختاردهی کد TFF رعایت شود. هنگامی که با عملیات بسیار ساده خرید و فروش نیست، کد خوانا تر می شود زمانی که tff.tf_computation
بازدید کنندگان که به عنوان بلوک های ساختمان استفاده می شود در داخل یک tff.federated_computation
در یک مکان جداگانه ایجاد شده است. داخل tff.federated_computation
، این بلوک های ساختمانی تنها با استفاده از اپراتورهای ذاتی متصل می شود.
برای تأیید اینکه مطابق انتظار کار می کند:
client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
[[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
f' - initialize: {aggregation_process.initialize.type_signature}\n'
f' - next: {aggregation_process.next.type_signature}\n')
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
f' Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process: - initialize: ( -> float32@SERVER) - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>) Aggregation result: [[2. 3.], [6. 4. 0.]] Expected: [[2. 3.], [6. 4. 0.]]
تجمعات درونی
گام نهایی این است که به صورت اختیاری، واگذاری تجمع واقعی به کارخانههای دیگر را فعال کنید تا امکان ترکیب آسان تکنیکهای تجمیع را فراهم کنیم.
این است که با ایجاد یک اختیاری دست inner_factory
استدلال در سازنده ما ExampleTaskFactory
. اگر مشخص نشده است، tff.aggregators.SumFactory
استفاده شده است، که اعمال tff.federated_sum
اپراتور به طور مستقیم در بخش قبلی استفاده می شود.
هنگامی که create
نامیده می شود، ما برای اولین بار می توانید تماس بگیرید create
از inner_factory
به ایجاد روند تجمع داخلی با همان value_type
.
دولت از فرآیند ما بازگردانده شده توسط initialize_fn
دولت ایجاد شده توسط "این" روند، و دولت از روند داخلی فقط ایجاد: یک ترکیب از دو بخش است.
اجرای next_fn
تفاوت که تجمع واقعی به واگذار next
تابعی از فرآیند درونی، و در چگونه خروجی نهایی تشکیل شده است. دولت دوباره از "این" و دولت "درونی" تشکیل شده است، و اندازه گیری به شیوه ای مشابه به عنوان یک تشکیل شده OrderedDict
.
در زیر پیاده سازی چنین الگویی است.
@tff.tf_computation()
def scale(value, factor):
return tf.nest.map_structure(lambda x: x * factor, value)
@tff.tf_computation()
def unscale(value, factor):
return tf.nest.map_structure(lambda x: x / factor, value)
@tff.tf_computation()
def add_one(value):
return value + 1.0
class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):
def __init__(self, inner_factory=None):
if inner_factory is None:
inner_factory = tff.aggregators.SumFactory()
self._inner_factory = inner_factory
def create(self, value_type):
inner_process = self._inner_factory.create(value_type)
@tff.federated_computation()
def initialize_fn():
my_state = tff.federated_value(0.0, tff.SERVER)
inner_state = inner_process.initialize()
return tff.federated_zip((my_state, inner_state))
@tff.federated_computation(initialize_fn.type_signature.result,
tff.type_at_clients(value_type))
def next_fn(state, value):
my_state, inner_state = state
my_new_state = tff.federated_map(add_one, my_state)
my_state_at_clients = tff.federated_broadcast(my_new_state)
scaled_value = tff.federated_map(scale, (value, my_state_at_clients))
# Delegation to an inner factory, returning values placed at SERVER.
inner_output = inner_process.next(inner_state, scaled_value)
unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))
new_state = tff.federated_zip((my_new_state, inner_output.state))
measurements = tff.federated_zip(
collections.OrderedDict(
scaled_value=inner_output.result,
example_task=inner_output.measurements))
return tff.templates.MeasuredProcessOutput(
state=new_state, result=unscaled_value, measurements=measurements)
return tff.templates.AggregationProcess(initialize_fn, next_fn)
هنگامی که تفویض به inner_process.next
تابع، ساختار بازگشت ما یک است tff.templates.MeasuredProcessOutput
، با همان سه زمینه - state
، result
و measurements
. هنگام ایجاد ساختار بازگشت کلی از روند تجمع تشکیل شده، state
و measurements
زمینه ها باید به طور کلی تشکیل شده و با هم بازگشت. در مقابل، result
مربوط زمینه به ارزش بودن جمع و به جای "جریان از طریق" تجمع تشکیل شده است.
state
شی باید به عنوان یک جزئیات اجرای کارخانه دیده می شود، و در نتیجه ترکیب می تواند از هر ساختار باشد. با این حال، measurements
مربوط به ارزش به کاربران در برخی از نقطه گزارش شود. بنابراین، ما به استفاده توصیه OrderedDict
، با نامگذاری مرکب به طوری که این امر می تواند روشن که در آن در یک ترکیب یک گزارش متریک می آید.
همچنین توجه داشته باشید که استفاده از tff.federated_zip
اپراتور. state
شی contolled توسط فرایند ایجاد شود باید یک tff.FederatedType
. اگر ما به جای بازگشته بود (this_state, inner_state)
در initialize_fn
، امضا نوع بازگشت خود را خواهد بود tff.StructType
حاوی تایی 2 از tff.FederatedType
است. استفاده از tff.federated_zip
"آسانسور" در tff.FederatedType
به سطح بالا است. این به طور مشابه در استفاده next_fn
هنگام آماده سازی دولت و اندازه گیری می شود برگشت.
در نهایت، میتوانیم ببینیم که چگونه میتوان از آن با تجمع داخلی پیشفرض استفاده کرد:
client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 8.0 | measurements['example_task']: () | Round #2 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 16.0 | measurements['example_task']: ()
... و با تجمیع درونی متفاوت. به عنوان مثال، ExampleTaskFactory
:
client_data = [1.0, 2.0, 5.0]
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'| Aggregation result: {output.result} (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 8.0 | measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())]) | Round #2 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 16.0 | measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])
خلاصه
در این آموزش، بهترین روشهایی را که باید برای ایجاد یک بلوک ساختمانی تجمع همه منظوره، که بهعنوان کارخانه تجمیع نشان داده میشود، دنبال کرد، توضیح دادیم. کلیت از طریق هدف طراحی به دو صورت حاصل می شود:
- محاسبات پارامتری تجمع یک بلوک ساختمان مستقل است که می تواند به دیگر ماژول های TFF به کار طراحی شده با وصل شده است
tff.aggregators
استفاده از پارامترها در تجمع لازم خود را، مانندtff.learning.build_federated_averaging_process
. - ترکیب تجمع. یک بلوک ساختمانی تجمع را می توان با سایر بلوک های ساختمانی تجمیع برای ایجاد تجمعات مرکب پیچیده تر ترکیب کرد.