From 46ed74ce77635111247a0cc29b39816d4cf1e159 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Mon, 22 Jan 2024 23:16:48 +0000 Subject: [PATCH] Fixed a bug in the result reading algorithm at Czar. In the previous version, the reading of files over HTTP is terminated when a TCP connection is closed by a worker. However, the latter might not happen if the HTTP server is configured to reuse sockets. In the new version of the code the file transfer is finished when the reader gets enough rows (as specified in the worker response summary message). Also, did some minor refactoring in the row counting code. --- src/ccontrol/MergingHandler.cc | 30 +++++++++++++++++++++--------- src/ccontrol/MergingHandler.h | 2 +- src/qdisp/Executive.cc | 2 +- src/qdisp/Executive.h | 2 +- src/qdisp/QueryRequest.cc | 2 +- src/qdisp/QueryRequest.h | 2 +- src/qdisp/ResponseHandler.h | 2 +- 7 files changed, 27 insertions(+), 15 deletions(-) diff --git a/src/ccontrol/MergingHandler.cc b/src/ccontrol/MergingHandler.cc index 8453922ac..5fa37d227 100644 --- a/src/ccontrol/MergingHandler.cc +++ b/src/ccontrol/MergingHandler.cc @@ -113,7 +113,7 @@ lsst::qserv::TimeCountTracker::CALLBACKFUNC const reportFileRecvRate = }; bool readXrootFileResourceAndMerge(string const& xrootUrl, - function const& messageIsReady) { + function const& messageIsReady) { string const context = "MergingHandler::" + string(__func__) + " "; LOGS(_log, LOG_LVL_DEBUG, context << "xrootUrl=" << xrootUrl); @@ -132,6 +132,10 @@ bool readXrootFileResourceAndMerge(string const& xrootUrl, return false; } + // A value of the flag is set by the message processor when it's time to finish + // or abort reading the file. + bool last = false; + // Temporary buffer for messages read from the file. The buffer will be (re-)allocated // as needed to get the largest message. Note that a size of the messages won't exceed // a limit set in ProtoHeaderWrap::PROTOBUFFER_HARD_LIMIT. @@ -141,7 +145,7 @@ bool readXrootFileResourceAndMerge(string const& xrootUrl, uint64_t offset = 0; // A location of the next byte to be read from the input file. bool success = true; try { - while (true) { + while (!last) { // This starts a timer of the data transmit rate tracker. auto transmitRateTracker = make_unique>(reportFileRecvRate); @@ -205,7 +209,7 @@ bool readXrootFileResourceAndMerge(string const& xrootUrl, transmitRateTracker.reset(); // Proceed to the result merge - success = messageIsReady(buf.get(), msgSizeBytes); + success = messageIsReady(buf.get(), msgSizeBytes, last); if (!success) break; } } catch (exception const& ex) { @@ -230,7 +234,7 @@ bool readXrootFileResourceAndMerge(string const& xrootUrl, } bool readHttpFileAndMerge(string const& httpUrl, - function const& messageIsReady) { + function const& messageIsReady) { string const context = "MergingHandler::" + string(__func__) + " "; LOGS(_log, LOG_LVL_DEBUG, context << "httpUrl=" << httpUrl); @@ -272,9 +276,12 @@ bool readHttpFileAndMerge(string const& httpUrl, clientConfig.tcpKeepIntvl = 5; // the default is 60 sec http::Client reader(http::Method::GET, httpUrl, noClientData, noClientHeaders, clientConfig); reader.read([&](char const* inBuf, size_t inBufSize) { + // A value of the flag is set by the message processor when it's time to finish + // or abort reading the file. + bool last = false; char const* next = inBuf; char const* const end = inBuf + inBufSize; - while (next < end) { + while ((next < end) && !last) { if (msgSizeBytes == 0) { // Continue or finish reading the frame header. size_t const bytes2read = @@ -330,7 +337,7 @@ bool readHttpFileAndMerge(string const& httpUrl, } // Parse and evaluate the message. - bool const success = messageIsReady(msgBuf.get(), msgSizeBytes); + bool const success = messageIsReady(msgBuf.get(), msgSizeBytes, last); if (!success) { throw runtime_error(context + "message processing failed at offset " + to_string(offset - msgSizeBytes) + ", file: " + httpUrl); @@ -376,7 +383,7 @@ MergingHandler::MergingHandler(std::shared_ptr merger, std: MergingHandler::~MergingHandler() { LOGS(_log, LOG_LVL_DEBUG, __func__); } -bool MergingHandler::flush(proto::ResponseSummary const& responseSummary, int& resultRows) { +bool MergingHandler::flush(proto::ResponseSummary const& responseSummary, uint32_t& resultRows) { _wName = responseSummary.wname(); // Why this is needed to ensure the job query would be staying alive for the duration @@ -404,11 +411,16 @@ bool MergingHandler::flush(proto::ResponseSummary const& responseSummary, int& r // the result delivery protocol configured at the worker. // Dispatch result processing to the corresponidng method which depends on // the result delivery protocol configured at the worker. - auto const dataMerger = [&](char const* buf, uint32_t size) { + // Notify the file reader when all rows have been read by setting 'last = true'. + auto const dataMerger = [&](char const* buf, uint32_t size, bool& last) { + last = true; proto::ResponseData responseData; if (responseData.ParseFromArray(buf, size) && responseData.IsInitialized()) { bool const success = _merge(responseSummary, responseData); - if (success) resultRows += responseData.row_size(); + if (success) { + resultRows += responseData.row_size(); + last = resultRows >= responseSummary.rowcount(); + } return success; } throw runtime_error("MergingHandler::flush ** message deserialization failed **"); diff --git a/src/ccontrol/MergingHandler.h b/src/ccontrol/MergingHandler.h index 32f571254..89fc1736d 100644 --- a/src/ccontrol/MergingHandler.h +++ b/src/ccontrol/MergingHandler.h @@ -63,7 +63,7 @@ class MergingHandler : public qdisp::ResponseHandler { /// Process the response and read the result file if no error was reported by a worker. /// @return true if successful (no error) - bool flush(proto::ResponseSummary const& responseSummary, int& resultRows) override; + bool flush(proto::ResponseSummary const& responseSummary, uint32_t& resultRows) override; /// Signal an unrecoverable error condition. No further calls are expected. void errorFlush(std::string const& msg, int code) override; diff --git a/src/qdisp/Executive.cc b/src/qdisp/Executive.cc index 49043a9e7..71541cd78 100644 --- a/src/qdisp/Executive.cc +++ b/src/qdisp/Executive.cc @@ -635,7 +635,7 @@ void Executive::_setupLimit() { _limitSquashApplies = hasLimit && !(groupBy || orderBy || allChunksRequired); } -void Executive::addResultRows(int rowCount) { _totalResultRows += rowCount; } +void Executive::addResultRows(uint32_t rowCount) { _totalResultRows += rowCount; } void Executive::checkLimitRowComplete() { if (!_limitSquashApplies) return; diff --git a/src/qdisp/Executive.h b/src/qdisp/Executive.h index 83bee5de4..6993bf96f 100644 --- a/src/qdisp/Executive.h +++ b/src/qdisp/Executive.h @@ -148,7 +148,7 @@ class Executive : public std::enable_shared_from_this { bool startQuery(std::shared_ptr const& jobQuery); /// Add 'rowCount' to the total number of rows in the result table. - void addResultRows(int rowCount); + void addResultRows(uint32_t rowCount); int64_t getTotalResultRows() const { return _totalResultRows; } diff --git a/src/qdisp/QueryRequest.cc b/src/qdisp/QueryRequest.cc index ebadff3f6..a8d4e8f2e 100644 --- a/src/qdisp/QueryRequest.cc +++ b/src/qdisp/QueryRequest.cc @@ -199,7 +199,7 @@ bool QueryRequest::_importResultFile(JobQuery::Ptr const& jq) { LOGS(_log, LOG_LVL_ERROR, __func__ << " " << err); throw util::Bug(ERR_LOC, err); } - int resultRows = 0; + uint32_t resultRows = 0; if (!jq->getDescription()->respHandler()->flush(responseSummary, resultRows)) { LOGS(_log, LOG_LVL_ERROR, __func__ << " not flushOk"); _flushError(jq); diff --git a/src/qdisp/QueryRequest.h b/src/qdisp/QueryRequest.h index a9c697ef6..9ba10edd6 100644 --- a/src/qdisp/QueryRequest.h +++ b/src/qdisp/QueryRequest.h @@ -149,7 +149,7 @@ class QueryRequest : public XrdSsiRequest, public std::enable_shared_from_this _rowsIgnored{0}; ///< Limit log messages about rows being ignored. std::atomic _respCount{0}; ///< number of responses created diff --git a/src/qdisp/ResponseHandler.h b/src/qdisp/ResponseHandler.h index 507c2fdf0..95a82f835 100644 --- a/src/qdisp/ResponseHandler.h +++ b/src/qdisp/ResponseHandler.h @@ -64,7 +64,7 @@ class ResponseHandler { /// @param responseSummary - worker response to be analyzed and processed /// @param resultRows - number of result rows in this result. /// @return true if successful (no error) - virtual bool flush(proto::ResponseSummary const& responseSummary, int& resultRows) = 0; + virtual bool flush(proto::ResponseSummary const& responseSummary, uint32_t& resultRows) = 0; /// Signal an unrecoverable error condition. No further calls are expected. virtual void errorFlush(std::string const& msg, int code) = 0;