/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#ifndef TENSORFLOW_CORE_FRAMEWORK_OP_KERNEL_H_
#define TENSORFLOW_CORE_FRAMEWORK_OP_KERNEL_H_
#include <functional>
#include <unordered_set>
#include <utility>
#include <vector>
#include "absl/time/time.h"
#include "absl/types/optional.h"
#include "absl/types/span.h"
#include "tensorflow/core/framework/allocator.h"
#include "tensorflow/core/framework/cancellation.h"
#include "tensorflow/core/framework/control_flow.h"
#include "tensorflow/core/framework/device_base.h"
#include "tensorflow/core/framework/graph.pb.h"
#include "tensorflow/core/framework/kernel_def.pb.h"
#include "tensorflow/core/framework/kernel_def_builder.h"
#include "tensorflow/core/framework/node_def.pb.h"
#include "tensorflow/core/framework/node_def_util.h"
#include "tensorflow/core/framework/node_properties.h"
#include "tensorflow/core/framework/op.h" // TODO(b/62899350): Remove
#include "tensorflow/core/framework/op_requires.h"
#include "tensorflow/core/framework/registration/registration.h"
#include "tensorflow/core/framework/rendezvous.h"
#include "tensorflow/core/framework/session_state.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/tensor_shape.h"
#include "tensorflow/core/framework/tensor_shape.pb.h" // TODO(b/62899350): Remove
#include "tensorflow/core/framework/tracking_allocator.h"
#include "tensorflow/core/framework/types.h"
#include "tensorflow/core/framework/types.pb.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/gtl/array_slice.h"
#include "tensorflow/core/lib/gtl/manual_constructor.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/profile_utils/cpu_utils.h"
#include "tensorflow/core/platform/thread_annotations.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/core/protobuf/config.pb.h"
#include "tensorflow/core/util/managed_stack_trace.h"
// Used to match ops to kernel sources (and eventually to kernel targets)
#ifdef TF_LOG_KERNEL_SOURCES
#define LOG_KERNEL_SOURCES(name) \
LOG(INFO) << "Kernel found: " << name << " " << __FILE__ << "\n";
#else
#define LOG_KERNEL_SOURCES(name)
#endif
namespace Eigen {
struct ThreadPoolDevice;
struct GpuDevice;
} // end namespace Eigen
namespace tsl {
class CoordinationServiceAgent;
}
namespace tensorflow {
namespace checkpoint {
class TensorSliceReaderCacheWrapper;
} // namespace checkpoint
class AsyncOpKernel;
class CallFrameInterface;
class DeviceMgr;
class FunctionLibraryRuntime;
class OpKernelConstruction; // declared below
class OpKernelContext; // declared below
class OpRegistryInterface;
class ResourceMgr;
class ScopedStepContainer;
class CollectiveExecutor;
class StepStatsCollectorInterface;
// A label that is added to kernels that are JIT compiled. These labels will be
// removed before kernels are looked up, so they can be used without specifying
// the label. This label is a temporary measure to allow JIT kernels to be
// disabled if needed.
extern const char* kJitKernelLabel;
extern const char* kDisableJitKernelsEnvVar;
class OpKernel {
public:
// OpKernel won't be instantiated by the scheduler, so you may perform
// expensive initialization in the descendant's constructor.
explicit OpKernel(OpKernelConstruction* context);
// Specialized constructor that allows a kernel implementation to mark itself
// as a "deferred" op. If true, the executor will provide access to the
// `OpKernelContext::inc_num_deferred_ops_function()` and
// `OpKernelContext::dec_num_deferred_ops_function()` methods at run-time.
OpKernel(OpKernelConstruction* context, bool is_deferred);
// Specialized constructor that enables the descendant to provide a custom
// `NodeDef` value. For example, this constructor can be used to provide a
// stripped-down `NodeDef` that does not contain the full set of attrs (such
// as tensor values) if the descendant stores them in a different form.
OpKernel(OpKernelConstruction* context, NodeDef&& custom_def,
bool is_deferred);
virtual ~OpKernel();
// An OpKernel's computation can be either synchronous or
// asynchronous. All OpKernel Compute() methods must be thread-safe as they
// may be called concurrently (e.g. by multiple executions of the same graph
// concurrently).
//
// Most OpKernels should compute synchronously. They should
// subclass OpKernel and override the Compute() method and have it
// return after completing the supplied work.
//
// A synchronous OpKernel *MUST NOT* block the calling thread on a
// synchronization mechanism (condition variable, Notification, etc.) that
// will be unblocked by the execution of another OpKernel. Execution may
// deadlock in that case, because the executor may use a bounded number of
// threads.
//
// If an OpKernel must block on the execution of another OpKernel (e.g. a
// RecvOp, or a DequeueOp), the implementation *MUST* subclass AsyncOpKernel,
// and override `AsyncOpKernel::ComputeAsync()`. In addition, because the
// unblocking kernel may never run (due to an error or cancellation), in most
// cases the AsyncOpKernel should implement cancellation support via
// `ctx->cancellation_manager()`.
//
// In both cases, implementations of Compute() and ComputeAsync()
// get inputs and write outputs through the given OpKernelContext
// and return a status via context->SetStatus(). They must be
// thread-safe.
// Synchronous compute.
//
// "context" is guaranteed to be alive until Compute() returns.
virtual void Compute(OpKernelContext* context) = 0;
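//
// Example: a minimal synchronous kernel (an illustrative sketch;
// `MyIdentityOp` is a hypothetical kernel, not part of this header):
//
//   class MyIdentityOp : public OpKernel {
//    public:
//     explicit MyIdentityOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}
//     void Compute(OpKernelContext* ctx) override {
//       // Forward the single input tensor to the single output.
//       ctx->set_output(0, ctx->input(0));
//     }
//   };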
// Returns nullptr iff this op kernel is synchronous.
virtual AsyncOpKernel* AsAsync() { return nullptr; }
// Returns true iff this op kernel is considered "expensive". The
// runtime may use this flag to optimize graph execution, for example
// by "inlining" inexpensive kernels.
virtual bool IsExpensive() { return expensive_; }
// Returns a pointer to the tensor stored inside constant ops.
virtual const Tensor* const_tensor() const { return nullptr; }
// Accessors.
const NodeDef& def() const { return props_->node_def; }
const std::string& name() const { return props_->node_def.name(); }
absl::string_view name_view() const { return name_view_; }
const std::string& type_string() const { return props_->node_def.op(); }
absl::string_view type_string_view() const { return type_string_view_; }
const std::string& requested_input(int i) const {
return props_->node_def.input(i);
}
const std::string& requested_device() const {
return props_->node_def.device();
}
int num_inputs() const { return props_->input_types.size(); }
DataType input_type(int i) const { return props_->input_types[i]; }
const DataTypeVector& input_types() const { return props_->input_types; }
const MemoryTypeVector& input_memory_types() const {
return input_memory_types_;
}
int num_outputs() const { return props_->output_types.size(); }
DataType output_type(int o) const { return props_->output_types[o]; }
const DataTypeVector& output_types() const { return props_->output_types; }
const MemoryTypeVector& output_memory_types() const {
return output_memory_types_;
}
Status InputRange(StringPiece input_name, int* start, int* stop) const;
Status OutputRange(StringPiece output_name, int* start, int* stop) const;
// Returns `true` if and only if this kernel uses deferred execution.
bool is_deferred() const { return is_deferred_; }
// Returns a trace string for the current computation; the op name/type and
// input tensor shapes/dtypes are encoded for profiler cost analysis. Most
// OpKernels should use the default implementation.
virtual std::string TraceString(const OpKernelContext& ctx,
bool verbose) const;
protected:
std::string ShapeTraceString(const OpKernelContext& ctx) const;
private:
const std::shared_ptr<const NodeProperties> props_;
const MemoryTypeVector input_memory_types_;
const MemoryTypeVector output_memory_types_;
NameRangeMap input_name_map_;
NameRangeMap output_name_map_;
const absl::string_view name_view_;
const absl::string_view type_string_view_;
const int graph_def_version_;
const bool is_deferred_;
bool expensive_;
OpKernel(const OpKernel&) = delete;
void operator=(const OpKernel&) = delete;
};
class AsyncOpKernel : public OpKernel {
public:
using OpKernel::OpKernel; // Lift OpKernel constructors.
// Asynchronous compute.
//
// Implementations of ComputeAsync() must ensure that `done` is (eventually)
// called exactly once to signal the completion of the computation. The
// implementation of ComputeAsync() must not block on the execution of another
// OpKernel. `done` may be called by the current thread, or by another thread.
// `context` is guaranteed to stay alive until the `done` callback starts.
//
// Since it is possible that the unblocking kernel may never run (due to an
// error or cancellation), in most cases the AsyncOpKernel should implement
// cancellation support via `context->cancellation_manager()`.
//
// WARNING: As soon as the `done` callback starts, `context` and `this` may be
// deleted. No code depending on these objects should execute after the call
// to `done`.
typedef std::function<void()> DoneCallback;
virtual void ComputeAsync(OpKernelContext* context, DoneCallback done) = 0;
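//
// Example: a sketch of an asynchronous kernel that reports its status and
// calls `done` exactly once on every path. `MyWaitOp` and
// `WaitForEventAsync` are hypothetical:
//
//   class MyWaitOp : public AsyncOpKernel {
//    public:
//     explicit MyWaitOp(OpKernelConstruction* ctx) : AsyncOpKernel(ctx) {}
//     void ComputeAsync(OpKernelContext* ctx, DoneCallback done) override {
//       WaitForEventAsync([ctx, done = std::move(done)](const Status& s) {
//         ctx->SetStatus(s);
//         done();  // `ctx` and `this` may be deleted once `done` starts.
//       });
//     }
//   };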
AsyncOpKernel* AsAsync() override { return this; }
void Compute(OpKernelContext* context) override;
};
class OpKernelConstruction {
public:
OpKernelConstruction(DeviceType device_type, DeviceBase* device,
Allocator* allocator, FunctionLibraryRuntime* flib,
ResourceMgr* resource_mgr,
const std::shared_ptr<const NodeProperties>& props,
const MemoryTypeSlice& input_memory_types,
const MemoryTypeSlice& output_memory_types,
int graph_def_version, Status* status);
Env* env() const { return device_->env(); }
// Allocation of tensors during kernel construction:
//
// It is legal to temporarily allocate scratch tensor storage during
// Op kernel construction. Scratch tensors should be allocated using
// allocate_temp below. Some kernels need to keep tensors in between
// invocations. If such a Tensor is allocated during kernel
// construction this also must be done using allocate_temp, and the
// Op may only store the returned Tensor object.
// Allocates a temporary Tensor of the specified type and shape. The
// Tensor must not be used after kernel construction is
// complete. See comment above.
Status allocate_temp(DataType type, const TensorShape& shape,
Tensor* out_temp);
Status allocate_temp(DataType type, const TensorShape& shape,
Tensor* out_temp, AllocatorAttributes allocator_attr);
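//
// Example: construction-time scratch allocation (an illustrative sketch;
// the shape and usage are hypothetical):
//
//   explicit MyOp(OpKernelConstruction* ctx) : OpKernel(ctx) {
//     Tensor scratch;
//     OP_REQUIRES_OK(ctx, ctx->allocate_temp(DT_FLOAT, TensorShape({64}),
//                                            &scratch));
//     // ... use `scratch` for construction-time work only ...
//   }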
// User-supplied configuration of this operation.
const NodeDef& def() const { return props_->node_def; }
// For inspecting the inputs to this operation.
int num_inputs() const { return props_->input_types.size(); }
DataType input_type(int i) const { return props_->input_types[i]; }
const DataTypeSlice& input_types() const { return props_->input_types_slice; }
const MemoryTypeSlice& input_memory_types() const {
return input_memory_types_;
}
// For inspecting the outputs expected from this operation.
int num_outputs() const { return props_->output_types.size(); }
DataType output_type(int i) const { return props_->output_types[i]; }
const DataTypeSlice& output_types() const {
return props_->output_types_slice;
}
const MemoryTypeSlice& output_memory_types() const {
return output_memory_types_;
}
// If expected_inputs == input_types() and expected_outputs ==
// output_types(), returns OK, else returns INVALID_ARGUMENT with an error
// message. Recommended for Ops with dynamic signatures.
Status MatchSignature(const DataTypeSlice expected_inputs,
const DataTypeSlice expected_outputs);
// For recording configuration errors during construction.
void SetStatus(const Status& status);
const Status& status() const { return *status_; }
// Look up the attr with name attr_name and set *value to its value. If no
// attr with attr_name is found in def(), or the attr does not have
// a matching type, a non-ok status will be returned.
template <class T>
Status GetAttr(StringPiece attr_name, T* value) const TF_ATTRIBUTE_NOINLINE;
// Return true if the attr_name is defined in def().
bool HasAttr(StringPiece attr_name) const;
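//
// Example: typical attr lookup in a kernel constructor (a sketch; the
// "axis"/"keep_dims" attrs and the corresponding members are hypothetical):
//
//   explicit MyOp(OpKernelConstruction* ctx) : OpKernel(ctx) {
//     OP_REQUIRES_OK(ctx, ctx->GetAttr("axis", &axis_));
//     if (ctx->HasAttr("keep_dims")) {
//       OP_REQUIRES_OK(ctx, ctx->GetAttr("keep_dims", &keep_dims_));
//     }
//   }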
// Return the device type.
const DeviceType& device_type() const { return device_type_; }
// If not nullptr, the kernel can instantiate functions defined in
// the library. E.g.,
// CHECK_NOTNULL(function_library())->Instantiate("Foo", ...).
FunctionLibraryRuntime* function_library() const { return flib_; }
// Shared resources accessible to this kernel.
ResourceMgr* resource_manager() const { return resource_mgr_; }
// The GraphDef version whose behavior we should follow.
int graph_def_version() const { return graph_def_version_; }
// Helper routines for the OP_REQUIRES macros
void CtxFailure(const Status& s);
void CtxFailureWithWarning(const Status& s);
void CtxFailure(const char* file, int line, const Status& s);
void CtxFailureWithWarning(const char* file, int line, const Status& s);
// Unrecommended functions: these have some current uses but are
// discouraged, and may be removed in a future major version release.
// May be used, e.g., to get GPU handles, etc.
//
// Currently only used to call MakeTensorFromProto() for
// implementing ConstantOp for every device. See comments
// on Device::MakeTensorFromProto for longer-term replacement
// ideas.
DeviceBase* device() const { return device_; }
private:
const DeviceType device_type_;
DeviceBase* const device_;
Allocator* allocator_;
FunctionLibraryRuntime* flib_;
ResourceMgr* const resource_mgr_;
std::shared_ptr<const NodeProperties> props_;
MemoryTypeSlice input_memory_types_;
MemoryTypeSlice output_memory_types_;
const int graph_def_version_;
Status* status_;
// Allow access from OpKernel ctor.
friend class OpKernel;
OpKernelConstruction(const OpKernelConstruction&) = delete;
void operator=(const OpKernelConstruction&) = delete;
};
// TODO(mrry): Consider converting to a random_access_iterator, and upgrading
// tensorflow::gtl::iterator_range to make the below container classes
// unnecessary.
template <typename ListType, typename ElementType>
class OpArgIterator {
public:
using iterator_category = std::forward_iterator_tag;
using value_type = ElementType;
using pointer = ElementType*;
using const_pointer = const ElementType*;
using reference = ElementType&;
using const_reference = const ElementType&;
using difference_type = ptrdiff_t;
OpArgIterator(const ListType* list, int i) : list_(list), i_(i) {}
bool operator==(const OpArgIterator& rhs) {
DCHECK(list_ == rhs.list_);
return i_ == rhs.i_;
}
bool operator!=(const OpArgIterator& rhs) {
DCHECK(list_ == rhs.list_);
return i_ != rhs.i_;
}
OpArgIterator operator++() { // prefix ++it
++i_;
return *this;
}
OpArgIterator operator++(int) { // postfix it++
OpArgIterator old_value = *this;
++i_;
return old_value;
}
reference operator*() { return (*list_)[i_]; }
pointer operator->() { return &(*list_)[i_]; }
const_reference operator*() const { return (*list_)[i_]; }
const_pointer operator->() const { return &(*list_)[i_]; }
private:
const ListType* const list_;
int i_;
};
// Utility class for representing a list of immutable input tensors
// that are passed to the op as a single named argument.
class OpInputList {
public:
typedef OpArgIterator<OpInputList, const Tensor> Iterator;
OpInputList() : ctx_(nullptr), start_(0), stop_(0) {}
OpInputList(OpKernelContext* ctx, int start, int stop)
: ctx_(ctx), start_(start), stop_(stop) {}
OpInputList& operator=(const OpInputList& other) = default;
const Tensor& operator[](int i) const;
int size() const { return stop_ - start_; }
Iterator begin() const { return Iterator(this, 0); }
Iterator end() const { return Iterator(this, size()); }
private:
OpKernelContext* ctx_; // not owned
int start_;
int stop_;
};
// Utility class for representing a list of mutable ("ref") input tensors
// that are passed to the op as a single named argument.
class OpMutableInputList {
public:
typedef OpArgIterator<OpMutableInputList, Tensor*> Iterator;
OpMutableInputList(OpKernelContext* ctx, int start, int stop)
: ctx_(ctx), start_(start), stop_(stop) {}
OpMutableInputList() : ctx_(nullptr), start_(0), stop_(0) {}
OpMutableInputList& operator=(const OpMutableInputList& other) = default;
Tensor at(int i, bool lock_held);
mutex* ref_mutex(int i);
int size() const { return stop_ - start_; }
Iterator begin() const { return Iterator(this, 0); }
Iterator end() const { return Iterator(this, size()); }
private:
OpKernelContext* ctx_; // not owned
int start_;
int stop_;
};
// Utility class for representing a list of output tensors that are
// grouped as a single named output.
class OpOutputList {
public:
typedef OpArgIterator<OpOutputList, const Tensor*> Iterator;
OpOutputList() : ctx_(nullptr), start_(0), stop_(0) {}
OpOutputList(OpKernelContext* ctx, int start, int stop)
: ctx_(ctx), start_(start), stop_(stop) {}
OpOutputList& operator=(const OpOutputList& other) = default;
Tensor* operator[](int i);
bool required(int i) const;
DataType expected_output_dtype(int i) const;
Status allocate(int i, const TensorShape& shape, Tensor** output);
void set(int i, const Tensor& tensor);
void set(int i, Tensor&& tensor);
void set_ref(int i, mutex* mu, Tensor* tensor_for_ref);
int size() const { return stop_ - start_; }
Iterator begin() const { return Iterator(this, 0); }
Iterator end() const { return Iterator(this, size()); }
private:
OpKernelContext* ctx_; // not owned
int start_;
int stop_;
};
// Holds a tensor or tensor reference. For tensor references, we need
// a mutex to prevent concurrent access to the tensor.
struct TensorValue {
TensorValue() : mutex_if_ref(nullptr), tensor(nullptr) {}
explicit TensorValue(Tensor* t) : mutex_if_ref(nullptr), tensor(t) {}
TensorValue(mutex* mu, Tensor* t) : mutex_if_ref(mu), tensor(t) {}
Tensor* operator->() const { return tensor; }
bool is_ref() const { return mutex_if_ref != nullptr; }
// Return the dtype of the Tensor. For references, return the corresponding
// ref dtype (the underlying dtype wrapped via MakeRefType).
DataType dtype() const {
if (is_ref()) {
return MakeRefType(tensor->dtype());
} else {
return tensor->dtype();
}
}
// Return the dtype of the Tensor, as above. This variant of dtype()
// acquires the lock for references.
//
// TODO(b/133843385): Disallow dtype modifications
DataType dtype_safe() const {
if (is_ref()) {
tf_shared_lock ml(*mutex_if_ref);
return MakeRefType(tensor->dtype());
} else {
return tensor->dtype();
}
}
mutex* mutex_if_ref; // nullptr if not a ref, != nullptr if a ref
Tensor* tensor;
};
// Used to store partitioned graphs from function-calling ops.
struct GraphCollector {
mutex mu;
std::vector<GraphDef> partitioned_graphs TF_GUARDED_BY(mu);
GraphDef raw_graph TF_GUARDED_BY(mu);
GraphDef optimized_graph TF_GUARDED_BY(mu);
bool dirty TF_GUARDED_BY(mu);
GraphCollector() : dirty(false) {}
void CollectRawGraph(const GraphDef& graph) {
mutex_lock ml(mu);
raw_graph.MergeFrom(graph);
dirty = true;
}
void CollectOptimizedGraph(const GraphDef& graph) {
mutex_lock ml(mu);
optimized_graph.MergeFrom(graph);
dirty = true;
}
void CollectPartitionedGraph(const GraphDef& graph) {
mutex_lock ml(mu);
partitioned_graphs.push_back(graph);
dirty = true;
}
void ClearGraphs() TF_EXCLUSIVE_LOCKS_REQUIRED(mu) {
raw_graph.Clear();
optimized_graph.Clear();
partitioned_graphs.clear();
dirty = false;
}
bool HasUpdatedGraphs() {
mutex_lock ml(mu);
return dirty;
}
};
class OpKernelContext {
public:
// The first element of a WrappedAllocator is a "base" Allocator and
// the second element is that Allocator wrapped by a
// TrackingAllocator.
typedef std::pair<Allocator*, TrackingAllocator*> WrappedAllocator;
// TODO(zhifengc): Do some cleanup of Params.
// The Params struct is passed in to initialize an OpKernelContext,
// and must outlive the OpKernelContext.
struct Params {
~Params() { delete eigen_gpu_device; }
// The step being executed.
int64_t step_id = 0;
// Timestamp for the start of graph execution. Used for latency metrics.
int64_t start_time_usecs = 0;
// The deadline for the session to complete by. Empty if unspecified.
absl::optional<absl::Time> deadline;
// The op kernel being computed.
OpKernel* op_kernel = nullptr;
// The device on which the kernel is running.
DeviceBase* device = nullptr;
// The Eigen GPU device wrapper, which may include a per-op
// wrapped allocator. The concrete type of this object depends on
// the type of this->device, so eigen_gpu_device can't be an
// inline member and must be heap allocated. However, we don't
// want to allocate a new eigen_gpu_device for every Op that is
// executed. Instead this member is allocated on first use using
// ensure_eigen_gpu_device, and then if the Params structure is
// re-used for subsequent Ops, the eigen_gpu_device is
// ReInitialized in the OpKernelContext constructor. Unlike the
// other pointers in Params, this one is owned by Params.
PerOpGpuDevice* eigen_gpu_device = nullptr;
inline void ensure_eigen_gpu_device() {
DCHECK(device);
if (nullptr == eigen_gpu_device) {
// Surprisingly, MakeGpuDevice will return nullptr if the
// device is not a GPU device. This is ok, since those devices
// will never use eigen_gpu_device. It seems better to have
// ensure_eigen_gpu_device fall through and regenerate the
// nullptr every time an OpKernelContext is instantiated, than
// to do an unnecessary allocation of a dummy eigen GPU
// device for CPU device Ops.
eigen_gpu_device = device->MakeGpuDevice();
}
}
bool track_allocations = false;
bool log_memory = false;
// Array indexed by output number for this node
const AllocatorAttributes* output_attr_array = nullptr;
// Shared resources accessible by this op kernel invocation.
ResourceMgr* resource_manager = nullptr;
// Per-step resources accessible by this op kernel invocation should be
// stored in this container.
ScopedStepContainer* step_container = nullptr;
// Mechanism used by this op kernel invocation to communicate with
// computations running on other devices.
RendezvousInterface* rendezvous = nullptr;
// Mechanism for executing a collective op that needs to coordinate
// with parallel instances running on other devices.
CollectiveExecutor* collective_executor = nullptr;
// Session configuration parameters. Can be nullptr.
const ConfigProto* session_config = nullptr;
// The session state for this op.
SessionState* session_state = nullptr;
// Unique session identifier. Can be empty.
std::string session_handle;
// Metadata about the session. Can be nullptr.
const SessionMetadata* session_metadata = nullptr;
// The tensor store for this op.
TensorStore* tensor_store = nullptr;
// Mechanism used by this op kernel invocation to register a callback
// for its cancellation.
CancellationManager* cancellation_manager = nullptr;
// Inputs to this op kernel.
absl::Span<const TensorValue> inputs;
bool is_input_dead = false;
absl::Span<const AllocatorAttributes> input_alloc_attrs;
// Device context.
DeviceContext* op_device_context = nullptr;
// Control-flow op supports.
FrameAndIter frame_iter;
// Function call supports.
CallFrameInterface* call_frame = nullptr;
FunctionLibraryRuntime* function_library = nullptr;
std::function<void(std::function<void()>)>* runner = nullptr;
StepStatsCollectorInterface* stats_collector = nullptr;
GraphCollector* graph_collector = nullptr;
bool run_all_kernels_inline = false;
const std::string* executor_type = nullptr;
// TensorSliceReaderCache support.
checkpoint::TensorSliceReaderCacheWrapper* slice_reader_cache = nullptr;
// Support for forwarding reservations (used by ScopedAllocator).
static constexpr int kNeverForward = -2;
static constexpr int kNoReservation = -1;
// Values in [0,...) represent reservations for the indexed output.
const int* forward_from_array = nullptr;
// For tracking actively running deferred ops.
std::function<void()> inc_num_deferred_ops_function;
std::function<void()> dec_num_deferred_ops_function;
absl::optional<ManagedStackTrace> stack_trace = {};
// For implementing `OpKernelContext::output_required()`. If null, all
// outputs are required.
bool* outputs_required_array = nullptr;
// For access to distributed coordination service.
tsl::CoordinationServiceAgent* coordination_service_agent = nullptr;
};
// params must outlive the OpKernelContext.
explicit OpKernelContext(Params* params);
OpKernelContext(Params* params, int num_outputs);
~OpKernelContext();
Env* env() const { return params_->device->env(); }
int64_t step_id() const { return params_->step_id; }
int64_t start_time_usecs() const { return params_->start_time_usecs; }
const ConfigProto* session_config() const { return params_->session_config; }
// The deadline for the session to complete by. Empty if unspecified in
// RunOptions.
absl::optional<absl::Time> deadline() const { return params_->deadline; }
const OpKernel& op_kernel() const { return *params_->op_kernel; }
// Stack trace of where the op was defined (if defined in eager mode).
const absl::optional<ManagedStackTrace>& stack_trace() const {
return params_->stack_trace;
}
// Input/output signature.
int num_inputs() const { return params_->inputs.size(); }
DataType input_dtype(int index) const;
Status input_dtype(StringPiece name, DataType* dtype) const;
MemoryType input_memory_type(int index) const;
int num_outputs() const { return outputs_.size(); }
DataType expected_output_dtype(int index) const;
MemoryType output_memory_type(int index) const;
// Input
// Returns an immutable input tensor by index. May only be used for non-Ref
// inputs. For Ref inputs use mutable_input below.
// REQUIRES: !IsRefType(input_dtype(index))
// TODO(mrry): Convert this to return Status.
const Tensor& input(int index) const;
// Returns a pointer to an immutable input tensor by index, or an error
// status. May only be used for non-Ref inputs. For Ref inputs use
// mutable_input below.
absl::StatusOr<const Tensor*> get_input(int index) const;
// Returns the named immutable input tensor in "tensor", as defined
// in the OpDef. May only be used for non-Ref inputs. For Ref inputs
// use mutable_input below.
// REQUIRES: !IsRefType(input_dtype(index))
// REQUIRES: the named input must not be a list.
Status input(StringPiece name, const Tensor** tensor);
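//
// Example: looking up a non-Ref input by its OpDef argument name (a
// sketch; the "indices" argument name is hypothetical):
//
//   const Tensor* indices = nullptr;
//   OP_REQUIRES_OK(ctx, ctx->input("indices", &indices));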
// Returns the named list-valued immutable input in "list", as
// defined in the OpDef. If the named input is not list-valued,
// returns a one-element list. May only be used for non-Ref
// inputs. For Ref inputs use mutable_input below.
// REQUIRES: !IsRefType(input_dtype(index))
Status input_list(StringPiece name, OpInputList* list);
// For mutable inputs, use the following together to make sure there
// is no concurrent access to mutable_input(), e.g.:
// {
// Tensor& t = context->mutable_input(index);
// mutex_lock lock(*context->input_ref_mutex(index));
// // modify the values in t
// }
// REQUIRES: IsRefType(input_dtype(index))
Status input_ref_mutex(StringPiece name, mutex** out_mutex);
// Returns a mutable input tensor. Must be used to access Ref
// inputs. REQUIRES: IsRefType(input_dtype(index)). The caller may
// modify the values stored in the Tensor buffer, and modifications
// will be visible to other Ops reading the same ref tensor. If
// !lock_held the input mutex will be acquired before returning the
// Tensor.
// TODO(mrry): Convert this to return Status.
Tensor mutable_input(int index, bool lock_held);
// Returns the named mutable input tensor in "tensor", as defined in
// the OpDef. Must be used to access Ref inputs. The values stored
// in the Tensor buffer may be modified, and modifications will be
// visible to other Ops reading the same ref tensor. If !lock_held
// the input mutex will be acquired before returning the Tensor.
// REQUIRES: the named input must not be a list.
// REQUIRES: the named input must be a ref tensor.
Status mutable_input(StringPiece name, Tensor* tensor, bool lock_held);
// Returns the named list-valued mutable input in "list", as defined
// in the OpDef. If the named input is not list-valued, returns a
// one-element list. Must be used to access Ref inputs. The values
// stored in the Tensor buffer may be modified, and modifications
// will be visible to other Ops reading the same ref tensor.
// REQUIRES: the named input must be a ref tensor.
Status mutable_input_list(StringPiece name, OpMutableInputList* list);
// Replace the corresponding Ref Input to use the storage buffer
// used by tensor. If !lock_held the input mutex will be acquired
// before returning the Tensor.
// REQUIRES: IsRefType(input_dtype(index)).
void replace_ref_input(int index, const Tensor& tensor, bool lock_held);
// Replace the corresponding named Ref Input to use the storage
// buffer used by tensor. If !lock_held the input mutex will be
// acquired before returning the Tensor.
// REQUIRES: IsRefType(input_dtype(index)).
Status replace_ref_input(StringPiece name, const Tensor& tensor,
bool lock_held);
// Deletes the Tensor object used as the Ref Input at
// input_index. This is not usually necessary and should be used
// with caution. If !lock_held the input mutex will be acquired
// before returning the Tensor.
// REQUIRES: IsRefType(input_dtype(input_index)).
void delete_ref_input(int input_index, bool lock_held);
// Return true if there is input at the given index. An operator has no
// input at index if its tensor is null. This is primarily used by the
// merge operator.
// TODO(mrry): Convert this to return Status.
bool has_input(int index) const;
// Returns true if all inputs are the same shape, otherwise sets the
// status to a non-OK value and returns false.
// Usage: if (!context->ValidateInputsAreSameShape(this)) return;
bool ValidateInputsAreSameShape(OpKernel* op);
// If non-null, kernels should populate with any partition subgraphs created.
GraphCollector* graph_collector() { return params_->graph_collector; }
// If true, hints that all kernels in functions called by this kernel
// should be treated as "inexpensive", and hence executed on the scheduling
// thread.
bool run_all_kernels_inline() const {
return params_->run_all_kernels_inline;
}
// Returns the registered name for the executor type that is executing the
// current kernel. If empty, the default executor is used.
const std::string& executor_type() const;
// Input to output forwarding.
// Set the output Ref Tensor at output_index to be an alias of the
// input Ref Tensor at input_index.
// REQUIRES: IsRefType(input_dtype(input_index)).
// REQUIRES: IsRefType(output_dtype(output_index)).
void forward_ref_input_to_ref_output(int input_index, int output_index);
// Returns true when an alias to input[input_index] (reshaped to
// output_shape), which is safe to use for in-place computation, has been
// written to *output. Returns false if input[input_index] has a refcount
// greater than one, if its type does not match the expected output type of
// output[output_index], or if the number of elements in input[input_index]
// does not equal the number of elements in output_shape.
bool forward_input_to_output_with_shape(int input_index, int output_index,
const TensorShape& output_shape,
Tensor** output) TF_MUST_USE_RESULT;
Status forward_input_to_output_with_shape(StringPiece input_name,
StringPiece output_name,
const TensorShape& output_shape,
Tensor** output) TF_MUST_USE_RESULT;
// Returns a pointer to a Tensor aliasing the underlying buffer backing
// input[input_index] iff
// * input[input_index] is not a ref,
// * the data type, shape, memory type, and allocator attributes of
// input[input_index] are compatible with those given in dtype, shape,
// memory_type, and attr,
// * refcount on the underlying buffer is one.
// * Either there is no forwarding reservation for either input_index
// or output_index or the specified input is reserved for the specified
// output. More precisely:
//
// These cases mean neither input nor output has a reservation:
// forward_from_array = nullptr
// OR (input_index is not in forward_from_array AND
// (output_index == kNoReservation OR
// forward_from_array[output_index] == kNoReservation))
//
// This case means that input_index is reserved for output_index:
// forward_from_array[output_index] == input_index
//
// This case means the output is reserved to always be allocated,
// never assigned a forwarded input:
// forward_from_array[output_index] == kNeverForward
//
// Otherwise returns nullptr.
// NOTE: For CUDA kernels that read inputs using the __ldg() intrinsic,
// forwarding is only safe if there are no reads via __ldg() after writes
// to the same address.
std::unique_ptr<Tensor> forward_input(
int input_index, int output_index, DataType output_dtype,
const TensorShape& output_shape, MemoryType output_memory_type,
const AllocatorAttributes& output_attr) TF_MUST_USE_RESULT;
// Tries to forward one of the inputs given in input_indices to
// output[output_index]. If none of the given inputs can be forwarded, calls
// allocate_output() to allocate a new output buffer. The index of the
// forwarded input will be assigned to the output argument forwarded_input
// (if it is not nullptr). If no inputs are forwarded, forwarded_input will
// be assigned -1.
Status forward_input_or_allocate_output(
absl::Span<const int> candidate_input_indices, int output_index,
const TensorShape& output_shape, Tensor** output,
int* forwarded_input = nullptr) TF_MUST_USE_RESULT;
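//
// Example: an element-wise kernel that reuses its input buffer for the
// output when it is safe to do so, and allocates a fresh buffer otherwise
// (an illustrative sketch):
//
//   Tensor* output = nullptr;
//   OP_REQUIRES_OK(ctx, ctx->forward_input_or_allocate_output(
//                           {0}, 0, ctx->input(0).shape(), &output));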
Status forward_input_or_allocate_output(
absl::Span<const StringPiece> candidate_input_names,
StringPiece output_name, const TensorShape& output_shape,
Tensor** output) TF_MUST_USE_RESULT;
// Tries to reuse one of the inputs given in input_indices as a temporary.
// If none of the given inputs can be forwarded, calls
// allocate_temp() to allocate a new temporary buffer.
Status forward_input_or_allocate_temp(
absl::Span<const int> candidate_input_indices, DataType type,
const TensorShape& shape, const AllocatorAttributes& allocator_attr,
Tensor* out_temp) TF_MUST_USE_RESULT;
Status forward_input_or_allocate_temp(
absl::Span<const int> candidate_input_indices, DataType type,
const TensorShape& shape, Tensor* out_temp) TF_MUST_USE_RESULT {
return forward_input_or_allocate_temp(candidate_input_indices, type, shape,
AllocatorAttributes(), out_temp);
}
// Output
// Returns the named list-valued output in "list", as defined in the OpDef.
// If the named output is not list-valued, returns a one-element list.
Status output_list(StringPiece name, OpOutputList* list);
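//
// Example: filling a list-valued output (a sketch; "outputs" is a
// hypothetical list-valued output name and `shape` a caller-provided
// TensorShape):
//
//   OpOutputList outputs;
//   OP_REQUIRES_OK(ctx, ctx->output_list("outputs", &outputs));
//   for (int i = 0; i < outputs.size(); ++i) {
//     Tensor* out = nullptr;
//     OP_REQUIRES_OK(ctx, outputs.allocate(i, shape, &out));
//   }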
// If output_required(index) returns true, the OpKernel's Compute() method
// should call allocate_output(index, ...), set_output(index, ...),
// set_output_ref(index, ...), or set the status to a non-ok value.
// If it returns false, it may produce an output, but is not required to.
bool output_required(int index) const {
return !params_->outputs_required_array ||
params_->outputs_required_array[index];
}
// If output_expects_forwarding returns true, the OpKernel's Compute() method
// should not allocate the output with allocate_output but instead needs to
// use forward_input.
bool output_expects_forwarding(int index) const {
return params_->forward_from_array != nullptr &&
params_->forward_from_array[index] >= 0;
}
// Allocation of tensors during kernel execution inside the Compute
// method:
//
// There are two methods to allocate Tensors when an Op kernel
// executes.
//
// 1) allocate_output. This should be used to allocate any tensor
// that is going to be used as an output from the Op at the end of
// the current execution. The caller indicates which output the
// Tensor will be assigned to, and the call returns the
// newly-allocated Tensor. The Tensor can subsequently be assigned
// to during kernel execution, and will be used as the designated
// output when the kernel execution completes.
//
// 2) allocate_temp. This should be used to allocate any scratch
// storage that is needed while the kernel is executing, and will
// not be retained by the Op.
//
// In some cases a Tensor needs to be used as an output even though
// it was previously allocated elsewhere. The Tensor may have been
// passed as an input, or stored in a Tensor during a
// previous kernel execution, or allocated earlier in the kernel
// execution at a time when it was not known which output it would
// be assigned to. In this case the kernel can use set_output or
// set_output_ref to indicate that the tensor should be used as the
// designated output. It is legal to use any previously-allocated
// Tensor as an argument to set_output or set_output_ref, including
// Tensors allocated via allocate_temp. There may be a performance
// penalty to using a Tensor that was not allocated using
// allocate_output. This is because allocate_output uses the
// AllocatorAttributes stored in output_attr_array for the
// designated output. In some cases, using the wrong attributes may
// cause an extra copy of the Tensor's buffer.
// Allocates the output for the specified output index, with the given
// shape. OpKernelContext retains ownership of the returned pointer. See
// comment above.
//
// If memory allocation fails, returns an error status.
//
// REQUIRES: !IsRefType(expected_output_dtype(index))
Status allocate_output(int index, const TensorShape& shape,
Tensor** tensor) TF_MUST_USE_RESULT;
Status allocate_output(StringPiece name, const TensorShape& shape,
Tensor** tensor) TF_MUST_USE_RESULT;
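//
// Example: the common allocate-then-fill pattern inside Compute() (a
// sketch assuming a single output shaped like input 0):
//
//   Tensor* output = nullptr;
//   OP_REQUIRES_OK(context, context->allocate_output(
//                               0, context->input(0).shape(), &output));
//   // ... write into output->flat<T>() ...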
// The following methods use the supplied attributes instead of
// those in output_attr_array. The caller is responsible for
// ensuring that the attributes are "compatible" with the