diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 779a5bd3295e7..496e29a8dea44 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -5,6 +5,7 @@ import logging import time import warnings +from collections.abc import Sequence from typing import ( TYPE_CHECKING, Any, @@ -764,19 +765,20 @@ def add_column( f"got: {batch_format}" ) + def _raise_duplicate_column_error(col: str): + raise ValueError(f"Trying to add an existing column with name {col!r}") + def add_column(batch: DataBatch) -> DataBatch: column = fn(batch) if batch_format == "pandas": import pandas as pd - assert isinstance(column, pd.Series), ( + assert isinstance(column, (pd.Series, Sequence)), ( f"For pandas batch format, the function must return a pandas " - f"Series, got: {type(column)}" + f"Series or sequence, got: {type(column)}" ) if col in batch: - raise ValueError( - f"Trying to add an existing column with name" f" {col}" - ) + _raise_duplicate_column_error(col) batch.loc[:, col] = column return batch elif batch_format == "pyarrow": @@ -798,9 +800,7 @@ def add_column(batch: DataBatch) -> DataBatch: # Append the column to the table return batch.append_column(col, column) else: - raise ValueError( - f"Trying to add an existing column with name {col}" - ) + _raise_duplicate_column_error(col) else: # batch format is assumed to be numpy since we checked at the @@ -810,9 +810,7 @@ def add_column(batch: DataBatch) -> DataBatch: f"numpy.ndarray, got: {type(column)}" ) if col in batch: - raise ValueError( - f"Trying to add an existing column with name" f" {col}" - ) + _raise_duplicate_column_error(col) batch[col] = column return batch diff --git a/python/ray/data/tests/test_map.py b/python/ray/data/tests/test_map.py index d4e7e2c374de9..41100f4b8a2ce 100644 --- a/python/ray/data/tests/test_map.py +++ b/python/ray/data/tests/test_map.py @@ -353,7 +353,7 @@ def test_add_column(ray_start_regular_shared): # Adding a column that is already there should result in an error with pytest.raises( ray.exceptions.UserCodeException, - match="Trying to add an existing column with name id", + match="Trying to add an existing column with name 'id'", ): ds = ray.data.range(5).add_column( "id", lambda x: pc.add(x["id"], 1), batch_format="pyarrow" @@ -362,7 +362,7 @@ def test_add_column(ray_start_regular_shared): # Adding a column in the wrong format should result in an error with pytest.raises( - ray.exceptions.UserCodeException, match="For pyarrow batch " "format" + ray.exceptions.UserCodeException, match="For pyarrow batch format" ): ds = ray.data.range(5).add_column("id", lambda x: [1], batch_format="pyarrow") assert ds.take(2) == [{"id": 1}, {"id": 2}] @@ -381,7 +381,7 @@ def test_add_column(ray_start_regular_shared): # Adding a column that is already there should result in an error with pytest.raises( ray.exceptions.UserCodeException, - match="Trying to add an existing column with name id", + match="Trying to add an existing column with name 'id'", ): ds = ray.data.range(5).add_column( "id", lambda x: np.add(x["id"], 1), batch_format="numpy" @@ -390,7 +390,7 @@ def test_add_column(ray_start_regular_shared): # Adding a column in the wrong format should result in an error with pytest.raises( - ray.exceptions.UserCodeException, match="For numpy batch " "format" + ray.exceptions.UserCodeException, match="For numpy batch format" ): ds = ray.data.range(5).add_column("id", lambda x: [1], batch_format="numpy") assert ds.take(2) == [{"id": 1}, {"id": 2}] @@ -405,16 +405,18 @@ def test_add_column(ray_start_regular_shared): # Adding a column that is already there should result in an error with pytest.raises( ray.exceptions.UserCodeException, - match="Trying to add an existing column with name id", + match="Trying to add an existing column with name 'id'", ): ds = ray.data.range(5).add_column("id", lambda x: x["id"] + 1) assert ds.take(2) == [{"id": 1}, {"id": 2}] # Adding a column in the wrong format should result in an error with pytest.raises( - ray.exceptions.UserCodeException, match="For pandas batch " "format" + ray.exceptions.UserCodeException, match="For pandas batch format" ): - ds = ray.data.range(5).add_column("id", lambda x: [1], batch_format="pandas") + ds = ray.data.range(5).add_column( + "id", lambda x: np.array([1]), batch_format="pandas" + ) assert ds.take(2) == [{"id": 1}, {"id": 2}] with pytest.raises(ValueError): diff --git a/release/microbenchmark/experimental/accelerated_dag_gpu_microbenchmark.py b/release/microbenchmark/experimental/accelerated_dag_gpu_microbenchmark.py index f440e72752fbb..895d43bdcdaba 100644 --- a/release/microbenchmark/experimental/accelerated_dag_gpu_microbenchmark.py +++ b/release/microbenchmark/experimental/accelerated_dag_gpu_microbenchmark.py @@ -58,13 +58,16 @@ class TorchTensorWorker: def __init__(self): self.device = torch_utils.get_devices()[0] - def send(self, shape, dtype, value: int): - t = torch.ones(shape, dtype=dtype, device=self.device) * value + def send(self, shape, dtype, _): + t = torch.ones(shape, dtype=dtype, device=self.device) * 1 return t def recv(self, tensor): + # This benchmark tests the overhead of sending a tensor between + # actors. To minimize the overhead of shared memory transfer, + # we return only a byte string. assert tensor.device == self.device - return (tensor[0].item(), tensor.shape, tensor.dtype) + return b"x" @ray.remote(num_gpus=1) @@ -139,17 +142,15 @@ def exec_ray_dag( dag = dag.experimental_compile() def _run(): - i = np.random.randint(100) - ref = dag.execute(i) + ref = dag.execute(b"x") result = ray.get(ref) - assert result == (i, SHAPE, DTYPE) + assert result == b"x" else: def _run(): - i = np.random.randint(100) - result = ray.get(dag.execute(i)) - assert result == (i, SHAPE, DTYPE) + result = ray.get(dag.execute(b"x")) + assert result == b"x" results = timeit(label, _run) diff --git a/src/mock/ray/core_worker/reference_count.h b/src/mock/ray/core_worker/reference_count.h index c0679dec135f5..c9f7a1d0b4151 100644 --- a/src/mock/ray/core_worker/reference_count.h +++ b/src/mock/ray/core_worker/reference_count.h @@ -41,7 +41,7 @@ class MockReferenceCounter : public ReferenceCounterInterface { bool add_local_ref, const absl::optional &pinned_at_raylet_id)); - MOCK_METHOD2(AddObjectPrimaryCopyDeleteCallback, + MOCK_METHOD2(AddObjectOutOfScopeOrFreedCallback, bool(const ObjectID &object_id, const std::function callback)); diff --git a/src/ray/core_worker/actor_manager.cc b/src/ray/core_worker/actor_manager.cc index 02a89a7c65c91..a31c402fae11b 100644 --- a/src/ray/core_worker/actor_manager.cc +++ b/src/ray/core_worker/actor_manager.cc @@ -172,7 +172,7 @@ bool ActorManager::AddActorHandle(std::unique_ptr actor_handle, } if (inserted && owned) { - RAY_CHECK(reference_counter_->AddObjectPrimaryCopyDeleteCallback( + RAY_CHECK(reference_counter_->AddObjectOutOfScopeOrFreedCallback( actor_creation_return_id, [this, actor_id](const ObjectID &object_id) { MarkActorKilledOrOutOfScope(GetActorHandle(actor_id)); })); diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 5dcd799b8936a..e04ade96a0fe7 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -3860,7 +3860,7 @@ void CoreWorker::ProcessSubscribeForObjectEviction( // Returns true if the object was present and the callback was added. It might have // already been evicted by the time we get this request, in which case we should // respond immediately so the raylet unpins the object. - if (!reference_counter_->AddObjectPrimaryCopyDeleteCallback(object_id, unpin_object)) { + if (!reference_counter_->AddObjectOutOfScopeOrFreedCallback(object_id, unpin_object)) { // If the object is already evicted (callback cannot be set), unregister the // subscription & publish the message so that the subscriber knows it. unpin_object(object_id); diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index eb5abe2ea3872..6dd4c8bf6b7de 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -543,7 +543,7 @@ int64_t ReferenceCounter::ReleaseLineageReferences(ReferenceTable::iterator ref) RAY_LOG(DEBUG) << "Releasing lineage internal for argument " << argument_id; arg_it->second.lineage_ref_count--; if (arg_it->second.OutOfScope(lineage_pinning_enabled_)) { - DeleteObjectPrimaryCopy(arg_it); + OnObjectOutOfScopeOrFreed(arg_it); } if (arg_it->second.ShouldDelete(lineage_pinning_enabled_)) { RAY_CHECK(arg_it->second.on_ref_removed == nullptr); @@ -663,7 +663,7 @@ void ReferenceCounter::FreePlasmaObjects(const std::vector &object_ids } // Free only the plasma value. We must keep the reference around so that we // have the ownership information. - DeleteObjectPrimaryCopy(it); + OnObjectOutOfScopeOrFreed(it); } } @@ -700,8 +700,7 @@ void ReferenceCounter::DeleteReferenceInternal(ReferenceTable::iterator it, DeleteReferenceInternal(inner_it, deleted); } } - // Perform the deletion. - DeleteObjectPrimaryCopy(it); + OnObjectOutOfScopeOrFreed(it); if (deleted) { deleted->push_back(id); } @@ -764,20 +763,20 @@ int64_t ReferenceCounter::EvictLineage(int64_t min_bytes_to_evict) { return lineage_bytes_evicted; } -void ReferenceCounter::DeleteObjectPrimaryCopy(ReferenceTable::iterator it) { - RAY_LOG(DEBUG) << "Calling on_object_primary_copy_delete for object " << it->first - << " num callbacks: " - << it->second.on_object_primary_copy_delete_callbacks.size(); - for (const auto &callback : it->second.on_object_primary_copy_delete_callbacks) { +void ReferenceCounter::OnObjectOutOfScopeOrFreed(ReferenceTable::iterator it) { + RAY_LOG(DEBUG) << "Calling on_object_out_of_scope_or_freed_callbacks for object " + << it->first << " num callbacks: " + << it->second.on_object_out_of_scope_or_freed_callbacks.size(); + for (const auto &callback : it->second.on_object_out_of_scope_or_freed_callbacks) { callback(it->first); } - it->second.on_object_primary_copy_delete_callbacks.clear(); + it->second.on_object_out_of_scope_or_freed_callbacks.clear(); + UnsetObjectPrimaryCopy(it); +} + +void ReferenceCounter::UnsetObjectPrimaryCopy(ReferenceTable::iterator it) { it->second.pinned_at_raylet_id.reset(); if (it->second.spilled && !it->second.spilled_node_id.IsNil()) { - // The spilled copy of the object should get deleted during the - // on_object_primary_copy_delete callback, so reset the spill location metadata here. - // NOTE(swang): Spilled copies in cloud storage are not GCed, so we do not - // reset the spilled metadata. it->second.spilled = false; it->second.spilled_url = ""; it->second.spilled_node_id = NodeID::Nil(); @@ -795,7 +794,7 @@ bool ReferenceCounter::SetObjectRefDeletedCallback( return true; } -bool ReferenceCounter::AddObjectPrimaryCopyDeleteCallback( +bool ReferenceCounter::AddObjectOutOfScopeOrFreedCallback( const ObjectID &object_id, const std::function callback) { absl::MutexLock lock(&mutex_); auto it = object_id_refs_.find(object_id); @@ -812,7 +811,7 @@ bool ReferenceCounter::AddObjectPrimaryCopyDeleteCallback( return false; } - it->second.on_object_primary_copy_delete_callbacks.emplace_back(callback); + it->second.on_object_out_of_scope_or_freed_callbacks.emplace_back(callback); return true; } @@ -822,7 +821,7 @@ void ReferenceCounter::ResetObjectsOnRemovedNode(const NodeID &raylet_id) { const auto &object_id = it->first; if (it->second.pinned_at_raylet_id.value_or(NodeID::Nil()) == raylet_id || it->second.spilled_node_id == raylet_id) { - DeleteObjectPrimaryCopy(it); + UnsetObjectPrimaryCopy(it); if (!it->second.OutOfScope(lineage_pinning_enabled_)) { objects_to_recover_.push_back(object_id); } @@ -862,7 +861,7 @@ void ReferenceCounter::UpdateObjectPinnedAtRaylet(const ObjectID &object_id, if (check_node_alive_(raylet_id)) { it->second.pinned_at_raylet_id = raylet_id; } else { - DeleteObjectPrimaryCopy(it); + UnsetObjectPrimaryCopy(it); objects_to_recover_.push_back(object_id); } } @@ -1429,7 +1428,7 @@ bool ReferenceCounter::HandleObjectSpilled(const ObjectID &object_id, } else { RAY_LOG(DEBUG).WithField(spilled_node_id).WithField(object_id) << "Object spilled to dead node "; - DeleteObjectPrimaryCopy(it); + UnsetObjectPrimaryCopy(it); objects_to_recover_.push_back(object_id); } return true; diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h index 4ef6e14a00167..5eb228301c34b 100644 --- a/src/ray/core_worker/reference_count.h +++ b/src/ray/core_worker/reference_count.h @@ -49,7 +49,7 @@ class ReferenceCounterInterface { bool is_reconstructable, bool add_local_ref, const absl::optional &pinned_at_raylet_id = absl::optional()) = 0; - virtual bool AddObjectPrimaryCopyDeleteCallback( + virtual bool AddObjectOutOfScopeOrFreedCallback( const ObjectID &object_id, const std::function callback) = 0; virtual bool SetObjectRefDeletedCallback( @@ -320,7 +320,7 @@ class ReferenceCounter : public ReferenceCounterInterface, /// Adds the callback that will be run when the object goes out of scope /// (Reference.OutOfScope() returns true). /// Returns true if the object was in scope and the callback was added, else false. - bool AddObjectPrimaryCopyDeleteCallback( + bool AddObjectOutOfScopeOrFreedCallback( const ObjectID &object_id, const std::function callback) ABSL_LOCKS_EXCLUDED(mutex_); @@ -783,13 +783,13 @@ class ReferenceCounter : public ReferenceCounterInterface, /// Metadata related to borrowing. std::unique_ptr borrow_info; - /// Callback that will be called when this Object's primary copy - /// should be deleted: out of scope or internal_api.free + /// Callback that will be called when this object + /// is out of scope or manually freed. /// Note: when an object is out of scope, it can still /// have lineage ref count and on_object_ref_delete /// will be called when lineage ref count is also 0. std::vector> - on_object_primary_copy_delete_callbacks; + on_object_out_of_scope_or_freed_callbacks; /// Callback that will be called when the object ref is deleted /// from the reference table (all refs including lineage ref count go to 0). std::function on_object_ref_delete; @@ -847,9 +847,12 @@ class ReferenceCounter : public ReferenceCounterInterface, rpc::Address *owner_address = nullptr) const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_); - /// Delete the object primary copy, if any. Also unsets the raylet address - /// that the object was pinned at, if the address was set. - void DeleteObjectPrimaryCopy(ReferenceTable::iterator it); + /// Unsets the raylet address + /// that the object was pinned at or spilled at, if the address was set. + void UnsetObjectPrimaryCopy(ReferenceTable::iterator it); + + /// This should be called whenever the object is out of scope or manually freed. + void OnObjectOutOfScopeOrFreed(ReferenceTable::iterator it); /// Shutdown if all references have gone out of scope and shutdown /// is scheduled. diff --git a/src/ray/core_worker/test/actor_manager_test.cc b/src/ray/core_worker/test/actor_manager_test.cc index b5c938c6c0ceb..8f68f28cd8457 100644 --- a/src/ray/core_worker/test/actor_manager_test.cc +++ b/src/ray/core_worker/test/actor_manager_test.cc @@ -168,7 +168,7 @@ class ActorManagerTest : public ::testing::Test { ray_namespace, -1, false); - EXPECT_CALL(*reference_counter_, AddObjectPrimaryCopyDeleteCallback(_, _)) + EXPECT_CALL(*reference_counter_, AddObjectOutOfScopeOrFreedCallback(_, _)) .WillRepeatedly(testing::Return(true)); actor_manager_->AddNewActorHandle(std::move(actor_handle), call_site, @@ -207,7 +207,7 @@ TEST_F(ActorManagerTest, TestAddAndGetActorHandleEndToEnd) { "", -1, false); - EXPECT_CALL(*reference_counter_, AddObjectPrimaryCopyDeleteCallback(_, _)) + EXPECT_CALL(*reference_counter_, AddObjectOutOfScopeOrFreedCallback(_, _)) .WillRepeatedly(testing::Return(true)); // Add an actor handle. @@ -284,7 +284,7 @@ TEST_F(ActorManagerTest, RegisterActorHandles) { "", -1, false); - EXPECT_CALL(*reference_counter_, AddObjectPrimaryCopyDeleteCallback(_, _)) + EXPECT_CALL(*reference_counter_, AddObjectOutOfScopeOrFreedCallback(_, _)) .WillRepeatedly(testing::Return(true)); ObjectID outer_object_id = ObjectID::Nil(); diff --git a/src/ray/core_worker/test/reference_count_test.cc b/src/ray/core_worker/test/reference_count_test.cc index 4351692284e28..67423a3ed75ad 100644 --- a/src/ray/core_worker/test/reference_count_test.cc +++ b/src/ray/core_worker/test/reference_count_test.cc @@ -572,9 +572,9 @@ TEST_F(ReferenceCountTest, TestUnreconstructableObjectOutOfScope) { // The object goes out of scope once it has no more refs. std::vector out; - ASSERT_FALSE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback)); + ASSERT_FALSE(rc->AddObjectOutOfScopeOrFreedCallback(id, callback)); rc->AddOwnedObject(id, {}, address, "", 0, false, /*add_local_ref=*/true); - ASSERT_TRUE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback)); + ASSERT_TRUE(rc->AddObjectOutOfScopeOrFreedCallback(id, callback)); ASSERT_FALSE(*out_of_scope); rc->RemoveLocalReference(id, &out); ASSERT_TRUE(*out_of_scope); @@ -582,9 +582,9 @@ TEST_F(ReferenceCountTest, TestUnreconstructableObjectOutOfScope) { // Unreconstructable objects go out of scope even if they have a nonzero // lineage ref count. *out_of_scope = false; - ASSERT_FALSE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback)); + ASSERT_FALSE(rc->AddObjectOutOfScopeOrFreedCallback(id, callback)); rc->AddOwnedObject(id, {}, address, "", 0, false, /*add_local_ref=*/false); - ASSERT_TRUE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback)); + ASSERT_TRUE(rc->AddObjectOutOfScopeOrFreedCallback(id, callback)); rc->UpdateSubmittedTaskReferences({}, {id}); ASSERT_FALSE(*out_of_scope); rc->UpdateFinishedTaskReferences({}, {id}, false, empty_borrower, empty_refs, &out); @@ -2437,9 +2437,9 @@ TEST_F(ReferenceCountLineageEnabledTest, TestUnreconstructableObjectOutOfScope) // The object goes out of scope once it has no more refs. std::vector out; - ASSERT_FALSE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback)); + ASSERT_FALSE(rc->AddObjectOutOfScopeOrFreedCallback(id, callback)); rc->AddOwnedObject(id, {}, address, "", 0, false, /*add_local_ref=*/true); - ASSERT_TRUE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback)); + ASSERT_TRUE(rc->AddObjectOutOfScopeOrFreedCallback(id, callback)); ASSERT_FALSE(*out_of_scope); ASSERT_FALSE(*out_of_scope); rc->RemoveLocalReference(id, &out); @@ -2450,9 +2450,9 @@ TEST_F(ReferenceCountLineageEnabledTest, TestUnreconstructableObjectOutOfScope) // Unreconstructable objects stay in scope if they have a nonzero lineage ref // count. *out_of_scope = false; - ASSERT_FALSE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback)); + ASSERT_FALSE(rc->AddObjectOutOfScopeOrFreedCallback(id, callback)); rc->AddOwnedObject(id, {}, address, "", 0, false, /*add_local_ref=*/false); - ASSERT_TRUE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback)); + ASSERT_TRUE(rc->AddObjectOutOfScopeOrFreedCallback(id, callback)); rc->UpdateSubmittedTaskReferences({return_id}, {id}); ASSERT_TRUE(rc->IsObjectPendingCreation(return_id)); ASSERT_FALSE(*out_of_scope); @@ -2541,7 +2541,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPinLineageRecursive) { rc->UpdateFinishedTaskReferences({}, {id}, false, empty_borrower, empty_refs, &out); // We should fail to set the deletion callback because the object has // already gone out of scope. - ASSERT_FALSE(rc->AddObjectPrimaryCopyDeleteCallback( + ASSERT_FALSE(rc->AddObjectOutOfScopeOrFreedCallback( id, [&](const ObjectID &object_id) { ASSERT_FALSE(true); })); ASSERT_EQ(out.size(), 1); @@ -2658,7 +2658,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) { ObjectID id = ObjectID::FromRandom(); NodeID node_id = NodeID::FromRandom(); rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/true); - ASSERT_TRUE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback)); + ASSERT_TRUE(rc->AddObjectOutOfScopeOrFreedCallback(id, callback)); ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &owned_by_us, &pinned_at, &spilled)); ASSERT_TRUE(owned_by_us); ASSERT_TRUE(pinned_at.IsNil()); @@ -2674,7 +2674,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) { deleted->clear(); rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/true); - ASSERT_TRUE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback)); + ASSERT_TRUE(rc->AddObjectOutOfScopeOrFreedCallback(id, callback)); rc->UpdateObjectPinnedAtRaylet(id, node_id); rc->ResetObjectsOnRemovedNode(node_id); auto objects = rc->FlushObjectsToRecover(); @@ -2683,7 +2683,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) { ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &owned_by_us, &pinned_at, &spilled)); ASSERT_TRUE(owned_by_us); ASSERT_TRUE(pinned_at.IsNil()); - ASSERT_TRUE(deleted->count(id) > 0); + ASSERT_TRUE(deleted->empty()); deleted->clear(); } @@ -2699,7 +2699,7 @@ TEST_F(ReferenceCountTest, TestFree) { ASSERT_FALSE(rc->IsPlasmaObjectFreed(id)); rc->FreePlasmaObjects({id}); ASSERT_TRUE(rc->IsPlasmaObjectFreed(id)); - ASSERT_FALSE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback)); + ASSERT_FALSE(rc->AddObjectOutOfScopeOrFreedCallback(id, callback)); ASSERT_EQ(deleted->count(id), 0); rc->UpdateObjectPinnedAtRaylet(id, node_id); bool owned_by_us; @@ -2714,7 +2714,7 @@ TEST_F(ReferenceCountTest, TestFree) { // Test free after receiving information about where the object is pinned. rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true, /*add_local_ref=*/true); - ASSERT_TRUE(rc->AddObjectPrimaryCopyDeleteCallback(id, callback)); + ASSERT_TRUE(rc->AddObjectOutOfScopeOrFreedCallback(id, callback)); rc->UpdateObjectPinnedAtRaylet(id, node_id); ASSERT_FALSE(rc->IsPlasmaObjectFreed(id)); rc->FreePlasmaObjects({id}); diff --git a/src/ray/core_worker/transport/actor_task_submitter.cc b/src/ray/core_worker/transport/actor_task_submitter.cc index c54b9deb16ec6..af2600e0a6b83 100644 --- a/src/ray/core_worker/transport/actor_task_submitter.cc +++ b/src/ray/core_worker/transport/actor_task_submitter.cc @@ -44,7 +44,7 @@ void ActorTaskSubmitter::NotifyGCSWhenActorOutOfScope( })); }; - if (!reference_counter_->AddObjectPrimaryCopyDeleteCallback( + if (!reference_counter_->AddObjectOutOfScopeOrFreedCallback( actor_creation_return_id, [actor_out_of_scope_callback](const ObjectID &object_id) { actor_out_of_scope_callback(object_id);