TensorFlow.org এ দেখুন | Google Colab-এ চালান | GitHub-এ উৎস দেখুন | নোটবুক ডাউনলোড করুন |
ওভারভিউ
প্যারামিটার সার্ভার প্রশিক্ষণ একাধিক মেশিনে মডেল প্রশিক্ষণ স্কেল আপ করার জন্য একটি সাধারণ ডেটা-সমান্তরাল পদ্ধতি।
একটি প্যারামিটার সার্ভার প্রশিক্ষণ ক্লাস্টারে কর্মী এবং পরামিতি সার্ভার থাকে । ভেরিয়েবলগুলি প্যারামিটার সার্ভারে তৈরি করা হয় এবং সেগুলি প্রতিটি ধাপে কর্মীদের দ্বারা পড়া এবং আপডেট করা হয়। ডিফল্টরূপে, কর্মীরা একে অপরের সাথে সিঙ্ক্রোনাইজ না করে স্বাধীনভাবে এই ভেরিয়েবলগুলি পড়ে এবং আপডেট করে। এই কারণেই কখনও কখনও প্যারামিটার সার্ভার-স্টাইল প্রশিক্ষণকে অ্যাসিঙ্ক্রোনাস প্রশিক্ষণ বলা হয়।
TensorFlow 2-এ, প্যারামিটার সার্ভার প্রশিক্ষণ tf.distribute.experimental.ParameterServerStrategy
ক্লাস দ্বারা চালিত হয়, যা প্রশিক্ষণের ধাপগুলিকে একটি ক্লাস্টারে বিতরণ করে যা হাজার হাজার কর্মী (প্যারামিটার সার্ভার সহ) পর্যন্ত স্কেল করে।
সমর্থিত প্রশিক্ষণ পদ্ধতি
দুটি প্রধান সমর্থিত প্রশিক্ষণ পদ্ধতি আছে:
-
Model.fit
API, যেটি সুপারিশ করা হয় যখন আপনি একটি উচ্চ-স্তরের বিমূর্ততা এবং প্রশিক্ষণ পরিচালনা করতে পছন্দ করেন। - একটি কাস্টম প্রশিক্ষণ লুপ (আপনি কাস্টম প্রশিক্ষণ উল্লেখ করতে পারেন, স্ক্র্যাচ থেকে একটি প্রশিক্ষণ লুপ লেখা এবং আরও বিশদ বিবরণের জন্য কেরাস এবং মাল্টিওয়ার্কার মিররড স্ট্র্যাটেজি সহ কাস্টম প্রশিক্ষণ লুপ ।) কাস্টম লুপ প্রশিক্ষণ সুপারিশ করা হয় যখন আপনি তাদের প্রশিক্ষণ লুপের বিবরণ সংজ্ঞায়িত করতে পছন্দ করেন।
কাজ এবং কাজ সহ একটি ক্লাস্টার
পছন্দের API ( Model.fit
বা একটি কাস্টম ট্রেনিং লুপ) যাই হোক না কেন, TensorFlow 2-এ বিতরণ করা প্রশিক্ষণ জড়িত: বেশ কয়েকটি 'jobs'
সহ একটি 'cluster'
, এবং প্রতিটি কাজের এক বা একাধিক 'tasks'
থাকতে পারে।
প্যারামিটার সার্ভার প্রশিক্ষণ ব্যবহার করার সময়, এটি সুপারিশ করা হয়:
- একজন সমন্বয়কারীর কাজ (যার কাজের নাম
chief
আছে) - একাধিক কর্মীর কাজ (চাকরীর নাম
worker
); এবং - একাধিক প্যারামিটার সার্ভার কাজ (চাকরীর নাম
ps
)
যখন সমন্বয়কারী সংস্থান তৈরি করে, প্রশিক্ষণের কাজগুলি প্রেরণ করে, চেকপয়েন্টগুলি লেখে এবং টাস্ক ব্যর্থতার সাথে কাজ করে, তখন কর্মী এবং প্যারামিটার সার্ভারগুলি tf.distribute.Server
চালায় যেগুলি সমন্বয়কারীর অনুরোধগুলি শোনে।
Model.fit
API সহ প্যারামিটার সার্ভার প্রশিক্ষণ
Model.fit
API-এর সাথে প্যারামিটার সার্ভার প্রশিক্ষণের জন্য সমন্বয়কারীকে একটি tf.distribute.experimental.ParameterServerStrategy
অবজেক্ট এবং একটি tf.keras.utils.experimental.DatasetCreator
ইনপুট হিসেবে ব্যবহার করতে হবে। কোন কৌশল ছাড়া Model.fit
ব্যবহারের অনুরূপ, বা অন্যান্য কৌশলগুলির সাথে, কর্মপ্রবাহে মডেল তৈরি এবং কম্পাইল করা, কলব্যাক প্রস্তুত করা এবং একটি Model.fit
কল অনুসরণ করা জড়িত।
একটি কাস্টম প্রশিক্ষণ লুপ সহ প্যারামিটার সার্ভার প্রশিক্ষণ
কাস্টম প্রশিক্ষণ লুপগুলির সাথে, tf.distribute.experimental.coordinator.ClusterCoordinator
ক্লাসটি সমন্বয়কারীর জন্য ব্যবহৃত মূল উপাদান।
-
ClusterCoordinator
শ্রেণীকে একটিtf.distribute.Strategy
অবজেক্টের সাথে একত্রে কাজ করতে হবে। - এই
tf.distribute.Strategy
অবজেক্টটি ক্লাস্টারের তথ্য প্রদানের জন্য প্রয়োজন এবং একটি প্রশিক্ষণের ধাপ সংজ্ঞায়িত করতে ব্যবহৃত হয়, যেমনটি tf.distribute.Strategy-এর সাথে কাস্টম প্রশিক্ষণে প্রদর্শিত হয়েছে। -
ClusterCoordinator
অবজেক্ট তারপর এই প্রশিক্ষণের পদক্ষেপগুলি দূরবর্তী কর্মীদের কাছে প্রেরণ করে। - প্যারামিটার সার্ভার প্রশিক্ষণের জন্য, ক্লাস্টার
tf.distribute.experimental.ParameterServerStrategy
ClusterCoordinator
সাথে কাজ করতে হবে।
ClusterCoordinator
অবজেক্ট দ্বারা প্রদত্ত সবচেয়ে গুরুত্বপূর্ণ API হল schedule
:
-
schedule
API একটিtf.function
করে এবং অবিলম্বে ভবিষ্যতের মতোRemoteValue
করে। - সারিবদ্ধ ফাংশনগুলি পটভূমির থ্রেডগুলিতে দূরবর্তী কর্মীদের কাছে প্রেরণ করা হবে এবং তাদের
RemoteValue
গুলি অ্যাসিঙ্ক্রোনাসভাবে পূরণ করা হবে। - যেহেতু
schedule
জন্য কর্মী নিয়োগের প্রয়োজন হয় না, তাই পাস করাtf.function
যেকোনো উপলব্ধ কর্মীর উপর কার্যকর করা যেতে পারে। - যে কর্মীটির উপর এটি কার্যকর করা হয়েছে সেটি সম্পূর্ণ হওয়ার আগে অনুপলব্ধ হলে, অন্য উপলব্ধ কর্মীর উপর ফাংশনটি পুনরায় চেষ্টা করা হবে।
- এই সত্যের কারণে এবং ফাংশন এক্সিকিউশন পারমাণবিক নয়, একটি ফাংশন একাধিকবার কার্যকর করা যেতে পারে।
দূরবর্তী ফাংশন প্রেরণের পাশাপাশি, ClusterCoordinator
সমস্ত কর্মীদের ডেটাসেট তৈরি করতে এবং একজন কর্মী ব্যর্থতা থেকে পুনরুদ্ধার করলে এই ডেটাসেটগুলি পুনর্নির্মাণ করতে সহায়তা করে।
টিউটোরিয়াল সেটআপ
টিউটোরিয়ালটি Model.fit
এবং কাস্টম প্রশিক্ষণ লুপ পাথগুলিতে শাখা হবে এবং আপনি আপনার প্রয়োজনের সাথে মানানসই একটি বেছে নিতে পারেন। "X এর সাথে প্রশিক্ষণ" ছাড়া অন্য বিভাগগুলি উভয় পথের জন্য প্রযোজ্য।
pip install portpicker
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
ক্লাস্টার সেটআপ
উপরে উল্লিখিত হিসাবে, একটি প্যারামিটার সার্ভার প্রশিক্ষণ ক্লাস্টারের জন্য একটি সমন্বয়কারীর টাস্ক প্রয়োজন যা আপনার প্রশিক্ষণ প্রোগ্রাম চালায়, এক বা একাধিক কর্মী এবং প্যারামিটার সার্ভারের টাস্ক যা TensorFlow সার্ভার চালায়— tf.distribute.Server
— এবং সম্ভবত একটি অতিরিক্ত মূল্যায়ন টাস্ক যা সাইড-কার মূল্যায়ন চালায়। (নীচে সাইড-কার মূল্যায়ন বিভাগটি দেখুন)। তাদের সেট আপ করার প্রয়োজনীয়তা হল:
- সমন্বয়কারীর কাজটিকে মূল্যায়নকারী ছাড়া অন্য সব TensorFlow সার্ভারের ঠিকানা এবং পোর্ট জানতে হবে।
- কর্মী এবং পরামিতি সার্ভারগুলি তাদের কোন পোর্ট শুনতে হবে তা জানতে হবে। সরলতার জন্য, আপনি সাধারণত এই কাজগুলিতে টেনসরফ্লো সার্ভার তৈরি করার সময় সম্পূর্ণ ক্লাস্টার তথ্য পাস করতে পারেন।
- মূল্যায়নকারীর কাজকে প্রশিক্ষণ ক্লাস্টারের সেটআপ জানতে হবে না। যদি তা হয়, তাহলে প্রশিক্ষণ ক্লাস্টারের সাথে সংযোগ করার চেষ্টা করা উচিত নয়।
- কর্মী এবং প্যারামিটার সার্ভারের যথাক্রমে
"worker"
এবং"ps"
হিসাবে টাস্কের ধরন থাকা উচিত। কোঅর্ডিনেটরকে উত্তরাধিকারের কারণে টাস্ক টাইপ হিসাবে"chief"
ব্যবহার করা উচিত।
এই টিউটোরিয়ালে, আপনি একটি ইন-প্রসেস ক্লাস্টার তৈরি করবেন যাতে পুরো প্যারামিটার সার্ভার প্রশিক্ষণ Colab-এ চালানো যায়। আপনি পরবর্তী বিভাগে কীভাবে আসল ক্লাস্টার সেট আপ করবেন তা শিখবেন।
ইন-প্রসেস ক্লাস্টার
আপনি আগে থেকেই বেশ কয়েকটি টেনসরফ্লো সার্ভার তৈরি করে শুরু করবেন এবং পরে তাদের সাথে সংযুক্ত হবেন। মনে রাখবেন যে এটি শুধুমাত্র এই টিউটোরিয়ালের প্রদর্শনের উদ্দেশ্যে, এবং প্রকৃত প্রশিক্ষণে সার্ভারগুলি "worker"
এবং "ps"
মেশিনে চালু করা হবে।
def create_in_process_cluster(num_workers, num_ps):
"""Creates and starts local servers and returns the cluster_resolver."""
worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]
cluster_dict = {}
cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
if num_ps > 0:
cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]
cluster_spec = tf.train.ClusterSpec(cluster_dict)
# Workers need some inter_ops threads to work properly.
worker_config = tf.compat.v1.ConfigProto()
if multiprocessing.cpu_count() < num_workers + 1:
worker_config.inter_op_parallelism_threads = num_workers + 1
for i in range(num_workers):
tf.distribute.Server(
cluster_spec,
job_name="worker",
task_index=i,
config=worker_config,
protocol="grpc")
for i in range(num_ps):
tf.distribute.Server(
cluster_spec,
job_name="ps",
task_index=i,
protocol="grpc")
cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
cluster_spec, rpc_layer="grpc")
return cluster_resolver
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
ইন-প্রসেস ক্লাস্টার সেটআপ প্রায়শই ইউনিট পরীক্ষায় ব্যবহৃত হয়, যেমন এখানে ।
স্থানীয় পরীক্ষার জন্য আরেকটি বিকল্প হল স্থানীয় মেশিনে প্রসেস চালু করা—এই পদ্ধতির উদাহরণের জন্য কেরাসের সাথে মাল্টি-কর্মী প্রশিক্ষণ দেখুন ।
একটি প্যারামিটার সার্ভার স্ট্র্যাটেজি চালু করুন
আপনি প্রশিক্ষণ কোডে ডুব দেওয়ার আগে, আসুন একটি ParameterServerStrategy
অবজেক্টকে ইনস্ট্যান্টিয়েট করি। মনে রাখবেন যে আপনি Model.fit
বা একটি কাস্টম প্রশিক্ষণ লুপের সাথে এগিয়ে যাচ্ছেন কিনা তা নির্বিশেষে এটি প্রয়োজন। variable_partitioner
আর্গুমেন্টটি পরিবর্তনশীল শার্ডিং বিভাগে ব্যাখ্যা করা হবে।
variable_partitioner = (
tf.distribute.experimental.partitioners.MinSizePartitioner(
min_shard_bytes=(256 << 10),
max_shards=NUM_PS))
strategy = tf.distribute.experimental.ParameterServerStrategy(
cluster_resolver,
variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0' INFO:tensorflow:Number of GPUs on workers: 1
প্রশিক্ষণের জন্য GPUs ব্যবহার করার জন্য, প্রতিটি কর্মীর জন্য দৃশ্যমান GPU বরাদ্দ করুন। ParameterServerStrategy
সার্ভার স্ট্র্যাটেজি প্রতিটি কর্মীর জন্য সমস্ত উপলব্ধ জিপিইউ ব্যবহার করবে, এই সীমাবদ্ধতার সাথে যে সমস্ত কর্মীদের একই সংখ্যক জিপিইউ উপলব্ধ থাকতে হবে।
পরিবর্তনশীল শার্ডিং
ভেরিয়েবল শার্ডিং বলতে বোঝায় একটি ভেরিয়েবলকে একাধিক ছোট ভেরিয়েবলে বিভক্ত করা, যাকে শার্ড বলা হয়। এই শার্ডগুলি অ্যাক্সেস করার সময় নেটওয়ার্ক লোড বিতরণ করার জন্য পরিবর্তনশীল শার্ডিং কার্যকর হতে পারে। এটি একাধিক প্যারামিটার সার্ভার জুড়ে একটি সাধারণ ভেরিয়েবলের গণনা এবং সঞ্চয়স্থান বিতরণ করতেও দরকারী।
পরিবর্তনশীল শার্ডিং সক্ষম করতে, আপনি একটি ParameterServerStrategy
সার্ভার স্ট্র্যাটেজি অবজেক্ট তৈরি করার সময় একটি variable_partitioner
এ পাস করতে পারেন। যখন একটি ভেরিয়েবল তৈরি করা হয় তখন প্রতিবার variable_partitioner
আহ্বান করা হবে এবং এটি ভেরিয়েবলের প্রতিটি ডাইমেনশন বরাবর শার্ডের সংখ্যা ফেরত দেবে বলে আশা করা হয়। কিছু আউট-অফ-বক্স variable_partitioner
প্রদান করা হয় যেমন tf.distribute.experimental.partitioners.MinSizePartitioner
। ছোট ভেরিয়েবলের বিভাজন এড়াতে tf.distribute.experimental.partitioners.MinSizePartitioner
এর মতো আকার-ভিত্তিক পার্টিশনার ব্যবহার করার পরামর্শ দেওয়া হয়, যা মডেল প্রশিক্ষণের গতিতে নেতিবাচক প্রভাব ফেলতে পারে।
যখন একটি variable_partitioner
পাস করা হয় এবং আপনি যদি সরাসরি strategy.scope()
এর অধীনে একটি ভেরিয়েবল তৈরি করেন, এটি একটি variables
বৈশিষ্ট্য সহ একটি ধারক প্রকারে পরিণত হবে যা শার্ডগুলির তালিকায় অ্যাক্সেস প্রদান করে। বেশিরভাগ ক্ষেত্রে, এই ধারকটি স্বয়ংক্রিয়ভাবে সমস্ত শার্ডগুলিকে একত্রিত করে একটি টেনসরে রূপান্তরিত হবে৷ ফলস্বরূপ, এটি একটি সাধারণ পরিবর্তনশীল হিসাবে ব্যবহার করা যেতে পারে। অন্যদিকে, কিছু TensorFlow পদ্ধতি যেমন tf.nn.embedding_lookup
এই কন্টেইনার প্রকারের জন্য দক্ষ বাস্তবায়ন প্রদান করে এবং এই পদ্ধতিতে স্বয়ংক্রিয় সংযোজন এড়ানো হবে।
আরো বিস্তারিত জানার জন্য tf.distribute.experimental.ParameterServerStrategy
এর API ডক্স দেখুন।
Model.fit
সাথে প্রশিক্ষণ
Model.fit
এর মাধ্যমে একটি সহজে-ব্যবহারযোগ্য প্রশিক্ষণ API প্রদান করে যা হুডের নিচে প্রশিক্ষণ লুপ পরিচালনা করে, ওভাররিডেবল train_step
, এবং কলব্যাকের নমনীয়তা সহ, যা টেনসরবোর্ডের জন্য চেকপয়েন্ট সংরক্ষণ বা সারাংশ সংরক্ষণের মতো কার্যকারিতা প্রদান করে। Model.fit
এর সাথে, কৌশল অবজেক্টের একটি সাধারণ অদলবদল সহ অন্যান্য কৌশলগুলির জন্য একই প্রশিক্ষণ কোড ব্যবহার করা যেতে পারে।
তথ্য অন্তর্ভুক্তী
প্যারামিটার সার্ভার প্রশিক্ষণের সাথে Model.fit
এর জন্য ইনপুট ডেটা একটি কলেবলে সরবরাহ করা প্রয়োজন যা tf.distribute.InputContext
টাইপের একটি একক আর্গুমেন্ট নেয় এবং একটি tf.data.Dataset
। তারপর, একটি tf.keras.utils.experimental.DatasetCreator
অবজেক্ট তৈরি করুন যা এই ধরনের callable
এবং একটি ঐচ্ছিক tf.distribute.InputOptions
অবজেক্ট input_options
আর্গুমেন্টের মাধ্যমে নেয়।
মনে রাখবেন যে প্যারামিটার সার্ভার প্রশিক্ষণের সাথে ডেটা শাফেল এবং পুনরাবৃত্তি করার পরামর্শ দেওয়া হয় এবং fit
কলে steps_per_epoch
নির্দিষ্ট করুন যাতে লাইব্রেরি যুগের সীমানা জানে৷
ইনপুট InputContext
আর্গুমেন্ট সম্পর্কে আরও তথ্যের জন্য অনুগ্রহ করে ডিস্ট্রিবিউটেড ইনপুট টিউটোরিয়াল দেখুন।
def dataset_fn(input_context):
global_batch_size = 64
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
x = tf.random.uniform((10, 10))
y = tf.random.uniform((10,))
dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
dataset = dataset.shard(
input_context.num_input_pipelines,
input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2)
return dataset
dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)
dataset_fn
এর কোডটি ইনপুট ডিভাইসে, যা সাধারণত CPU হয়, প্রতিটি কর্মী মেশিনে ব্যবহার করা হবে।
মডেল নির্মাণ এবং সংকলন
এখন, আপনি একটি tf.keras.Model
তৈরি করবেন —একটি তুচ্ছ tf.keras.models.Sequential
. প্রদর্শনের উদ্দেশ্যে অনুক্রমিক মডেল—এর পরে একটি Model.compile
কলের মাধ্যমে উপাদানগুলি, যেমন একটি অপ্টিমাইজার, মেট্রিক্স, বা প্যারামিটার যেমন steps_per_execution
:
with strategy.scope():
model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])
model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)
কলব্যাক এবং প্রশিক্ষণ
আপনি প্রকৃত প্রশিক্ষণের জন্য model.fit
কল করার আগে, আসুন সাধারণ কাজের জন্য প্রয়োজনীয় কলব্যাকগুলি প্রস্তুত করি, যেমন:
-
ModelCheckpoint
: মডেলের ওজন সংরক্ষণ করতে। -
BackupAndRestore
: প্রশিক্ষণের অগ্রগতি স্বয়ংক্রিয়ভাবে ব্যাক আপ করা হয়েছে তা নিশ্চিত করতে এবং ক্লাস্টারটি অনুপলব্ধতা অনুভব করলে পুনরুদ্ধার করা হয় (যেমন গর্ভপাত বা প্রিম্পশন); বা -
TensorBoard
: সারাংশ ফাইলগুলিতে অগ্রগতি প্রতিবেদন সংরক্ষণ করতে, যা টেনসরবোর্ড টুলে ভিজ্যুয়ালাইজ করা হয়।
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')
callbacks = [
tf.keras.callbacks.TensorBoard(log_dir=log_dir),
tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]
model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5 INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). 2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step Epoch 2/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step Epoch 3/5 WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. 20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step Epoch 4/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step Epoch 5/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step <keras.callbacks.History at 0x7f89984ca890>
ClusterCoordinator
সাথে সরাসরি ব্যবহার (ঐচ্ছিক)
এমনকি যদি আপনি Model.fit
প্রশিক্ষণের পথ বেছে নেন, আপনি ঐচ্ছিকভাবে একটি tf.distribute.experimental.coordinator.ClusterCoordinator
অবজেক্ট তৈরি করতে পারেন যাতে আপনি কর্মীদের উপর সম্পাদন করতে চান এমন অন্যান্য ফাংশন নির্ধারণ করতে পারেন। আরও বিশদ বিবরণ এবং উদাহরণের জন্য একটি কাস্টম প্রশিক্ষণ লুপ বিভাগ সহ প্রশিক্ষণ দেখুন।
একটি কাস্টম প্রশিক্ষণ লুপ সঙ্গে প্রশিক্ষণ
tf.distribute.Strategy সহ কাস্টম প্রশিক্ষণ লুপ ব্যবহার করা প্রশিক্ষণ tf.distribute.Strategy
সংজ্ঞায়িত করার জন্য দুর্দান্ত নমনীয়তা প্রদান করে। উপরে সংজ্ঞায়িত ParameterServerStrategy
সার্ভার স্ট্র্যাটেজির সাথে ( strategy
হিসাবে), আপনি একটি tf.distribute.experimental.coordinator.ClusterCoordinator
ব্যবহার করবেন দূরবর্তী কর্মীদের প্রশিক্ষণের পদক্ষেপগুলি সম্পাদন করার জন্য।
তারপর, আপনি একটি মডেল তৈরি করবেন, একটি ডেটাসেট এবং একটি ধাপ ফাংশন সংজ্ঞায়িত করবেন, যেমন আপনি অন্যান্য tf.distribute.Strategy
s-এর সাথে প্রশিক্ষণ লুপে করেছেন। আপনি tf.distribute.Strategy টিউটোরিয়াল সহ কাস্টম প্রশিক্ষণে আরও বিশদ জানতে পারেন।
দক্ষ ডেটাসেট প্রিফেচিং নিশ্চিত করতে, নীচে প্রত্যন্ত কর্মীদের বিভাগে পাঠানোর প্রশিক্ষণের ধাপে উল্লিখিত প্রস্তাবিত বিতরণ করা ডেটাসেট তৈরি API ব্যবহার করুন। এছাড়াও, শ্রমিকদের জন্য বরাদ্দকৃত GPU-এর সম্পূর্ণ সুবিধা নিতে worker_fn
এর ভিতরে Strategy.run
কল করতে ভুলবেন না। বাকি ধাপগুলো GPU সহ বা ছাড়া প্রশিক্ষণের জন্য একই।
আসুন নিম্নলিখিত ধাপে এই উপাদানগুলি তৈরি করি:
ডেটা সেট আপ করুন
প্রথমে, একটি ফাংশন লিখুন যা একটি ডেটাসেট তৈরি করে যাতে কেরাস প্রিপ্রসেসিং স্তরগুলি দ্বারা বাস্তবায়িত প্রিপ্রসেসিং লজিক অন্তর্ভুক্ত থাকে।
আপনি dataset_fn
dataset_fn
ভিতরে রূপান্তর প্রয়োগ করবেন, যেহেতু আপনি dataset_fn
কে একটি tf.function
এ মোড়ানো হবে, যা এর ভিতরে ভেরিয়েবল তৈরি করতে দেয় না।
feature_vocab = [
"avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]
with strategy.scope():
feature_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=feature_vocab,
mask_token=None)
label_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=label_vocab,
num_oov_indices=0,
mask_token=None)
raw_feature_input = tf.keras.layers.Input(
shape=(3,),
dtype=tf.string,
name="feature")
feature_id_input = feature_lookup_layer(raw_feature_input)
feature_preprocess_stage = tf.keras.Model(
{"features": raw_feature_input},
feature_id_input)
raw_label_input = tf.keras.layers.Input(
shape=(1,),
dtype=tf.string,
name="label")
label_id_input = label_lookup_layer(raw_label_input)
label_preprocess_stage = tf.keras.Model(
{"label": raw_label_input},
label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison return bool(asarray(a1 == a2).all())
একটি ডেটাসেটে খেলনা উদাহরণ তৈরি করুন:
def feature_and_label_gen(num_examples=200):
examples = {"features": [], "label": []}
for _ in range(num_examples):
features = random.sample(feature_vocab, 3)
label = ["yes"] if "avenger" in features else ["no"]
examples["features"].append(features)
examples["label"].append(label)
return examples
examples = feature_and_label_gen()
তারপর, একটি dataset_fn
এ মোড়ানো প্রশিক্ষণ ডেটাসেট তৈরি করুন:
def dataset_fn(_):
raw_dataset = tf.data.Dataset.from_tensor_slices(examples)
train_dataset = raw_dataset.map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(200).batch(32).repeat()
return train_dataset
মডেল তৈরি করুন
এর পরে, মডেল এবং অন্যান্য বস্তু তৈরি করুন। strategy.scope
অধীনে সমস্ত ভেরিয়েবল তৈরি করা নিশ্চিত করুন।
# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
# Create the model. The input needs to be compatible with Keras processing layers.
model_input = tf.keras.layers.Input(
shape=(3,), dtype=tf.int64, name="model_input")
emb_layer = tf.keras.layers.Embedding(
input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
model = tf.keras.Model({"features": model_input}, dense_output)
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
accuracy = tf.keras.metrics.Accuracy()
আসুন নিশ্চিত করি যে FixedShardsPartitioner-এর ব্যবহার সমস্ত ভেরিয়েবলকে দুটি FixedShardsPartitioner
বিভক্ত করেছে এবং প্রতিটি শার্ড বিভিন্ন প্যারামিটার সার্ভারে বরাদ্দ করা হয়েছে:
assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"
প্রশিক্ষণের ধাপ সংজ্ঞায়িত করুন
তৃতীয়ত, একটি tf.function
এ মোড়ানো প্রশিক্ষণের ধাপ তৈরি করুন:
@tf.function
def step_fn(iterator):
def replica_fn(batch_data, labels):
with tf.GradientTape() as tape:
pred = model(batch_data, training=True)
per_example_loss = tf.keras.losses.BinaryCrossentropy(
reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
loss = tf.nn.compute_average_loss(per_example_loss)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
accuracy.update_state(labels, actual_pred)
return loss
batch_data, labels = next(iterator)
losses = strategy.run(replica_fn, args=(batch_data, labels))
return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)
উপরের ট্রেনিং স্টেপ ফাংশনে, step_fn
এ Strategy.run
এবং Strategy.reduce
কল করলে প্রতি কর্মী একাধিক GPU সমর্থন করতে পারে। যদি কর্মীদের জিপিইউ বরাদ্দ থাকে, তাহলে Strategy.run
একাধিক প্রতিলিপিতে ডেটাসেট বিতরণ করবে।
প্রত্যন্ত কর্মীদের প্রশিক্ষণের পদক্ষেপগুলি প্রেরণ করুন
ParameterServerStrategy
সার্ভারস্ট্র্যাটেজি দ্বারা সমস্ত গণনা সংজ্ঞায়িত হওয়ার পরে, আপনি রিসোর্স তৈরি করতে এবং দূরবর্তী কর্মীদের প্রশিক্ষণের পদক্ষেপগুলি বিতরণ করতে tf.distribute.experimental.coordinator.ClusterCoordinator
ক্লাস ব্যবহার করবেন।
প্রথমে একটি ClusterCoordinator
অবজেক্ট তৈরি করি এবং কৌশল অবজেক্টে পাস করি:
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)
তারপরে, একটি প্রতি-কর্মী ডেটাসেট এবং একটি পুনরাবৃত্তিকারী তৈরি করুন। নিচের per_worker_dataset_fn
এ, dataset_fn
কে strategy.distribute_datasets_from_function
এ মোড়ানোর পরামর্শ দেওয়া হয় যাতে করে GPU-তে নির্বিঘ্নে দক্ষ প্রিফেচ করা যায়।
@tf.function
def per_worker_dataset_fn():
return strategy.distribute_datasets_from_function(dataset_fn)
per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
চূড়ান্ত ধাপ হল ClusterCoordinator.schedule
ব্যবহার করে দূরবর্তী কর্মীদের গণনা বিতরণ করা:
-
schedule
পদ্ধতি একটিtf.function
করে এবং অবিলম্বে ভবিষ্যতের মতোRemoteValue
করে। সারিবদ্ধ ফাংশনগুলি পটভূমির থ্রেডগুলিতে দূরবর্তী কর্মীদের কাছে পাঠানো হবে এবং রিমোটRemoteValue
অ্যাসিঙ্ক্রোনাসভাবে পূরণ করা হবে। -
join
পদ্ধতি (ClusterCoordinator.join
) সমস্ত নির্ধারিত ফাংশন কার্যকর না হওয়া পর্যন্ত অপেক্ষা করতে ব্যবহার করা যেতে পারে।
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
accuracy.reset_states()
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
# Wait at epoch boundaries.
coordinator.join()
print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). Finished epoch 0, accuracy is 0.543750. Finished epoch 1, accuracy is 0.543750. Finished epoch 2, accuracy is 0.950000. Finished epoch 3, accuracy is 1.000000.
এখানে আপনি কিভাবে একটি RemoteValue
এর ফলাফল আনতে পারেন:
loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000
বিকল্পভাবে, আপনি সমস্ত পদক্ষেপ চালু করতে পারেন এবং সমাপ্তির জন্য অপেক্ষা করার সময় কিছু করতে পারেন:
for _ in range(total_steps):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
time.sleep(10)
# Do something like logging metrics or writing checkpoints.
এই বিশেষ উদাহরণের জন্য সম্পূর্ণ প্রশিক্ষণ এবং পরিবেশন কর্মপ্রবাহের জন্য, অনুগ্রহ করে এই পরীক্ষাটি দেখুন।
ডেটাসেট তৈরি সম্পর্কে আরও
উপরের কোডের ClusterCoordinator.create_per_worker_dataset
API ব্যবহার করে তৈরি করা হয়েছে। এটি কর্মী প্রতি একটি ডেটাসেট তৈরি করে এবং একটি ধারক বস্তু প্রদান করে। আপনি প্রতি-কর্মী পুনরাবৃত্তিকারী তৈরি করতে এটিতে iter
পদ্ধতিতে কল করতে পারেন। প্রতি-কর্মী ইটারেটারে প্রতি কর্মী প্রতি একটি ইটারেটর থাকে এবং একটি নির্দিষ্ট কর্মীর উপর ফাংশনটি কার্যকর করার আগে ClusterCoordinator.schedule
পদ্ধতিতে পাস করা ফাংশনের ইনপুট আর্গুমেন্টে একজন কর্মীর অনুরূপ স্লাইস প্রতিস্থাপিত হবে।
বর্তমানে, ClusterCoordinator.schedule
পদ্ধতি অনুমান করে যে শ্রমিকরা সমতুল্য এবং এইভাবে ধরে নেয় যে বিভিন্ন কর্মীদের ডেটাসেটগুলি একই, যদি তাদের মধ্যে Dataset.shuffle
অপারেশন থাকে তবে সেগুলি ভিন্নভাবে শাফেল হতে পারে। এই কারণে, এটিও সুপারিশ করা হয় যে ডেটাসেটগুলি অনির্দিষ্টকালের জন্য পুনরাবৃত্তি করা হবে এবং আপনি একটি ডেটাসেট থেকে OutOfRangeError
এর উপর নির্ভর করার পরিবর্তে একটি সীমিত সংখ্যক পদক্ষেপের সময় নির্ধারণ করুন৷
আরেকটি গুরুত্বপূর্ণ নোট হল যে tf.data
ডেটাসেটগুলি টাস্ক সীমানা জুড়ে অন্তর্নিহিত সিরিয়ালাইজেশন এবং ডিসিরিয়ালাইজেশন সমর্থন করে না। তাই ClusterCoordinator.create_per_worker_dataset
এ পাস করা ফাংশনের ভিতরে পুরো ডেটাসেট তৈরি করা গুরুত্বপূর্ণ।
মূল্যায়ন
বিতরণ করা প্রশিক্ষণে একটি মূল্যায়ন লুপ সংজ্ঞায়িত এবং চালানোর একাধিক উপায় রয়েছে। নীচে বর্ণিত হিসাবে প্রত্যেকের নিজস্ব সুবিধা এবং অসুবিধা রয়েছে। আপনার পছন্দ না থাকলে ইনলাইন মূল্যায়ন পদ্ধতিটি সুপারিশ করা হয়।
ইনলাইন মূল্যায়ন
এই পদ্ধতিতে, সমন্বয়কারী প্রশিক্ষণ এবং মূল্যায়নের মধ্যে বিকল্প করে এবং এইভাবে একে ইনলাইন মূল্যায়ন বলা হয়।
ইনলাইন মূল্যায়নের বেশ কিছু সুবিধা রয়েছে। উদাহরণ স্বরূপ:
- এটি বড় মূল্যায়ন মডেল এবং মূল্যায়ন ডেটাসেট সমর্থন করতে পারে যা একটি একক কাজ ধরে রাখতে পারে না।
- মূল্যায়নের ফলাফল পরবর্তী যুগের প্রশিক্ষণের সিদ্ধান্ত নিতে ব্যবহার করা যেতে পারে।
ইনলাইন মূল্যায়ন বাস্তবায়নের দুটি উপায় রয়েছে: সরাসরি মূল্যায়ন এবং বিতরণ মূল্যায়ন।
- প্রত্যক্ষ মূল্যায়ন : ছোট মডেল এবং মূল্যায়ন ডেটাসেটের জন্য, সমন্বয়কারী সরাসরি বিতরণকৃত মডেলে সমন্বয়কারীর মূল্যায়ন ডেটাসেটের সাথে মূল্যায়ন চালাতে পারে:
eval_dataset = tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).batch(8)
eval_accuracy = tf.keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). Evaluation accuracy: 1.000000
- বিতরণকৃত মূল্যায়ন : বড় মডেল বা ডেটাসেটগুলির জন্য যেগুলি সরাসরি সমন্বয়কারীতে চালানো অসম্ভব, সমন্বয়কারীর কাজটি
ClusterCoordinator.schedule
/ClusterCoordinator.join
পদ্ধতির মাধ্যমে কর্মীদের মূল্যায়নের কাজগুলি বিতরণ করতে পারে:
with strategy.scope():
# Define the eval metric on parameter servers.
eval_accuracy = tf.keras.metrics.Accuracy()
@tf.function
def eval_step(iterator):
def replica_fn(batch_data, labels):
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
batch_data, labels = next(iterator)
strategy.run(replica_fn, args=(batch_data, labels))
def eval_dataset_fn():
return tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(16).repeat().batch(8)
per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)
eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources Evaluation accuracy: 1.000000
সাইড-কার মূল্যায়ন
আরেকটি পদ্ধতিকে সাইড-কার মূল্যায়ন বলা হয় যেখানে আপনি একটি ডেডিকেটেড মূল্যায়নকারী কাজ তৈরি করেন যা বারবার চেকপয়েন্ট পড়ে এবং একটি সর্বশেষ চেকপয়েন্টে মূল্যায়ন চালায়। মূল্যায়ন ফলাফলের উপর ভিত্তি করে আপনার প্রশিক্ষণের লুপ পরিবর্তন করার প্রয়োজন না হলে এটি আপনার প্রশিক্ষণ প্রোগ্রামটি তাড়াতাড়ি শেষ করার অনুমতি দেয়। যাইহোক, এটি মূল্যায়ন ট্রিগার করার জন্য একটি অতিরিক্ত মূল্যায়নকারীর কাজ এবং পর্যায়ক্রমিক চেকপয়েন্টিং প্রয়োজন। নিম্নলিখিত একটি সম্ভাব্য পার্শ্ব-কার মূল্যায়ন লুপ:
checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)
for latest_checkpoint in tf.train.checkpoints_iterator(
checkpoint_dir):
try:
checkpoint.restore(latest_checkpoint).expect_partial()
except (tf.errors.OpError,) as e:
# checkpoint may be deleted by training when it is about to read it.
continue
# Optionally add callbacks to write summaries.
eval_model.evaluate(eval_data)
# Evaluation finishes when it has evaluated the last epoch.
if latest_checkpoint.endswith('-{}'.format(train_epoches)):
break
বাস্তব বিশ্বের ক্লাস্টার
একটি বাস্তব উত্পাদন পরিবেশে, আপনি বিভিন্ন মেশিনে বিভিন্ন প্রক্রিয়ায় সমস্ত কাজ চালাবেন। প্রতিটি টাস্কে ক্লাস্টার তথ্য কনফিগার করার সবচেয়ে সহজ উপায় হল "TF_CONFIG"
পরিবেশ ভেরিয়েবল সেট করা এবং "TF_CONFIG"
পার্স করার জন্য একটি tf.distribute.cluster_resolver.TFConfigClusterResolver
ব্যবহার করা।
"TF_CONFIG"
এনভায়রনমেন্ট ভেরিয়েবল সম্পর্কে একটি সাধারণ বর্ণনার জন্য, ডিস্ট্রিবিউটেড ট্রেনিং গাইড দেখুন।
আপনি যদি Kubernetes বা অন্যান্য কনফিগারেশন টেমপ্লেটগুলি ব্যবহার করে আপনার প্রশিক্ষণের কাজগুলি শুরু করেন, তাহলে খুব সম্ভবত এই টেমপ্লেটগুলি ইতিমধ্যেই আপনার জন্য “TF_CONFIG"
সেট করেছে৷
"TF_CONFIG"
পরিবেশ পরিবর্তনশীল সেট করুন
ধরুন আপনার 3 জন কর্মী এবং 2টি প্যারামিটার সার্ভার আছে, কর্মী 1 এর "TF_CONFIG"
হতে পারে:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"worker": ["host1:port", "host2:port", "host3:port"],
"ps": ["host4:port", "host5:port"],
"chief": ["host6:port"]
},
"task": {"type": "worker", "index": 1}
})
মূল্যায়নকারীর "TF_CONFIG"
হতে পারে:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"evaluator": ["host7:port"]
},
"task": {"type": "evaluator", "index": 0}
})
মূল্যায়নকারীর জন্য উপরের "TF_CONFIG"
স্ট্রিংয়ের "cluster"
অংশটি ঐচ্ছিক।
আপনি যদি সমস্ত কাজের জন্য একই বাইনারি ব্যবহার করেন
আপনি যদি একটি একক বাইনারি ব্যবহার করে এই সমস্ত কাজগুলি চালাতে পছন্দ করেন, তাহলে আপনাকে আপনার প্রোগ্রামের শাখাটিকে একেবারে শুরুতে বিভিন্ন ভূমিকাতে দিতে হবে:
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
# Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
# Run side-car evaluation
else:
# Run the coordinator.
নিম্নলিখিত কোডটি একটি টেনসরফ্লো সার্ভার শুরু করে এবং অপেক্ষা করে:
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
server = tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name=cluster_resolver.task_type,
task_index=cluster_resolver.task_id,
protocol=cluster_resolver.rpc_layer or "grpc",
start=True)
server.join()
হ্যান্ডলিং টাস্ক ব্যর্থতা
কর্মীর ব্যর্থতা
tf.distribute.experimental.coordinator.ClusterCoordinator
বা Model.fit
কর্মীদের ব্যর্থতার জন্য বিল্ট-ইন ফল্ট টলারেন্স প্রদান করে। কর্মী পুনরুদ্ধার করার পরে, পূর্বে প্রদত্ত ডেটাসেট ফাংশন (হয় একটি কাস্টম প্রশিক্ষণ লুপের জন্য ClusterCoordinator.create_per_worker_dataset
, অথবা tf.keras.utils.experimental.DatasetCreator
for Model.fit
) ডেটাসেটগুলি পুনরায় তৈরি করার জন্য কর্মীদের কাছে আহ্বান করা হবে৷
প্যারামিটার সার্ভার বা সমন্বয়কারী ব্যর্থতা
যাইহোক, যখন সমন্বয়কারী একটি প্যারামিটার সার্ভার ত্রুটি দেখে, তখন এটি অবিলম্বে একটি UnavailableError
বা AbortedError
উত্থাপন করবে। আপনি এই ক্ষেত্রে সমন্বয়কারী পুনরায় চালু করতে পারেন। সমন্বয়কারী নিজেও অনুপলব্ধ হতে পারে। অতএব, প্রশিক্ষণের অগ্রগতি না হারানোর জন্য নির্দিষ্ট টুলিংয়ের সুপারিশ করা হয়:
Model.fit
এর জন্য, আপনার একটিBackupAndRestore
কলব্যাক ব্যবহার করা উচিত, যা স্বয়ংক্রিয়ভাবে অগ্রগতি সংরক্ষণ এবং পুনরুদ্ধার পরিচালনা করে। একটি উদাহরণের জন্য উপরে কলব্যাক এবং প্রশিক্ষণ বিভাগ দেখুন।একটি কাস্টম ট্রেনিং লুপের জন্য, আপনাকে পর্যায়ক্রমে মডেল ভেরিয়েবল চেকপয়েন্ট করা উচিত এবং ট্রেনিং শুরু হওয়ার আগে একটি চেকপয়েন্ট থেকে মডেল ভেরিয়েবল লোড করা উচিত। প্রশিক্ষণের অগ্রগতি প্রায়
optimizer.iterations
থেকে অনুমান করা যেতে পারে যদি একটি অপ্টিমাইজার চেকপয়েন্ট করা হয়:
checkpoint_manager = tf.train.CheckpointManager(
tf.train.Checkpoint(model=model, optimizer=optimizer),
checkpoint_dir,
max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
checkpoint = checkpoint_manager.checkpoint
checkpoint.restore(
checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()
global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch
for _ in range(starting_epoch, num_epoches):
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
coordinator.join()
checkpoint_manager.save()
একটি RemoteValue
হচ্ছে
একটি ফাংশন সফলভাবে সম্পাদিত হলে একটি RemoteValue
আনয়ন নিশ্চিত করা হয়। কারণ বর্তমানে একটি ফাংশন কার্যকর করার পর রিটার্ন মানটি সমন্বয়কারীর কাছে অবিলম্বে অনুলিপি করা হয়। অনুলিপি চলাকালীন কোন কর্মী ব্যর্থ হলে, ফাংশনটি অন্য উপলব্ধ কর্মীর উপর পুনরায় চেষ্টা করা হবে। অতএব, আপনি যদি পারফরম্যান্সের জন্য অপ্টিমাইজ করতে চান তবে আপনি রিটার্ন মান ছাড়াই ফাংশনগুলি নির্ধারণ করতে পারেন।
ভূল প্রতিবেদন
একবার সমন্বয়কারী প্যারামিটার সার্ভার থেকে UnavailableError
বা InvalidArgument
থেকে একটি InvalidArgument এর মতো অন্যান্য অ্যাপ্লিকেশন ত্রুটির মতো একটি ত্রুটি দেখতে tf.debugging.check_numerics
, ত্রুটিটি উত্থাপন করার আগে এটি সমস্ত মুলতুবি এবং সারিবদ্ধ ফাংশন বাতিল করবে। তাদের সংশ্লিষ্ট RemoteValue
গুলি আনার ফলে একটি CancelledError
ত্রুটি দেখা দেবে।
একটি ত্রুটি উত্থাপিত হওয়ার পরে, সমন্বয়কারী একই ত্রুটি বা বাতিল ফাংশন থেকে কোনো ত্রুটি উত্থাপন করবে না।
কর্মক্ষমতা বৃদ্ধি
আপনি ParameterServerStrategy
এবং ClusterResolver
সাথে প্রশিক্ষণের সময় কর্মক্ষমতা সংক্রান্ত সমস্যাগুলি দেখতে পেলে বেশ কয়েকটি সম্ভাব্য কারণ রয়েছে।
একটি সাধারণ কারণ হল প্যারামিটার সার্ভারের ভারসাম্যহীন লোড রয়েছে এবং কিছু ভারী-লোড করা প্যারামিটার সার্ভার ক্ষমতায় পৌঁছেছে। এছাড়াও একাধিক মূল কারণ থাকতে পারে। এই সমস্যা প্রশমিত করার কিছু সহজ পদ্ধতি হল:
- একটি
ParameterServerStrategy
তৈরি করার সময় একটিvariable_partitioner
নির্দিষ্ট করার মাধ্যমে আপনার বড় মডেলের ভেরিয়েবলগুলিকে ভাগ করুন। - একটি হটস্পট ভেরিয়েবল তৈরি করা এড়িয়ে চলুন যা সম্ভব হলে একটি একক ধাপে সমস্ত প্যারামিটার সার্ভারের জন্য প্রয়োজনীয়। উদাহরণস্বরূপ, অপ্টিমাইজারগুলিতে একটি ধ্রুবক শেখার হার বা সাবক্লাস
tf.keras.optimizers.schedules.LearningRateSchedule
ব্যবহার করুন যেহেতু ডিফল্ট আচরণ হল যে শেখার হার একটি নির্দিষ্ট প্যারামিটার সার্ভারে রাখা একটি পরিবর্তনশীল হয়ে যাবে এবং প্রতিটি ধাপে অন্য সমস্ত প্যারামিটার সার্ভার দ্বারা অনুরোধ করা হবে। . - কেরাস প্রিপ্রসেসিং লেয়ারে পাঠানোর আগে আপনার বড় শব্দভান্ডার এলোমেলো করুন।
পারফরম্যান্স সমস্যার আরেকটি সম্ভাব্য কারণ হল সমন্বয়কারী। আপনার schedule
/ join
প্রথম বাস্তবায়ন পাইথন-ভিত্তিক এবং এইভাবে থ্রেডিং ওভারহেড থাকতে পারে। এছাড়াও সমন্বয়কারী এবং কর্মীদের মধ্যে লেটেন্সি বড় হতে পারে। এই যদি হয় তাহলে,
Model.fit
এর জন্য, আপনিModel.compile
এ প্রদত্তsteps_per_execution
আর্গুমেন্ট 1-এর চেয়ে বড় মান সেট করতে পারেন।একটি কাস্টম প্রশিক্ষণ লুপের জন্য, আপনি একটি একক
tf.function
এ একাধিক ধাপ প্যাক করতে পারেন:
steps_per_invocation = 10
@tf.function
def step_fn(iterator):
for _ in range(steps_per_invocation):
features, labels = next(iterator)
def replica_fn(features, labels):
...
strategy.run(replica_fn, args=(features, labels))
যেহেতু লাইব্রেরিটি আরও অপ্টিমাইজ করা হয়েছে, আশা করি বেশিরভাগ ব্যবহারকারীকে ভবিষ্যতে ম্যানুয়ালি পদক্ষেপগুলি প্যাক করতে হবে না৷
উপরন্তু, কর্মক্ষমতা উন্নতির জন্য একটি ছোট কৌশল হল রিটার্ন মান ছাড়াই ফাংশনের সময় নির্ধারণ করা, যেমনটি উপরে হ্যান্ডলিং টাস্ক ব্যর্থতা বিভাগে ব্যাখ্যা করা হয়েছে।
পরিচিত সীমাবদ্ধতা
বেশিরভাগ পরিচিত সীমাবদ্ধতাগুলি ইতিমধ্যেই উপরের বিভাগে কভার করা হয়েছে। এই বিভাগে একটি সারসংক্ষেপ প্রদান করে.
ParameterServerStrategy
সাধারণ
-
os.environment["grpc_fail_fast"]="use_caller"
সমন্বয়কারী সহ প্রতিটি কাজের জন্য প্রয়োজন, যাতে ত্রুটি সহনশীলতা সঠিকভাবে কাজ করে। - সিঙ্ক্রোনাস প্যারামিটার সার্ভার প্রশিক্ষণ সমর্থিত নয়।
- সর্বোত্তম কর্মক্ষমতা অর্জনের জন্য সাধারণত একটি একক ফাংশনে একাধিক ধাপ প্যাক করা প্রয়োজন।
-
tf.saved_model.load
এর মাধ্যমে শার্ড ভেরিয়েবল সহ একটি saved_model লোড করা সমর্থিত নয়। দ্রষ্টব্য TensorFlow সার্ভিং ব্যবহার করে এই ধরনের একটি সংরক্ষিত_মডেল লোড করা কাজ করবে বলে আশা করা হচ্ছে। - শার্ডেড অপ্টিমাইজার স্লট ভেরিয়েবল সহ একটি ভিন্ন সংখ্যক শার্ডে চেকপয়েন্ট লোড করা সমর্থিত নয়।
- সমন্বয়কারীর টাস্ক রিস্টার্ট না করে প্যারামিটার সার্ভারের ব্যর্থতা থেকে পুনরুদ্ধার করা সমর্থিত নয়।
-
tf.lookup.StaticHashTable
এর ব্যবহার (যা সাধারণত কিছু কেরাস প্রিপ্রসেসিং লেয়ার দ্বারা নিযুক্ত করা হয়, যেমনtf.keras.layers.IntegerLookup
,tf.keras.layers.StringLookup
, এবংtf.keras.layers.TextVectorization
) রিসোর্সে স্থাপন করা হয় প্যারামিটার সার্ভার প্রশিক্ষণের সাথে এই সময়ে সমন্বয়কারী। কর্মীদের থেকে সমন্বয়কারী পর্যন্ত RPC-এর সন্ধানের জন্য এর কার্যক্ষমতার প্রভাব রয়েছে। এই ঠিকানা একটি বর্তমান উচ্চ অগ্রাধিকার.
Model.fit
সুনির্দিষ্ট
-
steps_per_epoch
এModel.fit
আর্গুমেন্ট প্রয়োজন। আপনি একটি মান নির্বাচন করতে পারেন যা একটি যুগে উপযুক্ত ব্যবধান প্রদান করে। - পারফরম্যান্সের কারণে ব্যাচ-লেভেল কল আছে এমন কাস্টম কলব্যাকগুলির জন্য
ParameterServerStrategy
সার্ভার স্ট্র্যাটেজির সমর্থন নেই৷ আপনার সেই কলগুলিকে উপযুক্তভাবে বাছাইsteps_per_epoch
সহ যুগ-স্তরের কলে রূপান্তর করা উচিত, যাতে সেগুলিকে প্রতিটিsteps_per_epoch
নম্বর বলা হয়। অন্তর্নির্মিত কলব্যাকগুলি প্রভাবিত হয় না: তাদের ব্যাচ-স্তরের কলগুলি কার্যকরী হওয়ার জন্য সংশোধন করা হয়েছে৷ParameterServerStrategy
জন্য সমর্থনকারী ব্যাচ-স্তরের কলের পরিকল্পনা করা হচ্ছে। - একই কারণে, অন্যান্য কৌশলগুলির বিপরীতে, অগ্রগতি বার এবং মেট্রিক্স শুধুমাত্র যুগের সীমানায় লগ করা হয়।
-
run_eagerly
সমর্থিত নয়।
কাস্টম প্রশিক্ষণ লুপ সুনির্দিষ্ট
-
ClusterCoordinator.schedule
একটি ডেটাসেটের জন্য পরিদর্শনের গ্যারান্টি সমর্থন করে না। - যখন
ClusterCoordinator.create_per_worker_dataset
ব্যবহার করা হয়, তখন পুরো ডেটাসেটটিকে এটিতে পাস করা ফাংশনের ভিতরে তৈরি করতে হবে। -
tf.data.Options
দ্বারা তৈরি একটি ডেটাসেটেClusterCoordinator.create_per_worker_dataset
উপেক্ষা করা হয়।