Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-48758: Refined INFORMATION_SCHEMA support in Qserv #890

Merged
merged 5 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 71 additions & 37 deletions doc/user/async.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ This is illustrated by the following example:

.. code-block:: sql

SUBMIT SELECT objectId FROM dp02_dc2_catalogs.Object LIMIT 3
SUBMIT SELECT COUNT(*) FROM dp02_dc2_catalogs.Object WHERE coord_ra != 0

If the query validation succeded and the query placed into a processing queue, the command will always return one row with
two columns, where the particularly interesting column is ``jobId``:
Expand All @@ -44,9 +44,10 @@ two columns, where the particularly interesting column is ``jobId``:
+--------+---------------------+
| jobId | resultLocation |
+--------+---------------------+
| 311817 | table:result_311817 |
| 313689 | table:result_313689 |
+--------+---------------------+


At this poing the query is running asynchronously. The ``jobId`` is the unique identifier of the query that can be used
for checking the query status, retrieving the results, or cancelling the query.

Expand All @@ -58,39 +59,38 @@ Based on the ``jobId`` returned by the ``SUBMIT`` command, you can check the sta

.. code-block::

SELECT * FROM information_schema.processlist WHERE id=311817\G
SELECT * FROM information_schema.processlist WHERE id=313689\G

**Note**: ``\G`` is a MySQL command that formats the output. It's not part of the SQL syntax.
The command is quite handy to display result sets comprising many columns or having long values of
the columns in a more readable format.

The query will return a row with the following columns:
If the query is still being executed the information schema query will return a row with the following columns:

.. code-block::

*************************** 1. row ***************************
ID: 311817
USER: anonymous
HOST: NULL
DB: dp02_dc2_catalogs
COMMAND: ASYNC
TIME: NULL
STATE: COMPLETED
INFO: SELECT objectId FROM dp02_dc2_catalogs.Object LIMIT 3
SUBMITTED: 2024-10-06 20:01:18
COMPLETED: 2024-10-06 20:01:18
RETURNED: NULL
CZARID: 9
RESULTLOCATION: table:result_311817
NCHUNKS: 1477
TotalChunks: NULL
CompletedChunks: NULL
LastUpdate: NULL
ID: 313689
TYPE: ASYNC
CZAR: proxy
CZAR_ID: 9
SUBMITTED: 2025-02-06 08:58:18
UPDATED: 2025-02-06 08:58:18
CHUNKS: 1477
CHUNKS_COMP: 739
QUERY: SELECT COUNT(*) FROM dp02_dc2_catalogs.Object WHERE coord_ra != 0

Particularly interesting columns here are:

- ``ID``: the unique identifier of the original query (it's the same as ``jobId`` reported by the ``SUBMIT`` command)
- ``STATE``: the query status, which can be one of: ``EXECUTING``, ``COMPLETED``, ``FAILED``, or ``ABORTED``
- ``TYPE``: the query type, which is always ``ASYNC`` for asynchronous queries
- ``SUBMITTED``: the timestamp when the query was submitted
- ``UPDATED``: the timestamp of the last update of the query status
- ``CHUNKS``: the total number of chunks to be processed
- ``CHUNKS_COMP``: the number of chunks already processed

The user may periodically repeat this command to compute the performance metrics of the query execution
ad to get an estimate of the remaining time to completion.

One can also use the following information commands to get the status of all active queries:

Expand All @@ -99,10 +99,46 @@ One can also use the following information commands to get the status of all act
SHOW PROCESSLIST
SHOW FULL PROCESSLIST

**Note**: once the query is over and the results are retrieved, the corresponding row in the ``information_schema.processlist``
table will be deleted. And the query status will no longer be available. However, Qserv will still maintain the history
of the queries in other system tables. You may contact the Qserv administrator to get the history of the queries should
you need it.
For example the ``SHOW PROCESSLIST`` command will return:

.. code-block::

+--------+---------+-------+---------+---------------------+---------------------+--------+--------------+----------------------------------+
| ID | COMMAND | CZAR | CZAR_ID | SUBMITTED | UPDATED | CHUNKS | CHUNKS_COMPL | QUERY |
+--------+---------+-------+---------+---------------------+---------------------+--------+--------------+----------------------------------+
| 313689 | ASYNC | proxy | 9 | 2025-02-06 08:58:18 | 2025-02-06 08:58:18 | 1477 | 1 | SELECT COUNT(*) FROM dp02_dc2_ca |
+--------+---------+-------+---------+---------------------+---------------------+--------+--------------+----------------------------------+

The result set of the ``PROCESSLIST`` queries will be empty if the query has already completed. In this case, the query status can be retrieved
by querying the query history table:

.. code-block::

SELECT * FROM information_schema.queries WHERE id=313689\G

The query will return:

.. code-block::

*************************** 1. row ***************************
ID: 313689
TYPE: ASYNC
CZAR: proxy
CZAR_ID: 9
STATUS: COMPLETED
SUBMITTED: 2025-02-06 08:58:18
COMPLETED: 2025-02-06 08:58:21
RETURNED: NULL
CHUNKS: 1477
BYTES: 13856
ROWS_COLLECTED: 1477
ROWS: 1
DBS: dp02_dc2_catalogs
QUERY: SELECT COUNT(*) FROM dp02_dc2_catalogs.Object WHERE coord_ra !=0

Particularly interesting columns here are:

- ``STATUS``: the query status, which can be one of: ``EXECUTING``, ``COMPLETED``, ``FAILED``, or ``ABORTED``

Retrieving Results
==================
Expand All @@ -116,26 +152,24 @@ To retrieve the results of a query, use the following syntax:
This will return the full results (columns and rows) of the original query corresponding to the provided identifier of
the query.

For example, the following query will return the results of the query with ``jobId`` of ``311817``:
For example, the following query will return the results of the query with ``jobId`` of ``313689``:

.. code-block::

SELECT * FROM qserv_result(311817)
+---------------------+
| objectId |
+---------------------+
| 1248649384967536732 |
| 1248649384967536769 |
| 1248649384967536891 |
+---------------------+
SELECT * FROM qserv_result(313689)
+-----------+
| COUNT(*) |
+-----------+
| 278318452 |
+-----------+

The command may be called one time only. The query result table will be deleted after returning the result set.
Any subsequent attempts to retrieve the results will return an error message:

.. code-block::

SELECT * FROM qserv_result(311817)
ERROR 1146 (42S02) at line 1: Table 'qservResult.result_311817' doesn't exist
SELECT * FROM qserv_result(313689)
ERROR 1146 (42S02) at line 1: Table 'qservResult.result_313689' doesn't exist

Cancellation
============
Expand Down
2 changes: 1 addition & 1 deletion src/admin/python/lsst/qserv/admin/itest.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ def run_detached(self, connection: str, qserv: bool, database: str) -> None:
"--binary-as-hex",
"--skip-column-names",
"-e",
f"SELECT STATE FROM INFORMATION_SCHEMA.PROCESSLIST WHERE ID = {query_id}",
f"SELECT STATUS FROM INFORMATION_SCHEMA.QUERIES WHERE ID = {query_id}",
]
_log.debug("SQLCmd.execute waiting for query to complete")
end_time = time.time() + self.async_timeout
Expand Down
1 change: 1 addition & 0 deletions src/ccontrol/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ target_sources(ccontrol PRIVATE
UserQueryFactory.cc
UserQueryFlushChunksCache.cc
UserQueryProcessList.cc
UserQueryQueries.cc
UserQuerySelectCountStar.cc
UserQueryQservManager.cc
UserQueryResources.cc
Expand Down
51 changes: 51 additions & 0 deletions src/ccontrol/UserQueryFactory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "ccontrol/UserQueryFlushChunksCache.h"
#include "ccontrol/UserQueryInvalid.h"
#include "ccontrol/UserQueryProcessList.h"
#include "ccontrol/UserQueryQueries.h"
#include "ccontrol/UserQueryResources.h"
#include "ccontrol/UserQuerySelect.h"
#include "ccontrol/UserQuerySelectCountStar.h"
Expand Down Expand Up @@ -93,6 +94,23 @@ bool _stmtRefersToProcessListTable(query::SelectStmt::Ptr& stmt, std::string def
return false;
}

/**
* @brief Determine if the table name in the FROM statement refers to QUERIES table.
*
* @param stmt SelectStmt representing the query.
* @param defaultDb Default database name, may be empty.
* @return true if the query refers only to the QUERIES table.
* @return false if the query does not refer only to the QUERIES table.
*/
bool _stmtRefersQueriesTable(query::SelectStmt::Ptr& stmt, std::string defaultDb) {
auto const& tableRefList = stmt->getFromList().getTableRefList();
if (tableRefList.size() != 1) return false;
auto const& tblRef = tableRefList[0];
std::string const& db = tblRef->getDb().empty() ? defaultDb : tblRef->getDb();
if (UserQueryType::isQueriesTable(db, tblRef->getTable())) return true;
return false;
}

/**
* @brief Make a UserQueryProcessList (or UserQueryInvalid) from given parameters.
*
Expand Down Expand Up @@ -123,6 +141,35 @@ std::shared_ptr<UserQuery> _makeUserQueryProcessList(query::SelectStmt::Ptr& stm
}
}

/**
* @brief Make a UserQueryQueries (or UserQueryInvalid) from given parameters.
*
* @param stmt The SelectStmt representing the query.
* @param sharedResources Resources used by UserQueryFactory to create UserQueries.
* @param userQueryId Unique string identifying the query.
* @param resultDb Name of the databse that will contain results.
* @param aQuery The original query string.
* @param async If the query is to be run asynchronously.
* @return std::shared_ptr<UserQuery>, will be a UserQueryQueries or UserQueryInvalid.
*/
std::shared_ptr<UserQuery> _makeUserQueryQueries(query::SelectStmt::Ptr& stmt,
userQuerySharedResourcesPtr& sharedResources,
std::string const& userQueryId, std::string const& resultDb,
std::string const& aQuery, bool async) {
if (async) {
// no point supporting async for these
return std::make_shared<UserQueryInvalid>("SUBMIT is not allowed with query: " + aQuery);
}
LOGS(_log, LOG_LVL_DEBUG, "SELECT query is a QUERIES");
try {
return std::make_shared<UserQueryQueries>(stmt, sharedResources->resultDbConn.get(),
sharedResources->qMetaSelect, sharedResources->qMetaCzarId,
userQueryId, resultDb);
} catch (std::exception const& exc) {
return std::make_shared<UserQueryInvalid>(exc.what());
}
}

/**
* @brief Determine if the qmeta database has a metadata table with chunks & row
* counts that represents the table in the FROM statement for a SELECT
Expand Down Expand Up @@ -259,6 +306,10 @@ UserQuery::Ptr UserQueryFactory::newUserQuery(std::string const& aQuery, std::st
return _makeUserQueryProcessList(stmt, _userQuerySharedResources, userQueryId, resultDb, aQuery,
async);
}
if (_stmtRefersQueriesTable(stmt, defaultDb)) {
return _makeUserQueryQueries(stmt, _userQuerySharedResources, userQueryId, resultDb, aQuery,
async);
}

/// Determine if a SelectStmt is a simple COUNT(*) query and can be run as an optimized query.
/// It may not be runnable as an optimzed simple COUNT(*) query because:
Expand Down
31 changes: 10 additions & 21 deletions src/ccontrol/UserQueryProcessList.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,6 @@ UserQueryProcessList::UserQueryProcessList(std::shared_ptr<query::SelectStmt> co
auto qtempl = stmt->getQueryTemplate();
_query = qtempl.sqlFragment();

// we also do not want to return too many results by default
// as QMeta can contain a lot of records, limit ourselves
// to some reasonable number, users can override with different LIMIT
if (!stmt->hasLimit()) {
_query += " LIMIT 1000";
}

if (stmt->hasOrderBy()) {
_orderBy = stmt->getOrderBy().sqlFragment();
}
Expand All @@ -107,20 +100,16 @@ UserQueryProcessList::UserQueryProcessList(bool full, sql::SqlConnection* result
_messageStore(std::make_shared<qdisp::MessageStore>()),
_resultTableName(::g_nextResultTableId(userQueryId)),
_resultDb(resultDb) {
// use ShowProcessList view with completion statistics.
_query = "SELECT Id, User, Host, db, Command, Time, State, ";
_query += full ? "Info, " : "SUBSTRING(Info FROM 1 FOR 100) Info, ";
_query += "totalChunks, completedChunks, lastUpdate, ";
// These are non-standard but they need to be there because they appear in WHERE
_query += "CzarId, Submitted, Completed, ResultLocation";
_query += " FROM ShowProcessList";

// only show stuff for current czar and not too old
_query += " WHERE CzarId = " + std::to_string(qMetaCzarId) +
" AND "
"(Completed IS NULL OR Completed > NOW() - INTERVAL 3 DAY)";

_orderBy = "Submitted";
_query = "SELECT `qi`.`queryId` `ID`,`qi`.`qType` `TYPE`,`qc`.`czar` `CZAR`,`qc`.`czarId` `CZAR_ID`,"
"`qi`.`submitted` `SUBMITTED`,`qs`.`lastUpdate` `UPDATED`,`qi`.`chunkCount` `CHUNKS`,"
"`qs`.`completedChunks` `CHUNKS_COMPL`,";
_query += (full ? "`qi`.`query`" : "SUBSTR(`qi`.`query`,1,32) `QUERY`");
_query +=
" FROM `QInfo` AS `qi` "
" LEFT OUTER JOIN `QStatsTmp` AS `qs` ON `qi`.`queryId`=`qs`.`queryId`"
" JOIN `QCzar` AS `qc` ON `qi`.`czarId`=`qc`.`czarId`"
" WHERE `qi`.`status` = 'EXECUTING'";
_orderBy = "`SUBMITTED`";
}

std::string UserQueryProcessList::getError() const { return std::string(); }
Expand Down
Loading