Skip to content

Commit

Permalink
Removed unneccesary data member from the result merger
Browse files Browse the repository at this point in the history
The older implementation of the class MergingHandler had a data
member that had no pracical use.
  • Loading branch information
iagaponenko committed Jan 24, 2024
1 parent 3253f35 commit 0928919
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 15 deletions.
8 changes: 3 additions & 5 deletions src/ccontrol/MergingHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -388,15 +388,13 @@ MergingHandler::~MergingHandler() { LOGS(_log, LOG_LVL_DEBUG, __func__); }
bool MergingHandler::flush(proto::ResponseSummary const& responseSummary, int& resultRows) {
_wName = responseSummary.wname();

int const jobId = responseSummary.jobid();
_jobIds.insert(jobId);

// Why this is needed to ensure the job query would be staying alive for the duration
// of the operation?
auto const jobQuery = getJobQuery().lock();

LOGS(_log, LOG_LVL_TRACE,
"MergingHandler::" << __func__ << " transmitsize=" << responseSummary.transmitsize()
"MergingHandler::" << __func__ << " jobid=" << responseSummary.jobid()
<< " transmitsize=" << responseSummary.transmitsize()
<< " rowcount=" << responseSummary.rowcount() << " rowSize="
<< " attemptcount=" << responseSummary.attemptcount() << " errorcode="
<< responseSummary.errorcode() << " errormsg=" << responseSummary.errormsg());
Expand Down Expand Up @@ -437,7 +435,7 @@ bool MergingHandler::flush(proto::ResponseSummary const& responseSummary, int& r
throw util::Bug(ERR_LOC, err);
}
if (success) {
_infileMerger->mergeCompleteFor(_jobIds);
_infileMerger->mergeCompleteFor(responseSummary.jobid());
}
return success;
}
Expand Down
4 changes: 0 additions & 4 deletions src/ccontrol/MergingHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#include <atomic>
#include <memory>
#include <mutex>
#include <set>

// Qserv headers
#include "qdisp/ResponseHandler.h"
Expand Down Expand Up @@ -111,9 +110,6 @@ class MergingHandler : public qdisp::ResponseHandler {
mutable std::mutex _errorMutex; ///< Protect readers from partial updates
bool _flushed{false}; ///< flushed to InfileMerger?
std::string _wName{"~"}; ///< worker name
/// Set of jobIds added in this request. Using std::set to prevent duplicates when the same
/// jobId has multiple merge calls.
std::set<int> _jobIds;
};

} // namespace lsst::qserv::ccontrol
Expand Down
6 changes: 2 additions & 4 deletions src/rproc/InfileMerger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,9 @@ void InfileMerger::_setQueryIdStr(std::string const& qIdStr) {
_queryIdStrSet = true;
}

void InfileMerger::mergeCompleteFor(std::set<int> const& jobIds) {
void InfileMerger::mergeCompleteFor(int jobId) {
std::lock_guard<std::mutex> resultSzLock(_mtxResultSizeMtx);
for (int jobId : jobIds) {
_totalResultSize += _perJobResultSize[jobId];
}
_totalResultSize += _perJobResultSize[jobId];
}

bool InfileMerger::merge(proto::ResponseSummary const& responseSummary,
Expand Down
4 changes: 2 additions & 2 deletions src/rproc/InfileMerger.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ class InfileMerger {
/// @return true if merge was successfully imported.
bool merge(proto::ResponseSummary const& responseSummary, proto::ResponseData const& responseData);

/// Indicate the the merge for all of the jobs in jobIds is complete.
void mergeCompleteFor(std::set<int> const& jobIds);
/// Indicate the merge for the job is complete.
void mergeCompleteFor(int jobId);

/// @return error details if finalize() returns false
util::Error const& getError() const { return _error; }
Expand Down

0 comments on commit 0928919

Please sign in to comment.