
Measure the number of Parquet row groups filtered by predicate pushdown #17594

Merged
Commits (26)
5466c7a
Add a method to measure predicate pushdown row group filtering
mhaseeb123 Dec 13, 2024
57e1e1c
Merge branch 'branch-25.02' into fea/measure-predicate-pushdown-row-g…
mhaseeb123 Jan 14, 2025
c061a75
Merge branch 'branch-25.02' into fea/measure-predicate-pushdown-row-g…
mhaseeb123 Jan 16, 2025
17bfedd
Implementation
mhaseeb123 Jan 16, 2025
2d00bca
Minor, remove periods in comments
mhaseeb123 Jan 16, 2025
4f53906
Clean up docstrings
mhaseeb123 Jan 16, 2025
c50bb43
Move max row group checks
mhaseeb123 Jan 16, 2025
c0c0483
Remove erroneous comments
mhaseeb123 Jan 16, 2025
fcf7f28
Minor refactoring
mhaseeb123 Jan 16, 2025
7811641
Minor refactoring
mhaseeb123 Jan 16, 2025
5342d3c
Minor refactoring
mhaseeb123 Jan 16, 2025
d6d6e4e
Minor refactoring
mhaseeb123 Jan 16, 2025
00ff5fe
Update copyrights year
mhaseeb123 Jan 16, 2025
af3e2d3
Update copyrights year
mhaseeb123 Jan 16, 2025
629da8f
Merge branch 'branch-25.02' into fea/measure-predicate-pushdown-row-g…
mhaseeb123 Jan 17, 2025
b95b345
Merge branch 'branch-25.02' into fea/measure-predicate-pushdown-row-g…
mhaseeb123 Jan 28, 2025
cd64b02
Address feedback from reviews
mhaseeb123 Jan 28, 2025
de6f8b0
Merge branch 'branch-25.02' into fea/measure-predicate-pushdown-row-g…
galipremsagar Jan 28, 2025
d45aafd
Fix scalar value for tests
mhaseeb123 Jan 28, 2025
da30d0b
Minor updates. Make struct name more readable.
mhaseeb123 Jan 28, 2025
ae3ca5b
Remove erroneous test
mhaseeb123 Jan 28, 2025
a1bce6b
Apply suggestions from review
mhaseeb123 Jan 28, 2025
ae1bd1d
Remove stale code
mhaseeb123 Jan 28, 2025
c336184
Update docstring for variables
mhaseeb123 Jan 29, 2025
3eb411e
Update comment
mhaseeb123 Jan 30, 2025
0081b4e
Apply suggestions from code review.
mhaseeb123 Jan 30, 2025
13 changes: 10 additions & 3 deletions cpp/include/cudf/io/types.hpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
* Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -277,13 +277,20 @@ struct column_name_info {
struct table_metadata {
std::vector<column_name_info>
schema_info; //!< Detailed name information for the entire output hierarchy
std::vector<size_t> num_rows_per_source; //!< Number of rows read from each data source.
std::vector<size_t> num_rows_per_source; //!< Number of rows read from each data source
//!< Currently only computed for Parquet readers if no
//!< AST filters being used. Empty vector otherwise.
//!< AST filters being used. Empty vector otherwise
std::map<std::string, std::string> user_data; //!< Format-dependent metadata of the first input
//!< file as key-values pairs (deprecated)
std::vector<std::unordered_map<std::string, std::string>>
per_file_user_data; //!< Per file format-dependent metadata as key-values pairs

// The following variables are currently only computed for Parquet reader
size_type num_input_row_groups{0}; //!< Total number of input row groups across all data sources
std::optional<size_type>
@mhaseeb123 (Member, Author), Jan 28, 2025:

Edit: With recent updates to this PR:

  • num_row_groups_after_stats_filter will be std::nullopt if not filter.has_value(), and otherwise equal to num_input_row_groups or the actual filtered count
  • num_row_groups_after_bloom_filter will be std::nullopt if not filter.has_value() or not bloom_filter_exist, and otherwise equal to num_row_groups_after_stats_filter or the actual filtered count

@mhaseeb123 (Member, Author):

^ Updated

num_row_groups_after_stats_filter; //!< Number of remaining row groups after stats filter
std::optional<size_type>
num_row_groups_after_bloom_filter; //!< Number of remaining row groups after bloom filter
mhaseeb123 marked this conversation as resolved.
};

/**
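[Editor's note] A minimal consumer-side sketch of the new table_metadata fields added above. The file name "data.parquet" and column name "col0" are hypothetical; parquet_reader_options, its builder's filter() method, and read_parquet() are existing cudf APIs, while num_input_row_groups and the two optional counters are the fields this PR introduces. This is an illustrative sketch, not code from the PR.

```cpp
// Sketch: read a Parquet file with an AST filter, then inspect how many row
// groups each predicate-pushdown stage kept. "data.parquet" and "col0" are
// hypothetical placeholders.
#include <cudf/ast/expressions.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/scalar/scalar.hpp>

#include <iostream>

int main()
{
  // Predicate: col0 < 42 (pushed down to row-group filtering)
  auto col_ref = cudf::ast::column_name_reference("col0");
  auto value   = cudf::numeric_scalar<int32_t>(42);
  auto literal = cudf::ast::literal(value);
  auto filter  = cudf::ast::operation(cudf::ast::ast_operator::LESS, col_ref, literal);

  auto options =
    cudf::io::parquet_reader_options::builder(cudf::io::source_info{"data.parquet"})
      .filter(filter)
      .build();
  auto result = cudf::io::read_parquet(options);

  auto const& meta = result.metadata;
  std::cout << "input row groups:   " << meta.num_input_row_groups << "\n";
  // Per the author's comment above, both counters are std::nullopt when no
  // filter was supplied, so test has_value() before reading them.
  if (meta.num_row_groups_after_stats_filter) {
    std::cout << "after stats filter: " << *meta.num_row_groups_after_stats_filter << "\n";
  }
  if (meta.num_row_groups_after_bloom_filter) {
    std::cout << "after bloom filter: " << *meta.num_row_groups_after_bloom_filter << "\n";
  }
}
```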
18 changes: 5 additions & 13 deletions cpp/src/io/parquet/bloom_filter_reader.cu
@@ -602,6 +602,7 @@ std::vector<Type> aggregate_reader_metadata::get_parquet_types(
std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::apply_bloom_filters(
host_span<std::unique_ptr<datasource> const> sources,
host_span<std::vector<size_type> const> input_row_group_indices,
size_type total_row_groups,
host_span<data_type const> output_dtypes,
host_span<int const> output_column_schemas,
std::reference_wrapper<ast::expression const> filter,
@@ -610,17 +611,6 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::apply_bloom_filters(
// Number of input table columns
auto const num_input_columns = static_cast<cudf::size_type>(output_dtypes.size());

// Total number of row groups after StatsAST filtration
auto const total_row_groups = std::accumulate(
input_row_group_indices.begin(),
input_row_group_indices.end(),
size_t{0},
[](size_t sum, auto const& per_file_row_groups) { return sum + per_file_row_groups.size(); });

// Check if we have less than 2B total row groups.
CUDF_EXPECTS(total_row_groups <= std::numeric_limits<cudf::size_type>::max(),
"Total number of row groups exceed the size_type's limit");

// Collect equality literals for each input table column
auto const equality_literals =
equality_literals_collector{filter.get(), num_input_columns}.get_equality_literals();
Expand Down Expand Up @@ -676,8 +666,10 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::ap
h_bloom_filter_spans, stream, cudf::get_current_device_resource_ref());

// Create a bloom filter query table caster
bloom_filter_caster const bloom_filter_col{
bloom_filter_spans, parquet_types, total_row_groups, equality_col_schemas.size()};
bloom_filter_caster const bloom_filter_col{bloom_filter_spans,
parquet_types,
static_cast<size_t>(total_row_groups),
@mhaseeb123 (Member, Author): The static_cast simply reformatted this line.

equality_col_schemas.size()};

// Converts bloom filter membership for equality predicate columns to a table
// containing a column for each `col[i] == literal` predicate to be evaluated.
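[Editor's note] Context for the bloom-filter pass above: a bloom filter answers only "definitely absent" or "maybe present", which is why this pass applies to equality predicates (col == literal) collected by equality_literals_collector. A host-side toy sketch of the membership test, independent of the GPU implementation used in this file:

```cpp
// Toy bloom-filter membership test (host-side, illustrative only). A "false"
// from might_contain proves the literal is absent from the row group, so the
// row group can be pruned for a `col == literal` predicate; "true" only means
// the value may be present (false positives are possible).
#include <bitset>
#include <cstddef>
#include <functional>
#include <string>

class tiny_bloom_filter {
  static constexpr std::size_t num_bits = 1024;
  std::bitset<num_bits> bits_;

  // Two cheap, illustrative hash positions derived from std::hash
  static std::size_t hash1(std::string const& key)
  {
    return std::hash<std::string>{}(key) % num_bits;
  }
  static std::size_t hash2(std::string const& key)
  {
    return std::hash<std::string>{}(key + "\x01") % num_bits;
  }

 public:
  void insert(std::string const& key)
  {
    bits_.set(hash1(key));
    bits_.set(hash2(key));
  }
  bool might_contain(std::string const& key) const
  {
    return bits_.test(hash1(key)) && bits_.test(hash2(key));
  }
};
```

Parquet's real bloom filters are split-block filters keyed with xxHash64, but the pruning rule is the same: a definite-absence answer lets the reader drop the row group.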
67 changes: 36 additions & 31 deletions cpp/src/io/parquet/predicate_pushdown.cpp
@@ -388,40 +388,17 @@ class stats_expression_converter : public ast::detail::expression_transformer {
};
} // namespace

std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::filter_row_groups(
std::pair<std::optional<std::vector<std::vector<size_type>>>, surviving_row_groups>
aggregate_reader_metadata::filter_row_groups(
host_span<std::unique_ptr<datasource> const> sources,
host_span<std::vector<size_type> const> row_group_indices,
host_span<std::vector<size_type> const> input_row_group_indices,
size_type total_row_groups,
host_span<data_type const> output_dtypes,
host_span<int const> output_column_schemas,
std::reference_wrapper<ast::expression const> filter,
rmm::cuda_stream_view stream) const
{
auto mr = cudf::get_current_device_resource_ref();
// Create row group indices.
@mhaseeb123 (Member, Author), Jan 16, 2025: Moved this block out to select_row_groups(), which now passes input_row_group_indices in as an input parameter.

std::vector<std::vector<size_type>> all_row_group_indices;
host_span<std::vector<size_type> const> input_row_group_indices;
if (row_group_indices.empty()) {
std::transform(per_file_metadata.cbegin(),
per_file_metadata.cend(),
std::back_inserter(all_row_group_indices),
[](auto const& file_meta) {
std::vector<size_type> rg_idx(file_meta.row_groups.size());
std::iota(rg_idx.begin(), rg_idx.end(), 0);
return rg_idx;
});
input_row_group_indices = host_span<std::vector<size_type> const>(all_row_group_indices);
} else {
input_row_group_indices = row_group_indices;
}
auto const total_row_groups = std::accumulate(
input_row_group_indices.begin(),
input_row_group_indices.end(),
size_t{0},
[](size_t sum, auto const& per_file_row_groups) { return sum + per_file_row_groups.size(); });

// Check if we have less than 2B total row groups.
CUDF_EXPECTS(total_row_groups <= std::numeric_limits<cudf::size_type>::max(),
"Total number of row groups exceed the size_type's limit");

// Converts Column chunk statistics to a table
// where min(col[i]) = columns[i*2], max(col[i])=columns[i*2+1]
@@ -461,19 +438,47 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::filter_row_groups(
auto const filtered_row_group_indices = collect_filtered_row_group_indices(
stats_table, stats_expr.get_stats_expr(), input_row_group_indices, stream);

// Number of surviving row groups after applying stats filter
auto const num_stats_filtered_row_groups =
@mhaseeb123 (Member, Author): There isn't really a straightforward way here to check whether stats were available, so this will be set to either total_row_groups or the filtered number of row groups.

Contributor: So if there is a filter but no stats, we still report a num_stats_filtered_row_groups number, even though we didn't really do any stats-based filtering?

@mhaseeb123 (Member, Author): Yeah, unfortunately it's not that straightforward to check the availability of stats right now!

@mhaseeb123 (Member, Author): Created #17864 to handle this in a future PR.

filtered_row_group_indices.has_value()
? std::accumulate(filtered_row_group_indices.value().cbegin(),
filtered_row_group_indices.value().cend(),
size_type{0},
[](auto& sum, auto const& per_file_row_groups) {
return sum + per_file_row_groups.size();
})
: total_row_groups;

// Span of row groups to apply bloom filtering on.
auto const bloom_filter_input_row_groups =
filtered_row_group_indices.has_value()
? host_span<std::vector<size_type> const>(filtered_row_group_indices.value())
: input_row_group_indices;

// Apply bloom filtering on the bloom filter input row groups
auto const bloom_filtered_row_groups = apply_bloom_filters(
sources, bloom_filter_input_row_groups, output_dtypes, output_column_schemas, filter, stream);
auto const bloom_filtered_row_groups = apply_bloom_filters(sources,
bloom_filter_input_row_groups,
num_stats_filtered_row_groups,
output_dtypes,
output_column_schemas,
filter,
stream);

// Number of surviving row groups after applying bloom filter
auto const num_bloom_filtered_row_groups =
bloom_filtered_row_groups.has_value()
? std::accumulate(bloom_filtered_row_groups.value().cbegin(),
bloom_filtered_row_groups.value().cend(),
size_type{0},
[](auto& sum, auto const& per_file_row_groups) {
return sum + per_file_row_groups.size();
})
: num_stats_filtered_row_groups;

// Return bloom filtered row group indices iff collected
return bloom_filtered_row_groups.has_value() ? bloom_filtered_row_groups
: filtered_row_group_indices;
return {
bloom_filtered_row_groups.has_value() ? bloom_filtered_row_groups : filtered_row_group_indices,
{num_stats_filtered_row_groups, num_bloom_filtered_row_groups}};
}

// convert column named expression to column index reference expression
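[Editor's note] The stats pass above builds a table where min(col[i]) = columns[i*2] and max(col[i]) = columns[i*2+1], then rewrites the user's predicate over those columns (stats_expression_converter). A scalar sketch of the survival test that rewriting encodes; the types and operator set here are illustrative, not the cudf AST:

```cpp
// For "col OP literal", a row group survives the stats filter iff its
// [min, max] range could contain a matching value. Illustrative only.
#include <cassert>

enum class ast_op { LESS, GREATER, EQUAL };

struct column_stats {  // min/max of one column within one row group
  double min;
  double max;
};

// True if the row group may contain rows satisfying "col OP literal".
bool row_group_may_match(column_stats const& s, ast_op op, double literal)
{
  switch (op) {
    case ast_op::LESS: return s.min < literal;     // some value below the literal may exist
    case ast_op::GREATER: return s.max > literal;  // some value above the literal may exist
    case ast_op::EQUAL: return s.min <= literal && literal <= s.max;
  }
  return true;  // conservative: keep the row group for unhandled operators
}

int main()
{
  assert(row_group_may_match({10.0, 20.0}, ast_op::LESS, 15.0));    // kept
  assert(!row_group_may_match({10.0, 20.0}, ast_op::EQUAL, 42.0));  // pruned
  return 0;
}
```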
11 changes: 11 additions & 0 deletions cpp/src/io/parquet/reader_impl.cpp
@@ -610,6 +610,17 @@ table_with_metadata reader::impl::read_chunk_internal(read_mode mode)
auto out_columns = std::vector<std::unique_ptr<column>>{};
out_columns.reserve(_output_buffers.size());

// Copy the total number of input row groups and the number of surviving row groups from
// predicate pushdown.
out_metadata.num_input_row_groups = _file_itm_data.num_input_row_groups;
// Copy the number of surviving row groups from each predicate pushdown filter only if the
// filter has a value.
if (_expr_conv.get_converted_expr().has_value()) {
@mhaseeb123 (Member, Author): Copy from _file_itm_data if filter.has_value()

out_metadata.num_row_groups_after_stats_filter =
_file_itm_data.remaining_row_groups.after_stats_filter;
out_metadata.num_row_groups_after_bloom_filter =
_file_itm_data.remaining_row_groups.after_bloom_filter;
}

// no work to do (this can happen on the first pass if we have no rows to read)
if (!has_more_work()) {
// Check if number of rows per source should be included in output metadata.
5 changes: 5 additions & 0 deletions cpp/src/io/parquet/reader_impl_chunking.hpp
@@ -47,6 +47,11 @@ struct file_intermediate_data {
// partial sum of the number of rows per data source
std::vector<std::size_t> exclusive_sum_num_rows_per_source{};

size_type num_input_row_groups{0}; // total number of input row groups across all data sources

// struct containing the number of remaining row groups after each predicate pushdown filter
surviving_row_groups remaining_row_groups;

size_t _current_input_pass{0}; // current input pass index
size_t _output_chunk_count{0}; // how many output chunks we have produced

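[Editor's note] The surviving_row_groups type used in this struct is defined elsewhere in the PR and not shown in this excerpt. Judging from its usage (aggregate initialization with two counts in select_row_groups(), and the .after_stats_filter / .after_bloom_filter reads in reader_impl.cpp), a plausible reconstruction is:

```cpp
// Hypothetical reconstruction -- see the PR for the actual definition.
using size_type = int32_t;  // stand-in for cudf::size_type

// Number of row groups that survive each predicate-pushdown stage.
struct surviving_row_groups {
  size_type after_stats_filter;  // row groups remaining after the stats (min/max) filter
  size_type after_bloom_filter;  // row groups remaining after the bloom filter
};
```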
91 changes: 83 additions & 8 deletions cpp/src/io/parquet/reader_impl_helpers.cpp
@@ -408,10 +408,16 @@ int64_t aggregate_reader_metadata::calc_num_rows() const

size_type aggregate_reader_metadata::calc_num_row_groups() const
{
return std::accumulate(
per_file_metadata.cbegin(), per_file_metadata.cend(), 0, [](auto& sum, auto& pfm) {
auto const total_row_groups = std::accumulate(
per_file_metadata.cbegin(), per_file_metadata.cend(), size_t{0}, [](size_t& sum, auto& pfm) {
return sum + pfm.row_groups.size();
});

// Check if we have less than 2B total row groups.
CUDF_EXPECTS(total_row_groups <= std::numeric_limits<cudf::size_type>::max(),
"Total number of row groups exceed the size_type's limit");

return static_cast<size_type>(total_row_groups);
}

// Copies info from the column and offset indexes into the passed in row_group_info.
@@ -1029,7 +1035,12 @@ std::vector<std::string> aggregate_reader_metadata::get_pandas_index_names() const
return names;
}

std::tuple<int64_t, size_type, std::vector<row_group_info>, std::vector<size_t>>
std::tuple<int64_t,
size_type,
std::vector<row_group_info>,
std::vector<size_t>,
size_type,
surviving_row_groups>
aggregate_reader_metadata::select_row_groups(
host_span<std::unique_ptr<datasource> const> sources,
host_span<std::vector<size_type> const> row_group_indices,
@@ -1040,17 +1051,63 @@
std::optional<std::reference_wrapper<ast::expression const>> filter,
rmm::cuda_stream_view stream) const
{
// Compute total number of input row groups
size_type total_row_groups = [&]() {
if (not row_group_indices.empty()) {
size_t const total_row_groups =
std::accumulate(row_group_indices.begin(),
row_group_indices.end(),
size_t{0},
[](size_t& sum, auto const& pfm) { return sum + pfm.size(); });

// Check if we have less than 2B total row groups.
CUDF_EXPECTS(total_row_groups <= std::numeric_limits<cudf::size_type>::max(),
"Total number of row groups exceed the size_type's limit");
return static_cast<size_type>(total_row_groups);
} else {
return num_row_groups;
}
}();

// Pair to store the number of row groups after stats and bloom filtering respectively. Initialize
// to total_row_groups.
surviving_row_groups num_row_groups_after_filters{total_row_groups, total_row_groups};

std::optional<std::vector<std::vector<size_type>>> filtered_row_group_indices;
// if filter is not empty, then gather row groups to read after predicate pushdown
if (filter.has_value()) {
filtered_row_group_indices = filter_row_groups(
sources, row_group_indices, output_dtypes, output_column_schemas, filter.value(), stream);
// Span of input row group indices for predicate pushdown
@mhaseeb123 (Member, Author): Moved this piece of code out of filter_row_groups() to make its signature consistent with apply_bloom_filters()

host_span<std::vector<size_type> const> input_row_group_indices;
std::vector<std::vector<size_type>> all_row_group_indices;
if (row_group_indices.empty()) {
std::transform(per_file_metadata.cbegin(),
per_file_metadata.cend(),
std::back_inserter(all_row_group_indices),
[](auto const& file_meta) {
std::vector<size_type> rg_idx(file_meta.row_groups.size());
std::iota(rg_idx.begin(), rg_idx.end(), 0);
return rg_idx;
});
input_row_group_indices = host_span<std::vector<size_type> const>(all_row_group_indices);
} else {
input_row_group_indices = row_group_indices;
}
// Predicate pushdown: Filter row groups using stats and bloom filters
std::tie(filtered_row_group_indices, num_row_groups_after_filters) =
filter_row_groups(sources,
input_row_group_indices,
total_row_groups,
output_dtypes,
output_column_schemas,
filter.value(),
stream);
if (filtered_row_group_indices.has_value()) {
row_group_indices =
host_span<std::vector<size_type> const>(filtered_row_group_indices.value());
}
}
std::vector<row_group_info> selection;

// Compute the number of rows to read and skip
auto [rows_to_skip, rows_to_read] = [&]() {
if (not row_group_indices.empty()) { return std::pair<int64_t, size_type>{}; }
auto const from_opts = cudf::io::detail::skip_rows_num_rows_from_options(
@@ -1061,7 +1118,9 @@
static_cast<size_type>(from_opts.second)};
}();

// Get number of rows in each data source
// Vector to hold the `row_group_info` of selected row groups
std::vector<row_group_info> selection;
// Number of rows in each data source
std::vector<size_t> num_rows_per_source(per_file_metadata.size(), 0);

if (!row_group_indices.empty()) {
@@ -1083,6 +1142,10 @@
}
}
} else {
// Reset and recompute input row group count to adjust for num_rows and skip_rows. Here, the
// output from predicate pushdown was empty. i.e., no row groups filtered.
total_row_groups = 0;

size_type count = 0;
for (size_t src_idx = 0; src_idx < per_file_metadata.size(); ++src_idx) {
auto const& fmd = per_file_metadata[src_idx];
Expand All @@ -1093,6 +1156,9 @@ aggregate_reader_metadata::select_row_groups(
auto const chunk_start_row = count;
count += rg.num_rows;
if (count > rows_to_skip || count == 0) {
// Keep this row group, increase count
total_row_groups++;

// start row of this row group adjusted with rows_to_skip
num_rows_per_source[src_idx] += count;
num_rows_per_source[src_idx] -=
@@ -1113,9 +1179,18 @@
}
}
}

// Since no row groups were filtered, set the number of row groups after filters to the number
// of adjusted input row groups
num_row_groups_after_filters = {total_row_groups, total_row_groups};
}

return {rows_to_skip, rows_to_read, std::move(selection), std::move(num_rows_per_source)};
return {rows_to_skip,
rows_to_read,
std::move(selection),
std::move(num_rows_per_source),
total_row_groups,
std::move(num_row_groups_after_filters)};
}

std::tuple<std::vector<input_column_info>,
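[Editor's note] In the else branch above, total_row_groups is recomputed because skip_rows/num_rows can exclude leading and trailing row groups when the caller did not request specific ones. A standalone sketch of that counting logic, with per-row-group row counts supplied as a plain vector for illustration:

```cpp
// Count the row groups overlapping the window [rows_to_skip, rows_to_skip + rows_to_read).
// Illustrative only; the real loop also records row_group_info entries and
// per-source row counts.
#include <cstdint>
#include <vector>

int count_selected_row_groups(std::vector<int64_t> const& rows_per_row_group,
                              int64_t rows_to_skip,
                              int64_t rows_to_read)
{
  int selected     = 0;
  int64_t count    = 0;  // running total of rows seen so far
  auto const limit = rows_to_skip + rows_to_read;
  for (auto const num_rows : rows_per_row_group) {
    auto const start_row = count;  // first row of this row group
    count += num_rows;
    if (count > rows_to_skip && start_row < limit) { ++selected; }  // overlaps the window
    if (start_row >= limit) { break; }                              // past the window
  }
  return selected;
}
```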