[data][Datasink] support passing write results to on_write_completes #49251
Signed-off-by: Hao Chen <[email protected]>
@@ -26,6 +122,38 @@ def num_rows_per_write(self):
     )


 def test_custom_write_results(ray_start_regular_shared):
This is the only new test. The tests above were moved from `test_formats.py`.
@@ -67,44 +46,32 @@ def write(
         self,
         blocks: Iterable[Block],
         ctx: TaskContext,
-    ) -> None:
+    ) -> WriteResultType:
@bveeramani Unrelated to this PR, but one pitfall of the current Datasink interface is that the same `Datasink` object is used both on the driver (the `on_xxx` callbacks) and on the workers (this `write` function).
Users may mistakenly think that if they update an attribute in the `write` method, the update will be visible in `on_write_complete`.
We should consider addressing this issue before making the Datasink API public. One solution is to introduce a separate `DatasinkWriter` class.
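To make the pitfall concrete, here is a minimal sketch that does not use Ray at all. `CountingSink` and `run_on_worker` are hypothetical stand-ins, and `copy.deepcopy` is only an assumption used to emulate the sink being shipped to a worker as a separate copy. It shows that an attribute updated inside `write` stays on the worker-side copy, while a value returned from `write` can reach the driver-side callback.

```python
import copy
from typing import Any, Iterable, List


class CountingSink:
    """Hypothetical stand-in for a Datasink subclass (not the Ray class)."""

    def __init__(self) -> None:
        self.rows_seen = 0  # state that `write` tries to update

    def write(self, blocks: Iterable[List[Any]]) -> int:
        # Runs on a worker, against a copy of the sink.
        written = sum(len(block) for block in blocks)
        self.rows_seen += written  # mutates the worker's copy only
        return written  # the return value is what travels back

    def on_write_complete(self, write_returns: List[int]) -> int:
        # Runs on the driver, against the original object.
        return sum(write_returns)


def run_on_worker(sink: CountingSink, blocks: List[List[Any]]) -> int:
    # Emulate shipping the sink to a worker: the task sees its own copy.
    worker_copy = copy.deepcopy(sink)
    return worker_copy.write(blocks)


driver_sink = CountingSink()
returns = [
    run_on_worker(driver_sink, [[1, 2, 3]]),
    run_on_worker(driver_sink, [[4, 5]]),
]
total = driver_sink.on_write_complete(returns)
print(driver_sink.rows_seen)  # 0: worker-side updates never reach the driver
print(total)  # 5: recovered from the values returned by `write`
```

Returning data from `write` instead of mutating the sink is the pattern this PR enables; a separate writer class, as suggested above, would make the driver/worker split explicit in the types.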
Yeah, I agree, that's janky.

> One solution is to introduce a separate DatasinkWriter class.

Sounds reasonable.

> We should consider addressing this issue before making the Datasink API public.

Makes sense. There's no urgency to make `Datasink` public.
-    write_row_to_path,
+    lambda row=row, write_path=write_path: write_row_to_path(
+        row, write_path
+    ),
Unrelated to this PR, but this fixes the following lint error:
python/ray/data/datasource/file_datasink.py:190:46: B023 Function definition does not bind loop variable 'write_path'.
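For readers unfamiliar with B023, the sketch below illustrates the general late-binding behavior the rule warns about and the default-argument idiom used in the fix. The `paths` list and both loops are made up for illustration; they are not taken from `file_datasink.py`, and whether the original code actually misbehaved depends on when the function was called.

```python
from typing import Callable, List

paths = ["a.txt", "b.txt", "c.txt"]

# Late binding: each closure looks up `path` when it is called,
# which here happens only after the loop has finished.
late: List[Callable[[], str]] = []
for path in paths:
    late.append(lambda: path)

# Default-argument binding: the current value of `path` is captured
# when the lambda is defined, so each closure keeps its own value.
bound: List[Callable[[], str]] = []
for path in paths:
    bound.append(lambda path=path: path)

late_results = [fn() for fn in late]
bound_results = [fn() for fn in bound]
print(late_results)   # ['c.txt', 'c.txt', 'c.txt']
print(bound_results)  # ['a.txt', 'b.txt', 'c.txt']
```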
    # Total size in bytes of written data.
    size_bytes: int
    # Results of all `Datasink.write`.
    write_task_results: List[WriteResultType]
Nit: To avoid confusion between the `WriteResult` dataclass and the object returned from write tasks, it might be clearer if we rename `write_task_results` to `write_return_types` (and `WriteResultType` to `WriteReturnType`).
-    write_task_results: List[WriteResultType]
+    write_task_returns: List[WriteReturnType]
    {
        "num_rows": [total_num_rows],
        "size_bytes": [total_size_bytes],
        "write_task_result": [ctx.kwargs.get("_data_sink_custom_result", None)],
Rather than passing information through the `TaskContext`, can we directly yield the write returns and stats in `generate_write_fn.fn`?

    # Pseudocode for `generate_write_fn.fn`
    for block in blocks:
        write_return = datasink.write(block)
        yield Block({"write_return": write_return, "num_rows": block.num_rows()})
Wait, on second thought, do we even still need `generate_collect_write_stats_fn`?
I think we can return the write return and statistics from `generate_write_fn.fn`, and then aggregate the statistics and create `WriteResult` on the driver?
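The proposal above can be sketched roughly as follows, without Ray. Everything here is illustrative: `write_task` and `aggregate` are hypothetical helpers, blocks are plain lists of `bytes`, and the `WriteResult` dataclass below only mirrors the fields visible in this PR's diff, with `write_returns` as an assumed field name.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Iterator, List


@dataclass
class WriteResult:
    """Illustrative aggregate; not the actual Ray dataclass."""

    num_rows: int
    size_bytes: int
    write_returns: List[Any]  # assumed name for the per-task returns


def write_task(blocks: Iterable[List[bytes]], task_id: int) -> Iterator[Dict[str, Any]]:
    # Hypothetical worker-side function: yield one record per written block.
    for i, block in enumerate(blocks):
        write_return = f"task{task_id}-block{i}"  # stand-in for datasink.write(...)
        yield {
            "write_return": write_return,
            "num_rows": len(block),
            "size_bytes": sum(len(row) for row in block),
        }


def aggregate(records: Iterable[Dict[str, Any]]) -> WriteResult:
    # Hypothetical driver-side step: fold per-block records into one result.
    result = WriteResult(num_rows=0, size_bytes=0, write_returns=[])
    for record in records:
        result.num_rows += record["num_rows"]
        result.size_bytes += record["size_bytes"]
        result.write_returns.append(record["write_return"])
    return result


records = list(write_task([[b"ab", b"c"]], task_id=0))
records += list(write_task([[b"defg"]], task_id=1))
result = aggregate(records)
print(result)
```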
Having two separate `TransformFn`s allows optimization rules to insert certain operations between them. To pass data between them, `TaskContext` is probably the best place.
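A rough sketch of that argument, again without Ray: two stages share a per-task context, so an unrelated stage can be placed between them without having to carry the write return through the data stream. `FakeTaskContext`, `write_fn`, `drop_empty_fn`, `collect_stats_fn`, and `run_chain` are all hypothetical names; only the `_data_sink_custom_result` key and the output column names come from the diff in this PR.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Iterable, Iterator, List

Block = List[Any]


@dataclass
class FakeTaskContext:
    """Hypothetical stand-in for a per-task context carrying a kwargs dict."""

    kwargs: Dict[str, Any] = field(default_factory=dict)


def write_fn(blocks: Iterable[Block], ctx: FakeTaskContext) -> Iterator[Block]:
    # Stage 1: "write" the blocks, stash the return in ctx, pass blocks through.
    materialized = list(blocks)
    ctx.kwargs["_data_sink_custom_result"] = f"wrote {len(materialized)} block(s)"
    yield from materialized


def drop_empty_fn(blocks: Iterable[Block], ctx: FakeTaskContext) -> Iterator[Block]:
    # A stage inserted in between; it knows nothing about the write return.
    return (block for block in blocks if block)


def collect_stats_fn(blocks: Iterable[Block], ctx: FakeTaskContext) -> Iterator[Dict[str, List[Any]]]:
    # Stage 2: consume the stream, then read the write return back from ctx.
    total_num_rows = sum(len(block) for block in blocks)
    yield {
        "num_rows": [total_num_rows],
        "write_task_result": [ctx.kwargs.get("_data_sink_custom_result", None)],
    }


def run_chain(blocks: Iterable[Block], fns: List[Callable], ctx: FakeTaskContext) -> List[Any]:
    # Apply each stage to the output of the previous one, sharing one ctx.
    data: Iterable[Any] = blocks
    for fn in fns:
        data = fn(data, ctx)
    return list(data)


ctx = FakeTaskContext()
output = run_chain([[1, 2], [], [3]], [write_fn, drop_empty_fn, collect_stats_fn], ctx)
print(output)
```

Because the stages are lazy generators, the context entry is set when the last stage starts consuming the stream, which is before it reads the entry back.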
@@ -16,19 +18,38 @@
 from ray.data.datasource.datasource import Datasource


 def gen_data_sink_write_result(
Nit: `Datasink` is one word, so this seems more accurate?
-def gen_data_sink_write_result(
+def gen_datasink_write_result(
https://github.com/ray-project/ray/pull/49214/files. I've also made a fix for this issue. Is this PR about to be merged? If so, I can base my fixes for writing to Lance on this PR. This scenario is quite urgent for us.
@Jay-ju This PR should be merged soon. It was blocked by a doc issue.
…ay-project#49251) A previous refactoring [PR](ray-project#47942) broke the ability to pass write results to `Datasink.on_write_completes`. This PR adds back the ability and refines the Datasink interface by decoupling the stats handling code with the `on_write_complete` callback. Closes ray-project#48933 --------- Signed-off-by: Hao Chen <[email protected]> Co-authored-by: Balaji Veeramani <[email protected]>
Why are these changes needed?
A previous refactoring PR broke the ability to pass write results to `Datasink.on_write_complete`. This PR restores that ability and refines the Datasink interface by decoupling the stats-handling code from the `on_write_complete` callback.
Closes #48933