Skip to content

Commit

Permalink
[Core] Only publish WORKER_OBJECT_EVICTION when the object is out of scope or manually freed (#47990)
Browse files Browse the repository at this point in the history

Signed-off-by: Jiajun Yao <[email protected]>
  • Loading branch information
jjyao authored Nov 25, 2024
1 parent 2514aff commit 8dbc123
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 48 deletions.
2 changes: 1 addition & 1 deletion src/mock/ray/core_worker/reference_count.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class MockReferenceCounter : public ReferenceCounterInterface {
bool add_local_ref,
const absl::optional<NodeID> &pinned_at_raylet_id));

MOCK_METHOD2(AddObjectPrimaryCopyDeleteCallback,
MOCK_METHOD2(AddObjectOutOfScopeOrFreedCallback,
bool(const ObjectID &object_id,
const std::function<void(const ObjectID &)> callback));

Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ bool ActorManager::AddActorHandle(std::unique_ptr<ActorHandle> actor_handle,
}

if (inserted && owned) {
RAY_CHECK(reference_counter_->AddObjectPrimaryCopyDeleteCallback(
RAY_CHECK(reference_counter_->AddObjectOutOfScopeOrFreedCallback(
actor_creation_return_id, [this, actor_id](const ObjectID &object_id) {
MarkActorKilledOrOutOfScope(GetActorHandle(actor_id));
}));
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3860,7 +3860,7 @@ void CoreWorker::ProcessSubscribeForObjectEviction(
// Returns true if the object was present and the callback was added. It might have
// already been evicted by the time we get this request, in which case we should
// respond immediately so the raylet unpins the object.
if (!reference_counter_->AddObjectPrimaryCopyDeleteCallback(object_id, unpin_object)) {
if (!reference_counter_->AddObjectOutOfScopeOrFreedCallback(object_id, unpin_object)) {
// If the object is already evicted (callback cannot be set), unregister the
// subscription & publish the message so that the subscriber knows it.
unpin_object(object_id);
Expand Down
37 changes: 18 additions & 19 deletions src/ray/core_worker/reference_count.cc
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ int64_t ReferenceCounter::ReleaseLineageReferences(ReferenceTable::iterator ref)
RAY_LOG(DEBUG) << "Releasing lineage internal for argument " << argument_id;
arg_it->second.lineage_ref_count--;
if (arg_it->second.OutOfScope(lineage_pinning_enabled_)) {
DeleteObjectPrimaryCopy(arg_it);
OnObjectOutOfScopeOrFreed(arg_it);
}
if (arg_it->second.ShouldDelete(lineage_pinning_enabled_)) {
RAY_CHECK(arg_it->second.on_ref_removed == nullptr);
Expand Down Expand Up @@ -663,7 +663,7 @@ void ReferenceCounter::FreePlasmaObjects(const std::vector<ObjectID> &object_ids
}
// Free only the plasma value. We must keep the reference around so that we
// have the ownership information.
DeleteObjectPrimaryCopy(it);
OnObjectOutOfScopeOrFreed(it);
}
}

Expand Down Expand Up @@ -700,8 +700,7 @@ void ReferenceCounter::DeleteReferenceInternal(ReferenceTable::iterator it,
DeleteReferenceInternal(inner_it, deleted);
}
}
// Perform the deletion.
DeleteObjectPrimaryCopy(it);
OnObjectOutOfScopeOrFreed(it);
if (deleted) {
deleted->push_back(id);
}
Expand Down Expand Up @@ -764,20 +763,20 @@ int64_t ReferenceCounter::EvictLineage(int64_t min_bytes_to_evict) {
return lineage_bytes_evicted;
}

void ReferenceCounter::DeleteObjectPrimaryCopy(ReferenceTable::iterator it) {
RAY_LOG(DEBUG) << "Calling on_object_primary_copy_delete for object " << it->first
<< " num callbacks: "
<< it->second.on_object_primary_copy_delete_callbacks.size();
for (const auto &callback : it->second.on_object_primary_copy_delete_callbacks) {
void ReferenceCounter::OnObjectOutOfScopeOrFreed(ReferenceTable::iterator it) {
RAY_LOG(DEBUG) << "Calling on_object_out_of_scope_or_freed_callbacks for object "
<< it->first << " num callbacks: "
<< it->second.on_object_out_of_scope_or_freed_callbacks.size();
for (const auto &callback : it->second.on_object_out_of_scope_or_freed_callbacks) {
callback(it->first);
}
it->second.on_object_primary_copy_delete_callbacks.clear();
it->second.on_object_out_of_scope_or_freed_callbacks.clear();
UnsetObjectPrimaryCopy(it);
}

void ReferenceCounter::UnsetObjectPrimaryCopy(ReferenceTable::iterator it) {
it->second.pinned_at_raylet_id.reset();
if (it->second.spilled && !it->second.spilled_node_id.IsNil()) {
// The spilled copy of the object should get deleted during the
// on_object_primary_copy_delete callback, so reset the spill location metadata here.
// NOTE(swang): Spilled copies in cloud storage are not GCed, so we do not
// reset the spilled metadata.
it->second.spilled = false;
it->second.spilled_url = "";
it->second.spilled_node_id = NodeID::Nil();
Expand All @@ -795,7 +794,7 @@ bool ReferenceCounter::SetObjectRefDeletedCallback(
return true;
}

bool ReferenceCounter::AddObjectPrimaryCopyDeleteCallback(
bool ReferenceCounter::AddObjectOutOfScopeOrFreedCallback(
const ObjectID &object_id, const std::function<void(const ObjectID &)> callback) {
absl::MutexLock lock(&mutex_);
auto it = object_id_refs_.find(object_id);
Expand All @@ -812,7 +811,7 @@ bool ReferenceCounter::AddObjectPrimaryCopyDeleteCallback(
return false;
}

it->second.on_object_primary_copy_delete_callbacks.emplace_back(callback);
it->second.on_object_out_of_scope_or_freed_callbacks.emplace_back(callback);
return true;
}

Expand All @@ -822,7 +821,7 @@ void ReferenceCounter::ResetObjectsOnRemovedNode(const NodeID &raylet_id) {
const auto &object_id = it->first;
if (it->second.pinned_at_raylet_id.value_or(NodeID::Nil()) == raylet_id ||
it->second.spilled_node_id == raylet_id) {
DeleteObjectPrimaryCopy(it);
UnsetObjectPrimaryCopy(it);
if (!it->second.OutOfScope(lineage_pinning_enabled_)) {
objects_to_recover_.push_back(object_id);
}
Expand Down Expand Up @@ -862,7 +861,7 @@ void ReferenceCounter::UpdateObjectPinnedAtRaylet(const ObjectID &object_id,
if (check_node_alive_(raylet_id)) {
it->second.pinned_at_raylet_id = raylet_id;
} else {
DeleteObjectPrimaryCopy(it);
UnsetObjectPrimaryCopy(it);
objects_to_recover_.push_back(object_id);
}
}
Expand Down Expand Up @@ -1429,7 +1428,7 @@ bool ReferenceCounter::HandleObjectSpilled(const ObjectID &object_id,
} else {
RAY_LOG(DEBUG).WithField(spilled_node_id).WithField(object_id)
<< "Object spilled to dead node ";
DeleteObjectPrimaryCopy(it);
UnsetObjectPrimaryCopy(it);
objects_to_recover_.push_back(object_id);
}
return true;
Expand Down
19 changes: 11 additions & 8 deletions src/ray/core_worker/reference_count.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ReferenceCounterInterface {
bool is_reconstructable,
bool add_local_ref,
const absl::optional<NodeID> &pinned_at_raylet_id = absl::optional<NodeID>()) = 0;
virtual bool AddObjectPrimaryCopyDeleteCallback(
virtual bool AddObjectOutOfScopeOrFreedCallback(
const ObjectID &object_id,
const std::function<void(const ObjectID &)> callback) = 0;
virtual bool SetObjectRefDeletedCallback(
Expand Down Expand Up @@ -320,7 +320,7 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// Adds a callback that will be run when the object goes out of scope
/// (Reference.OutOfScope() returns true) or is manually freed.
/// Returns true if the object was in scope, had not been freed, and the
/// callback was added; returns false otherwise.
bool AddObjectPrimaryCopyDeleteCallback(
bool AddObjectOutOfScopeOrFreedCallback(
const ObjectID &object_id, const std::function<void(const ObjectID &)> callback)
ABSL_LOCKS_EXCLUDED(mutex_);

Expand Down Expand Up @@ -783,13 +783,13 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// Metadata related to borrowing.
std::unique_ptr<BorrowInfo> borrow_info;

/// Callback that will be called when this Object's primary copy
/// should be deleted: out of scope or internal_api.free
/// Callbacks that will be called when this object
/// goes out of scope or is manually freed.
/// Note: an object that is out of scope can still have a
/// nonzero lineage ref count; on_object_ref_delete is only
/// called once the lineage ref count has also dropped to 0.
std::vector<std::function<void(const ObjectID &)>>
on_object_primary_copy_delete_callbacks;
on_object_out_of_scope_or_freed_callbacks;
/// Callback that will be called when the object ref is deleted
/// from the reference table (all refs including lineage ref count go to 0).
std::function<void(const ObjectID &)> on_object_ref_delete;
Expand Down Expand Up @@ -847,9 +847,12 @@ class ReferenceCounter : public ReferenceCounterInterface,
rpc::Address *owner_address = nullptr) const
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_);

/// Delete the object primary copy, if any. Also unsets the raylet address
/// that the object was pinned at, if the address was set.
void DeleteObjectPrimaryCopy(ReferenceTable::iterator it);
/// Unsets the raylet address
/// that the object was pinned at or spilled at, if the address was set.
void UnsetObjectPrimaryCopy(ReferenceTable::iterator it);

/// This should be called whenever the object is out of scope or manually freed.
void OnObjectOutOfScopeOrFreed(ReferenceTable::iterator it);

/// Shutdown if all references have gone out of scope and shutdown
/// is scheduled.
Expand Down
6 changes: 3 additions & 3 deletions src/ray/core_worker/test/actor_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class ActorManagerTest : public ::testing::Test {
ray_namespace,
-1,
false);
EXPECT_CALL(*reference_counter_, AddObjectPrimaryCopyDeleteCallback(_, _))
EXPECT_CALL(*reference_counter_, AddObjectOutOfScopeOrFreedCallback(_, _))
.WillRepeatedly(testing::Return(true));
actor_manager_->AddNewActorHandle(std::move(actor_handle),
call_site,
Expand Down Expand Up @@ -207,7 +207,7 @@ TEST_F(ActorManagerTest, TestAddAndGetActorHandleEndToEnd) {
"",
-1,
false);
EXPECT_CALL(*reference_counter_, AddObjectPrimaryCopyDeleteCallback(_, _))
EXPECT_CALL(*reference_counter_, AddObjectOutOfScopeOrFreedCallback(_, _))
.WillRepeatedly(testing::Return(true));

// Add an actor handle.
Expand Down Expand Up @@ -284,7 +284,7 @@ TEST_F(ActorManagerTest, RegisterActorHandles) {
"",
-1,
false);
EXPECT_CALL(*reference_counter_, AddObjectPrimaryCopyDeleteCallback(_, _))
EXPECT_CALL(*reference_counter_, AddObjectOutOfScopeOrFreedCallback(_, _))
.WillRepeatedly(testing::Return(true));
ObjectID outer_object_id = ObjectID::Nil();

Expand Down
28 changes: 14 additions & 14 deletions src/ray/core_worker/test/reference_count_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -572,19 +572,19 @@ TEST_F(ReferenceCountTest, TestUnreconstructableObjectOutOfScope) {

// The object goes out of scope once it has no more refs.
std::vector<ObjectID> out;
ASSERT_FALSE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback));
ASSERT_FALSE(rc->AddObjectOutOfScopeOrFreedCallback(id, callback));
rc->AddOwnedObject(id, {}, address, "", 0, false, /*add_local_ref=*/true);
ASSERT_TRUE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback));
ASSERT_TRUE(rc->AddObjectOutOfScopeOrFreedCallback(id, callback));
ASSERT_FALSE(*out_of_scope);
rc->RemoveLocalReference(id, &out);
ASSERT_TRUE(*out_of_scope);

// Unreconstructable objects go out of scope even if they have a nonzero
// lineage ref count.
*out_of_scope = false;
ASSERT_FALSE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback));
ASSERT_FALSE(rc->AddObjectOutOfScopeOrFreedCallback(id, callback));
rc->AddOwnedObject(id, {}, address, "", 0, false, /*add_local_ref=*/false);
ASSERT_TRUE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback));
ASSERT_TRUE(rc->AddObjectOutOfScopeOrFreedCallback(id, callback));
rc->UpdateSubmittedTaskReferences({}, {id});
ASSERT_FALSE(*out_of_scope);
rc->UpdateFinishedTaskReferences({}, {id}, false, empty_borrower, empty_refs, &out);
Expand Down Expand Up @@ -2437,9 +2437,9 @@ TEST_F(ReferenceCountLineageEnabledTest, TestUnreconstructableObjectOutOfScope)

// The object goes out of scope once it has no more refs.
std::vector<ObjectID> out;
ASSERT_FALSE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback));
ASSERT_FALSE(rc->AddObjectOutOfScopeOrFreedCallback(id, callback));
rc->AddOwnedObject(id, {}, address, "", 0, false, /*add_local_ref=*/true);
ASSERT_TRUE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback));
ASSERT_TRUE(rc->AddObjectOutOfScopeOrFreedCallback(id, callback));
ASSERT_FALSE(*out_of_scope);
ASSERT_FALSE(*out_of_scope);
rc->RemoveLocalReference(id, &out);
Expand All @@ -2450,9 +2450,9 @@ TEST_F(ReferenceCountLineageEnabledTest, TestUnreconstructableObjectOutOfScope)
// Unreconstructable objects stay in scope if they have a nonzero lineage ref
// count.
*out_of_scope = false;
ASSERT_FALSE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback));
ASSERT_FALSE(rc->AddObjectOutOfScopeOrFreedCallback(id, callback));
rc->AddOwnedObject(id, {}, address, "", 0, false, /*add_local_ref=*/false);
ASSERT_TRUE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback));
ASSERT_TRUE(rc->AddObjectOutOfScopeOrFreedCallback(id, callback));
rc->UpdateSubmittedTaskReferences({return_id}, {id});
ASSERT_TRUE(rc->IsObjectPendingCreation(return_id));
ASSERT_FALSE(*out_of_scope);
Expand Down Expand Up @@ -2541,7 +2541,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPinLineageRecursive) {
rc->UpdateFinishedTaskReferences({}, {id}, false, empty_borrower, empty_refs, &out);
// We should fail to add the out-of-scope callback because the object has
// already gone out of scope.
ASSERT_FALSE(rc->AddObjectPrimaryCopyDeleteCallback(
ASSERT_FALSE(rc->AddObjectOutOfScopeOrFreedCallback(
id, [&](const ObjectID &object_id) { ASSERT_FALSE(true); }));

ASSERT_EQ(out.size(), 1);
Expand Down Expand Up @@ -2658,7 +2658,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) {
ObjectID id = ObjectID::FromRandom();
NodeID node_id = NodeID::FromRandom();
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/true);
ASSERT_TRUE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback));
ASSERT_TRUE(rc->AddObjectOutOfScopeOrFreedCallback(id, callback));
ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &owned_by_us, &pinned_at, &spilled));
ASSERT_TRUE(owned_by_us);
ASSERT_TRUE(pinned_at.IsNil());
Expand All @@ -2674,7 +2674,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) {
deleted->clear();

rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/true);
ASSERT_TRUE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback));
ASSERT_TRUE(rc->AddObjectOutOfScopeOrFreedCallback(id, callback));
rc->UpdateObjectPinnedAtRaylet(id, node_id);
rc->ResetObjectsOnRemovedNode(node_id);
auto objects = rc->FlushObjectsToRecover();
Expand All @@ -2683,7 +2683,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) {
ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &owned_by_us, &pinned_at, &spilled));
ASSERT_TRUE(owned_by_us);
ASSERT_TRUE(pinned_at.IsNil());
ASSERT_TRUE(deleted->count(id) > 0);
ASSERT_TRUE(deleted->empty());
deleted->clear();
}

Expand All @@ -2699,7 +2699,7 @@ TEST_F(ReferenceCountTest, TestFree) {
ASSERT_FALSE(rc->IsPlasmaObjectFreed(id));
rc->FreePlasmaObjects({id});
ASSERT_TRUE(rc->IsPlasmaObjectFreed(id));
ASSERT_FALSE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback));
ASSERT_FALSE(rc->AddObjectOutOfScopeOrFreedCallback(id, callback));
ASSERT_EQ(deleted->count(id), 0);
rc->UpdateObjectPinnedAtRaylet(id, node_id);
bool owned_by_us;
Expand All @@ -2714,7 +2714,7 @@ TEST_F(ReferenceCountTest, TestFree) {

// Test free after receiving information about where the object is pinned.
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/true);
ASSERT_TRUE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback));
ASSERT_TRUE(rc->AddObjectOutOfScopeOrFreedCallback(id, callback));
rc->UpdateObjectPinnedAtRaylet(id, node_id);
ASSERT_FALSE(rc->IsPlasmaObjectFreed(id));
rc->FreePlasmaObjects({id});
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/transport/actor_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ void ActorTaskSubmitter::NotifyGCSWhenActorOutOfScope(
}));
};

if (!reference_counter_->AddObjectPrimaryCopyDeleteCallback(
if (!reference_counter_->AddObjectOutOfScopeOrFreedCallback(
actor_creation_return_id,
[actor_out_of_scope_callback](const ObjectID &object_id) {
actor_out_of_scope_callback(object_id);
Expand Down

0 comments on commit 8dbc123

Please sign in to comment.