From 59893b1c04427c8f51d14e0ca079cedc551bbe02 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Mon, 22 Jan 2024 23:16:48 +0000 Subject: [PATCH] Fixed a bug in the result reading algorithm at Czar. In the previous version, the reading of files over HTTP was terminated when a TCP connection was closed by a worker. However, the latter might not happen if the HTTP server is configured to reuse sockets. In the new version of the code the file transfer is finished when the reader gets enough rows (as specified in the worker response summary message). Also, some minor refactoring was done in the row counting code. --- src/ccontrol/MergingHandler.cc | 31 ++++++++++++++++++++++--------- src/ccontrol/MergingHandler.h | 2 +- src/qdisp/Executive.cc | 2 +- src/qdisp/Executive.h | 2 +- src/qdisp/QueryRequest.cc | 2 +- src/qdisp/QueryRequest.h | 2 +- src/qdisp/ResponseHandler.h | 2 +- 7 files changed, 28 insertions(+), 15 deletions(-) diff --git a/src/ccontrol/MergingHandler.cc b/src/ccontrol/MergingHandler.cc index 27aacae01..ae3c67795 100644 --- a/src/ccontrol/MergingHandler.cc +++ b/src/ccontrol/MergingHandler.cc @@ -114,7 +114,7 @@ lsst::qserv::TimeCountTracker::CALLBACKFUNC const reportFileRecvRate = }; bool readXrootFileResourceAndMerge(string const& xrootUrl, - function const& messageIsReady) { + function const& messageIsReady) { string const context = "MergingHandler::" + string(__func__) + " "; LOGS(_log, LOG_LVL_DEBUG, context << "xrootUrl=" << xrootUrl); @@ -133,6 +133,10 @@ bool readXrootFileResourceAndMerge(string const& xrootUrl, return false; } + // A value of the flag is set by the message processor when it's time to finish + // or abort reading the file. + bool last = false; + // Temporary buffer for messages read from the file. The buffer will be (re-)allocated // as needed to get the largest message. Note that a size of the messages won't exceed // a limit set in ProtoHeaderWrap::PROTOBUFFER_HARD_LIMIT. 
@@ -142,7 +146,7 @@ bool readXrootFileResourceAndMerge(string const& xrootUrl, uint64_t offset = 0; // A location of the next byte to be read from the input file. bool success = true; try { - while (true) { + while (!last) { // This starts a timer of the data transmit rate tracker. auto transmitRateTracker = make_unique>(reportFileRecvRate); @@ -206,7 +210,7 @@ bool readXrootFileResourceAndMerge(string const& xrootUrl, transmitRateTracker.reset(); // Proceed to the result merge - success = messageIsReady(buf.get(), msgSizeBytes); + success = messageIsReady(buf.get(), msgSizeBytes, last); if (!success) break; } } catch (exception const& ex) { @@ -230,7 +234,8 @@ bool readXrootFileResourceAndMerge(string const& xrootUrl, return success; } -bool readHttpFileAndMerge(string const& httpUrl, function const& messageIsReady, +bool readHttpFileAndMerge(string const& httpUrl, + function const& messageIsReady, shared_ptr const& httpConnPool) { string const context = "MergingHandler::" + string(__func__) + " "; @@ -273,9 +278,12 @@ bool readHttpFileAndMerge(string const& httpUrl, function merger, std: MergingHandler::~MergingHandler() { LOGS(_log, LOG_LVL_DEBUG, __func__); } -bool MergingHandler::flush(proto::ResponseSummary const& responseSummary, int& resultRows) { +bool MergingHandler::flush(proto::ResponseSummary const& responseSummary, uint32_t& resultRows) { _wName = responseSummary.wname(); // Why this is needed to ensure the job query would be staying alive for the duration @@ -413,11 +421,16 @@ bool MergingHandler::flush(proto::ResponseSummary const& responseSummary, int& r // the result delivery protocol configured at the worker. // Dispatch result processing to the corresponidng method which depends on // the result delivery protocol configured at the worker. - auto const dataMerger = [&](char const* buf, uint32_t size) { + // Notify the file reader when all rows have been read by setting 'last = true'. 
+ auto const dataMerger = [&](char const* buf, uint32_t size, bool& last) { + last = true; proto::ResponseData responseData; if (responseData.ParseFromArray(buf, size) && responseData.IsInitialized()) { bool const success = _merge(responseSummary, responseData); - if (success) resultRows += responseData.row_size(); + if (success) { + resultRows += responseData.row_size(); + last = resultRows >= responseSummary.rowcount(); + } return success; } throw runtime_error("MergingHandler::flush ** message deserialization failed **"); diff --git a/src/ccontrol/MergingHandler.h b/src/ccontrol/MergingHandler.h index ab941ce73..0ccd3cb10 100644 --- a/src/ccontrol/MergingHandler.h +++ b/src/ccontrol/MergingHandler.h @@ -68,7 +68,7 @@ class MergingHandler : public qdisp::ResponseHandler { /// Process the response and read the result file if no error was reported by a worker. /// @return true if successful (no error) - bool flush(proto::ResponseSummary const& responseSummary, int& resultRows) override; + bool flush(proto::ResponseSummary const& responseSummary, uint32_t& resultRows) override; /// Signal an unrecoverable error condition. No further calls are expected. 
void errorFlush(std::string const& msg, int code) override; diff --git a/src/qdisp/Executive.cc b/src/qdisp/Executive.cc index 49043a9e7..71541cd78 100644 --- a/src/qdisp/Executive.cc +++ b/src/qdisp/Executive.cc @@ -635,7 +635,7 @@ void Executive::_setupLimit() { _limitSquashApplies = hasLimit && !(groupBy || orderBy || allChunksRequired); } -void Executive::addResultRows(int rowCount) { _totalResultRows += rowCount; } +void Executive::addResultRows(uint32_t rowCount) { _totalResultRows += rowCount; } void Executive::checkLimitRowComplete() { if (!_limitSquashApplies) return; diff --git a/src/qdisp/Executive.h b/src/qdisp/Executive.h index 83bee5de4..6993bf96f 100644 --- a/src/qdisp/Executive.h +++ b/src/qdisp/Executive.h @@ -148,7 +148,7 @@ class Executive : public std::enable_shared_from_this { bool startQuery(std::shared_ptr const& jobQuery); /// Add 'rowCount' to the total number of rows in the result table. - void addResultRows(int rowCount); + void addResultRows(uint32_t rowCount); int64_t getTotalResultRows() const { return _totalResultRows; } diff --git a/src/qdisp/QueryRequest.cc b/src/qdisp/QueryRequest.cc index ebadff3f6..a8d4e8f2e 100644 --- a/src/qdisp/QueryRequest.cc +++ b/src/qdisp/QueryRequest.cc @@ -199,7 +199,7 @@ bool QueryRequest::_importResultFile(JobQuery::Ptr const& jq) { LOGS(_log, LOG_LVL_ERROR, __func__ << " " << err); throw util::Bug(ERR_LOC, err); } - int resultRows = 0; + uint32_t resultRows = 0; if (!jq->getDescription()->respHandler()->flush(responseSummary, resultRows)) { LOGS(_log, LOG_LVL_ERROR, __func__ << " not flushOk"); _flushError(jq); diff --git a/src/qdisp/QueryRequest.h b/src/qdisp/QueryRequest.h index a9c697ef6..9ba10edd6 100644 --- a/src/qdisp/QueryRequest.h +++ b/src/qdisp/QueryRequest.h @@ -149,7 +149,7 @@ class QueryRequest : public XrdSsiRequest, public std::enable_shared_from_this _rowsIgnored{0}; ///< Limit log messages about rows being ignored. 
std::atomic _respCount{0}; ///< number of responses created diff --git a/src/qdisp/ResponseHandler.h b/src/qdisp/ResponseHandler.h index 507c2fdf0..95a82f835 100644 --- a/src/qdisp/ResponseHandler.h +++ b/src/qdisp/ResponseHandler.h @@ -64,7 +64,7 @@ class ResponseHandler { /// @param responseSummary - worker response to be analyzed and processed /// @param resultRows - number of result rows in this result. /// @return true if successful (no error) - virtual bool flush(proto::ResponseSummary const& responseSummary, int& resultRows) = 0; + virtual bool flush(proto::ResponseSummary const& responseSummary, uint32_t& resultRows) = 0; /// Signal an unrecoverable error condition. No further calls are expected. virtual void errorFlush(std::string const& msg, int code) = 0;