xref: /aosp_15_r20/external/tensorflow/tensorflow/compiler/xla/stream_executor/host/host_stream.h (revision b6fb3261f9314811a0f4371741dbb8839866f948)
1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 // Class declaration for Stream type that enqueues tasks onto a host/CPU-based
17 // execution context (as opposed to a GPU device), HostExecutor.
18 #ifndef TENSORFLOW_COMPILER_XLA_STREAM_EXECUTOR_HOST_HOST_STREAM_H_
19 #define TENSORFLOW_COMPILER_XLA_STREAM_EXECUTOR_HOST_HOST_STREAM_H_
20 
21 #include <functional>
22 #include <memory>
23 #include <queue>
24 
25 #include "absl/synchronization/mutex.h"
26 #include "tensorflow/compiler/xla/stream_executor/lib/status.h"
27 #include "tensorflow/compiler/xla/stream_executor/lib/threadpool.h"
28 #include "tensorflow/compiler/xla/stream_executor/stream_executor_internal.h"
29 
30 namespace stream_executor {
31 namespace host {
32 
33 class HostStream : public internal::StreamInterface {
34  public:
35   // stack_size_in_bytes may be '0', meaning "use the default thread stack
36   // size".
37   explicit HostStream(size_t stack_size_in_bytes);
38   ~HostStream() override;
39 
40   // Enqueue a task that reports a status when finished. Tasks that fail do not
41   // stop the stream or block any other tasks from executing; rather, the stream
42   // will remember the first error encountered and return it from
43   // 'BlockUntilDone'.
44   bool EnqueueTaskWithStatus(std::function<port::Status()> task);
45   // Enqueue a task that doesn't report any status.
46   bool EnqueueTask(std::function<void()> task);
47 
GpuStreamHack()48   void* GpuStreamHack() override { return nullptr; }
GpuStreamMemberHack()49   void** GpuStreamMemberHack() override { return nullptr; }
50 
51   // Blocks until all tasks are done, returns the first error reported by a task
52   // (if any) and clears the error status.
53   port::Status BlockUntilDone();
54 
55  private:
56   bool WorkAvailable() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
57   void WorkLoop();
58 
59   absl::Mutex mu_;
60   std::queue<std::function<port::Status()>> work_queue_ ABSL_GUARDED_BY(mu_);
61   std::unique_ptr<port::Thread> thread_;
62   port::Status status_;
63 };
64 
65 }  // namespace host
66 }  // namespace stream_executor
67 
68 #endif  // TENSORFLOW_COMPILER_XLA_STREAM_EXECUTOR_HOST_HOST_STREAM_H_
69