xref: /aosp_15_r20/external/tensorflow/tensorflow/core/kernels/batching_util/bounded_executor.h (revision b6fb3261f9314811a0f4371741dbb8839866f948)
1 /* Copyright 2021 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_BOUNDED_EXECUTOR_H_
17 #define TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_BOUNDED_EXECUTOR_H_
18 
19 #include <string>
20 
21 #include "tensorflow/core/platform/status.h"
22 #include "tensorflow/core/platform/statusor.h"
23 #include "tensorflow/core/platform/thread_annotations.h"
24 #include "tensorflow/core/platform/threadpool.h"
25 #include "tensorflow/core/platform/threadpool_interface.h"
26 
27 namespace tensorflow {
28 namespace serving {
29 // BoundedExecutor has a bounded number of threads and unlimited queue length,
30 // scheduled tasks are executed in a FIFO way.
31 class BoundedExecutor : public thread::ThreadPoolInterface {
32  public:
33   struct Options {
34     Env* env = Env::Default();
35     ThreadOptions thread_options;
36     std::string thread_name;
37     int num_threads = -1;
38   };
39 
40   static StatusOr<std::unique_ptr<BoundedExecutor>> Create(
41       const Options& options);
42 
43   // Destructor. All threads will be joined.
44   ~BoundedExecutor() override;
45 
46   // Enqueue a function to be executed.
47   //
48   // Callers are responsible to guarantee `func` is not nullptr.
49   void Schedule(std::function<void()> func) override;
50 
51   // Returns the number of threads.
52   int NumThreads() const override;
53 
54   int CurrentThreadId() const override;
55 
56  private:
57   explicit BoundedExecutor(const Options& options);
58 
59   // Starts N workers (N == num_threads), polling tasks from `work_queue_`.
60   void InitWorker();
61 
62   // A loop to fetch task from `work_queue_` and execute task.
63   void Run();
64 
65   const Options& options_;
66 
67   mutex work_queue_mu_;
68   std::deque<std::function<void()>> work_queue_ TF_GUARDED_BY(work_queue_mu_);
69   condition_variable work_queue_cv_ TF_GUARDED_BY(work_queue_mu_);
70 
71   // A fixed number of threads.
72   std::vector<std::unique_ptr<Thread>> threads_;
73   TF_DISALLOW_COPY_AND_ASSIGN(BoundedExecutor);
74 };
75 
76 }  // namespace serving
77 }  // namespace tensorflow
78 
79 #endif  // TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_BOUNDED_EXECUTOR_H_
80