xref: /aosp_15_r20/external/deqp/execserver/xsExecutionServer.cpp (revision 35238bce31c2a825756842865a792f8cf7f89930)
1 /*-------------------------------------------------------------------------
2  * drawElements Quality Program Execution Server
3  * ---------------------------------------------
4  *
5  * Copyright 2014 The Android Open Source Project
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  *//*!
20  * \file
21  * \brief Test Execution Server.
22  *//*--------------------------------------------------------------------*/
23 
24 #include "xsExecutionServer.hpp"
25 #include "deClock.h"
26 
27 #include <cstdio>
28 
29 using std::string;
30 using std::vector;
31 
// Debug tracing helper. The argument is a complete, parenthesized printf
// argument list, so call sites use double parentheses:
//     DBG_PRINT(("value = %d\n", value));
// Change the condition below to 0 to compile every trace out.
#if 1
#define DBG_PRINT(ARGS) printf ARGS
#else
#define DBG_PRINT(ARGS)
#endif
37 
38 namespace xs
39 {
40 
isComplete(void) const41 inline bool MessageBuilder::isComplete(void) const
42 {
43     if (m_buffer.size() < MESSAGE_HEADER_SIZE)
44         return false;
45     else
46         return m_buffer.size() == getMessageSize();
47 }
48 
getMessageData(void) const49 const uint8_t *MessageBuilder::getMessageData(void) const
50 {
51     return m_buffer.size() > MESSAGE_HEADER_SIZE ? &m_buffer[MESSAGE_HEADER_SIZE] : DE_NULL;
52 }
53 
getMessageDataSize(void) const54 size_t MessageBuilder::getMessageDataSize(void) const
55 {
56     DE_ASSERT(isComplete());
57     return m_buffer.size() - MESSAGE_HEADER_SIZE;
58 }
59 
read(ByteBuffer & src)60 void MessageBuilder::read(ByteBuffer &src)
61 {
62     // Try to get header.
63     if (m_buffer.size() < MESSAGE_HEADER_SIZE)
64     {
65         while (m_buffer.size() < MESSAGE_HEADER_SIZE && src.getNumElements() > 0)
66             m_buffer.push_back(src.popBack());
67 
68         DE_ASSERT(m_buffer.size() <= MESSAGE_HEADER_SIZE);
69 
70         if (m_buffer.size() == MESSAGE_HEADER_SIZE)
71         {
72             // Got whole header, parse it.
73             Message::parseHeader(&m_buffer[0], (int)m_buffer.size(), m_messageType, m_messageSize);
74         }
75     }
76 
77     if (m_buffer.size() >= MESSAGE_HEADER_SIZE)
78     {
79         // We have header.
80         size_t msgSize      = getMessageSize();
81         size_t numBytesLeft = msgSize - m_buffer.size();
82         size_t numToRead    = (size_t)de::min(src.getNumElements(), (int)numBytesLeft);
83 
84         if (numToRead > 0)
85         {
86             int curBufPos = (int)m_buffer.size();
87             m_buffer.resize(curBufPos + numToRead);
88             src.popBack(&m_buffer[curBufPos], (int)numToRead);
89         }
90     }
91 }
92 
clear(void)93 void MessageBuilder::clear(void)
94 {
95     m_buffer.clear();
96     m_messageType = MESSAGETYPE_NONE;
97     m_messageSize = 0;
98 }
99 
ExecutionServer(xs::TestProcess * testProcess,deSocketFamily family,int port,RunMode runMode)100 ExecutionServer::ExecutionServer(xs::TestProcess *testProcess, deSocketFamily family, int port, RunMode runMode)
101     : TcpServer(family, port)
102     , m_testDriver(testProcess)
103     , m_runMode(runMode)
104 {
105 }
106 
~ExecutionServer(void)107 ExecutionServer::~ExecutionServer(void)
108 {
109 }
110 
acquireTestDriver(void)111 TestDriver *ExecutionServer::acquireTestDriver(void)
112 {
113     if (!m_testDriverLock.tryLock())
114         throw Error("Failed to acquire test driver");
115 
116     return &m_testDriver;
117 }
118 
releaseTestDriver(TestDriver * driver)119 void ExecutionServer::releaseTestDriver(TestDriver *driver)
120 {
121     DE_ASSERT(&m_testDriver == driver);
122     DE_UNREF(driver);
123     m_testDriverLock.unlock();
124 }
125 
createHandler(de::Socket * socket,const de::SocketAddress & clientAddress)126 ConnectionHandler *ExecutionServer::createHandler(de::Socket *socket, const de::SocketAddress &clientAddress)
127 {
128     printf("ExecutionServer: New connection from %s:%d\n", clientAddress.getHost(), clientAddress.getPort());
129     return new ExecutionRequestHandler(this, socket);
130 }
131 
connectionDone(ConnectionHandler * handler)132 void ExecutionServer::connectionDone(ConnectionHandler *handler)
133 {
134     if (m_runMode == RUNMODE_SINGLE_EXEC)
135         m_socket.close();
136 
137     TcpServer::connectionDone(handler);
138 }
139 
ExecutionRequestHandler(ExecutionServer * server,de::Socket * socket)140 ExecutionRequestHandler::ExecutionRequestHandler(ExecutionServer *server, de::Socket *socket)
141     : ConnectionHandler(server, socket)
142     , m_execServer(server)
143     , m_testDriver(DE_NULL)
144     , m_bufferIn(RECV_BUFFER_SIZE)
145     , m_bufferOut(SEND_BUFFER_SIZE)
146     , m_run(false)
147     , m_sendRecvTmpBuf(SEND_RECV_TMP_BUFFER_SIZE)
148 {
149     // Set flags.
150     m_socket->setFlags(DE_SOCKET_NONBLOCKING | DE_SOCKET_KEEPALIVE | DE_SOCKET_CLOSE_ON_EXEC);
151 
152     // Init protocol keepalives.
153     initKeepAlives();
154 }
155 
~ExecutionRequestHandler(void)156 ExecutionRequestHandler::~ExecutionRequestHandler(void)
157 {
158     if (m_testDriver)
159         m_execServer->releaseTestDriver(m_testDriver);
160 }
161 
handle(void)162 void ExecutionRequestHandler::handle(void)
163 {
164     DBG_PRINT(("ExecutionRequestHandler::handle()\n"));
165 
166     try
167     {
168         // Process execution session.
169         processSession();
170     }
171     catch (const std::exception &e)
172     {
173         printf("ExecutionRequestHandler::run(): %s\n", e.what());
174     }
175 
176     DBG_PRINT(("ExecutionRequestHandler::handle(): Done!\n"));
177 
178     // Release test driver.
179     if (m_testDriver)
180     {
181         try
182         {
183             m_testDriver->reset();
184         }
185         catch (...)
186         {
187         }
188         m_execServer->releaseTestDriver(m_testDriver);
189         m_testDriver = DE_NULL;
190     }
191 
192     // Close connection.
193     if (m_socket->isConnected())
194         m_socket->shutdown();
195 }
196 
acquireTestDriver(void)197 void ExecutionRequestHandler::acquireTestDriver(void)
198 {
199     DE_ASSERT(!m_testDriver);
200 
201     // Try to acquire test driver - may fail.
202     m_testDriver = m_execServer->acquireTestDriver();
203     DE_ASSERT(m_testDriver);
204     m_testDriver->reset();
205 }
206 
processSession(void)207 void ExecutionRequestHandler::processSession(void)
208 {
209     m_run = true;
210 
211     uint64_t lastIoTime = deGetMicroseconds();
212 
213     while (m_run)
214     {
215         bool anyIO = false;
216 
217         // Read from socket to buffer.
218         anyIO = receive() || anyIO;
219 
220         // Send bytes in buffer.
221         anyIO = send() || anyIO;
222 
223         // Process incoming data.
224         if (m_bufferIn.getNumElements() > 0)
225         {
226             DE_ASSERT(!m_msgBuilder.isComplete());
227             m_msgBuilder.read(m_bufferIn);
228         }
229 
230         if (m_msgBuilder.isComplete())
231         {
232             // Process message.
233             processMessage(m_msgBuilder.getMessageType(), m_msgBuilder.getMessageData(),
234                            m_msgBuilder.getMessageDataSize());
235 
236             m_msgBuilder.clear();
237         }
238 
239         // Keepalives, anyone?
240         pollKeepAlives();
241 
242         // Poll test driver for IO.
243         if (m_testDriver)
244             anyIO = getTestDriver()->poll(m_bufferOut) || anyIO;
245 
246         // If no IO happens in a reasonable amount of time, go to sleep.
247         {
248             uint64_t curTime = deGetMicroseconds();
249             if (anyIO)
250                 lastIoTime = curTime;
251             else if (curTime - lastIoTime > SERVER_IDLE_THRESHOLD * 1000)
252                 deSleep(SERVER_IDLE_SLEEP); // Too long since last IO, sleep for a while.
253             else
254                 deYield(); // Just give other threads chance to run.
255         }
256     }
257 }
258 
processMessage(MessageType type,const uint8_t * data,size_t dataSize)259 void ExecutionRequestHandler::processMessage(MessageType type, const uint8_t *data, size_t dataSize)
260 {
261     switch (type)
262     {
263     case MESSAGETYPE_HELLO:
264     {
265         HelloMessage msg(data, dataSize);
266         DBG_PRINT(("HelloMessage: version = %d\n", msg.version));
267         if (msg.version != PROTOCOL_VERSION)
268             throw ProtocolError("Unsupported protocol version");
269         break;
270     }
271 
272     case MESSAGETYPE_TEST:
273     {
274         TestMessage msg(data, dataSize);
275         DBG_PRINT(("TestMessage: '%s'\n", msg.test.c_str()));
276         break;
277     }
278 
279     case MESSAGETYPE_KEEPALIVE:
280     {
281         KeepAliveMessage msg(data, dataSize);
282         DBG_PRINT(("KeepAliveMessage\n"));
283         keepAliveReceived();
284         break;
285     }
286 
287     case MESSAGETYPE_EXECUTE_BINARY:
288     {
289         ExecuteBinaryMessage msg(data, dataSize);
290         DBG_PRINT(("ExecuteBinaryMessage: '%s', '%s', '%s', '%s'\n", msg.name.c_str(), msg.params.c_str(),
291                    msg.workDir.c_str(), msg.caseList.substr(0, 10).c_str()));
292         getTestDriver()->startProcess(msg.name.c_str(), msg.params.c_str(), msg.workDir.c_str(), msg.caseList.c_str());
293         keepAliveReceived(); // \todo [2011-10-11 pyry] Remove this once Candy is fixed.
294         break;
295     }
296 
297     case MESSAGETYPE_STOP_EXECUTION:
298     {
299         StopExecutionMessage msg(data, dataSize);
300         DBG_PRINT(("StopExecutionMessage\n"));
301         getTestDriver()->stopProcess();
302         break;
303     }
304 
305     default:
306         throw ProtocolError("Unsupported message");
307     }
308 }
309 
initKeepAlives(void)310 void ExecutionRequestHandler::initKeepAlives(void)
311 {
312     uint64_t curTime        = deGetMicroseconds();
313     m_lastKeepAliveSent     = curTime;
314     m_lastKeepAliveReceived = curTime;
315 }
316 
keepAliveReceived(void)317 void ExecutionRequestHandler::keepAliveReceived(void)
318 {
319     m_lastKeepAliveReceived = deGetMicroseconds();
320 }
321 
pollKeepAlives(void)322 void ExecutionRequestHandler::pollKeepAlives(void)
323 {
324     uint64_t curTime = deGetMicroseconds();
325 
326     // Check that we've got keepalives in timely fashion.
327     if (curTime - m_lastKeepAliveReceived > KEEPALIVE_TIMEOUT * 1000)
328         throw ProtocolError("Keepalive timeout occurred");
329 
330     // Send some?
331     if (curTime - m_lastKeepAliveSent > KEEPALIVE_SEND_INTERVAL * 1000 &&
332         m_bufferOut.getNumFree() >= MESSAGE_HEADER_SIZE)
333     {
334         vector<uint8_t> buf;
335         KeepAliveMessage().write(buf);
336         m_bufferOut.pushFront(&buf[0], (int)buf.size());
337 
338         m_lastKeepAliveSent = deGetMicroseconds();
339     }
340 }
341 
receive(void)342 bool ExecutionRequestHandler::receive(void)
343 {
344     size_t maxLen = de::min(m_sendRecvTmpBuf.size(), (size_t)m_bufferIn.getNumFree());
345 
346     if (maxLen > 0)
347     {
348         size_t numRecv;
349         deSocketResult result = m_socket->receive(&m_sendRecvTmpBuf[0], maxLen, &numRecv);
350 
351         if (result == DE_SOCKETRESULT_SUCCESS)
352         {
353             DE_ASSERT(numRecv > 0);
354             m_bufferIn.pushFront(&m_sendRecvTmpBuf[0], (int)numRecv);
355             return true;
356         }
357         else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
358         {
359             m_run = false;
360             return true;
361         }
362         else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
363             return false;
364         else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
365             throw ConnectionError("Connection terminated");
366         else
367             throw ConnectionError("receive() failed");
368     }
369     else
370         return false;
371 }
372 
send(void)373 bool ExecutionRequestHandler::send(void)
374 {
375     size_t maxLen = de::min(m_sendRecvTmpBuf.size(), (size_t)m_bufferOut.getNumElements());
376 
377     if (maxLen > 0)
378     {
379         m_bufferOut.peekBack(&m_sendRecvTmpBuf[0], (int)maxLen);
380 
381         size_t numSent;
382         deSocketResult result = m_socket->send(&m_sendRecvTmpBuf[0], maxLen, &numSent);
383 
384         if (result == DE_SOCKETRESULT_SUCCESS)
385         {
386             DE_ASSERT(numSent > 0);
387             m_bufferOut.popBack((int)numSent);
388             return true;
389         }
390         else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
391         {
392             m_run = false;
393             return true;
394         }
395         else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
396             return false;
397         else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
398             throw ConnectionError("Connection terminated");
399         else
400             throw ConnectionError("send() failed");
401     }
402     else
403         return false;
404 }
405 
406 } // namespace xs
407