1 //
2 // Copyright © 2020 Arm Ltd and Contributors. All rights reserved.
3 // SPDX-License-Identifier: MIT
4 //
5
6 #include "SendThread.hpp"
7 #include "ProfilingUtils.hpp"
8
9 #include <common/include/NumericCast.hpp>
10 #include <common/include/ProfilingException.hpp>
11
12 #if defined(ARMNN_DISABLE_THREADS)
13 #include <common/include/IgnoreUnused.hpp>
14 #endif
15
16 #include <cstring>
17
18 namespace arm
19 {
20
21 namespace pipe
22 {
23
SendThread(ProfilingStateMachine & profilingStateMachine,IBufferManager & buffer,ISendCounterPacket & sendCounterPacket,int timeout)24 SendThread::SendThread(ProfilingStateMachine& profilingStateMachine,
25 IBufferManager& buffer,
26 ISendCounterPacket& sendCounterPacket,
27 int timeout)
28 : m_StateMachine(profilingStateMachine)
29 , m_BufferManager(buffer)
30 , m_SendCounterPacket(sendCounterPacket)
31 , m_Timeout(timeout)
32 , m_IsRunning(false)
33 , m_KeepRunning(false)
34 , m_SendThreadException(nullptr)
35 {
36 m_BufferManager.SetConsumer(this);
37 }
38
SetReadyToRead()39 void SendThread::SetReadyToRead()
40 {
41 // We need to wait for the send thread to release its mutex
42 {
43 #if !defined(ARMNN_DISABLE_THREADS)
44 std::lock_guard<std::mutex> lck(m_WaitMutex);
45 #endif
46 m_ReadyToRead = true;
47 }
48 // Signal the send thread that there's something to read in the buffer
49 #if !defined(ARMNN_DISABLE_THREADS)
50 m_WaitCondition.notify_one();
51 #endif
52 }
53
Start(IProfilingConnection & profilingConnection)54 void SendThread::Start(IProfilingConnection& profilingConnection)
55 {
56 // Check if the send thread is already running
57 if (m_IsRunning.load())
58 {
59 // The send thread is already running
60 return;
61 }
62
63 #if !defined(ARMNN_DISABLE_THREADS)
64 if (m_SendThread.joinable())
65 {
66 m_SendThread.join();
67 }
68 #endif
69
70 // Mark the send thread as running
71 m_IsRunning.store(true);
72
73 // Keep the send procedure going until the send thread is signalled to stop
74 m_KeepRunning.store(true);
75
76 // Make sure the send thread will not flush the buffer until signaled to do so
77 // no need for a mutex as the send thread can not be running at this point
78 m_ReadyToRead = false;
79
80 m_PacketSent = false;
81
82 // Start the send thread
83 #if !defined(ARMNN_DISABLE_THREADS)
84 m_SendThread = std::thread(&SendThread::Send, this, std::ref(profilingConnection));
85 #else
86 IgnoreUnused(profilingConnection);
87 #endif
88 }
89
Stop(bool rethrowSendThreadExceptions)90 void SendThread::Stop(bool rethrowSendThreadExceptions)
91 {
92 // Signal the send thread to stop
93 m_KeepRunning.store(false);
94
95 // Check that the send thread is running
96 #if !defined(ARMNN_DISABLE_THREADS)
97 if (m_SendThread.joinable())
98 {
99 // Kick the send thread out of the wait condition
100 SetReadyToRead();
101 // Wait for the send thread to complete operations
102 m_SendThread.join();
103 }
104 #endif
105
106 // Check if the send thread exception has to be rethrown
107 if (!rethrowSendThreadExceptions)
108 {
109 // No need to rethrow the send thread exception, return immediately
110 return;
111 }
112
113 // Check if there's an exception to rethrow
114 if (m_SendThreadException)
115 {
116 // Rethrow the send thread exception
117 std::rethrow_exception(m_SendThreadException);
118
119 // Nullify the exception as it has been rethrown
120 m_SendThreadException = nullptr;
121 }
122 }
123
Send(IProfilingConnection & profilingConnection)124 void SendThread::Send(IProfilingConnection& profilingConnection)
125 {
126 // Run once and keep the sending procedure looping until the thread is signalled to stop
127 do
128 {
129 // Check the current state of the profiling service
130 ProfilingState currentState = m_StateMachine.GetCurrentState();
131 switch (currentState)
132 {
133 case ProfilingState::Uninitialised:
134 case ProfilingState::NotConnected:
135
136 // The send thread cannot be running when the profiling service is uninitialized or not connected,
137 // stop the thread immediately
138 m_KeepRunning.store(false);
139 m_IsRunning.store(false);
140
141 // An exception should be thrown here, save it to be rethrown later from the main thread so that
142 // it can be caught by the consumer
143 m_SendThreadException =
144 std::make_exception_ptr(arm::pipe::ProfilingException(
145 "The send thread should not be running with the profiling service not yet initialized or connected"));
146
147 return;
148 case ProfilingState::WaitingForAck:
149
150 // Send out a StreamMetadata packet and wait for the profiling connection to be acknowledged.
151 // When a ConnectionAcknowledged packet is received, the profiling service state will be automatically
152 // updated by the command handler
153
154 // Prepare a StreamMetadata packet and write it to the Counter Stream buffer
155 m_SendCounterPacket.SendStreamMetaDataPacket();
156
157 // Flush the buffer manually to send the packet
158 FlushBuffer(profilingConnection);
159
160 // Wait for a connection ack from the remote server. We should expect a response within timeout value.
161 // If not, drop back to the start of the loop and detect somebody closing the thread. Then send the
162 // StreamMetadata again.
163
164 // Wait condition lock scope - Begin
165 {
166 #if !defined(ARMNN_DISABLE_THREADS)
167 std::unique_lock<std::mutex> lock(m_WaitMutex);
168
169 bool timeout = m_WaitCondition.wait_for(lock,
170 std::chrono::milliseconds(std::max(m_Timeout, 1000)),
171 [&]{ return m_ReadyToRead; });
172 // If we get notified we need to flush the buffer again
173 if (timeout)
174 {
175 // Otherwise if we just timed out don't flush the buffer
176 continue;
177 }
178 #endif
179 //reset condition variable predicate for next use
180 m_ReadyToRead = false;
181 }
182 // Wait condition lock scope - End
183 break;
184 case ProfilingState::Active:
185 default:
186 // Wait condition lock scope - Begin
187 {
188 #if !defined(ARMNN_DISABLE_THREADS)
189 std::unique_lock<std::mutex> lock(m_WaitMutex);
190 #endif
191 // Normal working state for the send thread
192 // Check if the send thread is required to enforce a timeout wait policy
193 if (m_Timeout < 0)
194 {
195 // Wait indefinitely until notified that something to read has become available in the buffer
196 #if !defined(ARMNN_DISABLE_THREADS)
197 m_WaitCondition.wait(lock, [&] { return m_ReadyToRead; });
198 #endif
199 }
200 else
201 {
202 // Wait until the thread is notified of something to read from the buffer,
203 // or check anyway after the specified number of milliseconds
204 #if !defined(ARMNN_DISABLE_THREADS)
205 m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout), [&] { return m_ReadyToRead; });
206 #endif
207 }
208
209 //reset condition variable predicate for next use
210 m_ReadyToRead = false;
211 }
212 // Wait condition lock scope - End
213 break;
214 }
215
216 // Send all the available packets in the buffer
217 FlushBuffer(profilingConnection);
218 } while (m_KeepRunning.load());
219
220 // Ensure that all readable data got written to the profiling connection before the thread is stopped
221 // (do not notify any watcher in this case, as this is just to wrap up things before shutting down the send thread)
222 FlushBuffer(profilingConnection, false);
223
224 // Mark the send thread as not running
225 m_IsRunning.store(false);
226 }
227
FlushBuffer(IProfilingConnection & profilingConnection,bool notifyWatchers)228 void SendThread::FlushBuffer(IProfilingConnection& profilingConnection, bool notifyWatchers)
229 {
230 // Get the first available readable buffer
231 IPacketBufferPtr packetBuffer = m_BufferManager.GetReadableBuffer();
232
233 // Initialize the flag that indicates whether at least a packet has been sent
234 bool packetsSent = false;
235
236 while (packetBuffer != nullptr)
237 {
238 // Get the data to send from the buffer
239 const unsigned char* readBuffer = packetBuffer->GetReadableData();
240 unsigned int readBufferSize = packetBuffer->GetSize();
241
242 if (readBuffer == nullptr || readBufferSize == 0)
243 {
244 // Nothing to send, get the next available readable buffer and continue
245 m_BufferManager.MarkRead(packetBuffer);
246 packetBuffer = m_BufferManager.GetReadableBuffer();
247
248 continue;
249 }
250
251 // Check that the profiling connection is open, silently drop the data and continue if it's closed
252 if (profilingConnection.IsOpen())
253 {
254 // Write a packet to the profiling connection. Silently ignore any write error and continue
255 profilingConnection.WritePacket(readBuffer, arm::pipe::numeric_cast<uint32_t>(readBufferSize));
256
257 // Set the flag that indicates whether at least a packet has been sent
258 packetsSent = true;
259 }
260
261 // Mark the packet buffer as read
262 m_BufferManager.MarkRead(packetBuffer);
263
264 // Get the next available readable buffer
265 packetBuffer = m_BufferManager.GetReadableBuffer();
266 }
267 // Check whether at least a packet has been sent
268 if (packetsSent && notifyWatchers)
269 {
270 // Wait for the parent thread to release its mutex if necessary
271 {
272 #if !defined(ARMNN_DISABLE_THREADS)
273 std::lock_guard<std::mutex> lck(m_PacketSentWaitMutex);
274 #endif
275 m_PacketSent = true;
276 }
277 // Notify to any watcher that something has been sent
278 #if !defined(ARMNN_DISABLE_THREADS)
279 m_PacketSentWaitCondition.notify_one();
280 #endif
281 }
282 }
283
WaitForPacketSent(uint32_t timeout=1000)284 bool SendThread::WaitForPacketSent(uint32_t timeout = 1000)
285 {
286 #if !defined(ARMNN_DISABLE_THREADS)
287 std::unique_lock<std::mutex> lock(m_PacketSentWaitMutex);
288 // Blocks until notified that at least a packet has been sent or until timeout expires.
289 bool timedOut = m_PacketSentWaitCondition.wait_for(lock,
290 std::chrono::milliseconds(timeout),
291 [&] { return m_PacketSent; });
292 m_PacketSent = false;
293
294 return timedOut;
295 #else
296 IgnoreUnused(timeout);
297 return false;
298 #endif
299 }
300
301 } // namespace pipe
302
303 } // namespace arm
304