Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
df8100d
Flatten the data page mask
mhaseeb123 Oct 15, 2025
bc6f08b
Templatize ColumnView type for compute data page mask
mhaseeb123 Oct 15, 2025
30dbbee
Minor refactor
mhaseeb123 Oct 15, 2025
73f64f2
Initial impl of GPU data page mask algorithm
mhaseeb123 Oct 15, 2025
de65183
Minor improvements
mhaseeb123 Oct 16, 2025
2519f5f
Merge branch 'branch-25.12' into fea/gpu-compute-data-page-mask
mhaseeb123 Oct 16, 2025
f6b79c7
Add cmake changes
mhaseeb123 Oct 16, 2025
aef9d64
Revert to old probe impl
mhaseeb123 Oct 16, 2025
ebf460a
style fix
mhaseeb123 Oct 16, 2025
c62f6b7
Use functors
mhaseeb123 Oct 16, 2025
f9e8f85
Optimize for the chunked reader
mhaseeb123 Oct 16, 2025
9ea83f5
Minor optimizations
mhaseeb123 Oct 16, 2025
38a640b
Small optimizations
mhaseeb123 Oct 16, 2025
0071dc7
CPU optimizations
mhaseeb123 Oct 16, 2025
06dfca4
Bug fixing in fenwick tree search
mhaseeb123 Oct 17, 2025
8ac9c3a
Bug fixing in fenwick tree
mhaseeb123 Oct 17, 2025
e5e12ec
Minor optimizations
mhaseeb123 Oct 17, 2025
3d056ed
Documentation and style
mhaseeb123 Oct 17, 2025
60c789f
Merge branch 'branch-25.12' into fea/gpu-compute-data-page-mask
mhaseeb123 Oct 17, 2025
55948f0
Small optimization
mhaseeb123 Oct 17, 2025
a2221a5
Minor optimizations
mhaseeb123 Oct 17, 2025
fc6c534
Style check
mhaseeb123 Oct 17, 2025
488b1de
Use host worker pool instead of c++ threads
mhaseeb123 Oct 17, 2025
33489df
Apply suggestion from @mhaseeb123
mhaseeb123 Oct 17, 2025
ffacb93
style pls
mhaseeb123 Oct 17, 2025
c820f50
Merge branch 'main' into fea/gpu-compute-data-page-mask
mhaseeb123 Oct 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ add_library(
src/io/parquet/experimental/hybrid_scan_impl.cpp
src/io/parquet/experimental/hybrid_scan_preprocess.cu
src/io/parquet/experimental/page_index_filter.cu
src/io/parquet/experimental/page_index_filter_utils.cu
src/io/parquet/page_data.cu
src/io/parquet/chunk_dict.cu
src/io/parquet/page_enc.cu
Expand Down
21 changes: 16 additions & 5 deletions cpp/examples/hybrid_scan_io/common_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,24 @@ void check_tables_equal(cudf::table_view const& lhs_table,
cudf::filtered_join join_obj(
lhs_table, cudf::null_equality::EQUAL, cudf::set_as_build_table::RIGHT, stream);
auto const indices = join_obj.anti_join(rhs_table, stream);

// No exception thrown, check indices
auto const valid = indices->size() == 0;
std::cout << "Tables identical: " << std::boolalpha << valid << "\n\n";
auto const tables_equal = indices->size() == 0;
if (tables_equal) {
std::cout << "Tables identical: " << std::boolalpha << tables_equal << "\n\n";
} else {
// Helper to write parquet data for inspection
auto const write_parquet =
[](cudf::table_view table, std::string filepath, rmm::cuda_stream_view stream) {
auto sink_info = cudf::io::sink_info(filepath);
auto opts = cudf::io::parquet_writer_options::builder(sink_info, table).build();
cudf::io::write_parquet(opts, stream);
};
write_parquet(lhs_table, "lhs_table.parquet", stream);
write_parquet(rhs_table, "rhs_table.parquet", stream);
throw std::logic_error("Tables identical: false\n\n");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to see an exception if the tables aren't equal to alert the user

}
} catch (std::exception& e) {
std::cerr << e.what() << std::endl << std::endl;
throw std::runtime_error("Tables identical: false\n\n");
std::cout << e.what() << std::endl;
}
}

Expand Down
7 changes: 4 additions & 3 deletions cpp/src/io/parquet/experimental/hybrid_scan_chunking.cu
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ using parquet::detail::pass_intermediate_data;

void hybrid_scan_reader_impl::handle_chunking(
read_mode mode,
std::vector<rmm::device_buffer> column_chunk_buffers,
cudf::host_span<std::vector<bool> const> data_page_mask)
std::vector<rmm::device_buffer>&& column_chunk_buffers,
cudf::host_span<bool const> data_page_mask)
{
// if this is our first time in here, setup the first pass.
if (!_pass_itm_data) {
Expand Down Expand Up @@ -88,7 +88,8 @@ void hybrid_scan_reader_impl::handle_chunking(
setup_next_subpass(mode);
}

void hybrid_scan_reader_impl::setup_next_pass(std::vector<rmm::device_buffer> column_chunk_buffers)
void hybrid_scan_reader_impl::setup_next_pass(
std::vector<rmm::device_buffer>&& column_chunk_buffers)
{
auto const num_passes = _file_itm_data.num_passes();
CUDF_EXPECTS(num_passes == 1,
Expand Down
12 changes: 8 additions & 4 deletions cpp/src/io/parquet/experimental/hybrid_scan_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,17 +281,21 @@ class aggregate_reader_metadata : public aggregate_reader_metadata_base {
* Compute a boolean vector indicating which data pages need to be decoded to
* construct the input columns based on the row mask, flattened across all input columns
*
* @tparam ColumnView Type of the row mask column view - cudf::mutable_column_view for filter
* columns and cudf::column_view for payload columns
*
* @param row_mask Boolean column indicating which rows need to be read after page-pruning
* @param row_group_indices Input row groups indices
* @param input_columns Input column information
* @param row_mask_offset Offset into the row mask column for the current pass
* @param stream CUDA stream used for device memory operations and kernel launches
*
* @return A vector of boolean vectors indicating which data pages need to be decoded to produce
* the output table based on the input row mask, one per input column
* @return Boolean vector indicating which data pages need to be decoded to produce
* the output table based on the input row mask across all input columns
*/
[[nodiscard]] std::vector<std::vector<bool>> compute_data_page_mask(
cudf::column_view row_mask,
template <typename ColumnView>
[[nodiscard]] cudf::detail::host_vector<bool> compute_data_page_mask(
ColumnView const& row_mask,
cudf::host_span<std::vector<size_type> const> row_group_indices,
cudf::host_span<input_column_info const> input_columns,
cudf::size_type row_mask_offset,
Expand Down
27 changes: 10 additions & 17 deletions cpp/src/io/parquet/experimental/hybrid_scan_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ table_with_metadata hybrid_scan_reader_impl::materialize_filter_columns(
(mask_data_pages == use_data_page_mask::YES)
? _extended_metadata->compute_data_page_mask(
row_mask, row_group_indices, _input_columns, _rows_processed_so_far, stream)
: std::vector<std::vector<bool>>{};
: cudf::detail::make_empty_host_vector<bool>(0, stream);

prepare_data(
read_mode::READ_ALL, row_group_indices, std::move(column_chunk_buffers), data_page_mask);
Expand Down Expand Up @@ -485,7 +485,7 @@ table_with_metadata hybrid_scan_reader_impl::materialize_payload_columns(
(mask_data_pages == use_data_page_mask::YES)
? _extended_metadata->compute_data_page_mask(
row_mask, row_group_indices, _input_columns, _rows_processed_so_far, stream)
: std::vector<std::vector<bool>>{};
: cudf::detail::make_empty_host_vector<bool>(0, stream);

prepare_data(
read_mode::READ_ALL, row_group_indices, std::move(column_chunk_buffers), data_page_mask);
Expand Down Expand Up @@ -524,7 +524,7 @@ void hybrid_scan_reader_impl::setup_chunking_for_filter_columns(
(mask_data_pages == use_data_page_mask::YES)
? _extended_metadata->compute_data_page_mask(
row_mask, row_group_indices, _input_columns, _rows_processed_so_far, _stream)
: std::vector<std::vector<bool>>{};
: cudf::detail::make_empty_host_vector<bool>(0, _stream);

prepare_data(
read_mode::CHUNKED_READ, row_group_indices, std::move(column_chunk_buffers), data_page_mask);
Expand Down Expand Up @@ -575,7 +575,7 @@ void hybrid_scan_reader_impl::setup_chunking_for_payload_columns(
(mask_data_pages == use_data_page_mask::YES)
? _extended_metadata->compute_data_page_mask(
row_mask, row_group_indices, _input_columns, _rows_processed_so_far, _stream)
: std::vector<std::vector<bool>>{};
: cudf::detail::make_empty_host_vector<bool>(0, _stream);

prepare_data(
read_mode::CHUNKED_READ, row_group_indices, std::move(column_chunk_buffers), data_page_mask);
Expand Down Expand Up @@ -656,7 +656,7 @@ void hybrid_scan_reader_impl::prepare_data(
read_mode mode,
cudf::host_span<std::vector<size_type> const> row_group_indices,
std::vector<rmm::device_buffer>&& column_chunk_buffers,
cudf::host_span<std::vector<bool> const> data_page_mask)
cudf::host_span<bool const> data_page_mask)
{
// if we have not preprocessed at the whole-file level, do that now
if (not _file_preprocessed) {
Expand Down Expand Up @@ -885,8 +885,7 @@ table_with_metadata hybrid_scan_reader_impl::finalize_output(
}
}

void hybrid_scan_reader_impl::set_pass_page_mask(
cudf::host_span<std::vector<bool> const> data_page_mask)
void hybrid_scan_reader_impl::set_pass_page_mask(cudf::host_span<bool const> data_page_mask)
{
auto const& pass = _pass_itm_data;
auto const& chunks = pass->chunks;
Expand All @@ -900,13 +899,11 @@ void hybrid_scan_reader_impl::set_pass_page_mask(
return;
}

size_t num_inserted_data_pages = 0;
std::for_each(
thrust::counting_iterator<size_t>(0),
thrust::counting_iterator(_input_columns.size()),
[&](auto col_idx) {
auto const& col_page_mask = data_page_mask[col_idx];
size_t num_inserted_data_pages = 0;

for (size_t chunk_idx = col_idx; chunk_idx < chunks.size(); chunk_idx += num_columns) {
// Insert a true value for each dictionary page
if (chunks[chunk_idx].num_dict_pages > 0) { _pass_page_mask.push_back(true); }
Expand All @@ -916,21 +913,17 @@ void hybrid_scan_reader_impl::set_pass_page_mask(

// Make sure we have enough page mask for this column chunk
CUDF_EXPECTS(
col_page_mask.size() >= num_inserted_data_pages + num_data_pages_this_col_chunk,
data_page_mask.size() >= num_inserted_data_pages + num_data_pages_this_col_chunk,
"Encountered invalid data page mask size");

// Insert page mask for this column chunk
_pass_page_mask.insert(
_pass_page_mask.end(),
col_page_mask.begin() + num_inserted_data_pages,
col_page_mask.begin() + num_inserted_data_pages + num_data_pages_this_col_chunk);

data_page_mask.begin() + num_inserted_data_pages,
data_page_mask.begin() + num_inserted_data_pages + num_data_pages_this_col_chunk);
// Update the number of inserted data pages
num_inserted_data_pages += num_data_pages_this_col_chunk;
}
// Make sure we inserted exactly the number of data pages for this column
CUDF_EXPECTS(num_inserted_data_pages == col_page_mask.size(),
"Encountered mismatch in number of data pages and page mask size");
});

// Make sure we inserted exactly the number of pages for this pass
Expand Down
13 changes: 7 additions & 6 deletions cpp/src/io/parquet/experimental/hybrid_scan_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ class hybrid_scan_reader_impl : public parquet::detail::reader_impl {
*
* @param data_page_mask Input data page mask from page-pruning step
*/
void set_pass_page_mask(cudf::host_span<std::vector<bool> const> data_page_mask);
void set_pass_page_mask(cudf::host_span<bool const> data_page_mask);

/**
* @brief Select the columns to be read based on the read mode
Expand Down Expand Up @@ -296,11 +296,12 @@ class hybrid_scan_reader_impl : public parquet::detail::reader_impl {
* @param mode Value indicating if the data sources are read all at once or chunk by chunk
* @param row_group_indices Row group indices to read
* @param column_chunk_buffers Device buffers containing column chunk data
* @param data_page_mask Input data page mask from page-pruning step
*/
void prepare_data(read_mode mode,
cudf::host_span<std::vector<size_type> const> row_group_indices,
std::vector<rmm::device_buffer>&& column_chunk_buffers,
cudf::host_span<std::vector<bool> const> data_page_mask);
cudf::host_span<bool const> data_page_mask);

/**
* @brief Create descriptors for filter column chunks and decode dictionary page headers
Expand Down Expand Up @@ -341,8 +342,8 @@ class hybrid_scan_reader_impl : public parquet::detail::reader_impl {
* @param data_page_mask Input data page mask from page-pruning step for the current pass
*/
void handle_chunking(read_mode mode,
std::vector<rmm::device_buffer> column_chunk_buffers,
cudf::host_span<std::vector<bool> const> data_page_mask);
std::vector<rmm::device_buffer>&& column_chunk_buffers,
cudf::host_span<bool const> data_page_mask);

/**
* @brief Setup step for the next input read pass.
Expand All @@ -352,7 +353,7 @@ class hybrid_scan_reader_impl : public parquet::detail::reader_impl {
*
* @param column_chunk_buffers Device buffers containing column chunk data
*/
void setup_next_pass(std::vector<rmm::device_buffer> column_chunk_buffers);
void setup_next_pass(std::vector<rmm::device_buffer>&& column_chunk_buffers);

/**
* @brief Setup pointers to columns chunks to be processed for this pass.
Expand All @@ -368,7 +369,7 @@ class hybrid_scan_reader_impl : public parquet::detail::reader_impl {
*
* @param column_chunk_buffers Device buffers containing column chunk data
*/
void setup_compressed_data(std::vector<rmm::device_buffer> column_chunk_buffers);
void setup_compressed_data(std::vector<rmm::device_buffer>&& column_chunk_buffers);

/**
* @brief Reset the internal state of the reader.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ bool hybrid_scan_reader_impl::setup_column_chunks()
}

void hybrid_scan_reader_impl::setup_compressed_data(
std::vector<rmm::device_buffer> column_chunk_buffers)
std::vector<rmm::device_buffer>&& column_chunk_buffers)
{
auto& pass = *_pass_itm_data;

Expand Down
Loading