[flink] Implement FLIP-314 LineageVertexProvider for non-table source & sink APIs #8001

Open
jsingh-yelp wants to merge 11 commits into apache:master from jsingh-yelp:add-lineage-for-non-table-apis

@jsingh-yelp commented May 27, 2026

Contributor

Purpose

What is covered?

| Component | Approach | Covers |
|---|---|---|
| DataStream Sources | PaimonDataStreamSource wraps any Paimon Source with LineageVertexProvider | StaticFileStoreSource, ContinuousFileStoreSource, AlignedContinuousFileStoreSource, MonitorSource (via FlinkSourceBuilder) |
| DataStream Sinks | PaimonDiscardingSink extends DiscardingSink with lineage | All FlinkSink subclasses (append, upsert, CDC fixed/dynamic bucket) |
| Format Table Sink | FormatTableSink implements LineageVertexProvider directly | FlinkFormatTableDataStreamSink (Parquet/ORC direct writes) |
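
The wrapping approach in the first row can be sketched roughly as below. This is only an illustration of the delegation pattern, not the code in this PR: `Dataset`, `LineageVertex`, `LineageVertexProvider` and `Source` are simplified stand-ins declared inline (the real Flink interfaces have more members), and `LineageSourceWrapper` is a hypothetical name.

```java
import java.util.Collections;
import java.util.List;

public class LineageWrapperSketch {

    /** Stand-in for a lineage dataset: a namespace plus a name. */
    public static final class Dataset {
        public final String namespace;
        public final String name;

        public Dataset(String namespace, String name) {
            this.namespace = namespace;
            this.name = name;
        }
    }

    /** Stand-in for a lineage vertex: the datasets a source or sink touches. */
    public interface LineageVertex {
        List<Dataset> datasets();
    }

    /** Stand-in for the provider interface queried while the job graph is built. */
    public interface LineageVertexProvider {
        LineageVertex getLineageVertex();
    }

    /** Stand-in for a source; a real source also creates readers and enumerators. */
    public interface Source {
        String describe();
    }

    /** Hypothetical wrapper: forwards source behaviour and adds lineage on top. */
    public static final class LineageSourceWrapper implements Source, LineageVertexProvider {
        private final Source delegate;
        private final Dataset dataset;

        public LineageSourceWrapper(Source delegate, Dataset dataset) {
            this.delegate = delegate;
            this.dataset = dataset;
        }

        @Override
        public String describe() {
            return delegate.describe(); // source behaviour is unchanged
        }

        @Override
        public LineageVertex getLineageVertex() {
            return () -> Collections.singletonList(dataset);
        }
    }

    public static void main(String[] args) {
        Source scan = () -> "static file store scan";
        Dataset table = new Dataset("file:///tmp/paimon-warehouse", "jaskaran_test.default_value");
        LineageSourceWrapper wrapped = new LineageSourceWrapper(scan, table);
        Dataset d = wrapped.getLineageVertex().datasets().get(0);
        System.out.println(wrapped.describe() + " -> " + d.namespace + " / " + d.name);
    }
}
```

Because the wrapper only delegates, any source variant can be covered without each one implementing the provider interface itself, which is presumably why a single wrapper is enough for all sources built via FlinkSourceBuilder.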

What is still not covered?

| Not covered | Reason |
|---|---|
| Multi-table CDC sinks (FlinkCdcMultiTableSink) | Tables are created dynamically at runtime, while getLineageVertex() is called during graph construction before the output tables are known. This can be revisited in a future improvement. |
| CompactorSourceBuilder / SystemTableSource | Maintenance or metadata-oriented paths rather than regular source-to-sink user data pipelines. |
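
The timing problem behind the multi-table CDC row can be illustrated with a small sketch. Everything here is hypothetical (`MultiTableSink` and its methods are made up for illustration and are not Paimon or Flink classes); it only shows why a lineage snapshot taken at graph construction cannot list tables that first appear at runtime.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class DynamicTableLineageSketch {

    /** Hypothetical multi-table sink: target tables are discovered per record. */
    public static final class MultiTableSink {
        private final Set<String> tablesSeen = new LinkedHashSet<>();

        /** Called once, while the job graph is built, before any record flows. */
        public List<String> lineageTablesAtGraphConstruction() {
            return new ArrayList<>(tablesSeen); // snapshot of what is known right now
        }

        /** Called at runtime; a new target table may show up with any record. */
        public void write(String table, String record) {
            tablesSeen.add(table);
        }

        public Set<String> tablesWrittenAtRuntime() {
            return tablesSeen;
        }
    }

    public static void main(String[] args) {
        MultiTableSink sink = new MultiTableSink();
        // Lineage is collected first, as it would be during graph construction.
        List<String> lineage = sink.lineageTablesAtGraphConstruction();
        // Tables only become known once records arrive.
        sink.write("db.orders", "r1");
        sink.write("db.users", "r2");
        System.out.println("lineage at graph construction: " + lineage);
        System.out.println("tables written at runtime: " + sink.tablesWrittenAtRuntime());
    }
}
```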

Tests

  • Added tests for the changes in this PR.
  • Manually tested the use cases I had readily available, with Paimon as both a source and a sink.

An example Paimon table inside the lineage event looks like this:

{
      "namespace": "file:///tmp/paimon-warehouse",
      "name": "jdbc.jaskaran_test.default_value",
      "facets": {
        "schema": {
          "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.47.1/integration/flink",
          "_schemaURL": "https://openlineage.io/spec/facets/1-2-0/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet",
          "fields": [
            {
              "name": "id",
              "type": "STRING NOT NULL"
            },
            {
              "name": "age",
              "type": "INT"
            },
            {
              "name": "source",
              "type": "STRING"
            }
          ]
        },
        "symlinks": {
          "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.47.1/integration/flink",
          "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/SymlinksDatasetFacet.json#/$defs/SymlinksDatasetFacet",
          "identifiers": []
        },
        "config": {
          "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.47.1/integration/flink",
          "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/DatasetFacet",
          "bucket": "-1",
          "catalog.warehouse": "file:///tmp/paimon-warehouse",
          "path": "file:/tmp/paimon-warehouse/jaskaran_test.db/default_value",
          "write-only": "true",
          "changelog-producer": "input",
          "partition-keys": "",
          "type": "paimon",
          "primary-keys": "id"
        }
      }
    }

@jsingh-yelp force-pushed the add-lineage-for-non-table-apis branch from feac25e to 202acc6 on May 27, 2026 18:47
@jsingh-yelp changed the title from "[flink] Implement FLIP-314 LineageVertexProvider for non-table APIs" to "[flink] Implement FLIP-314 LineageVertexProvider for non-table source & sink APIs" on May 27, 2026
@jsingh-yelp

Contributor Author

cc: @JingsongLi and @yunfengzhou-hub, could I please get a review on this PR? It extends the work done previously in #7311.

@leaves12138

Contributor

Thanks for the contribution. I am holding off on approval because the current CI status still has a failing build_test job. Please fix or rerun it, then I can take another pass.

@jsingh-yelp

Contributor Author

@leaves12138 I re-triggered the CI and all tests are passing now. Could you please take another pass? Thanks a lot.

@jsingh-yelp

jsingh-yelp commented Jun 9, 2026

Contributor Author

@leaves12138 Sorry to re-tag you. Could I please have a review on this PR?

@jsingh-yelp force-pushed the add-lineage-for-non-table-apis branch 6 times, most recently from ac873b1 to ecec184 on June 13, 2026 15:51
@jsingh-yelp force-pushed the add-lineage-for-non-table-apis branch from ecec184 to 39704ef on June 13, 2026 16:10
@jsingh-yelp force-pushed the add-lineage-for-non-table-apis branch from 17dc2db to 8b6a41e on June 16, 2026 03:44
@jsingh-yelp

Contributor Author

@JingsongLi / @leaves12138 Gentle reminder: could I please have a review on this PR?

@JingsongLi Once this PR is merged, I can add docs on how Paimon supports native lineage for Flink.

@jsingh-yelp requested a review from JingsongLi on June 22, 2026 02:06
@jsingh-yelp

Contributor Author

@JingsongLi Can I please get a review on this PR?

@jsingh-yelp requested a review from JingsongLi on June 24, 2026 04:03
@jsingh-yelp

Contributor Author

@JingsongLi Could you please take another pass at this PR?

catalogOptions.get(CatalogOptions.METASTORE))) {
String catalogKeyValue = catalogOptions.get(JdbcCatalogOptions.CATALOG_KEY);
if (!StringUtils.isNullOrWhitespaceOnly(catalogKeyValue)) {
return catalogKeyValue + "." + table.fullName();

Contributor

This also changes the existing table/SQL lineage path. PaimonDataStreamScanProvider and PaimonDataStreamSinkProvider pass the Flink tableIdentifier.asSummaryString() as an explicit name, but for JDBC catalogs this branch ignores it and emits catalog-key + "." + table.fullName() instead, while non-JDBC catalogs keep the Flink identifier. That makes table/SQL dataset names metastore-dependent even though the new derivation is only needed when the non-table DataStream APIs do not have a name. Please keep defaultName when it is non-null, and only derive the catalog-key name when no explicit name was provided; a JDBC-metastore regression for the explicit-name overload would catch this.

@jsingh-yelp Jun 24, 2026

Contributor Author

@JingsongLi This is by design: the catalog-key override for the JDBC metastore on the Table API path is intentional. The catalog names used in tableIdentifier.asSummaryString() are arbitrary per job, so the same physical table can appear under different lineage dataset names depending on how each job names its catalog.

catalog-key is a more stable cross-job identifier that users can configure explicitly. This is particularly valuable when the same database and table exist across different catalogs, because the catalog-key disambiguates them.

For context, this is the follow-up ticket to support this in Flink: https://issues.apache.org/jira/browse/FLINK-39935
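
To make the stability argument concrete, here is a small sketch. It assumes the per-job name has the shape catalog name + "." + database + "." + table; the catalog names `paimon_a` and `my_catalog` are made up, and the catalog-key value `jdbc` is chosen only to match the `jdbc.jaskaran_test.default_value` name in the sample event above.

```java
public class LineageNameStabilitySketch {

    /** Name built from the per-job Flink catalog name (the shape is an assumption). */
    public static String nameFromCatalogName(String flinkCatalogName, String database, String table) {
        return flinkCatalogName + "." + database + "." + table;
    }

    /** Name built from the catalog-key configured on the JDBC catalog. */
    public static String nameFromCatalogKey(String catalogKey, String database, String table) {
        return catalogKey + "." + database + "." + table;
    }

    public static void main(String[] args) {
        // Two jobs register the same JDBC catalog under different Flink catalog names,
        // so the names derived from the catalog name differ.
        System.out.println(nameFromCatalogName("paimon_a", "jaskaran_test", "default_value"));
        System.out.println(nameFromCatalogName("my_catalog", "jaskaran_test", "default_value"));
        // Both jobs share the same catalog-key, so the derived name is identical.
        System.out.println(nameFromCatalogKey("jdbc", "jaskaran_test", "default_value"));
    }
}
```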

Contributor

Thanks for the context. I still think the explicit defaultName should win in this overload. The Table/SQL providers pass the resolved Flink table identifier here, so overriding it only for JDBC catalogs changes the existing table/SQL lineage name even though the catalog-key derivation is only needed for the new DataStream APIs where no name is available. That makes the same SQL/table job report a different dataset name after switching the catalog metastore/config, which is a surprising behavior change for this PR. Please keep defaultName when it is non-null and use catalog-key + "." + table.fullName() only for the null-name DataStream overload (or split the Table/SQL naming change into a separate documented change with compatibility discussion and tests).
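
The precedence requested here can be sketched as follows. This is a hedged illustration, not the PR's implementation: `resolveName` is a hypothetical helper, the option keys `"metastore"` and `"catalog-key"` and the value `"jdbc"` are assumptions about what CatalogOptions.METASTORE and JdbcCatalogOptions.CATALOG_KEY resolve to, and the final fallback to the table's full name is chosen only to keep the sketch complete.

```java
import java.util.HashMap;
import java.util.Map;

public class LineageNameResolutionSketch {

    /**
     * Resolves the lineage dataset name.
     *
     * @param explicitName name passed by the Table/SQL providers, or null for DataStream APIs
     * @param catalogOptions catalog options as plain strings (keys are assumptions)
     * @param tableFullName database-qualified table name, e.g. "db.table"
     */
    public static String resolveName(
            String explicitName, Map<String, String> catalogOptions, String tableFullName) {
        if (explicitName != null) {
            return explicitName; // an explicit name always wins, whatever the metastore
        }
        String catalogKey = catalogOptions.get("catalog-key");
        boolean jdbcMetastore = "jdbc".equals(catalogOptions.get("metastore"));
        if (jdbcMetastore && catalogKey != null && !catalogKey.trim().isEmpty()) {
            return catalogKey + "." + tableFullName; // derived only when no name was given
        }
        return tableFullName; // assumed fallback for this sketch
    }

    public static void main(String[] args) {
        Map<String, String> jdbc = new HashMap<>();
        jdbc.put("metastore", "jdbc");
        jdbc.put("catalog-key", "jdbc");

        // Table/SQL path: the Flink identifier is kept even for a JDBC catalog.
        System.out.println(resolveName("my_catalog.jaskaran_test.default_value", jdbc, "jaskaran_test.default_value"));
        // DataStream path: no explicit name, so the catalog-key name is derived.
        System.out.println(resolveName(null, jdbc, "jaskaran_test.default_value"));
    }
}
```

The first call in `main` is the JDBC-metastore regression case for the explicit-name overload mentioned earlier in the thread: if the explicit name were ignored, its output would change to the catalog-key form.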

Contributor Author

@JingsongLi Makes sense, I have added the fix as suggested. I can create a separate PR with the reasoning for the Table/SQL flow.

@jsingh-yelp requested a review from JingsongLi on June 24, 2026 06:27
@jsingh-yelp

jsingh-yelp commented Jun 26, 2026

Contributor Author

@JingsongLi Could you please share your thoughts on this comment: #8001 (comment)? I think the catalog-key provided by JDBC, when available, is more stable than the catalog name, but please let me know what you think.

@jsingh-yelp

Contributor Author

@JingsongLi I have added the suggested fix here: #8001 (comment). Please let me know your thoughts, thanks.

3 participants