Description
Describe your feedback
Version 2 of the client can insert data from an input byte stream. It also supports inserting data in compressed format (the decompress option). I would like to be able to insert data from an already compressed data stream. This would make it possible to create compressed batches in memory and then send them directly.
I have already looked at the code and found one major incompatibility with the current API: the insert statement is part of the request body and is prepended to each request. This is a problem because the body has to be either entirely compressed or entirely uncompressed. So I think the options are to send the statement as an HTTP query parameter or to allow user-defined insert statements.
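To make the first option concrete, here is a minimal sketch of what such a request could look like at the HTTP level, using only the JDK (Java 11+). It relies on the ClickHouse HTTP interface accepting the statement in the `query` URL parameter and `decompress=1` for POST data in ClickHouse's own compressed format. The class and method names (`CompressedInsertRequestSketch`, `insertUri`, `insertRequest`) are hypothetical and are not part of the client API; the request is only built, not sent.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

public class CompressedInsertRequestSketch {

    // Hypothetical helper: puts the insert statement into the `query` URL parameter
    // so that the request body can consist of compressed bytes only.
    static URI insertUri(String baseUrl, String table, String format) {
        String statement = "INSERT INTO " + table + " FORMAT " + format;
        // URLEncoder produces form encoding ('+' for spaces); switch to %20 for a URL.
        String encoded = URLEncoder.encode(statement, StandardCharsets.UTF_8).replace("+", "%20");
        return URI.create(baseUrl + "/?decompress=1&query=" + encoded);
    }

    // Hypothetical helper: the body is the first `length` bytes of the batch, sent as-is.
    static HttpRequest insertRequest(URI uri, byte[] compressedBatch, int length) {
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.ofByteArray(compressedBatch, 0, length))
                .build();
    }

    public static void main(String[] args) {
        URI uri = insertUri("http://localhost:8123", "test_table", "CSV");
        byte[] batch = new byte[] {1, 2, 3}; // placeholder bytes, not a real compressed block
        HttpRequest request = insertRequest(uri, batch, batch.length);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

With the statement in the URL, nothing uncompressed has to be prepended to the body, so an already compressed batch could be passed through unchanged.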
Code example
Here are two examples of how I would like to use the client. The second example is my actual use case: creating compressed batches in memory and then sending them. Building the batches in compressed form should be more memory efficient, which helps in memory-heavy applications.
DISCLAIMER: The following examples do not work with the current implementation! They assume that the insert query is sent as an HTTP query parameter.
Compressed stream
void compressedStream() throws Exception {
    final String table = "test_table";
    final ClickHouseFormat format = ClickHouseFormat.CSV;
    try (Client client = new Client.Builder()
            .compressClientRequest(false)
            .compressServerResponse(false)
            .addEndpoint(Protocol.HTTP, "localhost", clickhouse.getMappedPort(8123), false)
            .setUsername(clickhouse.getUsername())
            .setPassword(clickhouse.getPassword())
            .useAsyncRequests(true)
            .build()) {
        final var pipedOutputStream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(new ClickHouseConfig());
        final OutputStream compressedOutputStream = new ClickHouseLZ4OutputStream(pipedOutputStream, LZ4Factory.fastestInstance().fastCompressor(), 8192);
        // the "decompress" setting tells ClickHouse that the data is compressed,
        // but the insert doesn't work, because the `insert` method prepends the insert statement (and it isn't compressed)
        final var futureResponse = client.insert(table, pipedOutputStream.getInputStream(), format, new InsertSettings().serverSetting("decompress", "1"));
        // write the data to insert into compressedOutputStream
        final int numberOfRows = 2;
        compressedOutputStream.write("1,foo\n".getBytes());
        compressedOutputStream.write("2,bar\n".getBytes());
        compressedOutputStream.close();
        pipedOutputStream.close();
        // await response
        try (var response = futureResponse.join()) {
            final var writtenRows = response.getWrittenRows();
            System.out.println("Written rows to ClickHouse: " + writtenRows);
            if (writtenRows != numberOfRows) {
                System.err.println("Written only " + writtenRows + " from " + numberOfRows + " expected.");
            }
        }
    }
}
Compressed batch
private static class ByteBufferBackedOutputStream extends OutputStream {
    private final ByteBuffer buffer;

    public ByteBufferBackedOutputStream(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public void write(int b) {
        buffer.put((byte) b);
    }

    @Override
    public void write(byte[] bytes, int off, int len) {
        buffer.put(bytes, off, len);
    }
}
void compressedBatch() throws Exception {
    final String table = "test_table";
    final ClickHouseFormat format = ClickHouseFormat.CSV;
    final ByteBuffer batch = ByteBuffer.allocate(1024);
    try (Client client = new Client.Builder()
            .compressClientRequest(false)
            .compressServerResponse(false)
            .addEndpoint(Protocol.HTTP, "localhost", clickhouse.getMappedPort(8123), false)
            .setUsername(clickhouse.getUsername())
            .setPassword(clickhouse.getPassword())
            .useAsyncRequests(true)
            .build()) {
        final var sinkStream = new ByteBufferBackedOutputStream(batch);
        final OutputStream compressedOutputStream = new ClickHouseLZ4OutputStream(sinkStream, LZ4Factory.fastestInstance().fastCompressor(), 8192);
        // write data into the batch
        final int numberOfRows = 2;
        compressedOutputStream.write("1,foo\n".getBytes());
        compressedOutputStream.write("2,bar\n".getBytes());
        compressedOutputStream.flush();
        // now the batch contains 2 rows in compressed form
        final var pipedOutputStream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(new ClickHouseConfig());
        // open the request
        final var insertResponseFuture = client.insert(table, pipedOutputStream.getInputStream(), format, new InsertSettings().serverSetting("decompress", "1"));
        // stream the buffer
        pipedOutputStream.writeBytes(batch.array(), 0, batch.position());
        pipedOutputStream.close();
        // await response
        try (var response = insertResponseFuture.join()) {
            final var writtenRows = response.getWrittenRows();
            System.out.println("Written rows to ClickHouse: " + writtenRows);
            if (writtenRows != numberOfRows) {
                System.err.println("Written only " + writtenRows + " from " + numberOfRows + " expected.");
            }
        }
    }
}