[client-v2] Inserting compressed data (using compressed input stream) #2010

Closed
@dizider

Description

Describe your feedback

The version 2 client allows inserting data from an input byte stream. It also supports inserting data in compressed format (the decompress option). I would like to be able to insert data from an already compressed data stream. This change would make it possible to create compressed in-memory batches and then send them directly.

I have already looked at the code and found one major incompatibility with the current API: the insert statement is part of the request body and is prepended to every request. This is a problem because the whole body then has to be either compressed or uncompressed as a unit. So I see two options: send the statement as an HTTP query parameter, or allow user-defined insert statements.
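For illustration, here is roughly what the query-parameter approach would look like at the HTTP level. This is a minimal sketch; the helper name and URL layout are my own, and only `query` and `decompress` are real ClickHouse HTTP parameters:

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class InsertUrlSketch {
    // Hypothetical helper: carry the INSERT statement in the HTTP "query"
    // parameter, leaving the request body free to hold only (compressed) data.
    static String buildInsertUrl(String host, int port, String table, String format) {
        String statement = "INSERT INTO " + table + " FORMAT " + format;
        String encoded = URLEncoder.encode(statement, StandardCharsets.UTF_8);
        return "http://" + host + ":" + port + "/?query=" + encoded + "&decompress=1";
    }

    public static void main(String[] args) {
        System.out.println(buildInsertUrl("localhost", 8123, "test_table", "CSV"));
        // → http://localhost:8123/?query=INSERT+INTO+test_table+FORMAT+CSV&decompress=1
    }
}
```

With the statement in the URL, the body can be an opaque compressed blob and no part of it needs to stay uncompressed.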

Code example

Here are two examples of how I would like to use the client. The second example is my actual use case: creating compressed batches in memory and then sending them. Building compressed batches this way should be more memory efficient, which matters in memory-constrained applications.

DISCLAIMER: The following examples do not work with the current implementation! They assume that the insert query is sent as an HTTP query parameter.

Compressed stream

void compressedStream() throws IOException {
	final String table = "test_table";  
	final ClickHouseFormat format = ClickHouseFormat.CSV;  
	// `clickhouse` is assumed to be a Testcontainers ClickHouse container instance
	try (Client client = new Client.Builder()  
	        .compressClientRequest(false)  
	        .compressServerResponse(false)  
	        .addEndpoint(Protocol.HTTP, "localhost", clickhouse.getMappedPort(8123), false)  
	        .setUsername(clickhouse.getUsername())  
	        .setPassword(clickhouse.getPassword())  
	        .useAsyncRequests(true)  
	        .build()) {  
	  
	    final var pipedOutputStream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(new ClickHouseConfig());  
	    final OutputStream compressedOutputStream = new ClickHouseLZ4OutputStream(pipedOutputStream, LZ4Factory.fastestInstance().fastCompressor(), 8192);  
	    
	    final var futureResponse = client.insert(table, pipedOutputStream.getInputStream(), format, new InsertSettings().serverSetting("decompress", "1"));  
	  
	    // write data to insert to compressedOutputStream  
	    final int numberOfRows = 2;  
	    compressedOutputStream.write("1,foo\n".getBytes());  
	    compressedOutputStream.write("2,bar\n".getBytes());  
	    compressedOutputStream.close();  
	    pipedOutputStream.close();  
	  
	    // the insert setting tells ClickHouse that the data is compressed,
	    // but the insert doesn't work, because the `insert` method prepends the insert statement (and it isn't compressed)
	    try (var response = futureResponse.join()) {  
	        final var writtenRows = response.getWrittenRows();  
	        System.out.println("Written rows to ClickHouse: " + writtenRows);  
	        if (writtenRows != numberOfRows) {  
	            System.err.println("Written only " + writtenRows + " from " + numberOfRows + " expected.");  
	        }  
	    }  
	}
}

Compressed batch

private static class ByteBufferBackedOutputStream extends OutputStream {  
    private final ByteBuffer buffer;  
  
    public ByteBufferBackedOutputStream(ByteBuffer buffer) {  
        this.buffer = buffer;
    }  
  
    @Override  
    public void write(int b) {  
        buffer.put((byte) b);  
    }  
  
    @Override  
    public void write(byte[] bytes, int off, int len) {  
        buffer.put(bytes, off, len);  
    }  
}
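As an aside, this adapter writes into a fixed-capacity buffer, so writing past `capacity()` throws `BufferOverflowException`; the batch size has to be chosen up front. A self-contained check of the adapter (the class is copied here so the snippet compiles on its own):

```java
import java.io.OutputStream;
import java.nio.ByteBuffer;

public class BufferSinkDemo {
    // Same adapter as above, repeated here so this demo is self-contained.
    static class ByteBufferBackedOutputStream extends OutputStream {
        private final ByteBuffer buffer;

        ByteBufferBackedOutputStream(ByteBuffer buffer) {
            this.buffer = buffer;
        }

        @Override
        public void write(int b) {
            buffer.put((byte) b);
        }

        @Override
        public void write(byte[] bytes, int off, int len) {
            buffer.put(bytes, off, len);
        }
    }

    public static void main(String[] args) throws Exception {
        ByteBuffer buf = ByteBuffer.allocate(16);
        try (OutputStream out = new ByteBufferBackedOutputStream(buf)) {
            out.write("1,foo\n".getBytes());
        }
        // buffer.position() is the number of bytes written so far
        System.out.println("bytes buffered: " + buf.position()); // → bytes buffered: 6
    }
}
```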

void compressedBatch() throws IOException {
	final String table = "test_table";  
	final ClickHouseFormat format = ClickHouseFormat.CSV;  
	final ByteBuffer batch = ByteBuffer.allocate(1024);  
	  
	try (Client client = new Client.Builder()  
	        .compressClientRequest(false)  
	        .compressServerResponse(false)  
	        .addEndpoint(Protocol.HTTP, "localhost", clickhouse.getMappedPort(8123), false)  
	        .setUsername(clickhouse.getUsername())  
	        .setPassword(clickhouse.getPassword())  
	        .useAsyncRequests(true)  
	        .build()) {  
	  
	    final var sinkStream = new ByteBufferBackedOutputStream(batch);  
	    final OutputStream compressedOutputStream = new ClickHouseLZ4OutputStream(sinkStream, LZ4Factory.fastestInstance().fastCompressor(), 8192);  
	    
	    // write data into batch  
	    final int numberOfRows = 2;  
	    compressedOutputStream.write("1,foo\n".getBytes());  
	    compressedOutputStream.write("2,bar\n".getBytes());  
	    compressedOutputStream.close(); // close (not just flush) so the final LZ4 block is written  
	    // now, the batch contains 2 rows in compressed form  
	  
	    final var pipedOutputStream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(new ClickHouseConfig());  
	  
	    // open request  
	    final var insertResponseFuture = client.insert(table, pipedOutputStream.getInputStream(), format, new InsertSettings().serverSetting("decompress", "1"));  
	  
	    // stream buffer  
	    pipedOutputStream.writeBytes(batch.array(), 0, batch.position());  
	    pipedOutputStream.close();  
	  
	    // await response  
	    try (var response = insertResponseFuture.join()) {  
	        final var writtenRows = response.getWrittenRows();  
	        System.out.println("Written rows to ClickHouse: " + writtenRows);  
	        if (writtenRows != numberOfRows) {  
	            System.err.println("Written only " + writtenRows + " from " + numberOfRows + " expected.");  
	        }  
	    }  
	}
}
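To make the memory argument concrete without the client library on the classpath, here is the same batching idea sketched with the JDK's `GZIPOutputStream` standing in for `ClickHouseLZ4OutputStream` (an assumption for illustration only; the class and method names are mine):

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.GZIPOutputStream;

public class CompressedBatchSketch {
    // Rows are compressed as they are appended, so only the compressed
    // bytes are ever retained in memory.
    public static byte[] buildBatch(String... rows) throws Exception {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(sink)) {
            for (String row : rows) {
                gz.write(row.getBytes());
            }
        } // close() finishes the stream, so the batch is a complete gzip member
        return sink.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        byte[] batch = buildBatch("1,foo\n", "2,bar\n");
        System.out.println("compressed batch size: " + batch.length + " bytes");
    }
}
```

The finished `byte[]` could then be streamed into the insert request exactly as the `ByteBuffer` is in the example above.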
