Skip to content

Commit

Permalink
Merge pull request ClickHouse#59082 from ClickHouse/fix_race_async_in…
Browse files Browse the repository at this point in the history
…serts_queue

Fix race on `Context::async_insert_queue`
  • Loading branch information
tavplubix authored Mar 22, 2024
2 parents d3965f2 + d190ee8 commit 9082a01
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 8 deletions.
15 changes: 14 additions & 1 deletion src/Interpreters/AsynchronousInsertQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t poo
dump_by_first_update_threads.emplace_back([this, i] { processBatchDeadlines(i); });
}

AsynchronousInsertQueue::~AsynchronousInsertQueue()
void AsynchronousInsertQueue::flushAndShutdown()
{
try
{
Expand Down Expand Up @@ -258,6 +258,19 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue()
}
}

AsynchronousInsertQueue::~AsynchronousInsertQueue()
{
for (const auto & shard : queue_shards)
{
for (const auto & [first_update, elem] : shard.queue)
{
const auto & insert_query = elem.key.query->as<const ASTInsertQuery &>();
LOG_WARNING(log, "Has unprocessed async insert for {}.{}",
backQuoteIfNeed(insert_query.getDatabase()), backQuoteIfNeed(insert_query.getTable()));
}
}
}

void AsynchronousInsertQueue::scheduleDataProcessingJob(
const InsertQuery & key, InsertDataPtr data, ContextPtr global_context, size_t shard_num)
{
Expand Down
4 changes: 4 additions & 0 deletions src/Interpreters/AsynchronousInsertQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ class AsynchronousInsertQueue : public WithContext
PushResult pushQueryWithBlock(ASTPtr query, Block block, ContextPtr query_context);
size_t getPoolSize() const { return pool_size; }

/// This method should be called manually because it's not flushed automatically in dtor
/// because all tables may be already unloaded when we destroy AsynchronousInsertQueue
void flushAndShutdown();

private:

struct InsertQuery
Expand Down
15 changes: 13 additions & 2 deletions src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,13 @@ struct ContextSharedPart : boost::noncopyable
return;

/// Need to flush the async insert queue before shutting down the database catalog
async_insert_queue.reset();
std::shared_ptr<AsynchronousInsertQueue> delete_async_insert_queue;
{
std::lock_guard lock(mutex);
delete_async_insert_queue = std::move(async_insert_queue);
}
if (delete_async_insert_queue)
delete_async_insert_queue->flushAndShutdown();

/// Stop periodic reloading of the configuration files.
/// This must be done first because otherwise the reloading may pass a changed config
Expand All @@ -585,6 +591,8 @@ struct ContextSharedPart : boost::noncopyable
LOG_TRACE(log, "Shutting down database catalog");
DatabaseCatalog::shutdown();

delete_async_insert_queue.reset();

SHUTDOWN(log, "merges executor", merge_mutate_executor, wait());
SHUTDOWN(log, "fetches executor", fetch_executor, wait());
SHUTDOWN(log, "moves executor", moves_executor, wait());
Expand Down Expand Up @@ -4990,15 +4998,18 @@ PartUUIDsPtr Context::getIgnoredPartUUIDs() const
return ignored_part_uuids;
}

AsynchronousInsertQueue * Context::getAsynchronousInsertQueue() const
AsynchronousInsertQueue * Context::tryGetAsynchronousInsertQueue() const
{
SharedLockGuard lock(shared->mutex);
return shared->async_insert_queue.get();
}

void Context::setAsynchronousInsertQueue(const std::shared_ptr<AsynchronousInsertQueue> & ptr)
{
AsynchronousInsertQueue::validateSettings(settings, getLogger("Context"));

SharedLockGuard lock(shared->mutex);

if (std::chrono::milliseconds(settings.async_insert_poll_timeout_ms) == std::chrono::milliseconds::zero())
throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Setting async_insert_poll_timeout_ms can't be zero");

Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -1209,7 +1209,7 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
PartUUIDsPtr getPartUUIDs() const;
PartUUIDsPtr getIgnoredPartUUIDs() const;

AsynchronousInsertQueue * getAsynchronousInsertQueue() const;
AsynchronousInsertQueue * tryGetAsynchronousInsertQueue() const;
void setAsynchronousInsertQueue(const std::shared_ptr<AsynchronousInsertQueue> & ptr);

ReadTaskCallback getReadTaskCallback() const;
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/InterpreterSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ BlockIO InterpreterSystemQuery::execute()
case Type::FLUSH_ASYNC_INSERT_QUEUE:
{
getContext()->checkAccess(AccessType::SYSTEM_FLUSH_ASYNC_INSERT_QUEUE);
auto * queue = getContext()->getAsynchronousInsertQueue();
auto * queue = getContext()->tryGetAsynchronousInsertQueue();
if (!queue)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Cannot flush asynchronous insert queue because it is not initialized");
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/executeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -996,7 +996,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
std::unique_ptr<IInterpreter> interpreter;

bool async_insert = false;
auto * queue = context->getAsynchronousInsertQueue();
auto * queue = context->tryGetAsynchronousInsertQueue();
auto logger = getLogger("executeQuery");

if (insert_query && async_insert_enabled)
Expand Down
2 changes: 1 addition & 1 deletion src/Server/TCPHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,7 @@ void TCPHandler::processInsertQuery()
Block processed_block;
const auto & settings = query_context->getSettingsRef();

auto * insert_queue = query_context->getAsynchronousInsertQueue();
auto * insert_queue = query_context->tryGetAsynchronousInsertQueue();
const auto & insert_query = assert_cast<const ASTInsertQuery &>(*state.parsed_query);

bool async_insert_enabled = settings.async_insert;
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/System/StorageSystemAsynchronousInserts.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ void StorageSystemAsynchronousInserts::fillData(MutableColumns & res_columns, Co
{
using namespace std::chrono;

auto * insert_queue = context->getAsynchronousInsertQueue();
auto * insert_queue = context->tryGetAsynchronousInsertQueue();
if (!insert_queue)
return;

Expand Down

0 comments on commit 9082a01

Please sign in to comment.