1 /*-------------------------------------------------------------------------
2 * drawElements Quality Program Execution Server
3 * ---------------------------------------------
4 *
5 * Copyright 2014 The Android Open Source Project
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 *//*!
20 * \file
21 * \brief Test Execution Server.
22 *//*--------------------------------------------------------------------*/
23
24 #include "xsExecutionServer.hpp"
25 #include "deClock.h"
26
27 #include <cstdio>
28
29 using std::string;
30 using std::vector;
31
// Debug tracing switch. DBG_PRINT takes a parenthesized printf argument list,
// e.g. DBG_PRINT(("value = %d\n", v)), and currently forwards it to printf.
// Change the condition below to 1 to compile every DBG_PRINT call to nothing.
#if 0
#define DBG_PRINT(X)
#else
#define DBG_PRINT(X) printf X
#endif
37
38 namespace xs
39 {
40
isComplete(void) const41 inline bool MessageBuilder::isComplete(void) const
42 {
43 if (m_buffer.size() < MESSAGE_HEADER_SIZE)
44 return false;
45 else
46 return m_buffer.size() == getMessageSize();
47 }
48
getMessageData(void) const49 const uint8_t *MessageBuilder::getMessageData(void) const
50 {
51 return m_buffer.size() > MESSAGE_HEADER_SIZE ? &m_buffer[MESSAGE_HEADER_SIZE] : DE_NULL;
52 }
53
getMessageDataSize(void) const54 size_t MessageBuilder::getMessageDataSize(void) const
55 {
56 DE_ASSERT(isComplete());
57 return m_buffer.size() - MESSAGE_HEADER_SIZE;
58 }
59
read(ByteBuffer & src)60 void MessageBuilder::read(ByteBuffer &src)
61 {
62 // Try to get header.
63 if (m_buffer.size() < MESSAGE_HEADER_SIZE)
64 {
65 while (m_buffer.size() < MESSAGE_HEADER_SIZE && src.getNumElements() > 0)
66 m_buffer.push_back(src.popBack());
67
68 DE_ASSERT(m_buffer.size() <= MESSAGE_HEADER_SIZE);
69
70 if (m_buffer.size() == MESSAGE_HEADER_SIZE)
71 {
72 // Got whole header, parse it.
73 Message::parseHeader(&m_buffer[0], (int)m_buffer.size(), m_messageType, m_messageSize);
74 }
75 }
76
77 if (m_buffer.size() >= MESSAGE_HEADER_SIZE)
78 {
79 // We have header.
80 size_t msgSize = getMessageSize();
81 size_t numBytesLeft = msgSize - m_buffer.size();
82 size_t numToRead = (size_t)de::min(src.getNumElements(), (int)numBytesLeft);
83
84 if (numToRead > 0)
85 {
86 int curBufPos = (int)m_buffer.size();
87 m_buffer.resize(curBufPos + numToRead);
88 src.popBack(&m_buffer[curBufPos], (int)numToRead);
89 }
90 }
91 }
92
clear(void)93 void MessageBuilder::clear(void)
94 {
95 m_buffer.clear();
96 m_messageType = MESSAGETYPE_NONE;
97 m_messageSize = 0;
98 }
99
ExecutionServer(xs::TestProcess * testProcess,deSocketFamily family,int port,RunMode runMode)100 ExecutionServer::ExecutionServer(xs::TestProcess *testProcess, deSocketFamily family, int port, RunMode runMode)
101 : TcpServer(family, port)
102 , m_testDriver(testProcess)
103 , m_runMode(runMode)
104 {
105 }
106
~ExecutionServer(void)107 ExecutionServer::~ExecutionServer(void)
108 {
109 }
110
acquireTestDriver(void)111 TestDriver *ExecutionServer::acquireTestDriver(void)
112 {
113 if (!m_testDriverLock.tryLock())
114 throw Error("Failed to acquire test driver");
115
116 return &m_testDriver;
117 }
118
releaseTestDriver(TestDriver * driver)119 void ExecutionServer::releaseTestDriver(TestDriver *driver)
120 {
121 DE_ASSERT(&m_testDriver == driver);
122 DE_UNREF(driver);
123 m_testDriverLock.unlock();
124 }
125
createHandler(de::Socket * socket,const de::SocketAddress & clientAddress)126 ConnectionHandler *ExecutionServer::createHandler(de::Socket *socket, const de::SocketAddress &clientAddress)
127 {
128 printf("ExecutionServer: New connection from %s:%d\n", clientAddress.getHost(), clientAddress.getPort());
129 return new ExecutionRequestHandler(this, socket);
130 }
131
connectionDone(ConnectionHandler * handler)132 void ExecutionServer::connectionDone(ConnectionHandler *handler)
133 {
134 if (m_runMode == RUNMODE_SINGLE_EXEC)
135 m_socket.close();
136
137 TcpServer::connectionDone(handler);
138 }
139
ExecutionRequestHandler(ExecutionServer * server,de::Socket * socket)140 ExecutionRequestHandler::ExecutionRequestHandler(ExecutionServer *server, de::Socket *socket)
141 : ConnectionHandler(server, socket)
142 , m_execServer(server)
143 , m_testDriver(DE_NULL)
144 , m_bufferIn(RECV_BUFFER_SIZE)
145 , m_bufferOut(SEND_BUFFER_SIZE)
146 , m_run(false)
147 , m_sendRecvTmpBuf(SEND_RECV_TMP_BUFFER_SIZE)
148 {
149 // Set flags.
150 m_socket->setFlags(DE_SOCKET_NONBLOCKING | DE_SOCKET_KEEPALIVE | DE_SOCKET_CLOSE_ON_EXEC);
151
152 // Init protocol keepalives.
153 initKeepAlives();
154 }
155
~ExecutionRequestHandler(void)156 ExecutionRequestHandler::~ExecutionRequestHandler(void)
157 {
158 if (m_testDriver)
159 m_execServer->releaseTestDriver(m_testDriver);
160 }
161
handle(void)162 void ExecutionRequestHandler::handle(void)
163 {
164 DBG_PRINT(("ExecutionRequestHandler::handle()\n"));
165
166 try
167 {
168 // Process execution session.
169 processSession();
170 }
171 catch (const std::exception &e)
172 {
173 printf("ExecutionRequestHandler::run(): %s\n", e.what());
174 }
175
176 DBG_PRINT(("ExecutionRequestHandler::handle(): Done!\n"));
177
178 // Release test driver.
179 if (m_testDriver)
180 {
181 try
182 {
183 m_testDriver->reset();
184 }
185 catch (...)
186 {
187 }
188 m_execServer->releaseTestDriver(m_testDriver);
189 m_testDriver = DE_NULL;
190 }
191
192 // Close connection.
193 if (m_socket->isConnected())
194 m_socket->shutdown();
195 }
196
acquireTestDriver(void)197 void ExecutionRequestHandler::acquireTestDriver(void)
198 {
199 DE_ASSERT(!m_testDriver);
200
201 // Try to acquire test driver - may fail.
202 m_testDriver = m_execServer->acquireTestDriver();
203 DE_ASSERT(m_testDriver);
204 m_testDriver->reset();
205 }
206
processSession(void)207 void ExecutionRequestHandler::processSession(void)
208 {
209 m_run = true;
210
211 uint64_t lastIoTime = deGetMicroseconds();
212
213 while (m_run)
214 {
215 bool anyIO = false;
216
217 // Read from socket to buffer.
218 anyIO = receive() || anyIO;
219
220 // Send bytes in buffer.
221 anyIO = send() || anyIO;
222
223 // Process incoming data.
224 if (m_bufferIn.getNumElements() > 0)
225 {
226 DE_ASSERT(!m_msgBuilder.isComplete());
227 m_msgBuilder.read(m_bufferIn);
228 }
229
230 if (m_msgBuilder.isComplete())
231 {
232 // Process message.
233 processMessage(m_msgBuilder.getMessageType(), m_msgBuilder.getMessageData(),
234 m_msgBuilder.getMessageDataSize());
235
236 m_msgBuilder.clear();
237 }
238
239 // Keepalives, anyone?
240 pollKeepAlives();
241
242 // Poll test driver for IO.
243 if (m_testDriver)
244 anyIO = getTestDriver()->poll(m_bufferOut) || anyIO;
245
246 // If no IO happens in a reasonable amount of time, go to sleep.
247 {
248 uint64_t curTime = deGetMicroseconds();
249 if (anyIO)
250 lastIoTime = curTime;
251 else if (curTime - lastIoTime > SERVER_IDLE_THRESHOLD * 1000)
252 deSleep(SERVER_IDLE_SLEEP); // Too long since last IO, sleep for a while.
253 else
254 deYield(); // Just give other threads chance to run.
255 }
256 }
257 }
258
processMessage(MessageType type,const uint8_t * data,size_t dataSize)259 void ExecutionRequestHandler::processMessage(MessageType type, const uint8_t *data, size_t dataSize)
260 {
261 switch (type)
262 {
263 case MESSAGETYPE_HELLO:
264 {
265 HelloMessage msg(data, dataSize);
266 DBG_PRINT(("HelloMessage: version = %d\n", msg.version));
267 if (msg.version != PROTOCOL_VERSION)
268 throw ProtocolError("Unsupported protocol version");
269 break;
270 }
271
272 case MESSAGETYPE_TEST:
273 {
274 TestMessage msg(data, dataSize);
275 DBG_PRINT(("TestMessage: '%s'\n", msg.test.c_str()));
276 break;
277 }
278
279 case MESSAGETYPE_KEEPALIVE:
280 {
281 KeepAliveMessage msg(data, dataSize);
282 DBG_PRINT(("KeepAliveMessage\n"));
283 keepAliveReceived();
284 break;
285 }
286
287 case MESSAGETYPE_EXECUTE_BINARY:
288 {
289 ExecuteBinaryMessage msg(data, dataSize);
290 DBG_PRINT(("ExecuteBinaryMessage: '%s', '%s', '%s', '%s'\n", msg.name.c_str(), msg.params.c_str(),
291 msg.workDir.c_str(), msg.caseList.substr(0, 10).c_str()));
292 getTestDriver()->startProcess(msg.name.c_str(), msg.params.c_str(), msg.workDir.c_str(), msg.caseList.c_str());
293 keepAliveReceived(); // \todo [2011-10-11 pyry] Remove this once Candy is fixed.
294 break;
295 }
296
297 case MESSAGETYPE_STOP_EXECUTION:
298 {
299 StopExecutionMessage msg(data, dataSize);
300 DBG_PRINT(("StopExecutionMessage\n"));
301 getTestDriver()->stopProcess();
302 break;
303 }
304
305 default:
306 throw ProtocolError("Unsupported message");
307 }
308 }
309
initKeepAlives(void)310 void ExecutionRequestHandler::initKeepAlives(void)
311 {
312 uint64_t curTime = deGetMicroseconds();
313 m_lastKeepAliveSent = curTime;
314 m_lastKeepAliveReceived = curTime;
315 }
316
keepAliveReceived(void)317 void ExecutionRequestHandler::keepAliveReceived(void)
318 {
319 m_lastKeepAliveReceived = deGetMicroseconds();
320 }
321
pollKeepAlives(void)322 void ExecutionRequestHandler::pollKeepAlives(void)
323 {
324 uint64_t curTime = deGetMicroseconds();
325
326 // Check that we've got keepalives in timely fashion.
327 if (curTime - m_lastKeepAliveReceived > KEEPALIVE_TIMEOUT * 1000)
328 throw ProtocolError("Keepalive timeout occurred");
329
330 // Send some?
331 if (curTime - m_lastKeepAliveSent > KEEPALIVE_SEND_INTERVAL * 1000 &&
332 m_bufferOut.getNumFree() >= MESSAGE_HEADER_SIZE)
333 {
334 vector<uint8_t> buf;
335 KeepAliveMessage().write(buf);
336 m_bufferOut.pushFront(&buf[0], (int)buf.size());
337
338 m_lastKeepAliveSent = deGetMicroseconds();
339 }
340 }
341
receive(void)342 bool ExecutionRequestHandler::receive(void)
343 {
344 size_t maxLen = de::min(m_sendRecvTmpBuf.size(), (size_t)m_bufferIn.getNumFree());
345
346 if (maxLen > 0)
347 {
348 size_t numRecv;
349 deSocketResult result = m_socket->receive(&m_sendRecvTmpBuf[0], maxLen, &numRecv);
350
351 if (result == DE_SOCKETRESULT_SUCCESS)
352 {
353 DE_ASSERT(numRecv > 0);
354 m_bufferIn.pushFront(&m_sendRecvTmpBuf[0], (int)numRecv);
355 return true;
356 }
357 else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
358 {
359 m_run = false;
360 return true;
361 }
362 else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
363 return false;
364 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
365 throw ConnectionError("Connection terminated");
366 else
367 throw ConnectionError("receive() failed");
368 }
369 else
370 return false;
371 }
372
send(void)373 bool ExecutionRequestHandler::send(void)
374 {
375 size_t maxLen = de::min(m_sendRecvTmpBuf.size(), (size_t)m_bufferOut.getNumElements());
376
377 if (maxLen > 0)
378 {
379 m_bufferOut.peekBack(&m_sendRecvTmpBuf[0], (int)maxLen);
380
381 size_t numSent;
382 deSocketResult result = m_socket->send(&m_sendRecvTmpBuf[0], maxLen, &numSent);
383
384 if (result == DE_SOCKETRESULT_SUCCESS)
385 {
386 DE_ASSERT(numSent > 0);
387 m_bufferOut.popBack((int)numSent);
388 return true;
389 }
390 else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
391 {
392 m_run = false;
393 return true;
394 }
395 else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
396 return false;
397 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
398 throw ConnectionError("Connection terminated");
399 else
400 throw ConnectionError("send() failed");
401 }
402 else
403 return false;
404 }
405
406 } // namespace xs
407