-
Notifications
You must be signed in to change notification settings - Fork 6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Core] Support != Filter in GCS for Task State API #48983
Conversation
Signed-off-by: Mengjin Yan <[email protected]>
@@ -361,24 +363,31 @@ void GcsTaskManager::HandleGetTaskEvents(rpc::GetTaskEventsRequest request, | |||
RAY_LOG(DEBUG) << "Getting task status:" << request.ShortDebugString(); | |||
|
|||
// Select candidate events by indexing if possible. | |||
std::vector<rpc::TaskEvents> task_events; | |||
std::optional<std::vector<rpc::TaskEvents>> task_events; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need the std::optional here: an empty vector is enough to indicate "no events, early return".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The std::optional
here is needed to differentiate the case where there is no valid event for the filter and the case where the task_events
are not assigned.
In the case when the task filters are specified but all filters are "not equal" filters, the task_events
will not be assigned within the filters.task_filters_size() > 0
statement and should be assigned later with all the task events on GCS. Checking whether the task_events
has value will make sure the assignment handles the above case and avoid reassignment in the empty vector situation due to the existing task/job filter.
task_events = task_event_storage_->GetTaskEvents(job_id); | ||
// Populate per-job data loss. | ||
if (task_event_storage_->HasJob(job_id)) { | ||
const auto &job_summary = task_event_storage_->GetJobTaskSummary(job_id); | ||
reply->set_num_profile_task_events_dropped(job_summary.NumProfileEventsDropped()); | ||
reply->set_num_status_task_events_dropped(job_summary.NumTaskAttemptsDropped()); | ||
} | ||
} else { | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we remove the else- branch? I think
let events = if (has task id filter) {read_from task} else if (has job id filter) {read from job} else {read all};
then
events.filter_by(filters);
is good
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here it is to handle the case where only "not equal" filters are there in the task filters.
In this case, we cannot get the events from specifying the list of task IDs but need to get the events through getting all the task events.
I'll add comments to clarify the logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
nit: I also believe the filter lambda refactoring ruiyang suggests will make things cleaner and more testable.
…all '=' & '!=' down to GCS Signed-off-by: Mengjin Yan <[email protected]>
@rickyyx can you take another look since a significant change is made after your last review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A couple of nits only.
The main comment is on the add_task_filter
routine where I think we could separate the filter building from early return filters checking, which is currently coupled in the function.
Also, I wonder if we have plans in followup to make it expose to users (in high level apis/docs and other places?) |
…heck early return logic earlier to the list function Signed-off-by: Mengjin Yan <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the great work!
@@ -163,6 +164,30 @@ def __post_init__(self): | |||
"Available predicates: =, !=." | |||
) | |||
|
|||
def has_conflicting_filters(self) -> bool: | |||
# Check the filters in the ListApiOptions conflicts. Specifically for: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: potentially a =
with the same value and a !=
with the same value would also result in empty results on the same key i guess - but given this is an optimization and the doc is clear. I think it's fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point! Added a todo in the PR for the optimization
Signed-off-by: Mengjin Yan <[email protected]>
Signed-off-by: Mengjin Yan <[email protected]> Signed-off-by: ujjawal-khare <[email protected]>
Why are these changes needed?
Today, when the State API handles the list tasks requests with filters, it will:
At the same time, when fetching events from GCS, there is a 10k limit to avoid overwhelming the service.
Therefore, when there are more than 10k task events after filtering on GCS, the events will be truncated and incomplete information will be returned.
To fundamentally fix the issue, the idea is to move the whole task events pipeline outside of GCS.
This PR is a short term fix to add support for more filters in GCS so that the return result set can be smaller than 10k. To be specific, this PR:
Related issue number
Closes #48251
Checks
git commit -s
) in this PR.scripts/format.sh
to lint the changes in this PR.method in Tune, I've added it in
doc/source/tune/api/
under thecorresponding
.rst
file.