From 228c039af8e04ae554be00d577de566e53e3ff59 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Sat, 20 Jan 2024 05:24:56 +0000 Subject: [PATCH] Early termination of jobs to avoid pulling too much data from workers Checking if a job gets cancelled at rproc::InfileMerger while pulling and processing data from a worker. This is meant to reinforce an existing termination logic at qdisp::QueryRequest. NOTE: Upon further experiments with this change, it doesn't seem to make any difference. Apparently, a problem is that only one job at a time is able to ingest into the result table. Other jobs (there are 300 threads at USDF) are waiting on the corresponding mutex waiting for their turn to ingest rows into the table. The second issue is about (potentially) the huge amount of the result data that might be accumulated at workers since workers keep processing queries. This can't be prevented from Czar due to a lack of the back pressure like the memory-based one of the SSI streaming. Hence, a possible solution would be to introduce the result-size-based "break" at workers that would delay processing jobs of a query where there is a substantial (say more than 5 GB) amount of unclaimed files at the same worker. Another improvement is to add the result processing pool at Czar, a queue for worker responses, and the (priority + queryId)-based algorithm for feeding responses into the pool. The algorithm would give the higher priority for the interactive queries, and for the can queries it would give higher priority to the older (smaller queryId) queries. However, that should be done in a separate JIRA ticket. --- src/ccontrol/MergingHandler.cc | 2 +- src/rproc/InfileMerger.cc | 14 +++++++++++++- src/rproc/InfileMerger.h | 7 +++++-- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/src/ccontrol/MergingHandler.cc b/src/ccontrol/MergingHandler.cc index 4b3fda182..27aacae01 100644 --- a/src/ccontrol/MergingHandler.cc +++ b/src/ccontrol/MergingHandler.cc @@ -480,7 +480,7 @@ bool MergingHandler::_merge(proto::ResponseSummary const& responseSummary, if (_flushed) { throw util::Bug(ERR_LOC, "already flushed"); } - bool success = _infileMerger->merge(responseSummary, responseData); + bool success = _infileMerger->merge(responseSummary, responseData, job); if (!success) { LOGS(_log, LOG_LVL_WARN, __func__ << " failed"); util::Error const& err = _infileMerger->getError(); diff --git a/src/rproc/InfileMerger.cc b/src/rproc/InfileMerger.cc index 2514c5d3c..00a713950 100644 --- a/src/rproc/InfileMerger.cc +++ b/src/rproc/InfileMerger.cc @@ -59,6 +59,8 @@ #include "proto/ProtoImporter.h" #include "proto/worker.pb.h" #include "qdisp/CzarStats.h" +#include "qdisp/Executive.h" +#include "qdisp/JobQuery.h" #include "qproc/DatabaseModels.h" #include "query/ColumnRef.h" #include "query/SelectStmt.h" @@ -219,7 +221,8 @@ void InfileMerger::mergeCompleteFor(int jobId) { } bool InfileMerger::merge(proto::ResponseSummary const& responseSummary, - proto::ResponseData const& responseData) { + proto::ResponseData const& responseData, + std::shared_ptr const& jq) { int const jobId = responseSummary.jobid(); std::string queryIdJobStr = QueryIdHelper::makeIdStr(responseSummary.queryid(), jobId); if (!_queryIdStrSet) { @@ -231,6 +234,15 @@ bool InfileMerger::merge(proto::ResponseSummary const& responseSummary, return true; } + // Do nothing if the query got cancelled for any reason. + if (jq->isQueryCancelled()) { + return true; + } + auto executive = jq->getExecutive(); + if (executive == nullptr || executive->getCancelled() || executive->isLimitRowComplete()) { + return true; + } + std::unique_ptr semaLock; if (_dbEngine != MYISAM) { // needed for parallel merging with INNODB and MEMORY diff --git a/src/rproc/InfileMerger.h b/src/rproc/InfileMerger.h index 246ea9689..116aabaf1 100644 --- a/src/rproc/InfileMerger.h +++ b/src/rproc/InfileMerger.h @@ -53,8 +53,9 @@ class ResponseData; class ResponseSummary; } // namespace proto namespace qdisp { +class JobQuery; class MessageStore; -} +} // namespace qdisp namespace qproc { class DatabaseModels; } @@ -159,8 +160,10 @@ class InfileMerger { std::string engineToStr(InfileMerger::DbEngine engine); /// Merge a worker response, which contains a single ResponseData message + /// Using job query info for early termination of the merge if needed. /// @return true if merge was successfully imported. - bool merge(proto::ResponseSummary const& responseSummary, proto::ResponseData const& responseData); + bool merge(proto::ResponseSummary const& responseSummary, proto::ResponseData const& responseData, + std::shared_ptr const& jq); /// Indicate the merge for the job is complete. void mergeCompleteFor(int jobId);