Skip to content

Commit

Permalink
Refactored and streamlined implementation of FileChannelShared
Browse files Browse the repository at this point in the history
The new implementation no longer depends on a separate data processing
class TransmitData for extracting rows from the result sets and
generating the Protobuf-serialized output stream of data.

Eliminated unused methods from FileChannelShared.
  • Loading branch information
iagaponenko committed Jan 9, 2024
1 parent d6aee71 commit 49f935f
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 98 deletions.
84 changes: 49 additions & 35 deletions src/wbase/FileChannelShared.cc
Original file line number Diff line number Diff line change
Expand Up @@ -319,26 +319,6 @@ bool FileChannelShared::transmitTaskLast() {
return lastTaskDone;
}

bool FileChannelShared::send(char const* buf, int bufLen) {
lock_guard<mutex> const streamMutexLock(_streamMutex);
return _sendChannel->send(buf, bufLen);
}

bool FileChannelShared::sendError(string const& msg, int code) {
lock_guard<mutex> const streamMutexLock(_streamMutex);
return _sendChannel->sendError(msg, code);
}

bool FileChannelShared::sendFile(int fd, wbase::SendChannel::Size fSize) {
lock_guard<mutex> const streamMutexLock(_streamMutex);
return _sendChannel->sendFile(fd, fSize);
}

bool FileChannelShared::sendStream(shared_ptr<xrdsvc::StreamBuffer> const& sBuf, bool last) {
lock_guard<mutex> const streamMutexLock(_streamMutex);
return _sendChannel->sendStream(sBuf, last);
}

bool FileChannelShared::kill(string const& note) {
lock_guard<mutex> const streamMutexLock(_streamMutex);
return _kill(streamMutexLock, note);
Expand Down Expand Up @@ -391,7 +371,7 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr<Ta
bool erred = false;
bool hasMoreRows = true;

// This lock is to protect transmitData from having other Tasks mess with it
// This lock is to protect the stream from having other Tasks mess with it
// while data is loading.
lock_guard<mutex> const tMtxLock(tMtx);

Expand Down Expand Up @@ -436,17 +416,14 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr<Ta

// Only the last ("summary") message w/o any rows is sent to Czar to notify
// the about completion of the request.
initTransmit(tMtxLock, *task);
transmitData->prepareResponse(*task, _rowcount, _transmitsize);
bool const lastIn = true;
if (!prepTransmit(tMtxLock, task, cancelled, lastIn)) {
LOGS(_log, LOG_LVL_ERROR, "Could not transmit the summary message to Czar.");
erred = true;
break;
}
} else {
// Scrap the transmit buffer to be ready for processing the next set of rows
// of the current or the next task of the request.
transmitData.reset();
}
}
transmitT.stop();
Expand All @@ -473,20 +450,27 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr<Ta

bool FileChannelShared::_writeToFile(lock_guard<mutex> const& tMtxLock, shared_ptr<Task> const& task,
MYSQL_RES* mResult, int& bytes, int& rows, util::MultiError& multiErr) {
// Initialize transmitData, if needed.
initTransmit(tMtxLock, *task);

// Transfer rows from a result set into the data buffer.
bool const hasMoreRows = !transmitData->fillRows(mResult);
// Initialize the result object
proto::Result result;
result.set_queryid(task->getQueryId());
result.set_jobid(task->getJobId());
result.set_fileresource_xroot(task->resultFileXrootUrl());
result.set_fileresource_http(task->resultFileHttpUrl());
result.set_attemptcount(task->getAttemptCount());

// Transfer rows from a result set into the object.
size_t tSize = 0;
bool const hasMoreRows = _fillRows(tMtxLock, mResult, result, rows, tSize);
result.set_rowcount(rows);
result.set_transmitsize(tSize);

// Serialize the content of the data buffer into the Protobuf data message
// that will be writen into the output file.
transmitData->buildDataMsg(*task, multiErr);
std::string msg;
result.SerializeToString(&msg);
bytes = msg.size();

bytes = transmitData->getResultSize();
rows = transmitData->getResultRowCount();

string const msg = transmitData->dataMsg();
// Create the file if not open.
if (!_file.is_open()) {
_fileName = task->resultFilePath();
_file.open(_fileName, ios::out | ios::trunc | ios::binary);
Expand All @@ -508,6 +492,36 @@ bool FileChannelShared::_writeToFile(lock_guard<mutex> const& tMtxLock, shared_p
return hasMoreRows;
}

bool FileChannelShared::_fillRows(lock_guard<mutex> const& tMtxLock, MYSQL_RES* mResult,
proto::Result& result, int& rows, size_t& tSize) {
int const numFields = mysql_num_fields(mResult);
unsigned int szLimit = min(proto::ProtoHeaderWrap::PROTOBUFFER_DESIRED_LIMIT,
proto::ProtoHeaderWrap::PROTOBUFFER_HARD_LIMIT);
rows = 0;
tSize = 0;
MYSQL_ROW row;
while ((row = mysql_fetch_row(mResult))) {
auto lengths = mysql_fetch_lengths(mResult);
proto::RowBundle* rawRow = result.add_row();
for (int i = 0; i < numFields; ++i) {
if (row[i]) {
rawRow->add_column(row[i], lengths[i]);
rawRow->add_isnull(false);
} else {
rawRow->add_column();
rawRow->add_isnull(true);
}
}
tSize += rawRow->ByteSizeLong();
++rows;

// Each element needs to be mysql-sanitized
// Break the loop if the result is too big so this part can be transmitted.
if (tSize > szLimit) return true;
}
return false;
}

void FileChannelShared::_removeFile(lock_guard<mutex> const& tMtxLock) {
if (!_fileName.empty() && _file.is_open()) {
_file.close();
Expand Down
27 changes: 13 additions & 14 deletions src/wbase/FileChannelShared.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
// Forward declarations

namespace lsst::qserv::proto {
class Result;
class TaskMsg;
} // namespace lsst::qserv::proto

Expand Down Expand Up @@ -159,20 +160,6 @@ class FileChannelShared {
bool buildAndTransmitResult(MYSQL_RES* mResult, std::shared_ptr<Task> const& task,
util::MultiError& multiErr, std::atomic<bool>& cancelled);

/// Wrappers for wbase::SendChannel public functions that may need to be used
/// by threads.
/// @see wbase::SendChannel::send
bool send(char const* buf, int bufLen);

/// @see wbase::SendChannel::sendError
bool sendError(std::string const& msg, int code);

/// @see wbase::SendChannel::sendFile
bool sendFile(int fd, wbase::SendChannel::Size fSize);

/// @see wbase::SendChannel::sendStream
bool sendStream(std::shared_ptr<xrdsvc::StreamBuffer> const& sBuf, bool last);

/// @see wbase::SendChannel::kill
bool kill(std::string const& note);

Expand Down Expand Up @@ -271,6 +258,18 @@ class FileChannelShared {
bool _writeToFile(std::lock_guard<std::mutex> const& tMtxLock, std::shared_ptr<Task> const& task,
MYSQL_RES* mResult, int& bytes, int& rows, util::MultiError& multiErr);

/**
* Extract as many rows as allowed by the Google Protobuf implementation from
* from the input result set into the output result object.
* @param tMtxLock - a lock on the base class's mutex tMtx
* @param mResult - MySQL result to be used as a source
* @param result - the output result object to be filled in
* @param rows - the number of rows extracted from the result set
* @param tSize - the approximate amount of data extracted from the result set
* @return 'true' if there are more rows left in the result set.
*/
bool _fillRows(std::lock_guard<std::mutex> const& tMtxLock, MYSQL_RES* mResult, proto::Result& result,
int& rows, size_t& tSize);
/**
* Unconditionaly close and remove (potentially - the partially written) file.
* This method gets called in case of any failure detected while processing
Expand Down
36 changes: 0 additions & 36 deletions src/wbase/TransmitData.cc
Original file line number Diff line number Diff line change
Expand Up @@ -183,37 +183,6 @@ void TransmitData::initResult(Task& task) {

bool TransmitData::hasErrormsg() const { return _result->has_errormsg(); }

bool TransmitData::fillRows(MYSQL_RES* mResult) {
lock_guard<mutex> const lock(_trMtx);
MYSQL_ROW row;

int const numFields = mysql_num_fields(mResult);
unsigned int szLimit = min(proto::ProtoHeaderWrap::PROTOBUFFER_DESIRED_LIMIT,
proto::ProtoHeaderWrap::PROTOBUFFER_HARD_LIMIT);
while ((row = mysql_fetch_row(mResult))) {
auto lengths = mysql_fetch_lengths(mResult);
proto::RowBundle* rawRow = _result->add_row();
for (int i = 0; i < numFields; ++i) {
if (row[i]) {
rawRow->add_column(row[i], lengths[i]);
rawRow->add_isnull(false);
} else {
rawRow->add_column();
rawRow->add_isnull(true);
}
}
_tSize += rawRow->ByteSizeLong();
++_rowCount;

// Each element needs to be mysql-sanitized
// Break the loop if the result is too big so this part can be transmitted.
if (_tSize > szLimit) {
return false;
}
}
return true;
}

void TransmitData::prepareResponse(Task const& task, uint32_t rowcount, uint64_t transmitsize) {
lock_guard<mutex> const lock(_trMtx);
_rowCount = rowcount;
Expand All @@ -224,11 +193,6 @@ void TransmitData::prepareResponse(Task const& task, uint32_t rowcount, uint64_t
_buildDataMsg(lock, task, multiErr);
}

size_t TransmitData::getResultTransmitSize() const {
lock_guard<mutex> const lock(_trMtx);
return _tSize;
}

int TransmitData::getResultSize() const {
lock_guard<mutex> const lock(_trMtx);
return _dataMsg.size();
Expand Down
13 changes: 0 additions & 13 deletions src/wbase/TransmitData.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@
#include <mutex>
#include <string>

// 3rd party headers
#include <mysql/mysql.h>

// Qserv headers
#include "proto/worker.pb.h"
#include "qmeta/types.h"
Expand Down Expand Up @@ -88,13 +85,6 @@ class TransmitData {
/// @return the protobuf string for the header.
std::string makeHeaderString(bool reallyLast);

/// Fill one row in the _result msg from one row in MYSQL_RES* 'mResult'
/// If the message has gotten larger than the desired message size,
/// return false.
/// @return false if there ARE MORE ROWS left in mResult.
/// true if there are no more rows remaining in mResult.
bool fillRows(MYSQL_RES* mResult);

/// Prepare the summary response by emptying the payload (rows) and setting
/// the counters.
/// @param task - the task responsible for the change
Expand All @@ -108,9 +98,6 @@ class TransmitData {
/// @return true if tData has an error message in _result.
bool hasErrormsg() const;

/// @return the size of the result (the 'transmitsize' of the result) in bytes.
size_t getResultTransmitSize() const;

/// @return the size of the result in bytes.
int getResultSize() const;

Expand Down

0 comments on commit 49f935f

Please sign in to comment.