Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-48758: Refined INFORMATION_SCHEMA support in Qserv #890

Merged
merged 5 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/admin/python/lsst/qserv/admin/itest.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ def run_detached(self, connection: str, qserv: bool, database: str) -> None:
"--binary-as-hex",
"--skip-column-names",
"-e",
f"SELECT STATE FROM INFORMATION_SCHEMA.PROCESSLIST WHERE ID = {query_id}",
f"SELECT STATUS FROM INFORMATION_SCHEMA.QUERIES WHERE ID = {query_id}",
]
_log.debug("SQLCmd.execute waiting for query to complete")
end_time = time.time() + self.async_timeout
Expand Down
1 change: 1 addition & 0 deletions src/ccontrol/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ target_sources(ccontrol PRIVATE
UserQueryFactory.cc
UserQueryFlushChunksCache.cc
UserQueryProcessList.cc
UserQueryQueries.cc
UserQuerySelectCountStar.cc
UserQueryQservManager.cc
UserQueryResources.cc
Expand Down
51 changes: 51 additions & 0 deletions src/ccontrol/UserQueryFactory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "ccontrol/UserQueryFlushChunksCache.h"
#include "ccontrol/UserQueryInvalid.h"
#include "ccontrol/UserQueryProcessList.h"
#include "ccontrol/UserQueryQueries.h"
#include "ccontrol/UserQueryResources.h"
#include "ccontrol/UserQuerySelect.h"
#include "ccontrol/UserQuerySelectCountStar.h"
Expand Down Expand Up @@ -93,6 +94,23 @@ bool _stmtRefersToProcessListTable(query::SelectStmt::Ptr& stmt, std::string def
return false;
}

/**
* @brief Determine if the table name in the FROM statement refers to QUERIES table.
*
* @param stmt SelectStmt representing the query.
* @param defaultDb Default database name, may be empty.
* @return true if the query refers only to the QUERIES table.
* @return false if the query does not refer only to the QUERIES table.
*/
bool _stmtRefersQueriesTable(query::SelectStmt::Ptr& stmt, std::string defaultDb) {
auto const& tableRefList = stmt->getFromList().getTableRefList();
if (tableRefList.size() != 1) return false;
auto const& tblRef = tableRefList[0];
std::string const& db = tblRef->getDb().empty() ? defaultDb : tblRef->getDb();
if (UserQueryType::isQueriesTable(db, tblRef->getTable())) return true;
return false;
}

/**
* @brief Make a UserQueryProcessList (or UserQueryInvalid) from given parameters.
*
Expand Down Expand Up @@ -123,6 +141,35 @@ std::shared_ptr<UserQuery> _makeUserQueryProcessList(query::SelectStmt::Ptr& stm
}
}

/**
* @brief Make a UserQueryQueries (or UserQueryInvalid) from given parameters.
*
* @param stmt The SelectStmt representing the query.
* @param sharedResources Resources used by UserQueryFactory to create UserQueries.
* @param userQueryId Unique string identifying the query.
* @param resultDb Name of the databse that will contain results.
* @param aQuery The original query string.
* @param async If the query is to be run asynchronously.
* @return std::shared_ptr<UserQuery>, will be a UserQueryQueries or UserQueryInvalid.
*/
std::shared_ptr<UserQuery> _makeUserQueryQueries(query::SelectStmt::Ptr& stmt,
userQuerySharedResourcesPtr& sharedResources,
std::string const& userQueryId, std::string const& resultDb,
std::string const& aQuery, bool async) {
if (async) {
// no point supporting async for these
return std::make_shared<UserQueryInvalid>("SUBMIT is not allowed with query: " + aQuery);
}
LOGS(_log, LOG_LVL_DEBUG, "SELECT query is a QUERIES");
try {
return std::make_shared<UserQueryQueries>(stmt, sharedResources->resultDbConn.get(),
sharedResources->qMetaSelect, sharedResources->qMetaCzarId,
userQueryId, resultDb);
} catch (std::exception const& exc) {
return std::make_shared<UserQueryInvalid>(exc.what());
}
}

/**
* @brief Determine if the qmeta database has a metadata table with chunks & row
* counts that represents the table in the FROM statement for a SELECT
Expand Down Expand Up @@ -259,6 +306,10 @@ UserQuery::Ptr UserQueryFactory::newUserQuery(std::string const& aQuery, std::st
return _makeUserQueryProcessList(stmt, _userQuerySharedResources, userQueryId, resultDb, aQuery,
async);
}
if (_stmtRefersQueriesTable(stmt, defaultDb)) {
return _makeUserQueryQueries(stmt, _userQuerySharedResources, userQueryId, resultDb, aQuery,
async);
}

/// Determine if a SelectStmt is a simple COUNT(*) query and can be run as an optimized query.
/// It may not be runnable as an optimzed simple COUNT(*) query because:
Expand Down
31 changes: 10 additions & 21 deletions src/ccontrol/UserQueryProcessList.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,6 @@ UserQueryProcessList::UserQueryProcessList(std::shared_ptr<query::SelectStmt> co
auto qtempl = stmt->getQueryTemplate();
_query = qtempl.sqlFragment();

// we also do not want to return too many results by default
// as QMeta can contain a lot of records, limit ourselves
// to some reasonable number, users can override with different LIMIT
if (!stmt->hasLimit()) {
_query += " LIMIT 1000";
}

if (stmt->hasOrderBy()) {
_orderBy = stmt->getOrderBy().sqlFragment();
}
Expand All @@ -107,20 +100,16 @@ UserQueryProcessList::UserQueryProcessList(bool full, sql::SqlConnection* result
_messageStore(std::make_shared<qdisp::MessageStore>()),
_resultTableName(::g_nextResultTableId(userQueryId)),
_resultDb(resultDb) {
// use ShowProcessList view with completion statistics.
_query = "SELECT Id, User, Host, db, Command, Time, State, ";
_query += full ? "Info, " : "SUBSTRING(Info FROM 1 FOR 100) Info, ";
_query += "totalChunks, completedChunks, lastUpdate, ";
// These are non-standard but they need to be there because they appear in WHERE
_query += "CzarId, Submitted, Completed, ResultLocation";
_query += " FROM ShowProcessList";

// only show stuff for current czar and not too old
_query += " WHERE CzarId = " + std::to_string(qMetaCzarId) +
" AND "
"(Completed IS NULL OR Completed > NOW() - INTERVAL 3 DAY)";

_orderBy = "Submitted";
_query = "SELECT `qi`.`queryId` `ID`,`qi`.`qType` `TYPE`,`qc`.`czar` `CZAR`,`qc`.`czarId` `CZAR_ID`,"
"`qi`.`submitted` `SUBMITTED`,`qs`.`lastUpdate` `UPDATED`,`qi`.`chunkCount` `CHUNKS`,"
"`qs`.`completedChunks` `CHUNKS_COMPL`,";
_query += (full ? "`qi`.`query`" : "SUBSTR(`qi`.`query`,1,32) `QUERY`");
_query +=
" FROM `QInfo` AS `qi` "
" LEFT OUTER JOIN `QStatsTmp` AS `qs` ON `qi`.`queryId`=`qs`.`queryId`"
" JOIN `QCzar` AS `qc` ON `qi`.`czarId`=`qc`.`czarId`"
" WHERE `qi`.`status` = 'EXECUTING'";
_orderBy = "`SUBMITTED`";
}

std::string UserQueryProcessList::getError() const { return std::string(); }
Expand Down
215 changes: 215 additions & 0 deletions src/ccontrol/UserQueryQueries.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
// -*- LSST-C++ -*-
/*
* LSST Data Management System
* Copyright 2017 AURA/LSST.
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/

// Class header
#include "ccontrol/UserQueryQueries.h"

// System headers
#include <atomic>
#include <ctime>
#include <sstream>

// LSST headers
#include "lsst/log/Log.h"

// Qserv headers
#include "css/CssAccess.h"
#include "css/CssError.h"
#include "qdisp/MessageStore.h"
#include "qmeta/Exceptions.h"
#include "qmeta/QMetaSelect.h"
#include "query/FromList.h"
#include "query/SelectStmt.h"
#include "sql/SqlConnection.h"
#include "sql/SqlErrorObject.h"
#include "sql/SqlBulkInsert.h"
#include "sql/statement.h"
#include "util/IterableFormatter.h"

using namespace lsst::qserv;

namespace {
LOG_LOGGER _log = LOG_GET("lsst.qserv.ccontrol.UserQueryQueries");

std::string g_nextResultTableId(std::string const& userQueryId) {
return "qserv_result_queries_" + userQueryId;
}

} // namespace

namespace lsst::qserv::ccontrol {

// Constructor
UserQueryQueries::UserQueryQueries(std::shared_ptr<query::SelectStmt> const& statement,
sql::SqlConnection* resultDbConn,
std::shared_ptr<qmeta::QMetaSelect> const& qMetaSelect,
qmeta::CzarId qMetaCzarId, std::string const& userQueryId,
std::string const& resultDb)
: _resultDbConn(resultDbConn),
_qMetaSelect(qMetaSelect),
_qMetaCzarId(qMetaCzarId),
_messageStore(std::make_shared<qdisp::MessageStore>()),
_resultTableName(::g_nextResultTableId(userQueryId)),
_resultDb(resultDb) {
// The SQL statement should be mostly OK alredy but we need to change
// table name, instead of INFORMATION_SCHEMA.QUERIES we use special
// Qmeta view with the name InfoSchemaQueries
auto stmt = statement->clone();
for (auto& tblRef : stmt->getFromList().getTableRefList()) {
// assume all table refs have to be replaced
// (in practice we accept only one table in FROM
tblRef->setDb("");
tblRef->setTable("InfoSchemaQueries");
}

auto qtempl = stmt->getQueryTemplate();
_query = qtempl.sqlFragment();

if (stmt->hasOrderBy()) {
_orderBy = stmt->getOrderBy().sqlFragment();
}
}

std::string UserQueryQueries::getError() const { return std::string(); }

// Attempt to kill in progress.
void UserQueryQueries::kill() {}

// Submit or execute the query.
void UserQueryQueries::submit() {
// query database
std::unique_ptr<sql::SqlResults> results;
try {
results = _qMetaSelect->select(_query);
} catch (std::exception const& exc) {
LOGS(_log, LOG_LVL_ERROR, "error in querying QMeta: " << exc.what());
std::string message = "Internal failure, error in querying QMeta: ";
message += exc.what();
_messageStore->addMessage(-1, "QUERIES", 1051, message, MessageSeverity::MSG_ERROR);
_qState = ERROR;
return;
}

// get result schema
sql::SqlErrorObject errObj;
auto schema = results->makeSchema(errObj);
if (errObj.isSet()) {
LOGS(_log, LOG_LVL_ERROR, "failed to extract schema from result: " << errObj.errMsg());
std::string message = "Internal failure, failed to extract schema from result: " + errObj.errMsg();
_messageStore->addMessage(-1, "QUERIES", 1051, message, MessageSeverity::MSG_ERROR);
_qState = ERROR;
return;
}

// create result table, one could use formCreateTable() method
// to build statement but it does not set NULL flag on TIMESTAMP columns
std::string createTable = "CREATE TABLE " + _resultTableName;
char sep = '(';
for (auto& col : schema.columns) {
createTable += sep;
sep = ',';
createTable += "`" + col.name + "`";
createTable += " ";
createTable += col.colType.sqlType;
if (col.colType.sqlType == "TIMESTAMP") createTable += " NULL";
}
createTable += ')';
LOGS(_log, LOG_LVL_DEBUG, "creating result table: " << createTable);
if (!_resultDbConn->runQuery(createTable, errObj)) {
LOGS(_log, LOG_LVL_ERROR, "failed to create result table: " << errObj.errMsg());
std::string message = "Internal failure, failed to create result table: " + errObj.errMsg();
_messageStore->addMessage(-1, "QUERIES", 1051, message, MessageSeverity::MSG_ERROR);
_qState = ERROR;
return;
}

// list of column names
std::vector<std::string> resColumns;
for (auto& col : schema.columns) {
resColumns.push_back(col.name);
}

// copy stuff over to result table
sql::SqlBulkInsert bulkInsert(_resultDbConn, _resultTableName, resColumns);
for (auto& row : *results) {
std::vector<std::string> values;
for (unsigned i = 0; i != row.size(); ++i) {
auto ptr = row[i].first;
auto len = row[i].second;

if (ptr == nullptr) {
values.push_back("NULL");
} else if (IS_NUM(schema.columns[i].colType.mysqlType) &&
schema.columns[i].colType.mysqlType != MYSQL_TYPE_TIMESTAMP) {
// Numeric types do not need quoting (IS_NUM is mysql macro).
// In mariadb 10.2 IS_NUM returns true for TIMESTAMP even though value is
// date-time formatted string, IS_NUM returned false in mariadb 10.1 and prior.
// Potentially we can quote all values, but I prefer to have numbers look like
// numbers, both solutions (IS_NUM or quotes) are mysql-specific.
values.push_back(std::string(ptr, ptr + len));
} else {
// everything else should be quoted
values.push_back("'" + _resultDbConn->escapeString(std::string(ptr, ptr + len)) + "'");
}
}

if (!bulkInsert.addRow(values, errObj)) {
LOGS(_log, LOG_LVL_ERROR, "error updating result table: " << errObj.errMsg());
std::string message = "Internal failure, error updating result table: " + errObj.errMsg();
_messageStore->addMessage(-1, "QUERIES", 1051, message, MessageSeverity::MSG_ERROR);
_qState = ERROR;
return;
}
}
if (!bulkInsert.flush(errObj)) {
LOGS(_log, LOG_LVL_ERROR, "error updating result table: " << errObj.errMsg());
std::string message = "Internal failure, error updating result table: " + errObj.errMsg();
_messageStore->addMessage(-1, "QUERIES", 1051, message, MessageSeverity::MSG_ERROR);
_qState = ERROR;
return;
}

_qState = SUCCESS;
}

std::string UserQueryQueries::getResultQuery() const {
std::string ret = "SELECT * FROM " + _resultDb + "." + getResultTableName();
std::string orderBy = _getResultOrderBy();
if (not orderBy.empty()) {
ret += " ORDER BY " + orderBy;
}
return ret;
}

// Block until a submit()'ed query completes.
QueryState UserQueryQueries::join() {
// everything should be done in submit()
return _qState;
}

// Release resources.
void UserQueryQueries::discard() {
// no resources
}

} // namespace lsst::qserv::ccontrol
Loading