Skip to content

Commit

Permalink
Eliminated unused class MsgReceiver
Browse files Browse the repository at this point in the history
Cleaned up MergingHandler and its depenancies of any mentioning
of the eliminated class.
  • Loading branch information
iagaponenko committed Jan 10, 2024
1 parent 0c72afc commit b1ad8be
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 101 deletions.
9 changes: 2 additions & 7 deletions src/ccontrol/MergingHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
#include "ccontrol/msgCode.h"
#include "global/clock_defs.h"
#include "global/debugUtil.h"
#include "global/MsgReceiver.h"
#include "http/Client.h"
#include "http/Method.h"
#include "proto/ProtoHeaderWrap.h"
Expand Down Expand Up @@ -368,12 +367,8 @@ namespace lsst::qserv::ccontrol {
////////////////////////////////////////////////////////////////////////
// MergingHandler public
////////////////////////////////////////////////////////////////////////
MergingHandler::MergingHandler(std::shared_ptr<MsgReceiver> msgReceiver,
std::shared_ptr<rproc::InfileMerger> merger, std::string const& tableName)
: _msgReceiver{msgReceiver},
_infileMerger{merger},
_tableName{tableName},
_response{new WorkerResponse()} {
MergingHandler::MergingHandler(std::shared_ptr<rproc::InfileMerger> merger, std::string const& tableName)
: _infileMerger{merger}, _tableName{tableName}, _response{new WorkerResponse()} {
_initState();
}

Expand Down
17 changes: 6 additions & 11 deletions src/ccontrol/MergingHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,13 @@
#include "qdisp/ResponseHandler.h"

// Forward decl
namespace lsst::qserv {
class MsgReceiver;
namespace proto {
namespace lsst::qserv::proto {
struct WorkerResponse;
}
namespace rproc {
} // namespace lsst::qserv::proto

namespace lsst::qserv::rproc {
class InfileMerger;
}
} // namespace lsst::qserv
} // namespace lsst::qserv::rproc

namespace lsst::qserv::ccontrol {

Expand All @@ -63,11 +61,9 @@ class MergingHandler : public qdisp::ResponseHandler {
typedef std::shared_ptr<MergingHandler> Ptr;
virtual ~MergingHandler();

/// @param msgReceiver Message code receiver
/// @param merger downstream merge acceptor
/// @param tableName target table for incoming data
MergingHandler(std::shared_ptr<MsgReceiver> msgReceiver, std::shared_ptr<rproc::InfileMerger> merger,
std::string const& tableName);
MergingHandler(std::shared_ptr<rproc::InfileMerger> merger, std::string const& tableName);

/// Flush the retrieved buffer where bLen bytes were set. If last==true,
/// then no more buffer() and flush() calls should occur.
Expand Down Expand Up @@ -101,7 +97,6 @@ class MergingHandler : public qdisp::ResponseHandler {
bool _setResult(BufPtr const& bufPtr, int blen); ///< Extract the result from the protobuffer.
bool _noErrorsInResult(); ///< Check if the result message has no errors, report the ones (if any).

std::shared_ptr<MsgReceiver> _msgReceiver; ///< Message code receiver
std::shared_ptr<rproc::InfileMerger> _infileMerger; ///< Merging delegate
std::string _tableName; ///< Target table name
Error _error; ///< Error description
Expand Down
23 changes: 1 addition & 22 deletions src/ccontrol/UserQuerySelect.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@
#include "ccontrol/UserQueryError.h"
#include "global/constants.h"
#include "global/LogContext.h"
#include "global/MsgReceiver.h"
#include "proto/worker.pb.h"
#include "proto/ProtoImporter.h"
#include "qdisp/Executive.h"
Expand Down Expand Up @@ -122,25 +121,6 @@ class ProtoPrinter {
virtual ~ProtoPrinter() {}
};

/// Factory to create chunkid-specific MsgReceiver objs linked to the right
/// messagestore
class ChunkMsgReceiver : public MsgReceiver {
public:
virtual void operator()(int code, std::string const& msg) {
messageStore->addMessage(chunkId, "CHUNK", code, msg);
}
static std::shared_ptr<ChunkMsgReceiver> newInstance(int chunkId,
std::shared_ptr<qdisp::MessageStore> ms) {
std::shared_ptr<ChunkMsgReceiver> r = std::make_shared<ChunkMsgReceiver>();
r->chunkId = chunkId;
r->messageStore = ms;
return r;
}

int chunkId;
std::shared_ptr<qdisp::MessageStore> messageStore;
};

////////////////////////////////////////////////////////////////////////
// UserQuerySelect implementation
namespace ccontrol {
Expand Down Expand Up @@ -309,12 +289,11 @@ void UserQuerySelect::submit() {
}
std::string chunkResultName = ttn.make(cs->chunkId);

std::shared_ptr<ChunkMsgReceiver> cmr = ChunkMsgReceiver::newInstance(cs->chunkId, _messageStore);
ResourceUnit ru;
ru.setAsDbChunk(cs->db, cs->chunkId);
qdisp::JobDescription::Ptr jobDesc = qdisp::JobDescription::create(
_qMetaCzarId, _executive->getId(), sequence, ru,
std::make_shared<MergingHandler>(cmr, _infileMerger, chunkResultName), taskMsgFactory, cs,
std::make_shared<MergingHandler>(_infileMerger, chunkResultName), taskMsgFactory, cs,
chunkResultName);
_executive->add(jobDesc);
};
Expand Down
45 changes: 0 additions & 45 deletions src/global/MsgReceiver.h

This file was deleted.

17 changes: 1 addition & 16 deletions src/qdisp/testQDisp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
// Qserv headers
#include "ccontrol/MergingHandler.h"
#include "global/ResourceUnit.h"
#include "global/MsgReceiver.h"
#include "qdisp/Executive.h"
#include "qdisp/JobQuery.h"
#include "qdisp/MessageStore.h"
Expand All @@ -60,19 +59,6 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.qdisp.testQDisp");
typedef util::Sequential<int> SequentialInt;
typedef std::vector<qdisp::ResponseHandler::Ptr> RequesterVector;

class ChunkMsgReceiverMock : public MsgReceiver {
public:
virtual void operator()(int code, std::string const& msg) {
LOGS_DEBUG("Mock::operator() chunkId=" << _chunkId << ", code=" << code << ", msg=" << msg);
}
static std::shared_ptr<ChunkMsgReceiverMock> newInstance(int chunkId) {
std::shared_ptr<ChunkMsgReceiverMock> r = std::make_shared<ChunkMsgReceiverMock>();
r->_chunkId = chunkId;
return r;
}
int _chunkId;
};

namespace lsst::qserv::qproc {

// Normally, there's one TaskMsgFactory that all jobs in a user query share.
Expand Down Expand Up @@ -131,9 +117,8 @@ std::shared_ptr<qdisp::JobQuery> executiveTest(qdisp::Executive::Ptr const& ex,
ResourceUnit ru;
std::string chunkResultName = "mock";
std::shared_ptr<rproc::InfileMerger> infileMerger;
std::shared_ptr<ChunkMsgReceiverMock> cmr = ChunkMsgReceiverMock::newInstance(chunkId);
ccontrol::MergingHandler::Ptr mh =
std::make_shared<ccontrol::MergingHandler>(cmr, infileMerger, chunkResultName);
std::make_shared<ccontrol::MergingHandler>(infileMerger, chunkResultName);
RequesterVector rv;
for (int j = 0; j < copies; ++j) {
rv.push_back(mh);
Expand Down

0 comments on commit b1ad8be

Please sign in to comment.