Skip to content

Commit

Permalink
Refactored and streamlined implementation of FileChannelShared
Browse files Browse the repository at this point in the history
The new implementation no longer depends on a separate data processing
class TransmitData for extracting rows from the result sets and
generating the Protobuf-serialized output stream of data.

Eliminated unused methods from FileChannelShared.
  • Loading branch information
iagaponenko committed Jan 10, 2024
1 parent 940fcf8 commit 0c72afc
Show file tree
Hide file tree
Showing 11 changed files with 197 additions and 854 deletions.
1 change: 0 additions & 1 deletion src/wbase/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ target_sources(wbase PRIVATE
FileChannelShared.cc
SendChannel.cc
Task.cc
TransmitData.cc
UserQueryInfo.cc
WorkerCommand.cc
)
Expand Down
383 changes: 149 additions & 234 deletions src/wbase/FileChannelShared.cc

Large diffs are not rendered by default.

145 changes: 39 additions & 106 deletions src/wbase/FileChannelShared.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@

// System headers
#include <atomic>
#include <condition_variable>
#include <fstream>
#include <memory>
#include <mutex>
#include <queue>
#include <vector>

// Third-party headers
Expand All @@ -43,23 +41,17 @@
// Forward declarations

namespace lsst::qserv::proto {
class TaskMsg;
class Result;
} // namespace lsst::qserv::proto

namespace lsst::qserv::wbase {
class Task;
class TransmitData;
} // namespace lsst::qserv::wbase

namespace lsst::qserv::util {
class InstanceCount;
class MultiError;
} // namespace lsst::qserv::util

namespace lsst::qserv::xrdsvc {
class StreamBuffer;
} // namespace lsst::qserv::xrdsvc

namespace lsst::qserv::wbase {

/// The class is responsible for writing mysql result rows as Protobuf
Expand All @@ -72,7 +64,7 @@ namespace lsst::qserv::wbase {
///
/// When building messages for result rows, multiple tasks may add to the
/// the output file before it gets closed and a reply is transmitted to the czar.
/// All the tasks adding rows to the TransmitData object must be operating on
/// All the tasks adding rows to the file must be operating on
/// the same chunk. This only happens for near-neighbor queries, which
/// have one task per subchunk.
class FileChannelShared {
Expand Down Expand Up @@ -116,8 +108,8 @@ class FileChannelShared {
static nlohmann::json filesToJson(std::vector<QueryId> const& queryIds, unsigned int maxFiles);

/// The factory method for the channel class.
static Ptr create(std::shared_ptr<wbase::SendChannel> const& sendChannel,
std::shared_ptr<proto::TaskMsg> const& taskMsg);
static Ptr create(std::shared_ptr<wbase::SendChannel> const& sendChannel, qmeta::CzarId czarId,
std::string const& workerId = std::string());

FileChannelShared() = delete;
FileChannelShared(FileChannelShared const&) = delete;
Expand Down Expand Up @@ -153,103 +145,27 @@ class FileChannelShared {
/// @return a transmit data object indicating the errors in 'multiErr'.
bool buildAndTransmitError(util::MultiError& multiErr, std::shared_ptr<Task> const& task, bool cancelled);

/// Put the SQL results in a TransmitData object and transmit it to the czar
/// if appropriate.
/// Extract the SQL results and write them into the file and notify Czar after the last
/// row of the result result set depending on theis channel has been processed.
/// @return true if there was an error.
bool buildAndTransmitResult(MYSQL_RES* mResult, std::shared_ptr<Task> const& task,
util::MultiError& multiErr, std::atomic<bool>& cancelled);

/// Wrappers for wbase::SendChannel public functions that may need to be used
/// by threads.
/// @see wbase::SendChannel::send
bool send(char const* buf, int bufLen);

/// @see wbase::SendChannel::sendError
bool sendError(std::string const& msg, int code);

/// @see wbase::SendChannel::sendFile
bool sendFile(int fd, wbase::SendChannel::Size fSize);

/// @see wbase::SendChannel::sendStream
bool sendStream(std::shared_ptr<xrdsvc::StreamBuffer> const& sBuf, bool last);

/// @see wbase::SendChannel::kill
bool kill(std::string const& note);

/// @see wbase::SendChannel::isDead
bool isDead();

/// @return a log worthy string describing transmitData.
std::string dumpTransmit() const;

private:
/// Private constructor to protect shared pointer integrity.
FileChannelShared(std::shared_ptr<wbase::SendChannel> const& sendChannel,
std::shared_ptr<proto::TaskMsg> const& taskMsg);

std::shared_ptr<wbase::SendChannel> const sendChannel() const { return _sendChannel; }

/// Dumps transmitData into a string within the thread-safe context.
/// @param tMtxLock - Lock on mutex tMtx to be acquired before calling the method.
std::string dumpTransmit(std::lock_guard<std::mutex> const& tMtxLock) const;

/// @return a pointer to a new TransmitData> object.
/// @param tMtxLock - Lock on mutex tMtx to be acquired before calling the method.
std::shared_ptr<TransmitData> createTransmit(std::lock_guard<std::mutex> const& tMtxLock, Task& task);

/// Create a new transmitData object if needed.
/// @param tMtxLock - Lock on mutex tMtx to be acquired before calling the method.
void initTransmit(std::lock_guard<std::mutex> const& tMtxLock, Task& task);

/// Prepare the transmit data and then call addTransmit.
/// @param tMtxLock - Lock on mutex tMtx to be acquired before calling the method.
bool prepTransmit(std::lock_guard<std::mutex> const& tMtxLock, std::shared_ptr<Task> const& task,
bool cancelled, bool lastIn);

/// Try to transmit the data in tData.
/// If the queue already has at least 2 TransmitData objects, addTransmit
/// may wait before returning. Result rows are read from the
/// database until there are no more rows or the buffer is
/// sufficiently full. addTransmit waits until that buffer has been
/// sent to the czar before reading more rows. Without the wait,
/// the worker may read in too many result rows, run out of memory,
/// and crash.
/// @param tMtxLock - Lock on mutex tMtx to be acquired before calling the method.
/// @return true if transmit was added successfully.
/// @see _transmit code for further explanation.
bool addTransmit(std::lock_guard<std::mutex> const& tMtxLock, std::shared_ptr<Task> const& task,
bool cancelled, bool erred, bool lastIn, std::shared_ptr<TransmitData> const& tData,
int qId, int jId);
FileChannelShared(std::shared_ptr<wbase::SendChannel> const& sendChannel, qmeta::CzarId czarId,
std::string const& workerId);

/// @see wbase::SendChannel::kill
/// @param streamMutexLock - Lock on mutex _streamMutex to be acquired before calling the method.
bool _kill(std::lock_guard<std::mutex> const& streamMutexLock, std::string const& note);

/// Encode TransmitData items from _transmitQueue and pass them to XrdSsi
/// to be sent to the czar.
/// The header for the 'nextTransmit' item is appended to the result of
/// 'thisTransmit', with a specially constructed header appended for the
/// 'reallyLast' transmit.
/// The specially constructed header for the 'reallyLast' transmit just
/// says that there's no more data, this wbase::SendChannel is done.
/// @param tMtxLock - Lock on mutex tMtx to be acquired before calling the method.
/// @param queueMtxLock - Lock on mutex _queueMtx to be acquired before calling the method.
bool _transmit(std::lock_guard<std::mutex> const& tMtxLock,
std::lock_guard<std::mutex> const& queueMtxLock, bool erred,
std::shared_ptr<Task> const& task);

/// Send the buffer 'streamBuffer' using xrdssi.
/// 'last' should only be true if this is the last buffer to be sent with this _sendChannel.
/// 'note' is just a log note about what/who is sending the buffer.
/// @param tMtxLock - Lock on mutex tMtx to be acquired before calling the method.
/// @param queueMtxLock - Lock on mutex _queueMtx to be acquired before calling the method.
/// @param streamMutexLock - Lock on mutex _streamMutex to be acquired before calling the method.
/// @return true if the buffer was sent.
bool _sendBuf(std::lock_guard<std::mutex> const& tMtxLock,
std::lock_guard<std::mutex> const& queueMtxLock,
std::lock_guard<std::mutex> const& streamMutexLock,
std::shared_ptr<xrdsvc::StreamBuffer>& streamBuf, bool last, std::string const& note);

/**
* Transfer rows of the result set into into the output file.
* @note The file will be created at the first call to the method.
Expand All @@ -258,7 +174,7 @@ class FileChannelShared {
* implementation. Also, the iterative approach to the data extraction allows
* the driving code to be interrupted should the correponding query be cancelled
* during the lengthy data processing phase.
* @param tMtxLock - a lock on the base class's mutex tMtx
* @param tMtxLock - a lock on the mutex tMtx
* @param task - a task that produced the result set
* @param mResult - MySQL result to be used as a source
* @param bytes - the number of bytes in the result message recorded into the file
Expand All @@ -271,6 +187,18 @@ class FileChannelShared {
bool _writeToFile(std::lock_guard<std::mutex> const& tMtxLock, std::shared_ptr<Task> const& task,
MYSQL_RES* mResult, int& bytes, int& rows, util::MultiError& multiErr);

/**
* Extract as many rows as allowed by the Google Protobuf implementation from
* from the input result set into the output result object.
* @param tMtxLock - a lock on the mutex tMtx
* @param mResult - MySQL result to be used as a source
* @param result - the output result object to be filled in
* @param rows - the number of rows extracted from the result set
* @param tSize - the approximate amount of data extracted from the result set
* @return 'true' if there are more rows left in the result set.
*/
bool _fillRows(std::lock_guard<std::mutex> const& tMtxLock, MYSQL_RES* mResult, proto::Result& result,
int& rows, size_t& tSize);
/**
* Unconditionaly close and remove (potentially - the partially written) file.
* This method gets called in case of any failure detected while processing
Expand All @@ -279,29 +207,36 @@ class FileChannelShared {
* upon special requests made explicitly by Czar after uploading and consuming
* result sets. Unclaimed files that might be still remaining at the results
* folder would need to be garbage collected at the startup time of the worker.
* @param tMtxLock - a lock on the base class's mutex tMtx
* @param tMtxLock - a lock on the mutex tMtx
*/
void _removeFile(std::lock_guard<std::mutex> const& tMtxLock);

std::shared_ptr<TransmitData> transmitData; ///< TransmitData object
mutable std::mutex tMtx; ///< protects transmitData
/**
* Send the summary message to Czar upon complation or failure of a request.
* @param tMtxLock - a lock on the mutex tMtx
* @param task - a task that produced the result set
* @param cancelled - request cancellaton flag (if any)
* @param multiErr - a collector of any errors that were captured during result set processing
* @return 'true' if the operation was successfull
*/
bool _sendResponse(std::lock_guard<std::mutex> const& tMtxLock, std::shared_ptr<Task> const& task,
bool cancelled, util::MultiError const& multiErr);

mutable std::mutex _tMtx; ///< Protects data recording and Czar notification

std::shared_ptr<wbase::SendChannel> const _sendChannel; ///< Used to pass encoded information to XrdSsi.
std::string const _workerId; ///< The unique identifier of the worker.

/// streamMutex is used to protect _lastCount and messages that are sent
/// using FileChannelShared.
std::mutex _streamMutex;

std::queue<std::shared_ptr<TransmitData>> _transmitQueue; ///< Queue of data to be encoded and sent.
std::mutex _queueMtx; ///< protects _transmitQueue

/// metadata buffer. Once set, it cannot change until after Finish() has been called.
// Metadata and response buffers should persist for the lifetime of the stream.
std::string _metadataBuf;
std::string _responseBuf;

int _taskCount = 0; ///< The number of tasks to be sent over this wbase::SendChannel.
int _lastCount = 0; ///< The number of 'last' buffers received.
std::atomic<bool> _lastRecvd{false}; ///< The truly 'last' transmit message is in the queue.
std::atomic<bool> _firstTransmit{true}; ///< True until the first transmit has been sent.
int _taskCount = 0; ///< The number of tasks to be sent over this wbase::SendChannel.
int _lastCount = 0; ///< The number of 'last' buffers received.

qmeta::CzarId const _czarId; ///< id of the czar that requested this task(s).
uint64_t const _scsId; ///< id number for this FileChannelShared
Expand All @@ -314,8 +249,6 @@ class FileChannelShared {
/// true until getFirstChannelSqlConn() is called.
std::atomic<bool> _firstChannelSqlConn{true};

std::shared_ptr<util::InstanceCount> _icPtr; ///< temporary for LockupDB

/// The mutex is locked by the following static methods which require exclusive
/// access to the results folder: create(), cleanUpResultsOnCzarRestart(),
/// cleanUpResultsOnWorkerRestart(), and cleanUpResults().
Expand Down
37 changes: 0 additions & 37 deletions src/wbase/SendChannel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,6 @@ class NopChannel : public SendChannel {
cout << "NopChannel sendError(\"" << msg << "\", " << code << ");\n";
return true;
}
bool sendFile(int fd, Size fSize) override {
cout << "NopChannel sendFile(" << fd << ", " << fSize << ");\n";
return !isDead();
}
bool sendStream(xrdsvc::StreamBuffer::Ptr const& sBuf, bool last) override {
cout << "NopChannel sendStream(" << (void*)sBuf.get() << ", " << (last ? "true" : "false") << ");\n";
return !isDead();
Expand Down Expand Up @@ -99,30 +95,6 @@ class StringChannel : public SendChannel {
return true;
}

bool sendFile(int fd, Size fSize) override {
if (isDead()) return false;
vector<char> buf(fSize);
Size remain = fSize;
while (remain > 0) {
Size frag = ::read(fd, buf.data(), remain);
if (frag < 0) {
cout << "ERROR reading from fd during "
<< "StringChannel::sendFile("
<< "," << fSize << ")";
return false;
} else if (frag == 0) {
cout << "ERROR unexpected 0==read() during "
<< "StringChannel::sendFile("
<< "," << fSize << ")";
return false;
}
_dest.append(buf.data(), frag);
remain -= frag;
}
release();
return true;
}

bool sendStream(xrdsvc::StreamBuffer::Ptr const& sBuf, bool last) override {
if (isDead()) return false;
char const* buf = sBuf->data;
Expand Down Expand Up @@ -157,15 +129,6 @@ bool SendChannel::sendError(string const& msg, int code) {
return false;
}

bool SendChannel::sendFile(int fd, Size fSize) {
if (!isDead()) {
if (_ssiRequest->replyFile(fSize, fd)) return true;
}
kill("SendChannel::sendFile");
release();
return false;
}

bool SendChannel::kill(std::string const& note) {
bool oldVal = _dead.exchange(true);
if (!oldVal && !_destroying) {
Expand Down
5 changes: 1 addition & 4 deletions src/wbase/SendChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@ class SendChannel {
virtual bool send(char const* buf, int bufLen);
virtual bool sendError(std::string const& msg, int code);

/// Send the bytes from a POSIX file handle
virtual bool sendFile(int fd, Size fSize);

/// Send a bucket of bytes.
/// @param last true if no more sendStream calls will be invoked.
virtual bool sendStream(xrdsvc::StreamBuffer::Ptr const& sBuf, bool last);
Expand All @@ -69,7 +66,7 @@ class SendChannel {
/// ******************************************************************

/// Set a function to be called when a resources from a deferred send*
/// operation may be released. This allows a sendFile() caller to be
/// operation may be released. This allows a caller to be
/// notified when the file descriptor may be closed and perhaps reclaimed.
void setReleaseFunc(std::function<void(void)> const& r) { _release = r; }
void release() { _release(); }
Expand Down
Loading

0 comments on commit 0c72afc

Please sign in to comment.