Core: Add v4 manifest reader#16958
Conversation
supports: - column projection - partition pruning via a row filter, including multi-spec manifests Content stats reading and metadata inheritance are not yet implemented.
| .specId(0); | ||
| } | ||
|
|
||
| private static TrackedFile fileWithStatus(EntryStatus status, String location) { |
There was a problem hiding this comment.
Not using the builder here so that we can set any status here.
gaborkaszab
left a comment
There was a problem hiding this comment.
I'm in the process of getting familiar with the details here. Asked early questions for my benefit.
Thanks @anoopj !
| * Builds the tracking field with {@link MetadataColumns#ROW_POSITION} appended so the reader | ||
| * populates the manifest position of each entry. | ||
| */ | ||
| private static Types.NestedField trackingWithRowPosition() { |
There was a problem hiding this comment.
These Tracking schemas are kind of confusing. Let me know if I get it wrong!
So Tracking.schema() is the physical representation of the data, what we expect to be written, or in other words, this is the write schema.
TrackingStruct.BASE_TYPE is the logical representation including field(s) that are physically not persisted, or in other words this is the read schema.
Naively asking, since we are on the read path, can't we simply use the read schema from TrackingStruct instead of constructing it here from the physical schema?
There was a problem hiding this comment.
Your understanding correct! Tracking.schema() is the on-disk schema. TrackingStruct.BASE_TYPE is the same schema plus row_position. We can reuse it. I just needed to change the visibility of TrackingStruct.BASE_TYPE from private to package private.
There was a problem hiding this comment.
I think the reason this is still needed is that we use the write schema (TrackedFile.schemaWithContentStats()) on the read path and as a result we have to iterate the fields and replace Tracking to hold 'logical' field(s) too. I'm not sure we have other occasions where we do the same for other types, but seems more complicated than necessary. Can't we simply have a method that produces the read schema for us including logical fields in Tracking`.
I believe we could then get rid of the whole for loop on fields to replace/omit some of them. It would be as simple as:
private Schema readSchema() {
if (fileProjection != null) {
// construct and return a schema using the projection + the mandatory fields
} else {
return someMethodToGetReadSchema();
}
}
There was a problem hiding this comment.
I'd lean toward keeping the loop. It derives the read schema from the canonical schemaWithContentStats(...), so field IDs and ordering stay in one place. A separately hand-built read schema would have to be kept in lockstep ), which is duplication we should try to avoid.
| // manifestLocation is not stored in the manifest; the reader fills it from the file location. | ||
| // manifestPos is filled from ROW_POSITION while reading the tracking struct. | ||
| if (tracking instanceof TrackingStruct) { | ||
| ((TrackingStruct) tracking).setManifestLocation(file.location()); |
There was a problem hiding this comment.
We populate Tracking.manifestPos using a metadata column ROW_POSITION, while we populate Tracking.manifestLocation manually here. For the latter, is there a reason we can't use MetadataColumns.FILE_PATH to be consistent?
We could get rid of the custom setter TrackingStruct.setManifestlocation method too and let it flow through internalSet getByPos methods.
There was a problem hiding this comment.
They look symmetric but the underlying mechanism differs. ROW_POSITION is synthesized by the reader itself. FILE_PATH us only populated when the caller injects it. I'd lean toward keeping the manual set for now
There was a problem hiding this comment.
I guess the difficulty here is that for InternalData API we don't have a way to pass the constants map to the underlying reader, right?
Would it worth considering as an improvement? For me both manifestLocation and manifestPos seems regular metadata columns, we just don't have the plumbing currently to make the former act like a metadata column.
I wonder what others say. Probably is an overkill at this point.
There was a problem hiding this comment.
manifestLocation might be able to be populated via constant map. But we didn't populate the manifestPos field, which may have to be populated via a setter method in this reader.
|
|
||
| private TrackedFile prepare(TrackedFile trackedFile) { | ||
| Tracking tracking = trackedFile.tracking(); | ||
| Preconditions.checkState( |
There was a problem hiding this comment.
Instead of checking here, should we rely on the builders enforcing this invariant?
There was a problem hiding this comment.
On the read path, we don't use builders. Tracking here comes from deserializing the manifest and not from a builder.
But i think the check here is not needed for a different reason. We have made Tracking a required field, so the internal reader should throw before prepare() runs. So this check is likely redundant.
| return entries; | ||
| } | ||
|
|
||
| private boolean matchesPartition(TrackedFile trackedFile) { |
There was a problem hiding this comment.
I thought we want to drop tuple-based pruning entirely and push predicates against partition-transform-expression columns in content_stats. Under that model the partition tuple is only needed to match data files to equality-delete files.
I am planning to bring up the discussion in Monday's sync.
There was a problem hiding this comment.
As a followup of today's discussion, we will keep the partition tuple for planning/pruning for now and we can move to content stats based pruning later. This implementation in this PR is purely a pruning optimization, so swapping/removing it has no correctness implications.
| import org.apache.iceberg.util.StructProjection; | ||
|
|
||
| /** Reader that reads a v4 manifest file as {@link TrackedFile}s. */ | ||
| class V4ManifestReader extends CloseableGroup implements CloseableIterable<TrackedFile> { |
There was a problem hiding this comment.
this name V4ManifestReader would become stale when V5 rolls in. maybe TrackedFileManifestReader or just TrackedFileReader.
There was a problem hiding this comment.
TrackedFile is an abstraction that we want to keep internal for now. V4ManifestReader is the best name I could come up with. Open to other suggestions here. cc @rdblue for his thoughts.
There was a problem hiding this comment.
This class implements and returns a CloseableIterable<TrackedFile> so we don't really keep TrackedFile internal in my opinion. Following this design we can name this TrackedFileReader.
There was a problem hiding this comment.
That is actually a good point. Leaving it open to hear from others.
There was a problem hiding this comment.
TrackedFile is internal in the sense that it is not a public interface (at least not yet). This class TrackedFileReader would probably only be package private.
| import org.apache.iceberg.util.StructProjection; | ||
|
|
||
| /** Reader that reads a v4 manifest file as {@link TrackedFile}s. */ | ||
| class V4ManifestReader extends CloseableGroup implements CloseableIterable<TrackedFile> { |
There was a problem hiding this comment.
This class implements and returns a CloseableIterable<TrackedFile> so we don't really keep TrackedFile internal in my opinion. Following this design we can name this TrackedFileReader.
| } | ||
|
|
||
| Builder project(Schema newFileProjection) { | ||
| this.fileProjection = newFileProjection; |
There was a problem hiding this comment.
nit: precondition for != null?
There was a problem hiding this comment.
This is intentional. Null projection means read all columns, and it's a supported case. This is the current semantics of the existing manifest reader as well.
| InputFile manifest = writeManifest(EMPTY_PARTITION, ImmutableList.of(file)); | ||
|
|
||
| TrackedFile actual = read(manifest, UNPARTITIONED_SPECS).get(0); | ||
| assertThat(actual.partition()).isNotNull(); |
There was a problem hiding this comment.
For unpartitioned entry ,the partition should be null, and also spec_id I believe, once the relevant PR is merged.
There was a problem hiding this comment.
Agreed on partition: once #17000 makes the partition field optional, I can update the test to expect null. For now, leaving the current assertions.
On spec_id: in this test the file is written with specId(0) and UNPARTITIONED_SPECS includes the unpartitioned spec (id 0), so it reads back as 0, not null. spec_id is null only for a file that has no spec at all.
| Integer specId = trackedFile.specId(); | ||
| Evaluator evaluator = specId != null ? partitionEvaluators.get(specId) : null; | ||
| StructProjection projection = specId != null ? partitionProjections.get(specId) : null; | ||
| Preconditions.checkState( |
There was a problem hiding this comment.
My point was that specId = null is a valid state, but we throw an exception here because of that because both the evaluator and the projection will be null.
|
|
||
| if (field.fieldId() == TrackedFile.TRACKING.fieldId()) { | ||
| fields.add(trackingWithRowPosition()); | ||
| } else if (field.fieldId() == TrackedFile.CONTENT_STATS_ID) { |
There was a problem hiding this comment.
Thanks for the explanation! Projecting stats makes total sense.
I think most of my comment on this area are for a single reason: for me this readSchema() function seems a bit more complicated than what it is for. What I have in mind is like this:
if (fileProjection == null) {
return TrackedFileStruct.READ_SCHEMA;
// this could be prepared with the desired Tracking schema, omitting stats, including partition unconditionally. Preferable a static field in TrackedFileStruct.
}
// construct the projected schema including the mandatory fields similarly as now
| @TestTemplate | ||
| public void testUnpartitioned() throws IOException { | ||
| TrackedFile file = dataFile("s3://bucket/file.parquet", EMPTY_PARTITION_DATA); | ||
|
|
There was a problem hiding this comment.
Is there a test where we do a projection that doesn't include partition, and partition added to the projection because of the partition filtering?
| TrackedFile.TRACKING.name(), | ||
| fullTracking ? TrackingStruct.BASE_TYPE : STATUS_TRACKING, | ||
| TrackedFile.TRACKING.doc())); | ||
| } else if (field.fieldId() == TrackedFile.CONTENT_STATS_ID) { |
There was a problem hiding this comment.
I'm not convinced this is how we'll be projecting stats from the read path. Ideally, we wouldn't project the whole stats field (and that's just the containing field id).
What we should have is the specific field ids for the individual status required to evaluate the filter.
There was a problem hiding this comment.
I do agree: we will be projecting only the field IDs we need. This is just a placeholder for now, as the interfaces for content stats are being revamped.
|
|
||
| CloseableIterable<TrackedFile> reader = | ||
| InternalData.read(format, file) | ||
| .project(readSchema()) |
There was a problem hiding this comment.
I think we're missing CONTENT_TYPE, which always needs to be projected due to the matchesPartition relying on it.
There was a problem hiding this comment.
That makes sense. Added content_type as always projected.
| private CloseableIterable<TrackedFile> open() { | ||
| FileFormat format = FileFormat.fromFileName(file.location()); | ||
| Preconditions.checkArgument( | ||
| format != null, "Unable to determine format of manifest: %s", file.location()); |
There was a problem hiding this comment.
Prefer: "Cannot determine format of manifest: %s"
There was a problem hiding this comment.
Changed. I was trying to be consistent in error messages with the current reader.
| Integer specId = trackedFile.specId(); | ||
| Evaluator evaluator = specId != null ? partitionEvaluators.get(specId) : null; | ||
| StructProjection projection = specId != null ? partitionProjections.get(specId) : null; | ||
| Preconditions.checkState( |
There was a problem hiding this comment.
specId must be null for leaf manifest entry in the root manifest file, as a leaf manifest file can contain data file entries with mixed spec. It doesn't make sense to populate a single specId. Leaf manifest file entry should only have partition_summary field populated.
Hence the evaluation and preconditions check seem to only apply to data file entries in a leaf manifest file.
| // manifestLocation is not stored in the manifest; the reader fills it from the file location. | ||
| // manifestPos is filled from ROW_POSITION while reading the tracking struct. | ||
| if (tracking instanceof TrackingStruct) { | ||
| ((TrackingStruct) tracking).setManifestLocation(file.location()); |
There was a problem hiding this comment.
manifestLocation might be able to be populated via constant map. But we didn't populate the manifestPos field, which may have to be populated via a setter method in this reader.
| .map(Types.NestedField::fieldId) | ||
| .collect(Collectors.toCollection(Sets::newHashSet)); | ||
|
|
||
| // read the full tracking struct only when the caller requests it; otherwise force-add a |
There was a problem hiding this comment.
what the use case of full tracking vs minimal tracking?
minimal tracking carrying just the status used to filter live files
don't we always need some tracking fields (like seq numbers, first row id, etc.) besides the status?
There was a problem hiding this comment.
I just checked ManifestReader to see what it adds to the projection:
In ManifestReader.open(projection) and in ManifestEntry.wrapFileSchema() we add:
record_count,
first_row_id,
_pos,
status,
snapshot_id,
sequence_number,
file_sequence_number.
What remains in Tracking above these are specific to V4: dv_snapshot_id, deleted_positions and replaced_positions (and later the column file related snapshot ID). I have the feeling that we're going to need all of these, so adding the full tracking to the projection seems reasonable to me at this point.
Also, to be consistent with ManifestReader we also need _pos and record_count, right?
There was a problem hiding this comment.
I initially left them out so that inheritance can be a separate PR. I think it makes sense to add them here.
But I think we should probably not always project out the whole tracking. dv_snapshot_id, replaced/deleted_positions are required for CDF readers. Especially the latter two fields are fairly large bitmaps. We should probably not project them on the scan path.
I have updated the PR such that we always project out the fields we need for scan but not for CDF.
| projectedIds.add(TrackedFile.TRACKING.fieldId()); | ||
|
|
||
| // project spec_id for partition filtering | ||
| if (!partitionEvaluators.isEmpty()) { |
There was a problem hiding this comment.
PARTITION_ID should be force-added here too when partitionEvaluators is non-empty. matchesPartition calls evaluator.eval(projection.wrap(trackedFile.partition())), so a caller that passes a fileProjection excluding partition + a row filter ends up evaluating against an empty tuple and prunes every row.
Same reasoning as spec_id on the line above: fields the reader itself needs for filtering must be projected regardless of what the caller asked for. A test covering the caller-projected-subset + filter combination would surface this.
There was a problem hiding this comment.
Good catch, fixed such that we will be force-projecting partition (alongside spec_id) when a filter is active. Also added testPartitionFilterForceProjectsFilterFields() as a test
| return matches; | ||
| } | ||
|
|
||
| private CloseableIterable<TrackedFile> open() { |
There was a problem hiding this comment.
firstRowId is not supported in this version?
There was a problem hiding this comment.
Yes, row lineage isn't wired up in this reader yet.
The reader supports:
Content stats reading and metadata inheritance are not yet implemented.