Merge pull request #1346 from vectorizedio/revert-1296-feature/persistent-connections

Revert "Persistent http connections"
emaxerrno authored May 5, 2021
2 parents 5c36d71 + 7c9bb6e commit 47c6e63
Showing 12 changed files with 481 additions and 798 deletions.
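The hunks below all follow one pattern: the pooled client (auto [client, deleter] = co_await _pool.acquire()) is replaced by a short-lived s3::client that is constructed per request and shut down afterwards, with concurrency capped by a plain ss::semaphore instead of the connection pool. A condensed sketch of the restored upload path, stitched together from the ntp_archiver hunks that follow (identifiers such as _client_conf, _as, _bucket and req_limit are the ones appearing in the diff; the fragment is illustrative and not compilable on its own):

// Take one upload slot, then open a dedicated HTTP client for this request.
auto units = co_await ss::get_units(req_limit, 1);
s3::client client(_client_conf, _as);
co_await client.put_object(
  _bucket, s3::object_key(s3path().string()), candidate.content_length,
  std::move(stream), tags);
// Explicitly close the connection instead of returning it to a pool.
co_await client.shutdown();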
36 changes: 17 additions & 19 deletions src/v/archival/ntp_archiver_service.cc
@@ -47,12 +47,10 @@ std::ostream& operator<<(std::ostream& o, const configuration& cfg) {
}

ntp_archiver::ntp_archiver(
- const storage::ntp_config& ntp,
- const configuration& conf,
- s3::client_pool& pool)
+ const storage::ntp_config& ntp, const configuration& conf)
: _ntp(ntp.ntp())
, _rev(ntp.get_revision())
- , _pool(pool)
+ , _client_conf(conf.client_config)
, _policy(_ntp)
, _bucket(conf.bucket_name)
, _remote(_ntp, _rev)
@@ -78,10 +76,10 @@ ss::future<download_manifest_result> ntp_archiver::download_manifest() {
auto key = _remote.get_manifest_path();
vlog(archival_log.debug, "Download manifest {}", key());
auto path = s3::object_key(key().string());
- auto [client, deleter] = co_await _pool.acquire();
+ s3::client client(_client_conf, _as);
auto result = download_manifest_result::success;
try {
- auto resp = co_await client->get_object(_bucket, path);
+ auto resp = co_await client.get_object(_bucket, path);
vlog(archival_log.debug, "Receive OK response from {}", path);
co_await _remote.update(resp->as_input_stream());
} catch (const s3::rest_error_response& err) {
@@ -102,6 +100,7 @@ ss::future<download_manifest_result> ntp_archiver::download_manifest() {
throw;
}
}
+ co_await client.shutdown();
co_return result;
}

@@ -116,11 +115,12 @@ ss::future<> ntp_archiver::upload_manifest() {
std::vector<s3::object_tag> tags = {{"rp-type", "partition-manifest"}};
while (!_gate.is_closed() && backoff_quota-- > 0) {
bool slowdown = false;
- auto [client, deleter] = co_await _pool.acquire();
+ s3::client client(_client_conf, _as);
try {
auto [is, size] = _remote.serialize();
- co_await client->put_object(
+ co_await client.put_object(
_bucket, path, size, std::move(is), tags);
+ co_await client.shutdown();
} catch (const s3::rest_error_response& err) {
vlog(
archival_log.error,
@@ -172,7 +172,8 @@ ss::future<> ntp_archiver::upload_manifest() {

const manifest& ntp_archiver::get_remote_manifest() const { return _remote; }

- ss::future<bool> ntp_archiver::upload_segment(upload_candidate candidate) {
+ ss::future<bool> ntp_archiver::upload_segment(
+ ss::semaphore& req_limit, upload_candidate candidate) {
gate_guard guard{_gate};
vlog(
archival_log.debug,
@@ -187,7 +188,8 @@ ss::future<bool> ntp_archiver::upload_segment(upload_candidate candidate) {
segment_name(candidate.exposed_name));
std::vector<s3::object_tag> tags = {{"rp-type", "segment"}};
while (!_gate.is_closed() && backoff_quota-- > 0) {
- auto [client, deleter] = co_await _pool.acquire();
+ auto units = co_await ss::get_units(req_limit, 1);
+ s3::client client(_client_conf, _as);
auto stream = candidate.source->reader().data_stream(
candidate.file_offset, ss::default_priority_class());
bool slowdown = false;
@@ -198,12 +200,13 @@ ss::future<bool> ntp_archiver::upload_segment(upload_candidate candidate) {
s3path);
try {
// Segment upload attempt
- co_await client->put_object(
+ co_await client.put_object(
_bucket,
s3::object_key(s3path().string()),
candidate.content_length,
std::move(stream),
tags);
+ co_await client.shutdown();
} catch (const s3::rest_error_response& err) {
vlog(
archival_log.error,
@@ -243,16 +246,11 @@ ss::future<bool> ntp_archiver::upload_segment(upload_candidate candidate) {
}
break;
}
- vlog(
- archival_log.debug,
- "Finished segment upload for {}, path {}",
- _ntp,
- s3path);
co_return true;
}

- ss::future<ntp_archiver::batch_result>
- ntp_archiver::upload_next_candidates(storage::log_manager& lm) {
+ ss::future<ntp_archiver::batch_result> ntp_archiver::upload_next_candidates(
+ ss::semaphore& req_limit, storage::log_manager& lm) {
vlog(archival_log.debug, "Uploading next candidates called for {}", _ntp);
gate_guard guard{_gate};
auto mlock = co_await ss::get_units(_mutex, 1);
@@ -295,7 +293,7 @@ ntp_archiver::upload_next_candidates(storage::log_manager& lm) {
continue;
}
offset = upload.source->offsets().committed_offset + model::offset(1);
- flist.emplace_back(upload_segment(upload));
+ flist.emplace_back(upload_segment(req_limit, upload));
manifest::segment_meta m{
.is_compacted = upload.source->is_compacted_segment(),
.size_bytes = upload.content_length,
16 changes: 8 additions & 8 deletions src/v/archival/ntp_archiver_service.h
@@ -62,11 +62,8 @@ class ntp_archiver {
///
/// \param ntp is an ntp that archiver is responsible for
/// \param conf is an S3 client configuration
- /// \param pool is a connection pool that should be used to send/recv data
- ntp_archiver(
- const storage::ntp_config& ntp,
- const configuration& conf,
- s3::client_pool& pool);
+ /// \param bucket is an S3 bucket that should be used to store the data
+ ntp_archiver(const storage::ntp_config& ntp, const configuration& conf);

/// Stop archiver.
///
@@ -103,19 +100,22 @@ class ntp_archiver {
/// will pick not more than '_concurrency' candidates and start
/// uploading them.
///
+ /// \param req_limit is used to limit number of parallel uploads
/// \param lm is a log manager instance
/// \return future that returns number of uploaded/failed segments
- ss::future<batch_result> upload_next_candidates(storage::log_manager& lm);
+ ss::future<batch_result>
+ upload_next_candidates(ss::semaphore& req_limit, storage::log_manager& lm);

private:
/// Upload individual segment to S3.
///
/// \return true on success and false otherwise
- ss::future<bool> upload_segment(upload_candidate candidate);
+ ss::future<bool>
+ upload_segment(ss::semaphore& req_limit, upload_candidate candidate);

model::ntp _ntp;
model::revision_id _rev;
- s3::client_pool& _pool;
+ s3::configuration _client_conf;
archival_policy _policy;
s3::bucket_name _bucket;
/// Remote manifest contains representation of the data stored in S3 (it
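The req_limit parameter introduced in this header is not a per-archiver knob: service.cc below passes the scheduler's single _conn_limit semaphore into every call, so the cap is shared by all archivers on the shard, and each segment upload takes one unit before constructing its own client. A rough sketch of the calling convention, modelled on the updated unit test further down (the limit of 2 and the names archiver and log_mgr come from that test, not a recommendation):

// Bound concurrent segment uploads, then drive one batch of candidates.
ss::semaphore req_limit(2);
auto res = co_await archiver.upload_next_candidates(req_limit, log_mgr);
vlog(archival_log.debug, "{} uploads succeeded, {} failed",
     res.num_succeded, res.num_failed);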
43 changes: 20 additions & 23 deletions src/v/archival/service.cc
@@ -12,7 +12,6 @@

#include "archival/logger.h"
#include "archival/ntp_archiver_service.h"
#include "archival/types.h"
#include "cluster/partition_manager.h"
#include "cluster/topic_table.h"
#include "config/configuration.h"
@@ -155,12 +154,12 @@ scheduler_service_impl::scheduler_service_impl(
ss::sharded<cluster::partition_manager>& pm,
ss::sharded<cluster::topic_table>& tt)
: _conf(conf)
- , _pool(conf.connection_limit(), conf.client_config)
, _partition_manager(pm)
, _topic_table(tt)
, _storage_api(api)
, _jitter(conf.interval, 1ms)
, _gc_jitter(conf.gc_interval, 1ms)
+ , _conn_limit(conf.connection_limit())
, _stop_limit(conf.connection_limit()) {}

scheduler_service_impl::scheduler_service_impl(
@@ -194,22 +193,18 @@ ss::future<> scheduler_service_impl::start() {
ss::future<> scheduler_service_impl::stop() {
vlog(archival_log.info, "Scheduler service stop");
_timer.cancel();
- _as.request_abort(); // interrupt possible sleep
- return _pool.stop().then([this] {
- std::vector<ss::future<>> outstanding;
- for (auto& it : _queue) {
- auto fut = ss::with_semaphore(
- _stop_limit, 1, [it] { return it.second.archiver->stop(); });
- outstanding.emplace_back(std::move(fut));
- }
- return ss::do_with(
- std::move(outstanding),
- [this](std::vector<ss::future<>>& outstanding) {
- return ss::when_all_succeed(
- outstanding.begin(), outstanding.end())
- .then([this] { return _gate.close(); });
- });
- });
+ _as.request_abort();
+ std::vector<ss::future<>> outstanding;
+ for (auto& it : _queue) {
+ auto fut = ss::with_semaphore(
+ _stop_limit, 1, [it] { return it.second.archiver->stop(); });
+ outstanding.emplace_back(std::move(fut));
+ }
+ return ss::do_with(
+ std::move(outstanding), [this](std::vector<ss::future<>>& outstanding) {
+ return ss::when_all_succeed(outstanding.begin(), outstanding.end())
+ .then([this] { return _gate.close(); });
+ });
}

ss::lw_shared_ptr<ntp_archiver> scheduler_service_impl::get_upload_candidate() {
@@ -222,19 +217,21 @@ ss::future<> scheduler_service_impl::upload_topic_manifest(
auto cfg = _topic_table.local().get_topic_cfg(view);
if (cfg) {
try {
+ auto units = co_await ss::get_units(_conn_limit, 1);
vlog(archival_log.info, "Uploading topic manifest {}", view);
- auto [client, deleter] = co_await _pool.acquire();
+ s3::client client(_conf.client_config, _as);
topic_manifest tm(*cfg, rev);
auto [istr, size_bytes] = tm.serialize();
auto key = tm.get_manifest_path();
vlog(archival_log.debug, "Topic manifest object key is '{}'", key);
std::vector<s3::object_tag> tags = {{"rp-type", "topic-manifest"}};
- co_await client->put_object(
+ co_await client.put_object(
_conf.bucket_name,
s3::object_key(key),
size_bytes,
std::move(istr),
tags);
+ co_await client.shutdown();
} catch (const s3::rest_error_response& err) {
vlog(
archival_log.error,
@@ -267,7 +264,7 @@ scheduler_service_impl::create_archivers(std::vector<model::ntp> to_create) {
return ss::now();
}
auto svc = ss::make_lw_shared<ntp_archiver>(
- log->config(), _conf, _pool);
+ log->config(), _conf);
return ss::repeat([this, svc, ntp] {
return svc->download_manifest()
.then(
@@ -354,7 +351,7 @@ scheduler_service_impl::remove_archivers(std::vector<model::ntp> to_remove) {
vlog(archival_log.info, "removing archiver for {}", ntp.path());
auto archiver = _queue[ntp];
return ss::with_semaphore(
- _stop_limit, 1, [archiver] { return archiver->stop(); })
+ _conn_limit, 1, [archiver] { return archiver->stop(); })
.finally([this, ntp] {
vlog(archival_log.info, "archiver stopped {}", ntp.path());
_queue.erase(ntp);
@@ -430,7 +427,7 @@ ss::future<> scheduler_service_impl::run_uploads() {
archival_log.debug,
"Checking {} for S3 upload candidates",
archiver->get_ntp());
- return archiver->upload_next_candidates(lm);
+ return archiver->upload_next_candidates(_conn_limit, lm);
});

auto results = co_await ss::when_all_succeed(
2 changes: 1 addition & 1 deletion src/v/archival/service.h
@@ -154,7 +154,6 @@ class scheduler_service_impl {
model::topic_namespace_view view, model::revision_id rev);

configuration _conf;
- s3::client_pool _pool;
ss::sharded<cluster::partition_manager>& _partition_manager;
ss::sharded<cluster::topic_table>& _topic_table;
ss::sharded<storage::api>& _storage_api;
@@ -164,6 +163,7 @@
ss::timer<ss::lowres_clock> _gc_timer;
ss::gate _gate;
ss::abort_source _as;
+ ss::semaphore _conn_limit;
ss::semaphore _stop_limit;
ntp_upload_queue _queue;
simple_time_jitter<ss::lowres_clock> _backoff{100ms};
16 changes: 6 additions & 10 deletions src/v/archival/tests/ntp_archiver_test.cc
@@ -125,9 +125,7 @@ compare_json_objects(const std::string_view& lhs, const std::string_view& rhs) {

FIXTURE_TEST(test_download_manifest, s3_imposter_fixture) { // NOLINT
set_expectations_and_listen(default_expectations);
- auto conf = get_configuration();
- s3::client_pool pool(conf.connection_limit(), conf.client_config);
- archival::ntp_archiver archiver(get_ntp_conf(), get_configuration(), pool);
+ archival::ntp_archiver archiver(get_ntp_conf(), get_configuration());
auto action = ss::defer([&archiver] { archiver.stop().get(); });
archiver.download_manifest().get();
auto expected = load_manifest(manifest_payload);
@@ -136,9 +134,7 @@ FIXTURE_TEST(test_download_manifest, s3_imposter_fixture) { // NOLINT

FIXTURE_TEST(test_upload_manifest, s3_imposter_fixture) { // NOLINT
set_expectations_and_listen(default_expectations);
- auto conf = get_configuration();
- s3::client_pool pool(conf.connection_limit(), conf.client_config);
- archival::ntp_archiver archiver(get_ntp_conf(), get_configuration(), pool);
+ archival::ntp_archiver archiver(get_ntp_conf(), get_configuration());
auto action = ss::defer([&archiver] { archiver.stop().get(); });
auto pm = const_cast<manifest*>( // NOLINT
&archiver.get_remote_manifest());
@@ -167,9 +163,7 @@ FIXTURE_TEST(test_upload_manifest, s3_imposter_fixture) { // NOLINT
// NOLINTNEXTLINE
FIXTURE_TEST(test_upload_segments, archiver_fixture) {
set_expectations_and_listen(default_expectations);
- auto conf = get_configuration();
- s3::client_pool pool(conf.connection_limit(), conf.client_config);
- archival::ntp_archiver archiver(get_ntp_conf(), get_configuration(), pool);
+ archival::ntp_archiver archiver(get_ntp_conf(), get_configuration());
auto action = ss::defer([&archiver] { archiver.stop().get(); });

std::vector<segment_desc> segments = {
@@ -178,8 +172,10 @@ FIXTURE_TEST(test_upload_segments, archiver_fixture) {
};
init_storage_api_local(segments);

+ ss::semaphore limit(2);
auto res = archiver
- .upload_next_candidates(get_local_storage_api().log_mgr())
+ .upload_next_candidates(
+ limit, get_local_storage_api().log_mgr())
.get0();
BOOST_REQUIRE_EQUAL(res.num_succeded, 2);
BOOST_REQUIRE_EQUAL(res.num_failed, 0);
2 changes: 1 addition & 1 deletion src/v/archival/tests/service_fixture.cc
@@ -84,7 +84,7 @@ archival::configuration get_configuration() {
archival::configuration conf;
conf.client_config = s3conf;
conf.bucket_name = s3::bucket_name("test-bucket");
- conf.connection_limit = archival::s3_connection_limit(2);
+ conf.connection_limit = archival::s3_connection_limit(10);
return conf;
}


0 comments on commit 47c6e63
