
[core] Use max_inflight_executions for buffered channels on same node #49755

Open · wants to merge 7 commits into master from dag-buffered-regression

Conversation

Contributor

@dayshah dayshah commented Jan 10, 2025

Why are these changes needed?

We previously reverted the change that allocated max_inflight_executions buffered channels because it caused performance regressions in multi-node benchmarks.

This change uses buffered channels only when the writer and reader actors are on the same node, and falls back to a single channel when they are on different nodes.
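The node-locality policy described above can be sketched as follows (a minimal illustration; `num_channels_for` is a hypothetical helper name, not the PR's actual API):

```python
def num_channels_for(
    writer_node: str, reader_node: str, max_inflight_executions: int
) -> int:
    """Decide how many channels to allocate between a writer and a reader.

    Buffered channels (one per in-flight execution) only pay off when both
    actors are on the same node; across nodes, a single channel avoids the
    multi-node performance regression.
    """
    if writer_node == reader_node:
        return max_inflight_executions
    return 1
```

With max_inflight_executions = 10, a same-node pair would get 10 buffered channels while a cross-node pair gets 1.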

Related issue number

Closes #49044

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@dayshah dayshah force-pushed the dag-buffered-regression branch 2 times, most recently from 4369481 to f78c82c Compare January 16, 2025 18:07
Contributor Author

dayshah commented Jan 16, 2025

GPU multi-node perf results

exec_nccl_gpu = [3875.943098419206, 110.49947845465303]
exec_ray_core_cpu = [253.41809367330097, 1.603573809041073]
exec_ray_dag_cpu = [640.6405325322876, 13.631979315646879]
exec_ray_core_gpu = [243.22753202598307, 2.2226977250911846]
exec_ray_dag_gpu_cpu_gpu = [537.088861706659, 21.354196302156893]
exec_ray_dag_gpu_nccl_static_shape_direct_return = [862.695699432426, 14.123155884059045]
exec_ray_dag_gpu_nccl_direct_return = [613.4322920596449, 11.416573005329482]
exec_ray_dag_gpu_nccl_static_shape = [630.2706356988776, 12.600363037812317]
exec_ray_dag_gpu_nccl = [516.0769662325134, 11.152239477211477]

Contributor Author

dayshah commented Jan 16, 2025

Second run

exec_nccl_gpu = [3859.74345306353, 138.13849291778993]
exec_ray_core_cpu = [254.2746160632538, 1.5401643001590601]
exec_ray_dag_cpu = [612.0530759884352, 12.545136969769587]
exec_ray_core_gpu = [235.52375122826749, 0.5917435681477302]
exec_ray_dag_gpu_cpu_gpu = [554.1660746091273, 15.127669832672883]
exec_ray_dag_gpu_nccl_static_shape_direct_return = [838.5866375219834, 6.124313458821903]
exec_ray_dag_gpu_nccl_direct_return = [647.4876374452369, 6.81533730461543]
exec_ray_dag_gpu_nccl_static_shape = [638.1970395337441, 11.681461849621408]
exec_ray_dag_gpu_nccl = [522.1410837508558, 18.40803158948017]

@dayshah dayshah force-pushed the dag-buffered-regression branch from bdb9b53 to 61d0124 Compare January 16, 2025 22:19
Contributor Author

dayshah commented Jan 16, 2025

Improvement on single node

exec_torch_cpu_cpu = [12599.322323620976, 71.66839011380934]
exec_torch_gpu = [4824.656644835246, 21.00994267826902]
exec_torch_gpu_cpu_gpu = [4102.193317742758, 20.43579728014157]
exec_nccl_gpu = [4638.221015201911, 20.269008775639335]
exec_ray_put_cpu = [909.2526418575451, 21.858821994477612]
exec_ray_put_np_zero_copy = [1307.133050070244, 13.039733000247418]
exec_ray_put_gpu = [746.8679810648279, 17.48136888131177]
exec_ray_core_cpu = [263.361640763236, 1.996324404358528]
exec_ray_dag_cpu = [1524.9393645143894, 4.954025435722708]
exec_ray_core_gpu = [249.94649478502427, 0.8460789801799877]
exec_ray_dag_gpu_cpu_gpu = [1189.7412006249735, 4.106025318048982]
exec_ray_dag_gpu_nccl_static_shape_direct_return = [2699.6630433525825, 25.320757034733347]
exec_ray_dag_gpu_nccl_direct_return = [1673.2662534220722, 6.346931885102402]
exec_ray_dag_gpu_nccl_static_shape = [1452.942878699208, 22.081297376848003]
exec_ray_dag_gpu_nccl = [1153.0284537918992, 1.9565441028235748]

@dayshah dayshah force-pushed the dag-buffered-regression branch from 61d0124 to 32ff703 Compare January 19, 2025 23:29
@dayshah dayshah changed the title from [core] Revert to max_inflight_executions buffered channels to [core] Use max_inflight_executions for buffered channels on same node Jan 21, 2025
Contributor Author

dayshah commented Jan 21, 2025

Multi-node performance is the same as before with the change to only create buffered channels when the actors are on the same node.

exec_nccl_gpu = [3790.8402979063458, 56.6865765961406]
exec_ray_core_cpu = [255.19360395823574, 1.711785647599441]
exec_ray_dag_cpu = [789.6452658355453, 5.61179682179874]
exec_ray_core_gpu = [212.16754485510126, 33.3374050190201]
exec_ray_dag_gpu_cpu_gpu = [327.17384678427555, 132.49909907628353]
exec_ray_dag_gpu_nccl_static_shape_direct_return = [1030.1235502352447, 35.65424435904805]
exec_ray_dag_gpu_nccl_direct_return = [818.6993175341537, 11.856613760733126]
exec_ray_dag_gpu_nccl_static_shape = [732.3645206565977, 5.451753950656095]
exec_ray_dag_gpu_nccl = [655.485461785894, 5.0137815617670025]

@dayshah dayshah marked this pull request as ready for review January 21, 2025 17:45
Contributor

@ruisearch42 ruisearch42 left a comment

overall LGTM

Comment on lines 705 to 715
if (
    self._writer is None
    or self._writer == ray.get_runtime_context().current_actor
):
    writer_node = ray.get_runtime_context().get_node_id()
else:
    writer_node = ray.get(
        self._writer.__ray_call__.remote(
            lambda self: ray.get_runtime_context().get_node_id()
        )
    )
Contributor

Extract this as a utils method? Maybe the larger scope can be extracted, i.e., have something like write_to_remote_node()

Contributor Author

done, made get_writer_node

Signed-off-by: dayshah <[email protected]>
Comment on lines +156 to +159
if i % 2 == 0:
    results.append(dag.execute(b"x"))
else:
    results.append(dag.execute(b"yyyyyyy"))
Contributor

Why this code? Add a comment
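Presumably the alternating sizes exercise the buffered channels with payloads of different lengths on consecutive in-flight executions. A self-contained sketch of the pattern with the requested comment (`FakeDag` is a stand-in for the compiled DAG, used so the loop runs on its own):

```python
class FakeDag:
    """Stand-in for a compiled DAG; real code would call dag.execute()."""

    def execute(self, payload: bytes) -> int:
        # Echo the payload size so the loop below is runnable on its own.
        return len(payload)


dag = FakeDag()
results = []
for i in range(6):
    # Alternate payload sizes so consecutive in-flight executions push
    # messages of different lengths through the buffered channels.
    if i % 2 == 0:
        results.append(dag.execute(b"x"))
    else:
        results.append(dag.execute(b"yyyyyyy"))
```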

@@ -122,6 +122,7 @@ def exec_ray_dag(
use_cgraph=True,
static_shape=False,
direct_return=False,
num_times_execute=1,
Contributor

just num_executions?

    return readers_on_same_node, readers_on_different_node


def get_writer_node(writer: "ray.actor.ActorHandle"):
Contributor

Just call it get_node?

Also add a return type annotation.

@@ -41,3 +41,51 @@ def split_readers_by_locality(
        local_readers.append((reader, node))

    return remote_readers, local_readers


def split_readers_by_node_locality(
Contributor

For a utils method, let's try to make it general. "readers" are not relevant here, the function just splits actors based on locality.
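Following this suggestion, a generalized helper might look like the sketch below (the name `split_actors_by_node_locality` and its signature are hypothetical; the PR's actual helper may differ):

```python
from typing import Iterable, List, Tuple

# (actor handle, node ID) pair; `object` stands in for ray.actor.ActorHandle.
ActorNode = Tuple[object, str]


def split_actors_by_node_locality(
    node_id: str, actors_with_nodes: Iterable[ActorNode]
) -> Tuple[List[ActorNode], List[ActorNode]]:
    """Split (actor, node) pairs into those on `node_id` and the rest."""
    local: List[ActorNode] = []
    remote: List[ActorNode] = []
    for actor, node in actors_with_nodes:
        if node == node_id:
            local.append((actor, node))
        else:
            remote.append((actor, node))
    return local, remote
```

Nothing in the body is reader-specific, so the same function covers both the reader split and any future writer-side split.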

Successfully merging this pull request may close these issues.

[core][compiled graphs] Fix and re-enable shared memory channel buffering support
3 participants