Skip to content

Commit

Permalink
Added wrapper on top of direct connection io
Browse files Browse the repository at this point in the history
  • Loading branch information
maddsua committed Feb 12, 2024
1 parent ea9879f commit 2e3a809
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 46 deletions.
6 changes: 6 additions & 0 deletions core/http/transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,14 @@ namespace Lambda::HTTP::Transport {
virtual bool awaitNext() = 0;
virtual HTTP::Request nextRequest() = 0;
virtual void respond(const HTTP::Response& response) = 0;

virtual std::vector<uint8_t> readRaw() = 0;
virtual std::vector<uint8_t> readRaw(size_t expectedSize) = 0;
virtual void writeRaw(const std::vector<uint8_t>& data) = 0;

virtual void reset() noexcept = 0;
virtual bool hasPartialData() const noexcept = 0;
virtual void close() = 0;

TransportFlags flags;
};
Expand Down
6 changes: 6 additions & 0 deletions core/http/transport_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,14 @@ namespace Lambda::HTTP::Transport {
bool awaitNext();
HTTP::Request nextRequest();
void respond(const HTTP::Response& response);

std::vector<uint8_t> readRaw();
std::vector<uint8_t> readRaw(size_t expectedSize);
void writeRaw(const std::vector<uint8_t>& data);

void reset() noexcept;
bool hasPartialData() const noexcept;
void close();
};
};

Expand Down
79 changes: 57 additions & 22 deletions core/http/transport_v1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,6 @@ static const std::initializer_list<std::string> compressibleTypes = {
"text", "html", "json", "xml"
};

const Network::ConnectionInfo& TransportContextV1::conninfo() const noexcept {
return this->m_conn.info();
}

const TransportOptions& TransportContextV1::options() const noexcept {
return this->m_topts;
}

Network::TCP::Connection& TransportContextV1::tcpconn() const noexcept {
return this->m_conn;
}

const ContentEncodings& TransportContextV1::getEnconding() const noexcept {
return this->m_compress;
}

bool TransportContextV1::ok() const noexcept {
return this->m_conn.active();
}

TransportContextV1::TransportContextV1(
Network::TCP::Connection& connInit,
const TransportOptions& optsInit
Expand All @@ -52,7 +32,7 @@ TransportContextV1::TransportContextV1(
bool TransportContextV1::awaitNext() {

if (this->m_next != nullptr) {
throw Lambda::Error("awaitNext() cannot receive more requests until the previous one is processed");
throw Lambda::Error("awaitNext() cancelec: the previous requests was not processed yet");
}

auto headerEnded = this->m_readbuff.end();
Expand Down Expand Up @@ -206,7 +186,7 @@ bool TransportContextV1::awaitNext() {
HTTP::Request TransportContextV1::nextRequest() {

if (this->m_next == nullptr) {
throw Lambda::Error("nextRequest() canceled: no requests pending");
throw Lambda::Error("nextRequest() canceled: no requests pending. Use awaitNext() to read more requests");
}

const auto tempNext = std::move(*this->m_next);
Expand All @@ -219,6 +199,10 @@ HTTP::Request TransportContextV1::nextRequest() {

void TransportContextV1::respond(const Response& response) {

if (this->m_next != nullptr) {
throw Lambda::Error("respond() canceled: Before responding to a request one must be read with nextRequest() call first");
}

auto applyEncoding = ContentEncodings::None;

auto responseHeaders = response.headers;
Expand Down Expand Up @@ -296,10 +280,61 @@ void TransportContextV1::respond(const Response& response) {
if (bodySize) this->m_conn.write(responseBody);
}

const Network::ConnectionInfo& TransportContextV1::conninfo() const noexcept {
return this->m_conn.info();
}

const TransportOptions& TransportContextV1::options() const noexcept {
return this->m_topts;
}

Network::TCP::Connection& TransportContextV1::tcpconn() const noexcept {
return this->m_conn;
}

const ContentEncodings& TransportContextV1::getEnconding() const noexcept {
return this->m_compress;
}

bool TransportContextV1::ok() const noexcept {
return this->m_conn.active();
}

void TransportContextV1::reset() noexcept {

this->m_readbuff.clear();

if (this->m_next != nullptr) {
delete this->m_next;
this->m_next = nullptr;
}
}

bool TransportContextV1::hasPartialData() const noexcept {
return this->m_readbuff.size() > 0;
}

void TransportContextV1::close() {
this->m_conn.end();
}

void TransportContextV1::writeRaw(const std::vector<uint8_t>& data) {
this->m_conn.write(data);
}

std::vector<uint8_t> TransportContextV1::readRaw() {
return this->readRaw(Network::TCP::Connection::ReadChunkSize);
}

std::vector<uint8_t> TransportContextV1::readRaw(size_t expectedSize) {

auto bufferHave = std::move(this->m_readbuff);
this->m_readbuff = {};

if (bufferHave.size() < expectedSize) {
auto bufferFetched = this->m_conn.read(expectedSize - bufferHave.size());
bufferHave.insert(bufferHave.end(), bufferFetched.begin(), bufferFetched.end());
}

return bufferHave;
}
2 changes: 1 addition & 1 deletion core/sse/sse.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace Lambda::SSE {

class Writer {
private:
Network::TCP::Connection& m_conn;
HTTP::Transport::TransportContext& transport;

public:
Writer(HTTP::Transport::TransportContext& tctx, const HTTP::Request initRequest);
Expand Down
21 changes: 12 additions & 9 deletions core/sse/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ using namespace Lambda;
using namespace Lambda::Network;
using namespace Lambda::SSE;

Writer::Writer(HTTP::Transport::TransportContext& tctx, const HTTP::Request initRequest) : m_conn(tctx.tcpconn()) {
Writer::Writer(HTTP::Transport::TransportContext& tctx, const HTTP::Request initRequest) : transport(tctx) {

tctx.flags.autocompress = false;
tctx.flags.forceContentLength = false;
Expand All @@ -15,8 +15,7 @@ Writer::Writer(HTTP::Transport::TransportContext& tctx, const HTTP::Request init
auto upgradeResponse = HTTP::Response(200, {
{ "connection", "keep-alive" },
{ "cache-control", "no-cache" },
{ "content-type", "text/event-stream; charset=UTF-8" },
{ "pragma", "no-cache" },
{ "content-type", "text/event-stream; charset=UTF-8" }
});

if (originHeader.size()) {
Expand All @@ -28,7 +27,7 @@ Writer::Writer(HTTP::Transport::TransportContext& tctx, const HTTP::Request init

void Writer::push(const EventMessage& event) {

if (!this->m_conn.active()) {
if (!this->transport.ok()) {
throw Lambda::Error("SSE listener disconnected");
}

Expand Down Expand Up @@ -63,17 +62,21 @@ void Writer::push(const EventMessage& event) {
serializedMessage.insert(serializedMessage.end(), lineSeparator.begin(), lineSeparator.end());

try {
this->m_conn.write(serializedMessage);
this->transport.writeRaw(serializedMessage);
} catch(...) {
this->m_conn.end();
this->transport.tcpconn().end();
}
}

bool Writer::connected() const noexcept {
return this->m_conn.active();
return this->transport.ok();
}

void Writer::close() {
this->push({ "", "close" });
this->m_conn.end();

EventMessage closeEvent;
closeEvent.event = "close";

this->push(closeEvent);
this->transport.tcpconn().end();
}
8 changes: 4 additions & 4 deletions core/websocket/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ static const std::string wsMagicString = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
static const time_t sockRcvTimeout = 100;

WebsocketContext::WebsocketContext(HTTP::Transport::TransportContext& tctx, const HTTP::Request initRequest)
: conn(tctx.tcpconn()), topts(tctx.options()) {
: transport(tctx), topts(tctx.options()) {

auto headerUpgrade = Strings::toLowerCase(initRequest.headers.get("Upgrade"));
auto headerWsKey = initRequest.headers.get("Sec-WebSocket-Key");
Expand All @@ -49,8 +49,8 @@ WebsocketContext::WebsocketContext(HTTP::Transport::TransportContext& tctx, cons
tctx.respond(handshakeReponse);
tctx.reset();

this->conn.flags.closeOnTimeout = false;
this->conn.setTimeouts(sockRcvTimeout, Network::SetTimeoutsDirection::Receive);
this->transport.tcpconn().flags.closeOnTimeout = false;
this->transport.tcpconn().setTimeouts(sockRcvTimeout, Network::SetTimeoutsDirection::Receive);

this->m_reader = std::async(&WebsocketContext::asyncWorker, this);
}
Expand Down Expand Up @@ -83,7 +83,7 @@ void WebsocketContext::close(Websocket::CloseReason reason) {

closeMessageBuff.insert(closeMessageBuff.end(), closeReasonBuff.begin(), closeReasonBuff.end());

this->conn.write(closeMessageBuff);
this->transport.writeRaw(closeMessageBuff);

if (this->m_reader.valid()) {
try { this->m_reader.get(); }
Expand Down
19 changes: 10 additions & 9 deletions core/websocket/transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ static const std::string wsPingString = "ping/lambda/ws";
// these values are used for both pings and actual receive timeouts
static const time_t wsActTimeout = 5000;
static const unsigned short wsMaxSkippedAttempts = 3;
static const size_t wsReadChunk = 256;

static const std::initializer_list<OpCode> supportedWsOpcodes = {
OpCode::Binary,
Expand All @@ -22,7 +23,7 @@ static const std::initializer_list<OpCode> supportedWsOpcodes = {

void WebsocketContext::sendMessage(const Websocket::Message& msg) {
auto writeBuff = serializeMessage(msg);
this->conn.write(writeBuff);
this->transport.writeRaw(writeBuff);
}

FrameHeader Transport::parseFrameHeader(const std::vector<uint8_t>& buffer) {
Expand Down Expand Up @@ -115,7 +116,7 @@ void WebsocketContext::asyncWorker() {
auto lastPingResponse = std::chrono::steady_clock::now();
auto pingWindow = std::chrono::milliseconds(wsMaxSkippedAttempts * wsActTimeout);

while (this->conn.active() && !this->m_stopped) {
while (this->transport.ok() && !this->m_stopped) {

// send ping or terminate websocket if there is no response
if ((lastPing - lastPingResponse) > pingWindow) {
Expand All @@ -131,21 +132,21 @@ void WebsocketContext::asyncWorker() {
wsPingString.size()
});

this->conn.write(pingHeader);
this->conn.write(std::vector<uint8_t>(wsPingString.begin(), wsPingString.end()));
this->transport.writeRaw(pingHeader);
this->transport.writeRaw(std::vector<uint8_t>(wsPingString.begin(), wsPingString.end()));

lastPing = std::chrono::steady_clock::now();
}

auto nextChunk = this->conn.read();
auto nextChunk = this->transport.readRaw(wsReadChunk);
if (!nextChunk.size()) continue;

downloadBuff.insert(downloadBuff.end(), nextChunk.begin(), nextChunk.end());
if (downloadBuff.size() < FrameHeader::min_size) continue;

if (downloadBuff.size() > this->topts.maxRequestSize) {
this->close(CloseReason::MessageTooBig);
throw std::runtime_error("expected frame size too large");
throw std::runtime_error("Expected frame size too large");
}

auto frameHeader = parseFrameHeader(downloadBuff);
Expand Down Expand Up @@ -174,7 +175,7 @@ void WebsocketContext::asyncWorker() {
if (frameHeader.payloadSize + frameHeader.payloadSize < downloadBuff.size()) {

auto expectedSize = frameHeader.payloadSize - payloadBuff.size();
auto payloadChunk = this->conn.read(expectedSize);
auto payloadChunk = this->transport.readRaw(expectedSize);

if (payloadChunk.size() < expectedSize) {
this->close(CloseReason::ProtocolError);
Expand Down Expand Up @@ -225,8 +226,8 @@ void WebsocketContext::asyncWorker() {
frameHeader.payloadSize
});

this->conn.write(pongHeader);
this->conn.write(payloadBuff);
this->transport.writeRaw(pongHeader);
this->transport.writeRaw(payloadBuff);

} break;

Expand Down
2 changes: 1 addition & 1 deletion core/websocket/websocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ namespace Lambda::Websocket {

class WebsocketContext {
private:
Network::TCP::Connection& conn;
HTTP::Transport::TransportContext& transport;
const HTTP::Transport::TransportOptions& topts;
std::future<void> m_reader;
std::queue<Websocket::Message> m_queue;
Expand Down

0 comments on commit 2e3a809

Please sign in to comment.