diff --git a/doc/user/async.rst b/doc/user/async.rst index 72d0fa7ef..9e493df59 100644 --- a/doc/user/async.rst +++ b/doc/user/async.rst @@ -34,7 +34,7 @@ This is illustrated by the following example: .. code-block:: sql - SUBMIT SELECT objectId FROM dp02_dc2_catalogs.Object LIMIT 3 + SUBMIT SELECT COUNT(*) FROM dp02_dc2_catalogs.Object WHERE coord_ra != 0 If the query validation succeded and the query placed into a processing queue, the command will always return one row with two columns, where the particularly interesting column is ``jobId``: @@ -44,9 +44,10 @@ two columns, where the particularly interesting column is ``jobId``: +--------+---------------------+ | jobId | resultLocation | +--------+---------------------+ - | 311817 | table:result_311817 | + | 313689 | table:result_313689 | +--------+---------------------+ + At this poing the query is running asynchronously. The ``jobId`` is the unique identifier of the query that can be used for checking the query status, retrieving the results, or cancelling the query. @@ -58,39 +59,38 @@ Based on the ``jobId`` returned by the ``SUBMIT`` command, you can check the sta .. code-block:: - SELECT * FROM information_schema.processlist WHERE id=311817\G + SELECT * FROM information_schema.processlist WHERE id=313689\G **Note**: ``\G`` is a MySQL command that formats the output. It's not part of the SQL syntax. The command is quite handy to display result sets comprising many columns or having long values of the columns in a more readable format. -The query will return a row with the following columns: +If the query is still being executed the information schema query will return a row with the following columns: .. code-block:: *************************** 1. row *************************** - ID: 311817 - USER: anonymous - HOST: NULL - DB: dp02_dc2_catalogs - COMMAND: ASYNC - TIME: NULL - STATE: COMPLETED - INFO: SELECT objectId FROM dp02_dc2_catalogs.Object LIMIT 3 - SUBMITTED: 2024-10-06 20:01:18 - COMPLETED: 2024-10-06 20:01:18 - RETURNED: NULL - CZARID: 9 - RESULTLOCATION: table:result_311817 - NCHUNKS: 1477 - TotalChunks: NULL - CompletedChunks: NULL - LastUpdate: NULL + ID: 313689 + TYPE: ASYNC + CZAR: proxy + CZAR_ID: 9 + SUBMITTED: 2025-02-06 08:58:18 + UPDATED: 2025-02-06 08:58:18 + CHUNKS: 1477 + CHUNKS_COMP: 739 + QUERY: SELECT COUNT(*) FROM dp02_dc2_catalogs.Object WHERE coord_ra != 0 Particularly interesting columns here are: - ``ID``: the unique identifier of the original query (it's the same as ``jobId`` reported by the ``SUBMIT`` command) -- ``STATE``: the query status, which can be one of: ``EXECUTING``, ``COMPLETED``, ``FAILED``, or ``ABORTED`` +- ``TYPE``: the query type, which is always ``ASYNC`` for asynchronous queries +- ``SUBMITTED``: the timestamp when the query was submitted +- ``UPDATED``: the timestamp of the last update of the query status +- ``CHUNKS``: the total number of chunks to be processed +- ``CHUNKS_COMP``: the number of chunks already processed + +The user may periodically repeat this command to compute the performance metrics of the query execution +ad to get an estimate of the remaining time to completion. One can also use the following information commands to get the status of all active queries: @@ -99,10 +99,46 @@ One can also use the following information commands to get the status of all act SHOW PROCESSLIST SHOW FULL PROCESSLIST -**Note**: once the query is over and the results are retrieved, the corresponding row in the ``information_schema.processlist`` -table will be deleted. And the query status will no longer be available. However, Qserv will still maintain the history -of the queries in other system tables. You may contact the Qserv administrator to get the history of the queries should -you need it. +For example the ``SHOW PROCESSLIST`` command will return: + +.. code-block:: + + +--------+---------+-------+---------+---------------------+---------------------+--------+--------------+----------------------------------+ + | ID | COMMAND | CZAR | CZAR_ID | SUBMITTED | UPDATED | CHUNKS | CHUNKS_COMPL | QUERY | + +--------+---------+-------+---------+---------------------+---------------------+--------+--------------+----------------------------------+ + | 313689 | ASYNC | proxy | 9 | 2025-02-06 08:58:18 | 2025-02-06 08:58:18 | 1477 | 1 | SELECT COUNT(*) FROM dp02_dc2_ca | + +--------+---------+-------+---------+---------------------+---------------------+--------+--------------+----------------------------------+ + +The result set of the ``PROCESSLIST`` queries will be empty if the query has already completed. In this case, the query status can be retrieved +by querying the query history table: + +.. code-block:: + + SELECT * FROM information_schema.queries WHERE id=313689\G + +The query will return: + +.. code-block:: + + *************************** 1. row *************************** + ID: 313689 + TYPE: ASYNC + CZAR: proxy + CZAR_ID: 9 + STATUS: COMPLETED + SUBMITTED: 2025-02-06 08:58:18 + COMPLETED: 2025-02-06 08:58:21 + RETURNED: NULL + CHUNKS: 1477 + BYTES: 13856 + ROWS_COLLECTED: 1477 + ROWS: 1 + DBS: dp02_dc2_catalogs + QUERY: SELECT COUNT(*) FROM dp02_dc2_catalogs.Object WHERE coord_ra !=0 + +Particularly interesting columns here are: + +- ``STATUS``: the query status, which can be one of: ``EXECUTING``, ``COMPLETED``, ``FAILED``, or ``ABORTED`` Retrieving Results ================== @@ -116,26 +152,24 @@ To retrieve the results of a query, use the following syntax: This will return the full results (columns and rows) of the original query corresponding to the provided identifier of the query. -For example, the following query will return the results of the query with ``jobId`` of ``311817``: +For example, the following query will return the results of the query with ``jobId`` of ``313689``: .. code-block:: - SELECT * FROM qserv_result(311817) - +---------------------+ - | objectId | - +---------------------+ - | 1248649384967536732 | - | 1248649384967536769 | - | 1248649384967536891 | - +---------------------+ + SELECT * FROM qserv_result(313689) + +-----------+ + | COUNT(*) | + +-----------+ + | 278318452 | + +-----------+ The command may be called one time only. The query result table will be deleted after returning the result set. Any subsequent attempts to retrieve the results will return an error message: .. code-block:: - SELECT * FROM qserv_result(311817) - ERROR 1146 (42S02) at line 1: Table 'qservResult.result_311817' doesn't exist + SELECT * FROM qserv_result(313689) + ERROR 1146 (42S02) at line 1: Table 'qservResult.result_313689' doesn't exist Cancellation ============ diff --git a/src/admin/python/lsst/qserv/admin/itest.py b/src/admin/python/lsst/qserv/admin/itest.py index eabe61b9e..ccc678239 100644 --- a/src/admin/python/lsst/qserv/admin/itest.py +++ b/src/admin/python/lsst/qserv/admin/itest.py @@ -207,7 +207,7 @@ def run_detached(self, connection: str, qserv: bool, database: str) -> None: "--binary-as-hex", "--skip-column-names", "-e", - f"SELECT STATE FROM INFORMATION_SCHEMA.PROCESSLIST WHERE ID = {query_id}", + f"SELECT STATUS FROM INFORMATION_SCHEMA.QUERIES WHERE ID = {query_id}", ] _log.debug("SQLCmd.execute waiting for query to complete") end_time = time.time() + self.async_timeout diff --git a/src/ccontrol/CMakeLists.txt b/src/ccontrol/CMakeLists.txt index 60a042e2c..bfeda2067 100644 --- a/src/ccontrol/CMakeLists.txt +++ b/src/ccontrol/CMakeLists.txt @@ -17,6 +17,7 @@ target_sources(ccontrol PRIVATE UserQueryFactory.cc UserQueryFlushChunksCache.cc UserQueryProcessList.cc + UserQueryQueries.cc UserQuerySelectCountStar.cc UserQueryQservManager.cc UserQueryResources.cc diff --git a/src/ccontrol/UserQueryFactory.cc b/src/ccontrol/UserQueryFactory.cc index 589088692..d91d3fe90 100644 --- a/src/ccontrol/UserQueryFactory.cc +++ b/src/ccontrol/UserQueryFactory.cc @@ -45,6 +45,7 @@ #include "ccontrol/UserQueryFlushChunksCache.h" #include "ccontrol/UserQueryInvalid.h" #include "ccontrol/UserQueryProcessList.h" +#include "ccontrol/UserQueryQueries.h" #include "ccontrol/UserQueryResources.h" #include "ccontrol/UserQuerySelect.h" #include "ccontrol/UserQuerySelectCountStar.h" @@ -93,6 +94,23 @@ bool _stmtRefersToProcessListTable(query::SelectStmt::Ptr& stmt, std::string def return false; } +/** + * @brief Determine if the table name in the FROM statement refers to QUERIES table. + * + * @param stmt SelectStmt representing the query. + * @param defaultDb Default database name, may be empty. + * @return true if the query refers only to the QUERIES table. + * @return false if the query does not refer only to the QUERIES table. + */ +bool _stmtRefersQueriesTable(query::SelectStmt::Ptr& stmt, std::string defaultDb) { + auto const& tableRefList = stmt->getFromList().getTableRefList(); + if (tableRefList.size() != 1) return false; + auto const& tblRef = tableRefList[0]; + std::string const& db = tblRef->getDb().empty() ? defaultDb : tblRef->getDb(); + if (UserQueryType::isQueriesTable(db, tblRef->getTable())) return true; + return false; +} + /** * @brief Make a UserQueryProcessList (or UserQueryInvalid) from given parameters. * @@ -123,6 +141,35 @@ std::shared_ptr _makeUserQueryProcessList(query::SelectStmt::Ptr& stm } } +/** + * @brief Make a UserQueryQueries (or UserQueryInvalid) from given parameters. + * + * @param stmt The SelectStmt representing the query. + * @param sharedResources Resources used by UserQueryFactory to create UserQueries. + * @param userQueryId Unique string identifying the query. + * @param resultDb Name of the databse that will contain results. + * @param aQuery The original query string. + * @param async If the query is to be run asynchronously. + * @return std::shared_ptr, will be a UserQueryQueries or UserQueryInvalid. + */ +std::shared_ptr _makeUserQueryQueries(query::SelectStmt::Ptr& stmt, + userQuerySharedResourcesPtr& sharedResources, + std::string const& userQueryId, std::string const& resultDb, + std::string const& aQuery, bool async) { + if (async) { + // no point supporting async for these + return std::make_shared("SUBMIT is not allowed with query: " + aQuery); + } + LOGS(_log, LOG_LVL_DEBUG, "SELECT query is a QUERIES"); + try { + return std::make_shared(stmt, sharedResources->resultDbConn.get(), + sharedResources->qMetaSelect, sharedResources->qMetaCzarId, + userQueryId, resultDb); + } catch (std::exception const& exc) { + return std::make_shared(exc.what()); + } +} + /** * @brief Determine if the qmeta database has a metadata table with chunks & row * counts that represents the table in the FROM statement for a SELECT @@ -259,6 +306,10 @@ UserQuery::Ptr UserQueryFactory::newUserQuery(std::string const& aQuery, std::st return _makeUserQueryProcessList(stmt, _userQuerySharedResources, userQueryId, resultDb, aQuery, async); } + if (_stmtRefersQueriesTable(stmt, defaultDb)) { + return _makeUserQueryQueries(stmt, _userQuerySharedResources, userQueryId, resultDb, aQuery, + async); + } /// Determine if a SelectStmt is a simple COUNT(*) query and can be run as an optimized query. /// It may not be runnable as an optimzed simple COUNT(*) query because: diff --git a/src/ccontrol/UserQueryProcessList.cc b/src/ccontrol/UserQueryProcessList.cc index 85a626eca..d367dd63a 100644 --- a/src/ccontrol/UserQueryProcessList.cc +++ b/src/ccontrol/UserQueryProcessList.cc @@ -85,13 +85,6 @@ UserQueryProcessList::UserQueryProcessList(std::shared_ptr co auto qtempl = stmt->getQueryTemplate(); _query = qtempl.sqlFragment(); - // we also do not want to return too many results by default - // as QMeta can contain a lot of records, limit ourselves - // to some reasonable number, users can override with different LIMIT - if (!stmt->hasLimit()) { - _query += " LIMIT 1000"; - } - if (stmt->hasOrderBy()) { _orderBy = stmt->getOrderBy().sqlFragment(); } @@ -107,20 +100,16 @@ UserQueryProcessList::UserQueryProcessList(bool full, sql::SqlConnection* result _messageStore(std::make_shared()), _resultTableName(::g_nextResultTableId(userQueryId)), _resultDb(resultDb) { - // use ShowProcessList view with completion statistics. - _query = "SELECT Id, User, Host, db, Command, Time, State, "; - _query += full ? "Info, " : "SUBSTRING(Info FROM 1 FOR 100) Info, "; - _query += "totalChunks, completedChunks, lastUpdate, "; - // These are non-standard but they need to be there because they appear in WHERE - _query += "CzarId, Submitted, Completed, ResultLocation"; - _query += " FROM ShowProcessList"; - - // only show stuff for current czar and not too old - _query += " WHERE CzarId = " + std::to_string(qMetaCzarId) + - " AND " - "(Completed IS NULL OR Completed > NOW() - INTERVAL 3 DAY)"; - - _orderBy = "Submitted"; + _query = "SELECT `qi`.`queryId` `ID`,`qi`.`qType` `TYPE`,`qc`.`czar` `CZAR`,`qc`.`czarId` `CZAR_ID`," + "`qi`.`submitted` `SUBMITTED`,`qs`.`lastUpdate` `UPDATED`,`qi`.`chunkCount` `CHUNKS`," + "`qs`.`completedChunks` `CHUNKS_COMPL`,"; + _query += (full ? "`qi`.`query`" : "SUBSTR(`qi`.`query`,1,32) `QUERY`"); + _query += + " FROM `QInfo` AS `qi` " + " LEFT OUTER JOIN `QStatsTmp` AS `qs` ON `qi`.`queryId`=`qs`.`queryId`" + " JOIN `QCzar` AS `qc` ON `qi`.`czarId`=`qc`.`czarId`" + " WHERE `qi`.`status` = 'EXECUTING'"; + _orderBy = "`SUBMITTED`"; } std::string UserQueryProcessList::getError() const { return std::string(); } diff --git a/src/ccontrol/UserQueryQueries.cc b/src/ccontrol/UserQueryQueries.cc new file mode 100644 index 000000000..bd0d1dbc7 --- /dev/null +++ b/src/ccontrol/UserQueryQueries.cc @@ -0,0 +1,215 @@ +// -*- LSST-C++ -*- +/* + * LSST Data Management System + * Copyright 2017 AURA/LSST. + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ + +// Class header +#include "ccontrol/UserQueryQueries.h" + +// System headers +#include +#include +#include + +// LSST headers +#include "lsst/log/Log.h" + +// Qserv headers +#include "css/CssAccess.h" +#include "css/CssError.h" +#include "qdisp/MessageStore.h" +#include "qmeta/Exceptions.h" +#include "qmeta/QMetaSelect.h" +#include "query/FromList.h" +#include "query/SelectStmt.h" +#include "sql/SqlConnection.h" +#include "sql/SqlErrorObject.h" +#include "sql/SqlBulkInsert.h" +#include "sql/statement.h" +#include "util/IterableFormatter.h" + +using namespace lsst::qserv; + +namespace { +LOG_LOGGER _log = LOG_GET("lsst.qserv.ccontrol.UserQueryQueries"); + +std::string g_nextResultTableId(std::string const& userQueryId) { + return "qserv_result_queries_" + userQueryId; +} + +} // namespace + +namespace lsst::qserv::ccontrol { + +// Constructor +UserQueryQueries::UserQueryQueries(std::shared_ptr const& statement, + sql::SqlConnection* resultDbConn, + std::shared_ptr const& qMetaSelect, + qmeta::CzarId qMetaCzarId, std::string const& userQueryId, + std::string const& resultDb) + : _resultDbConn(resultDbConn), + _qMetaSelect(qMetaSelect), + _qMetaCzarId(qMetaCzarId), + _messageStore(std::make_shared()), + _resultTableName(::g_nextResultTableId(userQueryId)), + _resultDb(resultDb) { + // The SQL statement should be mostly OK alredy but we need to change + // table name, instead of INFORMATION_SCHEMA.QUERIES we use special + // Qmeta view with the name InfoSchemaQueries + auto stmt = statement->clone(); + for (auto& tblRef : stmt->getFromList().getTableRefList()) { + // assume all table refs have to be replaced + // (in practice we accept only one table in FROM + tblRef->setDb(""); + tblRef->setTable("InfoSchemaQueries"); + } + + auto qtempl = stmt->getQueryTemplate(); + _query = qtempl.sqlFragment(); + + if (stmt->hasOrderBy()) { + _orderBy = stmt->getOrderBy().sqlFragment(); + } +} + +std::string UserQueryQueries::getError() const { return std::string(); } + +// Attempt to kill in progress. +void UserQueryQueries::kill() {} + +// Submit or execute the query. +void UserQueryQueries::submit() { + // query database + std::unique_ptr results; + try { + results = _qMetaSelect->select(_query); + } catch (std::exception const& exc) { + LOGS(_log, LOG_LVL_ERROR, "error in querying QMeta: " << exc.what()); + std::string message = "Internal failure, error in querying QMeta: "; + message += exc.what(); + _messageStore->addMessage(-1, "QUERIES", 1051, message, MessageSeverity::MSG_ERROR); + _qState = ERROR; + return; + } + + // get result schema + sql::SqlErrorObject errObj; + auto schema = results->makeSchema(errObj); + if (errObj.isSet()) { + LOGS(_log, LOG_LVL_ERROR, "failed to extract schema from result: " << errObj.errMsg()); + std::string message = "Internal failure, failed to extract schema from result: " + errObj.errMsg(); + _messageStore->addMessage(-1, "QUERIES", 1051, message, MessageSeverity::MSG_ERROR); + _qState = ERROR; + return; + } + + // create result table, one could use formCreateTable() method + // to build statement but it does not set NULL flag on TIMESTAMP columns + std::string createTable = "CREATE TABLE " + _resultTableName; + char sep = '('; + for (auto& col : schema.columns) { + createTable += sep; + sep = ','; + createTable += "`" + col.name + "`"; + createTable += " "; + createTable += col.colType.sqlType; + if (col.colType.sqlType == "TIMESTAMP") createTable += " NULL"; + } + createTable += ')'; + LOGS(_log, LOG_LVL_DEBUG, "creating result table: " << createTable); + if (!_resultDbConn->runQuery(createTable, errObj)) { + LOGS(_log, LOG_LVL_ERROR, "failed to create result table: " << errObj.errMsg()); + std::string message = "Internal failure, failed to create result table: " + errObj.errMsg(); + _messageStore->addMessage(-1, "QUERIES", 1051, message, MessageSeverity::MSG_ERROR); + _qState = ERROR; + return; + } + + // list of column names + std::vector resColumns; + for (auto& col : schema.columns) { + resColumns.push_back(col.name); + } + + // copy stuff over to result table + sql::SqlBulkInsert bulkInsert(_resultDbConn, _resultTableName, resColumns); + for (auto& row : *results) { + std::vector values; + for (unsigned i = 0; i != row.size(); ++i) { + auto ptr = row[i].first; + auto len = row[i].second; + + if (ptr == nullptr) { + values.push_back("NULL"); + } else if (IS_NUM(schema.columns[i].colType.mysqlType) && + schema.columns[i].colType.mysqlType != MYSQL_TYPE_TIMESTAMP) { + // Numeric types do not need quoting (IS_NUM is mysql macro). + // In mariadb 10.2 IS_NUM returns true for TIMESTAMP even though value is + // date-time formatted string, IS_NUM returned false in mariadb 10.1 and prior. + // Potentially we can quote all values, but I prefer to have numbers look like + // numbers, both solutions (IS_NUM or quotes) are mysql-specific. + values.push_back(std::string(ptr, ptr + len)); + } else { + // everything else should be quoted + values.push_back("'" + _resultDbConn->escapeString(std::string(ptr, ptr + len)) + "'"); + } + } + + if (!bulkInsert.addRow(values, errObj)) { + LOGS(_log, LOG_LVL_ERROR, "error updating result table: " << errObj.errMsg()); + std::string message = "Internal failure, error updating result table: " + errObj.errMsg(); + _messageStore->addMessage(-1, "QUERIES", 1051, message, MessageSeverity::MSG_ERROR); + _qState = ERROR; + return; + } + } + if (!bulkInsert.flush(errObj)) { + LOGS(_log, LOG_LVL_ERROR, "error updating result table: " << errObj.errMsg()); + std::string message = "Internal failure, error updating result table: " + errObj.errMsg(); + _messageStore->addMessage(-1, "QUERIES", 1051, message, MessageSeverity::MSG_ERROR); + _qState = ERROR; + return; + } + + _qState = SUCCESS; +} + +std::string UserQueryQueries::getResultQuery() const { + std::string ret = "SELECT * FROM " + _resultDb + "." + getResultTableName(); + std::string orderBy = _getResultOrderBy(); + if (not orderBy.empty()) { + ret += " ORDER BY " + orderBy; + } + return ret; +} + +// Block until a submit()'ed query completes. +QueryState UserQueryQueries::join() { + // everything should be done in submit() + return _qState; +} + +// Release resources. +void UserQueryQueries::discard() { + // no resources +} + +} // namespace lsst::qserv::ccontrol diff --git a/src/ccontrol/UserQueryQueries.h b/src/ccontrol/UserQueryQueries.h new file mode 100644 index 000000000..18ab197ad --- /dev/null +++ b/src/ccontrol/UserQueryQueries.h @@ -0,0 +1,120 @@ +// -*- LSST-C++ -*- +/* + * LSST Data Management System + * Copyright 2017 LSST Corporation. + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ + +#ifndef LSST_QSERV_CCONTROL_USERQUERYQUERIES_H +#define LSST_QSERV_CCONTROL_USERQUERYQUERIES_H + +// System headers +#include +#include + +// Third-party headers + +// Qserv headers +#include "ccontrol/UserQuery.h" +#include "qmeta/QMetaSelect.h" +#include "qmeta/types.h" + +// Forward decl +namespace lsst::qserv::qmeta { +class QMeta; +} + +namespace lsst::qserv::query { +class SelectStmt; +} + +namespace lsst::qserv::sql { +class SqlConnection; +} + +namespace lsst::qserv::ccontrol { + +/// UserQueryQueries : implementation of the INFORMATION_SCHEMA.QUERIES table. +class UserQueryQueries : public UserQuery { +public: + /** + * Constructor for "SELECT ... FROM INFORMATION_SCHEMA.QUERIES ...". + * + * @param statement: Parsed SELECT statement + * @param resultDbConn: Connection to results database + * @param qMetaSelect: QMetaSelect instance + * @param qMetaCzarId: Czar ID for QMeta queries + * @param userQueryId: Unique string identifying query + */ + UserQueryQueries(std::shared_ptr const& statement, sql::SqlConnection* resultDbConn, + std::shared_ptr const& qMetaSelect, qmeta::CzarId qMetaCzarId, + std::string const& userQueryId, std::string const& resultDb); + + UserQueryQueries(UserQueryQueries const&) = delete; + UserQueryQueries& operator=(UserQueryQueries const&) = delete; + + // Accessors + + /// @return a non-empty string describing the current error state + /// Returns an empty string if no errors have been detected. + std::string getError() const override; + + /// Begin execution of the query over all ChunkSpecs added so far. + void submit() override; + + /// Wait until the query has completed execution. + /// @return the final execution state. + QueryState join() override; + + /// Stop a query in progress (for immediate shutdowns) + void kill() override; + + /// Release resources related to user query + void discard() override; + + // Delegate objects + std::shared_ptr getMessageStore() override { return _messageStore; } + + /// @return Name of the result table for this query, can be empty + std::string getResultTableName() const override { return _resultTableName; } + + /// @return Result location for this query, can be empty + std::string getResultLocation() const override { return "table:" + _resultTableName; } + + /// @return get the SELECT statement to be executed by proxy + std::string getResultQuery() const override; + +private: + /// @return ORDER BY part of SELECT statement that gets executed by the proxy + std::string _getResultOrderBy() const { return _orderBy; } + + sql::SqlConnection* _resultDbConn; + std::shared_ptr _qMetaSelect; + qmeta::CzarId const _qMetaCzarId; ///< Czar ID in QMeta database + QueryState _qState = UNKNOWN; + std::shared_ptr _messageStore; + std::string _resultTableName; + std::string _query; ///< query to execute on QMeta database + std::string _orderBy; + std::string _resultDb; +}; + +} // namespace lsst::qserv::ccontrol + +#endif // LSST_QSERV_CCONTROL_USERQUERYQUERIES_H diff --git a/src/ccontrol/UserQueryType.cc b/src/ccontrol/UserQueryType.cc index 3612886d5..89fa03b01 100644 --- a/src/ccontrol/UserQueryType.cc +++ b/src/ccontrol/UserQueryType.cc @@ -177,6 +177,11 @@ bool UserQueryType::isProcessListTable(std::string const& dbName, std::string co boost::to_upper_copy(tblName) == "PROCESSLIST"; } +/// Returns true if table name refers to QUERIES table +bool UserQueryType::isQueriesTable(std::string const& dbName, std::string const& tblName) { + return boost::to_upper_copy(dbName) == "INFORMATION_SCHEMA" && boost::to_upper_copy(tblName) == "QUERIES"; +} + /// Returns true if query is SUBMIT ... bool UserQueryType::isSubmit(std::string const& query, std::string& stripped) { LOGS(_log, LOG_LVL_TRACE, "isSubmit: " << query); diff --git a/src/ccontrol/UserQueryType.h b/src/ccontrol/UserQueryType.h index 2e42eea0d..e4daf7570 100644 --- a/src/ccontrol/UserQueryType.h +++ b/src/ccontrol/UserQueryType.h @@ -75,6 +75,12 @@ class UserQueryType { */ static bool isProcessListTable(std::string const& dbName, std::string const& tblName); + /** + * Returns true if database/table name refers to QUERIES table in + * INFORMATION_SCHEMA pseudo-database. + */ + static bool isQueriesTable(std::string const& dbName, std::string const& tblName); + /** * Returns true if query is SUBMIT ..., returns query without "SUBMIT" * in `stripped` string. diff --git a/src/ccontrol/testCControl.cc b/src/ccontrol/testCControl.cc index 05093639e..702facd13 100644 --- a/src/ccontrol/testCControl.cc +++ b/src/ccontrol/testCControl.cc @@ -326,6 +326,24 @@ BOOST_AUTO_TEST_CASE(testUserQueryType) { BOOST_CHECK(not UserQueryType::isProcessListTable(test.db, test.table)); } + struct { + const char* db; + const char* table; + } queries_table_ok[] = {{"INFORMATION_SCHEMA", "QUERIES"}, + {"information_schema", "queries"}, + {"Information_Schema", "Queries"}}; + for (auto test : queries_table_ok) { + BOOST_CHECK(UserQueryType::isQueriesTable(test.db, test.table)); + } + + struct { + const char* db; + const char* table; + } queries_table_fail[] = {{"INFORMATIONSCHEMA", "QUERIES"}, {"information_schema", "query"}}; + for (auto test : queries_table_fail) { + BOOST_CHECK(not UserQueryType::isQueriesTable(test.db, test.table)); + } + struct { const char* query; int id; diff --git a/src/qmeta/QMetaMysql.cc b/src/qmeta/QMetaMysql.cc index d669b05f8..7314550a8 100644 --- a/src/qmeta/QMetaMysql.cc +++ b/src/qmeta/QMetaMysql.cc @@ -47,7 +47,7 @@ using namespace std; namespace { // Current version of QMeta schema -char const VERSION_STR[] = "9"; +char const VERSION_STR[] = "10"; LOG_LOGGER _log = LOG_GET("lsst.qserv.qmeta.QMetaMysql"); diff --git a/src/qmeta/schema/migrate-9-to-10.sql b/src/qmeta/schema/migrate-9-to-10.sql new file mode 100644 index 000000000..b79196ee9 --- /dev/null +++ b/src/qmeta/schema/migrate-9-to-10.sql @@ -0,0 +1,49 @@ +DROP VIEW IF EXISTS `ShowProcessList`; +DROP VIEW IF EXISTS `InfoSchemaProcessList`; + +-- ---------------------------------------------------------------------------------------- +-- View `InfoSchemaProcessList` +-- This shows full Qmeta info suitable for "SELECT ... FROM INFORMATION_SCHEMA.PROCESSLIST" +-- ---------------------------------------------------------------------------------------- +CREATE OR REPLACE + SQL SECURITY INVOKER + VIEW `InfoSchemaProcessList` AS + SELECT `qi`.`queryId` `ID`, + `qi`.`qType` `TYPE`, + `qc`.`czar` `CZAR`, + `qc`.`czarId` `CZAR_ID`, + `qi`.`submitted` `SUBMITTED`, + `qs`.`lastUpdate` `UPDATED`, + `qi`.`chunkCount` `CHUNKS`, + `qs`.`completedChunks` `CHUNKS_COMP`, + `qi`.`query` `QUERY` + FROM `QInfo` AS `qi` + LEFT OUTER JOIN `QStatsTmp` AS `qs` ON `qi`.`queryId`=`qs`.`queryId` + JOIN `QCzar` AS `qc` ON `qi`.`czarId`=`qc`.`czarId` + WHERE `qi`.`status` = 'EXECUTING'; + +-- ------------------------------------------------------------------------------------ +-- View `InfoSchemaQueries` +-- This shows full Qmeta info suitable for "SELECT ... FROM INFORMATION_SCHEMA.QUERIES" +-- ------------------------------------------------------------------------------------ +CREATE OR REPLACE + SQL SECURITY INVOKER + VIEW `InfoSchemaQueries` AS + SELECT `qi`.`queryId` `ID`, + `qi`.`qType` `TYPE`, + `qc`.`czar` `CZAR`, + `qc`.`czarId` `CZAR_ID`, + `qi`.`status` `STATUS`, + `qi`.`submitted` `SUBMITTED`, + `qi`.`completed` `COMPLETED`, + `qi`.`returned` `RETURNED`, + `qi`.`chunkCount` `CHUNKS`, + `qi`.`collectedBytes` `BYTES`, + `qi`.`collectedRows` `ROWS_COLLECTED`, + `qi`.`finalRows` `ROWS`, + GROUP_CONCAT(DISTINCT `qt`.`dbName`) `DBS`, + `qi`.`query` `QUERY` + FROM `QInfo` AS `qi` + JOIN `QCzar` AS `qc` ON `qi`.`czarId`=`qc`.`czarId` + LEFT OUTER JOIN `QTable` AS `qt` ON `qi`.`queryId`=`qt`.`queryId` + GROUP BY `qi`.`queryId`; diff --git a/src/qmeta/schema/migrate-None-to-9.sql.jinja b/src/qmeta/schema/migrate-None-to-10.sql.jinja similarity index 80% rename from src/qmeta/schema/migrate-None-to-9.sql.jinja rename to src/qmeta/schema/migrate-None-to-10.sql.jinja index 46bf1ba39..615e7ee8b 100644 --- a/src/qmeta/schema/migrate-None-to-9.sql.jinja +++ b/src/qmeta/schema/migrate-None-to-10.sql.jinja @@ -118,64 +118,52 @@ CREATE TABLE IF NOT EXISTS `QStatsTmp` ( ENGINE = MEMORY COMMENT = 'Table to track statistics of running queries.'; --- ----------------------------------------------------- --- View `ShowProcessList` --- This shows abbreviated Qmeta info suitable for "SHOW PROCESSLIST" --- ----------------------------------------------------- -CREATE OR REPLACE - SQL SECURITY INVOKER - VIEW `ShowProcessList` AS - SELECT DISTINCT - `QInfo`.`queryId` `Id`, - `QInfo`.`user` `User`, - NULL `Host`, - GROUP_CONCAT(DISTINCT `QTable`.`dbName`) `db`, - `QInfo`.`qType` `Command`, - NULL `Time`, - `QInfo`.`status` `State`, - `QInfo`.`query` `Info`, - NULL `Progress`, - `QInfo`.`submitted` `Submitted`, - `QInfo`.`completed` `Completed`, - `QInfo`.`returned` `Returned`, - `QInfo`.`czarId` `CzarId`, - REPLACE(`QInfo`.`resultLocation`, '#QID#', `QInfo`.`queryId`) `ResultLocation`, - `QStatsTmp`.`totalChunks` `TotalChunks`, - `QStatsTmp`.`completedChunks` `CompletedChunks`, - `QStatsTmp`.`lastUpdate` `LastUpdate` - FROM `QInfo` LEFT OUTER JOIN `QTable` USING (`queryId`) - LEFT OUTER JOIN `QStatsTmp` USING (`queryId`) - GROUP BY `QInfo`.`queryId`; - --- ----------------------------------------------------- +-- ---------------------------------------------------------------------------------------- -- View `InfoSchemaProcessList` -- This shows full Qmeta info suitable for "SELECT ... FROM INFORMATION_SCHEMA.PROCESSLIST" --- ----------------------------------------------------- +-- ---------------------------------------------------------------------------------------- CREATE OR REPLACE SQL SECURITY INVOKER VIEW `InfoSchemaProcessList` AS - SELECT DISTINCT - `QInfo`.`queryId` `ID`, - `QInfo`.`user` `USER`, - NULL `HOST`, - GROUP_CONCAT(DISTINCT `QTable`.`dbName`) `DB`, - `QInfo`.`qType` `COMMAND`, - NULL `TIME`, - `QInfo`.`status` `STATE`, - `QInfo`.`query` `INFO`, - `QInfo`.`submitted` `SUBMITTED`, - `QInfo`.`completed` `COMPLETED`, - `QInfo`.`returned` `RETURNED`, - `QInfo`.`czarId` `CZARID`, - REPLACE(`QInfo`.`resultLocation`, '#QID#', `QInfo`.`queryId`) `RESULTLOCATION`, - NULLIF(COUNT(`QWorker`.`chunk`), 0) `NCHUNKS`, - `QStatsTmp`.`totalChunks` `TotalChunks`, - `QStatsTmp`.`completedChunks` `CompletedChunks`, - `QStatsTmp`.`lastUpdate` `LastUpdate` - FROM `QInfo` LEFT OUTER JOIN `QTable` USING (`queryId`) - LEFT OUTER JOIN `QWorker` USING (`queryId`) - LEFT OUTER JOIN `QStatsTmp` USING (`queryId`) - GROUP BY `QInfo`.`queryId`; + SELECT `qi`.`queryId` `ID`, + `qi`.`qType` `TYPE`, + `qc`.`czar` `CZAR`, + `qc`.`czarId` `CZAR_ID`, + `qi`.`submitted` `SUBMITTED`, + `qs`.`lastUpdate` `UPDATED`, + `qi`.`chunkCount` `CHUNKS`, + `qs`.`completedChunks` `CHUNKS_COMP`, + `qi`.`query` `QUERY` + FROM `QInfo` AS `qi` + LEFT OUTER JOIN `QStatsTmp` AS `qs` ON `qi`.`queryId`=`qs`.`queryId` + JOIN `QCzar` AS `qc` ON `qi`.`czarId`=`qc`.`czarId` + WHERE `qi`.`status` = 'EXECUTING'; + +-- ------------------------------------------------------------------------------------ +-- View `InfoSchemaQueries` +-- This shows full Qmeta info suitable for "SELECT ... FROM INFORMATION_SCHEMA.QUERIES" +-- ------------------------------------------------------------------------------------ +CREATE OR REPLACE + SQL SECURITY INVOKER + VIEW `InfoSchemaQueries` AS + SELECT `qi`.`queryId` `ID`, + `qi`.`qType` `TYPE`, + `qc`.`czar` `CZAR`, + `qc`.`czarId` `CZAR_ID`, + `qi`.`status` `STATUS`, + `qi`.`submitted` `SUBMITTED`, + `qi`.`completed` `COMPLETED`, + `qi`.`returned` `RETURNED`, + `qi`.`chunkCount` `CHUNKS`, + `qi`.`collectedBytes` `BYTES`, + `qi`.`collectedRows` `ROWS_COLLECTED`, + `qi`.`finalRows` `ROWS`, + GROUP_CONCAT(DISTINCT `qt`.`dbName`) `DBS`, + `qi`.`query` `QUERY` + FROM `QInfo` AS `qi` + JOIN `QCzar` AS `qc` ON `qi`.`czarId`=`qc`.`czarId` + LEFT OUTER JOIN `QTable` AS `qt` ON `qi`.`queryId`=`qt`.`queryId` + GROUP BY `qi`.`queryId`; -- ----------------------------------------------------- -- Table `QMetadata` @@ -219,4 +207,5 @@ COMMENT = 'Table of messages generated during queries.'; -- Version 7 added final row count to QInfo. -- Version 8 replaced INT with BIGINT in the byte and row counter columns of QInfo. -- Version 9 removed the full-text index on the query text from QInfo. -INSERT INTO `QMetadata` (`metakey`, `value`) VALUES ('version', '9'); +-- Version 10 redefined schema of the ProcessList tables. +INSERT INTO `QMetadata` (`metakey`, `value`) VALUES ('version', '10');