diff --git a/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/CatalogMaterializedTableAdapter.java b/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/CatalogMaterializedTableAdapter.java new file mode 100644 index 0000000000..f17f7f907b --- /dev/null +++ b/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/CatalogMaterializedTableAdapter.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.adapter; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.IntervalFreshness; +import org.apache.flink.table.catalog.TableDistribution; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Map; + +/** An adapter for {@link CatalogMaterializedTable#newBuilder()} constructor for flink2.2. */ +public class CatalogMaterializedTableAdapter { + + private final CatalogMaterializedTable.Builder builder; + + private CatalogMaterializedTableAdapter() { + this.builder = CatalogMaterializedTable.newBuilder(); + } + + public static CatalogMaterializedTableAdapter newAdapter() { + return new CatalogMaterializedTableAdapter(); + } + + public CatalogMaterializedTableAdapter schema(Schema schema) { + this.builder.schema(schema); + return this; + } + + public CatalogMaterializedTableAdapter comment(@Nullable String comment) { + this.builder.comment(comment); + return this; + } + + public CatalogMaterializedTableAdapter partitionKeys(List partitionKeys) { + this.builder.partitionKeys(partitionKeys); + return this; + } + + public CatalogMaterializedTableAdapter options(Map options) { + this.builder.options(options); + return this; + } + + public CatalogMaterializedTableAdapter snapshot(@Nullable Long snapshot) { + this.builder.snapshot(snapshot); + return this; + } + + public CatalogMaterializedTableAdapter originalQuery(String originalQuery) { + return this; + } + + public CatalogMaterializedTableAdapter expandedQuery(String expandedQuery) { + return this; + } + + public CatalogMaterializedTableAdapter definitionQuery(String definitionQuery) { + this.builder.definitionQuery(definitionQuery); + return this; + } + + public CatalogMaterializedTableAdapter freshness(@Nullable IntervalFreshness freshness) { + this.builder.freshness(freshness); + return this; + } + + public CatalogMaterializedTableAdapter logicalRefreshMode( + CatalogMaterializedTable.LogicalRefreshMode logicalRefreshMode) { + this.builder.logicalRefreshMode(logicalRefreshMode); + return this; + } + + public CatalogMaterializedTableAdapter refreshMode( + @Nullable CatalogMaterializedTable.RefreshMode refreshMode) { + this.builder.refreshMode(refreshMode); + return this; + } + + public CatalogMaterializedTableAdapter refreshStatus( + CatalogMaterializedTable.RefreshStatus refreshStatus) { + this.builder.refreshStatus(refreshStatus); + return this; + } + + public CatalogMaterializedTableAdapter refreshHandlerDescription( + @Nullable String refreshHandlerDescription) { + this.builder.refreshHandlerDescription(refreshHandlerDescription); + return this; + } + + public CatalogMaterializedTableAdapter serializedRefreshHandler( + @Nullable byte[] serializedRefreshHandler) { + this.builder.serializedRefreshHandler(serializedRefreshHandler); + return this; + } + + public CatalogMaterializedTableAdapter distribution(@Nullable TableDistribution distribution) { + this.builder.distribution(distribution); + return this; + } + + public CatalogMaterializedTable build() { + return this.builder.build(); + } +} diff --git a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogTest.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogTest.java index bff2b77d60..0c49cc7135 100644 --- a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogTest.java +++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogTest.java @@ -18,8 +18,11 @@ package org.apache.fluss.flink.catalog; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.CatalogMaterializedTable; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.DefaultIndex; +import org.apache.flink.table.catalog.IntervalFreshness; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.UniqueConstraint; @@ -41,4 +44,14 @@ protected ResolvedSchema createSchema() { DefaultIndex.newIndex( "INDEX_first_third", Arrays.asList("first", "third")))); } + + @Override + protected ResolvedCatalogMaterializedTable createResolvedCatalogMaterializedTable( + CatalogMaterializedTable origin, + ResolvedSchema resolvedSchema, + CatalogMaterializedTable.RefreshMode refreshMode, + IntervalFreshness intervalFreshness) { + return new ResolvedCatalogMaterializedTable( + origin, resolvedSchema, refreshMode, intervalFreshness); + } } diff --git a/fluss-flink/fluss-flink-2.3/pom.xml b/fluss-flink/fluss-flink-2.3/pom.xml new file mode 100644 index 0000000000..81215aa956 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/pom.xml @@ -0,0 +1,268 @@ + + + + 4.0.0 + + org.apache.fluss + fluss-flink + 1.0-SNAPSHOT + + + fluss-flink-2.3 + Apache Fluss (Incubating) : Engine Flink : 2.3 + + 2.3 + 2.3.0 + + + + + + org.apache.fluss + fluss-client + ${project.version} + + + + org.apache.fluss + fluss-flink-common + ${project.version} + + + * + * + + + + + + + org.apache.flink + flink-core + ${flink.minor.version} + provided + + + + org.apache.flink + flink-table-common + ${flink.minor.version} + provided + + + + org.apache.flink + flink-streaming-java + ${flink.minor.version} + provided + + + + org.apache.flink + flink-runtime + ${flink.minor.version} + provided + + + + + org.apache.fluss + fluss-flink-common + ${project.version} + test + test-jar + + + + org.apache.fluss + fluss-server + ${project.version} + test + + + + org.apache.flink + flink-table-test-utils + ${flink.minor.version} + test + + + + org.apache.flink + flink-connector-base + ${flink.minor.version} + test + + + + org.apache.fluss + fluss-server + ${project.version} + test + test-jar + + + + org.apache.fluss + fluss-rpc + test + test-jar + ${project.version} + + + + + org.apache.curator + curator-test + ${curator.version} + test + + + + org.apache.flink + flink-table-common + ${flink.minor.version} + test + test-jar + + + + org.apache.flink + flink-connector-test-utils + ${flink.minor.version} + test + + + + + org.apache.fluss + fluss-test-utils + + + + org.apache.fluss + fluss-common + ${project.version} + test + test-jar + + + + org.apache.flink + flink-clients + ${flink.minor.version} + test + + + + org.apache.flink + flink-sql-gateway + ${flink.minor.version} + test + + + + org.apache.flink + flink-sql-gateway + ${flink.minor.version} + test-jar + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + ${skip.on.java8} + + ${skip.on.java8} + + + + + org.apache.maven.plugins + maven-surefire-plugin + 3.0.0-M5 + + + + integration-tests + integration-test + false + + test + + + ${skip.on.java8} + + **/*ITCase.* + + + 1 + + + + + default-test + test + false + + test + + + ${skip.on.java8} + + **/*ITCase.* + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-fluss + package + + shade + + + + + org.apache.fluss:fluss-flink-common + org.apache.fluss:fluss-client + + + + + + + + + + diff --git a/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/CatalogMaterializedTableAdapter.java b/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/CatalogMaterializedTableAdapter.java new file mode 100644 index 0000000000..cead311075 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/CatalogMaterializedTableAdapter.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.adapter; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.IntervalFreshness; +import org.apache.flink.table.catalog.TableDistribution; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Map; + +/** An adapter for {@link CatalogMaterializedTable#newBuilder()} constructor for flink2.3. */ +public class CatalogMaterializedTableAdapter { + + private final CatalogMaterializedTable.Builder builder; + + private CatalogMaterializedTableAdapter() { + this.builder = CatalogMaterializedTable.newBuilder(); + } + + public static CatalogMaterializedTableAdapter newAdapter() { + return new CatalogMaterializedTableAdapter(); + } + + public CatalogMaterializedTableAdapter schema(Schema schema) { + this.builder.schema(schema); + return this; + } + + public CatalogMaterializedTableAdapter comment(@Nullable String comment) { + this.builder.comment(comment); + return this; + } + + public CatalogMaterializedTableAdapter partitionKeys(List partitionKeys) { + this.builder.partitionKeys(partitionKeys); + return this; + } + + public CatalogMaterializedTableAdapter options(Map options) { + this.builder.options(options); + return this; + } + + public CatalogMaterializedTableAdapter snapshot(@Nullable Long snapshot) { + this.builder.snapshot(snapshot); + return this; + } + + public CatalogMaterializedTableAdapter originalQuery(String originalQuery) { + this.builder.originalQuery(originalQuery); + return this; + } + + public CatalogMaterializedTableAdapter expandedQuery(String expandedQuery) { + this.builder.expandedQuery(expandedQuery); + return this; + } + + @Deprecated + public CatalogMaterializedTableAdapter definitionQuery(String definitionQuery) { + this.builder.definitionQuery(definitionQuery); + return this; + } + + public CatalogMaterializedTableAdapter freshness(@Nullable IntervalFreshness freshness) { + this.builder.freshness(freshness); + return this; + } + + public CatalogMaterializedTableAdapter logicalRefreshMode( + CatalogMaterializedTable.LogicalRefreshMode logicalRefreshMode) { + this.builder.logicalRefreshMode(logicalRefreshMode); + return this; + } + + public CatalogMaterializedTableAdapter refreshMode( + @Nullable CatalogMaterializedTable.RefreshMode refreshMode) { + this.builder.refreshMode(refreshMode); + return this; + } + + public CatalogMaterializedTableAdapter refreshStatus( + CatalogMaterializedTable.RefreshStatus refreshStatus) { + this.builder.refreshStatus(refreshStatus); + return this; + } + + public CatalogMaterializedTableAdapter refreshHandlerDescription( + @Nullable String refreshHandlerDescription) { + this.builder.refreshHandlerDescription(refreshHandlerDescription); + return this; + } + + public CatalogMaterializedTableAdapter serializedRefreshHandler( + @Nullable byte[] serializedRefreshHandler) { + this.builder.serializedRefreshHandler(serializedRefreshHandler); + return this; + } + + public CatalogMaterializedTableAdapter distribution(@Nullable TableDistribution distribution) { + this.builder.distribution(distribution); + return this; + } + + public CatalogMaterializedTable build() { + return this.builder.build(); + } +} diff --git a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java b/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/IntervalFreshnessAdapter.java similarity index 50% rename from fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java rename to fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/IntervalFreshnessAdapter.java index f868a84db6..992047ae19 100644 --- a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java +++ b/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/IntervalFreshnessAdapter.java @@ -17,25 +17,30 @@ package org.apache.fluss.flink.adapter; -import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.Interval; import org.apache.flink.table.catalog.IntervalFreshness; -import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; -import org.apache.flink.table.catalog.ResolvedSchema; -/** - * Adapter for {@link ResolvedCatalogMaterializedTable} because the constructor is compatibility in - * flink 2.2. However, this constructor only used in test. - * - *

TODO: remove it until ... is - * fixed. - */ -public class ResolvedCatalogMaterializedTableAdapter { - - public static ResolvedCatalogMaterializedTable create( - CatalogMaterializedTable origin, - ResolvedSchema resolvedSchema, - CatalogMaterializedTable.RefreshMode refreshMode, - IntervalFreshness freshness) { - return new ResolvedCatalogMaterializedTable(origin, resolvedSchema, refreshMode, freshness); +/** An adapter for {@link IntervalFreshness} for flink2.3. */ +public class IntervalFreshnessAdapter { + + public static TimeUnitAdapter timeUnit(String name) { + return new TimeUnitAdapter(Interval.TimeUnit.valueOf(name)); + } + + public static IntervalFreshness of(String interval, TimeUnitAdapter timeUnit) { + return IntervalFreshness.of(interval, timeUnit.timeUnit); + } + + public static String getTimeUnitName(IntervalFreshness intervalFreshness) { + return intervalFreshness.getTimeUnit().name(); + } + + /** An adapter for {@link Interval.TimeUnit} for flink2.3. */ + public static class TimeUnitAdapter { + private final Interval.TimeUnit timeUnit; + + private TimeUnitAdapter(Interval.TimeUnit timeUnit) { + this.timeUnit = timeUnit; + } } } diff --git a/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java b/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java new file mode 100644 index 0000000000..076dcb86c8 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.adapter; + +import org.apache.flink.util.MultipleParameterTool; + +import java.util.Map; + +/** + * An adapter for Flink {@link MultipleParameterTool} class. The {@link MultipleParameterTool} is + * moved to a new package since Flink 2.x, so this adapter helps to bridge compatibility for + * different Flink versions. + * + *

TODO: remove this class when no longer support all the Flink 1.x series. + */ +public class MultipleParameterToolAdapter { + + private MultipleParameterToolAdapter() {} + + private MultipleParameterTool multipleParameterTool; + + public static MultipleParameterToolAdapter fromArgs(String[] args) { + MultipleParameterToolAdapter adapter = new MultipleParameterToolAdapter(); + adapter.multipleParameterTool = MultipleParameterTool.fromArgs(args); + return adapter; + } + + public Map toMap() { + return this.multipleParameterTool.toMap(); + } +} diff --git a/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/SchemaAdapter.java b/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/SchemaAdapter.java new file mode 100644 index 0000000000..8f6492bd16 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/SchemaAdapter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.adapter; + +import org.apache.flink.table.api.Schema; + +import java.util.List; + +/** + * An adapter for the schema with Index. + * + *

TODO: remove this class when no longer support all the Flink 1.x series. + */ +public class SchemaAdapter { + private SchemaAdapter() {} + + public static Schema withIndex(Schema unresolvedSchema, List> indexes) { + Schema.Builder newSchemaBuilder = Schema.newBuilder().fromSchema(unresolvedSchema); + for (List index : indexes) { + newSchemaBuilder.index(index); + } + return newSchemaBuilder.build(); + } + + public static boolean supportIndex() { + return true; + } +} diff --git a/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java b/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java new file mode 100644 index 0000000000..8d6e512501 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.adapter; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; + +import java.io.IOException; + +/** + * Flink sink adapter which hide the different version of createWriter method. + * + *

TODO: remove this class when no longer support all the Flink 1.x series. + */ +public abstract class SinkAdapter implements Sink { + + @Override + public SinkWriter createWriter(WriterInitContext writerInitContext) throws IOException { + return createWriter( + writerInitContext.getMailboxExecutor(), writerInitContext.metricGroup()); + } + + protected abstract SinkWriter createWriter( + MailboxExecutor mailboxExecutor, SinkWriterMetricGroup metricGroup); +} diff --git a/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/TypeInformationAdapter.java b/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/TypeInformationAdapter.java new file mode 100644 index 0000000000..ad4f611458 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/main/java/org/apache/fluss/flink/adapter/TypeInformationAdapter.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.adapter; + +import org.apache.flink.api.common.serialization.SerializerConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +/** + * Flink 2.3 variant of the type information adapter. + * + *

In Flink 2.3, {@link TypeInformation} declares {@code createSerializer(SerializerConfig)} as + * the abstract method. This adapter implements that method and delegates to {@link + * #createSerializer(TypeSerializerCreator)} with a creator that forwards the config, so subclasses + * (e.g. in fluss-flink-common) can obtain the inner serializer and wrap it without touching + * SerializerConfig here. + * + *

When building for Flink 2.3, this class is used instead of the common adapter so that the + * correct API is implemented for this Flink version. See fluss-flink-common's + * TypeInformationAdapter for the version that implements both createSerializer(SerializerConfig) + * and createSerializer(ExecutionConfig). + * + * @param the type described by this type information + */ +public abstract class TypeInformationAdapter extends TypeInformation { + + @Override + public TypeSerializer createSerializer(SerializerConfig config) { + return createSerializer(typeInfo -> typeInfo.createSerializer(config)); + } + + /** + * Creates the type serializer using the given creator. The creator captures the config from the + * framework; subclasses call {@code creator.createSerializer(innerTypeInfo)} to obtain the + * inner serializer and then wrap or return it. + */ + protected abstract TypeSerializer createSerializer( + TypeInformationAdapter.TypeSerializerCreator typeSerializerCreator); + + /** + * Creator that, given a TypeInformation, returns its serializer for the current config. Passed + * by the adapter so that config is forwarded to the underlying TypeInformation. + */ + @FunctionalInterface + public interface TypeSerializerCreator { + TypeSerializer createSerializer(TypeInformation typeInfo); + } +} diff --git a/fluss-flink/fluss-flink-2.3/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/fluss-flink/fluss-flink-2.3/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000000..d5aca2d53b --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.fluss.flink.catalog.FlinkCatalogFactory diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/adapter/Flink23MultipleParameterToolTest.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/adapter/Flink23MultipleParameterToolTest.java new file mode 100644 index 0000000000..3cdd2687f6 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/adapter/Flink23MultipleParameterToolTest.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.adapter; + +/** Test for {@link MultipleParameterToolAdapter} in flink 2.3. */ +public class Flink23MultipleParameterToolTest extends FlinkMultipleParameterToolTest {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/catalog/Flink23CatalogITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/catalog/Flink23CatalogITCase.java new file mode 100644 index 0000000000..aa29869c9d --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/catalog/Flink23CatalogITCase.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.catalog; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** IT case for catalog in Flink 2.3. */ +public class Flink23CatalogITCase extends FlinkCatalogITCase { + + @Test + void testGetTableWithIndex() throws Exception { + String tableName = "table_with_pk_only"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a int, " + + " b varchar, " + + " c bigint, " + + " primary key (a, b) NOT ENFORCED" + + ") with ( " + + " 'connector' = 'fluss' " + + ")", + tableName)); + CatalogTable table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName)); + Schema expectedSchema = + Schema.newBuilder() + .column("a", DataTypes.INT().notNull()) + .column("b", DataTypes.STRING().notNull()) + .column("c", DataTypes.BIGINT()) + .primaryKey("a", "b") + .index("a", "b") + .build(); + assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema); + + tableName = "table_with_prefix_bucket_key"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a int, " + + " b varchar, " + + " c bigint, " + + " primary key (a, b) NOT ENFORCED" + + ") with ( " + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'a'" + + ")", + tableName)); + + table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName)); + expectedSchema = + Schema.newBuilder() + .column("a", DataTypes.INT().notNull()) + .column("b", DataTypes.STRING().notNull()) + .column("c", DataTypes.BIGINT()) + .primaryKey("a", "b") + .index("a", "b") + .index("a") + .build(); + assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema); + + tableName = "table_with_bucket_key_is_not_prefix_pk"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a int, " + + " b varchar, " + + " c bigint, " + + " primary key (a, b) NOT ENFORCED" + + ") with ( " + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'b'" + + ")", + tableName)); + + table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName)); + expectedSchema = + Schema.newBuilder() + .column("a", DataTypes.INT().notNull()) + .column("b", DataTypes.STRING().notNull()) + .column("c", DataTypes.BIGINT()) + .primaryKey("a", "b") + .index("a", "b") + .build(); + assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema); + + tableName = "table_with_partition_1"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a int, " + + " b varchar, " + + " c bigint, " + + " dt varchar, " + + " primary key (a, b, dt) NOT ENFORCED " + + ") " + + " partitioned by (dt) " + + " with ( " + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'a'" + + ")", + tableName)); + + table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName)); + expectedSchema = + Schema.newBuilder() + .column("a", DataTypes.INT().notNull()) + .column("b", DataTypes.STRING().notNull()) + .column("c", DataTypes.BIGINT()) + .column("dt", DataTypes.STRING().notNull()) + .primaryKey("a", "b", "dt") + .index("a", "b", "dt") + .index("a", "dt") + .build(); + assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema); + + tableName = "table_with_partition_2"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a int, " + + " b varchar, " + + " c bigint, " + + " dt varchar, " + + " primary key (dt, a, b) NOT ENFORCED " + + ") " + + " partitioned by (dt) " + + " with ( " + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'a'" + + ")", + tableName)); + + table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName)); + expectedSchema = + Schema.newBuilder() + .column("a", DataTypes.INT().notNull()) + .column("b", DataTypes.STRING().notNull()) + .column("c", DataTypes.BIGINT()) + .column("dt", DataTypes.STRING().notNull()) + .primaryKey("dt", "a", "b") + .index("dt", "a", "b") + .index("a", "dt") + .build(); + assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema); + } + + @Override + protected void addDefaultIndexKey(Schema.Builder schemaBuilder) { + super.addDefaultIndexKey(schemaBuilder); + + Schema currentSchema = schemaBuilder.build(); + currentSchema.getPrimaryKey().ifPresent(pk -> schemaBuilder.index(pk.getColumnNames())); + } +} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/catalog/Flink23CatalogTest.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/catalog/Flink23CatalogTest.java new file mode 100644 index 0000000000..8d803f1aa6 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/catalog/Flink23CatalogTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.catalog; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.DefaultIndex; +import org.apache.flink.table.catalog.IntervalFreshness; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.StartMode; +import org.apache.flink.table.catalog.UniqueConstraint; + +import java.util.Arrays; +import java.util.Collections; + +/** Test for {@link FlinkCatalog}. */ +public class Flink23CatalogTest extends FlinkCatalogTest { + + protected ResolvedSchema createSchema() { + return new ResolvedSchema( + Arrays.asList( + Column.physical("first", DataTypes.STRING().notNull()), + Column.physical("second", DataTypes.INT()), + Column.physical("third", DataTypes.STRING().notNull())), + Collections.emptyList(), + UniqueConstraint.primaryKey("PK_first_third", Arrays.asList("first", "third")), + Collections.singletonList( + DefaultIndex.newIndex( + "INDEX_first_third", Arrays.asList("first", "third")))); + } + + @Override + protected ResolvedCatalogMaterializedTable createResolvedCatalogMaterializedTable( + CatalogMaterializedTable origin, + ResolvedSchema resolvedSchema, + CatalogMaterializedTable.RefreshMode refreshMode, + IntervalFreshness intervalFreshness) { + return new ResolvedCatalogMaterializedTable( + origin, + resolvedSchema, + refreshMode, + intervalFreshness, + StartMode.of(StartMode.StartModeKind.FROM_BEGINNING)); + } +} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/catalog/Flink23MaterializedTableITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/catalog/Flink23MaterializedTableITCase.java new file mode 100644 index 0000000000..ff62e8dc6d --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/catalog/Flink23MaterializedTableITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.catalog; + +/** IT case for materialized table in Flink 2.3. */ +public class Flink23MaterializedTableITCase extends MaterializedTableITCase {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/metrics/Flink23MetricsITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/metrics/Flink23MetricsITCase.java new file mode 100644 index 0000000000..26c8fcd856 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/metrics/Flink23MetricsITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.metrics; + +/** IT case for metrics in Flink 2.3. */ +public class Flink23MetricsITCase extends FlinkMetricsITCase {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/procedure/Flink23ProcedureITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/procedure/Flink23ProcedureITCase.java new file mode 100644 index 0000000000..581f689ff2 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/procedure/Flink23ProcedureITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.procedure; + +/** IT case for procedure in Flink 2.3. */ +public class Flink23ProcedureITCase extends FlinkProcedureITCase {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/security/acl/Flink23AuthorizationITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/security/acl/Flink23AuthorizationITCase.java new file mode 100644 index 0000000000..8541a0d1ab --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/security/acl/Flink23AuthorizationITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.security.acl; + +/** IT case for authorization in Flink 2.3. */ +public class Flink23AuthorizationITCase extends FlinkAuthorizationITCase {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/sink/Flink23ComplexTypeITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/sink/Flink23ComplexTypeITCase.java new file mode 100644 index 0000000000..7521277b4f --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/sink/Flink23ComplexTypeITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink; + +/** Integration tests for Array type support in Flink 2.3. */ +public class Flink23ComplexTypeITCase extends FlinkComplexTypeITCase {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/sink/Flink23TableSinkITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/sink/Flink23TableSinkITCase.java new file mode 100644 index 0000000000..f50fb4b2d7 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/sink/Flink23TableSinkITCase.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink; + +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.junit.jupiter.api.BeforeEach; + +/** IT case for {@link FlinkTableSink} in Flink 2.3. */ +public class Flink23TableSinkITCase extends FlinkTableSinkITCase { + + /** + * Flink 2.3 introduces {@link ExecutionConfigOptions#TABLE_EXEC_SINK_REQUIRE_ON_CONFLICT} + * (default {@code true}). In {@code FlinkChangelogModeInferenceProgram} this triggers a "upsert + * key differs from primary key" ValidationException for partial upserts such as {@code INSERT + * INTO pk_table(a, b) VALUES ...} where the query carries no upsert key. The parent class + * {@link FlinkTableSinkITCase#testPartialUpsert()} (and its siblings) rely on Fluss's own + * partial-update handling at the sink layer, so disable the option here to keep those tests + * reachable. + */ + @BeforeEach + void beforeEach() { + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SINK_REQUIRE_ON_CONFLICT, false); + } +} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/sink/Flink23UndoRecoveryITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/sink/Flink23UndoRecoveryITCase.java new file mode 100644 index 0000000000..7b9b447050 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/sink/Flink23UndoRecoveryITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink; + +/** IT case for Undo Recovery functionality in Flink 2.3. */ +public class Flink23UndoRecoveryITCase extends UndoRecoveryITCase {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23BinlogVirtualTableITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23BinlogVirtualTableITCase.java new file mode 100644 index 0000000000..e17e1affe8 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23BinlogVirtualTableITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +/** IT case for {@link BinlogVirtualTableITCase} in Flink 2.3. */ +public class Flink23BinlogVirtualTableITCase extends BinlogVirtualTableITCase {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23ChangelogVirtualTableITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23ChangelogVirtualTableITCase.java new file mode 100644 index 0000000000..ee27c922e2 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23ChangelogVirtualTableITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +/** IT case for {@link ChangelogVirtualTableITCase} in Flink 2.3. */ +public class Flink23ChangelogVirtualTableITCase extends ChangelogVirtualTableITCase {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23DeltaJoinITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23DeltaJoinITCase.java new file mode 100644 index 0000000000..8cc4bd0dd6 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23DeltaJoinITCase.java @@ -0,0 +1,1090 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.flink.utils.FlinkTestBase; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.shaded.guava32.com.google.common.collect.ImmutableMap; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS; +import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder; +import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE; +import static org.apache.fluss.testutils.DataTestUtils.row; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** IT case for Delta Join optimization in Flink 2.3. */ +public class Flink23DeltaJoinITCase extends FlinkTestBase { + + private static final String CATALOG_NAME = "test_catalog"; + + private StreamTableEnvironment tEnv; + + @BeforeEach + public void beforeEach() { + bootstrapServers = conn.getConfiguration().get(ConfigOptions.BOOTSTRAP_SERVERS).get(0); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode()); + + tEnv.executeSql( + String.format( + "create catalog %s with ('type' = 'fluss', '%s' = '%s')", + CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers)); + tEnv.useCatalog(CATALOG_NAME); + tEnv.executeSql(String.format("create database if not exists `%s`", DEFAULT_DB)); + tEnv.useDatabase(DEFAULT_DB); + + // start two jobs for this test: one for DML involving the delta join, and the other for DQL + // to query the results of the sink table + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); + // Set FORCE strategy for delta join + tEnv.getConfig() + .set( + OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY, + OptimizerConfigOptions.DeltaJoinStrategy.FORCE); + // Flink 2.3 introduces ExecutionConfigOptions.TABLE_EXEC_SINK_REQUIRE_ON_CONFLICT + // (default true), which makes FlinkChangelogModeInferenceProgram throw a + // "upsert key differs from primary key" ValidationException before the + // StreamPhysicalDeltaJoinForceValidator runs. Disable it here so the existing + // delta-join "doesn't support to do delta join optimization" error remains + // reachable from these tests. + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SINK_REQUIRE_ON_CONFLICT, false); + } + + @AfterEach + void after() { + tEnv.useDatabase(BUILTIN_DATABASE); + tEnv.executeSql(String.format("drop database `%s` cascade", DEFAULT_DB)); + } + + /** + * Creates a source table with specified schema and options. + * + * @param tableName the name of the table + * @param columns column definitions (e.g., "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint") + * @param primaryKey primary key columns (e.g., "c1, d1") + * @param bucketKey bucket key column (e.g., "c1") + * @param extraOptions additional WITH options (e.g., "lookup.cache" = "partial"), or null + */ + private void createSource( + String tableName, + String columns, + String primaryKey, + String bucketKey, + @Nullable String partitionKey, + @Nullable Map extraOptions) { + Map withOptions = new HashMap<>(); + if (extraOptions != null) { + withOptions.putAll(extraOptions); + } + withOptions.put("connector", "fluss"); + withOptions.put("bucket.key", bucketKey); + StringBuilder ddlBuilder = new StringBuilder(); + ddlBuilder.append( + String.format( + "create table %s ( %s, primary key (%s) NOT ENFORCED )", + tableName, columns, primaryKey)); + if (partitionKey != null) { + ddlBuilder.append(String.format(" partitioned by (%s)", partitionKey)); + withOptions.put("table.auto-partition.enabled", "true"); + withOptions.put("table.auto-partition.time-unit", "year"); + } + ddlBuilder.append( + String.format( + " with (%s)", + withOptions.entrySet().stream() + .map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue())) + .collect(Collectors.joining(", ")))); + + tEnv.executeSql(ddlBuilder.toString()); + } + + /** + * Creates a sink table with specified columns. + * + * @param tableName the name of the table + * @param columns the column definitions (e.g., "a1 int, c1 bigint, a2 int") + * @param primaryKey the primary key columns (e.g., "c1, d1, c2, d2") + */ + private void createSink(String tableName, String columns, String primaryKey) { + tEnv.executeSql( + String.format( + "create table %s (" + + " %s, " + + " primary key (%s) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss'" + + ")", + tableName, columns, primaryKey)); + } + + @Test + void testDeltaJoin() throws Exception { + // disable cache to get stable results with updating + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED, false); + String leftTableName = "left_table"; + String rightTableName = "right_table"; + String sinkTableName = "sink_table"; + + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + createSink( + sinkTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c1, d1, c2, d2"); + + List rows1 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 100L, 2, 20000L), + row(3, "v3", 300L, 3, 30000L), + row(4, "v4", 400L, 4, 40000L), + // update + row(5, "v5", 100L, 1, 50000L)); + TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); + writeRows(conn, leftTablePath, rows1, false); + // wait for the first snapshot to finish to get the stable result + FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(leftTablePath); + + List rows2 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 200L, 2, 20000L), + row(3, "v3", 330L, 4, 30000L), + row(4, "v4", 500L, 4, 50000L), + // update + row(6, "v6", 100L, 1, 60000L)); + TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); + writeRows(conn, rightTablePath, rows2, false); + // wait for the first snapshot to finish to get the stable result + FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(rightTablePath); + + String sql = + String.format( + "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]"); + + tEnv.executeSql(sql); + + CloseableIterator collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + List expected = + Arrays.asList( + "+I[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]", + "-U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]", + "+U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]", + "+I[2, v2, 100, 2, 20000, 6, v6, 100, 1, 60000]", + "-U[2, v2, 100, 2, 20000, 6, v6, 100, 1, 60000]", + "+U[2, v2, 100, 2, 20000, 6, v6, 100, 1, 60000]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testDeltaJoinOnPrimaryKey() throws Exception { + // disable cache to get stable results with updating + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED, false); + String leftTableName = "left_table"; + String rightTableName = "right_table"; + String sinkTableName = "sink_table"; + + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + createSink( + sinkTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c1, d1, c2, d2"); + + List rows1 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 200L, 2, 20000L), + row(3, "v3", 300L, 3, 30000L), + row(4, "v4", 400L, 4, 40000L), + // update + row(5, "v5", 100L, 1, 50000L)); + TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); + writeRows(conn, leftTablePath, rows1, false); + // wait for the first snapshot to finish to get the stable result + FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(leftTablePath); + + List rows2 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 200L, 2, 20000L), + row(3, "v3", 300L, 4, 30000L), + row(4, "v4", 500L, 4, 50000L), + // update + row(6, "v6", 100L, 1, 60000L)); + TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); + writeRows(conn, rightTablePath, rows2, false); + // wait for the first snapshot to finish to get the stable result + FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(rightTablePath); + + String sql = + String.format( + "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]"); + + tEnv.executeSql(sql); + + CloseableIterator collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + List expected = + Arrays.asList( + "+I[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]", + "-U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]", + "+U[5, v5, 100, 1, 50000, 6, v6, 100, 1, 60000]", + "+I[2, v2, 200, 2, 20000, 2, v2, 200, 2, 20000]", + "-U[2, v2, 200, 2, 20000, 2, v2, 200, 2, 20000]", + "+U[2, v2, 200, 2, 20000, 2, v2, 200, 2, 20000]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testDeltaJoinWithCalc() throws Exception { + String leftTableName = "left_table_proj"; + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + List rows1 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 200L, 2, 20000L), + row(3, "v1", 300L, 3, 30000L)); + TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); + writeRows(conn, leftTablePath, rows1, false); + + String rightTableName = "right_table_proj"; + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c2", + "c2", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + List rows2 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 200L, 2, 20000L), + row(3, "v3", 300L, 4, 30000L)); + TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); + writeRows(conn, rightTablePath, rows2, false); + + String sinkTableName = "sink_table_proj"; + createSink(sinkTableName, "a1 int, c1 bigint, d1 int, a2 int", "c1, d1"); + + String sql = + String.format( + "INSERT INTO %s SELECT a1, c1, d1, a2 FROM (" + + " SELECT * FROM %s WHERE d1 > 1" + + ") INNER JOIN (" + + " SELECT * FROM %s WHERE c2 < 300" + + ") ON c1 = c2", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]"); + + tEnv.executeSql(sql); + + CloseableIterator collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + List expected = + Arrays.asList("+I[2, 200, 2, 2]", "-U[2, 200, 2, 2]", "+U[2, 200, 2, 2]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testDeltaJoinWithAppendOnlySourceAndCalc() throws Exception { + String leftTableName = "left_table_proj"; + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.merge-engine", "first_row")); + + List rows1 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 200L, 2, 20000L), + row(3, "v1", 300L, 3, 30000L)); + TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); + writeRows(conn, leftTablePath, rows1, false); + + String rightTableName = "right_table_proj"; + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.merge-engine", "first_row")); + + List rows2 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v3", 200L, 2, 20000L), + row(3, "v4", 400L, 4, 30000L)); + TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); + writeRows(conn, rightTablePath, rows2, false); + + String sinkTableName = "sink_table_proj"; + createSink(sinkTableName, "a1 int, c1 bigint, a2 int", "c1"); + + String sql = + String.format( + "INSERT INTO %s SELECT a1, c1, a2 FROM %s INNER JOIN %s ON c1 = c2 WHERE a1 > 1", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]"); + + tEnv.executeSql(sql); + + CloseableIterator collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + List expected = Arrays.asList("+I[2, 200, 2]", "-U[2, 200, 2]", "+U[2, 200, 2]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testDeltaJoinFailsWhenFilterOnNonUpsertKeys() { + String leftTableName = "left_table_force_fail"; + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String rightTableName = "right_table_force_fail"; + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String sinkTableName = "sink_table_force_fail"; + createSink( + sinkTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c1, d1, c2, d2"); + + // Filter on e1 > e2, where e1 and e2 are NOT part of the upsert key + // TODO we can add a UpsertFilterOperator that can convert the un-match-filter UPSERT record + // into DELETE record. + String sql = + String.format( + "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 WHERE e1 > e2", + sinkTableName, leftTableName, rightTableName); + + assertThatThrownBy(() -> tEnv.explainSql(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + + // Non-equiv-cond on e1 > e2, where e1 and e2 are NOT part of the upsert key + String sql2 = + String.format( + "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 AND e1 > e2", + sinkTableName, leftTableName, rightTableName); + + assertThatThrownBy(() -> tEnv.explainSql(sql2)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + } + + @Test + void testDeltaJoinWithNonEquiConditionOnUpsertKeys() throws Exception { + String leftTableName = "left_table_nonequi_upsert"; + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", + "c1, d1, e1", + "c1, d1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + List rows1 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 200L, 2, 20001L), + row(3, "v3", 300L, 3, 30000L), + // Add row with same PK (100, 1) to generate UPDATE in CDC mode + // This row is filtered out by (e2 / 100) <> c2, so doesn't affect join + // result + row(4, "v1_updated", 100L, 1, 10000L)); + TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); + writeRows(conn, leftTablePath, rows1, false); + + String rightTableName = "right_table_nonequi_upsert"; + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + List rows2 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v4", 200L, 2, 20000L), + row(3, "v5", 300L, 4, 40000L), + // Add row with same PK (100, 1) to generate UPDATE in CDC mode + // This row is filtered out by (e2 / 100) <> c2, so doesn't affect join + // result + row(5, "v1_updated", 100L, 1, 10000L)); + TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); + writeRows(conn, rightTablePath, rows2, false); + + String sinkTableName = "sink_table_nonequi_upsert"; + createSink(sinkTableName, "a1 int, c1 bigint, d1 int, e1 bigint, a2 int", "c1, d1, e1"); + + String sql = + String.format( + "INSERT INTO %s SELECT a1, c1, d1, e1, a2 FROM %s INNER JOIN %s " + + "ON c1 = c2 AND d1 = d2 AND e1 <> (c2 * 100)", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains( + "DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2) AND (e1 <> (c2 * 100)))]"); + + tEnv.executeSql(sql); + + CloseableIterator collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + List expected = + Arrays.asList( + "+I[2, 200, 2, 20001, 2]", + "-U[2, 200, 2, 20001, 2]", + "+U[2, 200, 2, 20001, 2]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testDeltaJoinWithAppendOnlySourceAndNonEquiCondition() throws Exception { + String leftTableName = "left_table_nonequi_insert"; + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.merge-engine", "first_row")); + + List rows1 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 200L, 2, 20000L), + row(3, "v3", 300L, 3, 5000L)); + TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); + writeRows(conn, leftTablePath, rows1, false); + + String rightTableName = "right_table_nonequi_insert"; + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.merge-engine", "first_row")); + + List rows2 = + Arrays.asList( + row(1, "v1", 100L, 1, 8000L), + row(2, "v4", 200L, 2, 15000L), + row(3, "v5", 300L, 3, 3000L)); + TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); + writeRows(conn, rightTablePath, rows2, false); + + String sinkTableName = "sink_table_nonequi_insert"; + createSink( + sinkTableName, "a1 int, c1 bigint, d1 int, e1 bigint, a2 int, e2 bigint", "c1, d1"); + + // INSERT_ONLY sources with non-equi condition on non-upsert key fields (e1, e2) + // This should succeed because INSERT_ONLY mode allows any non-equi conditions + String sql = + String.format( + "INSERT INTO %s SELECT a1, c1, d1, e1, a2, e2 FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 AND e1 > e2", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains( + "DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2) AND (e1 > e2))]"); + + tEnv.executeSql(sql); + + CloseableIterator collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + // Rows where e1 > e2: + // Row 1: e1=10000 > e2=8000 ✓ + // Row 2: e1=20000 > e2=15000 ✓ + // Row 3: e1=5000 > e2=3000 ✓ + List expected = + Arrays.asList( + "+I[1, 100, 1, 10000, 1, 8000]", + "-U[1, 100, 1, 10000, 1, 8000]", + "+U[1, 100, 1, 10000, 1, 8000]", + "+I[2, 200, 2, 20000, 2, 15000]", + "-U[2, 200, 2, 20000, 2, 15000]", + "+U[2, 200, 2, 20000, 2, 15000]", + "+I[3, 300, 3, 5000, 3, 3000]", + "-U[3, 300, 3, 5000, 3, 3000]", + "+U[3, 300, 3, 5000, 3, 3000]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testDeltaJoinWithLookupCache() throws Exception { + String leftTableName = "left_table_cache"; + createSource( + leftTableName, + "a1 int, c1 bigint, d1 int", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + List rows1 = Collections.singletonList(row(1, 100L, 1)); + writeRows(conn, TablePath.of(DEFAULT_DB, leftTableName), rows1, false); + + String rightTableName = "right_table_cache"; + createSource( + rightTableName, + "a2 int, c2 bigint, d2 int", + "c2", + "c2", + null, + ImmutableMap.of( + "table.delete.behavior", + "IGNORE", + "lookup.cache", + "partial", + "lookup.partial-cache.max-rows", + "100")); + List rows2 = Collections.singletonList(row(1, 100L, 1)); + writeRows(conn, TablePath.of(DEFAULT_DB, rightTableName), rows2, false); + + String sinkTableName = "sink_table_cache"; + createSink(sinkTableName, "a1 int, c1 bigint, d1 int, a2 int", "c1, d1"); + + String sql = + String.format( + "INSERT INTO %s SELECT a1, c1, d1, a2 FROM %s AS T1 INNER JOIN %s AS T2 ON T1.c1 = T2.c2", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains("DeltaJoin(joinType=[InnerJoin], where=[(c1 = c2)]"); + + tEnv.executeSql(sql); + + CloseableIterator collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + List expected = + Arrays.asList("+I[1, 100, 1, 1]", "-U[1, 100, 1, 1]", "+U[1, 100, 1, 1]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testDeltaJoinWithPartitionedTable() throws Exception { + String leftTableName = "left_table_partitioned"; + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, pt1 varchar", + "c1, d1, pt1", + "c1", + "pt1", + ImmutableMap.of("table.delete.behavior", "IGNORE")); + TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); + Iterator leftPartitionIterator = + waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), leftTablePath) + .values() + .iterator(); + // pick two partition to insert data + String leftPartition1 = leftPartitionIterator.next(); + String leftPartition2 = leftPartitionIterator.next(); + List rows1 = + Arrays.asList( + row(1, "v1", 100L, 1000, leftPartition1), + row(2, "v2", 200L, 2000, leftPartition1), + row(3, "v3", 100L, 3000, leftPartition2), + row(4, "v4", 400L, 4000, leftPartition2)); + writeRows(conn, leftTablePath, rows1, false); + + String rightTableName = "right_table_partitioned"; + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, pt2 varchar", + "c2, pt2", + "c2", + "pt2", + ImmutableMap.of("table.delete.behavior", "IGNORE")); + TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); + Iterator rightPartitionIterator = + waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), rightTablePath) + .values() + .iterator(); + // pick two partition to insert data + String rightPartition1 = rightPartitionIterator.next(); + String rightPartition2 = rightPartitionIterator.next(); + List rows2 = + Arrays.asList( + row(1, "v1", 100L, 1000, rightPartition1), + row(4, "v4", 400L, 3000, rightPartition2)); + writeRows(conn, rightTablePath, rows2, false); + + String sinkTableName = "sink_table"; + createSink( + sinkTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, pt1 varchar, b2 varchar", + "c1, d1, pt1"); + + String sql = + String.format( + "INSERT INTO %s SELECT a1, b1, c1, d1, pt1, b2 FROM %s AS T1 INNER JOIN %s AS T2 " + + "ON T1.c1 = T2.c2 AND T1.pt1 = T2.pt2", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (pt1 = pt2))]"); + + tEnv.executeSql(sql); + + CloseableIterator collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + List expected = + Arrays.asList( + String.format("+I[1, v1, 100, 1000, %s, v1]", leftPartition1), + String.format("-U[1, v1, 100, 1000, %s, v1]", leftPartition1), + String.format("+U[1, v1, 100, 1000, %s, v1]", leftPartition1), + String.format("+I[4, v4, 400, 4000, %s, v4]", leftPartition2), + String.format("-U[4, v4, 400, 4000, %s, v4]", leftPartition2), + String.format("+U[4, v4, 400, 4000, %s, v4]", leftPartition2)); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testDeltaJoinFailsWhenSourceHasDelete() { + String leftTableName = "left_table_delete_force"; + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", + "c1, d1", + "c1", + null, + null); + + String rightTableName = "right_table_delete_force"; + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c2, d2", + "c2", + null, + null); + + String sinkTableName = "sink_table_delete_force"; + createSink( + sinkTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c1, d1, c2, d2"); + + String sql = + String.format( + "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2", + sinkTableName, leftTableName, rightTableName); + + assertThatThrownBy(() -> tEnv.explainSql(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + } + + @Test + void testDeltaJoinWithJoinKeyExceedsPrimaryKey() { + String leftTableName = "left_table_exceed_pk"; + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String rightTableName = "right_table_exceed_pk"; + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String sinkTableName = "sink_table_exceed_pk"; + createSink( + sinkTableName, "a1 int, c1 bigint, d1 int, e1 bigint, a2 int, e2 bigint", "c1, d1"); + + String sql = + String.format( + "INSERT INTO %s SELECT a1, c1, d1, e1, a2, e2 FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 AND e1 = e2", + sinkTableName, leftTableName, rightTableName); + + assertThatThrownBy(() -> tEnv.explainSql(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + } + + @Test + void testDeltaJoinWithAppendOnlySourceAndJoinKeyExceedsPrimaryKey() throws Exception { + String leftTableName = "left_table_exceed_pk"; + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.merge-engine", "first_row")); + + List rows1 = + Arrays.asList(row(1, "v1", 100L, 1, 10000L), row(2, "v2", 200L, 2, 20000L)); + TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); + writeRows(conn, leftTablePath, rows1, false); + + String rightTableName = "right_table_exceed_pk"; + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.merge-engine", "first_row")); + + List rows2 = + Arrays.asList(row(1, "v1", 100L, 1, 10000L), row(2, "v3", 200L, 2, 99999L)); + TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); + writeRows(conn, rightTablePath, rows2, false); + + String sinkTableName = "sink_table_exceed_pk"; + createSink( + sinkTableName, "a1 int, c1 bigint, d1 int, e1 bigint, a2 int, e2 bigint", "c1, d1"); + + // Join on PK (c1, d1) + additional non-PK field (e1) + // This should succeed because join keys {c1, d1, e1} contain complete PK {c1, d1} + // The e1 = e2 condition is applied as a post-lookup equi-condition filter + String sql = + String.format( + "INSERT INTO %s SELECT a1, c1, d1, e1, a2, e2 FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 AND e1 = e2", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains( + "DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2) AND (e1 = e2))]"); + + tEnv.executeSql(sql); + + CloseableIterator collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + // Only first row should match (e1 = e2 = 10000) + // Second row filtered out (e1 = 20000 AND != e2 = 99999) + List expected = + Arrays.asList( + "+I[1, 100, 1, 10000, 1, 10000]", + "-U[1, 100, 1, 10000, 1, 10000]", + "+U[1, 100, 1, 10000, 1, 10000]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testDeltaJoinFailsWhenJoinKeyNotContainIndex() { + String leftTableName = "left_table_no_idx_force"; + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String rightTableName = "right_table_no_idx_force"; + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String sinkTableName = "sink_table_no_idx_force"; + createSink( + sinkTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "a1, a2"); + + String sql = + String.format( + "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON a1 = a2", + sinkTableName, leftTableName, rightTableName); + + assertThatThrownBy(() -> tEnv.explainSql(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + } + + @Test + void testDeltaJoinFailsWithOuterJoin() { + String leftTableName = "left_table_outer_fail"; + createSource( + leftTableName, + "a1 int, c1 bigint, d1 int", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String rightTableName = "right_table_outer_fail"; + createSource( + rightTableName, + "a2 int, c2 bigint, d2 int", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String sinkTableName = "sink_table_outer_fail"; + createSink(sinkTableName, "a1 int, c1 bigint, a2 int", "c1"); + + // Test LEFT JOIN + String leftJoinSql = + String.format( + "INSERT INTO %s SELECT a1, c1, a2 FROM %s LEFT JOIN %s ON c1 = c2 AND d1 = d2", + sinkTableName, leftTableName, rightTableName); + + assertThatThrownBy(() -> tEnv.explainSql(leftJoinSql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + + // Test RIGHT JOIN + String rightJoinSql = + String.format( + "INSERT INTO %s SELECT a1, c2, a2 FROM %s RIGHT JOIN %s ON c1 = c2 AND d1 = d2", + sinkTableName, leftTableName, rightTableName); + + assertThatThrownBy(() -> tEnv.explainSql(rightJoinSql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + + // Test FULL OUTER JOIN + String fullOuterJoinSql = + String.format( + "INSERT INTO %s SELECT a1, c1, a2 FROM %s FULL OUTER JOIN %s ON c1 = c2 AND d1 = d2", + sinkTableName, leftTableName, rightTableName); + + assertThatThrownBy(() -> tEnv.explainSql(fullOuterJoinSql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + } + + @Test + void testDeltaJoinFailsWithCascadeJoin() { + String table1 = "cascade_table1"; + createSource( + table1, + "a1 int, c1 bigint, d1 int", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String table2 = "cascade_table2"; + createSource( + table2, + "a2 int, c2 bigint, d2 int", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String table3 = "cascade_table3"; + createSource( + table3, + "a3 int, c3 bigint, d3 int", + "c3, d3", + "c3", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String sinkTableName = "cascade_sink"; + createSink( + sinkTableName, + "a1 int, c1 bigint, a2 int, c2 bigint, a3 int, c3 bigint", + "c1, c2, c3"); + + String sql = + String.format( + "INSERT INTO %s SELECT a1, c1, a2, c2, a3, c3 " + + "FROM %s " + + "INNER JOIN %s ON c1 = c2 AND d1 = d2 " + + "INNER JOIN %s ON c1 = c3 AND d1 = d3", + sinkTableName, table1, table2, table3); + + assertThatThrownBy(() -> tEnv.explainSql(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + } + + @Test + void testDeltaJoinFailsWithSinkMaterializer() { + // With CDC sources, when sink PK doesn't match upstream update key, + // Flink would insert SinkUpsertMaterializer which prevents delta join + String leftTableName = "left_table_materializer"; + createSource( + leftTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String rightTableName = "right_table_materializer"; + createSource( + rightTableName, + "a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + // Sink PK (a1, a2) doesn't match upstream update key (c1, d1, c2, d2) + // TODO: this depends on Fluss supports MVCC/point-in-time lookup to support change upsert + // keys + String sinkTableName = "sink_table_materializer"; + createSink( + sinkTableName, + "a1 int, b1 varchar, c1 bigint, d1 int, e1 bigint, a2 int, b2 varchar, c2 bigint, d2 int, e2 bigint", + "a1, a2"); + + String sql = + String.format( + "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2", + sinkTableName, leftTableName, rightTableName); + + assertThatThrownBy(() -> tEnv.explainSql(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + } + + @Test + void testDeltaJoinFailsWithNonDeterministicFunctions() { + String leftTableName = "left_table_nondeterministic"; + createSource( + leftTableName, + "a1 int, c1 bigint, d1 int", + "c1, d1", + "c1", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String rightTableName = "right_table_nondeterministic"; + createSource( + rightTableName, + "a2 int, c2 bigint, d2 int", + "c2, d2", + "c2", + null, + ImmutableMap.of("table.delete.behavior", "IGNORE")); + + String sinkTableName = "sink_table_nondeterministic"; + // TODO this should be supported in Flink in future for non-deterministic functions before + // sinking + createSink(sinkTableName, "c1 bigint, d1 bigint, rand_val double", "c1"); + + String sql = + String.format( + "INSERT INTO %s SELECT c1, d1, RAND() FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2", + sinkTableName, leftTableName, rightTableName); + + assertThatThrownBy(() -> tEnv.explainSql(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("doesn't support to do delta join optimization"); + } +} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23TableSourceBatchITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23TableSourceBatchITCase.java new file mode 100644 index 0000000000..0df267e857 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23TableSourceBatchITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +/** IT case for batch source in Flink 2.3. */ +public class Flink23TableSourceBatchITCase extends FlinkTableSourceBatchITCase {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23TableSourceFailOverITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23TableSourceFailOverITCase.java new file mode 100644 index 0000000000..37e7b53f63 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23TableSourceFailOverITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +/** IT case for source failover and recovery in Flink 2.3. */ +public class Flink23TableSourceFailOverITCase extends FlinkTableSourceFailOverITCase {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23TableSourceITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23TableSourceITCase.java new file mode 100644 index 0000000000..bbe34f1530 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/source/Flink23TableSourceITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +/** IT case for {@link FlinkTableSource} in Flink 2.3. */ +public class Flink23TableSourceITCase extends FlinkTableSourceITCase {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/tiering/Flink23TieringITCase.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/tiering/Flink23TieringITCase.java new file mode 100644 index 0000000000..91db3a6f2f --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/tiering/Flink23TieringITCase.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering; + +/** The IT case for tiering in Flink 2.3. */ +class Flink23TieringITCase extends TieringITCase {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/tiering/committer/Flink23TieringCommitOperatorTest.java b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/tiering/committer/Flink23TieringCommitOperatorTest.java new file mode 100644 index 0000000000..2f6d5d6772 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/java/org/apache/fluss/flink/tiering/committer/Flink23TieringCommitOperatorTest.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering.committer; + +/** + * UT for {@link TieringCommitOperator}. Test the compatibility of the `getAttemptNumber` method in + * flink 2.3. + */ +public class Flink23TieringCommitOperatorTest extends TieringCommitOperatorTest {} diff --git a/fluss-flink/fluss-flink-2.3/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension b/fluss-flink/fluss-flink-2.3/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension new file mode 100644 index 0000000000..ca0e907f6d --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.fluss.testutils.common.TestLoggerExtension \ No newline at end of file diff --git a/fluss-flink/fluss-flink-2.3/src/test/resources/log4j2-test.properties b/fluss-flink/fluss-flink-2.3/src/test/resources/log4j2-test.properties new file mode 100644 index 0000000000..12b05f1867 --- /dev/null +++ b/fluss-flink/fluss-flink-2.3/src/test/resources/log4j2-test.properties @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level = OFF +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n + +# suppress the duplicated logger extension +logger.flink.name = org.apache.flink.util.TestLoggerExtension +logger.flink.level = OFF diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java index 23d8f0b2e9..e2726ced40 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java @@ -26,7 +26,6 @@ import org.apache.flink.configuration.DescribedEnum; import org.apache.flink.configuration.description.InlineElement; import org.apache.flink.table.catalog.CatalogMaterializedTable; -import org.apache.flink.table.catalog.IntervalFreshness; import java.time.Duration; import java.util.Arrays; @@ -255,12 +254,11 @@ public class FlinkConnectorOptions { .noDefaultValue() .withDescription( "The freshness interval of materialized table which is used to determine the physical refresh mode."); - public static final ConfigOption - MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT = - ConfigOptions.key("materialized-table.interval-freshness.time-unit") - .enumType(IntervalFreshness.TimeUnit.class) - .noDefaultValue() - .withDescription("The time unit of freshness interval."); + public static final ConfigOption MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT = + ConfigOptions.key("materialized-table.interval-freshness.time-unit") + .stringType() + .noDefaultValue() + .withDescription("The time unit of freshness interval."); public static final ConfigOption MATERIALIZED_TABLE_LOGICAL_REFRESH_MODE = ConfigOptions.key("materialized-table.logical-refresh-mode") diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/CatalogMaterializedTableAdapter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/CatalogMaterializedTableAdapter.java new file mode 100644 index 0000000000..7c04bd8ecd --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/CatalogMaterializedTableAdapter.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.adapter; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.IntervalFreshness; +import org.apache.flink.table.catalog.TableDistribution; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Map; + +/** An adapter for {@link CatalogMaterializedTable#newBuilder()} constructor for flink1.x. */ +public class CatalogMaterializedTableAdapter { + + private final CatalogMaterializedTable.Builder builder; + + private CatalogMaterializedTableAdapter() { + this.builder = CatalogMaterializedTable.newBuilder(); + } + + public static CatalogMaterializedTableAdapter newAdapter() { + return new CatalogMaterializedTableAdapter(); + } + + public CatalogMaterializedTableAdapter schema(Schema schema) { + this.builder.schema(schema); + return this; + } + + public CatalogMaterializedTableAdapter comment(@Nullable String comment) { + this.builder.comment(comment); + return this; + } + + public CatalogMaterializedTableAdapter partitionKeys(List partitionKeys) { + this.builder.partitionKeys(partitionKeys); + return this; + } + + public CatalogMaterializedTableAdapter options(Map options) { + this.builder.options(options); + return this; + } + + public CatalogMaterializedTableAdapter snapshot(@Nullable Long snapshot) { + this.builder.snapshot(snapshot); + return this; + } + + public CatalogMaterializedTableAdapter originalQuery(String originalQuery) { + return this; + } + + public CatalogMaterializedTableAdapter expandedQuery(String expandedQuery) { + return this; + } + + public CatalogMaterializedTableAdapter definitionQuery(String definitionQuery) { + this.builder.definitionQuery(definitionQuery); + return this; + } + + public CatalogMaterializedTableAdapter freshness(@Nullable IntervalFreshness freshness) { + this.builder.freshness(freshness); + return this; + } + + public CatalogMaterializedTableAdapter logicalRefreshMode( + CatalogMaterializedTable.LogicalRefreshMode logicalRefreshMode) { + this.builder.logicalRefreshMode(logicalRefreshMode); + return this; + } + + public CatalogMaterializedTableAdapter refreshMode( + @Nullable CatalogMaterializedTable.RefreshMode refreshMode) { + this.builder.refreshMode(refreshMode); + return this; + } + + public CatalogMaterializedTableAdapter refreshStatus( + CatalogMaterializedTable.RefreshStatus refreshStatus) { + this.builder.refreshStatus(refreshStatus); + return this; + } + + public CatalogMaterializedTableAdapter refreshHandlerDescription( + @Nullable String refreshHandlerDescription) { + this.builder.refreshHandlerDescription(refreshHandlerDescription); + return this; + } + + public CatalogMaterializedTableAdapter serializedRefreshHandler( + @Nullable byte[] serializedRefreshHandler) { + this.builder.serializedRefreshHandler(serializedRefreshHandler); + return this; + } + + public CatalogMaterializedTableAdapter distribution(@Nullable TableDistribution distribution) { + return this; + } + + public CatalogMaterializedTable build() { + return this.builder.build(); + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/IntervalFreshnessAdapter.java similarity index 51% rename from fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java rename to fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/IntervalFreshnessAdapter.java index 7a018bc027..e855c25280 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/IntervalFreshnessAdapter.java @@ -17,25 +17,29 @@ package org.apache.fluss.flink.adapter; -import org.apache.flink.table.catalog.CatalogMaterializedTable; import org.apache.flink.table.catalog.IntervalFreshness; -import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; -import org.apache.flink.table.catalog.ResolvedSchema; -/** - * Adapter for {@link ResolvedCatalogMaterializedTable} because the constructor is compatibility in - * flink 2.2. However, this constructor only used in test. - * - *

TODO: remove it until ... is - * fixed. - */ -public class ResolvedCatalogMaterializedTableAdapter { - - public static ResolvedCatalogMaterializedTable create( - CatalogMaterializedTable origin, - ResolvedSchema resolvedSchema, - CatalogMaterializedTable.RefreshMode refreshMode, - IntervalFreshness freshness) { - return new ResolvedCatalogMaterializedTable(origin, resolvedSchema); +/** An adapter for {@link IntervalFreshness} for below flink2.3. */ +public class IntervalFreshnessAdapter { + + public static TimeUnitAdapter timeUnit(String name) { + return new TimeUnitAdapter(IntervalFreshness.TimeUnit.valueOf(name)); + } + + public static IntervalFreshness of(String interval, TimeUnitAdapter timeUnit) { + return IntervalFreshness.of(interval, timeUnit.timeUnit); + } + + public static String getTimeUnitName(IntervalFreshness intervalFreshness) { + return intervalFreshness.getTimeUnit().name(); + } + + /** An adapter for {@link IntervalFreshness.TimeUnit} for below flink2.3. */ + public static class TimeUnitAdapter { + private final IntervalFreshness.TimeUnit timeUnit; + + private TimeUnitAdapter(IntervalFreshness.TimeUnit timeUnit) { + this.timeUnit = timeUnit; + } } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java index 01f9a9817e..1434a96aeb 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java @@ -21,7 +21,9 @@ import org.apache.fluss.config.ConfigOption; import org.apache.fluss.config.MemorySize; import org.apache.fluss.config.Password; +import org.apache.fluss.flink.adapter.CatalogMaterializedTableAdapter; import org.apache.fluss.flink.adapter.CatalogTableAdapter; +import org.apache.fluss.flink.adapter.IntervalFreshnessAdapter; import org.apache.fluss.flink.catalog.FlinkCatalogFactory; import org.apache.fluss.metadata.AggFunction; import org.apache.fluss.metadata.DatabaseDescriptor; @@ -562,7 +564,7 @@ private static void serializeMaterializedTableToCustomProperties( customProperties.put(MATERIALIZED_TABLE_INTERVAL_FRESHNESS.key(), freshness.getInterval()); customProperties.put( MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT.key(), - freshness.getTimeUnit().name()); + IntervalFreshnessAdapter.getTimeUnitName(freshness)); // Serialize refresh configuration customProperties.put( MATERIALIZED_TABLE_LOGICAL_REFRESH_MODE.key(), mt.getLogicalRefreshMode().name()); @@ -624,8 +626,9 @@ private static CatalogMaterializedTable toFlinkMaterializedTable( checkNotNull(refreshStatusStr, "Materialized table refresh status is required but missing"); // Parse validated values - IntervalFreshness.TimeUnit timeUnit = IntervalFreshness.TimeUnit.valueOf(timeUnitStr); - IntervalFreshness freshness = IntervalFreshness.of(intervalFreshness, timeUnit); + IntervalFreshnessAdapter.TimeUnitAdapter timeUnit = + IntervalFreshnessAdapter.timeUnit(timeUnitStr); + IntervalFreshness freshness = IntervalFreshnessAdapter.of(intervalFreshness, timeUnit); CatalogMaterializedTable.LogicalRefreshMode logicalRefreshMode = CatalogMaterializedTable.LogicalRefreshMode.valueOf(logicalRefreshModeStr); CatalogMaterializedTable.RefreshMode refreshMode = @@ -645,12 +648,14 @@ private static CatalogMaterializedTable toFlinkMaterializedTable( ? null : decodeBase64ToBytes(refreshHandlerStringBytes); - CatalogMaterializedTable.Builder builder = CatalogMaterializedTable.newBuilder(); + CatalogMaterializedTableAdapter builder = CatalogMaterializedTableAdapter.newAdapter(); builder.schema(schema) .comment(comment) .partitionKeys(partitionKeys) .options(excludeByPrefix(options, MATERIALIZED_TABLE_PREFIX)) .definitionQuery(definitionQuery) + .originalQuery(definitionQuery) + .expandedQuery(definitionQuery) .freshness(freshness) .logicalRefreshMode(logicalRefreshMode) .refreshMode(refreshMode) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java index 8354fbf20c..3f98846dea 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java @@ -22,7 +22,7 @@ import org.apache.fluss.exception.IllegalConfigurationException; import org.apache.fluss.exception.InvalidPartitionException; import org.apache.fluss.exception.InvalidTableException; -import org.apache.fluss.flink.adapter.ResolvedCatalogMaterializedTableAdapter; +import org.apache.fluss.flink.adapter.CatalogMaterializedTableAdapter; import org.apache.fluss.flink.lake.LakeFlinkCatalog; import org.apache.fluss.flink.utils.FlinkConversionsTest; import org.apache.fluss.server.testutils.FlussClusterExtension; @@ -41,6 +41,7 @@ import org.apache.flink.table.catalog.GenericInMemoryCatalog; import org.apache.flink.table.catalog.IntervalFreshness; import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.TableChange; @@ -144,13 +145,16 @@ private CatalogMaterializedTable newCatalogMaterializedTable( CatalogMaterializedTable.RefreshMode refreshMode, Map options) { CatalogMaterializedTable origin = - CatalogMaterializedTable.newBuilder() + CatalogMaterializedTableAdapter.newAdapter() .schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build()) .comment("test comment") .options(options) .partitionKeys(Collections.emptyList()) .definitionQuery("select first, second, third from t") - .freshness(IntervalFreshness.of("5", IntervalFreshness.TimeUnit.SECOND)) + // TODO The configuration added in Flink 2.3 currently uses the same SQL. + .originalQuery("select first, second, third from t") + .expandedQuery("select first, second, third from t") + .freshness(IntervalFreshness.ofSecond("5")) .logicalRefreshMode( refreshMode == CatalogMaterializedTable.RefreshMode.CONTINUOUS ? CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS @@ -158,11 +162,16 @@ private CatalogMaterializedTable newCatalogMaterializedTable( .refreshMode(refreshMode) .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING) .build(); - return ResolvedCatalogMaterializedTableAdapter.create( - origin, - resolvedSchema, - refreshMode, - IntervalFreshness.of("5", IntervalFreshness.TimeUnit.SECOND)); + return createResolvedCatalogMaterializedTable( + origin, resolvedSchema, refreshMode, IntervalFreshness.ofSecond("5")); + } + + protected ResolvedCatalogMaterializedTable createResolvedCatalogMaterializedTable( + CatalogMaterializedTable origin, + ResolvedSchema resolvedSchema, + CatalogMaterializedTable.RefreshMode refreshMode, + IntervalFreshness intervalFreshness) { + return new ResolvedCatalogMaterializedTable(origin, resolvedSchema); } protected FlinkCatalog initCatalog( diff --git a/fluss-flink/pom.xml b/fluss-flink/pom.xml index 4f65374352..e640473949 100644 --- a/fluss-flink/pom.xml +++ b/fluss-flink/pom.xml @@ -37,6 +37,7 @@ fluss-flink-1.19 fluss-flink-1.18 fluss-flink-2.2 + fluss-flink-2.3 fluss-flink-tiering