From 5057a8b6be42c004a684368a5c1583b6645df9ac Mon Sep 17 00:00:00 2001 From: Pei Yu <125331682@qq.com> Date: Thu, 25 Jun 2026 09:02:34 +0800 Subject: [PATCH 1/3] add support for flink 2.3 Signed-off-by: Pei Yu <125331682@qq.com> --- .../CatalogMaterializedTableAdapter.java | 124 ++ .../flink/catalog/Flink22CatalogTest.java | 13 + fluss-flink/fluss-flink-2.3/pom.xml | 268 ++++ .../CatalogMaterializedTableAdapter.java | 127 ++ .../adapter/IntervalFreshnessAdapter.java} | 41 +- .../adapter/MultipleParameterToolAdapter.java | 46 + .../fluss/flink/adapter/SchemaAdapter.java | 43 + .../fluss/flink/adapter/SinkAdapter.java | 43 + .../flink/adapter/TypeInformationAdapter.java | 64 + .../org.apache.flink.table.factories.Factory | 19 + .../Flink23MultipleParameterToolTest.java | 21 + .../flink/catalog/Flink23CatalogITCase.java | 175 +++ .../flink/catalog/Flink23CatalogTest.java | 62 + .../Flink23MaterializedTableITCase.java | 21 + .../flink/metrics/Flink23MetricsITCase.java | 21 + .../procedure/Flink23ProcedureITCase.java | 21 + .../acl/Flink23AuthorizationITCase.java | 21 + .../flink/sink/Flink23ComplexTypeITCase.java | 21 + .../flink/sink/Flink23TableSinkITCase.java | 21 + .../flink/sink/Flink23UndoRecoveryITCase.java | 21 + .../Flink23BinlogVirtualTableITCase.java | 21 + .../Flink23ChangelogVirtualTableITCase.java | 21 + .../flink/source/Flink23DeltaJoinITCase.java | 1083 +++++++++++++++++ .../source/Flink23TableSourceBatchITCase.java | 21 + .../Flink23TableSourceFailOverITCase.java | 21 + .../source/Flink23TableSourceITCase.java | 21 + .../flink/tiering/Flink23TieringITCase.java | 22 + .../Flink23TieringCommitOperatorTest.java | 24 + .../org.junit.jupiter.api.extension.Extension | 19 + .../src/test/resources/log4j2-test.properties | 32 + .../fluss/flink/FlinkConnectorOptions.java | 12 +- .../CatalogMaterializedTableAdapter.java | 123 ++ .../adapter/IntervalFreshnessAdapter.java} | 40 +- .../fluss/flink/utils/FlinkConversions.java | 13 +- .../fluss/flink/catalog/FlinkCatalogTest.java | 25 +- fluss-flink/pom.xml | 1 + 36 files changed, 2637 insertions(+), 55 deletions(-) create mode 100644 fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/CatalogMaterializedTableAdapter.java create mode 100644 fluss-flink/fluss-flink-2.3/pom.xml create mode 100644 fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/CatalogMaterializedTableAdapter.java rename fluss-flink/{fluss-flink-2.2/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java => fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/IntervalFreshnessAdapter.java} (50%) create mode 100644 fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java create mode 100644 fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/SchemaAdapter.java create mode 100644 fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java create mode 100644 fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/TypeInformationAdapter.java create mode 100644 fluss-flink/fluss-flink-2.3/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory create mode 100644 fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/adapter/Flink23MultipleParameterToolTest.java create mode 100644 fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/catalog/Flink23CatalogITCase.java create mode 100644 fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/catalog/Flink23CatalogTest.java create mode 100644 fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/catalog/Flink23MaterializedTableITCase.java create mode 100644 fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/metrics/Flink23MetricsITCase.java create mode 100644 fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/procedure/Flink23ProcedureITCase.java create mode 100644 fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/security/acl/Flink23AuthorizationITCase.java create mode 100644 fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/sink/Flink23ComplexTypeITCase.java create mode 100644 fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/sink/Flink23TableSinkITCase.java create mode 100644 fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/sink/Flink23UndoRecoveryITCase.java create mode 100644 fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23BinlogVirtualTableITCase.java create mode 100644 fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23ChangelogVirtualTableITCase.java create mode 100644 fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23DeltaJoinITCase.java create mode 100644 fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23TableSourceBatchITCase.java create mode 100644 fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23TableSourceFailOverITCase.java create mode 100644 fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23TableSourceITCase.java create mode 100644 fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/tiering/Flink23TieringITCase.java create mode 100644 fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/tiering/committer/Flink23TieringCommitOperatorTest.java create mode 100644 fluss-flink/fluss-flink-2.3/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension create mode 100644 fluss-flink/fluss-flink-2.3/src/test/resources/log4j2-test.properties create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/CatalogMaterializedTableAdapter.java rename fluss-flink/fluss-flink-common/src/{test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java => main/java/org/apache/fluss/flink/adapter/IntervalFreshnessAdapter.java} (51%) diff --git a/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/CatalogMaterializedTableAdapter.java b/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/CatalogMaterializedTableAdapter.java new file mode 100644 index 0000000000..f17f7f907b --- /dev/null +++ b/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/CatalogMaterializedTableAdapter.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.adapter; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.IntervalFreshness; +import org.apache.flink.table.catalog.TableDistribution; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Map; + +/** An adapter for {@link CatalogMaterializedTable#newBuilder()} constructor for flink2.2. */ +public class CatalogMaterializedTableAdapter { + + private final CatalogMaterializedTable.Builder builder; + + private CatalogMaterializedTableAdapter() { + this.builder = CatalogMaterializedTable.newBuilder(); + } + + public static CatalogMaterializedTableAdapter newAdapter() { + return new CatalogMaterializedTableAdapter(); + } + + public CatalogMaterializedTableAdapter schema(Schema schema) { + this.builder.schema(schema); + return this; + } + + public CatalogMaterializedTableAdapter comment(@Nullable String comment) { + this.builder.comment(comment); + return this; + } + + public CatalogMaterializedTableAdapter partitionKeys(List partitionKeys) { + this.builder.partitionKeys(partitionKeys); + return this; + } + + public CatalogMaterializedTableAdapter options(Map options) { + this.builder.options(options); + return this; + } + + public CatalogMaterializedTableAdapter snapshot(@Nullable Long snapshot) { + this.builder.snapshot(snapshot); + return this; + } + + public CatalogMaterializedTableAdapter originalQuery(String originalQuery) { + return this; + } + + public CatalogMaterializedTableAdapter expandedQuery(String expandedQuery) { + return this; + } + + public CatalogMaterializedTableAdapter definitionQuery(String definitionQuery) { + this.builder.definitionQuery(definitionQuery); + return this; + } + + public CatalogMaterializedTableAdapter freshness(@Nullable IntervalFreshness freshness) { + this.builder.freshness(freshness); + return this; + } + + public CatalogMaterializedTableAdapter logicalRefreshMode( + CatalogMaterializedTable.LogicalRefreshMode logicalRefreshMode) { + this.builder.logicalRefreshMode(logicalRefreshMode); + return this; + } + + public CatalogMaterializedTableAdapter refreshMode( + @Nullable CatalogMaterializedTable.RefreshMode refreshMode) { + this.builder.refreshMode(refreshMode); + return this; + } + + public CatalogMaterializedTableAdapter refreshStatus( + CatalogMaterializedTable.RefreshStatus refreshStatus) { + this.builder.refreshStatus(refreshStatus); + return this; + } + + public CatalogMaterializedTableAdapter refreshHandlerDescription( + @Nullable String refreshHandlerDescription) { + this.builder.refreshHandlerDescription(refreshHandlerDescription); + return this; + } + + public CatalogMaterializedTableAdapter serializedRefreshHandler( + @Nullable byte[] serializedRefreshHandler) { + this.builder.serializedRefreshHandler(serializedRefreshHandler); + return this; + } + + public CatalogMaterializedTableAdapter distribution(@Nullable TableDistribution distribution) { + this.builder.distribution(distribution); + return this; + } + + public CatalogMaterializedTable build() { + return this.builder.build(); + } +} diff --git a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogTest.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogTest.java index bff2b77d60..0c49cc7135 100644 --- a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogTest.java +++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogTest.java @@ -18,8 +18,11 @@ package org.apache.fluss.flink.catalog; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.CatalogMaterializedTable; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.DefaultIndex; +import org.apache.flink.table.catalog.IntervalFreshness; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.UniqueConstraint; @@ -41,4 +44,14 @@ protected ResolvedSchema createSchema() { DefaultIndex.newIndex( "INDEX_first_third", Arrays.asList("first", "third")))); } + + @Override + protected ResolvedCatalogMaterializedTable createResolvedCatalogMaterializedTable( + CatalogMaterializedTable origin, + ResolvedSchema resolvedSchema, + CatalogMaterializedTable.RefreshMode refreshMode, + IntervalFreshness intervalFreshness) { + return new ResolvedCatalogMaterializedTable( + origin, resolvedSchema, refreshMode, intervalFreshness); + } } diff --git a/fluss-flink/fluss-flink-2.3/pom.xml b/fluss-flink/fluss-flink-2.3/pom.xml new file mode 100644 index 0000000000..81215aa956 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/pom.xml @@ -0,0 +1,268 @@ + + + + 4.0.0 + + org.apache.fluss + fluss-flink + 1.0-SNAPSHOT + + + fluss-flink-2.3 + Apache Fluss (Incubating) : Engine Flink : 2.3 + + 2.3 + 2.3.0 + + + + + + org.apache.fluss + fluss-client + ${project.version} + + + + org.apache.fluss + fluss-flink-common + ${project.version} + + + * + * + + + + + + + org.apache.flink + flink-core + ${flink.minor.version} + provided + + + + org.apache.flink + flink-table-common + ${flink.minor.version} + provided + + + + org.apache.flink + flink-streaming-java + ${flink.minor.version} + provided + + + + org.apache.flink + flink-runtime + ${flink.minor.version} + provided + + + + + org.apache.fluss + fluss-flink-common + ${project.version} + test + test-jar + + + + org.apache.fluss + fluss-server + ${project.version} + test + + + + org.apache.flink + flink-table-test-utils + ${flink.minor.version} + test + + + + org.apache.flink + flink-connector-base + ${flink.minor.version} + test + + + + org.apache.fluss + fluss-server + ${project.version} + test + test-jar + + + + org.apache.fluss + fluss-rpc + test + test-jar + ${project.version} + + + + + org.apache.curator + curator-test + ${curator.version} + test + + + + org.apache.flink + flink-table-common + ${flink.minor.version} + test + test-jar + + + + org.apache.flink + flink-connector-test-utils + ${flink.minor.version} + test + + + + + org.apache.fluss + fluss-test-utils + + + + org.apache.fluss + fluss-common + ${project.version} + test + test-jar + + + + org.apache.flink + flink-clients + ${flink.minor.version} + test + + + + org.apache.flink + flink-sql-gateway + ${flink.minor.version} + test + + + + org.apache.flink + flink-sql-gateway + ${flink.minor.version} + test-jar + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + ${skip.on.java8} + + ${skip.on.java8} + + + + + org.apache.maven.plugins + maven-surefire-plugin + 3.0.0-M5 + + + + integration-tests + integration-test + false + + test + + + ${skip.on.java8} + + **/*ITCase.* + + + 1 + + + + + default-test + test + false + + test + + + ${skip.on.java8} + + **/*ITCase.* + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-fluss + package + + shade + + + + + org.apache.fluss:fluss-flink-common + org.apache.fluss:fluss-client + + + + + + + + + + diff --git a/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/CatalogMaterializedTableAdapter.java b/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/CatalogMaterializedTableAdapter.java new file mode 100644 index 0000000000..cead311075 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/CatalogMaterializedTableAdapter.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.adapter; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.IntervalFreshness; +import org.apache.flink.table.catalog.TableDistribution; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Map; + +/** An adapter for {@link CatalogMaterializedTable#newBuilder()} constructor for flink2.3. */ +public class CatalogMaterializedTableAdapter { + + private final CatalogMaterializedTable.Builder builder; + + private CatalogMaterializedTableAdapter() { + this.builder = CatalogMaterializedTable.newBuilder(); + } + + public static CatalogMaterializedTableAdapter newAdapter() { + return new CatalogMaterializedTableAdapter(); + } + + public CatalogMaterializedTableAdapter schema(Schema schema) { + this.builder.schema(schema); + return this; + } + + public CatalogMaterializedTableAdapter comment(@Nullable String comment) { + this.builder.comment(comment); + return this; + } + + public CatalogMaterializedTableAdapter partitionKeys(List partitionKeys) { + this.builder.partitionKeys(partitionKeys); + return this; + } + + public CatalogMaterializedTableAdapter options(Map options) { + this.builder.options(options); + return this; + } + + public CatalogMaterializedTableAdapter snapshot(@Nullable Long snapshot) { + this.builder.snapshot(snapshot); + return this; + } + + public CatalogMaterializedTableAdapter originalQuery(String originalQuery) { + this.builder.originalQuery(originalQuery); + return this; + } + + public CatalogMaterializedTableAdapter expandedQuery(String expandedQuery) { + this.builder.expandedQuery(expandedQuery); + return this; + } + + @Deprecated + public CatalogMaterializedTableAdapter definitionQuery(String definitionQuery) { + this.builder.definitionQuery(definitionQuery); + return this; + } + + public CatalogMaterializedTableAdapter freshness(@Nullable IntervalFreshness freshness) { + this.builder.freshness(freshness); + return this; + } + + public CatalogMaterializedTableAdapter logicalRefreshMode( + CatalogMaterializedTable.LogicalRefreshMode logicalRefreshMode) { + this.builder.logicalRefreshMode(logicalRefreshMode); + return this; + } + + public CatalogMaterializedTableAdapter refreshMode( + @Nullable CatalogMaterializedTable.RefreshMode refreshMode) { + this.builder.refreshMode(refreshMode); + return this; + } + + public CatalogMaterializedTableAdapter refreshStatus( + CatalogMaterializedTable.RefreshStatus refreshStatus) { + this.builder.refreshStatus(refreshStatus); + return this; + } + + public CatalogMaterializedTableAdapter refreshHandlerDescription( + @Nullable String refreshHandlerDescription) { + this.builder.refreshHandlerDescription(refreshHandlerDescription); + return this; + } + + public CatalogMaterializedTableAdapter serializedRefreshHandler( + @Nullable byte[] serializedRefreshHandler) { + this.builder.serializedRefreshHandler(serializedRefreshHandler); + return this; + } + + public CatalogMaterializedTableAdapter distribution(@Nullable TableDistribution distribution) { + this.builder.distribution(distribution); + return this; + } + + public CatalogMaterializedTable build() { + return this.builder.build(); + } +} diff --git a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java b/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/IntervalFreshnessAdapter.java similarity index 50% rename from fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java rename to fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/IntervalFreshnessAdapter.java index f868a84db6..992047ae19 100644 --- a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java +++ b/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/IntervalFreshnessAdapter.java @@ -17,25 +17,30 @@ package org.apache.fluss.flink.adapter; -import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.Interval; import org.apache.flink.table.catalog.IntervalFreshness; -import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; -import org.apache.flink.table.catalog.ResolvedSchema; -/** - * Adapter for {@link ResolvedCatalogMaterializedTable} because the constructor is compatibility in - * flink 2.2. However, this constructor only used in test. - * - *

TODO: remove it until ... is - * fixed. - */ -public class ResolvedCatalogMaterializedTableAdapter { - - public static ResolvedCatalogMaterializedTable create( - CatalogMaterializedTable origin, - ResolvedSchema resolvedSchema, - CatalogMaterializedTable.RefreshMode refreshMode, - IntervalFreshness freshness) { - return new ResolvedCatalogMaterializedTable(origin, resolvedSchema, refreshMode, freshness); +/** An adapter for {@link IntervalFreshness} for flink2.3. */ +public class IntervalFreshnessAdapter { + + public static TimeUnitAdapter timeUnit(String name) { + return new TimeUnitAdapter(Interval.TimeUnit.valueOf(name)); + } + + public static IntervalFreshness of(String interval, TimeUnitAdapter timeUnit) { + return IntervalFreshness.of(interval, timeUnit.timeUnit); + } + + public static String getTimeUnitName(IntervalFreshness intervalFreshness) { + return intervalFreshness.getTimeUnit().name(); + } + + /** An adapter for {@link Interval.TimeUnit} for flink2.3. */ + public static class TimeUnitAdapter { + private final Interval.TimeUnit timeUnit; + + private TimeUnitAdapter(Interval.TimeUnit timeUnit) { + this.timeUnit = timeUnit; + } } } diff --git a/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java b/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java new file mode 100644 index 0000000000..076dcb86c8 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.adapter; + +import org.apache.flink.util.MultipleParameterTool; + +import java.util.Map; + +/** + * An adapter for Flink {@link MultipleParameterTool} class. The {@link MultipleParameterTool} is + * moved to a new package since Flink 2.x, so this adapter helps to bridge compatibility for + * different Flink versions. + * + *

TODO: remove this class when no longer support all the Flink 1.x series. + */ +public class MultipleParameterToolAdapter { + + private MultipleParameterToolAdapter() {} + + private MultipleParameterTool multipleParameterTool; + + public static MultipleParameterToolAdapter fromArgs(String[] args) { + MultipleParameterToolAdapter adapter = new MultipleParameterToolAdapter(); + adapter.multipleParameterTool = MultipleParameterTool.fromArgs(args); + return adapter; + } + + public Map toMap() { + return this.multipleParameterTool.toMap(); + } +} diff --git a/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/SchemaAdapter.java b/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/SchemaAdapter.java new file mode 100644 index 0000000000..8f6492bd16 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/SchemaAdapter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.adapter; + +import org.apache.flink.table.api.Schema; + +import java.util.List; + +/** + * An adapter for the schema with Index. + * + *

TODO: remove this class when no longer support all the Flink 1.x series. + */ +public class SchemaAdapter { + private SchemaAdapter() {} + + public static Schema withIndex(Schema unresolvedSchema, List> indexes) { + Schema.Builder newSchemaBuilder = Schema.newBuilder().fromSchema(unresolvedSchema); + for (List index : indexes) { + newSchemaBuilder.index(index); + } + return newSchemaBuilder.build(); + } + + public static boolean supportIndex() { + return true; + } +} diff --git a/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java b/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java new file mode 100644 index 0000000000..8d6e512501 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.adapter; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; + +import java.io.IOException; + +/** + * Flink sink adapter which hide the different version of createWriter method. + * + *

TODO: remove this class when no longer support all the Flink 1.x series. + */ +public abstract class SinkAdapter implements Sink { + + @Override + public SinkWriter createWriter(WriterInitContext writerInitContext) throws IOException { + return createWriter( + writerInitContext.getMailboxExecutor(), writerInitContext.metricGroup()); + } + + protected abstract SinkWriter createWriter( + MailboxExecutor mailboxExecutor, SinkWriterMetricGroup metricGroup); +} diff --git a/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/TypeInformationAdapter.java b/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/TypeInformationAdapter.java new file mode 100644 index 0000000000..ad4f611458 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/TypeInformationAdapter.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.adapter; + +import org.apache.flink.api.common.serialization.SerializerConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +/** + * Flink 2.3 variant of the type information adapter. + * + *

In Flink 2.3, {@link TypeInformation} declares {@code createSerializer(SerializerConfig)} as + * the abstract method. This adapter implements that method and delegates to {@link + * #createSerializer(TypeSerializerCreator)} with a creator that forwards the config, so subclasses + * (e.g. in fluss-flink-common) can obtain the inner serializer and wrap it without touching + * SerializerConfig here. + * + *

When building for Flink 2.3, this class is used instead of the common adapter so that the + * correct API is implemented for this Flink version. See fluss-flink-common's + * TypeInformationAdapter for the version that implements both createSerializer(SerializerConfig) + * and createSerializer(ExecutionConfig). + * + * @param the type described by this type information + */ +public abstract class TypeInformationAdapter extends TypeInformation { + + @Override + public TypeSerializer createSerializer(SerializerConfig config) { + return createSerializer(typeInfo -> typeInfo.createSerializer(config)); + } + + /** + * Creates the type serializer using the given creator. The creator captures the config from the + * framework; subclasses call {@code creator.createSerializer(innerTypeInfo)} to obtain the + * inner serializer and then wrap or return it. + */ + protected abstract TypeSerializer createSerializer( + TypeInformationAdapter.TypeSerializerCreator typeSerializerCreator); + + /** + * Creator that, given a TypeInformation, returns its serializer for the current config. Passed + * by the adapter so that config is forwarded to the underlying TypeInformation. + */ + @FunctionalInterface + public interface TypeSerializerCreator { + TypeSerializer createSerializer(TypeInformation typeInfo); + } +} diff --git a/fluss-flink/fluss-flink-2.3/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/fluss-flink/fluss-flink-2.3/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000000..d5aca2d53b --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.fluss.flink.catalog.FlinkCatalogFactory diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/adapter/Flink23MultipleParameterToolTest.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/adapter/Flink23MultipleParameterToolTest.java new file mode 100644 index 0000000000..3cdd2687f6 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/adapter/Flink23MultipleParameterToolTest.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.adapter; + +/** Test for {@link MultipleParameterToolAdapter} in flink 2.3. */ +public class Flink23MultipleParameterToolTest extends FlinkMultipleParameterToolTest {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/catalog/Flink23CatalogITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/catalog/Flink23CatalogITCase.java new file mode 100644 index 0000000000..aa29869c9d --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/catalog/Flink23CatalogITCase.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.catalog; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** IT case for catalog in Flink 2.3. */ +public class Flink23CatalogITCase extends FlinkCatalogITCase { + + @Test + void testGetTableWithIndex() throws Exception { + String tableName = "table_with_pk_only"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a int, " + + " b varchar, " + + " c bigint, " + + " primary key (a, b) NOT ENFORCED" + + ") with ( " + + " 'connector' = 'fluss' " + + ")", + tableName)); + CatalogTable table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName)); + Schema expectedSchema = + Schema.newBuilder() + .column("a", DataTypes.INT().notNull()) + .column("b", DataTypes.STRING().notNull()) + .column("c", DataTypes.BIGINT()) + .primaryKey("a", "b") + .index("a", "b") + .build(); + assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema); + + tableName = "table_with_prefix_bucket_key"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a int, " + + " b varchar, " + + " c bigint, " + + " primary key (a, b) NOT ENFORCED" + + ") with ( " + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'a'" + + ")", + tableName)); + + table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName)); + expectedSchema = + Schema.newBuilder() + .column("a", DataTypes.INT().notNull()) + .column("b", DataTypes.STRING().notNull()) + .column("c", DataTypes.BIGINT()) + .primaryKey("a", "b") + .index("a", "b") + .index("a") + .build(); + assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema); + + tableName = "table_with_bucket_key_is_not_prefix_pk"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a int, " + + " b varchar, " + + " c bigint, " + + " primary key (a, b) NOT ENFORCED" + + ") with ( " + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'b'" + + ")", + tableName)); + + table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName)); + expectedSchema = + Schema.newBuilder() + .column("a", DataTypes.INT().notNull()) + .column("b", DataTypes.STRING().notNull()) + .column("c", DataTypes.BIGINT()) + .primaryKey("a", "b") + .index("a", "b") + .build(); + assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema); + + tableName = "table_with_partition_1"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a int, " + + " b varchar, " + + " c bigint, " + + " dt varchar, " + + " primary key (a, b, dt) NOT ENFORCED " + + ") " + + " partitioned by (dt) " + + " with ( " + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'a'" + + ")", + tableName)); + + table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName)); + expectedSchema = + Schema.newBuilder() + .column("a", DataTypes.INT().notNull()) + .column("b", DataTypes.STRING().notNull()) + .column("c", DataTypes.BIGINT()) + .column("dt", DataTypes.STRING().notNull()) + .primaryKey("a", "b", "dt") + .index("a", "b", "dt") + .index("a", "dt") + .build(); + assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema); + + tableName = "table_with_partition_2"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a int, " + + " b varchar, " + + " c bigint, " + + " dt varchar, " + + " primary key (dt, a, b) NOT ENFORCED " + + ") " + + " partitioned by (dt) " + + " with ( " + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'a'" + + ")", + tableName)); + + table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName)); + expectedSchema = + Schema.newBuilder() + .column("a", DataTypes.INT().notNull()) + .column("b", DataTypes.STRING().notNull()) + .column("c", DataTypes.BIGINT()) + .column("dt", DataTypes.STRING().notNull()) + .primaryKey("dt", "a", "b") + .index("dt", "a", "b") + .index("a", "dt") + .build(); + assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema); + } + + @Override + protected void addDefaultIndexKey(Schema.Builder schemaBuilder) { + super.addDefaultIndexKey(schemaBuilder); + + Schema currentSchema = schemaBuilder.build(); + currentSchema.getPrimaryKey().ifPresent(pk -> schemaBuilder.index(pk.getColumnNames())); + } +} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/catalog/Flink23CatalogTest.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/catalog/Flink23CatalogTest.java new file mode 100644 index 0000000000..8d803f1aa6 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/catalog/Flink23CatalogTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.catalog; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.DefaultIndex; +import org.apache.flink.table.catalog.IntervalFreshness; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.StartMode; +import org.apache.flink.table.catalog.UniqueConstraint; + +import java.util.Arrays; +import java.util.Collections; + +/** Test for {@link FlinkCatalog}. */ +public class Flink23CatalogTest extends FlinkCatalogTest { + + protected ResolvedSchema createSchema() { + return new ResolvedSchema( + Arrays.asList( + Column.physical("first", DataTypes.STRING().notNull()), + Column.physical("second", DataTypes.INT()), + Column.physical("third", DataTypes.STRING().notNull())), + Collections.emptyList(), + UniqueConstraint.primaryKey("PK_first_third", Arrays.asList("first", "third")), + Collections.singletonList( + DefaultIndex.newIndex( + "INDEX_first_third", Arrays.asList("first", "third")))); + } + + @Override + protected ResolvedCatalogMaterializedTable createResolvedCatalogMaterializedTable( + CatalogMaterializedTable origin, + ResolvedSchema resolvedSchema, + CatalogMaterializedTable.RefreshMode refreshMode, + IntervalFreshness intervalFreshness) { + return new ResolvedCatalogMaterializedTable( + origin, + resolvedSchema, + refreshMode, + intervalFreshness, + StartMode.of(StartMode.StartModeKind.FROM_BEGINNING)); + } +} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/catalog/Flink23MaterializedTableITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/catalog/Flink23MaterializedTableITCase.java new file mode 100644 index 0000000000..ff62e8dc6d --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/catalog/Flink23MaterializedTableITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.catalog; + +/** IT case for materialized table in Flink 2.3. */ +public class Flink23MaterializedTableITCase extends MaterializedTableITCase {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/metrics/Flink23MetricsITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/metrics/Flink23MetricsITCase.java new file mode 100644 index 0000000000..26c8fcd856 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/metrics/Flink23MetricsITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.metrics; + +/** IT case for metrics in Flink 2.3. */ +public class Flink23MetricsITCase extends FlinkMetricsITCase {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/procedure/Flink23ProcedureITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/procedure/Flink23ProcedureITCase.java new file mode 100644 index 0000000000..581f689ff2 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/procedure/Flink23ProcedureITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.procedure; + +/** IT case for procedure in Flink 2.3. */ +public class Flink23ProcedureITCase extends FlinkProcedureITCase {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/security/acl/Flink23AuthorizationITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/security/acl/Flink23AuthorizationITCase.java new file mode 100644 index 0000000000..8541a0d1ab --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/security/acl/Flink23AuthorizationITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.security.acl; + +/** IT case for authorization in Flink 2.3. */ +public class Flink23AuthorizationITCase extends FlinkAuthorizationITCase {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/sink/Flink23ComplexTypeITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/sink/Flink23ComplexTypeITCase.java new file mode 100644 index 0000000000..7521277b4f --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/sink/Flink23ComplexTypeITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink; + +/** Integration tests for Array type support in Flink 2.3. */ +public class Flink23ComplexTypeITCase extends FlinkComplexTypeITCase {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/sink/Flink23TableSinkITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/sink/Flink23TableSinkITCase.java new file mode 100644 index 0000000000..f7b18355dd --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/sink/Flink23TableSinkITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink; + +/** IT case for {@link FlinkTableSink} in Flink 2.3. */ +public class Flink23TableSinkITCase extends FlinkTableSinkITCase {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/sink/Flink23UndoRecoveryITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/sink/Flink23UndoRecoveryITCase.java new file mode 100644 index 0000000000..7b9b447050 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/sink/Flink23UndoRecoveryITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink; + +/** IT case for Undo Recovery functionality in Flink 2.3. */ +public class Flink23UndoRecoveryITCase extends UndoRecoveryITCase {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23BinlogVirtualTableITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23BinlogVirtualTableITCase.java new file mode 100644 index 0000000000..e17e1affe8 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23BinlogVirtualTableITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +/** IT case for {@link BinlogVirtualTableITCase} in Flink 2.3. */ +public class Flink23BinlogVirtualTableITCase extends BinlogVirtualTableITCase {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23ChangelogVirtualTableITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23ChangelogVirtualTableITCase.java new file mode 100644 index 0000000000..ee27c922e2 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23ChangelogVirtualTableITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +/** IT case for {@link ChangelogVirtualTableITCase} in Flink 2.3. */ +public class Flink23ChangelogVirtualTableITCase extends ChangelogVirtualTableITCase {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23DeltaJoinITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23DeltaJoinITCase.java new file mode 100644 index 0000000000..4d4b524753 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23DeltaJoinITCase.java @@ -0,0 +1,1083 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.flink.utils.FlinkTestBase; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.shaded.guava32.com.google.common.collect.ImmutableMap; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS; +import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder; +import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE; +import static org.apache.fluss.testutils.DataTestUtils.row; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** IT case for Delta Join optimization in Flink 2.3. */ +public class Flink23DeltaJoinITCase extends FlinkTestBase { + + private static final String CATALOG_NAME = "test_catalog"; + + private StreamTableEnvironment tEnv; + + @BeforeEach + public void beforeEach() { + bootstrapServers = conn.getConfiguration().get(ConfigOptions.BOOTSTRAP_SERVERS).get(0); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode()); + + tEnv.executeSql( + String.format( + "create catalog %s with ('type' = 'fluss', '%s' = '%s')", + CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers)); + tEnv.useCatalog(CATALOG_NAME); + tEnv.executeSql(String.format("create database if not exists `%s`", DEFAULT_DB)); + tEnv.useDatabase(DEFAULT_DB); + + // start two jobs for this test: one for DML involving the delta join, and the other for DQL + // to query the results of the sink table + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); + // Set FORCE strategy for delta join + tEnv.getConfig() + .set( + OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY, + OptimizerConfigOptions.DeltaJoinStrategy.FORCE); + } + + @AfterEach + void after() { + tEnv.useDatabase(BUILTIN_DATABASE); + tEnv.executeSql(String.format("drop database `%s` cascade", DEFAULT_DB)); + } + + /** + * Creates a source table with specified schema and options. + * + * @param tableName the name of the table + * @param columns column definitions (e.g., "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint") + * @param primaryKey primary key columns (e.g., "c1, d1") + * @param bucketKey bucket key column (e.g., "c1") + * @param extraOptions additional WITH options (e.g., "lookup.cache" = "partial"), or null + */ + private void createSource( + String tableName, + String columns, + String primaryKey, + String bucketKey, + @Nullable String partitionKey, + @Nullable Map extraOptions) { + Map withOptions = new HashMap<>(); + if (extraOptions != null) { + withOptions.putAll(extraOptions); + } + withOptions.put("connector", "fluss"); + withOptions.put("bucket.key", bucketKey); + StringBuilder ddlBuilder = new StringBuilder(); + ddlBuilder.append( + String.format( + "create table %s ( %s, primary key (%s) NOT ENFORCED )", + tableName, columns, primaryKey)); + if (partitionKey != null) { + ddlBuilder.append(String.format(" partitioned by (%s)", partitionKey)); + withOptions.put("table.auto-partition.enabled", "true"); + withOptions.put("table.auto-partition.time-unit", "year"); + } + ddlBuilder.append( + String.format( + " with (%s)", + withOptions.entrySet().stream() + .map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue())) + .collect(Collectors.joining(", ")))); + + tEnv.executeSql(ddlBuilder.toString()); + } + + /** + * Creates a sink table with specified columns. + * + * @param tableName the name of the table + * @param columns the column definitions (e.g., "a1 int, c1 bigint, a2 int") + * @param primaryKey the primary key columns (e.g., "c1, d1, c2, d2") + */ + private void createSink(String tableName, String columns, String primaryKey) { + tEnv.executeSql( + String.format( + "create table %s (" + + " %s, " + + " primary key (%s) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss'" + + ")", + tableName, columns, primaryKey)); + } + + @Test + void testDeltaJoin() throws Exception { + // disable cache to get stable results with updating + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED, false); + String leftTableName = "left_table"; + String rightTableName = "right_table"; + String sinkTableName = "sink_table"; + + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + createSink( + sinkTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c1, d1, c2, d2"); + + List rows1 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 100L, 2, 20000L), + row(3, "v3", 300L, 3, 30000L), + row(4, "v4", 400L, 4, 40000L), + // update + row(5, "v5", 100L, 1, 50000L)); + TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); + writeRows(conn, leftTablePath, rows1, false); + // wait for the first snapshot to finish to get the stable result + FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(leftTablePath); + + List rows2 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 200L, 2, 20000L), + row(3, "v3", 330L, 4, 30000L), + row(4, "v4", 500L, 4, 50000L), + // update + row(6, "v6", 100L, 1, 60000L)); + TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); + writeRows(conn, rightTablePath, rows2, false); + // wait for the first snapshot to finish to get the stable result + FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(rightTablePath); + + String sql = + String.format( + "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]"); + + tEnv.executeSql(sql); + + CloseableIterator collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + List expected = + Arrays.asList( + "+I[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]", + "-U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]", + "+U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]", + "+I[2, v2, 100, 2, 20000, 6, v6, 100, 1, 60000]", + "-U[2, v2, 100, 2, 20000, 6, v6, 100, 1, 60000]", + "+U[2, v2, 100, 2, 20000, 6, v6, 100, 1, 60000]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testDeltaJoinOnPrimaryKey() throws Exception { + // disable cache to get stable results with updating + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED, false); + String leftTableName = "left_table"; + String rightTableName = "right_table"; + String sinkTableName = "sink_table"; + + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + createSink( + sinkTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c1, d1, c2, d2"); + + List rows1 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 200L, 2, 20000L), + row(3, "v3", 300L, 3, 30000L), + row(4, "v4", 400L, 4, 40000L), + // update + row(5, "v5", 100L, 1, 50000L)); + TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); + writeRows(conn, leftTablePath, rows1, false); + // wait for the first snapshot to finish to get the stable result + FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(leftTablePath); + + List rows2 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 200L, 2, 20000L), + row(3, "v3", 300L, 4, 30000L), + row(4, "v4", 500L, 4, 50000L), + // update + row(6, "v6", 100L, 1, 60000L)); + TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); + writeRows(conn, rightTablePath, rows2, false); + // wait for the first snapshot to finish to get the stable result + FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(rightTablePath); + + String sql = + String.format( + "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]"); + + tEnv.executeSql(sql); + + CloseableIterator collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + List expected = + Arrays.asList( + "+I[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]", + "-U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]", + "+U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]", + "+I[2, v2, 200, 2, 20000, 2, v2, 200, 2, 20000]", + "-U[2, v2, 200, 2, 20000, 2, v2, 200, 2, 20000]", + "+U[2, v2, 200, 2, 20000, 2, v2, 200, 2, 20000]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testDeltaJoinWithCalc() throws Exception { + String leftTableName = "left_table_proj"; + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + List rows1 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 200L, 2, 20000L), + row(3, "v1", 300L, 3, 30000L)); + TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); + writeRows(conn, leftTablePath, rows1, false); + + String rightTableName = "right_table_proj"; + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c2", + "c2", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + List rows2 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 200L, 2, 20000L), + row(3, "v3", 300L, 4, 30000L)); + TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); + writeRows(conn, rightTablePath, rows2, false); + + String sinkTableName = "sink_table_proj"; + createSink(sinkTableName, "a1 int, c1 bigint, d1 int, a2 int", "c1, d1"); + + String sql = + String.format( + "INSERT INTO %s SELECT a1, c1, d1, a2 FROM (" + + " SELECT * FROM %s WHERE d1 > 1" + + ") INNER JOIN (" + + " SELECT * FROM %s WHERE c2 < 300" + + ") ON c1 = c2", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]"); + + tEnv.executeSql(sql); + + CloseableIterator collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + List expected = + Arrays.asList("+I[2, 200, 2, 2]", "-U[2, 200, 2, 2]", "+U[2, 200, 2, 2]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testDeltaJoinWithAppendOnlySourceAndCalc() throws Exception { + String leftTableName = "left_table_proj"; + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.merge-engine", "first_row")); + + List rows1 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 200L, 2, 20000L), + row(3, "v1", 300L, 3, 30000L)); + TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); + writeRows(conn, leftTablePath, rows1, false); + + String rightTableName = "right_table_proj"; + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.merge-engine", "first_row")); + + List rows2 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v3", 200L, 2, 20000L), + row(3, "v4", 400L, 4, 30000L)); + TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); + writeRows(conn, rightTablePath, rows2, false); + + String sinkTableName = "sink_table_proj"; + createSink(sinkTableName, "a1 int, c1 bigint, a2 int", "c1"); + + String sql = + String.format( + "INSERT INTO %s SELECT a1, c1, a2 FROM %s INNER JOIN %s ON c1 = c2 WHERE a1 > 1", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]"); + + tEnv.executeSql(sql); + + CloseableIterator collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + List expected = Arrays.asList("+I[2, 200, 2]", "-U[2, 200, 2]", "+U[2, 200, 2]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testDeltaJoinFailsWhenFilterOnNonUpsertKeys() { + String leftTableName = "left_table_force_fail"; + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String rightTableName = "right_table_force_fail"; + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String sinkTableName = "sink_table_force_fail"; + createSink( + sinkTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c1, d1, c2, d2"); + + // Filter on e1 > e2, where e1 and e2 are NOT part of the upsert key + // TODO we can add a UpsertFilterOperator that can convert the un-match-filter UPSERT record + // into DELETE record. + String sql = + String.format( + "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 WHERE e1 > e2", + sinkTableName, leftTableName, rightTableName); + + assertThatThrownBy(() -> tEnv.explainSql(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + + // Non-equiv-cond on e1 > e2, where e1 and e2 are NOT part of the upsert key + String sql2 = + String.format( + "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 AND e1 > e2", + sinkTableName, leftTableName, rightTableName); + + assertThatThrownBy(() -> tEnv.explainSql(sql2)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + } + + @Test + void testDeltaJoinWithNonEquiConditionOnUpsertKeys() throws Exception { + String leftTableName = "left_table_nonequi_upsert"; + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", + "c1, d1, e1", + "c1, d1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + List rows1 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 200L, 2, 20001L), + row(3, "v3", 300L, 3, 30000L), + // Add row with same PK (100, 1) to generate UPDATE in CDC mode + // This row is filtered out by (e2 / 100) <> c2, so doesn't affect join + // result + row(4, "v1_updated", 100L, 1, 10000L)); + TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); + writeRows(conn, leftTablePath, rows1, false); + + String rightTableName = "right_table_nonequi_upsert"; + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + List rows2 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v4", 200L, 2, 20000L), + row(3, "v5", 300L, 4, 40000L), + // Add row with same PK (100, 1) to generate UPDATE in CDC mode + // This row is filtered out by (e2 / 100) <> c2, so doesn't affect join + // result + row(5, "v1_updated", 100L, 1, 10000L)); + TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); + writeRows(conn, rightTablePath, rows2, false); + + String sinkTableName = "sink_table_nonequi_upsert"; + createSink(sinkTableName, "a1 int, c1 bigint, d1 int, e1 bigint, a2 int", "c1, d1, e1"); + + String sql = + String.format( + "INSERT INTO %s SELECT a1, c1, d1, e1, a2 FROM %s INNER JOIN %s " + + "ON c1 = c2 AND d1 = d2 AND e1 <> (c2 * 100)", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains( + "DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2) AND (e1 <> (c2 * 100)))]"); + + tEnv.executeSql(sql); + + CloseableIterator collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + List expected = + Arrays.asList( + "+I[2, 200, 2, 20001, 2]", + "-U[2, 200, 2, 20001, 2]", + "+U[2, 200, 2, 20001, 2]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testDeltaJoinWithAppendOnlySourceAndNonEquiCondition() throws Exception { + String leftTableName = "left_table_nonequi_insert"; + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.merge-engine", "first_row")); + + List rows1 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 200L, 2, 20000L), + row(3, "v3", 300L, 3, 5000L)); + TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); + writeRows(conn, leftTablePath, rows1, false); + + String rightTableName = "right_table_nonequi_insert"; + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.merge-engine", "first_row")); + + List rows2 = + Arrays.asList( + row(1, "v1", 100L, 1, 8000L), + row(2, "v4", 200L, 2, 15000L), + row(3, "v5", 300L, 3, 3000L)); + TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); + writeRows(conn, rightTablePath, rows2, false); + + String sinkTableName = "sink_table_nonequi_insert"; + createSink( + sinkTableName, "a1 int, c1 bigint, d1 int, e1 bigint, a2 int, e2 bigint", "c1, d1"); + + // INSERT_ONLY sources with non-equi condition on non-upsert key fields (e1, e2) + // This should succeed because INSERT_ONLY mode allows any non-equi conditions + String sql = + String.format( + "INSERT INTO %s SELECT a1, c1, d1, e1, a2, e2 FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 AND e1 > e2", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains( + "DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2) AND (e1 > e2))]"); + + tEnv.executeSql(sql); + + CloseableIterator collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + // Rows where e1 > e2: + // Row 1: e1=10000 > e2=8000 ✓ + // Row 2: e1=20000 > e2=15000 ✓ + // Row 3: e1=5000 > e2=3000 ✓ + List expected = + Arrays.asList( + "+I[1, 100, 1, 10000, 1, 8000]", + "-U[1, 100, 1, 10000, 1, 8000]", + "+U[1, 100, 1, 10000, 1, 8000]", + "+I[2, 200, 2, 20000, 2, 15000]", + "-U[2, 200, 2, 20000, 2, 15000]", + "+U[2, 200, 2, 20000, 2, 15000]", + "+I[3, 300, 3, 5000, 3, 3000]", + "-U[3, 300, 3, 5000, 3, 3000]", + "+U[3, 300, 3, 5000, 3, 3000]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testDeltaJoinWithLookupCache() throws Exception { + String leftTableName = "left_table_cache"; + createSource( + leftTableName, + "a1 int, c1 bigint, d1 int", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + List rows1 = Collections.singletonList(row(1, 100L, 1)); + writeRows(conn, TablePath.of(DEFAULT_DB, leftTableName), rows1, false); + + String rightTableName = "right_table_cache"; + createSource( + rightTableName, + "a2 int, c2 bigint, d2 int", + "c2", + "c2", + null, + ImmutableMap.of( + "table.delete.behavior", + "IGNORE", + "lookup.cache", + "partial", + "lookup.partial-cache.max-rows", + "100")); + List rows2 = Collections.singletonList(row(1, 100L, 1)); + writeRows(conn, TablePath.of(DEFAULT_DB, rightTableName), rows2, false); + + String sinkTableName = "sink_table_cache"; + createSink(sinkTableName, "a1 int, c1 bigint, d1 int, a2 int", "c1, d1"); + + String sql = + String.format( + "INSERT INTO %s SELECT a1, c1, d1, a2 FROM %s AS T1 INNER JOIN %s AS T2 ON T1.c1 = T2.c2", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]"); + + tEnv.executeSql(sql); + + CloseableIterator collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + List expected = + Arrays.asList("+I[1, 100, 1, 1]", "-U[1, 100, 1, 1]", "+U[1, 100, 1, 1]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testDeltaJoinWithPartitionedTable() throws Exception { + String leftTableName = "left_table_partitioned"; + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, pt1 varchar", + "c1, d1, pt1", + "c1", + "pt1", + ImmutableMap.of("table.delete.behavior", "IGNORE")); + TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); + Iterator leftPartitionIterator = + waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), leftTablePath) + .values() + .iterator(); + // pick two partition to insert data + String leftPartition1 = leftPartitionIterator.next(); + String leftPartition2 = leftPartitionIterator.next(); + List rows1 = + Arrays.asList( + row(1, "v1", 100L, 1000, leftPartition1), + row(2, "v2", 200L, 2000, leftPartition1), + row(3, "v3", 100L, 3000, leftPartition2), + row(4, "v4", 400L, 4000, leftPartition2)); + writeRows(conn, leftTablePath, rows1, false); + + String rightTableName = "right_table_partitioned"; + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, pt2 varchar", + "c2, pt2", + "c2", + "pt2", + ImmutableMap.of("table.delete.behavior", "IGNORE")); + TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); + Iterator rightPartitionIterator = + waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), rightTablePath) + .values() + .iterator(); + // pick two partition to insert data + String rightPartition1 = rightPartitionIterator.next(); + String rightPartition2 = rightPartitionIterator.next(); + List rows2 = + Arrays.asList( + row(1, "v1", 100L, 1000, rightPartition1), + row(4, "v4", 400L, 3000, rightPartition2)); + writeRows(conn, rightTablePath, rows2, false); + + String sinkTableName = "sink_table"; + createSink( + sinkTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, pt1 varchar, b2 varchar", + "c1, d1, pt1"); + + String sql = + String.format( + "INSERT INTO %s SELECT a1, b1, c1, d1, pt1, b2 FROM %s AS T1 INNER JOIN %s AS T2 " + + "ON T1.c1 = T2.c2 AND T1.pt1 = T2.pt2", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (pt1 = pt2))]"); + + tEnv.executeSql(sql); + + CloseableIterator collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + List expected = + Arrays.asList( + String.format("+I[1, v1, 100, 1000, %s, v1]", leftPartition1), + String.format("-U[1, v1, 100, 1000, %s, v1]", leftPartition1), + String.format("+U[1, v1, 100, 1000, %s, v1]", leftPartition1), + String.format("+I[4, v4, 400, 4000, %s, v4]", leftPartition2), + String.format("-U[4, v4, 400, 4000, %s, v4]", leftPartition2), + String.format("+U[4, v4, 400, 4000, %s, v4]", leftPartition2)); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testDeltaJoinFailsWhenSourceHasDelete() { + String leftTableName = "left_table_delete_force"; + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", + "c1, d1", + "c1", + null, + null); + + String rightTableName = "right_table_delete_force"; + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c2, d2", + "c2", + null, + null); + + String sinkTableName = "sink_table_delete_force"; + createSink( + sinkTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c1, d1, c2, d2"); + + String sql = + String.format( + "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2", + sinkTableName, leftTableName, rightTableName); + + assertThatThrownBy(() -> tEnv.explainSql(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + } + + @Test + void testDeltaJoinWithJoinKeyExceedsPrimaryKey() { + String leftTableName = "left_table_exceed_pk"; + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String rightTableName = "right_table_exceed_pk"; + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String sinkTableName = "sink_table_exceed_pk"; + createSink( + sinkTableName, "a1 int, c1 bigint, d1 int, e1 bigint, a2 int, e2 bigint", "c1, d1"); + + String sql = + String.format( + "INSERT INTO %s SELECT a1, c1, d1, e1, a2, e2 FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 AND e1 = e2", + sinkTableName, leftTableName, rightTableName); + + assertThatThrownBy(() -> tEnv.explainSql(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + } + + @Test + void testDeltaJoinWithAppendOnlySourceAndJoinKeyExceedsPrimaryKey() throws Exception { + String leftTableName = "left_table_exceed_pk"; + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.merge-engine", "first_row")); + + List rows1 = + Arrays.asList(row(1, "v1", 100L, 1, 10000L), row(2, "v2", 200L, 2, 20000L)); + TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); + writeRows(conn, leftTablePath, rows1, false); + + String rightTableName = "right_table_exceed_pk"; + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.merge-engine", "first_row")); + + List rows2 = + Arrays.asList(row(1, "v1", 100L, 1, 10000L), row(2, "v3", 200L, 2, 99999L)); + TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); + writeRows(conn, rightTablePath, rows2, false); + + String sinkTableName = "sink_table_exceed_pk"; + createSink( + sinkTableName, "a1 int, c1 bigint, d1 int, e1 bigint, a2 int, e2 bigint", "c1, d1"); + + // Join on PK (c1, d1) + additional non-PK field (e1) + // This should succeed because join keys {c1, d1, e1} contain complete PK {c1, d1} + // The e1 = e2 condition is applied as a post-lookup equi-condition filter + String sql = + String.format( + "INSERT INTO %s SELECT a1, c1, d1, e1, a2, e2 FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 AND e1 = e2", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains( + "DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2) AND (e1 = e2))]"); + + tEnv.executeSql(sql); + + CloseableIterator collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + // Only first row should match (e1 = e2 = 10000) + // Second row filtered out (e1 = 20000 AND != e2 = 99999) + List expected = + Arrays.asList( + "+I[1, 100, 1, 10000, 1, 10000]", + "-U[1, 100, 1, 10000, 1, 10000]", + "+U[1, 100, 1, 10000, 1, 10000]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testDeltaJoinFailsWhenJoinKeyNotContainIndex() { + String leftTableName = "left_table_no_idx_force"; + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String rightTableName = "right_table_no_idx_force"; + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String sinkTableName = "sink_table_no_idx_force"; + createSink( + sinkTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "a1, a2"); + + String sql = + String.format( + "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON a1 = a2", + sinkTableName, leftTableName, rightTableName); + + assertThatThrownBy(() -> tEnv.explainSql(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + } + + @Test + void testDeltaJoinFailsWithOuterJoin() { + String leftTableName = "left_table_outer_fail"; + createSource( + leftTableName, + "a1 int, c1 bigint, d1 int", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String rightTableName = "right_table_outer_fail"; + createSource( + rightTableName, + "a2 int, c2 bigint, d2 int", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String sinkTableName = "sink_table_outer_fail"; + createSink(sinkTableName, "a1 int, c1 bigint, a2 int", "c1"); + + // Test LEFT JOIN + String leftJoinSql = + String.format( + "INSERT INTO %s SELECT a1, c1, a2 FROM %s LEFT JOIN %s ON c1 = c2 AND d1 = d2", + sinkTableName, leftTableName, rightTableName); + + assertThatThrownBy(() -> tEnv.explainSql(leftJoinSql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + + // Test RIGHT JOIN + String rightJoinSql = + String.format( + "INSERT INTO %s SELECT a1, c2, a2 FROM %s RIGHT JOIN %s ON c1 = c2 AND d1 = d2", + sinkTableName, leftTableName, rightTableName); + + assertThatThrownBy(() -> tEnv.explainSql(rightJoinSql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + + // Test FULL OUTER JOIN + String fullOuterJoinSql = + String.format( + "INSERT INTO %s SELECT a1, c1, a2 FROM %s FULL OUTER JOIN %s ON c1 = c2 AND d1 = d2", + sinkTableName, leftTableName, rightTableName); + + assertThatThrownBy(() -> tEnv.explainSql(fullOuterJoinSql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + } + + @Test + void testDeltaJoinFailsWithCascadeJoin() { + String table1 = "cascade_table1"; + createSource( + table1, + "a1 int, c1 bigint, d1 int", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String table2 = "cascade_table2"; + createSource( + table2, + "a2 int, c2 bigint, d2 int", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String table3 = "cascade_table3"; + createSource( + table3, + "a3 int, c3 bigint, d3 int", + "c3, d3", + "c3", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String sinkTableName = "cascade_sink"; + createSink( + sinkTableName, + "a1 int, c1 bigint, a2 int, c2 bigint, a3 int, c3 bigint", + "c1, c2, c3"); + + String sql = + String.format( + "INSERT INTO %s SELECT a1, c1, a2, c2, a3, c3 " + + "FROM %s " + + "INNER JOIN %s ON c1 = c2 AND d1 = d2 " + + "INNER JOIN %s ON c1 = c3 AND d1 = d3", + sinkTableName, table1, table2, table3); + + assertThatThrownBy(() -> tEnv.explainSql(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + } + + @Test + void testDeltaJoinFailsWithSinkMaterializer() { + // With CDC sources, when sink PK doesn't match upstream update key, + // Flink would insert SinkUpsertMaterializer which prevents delta join + String leftTableName = "left_table_materializer"; + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String rightTableName = "right_table_materializer"; + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + // Sink PK (a1, a2) doesn't match upstream update key (c1, d1, c2, d2) + // TODO: this depends on Fluss supports MVCC/point-in-time lookup to support change upsert + // keys + String sinkTableName = "sink_table_materializer"; + createSink( + sinkTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "a1, a2"); + + String sql = + String.format( + "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2", + sinkTableName, leftTableName, rightTableName); + + assertThatThrownBy(() -> tEnv.explainSql(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + } + + @Test + void testDeltaJoinFailsWithNonDeterministicFunctions() { + String leftTableName = "left_table_nondeterministic"; + createSource( + leftTableName, + "a1 int, c1 bigint, d1 int", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String rightTableName = "right_table_nondeterministic"; + createSource( + rightTableName, + "a2 int, c2 bigint, d2 int", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String sinkTableName = "sink_table_nondeterministic"; + // TODO this should be supported in Flink in future for non-deterministic functions before + // sinking + createSink(sinkTableName, "c1 bigint, d1 bigint, rand_val double", "c1"); + + String sql = + String.format( + "INSERT INTO %s SELECT c1, d1, RAND() FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2", + sinkTableName, leftTableName, rightTableName); + + assertThatThrownBy(() -> tEnv.explainSql(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + } +} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23TableSourceBatchITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23TableSourceBatchITCase.java new file mode 100644 index 0000000000..0df267e857 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23TableSourceBatchITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +/** IT case for batch source in Flink 2.3. */ +public class Flink23TableSourceBatchITCase extends FlinkTableSourceBatchITCase {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23TableSourceFailOverITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23TableSourceFailOverITCase.java new file mode 100644 index 0000000000..37e7b53f63 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23TableSourceFailOverITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +/** IT case for source failover and recovery in Flink 2.3. */ +public class Flink23TableSourceFailOverITCase extends FlinkTableSourceFailOverITCase {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23TableSourceITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23TableSourceITCase.java new file mode 100644 index 0000000000..bbe34f1530 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23TableSourceITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +/** IT case for {@link FlinkTableSource} in Flink 2.3. */ +public class Flink23TableSourceITCase extends FlinkTableSourceITCase {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/tiering/Flink23TieringITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/tiering/Flink23TieringITCase.java new file mode 100644 index 0000000000..91db3a6f2f --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/tiering/Flink23TieringITCase.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering; + +/** The IT case for tiering in Flink 2.3. */ +class Flink23TieringITCase extends TieringITCase {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/tiering/committer/Flink23TieringCommitOperatorTest.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/tiering/committer/Flink23TieringCommitOperatorTest.java new file mode 100644 index 0000000000..2f6d5d6772 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/tiering/committer/Flink23TieringCommitOperatorTest.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering.committer; + +/** + * UT for {@link TieringCommitOperator}. Test the compatibility of the `getAttemptNumber` method in + * flink 2.3. + */ +public class Flink23TieringCommitOperatorTest extends TieringCommitOperatorTest {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension b/fluss-flink/fluss-flink-2.3/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension new file mode 100644 index 0000000000..ca0e907f6d --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.fluss.testutils.common.TestLoggerExtension \ No newline at end of file diff --git a/fluss-flink/fluss-flink-2.3/src/test/resources/log4j2-test.properties b/fluss-flink/fluss-flink-2.3/src/test/resources/log4j2-test.properties new file mode 100644 index 0000000000..12b05f1867 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/resources/log4j2-test.properties @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level = OFF +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n + +# suppress the duplicated logger extension +logger.flink.name = org.apache.flink.util.TestLoggerExtension +logger.flink.level = OFF diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java index 23d8f0b2e9..e2726ced40 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java @@ -26,7 +26,6 @@ import org.apache.flink.configuration.DescribedEnum; import org.apache.flink.configuration.description.InlineElement; import org.apache.flink.table.catalog.CatalogMaterializedTable; -import org.apache.flink.table.catalog.IntervalFreshness; import java.time.Duration; import java.util.Arrays; @@ -255,12 +254,11 @@ public class FlinkConnectorOptions { .noDefaultValue() .withDescription( "The freshness interval of materialized table which is used to determine the physical refresh mode."); - public static final ConfigOption - MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT = - ConfigOptions.key("materialized-table.interval-freshness.time-unit") - .enumType(IntervalFreshness.TimeUnit.class) - .noDefaultValue() - .withDescription("The time unit of freshness interval."); + public static final ConfigOption MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT = + ConfigOptions.key("materialized-table.interval-freshness.time-unit") + .stringType() + .noDefaultValue() + .withDescription("The time unit of freshness interval."); public static final ConfigOption MATERIALIZED_TABLE_LOGICAL_REFRESH_MODE = ConfigOptions.key("materialized-table.logical-refresh-mode") diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/CatalogMaterializedTableAdapter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/CatalogMaterializedTableAdapter.java new file mode 100644 index 0000000000..7c04bd8ecd --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/CatalogMaterializedTableAdapter.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.adapter; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.IntervalFreshness; +import org.apache.flink.table.catalog.TableDistribution; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Map; + +/** An adapter for {@link CatalogMaterializedTable#newBuilder()} constructor for flink1.x. */ +public class CatalogMaterializedTableAdapter { + + private final CatalogMaterializedTable.Builder builder; + + private CatalogMaterializedTableAdapter() { + this.builder = CatalogMaterializedTable.newBuilder(); + } + + public static CatalogMaterializedTableAdapter newAdapter() { + return new CatalogMaterializedTableAdapter(); + } + + public CatalogMaterializedTableAdapter schema(Schema schema) { + this.builder.schema(schema); + return this; + } + + public CatalogMaterializedTableAdapter comment(@Nullable String comment) { + this.builder.comment(comment); + return this; + } + + public CatalogMaterializedTableAdapter partitionKeys(List partitionKeys) { + this.builder.partitionKeys(partitionKeys); + return this; + } + + public CatalogMaterializedTableAdapter options(Map options) { + this.builder.options(options); + return this; + } + + public CatalogMaterializedTableAdapter snapshot(@Nullable Long snapshot) { + this.builder.snapshot(snapshot); + return this; + } + + public CatalogMaterializedTableAdapter originalQuery(String originalQuery) { + return this; + } + + public CatalogMaterializedTableAdapter expandedQuery(String expandedQuery) { + return this; + } + + public CatalogMaterializedTableAdapter definitionQuery(String definitionQuery) { + this.builder.definitionQuery(definitionQuery); + return this; + } + + public CatalogMaterializedTableAdapter freshness(@Nullable IntervalFreshness freshness) { + this.builder.freshness(freshness); + return this; + } + + public CatalogMaterializedTableAdapter logicalRefreshMode( + CatalogMaterializedTable.LogicalRefreshMode logicalRefreshMode) { + this.builder.logicalRefreshMode(logicalRefreshMode); + return this; + } + + public CatalogMaterializedTableAdapter refreshMode( + @Nullable CatalogMaterializedTable.RefreshMode refreshMode) { + this.builder.refreshMode(refreshMode); + return this; + } + + public CatalogMaterializedTableAdapter refreshStatus( + CatalogMaterializedTable.RefreshStatus refreshStatus) { + this.builder.refreshStatus(refreshStatus); + return this; + } + + public CatalogMaterializedTableAdapter refreshHandlerDescription( + @Nullable String refreshHandlerDescription) { + this.builder.refreshHandlerDescription(refreshHandlerDescription); + return this; + } + + public CatalogMaterializedTableAdapter serializedRefreshHandler( + @Nullable byte[] serializedRefreshHandler) { + this.builder.serializedRefreshHandler(serializedRefreshHandler); + return this; + } + + public CatalogMaterializedTableAdapter distribution(@Nullable TableDistribution distribution) { + return this; + } + + public CatalogMaterializedTable build() { + return this.builder.build(); + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/IntervalFreshnessAdapter.java similarity index 51% rename from fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java rename to fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/IntervalFreshnessAdapter.java index 7a018bc027..e855c25280 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/IntervalFreshnessAdapter.java @@ -17,25 +17,29 @@ package org.apache.fluss.flink.adapter; -import org.apache.flink.table.catalog.CatalogMaterializedTable; import org.apache.flink.table.catalog.IntervalFreshness; -import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; -import org.apache.flink.table.catalog.ResolvedSchema; -/** - * Adapter for {@link ResolvedCatalogMaterializedTable} because the constructor is compatibility in - * flink 2.2. However, this constructor only used in test. - * - *

TODO: remove it until ... is - * fixed. - */ -public class ResolvedCatalogMaterializedTableAdapter { - - public static ResolvedCatalogMaterializedTable create( - CatalogMaterializedTable origin, - ResolvedSchema resolvedSchema, - CatalogMaterializedTable.RefreshMode refreshMode, - IntervalFreshness freshness) { - return new ResolvedCatalogMaterializedTable(origin, resolvedSchema); +/** An adapter for {@link IntervalFreshness} for below flink2.3. */ +public class IntervalFreshnessAdapter { + + public static TimeUnitAdapter timeUnit(String name) { + return new TimeUnitAdapter(IntervalFreshness.TimeUnit.valueOf(name)); + } + + public static IntervalFreshness of(String interval, TimeUnitAdapter timeUnit) { + return IntervalFreshness.of(interval, timeUnit.timeUnit); + } + + public static String getTimeUnitName(IntervalFreshness intervalFreshness) { + return intervalFreshness.getTimeUnit().name(); + } + + /** An adapter for {@link IntervalFreshness.TimeUnit} for below flink2.3. */ + public static class TimeUnitAdapter { + private final IntervalFreshness.TimeUnit timeUnit; + + private TimeUnitAdapter(IntervalFreshness.TimeUnit timeUnit) { + this.timeUnit = timeUnit; + } } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java index 01f9a9817e..1434a96aeb 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java @@ -21,7 +21,9 @@ import org.apache.fluss.config.ConfigOption; import org.apache.fluss.config.MemorySize; import org.apache.fluss.config.Password; +import org.apache.fluss.flink.adapter.CatalogMaterializedTableAdapter; import org.apache.fluss.flink.adapter.CatalogTableAdapter; +import org.apache.fluss.flink.adapter.IntervalFreshnessAdapter; import org.apache.fluss.flink.catalog.FlinkCatalogFactory; import org.apache.fluss.metadata.AggFunction; import org.apache.fluss.metadata.DatabaseDescriptor; @@ -562,7 +564,7 @@ private static void serializeMaterializedTableToCustomProperties( customProperties.put(MATERIALIZED_TABLE_INTERVAL_FRESHNESS.key(), freshness.getInterval()); customProperties.put( MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT.key(), - freshness.getTimeUnit().name()); + IntervalFreshnessAdapter.getTimeUnitName(freshness)); // Serialize refresh configuration customProperties.put( MATERIALIZED_TABLE_LOGICAL_REFRESH_MODE.key(), mt.getLogicalRefreshMode().name()); @@ -624,8 +626,9 @@ private static CatalogMaterializedTable toFlinkMaterializedTable( checkNotNull(refreshStatusStr, "Materialized table refresh status is required but missing"); // Parse validated values - IntervalFreshness.TimeUnit timeUnit = IntervalFreshness.TimeUnit.valueOf(timeUnitStr); - IntervalFreshness freshness = IntervalFreshness.of(intervalFreshness, timeUnit); + IntervalFreshnessAdapter.TimeUnitAdapter timeUnit = + IntervalFreshnessAdapter.timeUnit(timeUnitStr); + IntervalFreshness freshness = IntervalFreshnessAdapter.of(intervalFreshness, timeUnit); CatalogMaterializedTable.LogicalRefreshMode logicalRefreshMode = CatalogMaterializedTable.LogicalRefreshMode.valueOf(logicalRefreshModeStr); CatalogMaterializedTable.RefreshMode refreshMode = @@ -645,12 +648,14 @@ private static CatalogMaterializedTable toFlinkMaterializedTable( ? null : decodeBase64ToBytes(refreshHandlerStringBytes); - CatalogMaterializedTable.Builder builder = CatalogMaterializedTable.newBuilder(); + CatalogMaterializedTableAdapter builder = CatalogMaterializedTableAdapter.newAdapter(); builder.schema(schema) .comment(comment) .partitionKeys(partitionKeys) .options(excludeByPrefix(options, MATERIALIZED_TABLE_PREFIX)) .definitionQuery(definitionQuery) + .originalQuery(definitionQuery) + .expandedQuery(definitionQuery) .freshness(freshness) .logicalRefreshMode(logicalRefreshMode) .refreshMode(refreshMode) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java index 8354fbf20c..3f98846dea 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java @@ -22,7 +22,7 @@ import org.apache.fluss.exception.IllegalConfigurationException; import org.apache.fluss.exception.InvalidPartitionException; import org.apache.fluss.exception.InvalidTableException; -import org.apache.fluss.flink.adapter.ResolvedCatalogMaterializedTableAdapter; +import org.apache.fluss.flink.adapter.CatalogMaterializedTableAdapter; import org.apache.fluss.flink.lake.LakeFlinkCatalog; import org.apache.fluss.flink.utils.FlinkConversionsTest; import org.apache.fluss.server.testutils.FlussClusterExtension; @@ -41,6 +41,7 @@ import org.apache.flink.table.catalog.GenericInMemoryCatalog; import org.apache.flink.table.catalog.IntervalFreshness; import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.TableChange; @@ -144,13 +145,16 @@ private CatalogMaterializedTable newCatalogMaterializedTable( CatalogMaterializedTable.RefreshMode refreshMode, Map options) { CatalogMaterializedTable origin = - CatalogMaterializedTable.newBuilder() + CatalogMaterializedTableAdapter.newAdapter() .schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build()) .comment("test comment") .options(options) .partitionKeys(Collections.emptyList()) .definitionQuery("select first, second, third from t") - .freshness(IntervalFreshness.of("5", IntervalFreshness.TimeUnit.SECOND)) + // TODO The configuration added in Flink 2.3 currently uses the same SQL. + .originalQuery("select first, second, third from t") + .expandedQuery("select first, second, third from t") + .freshness(IntervalFreshness.ofSecond("5")) .logicalRefreshMode( refreshMode == CatalogMaterializedTable.RefreshMode.CONTINUOUS ? CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS @@ -158,11 +162,16 @@ private CatalogMaterializedTable newCatalogMaterializedTable( .refreshMode(refreshMode) .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING) .build(); - return ResolvedCatalogMaterializedTableAdapter.create( - origin, - resolvedSchema, - refreshMode, - IntervalFreshness.of("5", IntervalFreshness.TimeUnit.SECOND)); + return createResolvedCatalogMaterializedTable( + origin, resolvedSchema, refreshMode, IntervalFreshness.ofSecond("5")); + } + + protected ResolvedCatalogMaterializedTable createResolvedCatalogMaterializedTable( + CatalogMaterializedTable origin, + ResolvedSchema resolvedSchema, + CatalogMaterializedTable.RefreshMode refreshMode, + IntervalFreshness intervalFreshness) { + return new ResolvedCatalogMaterializedTable(origin, resolvedSchema); } protected FlinkCatalog initCatalog( diff --git a/fluss-flink/pom.xml b/fluss-flink/pom.xml index 4f65374352..e640473949 100644 --- a/fluss-flink/pom.xml +++ b/fluss-flink/pom.xml @@ -37,6 +37,7 @@ fluss-flink-1.19 fluss-flink-1.18 fluss-flink-2.2 + fluss-flink-2.3 fluss-flink-tiering From 859d4b748dad024f4896f6cca4edd5b7d36e20be Mon Sep 17 00:00:00 2001 From: Pei Yu <125331682@qq.com> Date: Fri, 26 Jun 2026 12:52:07 +0800 Subject: [PATCH 2/3] [flink] Work around Flink 2.3 ON CONFLICT validation in Delta Join ITCase Flink 2.3 introduces ExecutionConfigOptions.TABLE_EXEC_SINK_REQUIRE_ON_CONFLICT (table.exec.sink.require-on-conflict), defaulting to true. In FlinkChangelogModeInferenceProgram, this triggers a ValidationException ("upsert key differs from primary key") before the StreamPhysicalDeltaJoinForceValidator runs, so the Delta Join ITCases can no longer reach the original "doesn't support to do delta join optimization" error path. Disable the option in Flink23DeltaJoinITCase#beforeEach so the existing assertions remain valid. Production-side impact (real Fluss users hitting this on multi-table joins / group-by + insert) is left to community discussion. Signed-off-by: Pei Yu <125331682@qq.com> --- .../apache/fluss/flink/source/Flink23DeltaJoinITCase.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23DeltaJoinITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23DeltaJoinITCase.java index 4d4b524753..8cc4bd0dd6 100644 --- a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23DeltaJoinITCase.java +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23DeltaJoinITCase.java @@ -82,6 +82,13 @@ public void beforeEach() { .set( OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY, OptimizerConfigOptions.DeltaJoinStrategy.FORCE); + // Flink 2.3 introduces ExecutionConfigOptions.TABLE_EXEC_SINK_REQUIRE_ON_CONFLICT + // (default true), which makes FlinkChangelogModeInferenceProgram throw a + // "upsert key differs from primary key" ValidationException before the + // StreamPhysicalDeltaJoinForceValidator runs. Disable it here so the existing + // delta-join "doesn't support to do delta join optimization" error remains + // reachable from these tests. + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SINK_REQUIRE_ON_CONFLICT, false); } @AfterEach From 018112e5113c04bc1d494596f11a89ecb3760ee6 Mon Sep 17 00:00:00 2001 From: Pei Yu <125331682@qq.com> Date: Fri, 26 Jun 2026 15:26:30 +0800 Subject: [PATCH 3/3] [flink] Work around Flink 2.3 ON CONFLICT validation in Table Sink ITCase Flink 2.3 introduces ExecutionConfigOptions.TABLE_EXEC_SINK_REQUIRE_ON_CONFLICT (table.exec.sink.require-on-conflict), defaulting to true. In FlinkChangelogModeInferenceProgram, this triggers a ValidationException ("upsert key differs from primary key") before the partial-update handling in FlinkTableSink#getSinkRuntimeProvider runs, so the partial upsert ITCases (testPartialUpsert and testPartialUpsertDuringAddColumn) in FlinkTableSinkITCase can no longer reach the Fluss sink layer. Disable the option in Flink23TableSinkITCase#beforeEach so the existing partial-upsert assertions remain valid. Signed-off-by: Pei Yu <125331682@qq.com> --- .../flink/sink/Flink23TableSinkITCase.java | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/sink/Flink23TableSinkITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/sink/Flink23TableSinkITCase.java index f7b18355dd..f50fb4b2d7 100644 --- a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/sink/Flink23TableSinkITCase.java +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/sink/Flink23TableSinkITCase.java @@ -17,5 +17,23 @@ package org.apache.fluss.flink.sink; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.junit.jupiter.api.BeforeEach; + /** IT case for {@link FlinkTableSink} in Flink 2.3. */ -public class Flink23TableSinkITCase extends FlinkTableSinkITCase {} +public class Flink23TableSinkITCase extends FlinkTableSinkITCase { + + /** + * Flink 2.3 introduces {@link ExecutionConfigOptions#TABLE_EXEC_SINK_REQUIRE_ON_CONFLICT} + * (default {@code true}). In {@code FlinkChangelogModeInferenceProgram} this triggers a "upsert + * key differs from primary key" ValidationException for partial upserts such as {@code INSERT + * INTO pk_table(a, b) VALUES ...} where the query carries no upsert key. The parent class + * {@link FlinkTableSinkITCase#testPartialUpsert()} (and its siblings) rely on Fluss's own + * partial-update handling at the sink layer, so disable the option here to keep those tests + * reachable. + */ + @BeforeEach + void beforeEach() { + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SINK_REQUIRE_ON_CONFLICT, false); + } +}