-
Notifications
You must be signed in to change notification settings - Fork 928
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Measure the number of Parquet row groups filtered by predicate pushdown #17594
Changes from 19 commits
5466c7a
57e1e1c
c061a75
17bfedd
2d00bca
4f53906
c50bb43
c0c0483
fcf7f28
7811641
5342d3c
d6d6e4e
00ff5fe
af3e2d3
629da8f
b95b345
cd64b02
de6f8b0
d45aafd
da30d0b
ae3ca5b
a1bce6b
ae1bd1d
c336184
3eb411e
0081b4e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -602,6 +602,7 @@ std::vector<Type> aggregate_reader_metadata::get_parquet_types( | |
std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::apply_bloom_filters( | ||
host_span<std::unique_ptr<datasource> const> sources, | ||
host_span<std::vector<size_type> const> input_row_group_indices, | ||
size_type total_row_groups, | ||
host_span<data_type const> output_dtypes, | ||
host_span<int const> output_column_schemas, | ||
std::reference_wrapper<ast::expression const> filter, | ||
|
@@ -610,17 +611,6 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::ap | |
// Number of input table columns | ||
auto const num_input_columns = static_cast<cudf::size_type>(output_dtypes.size()); | ||
|
||
// Total number of row groups after StatsAST filtration | ||
auto const total_row_groups = std::accumulate( | ||
input_row_group_indices.begin(), | ||
input_row_group_indices.end(), | ||
size_t{0}, | ||
[](size_t sum, auto const& per_file_row_groups) { return sum + per_file_row_groups.size(); }); | ||
|
||
// Check if we have less than 2B total row groups. | ||
CUDF_EXPECTS(total_row_groups <= std::numeric_limits<cudf::size_type>::max(), | ||
"Total number of row groups exceed the size_type's limit"); | ||
|
||
// Collect equality literals for each input table column | ||
auto const equality_literals = | ||
equality_literals_collector{filter.get(), num_input_columns}.get_equality_literals(); | ||
|
@@ -676,8 +666,10 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::ap | |
h_bloom_filter_spans, stream, cudf::get_current_device_resource_ref()); | ||
|
||
// Create a bloom filter query table caster | ||
bloom_filter_caster const bloom_filter_col{ | ||
bloom_filter_spans, parquet_types, total_row_groups, equality_col_schemas.size()}; | ||
bloom_filter_caster const bloom_filter_col{bloom_filter_spans, | ||
parquet_types, | ||
static_cast<size_t>(total_row_groups), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. static_cast simply reformatted this line |
||
equality_col_schemas.size()}; | ||
|
||
// Converts bloom filter membership for equality predicate columns to a table | ||
// containing a column for each `col[i] == literal` predicate to be evaluated. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -388,40 +388,17 @@ class stats_expression_converter : public ast::detail::expression_transformer { | |
}; | ||
} // namespace | ||
|
||
std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::filter_row_groups( | ||
std::pair<std::optional<std::vector<std::vector<size_type>>>, surviving_row_groups> | ||
aggregate_reader_metadata::filter_row_groups( | ||
host_span<std::unique_ptr<datasource> const> sources, | ||
host_span<std::vector<size_type> const> row_group_indices, | ||
host_span<std::vector<size_type> const> input_row_group_indices, | ||
size_type total_row_groups, | ||
host_span<data_type const> output_dtypes, | ||
host_span<int const> output_column_schemas, | ||
std::reference_wrapper<ast::expression const> filter, | ||
rmm::cuda_stream_view stream) const | ||
{ | ||
auto mr = cudf::get_current_device_resource_ref(); | ||
// Create row group indices. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved this piece outside in |
||
std::vector<std::vector<size_type>> all_row_group_indices; | ||
host_span<std::vector<size_type> const> input_row_group_indices; | ||
if (row_group_indices.empty()) { | ||
std::transform(per_file_metadata.cbegin(), | ||
per_file_metadata.cend(), | ||
std::back_inserter(all_row_group_indices), | ||
[](auto const& file_meta) { | ||
std::vector<size_type> rg_idx(file_meta.row_groups.size()); | ||
std::iota(rg_idx.begin(), rg_idx.end(), 0); | ||
return rg_idx; | ||
}); | ||
input_row_group_indices = host_span<std::vector<size_type> const>(all_row_group_indices); | ||
} else { | ||
input_row_group_indices = row_group_indices; | ||
} | ||
auto const total_row_groups = std::accumulate( | ||
input_row_group_indices.begin(), | ||
input_row_group_indices.end(), | ||
size_t{0}, | ||
[](size_t sum, auto const& per_file_row_groups) { return sum + per_file_row_groups.size(); }); | ||
|
||
// Check if we have less than 2B total row groups. | ||
CUDF_EXPECTS(total_row_groups <= std::numeric_limits<cudf::size_type>::max(), | ||
"Total number of row groups exceed the size_type's limit"); | ||
|
||
// Converts Column chunk statistics to a table | ||
// where min(col[i]) = columns[i*2], max(col[i])=columns[i*2+1] | ||
|
@@ -461,19 +438,47 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::fi | |
auto const filtered_row_group_indices = collect_filtered_row_group_indices( | ||
stats_table, stats_expr.get_stats_expr(), input_row_group_indices, stream); | ||
|
||
// Number of surviving row groups after applying stats filter | ||
auto const num_stats_filtered_row_groups = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There isn't really a straightforward way in here to check if stats weren't available so this will be set to either There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so if there is a filter, but no stats, we still report a num_stats_filtered_row_groups number, even though we didn't really do any stats-based filtering? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah unfortunately, not that straightforward to check the availability of stats right now! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Created #17864 to handle this in future PR |
||
filtered_row_group_indices.has_value() | ||
? std::accumulate(filtered_row_group_indices.value().cbegin(), | ||
filtered_row_group_indices.value().cend(), | ||
size_type{0}, | ||
[](auto& sum, auto const& per_file_row_groups) { | ||
return sum + per_file_row_groups.size(); | ||
}) | ||
: total_row_groups; | ||
|
||
// Span of row groups to apply bloom filtering on. | ||
auto const bloom_filter_input_row_groups = | ||
filtered_row_group_indices.has_value() | ||
? host_span<std::vector<size_type> const>(filtered_row_group_indices.value()) | ||
: input_row_group_indices; | ||
|
||
// Apply bloom filtering on the bloom filter input row groups | ||
auto const bloom_filtered_row_groups = apply_bloom_filters( | ||
sources, bloom_filter_input_row_groups, output_dtypes, output_column_schemas, filter, stream); | ||
auto const bloom_filtered_row_groups = apply_bloom_filters(sources, | ||
bloom_filter_input_row_groups, | ||
num_stats_filtered_row_groups, | ||
output_dtypes, | ||
output_column_schemas, | ||
filter, | ||
stream); | ||
|
||
// Number of surviving row groups after applying bloom filter | ||
auto const num_bloom_filtered_row_groups = | ||
bloom_filtered_row_groups.has_value() | ||
? std::accumulate(bloom_filtered_row_groups.value().cbegin(), | ||
bloom_filtered_row_groups.value().cend(), | ||
size_type{0}, | ||
[](auto& sum, auto const& per_file_row_groups) { | ||
return sum + per_file_row_groups.size(); | ||
}) | ||
: num_stats_filtered_row_groups; | ||
|
||
// Return bloom filtered row group indices iff collected | ||
return bloom_filtered_row_groups.has_value() ? bloom_filtered_row_groups | ||
: filtered_row_group_indices; | ||
return { | ||
bloom_filtered_row_groups.has_value() ? bloom_filtered_row_groups : filtered_row_group_indices, | ||
{num_stats_filtered_row_groups, num_bloom_filtered_row_groups}}; | ||
} | ||
|
||
// convert column named expression to column index reference expression | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -610,6 +610,17 @@ table_with_metadata reader::impl::read_chunk_internal(read_mode mode) | |
auto out_columns = std::vector<std::unique_ptr<column>>{}; | ||
out_columns.reserve(_output_buffers.size()); | ||
|
||
// Copy number of total input row groups and number of surviving row groups from predicate | ||
// pushdown. | ||
out_metadata.num_input_row_groups = _file_itm_data.num_input_row_groups; | ||
// Copy the number surviving row groups from each predicate pushdown only if the filter has value. | ||
if (_expr_conv.get_converted_expr().has_value()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Copy from |
||
out_metadata.num_row_groups_after_stats_filter = | ||
_file_itm_data.remaining_row_groups.after_stats_filter; | ||
out_metadata.num_row_groups_after_bloom_filter = | ||
_file_itm_data.remaining_row_groups.after_bloom_filter; | ||
} | ||
|
||
// no work to do (this can happen on the first pass if we have no rows to read) | ||
if (!has_more_work()) { | ||
// Check if number of rows per source should be included in output metadata. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -408,10 +408,16 @@ int64_t aggregate_reader_metadata::calc_num_rows() const | |
|
||
size_type aggregate_reader_metadata::calc_num_row_groups() const | ||
{ | ||
return std::accumulate( | ||
per_file_metadata.cbegin(), per_file_metadata.cend(), 0, [](auto& sum, auto& pfm) { | ||
auto const total_row_groups = std::accumulate( | ||
per_file_metadata.cbegin(), per_file_metadata.cend(), size_t{0}, [](size_t& sum, auto& pfm) { | ||
return sum + pfm.row_groups.size(); | ||
}); | ||
|
||
// Check if we have less than 2B total row groups. | ||
CUDF_EXPECTS(total_row_groups <= std::numeric_limits<cudf::size_type>::max(), | ||
"Total number of row groups exceed the size_type's limit"); | ||
|
||
return static_cast<size_type>(total_row_groups); | ||
} | ||
|
||
// Copies info from the column and offset indexes into the passed in row_group_info. | ||
|
@@ -1029,7 +1035,12 @@ std::vector<std::string> aggregate_reader_metadata::get_pandas_index_names() con | |
return names; | ||
} | ||
|
||
std::tuple<int64_t, size_type, std::vector<row_group_info>, std::vector<size_t>> | ||
std::tuple<int64_t, | ||
size_type, | ||
std::vector<row_group_info>, | ||
std::vector<size_t>, | ||
size_type, | ||
surviving_row_groups> | ||
aggregate_reader_metadata::select_row_groups( | ||
host_span<std::unique_ptr<datasource> const> sources, | ||
host_span<std::vector<size_type> const> row_group_indices, | ||
|
@@ -1040,17 +1051,63 @@ aggregate_reader_metadata::select_row_groups( | |
std::optional<std::reference_wrapper<ast::expression const>> filter, | ||
rmm::cuda_stream_view stream) const | ||
{ | ||
// Compute total number of input row groups | ||
size_type total_row_groups = [&]() { | ||
if (not row_group_indices.empty()) { | ||
size_t const total_row_groups = | ||
std::accumulate(row_group_indices.begin(), | ||
row_group_indices.end(), | ||
size_t{0}, | ||
[](size_t& sum, auto const& pfm) { return sum + pfm.size(); }); | ||
|
||
// Check if we have less than 2B total row groups. | ||
CUDF_EXPECTS(total_row_groups <= std::numeric_limits<cudf::size_type>::max(), | ||
"Total number of row groups exceed the size_type's limit"); | ||
return static_cast<size_type>(total_row_groups); | ||
} else { | ||
return num_row_groups; | ||
} | ||
}(); | ||
|
||
// Pair to store the number of row groups after stats and bloom filtering respectively. Initialize | ||
// to total_row_groups. | ||
surviving_row_groups num_row_groups_after_filters{total_row_groups, total_row_groups}; | ||
|
||
std::optional<std::vector<std::vector<size_type>>> filtered_row_group_indices; | ||
// if filter is not empty, then gather row groups to read after predicate pushdown | ||
if (filter.has_value()) { | ||
filtered_row_group_indices = filter_row_groups( | ||
sources, row_group_indices, output_dtypes, output_column_schemas, filter.value(), stream); | ||
// Span of input row group indices for predicate pushdown | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved this piece of code out of |
||
host_span<std::vector<size_type> const> input_row_group_indices; | ||
std::vector<std::vector<size_type>> all_row_group_indices; | ||
if (row_group_indices.empty()) { | ||
std::transform(per_file_metadata.cbegin(), | ||
per_file_metadata.cend(), | ||
std::back_inserter(all_row_group_indices), | ||
[](auto const& file_meta) { | ||
std::vector<size_type> rg_idx(file_meta.row_groups.size()); | ||
std::iota(rg_idx.begin(), rg_idx.end(), 0); | ||
return rg_idx; | ||
}); | ||
input_row_group_indices = host_span<std::vector<size_type> const>(all_row_group_indices); | ||
} else { | ||
input_row_group_indices = row_group_indices; | ||
} | ||
// Predicate pushdown: Filter row groups using stats and bloom filters | ||
std::tie(filtered_row_group_indices, num_row_groups_after_filters) = | ||
filter_row_groups(sources, | ||
input_row_group_indices, | ||
total_row_groups, | ||
output_dtypes, | ||
output_column_schemas, | ||
filter.value(), | ||
stream); | ||
if (filtered_row_group_indices.has_value()) { | ||
row_group_indices = | ||
host_span<std::vector<size_type> const>(filtered_row_group_indices.value()); | ||
} | ||
} | ||
std::vector<row_group_info> selection; | ||
|
||
// Compute the number of rows to read and skip | ||
auto [rows_to_skip, rows_to_read] = [&]() { | ||
if (not row_group_indices.empty()) { return std::pair<int64_t, size_type>{}; } | ||
auto const from_opts = cudf::io::detail::skip_rows_num_rows_from_options( | ||
|
@@ -1061,7 +1118,9 @@ aggregate_reader_metadata::select_row_groups( | |
static_cast<size_type>(from_opts.second)}; | ||
}(); | ||
|
||
// Get number of rows in each data source | ||
// Vector to hold the `row_group_info` of selected row groups | ||
std::vector<row_group_info> selection; | ||
// Number of rows in each data source | ||
std::vector<size_t> num_rows_per_source(per_file_metadata.size(), 0); | ||
|
||
if (!row_group_indices.empty()) { | ||
|
@@ -1083,6 +1142,10 @@ aggregate_reader_metadata::select_row_groups( | |
} | ||
} | ||
} else { | ||
// Reset and recompute input row group count to adjust for num_rows and skip_rows. Here, the | ||
// output from predicate pushdown was empty. i.e., no row groups filtered. | ||
total_row_groups = 0; | ||
|
||
size_type count = 0; | ||
for (size_t src_idx = 0; src_idx < per_file_metadata.size(); ++src_idx) { | ||
auto const& fmd = per_file_metadata[src_idx]; | ||
|
@@ -1093,6 +1156,9 @@ aggregate_reader_metadata::select_row_groups( | |
auto const chunk_start_row = count; | ||
count += rg.num_rows; | ||
if (count > rows_to_skip || count == 0) { | ||
// Keep this row group, increase count | ||
total_row_groups++; | ||
|
||
// start row of this row group adjusted with rows_to_skip | ||
num_rows_per_source[src_idx] += count; | ||
num_rows_per_source[src_idx] -= | ||
|
@@ -1113,9 +1179,18 @@ aggregate_reader_metadata::select_row_groups( | |
} | ||
} | ||
} | ||
|
||
// Since no row groups were filtered, set the number of row groups after filters to the number | ||
// of adjusted input row groups | ||
num_row_groups_after_filters = {total_row_groups, total_row_groups}; | ||
} | ||
|
||
return {rows_to_skip, rows_to_read, std::move(selection), std::move(num_rows_per_source)}; | ||
return {rows_to_skip, | ||
rows_to_read, | ||
std::move(selection), | ||
std::move(num_rows_per_source), | ||
total_row_groups, | ||
std::move(num_row_groups_after_filters)}; | ||
} | ||
|
||
std::tuple<std::vector<input_column_info>, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Edit: With recent updates to this PR:
`num_row_groups_after_stats_filter` will be `std::nullopt` if `not filter.has_value()`, and equal to `num_input_row_groups` or the actual value otherwise.
`num_row_groups_after_bloom_filter` will be `std::nullopt` if `not filter.has_value() or not bloom_filter_exist`, and equal to `num_row_groups_after_stats_filter` or the actual value otherwise.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
^ Updated