[Core] Support != Filter in GCS for Task State API #48983

Merged
merged 7 commits into from
Dec 13, 2024

Conversation

MengjinYan
Collaborator

Why are these changes needed?

Today, when the State API handles a list-tasks request with filters, it:

  1. Fetches the task events from GCS with certain "=" filters
  2. Applies the rest of the specified filters to the events fetched from GCS

At the same time, when fetching events from GCS, there is a 10k limit to avoid overwhelming the service.

Therefore, when there are more than 10k task events after filtering on GCS, the events will be truncated and incomplete information will be returned.
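
The truncation can be reproduced with a toy model. The sketch below is illustrative only: `fetch_from_gcs`, `post_filter`, `LIMIT`, and the dict-shaped events are hypothetical stand-ins rather than Ray's actual code, and the limit is shrunk from 10k to 5 to keep the example small.

```python
from typing import Dict, List, Tuple

Event = Dict[str, str]
Filter = Tuple[str, str, str]  # (key, predicate, value)

LIMIT = 5  # hypothetical stand-in for the 10k limit on the GCS side


def fetch_from_gcs(events: List[Event], eq_filters: List[Filter]) -> List[Event]:
    """Stage 1: apply only the '=' filters, then truncate to LIMIT."""
    matched = [e for e in events if all(e[k] == v for k, _, v in eq_filters)]
    return matched[:LIMIT]


def post_filter(events: List[Event], ne_filters: List[Filter]) -> List[Event]:
    """Stage 2: apply the remaining '!=' filters on the caller side."""
    return [e for e in events if all(e[k] != v for k, _, v in ne_filters)]


# 8 events for job "j1": the first 6 are FINISHED, the last 2 are RUNNING.
events = [{"job_id": "j1", "state": "FINISHED"} for _ in range(6)]
events += [{"job_id": "j1", "state": "RUNNING"} for _ in range(2)]

fetched = fetch_from_gcs(events, [("job_id", "=", "j1")])
result = post_filter(fetched, [("state", "!=", "FINISHED")])
```

Both RUNNING tasks exist, but the truncation happens before the "!=" filter is applied, so `result` comes back empty.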

To fundamentally fix the issue, the idea is to move the whole task events pipeline outside of GCS.

This PR is a short-term fix that adds support for more filters in GCS so that the returned result set can stay under 10k. Specifically, this PR:

  • Adds "!=" filter support for the existing filter fields (task id, actor id, job id, task name, state) in GCS
  • Modifies and adds tests for the new logic
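
As a rough sketch of the idea (not the PR's C++ implementation), evaluating both predicates before the limit is applied could look like the following. The key names in `SUPPORTED_KEYS`, the `matches` and `get_task_events` helpers, and the dict-shaped events are assumptions made for illustration.

```python
from typing import Dict, List, Tuple

Event = Dict[str, str]
Filter = Tuple[str, str, str]  # (key, predicate, value)

# Hypothetical key names for the five filter fields listed in the PR description.
SUPPORTED_KEYS = {"task_id", "actor_id", "job_id", "task_name", "state"}


def matches(event: Event, filters: List[Filter]) -> bool:
    """Return True if the event satisfies every '=' and '!=' filter."""
    for key, predicate, value in filters:
        if key not in SUPPORTED_KEYS:
            raise ValueError(f"unsupported filter key: {key}")
        if predicate == "=":
            if event.get(key) != value:
                return False
        elif predicate == "!=":
            if event.get(key) == value:
                return False
        else:
            raise ValueError(f"unsupported predicate: {predicate}")
    return True


def get_task_events(events: List[Event], filters: List[Filter], limit: int) -> List[Event]:
    """Apply all filters first and only then enforce the limit."""
    return [e for e in events if matches(e, filters)][:limit]


# 6 FINISHED events followed by 2 RUNNING events, all in job "j1".
events = [{"job_id": "j1", "state": "FINISHED"} for _ in range(6)]
events += [{"job_id": "j1", "state": "RUNNING"} for _ in range(2)]

running = get_task_events(
    events, [("job_id", "=", "j1"), ("state", "!=", "FINISHED")], limit=5
)
```

Because the "!=" filter runs before the limit, the two RUNNING events survive even though more than `limit` events match the "=" filter alone.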

Related issue number

Closes #48251

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@MengjinYan MengjinYan requested review from a team, pcmoritz and raulchen as code owners November 28, 2024 03:43
@@ -361,24 +363,31 @@ void GcsTaskManager::HandleGetTaskEvents(rpc::GetTaskEventsRequest request,
RAY_LOG(DEBUG) << "Getting task status:" << request.ShortDebugString();

// Select candidate events by indexing if possible.
-std::vector<rpc::TaskEvents> task_events;
+std::optional<std::vector<rpc::TaskEvents>> task_events;
Contributor

I don't think we need the std::optional here: an empty vector is enough to indicate "no events, early return".

Collaborator Author

The std::optional here is needed to distinguish the case where no event matches the filter from the case where task_events has not been assigned yet.

When task filters are specified but all of them are "not equal" filters, task_events is not assigned inside the filters.task_filters_size() > 0 block and must be assigned later with all the task events on GCS. Checking whether task_events has a value makes sure the later assignment handles that case, and avoids reassigning when an existing task/job filter has already produced an empty vector.
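
The same distinction can be sketched in Python, with `None` playing the role of an empty `std::optional`. This is only an illustration of the reasoning above: `select_candidates`, the key names, and the dict-shaped events are hypothetical, and the real logic lives in the C++ `GcsTaskManager`.

```python
from typing import Dict, List, Optional, Tuple

Event = Dict[str, str]
Filter = Tuple[str, str, str]  # (key, predicate, value)


def select_candidates(all_events: List[Event], filters: List[Filter]) -> List[Event]:
    # None means "no index lookup has run yet";
    # [] means "a lookup ran and found nothing".
    candidates: Optional[List[Event]] = None

    for key in ("task_id", "job_id"):
        wanted = {v for k, p, v in filters if k == key and p == "="}
        if wanted and candidates is None:
            candidates = [e for e in all_events if e[key] in wanted]

    if candidates is None:
        # Only "!=" filters (or no filters): fall back to every event.
        candidates = list(all_events)
    return candidates


events = [
    {"task_id": "t1", "job_id": "j1"},
    {"task_id": "t2", "job_id": "j1"},
]

no_match = select_candidates(events, [("task_id", "=", "missing")])
only_not_equal = select_candidates(events, [("task_id", "!=", "t1")])
```

If an empty list were used as the "unassigned" marker instead, the lookup for the missing task id would be indistinguishable from "no lookup ran" and would wrongly fall back to all events.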

task_events = task_event_storage_->GetTaskEvents(job_id);
// Populate per-job data loss.
if (task_event_storage_->HasJob(job_id)) {
const auto &job_summary = task_event_storage_->GetJobTaskSummary(job_id);
reply->set_num_profile_task_events_dropped(job_summary.NumProfileEventsDropped());
reply->set_num_status_task_events_dropped(job_summary.NumTaskAttemptsDropped());
}
} else {
}
Contributor

Why do we remove the else branch? I think

let events = if (has task id filter) {read_from task} else if (has job id filter) {read from job} else {read all};

then

events.filter_by(filters);

is good

Collaborator Author

This handles the case where the task filters contain only "not equal" filters.

In that case, we cannot look up the events by a list of task IDs and instead need to fetch all the task events.

I'll add comments to clarify the logic.

Contributor

@rickyyx rickyyx left a comment

LGTM!

nit: I also believe the filter lambda refactoring ruiyang suggests will make things cleaner and more testable.

Collaborator

jjyao commented Dec 12, 2024

@rickyyx can you take another look, since a significant change was made after your last review?

@MengjinYan MengjinYan added the go add ONLY when ready to merge, run all tests label Dec 12, 2024
Contributor

@rickyyx rickyyx left a comment

A couple of nits only.

The main comment is on the add_task_filter routine where I think we could separate the filter building from early return filters checking, which is currently coupled in the function.

Contributor

rickyyx commented Dec 12, 2024

Also, I wonder if we have plans in a follow-up to expose this to users (in high-level APIs, docs, and other places)?

Contributor

@rickyyx rickyyx left a comment

Thanks for the great work!

@@ -163,6 +164,30 @@ def __post_init__(self):
"Available predicates: =, !=."
)

def has_conflicting_filters(self) -> bool:
# Check the filters in the ListApiOptions conflicts. Specifically for:
Contributor

nit: a = filter and a != filter on the same key with the same value would also produce empty results, I guess. But given that this is an optimization and the doc is clear, I think it's fine.

Collaborator Author

Good point! Added a todo in the PR for the optimization
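
For illustration, a standalone conflict check covering both situations discussed in this thread might look like the sketch below. It borrows the name `has_conflicting_filters` from the diff but is not the PR's implementation: the tuple-shaped filters are an assumption, only the "=" and "!=" predicates are assumed to occur, and the second check corresponds to the TODO case rather than to behavior confirmed in the merged code.

```python
from typing import Dict, List, Set, Tuple

Filter = Tuple[str, str, str]  # (key, predicate, value)


def has_conflicting_filters(filters: List[Filter]) -> bool:
    """Return True if the filters can never match any entry."""
    equal_values: Dict[str, Set[str]] = {}
    not_equal_values: Dict[str, Set[str]] = {}
    for key, predicate, value in filters:
        # Assumes the predicate is either "=" or "!=".
        target = equal_values if predicate == "=" else not_equal_values
        target.setdefault(key, set()).add(value)

    for key, values in equal_values.items():
        # Two "=" filters on one key with different values can never both hold.
        if len(values) > 1:
            return True
        # "=" and "!=" on the same key and value (the case raised in the nit).
        if values & not_equal_values.get(key, set()):
            return True
    return False
```

When the check returns True, the caller can return an empty result immediately instead of querying GCS.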

@jjyao jjyao merged commit a6b1b1a into master Dec 13, 2024
5 checks passed
@jjyao jjyao deleted the issue-48251 branch December 13, 2024 22:19
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Dec 17, 2024
Labels
go add ONLY when ready to merge, run all tests
Development

Successfully merging this pull request may close these issues.

[Dashboard] Fix listing APIs to avoid truncating at 10k entities
5 participants