Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[data][Datasink] support passing write results to on_write_completes #49251

Merged
merged 23 commits into from
Dec 28, 2024

Conversation

raulchen
Copy link
Contributor

@raulchen raulchen commented Dec 13, 2024

Why are these changes needed?

A previous refactoring PR broke the ability to pass write results to Datasink.on_write_completes.
This PR adds back the ability and refines the Datasink interface by decoupling the stats handling code with the on_write_complete callback.

Closes #48933

Related issue number

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
@raulchen raulchen requested a review from a team as a code owner December 13, 2024 07:33
@@ -26,6 +122,38 @@ def num_rows_per_write(self):
)


def test_custom_write_results(ray_start_regular_shared):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only new test. The above are moved from test_formats.py

@@ -67,44 +46,32 @@ def write(
self,
blocks: Iterable[Block],
ctx: TaskContext,
) -> None:
) -> WriteResultType:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bveeramani Unrelated to this PR. But one pitfall about the current Datasink interface is that, the Datasink object will be used both on the driver (the on_xxx callbacks) and on the workers (this write function).
Users may mistakenly think that if they update an attribute in the write method, the update will be available on on_write_complete.

We should consider addressing this issue before making the Datasink API public. One solution is to introduce a separate DatasinkWriter class.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree, that's janky.

One solution is to introduce a separate DatasinkWriter class.

Sounds reasonable.

We should consider addressing this issue before making the Datasink API public.

Makes sense. There's no urgency to make Datasink public.

Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
write_row_to_path,
lambda row=row, write_path=write_path: write_row_to_path(
row, write_path
),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrelated to this PR, but fixing the following lint error
python/ray/data/datasource/file_datasink.py:190:46: B023 Function definition does not bind loop variable 'write_path'.

Signed-off-by: Hao Chen <[email protected]>
# Total size in bytes of written data.
size_bytes: int
# Results of all `Datasink.write`.
write_task_results: List[WriteResultType]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: To avoid confusion between the WriteResult dataclass and the object returned from write tasks, it might clearer if we rename write_task_results to write_return_types (and WriteResultType to WriteReturnType)

Suggested change
write_task_results: List[WriteResultType]
write_task_returns: List[WriteReturnType]

@@ -67,44 +46,32 @@ def write(
self,
blocks: Iterable[Block],
ctx: TaskContext,
) -> None:
) -> WriteResultType:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree, that's janky.

One solution is to introduce a separate DatasinkWriter class.

Sounds reasonable.

We should consider addressing this issue before making the Datasink API public.

Makes sense. There's no urgency to make Datasink public.

{
"num_rows": [total_num_rows],
"size_bytes": [total_size_bytes],
"write_task_result": [ctx.kwargs.get("_data_sink_custom_result", None)],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than passing information through the TaskContext, can we directly yield the write returns and stats in generate_write_fn.fn?

# Pseudocode for `generate_write_fn.fn`
for block in blocks:
    write_return = datasink.write(block)
    yield Block({"write_return": write_return, "num_rows": block.num_rows()})

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, on second thought, do we even still need generate_collect_write_stats_fn?

I think we can return the write return and statistics from generate_write_fn.fn, and then aggregate the statistics and create WriteResult on the driver?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

having 2 separate TransformFns allows optimization rules to insert certain operations in between them. And to pass data between them, TaskContext is probably the best place.

@@ -16,19 +18,38 @@
from ray.data.datasource.datasource import Datasource


def gen_data_sink_write_result(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Datasink is one word, so this seems more accurate?

Suggested change
def gen_data_sink_write_result(
def gen_datasink_write_result(

Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
@Jay-ju
Copy link
Contributor

Jay-ju commented Dec 23, 2024

https://github.com/ray-project/ray/pull/49214/files. I've also made a fix for this issue. Is your pull request (PR) about to be merged? If it's going to be merged, I can make fixes for the changes written to Lance based on this PR. This scenario is quite urgent for us.

@bveeramani bveeramani requested a review from a team as a code owner December 26, 2024 22:59
@raulchen
Copy link
Contributor Author

@Jay-ju this PR should be merged soon. it was blocked by a doc issue.

Signed-off-by: Hao Chen <[email protected]>
@raulchen raulchen added the go add ONLY when ready to merge, run all tests label Dec 26, 2024
Signed-off-by: Hao Chen <[email protected]>
@raulchen raulchen enabled auto-merge (squash) December 27, 2024 03:06
Signed-off-by: Hao Chen <[email protected]>
@github-actions github-actions bot disabled auto-merge December 27, 2024 18:18
@raulchen raulchen enabled auto-merge (squash) December 28, 2024 00:24
@raulchen raulchen merged commit d9f69fd into ray-project:master Dec 28, 2024
5 of 6 checks passed
@raulchen raulchen deleted the data-sink-write-res branch December 29, 2024 20:53
justinvyu pushed a commit to justinvyu/ray that referenced this pull request Dec 30, 2024
…ay-project#49251)

A previous refactoring
[PR](ray-project#47942) broke the ability to
pass write results to `Datasink.on_write_completes`.
This PR adds back the ability and refines the Datasink interface by
decoupling the stats handling code with the `on_write_complete`
callback.

Closes ray-project#48933

---------

Signed-off-by: Hao Chen <[email protected]>
Co-authored-by: Balaji Veeramani <[email protected]>
win5923 pushed a commit to win5923/ray that referenced this pull request Dec 30, 2024
…ay-project#49251)

A previous refactoring
[PR](ray-project#47942) broke the ability to
pass write results to `Datasink.on_write_completes`.
This PR adds back the ability and refines the Datasink interface by
decoupling the stats handling code with the `on_write_complete`
callback.

Closes ray-project#48933

---------

Signed-off-by: Hao Chen <[email protected]>
Co-authored-by: Balaji Veeramani <[email protected]>
srinathk10 pushed a commit that referenced this pull request Jan 3, 2025
…49251)

A previous refactoring
[PR](#47942) broke the ability to
pass write results to `Datasink.on_write_completes`.
This PR adds back the ability and refines the Datasink interface by
decoupling the stats handling code with the `on_write_complete`
callback.

Closes #48933

---------

Signed-off-by: Hao Chen <[email protected]>
Co-authored-by: Balaji Veeramani <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
go add ONLY when ready to merge, run all tests
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[data][DataSink] allow passing write results to on_write_complete
3 participants