Skip to content

Commit 14fc642

Browse files
igor-suhorukovkou
authored and committed
ARROW-17631: [Java] Propagate table/columns comments into Arrow Schema (apache#14081)
Allow users to provide comments in the Arrow Schema via JdbcToArrowConfig. This metadata is very useful in real-life (medium- to large-scale) projects for documentation and maintenance. Apache Spark code uses the "comment" key for such metadata, so it looks like a reasonable default metadata key for the Arrow schema too. Authored-by: igor.suhorukov <igor.suhorukov@gmail.com> Signed-off-by: David Li <li.davidm96@gmail.com>
1 parent 24224e0 commit 14fc642

File tree

7 files changed

+405
-4
lines changed

7 files changed

+405
-4
lines changed

adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowConfig.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ public final class JdbcToArrowConfig {
6060
private final Map<String, JdbcFieldInfo> arraySubTypesByColumnName;
6161
private final Map<Integer, JdbcFieldInfo> explicitTypesByColumnIndex;
6262
private final Map<String, JdbcFieldInfo> explicitTypesByColumnName;
63+
private final Map<String, String> schemaMetadata;
64+
private final Map<Integer, Map<String, String>> columnMetadataByColumnIndex;
6365
private final RoundingMode bigDecimalRoundingMode;
6466
/**
6567
* The maximum rowCount to read each time when partially convert data.
@@ -174,6 +176,8 @@ public final class JdbcToArrowConfig {
174176
jdbcToArrowTypeConverter,
175177
null,
176178
null,
179+
null,
180+
null,
177181
bigDecimalRoundingMode);
178182
}
179183

@@ -188,6 +192,8 @@ public final class JdbcToArrowConfig {
188192
Function<JdbcFieldInfo, ArrowType> jdbcToArrowTypeConverter,
189193
Map<Integer, JdbcFieldInfo> explicitTypesByColumnIndex,
190194
Map<String, JdbcFieldInfo> explicitTypesByColumnName,
195+
Map<String, String> schemaMetadata,
196+
Map<Integer, Map<String, String>> columnMetadataByColumnIndex,
191197
RoundingMode bigDecimalRoundingMode) {
192198
Preconditions.checkNotNull(allocator, "Memory allocator cannot be null");
193199
this.allocator = allocator;
@@ -199,6 +205,8 @@ public final class JdbcToArrowConfig {
199205
this.targetBatchSize = targetBatchSize;
200206
this.explicitTypesByColumnIndex = explicitTypesByColumnIndex;
201207
this.explicitTypesByColumnName = explicitTypesByColumnName;
208+
this.schemaMetadata = schemaMetadata;
209+
this.columnMetadataByColumnIndex = columnMetadataByColumnIndex;
202210
this.bigDecimalRoundingMode = bigDecimalRoundingMode;
203211

204212
// set up type converter
@@ -312,6 +320,21 @@ public JdbcFieldInfo getExplicitTypeByColumnName(String name) {
312320
}
313321
}
314322

323+
/**
324+
* Return the schema-level metadata, or null if none was provided.
325+
*/
326+
public Map<String, String> getSchemaMetadata() {
327+
return schemaMetadata;
328+
}
329+
330+
/**
331+
* Return the per-field metadata maps, keyed by 1-based JDBC column index (columnIndex -> metadata),
332+
* or null if not provided.
333+
*/
334+
public Map<Integer, Map<String, String>> getColumnMetadataByColumnIndex() {
335+
return columnMetadataByColumnIndex;
336+
}
337+
315338
public RoundingMode getBigDecimalRoundingMode() {
316339
return bigDecimalRoundingMode;
317340
}

adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowConfigBuilder.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ public class JdbcToArrowConfigBuilder {
4040
private Map<String, JdbcFieldInfo> arraySubTypesByColumnName;
4141
private Map<Integer, JdbcFieldInfo> explicitTypesByColumnIndex;
4242
private Map<String, JdbcFieldInfo> explicitTypesByColumnName;
43+
private Map<String, String> schemaMetadata;
44+
private Map<Integer, Map<String, String>> columnMetadataByColumnIndex;
4345
private int targetBatchSize;
4446
private Function<JdbcFieldInfo, ArrowType> jdbcToArrowTypeConverter;
4547
private RoundingMode bigDecimalRoundingMode;
@@ -58,6 +60,8 @@ public JdbcToArrowConfigBuilder() {
5860
this.arraySubTypesByColumnName = null;
5961
this.explicitTypesByColumnIndex = null;
6062
this.explicitTypesByColumnName = null;
63+
this.schemaMetadata = null;
64+
this.columnMetadataByColumnIndex = null;
6165
this.bigDecimalRoundingMode = null;
6266
}
6367

@@ -226,6 +230,23 @@ public JdbcToArrowConfigBuilder setReuseVectorSchemaRoot(boolean reuseVectorSche
226230
return this;
227231
}
228232

233+
/**
234+
* Set metadata for schema.
235+
*/
236+
public JdbcToArrowConfigBuilder setSchemaMetadata(Map<String, String> schemaMetadata) {
237+
this.schemaMetadata = schemaMetadata;
238+
return this;
239+
}
240+
241+
/**
242+
* Set per-field metadata, keyed by 1-based JDBC column index (columnIndex -> metadata map).
243+
*/
244+
public JdbcToArrowConfigBuilder setColumnMetadataByColumnIndex(
245+
Map<Integer, Map<String, String>> columnMetadataByColumnIndex) {
246+
this.columnMetadataByColumnIndex = columnMetadataByColumnIndex;
247+
return this;
248+
}
249+
229250
/**
230251
* Set the rounding mode used when the scale of the actual value does not match the declared scale.
231252
* <p>
@@ -255,6 +276,8 @@ public JdbcToArrowConfig build() {
255276
jdbcToArrowTypeConverter,
256277
explicitTypesByColumnIndex,
257278
explicitTypesByColumnName,
279+
schemaMetadata,
280+
columnMetadataByColumnIndex,
258281
bigDecimalRoundingMode);
259282
}
260283
}

adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowUtils.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -244,16 +244,24 @@ public static Schema jdbcToArrowSchema(ResultSetMetaData rsmd, JdbcToArrowConfig
244244
for (int i = 1; i <= columnCount; i++) {
245245
final String columnName = rsmd.getColumnLabel(i);
246246

247+
final Map<String, String> columnMetadata = config.getColumnMetadataByColumnIndex() != null ?
248+
config.getColumnMetadataByColumnIndex().get(i) : null;
247249
final Map<String, String> metadata;
248250
if (config.shouldIncludeMetadata()) {
249251
metadata = new HashMap<>();
250252
metadata.put(Constants.SQL_CATALOG_NAME_KEY, rsmd.getCatalogName(i));
251253
metadata.put(Constants.SQL_TABLE_NAME_KEY, rsmd.getTableName(i));
252254
metadata.put(Constants.SQL_COLUMN_NAME_KEY, columnName);
253255
metadata.put(Constants.SQL_TYPE_KEY, rsmd.getColumnTypeName(i));
254-
256+
if (columnMetadata != null && !columnMetadata.isEmpty()) {
257+
metadata.putAll(columnMetadata);
258+
}
255259
} else {
256-
metadata = null;
260+
if (columnMetadata != null && !columnMetadata.isEmpty()) {
261+
metadata = columnMetadata;
262+
} else {
263+
metadata = null;
264+
}
257265
}
258266

259267
final JdbcFieldInfo columnFieldInfo = getJdbcFieldInfoForColumn(rsmd, i, config);
@@ -276,8 +284,7 @@ public static Schema jdbcToArrowSchema(ResultSetMetaData rsmd, JdbcToArrowConfig
276284
fields.add(new Field(columnName, fieldType, children));
277285
}
278286
}
279-
280-
return new Schema(fields, null);
287+
return new Schema(fields, config.getSchemaMetadata());
281288
}
282289

283290
static JdbcFieldInfo getJdbcFieldInfoForColumn(
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.arrow.adapter.jdbc;
19+
20+
import static org.assertj.core.api.Assertions.assertThat;
21+
22+
import java.nio.charset.StandardCharsets;
23+
import java.nio.file.Files;
24+
import java.nio.file.Paths;
25+
import java.sql.Connection;
26+
import java.sql.DatabaseMetaData;
27+
import java.sql.DriverManager;
28+
import java.sql.ResultSet;
29+
import java.sql.ResultSetMetaData;
30+
import java.sql.SQLException;
31+
import java.sql.Statement;
32+
import java.util.Collections;
33+
import java.util.HashMap;
34+
import java.util.HashSet;
35+
import java.util.Map;
36+
import java.util.Objects;
37+
import java.util.Set;
38+
39+
import org.apache.arrow.memory.RootAllocator;
40+
import org.apache.arrow.vector.types.pojo.Schema;
41+
import org.apache.arrow.vector.util.ObjectMapperFactory;
42+
import org.junit.After;
43+
import org.junit.Before;
44+
import org.junit.Test;
45+
46+
import com.fasterxml.jackson.databind.ObjectWriter;
47+
48+
public class JdbcToArrowCommentMetadataTest {
49+
50+
private static final String COMMENT = "comment"; //use this metadata key for interoperability with Spark StructType
51+
private final ObjectWriter schemaSerializer = ObjectMapperFactory.newObjectMapper().writerWithDefaultPrettyPrinter();
52+
private Connection conn = null;
53+
54+
/**
55+
* This method creates Connection object and DB table and also populate data into table for test.
56+
*
57+
* @throws SQLException on error
58+
* @throws ClassNotFoundException on error
59+
*/
60+
@Before
61+
public void setUp() throws SQLException, ClassNotFoundException {
62+
String url = "jdbc:h2:mem:JdbcToArrowTest?characterEncoding=UTF-8;INIT=runscript from 'classpath:/h2/comment.sql'";
63+
String driver = "org.h2.Driver";
64+
Class.forName(driver);
65+
conn = DriverManager.getConnection(url);
66+
}
67+
68+
@After
69+
public void tearDown() throws SQLException {
70+
if (conn != null) {
71+
conn.close();
72+
conn = null;
73+
}
74+
}
75+
76+
@Test
77+
public void schemaComment() throws Exception {
78+
boolean includeMetadata = false;
79+
String schemaJson = schemaSerializer.writeValueAsString(getSchemaWithCommentFromQuery(includeMetadata));
80+
String expectedSchema = getExpectedSchema("/h2/expectedSchemaWithComments.json");
81+
assertThat(schemaJson).isEqualTo(expectedSchema);
82+
}
83+
84+
@Test
85+
public void schemaCommentWithDatabaseMetadata() throws Exception {
86+
boolean includeMetadata = true;
87+
String schemaJson = schemaSerializer.writeValueAsString(getSchemaWithCommentFromQuery(includeMetadata));
88+
String expectedSchema = getExpectedSchema("/h2/expectedSchemaWithCommentsAndJdbcMeta.json");
89+
/* corresponding Apache Spark DDL after conversion:
90+
ID BIGINT NOT NULL COMMENT 'Record identifier',
91+
NAME STRING COMMENT 'Name of record',
92+
COLUMN1 BOOLEAN,
93+
COLUMNN INT COMMENT 'Informative description of columnN'
94+
*/
95+
assertThat(schemaJson).isEqualTo(expectedSchema);
96+
}
97+
98+
private Schema getSchemaWithCommentFromQuery(boolean includeMetadata) throws SQLException {
99+
DatabaseMetaData metaData = conn.getMetaData();
100+
try (Statement statement = conn.createStatement()) {
101+
try (ResultSet resultSet = statement.executeQuery("select * from table1")) {
102+
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
103+
Map<Integer, Map<String, String>> columnCommentByColumnIndex = getColumnComments(metaData, resultSetMetaData);
104+
105+
String tableName = getTableNameFromResultSetMetaData(resultSetMetaData);
106+
String tableComment = getTableComment(metaData, tableName);
107+
JdbcToArrowConfig config = new JdbcToArrowConfigBuilder()
108+
.setAllocator(new RootAllocator()).setSchemaMetadata(Collections.singletonMap(COMMENT, tableComment))
109+
.setColumnMetadataByColumnIndex(columnCommentByColumnIndex).setIncludeMetadata(includeMetadata).build();
110+
return JdbcToArrowUtils.jdbcToArrowSchema(resultSetMetaData, config);
111+
}
112+
}
113+
}
114+
115+
private String getTableNameFromResultSetMetaData(ResultSetMetaData resultSetMetaData) throws SQLException {
116+
Set<String> tablesFromQuery = new HashSet<>();
117+
for (int idx = 1, columnCount = resultSetMetaData.getColumnCount(); idx <= columnCount; idx++) {
118+
String tableName = resultSetMetaData.getTableName(idx);
119+
if (tableName != null && !tableName.isEmpty()) {
120+
tablesFromQuery.add(tableName);
121+
}
122+
}
123+
if (tablesFromQuery.size() == 1) {
124+
return tablesFromQuery.iterator().next();
125+
}
126+
throw new RuntimeException("Table metadata is absent or ambiguous");
127+
}
128+
129+
private Map<Integer, Map<String, String>> getColumnComments(DatabaseMetaData metaData,
130+
ResultSetMetaData resultSetMetaData) throws SQLException {
131+
Map<Integer, Map<String, String>> columnCommentByColumnIndex = new HashMap<>();
132+
for (int columnIdx = 1, columnCount = resultSetMetaData.getColumnCount(); columnIdx <= columnCount; columnIdx++) {
133+
String columnComment = getColumnComment(metaData, resultSetMetaData.getTableName(columnIdx),
134+
resultSetMetaData.getColumnName(columnIdx));
135+
if (columnComment != null && !columnComment.isEmpty()) {
136+
columnCommentByColumnIndex.put(columnIdx, Collections.singletonMap(COMMENT, columnComment));
137+
}
138+
}
139+
return columnCommentByColumnIndex;
140+
}
141+
142+
private String getTableComment(DatabaseMetaData metaData, String tableName) throws SQLException {
143+
if (tableName == null || tableName.isEmpty()) {
144+
return null;
145+
}
146+
String comment = null;
147+
int rowCount = 0;
148+
try (ResultSet tableMetadata = metaData.getTables("%", "%", tableName, null)) {
149+
if (tableMetadata.next()) {
150+
comment = tableMetadata.getString("REMARKS");
151+
rowCount++;
152+
}
153+
}
154+
if (rowCount == 1) {
155+
return comment;
156+
}
157+
if (rowCount > 1) {
158+
throw new RuntimeException("Multiple tables found for table name");
159+
}
160+
throw new RuntimeException("Table comment not found");
161+
}
162+
163+
private String getColumnComment(DatabaseMetaData metaData, String tableName, String columnName) throws SQLException {
164+
try (ResultSet tableMetadata = metaData.getColumns("%", "%", tableName, columnName)) {
165+
if (tableMetadata.next()) {
166+
return tableMetadata.getString("REMARKS");
167+
}
168+
}
169+
return null;
170+
}
171+
172+
private String getExpectedSchema(String expectedResource) throws java.io.IOException, java.net.URISyntaxException {
173+
return new String(Files.readAllBytes(Paths.get(Objects.requireNonNull(
174+
JdbcToArrowCommentMetadataTest.class.getResource(expectedResource)).toURI())), StandardCharsets.UTF_8);
175+
}
176+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
--Licensed to the Apache Software Foundation (ASF) under one or more contributor
2+
--license agreements. See the NOTICE file distributed with this work for additional
3+
--information regarding copyright ownership. The ASF licenses this file to
4+
--You under the Apache License, Version 2.0 (the "License"); you may not use
5+
--this file except in compliance with the License. You may obtain a copy of
6+
--the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
7+
--by applicable law or agreed to in writing, software distributed under the
8+
--License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
9+
--OF ANY KIND, either express or implied. See the License for the specific
10+
--language governing permissions and limitations under the License.
11+
create table table1(
12+
id bigint primary key,
13+
name varchar(255),
14+
column1 boolean,
15+
columnN int
16+
);
17+
18+
COMMENT ON TABLE table1 IS 'This is super special table with valuable data';
19+
COMMENT ON COLUMN table1.id IS 'Record identifier';
20+
COMMENT ON COLUMN table1.name IS 'Name of record';
21+
COMMENT ON COLUMN table1.columnN IS 'Informative description of columnN';
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
{
2+
"fields" : [ {
3+
"name" : "ID",
4+
"nullable" : false,
5+
"type" : {
6+
"name" : "int",
7+
"bitWidth" : 64,
8+
"isSigned" : true
9+
},
10+
"children" : [ ],
11+
"metadata" : [ {
12+
"value" : "Record identifier",
13+
"key" : "comment"
14+
} ]
15+
}, {
16+
"name" : "NAME",
17+
"nullable" : true,
18+
"type" : {
19+
"name" : "utf8"
20+
},
21+
"children" : [ ],
22+
"metadata" : [ {
23+
"value" : "Name of record",
24+
"key" : "comment"
25+
} ]
26+
}, {
27+
"name" : "COLUMN1",
28+
"nullable" : true,
29+
"type" : {
30+
"name" : "bool"
31+
},
32+
"children" : [ ]
33+
}, {
34+
"name" : "COLUMNN",
35+
"nullable" : true,
36+
"type" : {
37+
"name" : "int",
38+
"bitWidth" : 32,
39+
"isSigned" : true
40+
},
41+
"children" : [ ],
42+
"metadata" : [ {
43+
"value" : "Informative description of columnN",
44+
"key" : "comment"
45+
} ]
46+
} ],
47+
"metadata" : [ {
48+
"value" : "This is super special table with valuable data",
49+
"key" : "comment"
50+
} ]
51+
}

0 commit comments

Comments
 (0)