diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py
index 1de681e24821..71e4ced9e448 100644
--- a/python/ray/dag/compiled_dag_node.py
+++ b/python/ray/dag/compiled_dag_node.py
@@ -796,7 +796,7 @@ def __init__(
             # We conservatively set num_shm_buffers to _max_inflight_executions.
             # It means that the DAG can be underutilized, but it guarantees there's
             # no false positive timeouts.
-            num_shm_buffers=1,
+            num_shm_buffers=self._max_inflight_executions,
         )
         if not isinstance(self._buffer_size_bytes, int) or self._buffer_size_bytes <= 0:
             raise ValueError(
diff --git a/python/ray/dag/tests/experimental/test_accelerated_dag.py b/python/ray/dag/tests/experimental/test_accelerated_dag.py
index f900b9c54aab..8a80af1cfb4f 100644
--- a/python/ray/dag/tests/experimental/test_accelerated_dag.py
+++ b/python/ray/dag/tests/experimental/test_accelerated_dag.py
@@ -2408,7 +2408,6 @@ def test_driver_and_intraprocess_read(ray_start_cluster):
     assert ray.get(dag.execute(1)) == [1, 2]
 
 
-@pytest.mark.skip("Currently buffer size is set to 1 because of regression.")
 @pytest.mark.parametrize("temporary_change_timeout", [1], indirect=True)
 def test_buffered_inputs(shutdown_only, temporary_change_timeout):
     ray.init()
diff --git a/python/ray/dag/tests/experimental/test_torch_tensor_dag.py b/python/ray/dag/tests/experimental/test_torch_tensor_dag.py
index 871906fc1f1a..5fb188b83090 100644
--- a/python/ray/dag/tests/experimental/test_torch_tensor_dag.py
+++ b/python/ray/dag/tests/experimental/test_torch_tensor_dag.py
@@ -1125,6 +1125,10 @@ def test_torch_tensor_nccl_all_reduce_wrong_shape(ray_start_regular):
     # exception, such as when the task returns torch.Tensors of the wrong
     # shape or dtype. Check that we can no longer submit to the DAG.
     ref = compiled_dag.execute([((20,), dtype, 1) for _ in workers])
+    try:
+        ray.get(ref)
+    except Exception:
+        pass
     with pytest.raises(RayChannelError):
         ref = compiled_dag.execute([((20,), dtype, 1) for _ in workers])
 
diff --git a/python/ray/experimental/channel/shared_memory_channel.py b/python/ray/experimental/channel/shared_memory_channel.py
index 661437fae3cd..54049fe0eb29 100644
--- a/python/ray/experimental/channel/shared_memory_channel.py
+++ b/python/ray/experimental/channel/shared_memory_channel.py
@@ -18,14 +18,6 @@
 # entry/init points.
 logger = logging.getLogger(__name__)
 
-DEFAULT_MAX_BUFFER_SIZE = int(1e6)  # 100 mB
-# The min buffer size must be large enough to at least fit an instance of the
-# _ResizeChannel class along with any metadata.
-MIN_BUFFER_SIZE = int(1000)  # 1000 bytes
-# For shared memory channels, the default number of buffers per channel to
-# allocate.
-DEFAULT_NUM_SHM_BUFFERS = 1
-
 
 def _create_channel_ref(
     self,
@@ -112,11 +104,16 @@ def __init__(
             num_shm_buffers: The number of shared memory buffer per channel.
         """
         super().__init__()
+
+        from ray.dag import DAGContext
+
+        ctx = DAGContext.get_current()
+
         if buffer_size_bytes is None:
-            buffer_size_bytes = DEFAULT_MAX_BUFFER_SIZE
+            buffer_size_bytes = ctx.buffer_size_bytes
         self.buffer_size_bytes = buffer_size_bytes
         if num_shm_buffers is None:
-            num_shm_buffers = DEFAULT_NUM_SHM_BUFFERS
+            num_shm_buffers = ctx.max_inflight_executions
         self._num_shm_buffers = num_shm_buffers
 
     def create_channel(
@@ -192,6 +189,9 @@ def __init__(
         elif isinstance(typ, int):
             typ = SharedMemoryType(buffer_size_bytes=typ)
 
+        # The min buffer size must be large enough to at least fit an instance of the
+        # _ResizeChannel class along with any metadata.
+        MIN_BUFFER_SIZE = int(1000)  # 1000 bytes
         if typ.buffer_size_bytes < MIN_BUFFER_SIZE:
             raise ValueError(
                 "typ.buffer_size_bytes must be at least MIN_BUFFER_SIZE "
@@ -699,14 +699,29 @@ def __init__(
             actor_id = self._get_actor_id(self._writer)
             self._channel_dict[actor_id] = local_channel
         # There are some remote readers which are not the same Ray actor as the writer.
-        # Create a shared memory channel for the writer and the remote readers.
-        if len(remote_reader_and_node_list) != 0:
+        # We create a BufferedSharedMemoryChannel for readers on the same node, and
+        # a single Channel for readers on different nodes due to
+        # https://github.com/ray-project/ray/issues/49044
+        (
+            readers_same_node,
+            readers_different_node,
+        ) = utils.split_readers_by_node_locality(
+            utils.get_actor_node(self._writer), remote_reader_and_node_list
+        )
+
+        if len(readers_same_node) != 0:
             remote_channel = BufferedSharedMemoryChannel(
-                self._writer, remote_reader_and_node_list, num_shm_buffers
+                self._writer, readers_same_node, num_shm_buffers
             )
             self._channels.add(remote_channel)
+            for reader, _ in readers_same_node:
+                actor_id = self._get_actor_id(reader)
+                self._channel_dict[actor_id] = remote_channel
 
-            for reader, _ in remote_reader_and_node_list:
+        if len(readers_different_node) != 0:
+            remote_channel = Channel(self._writer, readers_different_node)
+            self._channels.add(remote_channel)
+            for reader, _ in readers_different_node:
                 actor_id = self._get_actor_id(reader)
                 self._channel_dict[actor_id] = remote_channel
 
diff --git a/python/ray/experimental/channel/utils.py b/python/ray/experimental/channel/utils.py
index 88560b3bc1c8..bb832cddfd84 100644
--- a/python/ray/experimental/channel/utils.py
+++ b/python/ray/experimental/channel/utils.py
@@ -41,3 +41,51 @@ def split_readers_by_locality(
             local_readers.append((reader, node))
 
     return remote_readers, local_readers
+
+
+def split_readers_by_node_locality(
+    writer_node: str,
+    reader_and_node_list: List[Tuple["ray.actor.ActorHandle", str]],
+) -> Tuple[
+    List[Tuple["ray.actor.ActorHandle", str]], List[Tuple["ray.actor.ActorHandle", str]]
+]:
+    """Split readers into those on the same node as the writer and the rest.
+
+    Args:
+        writer_node: The node ID of the writer.
+        reader_and_node_list: List of (reader, node) tuples.
+
+    Returns:
+        Tuple containing:
+            - List of (reader, node) tuples for readers on the same node
+            - List of (reader, node) tuples for readers on a different node
+    """
+    readers_on_same_node = []
+    readers_on_different_node = []
+
+    for reader, node in reader_and_node_list:
+        if node == writer_node:
+            readers_on_same_node.append((reader, node))
+        else:
+            readers_on_different_node.append((reader, node))
+
+    return readers_on_same_node, readers_on_different_node
+
+
+def get_actor_node(actor: Optional["ray.actor.ActorHandle"]) -> str:
+    """Get the ID of the node the actor is running on.
+
+    Args:
+        actor: The actor handle, or None for the current process.
+
+    Returns:
+        The ID of the node the actor is on.
+    """
+    if actor is None or actor == ray.get_runtime_context().current_actor:
+        return ray.get_runtime_context().get_node_id()
+    else:
+        return ray.get(
+            actor.__ray_call__.remote(
+                lambda self: ray.get_runtime_context().get_node_id()
+            )
+        )
diff --git a/release/microbenchmark/experimental/accelerated_dag_gpu_microbenchmark.py b/release/microbenchmark/experimental/accelerated_dag_gpu_microbenchmark.py
index a5b1d432f219..26622b1006ac 100644
--- a/release/microbenchmark/experimental/accelerated_dag_gpu_microbenchmark.py
+++ b/release/microbenchmark/experimental/accelerated_dag_gpu_microbenchmark.py
@@ -122,6 +122,7 @@ def exec_ray_dag(
     use_cgraph=True,
     static_shape=False,
     direct_return=False,
+    num_executions=1,
 ):
     # Test torch.Tensor sent between actors.
     with InputNode() as inp:
@@ -140,11 +141,20 @@
     if use_cgraph:
         dag = dag.experimental_compile()
 
+        if num_executions == 1:
 
-        def _run():
-            ref = dag.execute(b"x")
-            result = ray.get(ref)
-            assert result == b"x"
+            def _run():
+                ref = dag.execute(b"x")
+                result = ray.get(ref)
+                assert result == b"x"
+
+        else:
+
+            def _run():
+                results = []
+                for i in range(num_executions):
+                    results.append(dag.execute(b"x"))
+                ray.get(results)
 
     else:
 
@@ -297,6 +307,14 @@ def exec_ray_dag_cpu(sender_hint, receiver_hint):
     return exec_ray_dag("exec_ray_dag_cpu", sender, receiver)
 
 
+def exec_ray_dag_cpu_ten_times(sender_hint, receiver_hint):
+    sender = TorchTensorWorker.options(scheduling_strategy=sender_hint).remote()
+    receiver = TorchTensorWorker.options(scheduling_strategy=receiver_hint).remote()
+    return exec_ray_dag(
+        "exec_ray_dag_cpu_ten_times", sender, receiver, num_executions=10
+    )
+
+
 def exec_ray_core_cpu(sender_hint, receiver_hint):
     time.sleep(1)
     sender = TorchTensorWorker.options(scheduling_strategy=sender_hint).remote()
@@ -408,6 +426,7 @@ def main(distributed):
 
     results += exec_ray_core_cpu(sender_hint, receiver_hint)
     results += exec_ray_dag_cpu(sender_hint, receiver_hint)
+    results += exec_ray_dag_cpu_ten_times(sender_hint, receiver_hint)
     results += exec_ray_core_gpu(sender_hint, receiver_hint)
     results += exec_ray_dag_gpu_cpu_gpu(sender_hint, receiver_hint)
     results += exec_ray_dag_gpu_nccl(