Skip to content

Commit

Permalink
Extended the replication System's Framework to monitor result files
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Dec 3, 2023
1 parent 653aa2f commit 39f6da1
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 27 deletions.
1 change: 1 addition & 0 deletions src/replica/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ target_sources(replica PRIVATE
GetReplicasQservMgtRequest.cc
GetDbStatusQservMgtRequest.cc
GetConfigQservMgtRequest.cc
GetResultFilesQservMgtRequest.cc
GetStatusQservMgtRequest.cc
HealthMonitorTask.cc
HttpAsyncReqApp.cc
Expand Down
71 changes: 71 additions & 0 deletions src/replica/GetResultFilesQservMgtRequest.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* LSST Data Management System
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/

// Class header
#include "replica/GetResultFilesQservMgtRequest.h"

// Qserv headers
#include "util/String.h"

// LSST headers
#include "lsst/log/Log.h"

using namespace std;

namespace {

LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.GetResultFilesQservMgtRequest");

} // namespace

namespace lsst::qserv::replica {

shared_ptr<GetResultFilesQservMgtRequest> GetResultFilesQservMgtRequest::create(
shared_ptr<ServiceProvider> const& serviceProvider, string const& worker,
vector<QueryId> const& queryIds, unsigned int maxFiles,
GetResultFilesQservMgtRequest::CallbackType const& onFinish) {
return shared_ptr<GetResultFilesQservMgtRequest>(
new GetResultFilesQservMgtRequest(serviceProvider, worker, queryIds, maxFiles, onFinish));
}

GetResultFilesQservMgtRequest::GetResultFilesQservMgtRequest(
shared_ptr<ServiceProvider> const& serviceProvider, string const& worker,
vector<QueryId> const& queryIds, unsigned int maxFiles,
GetResultFilesQservMgtRequest::CallbackType const& onFinish)
: QservMgtRequest(serviceProvider, "QSERV_GET_RESULT_FILES", worker),
_queryIds(queryIds),
_maxFiles(maxFiles),
_onFinish(onFinish) {}

void GetResultFilesQservMgtRequest::createHttpReqImpl(replica::Lock const& lock) {
string const service = "/files";
string query;
query += "?query_ids=" + util::String::toString(_queryIds);
query += "&max_files=" + to_string(_maxFiles);
createHttpReq(lock, service, query);
}

void GetResultFilesQservMgtRequest::notify(replica::Lock const& lock) {
LOGS(_log, LOG_LVL_TRACE, context() << __func__);
notifyDefaultImpl<GetResultFilesQservMgtRequest>(lock, _onFinish);
}

} // namespace lsst::qserv::replica
98 changes: 98 additions & 0 deletions src/replica/GetResultFilesQservMgtRequest.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* LSST Data Management System
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/
#ifndef LSST_QSERV_REPLICA_GETRESULTFILESQSERVMGTREQUEST_H
#define LSST_QSERV_REPLICA_GETRESULTFILESQSERVMGTREQUEST_H

// System headers
#include <memory>
#include <string>
#include <vector>

// Qserv headers
#include "global/intTypes.h"
#include "replica/QservMgtRequest.h"

namespace lsst::qserv::replica {
class ServiceProvider;
} // namespace lsst::qserv::replica

// This header declarations
namespace lsst::qserv::replica {

/**
* Class GetResultFilesQservMgtRequest is a request for obtaining info
* on the partial result files from the Qserv worker.
*/
class GetResultFilesQservMgtRequest : public QservMgtRequest {
public:
typedef std::shared_ptr<GetResultFilesQservMgtRequest> Ptr;

/// The function type for notifications on the completion of the request
typedef std::function<void(Ptr)> CallbackType;

GetResultFilesQservMgtRequest() = delete;
GetResultFilesQservMgtRequest(GetResultFilesQservMgtRequest const&) = delete;
GetResultFilesQservMgtRequest& operator=(GetResultFilesQservMgtRequest const&) = delete;

virtual ~GetResultFilesQservMgtRequest() final = default;

/**
* Static factory method is needed to prevent issues with the lifespan
* and memory management of instances created otherwise (as values or via
* low-level pointers).
* @param serviceProvider A reference to a provider of services for accessing
* Configuration, saving the request's persistent state to the database.
* @param worker The name of a worker to send the request to.
* @param queryIds The optional selector for queries. If empty then all queries will
* be considered.
* @param maxFiles The optional limit for maximum number of files to be reported.
* If 0 then no limit is set.
* @param onFinish (optional) callback function to be called upon request completion.
* @return A pointer to the created object.
*/
static std::shared_ptr<GetResultFilesQservMgtRequest> create(
std::shared_ptr<ServiceProvider> const& serviceProvider, std::string const& worker,
std::vector<QueryId> const& queryIds = std::vector<QueryId>(), unsigned int maxFiles = 0,
CallbackType const& onFinish = nullptr);

protected:
/// @see QservMgtRequest::createHttpReqImpl()
virtual void createHttpReqImpl(replica::Lock const& lock) final;

/// @see QservMgtRequest::notify()
virtual void notify(replica::Lock const& lock) final;

private:
/// @see GetResultFilesQservMgtRequest::create()
GetResultFilesQservMgtRequest(std::shared_ptr<ServiceProvider> const& serviceProvider,
std::string const& worker, std::vector<QueryId> const& queryIds,
unsigned int maxFiles, CallbackType const& onFinish);

// Input parameters

std::vector<QueryId> const _queryIds;
unsigned int const _maxFiles;
CallbackType _onFinish; ///< This callback is reset after finishing the request.
};

} // namespace lsst::qserv::replica

#endif // LSST_QSERV_REPLICA_GETRESULTFILESQSERVMGTREQUEST_H
6 changes: 6 additions & 0 deletions src/replica/HttpProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@ void HttpProcessor::registerServices() {
self->_processorConfig, req, resp,
"WORKER-DB");
});
httpServer()->addHandler("GET", "/replication/qserv/worker/files/:worker",
[self](qhttp::Request::Ptr const req, qhttp::Response::Ptr const resp) {
HttpQservMonitorModule::process(self->controller(), self->name(),
self->_processorConfig, req, resp,
"WORKER-FILES");
});
httpServer()->addHandler("GET", "/replication/qserv/master/status",
[self](qhttp::Request::Ptr const req, qhttp::Response::Ptr const resp) {
HttpQservMonitorModule::process(self->controller(), self->name(),
Expand Down
71 changes: 44 additions & 27 deletions src/replica/HttpQservMonitorModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "css/CssError.h"
#include "global/intTypes.h"
#include "http/Exceptions.h"
#include "replica/Common.h"
#include "replica/DatabaseMySQL.h"
#include "replica/DatabaseMySQLTypes.h"
#include "replica/DatabaseMySQLUtils.h"
Expand All @@ -45,6 +46,7 @@
#include "replica/QservMgtServices.h"
#include "replica/QservStatusJob.h"
#include "replica/ServiceProvider.h"
#include "util/String.h"
#include "wbase/TaskState.h"

// LSST headers
Expand Down Expand Up @@ -131,6 +133,15 @@ void HttpQservMonitorModule::process(Controller::Ptr const& controller, string c
module.execute(subModuleName, authType);
}

void HttpQservMonitorModule::_throwIfNotSucceeded(string const& func,
shared_ptr<QservMgtRequest> const& request) {
if (request->extendedState() == QservMgtRequest::ExtendedState::SUCCESS) return;
string const msg = "request id: " + request->id() + " of type: " + request->type() +
" sent to worker: " + request->worker() +
" failed, error: " + QservMgtRequest::state2string(request->extendedState());
throw http::Error(func, msg);
}

HttpQservMonitorModule::HttpQservMonitorModule(Controller::Ptr const& controller, string const& taskName,
HttpProcessorConfig const& processorConfig,
qhttp::Request::Ptr const& req,
Expand All @@ -146,6 +157,8 @@ json HttpQservMonitorModule::executeImpl(string const& subModuleName) {
return _workerConfig();
else if (subModuleName == "WORKER-DB")
return _workerDb();
else if (subModuleName == "WORKER-FILES")
return _workerFiles();
else if (subModuleName == "CZAR")
return _czar();
else if (subModuleName == "CZAR-CONFIG")
Expand Down Expand Up @@ -215,7 +228,6 @@ json HttpQservMonitorModule::_worker() {

string const noParentJobId;
GetStatusQservMgtRequest::CallbackType const onFinish = nullptr;

auto const request = controller()->serviceProvider()->qservMgtServices()->status(
worker, noParentJobId, taskSelector, onFinish, timeoutSec);
request->wait();
Expand Down Expand Up @@ -246,19 +258,12 @@ json HttpQservMonitorModule::_workerConfig() {

string const noParentJobId;
GetConfigQservMgtRequest::CallbackType const onFinish = nullptr;

auto const request = controller()->serviceProvider()->qservMgtServices()->config(worker, noParentJobId,
onFinish, timeoutSec);
request->wait();
_throwIfNotSucceeded(__func__, request);

if (request->extendedState() != QservMgtRequest::ExtendedState::SUCCESS) {
string const msg = "database operation failed, error: " +
QservMgtRequest::state2string(request->extendedState());
throw http::Error(__func__, msg);
}
json result = json::object();
result["config"] = request->info();
return result;
return json::object({{"config", request->info()}});
}

json HttpQservMonitorModule::_workerDb() {
Expand All @@ -273,19 +278,36 @@ json HttpQservMonitorModule::_workerDb() {

string const noParentJobId;
GetDbStatusQservMgtRequest::CallbackType const onFinish = nullptr;

auto const request = controller()->serviceProvider()->qservMgtServices()->databaseStatus(
worker, noParentJobId, onFinish, timeoutSec);
request->wait();
_throwIfNotSucceeded(__func__, request);

if (request->extendedState() != QservMgtRequest::ExtendedState::SUCCESS) {
string const msg = "database operation failed, error: " +
QservMgtRequest::state2string(request->extendedState());
throw http::Error(__func__, msg);
}
json result = json::object();
result["status"] = request->info();
return result;
return json::object({{"status", request->info()}});
}

json HttpQservMonitorModule::_workerFiles() {
debug(__func__);
checkApiVersion(__func__, 28);

auto const worker = params().at("worker");
auto const queryIds = query().optionalVectorUInt64("query_ids");
auto const maxFiles = query().optionalUInt("max_files", 0);
unsigned int const timeoutSec = query().optionalUInt("timeout_sec", workerResponseTimeoutSec());

debug(__func__, "worker=" + worker);
debug(__func__, "query_ids=" + util::String::toString(queryIds));
debug(__func__, "max_files=" + to_string(maxFiles));
debug(__func__, "timeout_sec=" + to_string(timeoutSec));

string const noParentJobId;
GetResultFilesQservMgtRequest::CallbackType const onFinish = nullptr;
auto const request = controller()->serviceProvider()->qservMgtServices()->resultFiles(
worker, noParentJobId, queryIds, maxFiles, onFinish, timeoutSec);
request->wait();
_throwIfNotSucceeded(__func__, request);

return json::object({{"status", request->info()}});
}

json HttpQservMonitorModule::_czar() {
Expand Down Expand Up @@ -368,8 +390,8 @@ wbase::TaskSelector HttpQservMonitorModule::_translateTaskSelector(string const&
}
selector.maxTasks = query().optionalUInt("max_tasks", 0);
debug(func, "include_tasks=" + replica::bool2str(selector.includeTasks));
debug(func, "queryIds.size()=" + to_string(selector.queryIds.size()));
debug(func, "taskStates.size()=" + to_string(selector.taskStates.size()));
debug(func, "query_ids=" + util::String::toString(selector.queryIds));
debug(func, "task_states=" + util::String::toString(selector.taskStates));
debug(func, "max_tasks=" + to_string(selector.maxTasks));
return selector;
}
Expand Down Expand Up @@ -417,7 +439,6 @@ json HttpQservMonitorModule::_activeQueries() {
checkApiVersion(__func__, 25);

unsigned int const timeoutSec = query().optionalUInt("timeout_sec", workerResponseTimeoutSec());

debug(__func__, "timeout_sec=" + to_string(timeoutSec));

// Check which queries and in which schedulers are being executed
Expand Down Expand Up @@ -463,7 +484,6 @@ json HttpQservMonitorModule::_activeQueriesProgress() {

QueryId const selectQueryId = query().optionalUInt64("query_id", 0);
unsigned int const selectLastSeconds = query().optionalUInt("last_seconds", 0);

debug(__func__, "query_id=" + to_string(selectQueryId));
debug(__func__, "last_seconds=" + to_string(selectLastSeconds));

Expand All @@ -472,7 +492,6 @@ json HttpQservMonitorModule::_activeQueriesProgress() {
QueryGenerator const g(conn);
string const command = "query_info " + to_string(selectQueryId) + " " + to_string(selectLastSeconds);
string const query = g.call(g.QSERV_MANAGER(command));

debug(__func__, "query=" + query);

// Result set processor populates the JSON object and returns the completion
Expand Down Expand Up @@ -586,19 +605,17 @@ json HttpQservMonitorModule::_userQuery() {

auto const queryId = stoull(params().at("id"));
bool const includeMessages = query().optionalUInt("include_messages", 0) != 0;

debug(__func__, "id=" + to_string(queryId));
debug(__func__, "include_messages=" + bool2str(includeMessages));

json result;

// Connect to the master database
// Manage the new connection via the RAII-style handler to ensure the transaction
// is automatically rolled-back in case of exceptions.

ConnectionHandler const h(Connection::open(Configuration::qservCzarDbParams("qservMeta")));
QueryGenerator const g(h.conn);

json result;
h.conn->executeInOwnTransaction([&](auto conn) {
unsigned int const limit4past = 0;
result["queries_past"] =
Expand Down
Loading

0 comments on commit 39f6da1

Please sign in to comment.