Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ public <R> R run(Action<R, C, E> action) throws E, InterruptedException {
client = ensureActiveClient(client);
return action.run(client);
} finally {
// Return client to the deque, then check if close() raced us.
// The deque's lock ensures either drainTo or remove sees the client.
clients.addFirst(client);
if (this.clients == null && clients.remove(client)) {
close(client);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.jdbc;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.options.Options;

import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static org.apache.paimon.options.CatalogOptions.CLIENT_POOL_SIZE;
import static org.apache.paimon.options.CatalogOptions.URI;

/**
* A cache that shares {@link JdbcClientPool} instances across multiple catalog instances in the
* same JVM. This prevents each Flink operator from creating its own connection pool when using the
* JDBC catalog.
*
* <p>The cache is keyed by JDBC URI, catalog key, pool size, and JDBC connection properties
* (credentials, driver settings). Pools live for the lifetime of the JVM and are closed via a
* shutdown hook.
*/
public class CachedJdbcClientPool {

private static final ConcurrentMap<Key, JdbcClientPool> CLIENT_POOLS =
new ConcurrentHashMap<>();

static {
Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> {
for (JdbcClientPool pool : CLIENT_POOLS.values()) {
pool.close();
}
CLIENT_POOLS.clear();
},
"jdbc-client-pool-shutdown"));
}

private final Key key;
private final int poolSize;
private final String dbUrl;
private final Map<String, String> props;

public CachedJdbcClientPool(Options options, Map<String, String> props) {
this.dbUrl = options.get(URI);
this.poolSize = options.get(CLIENT_POOL_SIZE);
this.props = props;
Properties jdbcProps =
JdbcUtils.extractJdbcConfiguration(props, JdbcCatalog.PROPERTY_PREFIX);
this.key = Key.of(dbUrl, options.get(JdbcCatalogOptions.CATALOG_KEY), poolSize, jdbcProps);
}

/** Returns the shared {@link JdbcClientPool} for this cache key, creating one if needed. */
public JdbcClientPool get() {
return CLIENT_POOLS.computeIfAbsent(key, k -> new JdbcClientPool(poolSize, dbUrl, props));
}

@VisibleForTesting
static ConcurrentMap<Key, JdbcClientPool> clientPools() {
return CLIENT_POOLS;
}

@VisibleForTesting
static void resetCache() {
for (JdbcClientPool pool : CLIENT_POOLS.values()) {
pool.close();
}
CLIENT_POOLS.clear();
}

static class Key {
private final String uri;
private final String catalogKey;
private final int poolSize;
private final Map<String, String> jdbcProperties;

private Key(String uri, String catalogKey, int poolSize, Properties jdbcProps) {
this.uri = uri;
this.catalogKey = catalogKey;
this.poolSize = poolSize;
TreeMap<String, String> sorted = new TreeMap<>();
for (String name : jdbcProps.stringPropertyNames()) {
sorted.put(name, jdbcProps.getProperty(name));
}
this.jdbcProperties = sorted;
}

static Key of(String uri, String catalogKey, int poolSize, Properties jdbcProps) {
return new Key(uri, catalogKey, poolSize, jdbcProps);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Key that = (Key) o;
return poolSize == that.poolSize
&& Objects.equals(uri, that.uri)
&& Objects.equals(catalogKey, that.catalogKey)
&& Objects.equals(jdbcProperties, that.jdbcProperties);
}

@Override
public int hashCode() {
return Objects.hash(uri, catalogKey, poolSize, jdbcProperties);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,7 @@ protected JdbcCatalog(
this.options = context.options();
this.warehouse = warehouse;
Preconditions.checkNotNull(options, "Invalid catalog properties: null");
this.connections =
new JdbcClientPool(
options.get(CatalogOptions.CLIENT_POOL_SIZE),
options.get(CatalogOptions.URI.key()),
options.toMap());
this.connections = new CachedJdbcClientPool(options, options.toMap()).get();
try {
initializeCatalogTablesIfNeed();
} catch (SQLException e) {
Expand Down Expand Up @@ -569,7 +565,8 @@ public void repairTable(Identifier identifier) throws TableNotExistException {

@Override
public void close() throws Exception {
connections.close();
// Do not close the connection pool here — it is shared across catalog instances
// via CachedJdbcClientPool and will be evicted/closed by the cache when idle.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

close() now leaves the shared pool in CachedJdbcClientPool, but that cache never evicts entries or tracks references; it only closes pools in the JVM shutdown hook. In long-lived processes that create catalogs with different URIs/keys/credentials, closing the catalog no longer releases the JDBC connections and the cache can grow for the lifetime of the process. Please add a real release/eviction mechanism (for example reference counting) or keep close() closing the pool when no other catalog instance is using it.

}

private boolean syncTableProperties() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.paimon.jdbc;

import org.apache.paimon.catalog.CatalogLockContext;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

/** Jdbc lock context. */
Expand All @@ -41,11 +40,7 @@ public Options options() {

public JdbcClientPool connections() {
if (connections == null) {
connections =
new JdbcClientPool(
options.get(CatalogOptions.CLIENT_POOL_SIZE),
options.get(CatalogOptions.URI.key()),
options.toMap());
connections = new CachedJdbcClientPool(options, options.toMap()).get();
}
return connections;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.jdbc;

import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import java.sql.SQLException;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link CachedJdbcClientPool}. */
public class CachedJdbcClientPoolTest {

@AfterEach
void tearDown() {
CachedJdbcClientPool.resetCache();
}

@Test
void testSameKeyReturnsSamePool() {
String uri = sqliteUri();
Options options = createOptions(uri, "my-catalog");

CachedJdbcClientPool cache1 = new CachedJdbcClientPool(options, options.toMap());
CachedJdbcClientPool cache2 = new CachedJdbcClientPool(options, options.toMap());

assertThat(cache1.get()).isSameAs(cache2.get());
}

@Test
void testDifferentUriReturnsDifferentPool() {
Options options1 = createOptions(sqliteUri(), "my-catalog");
Options options2 = createOptions(sqliteUri(), "my-catalog");

CachedJdbcClientPool cache1 = new CachedJdbcClientPool(options1, options1.toMap());
CachedJdbcClientPool cache2 = new CachedJdbcClientPool(options2, options2.toMap());

assertThat(cache1.get()).isNotSameAs(cache2.get());
}

@Test
void testDifferentCatalogKeyReturnsDifferentPool() {
String uri = sqliteUri();
Options options1 = createOptions(uri, "catalog-a");
Options options2 = createOptions(uri, "catalog-b");

CachedJdbcClientPool cache1 = new CachedJdbcClientPool(options1, options1.toMap());
CachedJdbcClientPool cache2 = new CachedJdbcClientPool(options2, options2.toMap());

assertThat(cache1.get()).isNotSameAs(cache2.get());
}

@Test
void testPoolIsUsable() throws SQLException, InterruptedException {
Options options = createOptions(sqliteUri(), "test-catalog");
CachedJdbcClientPool cache = new CachedJdbcClientPool(options, options.toMap());

JdbcClientPool pool = cache.get();
Boolean result = pool.run(conn -> !conn.isClosed());

assertThat(result).isTrue();
}

@Test
void testMultipleCatalogInstancesSharePool() {
String uri = sqliteUri();
Options options = createOptions(uri, "shared-catalog");

JdbcCatalog catalog1 =
new JdbcCatalog(
new org.apache.paimon.fs.local.LocalFileIO(),
"shared-catalog",
org.apache.paimon.catalog.CatalogContext.create(options),
"/tmp/warehouse1");
JdbcCatalog catalog2 =
new JdbcCatalog(
new org.apache.paimon.fs.local.LocalFileIO(),
"shared-catalog",
org.apache.paimon.catalog.CatalogContext.create(options),
"/tmp/warehouse2");

assertThat(catalog1.getConnections()).isSameAs(catalog2.getConnections());
}

@Test
void testDifferentCredentialsReturnsDifferentPool() {
String uri = sqliteUri();
Options options1 = createOptions(uri, "my-catalog");
options1.set("jdbc.user", "user1");
options1.set("jdbc.password", "pass1");

Options options2 = createOptions(uri, "my-catalog");
options2.set("jdbc.user", "user2");
options2.set("jdbc.password", "pass2");

CachedJdbcClientPool cache1 = new CachedJdbcClientPool(options1, options1.toMap());
CachedJdbcClientPool cache2 = new CachedJdbcClientPool(options2, options2.toMap());

assertThat(cache1.get()).isNotSameAs(cache2.get());
}

@Test
void testSameCredentialsReturnsSamePool() {
String uri = sqliteUri();
Options options1 = createOptions(uri, "my-catalog");
options1.set("jdbc.user", "user1");
options1.set("jdbc.password", "pass1");

Options options2 = createOptions(uri, "my-catalog");
options2.set("jdbc.user", "user1");
options2.set("jdbc.password", "pass1");

CachedJdbcClientPool cache1 = new CachedJdbcClientPool(options1, options1.toMap());
CachedJdbcClientPool cache2 = new CachedJdbcClientPool(options2, options2.toMap());

assertThat(cache1.get()).isSameAs(cache2.get());
}

@Test
void testDifferentJdbcPropertyReturnsDifferentPool() {
String uri = sqliteUri();
Options options1 = createOptions(uri, "my-catalog");
options1.set("jdbc.useSSL", "true");

Options options2 = createOptions(uri, "my-catalog");
options2.set("jdbc.useSSL", "false");

CachedJdbcClientPool cache1 = new CachedJdbcClientPool(options1, options1.toMap());
CachedJdbcClientPool cache2 = new CachedJdbcClientPool(options2, options2.toMap());

assertThat(cache1.get()).isNotSameAs(cache2.get());
}

@Test
void testDifferentPoolSizeReturnsDifferentPool() {
String uri = sqliteUri();
Options options1 = createOptions(uri, "my-catalog");
options1.set(CatalogOptions.CLIENT_POOL_SIZE, 2);

Options options2 = createOptions(uri, "my-catalog");
options2.set(CatalogOptions.CLIENT_POOL_SIZE, 5);

CachedJdbcClientPool cache1 = new CachedJdbcClientPool(options1, options1.toMap());
CachedJdbcClientPool cache2 = new CachedJdbcClientPool(options2, options2.toMap());

assertThat(cache1.get()).isNotSameAs(cache2.get());
}

@Test
void testResetCacheClearsAllPools() {
Options options = createOptions(sqliteUri(), "test-catalog");
CachedJdbcClientPool cache = new CachedJdbcClientPool(options, options.toMap());
JdbcClientPool pool = cache.get();

assertThat(pool).isNotNull();
assertThat(CachedJdbcClientPool.clientPools()).isNotEmpty();

CachedJdbcClientPool.resetCache();

assertThat(CachedJdbcClientPool.clientPools()).isEmpty();
}

private static Options createOptions(String uri, String catalogKey) {
Options options = new Options();
options.set(CatalogOptions.URI, uri);
options.set(JdbcCatalogOptions.CATALOG_KEY, catalogKey);
options.set(CatalogOptions.CLIENT_POOL_SIZE, 2);
return options;
}

private static String sqliteUri() {
return "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", "");
}
}
Loading