diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index 48a2d12c9f57..71b96955b0cd 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -795,7 +795,7 @@ def __init__( # We conservatively set num_shm_buffers to _max_inflight_executions. # It means that the DAG can be underutilized, but it guarantees there's # no false positive timeouts. - num_shm_buffers=1, + num_shm_buffers=self._max_inflight_executions, ) if not isinstance(self._buffer_size_bytes, int) or self._buffer_size_bytes <= 0: raise ValueError( diff --git a/python/ray/dag/tests/experimental/test_accelerated_dag.py b/python/ray/dag/tests/experimental/test_accelerated_dag.py index a6046149657b..288b0d9728bd 100644 --- a/python/ray/dag/tests/experimental/test_accelerated_dag.py +++ b/python/ray/dag/tests/experimental/test_accelerated_dag.py @@ -2343,7 +2343,6 @@ def test_driver_and_intraprocess_read(ray_start_cluster): assert ray.get(dag.execute(1)) == [1, 2] -@pytest.mark.skip("Currently buffer size is set to 1 because of regression.") @pytest.mark.parametrize("temporary_change_timeout", [1], indirect=True) def test_buffered_inputs(shutdown_only, temporary_change_timeout): ray.init()