Skip to content

Commit

Permalink
REST service at Qserv worker to collect info on result files
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Dec 3, 2023
1 parent 3eaae87 commit 653aa2f
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 16 deletions.
2 changes: 1 addition & 1 deletion src/admin/python/lsst/qserv/admin/replicationInterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def __init__(
self.repl_ctrl = urlparse(repl_ctrl_uri)
self.auth_key = auth_key
self.admin_auth_key = admin_auth_key
self.repl_api_version = 27
self.repl_api_version = 28
_log.debug(f"ReplicationInterface %s", self.repl_ctrl)

def version(self) -> str:
Expand Down
2 changes: 1 addition & 1 deletion src/http/MetaModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ string const adminAuthKey;

namespace lsst::qserv::http {

unsigned int const MetaModule::version = 27;
unsigned int const MetaModule::version = 28;

void MetaModule::process(string const& context, nlohmann::json const& info, qhttp::Request::Ptr const& req,
qhttp::Response::Ptr const& resp, string const& subModuleName) {
Expand Down
83 changes: 79 additions & 4 deletions src/wbase/FileChannelShared.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

// System headers
#include <functional>
#include <set>
#include <stdexcept>

// Third party headers
Expand All @@ -37,20 +38,52 @@
#include "wconfig/WorkerConfig.h"
#include "wpublish/QueriesAndChunks.h"
#include "util/MultiError.h"
#include "util/String.h"
#include "util/Timer.h"
#include "util/TimeUtils.h"

// LSST headers
#include "lsst/log/Log.h"

using namespace std;
using namespace nlohmann;
namespace fs = boost::filesystem;
namespace util = lsst::qserv::util;
namespace wconfig = lsst::qserv::wconfig;

namespace {

LOG_LOGGER _log = LOG_GET("lsst.qserv.wbase.FileChannelShared");

string const resultFileExt = ".proto";

/// @return 'true' if the path has a filename component and an extension, and
///   the extension equals the extension of the result files (resultFileExt).
bool isResultFile(fs::path const& filePath) {
    if (!filePath.has_filename()) return false;
    if (!filePath.has_extension()) return false;
    return filePath.extension() == resultFileExt;
}

/**
 * Translate the name of a result file into the attributes of a task.
 *
 * The stem of the file name (the name without the folder and the extension)
 * is split at '-' and is required to yield exactly four numbers:
 * @code
 *   [<folder>/]<query-id>-<job-id>-<chunk-id>-<attemptcount>[.<ext>]
 * @endcode
 * @param filePath The path of the file to be evaluated.
 * @return A JSON object with keys "query_id", "job_id", "chunk_id" and "attemptcount".
 * @throw std::invalid_argument If the stem did not yield exactly four numbers.
 * @note Presumably the parser (util::String::parseToVectUInt64) may also throw
 *   on a non-numeric component - to be confirmed against its implementation.
 */
json file2task(fs::path const& filePath) {
    string const stem = filePath.stem().string();
    vector<std::uint64_t> const attrs = util::String::parseToVectUInt64(stem, "-");
    if (attrs.size() == 4) {
        json task = json::object();
        task["query_id"] = attrs[0];
        task["job_id"] = attrs[1];
        task["chunk_id"] = attrs[2];
        task["attemptcount"] = attrs[3];
        return task;
    }
    throw invalid_argument("FileChannelShared::" + string(__func__) +
                           " not a valid result file: " + filePath.string());
}

/**
* Iterate over the result files at the results folder and remove those
* which satisfy the desired criteria.
Expand All @@ -67,7 +100,6 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.wbase.FileChannelShared");
size_t cleanUpResultsImpl(string const& context, fs::path const& dirPath,
function<bool(string const&)> fileCanBeRemoved = nullptr) {
size_t numFilesRemoved = 0;
string const ext = ".proto";
boost::system::error_code ec;
auto itr = fs::directory_iterator(dirPath, ec);
if (ec.value() != 0) {
Expand All @@ -79,7 +111,7 @@ size_t cleanUpResultsImpl(string const& context, fs::path const& dirPath,
for (auto&& entry : boost::make_iterator_range(itr, {})) {
auto filePath = entry.path();
bool const removeIsCleared =
filePath.has_filename() && filePath.has_extension() && (filePath.extension() == ext) &&
::isResultFile(filePath) &&
((fileCanBeRemoved == nullptr) || fileCanBeRemoved(filePath.filename().string()));
if (removeIsCleared) {
fs::remove_all(filePath, ec);
Expand Down Expand Up @@ -173,11 +205,10 @@ json FileChannelShared::statusToJson() {
result["available_bytes"] = space.available;
uintmax_t sizeResultFilesBytes = 0;
uintmax_t numResultFiles = 0;
string const ext = ".proto";
auto itr = fs::directory_iterator(dirPath);
for (auto&& entry : boost::make_iterator_range(itr, {})) {
auto const filePath = entry.path();
if (filePath.has_filename() && filePath.has_extension() && (filePath.extension() == ext)) {
if (::isResultFile(filePath)) {
numResultFiles++;
sizeResultFilesBytes += fs::file_size(filePath);
}
Expand All @@ -191,6 +222,50 @@ json FileChannelShared::statusToJson() {
return result;
}

/**
 * Collect info on the result files found in the results folder of the worker.
 *
 * The folder is scanned while holding _resultsDirCleanupMtx. Failures are logged
 * as warnings rather than reported to the caller:
 *  - a file whose name can't be translated into task attributes is skipped,
 *  - a file whose attributes can't be read (e.g. it was deleted in the middle
 *    of the scan) is skipped,
 *  - a failure of the folder iteration itself ends the scan, and whatever was
 *    collected so far is returned.
 *
 * @param queryIds The optional selector for queries. If the collection is empty
 *   then files of all queries are reported.
 * @param maxFiles The optional limit for the number of files to be reported.
 *   If 0 then no limit is set.
 * @return A JSON array with one object per reported file. Each object has keys
 *   "filename", "size", "ctime", "mtime", "current_time_ms" and "task".
 */
json FileChannelShared::filesToJson(vector<QueryId> const& queryIds, unsigned int maxFiles) {
    string const context = "FileChannelShared::" + string(__func__) + " ";
    set<QueryId> const queryIdsFilter(queryIds.begin(), queryIds.end());
    auto const config = wconfig::WorkerConfig::instance();
    fs::path const dirPath = config->resultsDirname();
    json result = json::array();
    lock_guard<mutex> const lock(_resultsDirCleanupMtx);
    try {
        auto itr = fs::directory_iterator(dirPath);
        for (auto&& entry : boost::make_iterator_range(itr, {})) {
            auto const filePath = entry.path();
            if (!::isResultFile(filePath)) continue;

            // A file that has the right extension but doesn't follow the naming
            // convention of the result files must not terminate the scan. Such
            // file is reported in the log and skipped.
            json jsonTask;
            try {
                jsonTask = ::file2task(filePath);
            } catch (exception const& ex) {
                LOGS(_log, LOG_LVL_WARN,
                     context << "ignoring unexpected file " << filePath << ", ex: " << ex.what());
                continue;
            }

            // Skip files not matching the query filter if one was requested.
            QueryId const queryId = jsonTask.at("query_id").get<QueryId>();
            if (!queryIdsFilter.empty() && !queryIdsFilter.contains(queryId)) continue;

            // A separate exception handler to avoid and ignore race conditions if
            // the current file gets deleted. In this scenario the file will not be
            // reported in the result.
            try {
                result.push_back(json::object({{"filename", filePath.filename().string()},
                                               {"size", fs::file_size(filePath)},
                                               {"ctime", fs::creation_time(filePath)},
                                               {"mtime", fs::last_write_time(filePath)},
                                               {"current_time_ms", util::TimeUtils::now()},
                                               {"task", jsonTask}}));
            } catch (exception const& ex) {
                LOGS(_log, LOG_LVL_WARN,
                     context << "failed to get info on files at " << dirPath << ", ex: " << ex.what());
            }

            // Stop collecting files after reaching the limit (if any).
            if ((maxFiles != 0) && (result.size() >= maxFiles)) break;
        }
    } catch (exception const& ex) {
        LOGS(_log, LOG_LVL_WARN,
             context << "failed to iterate over files at " << dirPath << ", ex: " << ex.what());
    }
    return result;
}

FileChannelShared::Ptr FileChannelShared::create(shared_ptr<wbase::SendChannel> const& sendChannel,
shared_ptr<wcontrol::TransmitMgr> const& transmitMgr,
shared_ptr<proto::TaskMsg> const& taskMsg) {
Expand Down
11 changes: 11 additions & 0 deletions src/wbase/FileChannelShared.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <fstream>
#include <memory>
#include <mutex>
#include <vector>

// Third-party headers
#include <mysql/mysql.h>
Expand Down Expand Up @@ -98,6 +99,16 @@ class FileChannelShared : public ChannelShared {
/// @return Status and statistics on the results folder (capacity, usage, etc.)
static nlohmann::json statusToJson();

/**
* Locate existing result files.
* @param queryIds The optional selector for queries. If the collection is empty
* then all queries will be considered.
* @param maxFiles The optional limit for maximum number of files to be reported.
* If 0 then no limit is set.
* @return A collection of the results files matching the optional filter.
*/
static nlohmann::json filesToJson(std::vector<QueryId> const& queryIds, unsigned int maxFiles);

/// The factory method for the channel class.
static Ptr create(std::shared_ptr<wbase::SendChannel> const& sendChannel,
std::shared_ptr<wcontrol::TransmitMgr> const& transmitMgr,
Expand Down
15 changes: 15 additions & 0 deletions src/xrdsvc/HttpMonitorModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "http/Exceptions.h"
#include "http/RequestQuery.h"
#include "mysql/MySqlUtils.h"
#include "util/String.h"
#include "wbase/FileChannelShared.h"
#include "wbase/TaskState.h"
#include "wconfig/WorkerConfig.h"
Expand Down Expand Up @@ -66,6 +67,8 @@ json HttpMonitorModule::executeImpl(string const& subModuleName) {
return _mysql();
else if (subModuleName == "STATUS")
return _status();
else if (subModuleName == "FILES")
return _files();
else if (subModuleName == "ECHO")
return _echo();
throw invalid_argument(context() + func + " unsupported sub-module");
Expand Down Expand Up @@ -114,6 +117,18 @@ json HttpMonitorModule::_status() {
return result;
}

/// Process the 'FILES' sub-module request. The optional query parameters
/// 'query_ids' and 'max_files' (0 if not provided) are forwarded to
/// wbase::FileChannelShared::filesToJson.
/// @return A JSON object with the collection of files stored at key "files".
json HttpMonitorModule::_files() {
    debug(__func__);
    checkApiVersion(__func__, 28);
    auto const queryIdsFilter = query().optionalVectorUInt64("query_ids");
    auto const filesLimit = query().optionalUInt("max_files", 0);
    debug(__func__, "query_ids=" + util::String::toString(queryIdsFilter));
    debug(__func__, "max_files=" + to_string(filesLimit));
    return json::object({{"files", wbase::FileChannelShared::filesToJson(queryIdsFilter, filesLimit)}});
}

json HttpMonitorModule::_echo() {
debug(__func__);
checkApiVersion(__func__, 27);
Expand Down
4 changes: 4 additions & 0 deletions src/xrdsvc/HttpMonitorModule.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class HttpMonitorModule : public xrdsvc::HttpModule {
* 'CONFIG' - get configuration parameters
* 'MYSQL' - get the status (running queries) of the worker's MySQL service
* 'STATUS' - get the status info (tasks, schedulers, etc.)
* 'FILES' - get info on the partial result files
* 'ECHO' - send back the received data
*
* @throws std::invalid_argument for unknown values of parameter 'subModuleName'
Expand Down Expand Up @@ -86,6 +87,9 @@ class HttpMonitorModule : public xrdsvc::HttpModule {
/// @return The worker status info (tasks, schedulers, etc.).
nlohmann::json _status();

/// @return Info on the partial result files.
nlohmann::json _files();

/// @return Send back the received data.
nlohmann::json _echo();
};
Expand Down
14 changes: 4 additions & 10 deletions src/xrdsvc/HttpReplicaMgtModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
#include "http/RequestBody.h"
#include "http/RequestQuery.h"
#include "mysql/MySqlUtils.h"
#include "util/IterableFormatter.h"
#include "util/String.h"
#include "wconfig/WorkerConfig.h"
#include "wcontrol/Foreman.h"
#include "wcontrol/ResourceMonitor.h"
Expand All @@ -57,12 +57,6 @@ json const extErrorReplicaInUse = json::object({{"in_use", 1}});

/// @return The resource name "/chk/<database>/<chunk>" built from the name of
///   a database and the chunk number.
string makeResource(string const& database, int chunk) {
    string resource = "/chk/";
    resource += database;
    resource += "/";
    resource += to_string(chunk);
    return resource;
}

/// Serialize a collection of strings into a single string using the formatter
/// lsst::qserv::util::printable.
/// NOTE(review): the arguments ("", "", ",") look like an empty prefix, an empty
/// suffix and ',' as the separator - confirm against util/IterableFormatter.h.
string vec2str(vector<string> const& v) {
    ostringstream out;
    out << lsst::qserv::util::printable(v, "", "", ",");
    return out.str();
}

} // namespace

namespace lsst::qserv::xrdsvc {
Expand Down Expand Up @@ -108,7 +102,7 @@ json HttpReplicaMgtModule::_getReplicas() {
bool const inUseOnly = query().optionalUInt("in_use_only", 0) != 0;
vector<string> const databases = query().requiredVectorStr("databases");
debug(__func__, "in_use_only: " + string(inUseOnly ? "1" : "0"));
debug(__func__, "databases: " + ::vec2str(databases));
debug(__func__, "databases: " + util::String::toString(databases));
set<string> databaseFilter;
for (string const& database : databases) {
databaseFilter.insert(database);
Expand All @@ -123,7 +117,7 @@ json HttpReplicaMgtModule::_setReplicas() {
bool const force = body().optional<int>("force", 0) != 0;
vector<string> const databases = body().requiredColl<string>("databases");
debug(__func__, "force: " + string(force ? "1" : "0"));
debug(__func__, "databases: " + ::vec2str(databases));
debug(__func__, "databases: " + util::String::toString(databases));
set<string> databaseFilter;
for (string const& database : databases) {
databaseFilter.insert(database);
Expand Down Expand Up @@ -308,7 +302,7 @@ void HttpReplicaMgtModule::_modifyReplica(string const& func, Direction directio
bool const force = body().optional<int>("force", 0) != 0;

debug(func, "chunk: " + to_string(chunk));
debug(func, "databases: " + ::vec2str(databases));
debug(func, "databases: " + util::String::toString(databases));
debug(func, "force: " + string(force ? "1" : "0"));

if (databases.empty()) {
Expand Down
5 changes: 5 additions & 0 deletions src/xrdsvc/HttpSvc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ uint16_t HttpSvc::start() {
[self](shared_ptr<qhttp::Request> const& req, shared_ptr<qhttp::Response> const& resp) {
HttpMonitorModule::process(::serviceName, self->_foreman, req, resp, "STATUS");
}}});
_httpServerPtr->addHandlers(
{{"GET", "/files",
[self](shared_ptr<qhttp::Request> const& req, shared_ptr<qhttp::Response> const& resp) {
HttpMonitorModule::process(::serviceName, self->_foreman, req, resp, "FILES");
}}});
_httpServerPtr->addHandlers(
{{"POST", "/echo",
[self](shared_ptr<qhttp::Request> const& req, shared_ptr<qhttp::Response> const& resp) {
Expand Down

0 comments on commit 653aa2f

Please sign in to comment.