Skip to content

Commit

Permalink
Extended the registration service to support Czar(s)
Browse files Browse the repository at this point in the history
Also did some minor refactoring in the relevant classes to
support Czars and to reduce code duplication.
  • Loading branch information
iagaponenko committed Dec 11, 2023
1 parent 7f63df0 commit 7794b60
Show file tree
Hide file tree
Showing 11 changed files with 190 additions and 113 deletions.
2 changes: 1 addition & 1 deletion src/replica/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ target_sources(replica PRIVATE
RegistryHttpApp.cc
RegistryHttpSvc.cc
RegistryHttpSvcMod.cc
RegistryWorkers.cc
RegistryServices.cc
RemoveReplicaQservMgtRequest.cc
ReplicaInfo.cc
ReplicateApp.cc
Expand Down
10 changes: 5 additions & 5 deletions src/replica/Registry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ Registry::Registry(ServiceProvider::Ptr const& serviceProvider)

vector<WorkerInfo> Registry::workers() const {
vector<WorkerInfo> coll;
json const resultJson =
_request(http::Method::GET, "/workers?instance_id=" + _serviceProvider->instanceId());
for (auto const& [name, workerJson] : resultJson.at("workers").items()) {
string const resource = "/services?instance_id=" + _serviceProvider->instanceId();
json const resultJson = _request(http::Method::GET, resource);
for (auto const& [name, workerJson] : resultJson.at("services").at("workers").items()) {
WorkerInfo worker;
if (_serviceProvider->config()->isKnownWorker(name)) {
worker = _serviceProvider->config()->workerInfo(name);
Expand Down Expand Up @@ -103,7 +103,7 @@ vector<WorkerInfo> Registry::workers() const {
return coll;
}

void Registry::add(string const& name) const {
void Registry::addWorker(string const& name) const {
bool const all = true;
string const hostName = util::get_current_host_fqdn(all);
auto const config = _serviceProvider->config();
Expand All @@ -129,7 +129,7 @@ void Registry::add(string const& name) const {
_request(http::Method::POST, "/worker", request);
}

void Registry::remove(string const& name) const {
void Registry::removeWorker(string const& name) const {
json const request = json::object(
{{"instance_id", _serviceProvider->instanceId()}, {"auth_key", _serviceProvider->authKey()}});
_request(http::Method::DELETE, "/worker/" + name, request);
Expand Down
8 changes: 4 additions & 4 deletions src/replica/Registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@
namespace lsst::qserv::replica {

/**
* Class Registry is the client API for comunications with the worker registration
* Class Registry is the client API for comunications with the service registration
* server. The API provides an interface for inspecting and managing (adding/deleting)
* worker entries at the server.
* serice entries at the server.
*
* @note The implementation of the class is thread-safe.
*/
Expand Down Expand Up @@ -70,14 +70,14 @@ class Registry : public std::enable_shared_from_this<Registry> {
* @param name The unique identifier of the worker
* @see method Registry::_request for other exceptions.
*/
void add(std::string const& name) const;
void addWorker(std::string const& name) const;

/**
* Remove (if exists) a worker entry
* @param name A unique identifier (the name) of the worker
* @see method Registry::_request for other exceptions.
*/
void remove(std::string const& name) const;
void removeWorker(std::string const& name) const;

private:
/// @see Registry::create()
Expand Down
26 changes: 18 additions & 8 deletions src/replica/RegistryHttpSvc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
#include "qhttp/Response.h"
#include "replica/Configuration.h"
#include "replica/RegistryHttpSvcMod.h"
#include "replica/RegistryWorkers.h"
#include "replica/RegistryServices.h"

// Third party headers
#include "nlohmann/json.hpp"
Expand All @@ -50,7 +50,7 @@ RegistryHttpSvc::RegistryHttpSvc(ServiceProvider::Ptr const& serviceProvider)
: HttpSvc(serviceProvider, serviceProvider->config()->get<uint16_t>("registry", "port"),
serviceProvider->config()->get<unsigned int>("registry", "max-listen-conn"),
serviceProvider->config()->get<size_t>("registry", "threads")),
_workers(new RegistryWorkers()) {}
_services(new RegistryServices()) {}

string const& RegistryHttpSvc::context() const { return ::context_; }

Expand All @@ -64,25 +64,35 @@ void RegistryHttpSvc::registerServices() {
{"instance_id", self->serviceProvider()->instanceId()}});
http::MetaModule::process(context_, info, req, resp, "VERSION");
}},
{"GET", "/workers",
{"GET", "/services",
[self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) {
RegistryHttpSvcMod::process(self->serviceProvider(), *(self->_workers),
req, resp, "WORKERS", http::AuthType::NONE);
RegistryHttpSvcMod::process(self->serviceProvider(), *(self->_services),
req, resp, "SERVICES", http::AuthType::NONE);
}},
{"POST", "/worker",
[self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) {
RegistryHttpSvcMod::process(self->serviceProvider(), *(self->_workers),
RegistryHttpSvcMod::process(self->serviceProvider(), *(self->_services),
req, resp, "ADD-WORKER");
}},
{"POST", "/qserv-worker",
[self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) {
RegistryHttpSvcMod::process(self->serviceProvider(), *(self->_workers),
RegistryHttpSvcMod::process(self->serviceProvider(), *(self->_services),
req, resp, "ADD-QSERV-WORKER");
}},
{"DELETE", "/worker/:name",
[self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) {
RegistryHttpSvcMod::process(self->serviceProvider(), *(self->_workers),
RegistryHttpSvcMod::process(self->serviceProvider(), *(self->_services),
req, resp, "DELETE-WORKER");
}},
{"POST", "/czar",
[self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) {
RegistryHttpSvcMod::process(self->serviceProvider(), *(self->_services),
req, resp, "ADD-CZAR");
}},
{"DELETE", "/czar/:id",
[self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) {
RegistryHttpSvcMod::process(self->serviceProvider(), *(self->_services),
req, resp, "DELETE-CZAR");
}}});
}

Expand Down
4 changes: 2 additions & 2 deletions src/replica/RegistryHttpSvc.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

// Forward declarations
namespace lsst::qserv::replica {
class RegistryWorkers;
class RegistryServices;
} // namespace lsst::qserv::replica

// This header declarations
Expand Down Expand Up @@ -77,7 +77,7 @@ class RegistryHttpSvc : public HttpSvc {
RegistryHttpSvc(ServiceProvider::Ptr const& serviceProvider);

/// Synchronized collection of the Replication System's workers
std::unique_ptr<RegistryWorkers> _workers;
std::unique_ptr<RegistryServices> _services;
};

} // namespace lsst::qserv::replica
Expand Down
62 changes: 48 additions & 14 deletions src/replica/RegistryHttpSvcMod.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
#include "replica/RegistryHttpSvcMod.h"

// Qserv header
#include "global/stringUtil.h"
#include "qhttp/Request.h"
#include "replica/RegistryWorkers.h"
#include "qmeta/types.h"
#include "replica/RegistryServices.h"
#include "util/TimeUtils.h"

// System headers
Expand All @@ -47,7 +49,7 @@ string senderIpAddr(qhttp::Request::Ptr const& req) {

/**
* Check if a key is one of the special attributes related to the security
* context of the workers registration protocol.
* context of the services registration protocol.
* @param key The key to be checked.
* @return 'true' if the key is one of the special keys.
*/
Expand All @@ -60,37 +62,42 @@ bool isSecurityContextKey(string const& key) {

namespace lsst::qserv::replica {

void RegistryHttpSvcMod::process(ServiceProvider::Ptr const& serviceProvider, RegistryWorkers& workers,
void RegistryHttpSvcMod::process(ServiceProvider::Ptr const& serviceProvider, RegistryServices& services,
qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp,
string const& subModuleName, http::AuthType const authType) {
RegistryHttpSvcMod module(serviceProvider, workers, req, resp);
RegistryHttpSvcMod module(serviceProvider, services, req, resp);
module.execute(subModuleName, authType);
}

RegistryHttpSvcMod::RegistryHttpSvcMod(ServiceProvider::Ptr const& serviceProvider, RegistryWorkers& workers,
qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp)
RegistryHttpSvcMod::RegistryHttpSvcMod(ServiceProvider::Ptr const& serviceProvider,
RegistryServices& services, qhttp::Request::Ptr const& req,
qhttp::Response::Ptr const& resp)
: http::ModuleBase(serviceProvider->authKey(), serviceProvider->adminAuthKey(), req, resp),
_serviceProvider(serviceProvider),
_workers(workers) {}
_services(services) {}

string RegistryHttpSvcMod::context() const { return "REGISTRY-HTTP-SVC "; }

json RegistryHttpSvcMod::executeImpl(string const& subModuleName) {
string const func = string(__func__) + "[sub-module='" + subModuleName + "']";
debug(func);
enforceInstanceId(func, _serviceProvider->instanceId());
if (subModuleName == "WORKERS")
return _getWorkers();
if (subModuleName == "SERVICES")
return _getServices();
else if (subModuleName == "ADD-WORKER")
return _addWorker("replication");
else if (subModuleName == "ADD-QSERV-WORKER")
return _addWorker("qserv");
else if (subModuleName == "DELETE-WORKER")
return _deleteWorker();
else if (subModuleName == "ADD-CZAR")
return _addCzar();
else if (subModuleName == "DELETE-CZAR")
return _deleteCzar();
throw invalid_argument(context() + "unsupported sub-module: '" + subModuleName + "'");
}

json RegistryHttpSvcMod::_getWorkers() const { return json::object({{"workers", _workers.workers()}}); }
json RegistryHttpSvcMod::_getServices() const { return json::object({{"services", _services.toJson()}}); }

json RegistryHttpSvcMod::_addWorker(string const& kind) {
json const worker = body().required<json>("worker");
Expand All @@ -109,15 +116,42 @@ json RegistryHttpSvcMod::_addWorker(string const& kind) {
for (auto&& [key, val] : worker.items()) {
if (!::isSecurityContextKey(key)) workerEntry[kind][key] = val;
}
_workers.update(name, workerEntry);
return json::object({{"workers", _workers.workers()}});
_services.updateWorker(name, workerEntry);
return json::object({{"services", _services.toJson()}});
}

json RegistryHttpSvcMod::_deleteWorker() {
string const name = params().at("name");
debug(__func__, " name: " + name);
_workers.remove(name);
return json::object({{"workers", _workers.workers()}});
_services.removeWorker(name);
return json::object({{"services", _services.toJson()}});
}

json RegistryHttpSvcMod::_addCzar() {
json const czar = body().required<json>("czar");
qmeta::CzarId const czarId = czar.at("id").get<qmeta::CzarId>();
string const hostAddr = ::senderIpAddr(req());
uint64_t const loggedTime = util::TimeUtils::now();

debug(__func__, "id: " + to_string(czarId));
debug(__func__, "host-addr: " + hostAddr);
debug(__func__, "logged_time: " + to_string(loggedTime));

// Prepare the payload to be merged into the czar registration entry.
// Note that the merged payload is cleaned from any security-related contents.
json czarEntry = json::object({{"host-addr", hostAddr}, {"logged_time", loggedTime}});
for (auto&& [key, val] : czar.items()) {
if (!::isSecurityContextKey(key)) czarEntry[key] = val;
}
_services.updateCzar(czarId, czarEntry);
return json::object({{"services", _services.toJson()}});
}

json RegistryHttpSvcMod::_deleteCzar() {
qmeta::CzarId const czarId = qserv::stoui(params().at("id"));
debug(__func__, " id: " + to_string(czarId));
_services.removeCzar(czarId);
return json::object({{"services", _services.toJson()}});
}

} // namespace lsst::qserv::replica
24 changes: 16 additions & 8 deletions src/replica/RegistryHttpSvcMod.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

// Forward declarations
namespace lsst::qserv::replica {
class RegistryWorkers;
class RegistryServices;
} // namespace lsst::qserv::replica

// This header declarations
Expand All @@ -60,21 +60,23 @@ class RegistryHttpSvcMod : public http::ModuleBase {
*
* Supported values for parameter 'subModuleName':
*
* WORKERS return a collection of known workers
* SERVICES return info on all known services
* ADD-WORKER worker registration request (Replicaton System)
* ADD-QSERV-WORKER worker registration request (Qserv)
* DELETE-WORKER remove a worker from the collection
* ADD-CZAR czar registration request (Replicaton System)
* DELETE-CZAR remove a czar from the collection
*
* @param serviceProvider The provider of services is needed to access
* the identity and the authorization keys of the instance.
* @param workers The synchronized collection of workers.
* @param services The synchronized collection of services.
* @param req The HTTP request.
* @param resp The HTTP response channel.
* @param subModuleName The name of a submodule to be called.
* @param authType The authorization requirements for the module
* @throws std::invalid_argument for unknown values of parameter 'subModuleName'
*/
static void process(ServiceProvider::Ptr const& serviceProvider, RegistryWorkers& workers,
static void process(ServiceProvider::Ptr const& serviceProvider, RegistryServices& services,
qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp,
std::string const& subModuleName,
http::AuthType const authType = http::AuthType::REQUIRED);
Expand All @@ -88,11 +90,11 @@ class RegistryHttpSvcMod : public http::ModuleBase {

private:
/// @see method RegistryHttpSvcMod::create()
RegistryHttpSvcMod(ServiceProvider::Ptr const& serviceProvider, RegistryWorkers& workers,
RegistryHttpSvcMod(ServiceProvider::Ptr const& serviceProvider, RegistryServices& services,
qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp);

/// Return a collection of known workers.
nlohmann::json _getWorkers() const;
/// Return a collection of known services.
nlohmann::json _getServices() const;

/// Register/update a worker in the specified collection.
/// @param kind A kind of the worker to be updated ("replicaton", "qserv").
Expand All @@ -101,9 +103,15 @@ class RegistryHttpSvcMod : public http::ModuleBase {
/// Remove a worker from the collection.
nlohmann::json _deleteWorker();

/// Register/update a Czar.
nlohmann::json _addCzar();

/// Remove a Czar from the collection.
nlohmann::json _deleteCzar();

// Input parameters
ServiceProvider::Ptr const _serviceProvider;
RegistryWorkers& _workers;
RegistryServices& _services;
};

} // namespace lsst::qserv::replica
Expand Down
Loading

0 comments on commit 7794b60

Please sign in to comment.