Commit a599876

Merge pull request #2034 from ClickHouse/v2_implement_writer_api
[client-v2] V2 implement writer api
2 parents 75cde09 + f50c9df commit a599876

13 files changed: +798 −75 lines changed


client-v2/src/main/java/com/clickhouse/client/api/Client.java

+67 −59
@@ -10,6 +10,7 @@
 import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
 import com.clickhouse.client.api.data_formats.NativeFormatReader;
 import com.clickhouse.client.api.data_formats.RowBinaryFormatReader;
+import com.clickhouse.client.api.data_formats.RowBinaryFormatSerializer;
 import com.clickhouse.client.api.data_formats.RowBinaryWithNamesAndTypesFormatReader;
 import com.clickhouse.client.api.data_formats.RowBinaryWithNamesFormatReader;
 import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
@@ -43,7 +44,6 @@
 import com.clickhouse.client.api.query.Records;
 import com.clickhouse.client.config.ClickHouseClientOption;
 import com.clickhouse.data.ClickHouseColumn;
-import com.clickhouse.data.ClickHouseDataType;
 import com.clickhouse.data.ClickHouseFormat;
 import org.apache.hc.client5.http.ConnectTimeoutException;
 import org.apache.hc.core5.concurrent.DefaultThreadFactory;
@@ -59,6 +59,7 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.ConnectException;
@@ -623,6 +624,11 @@ public Builder useHttpCompression(boolean enabled) {
             return this;
         }

+        public Builder appCompressedData(boolean enabled) {
+            this.configuration.put(ClientConfigProperties.APP_COMPRESSED_DATA.getKey(), String.valueOf(enabled));
+            return this;
+        }
+
         /**
          * Sets buffer size for uncompressed data in LZ4 compression.
          * For outgoing data it is the size of a buffer that will be compressed.
@@ -1066,6 +1072,11 @@ public Client build() {

         private static final int DEFAULT_NETWORK_BUFFER_SIZE = 300_000;

+        /**
+         * Default size for buffers used in networking.
+         */
+        public static final int DEFAULT_BUFFER_SIZE = 8192;
+
         private void setDefaults() {

             // set default database name if not specified
@@ -1154,6 +1165,10 @@ private void setDefaults() {
             if (!configuration.containsKey(ClientConfigProperties.USE_HTTP_COMPRESSION.getKey())) {
                 useHttpCompression(false);
             }
+
+            if (!configuration.containsKey(ClientConfigProperties.APP_COMPRESSED_DATA.getKey())) {
+                appCompressedData(false);
+            }
         }
     }

@@ -1236,45 +1251,9 @@ public synchronized void register(Class<?> clazz, TableSchema schema) {
                 schemaSerializers.put(column.getColumnName(), (obj, stream) -> {
                     Object value = getterMethod.invoke(obj);

-                    if (defaultsSupport) {
-                        if (value != null) {//Because we now support defaults, we have to send nonNull
-                            SerializerUtils.writeNonNull(stream);//Write 0 for no default
-
-                            if (column.isNullable()) {//If the column is nullable
-                                SerializerUtils.writeNonNull(stream);//Write 0 for not null
-                            }
-                        } else {//So if the object is null
-                            if (column.hasDefault()) {
-                                SerializerUtils.writeNull(stream);//Send 1 for default
-                                return;
-                            } else if (column.isNullable()) {//And the column is nullable
-                                SerializerUtils.writeNonNull(stream);
-                                SerializerUtils.writeNull(stream);//Then we send null, write 1
-                                return;//And we're done
-                            } else if (column.getDataType() == ClickHouseDataType.Array) {//If the column is an array
-                                SerializerUtils.writeNonNull(stream);//Then we send nonNull
-                            } else {
-                                throw new IllegalArgumentException(String.format("An attempt to write null into not nullable column '%s'", column.getColumnName()));
-                            }
-                        }
-                    } else {
-                        if (column.isNullable()) {
-                            if (value == null) {
-                                SerializerUtils.writeNull(stream);
-                                return;
-                            }
-                            SerializerUtils.writeNonNull(stream);
-                        } else if (value == null) {
-                            if (column.getDataType() == ClickHouseDataType.Array) {
-                                SerializerUtils.writeNonNull(stream);
-                            } else {
-                                throw new IllegalArgumentException(String.format("An attempt to write null into not nullable column '%s'", column.getColumnName()));
-                            }
-                        }
+                    if (RowBinaryFormatSerializer.writeValuePreamble(stream, defaultsSupport, column, value)) {
+                        SerializerUtils.serializeData(stream, value, column);
                     }
-
-                    //Handle the different types
-                    SerializerUtils.serializeData(stream, value, column);
                 });
             } else {
                 LOG.warn("No getter method found for column: {}", propertyName);
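The branching removed above now lives in RowBinaryFormatSerializer.writeValuePreamble, which writes the defaults/nullability preamble for a column and reports whether the caller should still serialize the value itself. The following is only a sketch of that contract, reconstructed from the inline logic deleted in this hunk; the class name ValuePreambleSketch and the SerializerUtils import path are assumptions for illustration, and the real implementation is the one inside RowBinaryFormatSerializer.

import java.io.IOException;
import java.io.OutputStream;

import com.clickhouse.client.api.data_formats.internal.SerializerUtils;
import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.data.ClickHouseDataType;

public final class ValuePreambleSketch {

    // Returns true when the value itself still has to be serialized after the preamble.
    public static boolean writeValuePreamble(OutputStream stream, boolean defaultsSupport,
                                             ClickHouseColumn column, Object value) throws IOException {
        if (defaultsSupport) {
            if (value != null) {
                SerializerUtils.writeNonNull(stream);          // 0 = no default
                if (column.isNullable()) {
                    SerializerUtils.writeNonNull(stream);      // 0 = not null
                }
            } else if (column.hasDefault()) {
                SerializerUtils.writeNull(stream);             // 1 = use column default
                return false;
            } else if (column.isNullable()) {
                SerializerUtils.writeNonNull(stream);
                SerializerUtils.writeNull(stream);             // explicit null value
                return false;
            } else if (column.getDataType() == ClickHouseDataType.Array) {
                SerializerUtils.writeNonNull(stream);          // null arrays are still written as values
            } else {
                throw new IllegalArgumentException(String.format(
                        "An attempt to write null into not nullable column '%s'", column.getColumnName()));
            }
        } else {
            if (column.isNullable()) {
                if (value == null) {
                    SerializerUtils.writeNull(stream);
                    return false;
                }
                SerializerUtils.writeNonNull(stream);
            } else if (value == null) {
                if (column.getDataType() == ClickHouseDataType.Array) {
                    SerializerUtils.writeNonNull(stream);
                } else {
                    throw new IllegalArgumentException(String.format(
                            "An attempt to write null into not nullable column '%s'", column.getColumnName()));
                }
            }
        }
        return true;
    }
}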
@@ -1473,7 +1452,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, InputStream da
     }

     /**
-     * <p>Sends write request to database. Input data is read from the input stream.</p>
+     * Sends write request to database. Input data is read from the input stream.
      *
      * @param tableName - destination table name
      * @param data - data stream to insert
@@ -1482,7 +1461,49 @@ public CompletableFuture<InsertResponse> insert(String tableName, InputStream da
      * @return {@code CompletableFuture<InsertResponse>} - a promise to insert response
      */
     public CompletableFuture<InsertResponse> insert(String tableName,
-                                                    InputStream data,
+                                                    InputStream data,
+                                                    ClickHouseFormat format,
+                                                    InsertSettings settings) {
+
+        final int writeBufferSize = settings.getInputStreamCopyBufferSize() <= 0 ?
+                Integer.parseInt(configuration.getOrDefault(ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey(),
+                        ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getDefaultValue())) :
+                settings.getInputStreamCopyBufferSize();
+
+        if (writeBufferSize <= 0) {
+            throw new IllegalArgumentException("Buffer size must be greater than 0");
+        }
+
+        return insert(tableName, new DataStreamWriter() {
+            @Override
+            public void onOutput(OutputStream out) throws IOException {
+                byte[] buffer = new byte[writeBufferSize];
+                int bytesRead;
+                while ((bytesRead = data.read(buffer)) > 0) {
+                    out.write(buffer, 0, bytesRead);
+                }
+                out.close();
+            }
+
+            @Override
+            public void onRetry() throws IOException {
+                data.reset();
+            }
+        },
+        format, settings);
+    }
+
+    /**
+     * Does an insert request to a server. Data is pushed when {@link DataStreamWriter#onOutput(OutputStream)} is called.
+     *
+     * @param tableName - target table name
+     * @param writer - {@link DataStreamWriter} implementation
+     * @param format - source format
+     * @param settings - operation settings
+     * @return {@code CompletableFuture<InsertResponse>} - a promise to insert response
+     */
+    public CompletableFuture<InsertResponse> insert(String tableName,
+                                                    DataStreamWriter writer,
                                                     ClickHouseFormat format,
                                                     InsertSettings settings) {

@@ -1513,6 +1534,8 @@ public CompletableFuture<InsertResponse> insert(String tableName,

         settings.setOption(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey(), format.name());
         final InsertSettings finalSettings = settings;
+        final String sqlStmt = "INSERT INTO \"" + tableName + "\" FORMAT " + format.name();
+        finalSettings.serverSetting(ClickHouseHttpProto.QPARAM_QUERY_STMT, sqlStmt);
         responseSupplier = () -> {
             // Selecting some node
             ClickHouseNode selectedNode = getNextAliveNode();
@@ -1523,17 +1546,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
             try (ClassicHttpResponse httpResponse =
                          httpClientHelper.executeRequest(selectedNode, finalSettings.getAllSettings(),
                              out -> {
-                                out.write("INSERT INTO ".getBytes());
-                                out.write(tableName.getBytes());
-                                out.write(" FORMAT ".getBytes());
-                                out.write(format.name().getBytes());
-                                out.write(" \n".getBytes());
-
-                                byte[] buffer = new byte[writeBufferSize];
-                                int bytesRead;
-                                while ((bytesRead = data.read(buffer)) > 0) {
-                                    out.write(buffer, 0, bytesRead);
-                                }
+                                writer.onOutput(out);
                                 out.close();
                             })) {

@@ -1566,7 +1579,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,

                 if (i < maxRetries) {
                     try {
-                        data.reset();
+                        writer.onRetry();
                     } catch (IOException ioe) {
                         throw new ClientException("Failed to reset stream before next attempt", ioe);
                     }
@@ -1581,12 +1594,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,

         CompletableFuture<ClickHouseResponse> future = null;
         future = request.data(output -> {
-            //Copy the data from the input stream to the output stream
-            byte[] buffer = new byte[settings.getInputStreamCopyBufferSize()];
-            int bytesRead;
-            while ((bytesRead = data.read(buffer)) != -1) {
-                output.write(buffer, 0, bytesRead);
-            }
+            writer.onOutput(output);
             output.close();
         }).option(ClickHouseClientOption.ASYNC, false).execute();
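Taken together, the new overload lets an application push bytes straight into the insert request without building an InputStream first. A minimal usage sketch follows; the endpoint, credentials, table name "users" and the TabSeparated payload are placeholders, and the builder and settings calls are the existing client-v2 API rather than anything introduced by this commit.

import com.clickhouse.client.api.Client;
import com.clickhouse.client.api.DataStreamWriter;
import com.clickhouse.client.api.insert.InsertResponse;
import com.clickhouse.client.api.insert.InsertSettings;
import com.clickhouse.data.ClickHouseFormat;

import java.nio.charset.StandardCharsets;

public class WriterApiExample {
    public static void main(String[] args) throws Exception {
        Client client = new Client.Builder()
                .addEndpoint("http://localhost:8123")
                .setUsername("default")
                .setPassword("")
                .build();

        // DataStreamWriter has a single abstract method, so a lambda is sufficient;
        // the client opens and closes the output stream and calls onRetry() between attempts.
        DataStreamWriter writer = out ->
                out.write("1\tAlice\n2\tBob\n".getBytes(StandardCharsets.UTF_8));

        try (InsertResponse response = client.insert("users", writer,
                ClickHouseFormat.TabSeparated, new InsertSettings()).get()) {
            // inspect response metrics here if needed
        }

        client.close();
    }
}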

client-v2/src/main/java/com/clickhouse/client/api/ClientConfigProperties.java

+7 −3
@@ -104,8 +104,7 @@ public enum ClientConfigProperties {

     QUERY_ID("query_id"), // actually a server setting, but has client effect too

-    CLIENT_NETWORK_BUFFER_SIZE("client_network_buffer_size"),
-
+    CLIENT_NETWORK_BUFFER_SIZE("client_network_buffer_size", String.valueOf(Client.Builder.DEFAULT_BUFFER_SIZE)),

     ACCESS_TOKEN("access_token"), SSL_AUTH("ssl_authentication"),

@@ -123,7 +122,12 @@ public enum ClientConfigProperties {
     @Deprecated
     PRODUCT_NAME("product_name"),

-    BEARERTOKEN_AUTH ("bearer_token")
+    BEARERTOKEN_AUTH ("bearer_token"),
+    /**
+     * Indicates that data provided for a write operation is compressed by the application.
+     */
+    APP_COMPRESSED_DATA("app_compressed_data"),
+
     ;

     private String key;
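Because CLIENT_NETWORK_BUFFER_SIZE now carries a default value, the lookup used by the new insert() overload above resolves to 8192 (Client.Builder.DEFAULT_BUFFER_SIZE) whenever client_network_buffer_size is not configured. A small self-contained sketch of that lookup; the surrounding class and map are illustrative only:

import com.clickhouse.client.api.ClientConfigProperties;

import java.util.HashMap;
import java.util.Map;

public class BufferSizeDefaultExample {
    public static void main(String[] args) {
        // No explicit client_network_buffer_size configured by the user.
        Map<String, String> configuration = new HashMap<>();

        // Same pattern as in insert(): fall back to the enum's built-in default when the key is absent.
        int writeBufferSize = Integer.parseInt(
                configuration.getOrDefault(
                        ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey(),
                        ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getDefaultValue()));

        System.out.println(writeBufferSize); // prints 8192
    }
}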
client-v2/src/main/java/com/clickhouse/client/api/DataStreamWriter.java

+27 −0
@@ -0,0 +1,27 @@
+package com.clickhouse.client.api;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public interface DataStreamWriter {
+
+    /**
+     * Called by the client when the output stream is ready for user data.
+     * This method is called once per operation, so all data should be written during the call.
+     * The output stream will be closed by the client.
+     * When client compression is enabled, the output stream will be a compressing one.
+     * If {@link ClientConfigProperties#APP_COMPRESSED_DATA} is set for an operation,
+     * then {@param out} will be a raw IO stream without compression.
+     * @param out - output stream
+     * @throws IOException - when any IO exception happens.
+     */
+    void onOutput(OutputStream out) throws IOException;
+
+    /**
+     * Called when the client is going to perform a retry.
+     * It is optional to implement this method because in most cases there is nothing to reset.
+     * Useful to reset a wrapped stream, or to throw an exception to indicate that retry is not supported for a data source.
+     * @throws IOException - when any IO exception happens.
+     */
+    default void onRetry() throws IOException {}
+}
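A writer can also make retries safe for sources that do not support InputStream.reset() by simply re-reading the source on every attempt. A hedged sketch (the class name, file-based source and buffer size are illustrative, not part of the commit):

import com.clickhouse.client.api.DataStreamWriter;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class FileDataStreamWriter implements DataStreamWriter {

    private final Path source;

    public FileDataStreamWriter(Path source) {
        this.source = source;
    }

    @Override
    public void onOutput(OutputStream out) throws IOException {
        // Called once per attempt; copy the whole file into the client-provided stream.
        try (InputStream in = new BufferedInputStream(Files.newInputStream(source))) {
            byte[] buffer = new byte[8192];
            int read;
            while ((read = in.read(buffer)) > 0) {
                out.write(buffer, 0, read);
            }
        }
        // The client closes 'out' itself, so no close() is needed here.
    }

    @Override
    public void onRetry() {
        // Nothing to reset: onOutput() reopens the file on every attempt.
    }
}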
