Skip to content

Commit

Permalink
Adding read support for json & jsonb datatypes in postgresql
Browse files Browse the repository at this point in the history
Cherry pick of trinodb/trino@653ee44

Co-authored-by: guyco33 <guyc@gettaxi.com>
  • Loading branch information
2 people authored and rongrong committed Mar 30, 2022
1 parent 3f60a5f commit 0b67486
Show file tree
Hide file tree
Showing 11 changed files with 184 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHand
while (resultSet.next()) {
JdbcTypeHandle typeHandle = new JdbcTypeHandle(
resultSet.getInt("DATA_TYPE"),
resultSet.getString("TYPE_NAME"),
resultSet.getInt("COLUMN_SIZE"),
resultSet.getInt("DECIMAL_DIGITS"));
Optional<ReadMapping> columnMapping = toPrestoType(session, typeHandle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.plugin.jdbc;

import com.facebook.airlift.bootstrap.Bootstrap;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
import com.facebook.presto.spi.connector.Connector;
Expand Down Expand Up @@ -67,6 +68,7 @@ public Connector create(String catalogName, Map<String, String> requiredConfig,
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
Bootstrap app = new Bootstrap(
binder -> {
binder.bind(TypeManager.class).toInstance(context.getTypeManager());
binder.bind(FunctionMetadataManager.class).toInstance(context.getFunctionMetadataManager());
binder.bind(StandardFunctionResolution.class).toInstance(context.getStandardFunctionResolution());
binder.bind(RowExpressionService.class).toInstance(context.getRowExpressionService());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,24 @@
import java.util.Objects;

import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

public final class JdbcTypeHandle
{
private final int jdbcType;
private final String jdbcTypeName;
private final int columnSize;
private final int decimalDigits;

@JsonCreator
public JdbcTypeHandle(
@JsonProperty("jdbcType") int jdbcType,
@JsonProperty("jdbcTypeName") String jdbcTypeName,
@JsonProperty("columnSize") int columnSize,
@JsonProperty("decimalDigits") int decimalDigits)
{
this.jdbcType = jdbcType;
this.jdbcTypeName = requireNonNull(jdbcTypeName, "jdbcTypeName is null");
this.columnSize = columnSize;
this.decimalDigits = decimalDigits;
}
Expand All @@ -43,6 +47,12 @@ public int getJdbcType()
return jdbcType;
}

@JsonProperty
public String getJdbcTypeName()
{
return jdbcTypeName;
}

@JsonProperty
public int getColumnSize()
{
Expand All @@ -58,7 +68,7 @@ public int getDecimalDigits()
@Override
public int hashCode()
{
return Objects.hash(jdbcType, columnSize, decimalDigits);
return Objects.hash(jdbcType, jdbcTypeName, columnSize, decimalDigits);
}

@Override
Expand All @@ -73,14 +83,16 @@ public boolean equals(Object o)
JdbcTypeHandle that = (JdbcTypeHandle) o;
return jdbcType == that.jdbcType &&
columnSize == that.columnSize &&
decimalDigits == that.decimalDigits;
decimalDigits == that.decimalDigits &&
Objects.equals(jdbcTypeName, that.jdbcTypeName);
}

@Override
public String toString()
{
return toStringHelper(this)
.add("jdbcType", jdbcType)
.add("jdbcTypeName", jdbcTypeName)
.add("columnSize", columnSize)
.add("decimalDigits", decimalDigits)
.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,20 @@ public final class TestingJdbcTypeHandle
{
private TestingJdbcTypeHandle() {}

public static final JdbcTypeHandle JDBC_BOOLEAN = new JdbcTypeHandle(Types.BOOLEAN, 1, 0);
public static final JdbcTypeHandle JDBC_BOOLEAN = new JdbcTypeHandle(Types.BOOLEAN, "boolean", 1, 0);

public static final JdbcTypeHandle JDBC_SMALLINT = new JdbcTypeHandle(Types.SMALLINT, 1, 0);
public static final JdbcTypeHandle JDBC_TINYINT = new JdbcTypeHandle(Types.TINYINT, 2, 0);
public static final JdbcTypeHandle JDBC_INTEGER = new JdbcTypeHandle(Types.INTEGER, 4, 0);
public static final JdbcTypeHandle JDBC_BIGINT = new JdbcTypeHandle(Types.BIGINT, 8, 0);
public static final JdbcTypeHandle JDBC_SMALLINT = new JdbcTypeHandle(Types.SMALLINT, "smallint", 1, 0);
public static final JdbcTypeHandle JDBC_TINYINT = new JdbcTypeHandle(Types.TINYINT, "tinyint", 2, 0);
public static final JdbcTypeHandle JDBC_INTEGER = new JdbcTypeHandle(Types.INTEGER, "integer", 4, 0);
public static final JdbcTypeHandle JDBC_BIGINT = new JdbcTypeHandle(Types.BIGINT, "bigint", 8, 0);

public static final JdbcTypeHandle JDBC_REAL = new JdbcTypeHandle(Types.REAL, 8, 0);
public static final JdbcTypeHandle JDBC_DOUBLE = new JdbcTypeHandle(Types.DOUBLE, 8, 0);
public static final JdbcTypeHandle JDBC_REAL = new JdbcTypeHandle(Types.REAL, "real", 8, 0);
public static final JdbcTypeHandle JDBC_DOUBLE = new JdbcTypeHandle(Types.DOUBLE, "double precision", 8, 0);

public static final JdbcTypeHandle JDBC_CHAR = new JdbcTypeHandle(Types.CHAR, 10, 0);
public static final JdbcTypeHandle JDBC_VARCHAR = new JdbcTypeHandle(Types.VARCHAR, 10, 0);
public static final JdbcTypeHandle JDBC_CHAR = new JdbcTypeHandle(Types.CHAR, "char", 10, 0);
public static final JdbcTypeHandle JDBC_VARCHAR = new JdbcTypeHandle(Types.VARCHAR, "varchar", 10, 0);

public static final JdbcTypeHandle JDBC_DATE = new JdbcTypeHandle(Types.DATE, 8, 0);
public static final JdbcTypeHandle JDBC_TIME = new JdbcTypeHandle(Types.TIME, 4, 0);
public static final JdbcTypeHandle JDBC_TIMESTAMP = new JdbcTypeHandle(Types.TIMESTAMP, 8, 0);
public static final JdbcTypeHandle JDBC_DATE = new JdbcTypeHandle(Types.DATE, "date", 8, 0);
public static final JdbcTypeHandle JDBC_TIME = new JdbcTypeHandle(Types.TIME, "time", 4, 0);
public static final JdbcTypeHandle JDBC_TIMESTAMP = new JdbcTypeHandle(Types.TIMESTAMP, "timestamp", 8, 0);
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,12 @@ private static VariableReferenceExpression newVariable(String name, Type type)

private static JdbcColumnHandle integerJdbcColumnHandle(String name)
{
return new JdbcColumnHandle(CONNECTOR_ID, name, new JdbcTypeHandle(Types.BIGINT, 10, 0), BIGINT, false, Optional.empty());
return new JdbcColumnHandle(CONNECTOR_ID, name, new JdbcTypeHandle(Types.BIGINT, "integer", 10, 0), BIGINT, false, Optional.empty());
}

private static JdbcColumnHandle booleanJdbcColumnHandle(String name)
{
return new JdbcColumnHandle(CONNECTOR_ID, name, new JdbcTypeHandle(Types.BOOLEAN, 1, 0), BOOLEAN, false, Optional.empty());
return new JdbcColumnHandle(CONNECTOR_ID, name, new JdbcTypeHandle(Types.BOOLEAN, "boolean", 1, 0), BOOLEAN, false, Optional.empty());
}

private static JdbcColumnHandle getColumnHandleForVariable(String name, Type type)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import static com.facebook.presto.common.type.DateType.DATE;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.JsonType.JSON;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
import static com.facebook.presto.common.type.StandardTypes.ARRAY;
Expand Down Expand Up @@ -270,6 +271,9 @@ else if (DOUBLE.equals(type)) {
else if (BOOLEAN.equals(type)) {
type.writeBoolean(blockBuilder, (Boolean) value);
}
else if (JSON.equals(type)) {
type.writeSlice(blockBuilder, Slices.utf8Slice((String) value));
}
else if (type instanceof VarcharType) {
type.writeSlice(blockBuilder, Slices.utf8Slice((String) value));
}
Expand Down
21 changes: 15 additions & 6 deletions presto-postgresql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,21 @@
<artifactId>javax.inject</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>json</artifactId>
</dependency>

<!-- Presto SPI -->
<dependency>
<groupId>com.facebook.presto</groupId>
Expand Down Expand Up @@ -109,12 +124,6 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>json</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-main</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,36 @@
*/
package com.facebook.presto.plugin.postgresql;

import com.facebook.airlift.json.JsonObjectMapperProvider;
import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.common.type.TypeSignature;
import com.facebook.presto.plugin.jdbc.BaseJdbcClient;
import com.facebook.presto.plugin.jdbc.BaseJdbcConfig;
import com.facebook.presto.plugin.jdbc.DriverConnectionFactory;
import com.facebook.presto.plugin.jdbc.JdbcConnectorId;
import com.facebook.presto.plugin.jdbc.JdbcIdentity;
import com.facebook.presto.plugin.jdbc.JdbcTypeHandle;
import com.facebook.presto.plugin.jdbc.ReadMapping;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonFactoryBuilder;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceOutput;
import org.postgresql.Driver;

import javax.inject.Inject;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
Expand All @@ -37,17 +53,27 @@
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.ALREADY_EXISTS;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT;
import static com.fasterxml.jackson.core.JsonFactory.Feature.CANONICALIZE_FIELD_NAMES;
import static com.fasterxml.jackson.databind.SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS;
import static io.airlift.slice.Slices.utf8Slice;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;

public class PostgreSqlClient
extends BaseJdbcClient
{
protected final Type jsonType;
private static final String DUPLICATE_TABLE_SQLSTATE = "42P07";

private static final JsonFactory JSON_FACTORY = new JsonFactoryBuilder().configure(CANONICALIZE_FIELD_NAMES, false).build();
private static final ObjectMapper SORTED_MAPPER = new JsonObjectMapperProvider().get().configure(ORDER_MAP_ENTRIES_BY_KEYS, true);

@Inject
public PostgreSqlClient(JdbcConnectorId connectorId, BaseJdbcConfig config)
public PostgreSqlClient(JdbcConnectorId connectorId, BaseJdbcConfig config, TypeManager typeManager)
{
super(connectorId, config, "\"", new DriverConnectionFactory(new Driver(), config));
this.jsonType = typeManager.getType(new TypeSignature(StandardTypes.JSON));
}

@Override
Expand Down Expand Up @@ -83,6 +109,15 @@ protected String toSqlType(Type type)
return super.toSqlType(type);
}

@Override
public Optional<ReadMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle)
{
if (typeHandle.getJdbcTypeName().equals("jsonb") || typeHandle.getJdbcTypeName().equals("json")) {
return Optional.of(jsonColumnMapping());
}
return super.toPrestoType(session, typeHandle);
}

@Override
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
{
Expand Down Expand Up @@ -112,4 +147,35 @@ protected void renameTable(JdbcIdentity identity, String catalogName, SchemaTabl
throw new PrestoException(JDBC_ERROR, e);
}
}

private ReadMapping jsonColumnMapping()
{
return ReadMapping.sliceReadMapping(
jsonType,
(resultSet, columnIndex) -> jsonParse(utf8Slice(resultSet.getString(columnIndex))));
}

public static Slice jsonParse(Slice slice)
{
try (JsonParser parser = createJsonParser(JSON_FACTORY, slice)) {
byte[] in = slice.getBytes();
SliceOutput dynamicSliceOutput = new DynamicSliceOutput(in.length);
SORTED_MAPPER.writeValue((OutputStream) dynamicSliceOutput, SORTED_MAPPER.readValue(parser, Object.class));
// nextToken() returns null if the input is parsed correctly,
// but will throw an exception if there are trailing characters.
parser.nextToken();
return dynamicSliceOutput.slice();
}
catch (Exception e) {
throw new PrestoException(INVALID_FUNCTION_ARGUMENT, format("Cannot convert '%s' to JSON", slice.toStringUtf8()));
}
}

public static JsonParser createJsonParser(JsonFactory factory, Slice json)
throws IOException
{
// Jackson tries to detect the character encoding automatically when using InputStream
// so we pass an InputStreamReader instead.
return factory.createParser(new InputStreamReader(json.getInput(), UTF_8));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@
import java.time.ZoneId;
import java.util.function.Function;

import static com.facebook.presto.common.type.JsonType.JSON;
import static com.facebook.presto.common.type.TimeZoneKey.UTC_KEY;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.plugin.postgresql.PostgreSqlQueryRunner.createPostgreSqlQueryRunner;
import static com.facebook.presto.tests.datatype.DataType.bigintDataType;
import static com.facebook.presto.tests.datatype.DataType.booleanDataType;
import static com.facebook.presto.tests.datatype.DataType.dataType;
import static com.facebook.presto.tests.datatype.DataType.dateDataType;
import static com.facebook.presto.tests.datatype.DataType.decimalDataType;
import static com.facebook.presto.tests.datatype.DataType.doubleDataType;
Expand All @@ -55,6 +57,7 @@
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_16LE;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.function.Function.identity;

@Test
public class TestPostgreSqlTypeMapping
Expand Down Expand Up @@ -274,13 +277,65 @@ public void testTimestamp()
// TODO timestamp is not correctly read (see comment in StandardReadMappings.timestampReadMapping), but testing this is hard because of #7122
}

@Test
public void testJson()
{
jsonTestCases(jsonDataType())
.execute(getQueryRunner(), postgresCreateAndInsert("tpch.postgresql_test_json"));
}

@Test
public void testJsonb()
{
jsonTestCases(jsonbDataType())
.execute(getQueryRunner(), postgresCreateAndInsert("tpch.postgresql_test_jsonb"));
}

private DataTypeTest jsonTestCases(DataType<String> jsonDataType)
{
return DataTypeTest.create()
.addRoundTrip(jsonDataType, "{}")
.addRoundTrip(jsonDataType, null)
.addRoundTrip(jsonDataType, "null")
.addRoundTrip(jsonDataType, "123.4")
.addRoundTrip(jsonDataType, "\"abc\"")
.addRoundTrip(jsonDataType, "\"text with \\\" quotations and ' apostrophes\"")
.addRoundTrip(jsonDataType, "\"\"")
.addRoundTrip(jsonDataType, "{\"a\":1,\"b\":2}")
.addRoundTrip(jsonDataType, "{\"a\":[1,2,3],\"b\":{\"aa\":11,\"bb\":[{\"a\":1,\"b\":2},{\"a\":0}]}}")
.addRoundTrip(jsonDataType, "[]");
}

private static DataType<String> jsonDataType()
{
return dataType(
"json",
JSON,
value -> "JSON " + formatStringLiteral(value),
identity());
}

public static DataType<String> jsonbDataType()
{
return dataType(
"jsonb",
JSON,
value -> "JSON " + formatStringLiteral(value),
identity());
}

public static String formatStringLiteral(String value)
{
return "'" + value.replace("'", "''") + "'";
}

private void testUnsupportedDataType(String databaseDataType)
{
JdbcSqlExecutor jdbcSqlExecutor = new JdbcSqlExecutor(postgreSqlServer.getJdbcUrl());
jdbcSqlExecutor.execute(format("CREATE TABLE tpch.test_unsupported_data_type(key varchar(5), unsupported_column %s)", databaseDataType));
try {
assertQuery(
"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = 'tpch' AND TABLE_NAME = 'test_unsupported_data_type'",
"SELECT column_name FROM information_schema.columns WHERE table_schema = 'tpch' AND table_name = 'test_unsupported_data_type'",
"VALUES 'key'"); // no 'unsupported_column'
}
finally {
Expand Down
Loading

0 comments on commit 0b67486

Please sign in to comment.