Cleanup
jgates108 committed Feb 6, 2025
1 parent 5209ddb commit 1979259
Showing 8 changed files with 57 additions and 61 deletions.
2 changes: 1 addition & 1 deletion src/wbase/FileChannelShared.cc
@@ -369,7 +369,7 @@ bool FileChannelShared::buildAndTransmitError(util::MultiError& multiErr, shared
}
// Delete the result file as nobody will come looking for it.
_kill(tMtxLock, " buildAndTransmitError");
return _uberJobData->responseError(multiErr, task, cancelled);
return _uberJobData->responseError(multiErr, task->getChunkId(), cancelled);
}

bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr<Task> const& task,
10 changes: 8 additions & 2 deletions src/wbase/Task.cc
@@ -182,9 +182,7 @@ std::vector<Task::Ptr> Task::createTasksFromUberJobMsg(
UberJobId ujId = ujData->getUberJobId();
CzarIdType czId = ujData->getCzarId();

auto startBuildTasks = CLOCK::now();
vector<Task::Ptr> vect; // List of created tasks to be returned.

wpublish::QueryStatistics::Ptr queryStats = queriesAndChunks->addQueryId(qId, czId);
UserQueryInfo::Ptr userQueryInfo = queryStats->getUserQueryInfo();

@@ -387,16 +385,24 @@ void Task::action(util::CmdData* data) {
// Get a local copy for safety.
auto qr = _taskQueryRunner;
bool success = false;
string errStr;
try {
success = qr->runQuery();
} catch (UnsupportedError const& e) {
LOGS(_log, LOG_LVL_ERROR, __func__ << " runQuery threw UnsupportedError " << e.what() << tIdStr);
errStr = e.what();
}
if (not success) {
LOGS(_log, LOG_LVL_ERROR, "runQuery failed " << tIdStr);
if (not getSendChannel()->kill("Foreman::_setRunFunc")) {
LOGS(_log, LOG_LVL_WARN, "runQuery sendChannel already killed " << tIdStr);
}
// Send a message back saying this UberJobFailed, redundant error messages should be
// harmless.
util::MultiError multiErr;
util::Error err(_chunkId, string("UberJob run error ") + errStr);
multiErr.push_back(err);
_ujData->responseError(multiErr, -1, false);
}

// The QueryRunner class access to sendChannel for results is over by this point.
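
The Task.cc hunk above changes what happens when runQuery() fails: besides killing the send channel, the worker now reports the failure back to the czar through UberJobData::responseError(). A condensed restatement of that new failure path, pulled from the lines in this hunk (a sketch of the flow, not a drop-in replacement for Task::action(); -1 is the chunk id used when the whole UberJob failed rather than a single chunk):

    // Condensed from the Task::action() hunk above.
    std::string errStr;
    bool success = false;
    try {
        success = qr->runQuery();
    } catch (UnsupportedError const& e) {
        errStr = e.what();  // keep the message so the czar sees why the UberJob failed
    }
    if (not success) {
        getSendChannel()->kill("Foreman::_setRunFunc");  // no results will be sent
        util::MultiError multiErr;
        util::Error err(_chunkId, std::string("UberJob run error ") + errStr);
        multiErr.push_back(err);
        _ujData->responseError(multiErr, -1, false);  // chunkId=-1, cancelled=false
    }
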
9 changes: 5 additions & 4 deletions src/wbase/UberJobData.cc
@@ -118,8 +118,10 @@ void UberJobData::responseFileReady(string const& httpFileUrl, uint64_t rowCount
_queueUJResponse(method, headers, url, requestContext, requestStr);
}

bool UberJobData::responseError(util::MultiError& multiErr, std::shared_ptr<Task> const& task,
bool cancelled) {
bool UberJobData::responseError(util::MultiError& multiErr, int chunkId, bool cancelled) {
// TODO:UJ Maybe register this UberJob as failed with a czar notification method
// so that a secondary means can be used to make certain the czar hears about
// the error.
LOGS(_log, LOG_LVL_INFO, cName(__func__));
string errorMsg;
int errorCode = 0;
@@ -131,8 +133,7 @@ bool UberJobData::responseError(util::MultiError& multiErr, std::shared_ptr<Task
errorCode = -1;
}
if (!errorMsg.empty() or (errorCode != 0)) {
errorMsg = cName(__func__) + " error(s) in result for chunk #" + to_string(task->getChunkId()) +
": " + errorMsg;
errorMsg = cName(__func__) + " error(s) in result for chunk #" + to_string(chunkId) + ": " + errorMsg;
LOGS(_log, LOG_LVL_ERROR, errorMsg);
}

3 changes: 2 additions & 1 deletion src/wbase/UberJobData.h
@@ -96,7 +96,8 @@ class UberJobData : public std::enable_shared_from_this<UberJobData> {
uint64_t headerCount); // TODO:UJ remove headerCount

/// Let the Czar know there's been a problem.
bool responseError(util::MultiError& multiErr, std::shared_ptr<Task> const& task, bool cancelled);
//&&&bool responseError(util::MultiError& multiErr, std::shared_ptr<Task> const& task, bool cancelled);
bool responseError(util::MultiError& multiErr, int chunkId, bool cancelled);

std::string const& getIdStr() const { return _idStr; }
std::string cName(std::string const& funcName) { return "UberJobData::" + funcName + " " + getIdStr(); }
9 changes: 0 additions & 9 deletions src/wdb/QueryRunner.cc
@@ -132,7 +132,6 @@ void QueryRunner::_setDb() {
}

util::TimerHistogram memWaitHisto("initConnection Hist", {1, 5, 10, 20, 40}); //&&&
std::atomic<uint32_t> memWaitLimiter = 0;

bool QueryRunner::runQuery() {
util::HoldTrack::Mark runQueryMarkA(ERR_LOC, "runQuery " + to_string(_task->getQueryId()));
@@ -180,15 +179,7 @@ bool QueryRunner::runQuery() {
bool interactive = _task->getScanInteractive() && !(_task->getSendChannel()->getTaskCount() > 1);
wcontrol::SqlConnLock sqlConnLock(*_sqlConnMgr, not interactive, _task->getSendChannel());

util::Timer memTimer;
memTimer.start();
bool connOk = _initConnection();
memTimer.stop();
memWaitHisto.addTime(memTimer.getElapsed());
if (memWaitLimiter++ % 100 == 0) {
LOGS(_log, LOG_LVL_INFO, "&&& initConnection " << memWaitHisto.getString());
}

if (!connOk) {
// Since there's an error, this will be the last transmit from this QueryRunner.
if (!_task->getSendChannel()->buildAndTransmitError(_multiError, _task, _cancelled)) {
43 changes: 17 additions & 26 deletions src/wsched/BlendScheduler.cc
@@ -167,20 +167,18 @@ void BlendScheduler::queCmd(std::vector<util::Command::Ptr> const& cmds) {
throw util::Bug(ERR_LOC, "BlendScheduler::queCmd cmds.size() > 1 when no task was set.");
}
{
//&&&util::LockGuardTimed guardA(util::CommandQueue::_mx, "BlendScheduler::queCmd a");
lock_guard guardA(util::CommandQueue::_mx);
_ctrlCmdQueue.queCmd(cmd);
}

notify(false); //&&& notify(true); // notify all=true
notify(true); // notify all=true
continue;
}

if (first) {
QSERV_LOGCONTEXT_QUERY_JOB(task->getQueryId(), task->getJobId());
}

//&&&util::LockGuardTimed guardB(util::CommandQueue::_mx, "BlendScheduler::queCmd b");
lock_guard guardB(util::CommandQueue::_mx);
// Check for scan tables. The information for all tasks should be the same
// as they all belong to the same query, so only examine the first task.
@@ -250,7 +248,7 @@ void BlendScheduler::queCmd(std::vector<util::Command::Ptr> const& cmds) {
queryStats->tasksAddedToScheduler(targSched, taskCmds.size());
}
_infoChanged = true;
notify(false); //&&&notify(true); // notify all=true
notify(true); // notify all=true
}
}

@@ -294,7 +292,7 @@ void BlendScheduler::commandFinish(util::Command::Ptr const& cmd) {
if (LOG_CHECK_LVL(_log, LOG_LVL_TRACE)) {
_logChunkStatus();
}
notify(false); //&&&notify(true); // notify all=true
notify(false); // notify one
}

bool BlendScheduler::ready() {
@@ -359,32 +357,24 @@ bool BlendScheduler::_ready() {
atomic<uint16_t> logChunkLimiter = 0;

util::Command::Ptr BlendScheduler::getCmd(bool wait) {
util::Timer timeToLock;
util::Timer timeHeld;
//&&&util::Timer timeToLock;
//&&&util::Timer timeHeld;
util::Command::Ptr cmd;
double totalTimeHeld = 0.0;
//&&&double totalTimeHeld = 0.0;
bool ready = false;
{
timeToLock.start();
//&&&timeToLock.start();
unique_lock<mutex> lock(util::CommandQueue::_mx);
timeToLock.stop();
//&&&timeToLock.stop();

if (wait) {
util::CommandQueue::_cv.wait(lock, [this]() { return _ready(); });
/* &&&
while (!_ready()) {
timeHeld.stop();
totalTimeHeld += timeHeld.getElapsed();
util::CommandQueue::_cv.wait(lock);
timeHeld.start();
}
*/
ready = true;
} else {
ready = _ready();
}

timeHeld.start();
//&&&timeHeld.start();

if (ready && _taskLoadQueue.size() > 0) {
cmd = _taskLoadQueue.front();
@@ -411,12 +401,13 @@ util::Command::Ptr BlendScheduler::getCmd(bool wait) {
_readySched.reset();
}
}

if (cmd == nullptr) {
// The scheduler didn't have anything, see if there's anything on the control queue,
// which could change the size of the pool.
cmd = _ctrlCmdQueue.getCmd();
}
//&&& }

if (cmd != nullptr) {
_infoChanged = true;
if (LOG_CHECK_LVL(_log, LOG_LVL_TRACE) || (logChunkLimiter++ % 100 == 0)) {
@@ -425,12 +416,12 @@
notify(false); // notify all=false
}
// returning nullptr is acceptable.
timeHeld.stop();
totalTimeHeld += timeHeld.getElapsed();
LOGS(_log, LOG_LVL_TRACE,
"lockTime BlendScheduler::getCmd ready toLock=" << timeToLock.getElapsed()
<< " held=" << timeHeld.getElapsed()
<< " totalHeld=" << totalTimeHeld);
//&&&timeHeld.stop();
//&&&totalTimeHeld += timeHeld.getElapsed();
//&&&LOGS(_log, LOG_LVL_TRACE,
//&&& "lockTime BlendScheduler::getCmd ready toLock=" << timeToLock.getElapsed()
//&&& << " held=" << timeHeld.getElapsed()
//&&& << " totalHeld=" << totalTimeHeld);
return cmd;
}

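
Two concurrency idioms are worth noting in the BlendScheduler.cc changes above: queCmd() now wakes all waiters (notify(true)) when new tasks are queued while commandFinish() wakes only one, and the commented-out hand-rolled wait loop in getCmd() is settled in favor of a predicate wait on the condition variable. A minimal, self-contained sketch of that pattern follows; the names (Queue, push, pop) are illustrative stand-ins, not the Qserv util::CommandQueue API:

    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <queue>
    #include <thread>

    // Toy queue showing predicate wait plus notify-one/notify-all, as used above.
    class Queue {
    public:
        void push(int v, bool wakeAll) {
            {
                std::lock_guard<std::mutex> lock(_mx);
                _items.push(v);
            }
            if (wakeAll) {
                _cv.notify_all();  // like notify(true) in queCmd(): wake every waiter
            } else {
                _cv.notify_one();  // like notify(false) in commandFinish(): wake one waiter
            }
        }

        int pop() {
            std::unique_lock<std::mutex> lock(_mx);
            // Equivalent to: while (_items.empty()) { _cv.wait(lock); }
            _cv.wait(lock, [this] { return !_items.empty(); });
            int v = _items.front();
            _items.pop();
            return v;
        }

    private:
        std::mutex _mx;
        std::condition_variable _cv;
        std::queue<int> _items;
    };

    int main() {
        Queue q;
        std::thread consumer([&q] { std::cout << "got " << q.pop() << std::endl; });
        q.push(42, true);  // wake all waiters, as queCmd() now does
        consumer.join();
        return 0;
    }

Waking every waiter when a batch of commands arrives presumably avoids leaving idle scheduler threads asleep, while waking a single waiter on command completion avoids a thundering herd.
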
39 changes: 22 additions & 17 deletions src/xrdsvc/HttpWorkerCzarModule.cc
@@ -41,6 +41,8 @@
#include "protojson/WorkerQueryStatusData.h"
#include "qmeta/types.h"
#include "util/Command.h"
#include "util/Error.h"
#include "util/MultiError.h"
#include "util/String.h"
#include "util/Timer.h"
#include "wbase/FileChannelShared.h"
@@ -127,10 +129,6 @@ json HttpWorkerCzarModule::_handleQueryJob(string const& func) {
// Get or create QueryStatistics and UserQueryInfo instances.
auto queryStats = foreman()->getQueriesAndChunks()->addQueryId(ujQueryId, ujCzInfo->czId);
auto userQueryInfo = queryStats->getUserQueryInfo();
LOGS(_log, LOG_LVL_DEBUG,
uberJobMsg->getIdStr() << " &&& added to stats"
<< " &&& bytesWritten added to stats maxTableSizeMb=" << maxTableSizeMb
<< " maxTableSizeBytes=" << maxTableSizeBytes);

if (userQueryInfo->getCancelledByCzar()) {
throw wbase::TaskException(
@@ -143,19 +141,26 @@

std::shared_ptr<wcontrol::Foreman> foremanPtr = foreman();
std::string authKeyStr = authKey();

// It is important to create UberJobData at this point as it will be the only way to
// inform the czar of errors after this function returns.
auto ujData =
wbase::UberJobData::create(ujId, ujCzInfo->czName, ujCzInfo->czId, ujCzInfo->czHostName,
ujCzInfo->czPort, ujQueryId, ujRowLimit, maxTableSizeBytes,
targetWorkerId, foremanPtr, authKeyStr, foremanPtr->httpPort());

auto lFunc = [ujId, ujQueryId, ujCzInfo, ujRowLimit, maxTableSizeBytes, targetWorkerId, userQueryInfo,
uberJobMsg, foremanPtr, authKeyStr](util::CmdData*) {
uberJobMsg, foremanPtr, authKeyStr, ujData](util::CmdData*) {
_buildTasks(ujId, ujQueryId, ujCzInfo, ujRowLimit, maxTableSizeBytes, targetWorkerId,
userQueryInfo, uberJobMsg, foremanPtr, authKeyStr);
userQueryInfo, uberJobMsg, foremanPtr, authKeyStr, ujData);
};

util::Command::Ptr taskLoadCmd = std::make_shared<util::Command>(lFunc);
foremanPtr->getScheduler()->queTaskLoad(taskLoadCmd);

string note = string("qId=") + to_string(ujQueryId) + " ujId=" + to_string(ujId);
jsRet = {{"success", 1}, {"errortype", "none"}, {"note", note}};
LOGS(_log, LOG_LVL_INFO, "&&& jsRet=" << jsRet);

LOGS(_log, LOG_LVL_TRACE, "_handleQueryJob jsRet=" << jsRet);
} catch (wbase::TaskException const& texp) {
LOGS(_log, LOG_LVL_ERROR,
"HttpWorkerCzarModule::_handleQueryJob wbase::TaskException received " << texp.what());
@@ -170,16 +175,12 @@ void HttpWorkerCzarModule::_buildTasks(UberJobId ujId, QueryId ujQueryId,
std::shared_ptr<wbase::UserQueryInfo> const& userQueryInfo,
protojson::UberJobMsg::Ptr const& uberJobMsg,
shared_ptr<wcontrol::Foreman> const& foremanPtr,
string const& authKeyStr) {
string const& authKeyStr, wbase::UberJobData::Ptr const& ujData) {
try {
LOGS(_log, LOG_LVL_DEBUG, __func__ << " &&& qid=" << ujQueryId << "ujId=" << ujId);
util::Timer timerParse; // &&&
LOGS(_log, LOG_LVL_TRACE, __func__ << " qid=" << ujQueryId << "ujId=" << ujId);
util::Timer timerParse;
timerParse.start();
auto czarId = ujCzInfo->czId;
auto ujData =
wbase::UberJobData::create(ujId, ujCzInfo->czName, ujCzInfo->czId, ujCzInfo->czHostName,
ujCzInfo->czPort, ujQueryId, ujRowLimit, maxTableSizeBytes,
targetWorkerId, foremanPtr, authKeyStr, foremanPtr->httpPort());

// Find the entry for this queryId, create a new one if needed.
userQueryInfo->addUberJob(ujData);
@@ -190,7 +191,7 @@

auto ujTasks = wbase::Task::createTasksFromUberJobMsg(
uberJobMsg, ujData, channelShared, foremanPtr->chunkResourceMgr(), foremanPtr->mySqlConfig(),
foremanPtr->sqlConnMgr(), foremanPtr->queriesAndChunks()); //&&&, foremanPtr->httpPort());
foremanPtr->sqlConnMgr(), foremanPtr->queriesAndChunks());
channelShared->setTaskCount(ujTasks.size());
ujData->addTasks(ujTasks);

@@ -211,7 +212,11 @@
} catch (wbase::TaskException const& texp) {
LOGS(_log, LOG_LVL_ERROR,
"HttpWorkerCzarModule::_buildTasks wbase::TaskException received " << texp.what());
// &&& send a message back saying this UberJobFailed
// Send a message back saying this UberJobFailed
util::MultiError multiErr;
util::Error err(-1, string("UberJob parse error ") + texp.what());
multiErr.push_back(err);
ujData->responseError(multiErr, -1, false);
}
}

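
The reordering in HttpWorkerCzarModule.cc above — creating UberJobData inside _handleQueryJob() before the task-build command is queued — follows the in-diff comment: once _handleQueryJob() has returned its JSON reply, the ujData captured by the queued lambda is the only remaining way to tell the czar that the deferred _buildTasks() step failed. A condensed restatement of that error path, taken from the catch block in this hunk (the -1 chunk id and the "UberJob parse error" prefix come straight from the diff):

    // Condensed from the _buildTasks() hunk above; ujData is the UberJobData created
    // earlier in _handleQueryJob() and captured by the queued lambda.
    try {
        // ... build Tasks from uberJobMsg and queue them ...
    } catch (wbase::TaskException const& texp) {
        util::MultiError multiErr;
        util::Error err(-1, std::string("UberJob parse error ") + texp.what());
        multiErr.push_back(err);
        ujData->responseError(multiErr, -1, false);  // chunkId=-1, cancelled=false
    }
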
3 changes: 2 additions & 1 deletion src/xrdsvc/HttpWorkerCzarModule.h
@@ -45,6 +45,7 @@ class Response;
} // namespace lsst::qserv::qhttp

namespace lsst::qserv::wbase {
class UberJobData;
class UserQueryInfo;
} // namespace lsst::qserv::wbase

@@ -97,7 +98,7 @@ class HttpWorkerCzarModule : public xrdsvc::HttpModule {
std::shared_ptr<wbase::UserQueryInfo> const& userQueryInfo,
std::shared_ptr<protojson::UberJobMsg> const& uberJobMsg,
std::shared_ptr<wcontrol::Foreman> const& foremanPtr,
std::string const& authKeyStr);
std::string const& authKeyStr, std::shared_ptr<wbase::UberJobData> const& ujData);

/// Verify some aspects of the query and call _handleQueryStatus
nlohmann::json _queryStatus();
