Skip to content

[client-v2] Fix reading Array(Uint64) #1995

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,13 @@ public interface ClickHouseBinaryFormatReader extends AutoCloseable {
*/
double[] getDoubleArray(String colName);

/**
*
* @param colName
* @return
*/
boolean[] getBooleanArray(String colName);

/**
* Reads column with name `colName` as a string.
*
Expand Down Expand Up @@ -503,6 +510,8 @@ public interface ClickHouseBinaryFormatReader extends AutoCloseable {
*/
double[] getDoubleArray(int index);

boolean[] getBooleanArray(int index);

Object[] getTuple(int index);

Object[] getTuple(String colName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
import com.clickhouse.client.api.query.QuerySettings;
import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.data.ClickHouseDataType;

import java.io.EOFException;
import java.io.IOException;
Expand Down Expand Up @@ -58,13 +59,24 @@ private boolean readBlock() throws IOException {
for (int i = 0; i < nColumns; i++) {
ClickHouseColumn column = ClickHouseColumn.of(BinaryStreamReader.readString(input),
BinaryStreamReader.readString(input));

names.add(column.getColumnName());
types.add(column.getDataType().name());

List<Object> values = new ArrayList<>(nRows);
for (int j = 0; j < nRows; j++) {
Object value = binaryStreamReader.readValue(column);
values.add(value);
if (column.isArray()) {
int[] sizes = new int[nRows];
for (int j = 0; j < nRows; j++) {
sizes[j] = Math.toIntExact(binaryStreamReader.readLongLE());
}
for (int j = 0; j < nRows; j++) {
values.add(binaryStreamReader.readArrayItem(column.getNestedColumns().get(0), sizes[0]));
}
} else {
for (int j = 0; j < nRows; j++) {
Object value = binaryStreamReader.readValue(column);
values.add(value);
}
}
currentBlock.add(values);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,11 @@ public double[] getDoubleArray(String colName) {
return getPrimitiveArray(colName);
}

@Override
public boolean[] getBooleanArray(String colName) {
return getPrimitiveArray(colName);
}

@Override
public boolean hasValue(int colIndex) {
return currentRecord.containsKey(getSchema().indexToName(colIndex - 1));
Expand Down Expand Up @@ -646,6 +651,11 @@ public double[] getDoubleArray(int index) {
return getPrimitiveArray(schema.indexToName(index));
}

@Override
public boolean[] getBooleanArray(int index) {
return getPrimitiveArray(schema.indexToName(index));
}

@Override
public Object[] getTuple(int index) {
return readValue(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ public double[] getDoubleArray(String colName) {
return reader.getDoubleArray(colName);
}

@Override
public boolean[] getBooleanArray(String colName) {
return reader.getBooleanArray(colName);
}

@Override
public String getString(int index) {
return reader.getString(index);
Expand Down Expand Up @@ -298,6 +303,11 @@ public double[] getDoubleArray(int index) {
return reader.getDoubleArray(index);
}

@Override
public boolean[] getBooleanArray(int index) {
return reader.getBooleanArray(index);
}

@Override
public Object[] getTuple(int index) {
return reader.getTuple(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import java.util.TimeZone;
import java.util.UUID;

import static com.clickhouse.data.ClickHouseDataType.toObjectType;

/**
* This class is not thread safe and should not be shared between multiple threads.
* Internally it may use a shared buffer to read data from the input stream.
Expand Down Expand Up @@ -521,24 +519,59 @@ public static byte[] readNBytesLE(InputStream input, byte[] buffer, int offset,
* @throws IOException when IO error occurs
*/
public ArrayValue readArray(ClickHouseColumn column) throws IOException {
Class<?> itemType = column.getArrayBaseColumn().getDataType().getWiderPrimitiveClass();
if (column.getArrayBaseColumn().isNullable()) {
itemType = toObjectType(itemType);
}
int len = readVarInt(input);
ArrayValue array = new ArrayValue(column.getArrayNestedLevel() > 1 ? ArrayValue.class : itemType, len);

if (len == 0) {
return array;
return new ArrayValue(Object.class, 0);
}

for (int i = 0; i < len; i++) {
array.set(i, readValue(column.getNestedColumns().get(0)));
ArrayValue array;
ClickHouseColumn itemTypeColumn = column.getNestedColumns().get(0);
if (column.getArrayNestedLevel() == 1) {
array = readArrayItem(itemTypeColumn, len);

} else {
array = new ArrayValue(ArrayValue.class, len);
for (int i = 0; i < len; i++) {
array.set(i, readArray(itemTypeColumn));
}
}

return array;
}

public ArrayValue readArrayItem(ClickHouseColumn itemTypeColumn, int len) throws IOException {
ArrayValue array;
if (itemTypeColumn.isNullable()) {
array = new ArrayValue(Object.class, len);
for (int i = 0; i < len; i++) {
array.set(i, readValue(itemTypeColumn));
}
} else {
Object firstValue = readValue(itemTypeColumn);
Class<?> itemClass = firstValue.getClass();
if (firstValue instanceof Byte) {
itemClass = byte.class;
} else if (firstValue instanceof Character) {
itemClass = char.class;
} else if (firstValue instanceof Short) {
itemClass = short.class;
} else if (firstValue instanceof Integer) {
itemClass = int.class;
} else if (firstValue instanceof Long) {
itemClass = long.class;
} else if (firstValue instanceof Boolean) {
itemClass = boolean.class;
}

array = new ArrayValue(itemClass, len);
array.set(0, firstValue);
for (int i = 1; i < len; i++) {
array.set(i, readValue(itemTypeColumn));
}
}
return array;
}

public void skipValue(ClickHouseColumn column) throws IOException {
readValue(column, null);
}
Expand All @@ -557,8 +590,6 @@ public static class ArrayValue {

try {
if (itemType.isArray()) {
array = Array.newInstance(ArrayValue.class, length);
} else if (itemType == List.class) {
array = Array.newInstance(Object[].class, length);
} else {
array = Array.newInstance(itemType, length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,11 @@ public double[] getDoubleArray(String colName) {
return getPrimitiveArray(colName);
}

@Override
public boolean[] getBooleanArray(String colName) {
return getPrimitiveArray(colName);
}

@Override
public boolean hasValue(int colIndex) {
return record.containsKey(schema.indexToName(colIndex));
Expand Down Expand Up @@ -426,6 +431,11 @@ public double[] getDoubleArray(int index) {
return getPrimitiveArray(schema.indexToName(index));
}

@Override
public boolean[] getBooleanArray(int index) {
return getPrimitiveArray(schema.indexToName(index));
}

@Override
public Object[] getTuple(int index) {
return readValue(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ public interface GenericRecord {
*/
double[] getDoubleArray(String colName);

boolean[] getBooleanArray(String colName);

/**
* Reads column with name `colName` as a string.
*
Expand Down Expand Up @@ -469,6 +471,8 @@ public interface GenericRecord {
*/
double[] getDoubleArray(int index);

boolean[] getBooleanArray(int index);

Object[] getTuple(int index);

Object[] getTuple(String colName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.clickhouse.client.api.command.CommandResponse;
import com.clickhouse.client.api.command.CommandSettings;
import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
import com.clickhouse.client.api.enums.Protocol;
import com.clickhouse.client.api.insert.InsertResponse;
import com.clickhouse.client.api.insert.InsertSettings;
Expand Down Expand Up @@ -170,6 +171,8 @@ public void testReadRecords() throws Exception {
Assert.assertEquals(record.getString("col3"), dsRecords.get("col3"));
Assert.assertEquals(record.getLong("col4"), dsRecords.get("col4"));
Assert.assertEquals(record.getString("col5"), dsRecords.get("col5"));
Assert.assertEquals(record.getBooleanArray("col6"), ((List)dsRecords.get("col6")).toArray());
Assert.assertEquals(record.getIntArray("col7"), ((List)dsRecords.get("col7")).toArray());
}
}

Expand Down Expand Up @@ -286,7 +289,16 @@ public void testQueryAll() throws Exception {
for (String colDefinition : DATASET_COLUMNS) {
// result values
String colName = colDefinition.split(" ")[0];
List<Object> colValues = records.stream().map(r -> r.getObject(colName)).collect(Collectors.toList());
List<Object> colValues = records.stream().map(r -> {
Object v = r.getObject(colName);
if (v instanceof BinaryStreamReader.ArrayValue) {
v = ((BinaryStreamReader.ArrayValue)v).asList();
}

return v;
}

).collect(Collectors.toList());
Assert.assertEquals(colValues.size(), dataset.size());

// dataset values
Expand Down Expand Up @@ -387,13 +399,36 @@ public void testRowBinaryQueries(ClickHouseFormat format)
while (dataIterator.hasNext()) {
Map<String, Object> expectedRecord = dataIterator.next();
Map<String, Object> actualRecord = reader.next();
Assert.assertEquals(actualRecord, expectedRecord);
for (Map.Entry<String, Object> entry : actualRecord.entrySet()) {
Object value = entry.getValue();
if (entry.getValue() instanceof BinaryStreamReader.ArrayValue) {
value = ((BinaryStreamReader.ArrayValue)value).asList();
}

Assert.assertEquals(value, expectedRecord.get(entry.getKey()), "Value of " + entry.getKey() + " doesn't match: "
+ expectedRecord.get(entry.getKey()) + " expected, actual: " + value);

}
rowsCount++;
}

Assert.assertEquals(rowsCount, rows);
}

@Test
public void testReadingArrayInNative() throws Exception {

QuerySettings querySettings = new QuerySettings().setFormat(ClickHouseFormat.RowBinaryWithNamesAndTypes);
try (QueryResponse response = client.query("SELECT [1, 2, 3] as arr1, [[1, 2, 3], [4, 5, 6]] as arr2", querySettings).get()) {
ClickHouseBinaryFormatReader reader = client.newBinaryFormatReader(response);

Map<String, Object> record = reader.next();
Assert.assertEquals(((BinaryStreamReader.ArrayValue)record.get("arr1")).asList(), Arrays.asList((short)1, (short)2, (short)3));
Assert.assertEquals(((BinaryStreamReader.ArrayValue)record.get("arr2")).asList().get(0), Arrays.asList((short)1, (short)2, (short)3));
Assert.assertEquals(((BinaryStreamReader.ArrayValue)record.get("arr2")).asList().get(1), Arrays.asList((short)4, (short)5, (short)6));
}
}

@Test(groups = {"integration"})
public void testBinaryStreamReader() throws Exception {
final String table = "dynamic_schema_test_table";
Expand Down Expand Up @@ -450,7 +485,9 @@ record = reader.next();

private final static List<String> ARRAY_COLUMNS = Arrays.asList(
"col1 Array(UInt32)",
"col2 Array(Array(Int32))"
"col2 Array(Array(Int32))",
"col3 Array(UInt64)",
"col4 Array(Bool)"
);

private final static List<Function<String, Object>> ARRAY_VALUE_GENERATORS = Arrays.asList(
Expand All @@ -459,11 +496,18 @@ record = reader.next();
.asLongStream().collect(ArrayList::new, ArrayList::add, ArrayList::addAll),
c -> {
List<List<Integer>> values = new ArrayList<>();

for (int i = 0; i < 10; i++) {
values.add(Arrays.asList(1, 2, 3));
values.add(Arrays.asList(i, 2 * i , 3 * i));
}
return values;
}
},
c ->
RANDOM.longs(10, 0, Long.MAX_VALUE)
.mapToObj(BigInteger::valueOf).collect(Collectors.toList()),
c -> RANDOM.ints(10, 0, 1)
.mapToObj(i -> i == 0 ).collect(Collectors.toList())

);

@Test(groups = {"integration"})
Expand Down Expand Up @@ -497,6 +541,12 @@ public void testArrayValues() throws Exception {
Assert.assertEquals(reader.getList("col1"), datasetRecord.get("col1"));
List<List<Long>> col2Values = reader.getList("col2");
Assert.assertEquals(col2Values, data.get(0).get("col2"));
List<BigInteger> col3Values = reader.getList("col3");
Assert.assertEquals(col3Values, data.get(0).get("col3"));
List<Boolean> col4Values = reader.getList("col4");
Assert.assertEquals(col4Values, data.get(0).get("col4"));
boolean[] col4Array = reader.getBooleanArray("col4");
Assert.assertEquals(col4Array, ((List)data.get(0).get("col4")).toArray());
}

private final static List<String> MAP_COLUMNS = Arrays.asList(
Expand Down Expand Up @@ -1292,15 +1342,19 @@ public void testQueryMetrics() throws Exception {
"col2 Int32",
"col3 String",
"col4 Int64",
"col5 String"
"col5 String",
"col6 Array(Bool)",
"col7 Array(Int32)"
);

private final static List<Function<String, Object>> DATASET_VALUE_GENERATORS = Arrays.asList(
c -> Long.valueOf(RANDOM.nextInt(Integer.MAX_VALUE)),
c -> RANDOM.nextInt(Integer.MAX_VALUE),
c -> "value_" + RANDOM.nextInt(Integer.MAX_VALUE),
c -> Long.valueOf(RANDOM.nextInt(Integer.MAX_VALUE)),
c -> "value_" + RANDOM.nextInt(Integer.MAX_VALUE)
c -> "value_" + RANDOM.nextInt(Integer.MAX_VALUE),
c -> RANDOM.ints(10, 0, 1).mapToObj(i -> i == 0).collect(Collectors.toList()),
c -> RANDOM.ints(10, 0, Integer.MAX_VALUE).boxed().collect(Collectors.toList())
);

private final static String DATASET_TABLE = "query_test_table";
Expand Down
Loading