xref: /aosp_15_r20/external/armnn/profiling/client/src/SendThread.cpp (revision 89c4ff92f2867872bb9e2354d150bf0c8c502810)
1 //
2 // Copyright © 2020 Arm Ltd and Contributors. All rights reserved.
3 // SPDX-License-Identifier: MIT
4 //
5 
6 #include "SendThread.hpp"
7 #include "ProfilingUtils.hpp"
8 
9 #include <common/include/NumericCast.hpp>
10 #include <common/include/ProfilingException.hpp>
11 
12 #if defined(ARMNN_DISABLE_THREADS)
13 #include <common/include/IgnoreUnused.hpp>
14 #endif
15 
16 #include <cstring>
17 
18 namespace arm
19 {
20 
21 namespace pipe
22 {
23 
SendThread(ProfilingStateMachine & profilingStateMachine,IBufferManager & buffer,ISendCounterPacket & sendCounterPacket,int timeout)24 SendThread::SendThread(ProfilingStateMachine& profilingStateMachine,
25                        IBufferManager& buffer,
26                        ISendCounterPacket& sendCounterPacket,
27                        int timeout)
28     : m_StateMachine(profilingStateMachine)
29     , m_BufferManager(buffer)
30     , m_SendCounterPacket(sendCounterPacket)
31     , m_Timeout(timeout)
32     , m_IsRunning(false)
33     , m_KeepRunning(false)
34     , m_SendThreadException(nullptr)
35 {
36     m_BufferManager.SetConsumer(this);
37 }
38 
SetReadyToRead()39 void SendThread::SetReadyToRead()
40 {
41     // We need to wait for the send thread to release its mutex
42     {
43 #if !defined(ARMNN_DISABLE_THREADS)
44         std::lock_guard<std::mutex> lck(m_WaitMutex);
45 #endif
46         m_ReadyToRead = true;
47     }
48     // Signal the send thread that there's something to read in the buffer
49 #if !defined(ARMNN_DISABLE_THREADS)
50     m_WaitCondition.notify_one();
51 #endif
52 }
53 
Start(IProfilingConnection & profilingConnection)54 void SendThread::Start(IProfilingConnection& profilingConnection)
55 {
56     // Check if the send thread is already running
57     if (m_IsRunning.load())
58     {
59         // The send thread is already running
60         return;
61     }
62 
63 #if !defined(ARMNN_DISABLE_THREADS)
64     if (m_SendThread.joinable())
65     {
66         m_SendThread.join();
67     }
68 #endif
69 
70     // Mark the send thread as running
71     m_IsRunning.store(true);
72 
73     // Keep the send procedure going until the send thread is signalled to stop
74     m_KeepRunning.store(true);
75 
76     // Make sure the send thread will not flush the buffer until signaled to do so
77     // no need for a mutex as the send thread can not be running at this point
78     m_ReadyToRead = false;
79 
80     m_PacketSent = false;
81 
82     // Start the send thread
83 #if !defined(ARMNN_DISABLE_THREADS)
84     m_SendThread = std::thread(&SendThread::Send, this, std::ref(profilingConnection));
85 #else
86     IgnoreUnused(profilingConnection);
87 #endif
88 }
89 
Stop(bool rethrowSendThreadExceptions)90 void SendThread::Stop(bool rethrowSendThreadExceptions)
91 {
92     // Signal the send thread to stop
93     m_KeepRunning.store(false);
94 
95     // Check that the send thread is running
96 #if !defined(ARMNN_DISABLE_THREADS)
97     if (m_SendThread.joinable())
98     {
99         // Kick the send thread out of the wait condition
100         SetReadyToRead();
101         // Wait for the send thread to complete operations
102         m_SendThread.join();
103     }
104 #endif
105 
106     // Check if the send thread exception has to be rethrown
107     if (!rethrowSendThreadExceptions)
108     {
109         // No need to rethrow the send thread exception, return immediately
110         return;
111     }
112 
113     // Check if there's an exception to rethrow
114     if (m_SendThreadException)
115     {
116         // Rethrow the send thread exception
117         std::rethrow_exception(m_SendThreadException);
118 
119         // Nullify the exception as it has been rethrown
120         m_SendThreadException = nullptr;
121     }
122 }
123 
Send(IProfilingConnection & profilingConnection)124 void SendThread::Send(IProfilingConnection& profilingConnection)
125 {
126     // Run once and keep the sending procedure looping until the thread is signalled to stop
127     do
128     {
129         // Check the current state of the profiling service
130         ProfilingState currentState = m_StateMachine.GetCurrentState();
131         switch (currentState)
132         {
133         case ProfilingState::Uninitialised:
134         case ProfilingState::NotConnected:
135 
136             // The send thread cannot be running when the profiling service is uninitialized or not connected,
137             // stop the thread immediately
138             m_KeepRunning.store(false);
139             m_IsRunning.store(false);
140 
141             // An exception should be thrown here, save it to be rethrown later from the main thread so that
142             // it can be caught by the consumer
143             m_SendThreadException =
144                 std::make_exception_ptr(arm::pipe::ProfilingException(
145                 "The send thread should not be running with the profiling service not yet initialized or connected"));
146 
147             return;
148         case ProfilingState::WaitingForAck:
149 
150             // Send out a StreamMetadata packet and wait for the profiling connection to be acknowledged.
151             // When a ConnectionAcknowledged packet is received, the profiling service state will be automatically
152             // updated by the command handler
153 
154             // Prepare a StreamMetadata packet and write it to the Counter Stream buffer
155             m_SendCounterPacket.SendStreamMetaDataPacket();
156 
157              // Flush the buffer manually to send the packet
158             FlushBuffer(profilingConnection);
159 
160             // Wait for a connection ack from the remote server. We should expect a response within timeout value.
161             // If not, drop back to the start of the loop and detect somebody closing the thread. Then send the
162             // StreamMetadata again.
163 
164             // Wait condition lock scope - Begin
165             {
166 #if !defined(ARMNN_DISABLE_THREADS)
167                 std::unique_lock<std::mutex> lock(m_WaitMutex);
168 
169                 bool timeout = m_WaitCondition.wait_for(lock,
170                                                         std::chrono::milliseconds(std::max(m_Timeout, 1000)),
171                                                         [&]{ return m_ReadyToRead; });
172                 // If we get notified we need to flush the buffer again
173                 if (timeout)
174                 {
175                     // Otherwise if we just timed out don't flush the buffer
176                     continue;
177                 }
178 #endif
179                 //reset condition variable predicate for next use
180                 m_ReadyToRead = false;
181             }
182             // Wait condition lock scope - End
183             break;
184         case ProfilingState::Active:
185         default:
186             // Wait condition lock scope - Begin
187             {
188 #if !defined(ARMNN_DISABLE_THREADS)
189                 std::unique_lock<std::mutex> lock(m_WaitMutex);
190 #endif
191                 // Normal working state for the send thread
192                 // Check if the send thread is required to enforce a timeout wait policy
193                 if (m_Timeout < 0)
194                 {
195                     // Wait indefinitely until notified that something to read has become available in the buffer
196 #if !defined(ARMNN_DISABLE_THREADS)
197                     m_WaitCondition.wait(lock, [&] { return m_ReadyToRead; });
198 #endif
199                 }
200                 else
201                 {
202                     // Wait until the thread is notified of something to read from the buffer,
203                     // or check anyway after the specified number of milliseconds
204 #if !defined(ARMNN_DISABLE_THREADS)
205                     m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout), [&] { return m_ReadyToRead; });
206 #endif
207                 }
208 
209                 //reset condition variable predicate for next use
210                 m_ReadyToRead = false;
211             }
212             // Wait condition lock scope - End
213             break;
214         }
215 
216         // Send all the available packets in the buffer
217         FlushBuffer(profilingConnection);
218     } while (m_KeepRunning.load());
219 
220     // Ensure that all readable data got written to the profiling connection before the thread is stopped
221     // (do not notify any watcher in this case, as this is just to wrap up things before shutting down the send thread)
222     FlushBuffer(profilingConnection, false);
223 
224     // Mark the send thread as not running
225     m_IsRunning.store(false);
226 }
227 
FlushBuffer(IProfilingConnection & profilingConnection,bool notifyWatchers)228 void SendThread::FlushBuffer(IProfilingConnection& profilingConnection, bool notifyWatchers)
229 {
230     // Get the first available readable buffer
231     IPacketBufferPtr packetBuffer = m_BufferManager.GetReadableBuffer();
232 
233     // Initialize the flag that indicates whether at least a packet has been sent
234     bool packetsSent = false;
235 
236     while (packetBuffer != nullptr)
237     {
238         // Get the data to send from the buffer
239         const unsigned char* readBuffer = packetBuffer->GetReadableData();
240         unsigned int readBufferSize = packetBuffer->GetSize();
241 
242         if (readBuffer == nullptr || readBufferSize == 0)
243         {
244             // Nothing to send, get the next available readable buffer and continue
245             m_BufferManager.MarkRead(packetBuffer);
246             packetBuffer = m_BufferManager.GetReadableBuffer();
247 
248             continue;
249         }
250 
251         // Check that the profiling connection is open, silently drop the data and continue if it's closed
252         if (profilingConnection.IsOpen())
253         {
254             // Write a packet to the profiling connection. Silently ignore any write error and continue
255             profilingConnection.WritePacket(readBuffer, arm::pipe::numeric_cast<uint32_t>(readBufferSize));
256 
257             // Set the flag that indicates whether at least a packet has been sent
258             packetsSent = true;
259         }
260 
261         // Mark the packet buffer as read
262         m_BufferManager.MarkRead(packetBuffer);
263 
264         // Get the next available readable buffer
265         packetBuffer = m_BufferManager.GetReadableBuffer();
266     }
267     // Check whether at least a packet has been sent
268     if (packetsSent && notifyWatchers)
269     {
270         // Wait for the parent thread to release its mutex if necessary
271         {
272 #if !defined(ARMNN_DISABLE_THREADS)
273             std::lock_guard<std::mutex> lck(m_PacketSentWaitMutex);
274 #endif
275             m_PacketSent = true;
276         }
277         // Notify to any watcher that something has been sent
278 #if !defined(ARMNN_DISABLE_THREADS)
279         m_PacketSentWaitCondition.notify_one();
280 #endif
281     }
282 }
283 
WaitForPacketSent(uint32_t timeout=1000)284 bool SendThread::WaitForPacketSent(uint32_t timeout = 1000)
285 {
286 #if !defined(ARMNN_DISABLE_THREADS)
287     std::unique_lock<std::mutex> lock(m_PacketSentWaitMutex);
288     // Blocks until notified that at least a packet has been sent or until timeout expires.
289     bool timedOut = m_PacketSentWaitCondition.wait_for(lock,
290                                                        std::chrono::milliseconds(timeout),
291                                                        [&] { return m_PacketSent; });
292     m_PacketSent = false;
293 
294     return timedOut;
295 #else
296     IgnoreUnused(timeout);
297     return false;
298 #endif
299 }
300 
301 } // namespace pipe
302 
303 } // namespace arm
304