diff --git a/src/ccontrol/MergingHandler.cc b/src/ccontrol/MergingHandler.cc index 8453922ac..5fa37d227 100644 --- a/src/ccontrol/MergingHandler.cc +++ b/src/ccontrol/MergingHandler.cc @@ -113,7 +113,7 @@ lsst::qserv::TimeCountTracker::CALLBACKFUNC const reportFileRecvRate = }; bool readXrootFileResourceAndMerge(string const& xrootUrl, - function const& messageIsReady) { + function const& messageIsReady) { string const context = "MergingHandler::" + string(__func__) + " "; LOGS(_log, LOG_LVL_DEBUG, context << "xrootUrl=" << xrootUrl); @@ -132,6 +132,10 @@ bool readXrootFileResourceAndMerge(string const& xrootUrl, return false; } + // A value of the flag is set by the message processor when it's time to finish + // or abort reading the file. + bool last = false; + // Temporary buffer for messages read from the file. The buffer will be (re-)allocated // as needed to get the largest message. Note that a size of the messages won't exceed // a limit set in ProtoHeaderWrap::PROTOBUFFER_HARD_LIMIT. @@ -141,7 +145,7 @@ bool readXrootFileResourceAndMerge(string const& xrootUrl, uint64_t offset = 0; // A location of the next byte to be read from the input file. bool success = true; try { - while (true) { + while (!last) { // This starts a timer of the data transmit rate tracker. auto transmitRateTracker = make_unique>(reportFileRecvRate); @@ -205,7 +209,7 @@ bool readXrootFileResourceAndMerge(string const& xrootUrl, transmitRateTracker.reset(); // Proceed to the result merge - success = messageIsReady(buf.get(), msgSizeBytes); + success = messageIsReady(buf.get(), msgSizeBytes, last); if (!success) break; } } catch (exception const& ex) { @@ -230,7 +234,7 @@ bool readXrootFileResourceAndMerge(string const& xrootUrl, } bool readHttpFileAndMerge(string const& httpUrl, - function const& messageIsReady) { + function const& messageIsReady) { string const context = "MergingHandler::" + string(__func__) + " "; LOGS(_log, LOG_LVL_DEBUG, context << "httpUrl=" << httpUrl); @@ -272,9 +276,12 @@ bool readHttpFileAndMerge(string const& httpUrl, clientConfig.tcpKeepIntvl = 5; // the default is 60 sec http::Client reader(http::Method::GET, httpUrl, noClientData, noClientHeaders, clientConfig); reader.read([&](char const* inBuf, size_t inBufSize) { + // A value of the flag is set by the message processor when it's time to finish + // or abort reading the file. + bool last = false; char const* next = inBuf; char const* const end = inBuf + inBufSize; - while (next < end) { + while ((next < end) && !last) { if (msgSizeBytes == 0) { // Continue or finish reading the frame header. size_t const bytes2read = @@ -330,7 +337,7 @@ bool readHttpFileAndMerge(string const& httpUrl, } // Parse and evaluate the message. - bool const success = messageIsReady(msgBuf.get(), msgSizeBytes); + bool const success = messageIsReady(msgBuf.get(), msgSizeBytes, last); if (!success) { throw runtime_error(context + "message processing failed at offset " + to_string(offset - msgSizeBytes) + ", file: " + httpUrl); @@ -376,7 +383,7 @@ MergingHandler::MergingHandler(std::shared_ptr merger, std: MergingHandler::~MergingHandler() { LOGS(_log, LOG_LVL_DEBUG, __func__); } -bool MergingHandler::flush(proto::ResponseSummary const& responseSummary, int& resultRows) { +bool MergingHandler::flush(proto::ResponseSummary const& responseSummary, uint32_t& resultRows) { _wName = responseSummary.wname(); // Why this is needed to ensure the job query would be staying alive for the duration @@ -404,11 +411,16 @@ bool MergingHandler::flush(proto::ResponseSummary const& responseSummary, int& r // the result delivery protocol configured at the worker. // Dispatch result processing to the corresponidng method which depends on // the result delivery protocol configured at the worker. - auto const dataMerger = [&](char const* buf, uint32_t size) { + // Notify the file reader when all rows have been read by setting 'last = true'. + auto const dataMerger = [&](char const* buf, uint32_t size, bool& last) { + last = true; proto::ResponseData responseData; if (responseData.ParseFromArray(buf, size) && responseData.IsInitialized()) { bool const success = _merge(responseSummary, responseData); - if (success) resultRows += responseData.row_size(); + if (success) { + resultRows += responseData.row_size(); + last = resultRows >= responseSummary.rowcount(); + } return success; } throw runtime_error("MergingHandler::flush ** message deserialization failed **"); diff --git a/src/ccontrol/MergingHandler.h b/src/ccontrol/MergingHandler.h index 32f571254..89fc1736d 100644 --- a/src/ccontrol/MergingHandler.h +++ b/src/ccontrol/MergingHandler.h @@ -63,7 +63,7 @@ class MergingHandler : public qdisp::ResponseHandler { /// Process the response and read the result file if no error was reported by a worker. /// @return true if successful (no error) - bool flush(proto::ResponseSummary const& responseSummary, int& resultRows) override; + bool flush(proto::ResponseSummary const& responseSummary, uint32_t& resultRows) override; /// Signal an unrecoverable error condition. No further calls are expected. void errorFlush(std::string const& msg, int code) override; diff --git a/src/qdisp/Executive.cc b/src/qdisp/Executive.cc index 49043a9e7..71541cd78 100644 --- a/src/qdisp/Executive.cc +++ b/src/qdisp/Executive.cc @@ -635,7 +635,7 @@ void Executive::_setupLimit() { _limitSquashApplies = hasLimit && !(groupBy || orderBy || allChunksRequired); } -void Executive::addResultRows(int rowCount) { _totalResultRows += rowCount; } +void Executive::addResultRows(uint32_t rowCount) { _totalResultRows += rowCount; } void Executive::checkLimitRowComplete() { if (!_limitSquashApplies) return; diff --git a/src/qdisp/Executive.h b/src/qdisp/Executive.h index 83bee5de4..6993bf96f 100644 --- a/src/qdisp/Executive.h +++ b/src/qdisp/Executive.h @@ -148,7 +148,7 @@ class Executive : public std::enable_shared_from_this { bool startQuery(std::shared_ptr const& jobQuery); /// Add 'rowCount' to the total number of rows in the result table. - void addResultRows(int rowCount); + void addResultRows(uint32_t rowCount); int64_t getTotalResultRows() const { return _totalResultRows; } diff --git a/src/qdisp/QueryRequest.cc b/src/qdisp/QueryRequest.cc index ebadff3f6..a8d4e8f2e 100644 --- a/src/qdisp/QueryRequest.cc +++ b/src/qdisp/QueryRequest.cc @@ -199,7 +199,7 @@ bool QueryRequest::_importResultFile(JobQuery::Ptr const& jq) { LOGS(_log, LOG_LVL_ERROR, __func__ << " " << err); throw util::Bug(ERR_LOC, err); } - int resultRows = 0; + uint32_t resultRows = 0; if (!jq->getDescription()->respHandler()->flush(responseSummary, resultRows)) { LOGS(_log, LOG_LVL_ERROR, __func__ << " not flushOk"); _flushError(jq); diff --git a/src/qdisp/QueryRequest.h b/src/qdisp/QueryRequest.h index a9c697ef6..9ba10edd6 100644 --- a/src/qdisp/QueryRequest.h +++ b/src/qdisp/QueryRequest.h @@ -149,7 +149,7 @@ class QueryRequest : public XrdSsiRequest, public std::enable_shared_from_this _rowsIgnored{0}; ///< Limit log messages about rows being ignored. std::atomic _respCount{0}; ///< number of responses created diff --git a/src/qdisp/ResponseHandler.h b/src/qdisp/ResponseHandler.h index 507c2fdf0..95a82f835 100644 --- a/src/qdisp/ResponseHandler.h +++ b/src/qdisp/ResponseHandler.h @@ -64,7 +64,7 @@ class ResponseHandler { /// @param responseSummary - worker response to be analyzed and processed /// @param resultRows - number of result rows in this result. /// @return true if successful (no error) - virtual bool flush(proto::ResponseSummary const& responseSummary, int& resultRows) = 0; + virtual bool flush(proto::ResponseSummary const& responseSummary, uint32_t& resultRows) = 0; /// Signal an unrecoverable error condition. No further calls are expected. virtual void errorFlush(std::string const& msg, int code) = 0;