Skip to content

fix: size Iceberg delete files in the native scan to avoid dropping deletes#4760

Merged
mbutrovich merged 7 commits into
apache:mainfrom
mbutrovich:delete-file-size
Jun 30, 2026
Merged

fix: size Iceberg delete files in the native scan to avoid dropping deletes#4760
mbutrovich merged 7 commits into
apache:mainfrom
mbutrovich:delete-file-size

Conversation

@mbutrovich

@mbutrovich mbutrovich commented Jun 29, 2026

Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Closes #4723.

Rationale for this change

The native Iceberg scan returned more rows than Spark on MOR tables with positional deletes (the reporter in #4723 saw ~42% extra rows and a WARN CometIcebergNativeScan: Failed to serialize delete file: null). Root cause: the JVM serializer sized each delete file with a FileIO.getLength() HEAD on a reconstructed FileIO, and on any failure (e.g. a path the reconstructed FileIO could not resolve) it silently dropped the delete file. Dropped deletes mean deleted rows leak back into the scan, producing wrong counts and inflated aggregates.

The size was never needed on the JVM side. iceberg-rust seeks each delete file's Parquet footer from file_size_in_bytes, and the native scan already holds a working FileIO (the one that reads the data files). So the native side can stat the delete file itself, with the same FileIO, instead of trusting a fragile JVM-side HEAD or the manifest value (which is itself untrustworthy, see apache/iceberg#12554). A delete file that genuinely cannot be sized now fails the query loudly instead of being dropped.

What changes are included in this PR?

  • Stop sizing delete files on the JVM side and remove file_size_in_bytes from the IcebergDeleteFile proto message. A delete file whose path cannot be extracted is now fatal rather than silently skipped.
  • Native scan fills each unique delete file's size as the first step of the FileScanTaskStream, statting via FileIO::new_input(path).metadata() on the iceberg runtime (deduped, concurrent). A stat failure is fatal and surfaces as a query error, never a silent row leak.

How are these changes tested?

  • Native unit tests for fill_delete_file_sizes: errors on an unreadable delete file, populates the real size, no-op without deletes.
  • CometIcebergNativeSuite: existing MOR positional/equality delete tests now exercise the native fill; new tests cover deletes after rewrite_data_files compaction and assert that an unreadable delete file fails the scan loudly instead of dropping deletes.
  • IcebergReadFromS3Suite: MOR-with-deletes over MinIO exercises the fill against an S3 object store.
  • Reproduced the original corruption against main: a MOR table with three deleted rows whose delete file is then removed from disk returns COUNT(*) = 20 instead of 17 (deletes silently dropped, only a WARN). With this change the same scenario fails loudly instead of returning wrong data.

@comphead

Copy link
Copy Markdown
Contributor

@sandugood any chance you can test this PR on your data?

@sandugood

Copy link
Copy Markdown

@sandugood any chance you can test this PR on your data?

Sure!
Going to compile it and test

@sandugood

sandugood commented Jun 29, 2026

Copy link
Copy Markdown

When trying to do a simple .count() over an Iceberg table got an exception
org.apache.comet.CometNativeException: Iceberg scan error: Unexpected => file scan task generate failed, source: Unexpected => Failed to load Parquet metadata, source: EOF: file size of 0 is less than footer

@mbutrovich

mbutrovich commented Jun 29, 2026

Copy link
Copy Markdown
Contributor Author

When trying to do a simple .count() over an Iceberg table got an exception org.apache.comet.CometNativeException: Iceberg scan error: Unexpected => file scan task generate failed, source: Unexpected => Failed to load Parquet metadata, source: EOF: file size of 0 is less than footer

What sort of catalog are you using? Glue? REST catalog? This seems like we're getting a delete file of size 0 from the manifest after compaction with Trino.

@sandugood

Copy link
Copy Markdown

When trying to do a simple .count() over an Iceberg table got an exception org.apache.comet.CometNativeException: Iceberg scan error: Unexpected => file scan task generate failed, source: Unexpected => Failed to load Parquet metadata, source: EOF: file size of 0 is less than footer

What sort of catalog are you using? Glue? REST catalog? This seems like we're getting a delete file of size 0 from the manifest after compaction with Trino.

Catalog: Hive Metastore (HMS) on-prem
Storage: MinIO (path-style access)
Write engine: Trino for ETL jobs + OPTIMIZE after each run

@mbutrovich

mbutrovich commented Jun 29, 2026

Copy link
Copy Markdown
Contributor Author

Catalog: Hive Metastore (HMS) on-prem Storage: MinIO (path-style access) Write engine: Trino for ETL jobs + OPTIMIZE after each run

Thanks. I suspect we're not extracting all of the catalog properties to the native reader so when we're trying to stat the delete files we're getting errors: on main silently, and in this branch by somehow interpreting the size as 0.

How are you providing credentials?

@sandugood

Copy link
Copy Markdown

Catalog: Hive Metastore (HMS) on-prem Storage: MinIO (path-style access) Write engine: Trino for ETL jobs + OPTIMIZE after each run

Thanks. I suspect we're not extracting all of the catalog properties to the native reader so when we're trying to stat the delete files we're getting errors: on main silently, and in this branch by somehow interpreting the size as 0.

How are you providing credentials?

Basically just baking certs into the Spark+Comet executor image

COPY certs/*.crt /usr/local/share/ca-certificates/
RUN apt-get update && update-ca-certificates
ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt

@mbutrovich

Copy link
Copy Markdown
Contributor Author

Basically just baking certs into the Spark+Comet executor image

To phrase it differently, how is Comet getting its S3 credentials and endpoint config? Specifically:

  1. Credentials: are you setting explicit keys (spark.sql.catalog.<cat>.s3.access-key-id / s3.secret-access-key, or spark.hadoop.fs.s3a.access.key / secret.key), or relying on the AWS default chain (env vars like AWS_ACCESS_KEY_ID, an instance profile, or IRSA)?
  2. Endpoint and path-style: where are s3.endpoint / s3.path-style-access (or fs.s3a.endpoint / fs.s3a.path.style.access) set? On the catalog, in spark.hadoop.*, or in core-site.xml?
  3. Could you paste your spark.sql.catalog.<cat>.* config (redact secrets)?

The reason it matters: the native scan rebuilds its own object-store client from the properties Spark forwards. If the endpoint, path-style, and credentials come through one channel but not the one we harvest, the native client can reach the data files but mis-resolve the delete file, which is what we are chasing.

I just pushed a change that fails loudly with the exact path and byte count instead of the opaque footer error. After you rebuild, the new message will look like Delete file '...' statted to N bytes. Could you send that line, plus the matching HEAD entry from the MinIO server/audit log for that delete key (status code and whether a Content-Length came back)?

@sandugood

sandugood commented Jun 30, 2026

Copy link
Copy Markdown

Basically just baking certs into the Spark+Comet executor image

To phrase it differently, how is Comet getting its S3 credentials and endpoint config? Specifically:

  1. Credentials: are you setting explicit keys (spark.sql.catalog.<cat>.s3.access-key-id / s3.secret-access-key, or spark.hadoop.fs.s3a.access.key / secret.key), or relying on the AWS default chain (env vars like AWS_ACCESS_KEY_ID, an instance profile, or IRSA)?
  2. Endpoint and path-style: where are s3.endpoint / s3.path-style-access (or fs.s3a.endpoint / fs.s3a.path.style.access) set? On the catalog, in spark.hadoop.*, or in core-site.xml?
  3. Could you paste your spark.sql.catalog.<cat>.* config (redact secrets)?

The reason it matters: the native scan rebuilds its own object-store client from the properties Spark forwards. If the endpoint, path-style, and credentials come through one channel but not the one we harvest, the native client can reach the data files but mis-resolve the delete file, which is what we are chasing.

I just pushed a change that fails loudly with the exact path and byte count instead of the opaque footer error. After you rebuild, the new message will look like Delete file '...' statted to N bytes. Could you send that line, plus the matching HEAD entry from the MinIO server/audit log for that delete key (status code and whether a Content-Length came back)?

Basically just baking certs into the Spark+Comet executor image

To phrase it differently, how is Comet getting its S3 credentials and endpoint config? Specifically:

  1. Credentials: are you setting explicit keys (spark.sql.catalog.<cat>.s3.access-key-id / s3.secret-access-key, or spark.hadoop.fs.s3a.access.key / secret.key), or relying on the AWS default chain (env vars like AWS_ACCESS_KEY_ID, an instance profile, or IRSA)?
  2. Endpoint and path-style: where are s3.endpoint / s3.path-style-access (or fs.s3a.endpoint / fs.s3a.path.style.access) set? On the catalog, in spark.hadoop.*, or in core-site.xml?
  3. Could you paste your spark.sql.catalog.<cat>.* config (redact secrets)?

The reason it matters: the native scan rebuilds its own object-store client from the properties Spark forwards. If the endpoint, path-style, and credentials come through one channel but not the one we harvest, the native client can reach the data files but mis-resolve the delete file, which is what we are chasing.

I just pushed a change that fails loudly with the exact path and byte count instead of the opaque footer error. After you rebuild, the new message will look like Delete file '...' statted to N bytes. Could you send that line, plus the matching HEAD entry from the MinIO server/audit log for that delete key (status code and whether a Content-Length came back)?

In terms of connections (creds) everything worked out essentialy. Thanks!
And yeah, we are using explicit keys with spark.sql.catalog.<cat>.s3.access-key-id and also a path-style access (spark.sql.catalog.iceberg.s3.path-style-access). Here is full catalog config for the SparkSession that I am testing Comet on.

spark.sql.catalog.iceberg.s3.secret-access-key secret_key
spark.sql.catalog.iceberg.type hive
spark.sql.catalog.iceberg org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg.s3.path-style-access true
spark.sql.catalog.iceberg.io-impl org.apache.iceberg.aws.s3.S3FileIO
spark.sql.catalog.iceberg.s3.endpoint https://endpoint:port/
spark.sql.catalog.iceberg.client.region dummy
spark.sql.catalog.iceberg.uri thrift://hms_endpoint:port
spark.sql.catalog.iceberg.s3.separator /
spark.sql.catalog.iceberg.s3.access-key-id access_key

Been testing this on several of prod tables. Both with and without delete files.
Results:

  • on a single table with delete files that was giving bloated results on .count() everything now is fixed. When performing a simple groupBy over a date type everything seems to be aggregating the same as default Spark. (testing Spark4.0+Comet vs Spark3.5/4.0/4.1)
  • when trying on a rather sophisticated pipeline (feature store described earlier; joining 3 tables together, one of which was stated in the previous bullet point, so it shouldn't be a problematic one, but still wrong results) now aggregated values are smaller, I would say like 10x smaller. COUNT(*) on the resulting tables is the same, however aggregated values differ significantly. Almost all of the plan was presented in the issue.

I think delete files' problem when reading Iceberg tables is now gone, however when aggregating values we get incorrect and smaller, not bloated, values.
All of the results are reproducible in production, tried them several times, so it is not a random fluctuation.

@sandugood

sandugood commented Jun 30, 2026

Copy link
Copy Markdown

Will try to give more context as this might be suitable (however, it might be related to a different issue):

When running the same query (as presented in the issue that his PR tackles) just from logs it can be seen that Comet skips a rather big stage.

  1. Default Spark has both of these stages at the beginning of the execution:
    [Stage 0:> (0 + 0) / 7350][Stage 1:> (0 + 0) / 7350]
  2. Comet has only one stage with 7350 tasks.

It might be relative to FULLOUTER join. Because when we are forming features with a 30-day, 180-day, 365-day windows everything seems fine and resulting values are the same across both engines. However when performing FULLOUTER join for the end result - we get significantly smaller values on Comet's side.

@sandugood

sandugood commented Jun 30, 2026

Copy link
Copy Markdown

Another note:

We tried to cast decimal to float and vice-versa on the same query. The results didn't change (and were correct before the final step across both engines). So I wouldn't say that it is related to type conversion or floating-point precision

@mbutrovich

Copy link
Copy Markdown
Contributor Author

Can you file a followup issue for the aggregation issues now that the delete file issue is resolved by this PR? I'd like to get this one merged, then we'll work on the aggregation difference in a another issue/PR. Thank you for your help thus far!

@mbutrovich mbutrovich requested a review from andygrove June 30, 2026 13:50

@andygrove andygrove left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks @mbutrovich

@mbutrovich mbutrovich merged commit b70e529 into apache:main Jun 30, 2026
135 of 136 checks passed
@mbutrovich mbutrovich deleted the delete-file-size branch June 30, 2026 14:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Comet produces bloated results in comparison with Spark

4 participants