[core] Use max_inflight_executions for buffered channels on same node #49755

Open. Wants to merge 7 commits into base: master.
2 changes: 1 addition & 1 deletion python/ray/dag/compiled_dag_node.py
@@ -796,7 +796,7 @@ def __init__(
# We conservatively set num_shm_buffers to _max_inflight_executions.
# It means that the DAG can be underutilized, but it guarantees there are
# no false-positive timeouts.
num_shm_buffers=1,
num_shm_buffers=self._max_inflight_executions,
)
if not isinstance(self._buffer_size_bytes, int) or self._buffer_size_bytes <= 0:
raise ValueError(
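
For context, a minimal sketch of the behavior this change targets. The Echo actor is hypothetical, and the _max_inflight_executions keyword to experimental_compile is assumed to be available in the Ray version this PR is based on.

import ray
from ray.dag import InputNode


@ray.remote
class Echo:
    def echo(self, x):
        return x


ray.init()
actor = Echo.remote()
with InputNode() as inp:
    dag = actor.echo.bind(inp)

# Allow up to 4 in-flight executions. With this PR, the shared-memory channels
# allocate 4 buffers to match, so all 4 submissions can be written without
# waiting for earlier results to be read (and without risking a false-positive
# write timeout).
compiled = dag.experimental_compile(_max_inflight_executions=4)

refs = [compiled.execute(i) for i in range(4)]
assert ray.get(refs) == [0, 1, 2, 3]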
1 change: 0 additions & 1 deletion python/ray/dag/tests/experimental/test_accelerated_dag.py
@@ -2408,7 +2408,6 @@ def test_driver_and_intraprocess_read(ray_start_cluster):
assert ray.get(dag.execute(1)) == [1, 2]


@pytest.mark.skip("Currently buffer size is set to 1 because of regression.")
@pytest.mark.parametrize("temporary_change_timeout", [1], indirect=True)
def test_buffered_inputs(shutdown_only, temporary_change_timeout):
ray.init()
4 changes: 4 additions & 0 deletions python/ray/dag/tests/experimental/test_torch_tensor_dag.py
@@ -1125,6 +1125,10 @@ def test_torch_tensor_nccl_all_reduce_wrong_shape(ray_start_regular):
# exception, such as when the task returns torch.Tensors of the wrong
# shape or dtype. Check that we can no longer submit to the DAG.
ref = compiled_dag.execute([((20,), dtype, 1) for _ in workers])
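# The bad execution above is expected to raise. Consume its result here,
# ignoring the exception, so the failure can propagate before we check that
# further submissions are rejected.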
try:
ray.get(ref)
except Exception:
pass
with pytest.raises(RayChannelError):
ref = compiled_dag.execute([((20,), dtype, 1) for _ in workers])

43 changes: 29 additions & 14 deletions python/ray/experimental/channel/shared_memory_channel.py
@@ -18,14 +18,6 @@
# entry/init points.
logger = logging.getLogger(__name__)

DEFAULT_MAX_BUFFER_SIZE = int(1e6) # 100 mB
# The min buffer size must be large enough to at least fit an instance of the
# _ResizeChannel class along with any metadata.
MIN_BUFFER_SIZE = int(1000) # 1000 bytes
# For shared memory channels, the default number of buffers per channel to
# allocate.
DEFAULT_NUM_SHM_BUFFERS = 1


def _create_channel_ref(
self,
@@ -112,11 +104,16 @@ def __init__(
num_shm_buffers: The number of shared memory buffers per channel.
"""
super().__init__()

from ray.dag import DAGContext

ctx = DAGContext.get_current()

if buffer_size_bytes is None:
buffer_size_bytes = DEFAULT_MAX_BUFFER_SIZE
buffer_size_bytes = ctx.buffer_size_bytes
self.buffer_size_bytes = buffer_size_bytes
if num_shm_buffers is None:
num_shm_buffers = DEFAULT_NUM_SHM_BUFFERS
num_shm_buffers = ctx.max_inflight_executions
self._num_shm_buffers = num_shm_buffers
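
As a quick illustration (a sketch, not part of this PR's tests): constructing SharedMemoryType with no arguments should now reflect the current DAGContext settings rather than the removed module-level defaults.

from ray.dag import DAGContext
from ray.experimental.channel.shared_memory_channel import SharedMemoryType

ctx = DAGContext.get_current()
typ = SharedMemoryType()
# Defaults are now sourced from the global DAG context.
assert typ.buffer_size_bytes == ctx.buffer_size_bytes
assert typ._num_shm_buffers == ctx.max_inflight_executions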

def create_channel(
@@ -192,6 +189,9 @@ def __init__(
elif isinstance(typ, int):
typ = SharedMemoryType(buffer_size_bytes=typ)

# The min buffer size must be large enough to at least fit an instance of the
# _ResizeChannel class along with any metadata.
MIN_BUFFER_SIZE = int(1000) # 1000 bytes
if typ.buffer_size_bytes < MIN_BUFFER_SIZE:
raise ValueError(
"typ.buffer_size_bytes must be at least MIN_BUFFER_SIZE "
@@ -699,14 +699,29 @@ def __init__(
actor_id = self._get_actor_id(self._writer)
self._channel_dict[actor_id] = local_channel
# There are some remote readers which are not the same Ray actor as the writer.
# Create a shared memory channel for the writer and the remote readers.
if len(remote_reader_and_node_list) != 0:
# We create a BufferedSharedMemoryChannel for readers on the same node, and
# a single Channel for readers on different nodes due to
# https://github.com/ray-project/ray/issues/49044
(
readers_same_node,
readers_different_node,
) = utils.split_readers_by_node_locality(
utils.get_writer_node(self._writer), remote_reader_and_node_list
)

if len(readers_same_node) != 0:
remote_channel = BufferedSharedMemoryChannel(
self._writer, remote_reader_and_node_list, num_shm_buffers
self._writer, readers_same_node, num_shm_buffers
)
self._channels.add(remote_channel)
for reader, _ in readers_same_node:
actor_id = self._get_actor_id(reader)
self._channel_dict[actor_id] = remote_channel

for reader, _ in remote_reader_and_node_list:
if len(readers_different_node) != 0:
remote_channel = Channel(self._writer, readers_different_node)
self._channels.add(remote_channel)
for reader, _ in readers_different_node:
actor_id = self._get_actor_id(reader)
self._channel_dict[actor_id] = remote_channel

48 changes: 48 additions & 0 deletions python/ray/experimental/channel/utils.py
@@ -41,3 +41,51 @@ def split_readers_by_locality(
local_readers.append((reader, node))

return remote_readers, local_readers


def split_readers_by_node_locality(
Contributor review comment: For a utils method, let's try to make it general. "readers" are not relevant here, the function just splits actors based on locality.

writer_node: str,
reader_and_node_list: List[Tuple["ray.actor.ActorHandle", str]],
) -> Tuple[
List[Tuple["ray.actor.ActorHandle", str]], List[Tuple["ray.actor.ActorHandle", str]]
]:
"""Split readers into remote and local readers based on writer.

Args:
writer_node: The node of the writer
reader_and_node_list: List of (reader, node) tuples

Returns:
Tuple containing:
- List of (reader, node) tuples for readers on the same node
- List of (reader, node) tuples for readers on a different node
"""
readers_on_same_node = []
readers_on_different_node = []

for reader, node in reader_and_node_list:
if node == writer_node:
readers_on_same_node.append((reader, node))
else:
readers_on_different_node.append((reader, node))

return readers_on_same_node, readers_on_different_node


def get_writer_node(writer: "ray.actor.ActorHandle"):
Contributor review comments: Just call it get_node? Also, add a return type annotation.

"""Get the node of the writer.

Args:
writer: The actor handle of the writer

Returns:
The node ID of the writer.
"""
if writer is None or writer == ray.get_runtime_context().current_actor:
return ray.get_runtime_context().get_node_id()
else:
return ray.get(
writer.__ray_call__.remote(
lambda self: ray.get_runtime_context().get_node_id()
)
)
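
A self-contained sketch of how the two new helpers compose, roughly mirroring the CompositeChannel call site above. The reader entries here are placeholder strings rather than real actor handles, since split_readers_by_node_locality only compares node ids; the node name "hypothetical-node" is made up.

import ray

from ray.experimental.channel import utils

ray.init()

# With writer=None, get_writer_node falls back to the calling process's node id.
writer_node = utils.get_writer_node(None)

reader_and_node_list = [
    ("reader_a", writer_node),          # co-located with the writer
    ("reader_b", "hypothetical-node"),  # reader on a different node
]
same_node, different_node = utils.split_readers_by_node_locality(
    writer_node, reader_and_node_list
)
assert same_node == [("reader_a", writer_node)]
assert different_node == [("reader_b", "hypothetical-node")]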
@@ -122,6 +122,7 @@ def exec_ray_dag(
use_cgraph=True,
static_shape=False,
direct_return=False,
num_times_execute=1,
Contributor review comment: just num_executions?

):
# Test torch.Tensor sent between actors.
with InputNode() as inp:
Expand All @@ -140,11 +141,23 @@ def exec_ray_dag(

if use_cgraph:
dag = dag.experimental_compile()
if num_times_execute == 1:

def _run():
ref = dag.execute(b"x")
result = ray.get(ref)
assert result == b"x"
def _run():
ref = dag.execute(b"x")
result = ray.get(ref)
assert result == b"x"

else:

def _run():
results = []
for i in range(num_times_execute):
if i % 2 == 0:
results.append(dag.execute(b"x"))
else:
results.append(dag.execute(b"yyyyyyy"))
Contributor review comment on lines +156 to +159: Why this code? Add a comment.

ray.get(results)

else:

@@ -297,6 +310,14 @@ def exec_ray_dag_cpu(sender_hint, receiver_hint):
return exec_ray_dag("exec_ray_dag_cpu", sender, receiver)


def exec_ray_dag_cpu_ten_times(sender_hint, receiver_hint):
sender = TorchTensorWorker.options(scheduling_strategy=sender_hint).remote()
receiver = TorchTensorWorker.options(scheduling_strategy=receiver_hint).remote()
return exec_ray_dag(
"exec_ray_dag_cpu_ten_times", sender, receiver, num_times_execute=10
)


def exec_ray_core_cpu(sender_hint, receiver_hint):
time.sleep(1)
sender = TorchTensorWorker.options(scheduling_strategy=sender_hint).remote()
@@ -408,6 +429,7 @@ def main(distributed):

results += exec_ray_core_cpu(sender_hint, receiver_hint)
results += exec_ray_dag_cpu(sender_hint, receiver_hint)
results += exec_ray_dag_cpu_ten_times(sender_hint, receiver_hint)
results += exec_ray_core_gpu(sender_hint, receiver_hint)
results += exec_ray_dag_gpu_cpu_gpu(sender_hint, receiver_hint)
results += exec_ray_dag_gpu_nccl(