1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 // Class declaration for Stream type that enqueues tasks onto a host/CPU-based 17 // execution context (as opposed to a GPU device), HostExecutor. 18 #ifndef TENSORFLOW_COMPILER_XLA_STREAM_EXECUTOR_HOST_HOST_STREAM_H_ 19 #define TENSORFLOW_COMPILER_XLA_STREAM_EXECUTOR_HOST_HOST_STREAM_H_ 20 21 #include <functional> 22 #include <memory> 23 #include <queue> 24 25 #include "absl/synchronization/mutex.h" 26 #include "tensorflow/compiler/xla/stream_executor/lib/status.h" 27 #include "tensorflow/compiler/xla/stream_executor/lib/threadpool.h" 28 #include "tensorflow/compiler/xla/stream_executor/stream_executor_internal.h" 29 30 namespace stream_executor { 31 namespace host { 32 33 class HostStream : public internal::StreamInterface { 34 public: 35 // stack_size_in_bytes may be '0', meaning "use the default thread stack 36 // size". 37 explicit HostStream(size_t stack_size_in_bytes); 38 ~HostStream() override; 39 40 // Enqueue a task that reports a status when finished. Tasks that fail do not 41 // stop the stream or block any other tasks from executing; rather, the stream 42 // will remember the first error encountered and return it from 43 // 'BlockUntilDone'. 44 bool EnqueueTaskWithStatus(std::function<port::Status()> task); 45 // Enqueue a task that doesn't report any status. 46 bool EnqueueTask(std::function<void()> task); 47 GpuStreamHack()48 void* GpuStreamHack() override { return nullptr; } GpuStreamMemberHack()49 void** GpuStreamMemberHack() override { return nullptr; } 50 51 // Blocks until all tasks are done, returns the first error reported by a task 52 // (if any) and clears the error status. 53 port::Status BlockUntilDone(); 54 55 private: 56 bool WorkAvailable() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_); 57 void WorkLoop(); 58 59 absl::Mutex mu_; 60 std::queue<std::function<port::Status()>> work_queue_ ABSL_GUARDED_BY(mu_); 61 std::unique_ptr<port::Thread> thread_; 62 port::Status status_; 63 }; 64 65 } // namespace host 66 } // namespace stream_executor 67 68 #endif // TENSORFLOW_COMPILER_XLA_STREAM_EXECUTOR_HOST_HOST_STREAM_H_ 69