تنفيذ التجميعات المخصصة

عرض على TensorFlow.org تشغيل في Google Colab عرض المصدر على جيثب تحميل دفتر

في هذا البرنامج التعليمي، وشرح مبادئ التصميم وراء tff.aggregators حدة وأفضل الممارسات لتنفيذ تجميع مخصص من القيم من عملاء الخادم.

المتطلبات الأساسية. يفترض هذا البرنامج التعليمي كنت معتادا على المفاهيم الأساسية للبالفعل الاتحادية الأساسية مثل مواضع ( tff.SERVER ، tff.CLIENTS )، كيف TFF يمثل الحسابية ( tff.tf_computation ، tff.federated_computation ) والتوقيعات نوعها.

!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

ملخص التصميم

في TFF، "تجميع" يشير إلى حركة مجموعة من القيم على tff.CLIENTS لإنتاج تبلغ القيمة الإجمالية لنفس النوع على tff.SERVER . وهذا يعني أن كل قيمة عميل فردية لا يلزم أن تكون متاحة. على سبيل المثال في التعلم الموحد ، يتم حساب متوسط ​​تحديثات نموذج العميل للحصول على تحديث نموذج إجمالي لتطبيقه على النموذج العام على الخادم.

بالإضافة إلى مشغلي تحقيق هذا الهدف مثل tff.federated_sum ، يوفر TFF tff.templates.AggregationProcessعملية جليل ) الذي الطابع الرسمي على توقيع نوع لحساب التجميع بحيث يمكن تعميم إلى أشكال أكثر تعقيدا من مبلغ بسيط.

المكونات الرئيسية لل tff.aggregators حدة من المصانع لإنشاء AggregationProcess ، والتي صممت لتكون اللبنات مفيدة وreplacable عموما TFF في جانبين:

  1. الحسابات ذات المعاملات. تجميع لبنة المستقل الذي يمكن توصيله إلى وحدات TFF أخرى مصممة للعمل مع tff.aggregators إلى بالحدود تجميع واللازمة.

مثال:

learning_process = tff.learning.build_federated_averaging_process(
    ...,
    model_update_aggregation_factory=tff.aggregators.MeanFactory())
  1. تكوين التجميع. يمكن أن تتكون الكتلة البرمجية الإنشائية للتجميع من وحدات بناء التجميع الأخرى لإنشاء تجميعات مركبة أكثر تعقيدًا.

مثال:

secure_mean = tff.aggregators.MeanFactory(
    value_sum_factory=tff.aggregators.SecureSumFactory(...))

يوضح الجزء المتبقي من هذا البرنامج التعليمي كيفية تحقيق هذين الهدفين.

عملية التجميع

نحن تلخيص أول tff.templates.AggregationProcess ، واتبع مع نمط مصنع لإنشائها.

و tff.templates.AggregationProcess هو tff.templates.MeasuredProcess مع التوقيعات النوع المحدد لتجميع. على وجه الخصوص، initialize و next وظائف لديها التواقيع نوع ما يلي:

  • ( -> state_type@SERVER)
  • (<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)

الدولة (من نوع state_type ) يجب أن توضع في الخدمة. و next يأخذ وظيفة وسيطة الإدخال الدولة والقيمة التي سيتم تجميعها (من نوع value_type ) وضعت في العملاء. و * وسائل اختياري الحجج إدخال أخرى، على سبيل المثال الأوزان في المتوسط الموزون. تقوم بإرجاع كائن حالة محدث ، والقيمة المجمعة من نفس النوع الموضوعة على الخادم ، وبعض القياسات.

لاحظ أن كل من الدولة لتمريرها بين الإعدام و next وظيفة، ويقصد القياسات ذكرت أن يقدم أي معلومات اعتمادا على تنفيذ محددة ل next وظيفة، قد تكون فارغة. ومع ذلك ، يجب تحديدها بشكل صريح لأجزاء أخرى من TFF للحصول على عقد واضح لمتابعة.

على سبيل المثال التحديثات نموذج في وحدات TFF أخرى، tff.learning ، ومن المتوقع أن استخدام tff.templates.AggregationProcess إلى بالحدود كيف يتم تجميع القيم. ومع ذلك ، ما هي القيم المجمعة بالضبط وما هي أنواع التوقيعات ، يعتمد على التفاصيل الأخرى للنموذج الذي يتم تدريبه وخوارزمية التعلم المستخدمة للقيام بذلك.

لجعل تجميع مستقلة عن جوانب أخرى من العمليات الحسابية، ونحن نستخدم نمط مصنع - نخلق المناسبة tff.templates.AggregationProcess مرة واحدة تواقيع نوع ذات الصلة من الكائنات التي سيتم تجميعها متاحة، من خلال الاستناد إلى create طريقة للمصنع. وبالتالي ، فإن المعالجة المباشرة لعملية التجميع مطلوبة فقط لمؤلفي المكتبات المسؤولين عن هذا الإنشاء.

مصانع عملية التجميع

هناك نوعان من فئات المصنع الأساسية المجردة للتجميع الموزون وغير الموزون. على create أسلوب يأخذ نوع تواقيع القيمة التي سيتم تجميعها وإرجاع tff.templates.AggregationProcess لتجميع هذه القيم.

عملية التي أنشأتها tff.aggregators.UnweightedAggregationFactory يأخذ حجتين المدخلات: (1) دولة في الخادم و(2) قيمة من نوع محدد value_type .

التنفيذ مثال على ذلك هو tff.aggregators.SumFactory .

عملية التي أنشأتها tff.aggregators.WeightedAggregationFactory تأخذ ثلاث حجج المدخلات: (1) دولة في الخادم، (2) قيمة من نوع محدد value_type و (3) الوزن من نوع weight_type ، على النحو المحدد من قبل المستخدم المصنع عند استدعاء في create الأسلوب.

التنفيذ مثال على ذلك هو tff.aggregators.MeanFactory الذي يحسب المتوسط الموزون.

نمط المصنع هو كيف نحقق الهدف الأول المذكور أعلاه ؛ هذا التجميع هو لبنة بناء مستقلة. على سبيل المثال ، عند تغيير متغيرات النموذج التي يمكن تدريبها ، لا يحتاج التجميع المعقد بالضرورة إلى التغيير ؛ المصنع يمثل ذلك سيتم استدعاؤه مع نوع توقيع مختلف عند استخدامها بطريقة مثل tff.learning.build_federated_averaging_process .

التراكيب

تذكر أن عملية التجميع العامة يمكن أن تشمل (أ) بعض المعالجة المسبقة للقيم عند العملاء ، (ب) حركة القيم من العميل إلى الخادم ، و (ج) بعض المعالجة اللاحقة للقيمة المجمعة على الخادم. الهدف الثاني هو مذكور أعلاه، تكوين التجميع، ويتحقق داخل tff.aggregators الوحدة النمطية التي كتبها هيكلة تنفيذ مصانع التجميع بحيث الجزء (ب) يمكن تفويضه إلى مصنع التجميع آخر.

بدلاً من تنفيذ كل المنطق الضروري داخل فئة مصنع واحدة ، تركز عمليات التنفيذ افتراضيًا على جانب واحد ذي صلة بالتجميع. عند الحاجة ، يمكّننا هذا النمط من استبدال اللبنات الأساسية واحدة تلو الأخرى.

ومن الأمثلة على ذلك الوزني tff.aggregators.MeanFactory . يضاعف تنفيذه القيم والأوزان المقدمة للعملاء ، ثم يجمع كل من القيم المرجحة والأوزان بشكل مستقل ، ثم يقسم مجموع القيم الموزونة على مجموع الأوزان على الخادم. بدلا من تنفيذ summations باستخدام مباشرة tff.federated_sum المشغل، وتفويض الجمع إلى حالتين من tff.aggregators.SumFactory .

مثل هذا الهيكل يجعل من الممكن استبدال الجمعيتين الافتراضيتين بمصانع مختلفة ، والتي تدرك المجموع بشكل مختلف. على سبيل المثال، tff.aggregators.SecureSumFactory ، أو تطبيق مخصص لل tff.aggregators.UnweightedAggregationFactory . على العكس من ذلك، والوقت، tff.aggregators.MeanFactory يمكن أن تكون هي نفسها تجميع الداخلي للمصنع آخر مثل tff.aggregators.clipping_factory ، إذا كانت القيم هي أن يثقب قبل المتوسط.

انظر السابق ضبط أوصى تجمعات لتعلم البرنامج التعليمي للاستخدامات receommended آلية تكوين باستخدام المصانع القائمة في tff.aggregators حدة.

أفضل الممارسات بالقدوة

نحن نذهب لتوضيح tff.aggregators المفاهيم في التفاصيل من خلال تنفيذ مهمة بسيطة سبيل المثال، وجعله تدريجيا أكثر عمومية. طريقة أخرى للتعلم هي النظر في تنفيذ المصانع القائمة.

import collections
import tensorflow as tf
import tensorflow_federated as tff

بدلا من تلخيص value ومهمة المثال هو لتلخيص value * 2.0 ومن ثم تقسيم مبلغ من 2.0 . والنتيجة هي تجميع بالتالي أي ما يعادل رياضيا لتلخيص مباشرة value ، ويمكن وصفه أنه يتكون من ثلاثة أجزاء: (1) التوسع في العملاء (2) تلخيص عبر عملاء (3) unscaling في الخادم.

بعد تصميم أوضح أعلاه، سيتم تنفيذ المنطق، فئة فرعية من tff.aggregators.UnweightedAggregationFactory ، مما يخلق مناسبا tff.templates.AggregationProcess عندما تعطى value_type إلى مجموع المباراتين:

الحد الأدنى من التنفيذ

بالنسبة لمهمة المثال ، تكون الحسابات الضرورية هي نفسها دائمًا ، لذلك ليست هناك حاجة لاستخدام الحالة. وبالتالي تفريغ، ومثلت أنها tff.federated_value((), tff.SERVER) . نفس الشيء ينطبق على القياسات ، في الوقت الحالي.

وبالتالي يكون الحد الأدنى من تنفيذ المهمة كما يلي:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value((), tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x * 2.0), value)
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x / 2.0), summed_value)
      measurements = tff.federated_value((), tff.SERVER)
      return tff.templates.MeasuredProcessOutput(
          state=state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

يمكن التحقق مما إذا كان كل شيء يعمل كما هو متوقع باستخدام الكود التالي:

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result}  (expected 8.0)')
Type signatures of the created aggregation process:

  - initialize: ( -> <>@SERVER)
  - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>)

Aggregation result: 8.0  (expected 8.0)

الدولة والقياسات

يتم استخدام الحالة على نطاق واسع في TFF لتمثيل الحسابات التي يُتوقع تنفيذها بشكل تكراري وتغييرها مع كل تكرار. على سبيل المثال ، تحتوي حالة حساب التعلم على أوزان النموذج الذي يتم تعلمه.

لتوضيح كيفية استخدام الحالة في حساب التجميع ، نقوم بتعديل مهمة المثال. بدلا من مضاعفة value من 2.0 ، ونحن اضربها تلاه مؤشر التكرار - عدد المرات التي تم تنفيذها في التجميع.

للقيام بذلك ، نحتاج إلى طريقة لتتبع مؤشر التكرار ، والذي يتحقق من خلال مفهوم الحالة. في initialize_fn ، بدلا من إنشاء دولة فارغة، ونحن تهيئة الدولة لتكون صفر العددية. ثم، دولة يمكن أن تستخدم في next_fn في ثلاث خطوات: (1) زيادة من 1.0 ، (2) الاستخدام لمضاعفة value ، و (3) عودة كدولة حديثة جديدة.

حالما يتم ذلك، قد لاحظ: ولكن بالضبط نفس الرمز على النحو الوارد أعلاه يمكن استخدامها للتحقق من جميع الأعمال كما هو متوقع. كيف أعرف أن شيئًا ما قد تغير بالفعل؟

سؤال جيد! هذا هو المكان الذي يصبح فيه مفهوم القياسات مفيدًا. بشكل عام، يمكن أن القياسات الإبلاغ عن أي قيمة ذات الصلة لتنفيذ واحد من next وظيفة، والتي يمكن استخدامها لرصد. في هذه الحالة، يمكن أن يكون summed_value من المثال السابق. أي القيمة قبل خطوة "عدم القياس" ، والتي يجب أن تعتمد على فهرس التكرار. مرة أخرى ، هذا ليس بالضرورة مفيدًا في الممارسة ، لكنه يوضح الآلية ذات الصلة.

وهكذا تبدو الإجابة الحكيمة على المهمة كما يلي:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(
          tff.tf_computation(lambda x: x + 1.0), state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

علما بأن state التي تأتي في next_fn كمدخل يوضع في الخدمة. من أجل استخدامها في عملاء، فإنه يجب أولا أن أجله، والذي يتحقق باستخدام tff.federated_broadcast المشغل.

للتحقق من جميع الأعمال كما هو متوقع، يمكننا الآن أن ننظر إلى ذكرت measurements ، والتي ينبغي أن تكون مختلفة مع كل جولة من التنفيذ، حتى لو المدى مع نفسه client_data .

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}   (expected 8.0 * 1)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 2)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 3)')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>)

| Round #1
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 8.0   (expected 8.0 * 1)

| Round #2
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 16.0  (expected 8.0 * 2)

| Round #3
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 24.0  (expected 8.0 * 3)

الأنواع المهيكلة

عادةً ما يتم تمثيل أوزان النموذج لنموذج تم تدريبه في التعلم الفيدرالي كمجموعة من الموترات ، بدلاً من موتر واحد. في TFF، وهذا ما يمثل tff.StructType والمصانع تجميع مفيدة عموما تحتاج إلى أن تكون قادرة على قبول أنواع منظم.

ومع ذلك، في الأمثلة المذكورة أعلاه، إننا نعمل فقط مع tff.TensorType الكائن. لو كنا في محاولة لاستخدام المصنع السابق لإنشاء عملية التجميع مع tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))]) ، وحصلنا على خطأ غريب ل سوف TensorFlow محاولة تتضاعف tf.Tensor و list .

والمشكلة هي أنه بدلا من ضرب هيكل التنسورات بواسطة ثابت، ونحن بحاجة الى مضاعفة كل موتر في بنية بواسطة ثابت. الحل المعتاد لهذه المشكلة هو استخدام tf.nest حدة داخل خلق tff.tf_computation الصورة.

إصدار السابق ExampleTaskFactory متوافقة مع أنواع منظم مما يبدو على النحو التالي:

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(add_one, state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(scale, (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

يسلط هذا المثال الضوء على نمط قد يكون من المفيد اتباعه عند هيكلة كود TFF. عندما لا تتعامل مع عمليات بسيطة جدا، رمز يصبح أكثر مقروءا عندما tff.tf_computation الصورة التي سيتم استخدامها في بناء كتل داخل tff.federated_computation تم إنشاؤها في مكان منفصل. داخل tff.federated_computation ، هذه اللبنات ترتبط فقط باستخدام مشغلي الجوهرية.

للتحقق من أنها تعمل كما هو متوقع:

client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
               [[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
    tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
      f'          Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>)

Aggregation result: [[2. 3.], [6. 4. 0.]]
          Expected: [[2. 3.], [6. 4. 0.]]

التجمعات الداخلية

تتمثل الخطوة الأخيرة في تمكين تفويض التجميع الفعلي للمصانع الأخرى بشكل اختياري ، من أجل السماح بالتركيب السهل لتقنيات التجميع المختلفة.

ويتحقق ذلك عن طريق إنشاء اختياري inner_factory حجة في منشئ لدينا ExampleTaskFactory . إذا لم يكن محددا، tff.aggregators.SumFactory يستخدم، وهو ما ينطبق على tff.federated_sum المشغل استخدامها مباشرة في القسم السابق.

عندما create ما يسمى، يمكننا أولا استدعاء create من inner_factory لإنشاء عملية التجميع الداخلية مع نفس value_type .

حالة عمليتنا إرجاعها بواسطة initialize_fn هي تركيبة من جزأين: الدولة التي أنشأتها "هذا" العملية، وحالة عملية الداخلية بإنشائها.

تنفيذ next_fn يتم تفويض يختلف في أن التجميع الفعلي لل next ظيفة عملية داخلية، وكيف يتكون الناتج النهائي. وتتكون الدولة من جديد من "هذا" والدولة "الداخلية"، وتتكون القياسات بطريقة مماثلة بوصفها OrderedDict .

ما يلي هو تنفيذ مثل هذا النمط.

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def __init__(self, inner_factory=None):
    if inner_factory is None:
      inner_factory = tff.aggregators.SumFactory()
    self._inner_factory = inner_factory

  def create(self, value_type):
    inner_process = self._inner_factory.create(value_type)

    @tff.federated_computation()
    def initialize_fn():
      my_state = tff.federated_value(0.0, tff.SERVER)
      inner_state = inner_process.initialize()
      return tff.federated_zip((my_state, inner_state))

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      my_state, inner_state = state
      my_new_state = tff.federated_map(add_one, my_state)
      my_state_at_clients = tff.federated_broadcast(my_new_state)
      scaled_value = tff.federated_map(scale, (value, my_state_at_clients))

      # Delegation to an inner factory, returning values placed at SERVER.
      inner_output = inner_process.next(inner_state, scaled_value)

      unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))

      new_state = tff.federated_zip((my_new_state, inner_output.state))
      measurements = tff.federated_zip(
          collections.OrderedDict(
              scaled_value=inner_output.result,
              example_task=inner_output.measurements))

      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

عندما يفوض إلى inner_process.next وظيفة، وهيكل عودة نحصل هو tff.templates.MeasuredProcessOutput ، مع نفس الحقول الثلاثة - state ، result و measurements . عند إنشاء بنية العودة الشامل لعملية تجميع تتكون، و state و measurements الحقول يجب أن تتكون عموما وعاد معا. في المقابل، فإن result يتوافق الحقل إلى القيمة التي تجمع وبدلا من ذلك "تدفق من خلال" التجميع يتكون.

و state ينبغي أن ينظر إلى وجوه باعتبارها التفاصيل تنفيذ المصنع، وبالتالي تكوين يمكن أن يكون في أي هيكل. ومع ذلك، measurements تتوافق مع القيم ورود أنباء عن أن المستخدم في مرحلة ما. ولذلك، فإننا ننصح استخدام OrderedDict ، مع تتكون تسمية بحيث يكون واضحا حيث في تكوين يفعل ذكرت متري يأتي من.

لاحظ أيضا استخدام tff.federated_zip المشغل. و state يجب أن يكون الكائن كان يسيطر بواسطة عملية خلق tff.FederatedType . لو كان لدينا بدلا من ذلك عاد (this_state, inner_state) في initialize_fn ، فإن توقيع نوع عودتها تكون tff.StructType تحتوي على 2-الصفوف (tuple) من tff.FederatedType الصورة. استخدام tff.federated_zip "المصاعد" على tff.FederatedType إلى مستوى أعلى. ويستخدم هذا بالمثل في next_fn عند إعداد الدولة والقياسات يتم إرجاعها.

أخيرًا ، يمكننا أن نرى كيف يمكن استخدام ذلك مع التجميع الداخلي الافتراضي:

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: ()

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: ()

... وبتجميع داخلي مختلف. على سبيل المثال، ExampleTaskFactory :

client_data = [1.0, 2.0, 5.0]
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())])

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])

ملخص

في هذا البرنامج التعليمي ، أوضحنا أفضل الممارسات التي يجب اتباعها من أجل إنشاء كتلة بناء تجميع للأغراض العامة ، ممثلة كمصنع تجميع. تأتي العمومية من خلال نية التصميم بطريقتين:

  1. الحسابات ذات المعاملات. تجميع لبنة المستقل الذي يمكن توصيله إلى وحدات TFF أخرى مصممة للعمل مع tff.aggregators إلى بالحدود تجميع واللازمة، مثل tff.learning.build_federated_averaging_process .
  2. تكوين التجميع. يمكن أن تتكون الكتلة البرمجية الإنشائية للتجميع من وحدات بناء التجميع الأخرى لإنشاء تجميعات مركبة أكثر تعقيدًا.