Skip to content

Commit

Permalink
The initial implementation of the HTTP frontend to Qserv Czar
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Feb 2, 2024
1 parent a560c1f commit 6fccd1a
Show file tree
Hide file tree
Showing 6 changed files with 726 additions and 0 deletions.
30 changes: 30 additions & 0 deletions src/czar/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ add_library(czar OBJECT)

target_sources(czar PRIVATE
Czar.cc
HttpCzarSvc.cc
HttpCzarQueryModule.cc
HttpModule.cc
HttpMonitorModule.cc
HttpSvc.cc
Expand All @@ -21,3 +23,31 @@ target_link_libraries(czar PUBLIC
log
XrdSsiLib
)

function(CZAR_UTILS)
foreach(UTIL IN ITEMS ${ARGV})
add_executable(${UTIL})
target_sources(${UTIL} PRIVATE ${UTIL}.cc)
target_include_directories(${UTIL} PRIVATE ${XROOTD_INCLUDE_DIRS})
target_link_libraries(${UTIL} PRIVATE
cconfig
ccontrol
czar
global
mysql
parser
qana
qdisp
qproc
qserv_meta
query
rproc
sql
)
install(TARGETS ${UTIL})
endforeach()
endfunction()

czar_utils(
qserv-czar-http
)
226 changes: 226 additions & 0 deletions src/czar/HttpCzarQueryModule.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
/*
* LSST Data Management System
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/

// Class header
#include "czar/HttpCzarQueryModule.h"

// System headers
#include <map>
#include <stdexcept>
#include <vector>

// Qserv headers
#include "cconfig/CzarConfig.h"
#include "czar/Czar.h"
#include "czar/SubmitResult.h"
#include "global/intTypes.h"
#include "http/Exceptions.h"
#include "http/RequestQuery.h"
#include "qdisp/CzarStats.h"
#include "sql/SqlConnection.h"
#include "sql/SqlConnectionFactory.h"
#include "sql/SqlResults.h"
#include "sql/Schema.h"
#include "util/String.h"

using namespace std;
using json = nlohmann::json;

namespace lsst::qserv::czar {

void HttpCzarQueryModule::process(string const& context, shared_ptr<qhttp::Request> const& req,
shared_ptr<qhttp::Response> const& resp, string const& subModuleName,
http::AuthType const authType) {
HttpCzarQueryModule module(context, req, resp);
module.execute(subModuleName, authType);
}

HttpCzarQueryModule::HttpCzarQueryModule(string const& context, shared_ptr<qhttp::Request> const& req,
shared_ptr<qhttp::Response> const& resp)
: HttpModule(context, req, resp) {}

json HttpCzarQueryModule::executeImpl(string const& subModuleName) {
string const func = string(__func__) + "[sub-module='" + subModuleName + "']";
debug(func);
enforceInstanceId(func, cconfig::CzarConfig::instance()->replicationInstanceId());
enforceCzarId(func);
if (subModuleName == "SUBMIT")
return _submit();
else if (subModuleName == "SUBMIT-ASYNC")
return _submitAsync();
else if (subModuleName == "CANCEL")
return _cancel();
else if (subModuleName == "STATUS")
return _status();
else if (subModuleName == "RESULT")
return _result();
throw invalid_argument(context() + func + " unsupported sub-module");
}

json HttpCzarQueryModule::_submit() {
debug(__func__);
checkApiVersion(__func__, 30);
string const userQuery = this->query().requiredString("query");
map<string, string> const hints;
SubmitResult const submitResult = Czar::getCzar()->submitQuery(userQuery, hints);
if (!submitResult.errorMessage.empty()) {
_dropTable(submitResult.messageTable);
throw http::Error(context() + __func__, submitResult.errorMessage);
}

// Block the current thread before the query will finish or fail.
string const messageSelectQuery =
"SELECT chunkId, code, message, severity+0, timeStamp FROM " + submitResult.messageTable;
auto const conn =
sql::SqlConnectionFactory::make(cconfig::CzarConfig::instance()->getMySqlResultConfig());
sql::SqlResults messageQueryResults;
sql::SqlErrorObject messageQueryErr;
if (!conn->runQuery(messageSelectQuery, messageQueryResults, messageQueryErr)) {
_dropTable(submitResult.messageTable);
_dropTable(submitResult.resultTable);
string const msg = "failed query=" + messageSelectQuery + " err=" + messageQueryErr.printErrMsg();
error(__func__, msg);
throw http::Error(context() + __func__, msg);
}

// Read thе message table to see if the user query suceeded or failed
vector<string> chunkId;
vector<string> code;
vector<string> message;
vector<string> severity;
sql::SqlErrorObject messageProcessErr;
if (!messageQueryResults.extractFirst4Columns(chunkId, code, message, severity, messageProcessErr)) {
messageQueryResults.freeResults();
_dropTable(submitResult.messageTable);
_dropTable(submitResult.resultTable);
string const msg = "failed to extract results of query=" + messageSelectQuery +
" err=" + messageProcessErr.printErrMsg();
error(__func__, msg);
throw http::Error(context() + __func__, msg);
}
string errorMsg;
for (size_t i = 0; i < chunkId.size(); ++i) {
if (code[i] != "0") {
errorMsg += "[chunkId=" + chunkId[i] + " code=" + code[i] +

" message=" + message[i] + " severity=" + severity[i] + "], ";
}
}
if (!errorMsg.empty()) {
messageQueryResults.freeResults();
_dropTable(submitResult.messageTable);
_dropTable(submitResult.resultTable);
error(__func__, errorMsg);
throw http::Error(context() + __func__, errorMsg);
}
messageQueryResults.freeResults();
_dropTable(submitResult.messageTable);

// Read a result set from the result table, package it into the JSON object
// and sent it back to a user.
sql::SqlResults resultQueryResults;
sql::SqlErrorObject resultQueryErr;
if (!conn->runQuery(submitResult.resultQuery, resultQueryResults, resultQueryErr)) {
_dropTable(submitResult.resultTable);
string const msg =
"failed query=" + submitResult.resultQuery + " err=" + resultQueryErr.printErrMsg();
error(__func__, msg);
throw http::Error(context() + __func__, msg);
}

sql::SqlErrorObject makeSchemaErr;
json const schemaJson = _schemaToJson(resultQueryResults.makeSchema(makeSchemaErr));
if (makeSchemaErr.isSet()) {
resultQueryResults.freeResults();
_dropTable(submitResult.resultTable);
string const msg = "failed to extract schema for query=" + submitResult.resultQuery +
" err=" + makeSchemaErr.printErrMsg();
error(__func__, msg);
throw http::Error(context() + __func__, msg);
}
json rowsJson = _rowsToJson(resultQueryResults);
resultQueryResults.freeResults();
_dropTable(submitResult.resultTable);
return json::object({{"schema", schemaJson}, {"rows", rowsJson}});
}

json HttpCzarQueryModule::_submitAsync() {
debug(__func__);
checkApiVersion(__func__, 30);
return json::object();
}

json HttpCzarQueryModule::_cancel() {
debug(__func__);
checkApiVersion(__func__, 30);
return json::object();
}

json HttpCzarQueryModule::_status() {
debug(__func__);
checkApiVersion(__func__, 30);
return json::object();
}

json HttpCzarQueryModule::_result() {
debug(__func__);
checkApiVersion(__func__, 30);
return json::object();
}

void HttpCzarQueryModule::_dropTable(string const& tableName) const {
if (tableName.empty()) return;
string const query = "DROP TABLE " + tableName;
debug(__func__, query);
auto const conn =
sql::SqlConnectionFactory::make(cconfig::CzarConfig::instance()->getMySqlResultConfig());
sql::SqlErrorObject err;
if (!conn->runQuery(query, err)) {
error(__func__, "failed query=" + query + " err=" + err.printErrMsg());
}
}

json HttpCzarQueryModule::_schemaToJson(sql::Schema const& schema) const {
json schemaJson = json::array();
for (auto const& colDef : schema.columns) {
json columnJson = json::object();
columnJson["table"] = colDef.table;
columnJson["column"] = colDef.name;
columnJson["type"] = colDef.colType.sqlType;
schemaJson.push_back(columnJson);
}
return schemaJson;
}

json HttpCzarQueryModule::_rowsToJson(sql::SqlResults& results) const {
json rowsJson = json::array();
for (sql::SqlResults::iterator itr = results.begin(); itr != results.end(); ++itr) {
sql::SqlResults::value_type const& row = *itr;
json rowJson = json::array();
for (size_t i = 0; i < row.size(); ++i) {
rowJson.push_back(string(row[i].first ? row[i].first : "NULL"));
}
rowsJson.push_back(rowJson);
}
return rowsJson;
}

} // namespace lsst::qserv::czar
94 changes: 94 additions & 0 deletions src/czar/HttpCzarQueryModule.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* LSST Data Management System
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/
#ifndef LSST_QSERV_CZAR_HTTPCZARQUERYMODULE_H
#define LSST_QSERV_CZAR_HTTPCZARQUERYMODULE_H

// System headers
#include <memory>
#include <string>

// Third party headers
#include "nlohmann/json.hpp"

// Qserv headers
#include "czar/HttpModule.h"

// Forward declarations
namespace lsst::qserv::qhttp {
class Request;
class Response;
} // namespace lsst::qserv::qhttp

namespace lsst::qserv::sql {
class SqlResults;
struct Schema;
} // namespace lsst::qserv::sql

// This header declarations
namespace lsst::qserv::czar {

/**
* Class HttpCzarQueryModule implements a handler for processing user
* queries submitted to Czar via the HTTP-based frontend.
*/
class HttpCzarQueryModule : public czar::HttpModule {
public:
/**
* @note supported values for parameter 'subModuleName' are:
* 'SUBMIT' - submit a sync query
* 'SUBMIT-ASYNC' - submit an async query
* 'CANCEL' - cancel the previously submited async query
* 'STATUS' - return a status of the previously submited async query
* 'RESULT' - return data of the previously submited async query
*
* @throws std::invalid_argument for unknown values of parameter 'subModuleName'
*/
static void process(std::string const& context, std::shared_ptr<qhttp::Request> const& req,
std::shared_ptr<qhttp::Response> const& resp, std::string const& subModuleName,
http::AuthType const authType = http::AuthType::NONE);

HttpCzarQueryModule() = delete;
HttpCzarQueryModule(HttpCzarQueryModule const&) = delete;
HttpCzarQueryModule& operator=(HttpCzarQueryModule const&) = delete;

~HttpCzarQueryModule() final = default;

protected:
virtual nlohmann::json executeImpl(std::string const& subModuleName) final;

private:
HttpCzarQueryModule(std::string const& context, std::shared_ptr<qhttp::Request> const& req,
std::shared_ptr<qhttp::Response> const& resp);

nlohmann::json _submit();
nlohmann::json _submitAsync();
nlohmann::json _cancel();
nlohmann::json _status();
nlohmann::json _result();

void _dropTable(std::string const& tableName) const;
nlohmann::json _schemaToJson(sql::Schema const& schema) const;
nlohmann::json _rowsToJson(sql::SqlResults& results) const;
};

} // namespace lsst::qserv::czar

#endif // LSST_QSERV_CZAR_HTTPCZARQUERYMODULE_H
Loading

0 comments on commit 6fccd1a

Please sign in to comment.