Skip to content

[improve]: upgrade clickhouse jdbc version to 0.8.2 #163

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ buildNumber.properties
.mvn/timing.properties
# https://github.com/takari/maven-wrapper#usage-without-binary-jar
.mvn/wrapper/maven-wrapper.jar
.vscode/
.idea/
*.iml

4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
upgrade-jdbc:
- upgrade clickhouse jdbc from 0.6.4 to 0.8.2,
**breaking**: `username` and `password` must be set(password allow empty)
- fix the `url` of *Connector Options* in README
16 changes: 9 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ Please create issues if you encounter bugs and any help for the project is great

| Option | Required | Default | Type | Description |
|:-----------------------------------------|:---------|:---------|:---------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| url | required | none | String | The ClickHouse jdbc url in format `clickhouse://<host>:<port>`. |
| username | optional | none | String | The 'username' and 'password' must both be specified if any of them is specified. |
| password | optional | none | String | The ClickHouse password. |
| url | required | none | String | The ClickHouse jdbc url in format `jdbc:clickhouse://<host>:<port>`. |
| username | optional | none | String | The 'username' and 'password' must be specified. |
| password | optional | none | String | The ClickHouse password, allow empty. |
| database-name | optional | default | String | The ClickHouse database name. |
| table-name | required | none | String | The ClickHouse table name. |
| use-local | optional | false | Boolean | Directly read/write local tables in case of distributed table engine. |
Expand Down Expand Up @@ -131,8 +131,10 @@ CREATE TABLE t_user (
PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://{ip}:{port}',
'url' = 'jdbc:clickhouse://{ip}:{port}',
'database-name' = 'tutorial',
'username' = 'default',
'password' = '',
'table-name' = 'users',
'sink.batch-size' = '500',
'sink.flush-interval' = '1000',
Expand All @@ -157,7 +159,7 @@ val tEnv = TableEnvironment.create(setting)

val props = new util.HashMap[String, String]()
props.put(ClickHouseConfig.DATABASE_NAME, "default")
props.put(ClickHouseConfig.URL, "clickhouse://127.0.0.1:8123")
props.put(ClickHouseConfig.URL, "jdbc:clickhouse://127.0.0.1:8123")
props.put(ClickHouseConfig.USERNAME, "username")
props.put(ClickHouseConfig.PASSWORD, "password")
props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, "30s")
Expand All @@ -175,7 +177,7 @@ TableEnvironment tEnv = TableEnvironment.create(setting);

Map<String, String> props = new HashMap<>();
props.put(ClickHouseConfig.DATABASE_NAME, "default")
props.put(ClickHouseConfig.URL, "clickhouse://127.0.0.1:8123")
props.put(ClickHouseConfig.URL, "jdbc:clickhouse://127.0.0.1:8123")
props.put(ClickHouseConfig.USERNAME, "username")
props.put(ClickHouseConfig.PASSWORD, "password")
props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, "30s");
Expand All @@ -191,7 +193,7 @@ tEnv.executeSql("insert into `clickhouse`.`default`.`t_table` select...");
```sql
> CREATE CATALOG clickhouse WITH (
'type' = 'clickhouse',
'url' = 'clickhouse://127.0.0.1:8123',
'url' = 'jdbc:clickhouse://127.0.0.1:8123',
'username' = 'username',
'password' = 'password',
'database-name' = 'default',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import java.util.List;

/** End-to-end test for Clickhouse. */
public class ClickhouseE2ECase extends FlinkContainerTestEnviroment {
public class ClickhouseE2ECase extends FlinkContainerTestEnvironment {

private static final Logger logger = LoggerFactory.getLogger(ClickhouseE2ECase.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,39 +1,35 @@
package org.apache.flink.connector.clickhouse;

import com.clickhouse.jdbc.ClickHouseConnection;
import com.clickhouse.jdbc.ClickHouseDataSource;
import com.clickhouse.jdbc.ClickHouseDriver;
import com.clickhouse.jdbc.ClickHouseStatement;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Array;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

/** A proxy for Clickhouse to execute SQLs and check results. */
public class ClickhouseProxy {
private String jdbcUrl;
private String username;
private String password;
private final String jdbcUrl;
private final String username;
private final String password;
private static final Logger logger = LoggerFactory.getLogger(ClickhouseProxy.class);
ClickHouseDriver driver;
ClickHouseStatement statement;
ClickHouseConnection connection;
Statement statement;
Connection connection;

ClickhouseProxy(String jdbcUrl, String username, String password) {
this.jdbcUrl = jdbcUrl;
this.username = username;
this.password = password;
this.driver = new ClickHouseDriver();
}

public void connect() {
Expand All @@ -42,9 +38,7 @@ public void connect() {
Properties properties = new Properties();
properties.put("username", username);
properties.put("password", password);
ClickHouseDataSource clickHouseDataSource =
new ClickHouseDataSource(jdbcUrl, properties);
connection = clickHouseDataSource.getConnection(username, password);
connection = DriverManager.getConnection(jdbcUrl, properties);
statement = connection.createStatement();
}
} catch (Exception e) {
Expand Down Expand Up @@ -88,7 +82,7 @@ private void checkResult(List<String> expectedResult, String table, List<String>
}
}

results.add(result.stream().collect(Collectors.joining(",")));
results.add(String.join(",", result));
}
Collections.sort(results);
Collections.sort(expectedResult);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@
import static org.assertj.core.util.Preconditions.checkState;

/** Test environment running job on Flink containers. */
public class FlinkContainerTestEnviroment {
public class FlinkContainerTestEnvironment {

private static final Logger logger =
LoggerFactory.getLogger(FlinkContainerTestEnviroment.class);
LoggerFactory.getLogger(FlinkContainerTestEnvironment.class);
public static final Network NETWORK = Network.newNetwork();

static final ClickHouseContainer CLICKHOUSE_CONTAINER =
Expand Down Expand Up @@ -90,7 +90,7 @@ public void setUp() throws Exception {
"heartbeat.timeout: 60000",
"parallelism.default: 1"));
jobManager =
new GenericContainer<>(new DockerImageName("flink:1.19.0-scala_2.12"))
new GenericContainer<>(DockerImageName.parse("flink:1.19.0-scala_2.12"))
.withCommand("jobmanager")
.withNetwork(NETWORK)
.withExtraHost("host.docker.internal", "host-gateway")
Expand All @@ -101,7 +101,7 @@ public void setUp() throws Exception {
.withEnv("FLINK_PROPERTIES", properties)
.withLogConsumer(new Slf4jLogConsumer(logger));
taskManager =
new GenericContainer<>(new DockerImageName("flink:1.19.0-scala_2.12"))
new GenericContainer<>(DockerImageName.parse("flink:1.19.0-scala_2.12"))
.withCommand("taskmanager")
.withExtraHost("host.docker.internal", "host-gateway")
.withNetwork(NETWORK)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.apache.flink.connector.clickhouse.catalog;

import org.apache.flink.connector.clickhouse.config.ClickHouseConfig;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;

import java.util.HashMap;
import java.util.Map;

/** unit test for ClickHouseCatalog. */
public class ClickHouseCatalogTest {
private static final Logger log = LoggerFactory.getLogger(ClickHouseCatalogTest.class);

public static final Network NETWORK = Network.newNetwork();

static final ClickHouseContainer CLICKHOUSE_CONTAINER =
new ClickHouseContainer("clickhouse/clickhouse-server:latest")
.withNetwork(NETWORK)
.withNetworkAliases("clickhouse")
.withExposedPorts(8123, 9000)
.withUsername("default")
.withPassword("pwd")
.withLogConsumer(new Slf4jLogConsumer(log));

@Test
public void test() throws TableNotExistException {
Map<String, String> properties = new HashMap<>();
properties.put(ClickHouseConfig.URL, CLICKHOUSE_CONTAINER.getJdbcUrl());
properties.put(ClickHouseConfig.USERNAME, CLICKHOUSE_CONTAINER.getUsername());
properties.put(ClickHouseConfig.PASSWORD, CLICKHOUSE_CONTAINER.getPassword());
ClickHouseCatalog catalog = new ClickHouseCatalog("default_catalog", properties);
catalog.open();
catalog.listDatabases();
catalog.getTable(ObjectPath.fromString("system.disks"));
}

@Before
public void setUp() {
CLICKHOUSE_CONTAINER.start();
}

@After
public void tearDown() {
CLICKHOUSE_CONTAINER.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ private void validateConfigOptions(ReadableConfig config) {
SinkShardingStrategy shardingStrategy = config.get(SINK_PARTITION_STRATEGY);
if (!config.get(SINK_SHARDING_USE_TABLE_DEF)
&& shardingStrategy.shardingKeyNeeded
&& !config.getOptional(SINK_PARTITION_KEY).isPresent()) {
&& config.getOptional(SINK_PARTITION_KEY).isEmpty()) {
throw new IllegalArgumentException(
"A sharding key must be provided for sharding strategy: "
+ shardingStrategy.value);
Expand Down
Loading