diff --git a/src/qhttp/Response.cc b/src/qhttp/Response.cc index 5a612568a..9bfa6ab10 100644 --- a/src/qhttp/Response.cc +++ b/src/qhttp/Response.cc @@ -24,6 +24,8 @@ #include "qhttp/Response.h" // System headers +#include +#include #include #include #include @@ -35,6 +37,7 @@ #include "boost/asio.hpp" #include "boost/filesystem.hpp" #include "boost/filesystem/fstream.hpp" +#include "boost/system/error_code.hpp" // Local headers #include "lsst/log/Log.h" @@ -120,8 +123,11 @@ std::unordered_map contentTypesByExtension = { namespace lsst::qserv::qhttp { Response::Response(std::shared_ptr const server, std::shared_ptr const socket, - DoneCallback const& doneCallback) - : _server(std::move(server)), _socket(std::move(socket)), _doneCallback(doneCallback) { + DoneCallback const& doneCallback, std::size_t const maxResponseBufSize) + : _server(std::move(server)), + _socket(std::move(socket)), + _doneCallback(doneCallback), + _maxResponseBufSize(maxResponseBufSize) { _transmissionStarted.clear(); } @@ -142,67 +148,115 @@ void Response::send(std::string const& content, std::string const& contentType) headers["Content-Type"] = contentType; headers["Content-Length"] = std::to_string(content.length()); - std::ostream responseStream(&_responsebuf); - responseStream << _headers() << "\r\n" << content; - _write(); + _responseBuf = _headers() + "\r\n" + content; + _startTransmit(); + asio::async_write(*_socket, asio::buffer(_responseBuf.data(), _responseBuf.size()), + [self = shared_from_this()](boost::system::error_code const& ec, std::size_t sent) { + self->_finishTransmit(ec, sent); + }); } void Response::sendFile(fs::path const& path) { + _bytesRemaining = fs::file_size(path); auto ct = contentTypesByExtension.find(path.extension().string()); headers["Content-Type"] = (ct != contentTypesByExtension.end()) ? ct->second : "text/plain"; - headers["Content-Length"] = std::to_string(fs::file_size(path)); + headers["Content-Length"] = std::to_string(_bytesRemaining); - // Try to open the file for streaming input. Throw if we hit a snag; exception expected to be caught by + // Try to open the file. Throw if we hit a snag; exception expected to be caught by // top-level handler in Server::_dispatchRequest(). - fs::ifstream responseFile(path); - if (!responseFile) { - LOGLS_ERROR(_log, logger(_server) << logger(_socket) << "open failed for " << path << ": " + _fileName = path.string(); + _inFile.open(_fileName, std::ios::binary); + if (!_inFile.is_open()) { + LOGLS_ERROR(_log, logger(_server) << logger(_socket) << "open failed for " << _fileName << ": " << std::strerror(errno)); throw(boost::system::system_error(errno, boost::system::generic_category())); } - std::ostream responseStream(&_responsebuf); - responseStream << _headers() << "\r\n" << responseFile.rdbuf(); - _write(); + // Make the initial allocation of the response buffer. For smaller files + // the buffer should be large enough to accomodate both the header and + // the file payload. And that would be the only record sent to a requestor. + // For the large files the buffer size will be set to the upper limit. + // In the last case a series of records of the same (but perhaps the very + // last one) size will be transmitted. The very last record may have + // the smaller size. + _responseBuf = _headers() + "\r\n"; + std::size_t const hdrSize = _responseBuf.size(); + if (hdrSize >= _maxResponseBufSize) { + // Disregard the suggested buffer size if it's too small to accomodate the header. + _responseBuf.resize(hdrSize); + } else { + _responseBuf.resize(std::min(hdrSize + _bytesRemaining, _maxResponseBufSize)); + } + + // Start reading the file payload into the buffer and transmitting a series of records. + _startTransmit(); + std::string::size_type const pos = hdrSize; + std::size_t const size = std::min(_bytesRemaining, _responseBuf.size() - pos); + _sendFileRecord(pos, size); } std::string Response::_headers() const { - std::ostringstream headerst; - headerst << "HTTP/1.1 "; + std::ostringstream headerStream; + headerStream << "HTTP/1.1 "; - auto r = responseStringsByCode.find(status); - if (r == responseStringsByCode.end()) r = responseStringsByCode.find(STATUS_INTERNAL_SERVER_ERR); - headerst << r->first << " " << r->second; + auto resp = responseStringsByCode.find(status); + if (resp == responseStringsByCode.end()) resp = responseStringsByCode.find(STATUS_INTERNAL_SERVER_ERR); + headerStream << resp->first << " " << resp->second; - auto ilength = headers.find("Content-Length"); - std::size_t length = (ilength == headers.end()) ? 0 : std::stoull(ilength->second); - LOGLS_INFO(_log, logger(_server) << logger(_socket) << headerst.str() << " + " << length << " bytes"); + auto itr = headers.find("Content-Length"); + std::size_t const length = (itr == headers.end()) ? 0 : std::stoull(itr->second); + LOGLS_INFO(_log, logger(_server) << logger(_socket) << headerStream.str() << " + " << length << " bytes"); - headerst << "\r\n"; - for (auto const& h : headers) { - headerst << h.first << ": " << h.second << "\r\n"; + headerStream << "\r\n"; + for (auto const& [key, val] : headers) { + headerStream << key << ": " << val << "\r\n"; } - - return headerst.str(); + return headerStream.str(); } -void Response::_write() { +void Response::_startTransmit() { if (_transmissionStarted.test_and_set()) { LOGLS_ERROR(_log, logger(_server) << logger(_socket) << "handler logic error: multiple responses sent"); - return; } +} - auto self = shared_from_this(); - asio::async_write(*_socket, _responsebuf, [self](boost::system::error_code const& ec, std::size_t sent) { - if (ec) { - LOGLS_ERROR(_log, logger(self->_server) - << logger(self->_socket) << "write failed: " << ec.message()); - } - if (self->_doneCallback) { - self->_doneCallback(ec, sent); - } - }); +void Response::_finishTransmit(boost::system::error_code const& ec, std::size_t sent) { + if (ec) { + LOGLS_ERROR(_log, logger(_server) << logger(_socket) << "write failed: " << ec.message()); + } + _responseBuf.clear(); + _responseBuf.shrink_to_fit(); + if (_inFile.is_open()) _inFile.close(); + if (_doneCallback) _doneCallback(ec, sent); +} + +void Response::_sendFileRecord(std::string::size_type pos, std::size_t size) { + if (!_inFile.read(&_responseBuf[pos], size)) { + LOGLS_ERROR(_log, logger(_server) << logger(_socket) << "read failed for " << _fileName << ": " + << std::strerror(errno)); + auto const ec = boost::system::system_error(errno, boost::system::generic_category()); + _finishTransmit(ec.code(), _bytesSent); + return; + } + _bytesRemaining -= size; + asio::async_write( + *_socket, asio::buffer(_responseBuf.data(), pos + size), + [self = shared_from_this()](boost::system::error_code const& ec, std::size_t sent) { + if (ec) { + self->_finishTransmit(ec, self->_bytesSent); + } else { + self->_bytesSent += sent; + if (self->_bytesRemaining == 0) { + auto const ec = boost::system::errc::make_error_code(boost::system::errc::success); + self->_finishTransmit(ec, self->_bytesSent); + } else { + std::string::size_type const pos = 0; + std::size_t const size = std::min(self->_bytesRemaining, self->_maxResponseBufSize); + self->_sendFileRecord(pos, size); + } + } + }); } } // namespace lsst::qserv::qhttp diff --git a/src/qhttp/Response.h b/src/qhttp/Response.h index 4fb91d6d4..3e429cef5 100644 --- a/src/qhttp/Response.h +++ b/src/qhttp/Response.h @@ -72,18 +72,26 @@ class Response : public std::enable_shared_from_this { std::function; Response(std::shared_ptr const server, std::shared_ptr const socket, - DoneCallback const& doneCallback); + DoneCallback const& doneCallback, std::size_t const maxResponseBufSize); std::string _headers() const; - void _write(); + void _startTransmit(); + void _finishTransmit(boost::system::error_code const& ec, std::size_t sent); + void _sendFileRecord(std::string::size_type pos, std::size_t size); std::shared_ptr const _server; std::shared_ptr const _socket; - boost::asio::streambuf _responsebuf; + std::string _responseBuf; std::atomic_flag _transmissionStarted; + std::string _fileName; + std::ifstream _inFile; + std::size_t _bytesRemaining = 0; // initialized with the file size + std::size_t _bytesSent = 0; // including the header + DoneCallback _doneCallback; + std::size_t const _maxResponseBufSize; }; } // namespace lsst::qserv::qhttp diff --git a/src/qhttp/Server.cc b/src/qhttp/Server.cc index 254f3dabd..d2956bae8 100644 --- a/src/qhttp/Server.cc +++ b/src/qhttp/Server.cc @@ -24,6 +24,7 @@ #include "qhttp/Server.h" // System headers +#include #include #include #include @@ -50,6 +51,7 @@ namespace chrono = std::chrono; using namespace std::literals; #define DEFAULT_REQUEST_TIMEOUT 5min +#define DEFAULT_MAX_RESPONSE_BUF_SIZE 2097152 namespace { LOG_LOGGER _log = LOG_GET("lsst.qserv.qhttp"); @@ -57,15 +59,18 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.qhttp"); namespace lsst::qserv::qhttp { -Server::Ptr Server::create(asio::io_service& io_service, unsigned short port, int backlog) { - return std::shared_ptr(new Server(io_service, port, backlog)); +Server::Ptr Server::create(asio::io_service& io_service, unsigned short port, int backlog, + std::size_t const maxResponseBufSize) { + return std::shared_ptr(new Server(io_service, port, backlog, maxResponseBufSize)); } unsigned short Server::getPort() { return _acceptor.local_endpoint().port(); } -Server::Server(asio::io_service& io_service, unsigned short port, int backlog) +Server::Server(asio::io_service& io_service, unsigned short port, int backlog, + std::size_t const maxResponseBufSize) : _io_service(io_service), _backlog(backlog), + _maxResponseBufSize(std::max(maxResponseBufSize, (std::size_t)DEFAULT_MAX_RESPONSE_BUF_SIZE)), _acceptorEndpoint(ip::tcp::v4(), port), _acceptor(io_service), _requestTimeout(DEFAULT_REQUEST_TIMEOUT) {} @@ -96,6 +101,10 @@ AjaxEndpoint::Ptr Server::addAjaxEndpoint(const std::string& pattern) { void Server::setRequestTimeout(chrono::milliseconds const& timeout) { _requestTimeout = timeout; } +void Server::setMaxResponseBufSize(std::size_t const maxResponseBufSize) { + _maxResponseBufSize = maxResponseBufSize; +} + void Server::_accept() { auto socket = std::make_shared(_io_service); { @@ -206,7 +215,8 @@ void Server::_readRequest(std::shared_ptr socket) { socket->lowest_layer().shutdown(ip::tcp::socket::shutdown_both, ignore); socket->lowest_layer().close(ignore); } - })); + }, + _maxResponseBufSize)); // Create Request object for this request, and initiate header read. diff --git a/src/qhttp/Server.h b/src/qhttp/Server.h index 55364c2e2..eb5676e9c 100644 --- a/src/qhttp/Server.h +++ b/src/qhttp/Server.h @@ -75,7 +75,8 @@ class Server : public std::enable_shared_from_this { // maximum length of the queue of pending connections. static Ptr create(boost::asio::io_service& io_service, unsigned short port, - int backlog = boost::asio::socket_base::max_listen_connections); + int backlog = boost::asio::socket_base::max_listen_connections, + std::size_t const _maxResponseBufSize = 0); unsigned short getPort(); ~Server(); @@ -99,6 +100,12 @@ class Server : public std::enable_shared_from_this { void setRequestTimeout(std::chrono::milliseconds const& timeout); + //----- setMaxResponseBufSize() allows the user to override the default (or explicitly specified in + // the class's constructor) size of the response buffer. There are no restricton on when the method + // is called. The change will affect next requests processed after making the call. + + void setMaxResponseBufSize(std::size_t const maxResponseBufSize); + //----- start() opens the server listening socket and installs the head of the asynchronous event // handler chain onto the asio::io_service provided when the Server instance was constructed. // Server execution may be halted either calling stop(), or by calling asio::io_service::stop() @@ -117,7 +124,8 @@ class Server : public std::enable_shared_from_this { Server(Server const&) = delete; Server& operator=(Server const&) = delete; - Server(boost::asio::io_service& io_service, unsigned short port, int backlog); + Server(boost::asio::io_service& io_service, unsigned short port, int backlog, + std::size_t const _maxResponseBufSize); void _accept(); @@ -134,6 +142,7 @@ class Server : public std::enable_shared_from_this { boost::asio::io_service& _io_service; int const _backlog; + std::size_t _maxResponseBufSize; boost::asio::ip::tcp::endpoint _acceptorEndpoint; boost::asio::ip::tcp::acceptor _acceptor; diff --git a/src/qhttp/testqhttp.cc b/src/qhttp/testqhttp.cc index e5a162011..daef9f751 100644 --- a/src/qhttp/testqhttp.cc +++ b/src/qhttp/testqhttp.cc @@ -380,6 +380,61 @@ struct QhttpFixture { return std::string(respbegin, respbegin + respbuf.size()); } + void testStaticContent() { + //----- test invalid root directory + + BOOST_CHECK_THROW(server->addStaticContent("/*", "/doesnotexist"), fs::filesystem_error); + BOOST_CHECK_THROW(server->addStaticContent("/*", dataDir + "index.html"), fs::filesystem_error); + + //----- set up valid static content for subsequent tests + + server->addStaticContent("/*", dataDir); + start(); + + CurlEasy curl(numRetries, retryDelayMs); + + //----- test default index.html + + curl.setup("GET", urlPrefix, "").perform().validate(qhttp::STATUS_OK, "text/html"); + compareWithFile(curl.recdContent, dataDir + "index.html"); + + //----- test subdirectories and file typing by extension + + curl.setup("GET", urlPrefix + "css/style.css", "").perform().validate(qhttp::STATUS_OK, "text/css"); + compareWithFile(curl.recdContent, dataDir + "css/style.css"); + curl.setup("GET", urlPrefix + "images/lsst.gif", "") + .perform() + .validate(qhttp::STATUS_OK, "image/gif"); + compareWithFile(curl.recdContent, dataDir + "images/lsst.gif"); + curl.setup("GET", urlPrefix + "images/lsst.jpg", "") + .perform() + .validate(qhttp::STATUS_OK, "image/jpeg"); + compareWithFile(curl.recdContent, dataDir + "images/lsst.jpg"); + curl.setup("GET", urlPrefix + "images/lsst.png", "") + .perform() + .validate(qhttp::STATUS_OK, "image/png"); + compareWithFile(curl.recdContent, dataDir + "images/lsst.png"); + curl.setup("GET", urlPrefix + "js/main.js", "") + .perform() + .validate(qhttp::STATUS_OK, "application/javascript"); + compareWithFile(curl.recdContent, dataDir + "js/main.js"); + + //----- test redirect for directory w/o trailing "/" + + char* redirect = nullptr; + curl.setup("GET", urlPrefix + "css", "").perform().validate(qhttp::STATUS_MOVED_PERM, "text/html"); + BOOST_TEST(curl.recdContent.find(std::to_string(qhttp::STATUS_MOVED_PERM)) != std::string::npos); + BOOST_TEST(curl_easy_getinfo(curl.hcurl, CURLINFO_REDIRECT_URL, &redirect) == CURLE_OK); + BOOST_TEST(redirect == urlPrefix + "css/"); + + //----- test non-existent file + + curl.setup("GET", urlPrefix + "doesNotExist", "") + .perform() + .validate(qhttp::STATUS_NOT_FOUND, "text/html"); + BOOST_TEST(curl.recdContent.find(std::to_string(qhttp::STATUS_NOT_FOUND)) != std::string::npos); + } + asio::io_service service; std::vector serviceThreads; qhttp::Server::Ptr server; @@ -512,53 +567,17 @@ BOOST_FIXTURE_TEST_CASE(percent_decoding, QhttpFixture) { BOOST_TEST(curl.recdContent == "params[] query[key-with-==value-with-&,key2=value2]"); } -BOOST_FIXTURE_TEST_CASE(static_content, QhttpFixture) { - //----- test invalid root directory - - BOOST_CHECK_THROW(server->addStaticContent("/*", "/doesnotexist"), fs::filesystem_error); - BOOST_CHECK_THROW(server->addStaticContent("/*", dataDir + "index.html"), fs::filesystem_error); - - //----- set up valid static content for subsequent tests - - server->addStaticContent("/*", dataDir); - start(); - - CurlEasy curl(numRetries, retryDelayMs); - - //----- test default index.html - - curl.setup("GET", urlPrefix, "").perform().validate(qhttp::STATUS_OK, "text/html"); - compareWithFile(curl.recdContent, dataDir + "index.html"); - - //----- test subdirectories and file typing by extension +BOOST_FIXTURE_TEST_CASE(static_content, QhttpFixture) { testStaticContent(); } - curl.setup("GET", urlPrefix + "css/style.css", "").perform().validate(qhttp::STATUS_OK, "text/css"); - compareWithFile(curl.recdContent, dataDir + "css/style.css"); - curl.setup("GET", urlPrefix + "images/lsst.gif", "").perform().validate(qhttp::STATUS_OK, "image/gif"); - compareWithFile(curl.recdContent, dataDir + "images/lsst.gif"); - curl.setup("GET", urlPrefix + "images/lsst.jpg", "").perform().validate(qhttp::STATUS_OK, "image/jpeg"); - compareWithFile(curl.recdContent, dataDir + "images/lsst.jpg"); - curl.setup("GET", urlPrefix + "images/lsst.png", "").perform().validate(qhttp::STATUS_OK, "image/png"); - compareWithFile(curl.recdContent, dataDir + "images/lsst.png"); - curl.setup("GET", urlPrefix + "js/main.js", "") - .perform() - .validate(qhttp::STATUS_OK, "application/javascript"); - compareWithFile(curl.recdContent, dataDir + "js/main.js"); - - //----- test redirect for directory w/o trailing "/" +BOOST_FIXTURE_TEST_CASE(static_content_small_buf, QhttpFixture) { + //----- set a tiny buffer size for sending responses to evaluate an ability + // of the implementation to break the response into multiple messages. - char* redirect = nullptr; - curl.setup("GET", urlPrefix + "css", "").perform().validate(qhttp::STATUS_MOVED_PERM, "text/html"); - BOOST_TEST(curl.recdContent.find(std::to_string(qhttp::STATUS_MOVED_PERM)) != std::string::npos); - BOOST_TEST(curl_easy_getinfo(curl.hcurl, CURLINFO_REDIRECT_URL, &redirect) == CURLE_OK); - BOOST_TEST(redirect == urlPrefix + "css/"); + server->setMaxResponseBufSize(128); - //----- test non-existent file + //----- after that repeat the static content reading test - curl.setup("GET", urlPrefix + "doesNotExist", "") - .perform() - .validate(qhttp::STATUS_NOT_FOUND, "text/html"); - BOOST_TEST(curl.recdContent.find(std::to_string(qhttp::STATUS_NOT_FOUND)) != std::string::npos); + testStaticContent(); } BOOST_FIXTURE_TEST_CASE(relative_url_containment, QhttpFixture) {