Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat id tracking #155

Merged
merged 29 commits into from
Feb 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions core/crypto/crypto.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,30 @@ namespace Lambda::Crypto {

class ShortID {
private:

union {
uint32_t numeric = 0;
char buffer[sizeof(uint32_t)];
uint32_t u32 = 0;
uint8_t buff[sizeof(uint32_t)];
} m_id;

std::string m_str;

void m_serialize() noexcept;

public:
ShortID();
ShortID(uint32_t init);
std::string toString() const;

ShortID(const ShortID& other) noexcept;
ShortID(ShortID&& other) noexcept;

ShortID& operator=(const ShortID& other) noexcept;

void update() noexcept;
void update(uint32_t value) noexcept;

uint32_t id() const noexcept;
const std::string& toString() const noexcept;
};
}

Expand Down
59 changes: 48 additions & 11 deletions core/crypto/uid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,65 @@
#include "../encoding/encoding.hpp"
#include "../polyfill/polyfill.hpp"

#include <chrono>
#include <random>

using namespace Lambda;
using namespace Lambda::Crypto;

static auto rng_gen = std::mt19937(std::random_device{}());
static auto rng_dist = std::uniform_int_distribution<int32_t>(1, std::numeric_limits<int32_t>::max());

ShortID::ShortID() {
time_t timeHighres = std::chrono::high_resolution_clock::now().time_since_epoch().count();
this->m_id.numeric = (timeHighres & ~0UL);
this->update();
}

ShortID::ShortID(uint32_t init) {
this->m_id.numeric = init;
this->update(init);
}

ShortID::ShortID(const ShortID& other) noexcept {
this->m_id.u32 = other.m_id.u32;
this->m_str = other.m_str;
}

ShortID::ShortID(ShortID&& other) noexcept {
this->m_id.u32 = other.m_id.u32;
this->m_str = std::move(other.m_str);
}

ShortID& ShortID::operator=(const ShortID& other) noexcept {
this->m_id.u32 = other.m_id.u32;
this->m_str = other.m_str;
return *this;
}

void ShortID::update() noexcept {
this->m_id.u32 = rng_dist(rng_gen);
this->m_serialize();
}

void ShortID::update(uint32_t value) noexcept {
this->m_id.u32 = value;
this->m_serialize();
}

std::string ShortID::toString() const {
void ShortID::m_serialize() noexcept {

std::string temphex;

std::string idstring;
idstring.reserve(sizeof(this->m_id.buffer) * 2);
temphex.reserve(sizeof(this->m_id) * 2);

for (size_t i = 0; i < sizeof(this->m_id.buffer); i++) {
idstring.append(Encoding::byteToHex(this->m_id.buffer[i]).string);
for (size_t i = 0; i < sizeof(this->m_id); i++) {
temphex.append(Encoding::byteToHex(this->m_id.buff[i]).string);
}

Strings::toLowerCase(idstring);
return idstring;
this->m_str = Strings::toLowerCase(static_cast<const std::string>(temphex));
}

uint32_t ShortID::id() const noexcept {
return this->m_id.u32;
}

const std::string& ShortID::toString() const noexcept {
return this->m_str;
}
23 changes: 21 additions & 2 deletions core/server/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,30 @@ using namespace Lambda::Websocket;

static const std::string wsMagicString = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

uint32_t hashConnectionData(const Network::Address& remoteAddr) {
time_t timeHighres = std::chrono::high_resolution_clock::now().time_since_epoch().count();
size_t hash = std::hash<std::string>{}(remoteAddr.hostname + std::to_string(remoteAddr.port));
return (timeHighres & ~0UL) ^ (timeHighres & (static_cast<size_t>(~0UL) << 32)) ^
(hash & ~0UL) ^ (hash & (static_cast<size_t>(~0UL) << 32));
}

IncomingConnection::IncomingConnection(
Network::TCP::Connection& connInit,
const ServeOptions& optsInit
) : conn(connInit), opts(optsInit), ctx(conn, opts.transport) {}
) : conn(connInit), opts(optsInit), ctx(conn, opts.transport),
m_ctx_id(hashConnectionData(connInit.info().remoteAddr)) {}

const Crypto::ShortID& IncomingConnection::contextID() const noexcept {
return this->m_ctx_id;
}

Network::TCP::Connection& IncomingConnection::tcpconn() const noexcept {
return this->conn;
}

const Network::ConnectionInfo& IncomingConnection::conninfo() const noexcept {
return this->conn.info();
}

std::optional<HTTP::Request> IncomingConnection::nextRequest() {

Expand All @@ -39,7 +59,6 @@ std::optional<HTTP::Request> IncomingConnection::nextRequest() {
Also most of the library uses exceptions to do error handling anyway
so making any of that that would be just super inconsistent and confusing.
*/

if (err.respondStatus.has_value()) {
this->respond(Pages::renderErrorPage(err.respondStatus.value(), err.message(), this->opts.errorResponseType));
}
Expand Down
37 changes: 22 additions & 15 deletions core/server/handlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,28 @@ using namespace Lambda::Server;
using namespace Lambda::Server::Handlers;

void Handlers::serverlessHandler(
Network::TCP::Connection& conn,
IncomingConnection& connctx,
const ServeOptions& config,
const ServerlessCallback& handlerCallback
) {

const auto& conninfo = conn.info();
auto connctx = IncomingConnection(conn, config);
const auto& conninfo = connctx.conninfo();
const auto& ctxid = connctx.contextID().toString();

while (auto nextOpt = connctx.nextRequest()){

if (!nextOpt.has_value()) break;

auto& next = nextOpt.value();
auto requestID = Crypto::ShortID().toString();
const auto& next = nextOpt.value();
const auto& requestID = Crypto::ShortID().toString();

HTTP::Response response;
std::optional<std::string> handlerError;

try {

response = handlerCallback(next, {
ctxid,
requestID,
conninfo
});
Expand All @@ -54,20 +55,25 @@ void Handlers::serverlessHandler(
if (handlerError.has_value()) {

if (config.loglevel.requests) {
syncout.error({ requestID, "crashed:", handlerError.value() });
syncout.error({
"[Serverless]",
requestID,
"crashed:",
handlerError.value()
});
}

response = Pages::renderErrorPage(500, handlerError.value(), config.errorResponseType);
}

response.headers.set("x-request-id", requestID);
response.headers.set("x-request-id", ctxid + '-' + requestID);

connctx.respond(response);

if (config.loglevel.requests) {
syncout.log({
'[' + requestID + ']',
'(' + conninfo.remoteAddr.hostname + ')',
"[Serverless]",
(config.loglevel.transportEvents ? ctxid + '-' : conninfo.remoteAddr.hostname + ' ') + requestID,
next.method.toString(),
next.url.pathname,
"-->",
Expand All @@ -78,18 +84,15 @@ void Handlers::serverlessHandler(
}

void Handlers::streamHandler(
Network::TCP::Connection& conn,
IncomingConnection& connctx,
const ServeOptions& config,
const ConnectionCallback& handlerCallback
) {

auto connctx = IncomingConnection(conn, config);
std::optional<std::string> handlerError;

try {

handlerCallback(connctx);

} catch(const std::exception& e) {
handlerError = e.what();
} catch(...) {
Expand All @@ -98,8 +101,12 @@ void Handlers::streamHandler(

if (handlerError.has_value()) {

if (config.loglevel.requests) {
syncout.error({ "tcp handler crashed:", handlerError.value() });
if (config.loglevel.requests || config.loglevel.transportEvents) {
syncout.error({
"[Transport] streamHandler crashed in",
connctx.contextID().toString() + ":",
handlerError.value()
});
}

auto errorResponse = Pages::renderErrorPage(500, handlerError.value(), config.errorResponseType);
Expand Down
71 changes: 41 additions & 30 deletions core/server/instance.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include "./server.hpp"
#include "./internal.hpp"
#include "../crypto/crypto.hpp"
#include "../network/tcp/listener.hpp"

#include <cstdio>
Expand Down Expand Up @@ -40,68 +39,80 @@ LambdaInstance::LambdaInstance(
void LambdaInstance::start() {

if (this->config.service.fastPortReuse) {
syncout.log("Warning: fast port reuse enabled");
syncout.log("[Service] Warning: fast port reuse enabled");
}

this->watchdogWorker = std::async([&]() {

while (!this->terminated && this->listener.active()) {
while (!this->m_terminated && this->listener.active()) {

auto nextConn = this->listener.acceptConnection();
if (!nextConn.has_value()) break;

std::thread([&](Lambda::Network::TCP::Connection&& conn) {

const auto& conninfo = conn.info();
std::optional<std::string> handlerError;
auto connctx = IncomingConnection(conn, this->config);
std::optional<std::exception> handlerError;

if (this->config.loglevel.connections) {
if (this->config.loglevel.transportEvents) {
syncout.log({
"[Transport]",
conninfo.remoteAddr.hostname + ':' + std::to_string(conninfo.remoteAddr.port),
"connected on",
conninfo.hostPort
"created",
connctx.contextID().toString()
});
}

const auto displayHandlerCrashMessage = [&](const std::string& message) {

if (!(config.loglevel.transportEvents || config.loglevel.requests)) return;

auto transportDisplayID = connctx.contextID().toString();
if (!this->config.loglevel.transportEvents) {
transportDisplayID += '(' + conninfo.remoteAddr.hostname +
':' + std::to_string(conninfo.remoteAddr.port) + ')';
}

syncout.error({
"[Transport]",
transportDisplayID,
"terminated:",
handlerError.value().what()
});
};

try {

switch (this->handlerType) {

case HandlerType::Serverless: {
serverlessHandler(conn, this->config, this->httpHandler);
serverlessHandler(connctx, this->config, this->httpHandler);
} break;

case HandlerType::Connection: {
streamHandler(conn, this->config, this->tcpHandler);
streamHandler(connctx, this->config, this->tcpHandler);
} break;

default: {
this->terminated = true;
throw std::runtime_error("connection handler undefined");
this->m_terminated = true;
throw Lambda::Error("Instance handler undefined");
} break;
}

} catch(const std::exception& e) {
handlerError = e.what();
} catch(const std::exception& err) {
displayHandlerCrashMessage(err.what());
return;
} catch(...) {
handlerError = "unknown error";
displayHandlerCrashMessage("Unknown exception");
return;
}

if (handlerError.has_value() && (config.loglevel.connections || config.loglevel.requests)) {

syncout.error({
"[Service] Connection to",
conninfo.remoteAddr.hostname,
"terminated",
'(' + handlerError.value() + ')'
});

} else if (config.loglevel.connections) {

if (config.loglevel.transportEvents) {
syncout.log({
conninfo.remoteAddr.hostname + ':' + std::to_string(conninfo.remoteAddr.port),
"disconnected from",
conninfo.hostPort
"[Transport]",
connctx.contextID().toString(),
"closed ok"
});
}

Expand All @@ -110,7 +121,7 @@ void LambdaInstance::start() {
});

if (config.loglevel.startMessage) {
syncout.log("[Service] Started server at http://localhost:" + std::to_string(this->config.service.port) + '/');
syncout.log("[Service] Started at http://localhost:" + std::to_string(this->config.service.port) + '/');
}
}

Expand All @@ -120,7 +131,7 @@ void LambdaInstance::shutdownn() {
}

void LambdaInstance::terminate() {
this->terminated = true;
this->m_terminated = true;
this->listener.stop();
this->awaitFinished();
}
Expand Down
4 changes: 2 additions & 2 deletions core/server/internal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
namespace Lambda::Server {

namespace Handlers {
void serverlessHandler(Network::TCP::Connection& conn, const ServeOptions& config, const ServerlessCallback& handlerCallback);
void streamHandler(Network::TCP::Connection& conn, const ServeOptions& config, const ConnectionCallback& handlerCallback);
void serverlessHandler(IncomingConnection& connctx, const ServeOptions& config, const ServerlessCallback& handlerCallback);
void streamHandler(IncomingConnection& connctx, const ServeOptions& config, const ConnectionCallback& handlerCallback);
};

namespace Pages {
Expand Down
Loading