diff --git a/src/ccontrol/MergingHandler.cc b/src/ccontrol/MergingHandler.cc index 4b3fda182..27aacae01 100644 --- a/src/ccontrol/MergingHandler.cc +++ b/src/ccontrol/MergingHandler.cc @@ -480,7 +480,7 @@ bool MergingHandler::_merge(proto::ResponseSummary const& responseSummary, if (_flushed) { throw util::Bug(ERR_LOC, "already flushed"); } - bool success = _infileMerger->merge(responseSummary, responseData); + bool success = _infileMerger->merge(responseSummary, responseData, job); if (!success) { LOGS(_log, LOG_LVL_WARN, __func__ << " failed"); util::Error const& err = _infileMerger->getError(); diff --git a/src/rproc/InfileMerger.cc b/src/rproc/InfileMerger.cc index 2514c5d3c..00a713950 100644 --- a/src/rproc/InfileMerger.cc +++ b/src/rproc/InfileMerger.cc @@ -59,6 +59,8 @@ #include "proto/ProtoImporter.h" #include "proto/worker.pb.h" #include "qdisp/CzarStats.h" +#include "qdisp/Executive.h" +#include "qdisp/JobQuery.h" #include "qproc/DatabaseModels.h" #include "query/ColumnRef.h" #include "query/SelectStmt.h" @@ -219,7 +221,8 @@ void InfileMerger::mergeCompleteFor(int jobId) { } bool InfileMerger::merge(proto::ResponseSummary const& responseSummary, - proto::ResponseData const& responseData) { + proto::ResponseData const& responseData, + std::shared_ptr const& jq) { int const jobId = responseSummary.jobid(); std::string queryIdJobStr = QueryIdHelper::makeIdStr(responseSummary.queryid(), jobId); if (!_queryIdStrSet) { @@ -231,6 +234,15 @@ bool InfileMerger::merge(proto::ResponseSummary const& responseSummary, return true; } + // Do nothing if the query got cancelled for any reason. + if (jq->isQueryCancelled()) { + return true; + } + auto executive = jq->getExecutive(); + if (executive == nullptr || executive->getCancelled() || executive->isLimitRowComplete()) { + return true; + } + std::unique_ptr semaLock; if (_dbEngine != MYISAM) { // needed for parallel merging with INNODB and MEMORY diff --git a/src/rproc/InfileMerger.h b/src/rproc/InfileMerger.h index 246ea9689..116aabaf1 100644 --- a/src/rproc/InfileMerger.h +++ b/src/rproc/InfileMerger.h @@ -53,8 +53,9 @@ class ResponseData; class ResponseSummary; } // namespace proto namespace qdisp { +class JobQuery; class MessageStore; -} +} // namespace qdisp namespace qproc { class DatabaseModels; } @@ -159,8 +160,10 @@ class InfileMerger { std::string engineToStr(InfileMerger::DbEngine engine); /// Merge a worker response, which contains a single ResponseData message + /// Using job query info for early termination of the merge if needed. /// @return true if merge was successfully imported. - bool merge(proto::ResponseSummary const& responseSummary, proto::ResponseData const& responseData); + bool merge(proto::ResponseSummary const& responseSummary, proto::ResponseData const& responseData, + std::shared_ptr const& jq); /// Indicate the merge for the job is complete. void mergeCompleteFor(int jobId);