1 #ifndef SIMPLE_WEB_SERVER_HTTP_HPP 2 #define SIMPLE_WEB_SERVER_HTTP_HPP 3 4 #include "asio_compatibility.hpp" 5 #include "mutex.hpp" 6 #include "utility.hpp" 7 #include <functional> 8 #include <iostream> 9 #include <limits> 10 #include <list> 11 #include <map> 12 #include <sstream> 13 #include <thread> 14 #include <unordered_set> 15 16 // Late 2017 TODO: remove the following checks and always use std::regex 17 #ifdef USE_BOOST_REGEX 18 #include <boost/regex.hpp> 19 namespace SimpleWeb { 20 namespace regex = boost; 21 } 22 #else 23 #include <regex> 24 namespace SimpleWeb { 25 namespace regex = std; 26 } 27 #endif 28 29 namespace SimpleWeb { 30 template <class socket_type> 31 class Server; 32 33 template <class socket_type> 34 class ServerBase { 35 protected: 36 class Connection; 37 class Session; 38 39 public: 40 /// Response class where the content of the response is sent to client when the object is about to be destroyed. 41 class Response : public std::enable_shared_from_this<Response>, public std::ostream { 42 friend class ServerBase<socket_type>; 43 friend class Server<socket_type>; 44 45 std::unique_ptr<asio::streambuf> streambuf = std::unique_ptr<asio::streambuf>(new asio::streambuf()); 46 47 std::shared_ptr<Session> session; 48 long timeout_content; 49 50 Mutex send_queue_mutex; 51 std::list<std::pair<std::shared_ptr<asio::streambuf>, std::function<void(const error_code &)>>> send_queue GUARDED_BY(send_queue_mutex); 52 Response(std::shared_ptr<Session> session_,long timeout_content)53 Response(std::shared_ptr<Session> session_, long timeout_content) noexcept : std::ostream(nullptr), session(std::move(session_)), timeout_content(timeout_content) { 54 rdbuf(streambuf.get()); 55 } 56 57 template <typename size_type> write_header(const CaseInsensitiveMultimap & header,size_type size)58 void write_header(const CaseInsensitiveMultimap &header, size_type size) { 59 bool content_length_written = false; 60 bool chunked_transfer_encoding = false; 61 bool event_stream = false; 62 for(auto &field : header) { 63 if(!content_length_written && case_insensitive_equal(field.first, "content-length")) 64 content_length_written = true; 65 else if(!chunked_transfer_encoding && case_insensitive_equal(field.first, "transfer-encoding") && case_insensitive_equal(field.second, "chunked")) 66 chunked_transfer_encoding = true; 67 else if(!event_stream && case_insensitive_equal(field.first, "content-type") && case_insensitive_equal(field.second, "text/event-stream")) 68 event_stream = true; 69 70 *this << field.first << ": " << field.second << "\r\n"; 71 } 72 if(!content_length_written && !chunked_transfer_encoding && !event_stream && !close_connection_after_response) 73 *this << "Content-Length: " << size << "\r\n\r\n"; 74 else 75 *this << "\r\n"; 76 } 77 send_from_queue()78 void send_from_queue() REQUIRES(send_queue_mutex) { 79 auto buffer = send_queue.begin()->first->data(); 80 auto self = this->shared_from_this(); 81 post(session->connection->write_strand, [self, buffer] { 82 auto lock = self->session->connection->handler_runner->continue_lock(); 83 if(!lock) 84 return; 85 asio::async_write(*self->session->connection->socket, buffer, [self](const error_code &ec, std::size_t /*bytes_transferred*/) { 86 auto lock = self->session->connection->handler_runner->continue_lock(); 87 if(!lock) 88 return; 89 { 90 LockGuard lock(self->send_queue_mutex); 91 if(!ec) { 92 auto it = self->send_queue.begin(); 93 auto callback = std::move(it->second); 94 self->send_queue.erase(it); 95 if(self->send_queue.size() > 0) 96 self->send_from_queue(); 97 98 lock.unlock(); 99 if(callback) 100 callback(ec); 101 } 102 else { 103 // All handlers in the queue is called with ec: 104 std::list<std::function<void(const error_code &)>> callbacks; 105 for(auto &pair : self->send_queue) { 106 if(pair.second) 107 callbacks.emplace_back(std::move(pair.second)); 108 } 109 self->send_queue.clear(); 110 111 lock.unlock(); 112 for(auto &callback : callbacks) 113 callback(ec); 114 } 115 } 116 }); 117 }); 118 } 119 send_on_delete(const std::function<void (const error_code &)> & callback=nullptr)120 void send_on_delete(const std::function<void(const error_code &)> &callback = nullptr) noexcept { 121 auto buffer = streambuf->data(); 122 auto self = this->shared_from_this(); // Keep Response instance alive through the following async_write 123 post(session->connection->write_strand, [self, buffer, callback] { 124 auto lock = self->session->connection->handler_runner->continue_lock(); 125 if(!lock) 126 return; 127 asio::async_write(*self->session->connection->socket, buffer, [self, callback](const error_code &ec, std::size_t /*bytes_transferred*/) { 128 auto lock = self->session->connection->handler_runner->continue_lock(); 129 if(!lock) 130 return; 131 if(callback) 132 callback(ec); 133 }); 134 }); 135 } 136 137 public: size()138 std::size_t size() noexcept { 139 return streambuf->size(); 140 } 141 142 /// Send the content of the response stream to client. The callback is called when the send has completed. 143 /// 144 /// Use this function if you need to recursively send parts of a longer message, or when using server-sent events. send(std::function<void (const error_code &)> callback=nullptr)145 void send(std::function<void(const error_code &)> callback = nullptr) noexcept { 146 std::shared_ptr<asio::streambuf> streambuf = std::move(this->streambuf); 147 this->streambuf = std::unique_ptr<asio::streambuf>(new asio::streambuf()); 148 rdbuf(this->streambuf.get()); 149 150 LockGuard lock(send_queue_mutex); 151 send_queue.emplace_back(std::move(streambuf), std::move(callback)); 152 if(send_queue.size() == 1) 153 send_from_queue(); 154 } 155 156 /// Write directly to stream buffer using std::ostream::write. write(const char_type * ptr,std::streamsize n)157 void write(const char_type *ptr, std::streamsize n) { 158 std::ostream::write(ptr, n); 159 } 160 161 /// Convenience function for writing status line, potential header fields, and empty content. write(StatusCode status_code=StatusCode::success_ok,const CaseInsensitiveMultimap & header=CaseInsensitiveMultimap ())162 void write(StatusCode status_code = StatusCode::success_ok, const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) { 163 *this << "HTTP/1.1 " << SimpleWeb::status_code(status_code) << "\r\n"; 164 write_header(header, 0); 165 } 166 167 /// Convenience function for writing status line, header fields, and content. write(StatusCode status_code,string_view content,const CaseInsensitiveMultimap & header=CaseInsensitiveMultimap ())168 void write(StatusCode status_code, string_view content, const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) { 169 *this << "HTTP/1.1 " << SimpleWeb::status_code(status_code) << "\r\n"; 170 write_header(header, content.size()); 171 if(!content.empty()) 172 *this << content; 173 } 174 175 /// Convenience function for writing status line, header fields, and content. write(StatusCode status_code,std::istream & content,const CaseInsensitiveMultimap & header=CaseInsensitiveMultimap ())176 void write(StatusCode status_code, std::istream &content, const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) { 177 *this << "HTTP/1.1 " << SimpleWeb::status_code(status_code) << "\r\n"; 178 content.seekg(0, std::ios::end); 179 auto size = content.tellg(); 180 content.seekg(0, std::ios::beg); 181 write_header(header, size); 182 if(size) 183 *this << content.rdbuf(); 184 } 185 186 /// Convenience function for writing success status line, header fields, and content. write(string_view content,const CaseInsensitiveMultimap & header=CaseInsensitiveMultimap ())187 void write(string_view content, const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) { 188 write(StatusCode::success_ok, content, header); 189 } 190 191 /// Convenience function for writing success status line, header fields, and content. write(std::istream & content,const CaseInsensitiveMultimap & header=CaseInsensitiveMultimap ())192 void write(std::istream &content, const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) { 193 write(StatusCode::success_ok, content, header); 194 } 195 196 /// Convenience function for writing success status line, and header fields. write(const CaseInsensitiveMultimap & header)197 void write(const CaseInsensitiveMultimap &header) { 198 write(StatusCode::success_ok, std::string(), header); 199 } 200 201 /// If set to true, force server to close the connection after the response have been sent. 202 /// 203 /// This is useful when implementing a HTTP/1.0-server sending content 204 /// without specifying the content length. 205 bool close_connection_after_response = false; 206 }; 207 208 class Content : public std::istream { 209 friend class ServerBase<socket_type>; 210 211 public: size()212 std::size_t size() noexcept { 213 return streambuf.size(); 214 } 215 /// Convenience function to return content as std::string. string()216 std::string string() noexcept { 217 return std::string(asio::buffers_begin(streambuf.data()), asio::buffers_end(streambuf.data())); 218 } 219 220 private: 221 asio::streambuf &streambuf; Content(asio::streambuf & streambuf)222 Content(asio::streambuf &streambuf) noexcept : std::istream(&streambuf), streambuf(streambuf) {} 223 }; 224 225 class Request { 226 friend class ServerBase<socket_type>; 227 friend class Server<socket_type>; 228 friend class Session; 229 230 asio::streambuf streambuf; 231 std::weak_ptr<Connection> connection; 232 std::string optimization = std::to_string(0); // TODO: figure out what goes wrong in gcc optimization without this line 233 Request(std::size_t max_request_streambuf_size,const std::shared_ptr<Connection> & connection_)234 Request(std::size_t max_request_streambuf_size, const std::shared_ptr<Connection> &connection_) noexcept : streambuf(max_request_streambuf_size), connection(connection_), content(streambuf) {} 235 236 public: 237 std::string method, path, query_string, http_version; 238 239 Content content; 240 241 CaseInsensitiveMultimap header; 242 243 /// The result of the resource regular expression match of the request path. 244 regex::smatch path_match; 245 246 /// The time point when the request header was fully read. 247 std::chrono::system_clock::time_point header_read_time; 248 remote_endpoint() const249 asio::ip::tcp::endpoint remote_endpoint() const noexcept { 250 try { 251 if(auto connection = this->connection.lock()) 252 return connection->socket->lowest_layer().remote_endpoint(); 253 } 254 catch(...) { 255 } 256 return asio::ip::tcp::endpoint(); 257 } 258 local_endpoint() const259 asio::ip::tcp::endpoint local_endpoint() const noexcept { 260 try { 261 if(auto connection = this->connection.lock()) 262 return connection->socket->lowest_layer().local_endpoint(); 263 } 264 catch(...) { 265 } 266 return asio::ip::tcp::endpoint(); 267 } 268 269 /// Deprecated, please use remote_endpoint().address().to_string() instead. remote_endpoint_address() const270 SW_DEPRECATED std::string remote_endpoint_address() const noexcept { 271 try { 272 if(auto connection = this->connection.lock()) 273 return connection->socket->lowest_layer().remote_endpoint().address().to_string(); 274 } 275 catch(...) { 276 } 277 return std::string(); 278 } 279 280 /// Deprecated, please use remote_endpoint().port() instead. remote_endpoint_port() const281 SW_DEPRECATED unsigned short remote_endpoint_port() const noexcept { 282 try { 283 if(auto connection = this->connection.lock()) 284 return connection->socket->lowest_layer().remote_endpoint().port(); 285 } 286 catch(...) { 287 } 288 return 0; 289 } 290 291 /// Returns query keys with percent-decoded values. parse_query_string() const292 CaseInsensitiveMultimap parse_query_string() const noexcept { 293 return SimpleWeb::QueryString::parse(query_string); 294 } 295 }; 296 297 protected: 298 class Connection : public std::enable_shared_from_this<Connection> { 299 public: 300 template <typename... Args> Connection(std::shared_ptr<ScopeRunner> handler_runner_,Args &&...args)301 Connection(std::shared_ptr<ScopeRunner> handler_runner_, Args &&...args) noexcept : handler_runner(std::move(handler_runner_)), socket(new socket_type(std::forward<Args>(args)...)), write_strand(get_executor(socket->lowest_layer())) {} 302 303 std::shared_ptr<ScopeRunner> handler_runner; 304 305 std::unique_ptr<socket_type> socket; // Socket must be unique_ptr since asio::ssl::stream<asio::ip::tcp::socket> is not movable 306 307 /** 308 * Needed for TLS communication where async_write could be called outside of the io_context runners. 309 * For more information see https://stackoverflow.com/a/12801042. 310 */ 311 strand write_strand; 312 313 std::unique_ptr<asio::steady_timer> timer; 314 close()315 void close() noexcept { 316 error_code ec; 317 socket->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both, ec); 318 socket->lowest_layer().cancel(ec); 319 } 320 set_timeout(long seconds)321 void set_timeout(long seconds) noexcept { 322 if(seconds == 0) { 323 timer = nullptr; 324 return; 325 } 326 327 timer = make_steady_timer(*socket, std::chrono::seconds(seconds)); 328 std::weak_ptr<Connection> self_weak(this->shared_from_this()); // To avoid keeping Connection instance alive longer than needed 329 timer->async_wait([self_weak](const error_code &ec) { 330 if(!ec) { 331 if(auto self = self_weak.lock()) 332 self->close(); 333 } 334 }); 335 } 336 cancel_timeout()337 void cancel_timeout() noexcept { 338 if(timer) { 339 try { 340 timer->cancel(); 341 } 342 catch(...) { 343 } 344 } 345 } 346 }; 347 348 class Session { 349 public: Session(std::size_t max_request_streambuf_size,std::shared_ptr<Connection> connection_)350 Session(std::size_t max_request_streambuf_size, std::shared_ptr<Connection> connection_) noexcept : connection(std::move(connection_)), request(new Request(max_request_streambuf_size, connection)) {} 351 352 std::shared_ptr<Connection> connection; 353 std::shared_ptr<Request> request; 354 }; 355 356 public: 357 class Config { 358 friend class ServerBase<socket_type>; 359 Config(unsigned short port)360 Config(unsigned short port) noexcept : port(port) {} 361 362 public: 363 /// Port number to use. Defaults to 80 for HTTP and 443 for HTTPS. Set to 0 get an assigned port. 364 unsigned short port; 365 /// If io_service is not set, number of threads that the server will use when start() is called. 366 /// Defaults to 1 thread. 367 std::size_t thread_pool_size = 1; 368 /// Timeout on request completion. Defaults to 5 seconds. 369 long timeout_request = 5; 370 /// Timeout on request/response content completion. Defaults to 300 seconds. 371 long timeout_content = 300; 372 /// Maximum size of request stream buffer. Defaults to architecture maximum. 373 /// Reaching this limit will result in a message_size error code. 374 std::size_t max_request_streambuf_size = (std::numeric_limits<std::size_t>::max)(); 375 /// IPv4 address in dotted decimal form or IPv6 address in hexadecimal notation. 376 /// If empty, the address will be any address. 377 std::string address; 378 /// Set to false to avoid binding the socket to an address that is already in use. Defaults to true. 379 bool reuse_address = true; 380 /// Make use of RFC 7413 or TCP Fast Open (TFO) 381 bool fast_open = false; 382 }; 383 /// Set before calling start(). 384 Config config; 385 386 private: 387 class regex_orderable : public regex::regex { 388 public: 389 std::string str; 390 regex_orderable(const char * regex_cstr)391 regex_orderable(const char *regex_cstr) : regex::regex(regex_cstr), str(regex_cstr) {} regex_orderable(std::string regex_str_)392 regex_orderable(std::string regex_str_) : regex::regex(regex_str_), str(std::move(regex_str_)) {} operator <(const regex_orderable & rhs) const393 bool operator<(const regex_orderable &rhs) const noexcept { 394 return str < rhs.str; 395 } 396 }; 397 398 public: 399 /// Use this container to add resources for specific request paths depending on the given regex and method. 400 /// Warning: do not add or remove resources after start() is called 401 std::map<regex_orderable, std::map<std::string, std::function<void(std::shared_ptr<typename ServerBase<socket_type>::Response>, std::shared_ptr<typename ServerBase<socket_type>::Request>)>>> resource; 402 403 /// If the request path does not match a resource regex, this function is called. 404 std::map<std::string, std::function<void(std::shared_ptr<typename ServerBase<socket_type>::Response>, std::shared_ptr<typename ServerBase<socket_type>::Request>)>> default_resource; 405 406 /// Called when an error occurs. 407 std::function<void(std::shared_ptr<typename ServerBase<socket_type>::Request>, const error_code &)> on_error; 408 409 /// Called on upgrade requests. 410 std::function<void(std::unique_ptr<socket_type> &, std::shared_ptr<typename ServerBase<socket_type>::Request>)> on_upgrade; 411 412 /// If you want to reuse an already created asio::io_service, store its pointer here before calling start(). 413 std::shared_ptr<io_context> io_service; 414 415 /// Start the server. 416 /// If io_service is not set, an internal io_service is created instead. 417 /// The callback argument is called after the server is accepting connections, 418 /// where its parameter contains the assigned port. start(const std::function<void (unsigned short)> & callback=nullptr)419 void start(const std::function<void(unsigned short /*port*/)> &callback = nullptr) { 420 std::unique_lock<std::mutex> lock(start_stop_mutex); 421 422 asio::ip::tcp::endpoint endpoint; 423 if(!config.address.empty()) 424 endpoint = asio::ip::tcp::endpoint(make_address(config.address), config.port); 425 else 426 endpoint = asio::ip::tcp::endpoint(asio::ip::tcp::v6(), config.port); 427 428 if(!io_service) { 429 io_service = std::make_shared<io_context>(); 430 internal_io_service = true; 431 } 432 433 if(!acceptor) 434 acceptor = std::unique_ptr<asio::ip::tcp::acceptor>(new asio::ip::tcp::acceptor(*io_service)); 435 try { 436 acceptor->open(endpoint.protocol()); 437 } 438 catch(const system_error &error) { 439 if(error.code() == asio::error::address_family_not_supported && config.address.empty()) { 440 endpoint = asio::ip::tcp::endpoint(asio::ip::tcp::v4(), config.port); 441 acceptor->open(endpoint.protocol()); 442 } 443 else 444 throw; 445 } 446 acceptor->set_option(asio::socket_base::reuse_address(config.reuse_address)); 447 if(config.fast_open) { 448 #if defined(__linux__) && defined(TCP_FASTOPEN) 449 const int qlen = 5; // This seems to be the value that is used in other examples. 450 error_code ec; 451 acceptor->set_option(asio::detail::socket_option::integer<IPPROTO_TCP, TCP_FASTOPEN>(qlen), ec); 452 #endif // End Linux 453 } 454 acceptor->bind(endpoint); 455 456 after_bind(); 457 458 auto port = acceptor->local_endpoint().port(); 459 460 acceptor->listen(); 461 accept(); 462 463 if(internal_io_service && io_service->stopped()) 464 restart(*io_service); 465 466 if(callback) 467 post(*io_service, [callback, port] { 468 callback(port); 469 }); 470 471 if(internal_io_service) { 472 // If thread_pool_size>1, start m_io_service.run() in (thread_pool_size-1) threads for thread-pooling 473 threads.clear(); 474 for(std::size_t c = 1; c < config.thread_pool_size; c++) { 475 threads.emplace_back([this]() { 476 this->io_service->run(); 477 }); 478 } 479 480 lock.unlock(); 481 482 // Main thread 483 if(config.thread_pool_size > 0) 484 io_service->run(); 485 486 lock.lock(); 487 488 // Wait for the rest of the threads, if any, to finish as well 489 for(auto &t : threads) 490 t.join(); 491 } 492 } 493 494 /// Stop accepting new requests, and close current connections. stop()495 void stop() noexcept { 496 std::lock_guard<std::mutex> lock(start_stop_mutex); 497 498 if(acceptor) { 499 error_code ec; 500 acceptor->close(ec); 501 502 { 503 LockGuard lock(connections->mutex); 504 for(auto &connection : connections->set) 505 connection->close(); 506 connections->set.clear(); 507 } 508 509 if(internal_io_service) 510 io_service->stop(); 511 } 512 } 513 ~ServerBase()514 virtual ~ServerBase() noexcept { 515 handler_runner->stop(); 516 stop(); 517 } 518 519 protected: 520 std::mutex start_stop_mutex; 521 522 bool internal_io_service = false; 523 524 std::unique_ptr<asio::ip::tcp::acceptor> acceptor; 525 std::vector<std::thread> threads; 526 527 struct Connections { 528 Mutex mutex; 529 std::unordered_set<Connection *> set GUARDED_BY(mutex); 530 }; 531 std::shared_ptr<Connections> connections; 532 533 std::shared_ptr<ScopeRunner> handler_runner; 534 ServerBase(unsigned short port)535 ServerBase(unsigned short port) noexcept : config(port), connections(new Connections()), handler_runner(new ScopeRunner()) {} 536 after_bind()537 virtual void after_bind() {} 538 virtual void accept() = 0; 539 540 template <typename... Args> create_connection(Args &&...args)541 std::shared_ptr<Connection> create_connection(Args &&...args) noexcept { 542 auto connections = this->connections; 543 auto connection = std::shared_ptr<Connection>(new Connection(handler_runner, std::forward<Args>(args)...), [connections](Connection *connection) { 544 { 545 LockGuard lock(connections->mutex); 546 auto it = connections->set.find(connection); 547 if(it != connections->set.end()) 548 connections->set.erase(it); 549 } 550 delete connection; 551 }); 552 { 553 LockGuard lock(connections->mutex); 554 connections->set.emplace(connection.get()); 555 } 556 return connection; 557 } 558 read(const std::shared_ptr<Session> & session)559 void read(const std::shared_ptr<Session> &session) { 560 session->connection->set_timeout(config.timeout_request); 561 asio::async_read_until(*session->connection->socket, session->request->streambuf, "\r\n\r\n", [this, session](const error_code &ec, std::size_t bytes_transferred) { 562 auto lock = session->connection->handler_runner->continue_lock(); 563 if(!lock) 564 return; 565 session->request->header_read_time = std::chrono::system_clock::now(); 566 567 if(!ec) { 568 session->connection->set_timeout(this->config.timeout_content); 569 // request->streambuf.size() is not necessarily the same as bytes_transferred, from Boost-docs: 570 // "After a successful async_read_until operation, the streambuf may contain additional data beyond the delimiter" 571 // The chosen solution is to extract lines from the stream directly when parsing the header. What is left of the 572 // streambuf (maybe some bytes of the content) is appended to in the async_read-function below (for retrieving content). 573 std::size_t num_additional_bytes = session->request->streambuf.size() - bytes_transferred; 574 575 if(!RequestMessage::parse(session->request->content, session->request->method, session->request->path, 576 session->request->query_string, session->request->http_version, session->request->header)) { 577 if(this->on_error) 578 this->on_error(session->request, make_error_code::make_error_code(errc::protocol_error)); 579 return; 580 } 581 582 // If content, read that as well 583 auto header_it = session->request->header.find("Content-Length"); 584 if(header_it != session->request->header.end()) { 585 unsigned long long content_length = 0; 586 try { 587 content_length = std::stoull(header_it->second); 588 } 589 catch(const std::exception &) { 590 if(this->on_error) 591 this->on_error(session->request, make_error_code::make_error_code(errc::protocol_error)); 592 return; 593 } 594 if(content_length > session->request->streambuf.max_size()) { 595 auto response = std::shared_ptr<Response>(new Response(session, this->config.timeout_content)); 596 response->write(StatusCode::client_error_payload_too_large); 597 if(this->on_error) 598 this->on_error(session->request, make_error_code::make_error_code(errc::message_size)); 599 return; 600 } 601 if(content_length > num_additional_bytes) { 602 asio::async_read(*session->connection->socket, session->request->streambuf, asio::transfer_exactly(content_length - num_additional_bytes), [this, session](const error_code &ec, std::size_t /*bytes_transferred*/) { 603 auto lock = session->connection->handler_runner->continue_lock(); 604 if(!lock) 605 return; 606 607 if(!ec) 608 this->find_resource(session); 609 else if(this->on_error) 610 this->on_error(session->request, ec); 611 }); 612 } 613 else 614 this->find_resource(session); 615 } 616 else if((header_it = session->request->header.find("Transfer-Encoding")) != session->request->header.end() && header_it->second == "chunked") { 617 // Expect hex number to not exceed 16 bytes (64-bit number), but take into account previous additional read bytes 618 auto chunk_size_streambuf = std::make_shared<asio::streambuf>(std::max<std::size_t>(16 + 2, session->request->streambuf.size())); 619 620 // Move leftover bytes 621 auto &source = session->request->streambuf; 622 auto &target = *chunk_size_streambuf; 623 target.commit(asio::buffer_copy(target.prepare(source.size()), source.data())); 624 source.consume(source.size()); 625 626 this->read_chunked_transfer_encoded(session, chunk_size_streambuf); 627 } 628 else 629 this->find_resource(session); 630 } 631 else if(this->on_error) 632 this->on_error(session->request, ec); 633 }); 634 } 635 read_chunked_transfer_encoded(const std::shared_ptr<Session> & session,const std::shared_ptr<asio::streambuf> & chunk_size_streambuf)636 void read_chunked_transfer_encoded(const std::shared_ptr<Session> &session, const std::shared_ptr<asio::streambuf> &chunk_size_streambuf) { 637 asio::async_read_until(*session->connection->socket, *chunk_size_streambuf, "\r\n", [this, session, chunk_size_streambuf](const error_code &ec, size_t bytes_transferred) { 638 auto lock = session->connection->handler_runner->continue_lock(); 639 if(!lock) 640 return; 641 642 if(!ec) { 643 std::istream istream(chunk_size_streambuf.get()); 644 std::string line; 645 std::getline(istream, line); 646 bytes_transferred -= line.size() + 1; 647 unsigned long chunk_size = 0; 648 try { 649 chunk_size = std::stoul(line, 0, 16); 650 } 651 catch(...) { 652 if(this->on_error) 653 this->on_error(session->request, make_error_code::make_error_code(errc::protocol_error)); 654 return; 655 } 656 657 if(chunk_size == 0) { 658 this->find_resource(session); 659 return; 660 } 661 662 if(chunk_size + session->request->streambuf.size() > session->request->streambuf.max_size()) { 663 auto response = std::shared_ptr<Response>(new Response(session, this->config.timeout_content)); 664 response->write(StatusCode::client_error_payload_too_large); 665 if(this->on_error) 666 this->on_error(session->request, make_error_code::make_error_code(errc::message_size)); 667 return; 668 } 669 670 auto num_additional_bytes = chunk_size_streambuf->size() - bytes_transferred; 671 672 auto bytes_to_move = std::min<std::size_t>(chunk_size, num_additional_bytes); 673 if(bytes_to_move > 0) { 674 // Move leftover bytes 675 auto &source = *chunk_size_streambuf; 676 auto &target = session->request->streambuf; 677 target.commit(asio::buffer_copy(target.prepare(bytes_to_move), source.data(), bytes_to_move)); 678 source.consume(bytes_to_move); 679 } 680 681 if(chunk_size > num_additional_bytes) { 682 asio::async_read(*session->connection->socket, session->request->streambuf, asio::transfer_exactly(chunk_size - num_additional_bytes), [this, session, chunk_size_streambuf](const error_code &ec, size_t /*bytes_transferred*/) { 683 auto lock = session->connection->handler_runner->continue_lock(); 684 if(!lock) 685 return; 686 687 if(!ec) { 688 // Remove "\r\n" 689 auto null_buffer = std::make_shared<asio::streambuf>(2); 690 asio::async_read(*session->connection->socket, *null_buffer, asio::transfer_exactly(2), [this, session, chunk_size_streambuf, null_buffer](const error_code &ec, size_t /*bytes_transferred*/) { 691 auto lock = session->connection->handler_runner->continue_lock(); 692 if(!lock) 693 return; 694 if(!ec) 695 read_chunked_transfer_encoded(session, chunk_size_streambuf); 696 else 697 this->on_error(session->request, ec); 698 }); 699 } 700 else if(this->on_error) 701 this->on_error(session->request, ec); 702 }); 703 } 704 else if(2 + chunk_size > num_additional_bytes) { // If only end of chunk remains unread (\n or \r\n) 705 // Remove "\r\n" 706 if(2 + chunk_size - num_additional_bytes == 1) 707 istream.get(); 708 auto null_buffer = std::make_shared<asio::streambuf>(2); 709 asio::async_read(*session->connection->socket, *null_buffer, asio::transfer_exactly(2 + chunk_size - num_additional_bytes), [this, session, chunk_size_streambuf, null_buffer](const error_code &ec, size_t /*bytes_transferred*/) { 710 auto lock = session->connection->handler_runner->continue_lock(); 711 if(!lock) 712 return; 713 if(!ec) 714 read_chunked_transfer_encoded(session, chunk_size_streambuf); 715 else 716 this->on_error(session->request, ec); 717 }); 718 } 719 else { 720 // Remove "\r\n" 721 istream.get(); 722 istream.get(); 723 724 read_chunked_transfer_encoded(session, chunk_size_streambuf); 725 } 726 } 727 else if(this->on_error) 728 this->on_error(session->request, ec); 729 }); 730 } 731 find_resource(const std::shared_ptr<Session> & session)732 void find_resource(const std::shared_ptr<Session> &session) { 733 // Upgrade connection 734 if(on_upgrade) { 735 auto it = session->request->header.find("Upgrade"); 736 if(it != session->request->header.end()) { 737 // remove connection from connections 738 { 739 LockGuard lock(connections->mutex); 740 auto it = connections->set.find(session->connection.get()); 741 if(it != connections->set.end()) 742 connections->set.erase(it); 743 } 744 745 on_upgrade(session->connection->socket, session->request); 746 return; 747 } 748 } 749 // Find path- and method-match, and call write 750 for(auto ®ex_method : resource) { 751 auto it = regex_method.second.find(session->request->method); 752 if(it != regex_method.second.end()) { 753 regex::smatch sm_res; 754 if(regex::regex_match(session->request->path, sm_res, regex_method.first)) { 755 session->request->path_match = std::move(sm_res); 756 write(session, it->second); 757 return; 758 } 759 } 760 } 761 auto it = default_resource.find(session->request->method); 762 if(it != default_resource.end()) 763 write(session, it->second); 764 } 765 write(const std::shared_ptr<Session> & session,std::function<void (std::shared_ptr<typename ServerBase<socket_type>::Response>,std::shared_ptr<typename ServerBase<socket_type>::Request>)> & resource_function)766 void write(const std::shared_ptr<Session> &session, 767 std::function<void(std::shared_ptr<typename ServerBase<socket_type>::Response>, std::shared_ptr<typename ServerBase<socket_type>::Request>)> &resource_function) { 768 auto response = std::shared_ptr<Response>(new Response(session, config.timeout_content), [this](Response *response_ptr) { 769 auto response = std::shared_ptr<Response>(response_ptr); 770 response->send_on_delete([this, response](const error_code &ec) { 771 response->session->connection->cancel_timeout(); 772 if(!ec) { 773 if(response->close_connection_after_response) 774 return; 775 776 auto range = response->session->request->header.equal_range("Connection"); 777 for(auto it = range.first; it != range.second; it++) { 778 if(case_insensitive_equal(it->second, "close")) 779 return; 780 else if(case_insensitive_equal(it->second, "keep-alive")) { 781 auto new_session = std::make_shared<Session>(this->config.max_request_streambuf_size, response->session->connection); 782 this->read(new_session); 783 return; 784 } 785 } 786 if(response->session->request->http_version >= "1.1") { 787 auto new_session = std::make_shared<Session>(this->config.max_request_streambuf_size, response->session->connection); 788 this->read(new_session); 789 return; 790 } 791 } 792 else if(this->on_error) 793 this->on_error(response->session->request, ec); 794 }); 795 }); 796 797 try { 798 resource_function(response, session->request); 799 } 800 catch(const std::exception &) { 801 if(on_error) 802 on_error(session->request, make_error_code::make_error_code(errc::operation_canceled)); 803 return; 804 } 805 } 806 }; 807 808 template <class socket_type> 809 class Server : public ServerBase<socket_type> {}; 810 811 using HTTP = asio::ip::tcp::socket; 812 813 template <> 814 class Server<HTTP> : public ServerBase<HTTP> { 815 public: 816 /// Constructs a server object. Server()817 Server() noexcept : ServerBase<HTTP>::ServerBase(80) {} 818 819 protected: accept()820 void accept() override { 821 auto connection = create_connection(*io_service); 822 823 acceptor->async_accept(*connection->socket, [this, connection](const error_code &ec) { 824 auto lock = connection->handler_runner->continue_lock(); 825 if(!lock) 826 return; 827 828 // Immediately start accepting a new connection (unless io_service has been stopped) 829 if(ec != error::operation_aborted) 830 this->accept(); 831 832 auto session = std::make_shared<Session>(config.max_request_streambuf_size, connection); 833 834 if(!ec) { 835 asio::ip::tcp::no_delay option(true); 836 error_code ec; 837 session->connection->socket->set_option(option, ec); 838 839 this->read(session); 840 } 841 else if(this->on_error) 842 this->on_error(session->request, ec); 843 }); 844 } 845 }; 846 } // namespace SimpleWeb 847 848 #endif /* SIMPLE_WEB_SERVER_HTTP_HPP */ 849