xref: /aosp_15_r20/external/tensorflow/tensorflow/core/kernels/data/experimental/data_service_dataset_op.h (revision b6fb3261f9314811a0f4371741dbb8839866f948)
1 /* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 #ifndef TENSORFLOW_CORE_KERNELS_DATA_EXPERIMENTAL_DATA_SERVICE_DATASET_OP_H_
16 #define TENSORFLOW_CORE_KERNELS_DATA_EXPERIMENTAL_DATA_SERVICE_DATASET_OP_H_
17 
18 #include <memory>
19 #include <string>
20 #include <vector>
21 
22 #include "absl/strings/str_cat.h"
23 #include "tensorflow/core/data/captured_function.h"
24 #include "tensorflow/core/data/service/common.h"
25 #include "tensorflow/core/framework/dataset.h"
26 #include "tensorflow/core/framework/op_kernel.h"
27 #include "tensorflow/core/framework/resource_mgr.h"
28 #include "tensorflow/core/framework/tensor_shape.h"
29 #include "tensorflow/core/framework/types.h"
30 #include "tensorflow/core/platform/mutex.h"
31 
32 namespace tensorflow {
33 namespace data {
34 
35 // A resource which counts how many iterators have been created. This is used
36 // by the DataServiceDataset to coordinate jobs across multiple iterations.
37 class IterationCounter : public ResourceBase {
38  public:
IterationCounter()39   IterationCounter() : counter_(0) {}
40 
DebugString()41   std::string DebugString() const override {
42     mutex_lock l(mu_);
43     return absl::StrCat(counter_);
44   }
45 
GetAndIncrement()46   int64_t GetAndIncrement() {
47     mutex_lock l(mu_);
48     return ++counter_;
49   }
50 
51  private:
52   mutable mutex mu_;
53   int64_t counter_ TF_GUARDED_BY(mu_) = 0;
54 };
55 
56 // Creates a dataset for reading from the tf.data service.
57 class DataServiceDatasetOp : public DatasetOpKernel {
58  public:
59   static constexpr const char* const kDatasetType = "DataService";
60   static constexpr const char* const kDatasetId = "dataset_id";
61   static constexpr const char* const kProcessingMode = "processing_mode";
62   static constexpr const char* const kAddress = "address";
63   static constexpr const char* const kProtocol = "protocol";
64   static constexpr const char* const kDataTransferProtocol =
65       "data_transfer_protocol";
66   static constexpr const char* const kJobName = "job_name";
67   static constexpr const char* const kConsumerIndex = "consumer_index";
68   static constexpr const char* const kNumConsumers = "num_consumers";
69   static constexpr const char* const kMaxOutstandingRequests =
70       "max_outstanding_requests";
71   static constexpr const char* const kTaskRefreshIntervalHintMs =
72       "task_refresh_interval_hint_ms";
73   static constexpr const char* const kTargetWorkers = "target_workers";
74   static constexpr const char* const kIterationCounter = "iteration_counter";
75   static constexpr const char* const kOutputTypes = "output_types";
76   static constexpr const char* const kOutputShapes = "output_shapes";
77   static constexpr const char* const kUncompress = "uncompress";
78   static constexpr const char* const kUncompressFn = "uncompress_fn";
79   static constexpr const char* const kCrossTrainerCacheOptions =
80       "cross_trainer_cache_options";
81 
82   // Note: If a new constant is declared here, it *must* be defined in
83   // data_service_dataset_op.cc, otherwise it will not compile in debug mode.
84 
85   explicit DataServiceDatasetOp(OpKernelConstruction* ctx);
86 
87  protected:
88   void MakeDataset(OpKernelContext* ctx, DatasetBase** output) override;
89 
90  private:
91   class Dataset;
92   int op_version_;
93   int64_t task_refresh_interval_hint_ms_;
94   DataTypeVector output_types_;
95   std::vector<PartialTensorShape> output_shapes_;
96   std::string data_transfer_protocol_;
97   TargetWorkers target_workers_ = TARGET_WORKERS_AUTO;
98   bool uncompress_;
99   std::shared_ptr<FunctionMetadata> uncompress_fn_ = nullptr;
100   std::string seriazlied_cross_trainer_cache_options_;
101 };
102 
103 }  // namespace data
104 }  // namespace tensorflow
105 
106 #endif  // TENSORFLOW_CORE_KERNELS_DATA_EXPERIMENTAL_DATA_SERVICE_DATASET_OP_H_
107