From ebc96c10f7ec17ff5f83791324599a73fc87f6f3 Mon Sep 17 00:00:00 2001 From: dayshah Date: Tue, 28 Jan 2025 16:15:04 -0800 Subject: [PATCH] Make pull requests even if we got num_objects from memory store on wait Signed-off-by: dayshah --- src/ray/core_worker/core_worker.cc | 5 ++++- src/ray/raylet/node_manager.cc | 26 ++++++++++++++++++++++++++ src/ray/raylet/wait_manager.cc | 1 - 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 1a9142efdf706..909851115d6d2 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2095,7 +2095,10 @@ Status CoreWorker::Wait(const std::vector &ids, if (fetch_local) { RetryObjectInPlasmaErrors( memory_store_, worker_context_, memory_object_ids, plasma_object_ids, ready); - if (static_cast(ready.size()) < num_objects && !plasma_object_ids.empty()) { + // We make the request to the plasma store even if we have num_objects ready since we + // want to at least make the request to pull these objects if the user specified + // fetch_local so the pulling can start. + if (!plasma_object_ids.empty()) { RAY_RETURN_NOT_OK(plasma_store_provider_->Wait( plasma_object_ids, std::min(static_cast(plasma_object_ids.size()), diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index db1da21f1fcc5..0e0d62b805054 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1759,6 +1759,32 @@ void NodeManager::ProcessWaitRequestMessage( current_task_id, /*ray_get=*/false); } + if (message->num_ready_objects() == 0) { + // If we don't need to wait for any, return immediately after making the pull + // requests through AsyncResolveObjects above. + flatbuffers::FlatBufferBuilder fbb; + std::vector ready; + std::vector remaining; + auto wait_reply = protocol::CreateWaitReply( + fbb, to_flatbuf(fbb, ready), to_flatbuf(fbb, remaining)); + fbb.Finish(wait_reply); + const auto status = + client->WriteMessage(static_cast(protocol::MessageType::WaitReply), + fbb.GetSize(), + fbb.GetBufferPointer()); + if (status.ok()) { + if (resolve_objects) { + AsyncResolveObjectsFinish(client, current_task_id); + } + } else { + // We failed to write to the client, so disconnect the client. + std::ostringstream stream; + stream << "Failed to write WaitReply to the client. Status " << status + << ", message: " << status.message(); + DisconnectClient(client, rpc::WorkerExitType::SYSTEM_ERROR, stream.str()); + } + return; + } uint64_t num_required_objects = static_cast(message->num_ready_objects()); wait_manager_.Wait( object_ids, diff --git a/src/ray/raylet/wait_manager.cc b/src/ray/raylet/wait_manager.cc index a618fdd9c17a4..8512c75d4c6ad 100644 --- a/src/ray/raylet/wait_manager.cc +++ b/src/ray/raylet/wait_manager.cc @@ -28,7 +28,6 @@ void WaitManager::Wait(const std::vector &object_ids, << "Waiting duplicate objects is not allowed. Please make sure all object IDs are " "unique before calling `WaitManager::Wait`."; RAY_CHECK(timeout_ms >= 0 || timeout_ms == -1); - RAY_CHECK_NE(num_required_objects, 0u); RAY_CHECK_LE(num_required_objects, object_ids.size()); const uint64_t wait_id = next_wait_id_++;