[flink] Implement FLIP-314 LineageVertexProvider for non-table source & sink APIs#8001
[flink] Implement FLIP-314 LineageVertexProvider for non-table source & sink APIs#8001jsingh-yelp wants to merge 11 commits into
Conversation
feac25e to
202acc6
Compare
|
cc: @JingsongLi and @yunfengzhou-hub can I please a review on this PR, this is extension to work done previously in: |
|
Thanks for the contribution. I am holding off on approval because the current CI status still has a failing |
|
@leaves12138 I re-triggered the CI and all tests are passing now, can you please give another pass. Thanks a lot. |
|
@leaves12138 Sorry to re-tag you can I please have review on this PR? |
ac873b1 to
ecec184
Compare
ecec184 to
39704ef
Compare
17dc2db to
8b6a41e
Compare
|
@JingsongLi / @leaves12138 gentle reminder can I please have a review on this PR? @JingsongLi once this PR is merged, I can add docs on how paimon support native lineage for flink. |
|
@JingsongLi Can I please get a review on this PR? |
|
@JingsongLi can you please give another pass to this PR? |
| catalogOptions.get(CatalogOptions.METASTORE))) { | ||
| String catalogKeyValue = catalogOptions.get(JdbcCatalogOptions.CATALOG_KEY); | ||
| if (!StringUtils.isNullOrWhitespaceOnly(catalogKeyValue)) { | ||
| return catalogKeyValue + "." + table.fullName(); |
There was a problem hiding this comment.
This also changes the existing table/SQL lineage path. PaimonDataStreamScanProvider and PaimonDataStreamSinkProvider pass the Flink tableIdentifier.asSummaryString() as an explicit name, but for JDBC catalogs this branch ignores it and emits catalog-key + "." + table.fullName() instead, while non-JDBC catalogs keep the Flink identifier. That makes table/SQL dataset names metastore-dependent even though the new derivation is only needed when the non-table DataStream APIs do not have a name. Please keep defaultName when it is non-null, and only derive the catalog-key name when no explicit name was provided; a JDBC-metastore regression for the explicit-name overload would catch this.
There was a problem hiding this comment.
@JingsongLi This is by design, the catalog-key override for JDBC metastore on the Table API path is intentional. Catalog names which is used in tableIdentifier.asSummaryString() are arbitrary per-job the same physical table can appear under different lineage dataset names depending on how the job defines its catalog name.
catalog-key is a more stable cross-job identifier that users can explicitly configure. This is particularly valuable when the same database and table exist across different catalogs the catalog-key disambiguates them.
For context, this is the follow up ticket to support this in Flink: https://issues.apache.org/jira/browse/FLINK-39935
There was a problem hiding this comment.
Thanks for the context. I still think the explicit defaultName should win in this overload. The Table/SQL providers pass the resolved Flink table identifier here, so overriding it only for JDBC catalogs changes the existing table/SQL lineage name even though the catalog-key derivation is only needed for the new DataStream APIs where no name is available. That makes the same SQL/table job report a different dataset name after switching the catalog metastore/config, which is a surprising behavior change for this PR. Please keep defaultName when it is non-null and use catalog-key + "." + table.fullName() only for the null-name DataStream overload (or split the Table/SQL naming change into a separate documented change with compatibility discussion and tests).
There was a problem hiding this comment.
@JingsongLi Makes sense, i have added the fix as suggested. I can create a separate PR with reasoning for the table/sql flow.
|
@JingsongLi Can you please share your thoughts for this comment: #8001 (comment) I think |
|
@JingsongLi I have added the suggested fix here: #8001 (comment) please let me know your thoughts thanks. |
Purpose
FLIP-314lineage support for Paimon's non-Table (DataStream) source & sink APIs. This PR addsLineageVertexProvidersupport for jobs that useFlinkSourceBuilderandFlinkSinkdirectly.What is covered?
PaimonDataStreamSourcewraps any Paimon Source withLineageVertexProviderStaticFileStoreSource,ContinuousFileStoreSource,AlignedContinuousFileStoreSource,MonitorSource(via FlinkSourceBuilder)PaimonDiscardingSinkextendsDiscardingSinkwith lineageFormatTableSinkimplementsLineageVertexProviderdirectlyFlinkFormatTableDataStreamSink(Parquet/ORC direct writes)What is still not covered?
FlinkCdcMultiTableSink)CompactorSourceBuilder/SystemTableSourceTests
KafkaSyncTableAction→FlinkSink.FlinkSourceBuildervia Datastream APIs.Example paimon table looks like this inside the lineage event: