1 /* Copyright 2021 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 #ifndef TENSORFLOW_CORE_DATA_SERVICE_TASK_REMOVER_H_ 16 #define TENSORFLOW_CORE_DATA_SERVICE_TASK_REMOVER_H_ 17 18 #include "absl/container/flat_hash_set.h" 19 #include "tensorflow/core/platform/env.h" 20 #include "tensorflow/core/platform/mutex.h" 21 22 namespace tensorflow { 23 namespace data { 24 25 // A `TaskRemover` maintains state about a single task and decides whether the 26 // task should be removed. 27 class TaskRemover { 28 public: 29 explicit TaskRemover(int64_t num_consumers); 30 31 // Attempts to remove the task. The task is removed when all consumers 32 // concurrently reach a barrier in this method. 33 // Returns true if the task is successfully removed. 34 // Returns false if either: 35 // - There is a timeout waiting for other consumers to request task removal. 36 // This timeout is hardcoded into the implementation. 37 // - Another consumer requests removal at a different round. 38 bool RequestRemoval(int64_t consumer_index, int64_t round); 39 40 private: 41 const int64_t num_consumers_; 42 mutex mu_; 43 condition_variable cv_; 44 // The round we are considering removing the task in. 45 int64_t round_ TF_GUARDED_BY(mu_); 46 bool removed_ TF_GUARDED_BY(mu_) = false; 47 // Consumers currently blocked in RequestRemoval. 48 absl::flat_hash_set<int64_t> consumers_waiting_ TF_GUARDED_BY(mu_); 49 }; 50 51 } // namespace data 52 } // namespace tensorflow 53 54 #endif // TENSORFLOW_CORE_DATA_SERVICE_TASK_REMOVER_H_ 55