Skip to content

Commit

Permalink
Feat id tracking (#155)
Browse files Browse the repository at this point in the history
  • Loading branch information
maddsua authored Feb 10, 2024
1 parent f339af5 commit 3b2ab35
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 70 deletions.
21 changes: 18 additions & 3 deletions core/crypto/crypto.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,30 @@ namespace Lambda::Crypto {

class ShortID {
private:

union {
uint32_t numeric = 0;
char buffer[sizeof(uint32_t)];
uint32_t u32 = 0;
uint8_t buff[sizeof(uint32_t)];
} m_id;

std::string m_str;

void m_serialize() noexcept;

public:
ShortID();
ShortID(uint32_t init);
std::string toString() const;

ShortID(const ShortID& other) noexcept;
ShortID(ShortID&& other) noexcept;

ShortID& operator=(const ShortID& other) noexcept;

void update() noexcept;
void update(uint32_t value) noexcept;

uint32_t id() const noexcept;
const std::string& toString() const noexcept;
};
}

Expand Down
59 changes: 48 additions & 11 deletions core/crypto/uid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,65 @@
#include "../encoding/encoding.hpp"
#include "../polyfill/polyfill.hpp"

#include <chrono>
#include <random>

using namespace Lambda;
using namespace Lambda::Crypto;

static auto rng_gen = std::mt19937(std::random_device{}());
static auto rng_dist = std::uniform_int_distribution<int32_t>(1, std::numeric_limits<int32_t>::max());

ShortID::ShortID() {
time_t timeHighres = std::chrono::high_resolution_clock::now().time_since_epoch().count();
this->m_id.numeric = (timeHighres & ~0UL);
this->update();
}

ShortID::ShortID(uint32_t init) {
this->m_id.numeric = init;
this->update(init);
}

ShortID::ShortID(const ShortID& other) noexcept {
this->m_id.u32 = other.m_id.u32;
this->m_str = other.m_str;
}

ShortID::ShortID(ShortID&& other) noexcept {
this->m_id.u32 = other.m_id.u32;
this->m_str = std::move(other.m_str);
}

ShortID& ShortID::operator=(const ShortID& other) noexcept {
this->m_id.u32 = other.m_id.u32;
this->m_str = other.m_str;
return *this;
}

void ShortID::update() noexcept {
this->m_id.u32 = rng_dist(rng_gen);
this->m_serialize();
}

void ShortID::update(uint32_t value) noexcept {
this->m_id.u32 = value;
this->m_serialize();
}

std::string ShortID::toString() const {
void ShortID::m_serialize() noexcept {

std::string temphex;

std::string idstring;
idstring.reserve(sizeof(this->m_id.buffer) * 2);
temphex.reserve(sizeof(this->m_id) * 2);

for (size_t i = 0; i < sizeof(this->m_id.buffer); i++) {
idstring.append(Encoding::byteToHex(this->m_id.buffer[i]).string);
for (size_t i = 0; i < sizeof(this->m_id); i++) {
temphex.append(Encoding::byteToHex(this->m_id.buff[i]).string);
}

Strings::toLowerCase(idstring);
return idstring;
this->m_str = Strings::toLowerCase(static_cast<const std::string>(temphex));
}

uint32_t ShortID::id() const noexcept {
return this->m_id.u32;
}

const std::string& ShortID::toString() const noexcept {
return this->m_str;
}
23 changes: 21 additions & 2 deletions core/server/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,30 @@ using namespace Lambda::Websocket;

static const std::string wsMagicString = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

uint32_t hashConnectionData(const Network::Address& remoteAddr) {
time_t timeHighres = std::chrono::high_resolution_clock::now().time_since_epoch().count();
size_t hash = std::hash<std::string>{}(remoteAddr.hostname + std::to_string(remoteAddr.port));
return (timeHighres & ~0UL) ^ (timeHighres & (static_cast<size_t>(~0UL) << 32)) ^
(hash & ~0UL) ^ (hash & (static_cast<size_t>(~0UL) << 32));
}

IncomingConnection::IncomingConnection(
Network::TCP::Connection& connInit,
const ServeOptions& optsInit
) : conn(connInit), opts(optsInit), ctx(conn, opts.transport) {}
) : conn(connInit), opts(optsInit), ctx(conn, opts.transport),
m_ctx_id(hashConnectionData(connInit.info().remoteAddr)) {}

const Crypto::ShortID& IncomingConnection::contextID() const noexcept {
return this->m_ctx_id;
}

Network::TCP::Connection& IncomingConnection::tcpconn() const noexcept {
return this->conn;
}

const Network::ConnectionInfo& IncomingConnection::conninfo() const noexcept {
return this->conn.info();
}

std::optional<HTTP::Request> IncomingConnection::nextRequest() {

Expand All @@ -39,7 +59,6 @@ std::optional<HTTP::Request> IncomingConnection::nextRequest() {
Also most of the library uses exceptions to do error handling anyway
so making any of that that would be just super inconsistent and confusing.
*/

if (err.respondStatus.has_value()) {
this->respond(Pages::renderErrorPage(err.respondStatus.value(), err.message(), this->opts.errorResponseType));
}
Expand Down
37 changes: 22 additions & 15 deletions core/server/handlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,28 @@ using namespace Lambda::Server;
using namespace Lambda::Server::Handlers;

void Handlers::serverlessHandler(
Network::TCP::Connection& conn,
IncomingConnection& connctx,
const ServeOptions& config,
const ServerlessCallback& handlerCallback
) {

const auto& conninfo = conn.info();
auto connctx = IncomingConnection(conn, config);
const auto& conninfo = connctx.conninfo();
const auto& ctxid = connctx.contextID().toString();

while (auto nextOpt = connctx.nextRequest()){

if (!nextOpt.has_value()) break;

auto& next = nextOpt.value();
auto requestID = Crypto::ShortID().toString();
const auto& next = nextOpt.value();
const auto& requestID = Crypto::ShortID().toString();

HTTP::Response response;
std::optional<std::string> handlerError;

try {

response = handlerCallback(next, {
ctxid,
requestID,
conninfo
});
Expand All @@ -54,20 +55,25 @@ void Handlers::serverlessHandler(
if (handlerError.has_value()) {

if (config.loglevel.requests) {
syncout.error({ requestID, "crashed:", handlerError.value() });
syncout.error({
"[Serverless]",
requestID,
"crashed:",
handlerError.value()
});
}

response = Pages::renderErrorPage(500, handlerError.value(), config.errorResponseType);
}

response.headers.set("x-request-id", requestID);
response.headers.set("x-request-id", ctxid + '-' + requestID);

connctx.respond(response);

if (config.loglevel.requests) {
syncout.log({
'[' + requestID + ']',
'(' + conninfo.remoteAddr.hostname + ')',
"[Serverless]",
(config.loglevel.transportEvents ? ctxid + '-' : conninfo.remoteAddr.hostname + ' ') + requestID,
next.method.toString(),
next.url.pathname,
"-->",
Expand All @@ -78,18 +84,15 @@ void Handlers::serverlessHandler(
}

void Handlers::streamHandler(
Network::TCP::Connection& conn,
IncomingConnection& connctx,
const ServeOptions& config,
const ConnectionCallback& handlerCallback
) {

auto connctx = IncomingConnection(conn, config);
std::optional<std::string> handlerError;

try {

handlerCallback(connctx);

} catch(const std::exception& e) {
handlerError = e.what();
} catch(...) {
Expand All @@ -98,8 +101,12 @@ void Handlers::streamHandler(

if (handlerError.has_value()) {

if (config.loglevel.requests) {
syncout.error({ "tcp handler crashed:", handlerError.value() });
if (config.loglevel.requests || config.loglevel.transportEvents) {
syncout.error({
"[Transport] streamHandler crashed in",
connctx.contextID().toString() + ":",
handlerError.value()
});
}

auto errorResponse = Pages::renderErrorPage(500, handlerError.value(), config.errorResponseType);
Expand Down
71 changes: 41 additions & 30 deletions core/server/instance.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include "./server.hpp"
#include "./internal.hpp"
#include "../crypto/crypto.hpp"
#include "../network/tcp/listener.hpp"

#include <cstdio>
Expand Down Expand Up @@ -40,68 +39,80 @@ LambdaInstance::LambdaInstance(
void LambdaInstance::start() {

if (this->config.service.fastPortReuse) {
syncout.log("Warning: fast port reuse enabled");
syncout.log("[Service] Warning: fast port reuse enabled");
}

this->watchdogWorker = std::async([&]() {

while (!this->terminated && this->listener.active()) {
while (!this->m_terminated && this->listener.active()) {

auto nextConn = this->listener.acceptConnection();
if (!nextConn.has_value()) break;

std::thread([&](Lambda::Network::TCP::Connection&& conn) {

const auto& conninfo = conn.info();
std::optional<std::string> handlerError;
auto connctx = IncomingConnection(conn, this->config);
std::optional<std::exception> handlerError;

if (this->config.loglevel.connections) {
if (this->config.loglevel.transportEvents) {
syncout.log({
"[Transport]",
conninfo.remoteAddr.hostname + ':' + std::to_string(conninfo.remoteAddr.port),
"connected on",
conninfo.hostPort
"created",
connctx.contextID().toString()
});
}

const auto displayHandlerCrashMessage = [&](const std::string& message) {

if (!(config.loglevel.transportEvents || config.loglevel.requests)) return;

auto transportDisplayID = connctx.contextID().toString();
if (!this->config.loglevel.transportEvents) {
transportDisplayID += '(' + conninfo.remoteAddr.hostname +
':' + std::to_string(conninfo.remoteAddr.port) + ')';
}

syncout.error({
"[Transport]",
transportDisplayID,
"terminated:",
handlerError.value().what()
});
};

try {

switch (this->handlerType) {

case HandlerType::Serverless: {
serverlessHandler(conn, this->config, this->httpHandler);
serverlessHandler(connctx, this->config, this->httpHandler);
} break;

case HandlerType::Connection: {
streamHandler(conn, this->config, this->tcpHandler);
streamHandler(connctx, this->config, this->tcpHandler);
} break;

default: {
this->terminated = true;
throw std::runtime_error("connection handler undefined");
this->m_terminated = true;
throw Lambda::Error("Instance handler undefined");
} break;
}

} catch(const std::exception& e) {
handlerError = e.what();
} catch(const std::exception& err) {
displayHandlerCrashMessage(err.what());
return;
} catch(...) {
handlerError = "unknown error";
displayHandlerCrashMessage("Unknown exception");
return;
}

if (handlerError.has_value() && (config.loglevel.connections || config.loglevel.requests)) {

syncout.error({
"[Service] Connection to",
conninfo.remoteAddr.hostname,
"terminated",
'(' + handlerError.value() + ')'
});

} else if (config.loglevel.connections) {

if (config.loglevel.transportEvents) {
syncout.log({
conninfo.remoteAddr.hostname + ':' + std::to_string(conninfo.remoteAddr.port),
"disconnected from",
conninfo.hostPort
"[Transport]",
connctx.contextID().toString(),
"closed ok"
});
}

Expand All @@ -110,7 +121,7 @@ void LambdaInstance::start() {
});

if (config.loglevel.startMessage) {
syncout.log("[Service] Started server at http://localhost:" + std::to_string(this->config.service.port) + '/');
syncout.log("[Service] Started at http://localhost:" + std::to_string(this->config.service.port) + '/');
}
}

Expand All @@ -120,7 +131,7 @@ void LambdaInstance::shutdownn() {
}

void LambdaInstance::terminate() {
this->terminated = true;
this->m_terminated = true;
this->listener.stop();
this->awaitFinished();
}
Expand Down
4 changes: 2 additions & 2 deletions core/server/internal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
namespace Lambda::Server {

namespace Handlers {
void serverlessHandler(Network::TCP::Connection& conn, const ServeOptions& config, const ServerlessCallback& handlerCallback);
void streamHandler(Network::TCP::Connection& conn, const ServeOptions& config, const ConnectionCallback& handlerCallback);
void serverlessHandler(IncomingConnection& connctx, const ServeOptions& config, const ServerlessCallback& handlerCallback);
void streamHandler(IncomingConnection& connctx, const ServeOptions& config, const ConnectionCallback& handlerCallback);
};

namespace Pages {
Expand Down
Loading

0 comments on commit 3b2ab35

Please sign in to comment.