Skip to content

Commit 0c6e2ca

Browse files
authored
[fix][sql] Fix the decimal type error convert in json schema (#15687)
### Motivation In the current sql implementation, If using `JSON` schema and querying for decimal type, there will be the following two errors: 1. The data type is displayed as varchar. 2. Loss of precision because scientific notation is used to display. ``` presto> select bigdecimal, typeof(bigdecimal) as devimal_type from pulsar."public/default".test_avro2; bigdecimal | devimal_type -----------------------+-------------- 1.2345678912345678E36 | varchar 1.2345678912345678E36 | varchar (2 rows) ``` The original data is: `1234567891234567891234567891234567.89` ### Modifications - When getting jsonNode, use `BIG_DECIMAL` instead of float and double. - `PulsarJsonFieldDecoder` increases the processing of Decimal types
1 parent e2b264b commit 0c6e2ca

File tree

5 files changed

+68
-14
lines changed

5 files changed

+68
-14
lines changed

pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919
package org.apache.pulsar.client.impl.schema.generic;
2020

2121
import static java.nio.charset.StandardCharsets.UTF_8;
22+
import com.fasterxml.jackson.databind.DeserializationFeature;
2223
import com.fasterxml.jackson.databind.JsonNode;
2324
import com.fasterxml.jackson.databind.ObjectMapper;
25+
import com.fasterxml.jackson.databind.ObjectReader;
2426
import java.io.IOException;
2527
import java.io.InputStream;
2628
import java.util.List;
@@ -34,16 +36,13 @@
3436

3537
public class GenericJsonReader implements SchemaReader<GenericRecord> {
3638

37-
private final ObjectMapper objectMapper;
39+
private final ObjectReader objectReader;
3840
private final byte[] schemaVersion;
3941
private final List<Field> fields;
4042
private SchemaInfo schemaInfo;
4143

4244
public GenericJsonReader(List<Field> fields, SchemaInfo schemaInfo){
43-
this.fields = fields;
44-
this.schemaVersion = null;
45-
this.objectMapper = new ObjectMapper();
46-
this.schemaInfo = schemaInfo;
45+
this(null, fields, schemaInfo);
4746
}
4847

4948
public GenericJsonReader(List<Field> fields){
@@ -55,16 +54,17 @@ public GenericJsonReader(byte[] schemaVersion, List<Field> fields){
5554
}
5655

5756
public GenericJsonReader(byte[] schemaVersion, List<Field> fields, SchemaInfo schemaInfo){
58-
this.objectMapper = new ObjectMapper();
5957
this.fields = fields;
6058
this.schemaVersion = schemaVersion;
6159
this.schemaInfo = schemaInfo;
60+
ObjectMapper objectMapper = new ObjectMapper();
61+
this.objectReader = objectMapper.reader().with(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
6262
}
6363

6464
@Override
6565
public GenericJsonRecord read(byte[] bytes, int offset, int length) {
6666
try {
67-
JsonNode jn = objectMapper.readTree(new String(bytes, offset, length, UTF_8));
67+
JsonNode jn = objectReader.readTree(new String(bytes, offset, length, UTF_8));
6868
return new GenericJsonRecord(schemaVersion, fields, jn, schemaInfo);
6969
} catch (IOException ioe) {
7070
throw new SchemaSerializationException(ioe);
@@ -74,7 +74,7 @@ public GenericJsonRecord read(byte[] bytes, int offset, int length) {
7474
@Override
7575
public GenericRecord read(InputStream inputStream) {
7676
try {
77-
JsonNode jn = objectMapper.readTree(inputStream);
77+
JsonNode jn = objectReader.readTree(inputStream);
7878
return new GenericJsonRecord(schemaVersion, fields, jn, schemaInfo);
7979
} catch (IOException ioe) {
8080
throw new SchemaSerializationException(ioe);

pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@
5757
import io.prestosql.spi.type.BigintType;
5858
import io.prestosql.spi.type.BooleanType;
5959
import io.prestosql.spi.type.DateType;
60+
import io.prestosql.spi.type.DecimalType;
61+
import io.prestosql.spi.type.Decimals;
6062
import io.prestosql.spi.type.DoubleType;
6163
import io.prestosql.spi.type.IntegerType;
6264
import io.prestosql.spi.type.MapType;
@@ -69,6 +71,7 @@
6971
import io.prestosql.spi.type.Type;
7072
import io.prestosql.spi.type.VarbinaryType;
7173
import io.prestosql.spi.type.VarcharType;
74+
import java.math.BigInteger;
7275
import java.util.Iterator;
7376
import java.util.List;
7477
import java.util.Map;
@@ -118,6 +121,9 @@ private static Pair<Long, Long> getNumRangeByType(Type type) {
118121
}
119122

120123
private boolean isSupportedType(Type type) {
124+
if (type instanceof DecimalType) {
125+
return true;
126+
}
121127
if (isVarcharType(type)) {
122128
return true;
123129
}
@@ -226,6 +232,13 @@ public static long getLong(JsonNode value, Type type, String columnName, long mi
226232
return floatToIntBits((Float) parseFloat(value.asText()));
227233
}
228234

235+
// If it is decimalType, need to eliminate the decimal point,
236+
// and give it to presto to set the decimal point
237+
if (type instanceof DecimalType) {
238+
String decimalLong = value.asText().replace(".", "");
239+
return Long.valueOf(decimalLong);
240+
}
241+
229242
long longValue;
230243
if (value.isIntegralNumber() && !value.isBigInteger()) {
231244
longValue = value.longValue();
@@ -265,6 +278,15 @@ public static double getDouble(JsonNode value, Type type, String columnName) {
265278

266279
private static Slice getSlice(JsonNode value, Type type, String columnName) {
267280
String textValue = value.isValueNode() ? value.asText() : value.toString();
281+
282+
// If it is decimalType, need to eliminate the decimal point,
283+
// and give it to presto to set the decimal point
284+
if (type instanceof DecimalType) {
285+
textValue = textValue.replace(".", "");
286+
BigInteger bigInteger = new BigInteger(textValue);
287+
return Decimals.encodeUnscaledValue(bigInteger);
288+
}
289+
268290
Slice slice = utf8Slice(textValue);
269291
if (isVarcharType(type)) {
270292
slice = truncateToLength(slice, type);

pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io.prestosql.spi.type.ArrayType;
3434
import io.prestosql.spi.type.BigintType;
3535
import io.prestosql.spi.type.BooleanType;
36+
import io.prestosql.spi.type.DecimalType;
3637
import io.prestosql.spi.type.DoubleType;
3738
import io.prestosql.spi.type.IntegerType;
3839
import io.prestosql.spi.type.RealType;
@@ -128,11 +129,13 @@ private Type parseJsonPrestoType(String fieldname, Schema schema) {
128129
+ "please check the schema or report the bug.", fieldname));
129130
case FIXED:
130131
case BYTES:
131-
// In the current implementation, since JsonSchema is generated by Avro,
132-
// there may exist LogicalTypes.Decimal.
133-
// Mapping decimalType with varcharType in JsonSchema.
132+
// When the precision <= 0, throw Exception.
133+
// When the precision > 0 and <= 18, use ShortDecimalType. and mapping Long
134+
// When the precision > 18 and <= 36, use LongDecimalType. and mapping Slice
135+
// When the precision > 36, throw Exception.
134136
if (logicalType instanceof LogicalTypes.Decimal) {
135-
return createUnboundedVarcharType();
137+
LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
138+
return DecimalType.createDecimalType(decimal.getPrecision(), decimal.getScale());
136139
}
137140
return VarbinaryType.VARBINARY;
138141
case INT:

pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,6 @@ public void testPrimitiveType() {
8989
message.longField = 222L;
9090
message.timestampField = System.currentTimeMillis();
9191
message.enumField = DecoderTestMessage.TestEnum.TEST_ENUM_1;
92-
message.decimalField = BigDecimal.valueOf(2233, 2);
93-
message.longDecimalField = new BigDecimal("1234567891234567891234567891.23");
9492

9593
LocalTime now = LocalTime.now(ZoneId.systemDefault());
9694
message.timeField = now.toSecondOfDay() * 1000;
@@ -130,6 +128,17 @@ public void testPrimitiveType() {
130128
PulsarColumnHandle enumFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
131129
"enumField", VARCHAR, false, false, "enumField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
132130
checkValue(decodedRow, enumFieldColumnHandle, message.enumField.toString());
131+
}
132+
133+
@Test
134+
public void testDecimal() {
135+
DecoderTestMessage message = new DecoderTestMessage();
136+
message.decimalField = BigDecimal.valueOf(2233, 2);
137+
message.longDecimalField = new BigDecimal("1234567891234567891234567891.23");
138+
139+
ByteBuf payload = io.netty.buffer.Unpooled
140+
.copiedBuffer(schema.encode(message));
141+
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = pulsarRowDecoder.decodeRow(payload).get();
133142

134143
PulsarColumnHandle decimalFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
135144
"decimalField", DecimalType.createDecimalType(4, 2), false, false, "decimalField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);

pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.prestosql.decoder.FieldValueProvider;
2525
import io.prestosql.spi.PrestoException;
2626
import io.prestosql.spi.type.*;
27+
import java.math.BigDecimal;
2728
import org.apache.pulsar.client.impl.schema.JSONSchema;
2829
import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
2930
import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
@@ -119,6 +120,25 @@ public void testPrimitiveType() {
119120

120121
}
121122

123+
@Test
124+
public void testDecimal() {
125+
DecoderTestMessage message = new DecoderTestMessage();
126+
message.decimalField = BigDecimal.valueOf(2233, 2);
127+
message.longDecimalField = new BigDecimal("1234567891234567891234567891.23");
128+
129+
ByteBuf payload = io.netty.buffer.Unpooled
130+
.copiedBuffer(schema.encode(message));
131+
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = pulsarRowDecoder.decodeRow(payload).get();
132+
133+
PulsarColumnHandle decimalFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
134+
"decimalField", DecimalType.createDecimalType(4, 2), false, false, "decimalField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
135+
checkValue(decodedRow, decimalFieldColumnHandle, message.decimalField);
136+
137+
PulsarColumnHandle longDecimalFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
138+
"longDecimalField", DecimalType.createDecimalType(30, 2), false, false, "longDecimalField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
139+
checkValue(decodedRow, longDecimalFieldColumnHandle, message.longDecimalField);
140+
}
141+
122142
@Test
123143
public void testArray() {
124144
DecoderTestMessage message = new DecoderTestMessage();

0 commit comments

Comments
 (0)