fix: handle non-contiguous RowRanges when resolving global row IDs#383
fix: handle non-contiguous RowRanges when resolving global row IDs#383zhf999 wants to merge 11 commits into
Conversation
…ousBatchFirstRowId to GetGlobalRowId
| virtual Result<uint64_t> GetPreviousBatchFirstRowNumber() const = 0; | ||
| /// Get the global row number of the row in the previously read batch. | ||
| virtual Result<uint64_t> GetPreviousBatchGlobalRowId(uint64_t batch_row_id) const = 0; | ||
|
|
There was a problem hiding this comment.
GetPreviousBatchGlobalRowId -> GetPreviousBatchFileRowId
| PAIMON_ASSIGN_OR_RAISE(uint64_t global_row_id, reader_->GetPreviousBatchGlobalRowId(i)); | ||
| if (bitmap_.Contains(global_row_id)) { | ||
| is_valid.Add(i); | ||
| } |
There was a problem hiding this comment.
This looks inefficient. Contains starts scanning from the beginning on every call, which can be quite costly for large bitmaps. In this case, since the iterator only moves forward, it should be enough to adjust the filter logic to take advantage of that.
| Result<uint64_t> PrefetchFileBatchReaderImpl::GetPreviousBatchFirstRowNumber() const { | ||
| return previous_batch_first_row_num_; | ||
| Result<uint64_t> PrefetchFileBatchReaderImpl::GetPreviousBatchGlobalRowId( | ||
| uint64_t batch_row_id) const { |
There was a problem hiding this comment.
Why can’t we just return previous_batch_first_row_num_ + batch_row_id directly?
| ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 0); | ||
| uint64_t global_row_id = 0; | ||
| ASSERT_OK_AND_ASSIGN(global_row_id, reader->GetPreviousBatchGlobalRowId(0)); | ||
| ASSERT_EQ(global_row_id, 0); |
There was a problem hiding this comment.
Using .value() directly should also work here.
| ReadResultCollector::CollectResult( | ||
| reader.get(), /*max simulated data processing time*/ 100)); | ||
| ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 10); | ||
| auto expected_array = std::make_shared<arrow::ChunkedArray>(data_array); |
There was a problem hiding this comment.
If we drop the final validation, this scenario would no longer be exercised by the test. Could we mimic ReadResultCollector::CollectResult inside this test instead? We don’t need to materialize the full result — the goal is just to validate the last row of the final batch.
| PAIMON_ASSIGN_OR_RAISE(uint64_t global_row_id, | ||
| reader_->GetPreviousBatchGlobalRowId(idx_in_array)); | ||
| return first_row_id_.value() + global_row_id; | ||
| }; |
There was a problem hiding this comment.
The name global row id doesn’t seem quite appropriate in this scenario. It usually implies a row ID at the table level, while here global_row_id actually refers to a row ID within the file.
| Result<uint64_t> GetPreviousBatchFirstRowNumber() const override { | ||
| return ToReaderRowNumber(previous_batch_first_row_num_); | ||
| Result<uint64_t> GetPreviousBatchGlobalRowId(uint64_t batch_row_id) const override { | ||
| return previous_batch_first_row_num_ + batch_row_id; |
There was a problem hiding this comment.
If previous_batch_first_row_num_ can be -1, then the current implementation of GetPreviousBatchGlobalRowId would be incorrect, which means the refactoring is not behaviorally equivalent.
|
|
||
| static Result<std::shared_ptr<arrow::ChunkedArray>> CollectResultOneBatch( | ||
| BatchReader* batch_reader) { | ||
| return CollectResultOneBatch(batch_reader, /*max simulated data processing time*/ 0); |
There was a problem hiding this comment.
/*max_data_processing_time_in_us=*/
|
|
||
| static Result<std::shared_ptr<arrow::ChunkedArray>> CollectResultOneBatch( | ||
| BatchReader* batch_reader, int64_t max_data_processing_time_in_us) { | ||
| int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs(); |
There was a problem hiding this comment.
Can CollectResultOneBatch just return an Arrow array directly? It doesn’t look like we need a ChunkedArray here.
|
|
||
| Result<RowRanges> ParquetFileBatchReader::GetAllTargetRowRanges( | ||
| const std::vector<TargetRowGroup>& target_row_groups) { | ||
| row_mapping_.clear(); |
There was a problem hiding this comment.
Please rename this to something like UpdateAllTargetRowRanges. Methods prefixed with Get should ideally avoid modifying member variables. Also, update doesn’t need to return anything here — it can just update all_row_ranges_ directly.
Purpose
previous_batch_start + offset.GetPreviousBatchGlobalRowId(batch_row_id)to resolve the global row ID for a row index inside the current batch.PrefetchFileBatchReaderImpl, cache the actual global row IDs for each returned batch and keep row-id mapping aligned when a batch is sliced byread_range.batch_row -> global_row_idmapping inNextBatch().GetPreviousBatchGlobalRowId()correct under non-contiguous rows caused by predicate + bitmap filtering._ROW_IDfield conversion, KeyValue iteration positions).Tests
src/paimon/format/avro/avro_file_batch_reader_test.cppsrc/paimon/format/blob/blob_file_batch_reader_test.cppsrc/paimon/format/lance/lance_format_reader_writer_test.cppsrc/paimon/format/orc/orc_file_batch_reader_test.cppsrc/paimon/format/parquet/parquet_file_batch_reader_test.cppsrc/paimon/common/reader/prefetch_file_batch_reader_impl_test.cppTestRowMappingto validate global row mapping across non-contiguous ranges.API and Format
FileBatchReaderand implementations now useGetPreviousBatchGlobalRowId(uint64_t batch_row_id).batch_row_idinside the current batch (instead of deriving by batch start + offset under contiguous assumptions).Documentation
No.
Generative AI tooling
gpt-5.3-codex