Fixed a bug in the result reading algorithm at Czar.
In the previous version, reading of a result file over HTTP was
terminated when the TCP connection was closed by the worker. However,
the connection may never be closed if the worker's HTTP server is
configured to reuse sockets. In the new version of the code, the file
transfer finishes once the reader has received the expected number of
rows, as specified in the worker's response summary message.

Also made some minor refactoring of the row-counting code.
iagaponenko committed Jan 24, 2024
1 parent 65f03b6 commit 9fee761
Showing 7 changed files with 28 additions and 15 deletions.
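The gist of the change is a new contract between the file readers and the result merger: the message-processing callback now takes a bool& flag that the merger sets once it has seen all rows promised by the response summary, and the read loops exit as soon as the flag is set instead of waiting for the worker to close the connection. The following self-contained sketch illustrates the idea with hypothetical names (readMessages, rowsExpected); the real logic lives in readHttpFileAndMerge() and MergingHandler::flush() in the diff below.

#include <cstdint>
#include <functional>
#include <string>
#include <vector>

// The callback now reports back, via 'last', when enough rows have been merged.
using MessageIsReady = std::function<bool(char const*, uint32_t, bool& last)>;

// Feeds framed messages to the merger until it sets 'last' or the input is exhausted,
// rather than relying on the worker closing the TCP connection to end the transfer.
bool readMessages(std::vector<std::string> const& frames, MessageIsReady const& messageIsReady) {
    bool last = false;
    for (auto const& frame : frames) {
        if (last) break;  // the merger has all expected rows; stop reading early
        if (!messageIsReady(frame.data(), static_cast<uint32_t>(frame.size()), last)) return false;
    }
    return true;
}

int main() {
    uint32_t rowsReceived = 0;
    uint32_t const rowsExpected = 3;  // in Qserv this comes from the worker's response summary
    auto const merger = [&](char const*, uint32_t size, bool& last) {
        rowsReceived += size;                 // stand-in for merging 'size' rows
        last = rowsReceived >= rowsExpected;  // tell the reader to stop once all rows arrived
        return true;
    };
    return readMessages({"x", "x", "x", "x"}, merger) ? 0 : 1;
}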
31 changes: 22 additions & 9 deletions src/ccontrol/MergingHandler.cc
@@ -114,7 +114,7 @@ lsst::qserv::TimeCountTracker<double>::CALLBACKFUNC const reportFileRecvRate =
};

bool readXrootFileResourceAndMerge(string const& xrootUrl,
- function<bool(char const*, uint32_t)> const& messageIsReady) {
+ function<bool(char const*, uint32_t, bool&)> const& messageIsReady) {
string const context = "MergingHandler::" + string(__func__) + " ";

LOGS(_log, LOG_LVL_DEBUG, context << "xrootUrl=" << xrootUrl);
@@ -133,6 +133,10 @@ bool readXrootFileResourceAndMerge(string const& xrootUrl,
return false;
}

+ // A value of the flag is set by the message processor when it's time to finish
+ // or abort reading the file.
+ bool last = false;
+
// Temporary buffer for messages read from the file. The buffer will be (re-)allocated
// as needed to get the largest message. Note that a size of the messages won't exceed
// a limit set in ProtoHeaderWrap::PROTOBUFFER_HARD_LIMIT.
Expand All @@ -142,7 +146,7 @@ bool readXrootFileResourceAndMerge(string const& xrootUrl,
uint64_t offset = 0; // A location of the next byte to be read from the input file.
bool success = true;
try {
- while (true) {
+ while (!last) {
// This starts a timer of the data transmit rate tracker.
auto transmitRateTracker = make_unique<lsst::qserv::TimeCountTracker<double>>(reportFileRecvRate);

@@ -206,7 +210,7 @@ bool readXrootFileResourceAndMerge(string const& xrootUrl,
transmitRateTracker.reset();

// Proceed to the result merge
- success = messageIsReady(buf.get(), msgSizeBytes);
+ success = messageIsReady(buf.get(), msgSizeBytes, last);
if (!success) break;
}
} catch (exception const& ex) {
@@ -230,7 +234,8 @@ bool readXrootFileResourceAndMerge(string const& xrootUrl,
return success;
}

- bool readHttpFileAndMerge(string const& httpUrl, function<bool(char const*, uint32_t)> const& messageIsReady,
+ bool readHttpFileAndMerge(string const& httpUrl,
+ function<bool(char const*, uint32_t, bool&)> const& messageIsReady,
shared_ptr<http::ClientConnPool> const& httpConnPool) {
string const context = "MergingHandler::" + string(__func__) + " ";

@@ -273,9 +278,12 @@ bool readHttpFileAndMerge(string const& httpUrl, function<bool(char const*, uint
http::Client reader(http::Method::GET, httpUrl, noClientData, noClientHeaders, clientConfig,
httpConnPool);
reader.read([&](char const* inBuf, size_t inBufSize) {
+ // A value of the flag is set by the message processor when it's time to finish
+ // or abort reading the file.
+ bool last = false;
char const* next = inBuf;
char const* const end = inBuf + inBufSize;
- while (next < end) {
+ while ((next < end) && !last) {
if (msgSizeBytes == 0) {
// Continue or finish reading the frame header.
size_t const bytes2read =
@@ -331,7 +339,7 @@ bool readHttpFileAndMerge(string const& httpUrl, function<bool(char const*, uint
}

// Parse and evaluate the message.
- bool const success = messageIsReady(msgBuf.get(), msgSizeBytes);
+ bool const success = messageIsReady(msgBuf.get(), msgSizeBytes, last);
if (!success) {
throw runtime_error(context + "message processing failed at offset " +
to_string(offset - msgSizeBytes) + ", file: " + httpUrl);
@@ -385,7 +393,7 @@ MergingHandler::MergingHandler(std::shared_ptr<rproc::InfileMerger> merger, std:

MergingHandler::~MergingHandler() { LOGS(_log, LOG_LVL_DEBUG, __func__); }

- bool MergingHandler::flush(proto::ResponseSummary const& responseSummary, int& resultRows) {
+ bool MergingHandler::flush(proto::ResponseSummary const& responseSummary, uint32_t& resultRows) {
_wName = responseSummary.wname();

// Why this is needed to ensure the job query would be staying alive for the duration
@@ -413,11 +421,16 @@ bool MergingHandler::flush(proto::ResponseSummary const& responseSummary, int& r
// the result delivery protocol configured at the worker.
// Dispatch result processing to the corresponidng method which depends on
// the result delivery protocol configured at the worker.
- auto const dataMerger = [&](char const* buf, uint32_t size) {
+ // Notify the file reader when all rows have been read by setting 'last = true'.
+ auto const dataMerger = [&](char const* buf, uint32_t size, bool& last) {
+ last = true;
proto::ResponseData responseData;
if (responseData.ParseFromArray(buf, size) && responseData.IsInitialized()) {
bool const success = _merge(responseSummary, responseData);
- if (success) resultRows += responseData.row_size();
+ if (success) {
+ resultRows += responseData.row_size();
+ last = resultRows >= responseSummary.rowcount();
+ }
return success;
}
throw runtime_error("MergingHandler::flush ** message deserialization failed **");
2 changes: 1 addition & 1 deletion src/ccontrol/MergingHandler.h
@@ -68,7 +68,7 @@ class MergingHandler : public qdisp::ResponseHandler {

/// Process the response and read the result file if no error was reported by a worker.
/// @return true if successful (no error)
- bool flush(proto::ResponseSummary const& responseSummary, int& resultRows) override;
+ bool flush(proto::ResponseSummary const& responseSummary, uint32_t& resultRows) override;

/// Signal an unrecoverable error condition. No further calls are expected.
void errorFlush(std::string const& msg, int code) override;
2 changes: 1 addition & 1 deletion src/qdisp/Executive.cc
@@ -635,7 +635,7 @@ void Executive::_setupLimit() {
_limitSquashApplies = hasLimit && !(groupBy || orderBy || allChunksRequired);
}

- void Executive::addResultRows(int rowCount) { _totalResultRows += rowCount; }
+ void Executive::addResultRows(uint32_t rowCount) { _totalResultRows += rowCount; }

void Executive::checkLimitRowComplete() {
if (!_limitSquashApplies) return;
2 changes: 1 addition & 1 deletion src/qdisp/Executive.h
@@ -148,7 +148,7 @@ class Executive : public std::enable_shared_from_this<Executive> {
bool startQuery(std::shared_ptr<JobQuery> const& jobQuery);

/// Add 'rowCount' to the total number of rows in the result table.
- void addResultRows(int rowCount);
+ void addResultRows(uint32_t rowCount);

int64_t getTotalResultRows() const { return _totalResultRows; }

2 changes: 1 addition & 1 deletion src/qdisp/QueryRequest.cc
@@ -199,7 +199,7 @@ bool QueryRequest::_importResultFile(JobQuery::Ptr const& jq) {
LOGS(_log, LOG_LVL_ERROR, __func__ << " " << err);
throw util::Bug(ERR_LOC, err);
}
- int resultRows = 0;
+ uint32_t resultRows = 0;
if (!jq->getDescription()->respHandler()->flush(responseSummary, resultRows)) {
LOGS(_log, LOG_LVL_ERROR, __func__ << " not flushOk");
_flushError(jq);
2 changes: 1 addition & 1 deletion src/qdisp/QueryRequest.h
@@ -149,7 +149,7 @@ class QueryRequest : public XrdSsiRequest, public std::enable_shared_from_this<Q

QdispPool::Ptr _qdispPool;

- int _totalRows = 0; ///< number of rows in query added to the result table.
+ uint32_t _totalRows = 0; ///< number of rows in query added to the result table.

std::atomic<int> _rowsIgnored{0}; ///< Limit log messages about rows being ignored.
std::atomic<uint> _respCount{0}; ///< number of responses created
2 changes: 1 addition & 1 deletion src/qdisp/ResponseHandler.h
@@ -64,7 +64,7 @@ class ResponseHandler {
/// @param responseSummary - worker response to be analyzed and processed
/// @param resultRows - number of result rows in this result.
/// @return true if successful (no error)
- virtual bool flush(proto::ResponseSummary const& responseSummary, int& resultRows) = 0;
+ virtual bool flush(proto::ResponseSummary const& responseSummary, uint32_t& resultRows) = 0;

/// Signal an unrecoverable error condition. No further calls are expected.
virtual void errorFlush(std::string const& msg, int code) = 0;
