From 81d00bb9bcf2085ab61ee9fb82e388adccad6a26 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Tue, 9 Jan 2024 07:25:33 +0000 Subject: [PATCH] Refactored and streamlined implementation of FileChannelShared The new implementation no longer depends on a separate data processing class TransmitData for extracting rows from the result sets and generating the Protobuf-serialized output stream of data. --- src/wbase/FileChannelShared.cc | 64 ++++++++++++++++++++++++++-------- src/wbase/FileChannelShared.h | 13 +++++++ src/wbase/TransmitData.cc | 36 ------------------- src/wbase/TransmitData.h | 13 ------- 4 files changed, 62 insertions(+), 64 deletions(-) diff --git a/src/wbase/FileChannelShared.cc b/src/wbase/FileChannelShared.cc index 76261cd37..6d830c5c6 100644 --- a/src/wbase/FileChannelShared.cc +++ b/src/wbase/FileChannelShared.cc @@ -391,7 +391,7 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr const tMtxLock(tMtx); @@ -436,6 +436,7 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptrprepareResponse(*task, _rowcount, _transmitsize); bool const lastIn = true; if (!prepTransmit(tMtxLock, task, cancelled, lastIn)) { @@ -443,10 +444,6 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr const& tMtxLock, shared_ptr const& task, MYSQL_RES* mResult, int& bytes, int& rows, util::MultiError& multiErr) { - // Initialize transmitData, if needed. - initTransmit(tMtxLock, *task); - - // Transfer rows from a result set into the data buffer. - bool const hasMoreRows = !transmitData->fillRows(mResult); + // Initialize the result object + proto::Result result; + result.set_queryid(task->getQueryId()); + result.set_jobid(task->getJobId()); + result.set_fileresource_xroot(task->resultFileXrootUrl()); + result.set_fileresource_http(task->resultFileHttpUrl()); + result.set_attemptcount(task->getAttemptCount()); + + // Transfer rows from a result set into the object. 
+ size_t tSize = 0; + bool const hasMoreRows = _fillRows(tMtxLock, mResult, result, rows, tSize); + result.set_rowcount(rows); + result.set_transmitsize(tSize); // Serialize the content of the data buffer into the Protobuf data message // that will be writen into the output file. - transmitData->buildDataMsg(*task, multiErr); - - bytes = transmitData->getResultSize(); - rows = transmitData->getResultRowCount(); + std::string msg; + result.SerializeToString(&msg); + bytes = msg.size(); - string const msg = transmitData->dataMsg(); + // Create the file if not open. if (!_file.is_open()) { _fileName = task->resultFilePath(); _file.open(_fileName, ios::out | ios::trunc | ios::binary); @@ -508,6 +512,36 @@ bool FileChannelShared::_writeToFile(lock_guard const& tMtxLock, shared_p return hasMoreRows; } +bool FileChannelShared::_fillRows(lock_guard const& tMtxLock, MYSQL_RES* mResult, + proto::Result& result, int& rows, size_t& tSize) { + int const numFields = mysql_num_fields(mResult); + unsigned int szLimit = min(proto::ProtoHeaderWrap::PROTOBUFFER_DESIRED_LIMIT, + proto::ProtoHeaderWrap::PROTOBUFFER_HARD_LIMIT); + rows = 0; + tSize = 0; + MYSQL_ROW row; + while ((row = mysql_fetch_row(mResult))) { + auto lengths = mysql_fetch_lengths(mResult); + proto::RowBundle* rawRow = result.add_row(); + for (int i = 0; i < numFields; ++i) { + if (row[i]) { + rawRow->add_column(row[i], lengths[i]); + rawRow->add_isnull(false); + } else { + rawRow->add_column(); + rawRow->add_isnull(true); + } + } + tSize += rawRow->ByteSizeLong(); + ++rows; + + // Each element needs to be mysql-sanitized + // Break the loop if the result is too big so this part can be transmitted. 
+ if (tSize > szLimit) return true; + } + return false; +} + void FileChannelShared::_removeFile(lock_guard const& tMtxLock) { if (!_fileName.empty() && _file.is_open()) { _file.close(); diff --git a/src/wbase/FileChannelShared.h b/src/wbase/FileChannelShared.h index ef60b7a64..c6f26eb77 100644 --- a/src/wbase/FileChannelShared.h +++ b/src/wbase/FileChannelShared.h @@ -43,6 +43,7 @@ // Forward declarations namespace lsst::qserv::proto { +class Result; class TaskMsg; } // namespace lsst::qserv::proto @@ -271,6 +272,18 @@ class FileChannelShared { bool _writeToFile(std::lock_guard const& tMtxLock, std::shared_ptr const& task, MYSQL_RES* mResult, int& bytes, int& rows, util::MultiError& multiErr); + /** + * Extract as many rows as allowed by the Google Protobuf implementation + * from the input result set into the output result object. + * @param tMtxLock - a lock on the base class's mutex tMtx + * @param mResult - MySQL result to be used as a source + * @param result - the output result object to be filled in + * @param rows - the number of rows extracted from the result set + * @param tSize - the approximate amount of data extracted from the result set + * @return 'true' if there are more rows left in the result set. + */ + bool _fillRows(std::lock_guard const& tMtxLock, MYSQL_RES* mResult, proto::Result& result, + int& rows, size_t& tSize); /** * Unconditionaly close and remove (potentially - the partially written) file. 
* This method gets called in case of any failure detected while processing diff --git a/src/wbase/TransmitData.cc b/src/wbase/TransmitData.cc index 81593e322..85cd25a54 100644 --- a/src/wbase/TransmitData.cc +++ b/src/wbase/TransmitData.cc @@ -183,37 +183,6 @@ void TransmitData::initResult(Task& task) { bool TransmitData::hasErrormsg() const { return _result->has_errormsg(); } -bool TransmitData::fillRows(MYSQL_RES* mResult) { - lock_guard const lock(_trMtx); - MYSQL_ROW row; - - int const numFields = mysql_num_fields(mResult); - unsigned int szLimit = min(proto::ProtoHeaderWrap::PROTOBUFFER_DESIRED_LIMIT, - proto::ProtoHeaderWrap::PROTOBUFFER_HARD_LIMIT); - while ((row = mysql_fetch_row(mResult))) { - auto lengths = mysql_fetch_lengths(mResult); - proto::RowBundle* rawRow = _result->add_row(); - for (int i = 0; i < numFields; ++i) { - if (row[i]) { - rawRow->add_column(row[i], lengths[i]); - rawRow->add_isnull(false); - } else { - rawRow->add_column(); - rawRow->add_isnull(true); - } - } - _tSize += rawRow->ByteSizeLong(); - ++_rowCount; - - // Each element needs to be mysql-sanitized - // Break the loop if the result is too big so this part can be transmitted. 
- if (_tSize > szLimit) { - return false; - } - } - return true; -} - void TransmitData::prepareResponse(Task const& task, uint32_t rowcount, uint64_t transmitsize) { lock_guard const lock(_trMtx); _rowCount = rowcount; @@ -224,11 +193,6 @@ void TransmitData::prepareResponse(Task const& task, uint32_t rowcount, uint64_t _buildDataMsg(lock, task, multiErr); } -size_t TransmitData::getResultTransmitSize() const { - lock_guard const lock(_trMtx); - return _tSize; -} - int TransmitData::getResultSize() const { lock_guard const lock(_trMtx); return _dataMsg.size(); diff --git a/src/wbase/TransmitData.h b/src/wbase/TransmitData.h index 0c8accee2..3fbe8ceca 100644 --- a/src/wbase/TransmitData.h +++ b/src/wbase/TransmitData.h @@ -27,9 +27,6 @@ #include #include -// 3rd party headers -#include - // Qserv headers #include "proto/worker.pb.h" #include "qmeta/types.h" @@ -88,13 +85,6 @@ class TransmitData { /// @return the protobuf string for the header. std::string makeHeaderString(bool reallyLast); - /// Fill one row in the _result msg from one row in MYSQL_RES* 'mResult' - /// If the message has gotten larger than the desired message size, - /// return false. - /// @return false if there ARE MORE ROWS left in mResult. - /// true if there are no more rows remaining in mResult. - bool fillRows(MYSQL_RES* mResult); - /// Prepare the summary response by emptying the payload (rows) and setting /// the counters. /// @param task - the task responsible for the change @@ -108,9 +98,6 @@ class TransmitData { /// @return true if tData has an error message in _result. bool hasErrormsg() const; - /// @return the size of the result (the 'transmitsize' of the result) in bytes. - size_t getResultTransmitSize() const; - /// @return the size of the result in bytes. int getResultSize() const;