diff --git a/src/ccontrol/MergingHandler.cc b/src/ccontrol/MergingHandler.cc index 86087d6b3..c6675f1e8 100644 --- a/src/ccontrol/MergingHandler.cc +++ b/src/ccontrol/MergingHandler.cc @@ -40,7 +40,6 @@ #include "ccontrol/msgCode.h" #include "global/clock_defs.h" #include "global/debugUtil.h" -#include "global/MsgReceiver.h" #include "http/Client.h" #include "http/Method.h" #include "proto/ProtoHeaderWrap.h" @@ -368,12 +367,8 @@ namespace lsst::qserv::ccontrol { //////////////////////////////////////////////////////////////////////// // MergingHandler public //////////////////////////////////////////////////////////////////////// -MergingHandler::MergingHandler(std::shared_ptr msgReceiver, - std::shared_ptr merger, std::string const& tableName) - : _msgReceiver{msgReceiver}, - _infileMerger{merger}, - _tableName{tableName}, - _response{new WorkerResponse()} { +MergingHandler::MergingHandler(std::shared_ptr merger, std::string const& tableName) + : _infileMerger{merger}, _tableName{tableName}, _response{new WorkerResponse()} { _initState(); } diff --git a/src/ccontrol/MergingHandler.h b/src/ccontrol/MergingHandler.h index 7f39cc326..ec94503ae 100644 --- a/src/ccontrol/MergingHandler.h +++ b/src/ccontrol/MergingHandler.h @@ -33,15 +33,13 @@ #include "qdisp/ResponseHandler.h" // Forward decl -namespace lsst::qserv { -class MsgReceiver; -namespace proto { +namespace lsst::qserv::proto { struct WorkerResponse; -} -namespace rproc { +} // namespace lsst::qserv::proto + +namespace lsst::qserv::rproc { class InfileMerger; -} -} // namespace lsst::qserv +} // namespace lsst::qserv::rproc namespace lsst::qserv::ccontrol { @@ -63,11 +61,9 @@ class MergingHandler : public qdisp::ResponseHandler { typedef std::shared_ptr Ptr; virtual ~MergingHandler(); - /// @param msgReceiver Message code receiver /// @param merger downstream merge acceptor /// @param tableName target table for incoming data - MergingHandler(std::shared_ptr msgReceiver, std::shared_ptr merger, - std::string const& tableName); + MergingHandler(std::shared_ptr merger, std::string const& tableName); /// Flush the retrieved buffer where bLen bytes were set. If last==true, /// then no more buffer() and flush() calls should occur. @@ -101,7 +97,6 @@ class MergingHandler : public qdisp::ResponseHandler { bool _setResult(BufPtr const& bufPtr, int blen); ///< Extract the result from the protobuffer. bool _noErrorsInResult(); ///< Check if the result message has no errors, report the ones (if any). - std::shared_ptr _msgReceiver; ///< Message code receiver std::shared_ptr _infileMerger; ///< Merging delegate std::string _tableName; ///< Target table name Error _error; ///< Error description diff --git a/src/ccontrol/UserQuerySelect.cc b/src/ccontrol/UserQuerySelect.cc index 2bd03ac14..586a3dc31 100644 --- a/src/ccontrol/UserQuerySelect.cc +++ b/src/ccontrol/UserQuerySelect.cc @@ -82,7 +82,6 @@ #include "ccontrol/UserQueryError.h" #include "global/constants.h" #include "global/LogContext.h" -#include "global/MsgReceiver.h" #include "proto/worker.pb.h" #include "proto/ProtoImporter.h" #include "qdisp/Executive.h" @@ -122,25 +121,6 @@ class ProtoPrinter { virtual ~ProtoPrinter() {} }; -/// Factory to create chunkid-specific MsgReceiver objs linked to the right -/// messagestore -class ChunkMsgReceiver : public MsgReceiver { -public: - virtual void operator()(int code, std::string const& msg) { - messageStore->addMessage(chunkId, "CHUNK", code, msg); - } - static std::shared_ptr newInstance(int chunkId, - std::shared_ptr ms) { - std::shared_ptr r = std::make_shared(); - r->chunkId = chunkId; - r->messageStore = ms; - return r; - } - - int chunkId; - std::shared_ptr messageStore; -}; - //////////////////////////////////////////////////////////////////////// // UserQuerySelect implementation namespace ccontrol { @@ -309,12 +289,11 @@ void UserQuerySelect::submit() { } std::string chunkResultName = ttn.make(cs->chunkId); - std::shared_ptr cmr = ChunkMsgReceiver::newInstance(cs->chunkId, _messageStore); ResourceUnit ru; ru.setAsDbChunk(cs->db, cs->chunkId); qdisp::JobDescription::Ptr jobDesc = qdisp::JobDescription::create( _qMetaCzarId, _executive->getId(), sequence, ru, - std::make_shared(cmr, _infileMerger, chunkResultName), taskMsgFactory, cs, + std::make_shared(_infileMerger, chunkResultName), taskMsgFactory, cs, chunkResultName); _executive->add(jobDesc); }; diff --git a/src/global/MsgReceiver.h b/src/global/MsgReceiver.h deleted file mode 100644 index 9a0a97198..000000000 --- a/src/global/MsgReceiver.h +++ /dev/null @@ -1,45 +0,0 @@ -// -*- LSST-C++ -*- -/* - * LSST Data Management System - * Copyright 2014 LSST Corporation. - * - * This product includes software developed by the - * LSST Project (http://www.lsst.org/). - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the LSST License Statement and - * the GNU General Public License along with this program. If not, - * see . - */ -#ifndef LSST_QSERV_MSGRECEIVER_H -#define LSST_QSERV_MSGRECEIVER_H -/** - * @author Daniel L. Wang, SLAC - */ - -// System headers -#include - -namespace lsst::qserv { - -/// MsgReceiver : a functor for receiving simple messages. Used to encapsulate -/// the most basic error reporting so that downstream objects can report errors -/// without directly depending on a logging or error management facility. -class MsgReceiver { -public: - virtual ~MsgReceiver() {} - virtual void operator()(int code, std::string const& msg) = 0; -}; - -} // namespace lsst::qserv - -#endif // LSST_QSERV_MSGRECEIVER_H diff --git a/src/qdisp/testQDisp.cc b/src/qdisp/testQDisp.cc index b37ce67bd..5e049e43c 100644 --- a/src/qdisp/testQDisp.cc +++ b/src/qdisp/testQDisp.cc @@ -38,7 +38,6 @@ // Qserv headers #include "ccontrol/MergingHandler.h" #include "global/ResourceUnit.h" -#include "global/MsgReceiver.h" #include "qdisp/Executive.h" #include "qdisp/JobQuery.h" #include "qdisp/MessageStore.h" @@ -60,19 +59,6 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.qdisp.testQDisp"); typedef util::Sequential SequentialInt; typedef std::vector RequesterVector; -class ChunkMsgReceiverMock : public MsgReceiver { -public: - virtual void operator()(int code, std::string const& msg) { - LOGS_DEBUG("Mock::operator() chunkId=" << _chunkId << ", code=" << code << ", msg=" << msg); - } - static std::shared_ptr newInstance(int chunkId) { - std::shared_ptr r = std::make_shared(); - r->_chunkId = chunkId; - return r; - } - int _chunkId; -}; - namespace lsst::qserv::qproc { // Normally, there's one TaskMsgFactory that all jobs in a user query share. @@ -131,9 +117,8 @@ std::shared_ptr executiveTest(qdisp::Executive::Ptr const& ex, ResourceUnit ru; std::string chunkResultName = "mock"; std::shared_ptr infileMerger; - std::shared_ptr cmr = ChunkMsgReceiverMock::newInstance(chunkId); ccontrol::MergingHandler::Ptr mh = - std::make_shared(cmr, infileMerger, chunkResultName); + std::make_shared(infileMerger, chunkResultName); RequesterVector rv; for (int j = 0; j < copies; ++j) { rv.push_back(mh);