Skip to content

Commit

Permalink
Merge branch 'main' into feat_compression_support
Browse files Browse the repository at this point in the history
  • Loading branch information
chernser committed Aug 6, 2024
2 parents 2d3852f + 301d75f commit f4f4552
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 71 deletions.
81 changes: 45 additions & 36 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public class Client implements AutoCloseable {

private boolean useNewImplementation = false;

private ClickHouseClient oldClient = null;

private Client(Set<String> endpoints, Map<String,String> configuration, boolean useNewImplementation) {
this.endpoints = endpoints;
Expand All @@ -140,6 +141,7 @@ private Client(Set<String> endpoints, Map<String,String> configuration, boolean
this.httpClientHelper = new HttpAPIClientHelper(configuration);
LOG.info("Using new http client implementation");
} else {
this.oldClient = ClientV1AdaptorHelper.createClient(configuration);
LOG.info("Using old http client implementation");
}
}
Expand Down Expand Up @@ -169,6 +171,10 @@ public void close() {
} catch (Exception e) {
LOG.error("Failed to close shared operation executor", e);
}

if (oldClient != null) {
oldClient.close();
}
}

public static class Builder {
Expand Down Expand Up @@ -623,8 +629,14 @@ public boolean ping() {
* @return true if the server is alive, false otherwise
*/
public boolean ping(long timeout) {
try (ClickHouseClient client = ClientV1AdaptorHelper.createClient(configuration)) {
return client.ping(getServerNode(), Math.toIntExact(timeout));
if (useNewImplementation) {
try (QueryResponse response = query("SELECT 1 FORMAT TabSeparated").get(timeout, TimeUnit.MILLISECONDS)) {
return true;
} catch (Exception e) {
return false;
}
} else {
return oldClient.ping(getServerNode(), Math.toIntExact(timeout));
}
}

Expand Down Expand Up @@ -947,43 +959,41 @@ public CompletableFuture<InsertResponse> insert(String tableName,
} else {
CompletableFuture<InsertResponse> responseFuture = new CompletableFuture<>();

try (ClickHouseClient client = ClientV1AdaptorHelper.createClient(configuration)) {
ClickHouseRequest.Mutation request = ClientV1AdaptorHelper
.createMutationRequest(client.write(getServerNode()), tableName, settings, configuration).format(format);
ClickHouseRequest.Mutation request = ClientV1AdaptorHelper
.createMutationRequest(oldClient.write(getServerNode()), tableName, settings, configuration).format(format);

CompletableFuture<ClickHouseResponse> future = null;
try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(request.getConfig())) {
future = request.data(stream.getInputStream()).execute();
CompletableFuture<ClickHouseResponse> future = null;
try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(request.getConfig())) {
future = request.data(stream.getInputStream()).execute();

//Copy the data from the input stream to the output stream
byte[] buffer = new byte[settings.getInputStreamCopyBufferSize()];
int bytesRead;
while ((bytesRead = data.read(buffer)) != -1) {
stream.write(buffer, 0, bytesRead);
}
} catch (IOException e) {
responseFuture.completeExceptionally(new ClientException("Failed to write data to the output stream", e));
//Copy the data from the input stream to the output stream
byte[] buffer = new byte[settings.getInputStreamCopyBufferSize()];
int bytesRead;
while ((bytesRead = data.read(buffer)) != -1) {
stream.write(buffer, 0, bytesRead);
}
} catch (IOException e) {
responseFuture.completeExceptionally(new ClientException("Failed to write data to the output stream", e));
}

if (!responseFuture.isCompletedExceptionally()) {
try {
int operationTimeout = getOperationTimeout();
ClickHouseResponse clickHouseResponse;
if (operationTimeout > 0) {
clickHouseResponse = future.get(operationTimeout, TimeUnit.MILLISECONDS);
} else {
clickHouseResponse = future.get();
}
InsertResponse response = new InsertResponse(client, clickHouseResponse, clientStats);
responseFuture.complete(response);
} catch (ExecutionException e) {
responseFuture.completeExceptionally(new ClientException("Failed to get insert response", e.getCause()));
} catch (InterruptedException | TimeoutException e) {
responseFuture.completeExceptionally(new ClientException("Operation has likely timed out.", e));
if (!responseFuture.isCompletedExceptionally()) {
try {
int operationTimeout = getOperationTimeout();
ClickHouseResponse clickHouseResponse;
if (operationTimeout > 0) {
clickHouseResponse = future.get(operationTimeout, TimeUnit.MILLISECONDS);
} else {
clickHouseResponse = future.get();
}
InsertResponse response = new InsertResponse(clickHouseResponse, clientStats);
responseFuture.complete(response);
} catch (ExecutionException e) {
responseFuture.completeExceptionally(new ClientException("Failed to get insert response", e.getCause()));
} catch (InterruptedException | TimeoutException e) {
responseFuture.completeExceptionally(new ClientException("Operation has likely timed out.", e));
}
LOG.debug("Total insert (InputStream) time: {}", clientStats.getElapsedTime("insert"));
}
LOG.debug("Total insert (InputStream) time: {}", clientStats.getElapsedTime("insert"));

return responseFuture;
}
Expand Down Expand Up @@ -1086,7 +1096,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
metrics.setQueryId(queryId);
metrics.operationComplete();

return new QueryResponse(httpResponse, finalSettings, metrics);
return new QueryResponse(httpResponse, finalSettings.getFormat(), metrics);
} catch (ClientException e) {
throw e;
} catch (Exception e) {
Expand All @@ -1097,8 +1107,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
}, sharedOperationExecutor);
return future;
} else {
ClickHouseClient client = ClientV1AdaptorHelper.createClient(configuration);
ClickHouseRequest<?> request = client.read(getServerNode());
ClickHouseRequest<?> request = oldClient.read(getServerNode());
request.options(SettingsConverter.toRequestOptions(settings.getAllSettings()));
request.settings(SettingsConverter.toRequestSettings(settings.getAllSettings(), queryParams));
request.option(ClickHouseClientOption.ASYNC, false); // we have own async handling
Expand All @@ -1119,7 +1128,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
clickHouseResponse = request.execute().get();
}

return new QueryResponse(client, clickHouseResponse, finalSettings, format, clientStats);
return new QueryResponse(clickHouseResponse, format, clientStats);
} catch (ClientException e) {
throw e;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,14 @@
import com.clickhouse.client.api.internal.ClientV1AdaptorHelper;
import com.clickhouse.client.api.metrics.OperationMetrics;
import com.clickhouse.client.api.metrics.ServerMetrics;
import org.apache.hc.core5.http.ClassicHttpResponse;

public class InsertResponse implements AutoCloseable {
private final ClickHouseResponse responseRef;
private final ClickHouseClient client;

private OperationMetrics operationMetrics;

public InsertResponse(ClickHouseClient client, ClickHouseResponse responseRef,
public InsertResponse(ClickHouseResponse responseRef,
ClientStatisticsHolder clientStatisticsHolder) {
this.responseRef = responseRef;
this.client = client;
this.operationMetrics = new OperationMetrics(clientStatisticsHolder);
this.operationMetrics.operationComplete();
this.operationMetrics.setQueryId(responseRef.getSummary().getQueryId());
Expand All @@ -26,22 +22,13 @@ public InsertResponse(ClickHouseClient client, ClickHouseResponse responseRef,

public InsertResponse(OperationMetrics metrics) {
this.responseRef = null;
this.client = null;
this.operationMetrics = metrics;
}

@Override
public void close() {
if (responseRef != null) {
try {
responseRef.close();
} finally {
client.close();
}
}

if (client != null) {
client.close();
responseRef.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.clickhouse.client.api.query;

import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseResponse;
import com.clickhouse.client.api.ClientException;
import com.clickhouse.client.api.internal.ClientStatisticsHolder;
Expand All @@ -10,7 +9,6 @@
import com.clickhouse.data.ClickHouseFormat;
import org.apache.hc.core5.http.ClassicHttpResponse;

import java.io.InputStream;
import java.io.InputStream;

/**
Expand All @@ -30,34 +28,27 @@ public class QueryResponse implements AutoCloseable {

private final ClickHouseResponse clickHouseResponse;
private final ClickHouseFormat format;
private ClickHouseClient client;

private QuerySettings settings;

private OperationMetrics operationMetrics;

private ClassicHttpResponse httpResponse;

@Deprecated
public QueryResponse(ClickHouseClient client, ClickHouseResponse clickHouseResponse,
QuerySettings settings, ClickHouseFormat format,
public QueryResponse(ClickHouseResponse clickHouseResponse, ClickHouseFormat format,
ClientStatisticsHolder clientStatisticsHolder) {
this.client = client;
this.clickHouseResponse = clickHouseResponse;
this.format = format;
this.settings = settings;
this.operationMetrics = new OperationMetrics(clientStatisticsHolder);
this.operationMetrics.operationComplete();
this.operationMetrics.setQueryId(clickHouseResponse.getSummary().getQueryId());
ClientV1AdaptorHelper.setServerStats(clickHouseResponse.getSummary().getProgress(),
this.operationMetrics);
}

public QueryResponse(ClassicHttpResponse response, QuerySettings settings, OperationMetrics operationMetrics) {
public QueryResponse(ClassicHttpResponse response, ClickHouseFormat format, OperationMetrics operationMetrics) {
this.clickHouseResponse = null;
this.httpResponse = response;
this.format = settings.getFormat();
this.settings = settings;
this.format = format;
this.operationMetrics = operationMetrics;
}

Expand Down Expand Up @@ -94,14 +85,6 @@ public void close() throws Exception {
throw new ClientException("Failed to close response", e);
}
}

if (client !=null) {
try {
client.close();
} catch (Exception e) {
throw new ClientException("Failed to close client", e);
}
}
}

public ClickHouseFormat getFormat() {
Expand Down
22 changes: 22 additions & 0 deletions client-v2/src/test/java/com/clickhouse/client/ClientTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,26 @@ public void testRawSettings() {
client.close();
}
}

@Test
public void testPing() {
ClickHouseNode node = getServer(ClickHouseProtocol.HTTP);
try (Client client = new Client.Builder()
.addEndpoint(node.toUri().toString())
.setUsername("default")
.setPassword("")
.useNewImplementation(System.getProperty("client.tests.useNewImplementation", "false").equals("true"))
.build()) {
Assert.assertTrue(client.ping());
}

try (Client client = new Client.Builder()
.addEndpoint("http://localhost:12345")
.setUsername("default")
.setPassword("")
.useNewImplementation(System.getProperty("client.tests.useNewImplementation", "false").equals("true"))
.build()) {
Assert.assertFalse(client.ping(TimeUnit.SECONDS.toMillis(20)));
}
}
}

0 comments on commit f4f4552

Please sign in to comment.