From 1979259617ad311a52d16b21f388f8d2f347c2c8 Mon Sep 17 00:00:00 2001 From: John Gates Date: Thu, 6 Feb 2025 11:28:25 -0800 Subject: [PATCH] Cleanup --- src/wbase/FileChannelShared.cc | 2 +- src/wbase/Task.cc | 10 +++++-- src/wbase/UberJobData.cc | 9 ++++--- src/wbase/UberJobData.h | 3 ++- src/wdb/QueryRunner.cc | 9 ------- src/wsched/BlendScheduler.cc | 43 ++++++++++++------------------ src/xrdsvc/HttpWorkerCzarModule.cc | 39 +++++++++++++++------------ src/xrdsvc/HttpWorkerCzarModule.h | 3 ++- 8 files changed, 57 insertions(+), 61 deletions(-) diff --git a/src/wbase/FileChannelShared.cc b/src/wbase/FileChannelShared.cc index 599691b2a..fffa36af6 100644 --- a/src/wbase/FileChannelShared.cc +++ b/src/wbase/FileChannelShared.cc @@ -369,7 +369,7 @@ bool FileChannelShared::buildAndTransmitError(util::MultiError& multiErr, shared } // Delete the result file as nobody will come looking for it. _kill(tMtxLock, " buildAndTransmitError"); - return _uberJobData->responseError(multiErr, task, cancelled); + return _uberJobData->responseError(multiErr, task->getChunkId(), cancelled); } bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr const& task, diff --git a/src/wbase/Task.cc b/src/wbase/Task.cc index ba3973893..5c2629e37 100644 --- a/src/wbase/Task.cc +++ b/src/wbase/Task.cc @@ -182,9 +182,7 @@ std::vector Task::createTasksFromUberJobMsg( UberJobId ujId = ujData->getUberJobId(); CzarIdType czId = ujData->getCzarId(); - auto startBuildTasks = CLOCK::now(); vector vect; // List of created tasks to be returned. - wpublish::QueryStatistics::Ptr queryStats = queriesAndChunks->addQueryId(qId, czId); UserQueryInfo::Ptr userQueryInfo = queryStats->getUserQueryInfo(); @@ -387,16 +385,24 @@ void Task::action(util::CmdData* data) { // Get a local copy for safety. 
auto qr = _taskQueryRunner; bool success = false; + string errStr; try { success = qr->runQuery(); } catch (UnsupportedError const& e) { LOGS(_log, LOG_LVL_ERROR, __func__ << " runQuery threw UnsupportedError " << e.what() << tIdStr); + errStr = e.what(); } if (not success) { LOGS(_log, LOG_LVL_ERROR, "runQuery failed " << tIdStr); if (not getSendChannel()->kill("Foreman::_setRunFunc")) { LOGS(_log, LOG_LVL_WARN, "runQuery sendChannel already killed " << tIdStr); } + // Send a message back saying this UberJobFailed, redundant error messages should be + // harmless. + util::MultiError multiErr; + util::Error err(_chunkId, string("UberJob run error ") + errStr); + multiErr.push_back(err); + _ujData->responseError(multiErr, -1, false); } // The QueryRunner class access to sendChannel for results is over by this point. diff --git a/src/wbase/UberJobData.cc b/src/wbase/UberJobData.cc index 7a59f4dcc..49656928a 100644 --- a/src/wbase/UberJobData.cc +++ b/src/wbase/UberJobData.cc @@ -118,8 +118,10 @@ void UberJobData::responseFileReady(string const& httpFileUrl, uint64_t rowCount _queueUJResponse(method, headers, url, requestContext, requestStr); } -bool UberJobData::responseError(util::MultiError& multiErr, std::shared_ptr const& task, - bool cancelled) { +bool UberJobData::responseError(util::MultiError& multiErr, int chunkId, bool cancelled) { + // TODO:UJ Maybe register this UberJob as failed with a czar notification method + // so that a secondary means can be used to make certain the czar hears about + // the error. 
LOGS(_log, LOG_LVL_INFO, cName(__func__)); string errorMsg; int errorCode = 0; @@ -131,8 +133,7 @@ bool UberJobData::responseError(util::MultiError& multiErr, std::shared_ptrgetChunkId()) + - ": " + errorMsg; + errorMsg = cName(__func__) + " error(s) in result for chunk #" + to_string(chunkId) + ": " + errorMsg; LOGS(_log, LOG_LVL_ERROR, errorMsg); } diff --git a/src/wbase/UberJobData.h b/src/wbase/UberJobData.h index 0ab60410d..2f024ee25 100644 --- a/src/wbase/UberJobData.h +++ b/src/wbase/UberJobData.h @@ -96,7 +96,8 @@ class UberJobData : public std::enable_shared_from_this { uint64_t headerCount); // TODO:UJ remove headerCount /// Let the Czar know there's been a problem. - bool responseError(util::MultiError& multiErr, std::shared_ptr const& task, bool cancelled); + //&&&bool responseError(util::MultiError& multiErr, std::shared_ptr const& task, bool cancelled); + bool responseError(util::MultiError& multiErr, int chunkId, bool cancelled); std::string const& getIdStr() const { return _idStr; } std::string cName(std::string const& funcName) { return "UberJobData::" + funcName + " " + getIdStr(); } diff --git a/src/wdb/QueryRunner.cc b/src/wdb/QueryRunner.cc index a58f31c1b..a0f9277cb 100644 --- a/src/wdb/QueryRunner.cc +++ b/src/wdb/QueryRunner.cc @@ -132,7 +132,6 @@ void QueryRunner::_setDb() { } util::TimerHistogram memWaitHisto("initConnection Hist", {1, 5, 10, 20, 40}); //&&& -std::atomic memWaitLimiter = 0; bool QueryRunner::runQuery() { util::HoldTrack::Mark runQueryMarkA(ERR_LOC, "runQuery " + to_string(_task->getQueryId())); @@ -180,15 +179,7 @@ bool QueryRunner::runQuery() { bool interactive = _task->getScanInteractive() && !(_task->getSendChannel()->getTaskCount() > 1); wcontrol::SqlConnLock sqlConnLock(*_sqlConnMgr, not interactive, _task->getSendChannel()); - util::Timer memTimer; - memTimer.start(); bool connOk = _initConnection(); - memTimer.stop(); - memWaitHisto.addTime(memTimer.getElapsed()); - if (memWaitLimiter++ % 100 == 0) { - LOGS(_log, 
LOG_LVL_INFO, "&&& initConnection " << memWaitHisto.getString()); - } - if (!connOk) { // Since there's an error, this will be the last transmit from this QueryRunner. if (!_task->getSendChannel()->buildAndTransmitError(_multiError, _task, _cancelled)) { diff --git a/src/wsched/BlendScheduler.cc b/src/wsched/BlendScheduler.cc index 721c8005f..b0dd45f1c 100644 --- a/src/wsched/BlendScheduler.cc +++ b/src/wsched/BlendScheduler.cc @@ -167,12 +167,11 @@ void BlendScheduler::queCmd(std::vector const& cmds) { throw util::Bug(ERR_LOC, "BlendScheduler::queCmd cmds.size() > 1 when no task was set."); } { - //&&&util::LockGuardTimed guardA(util::CommandQueue::_mx, "BlendScheduler::queCmd a"); lock_guard guardA(util::CommandQueue::_mx); _ctrlCmdQueue.queCmd(cmd); } - notify(false); //&&& notify(true); // notify all=true + notify(true); // notify all=true continue; } @@ -180,7 +179,6 @@ void BlendScheduler::queCmd(std::vector const& cmds) { QSERV_LOGCONTEXT_QUERY_JOB(task->getQueryId(), task->getJobId()); } - //&&&util::LockGuardTimed guardB(util::CommandQueue::_mx, "BlendScheduler::queCmd b"); lock_guard guardB(util::CommandQueue::_mx); // Check for scan tables. The information for all tasks should be the same // as they all belong to the same query, so only examine the first task. 
@@ -250,7 +248,7 @@ void BlendScheduler::queCmd(std::vector const& cmds) { queryStats->tasksAddedToScheduler(targSched, taskCmds.size()); } _infoChanged = true; - notify(false); //&&&notify(true); // notify all=true + notify(true); // notify all=true } } @@ -294,7 +292,7 @@ void BlendScheduler::commandFinish(util::Command::Ptr const& cmd) { if (LOG_CHECK_LVL(_log, LOG_LVL_TRACE)) { _logChunkStatus(); } - notify(false); //&&&notify(true); // notify all=true + notify(false); // notify one } bool BlendScheduler::ready() { @@ -359,32 +357,24 @@ bool BlendScheduler::_ready() { atomic logChunkLimiter = 0; util::Command::Ptr BlendScheduler::getCmd(bool wait) { - util::Timer timeToLock; - util::Timer timeHeld; + //&&&util::Timer timeToLock; + //&&&util::Timer timeHeld; util::Command::Ptr cmd; - double totalTimeHeld = 0.0; + //&&&double totalTimeHeld = 0.0; bool ready = false; { - timeToLock.start(); + //&&&timeToLock.start(); unique_lock lock(util::CommandQueue::_mx); - timeToLock.stop(); + //&&&timeToLock.stop(); if (wait) { util::CommandQueue::_cv.wait(lock, [this]() { return _ready(); }); - /* &&& - while (!_ready()) { - timeHeld.stop(); - totalTimeHeld += timeHeld.getElapsed(); - util::CommandQueue::_cv.wait(lock); - timeHeld.start(); - } - */ ready = true; } else { ready = _ready(); } - timeHeld.start(); + //&&&timeHeld.start(); if (ready && _taskLoadQueue.size() > 0) { cmd = _taskLoadQueue.front(); @@ -411,12 +401,13 @@ util::Command::Ptr BlendScheduler::getCmd(bool wait) { _readySched.reset(); } } + if (cmd == nullptr) { // The scheduler didn't have anything, see if there's anything on the control queue, // which could change the size of the pool. cmd = _ctrlCmdQueue.getCmd(); } - //&&& } + if (cmd != nullptr) { _infoChanged = true; if (LOG_CHECK_LVL(_log, LOG_LVL_TRACE) || (logChunkLimiter++ % 100 == 0)) { @@ -425,12 +416,12 @@ util::Command::Ptr BlendScheduler::getCmd(bool wait) { notify(false); // notify all=false } // returning nullptr is acceptable. 
- timeHeld.stop(); - totalTimeHeld += timeHeld.getElapsed(); - LOGS(_log, LOG_LVL_TRACE, - "lockTime BlendScheduler::getCmd ready toLock=" << timeToLock.getElapsed() - << " held=" << timeHeld.getElapsed() - << " totalHeld=" << totalTimeHeld); + //&&&timeHeld.stop(); + //&&&totalTimeHeld += timeHeld.getElapsed(); + //&&&LOGS(_log, LOG_LVL_TRACE, + //&&& "lockTime BlendScheduler::getCmd ready toLock=" << timeToLock.getElapsed() + //&&& << " held=" << timeHeld.getElapsed() + //&&& << " totalHeld=" << totalTimeHeld); return cmd; } diff --git a/src/xrdsvc/HttpWorkerCzarModule.cc b/src/xrdsvc/HttpWorkerCzarModule.cc index 12b4b0ead..0115bcf8e 100644 --- a/src/xrdsvc/HttpWorkerCzarModule.cc +++ b/src/xrdsvc/HttpWorkerCzarModule.cc @@ -41,6 +41,8 @@ #include "protojson/WorkerQueryStatusData.h" #include "qmeta/types.h" #include "util/Command.h" +#include "util/Error.h" +#include "util/MultiError.h" #include "util/String.h" #include "util/Timer.h" #include "wbase/FileChannelShared.h" @@ -127,10 +129,6 @@ json HttpWorkerCzarModule::_handleQueryJob(string const& func) { // Get or create QueryStatistics and UserQueryInfo instances. auto queryStats = foreman()->getQueriesAndChunks()->addQueryId(ujQueryId, ujCzInfo->czId); auto userQueryInfo = queryStats->getUserQueryInfo(); - LOGS(_log, LOG_LVL_DEBUG, - uberJobMsg->getIdStr() << " &&& added to stats" - << " &&& bytesWritten added to stats maxTableSizeMb=" << maxTableSizeMb - << " maxTableSizeBytes=" << maxTableSizeBytes); if (userQueryInfo->getCancelledByCzar()) { throw wbase::TaskException( @@ -143,10 +141,18 @@ json HttpWorkerCzarModule::_handleQueryJob(string const& func) { std::shared_ptr foremanPtr = foreman(); std::string authKeyStr = authKey(); + + // It is important to create UberJobData at this point as it will be the only way to + // inform the czar of errors after this function returns. 
+ auto ujData = + wbase::UberJobData::create(ujId, ujCzInfo->czName, ujCzInfo->czId, ujCzInfo->czHostName, + ujCzInfo->czPort, ujQueryId, ujRowLimit, maxTableSizeBytes, + targetWorkerId, foremanPtr, authKeyStr, foremanPtr->httpPort()); + auto lFunc = [ujId, ujQueryId, ujCzInfo, ujRowLimit, maxTableSizeBytes, targetWorkerId, userQueryInfo, - uberJobMsg, foremanPtr, authKeyStr](util::CmdData*) { + uberJobMsg, foremanPtr, authKeyStr, ujData](util::CmdData*) { _buildTasks(ujId, ujQueryId, ujCzInfo, ujRowLimit, maxTableSizeBytes, targetWorkerId, - userQueryInfo, uberJobMsg, foremanPtr, authKeyStr); + userQueryInfo, uberJobMsg, foremanPtr, authKeyStr, ujData); }; util::Command::Ptr taskLoadCmd = std::make_shared(lFunc); @@ -154,8 +160,7 @@ json HttpWorkerCzarModule::_handleQueryJob(string const& func) { string note = string("qId=") + to_string(ujQueryId) + " ujId=" + to_string(ujId); jsRet = {{"success", 1}, {"errortype", "none"}, {"note", note}}; - LOGS(_log, LOG_LVL_INFO, "&&& jsRet=" << jsRet); - + LOGS(_log, LOG_LVL_TRACE, "_handleQueryJob jsRet=" << jsRet); } catch (wbase::TaskException const& texp) { LOGS(_log, LOG_LVL_ERROR, "HttpWorkerCzarModule::_handleQueryJob wbase::TaskException received " << texp.what()); @@ -170,16 +175,12 @@ void HttpWorkerCzarModule::_buildTasks(UberJobId ujId, QueryId ujQueryId, std::shared_ptr const& userQueryInfo, protojson::UberJobMsg::Ptr const& uberJobMsg, shared_ptr const& foremanPtr, - string const& authKeyStr) { + string const& authKeyStr, wbase::UberJobData::Ptr const& ujData) { try { - LOGS(_log, LOG_LVL_DEBUG, __func__ << " &&& qid=" << ujQueryId << "ujId=" << ujId); - util::Timer timerParse; // &&& + LOGS(_log, LOG_LVL_TRACE, __func__ << " qid=" << ujQueryId << "ujId=" << ujId); + util::Timer timerParse; timerParse.start(); auto czarId = ujCzInfo->czId; - auto ujData = - wbase::UberJobData::create(ujId, ujCzInfo->czName, ujCzInfo->czId, ujCzInfo->czHostName, - ujCzInfo->czPort, ujQueryId, ujRowLimit, maxTableSizeBytes, - 
targetWorkerId, foremanPtr, authKeyStr, foremanPtr->httpPort()); // Find the entry for this queryId, create a new one if needed. userQueryInfo->addUberJob(ujData); @@ -190,7 +191,7 @@ void HttpWorkerCzarModule::_buildTasks(UberJobId ujId, QueryId ujQueryId, auto ujTasks = wbase::Task::createTasksFromUberJobMsg( uberJobMsg, ujData, channelShared, foremanPtr->chunkResourceMgr(), foremanPtr->mySqlConfig(), - foremanPtr->sqlConnMgr(), foremanPtr->queriesAndChunks()); //&&&, foremanPtr->httpPort()); + foremanPtr->sqlConnMgr(), foremanPtr->queriesAndChunks()); channelShared->setTaskCount(ujTasks.size()); ujData->addTasks(ujTasks); @@ -211,7 +212,11 @@ void HttpWorkerCzarModule::_buildTasks(UberJobId ujId, QueryId ujQueryId, } catch (wbase::TaskException const& texp) { LOGS(_log, LOG_LVL_ERROR, "HttpWorkerCzarModule::_buildTasks wbase::TaskException received " << texp.what()); - // &&& send a message back saying this UberJobFailed + // Send a message back saying this UberJobFailed + util::MultiError multiErr; + util::Error err(-1, string("UberJob parse error ") + texp.what()); + multiErr.push_back(err); + ujData->responseError(multiErr, -1, false); } } diff --git a/src/xrdsvc/HttpWorkerCzarModule.h b/src/xrdsvc/HttpWorkerCzarModule.h index 185aeb4f0..00a6d4fd6 100644 --- a/src/xrdsvc/HttpWorkerCzarModule.h +++ b/src/xrdsvc/HttpWorkerCzarModule.h @@ -45,6 +45,7 @@ class Response; } // namespace lsst::qserv::qhttp namespace lsst::qserv::wbase { +class UberJobData; class UserQueryInfo; } // namespace lsst::qserv::wbase @@ -97,7 +98,7 @@ class HttpWorkerCzarModule : public xrdsvc::HttpModule { std::shared_ptr const& userQueryInfo, std::shared_ptr const& uberJobMsg, std::shared_ptr const& foremanPtr, - std::string const& authKeyStr); + std::string const& authKeyStr, std::shared_ptr const& ujData); /// Verify some aspects of the query and call _handleQueryStatus nlohmann::json _queryStatus();