fix: size Iceberg delete files in the native scan to avoid dropping deletes #4760
@sandugood any chance you can test this PR on your data?
Sure!
|
When trying to do a simple |
What sort of catalog are you using? Glue? REST catalog? It looks like we're getting a delete file of size 0 from the manifest after compaction with Trino.
Catalog: Hive Metastore (HMS) on-prem |
Thanks. I suspect we're not extracting all of the catalog properties to the native reader so when we're trying to stat the delete files we're getting errors: on How are you providing credentials? |
Basically just baking certs into the Spark+Comet executor image |
To phrase it differently, how is Comet getting its S3 credentials and endpoint config? Specifically:
The reason it matters: the native scan rebuilds its own object-store client from the properties Spark forwards. If the endpoint, path-style, and credentials come through one channel but not the one we harvest, the native client can reach the data files but mis-resolve the delete file, which is what we are chasing. I just pushed a change that fails loudly with the exact path and byte count instead of the opaque footer error. After you rebuild, the new message will look like |
In terms of connections (creds), everything essentially worked out. Thanks! spark.sql.catalog.iceberg.s3.secret-access-key Been testing this on several prod tables, both with and without delete files.
I think the delete-file problem when reading Iceberg tables is now gone; however, when aggregating values we get incorrect values that are smaller, not bloated.
Will try to give more context as this might be suitable (however, it might be related to a different issue): When running the same query (as presented in the issue that this PR tackles), the logs alone show that Comet skips a rather big stage.
It might be related to the FULL OUTER join. When we form features with 30-day, 180-day, and 365-day windows, everything seems fine and the resulting values are the same across both engines. However, when performing the FULL OUTER join for the end result, we get significantly smaller values on Comet's side.
Another note: We tried to cast |
Can you file a follow-up issue for the aggregation differences now that the delete file issue is resolved by this PR? I'd like to get this one merged, then we'll work on the aggregation difference in another issue/PR. Thank you for your help thus far!
Which issue does this PR close?
Closes #4723.
Rationale for this change
The native Iceberg scan returned more rows than Spark on MOR tables with positional deletes (the reporter in #4723 saw ~42% extra rows and a WARN CometIcebergNativeScan: Failed to serialize delete file: null). Root cause: the JVM serializer sized each delete file with a FileIO.getLength() HEAD on a reconstructed FileIO, and on any failure (e.g. a path the reconstructed FileIO could not resolve) it silently dropped the delete file. Dropped deletes mean deleted rows leak back into the scan, producing wrong counts and inflated aggregates.
The size was never needed on the JVM side. iceberg-rust seeks each delete file's Parquet footer from file_size_in_bytes, and the native scan already holds a working FileIO (the one that reads the data files). So the native side can stat the delete file itself, with the same FileIO, instead of trusting a fragile JVM-side HEAD or the manifest value (which is itself untrustworthy, see apache/iceberg#12554). A delete file that genuinely cannot be sized now fails the query loudly instead of being dropped.
What changes are included in this PR?
file_size_in_bytes from the IcebergDeleteFile proto message. A delete file whose path cannot be extracted is now fatal rather than silently skipped.
FileScanTaskStream, statting via FileIO::new_input(path).metadata() on the iceberg runtime (deduped, concurrent). A stat failure is fatal and surfaces as a query error, never a silent row leak.
How are these changes tested?
fill_delete_file_sizes: errors on an unreadable delete file, populates the real size, no-op without deletes.
CometIcebergNativeSuite: existing MOR positional/equality delete tests now exercise the native fill; new tests cover deletes after rewrite_data_files compaction and assert that an unreadable delete file fails the scan loudly instead of dropping deletes.
IcebergReadFromS3Suite: MOR-with-deletes over MinIO exercises the fill against an S3 object store.
main: a MOR table with three deleted rows whose delete file is then removed from disk returns COUNT(*) = 20 instead of 17 (deletes silently dropped, only a WARN). With this change the same scenario fails loudly instead of returning wrong data.
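For readers skimming the description, here is a minimal, std-only sketch of the "stat on the native side, dedupe, fail loudly" idea. It is not the code in this PR: the DeleteFile struct, the fill_sizes function, and the injected stat callback are hypothetical names chosen for illustration, and the sketch is sequential, whereas the PR describes a concurrent fill through the iceberg FileIO.

```rust
use std::collections::HashMap;
use std::io;

/// Hypothetical stand-in for a delete-file entry attached to a scan task.
#[derive(Debug, Clone, PartialEq)]
struct DeleteFile {
    path: String,
    /// Unknown (None) until the native side has statted the file.
    file_size_in_bytes: Option<u64>,
}

/// Fill in the size of every delete file using the caller-supplied `stat`.
/// Each distinct path is statted once, and the first stat failure aborts
/// the whole fill with an error that names the offending path, so a delete
/// file is never silently dropped.
fn fill_sizes<F>(files: &mut [DeleteFile], mut stat: F) -> io::Result<()>
where
    F: FnMut(&str) -> io::Result<u64>,
{
    // Sizes already looked up, keyed by path (the dedupe step).
    let mut cache: HashMap<String, u64> = HashMap::new();
    for f in files.iter_mut() {
        let size = match cache.get(f.path.as_str()).copied() {
            Some(known) => known,
            None => {
                let fresh = stat(f.path.as_str()).map_err(|e| {
                    io::Error::new(
                        e.kind(),
                        format!("cannot size delete file {}: {}", f.path, e),
                    )
                })?;
                cache.insert(f.path.clone(), fresh);
                fresh
            }
        };
        f.file_size_in_bytes = Some(size);
    }
    Ok(())
}
```

For local files, the callback could be something like `|p| std::fs::metadata(p).map(|m| m.len())`; against an object store it would be whatever the scan's existing client exposes. Taking the stat function as a parameter keeps the sketch testable without a filesystem and mirrors the point made above: the sizing reuses the client that already reads the data files.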