Skip to content

Commit

Permalink
Make pull requests even if we got num_objects from memory store on wait
Browse files Browse the repository at this point in the history
Signed-off-by: dayshah <[email protected]>
  • Loading branch information
dayshah committed Jan 29, 2025
1 parent 8efae5f commit ebc96c1
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 2 deletions.
5 changes: 4 additions & 1 deletion src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2095,7 +2095,10 @@ Status CoreWorker::Wait(const std::vector<ObjectID> &ids,
if (fetch_local) {
RetryObjectInPlasmaErrors(
memory_store_, worker_context_, memory_object_ids, plasma_object_ids, ready);
if (static_cast<int>(ready.size()) < num_objects && !plasma_object_ids.empty()) {
// Make the request to the plasma store even if num_objects are already ready: since
// the user specified fetch_local, we still want to request pulls for these objects
// so that pulling can start.
if (!plasma_object_ids.empty()) {
RAY_RETURN_NOT_OK(plasma_store_provider_->Wait(
plasma_object_ids,
std::min(static_cast<int>(plasma_object_ids.size()),
Expand Down
26 changes: 26 additions & 0 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1759,6 +1759,32 @@ void NodeManager::ProcessWaitRequestMessage(
current_task_id,
/*ray_get=*/false);
}
if (message->num_ready_objects() == 0) {
// If we don't need to wait for any objects, return immediately after making the
// pull requests through AsyncResolveObjects above.
flatbuffers::FlatBufferBuilder fbb;
std::vector<ObjectID> ready;
std::vector<ObjectID> remaining;
auto wait_reply = protocol::CreateWaitReply(
fbb, to_flatbuf(fbb, ready), to_flatbuf(fbb, remaining));
fbb.Finish(wait_reply);
const auto status =
client->WriteMessage(static_cast<int64_t>(protocol::MessageType::WaitReply),
fbb.GetSize(),
fbb.GetBufferPointer());
if (status.ok()) {
if (resolve_objects) {
AsyncResolveObjectsFinish(client, current_task_id);
}
} else {
// We failed to write to the client, so disconnect the client.
std::ostringstream stream;
stream << "Failed to write WaitReply to the client. Status " << status
<< ", message: " << status.message();
DisconnectClient(client, rpc::WorkerExitType::SYSTEM_ERROR, stream.str());
}
return;
}
uint64_t num_required_objects = static_cast<uint64_t>(message->num_ready_objects());
wait_manager_.Wait(
object_ids,
Expand Down
1 change: 0 additions & 1 deletion src/ray/raylet/wait_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ void WaitManager::Wait(const std::vector<ObjectID> &object_ids,
<< "Waiting duplicate objects is not allowed. Please make sure all object IDs are "
"unique before calling `WaitManager::Wait`.";
RAY_CHECK(timeout_ms >= 0 || timeout_ms == -1);
RAY_CHECK_NE(num_required_objects, 0u);
RAY_CHECK_LE(num_required_objects, object_ids.size());

const uint64_t wait_id = next_wait_id_++;
Expand Down

0 comments on commit ebc96c1

Please sign in to comment.