پیاده سازی مجموعه های سفارشی

مشاهده در TensorFlow.org در Google Colab اجرا شود مشاهده منبع در GitHub دانلود دفترچه یادداشت

در این آموزش، ما اصول طراحی پشت توضیح tff.aggregators ماژول و بهترین روش برای پیاده سازی تجمع سفارشی از ارزش ها از مشتریان به سرور.

پیش نیازها. این آموزش فرض شما در حال حاضر با مفاهیم اولیه آشنا فدرال هسته مانند جایگاه ( tff.SERVER ، tff.CLIENTS )، چگونه TFF نشان دهنده محاسبات ( tff.tf_computation ، tff.federated_computation ) و امضا نوع خود.

!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

خلاصه طراحی

در TFF، "تجمع" اشاره به حرکت یک مجموعه ای از ارزش در tff.CLIENTS برای تولید یک ارزش کل از همان نوع در tff.SERVER . یعنی نیازی نیست که هر یک از ارزش مشتری جداگانه در دسترس باشد. به عنوان مثال در یادگیری فدرال، به‌روزرسانی‌های مدل کلاینت برای دریافت به‌روزرسانی مدل انبوه برای اعمال به مدل جهانی روی سرور، میانگین می‌شوند.

علاوه بر اپراتورهای رسیدن به این هدف مانند tff.federated_sum ، TFF فراهم می کند tff.templates.AggregationProcess (یک فرآیند stateful به ) که رسمی تعریف امضای نوع برای محاسبات تجمع پس از آن می توانید به اشکال پیچیده تر از یک جمع ساده را تعمیم دهند.

اجزای اصلی از tff.aggregators ماژول کارخانه ایجاد می AggregationProcess ، که طراحی شده اند به بلوک های ساختمان به طور کلی مفید و جایگزین از TFF در دو جنبه:

  1. محاسبات پارامتری تجمع یک بلوک ساختمان مستقل است که می تواند به دیگر ماژول های TFF به کار طراحی شده با وصل شده است tff.aggregators استفاده از پارامترها در تجمع لازم خود را.

مثال:

learning_process = tff.learning.build_federated_averaging_process(
    ...,
    model_update_aggregation_factory=tff.aggregators.MeanFactory())
  1. ترکیب تجمع. یک بلوک ساختمانی تجمع را می توان با سایر بلوک های ساختمانی تجمیع برای ایجاد تجمعات مرکب پیچیده تر ترکیب کرد.

مثال:

secure_mean = tff.aggregators.MeanFactory(
    value_sum_factory=tff.aggregators.SecureSumFactory(...))

ادامه این آموزش نحوه دستیابی به این دو هدف را توضیح می دهد.

فرآیند تجمیع

ما برای اولین بار به طور خلاصه tff.templates.AggregationProcess ، و با الگوی کارخانه برای ایجاد آن دنبال کنید.

tff.templates.AggregationProcess یک IS tff.templates.MeasuredProcess با نوع امضا مشخص شده برای تجمع. به طور خاص، initialize و next توابع امضا نوع زیر:

  • ( -> state_type@SERVER)
  • (<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)

دولت (از نوع state_type ) باید در سرور قرار می گیرد. next تابع به عنوان آرگومان ورودی دولت و یک مقدار به جمع گردد (از نوع طول می کشد value_type ) قرار داده شده در مشتریان. * یعنی اختیاری دیگر آرگومان ورودی، وزن مثلا در یک میانگین موزون. یک شی وضعیت به روز شده، مقدار تجمیع شده از همان نوع قرار داده شده در سرور و برخی اندازه گیری ها را برمی گرداند.

توجه داشته باشید که هر دو دولت به بین اعدام منتقل می شود next تابع، و اندازه گیری گزارش در نظر گرفته شده برای گزارش هر گونه اطلاعات بسته به اعدام خاص از next تابع، ممکن است خالی باشد. با این وجود، آنها باید به صراحت مشخص شوند تا سایر بخش‌های TFF دارای یک قرارداد واضح برای پیروی باشند.

دیگر ماژول های TFF، به عنوان مثال به روز رسانی مدل در tff.learning ، انتظار می رود که با استفاده از tff.templates.AggregationProcess ، استفاده از پارامترها چگونه ارزش جمع می شوند. با این حال، اینکه دقیقاً چه مقادیری جمع‌آوری شده‌اند و نوع امضای آن‌ها چیست، به جزئیات دیگر مدل آموزش داده شده و الگوریتم یادگیری مورد استفاده برای انجام آن بستگی دارد.

به تجمع مستقل از جنبه های دیگری از محاسبات، ما استفاده از الگوی کارخانه - ما مناسب ایجاد tff.templates.AggregationProcess پس از آن که امضا نوع مربوط به اشیاء به جمع گردد در دسترس هستند، با استناد به create روش از کارخانه. بنابراین مدیریت مستقیم فرآیند تجمیع فقط برای نویسندگان کتابخانه که مسئول این ایجاد هستند مورد نیاز است.

کارخانه های فرآیند تجمیع

دو کلاس کارخانه پایه انتزاعی برای تجمع وزنی و وزنی وجود دارد. آنها create روش نوع امضا از ارزش طول می کشد تا جمع شود و یک گرداند tff.templates.AggregationProcess برای تجمع چنین ارزش.

فرآیند ایجاد شده توسط tff.aggregators.UnweightedAggregationFactory (1) دولت در سرور و (2) مقدار از نوع مشخص شده: دو آرگومان ورودی طول می کشد value_type .

یک پیاده سازی مثال است tff.aggregators.SumFactory .

فرآیند ایجاد شده توسط tff.aggregators.WeightedAggregationFactory سه آرگومان ورودی طول می کشد: (1) دولت در سرور، (2) ارزش نوع مشخص value_type و (3) وزن از نوع weight_type ، به عنوان توسط کاربران این کارخانه مشخص شده هنگامی که با استناد به آن create روش.

یک پیاده سازی مثال است tff.aggregators.MeanFactory که محاسبه میانگین موزون.

الگوی کارخانه نحوه دستیابی به اولین هدف ذکر شده در بالا است. که تجمع یک بلوک سازنده مستقل است. برای مثال، هنگام تغییر اینکه کدام متغیرهای مدل قابل آموزش هستند، لزوماً نیازی به تغییر یک تجمع پیچیده نیست. کارخانه به نمایندگی از آن خواهد شد با یک نوع امضای مختلف استناد به زمانی با استفاده از روش هایی مانند استفاده tff.learning.build_federated_averaging_process .

ترکیبات

به یاد داشته باشید که یک فرآیند تجمیع عمومی می‌تواند (الف) مقداری پیش پردازش مقادیر در کلاینت‌ها، (ب) جابجایی مقادیر از مشتری به سرور، و (ج) برخی پس‌پردازش‌های ارزش انباشته در سرور را در بر بگیرد. هدف دوم که در بالا ذکر، ترکیب تجمع، متوجه شدم داخل tff.aggregators ماژول های ساختار اجرای کارخانه تجمع به طوری که قسمت (ب) را می توان به کارخانه تجمع دیگر واگذار شده است.

به‌جای پیاده‌سازی تمام منطق لازم در یک کلاس کارخانه، پیاده‌سازی‌ها به‌طور پیش‌فرض بر روی یک جنبه مرتبط با تجمع متمرکز هستند. در صورت نیاز، این الگو ما را قادر می سازد تا بلوک های ساختمان را یکی یکی جایگزین کنیم.

به عنوان مثال وزن است tff.aggregators.MeanFactory . اجرای آن مقادیر و وزن های ارائه شده را در مشتریان ضرب می کند، سپس مقادیر وزنی و وزن ها را به طور مستقل جمع می کند، و سپس مجموع مقادیر وزنی را بر مجموع وزن های سرور تقسیم می کند. جای پیاده کردن جمع به طور مستقیم با استفاده از tff.federated_sum اپراتورها، جمع به دو نمونه از واگذار tff.aggregators.SumFactory .

چنین ساختاری این امکان را فراهم می کند که دو جمع پیش فرض با کارخانه های مختلف جایگزین شوند، که مجموع را به طور متفاوتی درک می کنند. برای مثال، یک tff.aggregators.SecureSumFactory ، و یا یک اجرای سفارشی از tff.aggregators.UnweightedAggregationFactory . در مقابل، زمان، tff.aggregators.MeanFactory می تواند خود را یک تجمع داخلی یکی دیگر از کارخانه مانند tff.aggregators.clipping_factory ، اگر مقادیر به قبل متوسط کوتاه می شود.

مشاهده قبلی تنظیم واحدهای برای یادگیری توصیه می شود آموزش برای استفاده receommended از مکانیسم ترکیب استفاده در کارخانه های موجود در tff.aggregators ماژول.

بهترین شیوه ها با مثال

ما می رویم به نشان دادن tff.aggregators مفاهیم را با جزئیات با اجرای یک کار مثال ساده، و آن را به تدریج کلی تر. راه دیگری برای یادگیری این است که به اجرای کارخانه های موجود نگاه کنید.

import collections
import tensorflow as tf
import tensorflow_federated as tff

به جای جمع value ، وظیفه مثال است به طور خلاصه value * 2.0 و سپس با تقسیم مجموع 2.0 . نتیجه تجمع بنابراین ریاضی به طور مستقیم جمع معادل value ، و می تواند به عنوان مشتمل بر سه بخش فکر کردم: (1) پوسته پوسته شدن در مشتریان (2) جمع در سراسر مشتریان (3) unscaling در سرور.

پس از طراحی در بالا توضیح داده، منطق به عنوان یک زیر کلاس از اجرا tff.aggregators.UnweightedAggregationFactory ، که ایجاد مناسب tff.templates.AggregationProcess هنگامی که یک داده value_type به مجموع:

اجرای حداقلی

برای کار مثال، محاسبات لازم همیشه یکسان است، بنابراین نیازی به استفاده از حالت نیست. به این ترتیب خالی، و به عنوان نشان tff.federated_value((), tff.SERVER) . در حال حاضر در مورد اندازه گیری ها نیز همینطور است.

بنابراین حداقل اجرای کار به شرح زیر است:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value((), tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x * 2.0), value)
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x / 2.0), summed_value)
      measurements = tff.federated_value((), tff.SERVER)
      return tff.templates.MeasuredProcessOutput(
          state=state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

اینکه آیا همه چیز همانطور که انتظار می رود کار می کند را می توان با کد زیر تأیید کرد:

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result}  (expected 8.0)')
Type signatures of the created aggregation process:

  - initialize: ( -> <>@SERVER)
  - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>)

Aggregation result: 8.0  (expected 8.0)

حالت و اندازه گیری

Statefulness به طور گسترده در TFF برای نشان دادن محاسباتی استفاده می شود که انتظار می رود به صورت تکراری اجرا شوند و با هر تکرار تغییر کنند. به عنوان مثال، حالت یک محاسبات یادگیری شامل وزن های مدلی است که یاد گرفته می شود.

برای نشان دادن نحوه استفاده از حالت در محاسبات تجمیع، وظیفه مثال را اصلاح می کنیم. به جای ضرب value های 2.0 ، ما تکثیر آن توسط شاخص تکرار - تعداد دفعاتی تجمع اجرا شده است.

برای انجام این کار، ما به راهی برای پیگیری شاخص تکرار نیاز داریم که از طریق مفهوم حالت به دست می آید. در initialize_fn ، به جای ایجاد یک دولت خالی، ما مقداردهی اولیه دولت به صفر عددی. سپس، دولت را می توان در استفاده next_fn (1) افزایش شده توسط: در سه مرحله 1.0 ، (2) استفاده کنید و ضرب value ، و (3) بازگشت به دولت به روز شده است.

به محض این که انجام می شود، شما ممکن است توجه داشته باشید: اما دقیقا همان کد بالا را می توان به منظور بررسی همه آثار به عنوان انتظار می رود استفاده می شود. چگونه بفهمم چیزی واقعاً تغییر کرده است؟

سؤال خوبی بود! اینجاست که مفهوم اندازه گیری مفید می شود. به طور کلی، اندازه گیری می توانید هر مقدار مربوط به اعدام تنها از گزارش next تابع، که می تواند برای نظارت بر استفاده می شود. در این مورد، می توان آن را summed_value از مثال قبلی است. یعنی مقدار قبل از مرحله "unscaling" که باید به شاخص تکرار بستگی داشته باشد. باز هم، این لزوما در عمل مفید نیست، اما مکانیسم مربوطه را نشان می دهد.

بنابراین پاسخ حالتی به کار به صورت زیر است:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(
          tff.tf_computation(lambda x: x + 1.0), state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

توجه داشته باشید که state که به می آید next_fn به عنوان ورودی است که در سرور قرار می گیرد. به منظور استفاده از آن را در مشتریان، آن را برای اولین بار باید منتقل شود، که به دست آورد با استفاده از tff.federated_broadcast اپراتور.

به منظور بررسی همه آثار به عنوان انتظار می رود، ما در حال حاضر می توانید در گزارش نگاه measurements ، که باید با هر دور از اعدام های مختلف می شود، حتی اگر اجرا با همان client_data .

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}   (expected 8.0 * 1)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 2)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 3)')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>)

| Round #1
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 8.0   (expected 8.0 * 1)

| Round #2
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 16.0  (expected 8.0 * 2)

| Round #3
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 24.0  (expected 8.0 * 3)

انواع ساختار یافته

وزن مدل یک مدل آموزش‌دیده در یادگیری فدرال معمولاً به‌عنوان مجموعه‌ای از تانسورها نشان داده می‌شود تا یک تانسور. در TFF، این است که به عنوان نمایندگی tff.StructType و کارخانه تجمع به طور کلی مفید باید قادر به پذیرفتن انواع ساختار.

با این حال، در مثال های بالا، ما تنها با یک کار tff.TensorType شی. اگر ما سعی به استفاده از کارخانه قبلی برای ایجاد روند تجمع با tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))]) ، ما یک خطای عجیب و غریب به دلیل TensorFlow سعی خواهد کرد تا ضرب tf.Tensor و یک list .

مشکل این است که به جای ضرب ساختار تانسورها توسط یک ثابت، ما نیاز به ضرب هر تانسور در ساختار توسط یک ثابت. راه حل معمول برای این مشکل است به استفاده از tf.nest در داخل ماژول از ایجاد tff.tf_computation است.

نسخه قبلی ExampleTaskFactory سازگار با انواع ساختار در نتیجه به نظر می رسد شرح زیر است:

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(add_one, state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(scale, (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

این مثال الگویی را برجسته می کند که ممکن است هنگام ساختاردهی کد TFF رعایت شود. هنگامی که با عملیات بسیار ساده خرید و فروش نیست، کد خوانا تر می شود زمانی که tff.tf_computation بازدید کنندگان که به عنوان بلوک های ساختمان استفاده می شود در داخل یک tff.federated_computation در یک مکان جداگانه ایجاد شده است. داخل tff.federated_computation ، این بلوک های ساختمانی تنها با استفاده از اپراتورهای ذاتی متصل می شود.

برای تأیید اینکه مطابق انتظار کار می کند:

client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
               [[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
    tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
      f'          Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>)

Aggregation result: [[2. 3.], [6. 4. 0.]]
          Expected: [[2. 3.], [6. 4. 0.]]

تجمعات درونی

گام نهایی این است که به صورت اختیاری، واگذاری تجمع واقعی به کارخانه‌های دیگر را فعال کنید تا امکان ترکیب آسان تکنیک‌های تجمیع را فراهم کنیم.

این است که با ایجاد یک اختیاری دست inner_factory استدلال در سازنده ما ExampleTaskFactory . اگر مشخص نشده است، tff.aggregators.SumFactory استفاده شده است، که اعمال tff.federated_sum اپراتور به طور مستقیم در بخش قبلی استفاده می شود.

هنگامی که create نامیده می شود، ما برای اولین بار می توانید تماس بگیرید create از inner_factory به ایجاد روند تجمع داخلی با همان value_type .

دولت از فرآیند ما بازگردانده شده توسط initialize_fn دولت ایجاد شده توسط "این" روند، و دولت از روند داخلی فقط ایجاد: یک ترکیب از دو بخش است.

اجرای next_fn تفاوت که تجمع واقعی به واگذار next تابعی از فرآیند درونی، و در چگونه خروجی نهایی تشکیل شده است. دولت دوباره از "این" و دولت "درونی" تشکیل شده است، و اندازه گیری به شیوه ای مشابه به عنوان یک تشکیل شده OrderedDict .

در زیر پیاده سازی چنین الگویی است.

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def __init__(self, inner_factory=None):
    if inner_factory is None:
      inner_factory = tff.aggregators.SumFactory()
    self._inner_factory = inner_factory

  def create(self, value_type):
    inner_process = self._inner_factory.create(value_type)

    @tff.federated_computation()
    def initialize_fn():
      my_state = tff.federated_value(0.0, tff.SERVER)
      inner_state = inner_process.initialize()
      return tff.federated_zip((my_state, inner_state))

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      my_state, inner_state = state
      my_new_state = tff.federated_map(add_one, my_state)
      my_state_at_clients = tff.federated_broadcast(my_new_state)
      scaled_value = tff.federated_map(scale, (value, my_state_at_clients))

      # Delegation to an inner factory, returning values placed at SERVER.
      inner_output = inner_process.next(inner_state, scaled_value)

      unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))

      new_state = tff.federated_zip((my_new_state, inner_output.state))
      measurements = tff.federated_zip(
          collections.OrderedDict(
              scaled_value=inner_output.result,
              example_task=inner_output.measurements))

      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

هنگامی که تفویض به inner_process.next تابع، ساختار بازگشت ما یک است tff.templates.MeasuredProcessOutput ، با همان سه زمینه - state ، result و measurements . هنگام ایجاد ساختار بازگشت کلی از روند تجمع تشکیل شده، state و measurements زمینه ها باید به طور کلی تشکیل شده و با هم بازگشت. در مقابل، result مربوط زمینه به ارزش بودن جمع و به جای "جریان از طریق" تجمع تشکیل شده است.

state شی باید به عنوان یک جزئیات اجرای کارخانه دیده می شود، و در نتیجه ترکیب می تواند از هر ساختار باشد. با این حال، measurements مربوط به ارزش به کاربران در برخی از نقطه گزارش شود. بنابراین، ما به استفاده توصیه OrderedDict ، با نامگذاری مرکب به طوری که این امر می تواند روشن که در آن در یک ترکیب یک گزارش متریک می آید.

همچنین توجه داشته باشید که استفاده از tff.federated_zip اپراتور. state شی contolled توسط فرایند ایجاد شود باید یک tff.FederatedType . اگر ما به جای بازگشته بود (this_state, inner_state) در initialize_fn ، امضا نوع بازگشت خود را خواهد بود tff.StructType حاوی تایی 2 از tff.FederatedType است. استفاده از tff.federated_zip "آسانسور" در tff.FederatedType به سطح بالا است. این به طور مشابه در استفاده next_fn هنگام آماده سازی دولت و اندازه گیری می شود برگشت.

در نهایت، می‌توانیم ببینیم که چگونه می‌توان از آن با تجمع داخلی پیش‌فرض استفاده کرد:

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: ()

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: ()

... و با تجمیع درونی متفاوت. به عنوان مثال، ExampleTaskFactory :

client_data = [1.0, 2.0, 5.0]
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())])

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])

خلاصه

در این آموزش، بهترین روش‌هایی را که باید برای ایجاد یک بلوک ساختمانی تجمع همه منظوره، که به‌عنوان کارخانه تجمیع نشان داده می‌شود، دنبال کرد، توضیح دادیم. کلیت از طریق هدف طراحی به دو صورت حاصل می شود:

  1. محاسبات پارامتری تجمع یک بلوک ساختمان مستقل است که می تواند به دیگر ماژول های TFF به کار طراحی شده با وصل شده است tff.aggregators استفاده از پارامترها در تجمع لازم خود را، مانند tff.learning.build_federated_averaging_process .
  2. ترکیب تجمع. یک بلوک ساختمانی تجمع را می توان با سایر بلوک های ساختمانی تجمیع برای ایجاد تجمعات مرکب پیچیده تر ترکیب کرد.