Skip to content

Commit

Permalink
Refactored Qserv management classes to allow Czar requests
Browse files Browse the repository at this point in the history
The original base class QservMgtRequest got despecialized
from being exclusively Qserv workers-oriented base into
the neutral request type. Two specialized subclasses were
added to support the corresponding hierachies: QservCzarMgtRequest
and QservWorkerMgtRequest.
The change prepares ground for introducing Czar management requests
in the Replication Framework.
  • Loading branch information
iagaponenko committed Dec 14, 2023
1 parent 95947fa commit fa68f06
Show file tree
Hide file tree
Showing 31 changed files with 387 additions and 131 deletions.
2 changes: 1 addition & 1 deletion src/replica/AddReplicaQservMgtRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ AddReplicaQservMgtRequest::AddReplicaQservMgtRequest(shared_ptr<ServiceProvider>
string const& worker, unsigned int chunk,
vector<string> const& databases,
AddReplicaQservMgtRequest::CallbackType const& onFinish)
: QservMgtRequest(serviceProvider, "QSERV_ADD_REPLICA", worker),
: QservWorkerMgtRequest(serviceProvider, "QSERV_ADD_REPLICA", worker),
_chunk(chunk),
_databases(databases),
_onFinish(onFinish) {}
Expand Down
12 changes: 6 additions & 6 deletions src/replica/AddReplicaQservMgtRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
#include "nlohmann/json.hpp"

// Qserv headers
#include "replica/QservMgtRequest.h"
#include "replica/QservWorkerMgtRequest.h"

namespace lsst::qserv::replica {
class ServiceProvider;
Expand All @@ -46,7 +46,7 @@ namespace lsst::qserv::replica {
* Class AddReplicaQservMgtRequest implements a request notifying Qserv workers
* on new chunks added to the database.
*/
class AddReplicaQservMgtRequest : public QservMgtRequest {
class AddReplicaQservMgtRequest : public QservWorkerMgtRequest {
public:
typedef std::shared_ptr<AddReplicaQservMgtRequest> Ptr;

Expand All @@ -57,7 +57,7 @@ class AddReplicaQservMgtRequest : public QservMgtRequest {
AddReplicaQservMgtRequest(AddReplicaQservMgtRequest const&) = delete;
AddReplicaQservMgtRequest& operator=(AddReplicaQservMgtRequest const&) = delete;

virtual ~AddReplicaQservMgtRequest() final = default;
virtual ~AddReplicaQservMgtRequest() override = default;

/**
* Static factory method is needed to prevent issues with the lifespan
Expand All @@ -83,14 +83,14 @@ class AddReplicaQservMgtRequest : public QservMgtRequest {
std::vector<std::string> const& databases() const { return _databases; }

/// @see QservMgtRequest::extendedPersistentState()
virtual std::list<std::pair<std::string, std::string>> extendedPersistentState() const final;
virtual std::list<std::pair<std::string, std::string>> extendedPersistentState() const override;

protected:
/// @see QservMgtRequest::createHttpReqImpl
virtual void createHttpReqImpl(replica::Lock const& lock) final;
virtual void createHttpReqImpl(replica::Lock const& lock) override;

/// @see QservMgtRequest::notify
virtual void notify(replica::Lock const& lock) final;
virtual void notify(replica::Lock const& lock) override;

private:
/// @see AddReplicaQservMgtRequest::create()
Expand Down
2 changes: 2 additions & 0 deletions src/replica/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,14 @@ target_sources(replica PRIVATE
PurgeApp.cc
PurgeJob.cc
QhttpTestApp.cc
QservCzarMgtRequest.cc
QservGetReplicasJob.cc
QservMgtRequest.cc
QservMgtServices.cc
QservStatusJob.cc
QservSyncJob.cc
QservWorkerApp.cc
QservWorkerMgtRequest.cc
QservWorkerPingApp.cc
RebalanceApp.cc
RebalanceJob.cc
Expand Down
10 changes: 5 additions & 5 deletions src/replica/DatabaseServices.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ namespace lsst::qserv::replica {
class Configuration;
class ControllerIdentity;
class NamedMutexRegistry;
class QservMgtRequest;
class QservWorkerMgtRequest;
class Performance;
class Request;
} // namespace lsst::qserv::replica
Expand Down Expand Up @@ -423,17 +423,17 @@ class DatabaseServices : public std::enable_shared_from_this<DatabaseServices> {
virtual void updateHeartbeatTime(Job const& job) = 0;

/**
* Save the state of the QservMgtRequest. This operation can be called many times for
* a particular instance of the QservMgtRequest.
* Save the state of the QservWorkerMgtRequest. This operation can be called many times for
* a particular instance of the QservWorkerMgtRequest.
*
* The Performance object is explicitly passed as a parameter to avoid
* making a blocked call back to the request which may create a deadlock.
*
* @param request a reference to a QservMgtRequest object
* @param request a reference to a QservWorkerMgtRequest object
* @param performance a reference to a Performance object
* @param serverError a server error message (if any)
*/
virtual void saveState(QservMgtRequest const& request, Performance const& performance,
virtual void saveState(QservWorkerMgtRequest const& request, Performance const& performance,
std::string const& serverError) = 0;

/**
Expand Down
20 changes: 11 additions & 9 deletions src/replica/DatabaseServicesMySQL.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
#include "replica/DatabaseMySQLUtils.h"
#include "replica/Job.h"
#include "replica/NamedMutexRegistry.h"
#include "replica/QservMgtRequest.h"
#include "replica/QservWorkerMgtRequest.h"
#include "replica/ReplicaInfo.h"
#include "replica/Request.h"
#include "replica/SemanticMaps.h"
Expand Down Expand Up @@ -159,10 +159,10 @@ void DatabaseServicesMySQL::updateHeartbeatTime(Job const& job) {
LOGS(_log, LOG_LVL_DEBUG, context + "** DONE **");
}

void DatabaseServicesMySQL::saveState(QservMgtRequest const& request, Performance const& performance,
void DatabaseServicesMySQL::saveState(QservWorkerMgtRequest const& request, Performance const& performance,
string const& serverError) {
string const context =
"DatabaseServicesMySQL::" + string(__func__) + "[QservMgtRequest::" + request.type() + "] ";
"DatabaseServicesMySQL::" + string(__func__) + "[QservWorkerMgtRequest::" + request.type() + "] ";
LOGS(_log, LOG_LVL_DEBUG, context);

// Requests which haven't started yet or the ones which aren't associated
Expand All @@ -181,14 +181,14 @@ void DatabaseServicesMySQL::saveState(QservMgtRequest const& request, Performanc
replica::Lock lock(_mtx, context);

// The algorithm will first try the INSERT query into the base table.
// If a row with the same primary key (QservMgtRequest id) already exists in the table
// If a row with the same primary key (id) already exists in the table
// then the UPDATE query will be executed.
try {
auto const insert = [&](decltype(_conn) conn) {
string const query =
_g.insert("request", request.id(), request.jobId(), request.type(), request.workerName(),
0, QservMgtRequest::state2string(request.state()),
QservMgtRequest::state2string(request.extendedState()), serverError,
0, QservWorkerMgtRequest::state2string(request.state()),
QservWorkerMgtRequest::state2string(request.extendedState()), serverError,
performance.c_create_time, performance.c_start_time, performance.w_receive_time,
performance.w_start_time, performance.w_finish_time, performance.c_finish_time);
conn->execute(query);
Expand All @@ -201,8 +201,10 @@ void DatabaseServicesMySQL::saveState(QservMgtRequest const& request, Performanc
};
auto const update = [&](decltype(_conn) conn) {
string const query =
_g.update("request", make_pair("state", QservMgtRequest::state2string(request.state())),
make_pair("ext_state", QservMgtRequest::state2string(request.extendedState())),
_g.update("request",
make_pair("state", QservWorkerMgtRequest::state2string(request.state())),
make_pair("ext_state",
QservWorkerMgtRequest::state2string(request.extendedState())),
make_pair("server_status", serverError),
make_pair("c_create_time", performance.c_create_time),
make_pair("c_start_time", performance.c_start_time),
Expand Down Expand Up @@ -242,7 +244,7 @@ void DatabaseServicesMySQL::saveState(Request const& request, Performance const&
replica::Lock lock(_mtx, context);

// The algorithm will first try the INSERT query into the base table.
// If a row with the same primary key (QservMgtRequest id) already exists in the table
// If a row with the same primary key (request id) already exists in the table
// then the UPDATE query will be executed.
try {
auto const insert = [&](decltype(_conn) conn) {
Expand Down
2 changes: 1 addition & 1 deletion src/replica/DatabaseServicesMySQL.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class DatabaseServicesMySQL : public DatabaseServices {

void updateHeartbeatTime(Job const& job) final;

void saveState(QservMgtRequest const& request, Performance const& performance,
void saveState(QservWorkerMgtRequest const& request, Performance const& performance,
std::string const& serverError) final;

void saveState(Request const& request, Performance const& performance) final;
Expand Down
2 changes: 1 addition & 1 deletion src/replica/DatabaseServicesPool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void DatabaseServicesPool::updateHeartbeatTime(Job const& job) {
service()->updateHeartbeatTime(job);
}

void DatabaseServicesPool::saveState(QservMgtRequest const& request, Performance const& performance,
void DatabaseServicesPool::saveState(QservWorkerMgtRequest const& request, Performance const& performance,
string const& serverError) {
ServiceAllocator service(shared_from_base<DatabaseServicesPool>());
service()->saveState(request, performance, serverError);
Expand Down
2 changes: 1 addition & 1 deletion src/replica/DatabaseServicesPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class DatabaseServicesPool : public DatabaseServices {

void updateHeartbeatTime(Job const& job) final;

void saveState(QservMgtRequest const& request, Performance const& performance,
void saveState(QservWorkerMgtRequest const& request, Performance const& performance,
std::string const& serverError) final;

void saveState(Request const& request, Performance const& performance) final;
Expand Down
2 changes: 1 addition & 1 deletion src/replica/GetConfigQservMgtRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ shared_ptr<GetConfigQservMgtRequest> GetConfigQservMgtRequest::create(
GetConfigQservMgtRequest::GetConfigQservMgtRequest(shared_ptr<ServiceProvider> const& serviceProvider,
string const& worker,
GetConfigQservMgtRequest::CallbackType const& onFinish)
: QservMgtRequest(serviceProvider, "QSERV_GET_DATABASE_STATUS", worker), _onFinish(onFinish) {}
: QservWorkerMgtRequest(serviceProvider, "QSERV_GET_DATABASE_STATUS", worker), _onFinish(onFinish) {}

void GetConfigQservMgtRequest::createHttpReqImpl(replica::Lock const& lock) {
string const service = "/config";
Expand Down
10 changes: 5 additions & 5 deletions src/replica/GetConfigQservMgtRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
#include <string>

// Qserv headers
#include "replica/QservMgtRequest.h"
#include "replica/QservWorkerMgtRequest.h"

namespace lsst::qserv::replica {
class ServiceProvider;
Expand All @@ -39,7 +39,7 @@ namespace lsst::qserv::replica {
* Class GetConfigQservMgtRequest is a request for obtaining various info
* on the database service of the Qserv worker.
*/
class GetConfigQservMgtRequest : public QservMgtRequest {
class GetConfigQservMgtRequest : public QservWorkerMgtRequest {
public:
typedef std::shared_ptr<GetConfigQservMgtRequest> Ptr;

Expand All @@ -50,7 +50,7 @@ class GetConfigQservMgtRequest : public QservMgtRequest {
GetConfigQservMgtRequest(GetConfigQservMgtRequest const&) = delete;
GetConfigQservMgtRequest& operator=(GetConfigQservMgtRequest const&) = delete;

virtual ~GetConfigQservMgtRequest() final = default;
virtual ~GetConfigQservMgtRequest() override = default;

/**
* Static factory method is needed to prevent issues with the lifespan
Expand All @@ -68,10 +68,10 @@ class GetConfigQservMgtRequest : public QservMgtRequest {

protected:
/// @see QservMgtRequest::createHttpReqImpl()
virtual void createHttpReqImpl(replica::Lock const& lock) final;
virtual void createHttpReqImpl(replica::Lock const& lock) override;

/// @see QservMgtRequest::notify()
virtual void notify(replica::Lock const& lock) final;
virtual void notify(replica::Lock const& lock) override;

private:
/// @see GetConfigQservMgtRequest::create()
Expand Down
2 changes: 1 addition & 1 deletion src/replica/GetDbStatusQservMgtRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ shared_ptr<GetDbStatusQservMgtRequest> GetDbStatusQservMgtRequest::create(
GetDbStatusQservMgtRequest::GetDbStatusQservMgtRequest(
shared_ptr<ServiceProvider> const& serviceProvider, string const& worker,
GetDbStatusQservMgtRequest::CallbackType const& onFinish)
: QservMgtRequest(serviceProvider, "QSERV_GET_DATABASE_STATUS", worker), _onFinish(onFinish) {}
: QservWorkerMgtRequest(serviceProvider, "QSERV_GET_DATABASE_STATUS", worker), _onFinish(onFinish) {}

void GetDbStatusQservMgtRequest::createHttpReqImpl(replica::Lock const& lock) {
string const service = "/mysql";
Expand Down
10 changes: 5 additions & 5 deletions src/replica/GetDbStatusQservMgtRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
#include <string>

// Qserv headers
#include "replica/QservMgtRequest.h"
#include "replica/QservWorkerMgtRequest.h"

namespace lsst::qserv::replica {
class ServiceProvider;
Expand All @@ -39,7 +39,7 @@ namespace lsst::qserv::replica {
* Class GetDbStatusQservMgtRequest is a request for obtaining various info
* on the database service of the Qserv worker.
*/
class GetDbStatusQservMgtRequest : public QservMgtRequest {
class GetDbStatusQservMgtRequest : public QservWorkerMgtRequest {
public:
typedef std::shared_ptr<GetDbStatusQservMgtRequest> Ptr;

Expand All @@ -50,7 +50,7 @@ class GetDbStatusQservMgtRequest : public QservMgtRequest {
GetDbStatusQservMgtRequest(GetDbStatusQservMgtRequest const&) = delete;
GetDbStatusQservMgtRequest& operator=(GetDbStatusQservMgtRequest const&) = delete;

virtual ~GetDbStatusQservMgtRequest() final = default;
virtual ~GetDbStatusQservMgtRequest() override = default;

/**
* Static factory method is needed to prevent issues with the lifespan
Expand All @@ -68,10 +68,10 @@ class GetDbStatusQservMgtRequest : public QservMgtRequest {

protected:
/// @see QservMgtRequest::createHttpReqImpl()
virtual void createHttpReqImpl(replica::Lock const& lock) final;
virtual void createHttpReqImpl(replica::Lock const& lock) override;

/// @see QservMgtRequest::notify()
virtual void notify(replica::Lock const& lock) final;
virtual void notify(replica::Lock const& lock) override;

private:
/// @see GetDbStatusQservMgtRequest::create()
Expand Down
2 changes: 1 addition & 1 deletion src/replica/GetReplicasQservMgtRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ GetReplicasQservMgtRequest::Ptr GetReplicasQservMgtRequest::create(
GetReplicasQservMgtRequest::GetReplicasQservMgtRequest(
ServiceProvider::Ptr const& serviceProvider, string const& worker, string const& databaseFamily,
bool inUseOnly, GetReplicasQservMgtRequest::CallbackType const& onFinish)
: QservMgtRequest(serviceProvider, "QSERV_GET_REPLICAS", worker),
: QservWorkerMgtRequest(serviceProvider, "QSERV_GET_REPLICAS", worker),
_databaseFamily(databaseFamily),
_inUseOnly(inUseOnly),
_onFinish(onFinish) {}
Expand Down
12 changes: 6 additions & 6 deletions src/replica/GetReplicasQservMgtRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
#include "nlohmann/json.hpp"

// Qserv headers
#include "replica/QservMgtRequest.h"
#include "replica/QservWorkerMgtRequest.h"
#include "replica/ReplicaInfo.h"

namespace lsst::qserv::replica {
Expand All @@ -45,7 +45,7 @@ namespace lsst::qserv::replica {
* Class GetReplicasQservMgtRequest implements a request retrieving a list of
* replicas known to Qserv workers.
*/
class GetReplicasQservMgtRequest : public QservMgtRequest {
class GetReplicasQservMgtRequest : public QservWorkerMgtRequest {
public:
typedef std::shared_ptr<GetReplicasQservMgtRequest> Ptr;

Expand All @@ -56,7 +56,7 @@ class GetReplicasQservMgtRequest : public QservMgtRequest {
GetReplicasQservMgtRequest(GetReplicasQservMgtRequest const&) = delete;
GetReplicasQservMgtRequest& operator=(GetReplicasQservMgtRequest const&) = delete;

virtual ~GetReplicasQservMgtRequest() final = default;
virtual ~GetReplicasQservMgtRequest() override = default;

/**
* Static factory method is needed to prevent issues with the lifespan
Expand Down Expand Up @@ -93,14 +93,14 @@ class GetReplicasQservMgtRequest : public QservMgtRequest {

protected:
/// @see QservMgtRequest::createHttpReqImpl()
virtual void createHttpReqImpl(replica::Lock const& lock) final;
virtual void createHttpReqImpl(replica::Lock const& lock) override;

/// @see QservMgtRequest::dataReady()
virtual QservMgtRequest::ExtendedState dataReady(replica::Lock const& lock,
nlohmann::json const& data) final;
nlohmann::json const& data) override;

/// @see QservMgtRequest::notify
virtual void notify(replica::Lock const& lock) final;
virtual void notify(replica::Lock const& lock) override;

private:
/// @see GetReplicasQservMgtRequest::create()
Expand Down
2 changes: 1 addition & 1 deletion src/replica/GetResultFilesQservMgtRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ GetResultFilesQservMgtRequest::GetResultFilesQservMgtRequest(
shared_ptr<ServiceProvider> const& serviceProvider, string const& worker,
vector<QueryId> const& queryIds, unsigned int maxFiles,
GetResultFilesQservMgtRequest::CallbackType const& onFinish)
: QservMgtRequest(serviceProvider, "QSERV_GET_RESULT_FILES", worker),
: QservWorkerMgtRequest(serviceProvider, "QSERV_GET_RESULT_FILES", worker),
_queryIds(queryIds),
_maxFiles(maxFiles),
_onFinish(onFinish) {}
Expand Down
10 changes: 5 additions & 5 deletions src/replica/GetResultFilesQservMgtRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

// Qserv headers
#include "global/intTypes.h"
#include "replica/QservMgtRequest.h"
#include "replica/QservWorkerMgtRequest.h"

namespace lsst::qserv::replica {
class ServiceProvider;
Expand All @@ -41,7 +41,7 @@ namespace lsst::qserv::replica {
* Class GetResultFilesQservMgtRequest is a request for obtaining info
* on the partial result files from the Qserv worker.
*/
class GetResultFilesQservMgtRequest : public QservMgtRequest {
class GetResultFilesQservMgtRequest : public QservWorkerMgtRequest {
public:
typedef std::shared_ptr<GetResultFilesQservMgtRequest> Ptr;

Expand All @@ -52,7 +52,7 @@ class GetResultFilesQservMgtRequest : public QservMgtRequest {
GetResultFilesQservMgtRequest(GetResultFilesQservMgtRequest const&) = delete;
GetResultFilesQservMgtRequest& operator=(GetResultFilesQservMgtRequest const&) = delete;

virtual ~GetResultFilesQservMgtRequest() final = default;
virtual ~GetResultFilesQservMgtRequest() override = default;

/**
* Static factory method is needed to prevent issues with the lifespan
Expand All @@ -75,10 +75,10 @@ class GetResultFilesQservMgtRequest : public QservMgtRequest {

protected:
/// @see QservMgtRequest::createHttpReqImpl()
virtual void createHttpReqImpl(replica::Lock const& lock) final;
virtual void createHttpReqImpl(replica::Lock const& lock) override;

/// @see QservMgtRequest::notify()
virtual void notify(replica::Lock const& lock) final;
virtual void notify(replica::Lock const& lock) override;

private:
/// @see GetResultFilesQservMgtRequest::create()
Expand Down
2 changes: 1 addition & 1 deletion src/replica/GetStatusQservMgtRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ GetStatusQservMgtRequest::GetStatusQservMgtRequest(shared_ptr<ServiceProvider> c
string const& worker,
wbase::TaskSelector const& taskSelector,
GetStatusQservMgtRequest::CallbackType const& onFinish)
: QservMgtRequest(serviceProvider, "QSERV_GET_STATUS", worker),
: QservWorkerMgtRequest(serviceProvider, "QSERV_GET_STATUS", worker),
_taskSelector(taskSelector),
_onFinish(onFinish) {}

Expand Down
Loading

0 comments on commit fa68f06

Please sign in to comment.