Early termination of jobs to avoid pulling too much data from workers
Check if a job has been cancelled at rproc::InfileMerger while pulling
and processing data from a worker. This is meant to reinforce
the existing termination logic in qdisp::QueryRequest.

NOTE: Upon further experiments with this change, it doesn't seem
to make any difference. Apparently, one problem is that only one job
at a time is able to ingest into the result table. The other jobs (there
are 300 threads at USDF) are blocked on the corresponding mutex, waiting
for their turn to ingest rows into the table. The second issue is the
potentially huge amount of result data that may accumulate at workers,
since workers keep processing queries. This can't be prevented from Czar
because there is no back pressure comparable to the memory-based back
pressure of SSI streaming. Hence, a possible solution would be to introduce
a result-size-based "break" at workers that would delay processing jobs of
a query once a substantial amount (say, more than 5 GB) of unclaimed result
files has accumulated at the same worker.
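
Below is a minimal sketch of what such a worker-side check could look like.
It is not part of this commit: all class and method names are hypothetical,
and the 5 GB threshold would presumably come from configuration.

// Hypothetical sketch only: a per-query tracker of unclaimed result bytes
// that a worker scheduler could consult before starting more jobs of a query.
#include <cstdint>
#include <map>
#include <mutex>

class UnclaimedResultTracker {
public:
    explicit UnclaimedResultTracker(std::uint64_t maxBytesPerQuery)
            : _maxBytesPerQuery(maxBytesPerQuery) {}

    // Called when a result file is written for a query on this worker.
    void add(std::uint64_t queryId, std::uint64_t bytes) {
        std::lock_guard<std::mutex> lock(_mtx);
        _unclaimedBytes[queryId] += bytes;
    }

    // Called when Czar claims (pulls) a result file of the query.
    void claim(std::uint64_t queryId, std::uint64_t bytes) {
        std::lock_guard<std::mutex> lock(_mtx);
        auto itr = _unclaimedBytes.find(queryId);
        if (itr == _unclaimedBytes.end()) return;
        itr->second = (itr->second > bytes) ? itr->second - bytes : 0;
    }

    // The "break": true while the unclaimed result size of the query
    // exceeds the limit (e.g. 5 GB), so further jobs should be delayed.
    bool shouldDelay(std::uint64_t queryId) const {
        std::lock_guard<std::mutex> lock(_mtx);
        auto itr = _unclaimedBytes.find(queryId);
        return itr != _unclaimedBytes.end() && itr->second > _maxBytesPerQuery;
    }

private:
    std::uint64_t const _maxBytesPerQuery;
    mutable std::mutex _mtx;
    std::map<std::uint64_t, std::uint64_t> _unclaimedBytes;
};

A worker scheduler could call shouldDelay() before starting the next job of
a query and requeue the job while the limit is exceeded.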
Another improvement would be to add a result-processing pool at Czar, a queue
for worker responses, and a (priority + queryId)-based algorithm for feeding
responses into the pool. The algorithm would give higher priority to the
interactive queries, and among the scan queries it would give higher priority
to the older ones (smaller queryId). However, that should be done in
a separate JIRA ticket.
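
For illustration only, here is a minimal sketch of the proposed ordering,
assuming a hypothetical PendingResponse record per queued worker response
(none of these names exist in the code base).

// Hypothetical sketch only: (priority + queryId)-based ordering for feeding
// worker responses into a result-processing pool at Czar.
#include <cstdint>
#include <queue>
#include <vector>

struct PendingResponse {
    bool interactive;       // interactive queries are served first
    std::uint64_t queryId;  // among scan queries, older (smaller id) goes first
    // ... a handle to the worker response would go here
};

struct ResponseOrder {
    // std::priority_queue pops the "largest" element, so return true when
    // 'a' should be served after 'b'.
    bool operator()(PendingResponse const& a, PendingResponse const& b) const {
        if (a.interactive != b.interactive) return !a.interactive;  // interactive wins
        return a.queryId > b.queryId;                               // then the older query wins
    }
};

using ResponseQueue =
        std::priority_queue<PendingResponse, std::vector<PendingResponse>, ResponseOrder>;

A pool thread would pop the top entry and hand the response to the merger,
so interactive queries jump ahead and the older scan queries drain first.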
iagaponenko committed Jan 24, 2024
1 parent db7e7ea commit 65f03b6
Showing 3 changed files with 19 additions and 4 deletions.
2 changes: 1 addition & 1 deletion src/ccontrol/MergingHandler.cc
@@ -480,7 +480,7 @@ bool MergingHandler::_merge(proto::ResponseSummary const& responseSummary,
     if (_flushed) {
         throw util::Bug(ERR_LOC, "already flushed");
     }
-    bool success = _infileMerger->merge(responseSummary, responseData);
+    bool success = _infileMerger->merge(responseSummary, responseData, job);
     if (!success) {
         LOGS(_log, LOG_LVL_WARN, __func__ << " failed");
         util::Error const& err = _infileMerger->getError();
14 changes: 13 additions & 1 deletion src/rproc/InfileMerger.cc
@@ -59,6 +59,8 @@
 #include "proto/ProtoImporter.h"
 #include "proto/worker.pb.h"
 #include "qdisp/CzarStats.h"
+#include "qdisp/Executive.h"
+#include "qdisp/JobQuery.h"
 #include "qproc/DatabaseModels.h"
 #include "query/ColumnRef.h"
 #include "query/SelectStmt.h"
@@ -219,7 +221,8 @@ void InfileMerger::mergeCompleteFor(int jobId) {
 }
 
 bool InfileMerger::merge(proto::ResponseSummary const& responseSummary,
-                         proto::ResponseData const& responseData) {
+                         proto::ResponseData const& responseData,
+                         std::shared_ptr<qdisp::JobQuery> const& jq) {
     int const jobId = responseSummary.jobid();
     std::string queryIdJobStr = QueryIdHelper::makeIdStr(responseSummary.queryid(), jobId);
     if (!_queryIdStrSet) {
@@ -231,6 +234,15 @@ bool InfileMerger::merge(proto::ResponseSummary const& responseSummary,
         return true;
     }
 
+    // Do nothing if the query got cancelled for any reason.
+    if (jq->isQueryCancelled()) {
+        return true;
+    }
+    auto executive = jq->getExecutive();
+    if (executive == nullptr || executive->getCancelled() || executive->isLimitRowComplete()) {
+        return true;
+    }
+
     std::unique_ptr<util::SemaLock> semaLock;
     if (_dbEngine != MYISAM) {
         // needed for parallel merging with INNODB and MEMORY
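For readability, the guard added above can be restated as a single predicate.
This helper is hypothetical and not part of the commit; the template parameter
stands in for qdisp::JobQuery so the sketch stays self-contained, while the
calls on it are exactly the ones used in the diff.

// Hypothetical restatement of the early-termination guard added to
// InfileMerger::merge(). JobQueryT stands in for qdisp::JobQuery.
#include <memory>

template <typename JobQueryT>
bool mergeShouldBeSkipped(std::shared_ptr<JobQueryT> const& jq) {
    if (jq->isQueryCancelled()) return true;
    auto executive = jq->getExecutive();
    return executive == nullptr || executive->getCancelled() || executive->isLimitRowComplete();
}

Note that merge() still returns true when the guard fires, so MergingHandler
does not take the failure path shown in the first file; the skipped merge is
reported as a success for a query that is already being torn down.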
7 changes: 5 additions & 2 deletions src/rproc/InfileMerger.h
@@ -53,8 +53,9 @@ class ResponseData;
 class ResponseSummary;
 }  // namespace proto
 namespace qdisp {
+class JobQuery;
 class MessageStore;
-}
+}  // namespace qdisp
 namespace qproc {
 class DatabaseModels;
 }
@@ -159,8 +160,10 @@ class InfileMerger {
     std::string engineToStr(InfileMerger::DbEngine engine);
 
     /// Merge a worker response, which contains a single ResponseData message
+    /// Using job query info for early termination of the merge if needed.
     /// @return true if merge was successfully imported.
-    bool merge(proto::ResponseSummary const& responseSummary, proto::ResponseData const& responseData);
+    bool merge(proto::ResponseSummary const& responseSummary, proto::ResponseData const& responseData,
+               std::shared_ptr<qdisp::JobQuery> const& jq);
 
     /// Indicate the merge for the job is complete.
     void mergeCompleteFor(int jobId);
