Skip to content

Commit

Permalink
Improved version of the response buffer management in qhttp
Browse files Browse the repository at this point in the history
When sending the payload of the static files, the file content
is split into records, where the maximum size of each record is
limited by 2MB.
  • Loading branch information
iagaponenko committed Jan 25, 2024
1 parent b3fdc96 commit 7e74789
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 90 deletions.
130 changes: 92 additions & 38 deletions src/qhttp/Response.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include "qhttp/Response.h"

// System headers
#include <algorithm>
#include <errno.h>
#include <map>
#include <memory>
#include <string>
Expand All @@ -35,6 +37,7 @@
#include "boost/asio.hpp"
#include "boost/filesystem.hpp"
#include "boost/filesystem/fstream.hpp"
#include "boost/system/error_code.hpp"

// Local headers
#include "lsst/log/Log.h"
Expand Down Expand Up @@ -120,8 +123,11 @@ std::unordered_map<std::string, const std::string> contentTypesByExtension = {
namespace lsst::qserv::qhttp {

Response::Response(std::shared_ptr<Server> const server, std::shared_ptr<ip::tcp::socket> const socket,
DoneCallback const& doneCallback)
: _server(std::move(server)), _socket(std::move(socket)), _doneCallback(doneCallback) {
DoneCallback const& doneCallback, std::size_t const maxResponseBufSize)
: _server(std::move(server)),
_socket(std::move(socket)),
_doneCallback(doneCallback),
_maxResponseBufSize(maxResponseBufSize) {
_transmissionStarted.clear();
}

Expand All @@ -142,67 +148,115 @@ void Response::send(std::string const& content, std::string const& contentType)
headers["Content-Type"] = contentType;
headers["Content-Length"] = std::to_string(content.length());

std::ostream responseStream(&_responsebuf);
responseStream << _headers() << "\r\n" << content;
_write();
_responseBuf = _headers() + "\r\n" + content;
_startTransmit();
asio::async_write(*_socket, asio::buffer(_responseBuf.data(), _responseBuf.size()),
[self = shared_from_this()](boost::system::error_code const& ec, std::size_t sent) {
self->_finishTransmit(ec, sent);
});
}

void Response::sendFile(fs::path const& path) {
_bytesRemaining = fs::file_size(path);
auto ct = contentTypesByExtension.find(path.extension().string());
headers["Content-Type"] = (ct != contentTypesByExtension.end()) ? ct->second : "text/plain";
headers["Content-Length"] = std::to_string(fs::file_size(path));
headers["Content-Length"] = std::to_string(_bytesRemaining);

// Try to open the file for streaming input. Throw if we hit a snag; exception expected to be caught by
// Try to open the file. Throw if we hit a snag; exception expected to be caught by
// top-level handler in Server::_dispatchRequest().
fs::ifstream responseFile(path);
if (!responseFile) {
LOGLS_ERROR(_log, logger(_server) << logger(_socket) << "open failed for " << path << ": "
_fileName = path.string();
_inFile.open(_fileName, std::ios::binary);
if (!_inFile.is_open()) {
LOGLS_ERROR(_log, logger(_server) << logger(_socket) << "open failed for " << _fileName << ": "
<< std::strerror(errno));
throw(boost::system::system_error(errno, boost::system::generic_category()));
}

std::ostream responseStream(&_responsebuf);
responseStream << _headers() << "\r\n" << responseFile.rdbuf();
_write();
// Make the initial allocation of the response buffer. For smaller files
// the buffer should be large enough to accomodate both the header and
// the file payload. And that would be the only record sent to a requestor.
// For the large files the buffer size will be set to the upper limit.
// In the last case a series of records of the same (but perhaps the very
// last one) size will be transmitted. The very last record may have
// the smaller size.
_responseBuf = _headers() + "\r\n";
std::size_t const hdrSize = _responseBuf.size();
if (hdrSize >= _maxResponseBufSize) {
// Disregard the suggested buffer size if it's too small to accomodate the header.
_responseBuf.resize(hdrSize);
} else {
_responseBuf.resize(std::min(hdrSize + _bytesRemaining, _maxResponseBufSize));
}

// Start reading the file payload into the buffer and transmitting a series of records.
_startTransmit();
std::string::size_type const pos = hdrSize;
std::size_t const size = std::min(_bytesRemaining, _responseBuf.size() - pos);
_sendFileRecord(pos, size);
}

std::string Response::_headers() const {
std::ostringstream headerst;
headerst << "HTTP/1.1 ";
std::ostringstream headerStream;
headerStream << "HTTP/1.1 ";

auto r = responseStringsByCode.find(status);
if (r == responseStringsByCode.end()) r = responseStringsByCode.find(STATUS_INTERNAL_SERVER_ERR);
headerst << r->first << " " << r->second;
auto resp = responseStringsByCode.find(status);
if (resp == responseStringsByCode.end()) resp = responseStringsByCode.find(STATUS_INTERNAL_SERVER_ERR);
headerStream << resp->first << " " << resp->second;

auto ilength = headers.find("Content-Length");
std::size_t length = (ilength == headers.end()) ? 0 : std::stoull(ilength->second);
LOGLS_INFO(_log, logger(_server) << logger(_socket) << headerst.str() << " + " << length << " bytes");
auto itr = headers.find("Content-Length");
std::size_t const length = (itr == headers.end()) ? 0 : std::stoull(itr->second);
LOGLS_INFO(_log, logger(_server) << logger(_socket) << headerStream.str() << " + " << length << " bytes");

headerst << "\r\n";
for (auto const& h : headers) {
headerst << h.first << ": " << h.second << "\r\n";
headerStream << "\r\n";
for (auto const& [key, val] : headers) {
headerStream << key << ": " << val << "\r\n";
}

return headerst.str();
return headerStream.str();
}

void Response::_write() {
void Response::_startTransmit() {
if (_transmissionStarted.test_and_set()) {
LOGLS_ERROR(_log, logger(_server)
<< logger(_socket) << "handler logic error: multiple responses sent");
return;
}
}

auto self = shared_from_this();
asio::async_write(*_socket, _responsebuf, [self](boost::system::error_code const& ec, std::size_t sent) {
if (ec) {
LOGLS_ERROR(_log, logger(self->_server)
<< logger(self->_socket) << "write failed: " << ec.message());
}
if (self->_doneCallback) {
self->_doneCallback(ec, sent);
}
});
void Response::_finishTransmit(boost::system::error_code const& ec, std::size_t sent) {
if (ec) {
LOGLS_ERROR(_log, logger(_server) << logger(_socket) << "write failed: " << ec.message());
}
_responseBuf.clear();
_responseBuf.shrink_to_fit();
if (_inFile.is_open()) _inFile.close();
if (_doneCallback) _doneCallback(ec, sent);
}

void Response::_sendFileRecord(std::string::size_type pos, std::size_t size) {
if (!_inFile.read(&_responseBuf[pos], size)) {
LOGLS_ERROR(_log, logger(_server) << logger(_socket) << "read failed for " << _fileName << ": "
<< std::strerror(errno));
auto const ec = boost::system::system_error(errno, boost::system::generic_category());
_finishTransmit(ec.code(), _bytesSent);
return;
}
_bytesRemaining -= size;
asio::async_write(
*_socket, asio::buffer(_responseBuf.data(), pos + size),
[self = shared_from_this()](boost::system::error_code const& ec, std::size_t sent) {
if (ec) {
self->_finishTransmit(ec, self->_bytesSent);
} else {
self->_bytesSent += sent;
if (self->_bytesRemaining == 0) {
auto const ec = boost::system::errc::make_error_code(boost::system::errc::success);
self->_finishTransmit(ec, self->_bytesSent);
} else {
std::string::size_type const pos = 0;
std::size_t const size = std::min(self->_bytesRemaining, self->_maxResponseBufSize);
self->_sendFileRecord(pos, size);
}
}
});
}

} // namespace lsst::qserv::qhttp
14 changes: 11 additions & 3 deletions src/qhttp/Response.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,26 @@ class Response : public std::enable_shared_from_this<Response> {
std::function<void(boost::system::error_code const& ec, std::size_t bytesTransferred)>;

Response(std::shared_ptr<Server> const server, std::shared_ptr<boost::asio::ip::tcp::socket> const socket,
DoneCallback const& doneCallback);
DoneCallback const& doneCallback, std::size_t const maxResponseBufSize);

std::string _headers() const;
void _write();
void _startTransmit();
void _finishTransmit(boost::system::error_code const& ec, std::size_t sent);
void _sendFileRecord(std::string::size_type pos, std::size_t size);

std::shared_ptr<Server> const _server;

std::shared_ptr<boost::asio::ip::tcp::socket> const _socket;
boost::asio::streambuf _responsebuf;
std::string _responseBuf;
std::atomic_flag _transmissionStarted;

std::string _fileName;
std::ifstream _inFile;
std::size_t _bytesRemaining = 0; // initialized with the file size
std::size_t _bytesSent = 0; // including the header

DoneCallback _doneCallback;
std::size_t const _maxResponseBufSize;
};

} // namespace lsst::qserv::qhttp
Expand Down
18 changes: 14 additions & 4 deletions src/qhttp/Server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "qhttp/Server.h"

// System headers
#include <algorithm>
#include <chrono>
#include <iostream>
#include <memory>
Expand All @@ -50,22 +51,26 @@ namespace chrono = std::chrono;
using namespace std::literals;

#define DEFAULT_REQUEST_TIMEOUT 5min
#define DEFAULT_MAX_RESPONSE_BUF_SIZE 2097152

namespace {
LOG_LOGGER _log = LOG_GET("lsst.qserv.qhttp");
}

namespace lsst::qserv::qhttp {

Server::Ptr Server::create(asio::io_service& io_service, unsigned short port, int backlog) {
return std::shared_ptr<Server>(new Server(io_service, port, backlog));
Server::Ptr Server::create(asio::io_service& io_service, unsigned short port, int backlog,
std::size_t const maxResponseBufSize) {
return std::shared_ptr<Server>(new Server(io_service, port, backlog, maxResponseBufSize));
}

unsigned short Server::getPort() { return _acceptor.local_endpoint().port(); }

Server::Server(asio::io_service& io_service, unsigned short port, int backlog)
Server::Server(asio::io_service& io_service, unsigned short port, int backlog,
std::size_t const maxResponseBufSize)
: _io_service(io_service),
_backlog(backlog),
_maxResponseBufSize(std::max(maxResponseBufSize, (std::size_t)DEFAULT_MAX_RESPONSE_BUF_SIZE)),
_acceptorEndpoint(ip::tcp::v4(), port),
_acceptor(io_service),
_requestTimeout(DEFAULT_REQUEST_TIMEOUT) {}
Expand Down Expand Up @@ -96,6 +101,10 @@ AjaxEndpoint::Ptr Server::addAjaxEndpoint(const std::string& pattern) {

void Server::setRequestTimeout(chrono::milliseconds const& timeout) { _requestTimeout = timeout; }

void Server::setMaxResponseBufSize(std::size_t const maxResponseBufSize) {
_maxResponseBufSize = maxResponseBufSize;
}

void Server::_accept() {
auto socket = std::make_shared<ip::tcp::socket>(_io_service);
{
Expand Down Expand Up @@ -206,7 +215,8 @@ void Server::_readRequest(std::shared_ptr<ip::tcp::socket> socket) {
socket->lowest_layer().shutdown(ip::tcp::socket::shutdown_both, ignore);
socket->lowest_layer().close(ignore);
}
}));
},
_maxResponseBufSize));

// Create Request object for this request, and initiate header read.

Expand Down
13 changes: 11 additions & 2 deletions src/qhttp/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ class Server : public std::enable_shared_from_this<Server> {
// maximum length of the queue of pending connections.

static Ptr create(boost::asio::io_service& io_service, unsigned short port,
int backlog = boost::asio::socket_base::max_listen_connections);
int backlog = boost::asio::socket_base::max_listen_connections,
std::size_t const _maxResponseBufSize = 0);
unsigned short getPort();

~Server();
Expand All @@ -99,6 +100,12 @@ class Server : public std::enable_shared_from_this<Server> {

void setRequestTimeout(std::chrono::milliseconds const& timeout);

//----- setMaxResponseBufSize() allows the user to override the default (or explicitly specified in
// the class's constructor) size of the response buffer. There are no restricton on when the method
// is called. The change will affect next requests processed after making the call.

void setMaxResponseBufSize(std::size_t const maxResponseBufSize);

//----- start() opens the server listening socket and installs the head of the asynchronous event
// handler chain onto the asio::io_service provided when the Server instance was constructed.
// Server execution may be halted either calling stop(), or by calling asio::io_service::stop()
Expand All @@ -117,7 +124,8 @@ class Server : public std::enable_shared_from_this<Server> {
Server(Server const&) = delete;
Server& operator=(Server const&) = delete;

Server(boost::asio::io_service& io_service, unsigned short port, int backlog);
Server(boost::asio::io_service& io_service, unsigned short port, int backlog,
std::size_t const _maxResponseBufSize);

void _accept();

Expand All @@ -134,6 +142,7 @@ class Server : public std::enable_shared_from_this<Server> {
boost::asio::io_service& _io_service;

int const _backlog;
std::size_t _maxResponseBufSize;
boost::asio::ip::tcp::endpoint _acceptorEndpoint;
boost::asio::ip::tcp::acceptor _acceptor;

Expand Down
Loading

0 comments on commit 7e74789

Please sign in to comment.