Skip to content

Commit

Permalink
Made thread join list optional
Browse files Browse the repository at this point in the history
  • Loading branch information
maddsua committed Feb 16, 2024
1 parent 791fbea commit 00300ea
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 45 deletions.
105 changes: 64 additions & 41 deletions core/server/instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,46 +33,63 @@ LambdaInstance::LambdaInstance(RequestCallback handlerCallback, ServerConfig ini

this->serviceWorker = std::async([&]() {

const auto workerJoinFilter = [&](WorkerContext& node) -> bool {
if (!node.finished) {
return false;
}
if (this->config.service.useThreadList) {

if (node.worker.joinable()) {
node.worker.join();
}
const auto workerJoinFilter = [&](WorkerContext& node) -> bool {
if (!node.finished) {
return false;
}

if (node.worker.joinable()) {
node.worker.join();
}

this->m_connections_count--;
return true;
};

this->m_connections_count--;
return true;
};
const auto& svcmaxconn = this->config.service.maxConnections;
const auto gcThreshold = svcmaxconn * 0.75;

const auto& svcmaxconn = this->config.service.maxConnections;
const auto gcThreshold = svcmaxconn * 0.75;
while (!this->m_terminated && this->listener.active()) {

while (!this->m_terminated && this->listener.active()) {
auto nextConn = this->listener.acceptConnection();
if (!nextConn.has_value()) break;

auto nextConn = this->listener.acceptConnection();
if (!nextConn.has_value()) break;
if (svcmaxconn && (this->m_connections_count > svcmaxconn)) {
nextConn.value().end();
this->m_connections.remove_if(workerJoinFilter);
continue;
}

if (svcmaxconn && (this->m_connections_count > svcmaxconn)) {
nextConn.value().end();
this->m_connections.remove_if(workerJoinFilter);
continue;
this->m_connections.push_front({
std::move(nextConn.value())
});
this->m_connections_count++;

auto& nextWorker = this->m_connections.front();
nextWorker.worker = std::thread([&](WorkerContext& worker) {
connectionWorker(worker, this->config, this->httpHandler);
worker.finished = true;
}, std::ref(nextWorker));

if (!svcmaxconn || this->m_connections_count > gcThreshold) {
this->m_connections.remove_if(workerJoinFilter);
}
}

this->m_connections.push_front({
std::move(nextConn.value())
});
this->m_connections_count++;
} else {

while (!this->m_terminated && this->listener.active()) {

auto& nextWorker = this->m_connections.front();
nextWorker.worker = std::thread([&](WorkerContext& worker) {
connectionWorker(worker, this->config, this->httpHandler);
worker.finished = true;
}, std::ref(nextWorker));
auto nextConn = this->listener.acceptConnection();
if (!nextConn.has_value()) break;

if (!svcmaxconn || this->m_connections_count > gcThreshold) {
this->m_connections.remove_if(workerJoinFilter);
std::thread([&](WorkerContext&& worker) {
connectionWorker(worker, this->config, this->httpHandler);
}, WorkerContext {
std::move(nextConn.value())
}).detach();
}
}
});
Expand All @@ -96,9 +113,11 @@ void LambdaInstance::terminate() {
this->listener.stop();

// Request all connection workers to exit
for (auto& worker : this->m_connections) {
worker.shutdownFlag = true;
}
#ifdef LAMBDA_BUILDOPTS_ENABLE_THREADJOIN
for (auto& worker : this->m_connections) {
worker.shutdownFlag = true;
}
#endif
}

void LambdaInstance::awaitFinished() {
Expand All @@ -108,11 +127,13 @@ void LambdaInstance::awaitFinished() {
this->serviceWorker.get();

// Wait until all connection workers done
for (auto& item : this->m_connections) {
if (item.worker.joinable()) {
item.worker.join();
#ifdef LAMBDA_BUILDOPTS_ENABLE_THREADJOIN
for (auto& item : this->m_connections) {
if (item.worker.joinable()) {
item.worker.join();
}
}
}
#endif
}

LambdaInstance::~LambdaInstance() {
Expand All @@ -129,11 +150,13 @@ LambdaInstance::~LambdaInstance() {
}

// Wait until all connection workers exited
for (auto& item : this->m_connections) {
if (item.worker.joinable()) {
item.worker.join();
#ifdef LAMBDA_BUILDOPTS_ENABLE_THREADJOIN
for (auto& item : this->m_connections) {
if (item.worker.joinable()) {
item.worker.join();
}
}
}
#endif
}

const ServerConfig& LambdaInstance::getConfig() const noexcept {
Expand Down
17 changes: 16 additions & 1 deletion core/server/options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,25 @@ namespace Lambda {
* 0 = unlimited, but with the current joinlist implementation
* can cause slowdowns when a lot of connections are created at the same time
*
* Note that 50 is a minimal allowed value
* Note that 50 is a minimal allowed value and this option doesn't work at all
* if "useThreadList" is not enabled, and by default it is not.
*/
uint32_t maxConnections = 500;
static const uint32_t minConnections = 50;

/**
* Controls whether thread join list is in use.
* It's totally useless if you are not planning to destroy an instance,
* but can safe your ass from getting invalid references if you do.
*
* In other words, if you only create one lambda instance per program
* and keep it until program exits - you don't need this.
*
* The reason this option exists at all is that enabling it
* will tank the performance and in most projects you don't
* even need it in the first place.
*/
bool useThreadList = false;
};

struct ServerConfig : ServeOptions {
Expand Down
6 changes: 3 additions & 3 deletions core/server/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include "../network/tcp/listener.hpp"

#include <forward_list>
#include <atomic>

namespace Lambda {

Expand All @@ -22,10 +21,11 @@ namespace Lambda {
RequestCallback httpHandler;

std::future<void> serviceWorker;
std::forward_list<WorkerContext> m_connections;
std::atomic<size_t> m_connections_count = 0;
bool m_terminated = false;

std::forward_list<WorkerContext> m_connections;
size_t m_connections_count = 0;

/**
* Internal call that signals all workers to exit
*/
Expand Down

0 comments on commit 00300ea

Please sign in to comment.