1 #ifndef SIMPLE_WEB_CLIENT_HTTP_HPP 2 #define SIMPLE_WEB_CLIENT_HTTP_HPP 3 4 #include "asio_compatibility.hpp" 5 #include "mutex.hpp" 6 #include "utility.hpp" 7 #include <future> 8 #include <limits> 9 #include <random> 10 #include <unordered_set> 11 #include <vector> 12 13 namespace SimpleWeb { 14 class HeaderEndMatch { 15 int crlfcrlf = 0; 16 int lflf = 0; 17 18 public: 19 /// Match condition for asio::read_until to match both standard and non-standard HTTP header endings. operator ()(asio::buffers_iterator<asio::const_buffers_1> begin,asio::buffers_iterator<asio::const_buffers_1> end)20 std::pair<asio::buffers_iterator<asio::const_buffers_1>, bool> operator()(asio::buffers_iterator<asio::const_buffers_1> begin, asio::buffers_iterator<asio::const_buffers_1> end) { 21 auto it = begin; 22 for(; it != end; ++it) { 23 if(*it == '\n') { 24 if(crlfcrlf == 1) 25 ++crlfcrlf; 26 else if(crlfcrlf == 2) 27 crlfcrlf = 0; 28 else if(crlfcrlf == 3) 29 return {++it, true}; 30 if(lflf == 0) 31 ++lflf; 32 else if(lflf == 1) 33 return {++it, true}; 34 } 35 else if(*it == '\r') { 36 if(crlfcrlf == 0) 37 ++crlfcrlf; 38 else if(crlfcrlf == 2) 39 ++crlfcrlf; 40 else 41 crlfcrlf = 0; 42 lflf = 0; 43 } 44 else { 45 crlfcrlf = 0; 46 lflf = 0; 47 } 48 } 49 return {it, false}; 50 } 51 }; 52 } // namespace SimpleWeb 53 #ifndef ASIO_STANDALONE 54 namespace boost { 55 #endif 56 namespace asio { 57 template <> 58 struct is_match_condition<SimpleWeb::HeaderEndMatch> : public std::true_type {}; 59 } // namespace asio 60 #ifndef ASIO_STANDALONE 61 } // namespace boost 62 #endif 63 64 namespace SimpleWeb { 65 template <class socket_type> 66 class Client; 67 68 template <class socket_type> 69 class ClientBase { 70 public: 71 class Content : public std::istream { 72 friend class ClientBase<socket_type>; 73 74 public: size()75 std::size_t size() noexcept { 76 return streambuf.size(); 77 } 78 /// Convenience function to return content as a string. string()79 std::string string() noexcept { 80 return std::string(asio::buffers_begin(streambuf.data()), asio::buffers_end(streambuf.data())); 81 } 82 83 /// When true, this is the last response content part from server for the current request. 84 bool end = true; 85 86 private: 87 asio::streambuf &streambuf; Content(asio::streambuf & streambuf)88 Content(asio::streambuf &streambuf) noexcept : std::istream(&streambuf), streambuf(streambuf) {} 89 }; 90 91 protected: 92 class Connection; 93 94 public: 95 class Response { 96 friend class ClientBase<socket_type>; 97 friend class Client<socket_type>; 98 99 class Shared { 100 public: 101 std::string http_version, status_code; 102 103 CaseInsensitiveMultimap header; 104 }; 105 106 asio::streambuf streambuf; 107 108 std::shared_ptr<Shared> shared; 109 110 std::weak_ptr<Connection> connection_weak; 111 Response(std::size_t max_response_streambuf_size,const std::shared_ptr<Connection> & connection_)112 Response(std::size_t max_response_streambuf_size, const std::shared_ptr<Connection> &connection_) noexcept 113 : streambuf(max_response_streambuf_size), shared(new Shared()), connection_weak(connection_), http_version(shared->http_version), status_code(shared->status_code), header(shared->header), content(streambuf) {} 114 115 /// Constructs a response object that has empty content, but otherwise is equal to the response parameter Response(const Response & response)116 Response(const Response &response) noexcept 117 : streambuf(response.streambuf.max_size()), shared(response.shared), connection_weak(response.connection_weak), http_version(shared->http_version), status_code(shared->status_code), header(shared->header), content(streambuf) {} 118 119 public: 120 std::string &http_version, &status_code; 121 122 CaseInsensitiveMultimap &header; 123 124 Content content; 125 126 /// Closes the connection to the server, preventing further response content parts from server. close()127 void close() noexcept { 128 if(auto connection = this->connection_weak.lock()) 129 connection->close(); 130 } 131 }; 132 133 class Config { 134 friend class ClientBase<socket_type>; 135 136 private: Config()137 Config() noexcept {} 138 139 public: 140 /// Set timeout on requests in seconds. Default value: 0 (no timeout). 141 long timeout = 0; 142 /// Set connect timeout in seconds. Default value: 0 (Config::timeout is then used instead). 143 long timeout_connect = 0; 144 /// Maximum size of response stream buffer. Defaults to architecture maximum. 145 /// Reaching this limit will result in a message_size error code. 146 std::size_t max_response_streambuf_size = (std::numeric_limits<std::size_t>::max)(); 147 /// Set proxy server (server:port) 148 std::string proxy_server; 149 }; 150 151 protected: 152 class Connection : public std::enable_shared_from_this<Connection> { 153 public: 154 template <typename... Args> Connection(std::shared_ptr<ScopeRunner> handler_runner_,Args &&...args)155 Connection(std::shared_ptr<ScopeRunner> handler_runner_, Args &&...args) noexcept 156 : handler_runner(std::move(handler_runner_)), socket(new socket_type(std::forward<Args>(args)...)) {} 157 158 std::shared_ptr<ScopeRunner> handler_runner; 159 160 std::unique_ptr<socket_type> socket; // Socket must be unique_ptr since asio::ssl::stream<asio::ip::tcp::socket> is not movable 161 bool in_use = false; 162 bool attempt_reconnect = true; 163 164 std::unique_ptr<asio::steady_timer> timer; 165 close()166 void close() noexcept { 167 error_code ec; 168 socket->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both, ec); 169 socket->lowest_layer().cancel(ec); 170 } 171 set_timeout(long seconds)172 void set_timeout(long seconds) noexcept { 173 if(seconds == 0) { 174 timer = nullptr; 175 return; 176 } 177 timer = make_steady_timer(*socket, std::chrono::seconds(seconds)); 178 std::weak_ptr<Connection> self_weak(this->shared_from_this()); // To avoid keeping Connection instance alive longer than needed 179 timer->async_wait([self_weak](const error_code &ec) { 180 if(!ec) { 181 if(auto self = self_weak.lock()) 182 self->close(); 183 } 184 }); 185 } 186 cancel_timeout()187 void cancel_timeout() noexcept { 188 if(timer) { 189 try { 190 timer->cancel(); 191 } 192 catch(...) { 193 } 194 } 195 } 196 }; 197 198 class Session { 199 public: Session(std::size_t max_response_streambuf_size,std::shared_ptr<Connection> connection_,std::unique_ptr<asio::streambuf> request_streambuf_)200 Session(std::size_t max_response_streambuf_size, std::shared_ptr<Connection> connection_, std::unique_ptr<asio::streambuf> request_streambuf_) noexcept 201 : connection(std::move(connection_)), request_streambuf(std::move(request_streambuf_)), response(new Response(max_response_streambuf_size, connection)) {} 202 203 std::shared_ptr<Connection> connection; 204 std::unique_ptr<asio::streambuf> request_streambuf; 205 std::shared_ptr<Response> response; 206 std::function<void(const error_code &)> callback; 207 }; 208 209 public: 210 /// Set before calling a request function. 211 Config config; 212 213 /// If you want to reuse an already created asio::io_service, store its pointer here before calling a request function. 214 /// Do not set when using synchronous request functions. 215 std::shared_ptr<io_context> io_service; 216 217 /// Convenience function to perform synchronous request. The io_service is started in this function. 218 /// Should not be combined with asynchronous request functions. 219 /// If you reuse the io_service for other tasks, use the asynchronous request functions instead. 220 /// When requesting Server-Sent Events: will throw on error::eof, please use asynchronous request functions instead. request(const std::string & method,const std::string & path={"/"},string_view content={},const CaseInsensitiveMultimap & header=CaseInsensitiveMultimap ())221 std::shared_ptr<Response> request(const std::string &method, const std::string &path = {"/"}, string_view content = {}, const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) { 222 return sync_request(method, path, content, header); 223 } 224 225 /// Convenience function to perform synchronous request. The io_service is started in this function. 226 /// Should not be combined with asynchronous request functions. 227 /// If you reuse the io_service for other tasks, use the asynchronous request functions instead. 228 /// When requesting Server-Sent Events: will throw on error::eof, please use asynchronous request functions instead. request(const std::string & method,const std::string & path,std::istream & content,const CaseInsensitiveMultimap & header=CaseInsensitiveMultimap ())229 std::shared_ptr<Response> request(const std::string &method, const std::string &path, std::istream &content, const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) { 230 return sync_request(method, path, content, header); 231 } 232 233 /// Asynchronous request where running Client's io_service is required. 234 /// Do not use concurrently with the synchronous request functions. 235 /// When requesting Server-Sent Events: request_callback might be called more than twice, first call with empty contents on open, and with ec = error::eof on last call request(const std::string & method,const std::string & path,string_view content,const CaseInsensitiveMultimap & header,std::function<void (std::shared_ptr<Response>,const error_code &)> && request_callback_)236 void request(const std::string &method, const std::string &path, string_view content, const CaseInsensitiveMultimap &header, 237 std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) { 238 auto session = std::make_shared<Session>(config.max_response_streambuf_size, get_connection(), create_request_header(method, path, header)); 239 std::weak_ptr<Session> session_weak(session); // To avoid keeping session alive longer than needed 240 auto request_callback = std::make_shared<std::function<void(std::shared_ptr<Response>, const error_code &)>>(std::move(request_callback_)); 241 session->callback = [this, session_weak, request_callback](const error_code &ec) { 242 if(auto session = session_weak.lock()) { 243 if(session->response->content.end) { 244 session->connection->cancel_timeout(); 245 session->connection->in_use = false; 246 } 247 { 248 LockGuard lock(this->connections_mutex); 249 250 // Remove unused connections, but keep one open for HTTP persistent connection: 251 std::size_t unused_connections = 0; 252 for(auto it = this->connections.begin(); it != this->connections.end();) { 253 if(ec && session->connection == *it) 254 it = this->connections.erase(it); 255 else if((*it)->in_use) 256 ++it; 257 else { 258 ++unused_connections; 259 if(unused_connections > 1) 260 it = this->connections.erase(it); 261 else 262 ++it; 263 } 264 } 265 } 266 267 if(*request_callback) 268 (*request_callback)(session->response, ec); 269 } 270 }; 271 272 std::ostream write_stream(session->request_streambuf.get()); 273 if(content.size() > 0) { 274 auto header_it = header.find("Content-Length"); 275 if(header_it == header.end()) { 276 header_it = header.find("Transfer-Encoding"); 277 if(header_it == header.end() || header_it->second != "chunked") 278 write_stream << "Content-Length: " << content.size() << "\r\n"; 279 } 280 } 281 write_stream << "\r\n"; 282 write_stream.write(content.data(), static_cast<std::streamsize>(content.size())); 283 284 connect(session); 285 } 286 287 /// Asynchronous request where running Client's io_service is required. 288 /// Do not use concurrently with the synchronous request functions. 289 /// When requesting Server-Sent Events: request_callback might be called more than twice, first call with empty contents on open, and with ec = error::eof on last call request(const std::string & method,const std::string & path,string_view content,std::function<void (std::shared_ptr<Response>,const error_code &)> && request_callback_)290 void request(const std::string &method, const std::string &path, string_view content, 291 std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) { 292 request(method, path, content, CaseInsensitiveMultimap(), std::move(request_callback_)); 293 } 294 295 /// Asynchronous request where running Client's io_service is required. 296 /// Do not use concurrently with the synchronous request functions. 297 /// When requesting Server-Sent Events: request_callback might be called more than twice, first call with empty contents on open, and with ec = error::eof on last call request(const std::string & method,const std::string & path,std::function<void (std::shared_ptr<Response>,const error_code &)> && request_callback_)298 void request(const std::string &method, const std::string &path, 299 std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) { 300 request(method, path, std::string(), CaseInsensitiveMultimap(), std::move(request_callback_)); 301 } 302 303 /// Asynchronous request where running Client's io_service is required. 304 /// Do not use concurrently with the synchronous request functions. 305 /// When requesting Server-Sent Events: request_callback might be called more than twice, first call with empty contents on open, and with ec = error::eof on last call request(const std::string & method,std::function<void (std::shared_ptr<Response>,const error_code &)> && request_callback_)306 void request(const std::string &method, std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) { 307 request(method, std::string("/"), std::string(), CaseInsensitiveMultimap(), std::move(request_callback_)); 308 } 309 310 /// Asynchronous request where running Client's io_service is required. 311 /// Do not use concurrently with the synchronous request functions. 312 /// When requesting Server-Sent Events: request_callback might be called more than twice, first call with empty contents on open, and with ec = error::eof on last call request(const std::string & method,const std::string & path,std::istream & content,const CaseInsensitiveMultimap & header,std::function<void (std::shared_ptr<Response>,const error_code &)> && request_callback_)313 void request(const std::string &method, const std::string &path, std::istream &content, const CaseInsensitiveMultimap &header, 314 std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) { 315 auto session = std::make_shared<Session>(config.max_response_streambuf_size, get_connection(), create_request_header(method, path, header)); 316 std::weak_ptr<Session> session_weak(session); // To avoid keeping session alive longer than needed 317 auto request_callback = std::make_shared<std::function<void(std::shared_ptr<Response>, const error_code &)>>(std::move(request_callback_)); 318 session->callback = [this, session_weak, request_callback](const error_code &ec) { 319 if(auto session = session_weak.lock()) { 320 if(session->response->content.end) { 321 session->connection->cancel_timeout(); 322 session->connection->in_use = false; 323 } 324 { 325 LockGuard lock(this->connections_mutex); 326 327 // Remove unused connections, but keep one open for HTTP persistent connection: 328 std::size_t unused_connections = 0; 329 for(auto it = this->connections.begin(); it != this->connections.end();) { 330 if(ec && session->connection == *it) 331 it = this->connections.erase(it); 332 else if((*it)->in_use) 333 ++it; 334 else { 335 ++unused_connections; 336 if(unused_connections > 1) 337 it = this->connections.erase(it); 338 else 339 ++it; 340 } 341 } 342 } 343 344 if(*request_callback) 345 (*request_callback)(session->response, ec); 346 } 347 }; 348 349 content.seekg(0, std::ios::end); 350 auto content_length = content.tellg(); 351 content.seekg(0, std::ios::beg); 352 std::ostream write_stream(session->request_streambuf.get()); 353 if(content_length > 0) { 354 auto header_it = header.find("Content-Length"); 355 if(header_it == header.end()) { 356 header_it = header.find("Transfer-Encoding"); 357 if(header_it == header.end() || header_it->second != "chunked") 358 write_stream << "Content-Length: " << content_length << "\r\n"; 359 } 360 } 361 write_stream << "\r\n"; 362 if(content_length > 0) 363 write_stream << content.rdbuf(); 364 365 connect(session); 366 } 367 368 /// Asynchronous request where running Client's io_service is required. 369 /// Do not use concurrently with the synchronous request functions. 370 /// When requesting Server-Sent Events: request_callback might be called more than twice, first call with empty contents on open, and with ec = error::eof on last call request(const std::string & method,const std::string & path,std::istream & content,std::function<void (std::shared_ptr<Response>,const error_code &)> && request_callback_)371 void request(const std::string &method, const std::string &path, std::istream &content, 372 std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) { 373 request(method, path, content, CaseInsensitiveMultimap(), std::move(request_callback_)); 374 } 375 376 /// Close connections. stop()377 void stop() noexcept { 378 LockGuard lock(connections_mutex); 379 for(auto it = connections.begin(); it != connections.end();) { 380 (*it)->close(); 381 it = connections.erase(it); 382 } 383 } 384 ~ClientBase()385 virtual ~ClientBase() noexcept { 386 handler_runner->stop(); 387 stop(); 388 if(internal_io_service) 389 io_service->stop(); 390 } 391 392 protected: 393 bool internal_io_service = false; 394 395 std::string host; 396 unsigned short port; 397 unsigned short default_port; 398 399 std::unique_ptr<std::pair<std::string, std::string>> host_port; 400 401 Mutex connections_mutex; 402 std::unordered_set<std::shared_ptr<Connection>> connections GUARDED_BY(connections_mutex); 403 404 std::shared_ptr<ScopeRunner> handler_runner; 405 406 Mutex synchronous_request_mutex; 407 bool synchronous_request_called GUARDED_BY(synchronous_request_mutex) = false; 408 ClientBase(const std::string & host_port,unsigned short default_port)409 ClientBase(const std::string &host_port, unsigned short default_port) noexcept : default_port(default_port), handler_runner(new ScopeRunner()) { 410 auto parsed_host_port = parse_host_port(host_port, default_port); 411 host = parsed_host_port.first; 412 port = parsed_host_port.second; 413 } 414 415 template <typename ContentType> sync_request(const std::string & method,const std::string & path,ContentType & content,const CaseInsensitiveMultimap & header)416 std::shared_ptr<Response> sync_request(const std::string &method, const std::string &path, ContentType &content, const CaseInsensitiveMultimap &header) { 417 { 418 LockGuard lock(synchronous_request_mutex); 419 if(!synchronous_request_called) { 420 if(io_service) // Throw if io_service already set 421 throw make_error_code::make_error_code(errc::operation_not_permitted); 422 io_service = std::make_shared<io_context>(); 423 internal_io_service = true; 424 auto io_service_ = io_service; 425 std::thread thread([io_service_] { 426 auto work = make_work_guard(*io_service_); 427 io_service_->run(); 428 }); 429 thread.detach(); 430 synchronous_request_called = true; 431 } 432 } 433 434 std::shared_ptr<Response> response; 435 std::promise<std::shared_ptr<Response>> response_promise; 436 auto stop_future_handlers = std::make_shared<bool>(false); 437 request(method, path, content, header, [&response, &response_promise, stop_future_handlers](std::shared_ptr<Response> response_, error_code ec) { 438 if(*stop_future_handlers) 439 return; 440 441 if(!response) 442 response = response_; 443 else if(!ec) { 444 if(response_->streambuf.size() + response->streambuf.size() > response->streambuf.max_size()) { 445 ec = make_error_code::make_error_code(errc::message_size); 446 response->close(); 447 } 448 else { 449 // Move partial response_ content to response: 450 auto &source = response_->streambuf; 451 auto &target = response->streambuf; 452 target.commit(asio::buffer_copy(target.prepare(source.size()), source.data())); 453 source.consume(source.size()); 454 } 455 } 456 457 if(ec) { 458 response_promise.set_exception(std::make_exception_ptr(system_error(ec))); 459 *stop_future_handlers = true; 460 } 461 else if(response_->content.end) 462 response_promise.set_value(response); 463 }); 464 465 return response_promise.get_future().get(); 466 } 467 get_connection()468 std::shared_ptr<Connection> get_connection() noexcept { 469 std::shared_ptr<Connection> connection; 470 LockGuard lock(connections_mutex); 471 472 if(!io_service) { 473 io_service = std::make_shared<io_context>(); 474 internal_io_service = true; 475 } 476 477 for(auto it = connections.begin(); it != connections.end(); ++it) { 478 if(!(*it)->in_use) { 479 connection = *it; 480 break; 481 } 482 } 483 if(!connection) { 484 connection = create_connection(); 485 connections.emplace(connection); 486 } 487 connection->attempt_reconnect = true; 488 connection->in_use = true; 489 490 if(!host_port) { 491 if(config.proxy_server.empty()) 492 host_port = std::unique_ptr<std::pair<std::string, std::string>>(new std::pair<std::string, std::string>(host, std::to_string(port))); 493 else { 494 auto proxy_host_port = parse_host_port(config.proxy_server, 8080); 495 host_port = std::unique_ptr<std::pair<std::string, std::string>>(new std::pair<std::string, std::string>(proxy_host_port.first, std::to_string(proxy_host_port.second))); 496 } 497 } 498 499 return connection; 500 } 501 parse_host_port(const std::string & host_port,unsigned short default_port) const502 std::pair<std::string, unsigned short> parse_host_port(const std::string &host_port, unsigned short default_port) const noexcept { 503 std::string host, port; 504 host.reserve(host_port.size()); 505 bool parse_port = false; 506 int square_count = 0; // To parse IPv6 addresses 507 for(auto chr : host_port) { 508 if(chr == '[') 509 ++square_count; 510 else if(chr == ']') 511 --square_count; 512 else if(square_count == 0 && chr == ':') 513 parse_port = true; 514 else if(!parse_port) 515 host += chr; 516 else 517 port += chr; 518 } 519 520 if(port.empty()) 521 return {std::move(host), default_port}; 522 else { 523 try { 524 return {std::move(host), static_cast<unsigned short>(std::stoul(port))}; 525 } 526 catch(...) { 527 return {std::move(host), default_port}; 528 } 529 } 530 } 531 532 virtual std::shared_ptr<Connection> create_connection() noexcept = 0; 533 virtual void connect(const std::shared_ptr<Session> &) = 0; 534 create_request_header(const std::string & method,const std::string & path,const CaseInsensitiveMultimap & header) const535 std::unique_ptr<asio::streambuf> create_request_header(const std::string &method, const std::string &path, const CaseInsensitiveMultimap &header) const { 536 auto corrected_path = path; 537 if(corrected_path == "") 538 corrected_path = "/"; 539 if(!config.proxy_server.empty() && std::is_same<socket_type, asio::ip::tcp::socket>::value) 540 corrected_path = "http://" + host + ':' + std::to_string(port) + corrected_path; 541 542 std::unique_ptr<asio::streambuf> streambuf(new asio::streambuf()); 543 std::ostream write_stream(streambuf.get()); 544 write_stream << method << " " << corrected_path << " HTTP/1.1\r\n"; 545 write_stream << "Host: " << host; 546 if(port != default_port) 547 write_stream << ':' << std::to_string(port); 548 write_stream << "\r\n"; 549 for(auto &h : header) 550 write_stream << h.first << ": " << h.second << "\r\n"; 551 return streambuf; 552 } 553 write(const std::shared_ptr<Session> & session)554 void write(const std::shared_ptr<Session> &session) { 555 session->connection->set_timeout(config.timeout); 556 asio::async_write(*session->connection->socket, session->request_streambuf->data(), [this, session](const error_code &ec, std::size_t /*bytes_transferred*/) { 557 auto lock = session->connection->handler_runner->continue_lock(); 558 if(!lock) 559 return; 560 if(!ec) 561 this->read(session); 562 else { 563 if(session->connection->attempt_reconnect && ec != error::operation_aborted) 564 reconnect(session, ec); 565 else 566 session->callback(ec); 567 } 568 }); 569 } 570 read(const std::shared_ptr<Session> & session)571 void read(const std::shared_ptr<Session> &session) { 572 asio::async_read_until(*session->connection->socket, session->response->streambuf, HeaderEndMatch(), [this, session](const error_code &ec, std::size_t bytes_transferred) { 573 auto lock = session->connection->handler_runner->continue_lock(); 574 if(!lock) 575 return; 576 577 if(!ec) { 578 session->connection->attempt_reconnect = true; 579 std::size_t num_additional_bytes = session->response->streambuf.size() - bytes_transferred; 580 581 if(!ResponseMessage::parse(session->response->content, session->response->http_version, session->response->status_code, session->response->header)) { 582 session->callback(make_error_code::make_error_code(errc::protocol_error)); 583 return; 584 } 585 586 auto header_it = session->response->header.find("Content-Length"); 587 if(header_it != session->response->header.end()) { 588 auto content_length = std::stoull(header_it->second); 589 if(content_length > num_additional_bytes) 590 this->read_content(session, content_length - num_additional_bytes); 591 else 592 session->callback(ec); 593 } 594 else if((header_it = session->response->header.find("Transfer-Encoding")) != session->response->header.end() && header_it->second == "chunked") { 595 // Expect hex number to not exceed 16 bytes (64-bit number), but take into account previous additional read bytes 596 auto chunk_size_streambuf = std::make_shared<asio::streambuf>(std::max<std::size_t>(16 + 2, session->response->streambuf.size())); 597 598 // Move leftover bytes 599 auto &source = session->response->streambuf; 600 auto &target = *chunk_size_streambuf; 601 target.commit(asio::buffer_copy(target.prepare(source.size()), source.data())); 602 source.consume(source.size()); 603 604 this->read_chunked_transfer_encoded(session, chunk_size_streambuf); 605 } 606 else if(session->response->http_version < "1.1" || ((header_it = session->response->header.find("Connection")) != session->response->header.end() && header_it->second == "close")) 607 read_content(session); 608 else if(((header_it = session->response->header.find("Content-Type")) != session->response->header.end() && header_it->second == "text/event-stream")) { 609 auto events_streambuf = std::make_shared<asio::streambuf>(this->config.max_response_streambuf_size); 610 611 // Move leftover bytes 612 auto &source = session->response->streambuf; 613 auto &target = *events_streambuf; 614 target.commit(asio::buffer_copy(target.prepare(source.size()), source.data())); 615 source.consume(source.size()); 616 617 session->callback(ec); // Connection to a Server-Sent Events resource is opened 618 619 this->read_server_sent_event(session, events_streambuf); 620 } 621 else 622 session->callback(ec); 623 } 624 else { 625 if(session->connection->attempt_reconnect && ec != error::operation_aborted) 626 reconnect(session, ec); 627 else 628 session->callback(ec); 629 } 630 }); 631 } 632 reconnect(const std::shared_ptr<Session> & session,const error_code & ec)633 void reconnect(const std::shared_ptr<Session> &session, const error_code &ec) { 634 LockGuard lock(connections_mutex); 635 auto it = connections.find(session->connection); 636 if(it != connections.end()) { 637 connections.erase(it); 638 session->connection = create_connection(); 639 session->connection->attempt_reconnect = false; 640 session->connection->in_use = true; 641 session->response = std::shared_ptr<Response>(new Response(this->config.max_response_streambuf_size, session->connection)); 642 connections.emplace(session->connection); 643 lock.unlock(); 644 this->connect(session); 645 } 646 else { 647 lock.unlock(); 648 session->callback(ec); 649 } 650 } 651 read_content(const std::shared_ptr<Session> & session,std::size_t remaining_length)652 void read_content(const std::shared_ptr<Session> &session, std::size_t remaining_length) { 653 asio::async_read(*session->connection->socket, session->response->streambuf, asio::transfer_exactly(remaining_length), [this, session, remaining_length](const error_code &ec, std::size_t bytes_transferred) { 654 auto lock = session->connection->handler_runner->continue_lock(); 655 if(!lock) 656 return; 657 658 if(!ec) { 659 if(session->response->streambuf.size() == session->response->streambuf.max_size() && remaining_length > bytes_transferred) { 660 session->response->content.end = false; 661 session->callback(ec); 662 session->response = std::shared_ptr<Response>(new Response(*session->response)); 663 this->read_content(session, remaining_length - bytes_transferred); 664 } 665 else 666 session->callback(ec); 667 } 668 else 669 session->callback(ec); 670 }); 671 } 672 673 /// Ignore end of file error codes clean_error_code(const error_code & ec)674 virtual error_code clean_error_code(const error_code &ec) { 675 return ec == error::eof ? error_code() : ec; 676 } 677 read_content(const std::shared_ptr<Session> & session)678 void read_content(const std::shared_ptr<Session> &session) { 679 asio::async_read(*session->connection->socket, session->response->streambuf, [this, session](const error_code &ec_, std::size_t /*bytes_transferred*/) { 680 auto lock = session->connection->handler_runner->continue_lock(); 681 if(!lock) 682 return; 683 684 auto ec = clean_error_code(ec_); 685 686 if(!ec) { 687 { 688 LockGuard lock(this->connections_mutex); 689 this->connections.erase(session->connection); 690 } 691 if(session->response->streambuf.size() == session->response->streambuf.max_size()) { 692 session->response->content.end = false; 693 session->callback(ec); 694 session->response = std::shared_ptr<Response>(new Response(*session->response)); 695 this->read_content(session); 696 } 697 else 698 session->callback(ec); 699 } 700 else 701 session->callback(ec); 702 }); 703 } 704 read_chunked_transfer_encoded(const std::shared_ptr<Session> & session,const std::shared_ptr<asio::streambuf> & chunk_size_streambuf)705 void read_chunked_transfer_encoded(const std::shared_ptr<Session> &session, const std::shared_ptr<asio::streambuf> &chunk_size_streambuf) { 706 asio::async_read_until(*session->connection->socket, *chunk_size_streambuf, "\r\n", [this, session, chunk_size_streambuf](const error_code &ec, size_t bytes_transferred) { 707 auto lock = session->connection->handler_runner->continue_lock(); 708 if(!lock) 709 return; 710 711 if(!ec) { 712 std::istream istream(chunk_size_streambuf.get()); 713 std::string line; 714 std::getline(istream, line); 715 bytes_transferred -= line.size() + 1; 716 unsigned long chunk_size = 0; 717 try { 718 chunk_size = std::stoul(line, 0, 16); 719 } 720 catch(...) { 721 session->callback(make_error_code::make_error_code(errc::protocol_error)); 722 return; 723 } 724 725 if(chunk_size == 0) { 726 session->callback(error_code()); 727 return; 728 } 729 730 if(chunk_size + session->response->streambuf.size() > session->response->streambuf.max_size()) { 731 session->response->content.end = false; 732 session->callback(ec); 733 session->response = std::shared_ptr<Response>(new Response(*session->response)); 734 } 735 736 auto num_additional_bytes = chunk_size_streambuf->size() - bytes_transferred; 737 738 auto bytes_to_move = std::min<std::size_t>(chunk_size, num_additional_bytes); 739 if(bytes_to_move > 0) { 740 auto &source = *chunk_size_streambuf; 741 auto &target = session->response->streambuf; 742 target.commit(asio::buffer_copy(target.prepare(bytes_to_move), source.data(), bytes_to_move)); 743 source.consume(bytes_to_move); 744 } 745 746 if(chunk_size > num_additional_bytes) { 747 asio::async_read(*session->connection->socket, session->response->streambuf, asio::transfer_exactly(chunk_size - num_additional_bytes), [this, session, chunk_size_streambuf](const error_code &ec, size_t /*bytes_transferred*/) { 748 auto lock = session->connection->handler_runner->continue_lock(); 749 if(!lock) 750 return; 751 752 if(!ec) { 753 // Remove "\r\n" 754 auto null_buffer = std::make_shared<asio::streambuf>(2); 755 asio::async_read(*session->connection->socket, *null_buffer, asio::transfer_exactly(2), [this, session, chunk_size_streambuf, null_buffer](const error_code &ec, size_t /*bytes_transferred*/) { 756 auto lock = session->connection->handler_runner->continue_lock(); 757 if(!lock) 758 return; 759 if(!ec) 760 read_chunked_transfer_encoded(session, chunk_size_streambuf); 761 else 762 session->callback(ec); 763 }); 764 } 765 else 766 session->callback(ec); 767 }); 768 } 769 else if(2 + chunk_size > num_additional_bytes) { // If only end of chunk remains unread (\n or \r\n) 770 // Remove "\r\n" 771 if(2 + chunk_size - num_additional_bytes == 1) 772 istream.get(); 773 auto null_buffer = std::make_shared<asio::streambuf>(2); 774 asio::async_read(*session->connection->socket, *null_buffer, asio::transfer_exactly(2 + chunk_size - num_additional_bytes), [this, session, chunk_size_streambuf, null_buffer](const error_code &ec, size_t /*bytes_transferred*/) { 775 auto lock = session->connection->handler_runner->continue_lock(); 776 if(!lock) 777 return; 778 if(!ec) 779 read_chunked_transfer_encoded(session, chunk_size_streambuf); 780 else 781 session->callback(ec); 782 }); 783 } 784 else { 785 // Remove "\r\n" 786 istream.get(); 787 istream.get(); 788 789 read_chunked_transfer_encoded(session, chunk_size_streambuf); 790 } 791 } 792 else 793 session->callback(ec); 794 }); 795 } 796 read_server_sent_event(const std::shared_ptr<Session> & session,const std::shared_ptr<asio::streambuf> & events_streambuf)797 void read_server_sent_event(const std::shared_ptr<Session> &session, const std::shared_ptr<asio::streambuf> &events_streambuf) { 798 asio::async_read_until(*session->connection->socket, *events_streambuf, HeaderEndMatch(), [this, session, events_streambuf](const error_code &ec, std::size_t /*bytes_transferred*/) { 799 auto lock = session->connection->handler_runner->continue_lock(); 800 if(!lock) 801 return; 802 803 if(!ec) { 804 session->response->content.end = false; 805 std::istream istream(events_streambuf.get()); 806 std::ostream ostream(&session->response->streambuf); 807 std::string line; 808 while(std::getline(istream, line) && !line.empty() && !(line.back() == '\r' && line.size() == 1)) { 809 ostream.write(line.data(), static_cast<std::streamsize>(line.size() - (line.back() == '\r' ? 1 : 0))); 810 ostream.put('\n'); 811 } 812 813 session->callback(ec); 814 session->response = std::shared_ptr<Response>(new Response(*session->response)); 815 read_server_sent_event(session, events_streambuf); 816 } 817 else 818 session->callback(ec); 819 }); 820 } 821 }; 822 823 template <class socket_type> 824 class Client : public ClientBase<socket_type> {}; 825 826 using HTTP = asio::ip::tcp::socket; 827 828 template <> 829 class Client<HTTP> : public ClientBase<HTTP> { 830 public: 831 /** 832 * Constructs a client object. 833 * 834 * @param server_port_path Server resource given by host[:port][/path] 835 */ Client(const std::string & server_port_path)836 Client(const std::string &server_port_path) noexcept : ClientBase<HTTP>::ClientBase(server_port_path, 80) {} 837 838 protected: create_connection()839 std::shared_ptr<Connection> create_connection() noexcept override { 840 return std::make_shared<Connection>(handler_runner, *io_service); 841 } 842 connect(const std::shared_ptr<Session> & session)843 void connect(const std::shared_ptr<Session> &session) override { 844 if(!session->connection->socket->lowest_layer().is_open()) { 845 auto resolver = std::make_shared<asio::ip::tcp::resolver>(*io_service); 846 session->connection->set_timeout(config.timeout_connect); 847 async_resolve(*resolver, *host_port, [this, session, resolver](const error_code &ec, resolver_results results) { 848 session->connection->cancel_timeout(); 849 auto lock = session->connection->handler_runner->continue_lock(); 850 if(!lock) 851 return; 852 if(!ec) { 853 session->connection->set_timeout(config.timeout_connect); 854 asio::async_connect(*session->connection->socket, results, [this, session, resolver](const error_code &ec, async_connect_endpoint /*endpoint*/) { 855 session->connection->cancel_timeout(); 856 auto lock = session->connection->handler_runner->continue_lock(); 857 if(!lock) 858 return; 859 if(!ec) { 860 asio::ip::tcp::no_delay option(true); 861 error_code ec; 862 session->connection->socket->set_option(option, ec); 863 this->write(session); 864 } 865 else 866 session->callback(ec); 867 }); 868 } 869 else 870 session->callback(ec); 871 }); 872 } 873 else 874 write(session); 875 } 876 }; 877 } // namespace SimpleWeb 878 879 #endif /* SIMPLE_WEB_CLIENT_HTTP_HPP */ 880