[core] Use max_inflight_executions for buffered channels on same node #49755
base: master
Changes from all commits: 81fdeb0, 32ff703, 2e064d1, 7c03cd1, 61d1d47, 3b588c7, 826cd31
@@ -41,3 +41,51 @@ def split_readers_by_locality(
         local_readers.append((reader, node))
 
     return remote_readers, local_readers
+
+
+def split_readers_by_node_locality(
+    writer_node: str,
+    reader_and_node_list: List[Tuple["ray.actor.ActorHandle", str]],
+) -> Tuple[
+    List[Tuple["ray.actor.ActorHandle", str]], List[Tuple["ray.actor.ActorHandle", str]]
+]:
+    """Split readers into remote and local readers based on the writer's node.
+
+    Args:
+        writer_node: The node of the writer.
+        reader_and_node_list: List of (reader, node) tuples.
+
+    Returns:
+        Tuple containing:
+            - List of (reader, node) tuples for readers on the same node.
+            - List of (reader, node) tuples for readers on a different node.
+    """
+    readers_on_same_node = []
+    readers_on_different_node = []
+
+    for reader, node in reader_and_node_list:
+        if node == writer_node:
+            readers_on_same_node.append((reader, node))
+        else:
+            readers_on_different_node.append((reader, node))
+
+    return readers_on_same_node, readers_on_different_node
+
+
+def get_writer_node(writer: "ray.actor.ActorHandle"):
+    """Get the node of the writer.
+
+    Args:
+        writer: The actor handle of the writer.
+
+    Returns:
+        The node of the writer.
+    """
+    if writer is None or writer == ray.get_runtime_context().current_actor:
+        return ray.get_runtime_context().get_node_id()
+    else:
+        return ray.get(
+            writer.__ray_call__.remote(
+                lambda self: ray.get_runtime_context().get_node_id()
+            )
+        )

Review comment on def get_writer_node(...): add a return type annotation.
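For illustration, a sketch of how these helpers might be used together at a call site; the call site itself and the surrounding variables (writer, reader_and_node_list) are assumptions, not part of this diff:

# Hypothetical call site: group a writer's readers by node so that same-node
# readers can use buffered local channels (the subject of this PR).
writer_node = get_writer_node(writer)  # resolve the writer's node ID
readers_on_same_node, readers_on_different_node = split_readers_by_node_locality(
    writer_node, reader_and_node_list
)

Note that get_writer_node resolves the node ID locally when called with None or from the writer itself, and otherwise evaluates a small lambda on the writer actor via the __ray_call__ escape hatch.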
@@ -122,6 +122,7 @@ def exec_ray_dag(
     use_cgraph=True,
     static_shape=False,
     direct_return=False,
+    num_times_execute=1,
 ):
     # Test torch.Tensor sent between actors.
     with InputNode() as inp:
@@ -140,11 +141,23 @@ def exec_ray_dag(
 
     if use_cgraph:
         dag = dag.experimental_compile()
+        if num_times_execute == 1:
 
-        def _run():
-            ref = dag.execute(b"x")
-            result = ray.get(ref)
-            assert result == b"x"
+            def _run():
+                ref = dag.execute(b"x")
+                result = ray.get(ref)
+                assert result == b"x"
+
+        else:
+
+            def _run():
+                results = []
+                for i in range(num_times_execute):
+                    if i % 2 == 0:
+                        results.append(dag.execute(b"x"))
+                    else:
+                        results.append(dag.execute(b"yyyyyyy"))
+                ray.get(results)
 
     else:

Review comment on lines +156 to +159: Why this code? Add a comment.
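One plausible reading of the alternating payloads (an inference, not an answer from the PR author): submitting several executions before calling ray.get keeps multiple results of different sizes in flight at once, which exercises the buffered same-node channel this PR targets. A minimal self-contained sketch of that pattern follows; the Echo actor and the _max_inflight_executions option name are assumptions inferred from the PR title, not code from this diff:

import ray
from ray.dag import InputNode

ray.init()


@ray.remote
class Echo:
    def echo(self, v):
        return v


actor = Echo.remote()
with InputNode() as inp:
    dag = actor.echo.bind(inp)

# Assumed compile option, inferred from the PR title.
cdag = dag.experimental_compile(_max_inflight_executions=10)

# Submit ten executions with two payload sizes before consuming any result,
# so several differently sized values are buffered in the channel at once.
expected = [b"x" if i % 2 == 0 else b"yyyyyyy" for i in range(10)]
refs = [cdag.execute(payload) for payload in expected]
assert ray.get(refs) == expected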
@@ -297,6 +310,14 @@ def exec_ray_dag_cpu(sender_hint, receiver_hint):
     return exec_ray_dag("exec_ray_dag_cpu", sender, receiver)
 
 
+def exec_ray_dag_cpu_ten_times(sender_hint, receiver_hint):
+    sender = TorchTensorWorker.options(scheduling_strategy=sender_hint).remote()
+    receiver = TorchTensorWorker.options(scheduling_strategy=receiver_hint).remote()
+    return exec_ray_dag(
+        "exec_ray_dag_cpu_ten_times", sender, receiver, num_times_execute=10
+    )
+
+
 def exec_ray_core_cpu(sender_hint, receiver_hint):
     time.sleep(1)
     sender = TorchTensorWorker.options(scheduling_strategy=sender_hint).remote()
@@ -408,6 +429,7 @@ def main(distributed):
 
     results += exec_ray_core_cpu(sender_hint, receiver_hint)
     results += exec_ray_dag_cpu(sender_hint, receiver_hint)
+    results += exec_ray_dag_cpu_ten_times(sender_hint, receiver_hint)
     results += exec_ray_core_gpu(sender_hint, receiver_hint)
     results += exec_ray_dag_gpu_cpu_gpu(sender_hint, receiver_hint)
     results += exec_ray_dag_gpu_nccl(
Review comment on split_readers_by_node_locality: For a utils method, let's try to make it general. "readers" are not relevant here; the function just splits actors based on locality.
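A minimal sketch of the generalization the reviewer is suggesting; the name split_actors_by_node_locality and its exact signature are assumptions, not code from this PR:

from typing import List, Tuple

import ray


def split_actors_by_node_locality(
    node: str,
    actor_and_node_list: List[Tuple["ray.actor.ActorHandle", str]],
) -> Tuple[
    List[Tuple["ray.actor.ActorHandle", str]],
    List[Tuple["ray.actor.ActorHandle", str]],
]:
    """Split actors into those on `node` and those on any other node."""
    on_node, off_node = [], []
    for actor, actor_node in actor_and_node_list:
        if actor_node == node:
            on_node.append((actor, actor_node))
        else:
            off_node.append((actor, actor_node))
    return on_node, off_node

With this helper, split_readers_by_node_locality reduces to a single call with the writer's node as the pivot.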