সাহায্য Kaggle উপর TensorFlow সঙ্গে গ্রেট বেরিয়ার রিফ রক্ষা চ্যালেঞ্জ যোগদান

প্যারামিটার সার্ভারস্ট্রেটজি সহ প্যারামিটার সার্ভার প্রশিক্ষণ

TensorFlow.org এ দেখুন GitHub-এ উৎস দেখুন নোটবুক ডাউনলোড করুন

ওভারভিউ

প্যারামিটার সার্ভার প্রশিক্ষণ একটি সাধারণ ডেটা-সমান্তরাল একাধিক মেশিনে মডেল প্রশিক্ষণ আপ স্কেল করা হয় পদ্ধতি।

একটি প্যারামিটার সার্ভার প্রশিক্ষণ ক্লাস্টার শ্রমিকপ্যারামিটার সার্ভার নিয়ে গঠিত। ভেরিয়েবলগুলি প্যারামিটার সার্ভারে তৈরি করা হয় এবং সেগুলি প্রতিটি ধাপে কর্মীদের দ্বারা পড়া এবং আপডেট করা হয়। ডিফল্টরূপে, কর্মীরা একে অপরের সাথে সিঙ্ক্রোনাইজ না করে স্বাধীনভাবে এই ভেরিয়েবলগুলি পড়ে এবং আপডেট করে। এই কেন কখনও কখনও সার্ভার-শৈলী প্রশিক্ষণ প্যারামিটার অ্যাসিঙ্ক্রোনাস প্রশিক্ষণ বলা হয়।

TensorFlow 2, প্যারামিটার সার্ভার প্রশিক্ষণ দ্বারা চালিত হয় tf.distribute.experimental.ParameterServerStrategy বর্গ, যা একটি ক্লাস্টার যে হাজার হাজার শ্রমিকের পর্যন্ত আইশ প্রশিক্ষণ পদক্ষেপ বিতরণ (প্যারামিটার সার্ভার দ্বারা অনুষঙ্গী)।

সমর্থিত প্রশিক্ষণ পদ্ধতি

দুটি প্রধান সমর্থিত প্রশিক্ষণ পদ্ধতি আছে:

কাজ এবং কাজ সহ একটি ক্লাস্টার

পছন্দের এপিআই (তথাপি Model.fit বা একটি কাস্টম প্রশিক্ষণ লুপ), TensorFlow 2 বিতরণ প্রশিক্ষণ জড়িত: একটি 'cluster' বিভিন্ন সঙ্গে 'jobs' , এবং কাজ প্রতিটি এক বা একাধিক থাকতে পারে 'tasks'

প্যারামিটার সার্ভার প্রশিক্ষণ ব্যবহার করার সময়, এটি সুপারিশ করা হয়:

  • এক সমন্বয়কারী কাজ (যা কাজ নাম আছে chief )
  • একাধিক কর্মী কাজ (কাজ নাম worker ); এবং
  • একাধিক প্যারামিটার সার্ভার কাজ (কাজ নাম ps )

সমন্বয়কারী সম্পদ, কর্ম প্রশিক্ষণ পাঠাবে সৃষ্টি করার সময়, চেকপয়েন্ট লিখছেন, এবং কার্য ব্যর্থতা, শ্রমিকপ্যারামিটার সার্ভারের সাথে ডিল চালানো tf.distribute.Server যে সমন্বয়কারী থেকে অনুরোধের জন্য শুনুন।

সঙ্গে প্যারামিটার সার্ভার প্রশিক্ষণ Model.fit এপিআই

সঙ্গে প্যারামিটার সার্ভার প্রশিক্ষণ Model.fit এপিআই সমন্বয়কারী ব্যবহার করতে প্রয়োজন tf.distribute.experimental.ParameterServerStrategy বস্তু, এবং একটি tf.keras.utils.experimental.DatasetCreator ইনপুট হিসাবে। অনুরূপ Model.fit কোন কৌশলের সঙ্গে, বা অন্যান্য কৌশলের সঙ্গে, কর্মপ্রবাহ তৈরি জড়িত এবং মডেল কম্পাইলেশনের callbacks, একটি দ্বারা অনুসরণ প্রস্তুতি ব্যবহার Model.fit কল।

একটি কাস্টম প্রশিক্ষণ লুপ সহ প্যারামিটার সার্ভার প্রশিক্ষণ

কাস্টম প্রশিক্ষণ লুপ সঙ্গে, tf.distribute.experimental.coordinator.ClusterCoordinator বর্গ সমন্বয়কারী জন্য ব্যবহৃত মূল উপাদান।

  • ClusterCoordinator বর্গ একটি সাথে কাজ করতে হবে tf.distribute.Strategy অবজেক্ট।
  • এই tf.distribute.Strategy বস্তুর ক্লাস্টার তথ্য প্রদান করতে প্রয়োজন এবং হিসাবে দেখায়, একটি প্রশিক্ষণ পদক্ষেপ নির্ধারণ করতে ব্যবহৃত হয় tf.distribute.Strategy সঙ্গে কাস্টম প্রশিক্ষণ
  • ClusterCoordinator বস্তুর তখন দূরবর্তী কর্মীদের এইসব প্রশিক্ষণ পদক্ষেপ কার্যকর পাঠাবে।
  • পরামিতি সার্ভার প্রশিক্ষণ জন্য, ClusterCoordinator একটি সঙ্গে কাজ করার দরকার tf.distribute.experimental.ParameterServerStrategy

সবচেয়ে গুরুত্বপূর্ণ দ্বারা উপলব্ধ এপিআই ClusterCoordinator অবজেক্ট schedule :

  • schedule এপিআই একটি enqueues tf.function এবং ভবিষ্যতে মত ফেরৎ RemoteValue অবিলম্বে।
  • সারিবদ্ধ ফাংশন পটভূমি থ্রেডের মধ্যে দূরবর্তী শ্রমিকদের প্রেষিত করা হবে এবং তাদের RemoteValue গুলি দ্বারা অ্যাসিঙ্ক্রোনাস পূরণ করা হবে।
  • যেহেতু schedule কর্মী নিয়োগ প্রয়োজন হয় না, tf.function পাস কোনো উপলব্ধ কর্মী মৃত্যুদন্ড কার্যকর করা যেতে পারে।
  • যে কর্মীটির উপর এটি কার্যকর করা হয়েছে সেটি সম্পূর্ণ হওয়ার আগে অনুপলব্ধ হলে, অন্য উপলব্ধ কর্মীর উপর ফাংশনটি পুনরায় চেষ্টা করা হবে।
  • এই সত্যের কারণে এবং ফাংশন এক্সিকিউশন পারমাণবিক নয়, একটি ফাংশন একাধিকবার কার্যকর করা যেতে পারে।

দূরবর্তী ফাংশন ডিসপ্যাচিং ছাড়াও, ClusterCoordinator সব শ্রমিকদের ডেটাসেট তৈরি এবং এই ডেটাসেট পুনর্নির্মাণের সাহায্য করে যখন ব্যর্থতা থেকে একজন শ্রমিক উদ্ধার।

টিউটোরিয়াল সেটআপ

টিউটোরিয়াল মধ্যে শাখায় বিভক্ত হবে Model.fit এবং কাস্টম প্রশিক্ষণ লুপ পাথ, এবং আপনি যে এক আপনার প্রয়োজন ফিট চয়ন করতে পারেন। "X এর সাথে প্রশিক্ষণ" ছাড়া অন্য বিভাগগুলি উভয় পথের জন্য প্রযোজ্য।

pip install portpicker

ক্লাস্টার সেটআপ

উল্লেখ করা হয়েছে, একটি প্যারামিটার সার্ভার প্রশিক্ষণ ক্লাস্টার একটি সমন্বয়কারী টাস্ক আপনার প্রশিক্ষণ প্রোগ্রাম, এক বা একাধিক শ্রমিক ও প্যারামিটার সার্ভার যে কাজগুলো চালানো TensorFlow servers- রান প্রয়োজন tf.distribute.Server -আর সম্ভবত একটি অতিরিক্ত মূল্যায়ন টাস্ক যে রান পার্শ্ব-গাড়ী মূল্যায়ন (নীচে সাইড-কার মূল্যায়ন বিভাগটি দেখুন)। তাদের সেট আপ করার প্রয়োজনীয়তা হল:

  • সমন্বয়কারীর কাজটিকে মূল্যায়নকারী ছাড়া অন্য সব TensorFlow সার্ভারের ঠিকানা এবং পোর্ট জানতে হবে।
  • কর্মী এবং পরামিতি সার্ভারগুলি তাদের কোন পোর্ট শুনতে হবে তা জানতে হবে। সরলতার জন্য, আপনি সাধারণত এই কাজগুলিতে টেনসরফ্লো সার্ভার তৈরি করার সময় সম্পূর্ণ ক্লাস্টার তথ্য পাস করতে পারেন।
  • মূল্যায়নকারীর কাজকে প্রশিক্ষণ ক্লাস্টারের সেটআপ জানতে হবে না। যদি তা হয়, তাহলে প্রশিক্ষণ ক্লাস্টারের সাথে সংযোগ করার চেষ্টা করা উচিত নয়।
  • শ্রমিক ও প্যারামিটার সার্ভার যেমন টাস্ক ধরনের থাকা উচিত "worker" এবং "ps" যথাক্রমে। সমন্বয়কারী ব্যবহার করা উচিত "chief" উত্তরাধিকার কারণে কাজের ধরণ হিসাবে।

এই টিউটোরিয়ালে, আপনি একটি ইন-প্রসেস ক্লাস্টার তৈরি করবেন যাতে পুরো প্যারামিটার সার্ভার প্রশিক্ষণ Colab-এ চালানো যায়। আপনি কিভাবে স্থাপন করা শিখতে হবে বাস্তব ক্লাস্টার পরবর্তী বিভাগে।

ইন-প্রসেস ক্লাস্টার

আপনি আগে থেকেই বেশ কয়েকটি টেনসরফ্লো সার্ভার তৈরি করে শুরু করবেন এবং পরে তাদের সাথে সংযুক্ত হবেন। নোট এই শুধুমাত্র এই টিউটোরিয়াল বিক্ষোভের উদ্দেশ্যে যে, এবং বাস্তব প্রশিক্ষণ সার্ভারে শুরু হবে "worker" এবং "ps" মেশিন।

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

প্রক্রিয়ার মধ্যে ক্লাস্টার সেটআপ ঘন ঘন যেমন, ইউনিট টেস্টিং ব্যবহার করা হয় এখানে

স্থানীয় পরীক্ষার জন্য অন্য কোনো বিকল্প খুঁজে স্থানীয় মেশিন-চেক প্রসেস আরম্ভ হয় Keras সঙ্গে মাল্টি কর্মী প্রশিক্ষণ এই পদ্ধতির একটি উদাহরণ জন্য।

একটি প্যারামিটার সার্ভার স্ট্র্যাটেজি চালু করুন

আগে আপনি প্রশিক্ষণ কোড মধ্যে আকর্ষণীয়, এর instantiate যাক ParameterServerStrategy অবজেক্ট। লক্ষ্য করুন আপনার সাথে এই অগ্রসর হবে কিনা তা নির্বিশেষে প্রয়োজন হয় Model.fit বা একটি কাস্টম প্রশিক্ষণ লুপ। variable_partitioner যুক্তি ব্যাখ্যা করা হবে চলক sharding অধ্যায়

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:16686', 'localhost:23151'], 'worker': ['localhost:16753', 'localhost:22750', 'localhost:20823']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:16686', 'localhost:23151'], 'worker': ['localhost:16753', 'localhost:22750', 'localhost:20823']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

প্রশিক্ষণের জন্য GPUs ব্যবহার করার জন্য, প্রতিটি কর্মীর জন্য দৃশ্যমান GPU বরাদ্দ করুন। ParameterServerStrategy সীমাবদ্ধতা সঙ্গে, প্রতিটি কর্মী উপর সমস্ত উপলব্ধ জিপিইউ ব্যবহার করবে যে সব শ্রমিক প্রাপ্তিসাধ্য জিপিইউ একই সংখ্যক থাকা উচিত।

পরিবর্তনশীল শার্ডিং

চলক sharding বিভাজন একাধিক ছোট ভেরিয়েবল, যা shards বলা হয় মধ্যে একটি পরিবর্তনশীল বোঝায়। এই শার্ডগুলি অ্যাক্সেস করার সময় নেটওয়ার্ক লোড বিতরণ করার জন্য পরিবর্তনশীল শার্ডিং কার্যকর হতে পারে। এটি একাধিক প্যারামিটার সার্ভার জুড়ে একটি সাধারণ ভেরিয়েবলের গণনা এবং সঞ্চয়স্থান বিতরণ করতেও দরকারী।

পরিবর্তনশীল sharding সক্ষম করতে, আপনাকে একটি প্রেরণ করতে পারেন variable_partitioner যখন একটি নির্মাণের ParameterServerStrategy অবজেক্ট। variable_partitioner প্রত্যেক সময় প্রার্থনা করা হবে একটি পরিবর্তনশীল তৈরি করা হয় এবং এটি পরিবর্তনশীল প্রতিটি মাত্রা বরাবর shards সংখ্যা ফিরতে আশা করা হচ্ছে। কিছু আউট-অফ-বক্স variable_partitioner গুলি যেমন প্রদান করা হয় tf.distribute.experimental.partitioners.MinSizePartitioner । মনে হচ্ছে আকার-ভিত্তিক partitioners ব্যবহার করা বাঞ্ছনীয় tf.distribute.experimental.partitioners.MinSizePartitioner ছোট ভেরিয়েবল, যা মডেল প্রশিক্ষণ গতির উপর নেতিবাচক প্রভাব ফেলতে পারে পার্টিশন এড়ানো।

একটি যখন variable_partitioner পাস করা হয় এবং যদি আপনি অধীনে সরাসরি একটি পরিবর্তনশীল তৈরি strategy.scope() , এটি একটি সঙ্গে একটি ধারক টাইপ হয়ে যাবে variables সম্পত্তি যা shards তালিকায় অ্যাক্সেস প্রদান করে। বেশিরভাগ ক্ষেত্রে, এই ধারকটি স্বয়ংক্রিয়ভাবে সমস্ত শার্ডগুলিকে একত্রিত করে একটি টেনসরে রূপান্তরিত হবে৷ ফলস্বরূপ, এটি একটি সাধারণ পরিবর্তনশীল হিসাবে ব্যবহার করা যেতে পারে। অন্যদিকে, যেমন কিছু TensorFlow পদ্ধতি tf.nn.embedding_lookup এই ধারক টাইপ জন্য এবং এই পদ্ধতি স্বয়ংক্রিয় সংযুক্তকরণের এড়ানো হবে সুদক্ষ বাস্তবায়ন প্রদান।

অনুগ্রহ করে এপিআই ডক্স দেখতে tf.distribute.experimental.ParameterServerStrategy আরো বিস্তারিত জানার জন্য।

সঙ্গে প্রশিক্ষণ Model.fit

Keras মাধ্যমে একটি সহজ-থেকে-ব্যবহার প্রশিক্ষণ API উপলব্ধকারী Model.fit যে হ্যান্ডলগুলি ফণা অধীন প্রশিক্ষণ লুপ, overridable নমনীয়তা সঙ্গে train_step এবং callbacks, যা চেকপয়েন্ট সংরক্ষণ বা সারসংক্ষেপ TensorBoard জন্য সংরক্ষণ করার সময় যেমন বৈশিষ্ট্য প্রদান। সঙ্গে Model.fit , একই প্রশিক্ষণ কোড কৌশল বস্তুর একটি সহজ ও swap 'র সঙ্গে অন্যান্য কৌশল ব্যবহার করা যেতে পারে।

তথ্য অন্তর্ভুক্তী

Model.fit প্যারামিটার সার্ভার প্রশিক্ষণ প্রয়োজন যে ইনপুট ডেটা একটি callable যে ধরনের একটি একক আর্গুমেন্ট গ্রহণ করা প্রদান করা tf.distribute.InputContext , এবং একটি ফেরৎ tf.data.Dataset । তারপরে, এক তৈরি tf.keras.utils.experimental.DatasetCreator বস্তুর যে এই ধরনের লাগে callable ও ঐচ্ছিকরূপে tf.distribute.InputOptions মাধ্যমে অবজেক্ট input_options যুক্তি।

মনে রাখবেন এলোমেলো এবং প্যারামিটার সার্ভার প্রশিক্ষণ ডেটা পুনরাবৃত্তি, এবং নির্দিষ্ট করার সুপারিশ করা হয় steps_per_epoch মধ্যে fit তাই গ্রন্থাগার যুগান্তকারী গণ্ডি জানেন কল।

দয়া করে দেখুন বন্টিত ইনপুট আরো তথ্যের জন্য টিউটোরিয়াল InputContext যুক্তি।

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

কোড dataset_fn ইনপুট ডিভাইস, যা সাধারণত CPU- র হয়, কর্মী মেশিনে প্রতিটি সময় আহবান করা হবে না।

মডেল নির্মাণ এবং সংকলন

এখন, আপনি একটি তৈরি করবে tf.keras.Model -a তুচ্ছ tf.keras.models.Sequential বিক্ষোভের উদ্দেশ্যে-অনুসৃত একটি দ্বারা জন্য মডেল Model.compile কল যেমন একটি অপটিমাইজার, মান, বা এই ধরনের যেমন প্যারামিটার হিসেবে উপাদান, নিগমবদ্ধ steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

  model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

কলব্যাক এবং প্রশিক্ষণ

আগে আপনাকে কল model.fit প্রকৃত প্রশিক্ষণের জন্য, এর যেমন সাধারণ কার্য, জন্য প্রয়োজনীয় callbacks প্রস্তুত যাক:

  • ModelCheckpoint : মডেল ওজন সংরক্ষণ করুন।
  • BackupAndRestore : নিশ্চিত প্রশিক্ষণ উন্নতি স্বয়ংক্রিয়ভাবে ব্যাকআপ হয়ে, এবং যদি উদ্ধার করতে ক্লাস্টার অভিজ্ঞতা অভাবে (যেমন পরিত্যাগ বা ফলে সংশোধিত হিসাবে); বা
  • TensorBoard : SUMMARY ফাইল, যা TensorBoard সরঞ্জামে ভিজ্যুয়ালাইজ পেতে মধ্যে অগ্রগতি প্রতিবেদন সংরক্ষণ করুন।
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.experimental.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2021-12-02 02:22:17.429288: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 6s - loss: 0.6550 - 6s/epoch - 286ms/step
Epoch 2/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.5718 - 546ms/epoch - 27ms/step
Epoch 3/5
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f4b38365dd0> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f4b4a806c20> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
20/20 - 1s - loss: 0.4267 - 502ms/epoch - 25ms/step
Epoch 4/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 0s - loss: 0.3612 - 394ms/epoch - 20ms/step
Epoch 5/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 0s - loss: 0.3184 - 385ms/epoch - 19ms/step
<keras.callbacks.History at 0x7f4b4b93c510>

সঙ্গে সরাসরি ব্যবহার ClusterCoordinator (ঐচ্ছিক)

এমনকি যদি আপনি পছন্দ করে Model.fit প্রশিক্ষণ পাথ, আপনি বিকল্প হিসেবে একটি instantiate করতে tf.distribute.experimental.coordinator.ClusterCoordinator বস্তুর অন্যান্য কার্যাবলী আপনি শ্রমিকদের মৃত্যুদন্ড কার্যকর করা চাই নির্দিষ্ট সময় নির্ধারণের জন্য। দেখুন একটি কাস্টম প্রশিক্ষণ লুপ সঙ্গে প্রশিক্ষণ অধিক বিবরণের এবং উদাহরণ জন্য অধ্যায়।

একটি কাস্টম প্রশিক্ষণ লুপ সঙ্গে প্রশিক্ষণ

সাথে কাস্টম প্রশিক্ষণ লুপ ব্যবহার tf.distribute.Strategy মহান নমনীয়তা প্রশিক্ষণ লুপ সংজ্ঞায়িত করতে প্রদান করে। সঙ্গে ParameterServerStrategy উপরে সংজ্ঞায়িত (যেমন strategy ), আপনি একটি ব্যবহার করা হবে tf.distribute.experimental.coordinator.ClusterCoordinator দূরবর্তী শ্রমিকদের প্রশিক্ষণ পদক্ষেপ কার্যকর প্রাণবধ।

তারপর, আপনি একটি মডেল তৈরি করবে, একটি ডেটাসেটের এবং একটি পদক্ষেপ ফাংশন নির্ধারণ হিসাবে আপনি সঙ্গে অন্যান্য প্রশিক্ষণ লুপ কাজ করেছেন tf.distribute.Strategy গুলি। আপনি আরো বিস্তারিত জানতে পারেন tf.distribute.Strategy সঙ্গে কাস্টম প্রশিক্ষণ টিউটোরিয়াল।

দক্ষ ডেটা সেটটি পূর্বআনয়ন নিশ্চিত করার জন্য, ব্যবহার ডেটা সেটটি সৃষ্টি API গুলি উল্লেখ বিতরণ সুপারিশ দূরবর্তী শ্রমিকদের ডিসপ্যাচ প্রশিক্ষণ পদক্ষেপ বিভাগটি দেখুন। এছাড়াও, কল করতে ভুলবেন না Strategy.run ভিতরে worker_fn শ্রমিকদের বরাদ্দ জিপিইউ পূর্ণ সুবিধা নিতে। বাকি ধাপগুলো GPU সহ বা ছাড়া প্রশিক্ষণের জন্য একই।

আসুন নিম্নলিখিত ধাপে এই উপাদানগুলি তৈরি করি:

ডেটা সেট আপ করুন

প্রথমত, একটি ফাংশন যে একটি ডেটাসেটের যে বাস্তবায়িত যুক্তিবিজ্ঞান preprocessing অন্তর্ভুক্ত সৃষ্টি লিখতে Keras preprocessing স্তর

আপনি বাইরে এই তিনটি স্তরের সৃষ্টি করবে dataset_fn ভিতরে কিন্তু প্রয়োগ রূপান্তর dataset_fn , যেহেতু আপনি মোড়ানো হবে dataset_fn একটি মধ্যে tf.function , যা ভেরিয়েবল এটা ভিতরে তৈরি করা অনুমতি দেয় না।

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)

একটি ডেটাসেটে খেলনা উদাহরণ তৈরি করুন:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

তারপরে, এক আবৃত প্রশিক্ষণ ডেটা সেটটি তৈরি dataset_fn :

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

মডেল তৈরি করুন

এর পরে, মডেল এবং অন্যান্য বস্তু তৈরি করুন। অধীনস্থ সকল ভেরিয়েবল তৈরি করতে নিশ্চিত করুন strategy.scope

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

আসুন নিশ্চিত যে ব্যবহার FixedShardsPartitioner দুই shards সব ভেরিয়েবল বিভক্ত এবং প্রতিটি ঠিকরা বিভিন্ন পরামিতি সার্ভার নির্ধারিত ছিল:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

প্রশিক্ষণের ধাপ সংজ্ঞায়িত করুন

তৃতীয়ত, একটি মধ্যে আবৃত প্রশিক্ষণ পদক্ষেপ তৈরি tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

উপরে প্রশিক্ষণ পদক্ষেপ ফাংশন ইন, কলিং Strategy.run এবং Strategy.reduce মধ্যে step_fn কর্মী প্রতি একাধিক GPU সমর্থন করতে পারে। শ্রমিকদের জিপিইউ বরাদ্দ করে থাকেন, Strategy.run একাধিক প্রতিলিপি উপর ডেটাসেট বিতরণ করবে।

প্রত্যন্ত কর্মীদের প্রশিক্ষণের পদক্ষেপগুলি প্রেরণ করুন

পরে সব কম্পিউটেশন দ্বারা সংজ্ঞায়িত করা হয় ParameterServerStrategy , আপনি ব্যবহার করবে tf.distribute.experimental.coordinator.ClusterCoordinator সম্পদ তৈরি করতে বর্গ এবং দূরবর্তী শ্রমিকদের প্রশিক্ষণ পদক্ষেপ বিতরণের লাইসেন্স দেন।

প্রথম একটি তৈরি করা যাক ClusterCoordinator বস্তু এবং কৌশল বস্তুর মধ্যে পাস:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

তারপরে, একটি প্রতি-কর্মী ডেটাসেট এবং একটি পুনরাবৃত্তিকারী তৈরি করুন। ইন per_worker_dataset_fn নীচে মোড়কে dataset_fn মধ্যে strategy.distribute_datasets_from_function জিপিইউ দক্ষ পূর্বআনয়ন অঙ্গীভূতভাবে করার অনুমতি বাঞ্ছনীয়।

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

চূড়ান্ত পদক্ষেপ ব্যবহার করে দূরবর্তী শ্রমিকদের গণনার বিতরণ করতে হয় ClusterCoordinator.schedule :

  • schedule পদ্ধতি enqueues tf.function এবং ভবিষ্যতে মত ফেরৎ RemoteValue অবিলম্বে। সারিবদ্ধ ফাংশন পটভূমি থ্রেডের মধ্যে দূরবর্তী শ্রমিকদের প্রেষিত করা হবে এবং RemoteValue অ্যাসিঙ্ক্রোনাস পূরণ করা হবে।
  • join পদ্ধতি ( ClusterCoordinator.join ) পর্যন্ত নির্ধারিত ফাংশন মৃত্যুদন্ড কার্যকর করা হয় অপেক্ষা করতে ব্যবহার করা যাবে।
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.637500.
Finished epoch 1, accuracy is 0.906250.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

এখানে কিভাবে আপনি একটি ফল আনা যায় RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

বিকল্পভাবে, আপনি সমস্ত পদক্ষেপ চালু করতে পারেন এবং সমাপ্তির জন্য অপেক্ষা করার সময় কিছু করতে পারেন:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

সম্পূর্ণ প্রশিক্ষণ ও এই বিশেষ উদাহরণস্বরূপ ভজনা কর্মপ্রবাহ জন্য, এই চেক আউট দয়া করে পরীক্ষা

ডেটাসেট তৈরি সম্পর্কে আরও

উপরে কোডে ডেটা সেটটি ব্যবহার করে তৈরি করেছেন ClusterCoordinator.create_per_worker_dataset এপিআই)। এটি কর্মী প্রতি একটি ডেটাসেট তৈরি করে এবং একটি ধারক বস্তু প্রদান করে। আপনি কল করতে পারেন iter প্রতি কর্মী পুনরুক্তিকারীর তৈরি করতে এটিতে পদ্ধতি। প্রতি-কর্মী পুনরুক্তিকারীর কর্মী প্রতি এক পুনরুক্তিকারীর রয়েছে এবং একজন শ্রমিকের সংশ্লিষ্ট ফালি ফাংশন প্রেরণ করা ইনপুট যুক্তি দ্বারা প্রতিস্থাপিত হবে ClusterCoordinator.schedule পদ্ধতি ফাংশন একটি নির্দিষ্ট কর্মী উপর মৃত্যুদন্ড কার্যকর করা হয় আগে।

বর্তমানে, ClusterCoordinator.schedule পদ্ধতি অনুমান শ্রমিকদের হয় সমতুল্য এবং এইভাবে অনুমান বিভিন্ন শ্রমিকদের ডেটাসেট একই ব্যতীত তারা এলোমেলো হতে পারে ভিন্নভাবে যদি তাদের ধারণ Dataset.shuffle অপারেশন। এই কারণে, এটি সুপারিশ করা হয় যে ডেটাসেট অনির্দিষ্টকালের জন্য পুনরাবৃত্তি করা প্রয়োজন এবং আপনি যদি এর পরিবর্তে এর উপর নির্ভর করার পদক্ষেপ একটি সসীম সংখ্যা নির্দিষ্ট সময় নির্ধারণের OutOfRangeError একটি ডেটাসেটের থেকে।

আরেকটি গুরুত্বপূর্ণ নোট যে tf.data ডেটাসেট কাজের সীমানা জুড়ে অন্তর্নিহিত ধারাবাহিকতাতে এবং deserialization সমর্থন করি না। সুতরাং এটি ফাংশন প্রেরণ ভিতরে পুরো ডেটা সেটটি তৈরি করা জরুরী ClusterCoordinator.create_per_worker_dataset

মূল্যায়ন

বিতরণ করা প্রশিক্ষণে একটি মূল্যায়ন লুপ সংজ্ঞায়িত এবং চালানোর একাধিক উপায় রয়েছে। নীচে বর্ণিত হিসাবে প্রত্যেকের নিজস্ব সুবিধা এবং অসুবিধা রয়েছে। আপনার পছন্দ না থাকলে ইনলাইন মূল্যায়ন পদ্ধতিটি সুপারিশ করা হয়।

ইনলাইন মূল্যায়ন

এই পদ্ধতিতে, প্রশিক্ষণ ও মূল্যায়ন এবং এভাবেই এটা বলা হয় ইনলাইন মূল্যায়ন মধ্যে সমন্বয়কারী পরিবর্তন ঘটান।

ইনলাইন মূল্যায়নের বেশ কিছু সুবিধা রয়েছে। উদাহরণ স্বরূপ:

  • এটি বড় মূল্যায়ন মডেল এবং মূল্যায়ন ডেটাসেট সমর্থন করতে পারে যা একটি একক কাজ ধরে রাখতে পারে না।
  • মূল্যায়নের ফলাফল পরবর্তী যুগের প্রশিক্ষণের সিদ্ধান্ত নিতে ব্যবহার করা যেতে পারে।

ইনলাইন মূল্যায়ন বাস্তবায়নের দুটি উপায় রয়েছে: সরাসরি মূল্যায়ন এবং বিতরণ মূল্যায়ন।

  • সরাসরি মূল্যায়ন: ছোট মডেল ও মূল্যায়ন ডেটাসেট জন্য, সমন্বয়কারী মূল্যায়ন সরাসরি সমন্বয়কারী উপর মূল্যায়ন ডেটা সেটটি সঙ্গে বিতরণ করা মডেলের উপর রান করতে পারেন:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • বন্টিত মূল্যায়ন: বড় মডেল বা ডেটাসেট যে সমন্বয়কারী সরাসরি চালানোর জন্য infeasible করি, সমন্বয়কারী কাজের মাধ্যমে শ্রমিকদের মূল্যায়ন কর্ম বিতরণ করতে পারেন ClusterCoordinator.schedule / ClusterCoordinator.join পদ্ধতি:
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
Evaluation accuracy: 1.000000

সাইড-কার মূল্যায়ন

আরেকটি পদ্ধতি পার্শ্ব-গাড়ী মূল্যায়ন যেখানে আপনি একটি ডেডিকেটেড evaluator কার্য তৈরি করে বারবার চেকপয়েন্ট পড়ে এবং একটি সর্বশেষ চেকপয়েন্ট উপর মূল্যায়ন রান বলা হয়। মূল্যায়ন ফলাফলের উপর ভিত্তি করে আপনার প্রশিক্ষণের লুপ পরিবর্তন করার প্রয়োজন না হলে এটি আপনার প্রশিক্ষণ প্রোগ্রামটি তাড়াতাড়ি শেষ করার অনুমতি দেয়। যাইহোক, এটি মূল্যায়ন ট্রিগার করার জন্য একটি অতিরিক্ত মূল্যায়নকারীর কাজ এবং পর্যায়ক্রমিক চেকপয়েন্টিং প্রয়োজন। নিম্নলিখিত একটি সম্ভাব্য পার্শ্ব-কার মূল্যায়ন লুপ:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

বাস্তব বিশ্বের ক্লাস্টার

একটি বাস্তব উত্পাদন পরিবেশে, আপনি বিভিন্ন মেশিনে বিভিন্ন প্রক্রিয়ায় সমস্ত কাজ চালাবেন। প্রতিটি কাজের উপর কনফিগার ক্লাস্টার তথ্যে সহজ উপায় নির্ধারণ করে হয় "TF_CONFIG" এনভায়রনমেন্ট ভেরিয়েবল এবং ব্যবহার tf.distribute.cluster_resolver.TFConfigClusterResolver বিশ্লেষণ করতে "TF_CONFIG"

সম্পর্কে একটি সাধারণ বর্ণনার জন্য "TF_CONFIG" এনভায়রনমেন্ট ভেরিয়েবল, পড়ুন বন্টিত প্রশিক্ষণ গাইড।

আপনি Kubernetes বা অন্যান্য কনফিগারেশন টেমপ্লেট ব্যবহার করে আপনার প্রশিক্ষণ কর্ম শুরু হলে, এটি খুব সম্ভবত এই টেমপ্লেটগুলি ইতিমধ্যে সেট আছে যে “TF_CONFIG" আপনার জন্য।

সেট "TF_CONFIG" এনভায়রনমেন্ট ভেরিয়েবল

ধরুন আপনি 3 শ্রমিক ও 2 পরামিতি সার্ভার আছে, "TF_CONFIG" কর্মী 1 হতে পারে:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

"TF_CONFIG" evaluator এর হতে পারে:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

"cluster" উপরে অংশগ্রহণ "TF_CONFIG" evaluator জন্য স্ট্রিং ঐচ্ছিক।

আপনি যদি সমস্ত কাজের জন্য একই বাইনারি ব্যবহার করেন

আপনি যদি একটি একক বাইনারি ব্যবহার করে এই সমস্ত কাজগুলি চালাতে পছন্দ করেন, তাহলে আপনাকে আপনার প্রোগ্রামের শাখাটিকে একেবারে শুরুতে বিভিন্ন ভূমিকাতে দিতে হবে:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

নিম্নলিখিত কোডটি একটি টেনসরফ্লো সার্ভার শুরু করে এবং অপেক্ষা করে:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

হ্যান্ডলিং টাস্ক ব্যর্থতা

কর্মীর ব্যর্থতা

tf.distribute.experimental.coordinator.ClusterCoordinator বা Model.fit প্রদান বিল্ট-ইন কর্মী ব্যর্থতার জন্য দোষ সহনশীলতা। কর্মী পুনরুদ্ধার করার পরে, (হয় করার আগে যে ডেটা সেটটি ফাংশন ClusterCoordinator.create_per_worker_dataset একটি কাস্টম প্রশিক্ষণ লুপ, অথবা tf.keras.utils.experimental.DatasetCreator জন্য Model.fit ডেটাসেট সৃষ্টি পুনরায়) এর শ্রমিকদের প্রার্থনা করা হবে না।

প্যারামিটার সার্ভার বা সমন্বয়কারী ব্যর্থতা

যাইহোক, যখন সমন্বয়কারী একটি প্যারামিটার সার্ভার ত্রুটি দেখেন, এটি একটি উঠাবে UnavailableError বা AbortedError অবিলম্বে। আপনি এই ক্ষেত্রে সমন্বয়কারী পুনরায় চালু করতে পারেন. সমন্বয়কারী নিজেও অনুপলব্ধ হতে পারে। অতএব, প্রশিক্ষণের অগ্রগতি না হারানোর জন্য নির্দিষ্ট টুলিংয়ের সুপারিশ করা হয়:

  • জন্য Model.fit , আপনি একটি ব্যবহার করা উচিত BackupAndRestore কলব্যাক, যা উন্নতি সংরক্ষণ এবং পুন: প্রতিষ্ঠা স্বয়ংক্রিয়ভাবে পরিচালনা করে। দেখুন Callbacks ও প্রশিক্ষণ উদাহরণের জন্য উপরে অধ্যায়।

  • একটি কাস্টম ট্রেনিং লুপের জন্য, আপনাকে পর্যায়ক্রমে মডেল ভেরিয়েবল চেকপয়েন্ট করা উচিত এবং ট্রেনিং শুরু হওয়ার আগে একটি চেকপয়েন্ট থেকে মডেল ভেরিয়েবল লোড করা উচিত। প্রশিক্ষণ উন্নতি থেকে প্রায় অনুমান করা যায় optimizer.iterations যদি কোনো অপটিমাইজার checkpointed হয়:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

একটি আনয়ন করতে RemoteValue

একটি আনয়ন করতে RemoteValue যদি একটি ফাংশন সফলভাবে কার্যকর সফল হতে নিশ্চিত করা হয়। কারণ বর্তমানে একটি ফাংশন কার্যকর করার পর রিটার্ন মানটি সমন্বয়কারীর কাছে অবিলম্বে অনুলিপি করা হয়। অনুলিপি চলাকালীন কোন কর্মী ব্যর্থ হলে, ফাংশনটি অন্য উপলব্ধ কর্মীর উপর পুনরায় চেষ্টা করা হবে। অতএব, আপনি যদি পারফরম্যান্সের জন্য অপ্টিমাইজ করতে চান তবে আপনি রিটার্ন মান ছাড়াই ফাংশনগুলি নির্ধারণ করতে পারেন।

ভূল প্রতিবেদন

একবার সমন্বয়কারী হিসাবে যেমন একটি ত্রুটি দেখেন UnavailableError যেমন একটি হিসাবে পরামিতি সার্ভার ও অন্যান্য অ্যাপ্লিকেশান ত্রুটিগুলি থেকে InvalidArgument থেকে tf.debugging.check_numerics , এটা ত্রুটি উত্থাপন আগে সব মুলতুবি আছে ও সারিবদ্ধ ফাংশন বাতিল করবেন। আনা হচ্ছে তাদের সংশ্লিষ্ট RemoteValue করা একটি উঠাবে CancelledError

একটি ত্রুটি উত্থাপিত হওয়ার পরে, সমন্বয়কারী একই ত্রুটি বা বাতিল ফাংশন থেকে কোনো ত্রুটি উত্থাপন করবে না।

কর্মক্ষমতা বৃদ্ধি

সেখানে যদি আপনি কর্মক্ষমতা বিষয় দেখতে পেলে আপনার সাথে প্রশিক্ষণ বিভিন্ন সম্ভাব্য কারণ আছে ParameterServerStrategy এবং ClusterResolver

একটি সাধারণ কারণ হল প্যারামিটার সার্ভারের ভারসাম্যহীন লোড রয়েছে এবং কিছু ভারী-লোড করা প্যারামিটার সার্ভার ক্ষমতায় পৌঁছেছে। এছাড়াও একাধিক মূল কারণ থাকতে পারে। এই সমস্যা প্রশমিত করার কিছু সহজ পদ্ধতি হল:

  1. একটি নির্দিষ্ট মাধ্যমে আপনার বৃহৎ মডেল ভেরিয়েবল ঠিকরা variable_partitioner যখন একটি নির্মাণের ParameterServerStrategy
  2. একটি হটস্পট ভেরিয়েবল তৈরি করা এড়িয়ে চলুন যা সম্ভব হলে একটি একক ধাপে সমস্ত প্যারামিটার সার্ভারের জন্য প্রয়োজনীয়। উদাহরণস্বরূপ, একটি ধ্রুবক শেখার হার বা উপশ্রেণী ব্যবহার tf.keras.optimizers.schedules.LearningRateSchedule optimizers মধ্যে যেহেতু ডিফল্ট আচরণ যে শেখার হার একটি পরিবর্তনশীল প্রতিটি পদক্ষেপ মধ্যে একটি নির্দিষ্ট পরামিতি সার্ভার স্থাপন ও অন্যান্য পরামিতি সার্ভার দ্বারা অনুরোধ হয়ে যাবে .
  3. কেরাস প্রিপ্রসেসিং লেয়ারে পাঠানোর আগে আপনার বড় শব্দভান্ডার এলোমেলো করুন।

পারফরম্যান্স সমস্যার আরেকটি সম্ভাব্য কারণ হল সমন্বয়কারী। আপনার প্রথম বাস্তবায়ন schedule / join পাইথন ভিত্তিক হয় এবং এইভাবে ওভারহেড থ্রেডিং থাকতে পারে। এছাড়াও সমন্বয়কারী এবং কর্মীদের মধ্যে লেটেন্সি বড় হতে পারে। এই যদি হয় তাহলে,

  • জন্য Model.fit , আপনি সেট করতে পারেন steps_per_execution দেওয়া যুক্তি Model.compile 1 চেয়ে বড় একটি মান হতে।

  • একটি কাস্টম প্রশিক্ষণ লুপ, আপনি একটি একক একাধিক পদক্ষেপ প্যাক করতে পারেন tf.function :

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

যেহেতু লাইব্রেরিটি আরও অপ্টিমাইজ করা হয়েছে, আশা করি বেশিরভাগ ব্যবহারকারীকে ভবিষ্যতে ম্যানুয়ালি পদক্ষেপগুলি প্যাক করতে হবে না৷

উপরন্তু, কর্মক্ষমতা উন্নতির জন্য একটি ছোট কৌশল হল রিটার্ন মান ছাড়াই ফাংশনের সময় নির্ধারণ করা, যেমনটি উপরে হ্যান্ডলিং টাস্ক ব্যর্থতা বিভাগে ব্যাখ্যা করা হয়েছে।

পরিচিত সীমাবদ্ধতা

বেশিরভাগ পরিচিত সীমাবদ্ধতাগুলি ইতিমধ্যেই উপরের বিভাগে কভার করা হয়েছে। এই বিভাগে একটি সারসংক্ষেপ প্রদান করে.

ParameterServerStrategy সাধারণ

  • os.environment["grpc_fail_fast"]="use_caller" সমন্বয়কারী সহ যে কাজের উপর প্রয়োজন হয়, ফল্ট সহনশীলতা কাজ সঠিকভাবে করতে।
  • সিঙ্ক্রোনাস প্যারামিটার সার্ভার প্রশিক্ষণ সমর্থিত নয়।
  • সর্বোত্তম কর্মক্ষমতা অর্জনের জন্য সাধারণত একটি একক ফাংশনে একাধিক ধাপ প্যাক করা প্রয়োজন।
  • এটা তোলে মাধ্যমে একটি saved_model লোড করতে সমর্থিত নয় tf.saved_model.load sharded ভেরিয়েবল রয়েছে। দ্রষ্টব্য TensorFlow সার্ভিং ব্যবহার করে এই ধরনের একটি সংরক্ষিত_মডেল লোড করা কাজ করবে বলে আশা করা হচ্ছে।
  • শার্ডেড অপ্টিমাইজার স্লট ভেরিয়েবল সহ একটি ভিন্ন সংখ্যক শার্ডে চেকপয়েন্ট লোড করা সমর্থিত নয়।
  • সমন্বয়কারীর টাস্ক রিস্টার্ট না করে প্যারামিটার সার্ভারের ব্যর্থতা থেকে পুনরুদ্ধার করা সমর্থিত নয়।
  • এর ব্যবহার tf.lookup.StaticHashTable (যা সাধারণভাবে যেমন কিছু Keras preprocessing স্তর দ্বারা নিযুক্ত করা হয় tf.keras.layers.IntegerLookup , tf.keras.layers.StringLookup এবং tf.keras.layers.TextVectorization সম্পদ ফলাফল উপর স্থাপন করা) প্যারামিটার সার্ভার প্রশিক্ষণের সাথে এই সময়ে সমন্বয়কারী। কর্মীদের থেকে সমন্বয়কারী পর্যন্ত RPC-এর সন্ধানের জন্য এর কার্যক্ষমতার প্রভাব রয়েছে। এই ঠিকানা একটি বর্তমান উচ্চ অগ্রাধিকার.

Model.fit সুনির্দিষ্ট

  • steps_per_epoch যুক্তি প্রয়োজন বোধ করা হয় Model.fit । আপনি একটি মান নির্বাচন করতে পারেন যা একটি যুগে উপযুক্ত ব্যবধান প্রদান করে।
  • ParameterServerStrategy কর্মক্ষমতা কারণে ব্যাচ পর্যায়ের কল নেই কাস্টম callbacks জন্য সমর্থন নেই। আপনি উপযুক্ত বাছাই করা সঙ্গে যুগান্তকারী স্তর কল মধ্যে যারা কল রূপান্তর করা উচিত steps_per_epoch যাতে তারা যে বলা হয়, steps_per_epoch ধাপের সংখ্যা। অন্তর্নির্মিত কলব্যাকগুলি প্রভাবিত হয় না: তাদের ব্যাচ-স্তরের কলগুলি কার্যকরী হওয়ার জন্য সংশোধন করা হয়েছে৷ জন্য ব্যাচ পর্যায়ের কল সাপোর্টিং ParameterServerStrategy পরিকল্পনা করা হচ্ছে।
  • একই কারণে, অন্যান্য কৌশলগুলির বিপরীতে, অগ্রগতি বার এবং মেট্রিক্স শুধুমাত্র যুগের সীমানায় লগ করা হয়।
  • run_eagerly সমর্থিত নয়।

কাস্টম প্রশিক্ষণ লুপ সুনির্দিষ্ট