Skip to content

Commit

Permalink
Merge pull request #1386 from Lazin/fix/s3-connection-pool-concurrenc…
Browse files Browse the repository at this point in the history
…y-issue

s3: Fix client_pool
  • Loading branch information
emaxerrno authored May 17, 2021
2 parents 0021765 + db9fba3 commit bfe8b78
Showing 1 changed file with 23 additions and 8 deletions.
31 changes: 23 additions & 8 deletions src/v/s3/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
#include "ssx/sformat.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/condition-variable.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/iostream.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/temporary_buffer.hh>
Expand Down Expand Up @@ -517,7 +519,9 @@ client_pool::client_pool(

ss::future<> client_pool::stop() {
_as.request_abort();
return _gate.close();
_cvar.broken();
// Wait until all leased objects are returned
co_await _gate.close();
}

/// \brief Acquire http client from the pool.
Expand All @@ -529,20 +533,31 @@ ss::future<> client_pool::stop() {
/// are in use)
ss::future<client_pool::client_lease> client_pool::acquire() {
gate_guard guard(_gate);
if (_pool.empty()) {
if (_policy == client_pool_overdraft_policy::wait_if_empty) {
co_await _cvar.wait([this] { return !_pool.empty(); });
} else {
auto cl = ss::make_shared<client>(_config, _as);
_pool.emplace_back(std::move(cl));
try {
while (_pool.empty() && !_gate.is_closed()) {
if (_policy == client_pool_overdraft_policy::wait_if_empty) {
co_await _cvar.wait();
} else {
auto cl = ss::make_shared<client>(_config, _as);
_pool.emplace_back(std::move(cl));
}
}
} catch (const ss::broken_condition_variable&) {
}
if (_gate.is_closed() || _as.abort_requested()) {
throw ss::gate_closed_exception();
}
vassert(!_pool.empty(), "'acquire' invariant is broken");
auto client = _pool.back();
_pool.pop_back();
co_return client_lease{
.client = client,
.deleter = ss::make_deleter(
[this, client, g = std::move(guard)] { release(client); })};
[pool = weak_from_this(), client, g = std::move(guard)] {
if (pool) {
pool->release(client);
}
})};
}
size_t client_pool::size() const noexcept { return _pool.size(); }
void client_pool::init() {
Expand Down

0 comments on commit bfe8b78

Please sign in to comment.