Skip to content

Commit

Permalink
improve HiveSerDe
Browse files Browse the repository at this point in the history
  • Loading branch information
wg1026688210 committed Mar 11, 2024
1 parent ed322ce commit a160915
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.hive;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
Expand All @@ -29,6 +30,7 @@
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.StringUtils;

import org.apache.paimon.shade.guava30.com.google.common.base.Splitter;
Expand Down Expand Up @@ -96,7 +98,11 @@ public List<String> fieldComments() {
/** Extract {@link HiveSchema} from Hive serde properties. */
public static HiveSchema extract(@Nullable Configuration configuration, Properties properties) {
String location = LocationKeyExtractor.getPaimonLocation(configuration, properties);
Optional<TableSchema> tableSchema = getExistingSchema(configuration, location);
Optional<TableSchema> tableSchema = getTableSchemaFromCache(properties);
tableSchema =
tableSchema.isPresent()
? tableSchema
: HiveSchema.getExistingSchema(configuration, location);
String columnProperty = properties.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS);

// Create hive external table with empty ddl
Expand Down Expand Up @@ -192,7 +198,16 @@ public static HiveSchema extract(@Nullable Configuration configuration, Properti
return new HiveSchema(builder.build());
}

private static Optional<TableSchema> getExistingSchema(
/**
 * Reads a {@link TableSchema} that was previously cached into the Hive serde properties
 * under {@link PaimonStorageHandler#PAIMON_SCHEMA}, if one is present.
 *
 * @param properties Hive serde properties that may carry the cached schema JSON
 * @return the deserialized cached schema, or {@link Optional#empty()} when no cached
 *     schema property exists
 */
@VisibleForTesting
static Optional<TableSchema> getTableSchemaFromCache(Properties properties) {
    String cachedSchemaJson = properties.getProperty(PaimonStorageHandler.PAIMON_SCHEMA);
    return cachedSchemaJson == null
            ? Optional.empty()
            : Optional.of(JsonSerdeUtil.fromJson(cachedSchemaJson, TableSchema.class));
}

public static Optional<TableSchema> getExistingSchema(
@Nullable Configuration configuration, @Nullable String location) {
if (location == null) {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.hive.mapred.PaimonInputFormat;
import org.apache.paimon.hive.mapred.PaimonOutputCommitter;
import org.apache.paimon.hive.mapred.PaimonOutputFormat;
import org.apache.paimon.schema.TableSchema;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
Expand All @@ -38,6 +39,7 @@
import org.apache.hadoop.mapred.OutputFormat;

import java.util.Map;
import java.util.Optional;
import java.util.Properties;

/** {@link HiveStorageHandler} for paimon. This is the entrance class of Hive API. */
Expand All @@ -46,6 +48,8 @@ public class PaimonStorageHandler implements HiveStoragePredicateHandler, HiveSt
private static final String MAPRED_OUTPUT_COMMITTER = "mapred.output.committer.class";
private static final String PAIMON_WRITE = "paimon.write";

public static final String PAIMON_SCHEMA = "paimon.schema";

private Configuration conf;

@Override
Expand Down Expand Up @@ -76,9 +80,16 @@ public HiveAuthorizationProvider getAuthorizationProvider() throws HiveException
/**
 * Populates the input job properties for reading a Paimon table.
 *
 * <p>Besides propagating the resolved table location, this serializes the table's current
 * schema (when it can be loaded from the table location) into both the table descriptor's
 * properties and the job property map under {@link #PAIMON_SCHEMA}, so later serde
 * initialization can reuse it instead of re-reading the schema from storage.
 */
@Override
public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> map) {
    Properties tableProperties = tableDesc.getProperties();
    String location = LocationKeyExtractor.getPaimonLocation(conf, tableProperties);
    map.put(LocationKeyExtractor.INTERNAL_LOCATION, location);

    // Cache the schema into the table properties.
    // NOTE(review): a null Configuration is passed here even though `conf` is available —
    // confirm schema loading from `location` needs no Hadoop configuration.
    HiveSchema.getExistingSchema(null, location)
            .ifPresent(
                    schema -> {
                        String schemaJson = schema.toString();
                        tableDesc.getProperties().put(PAIMON_SCHEMA, schemaJson);
                        map.put(PAIMON_SCHEMA, schemaJson);
                    });
}

public void configureInputJobCredentials(TableDesc tableDesc, Map<String, String> map) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
Expand All @@ -33,8 +34,10 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Optional;
import java.util.Properties;

import static org.apache.paimon.hive.PaimonStorageHandler.PAIMON_SCHEMA;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand Down Expand Up @@ -342,4 +345,21 @@ private Properties createTableWithExistsDDL() {
properties.setProperty("location", tempDir.toString());
return properties;
}

@Test
public void testReadSchemaFromProperties() throws Exception {
    createSchema();

    // Serialize the schema loaded from the warehouse into serde properties,
    // mimicking what the storage handler does when configuring an input job.
    Optional<TableSchema> schemaFromWarehouse =
            HiveSchema.getExistingSchema(null, tempDir.toString());
    assertThat(schemaFromWarehouse).isPresent();
    TableSchema expected = schemaFromWarehouse.get();
    Properties serdeProperties = new Properties();
    serdeProperties.put(PAIMON_SCHEMA, expected.toString());

    // Reading the cached schema back from the properties must yield an equal schema.
    Optional<TableSchema> actual = HiveSchema.getTableSchemaFromCache(serdeProperties);
    assertThat(actual).isPresent();
    assertThat(actual.get()).isEqualTo(expected);
}
}

0 comments on commit a160915

Please sign in to comment.