Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[client-v2] Attach TimeZone to a QueryResponse & use TimeZone settings in binary readers #1763

Merged
merged 8 commits into from
Aug 6, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class ClickHouseCommandLineResponse extends ClickHouseStreamResponse {
private final transient ClickHouseCommandLine cli;

protected ClickHouseCommandLineResponse(ClickHouseConfig config, ClickHouseCommandLine cli) throws IOException {
super(config, cli.getInputStream(), null, null, ClickHouseResponseSummary.EMPTY);
super(config, cli.getInputStream(), null, null, ClickHouseResponseSummary.EMPTY, null);
this.cli = cli;

if (processor.getInputStream().available() < 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.TimeZone;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -39,7 +40,7 @@ public interface ClickHouseResponse extends AutoCloseable, Serializable {
/**
* Empty response that can never be closed.
*/
static final ClickHouseResponse EMPTY = new ClickHouseResponse() {
ClickHouseResponse EMPTY = new ClickHouseResponse() {
@Override
public List<ClickHouseColumn> getColumns() {
return Collections.emptyList();
Expand Down Expand Up @@ -75,6 +76,11 @@ public boolean isClosed() {
// ensure the instance is "stateless"
return false;
}

@Override
public TimeZone getTimeZone() {
return null;
}
};

/**
Expand Down Expand Up @@ -102,6 +108,15 @@ public boolean isClosed() {
*/
ClickHouseInputStream getInputStream();

/**
* Returns a server timezone if it is returned by server in a header {@code X-ClickHouse-Timezone } or
* other way. If not, it returns null
* @return server timezone from server response or null
*/
default TimeZone getTimeZone() {
return null;
}

/**
* Gets the first record only. Please use {@link #records()} instead if you need
* to access the rest of records.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
package com.clickhouse.client;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.data.ClickHouseInputStream;
import com.clickhouse.data.ClickHouseRecord;
Expand All @@ -13,12 +8,19 @@
import com.clickhouse.data.ClickHouseSimpleRecord;
import com.clickhouse.data.ClickHouseValue;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.TimeZone;

/**
* A simple response built on top of two lists: columns and records.
*/
public class ClickHouseSimpleResponse implements ClickHouseResponse {
private static final long serialVersionUID = 6883452584393840649L;

private final TimeZone timeZone;
/**
* Creates a response object using columns definition and raw values.
*
Expand All @@ -28,7 +30,7 @@ public class ClickHouseSimpleResponse implements ClickHouseResponse {
* @return response object
*/
public static ClickHouseResponse of(ClickHouseConfig config, List<ClickHouseColumn> columns, Object[][] values) {
return of(config, columns, values, null);
return of(config, columns, values, null, null);
}

/**
Expand All @@ -41,7 +43,7 @@ public static ClickHouseResponse of(ClickHouseConfig config, List<ClickHouseColu
* @return response object
*/
public static ClickHouseResponse of(ClickHouseConfig config, List<ClickHouseColumn> columns, Object[][] values,
ClickHouseResponseSummary summary) {
ClickHouseResponseSummary summary, TimeZone timeZone) {
if (columns == null) {
columns = Collections.emptyList();
}
Expand Down Expand Up @@ -69,7 +71,7 @@ public static ClickHouseResponse of(ClickHouseConfig config, List<ClickHouseColu
}
}

return new ClickHouseSimpleResponse(columns, wrappedValues, summary);
return new ClickHouseSimpleResponse(columns, wrappedValues, summary, timeZone);
}

/**
Expand Down Expand Up @@ -118,7 +120,7 @@ public static ClickHouseResponse of(ClickHouseResponse response, ClickHouseRecor
records.add(rec);
}

return new ClickHouseSimpleResponse(response.getColumns(), records, response.getSummary());
return new ClickHouseSimpleResponse(response.getColumns(), records, response.getSummary(), response.getTimeZone());
}

private final List<ClickHouseColumn> columns;
Expand All @@ -129,14 +131,15 @@ public static ClickHouseResponse of(ClickHouseResponse response, ClickHouseRecor
private volatile boolean closed;

protected ClickHouseSimpleResponse(List<ClickHouseColumn> columns, List<ClickHouseRecord> records,
ClickHouseResponseSummary summary) {
ClickHouseResponseSummary summary, TimeZone timeZone) {
this.columns = columns;
this.records = Collections.unmodifiableList(records);
this.summary = summary != null ? summary : ClickHouseResponseSummary.EMPTY;
this.timeZone = timeZone;
}

protected ClickHouseSimpleResponse(List<ClickHouseColumn> columns, ClickHouseValue[][] values,
ClickHouseResponseSummary summary) {
ClickHouseResponseSummary summary, TimeZone timeZone) {
this.columns = columns;

int len = values.length;
Expand All @@ -148,6 +151,7 @@ protected ClickHouseSimpleResponse(List<ClickHouseColumn> columns, ClickHouseVal
this.records = Collections.unmodifiableList(list);

this.summary = summary != null ? summary : ClickHouseResponseSummary.EMPTY;
this.timeZone = timeZone;
}

@Override
Expand All @@ -165,6 +169,11 @@ public ClickHouseInputStream getInputStream() {
throw new UnsupportedOperationException("An in-memory response does not have input stream");
}

@Override
public TimeZone getTimeZone() {
return timeZone;
}

@Override
public Iterable<ClickHouseRecord> records() {
return records;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;

import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.data.ClickHouseDataProcessor;
Expand All @@ -23,32 +24,35 @@ public class ClickHouseStreamResponse implements ClickHouseResponse {

private static final long serialVersionUID = 2271296998310082447L;

private final TimeZone timeZone;

protected static final List<ClickHouseColumn> defaultTypes = Collections
.singletonList(ClickHouseColumn.of("results", "Nullable(String)"));

public static ClickHouseResponse of(ClickHouseConfig config, ClickHouseInputStream input) throws IOException {
return of(config, input, null, null, null);
}

public static ClickHouseResponse of(ClickHouseConfig config, ClickHouseInputStream input,
Map<String, Serializable> settings) throws IOException {
return of(config, input, settings, null, null);
}

public static ClickHouseResponse of(ClickHouseConfig config, ClickHouseInputStream input,
List<ClickHouseColumn> columns) throws IOException {
return of(config, input, null, columns, null);
}

public static ClickHouseResponse of(ClickHouseConfig config, ClickHouseInputStream input,
Map<String, Serializable> settings, List<ClickHouseColumn> columns) throws IOException {
return of(config, input, settings, columns, null);
}

//
// public static ClickHouseResponse of(ClickHouseConfig config, ClickHouseInputStream input) throws IOException {
// return of(config, input, null, null, null);
// }
//
// public static ClickHouseResponse of(ClickHouseConfig config, ClickHouseInputStream input,
// Map<String, Serializable> settings) throws IOException {
// return of(config, input, settings, null, null);
// }
//
// public static ClickHouseResponse of(ClickHouseConfig config, ClickHouseInputStream input,
// List<ClickHouseColumn> columns) throws IOException {
// return of(config, input, null, columns, null);
// }
//
// public static ClickHouseResponse of(ClickHouseConfig config, ClickHouseInputStream input,
// Map<String, Serializable> settings, List<ClickHouseColumn> columns) throws IOException {
// return of(config, input, settings, columns, null);
// }
//
public static ClickHouseResponse of(ClickHouseConfig config, ClickHouseInputStream input,
Map<String, Serializable> settings, List<ClickHouseColumn> columns, ClickHouseResponseSummary summary)
Map<String, Serializable> settings, List<ClickHouseColumn> columns,
ClickHouseResponseSummary summary, TimeZone timeZone)
throws IOException {
return new ClickHouseStreamResponse(config, input, settings, columns, summary);
return new ClickHouseStreamResponse(config, input, settings, columns, summary, timeZone);
}

protected final ClickHouseConfig config;
Expand All @@ -58,9 +62,11 @@ public static ClickHouseResponse of(ClickHouseConfig config, ClickHouseInputStre
private volatile boolean closed;

protected ClickHouseStreamResponse(ClickHouseConfig config, ClickHouseInputStream input,
Map<String, Serializable> settings, List<ClickHouseColumn> columns, ClickHouseResponseSummary summary)
Map<String, Serializable> settings, List<ClickHouseColumn> columns, ClickHouseResponseSummary summary,
TimeZone timeZone)
throws IOException {

this.timeZone = timeZone;
boolean hasError = true;
try {
this.processor = ClickHouseDataStreamFactory.getInstance().getProcessor(config, input, null, settings,
Expand Down Expand Up @@ -144,6 +150,11 @@ public Iterable<ClickHouseRecord> records() {
return processor.records();
}

@Override
public TimeZone getTimeZone() {
return timeZone;
}

@Override
public <T> Iterable<T> records(Class<T> objClass) {
if (processor == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;

import com.clickhouse.client.ClickHouseConfig;
Expand All @@ -28,7 +29,7 @@ static void checkError(Result result) {

protected ClickHouseGrpcResponse(ClickHouseConfig config, Map<String, Serializable> settings,
ClickHouseStreamObserver observer) throws IOException {
super(config, observer.getInputStream(), settings, null, observer.getSummary());
super(config, observer.getInputStream(), settings, null, observer.getSummary(), null);

this.observer = observer;
}
Expand All @@ -42,7 +43,8 @@ protected ClickHouseGrpcResponse(ClickHouseConfig config, Map<String, Serializab
: ClickHouseGrpcClientImpl.getInput(config, result.getOutput().newInput(),
() -> checkError(result)),
settings, null,
new ClickHouseResponseSummary(null, null));
new ClickHouseResponseSummary(null, null),
TimeZone.getTimeZone(result.getTimeZone()));

this.observer = null;
if (result.hasProgress()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ private ClickHouseHttpResponse buildResponse(ClickHouseConfig config, CloseableH
format = ClickHouseFormat.valueOf(value);
hasQueryResult = true;
}
value = getResponseHeader(response, ClickHouseHttpProto.HEADER_TIMEZONE, "");
timeZone = !ClickHouseChecker.isNullOrEmpty(value) ? TimeZone.getTimeZone(value)
String tzValue = getResponseHeader(response, ClickHouseHttpProto.HEADER_TIMEZONE, "");
timeZone = !ClickHouseChecker.isNullOrEmpty(tzValue) ? TimeZone.getTimeZone(tzValue)
: timeZone;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ protected ClickHouseResponse send(ClickHouseRequest<?> sealedRequest) throws Cli


return ClickHouseStreamResponse.of(httpResponse.getConfig(sealedRequest), httpResponse.getInputStream(),
sealedRequest.getSettings(), null, httpResponse.summary);
sealedRequest.getSettings(), null, httpResponse.summary, httpResponse.getTimeZone());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,8 @@ public ClickHouseHttpResponse(ClickHouseHttpConnection connection, ClickHouseInp
public ClickHouseInputStream getInputStream() {
return input;
}

public TimeZone getTimeZone() {
return timeZone;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
Expand Down Expand Up @@ -222,8 +223,10 @@ protected ClickHouseResponse processSqlStatement(ClickHouseSqlStatement stmt) th

f = getFile(f, stmt);
final ClickHouseResponseSummary summary = new ClickHouseResponseSummary(null, null);
TimeZone responseTimeZone = null;
try (ClickHouseResponse response = request.query(stmt.getSQL()).output(f).executeAndWait()) {
summary.add(response.getSummary());
responseTimeZone = response.getTimeZone();
} catch (ClickHouseException e) {
throw SqlExceptionUtils.handle(e);
}
Expand All @@ -236,7 +239,8 @@ protected ClickHouseResponse processSqlStatement(ClickHouseSqlStatement stmt) th
new Object[][] { { file, f.getFormat().name(),
f.hasCompression() ? f.getCompressionAlgorithm().encoding() : "none",
f.getCompressionLevel(), f.getFile().length() } },
summary);
summary,
responseTimeZone);
} else if (stmt.getStatementType() == StatementType.INSERT) {
final Mutation m = request.write().query(stmt.getSQL());
final ClickHouseResponseSummary summary = new ClickHouseResponseSummary(null, null);
Expand Down Expand Up @@ -267,7 +271,7 @@ protected ClickHouseResponse processSqlStatement(ClickHouseSqlStatement stmt) th
} catch (IOException e) {
throw SqlExceptionUtils.handle(e);
}
return ClickHouseSimpleResponse.of(null, null, new Object[0][], summary);
return ClickHouseSimpleResponse.of(null, null, new Object[0][], summary, null);
}
}

Expand Down
Loading