Skip to content

Commit

Permalink
Extended the replication System's Framework to monitor result files
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Dec 1, 2023
1 parent bd28651 commit 3242e3a
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 16 deletions.
1 change: 1 addition & 0 deletions src/replica/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ target_sources(replica PRIVATE
GetReplicasQservMgtRequest.cc
GetDbStatusQservMgtRequest.cc
GetConfigQservMgtRequest.cc
GetResultFilesQservMgtRequest.cc
GetStatusQservMgtRequest.cc
HealthMonitorTask.cc
HttpAsyncReqApp.cc
Expand Down
60 changes: 60 additions & 0 deletions src/replica/GetResultFilesQservMgtRequest.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* LSST Data Management System
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/

// Class header
#include "replica/GetResultFilesQservMgtRequest.h"

// LSST headers
#include "lsst/log/Log.h"

using namespace std;

namespace {

/// Logger for messages reported from this file. The anonymous namespace keeps
/// the object local to this translation unit.
LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.GetResultFilesQservMgtRequest");

} // namespace

namespace lsst::qserv::replica {

/// Factory method: allocate a new request and place it under the control of
/// a shared pointer. The constructor is called directly via 'new' since it is
/// not public (see the class declaration).
shared_ptr<GetResultFilesQservMgtRequest> GetResultFilesQservMgtRequest::create(
        shared_ptr<ServiceProvider> const& serviceProvider, string const& worker,
        CallbackType const& onFinish) {
    Ptr request(new GetResultFilesQservMgtRequest(serviceProvider, worker, onFinish));
    return request;
}

/// Forward the service provider, the "QSERV_GET_RESULT_FILES" tag and the worker
/// name to the base class, and keep a copy of the completion callback.
GetResultFilesQservMgtRequest::GetResultFilesQservMgtRequest(
        shared_ptr<ServiceProvider> const& serviceProvider, string const& worker,
        CallbackType const& onFinish)
        : QservMgtRequest(serviceProvider, "QSERV_GET_RESULT_FILES", worker),
          _onFinish(onFinish) {
}

/// Build the HTTP request of this operation by delegating to createHttpReq()
/// with the "/files" service path.
void GetResultFilesQservMgtRequest::createHttpReqImpl(replica::Lock const& lock) {
    createHttpReq(lock, string("/files"));
}

/// Report the completion of the request. A TRACE-level log entry (the request's
/// context string followed by the name of this method) is written first, then
/// the notification is delegated to notifyDefaultImpl().
void GetResultFilesQservMgtRequest::notify(replica::Lock const& lock) {
LOGS(_log, LOG_LVL_TRACE, context() << __func__);
// The stored completion callback is passed along with the lock.
// NOTE(review): notifyDefaultImpl() is not defined in this file; presumably it
// invokes the callback (if any) and resets it - confirm in QservMgtRequest.
notifyDefaultImpl<GetResultFilesQservMgtRequest>(lock, _onFinish);
}

} // namespace lsst::qserv::replica
88 changes: 88 additions & 0 deletions src/replica/GetResultFilesQservMgtRequest.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* LSST Data Management System
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/
#ifndef LSST_QSERV_REPLICA_GETRESULTFILESQSERVMGTREQUEST_H
#define LSST_QSERV_REPLICA_GETRESULTFILESQSERVMGTREQUEST_H

// System headers
#include <memory>
#include <string>

// Qserv headers
#include "replica/QservMgtRequest.h"

namespace lsst::qserv::replica {
class ServiceProvider;
} // namespace lsst::qserv::replica

// This header declarations
namespace lsst::qserv::replica {

/**
 * GetResultFilesQservMgtRequest is a request that asks a Qserv worker
 * to report on its partial result files.
 */
class GetResultFilesQservMgtRequest : public QservMgtRequest {
public:
    /// The shared pointer type for instances of the class
    using Ptr = std::shared_ptr<GetResultFilesQservMgtRequest>;

    /// The signature of a function to be called when the request completes
    using CallbackType = std::function<void(Ptr)>;

    // Instances are neither default-constructible nor copyable.
    GetResultFilesQservMgtRequest() = delete;
    GetResultFilesQservMgtRequest(GetResultFilesQservMgtRequest const&) = delete;
    GetResultFilesQservMgtRequest& operator=(GetResultFilesQservMgtRequest const&) = delete;

    virtual ~GetResultFilesQservMgtRequest() final = default;

    /**
     * Create a new request object. Going through this factory (the constructor
     * is private) guarantees that instances are always owned by a shared pointer
     * rather than being created as values or handled via low-level pointers.
     * @param serviceProvider A provider of services for accessing Configuration
     *   and for saving the request's persistent state to the database.
     * @param worker The name of a worker the request is to be sent to.
     * @param onFinish An (optional) function to be called upon completion of the request.
     * @return A pointer to the newly created object.
     */
    static Ptr create(std::shared_ptr<ServiceProvider> const& serviceProvider,
                      std::string const& worker, CallbackType const& onFinish = nullptr);

protected:
    /// @see QservMgtRequest::createHttpReqImpl()
    virtual void createHttpReqImpl(replica::Lock const& lock) final;

    /// @see QservMgtRequest::notify()
    virtual void notify(replica::Lock const& lock) final;

private:
    /// @see GetResultFilesQservMgtRequest::create()
    GetResultFilesQservMgtRequest(std::shared_ptr<ServiceProvider> const& serviceProvider,
                                  std::string const& worker, CallbackType const& onFinish);

    // Input parameters

    /// The completion callback. It gets reset after finishing the request.
    CallbackType _onFinish;
};

} // namespace lsst::qserv::replica

#endif // LSST_QSERV_REPLICA_GETRESULTFILESQSERVMGTREQUEST_H
6 changes: 6 additions & 0 deletions src/replica/HttpProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@ void HttpProcessor::registerServices() {
self->_processorConfig, req, resp,
"WORKER-DB");
});
httpServer()->addHandler("GET", "/replication/qserv/worker/files/:worker",
[self](qhttp::Request::Ptr const req, qhttp::Response::Ptr const resp) {
HttpQservMonitorModule::process(self->controller(), self->name(),
self->_processorConfig, req, resp,
"WORKER-FILES");
});
httpServer()->addHandler("GET", "/replication/qserv/master/status",
[self](qhttp::Request::Ptr const req, qhttp::Response::Ptr const resp) {
HttpQservMonitorModule::process(self->controller(), self->name(),
Expand Down
52 changes: 36 additions & 16 deletions src/replica/HttpQservMonitorModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,15 @@ void HttpQservMonitorModule::process(Controller::Ptr const& controller, string c
module.execute(subModuleName, authType);
}

/// Throw http::Error (attributed to the calling context 'func') unless the
/// request has finished in the extended state SUCCESS. The error message names
/// the request's identifier, type, worker and the extended state.
void HttpQservMonitorModule::_throwIfNotSucceeded(string const& func,
                                                  shared_ptr<QservMgtRequest> const& request) {
    if (request->extendedState() != QservMgtRequest::ExtendedState::SUCCESS) {
        string error = "request id: ";
        error += request->id();
        error += " of type: ";
        error += request->type();
        error += " sent to worker: ";
        error += request->worker();
        error += " failed, error: ";
        error += QservMgtRequest::state2string(request->extendedState());
        throw http::Error(func, error);
    }
}

HttpQservMonitorModule::HttpQservMonitorModule(Controller::Ptr const& controller, string const& taskName,
HttpProcessorConfig const& processorConfig,
qhttp::Request::Ptr const& req,
Expand All @@ -146,6 +155,8 @@ json HttpQservMonitorModule::executeImpl(string const& subModuleName) {
return _workerConfig();
else if (subModuleName == "WORKER-DB")
return _workerDb();
else if (subModuleName == "WORKER-FILES")
return _workerFiles();
else if (subModuleName == "CZAR")
return _czar();
else if (subModuleName == "CZAR-CONFIG")
Expand Down Expand Up @@ -250,15 +261,9 @@ json HttpQservMonitorModule::_workerConfig() {
auto const request = controller()->serviceProvider()->qservMgtServices()->config(worker, noParentJobId,
onFinish, timeoutSec);
request->wait();
_throwIfNotSucceeded(__func__, request);

if (request->extendedState() != QservMgtRequest::ExtendedState::SUCCESS) {
string const msg = "database operation failed, error: " +
QservMgtRequest::state2string(request->extendedState());
throw http::Error(__func__, msg);
}
json result = json::object();
result["config"] = request->info();
return result;
return json::object({{"config", request->info()}});
}

json HttpQservMonitorModule::_workerDb() {
Expand All @@ -277,15 +282,30 @@ json HttpQservMonitorModule::_workerDb() {
auto const request = controller()->serviceProvider()->qservMgtServices()->databaseStatus(
worker, noParentJobId, onFinish, timeoutSec);
request->wait();
_throwIfNotSucceeded(__func__, request);

if (request->extendedState() != QservMgtRequest::ExtendedState::SUCCESS) {
string const msg = "database operation failed, error: " +
QservMgtRequest::state2string(request->extendedState());
throw http::Error(__func__, msg);
}
json result = json::object();
result["status"] = request->info();
return result;
return json::object({{"status", request->info()}});
}

/// Handle the "WORKER-FILES" sub-module: send a result files request to the
/// worker named in the "worker" parameter of the URL, wait for its completion
/// and return the info reported by the request.
/// The optional query parameter "timeout_sec" overrides the default worker
/// response timeout. An exception is thrown by _throwIfNotSucceeded() if the
/// request didn't succeed.
json HttpQservMonitorModule::_workerFiles() {
    debug(__func__);
    checkApiVersion(__func__, 28);

    // Input parameters of the REST request
    auto const workerName = params().at("worker");
    unsigned int const responseTimeoutSec =
            query().optionalUInt("timeout_sec", workerResponseTimeoutSec());

    debug(__func__, "worker=" + workerName);
    debug(__func__, "timeout_sec=" + to_string(responseTimeoutSec));

    // The request is made with an empty parent job identifier and with
    // no completion callback since it's tracked synchronously below.
    string const emptyJobId;
    GetResultFilesQservMgtRequest::CallbackType const noCallback = nullptr;

    auto const services = controller()->serviceProvider()->qservMgtServices();
    auto const request = services->resultFiles(workerName, emptyJobId, noCallback, responseTimeoutSec);
    request->wait();
    _throwIfNotSucceeded(__func__, request);

    // NOTE(review): the payload is published under the "status" key, which is the
    // same key used by _workerDb() - confirm this is what clients of the
    // files service expect.
    json result = json::object({{"status", request->info()}});
    return result;
}

json HttpQservMonitorModule::_czar() {
Expand Down
20 changes: 20 additions & 0 deletions src/replica/HttpQservMonitorModule.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ namespace lsst::qserv::wbase {
struct TaskSelector;
} // namespace lsst::qserv::wbase

namespace lsst::qserv::replica {
class QservMgtRequest;
} // namespace lsst::qserv::replica

// This header declarations
namespace lsst::qserv::replica {

Expand All @@ -58,6 +62,7 @@ class HttpQservMonitorModule : public HttpModule {
* WORKER - get the status info of a specific worker
* WORKER-CONFIG - get configuration parameters of a specific worker
* WORKER-DB - get the database status of a specific worker
* WORKER-FILES - get a collection of partial result files from a worker
* CZAR - get the status info of Czar
* CZAR-CONFIG - get configuration parameters of Czar
* CZAR-DB - get the database status of Czar
Expand All @@ -84,6 +89,15 @@ class HttpQservMonitorModule : public HttpModule {
nlohmann::json executeImpl(std::string const& subModuleName) final;

private:
/**
* The helper method for checking the completion status of a request to ensure
* it succeeded, i.e. that the extended state of the request is
* QservMgtRequest::ExtendedState::SUCCESS.
* @param func The calling context (for error reporting).
* @param request A request to be evaluated.
* @throw http::Error If the request didn't succeed.
*/
static void _throwIfNotSucceeded(std::string const& func,
std::shared_ptr<QservMgtRequest> const& request);

HttpQservMonitorModule(Controller::Ptr const& controller, std::string const& taskName,
HttpProcessorConfig const& processorConfig, qhttp::Request::Ptr const& req,
qhttp::Response::Ptr const& resp);
Expand Down Expand Up @@ -113,6 +127,12 @@ class HttpQservMonitorModule : public HttpModule {
*/
nlohmann::json _workerDb();

/**
* Process a request for extracting info on the partial query result files
* from the selected Qserv worker.
* @return A JSON object in which the info reported by the worker request
* is stored under the key "status".
*/
nlohmann::json _workerFiles();

/**
* Process a request for extracting various status info of Czar.
*/
Expand Down
12 changes: 12 additions & 0 deletions src/replica/QservMgtServices.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,18 @@ GetConfigQservMgtRequest::Ptr QservMgtServices::config(string const& worker, str
return request;
}

/// Create, register and start a request for info on the result files of a worker.
GetResultFilesQservMgtRequest::Ptr QservMgtServices::resultFiles(
        string const& worker, string const& jobId,
        GetResultFilesQservMgtRequest::CallbackType const& onFinish,
        unsigned int requestExpirationIvalSec) {
    // The callback installed on the request itself only reports the identifier
    // of the finished request back to this service. The captured shared pointer
    // ensures the service object is still around when the callback is called.
    auto const reportCompletion = [self = shared_from_this()](QservMgtRequest::Ptr const& finished) {
        self->_finish(finished->id());
    };
    auto const request =
            GetResultFilesQservMgtRequest::create(serviceProvider(), worker, reportCompletion);

    // The client's callback is handed over to the registry along with the request.
    _register(__func__, request, onFinish);
    request->start(jobId, requestExpirationIvalSec);
    return request;
}

void QservMgtServices::_finish(string const& id) {
string const context = "QservMgtServices::" + string(__func__) + "[" + id + "] ";
LOGS(_log, LOG_LVL_TRACE, context);
Expand Down
18 changes: 18 additions & 0 deletions src/replica/QservMgtServices.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "replica/GetReplicasQservMgtRequest.h"
#include "replica/GetDbStatusQservMgtRequest.h"
#include "replica/GetConfigQservMgtRequest.h"
#include "replica/GetResultFilesQservMgtRequest.h"
#include "replica/GetStatusQservMgtRequest.h"
#include "replica/RemoveReplicaQservMgtRequest.h"
#include "replica/SetReplicasQservMgtRequest.h"
Expand Down Expand Up @@ -294,6 +295,23 @@ class QservMgtServices : public std::enable_shared_from_this<QservMgtServices> {
GetConfigQservMgtRequest::CallbackType const& onFinish = nullptr,
unsigned int requestExpirationIvalSec = 0);

/**
* Request info on the partial result files of a Qserv worker.
* @param worker The name of a worker.
* @param jobId An optional identifier of a job specifying a context in which
* a request will be executed (the empty string by default).
* @param onFinish A callback function to be called upon request completion
* (no callback by default).
* @param requestExpirationIvalSec The maximum amount of time to wait before
* completion of the request. If a value of the parameter is set to 0 then no
* limit will be enforced.
* @return A pointer to the request object if the request was made. Return
* nullptr otherwise.
* NOTE(review): the implementation of the method shown in QservMgtServices.cc
* has no code path returning nullptr - confirm if the last statement still applies.
*/
GetResultFilesQservMgtRequest::Ptr resultFiles(
std::string const& worker, std::string const& jobId = "",
GetResultFilesQservMgtRequest::CallbackType const& onFinish = nullptr,
unsigned int requestExpirationIvalSec = 0);

private:
/**
* @param serviceProvider Is required for accessing configuration parameters.
Expand Down

0 comments on commit 3242e3a

Please sign in to comment.