Skip to content

Commit

Permalink
[core] [6/N] Fix shared pointer usage for gcs server (#48990)
Browse files Browse the repository at this point in the history
Signed-off-by: hjiang <[email protected]>
  • Loading branch information
dentiny authored Dec 4, 2024
1 parent 53233d9 commit f5698a2
Show file tree
Hide file tree
Showing 31 changed files with 179 additions and 177 deletions.
6 changes: 3 additions & 3 deletions src/mock/ray/gcs/gcs_server/gcs_actor_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ class MockGcsActorManager : public GcsActorManager {
MockGcsActorManager(RuntimeEnvManager &runtime_env_manager,
GcsFunctionManager &function_manager)
: GcsActorManager(
nullptr,
nullptr,
nullptr,
/*scheduler=*/nullptr,
/*gcs_table_storage=*/nullptr,
/*gcs_publisher=*/nullptr,
runtime_env_manager,
function_manager,
[](const ActorID &) {},
Expand Down
10 changes: 0 additions & 10 deletions src/mock/ray/gcs/gcs_server/gcs_placement_group_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,6 @@
namespace ray {
namespace gcs {

class Mockpair_hash : public pair_hash {
public:
};

} // namespace gcs
} // namespace ray

namespace ray {
namespace gcs {

class MockGcsPlacementGroupSchedulerInterface
: public GcsPlacementGroupSchedulerInterface {
public:
Expand Down
11 changes: 6 additions & 5 deletions src/ray/gcs/gcs_server/gcs_actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -324,17 +324,18 @@ const ray::rpc::ActorDeathCause GcsActorManager::GenNodeDiedCause(
/////////////////////////////////////////////////////////////////////////////////////////
GcsActorManager::GcsActorManager(
std::shared_ptr<GcsActorSchedulerInterface> scheduler,
std::shared_ptr<GcsTableStorage> gcs_table_storage,
std::shared_ptr<GcsPublisher> gcs_publisher,
GcsTableStorage *gcs_table_storage,
GcsPublisher *gcs_publisher,
RuntimeEnvManager &runtime_env_manager,
GcsFunctionManager &function_manager,
std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
const rpc::CoreWorkerClientFactoryFn &worker_client_factory)
: gcs_actor_scheduler_(std::move(scheduler)),
gcs_table_storage_(std::move(gcs_table_storage)),
gcs_publisher_(std::move(gcs_publisher)),
gcs_table_storage_(gcs_table_storage),
gcs_publisher_(gcs_publisher),
worker_client_factory_(worker_client_factory),
destroy_owned_placement_group_if_needed_(destroy_owned_placement_group_if_needed),
destroy_owned_placement_group_if_needed_(
std::move(destroy_owned_placement_group_if_needed)),
runtime_env_manager_(runtime_env_manager),
function_manager_(function_manager),
actor_gc_delay_(RayConfig::instance().gcs_actor_table_min_duration_ms()) {
Expand Down
8 changes: 4 additions & 4 deletions src/ray/gcs/gcs_server/gcs_actor_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,8 @@ class GcsActorManager : public rpc::ActorInfoHandler {
/// \param gcs_publisher Used to publish gcs message.
GcsActorManager(
std::shared_ptr<GcsActorSchedulerInterface> scheduler,
std::shared_ptr<GcsTableStorage> gcs_table_storage,
std::shared_ptr<GcsPublisher> gcs_publisher,
GcsTableStorage *gcs_table_storage,
GcsPublisher *gcs_publisher,
RuntimeEnvManager &runtime_env_manager,
GcsFunctionManager &function_manager,
std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
Expand Down Expand Up @@ -687,9 +687,9 @@ class GcsActorManager : public rpc::ActorInfoHandler {
/// The scheduler to schedule all registered actors.
std::shared_ptr<GcsActorSchedulerInterface> gcs_actor_scheduler_;
/// Used to update actor information upon creation, deletion, etc.
std::shared_ptr<GcsTableStorage> gcs_table_storage_;
GcsTableStorage *gcs_table_storage_;
/// A publisher for publishing gcs messages.
std::shared_ptr<GcsPublisher> gcs_publisher_;
GcsPublisher *gcs_publisher_;
/// Factory to produce clients to workers. This is used to communicate with
/// actors and their owners.
rpc::CoreWorkerClientFactoryFn worker_client_factory_;
Expand Down
12 changes: 6 additions & 6 deletions src/ray/gcs/gcs_server/gcs_init_data.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void GcsInitData::AsyncLoadJobTableData(const EmptyCallback &on_done) {
<< job_table_data_.size();
on_done();
};
RAY_CHECK_OK(gcs_table_storage_->JobTable().GetAll(load_job_table_data_callback));
RAY_CHECK_OK(gcs_table_storage_.JobTable().GetAll(load_job_table_data_callback));
}

void GcsInitData::AsyncLoadNodeTableData(const EmptyCallback &on_done) {
Expand All @@ -59,7 +59,7 @@ void GcsInitData::AsyncLoadNodeTableData(const EmptyCallback &on_done) {
<< node_table_data_.size();
on_done();
};
RAY_CHECK_OK(gcs_table_storage_->NodeTable().GetAll(load_node_table_data_callback));
RAY_CHECK_OK(gcs_table_storage_.NodeTable().GetAll(load_node_table_data_callback));
}

void GcsInitData::AsyncLoadPlacementGroupTableData(const EmptyCallback &on_done) {
Expand All @@ -72,7 +72,7 @@ void GcsInitData::AsyncLoadPlacementGroupTableData(const EmptyCallback &on_done)
<< placement_group_table_data_.size();
on_done();
};
RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().GetAll(
RAY_CHECK_OK(gcs_table_storage_.PlacementGroupTable().GetAll(
load_placement_group_table_data_callback));
}

Expand All @@ -85,7 +85,7 @@ void GcsInitData::AsyncLoadActorTableData(const EmptyCallback &on_done) {
<< actor_table_data_.size();
on_done();
};
RAY_CHECK_OK(gcs_table_storage_->ActorTable().AsyncRebuildIndexAndGetAll(
RAY_CHECK_OK(gcs_table_storage_.ActorTable().AsyncRebuildIndexAndGetAll(
load_actor_table_data_callback));
}

Expand All @@ -98,9 +98,9 @@ void GcsInitData::AsyncLoadActorTaskSpecTableData(const EmptyCallback &on_done)
<< actor_task_spec_table_data_.size();
on_done();
};
RAY_CHECK_OK(gcs_table_storage_->ActorTaskSpecTable().GetAll(
RAY_CHECK_OK(gcs_table_storage_.ActorTaskSpecTable().GetAll(
load_actor_task_spec_table_data_callback));
}

} // namespace gcs
} // namespace ray
} // namespace ray
6 changes: 3 additions & 3 deletions src/ray/gcs/gcs_server/gcs_init_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ class GcsInitData {
/// Create a GcsInitData.
///
/// \param gcs_table_storage The storage from which the metadata will be loaded.
explicit GcsInitData(std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage)
: gcs_table_storage_(std::move(gcs_table_storage)) {}
explicit GcsInitData(gcs::GcsTableStorage &gcs_table_storage)
: gcs_table_storage_(gcs_table_storage) {}

/// Load all required metadata from the store into memory at once asynchronously.
///
Expand Down Expand Up @@ -89,7 +89,7 @@ class GcsInitData {

protected:
/// The gcs table storage.
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
gcs::GcsTableStorage &gcs_table_storage_;

/// Job metadata.
absl::flat_hash_map<JobID, rpc::JobTableData> job_table_data_;
Expand Down
18 changes: 9 additions & 9 deletions src/ray/gcs/gcs_server/gcs_job_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void GcsJobManager::HandleAddJob(rpc::AddJobRequest request,
RAY_LOG(ERROR) << "Failed to add job, job id = " << job_id
<< ", driver pid = " << job_table_data.driver_pid();
} else {
RAY_CHECK_OK(gcs_publisher_->PublishJob(job_id, job_table_data, /*done=*/nullptr));
RAY_CHECK_OK(gcs_publisher_.PublishJob(job_id, job_table_data, /*done=*/nullptr));
if (job_table_data.config().has_runtime_env_info()) {
runtime_env_manager_.AddURIReference(job_id.Hex(),
job_table_data.config().runtime_env_info());
Expand All @@ -122,7 +122,7 @@ void GcsJobManager::HandleAddJob(rpc::AddJobRequest request,
};

Status status =
gcs_table_storage_->JobTable().Put(job_id, mutable_job_table_data, on_done);
gcs_table_storage_.JobTable().Put(job_id, mutable_job_table_data, on_done);
if (!status.ok()) {
on_done(status);
}
Expand All @@ -143,7 +143,7 @@ void GcsJobManager::MarkJobAsFinished(rpc::JobTableData job_table_data,
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to mark job state, job id = " << job_id;
} else {
RAY_CHECK_OK(gcs_publisher_->PublishJob(job_id, job_table_data, nullptr));
RAY_CHECK_OK(gcs_publisher_.PublishJob(job_id, job_table_data, nullptr));
runtime_env_manager_.RemoveURIReference(job_id.Hex());
ClearJobInfos(job_table_data);
RAY_LOG(INFO) << "Finished marking job state, job id = " << job_id;
Expand All @@ -160,7 +160,7 @@ void GcsJobManager::MarkJobAsFinished(rpc::JobTableData job_table_data,
done_callback(status);
};

Status status = gcs_table_storage_->JobTable().Put(job_id, job_table_data, on_done);
Status status = gcs_table_storage_.JobTable().Put(job_id, job_table_data, on_done);
if (!status.ok()) {
on_done(status);
}
Expand All @@ -176,7 +176,7 @@ void GcsJobManager::HandleMarkJobFinished(rpc::MarkJobFinishedRequest request,
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};

Status status = gcs_table_storage_->JobTable().Get(
Status status = gcs_table_storage_.JobTable().Get(
job_id,
[this, job_id, send_reply](const Status &status,
const std::optional<rpc::JobTableData> &result) {
Expand Down Expand Up @@ -423,7 +423,7 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request,
internal_kv_.MultiGet("job", job_api_data_keys, kv_multi_get_callback);
}
};
Status status = gcs_table_storage_->JobTable().GetAll(on_done);
Status status = gcs_table_storage_.JobTable().GetAll(on_done);
if (!status.ok()) {
on_done(absl::flat_hash_map<JobID, JobTableData>());
}
Expand All @@ -433,14 +433,14 @@ void GcsJobManager::HandleReportJobError(rpc::ReportJobErrorRequest request,
rpc::ReportJobErrorReply *reply,
rpc::SendReplyCallback send_reply_callback) {
auto job_id = JobID::FromBinary(request.job_error().job_id());
RAY_CHECK_OK(gcs_publisher_->PublishError(job_id.Hex(), request.job_error(), nullptr));
RAY_CHECK_OK(gcs_publisher_.PublishError(job_id.Hex(), request.job_error(), nullptr));
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}

void GcsJobManager::HandleGetNextJobID(rpc::GetNextJobIDRequest request,
rpc::GetNextJobIDReply *reply,
rpc::SendReplyCallback send_reply_callback) {
reply->set_job_id(gcs_table_storage_->GetNextJobID());
reply->set_job_id(gcs_table_storage_.GetNextJobID());
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}

Expand Down Expand Up @@ -472,7 +472,7 @@ void GcsJobManager::OnNodeDead(const NodeID &node_id) {
};

// make all jobs in current node to finished
RAY_CHECK_OK(gcs_table_storage_->JobTable().GetAll(on_done));
RAY_CHECK_OK(gcs_table_storage_.JobTable().GetAll(on_done));
}

void GcsJobManager::RecordMetrics() {
Expand Down
12 changes: 6 additions & 6 deletions src/ray/gcs/gcs_server/gcs_job_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ using JobFinishListenerCallback = rpc::JobInfoHandler::JobFinishListenerCallback
/// This implementation class of `JobInfoHandler`.
class GcsJobManager : public rpc::JobInfoHandler {
public:
explicit GcsJobManager(std::shared_ptr<GcsTableStorage> gcs_table_storage,
std::shared_ptr<GcsPublisher> gcs_publisher,
explicit GcsJobManager(GcsTableStorage &gcs_table_storage,
GcsPublisher &gcs_publisher,
RuntimeEnvManager &runtime_env_manager,
GcsFunctionManager &function_manager,
InternalKVInterface &internal_kv,
rpc::CoreWorkerClientFactoryFn client_factory = nullptr)
: gcs_table_storage_(std::move(gcs_table_storage)),
gcs_publisher_(std::move(gcs_publisher)),
: gcs_table_storage_(gcs_table_storage),
gcs_publisher_(gcs_publisher),
runtime_env_manager_(runtime_env_manager),
function_manager_(function_manager),
internal_kv_(internal_kv),
Expand Down Expand Up @@ -118,8 +118,8 @@ class GcsJobManager : public rpc::JobInfoHandler {
// Number of finished jobs since start of this GCS Server, used to report metrics.
int64_t finished_jobs_count_ = 0;

std::shared_ptr<GcsTableStorage> gcs_table_storage_;
std::shared_ptr<GcsPublisher> gcs_publisher_;
GcsTableStorage &gcs_table_storage_;
GcsPublisher &gcs_publisher_;

/// Listeners which monitors the finish of jobs.
std::vector<JobFinishListenerCallback> job_finished_listeners_;
Expand Down
8 changes: 4 additions & 4 deletions src/ray/gcs/gcs_server/gcs_node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ namespace ray {
namespace gcs {

//////////////////////////////////////////////////////////////////////////////////////////
GcsNodeManager::GcsNodeManager(std::shared_ptr<GcsPublisher> gcs_publisher,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
GcsNodeManager::GcsNodeManager(GcsPublisher *gcs_publisher,
gcs::GcsTableStorage *gcs_table_storage,
rpc::NodeManagerClientPool *raylet_client_pool,
const ClusterID &cluster_id)
: gcs_publisher_(std::move(gcs_publisher)),
gcs_table_storage_(std::move(gcs_table_storage)),
: gcs_publisher_(gcs_publisher),
gcs_table_storage_(gcs_table_storage),
raylet_client_pool_(raylet_client_pool),
cluster_id_(cluster_id) {}

Expand Down
12 changes: 6 additions & 6 deletions src/ray/gcs/gcs_server/gcs_node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
///
/// \param gcs_publisher GCS message publisher.
/// \param gcs_table_storage GCS table external storage accessor.
explicit GcsNodeManager(std::shared_ptr<GcsPublisher> gcs_publisher,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
rpc::NodeManagerClientPool *raylet_client_pool,
const ClusterID &cluster_id);
GcsNodeManager(GcsPublisher *gcs_publisher,
gcs::GcsTableStorage *gcs_table_storage,
rpc::NodeManagerClientPool *raylet_client_pool,
const ClusterID &cluster_id);

/// Handle register rpc request come from raylet.
void HandleGetClusterId(rpc::GetClusterIdRequest request,
Expand Down Expand Up @@ -244,9 +244,9 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
std::vector<std::function<void(std::shared_ptr<rpc::GcsNodeInfo>)>>
node_removed_listeners_;
/// A publisher for publishing gcs messages.
std::shared_ptr<GcsPublisher> gcs_publisher_;
GcsPublisher *gcs_publisher_;
/// Storage for GCS tables.
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
gcs::GcsTableStorage *gcs_table_storage_;
/// Raylet client pool.
rpc::NodeManagerClientPool *raylet_client_pool_ = nullptr;
/// Cluster ID to be shared with clients when connecting.
Expand Down
12 changes: 6 additions & 6 deletions src/ray/gcs/gcs_server/gcs_placement_group_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,19 @@ rpc::PlacementGroupStats *GcsPlacementGroup::GetMutableStats() {

/////////////////////////////////////////////////////////////////////////////////////////

GcsPlacementGroupManager::GcsPlacementGroupManager(
instrumented_io_context &io_context, GcsResourceManager &gcs_resource_manager)
: io_context_(io_context), gcs_resource_manager_(gcs_resource_manager) {}

GcsPlacementGroupManager::GcsPlacementGroupManager(
instrumented_io_context &io_context,
GcsPlacementGroupSchedulerInterface *scheduler,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
gcs::GcsTableStorage *gcs_table_storage,
GcsResourceManager &gcs_resource_manager,
std::function<std::string(const JobID &)> get_ray_namespace)
: io_context_(io_context),
gcs_placement_group_scheduler_(scheduler),
gcs_table_storage_(std::move(gcs_table_storage)),
gcs_table_storage_(gcs_table_storage),
gcs_resource_manager_(gcs_resource_manager),
get_ray_namespace_(std::move(get_ray_namespace)) {
placement_group_state_counter_.reset(
Expand All @@ -205,10 +209,6 @@ GcsPlacementGroupManager::GcsPlacementGroupManager(
Tick();
}

GcsPlacementGroupManager::GcsPlacementGroupManager(
instrumented_io_context &io_context, GcsResourceManager &gcs_resource_manager)
: io_context_(io_context), gcs_resource_manager_(gcs_resource_manager) {}

void GcsPlacementGroupManager::RegisterPlacementGroup(
const std::shared_ptr<GcsPlacementGroup> &placement_group, StatusCallback callback) {
// NOTE: After the abnormal recovery of the network between GCS client and GCS server or
Expand Down
8 changes: 3 additions & 5 deletions src/ray/gcs/gcs_server/gcs_placement_group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
#include <optional>
#include <utility>

#include "absl/container/btree_map.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/bundle_spec.h"
#include "ray/common/id.h"
Expand Down Expand Up @@ -238,11 +236,11 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
/// \param get_ray_namespace A callback to get the ray namespace.
GcsPlacementGroupManager(instrumented_io_context &io_context,
GcsPlacementGroupSchedulerInterface *scheduler,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
gcs::GcsTableStorage *gcs_table_storage,
GcsResourceManager &gcs_resource_manager,
std::function<std::string(const JobID &)> get_ray_namespace);

~GcsPlacementGroupManager() = default;
~GcsPlacementGroupManager() override = default;

void HandleCreatePlacementGroup(rpc::CreatePlacementGroupRequest request,
rpc::CreatePlacementGroupReply *reply,
Expand Down Expand Up @@ -484,7 +482,7 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
gcs::GcsPlacementGroupSchedulerInterface *gcs_placement_group_scheduler_ = nullptr;

/// Used to update placement group information upon creation, deletion, etc.
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
gcs::GcsTableStorage *gcs_table_storage_ = nullptr;

/// Counter of placement groups broken down by State.
std::shared_ptr<CounterMap<rpc::PlacementGroupTableData::PlacementGroupState>>
Expand Down
6 changes: 3 additions & 3 deletions src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ namespace gcs {

GcsPlacementGroupScheduler::GcsPlacementGroupScheduler(
instrumented_io_context &io_context,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
gcs::GcsTableStorage &gcs_table_storage,
const gcs::GcsNodeManager &gcs_node_manager,
ClusterResourceScheduler &cluster_resource_scheduler,
rpc::NodeManagerClientPool &raylet_client_pool)
: io_context_(io_context),
return_timer_(io_context),
gcs_table_storage_(std::move(gcs_table_storage)),
gcs_table_storage_(gcs_table_storage),
gcs_node_manager_(gcs_node_manager),
cluster_resource_scheduler_(cluster_resource_scheduler),
raylet_client_pool_(raylet_client_pool) {}
Expand Down Expand Up @@ -395,7 +395,7 @@ void GcsPlacementGroupScheduler::OnAllBundlePrepareRequestReturned(

placement_group->UpdateState(rpc::PlacementGroupTableData::PREPARED);

RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().Put(
RAY_CHECK_OK(gcs_table_storage_.PlacementGroupTable().Put(
placement_group_id,
placement_group->GetPlacementGroupTableData(),
[this, lease_status_tracker, schedule_failure_handler, schedule_success_handler](
Expand Down
Loading

0 comments on commit f5698a2

Please sign in to comment.