Skip to content

Commit

Permalink
Merge pull request #1909 from ClickHouse/v2_json_support
Browse files Browse the repository at this point in the history
[client-v2] Added support of reading new JSON as string
  • Loading branch information
chernser authored Nov 12, 2024
2 parents 4689c2d + 70cdc62 commit 1baf186
Show file tree
Hide file tree
Showing 12 changed files with 298 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1903,7 +1903,7 @@ public ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response
byteBufferPool);
break;
default:
throw new IllegalArgumentException("Unsupported format: " + response.getFormat());
throw new IllegalArgumentException("Binary readers doesn't support format: " + response.getFormat());
}
return reader;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,16 @@ public static List<String> valuesFromCommaSeparated(String value) {
public static final String SETTING_LOG_COMMENT = SERVER_SETTING_PREFIX + "log_comment";

public static final String HTTP_USE_BASIC_AUTH = "http_use_basic_auth";

// -- Experimental features --

/**
* Server will expect a string in JSON format and parse it into a JSON object.
*/
public static final String INPUT_FORMAT_BINARY_READ_JSON_AS_STRING = "input_format_binary_read_json_as_string";

/**
* Server will return a JSON object as a string.
*/
public static final String OUTPUT_FORMAT_BINARY_WRITE_JSON_AS_STRING = "output_format_binary_write_json_as_string";
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.clickhouse.client.api.data_formats.internal;

import com.clickhouse.client.api.ClientException;
import com.clickhouse.client.api.ClientSettings;
import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
import com.clickhouse.client.api.internal.MapUtils;
import com.clickhouse.client.api.metadata.TableSchema;
import com.clickhouse.client.api.query.NullValueException;
import com.clickhouse.client.api.query.POJOSetter;
Expand Down Expand Up @@ -32,8 +34,10 @@
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -71,7 +75,9 @@ protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings quer
if (timeZone == null) {
throw new ClientException("Time zone is not set. (useServerTimezone:" + useServerTimeZone + ")");
}
this.binaryStreamReader = new BinaryStreamReader(inputStream, timeZone, LOG, byteBufferAllocator);
boolean jsonAsString = MapUtils.getFlag(this.settings,
ClientSettings.SERVER_SETTING_PREFIX + ClientSettings.OUTPUT_FORMAT_BINARY_WRITE_JSON_AS_STRING, false);
this.binaryStreamReader = new BinaryStreamReader(inputStream, timeZone, LOG, byteBufferAllocator, jsonAsString);
if (schema != null) {
setSchema(schema);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,7 @@ public class BinaryStreamReader {

private final ByteBufferAllocator bufferAllocator;

/**
* Creates a BinaryStreamReader instance that will use {@link DefaultByteBufferAllocator} to allocate buffers.
*
* @param input - source of raw data in a suitable format
* @param timeZone - timezone to use for date and datetime values
* @param log - logger
*/
BinaryStreamReader(InputStream input, TimeZone timeZone, Logger log) {
this(input, timeZone, log, new DefaultByteBufferAllocator());
}
private final boolean jsonAsString;

/**
* Createa a BinaryStreamReader instance that will use the provided buffer allocator.
Expand All @@ -64,11 +55,12 @@ public class BinaryStreamReader {
* @param log - logger
* @param bufferAllocator - byte buffer allocator
*/
BinaryStreamReader(InputStream input, TimeZone timeZone, Logger log, ByteBufferAllocator bufferAllocator) {
BinaryStreamReader(InputStream input, TimeZone timeZone, Logger log, ByteBufferAllocator bufferAllocator, boolean jsonAsString) {
this.log = log == null ? NOPLogger.NOP_LOGGER : log;
this.timeZone = timeZone;
this.input = input;
this.bufferAllocator = bufferAllocator;
this.jsonAsString = jsonAsString;
}

/**
Expand Down Expand Up @@ -203,8 +195,13 @@ public <T> T readValue(ClickHouseColumn column, Class<?> typeHint) throws IOExce
case Ring:
return (T) readGeoRing();

// case JSON: // obsolete https://clickhouse.com/docs/en/sql-reference/data-types/json#displaying-json-column
// case Object:
case JSON: // experimental https://clickhouse.com/docs/en/sql-reference/data-types/newjson
if (jsonAsString) {
return (T) readString(input);
} else {
throw new RuntimeException("Reading JSON from binary is not implemented yet");
}
// case Object: // deprecated https://clickhouse.com/docs/en/sql-reference/data-types/object-data-type
case Array:
return convertArray(readArray(column), typeHint);
case Map:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,22 @@ private static void serializePrimitiveData(OutputStream stream, Object value, Cl
case IPv6:
BinaryStreamUtils.writeInet6Address(stream, (Inet6Address) value);
break;
case JSON:
serializeJSON(stream, value);
break;
default:
throw new UnsupportedOperationException("Unsupported data type: " + column.getDataType());
}
}

private static void serializeJSON(OutputStream stream, Object value) throws IOException {
if (value instanceof String) {
BinaryStreamUtils.writeString(stream, (String)value);
} else {
throw new UnsupportedOperationException("Serialization of Java object to JSON is not supported yet.");
}
}

private static void serializeAggregateFunction(OutputStream stream, Object value, ClickHouseColumn column) throws IOException {
if (column.getAggregateFunction() == ClickHouseAggregateFunction.groupBitmap) {
if (value == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,21 +68,24 @@ public static boolean getFlag(Map<String, String> map, String key) {
throw new IllegalArgumentException("Invalid non-boolean value for the key '" + key + "': '" + val + "'");
}

public static boolean getFlag(Map<String, String> map, String key, boolean defaultValue) {
String val = map.get(key);
public static boolean getFlag(Map<String, ?> map, String key, boolean defaultValue) {
Object val = map.get(key);
if (val == null) {
return defaultValue;
}
if (val.equalsIgnoreCase("true")) {
return true;
} else if (val.equalsIgnoreCase("false")) {
return false;
if (val instanceof Boolean) {
return (Boolean) val;
} else if (val instanceof String) {
String str = (String) val;
if (str.equalsIgnoreCase("true") || str.equalsIgnoreCase("1")) {
return true;
} else if (str.equalsIgnoreCase("false") || str.equalsIgnoreCase("0")) {
return false;
}
}

throw new IllegalArgumentException("Invalid non-boolean value for the key '" + key + "': '" + val + "'");
}


public static boolean getFlag(Map<String, ?> p1, Map<String, ?> p2, String key) {
Object val = p1.get(key);
if (val == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import com.clickhouse.client.ClickHouseProtocol;
import com.clickhouse.client.api.Client;
import com.clickhouse.client.api.ClientException;
import com.clickhouse.client.api.ClientSettings;
import com.clickhouse.client.api.command.CommandResponse;
import com.clickhouse.client.api.command.CommandSettings;
import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
import com.clickhouse.client.api.enums.Protocol;
import com.clickhouse.client.api.insert.InsertResponse;
Expand All @@ -19,19 +21,25 @@
import com.clickhouse.client.api.metrics.ServerMetrics;
import com.clickhouse.client.api.query.GenericRecord;
import com.clickhouse.client.api.query.QueryResponse;
import com.clickhouse.client.api.query.QuerySettings;
import com.clickhouse.data.ClickHouseFormat;
import com.clickhouse.data.ClickHouseVersion;
import org.apache.commons.lang3.StringEscapeUtils;
import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
Expand Down Expand Up @@ -121,6 +129,44 @@ public void insertSimplePOJOs() throws Exception {
assertEquals(response.getQueryId(), uuid);
}


@Test(groups = { "integration" }, enabled = true)
public void insertPOJOWithJSON() throws Exception {
List<GenericRecord> serverVersion = client.queryAll("SELECT version()");
if (ClickHouseVersion.of(serverVersion.get(0).getString(1)).check("(,24.8]")) {
System.out.println("Test is skipped: feature is supported since 24.8");
return;
}

final String tableName = "pojo_with_json_table";
final String createSQL = PojoWithJSON.createTable(tableName);
final String originalJsonStr = "{\"a\":{\"b\":\"42\"},\"c\":[\"1\",\"2\",\"3\"]}";


CommandSettings commandSettings = new CommandSettings();
commandSettings.serverSetting("allow_experimental_json_type", "1");
client.execute("DROP TABLE IF EXISTS " + tableName, commandSettings).get(1, TimeUnit.SECONDS);
client.execute(createSQL, commandSettings).get(1, TimeUnit.SECONDS);

client.register(PojoWithJSON.class, client.getTableSchema(tableName, "default"));
PojoWithJSON pojo = new PojoWithJSON();
pojo.setEventPayload(originalJsonStr);
List<Object> data = Arrays.asList(pojo);

InsertSettings insertSettings = new InsertSettings()
.serverSetting(ClientSettings.INPUT_FORMAT_BINARY_READ_JSON_AS_STRING, "1");
InsertResponse response = client.insert(tableName, data, insertSettings).get(30, TimeUnit.SECONDS);
assertEquals(response.getWrittenRows(), 1);

QuerySettings settings = new QuerySettings()
.setFormat(ClickHouseFormat.CSV);
try (QueryResponse resp = client.query("SELECT * FROM " + tableName, settings).get(1, TimeUnit.SECONDS)) {
BufferedReader reader = new BufferedReader(new InputStreamReader(resp.getInputStream()));
String jsonStr = StringEscapeUtils.unescapeCsv(reader.lines().findFirst().get());
Assert.assertEquals(jsonStr, originalJsonStr);
}
}

@Test(groups = { "integration" }, enabled = true)
public void insertPOJOAndReadBack() throws Exception {
final String tableName = "single_pojo_table";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.clickhouse.client.insert;

import java.util.Objects;

public class PojoWithJSON {

// This field is a string representation of a JSON object
private String eventPayload;

public String getEventPayload() {
return eventPayload;
}

public void setEventPayload(String eventPayload) {
this.eventPayload = eventPayload;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PojoWithJSON that = (PojoWithJSON) o;
return Objects.equals(eventPayload, that.eventPayload);
}

@Override
public int hashCode() {
return Objects.hash(eventPayload);
}

@Override
public String toString() {
return "PojoWithJSON{" +
"eventPayload='" + eventPayload + '\'' +
'}';
}

public static String createTable(String tableName) {
return "CREATE TABLE " + tableName + " (eventPayload JSON) ENGINE = MergeTree() ORDER BY tuple()";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import com.clickhouse.client.api.DataTypeUtils;
import com.clickhouse.client.api.ServerException;
import com.clickhouse.client.api.command.CommandResponse;
import com.clickhouse.client.api.command.CommandSettings;
import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
import com.clickhouse.client.api.enums.Protocol;
import com.clickhouse.client.api.insert.InsertResponse;
import com.clickhouse.client.api.insert.InsertSettings;
Expand All @@ -30,14 +30,13 @@
import com.clickhouse.client.api.query.QueryResponse;
import com.clickhouse.client.api.query.QuerySettings;
import com.clickhouse.client.api.query.Records;
import com.clickhouse.client.http.config.HttpConnectionProvider;
import com.clickhouse.client.insert.SamplePOJO;
import com.clickhouse.data.ClickHouseDataType;
import com.clickhouse.data.ClickHouseFormat;
import com.clickhouse.data.ClickHouseVersion;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringEscapeUtils;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand All @@ -57,15 +56,13 @@
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.nio.file.Files;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand All @@ -74,7 +71,6 @@
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -123,7 +119,6 @@ public void setUp() {
.compressClientRequest(false)
.compressServerResponse(useServerCompression)
.useHttpCompression(useHttpCompression)
.useNewImplementation(true)
.build();

delayForProfiler(0);
Expand Down Expand Up @@ -340,7 +335,7 @@ public void testQueryAllTableNames() {
}

@Test(groups = {"integration"})
public void testQueryJSON() throws ExecutionException, InterruptedException {
public void testQueryJSONEachRow() throws ExecutionException, InterruptedException {
Map<String, Object> datasetRecord = prepareSimpleDataSet();
QuerySettings settings = new QuerySettings().setFormat(ClickHouseFormat.JSONEachRow);
Future<QueryResponse> response = client.query("SELECT * FROM " + DATASET_TABLE, settings);
Expand Down Expand Up @@ -1793,6 +1788,35 @@ public void testReadingBitmap() throws Exception {
}
}

@Test(groups = {"integration"})
public void testReadingJSONValues() throws Exception {
List<GenericRecord> serverVersion = client.queryAll("SELECT version()");
if (ClickHouseVersion.of(serverVersion.get(0).getString(1)).check("(,24.8]")) {
System.out.println("Test is skipped: feature is supported since 24.8");
return;
}
CommandSettings commandSettings = new CommandSettings();
commandSettings.serverSetting("allow_experimental_json_type", "1");
client.execute("DROP TABLE IF EXISTS test_json_values", commandSettings).get(1, TimeUnit.SECONDS);
client.execute("CREATE TABLE test_json_values (json JSON) ENGINE = MergeTree ORDER BY ()", commandSettings).get(1, TimeUnit.SECONDS);
client.execute("INSERT INTO test_json_values VALUES ('{\"a\" : {\"b\" : 42}, \"c\" : [1, 2, 3]}')", commandSettings).get(1, TimeUnit.SECONDS);


QuerySettings settings = new QuerySettings().setFormat(ClickHouseFormat.CSV);
try (QueryResponse resp = client.query("SELECT json FROM test_json_values", settings).get(1, TimeUnit.SECONDS)) {
BufferedReader reader = new BufferedReader(new InputStreamReader(resp.getInputStream()));
Assert.assertEquals(StringEscapeUtils.unescapeCsv(reader.lines().findFirst().get()), "{\"a\":{\"b\":\"42\"},\"c\":[\"1\",\"2\",\"3\"]}");
}

settings = new QuerySettings()
.serverSetting(ClientSettings.OUTPUT_FORMAT_BINARY_WRITE_JSON_AS_STRING, "1");
try (QueryResponse resp = client.query("SELECT json FROM test_json_values", settings).get(1, TimeUnit.SECONDS)) {
ClickHouseBinaryFormatReader reader = client.newBinaryFormatReader(resp);
Assert.assertNotNull(reader.next());
Assert.assertEquals(reader.getString(1), "{\"a\":{\"b\":\"42\"},\"c\":[\"1\",\"2\",\"3\"]}");
}
}

protected Client.Builder newClient() {
ClickHouseNode node = getServer(ClickHouseProtocol.HTTP);
return new Client.Builder()
Expand Down
Loading

0 comments on commit 1baf186

Please sign in to comment.