From 39f6da1babac69ca8ff6dde916a1e719d01b447a Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Fri, 1 Dec 2023 23:13:12 +0000 Subject: [PATCH] Extended the replication System's Framework to monitor result files --- src/replica/CMakeLists.txt | 1 + src/replica/GetResultFilesQservMgtRequest.cc | 71 ++++++++++++++ src/replica/GetResultFilesQservMgtRequest.h | 98 ++++++++++++++++++++ src/replica/HttpProcessor.cc | 6 ++ src/replica/HttpQservMonitorModule.cc | 71 ++++++++------ src/replica/HttpQservMonitorModule.h | 20 ++++ src/replica/QservMgtServices.cc | 13 +++ src/replica/QservMgtServices.h | 24 +++++ 8 files changed, 277 insertions(+), 27 deletions(-) create mode 100644 src/replica/GetResultFilesQservMgtRequest.cc create mode 100644 src/replica/GetResultFilesQservMgtRequest.h diff --git a/src/replica/CMakeLists.txt b/src/replica/CMakeLists.txt index d8c83c113..d5e09b70a 100644 --- a/src/replica/CMakeLists.txt +++ b/src/replica/CMakeLists.txt @@ -76,6 +76,7 @@ target_sources(replica PRIVATE GetReplicasQservMgtRequest.cc GetDbStatusQservMgtRequest.cc GetConfigQservMgtRequest.cc + GetResultFilesQservMgtRequest.cc GetStatusQservMgtRequest.cc HealthMonitorTask.cc HttpAsyncReqApp.cc diff --git a/src/replica/GetResultFilesQservMgtRequest.cc b/src/replica/GetResultFilesQservMgtRequest.cc new file mode 100644 index 000000000..8a8ad0af9 --- /dev/null +++ b/src/replica/GetResultFilesQservMgtRequest.cc @@ -0,0 +1,71 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ + +// Class header +#include "replica/GetResultFilesQservMgtRequest.h" + +// Qserv headers +#include "util/String.h" + +// LSST headers +#include "lsst/log/Log.h" + +using namespace std; + +namespace { + +LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.GetResultFilesQservMgtRequest"); + +} // namespace + +namespace lsst::qserv::replica { + +shared_ptr GetResultFilesQservMgtRequest::create( + shared_ptr const& serviceProvider, string const& worker, + vector const& queryIds, unsigned int maxFiles, + GetResultFilesQservMgtRequest::CallbackType const& onFinish) { + return shared_ptr( + new GetResultFilesQservMgtRequest(serviceProvider, worker, queryIds, maxFiles, onFinish)); +} + +GetResultFilesQservMgtRequest::GetResultFilesQservMgtRequest( + shared_ptr const& serviceProvider, string const& worker, + vector const& queryIds, unsigned int maxFiles, + GetResultFilesQservMgtRequest::CallbackType const& onFinish) + : QservMgtRequest(serviceProvider, "QSERV_GET_RESULT_FILES", worker), + _queryIds(queryIds), + _maxFiles(maxFiles), + _onFinish(onFinish) {} + +void GetResultFilesQservMgtRequest::createHttpReqImpl(replica::Lock const& lock) { + string const service = "/files"; + string query; + query += "?query_ids=" + util::String::toString(_queryIds); + query += "&max_files=" + to_string(_maxFiles); + createHttpReq(lock, service, query); +} + +void GetResultFilesQservMgtRequest::notify(replica::Lock const& lock) { + LOGS(_log, LOG_LVL_TRACE, context() << __func__); + notifyDefaultImpl(lock, _onFinish); +} + +} // namespace lsst::qserv::replica diff --git a/src/replica/GetResultFilesQservMgtRequest.h b/src/replica/GetResultFilesQservMgtRequest.h new file mode 100644 index 000000000..c1131a4b6 --- /dev/null +++ b/src/replica/GetResultFilesQservMgtRequest.h @@ -0,0 +1,98 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ +#ifndef LSST_QSERV_REPLICA_GETRESULTFILESQSERVMGTREQUEST_H +#define LSST_QSERV_REPLICA_GETRESULTFILESQSERVMGTREQUEST_H + +// System headers +#include +#include +#include + +// Qserv headers +#include "global/intTypes.h" +#include "replica/QservMgtRequest.h" + +namespace lsst::qserv::replica { +class ServiceProvider; +} // namespace lsst::qserv::replica + +// This header declarations +namespace lsst::qserv::replica { + +/** + * Class GetResultFilesQservMgtRequest is a request for obtaining info + * on the partial result files from the Qserv worker. + */ +class GetResultFilesQservMgtRequest : public QservMgtRequest { +public: + typedef std::shared_ptr Ptr; + + /// The function type for notifications on the completion of the request + typedef std::function CallbackType; + + GetResultFilesQservMgtRequest() = delete; + GetResultFilesQservMgtRequest(GetResultFilesQservMgtRequest const&) = delete; + GetResultFilesQservMgtRequest& operator=(GetResultFilesQservMgtRequest const&) = delete; + + virtual ~GetResultFilesQservMgtRequest() final = default; + + /** + * Static factory method is needed to prevent issues with the lifespan + * and memory management of instances created otherwise (as values or via + * low-level pointers). + * @param serviceProvider A reference to a provider of services for accessing + * Configuration, saving the request's persistent state to the database. + * @param worker The name of a worker to send the request to. + * @param queryIds The optional selector for queries. If empty then all queries will + * be considered. + * @param maxFiles The optional limit for maximum number of files to be reported. + * If 0 then no limit is set. + * @param onFinish (optional) callback function to be called upon request completion. + * @return A pointer to the created object. + */ + static std::shared_ptr create( + std::shared_ptr const& serviceProvider, std::string const& worker, + std::vector const& queryIds = std::vector(), unsigned int maxFiles = 0, + CallbackType const& onFinish = nullptr); + +protected: + /// @see QservMgtRequest::createHttpReqImpl() + virtual void createHttpReqImpl(replica::Lock const& lock) final; + + /// @see QservMgtRequest::notify() + virtual void notify(replica::Lock const& lock) final; + +private: + /// @see GetResultFilesQservMgtRequest::create() + GetResultFilesQservMgtRequest(std::shared_ptr const& serviceProvider, + std::string const& worker, std::vector const& queryIds, + unsigned int maxFiles, CallbackType const& onFinish); + + // Input parameters + + std::vector const _queryIds; + unsigned int const _maxFiles; + CallbackType _onFinish; ///< This callback is reset after finishing the request. +}; + +} // namespace lsst::qserv::replica + +#endif // LSST_QSERV_REPLICA_GETRESULTFILESQSERVMGTREQUEST_H diff --git a/src/replica/HttpProcessor.cc b/src/replica/HttpProcessor.cc index 2d5699c23..7817374f1 100644 --- a/src/replica/HttpProcessor.cc +++ b/src/replica/HttpProcessor.cc @@ -257,6 +257,12 @@ void HttpProcessor::registerServices() { self->_processorConfig, req, resp, "WORKER-DB"); }); + httpServer()->addHandler("GET", "/replication/qserv/worker/files/:worker", + [self](qhttp::Request::Ptr const req, qhttp::Response::Ptr const resp) { + HttpQservMonitorModule::process(self->controller(), self->name(), + self->_processorConfig, req, resp, + "WORKER-FILES"); + }); httpServer()->addHandler("GET", "/replication/qserv/master/status", [self](qhttp::Request::Ptr const req, qhttp::Response::Ptr const resp) { HttpQservMonitorModule::process(self->controller(), self->name(), diff --git a/src/replica/HttpQservMonitorModule.cc b/src/replica/HttpQservMonitorModule.cc index 439ecf81f..63029e2f4 100644 --- a/src/replica/HttpQservMonitorModule.cc +++ b/src/replica/HttpQservMonitorModule.cc @@ -35,6 +35,7 @@ #include "css/CssError.h" #include "global/intTypes.h" #include "http/Exceptions.h" +#include "replica/Common.h" #include "replica/DatabaseMySQL.h" #include "replica/DatabaseMySQLTypes.h" #include "replica/DatabaseMySQLUtils.h" @@ -45,6 +46,7 @@ #include "replica/QservMgtServices.h" #include "replica/QservStatusJob.h" #include "replica/ServiceProvider.h" +#include "util/String.h" #include "wbase/TaskState.h" // LSST headers @@ -131,6 +133,15 @@ void HttpQservMonitorModule::process(Controller::Ptr const& controller, string c module.execute(subModuleName, authType); } +void HttpQservMonitorModule::_throwIfNotSucceeded(string const& func, + shared_ptr const& request) { + if (request->extendedState() == QservMgtRequest::ExtendedState::SUCCESS) return; + string const msg = "request id: " + request->id() + " of type: " + request->type() + + " sent to worker: " + request->worker() + + " failed, error: " + QservMgtRequest::state2string(request->extendedState()); + throw http::Error(func, msg); +} + HttpQservMonitorModule::HttpQservMonitorModule(Controller::Ptr const& controller, string const& taskName, HttpProcessorConfig const& processorConfig, qhttp::Request::Ptr const& req, @@ -146,6 +157,8 @@ json HttpQservMonitorModule::executeImpl(string const& subModuleName) { return _workerConfig(); else if (subModuleName == "WORKER-DB") return _workerDb(); + else if (subModuleName == "WORKER-FILES") + return _workerFiles(); else if (subModuleName == "CZAR") return _czar(); else if (subModuleName == "CZAR-CONFIG") @@ -215,7 +228,6 @@ json HttpQservMonitorModule::_worker() { string const noParentJobId; GetStatusQservMgtRequest::CallbackType const onFinish = nullptr; - auto const request = controller()->serviceProvider()->qservMgtServices()->status( worker, noParentJobId, taskSelector, onFinish, timeoutSec); request->wait(); @@ -246,19 +258,12 @@ json HttpQservMonitorModule::_workerConfig() { string const noParentJobId; GetConfigQservMgtRequest::CallbackType const onFinish = nullptr; - auto const request = controller()->serviceProvider()->qservMgtServices()->config(worker, noParentJobId, onFinish, timeoutSec); request->wait(); + _throwIfNotSucceeded(__func__, request); - if (request->extendedState() != QservMgtRequest::ExtendedState::SUCCESS) { - string const msg = "database operation failed, error: " + - QservMgtRequest::state2string(request->extendedState()); - throw http::Error(__func__, msg); - } - json result = json::object(); - result["config"] = request->info(); - return result; + return json::object({{"config", request->info()}}); } json HttpQservMonitorModule::_workerDb() { @@ -273,19 +278,36 @@ json HttpQservMonitorModule::_workerDb() { string const noParentJobId; GetDbStatusQservMgtRequest::CallbackType const onFinish = nullptr; - auto const request = controller()->serviceProvider()->qservMgtServices()->databaseStatus( worker, noParentJobId, onFinish, timeoutSec); request->wait(); + _throwIfNotSucceeded(__func__, request); - if (request->extendedState() != QservMgtRequest::ExtendedState::SUCCESS) { - string const msg = "database operation failed, error: " + - QservMgtRequest::state2string(request->extendedState()); - throw http::Error(__func__, msg); - } - json result = json::object(); - result["status"] = request->info(); - return result; + return json::object({{"status", request->info()}}); +} + +json HttpQservMonitorModule::_workerFiles() { + debug(__func__); + checkApiVersion(__func__, 28); + + auto const worker = params().at("worker"); + auto const queryIds = query().optionalVectorUInt64("query_ids"); + auto const maxFiles = query().optionalUInt("max_files", 0); + unsigned int const timeoutSec = query().optionalUInt("timeout_sec", workerResponseTimeoutSec()); + + debug(__func__, "worker=" + worker); + debug(__func__, "query_ids=" + util::String::toString(queryIds)); + debug(__func__, "max_files=" + to_string(maxFiles)); + debug(__func__, "timeout_sec=" + to_string(timeoutSec)); + + string const noParentJobId; + GetResultFilesQservMgtRequest::CallbackType const onFinish = nullptr; + auto const request = controller()->serviceProvider()->qservMgtServices()->resultFiles( + worker, noParentJobId, queryIds, maxFiles, onFinish, timeoutSec); + request->wait(); + _throwIfNotSucceeded(__func__, request); + + return json::object({{"status", request->info()}}); } json HttpQservMonitorModule::_czar() { @@ -368,8 +390,8 @@ wbase::TaskSelector HttpQservMonitorModule::_translateTaskSelector(string const& } selector.maxTasks = query().optionalUInt("max_tasks", 0); debug(func, "include_tasks=" + replica::bool2str(selector.includeTasks)); - debug(func, "queryIds.size()=" + to_string(selector.queryIds.size())); - debug(func, "taskStates.size()=" + to_string(selector.taskStates.size())); + debug(func, "query_ids=" + util::String::toString(selector.queryIds)); + debug(func, "task_states=" + util::String::toString(selector.taskStates)); debug(func, "max_tasks=" + to_string(selector.maxTasks)); return selector; } @@ -417,7 +439,6 @@ json HttpQservMonitorModule::_activeQueries() { checkApiVersion(__func__, 25); unsigned int const timeoutSec = query().optionalUInt("timeout_sec", workerResponseTimeoutSec()); - debug(__func__, "timeout_sec=" + to_string(timeoutSec)); // Check which queries and in which schedulers are being executed @@ -463,7 +484,6 @@ json HttpQservMonitorModule::_activeQueriesProgress() { QueryId const selectQueryId = query().optionalUInt64("query_id", 0); unsigned int const selectLastSeconds = query().optionalUInt("last_seconds", 0); - debug(__func__, "query_id=" + to_string(selectQueryId)); debug(__func__, "last_seconds=" + to_string(selectLastSeconds)); @@ -472,7 +492,6 @@ json HttpQservMonitorModule::_activeQueriesProgress() { QueryGenerator const g(conn); string const command = "query_info " + to_string(selectQueryId) + " " + to_string(selectLastSeconds); string const query = g.call(g.QSERV_MANAGER(command)); - debug(__func__, "query=" + query); // Result set processor populates the JSON object and returns the completion @@ -586,12 +605,9 @@ json HttpQservMonitorModule::_userQuery() { auto const queryId = stoull(params().at("id")); bool const includeMessages = query().optionalUInt("include_messages", 0) != 0; - debug(__func__, "id=" + to_string(queryId)); debug(__func__, "include_messages=" + bool2str(includeMessages)); - json result; - // Connect to the master database // Manage the new connection via the RAII-style handler to ensure the transaction // is automatically rolled-back in case of exceptions. @@ -599,6 +615,7 @@ json HttpQservMonitorModule::_userQuery() { ConnectionHandler const h(Connection::open(Configuration::qservCzarDbParams("qservMeta"))); QueryGenerator const g(h.conn); + json result; h.conn->executeInOwnTransaction([&](auto conn) { unsigned int const limit4past = 0; result["queries_past"] = diff --git a/src/replica/HttpQservMonitorModule.h b/src/replica/HttpQservMonitorModule.h index 292d42c3f..d474066e1 100644 --- a/src/replica/HttpQservMonitorModule.h +++ b/src/replica/HttpQservMonitorModule.h @@ -40,6 +40,10 @@ namespace lsst::qserv::wbase { struct TaskSelector; } // namespace lsst::qserv::wbase +namespace lsst::qserv::replica { +class QservMgtRequest; +} // namespace lsst::qserv::replica + // This header declarations namespace lsst::qserv::replica { @@ -58,6 +62,7 @@ class HttpQservMonitorModule : public HttpModule { * WORKER - get the status info of a specific worker * WORKER-CONFIG - get configuration parameters of a specific worker * WORKER-DB - get the database status of a specific worker + * WORKER-FILES - get acollection of partial result files from a worker * CZAR - get the status info of Czar * CZAR-CONFIG - get configuration parameters of Czar * CZAR-DB - get the database status of Czar @@ -84,6 +89,15 @@ class HttpQservMonitorModule : public HttpModule { nlohmann::json executeImpl(std::string const& subModuleName) final; private: + /** + * The helper method for check the completion status of a request to ensure it succeded. + * @param func The calling context (for error reporting). + * @param request A request to be evaluated. + * @throw http::Error If the request didn't succeed. + */ + static void _throwIfNotSucceeded(std::string const& func, + std::shared_ptr const& request); + HttpQservMonitorModule(Controller::Ptr const& controller, std::string const& taskName, HttpProcessorConfig const& processorConfig, qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp); @@ -113,6 +127,12 @@ class HttpQservMonitorModule : public HttpModule { */ nlohmann::json _workerDb(); + /** + * Process a request for extracting info on the partial query result files + * from select Qserv worker. + */ + nlohmann::json _workerFiles(); + /** * Process a request for extracting various status info of Czar. */ diff --git a/src/replica/QservMgtServices.cc b/src/replica/QservMgtServices.cc index 459b7783a..a9b485ee0 100644 --- a/src/replica/QservMgtServices.cc +++ b/src/replica/QservMgtServices.cc @@ -152,6 +152,19 @@ GetConfigQservMgtRequest::Ptr QservMgtServices::config(string const& worker, str return request; } +GetResultFilesQservMgtRequest::Ptr QservMgtServices::resultFiles( + string const& worker, string const& jobId, vector const& queryIds, unsigned int maxFiles, + GetResultFilesQservMgtRequest::CallbackType const& onFinish, unsigned int requestExpirationIvalSec) { + auto const request = GetResultFilesQservMgtRequest::create( + serviceProvider(), worker, queryIds, maxFiles, + [self = shared_from_this()](QservMgtRequest::Ptr const& request) { + self->_finish(request->id()); + }); + _register(__func__, request, onFinish); + request->start(jobId, requestExpirationIvalSec); + return request; +} + void QservMgtServices::_finish(string const& id) { string const context = "QservMgtServices::" + string(__func__) + "[" + id + "] "; LOGS(_log, LOG_LVL_TRACE, context); diff --git a/src/replica/QservMgtServices.h b/src/replica/QservMgtServices.h index 9366aeda1..0481d0aa7 100644 --- a/src/replica/QservMgtServices.h +++ b/src/replica/QservMgtServices.h @@ -27,10 +27,12 @@ #include // Qserv headers +#include "global/intTypes.h" #include "replica/AddReplicaQservMgtRequest.h" #include "replica/GetReplicasQservMgtRequest.h" #include "replica/GetDbStatusQservMgtRequest.h" #include "replica/GetConfigQservMgtRequest.h" +#include "replica/GetResultFilesQservMgtRequest.h" #include "replica/GetStatusQservMgtRequest.h" #include "replica/RemoveReplicaQservMgtRequest.h" #include "replica/SetReplicasQservMgtRequest.h" @@ -294,6 +296,28 @@ class QservMgtServices : public std::enable_shared_from_this { GetConfigQservMgtRequest::CallbackType const& onFinish = nullptr, unsigned int requestExpirationIvalSec = 0); + /** + * Request info on the partial result files of a Qserv worker + * @param worker The name of a worker. + * @param jobId An optional identifier of a job specifying a context in which + * a request will be executed. + * @param queryIds The optional selector for queries. If empty then all queries will + * be considered. + * @param maxFiles The optional limit for maximum number of files to be reported. + * If 0 then no limit is set. + * @param onFinish A callback function to be called upon request completion. + * @param requestExpirationIvalSec The maximum amount of time to wait before + * completion of the request. If a value of the parameter is set to 0 then no + * limit will be enforced. + * @return A pointer to the request object if the request was made. Return + * nullptr otherwise. + */ + GetResultFilesQservMgtRequest::Ptr resultFiles( + std::string const& worker, std::string const& jobId = "", + std::vector const& queryIds = std::vector(), unsigned int maxFiles = 0, + GetResultFilesQservMgtRequest::CallbackType const& onFinish = nullptr, + unsigned int requestExpirationIvalSec = 0); + private: /** * @param serviceProvider Is required for accessing configuration parameters.