From f65d86df5b8bcf250fce7715b4bc4c73f5748d0d Mon Sep 17 00:00:00 2001
From: Wenjun Ruan
Date: Mon, 8 Apr 2024 10:38:42 +0800
Subject: [PATCH] Resolve conflicts

---
 .../source/config/LocalFileSourceConfig.java  | 82 +------------------
 .../commit/HiveSinkAggregatedCommitter.java   | 15 ++--
 .../hive/config/BaseHiveOptions.java          |  4 +-
 .../seatunnel/hive/sink/HiveSink.java         | 58 ++++++++-----
 .../seatunnel/hive/sink/HiveSinkFactory.java  |  3 +-
 .../seatunnel/hive/source/HiveSource.java     |  4 +-
 .../hive/source/HiveSourceFactory.java        | 18 ++--
 .../hive/source/config/HiveSourceConfig.java  | 69 +++++++++++++---
 .../hive/utils/HiveMetaStoreProxy.java        | 24 ++----
 .../hive/utils/HiveMetaStoreProxyUtils.java   | 13 ++-
 .../seatunnel/hive/utils/HiveTableUtils.java  |  8 +-
 .../utils/HiveMetaStoreProxyUtilsTest.java    |  9 +-
 .../ConnectorSpecificationCheckTest.java      |  1 +
 .../seatunnel-engine-examples/pom.xml         | 50 +++++++++++
 .../resources/examples/fake_to_console.conf   | 25 ++++--
 15 files changed, 209 insertions(+), 174 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalFileSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalFileSourceConfig.java
index cfc2df5719d..ce20a69f91f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalFileSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalFileSourceConfig.java
@@ -18,20 +18,13 @@
 package org.apache.seatunnel.connectors.seatunnel.file.local.source.config;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
 
 import lombok.Getter;
 
-import java.io.Serializable;
-import java.util.List;
-
 @Getter
 public class LocalFileSourceConfig extends BaseFileSourceConfig {
@@ -48,77 +41,6 @@ public String getPluginName() {
     }
 
     public LocalFileSourceConfig(ReadonlyConfig readonlyConfig) {
-        validateConfig(readonlyConfig);
-        this.fileFormat = readonlyConfig.get(LocalFileSourceOptions.FILE_FORMAT_TYPE);
-        this.localFileHadoopConf = new LocalFileHadoopConf();
-        this.readStrategy = ReadStrategyFactory.of(readonlyConfig, localFileHadoopConf);
-        this.filePaths = parseFilePaths(readonlyConfig);
-        this.catalogTable = parseCatalogTable(readonlyConfig);
-    }
-
-    private void validateConfig(ReadonlyConfig readonlyConfig) {
-        if (!readonlyConfig.getOptional(LocalFileSourceOptions.FILE_PATH).isPresent()) {
-            throw new FileConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            FileSystemType.LOCAL.getFileSystemPluginName(),
-                            PluginType.SOURCE,
-                            LocalFileSourceOptions.FILE_PATH + " is required"));
-        }
-        if (!readonlyConfig.getOptional(LocalFileSourceOptions.FILE_FORMAT_TYPE).isPresent()) {
-            throw new FileConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            FileSystemType.LOCAL.getFileSystemPluginName(),
-                            PluginType.SOURCE,
-                            LocalFileSourceOptions.FILE_FORMAT_TYPE.key() + " is required"));
-        }
-    }
-
-    private List<String> parseFilePaths(ReadonlyConfig readonlyConfig) {
-        String rootPath = null;
-        try {
-            rootPath = readonlyConfig.get(LocalFileSourceOptions.FILE_PATH);
-            return readStrategy.getFileNamesByPath(localFileHadoopConf, rootPath);
-        } catch (Exception ex) {
-            String errorMsg = String.format("Get file list from this path [%s] failed", rootPath);
-            throw new FileConnectorException(
-                    FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, ex);
-        }
-    }
-
-    private CatalogTable parseCatalogTable(ReadonlyConfig readonlyConfig) {
-        final CatalogTable catalogTable;
-        if (readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
-            catalogTable =
-                    CatalogTableUtil.buildWithConfig(
-                            FileSystemType.LOCAL.getFileSystemPluginName(), readonlyConfig);
-        } else {
-            catalogTable = CatalogTableUtil.buildSimpleTextTable();
-        }
-        if (CollectionUtils.isEmpty(filePaths)) {
-            return catalogTable;
-        }
-        switch (fileFormat) {
-            case CSV:
-            case TEXT:
-            case JSON:
-            case EXCEL:
-                readStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
-                return CatalogTableUtil.newCatalogTable(
-                        catalogTable, readStrategy.getActualSeaTunnelRowTypeInfo());
-            case ORC:
-            case PARQUET:
-                return CatalogTableUtil.newCatalogTable(
-                        catalogTable,
-                        readStrategy.getSeaTunnelRowTypeInfo(
-                                localFileHadoopConf, filePaths.get(0)));
-            default:
-                throw new FileConnectorException(
-                        FileConnectorErrorCode.FORMAT_NOT_SUPPORT,
-                        "SeaTunnel does not supported this file format: [" + fileFormat + "]");
-        }
+        super(readonlyConfig);
     }
 }
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
index 1024474bcc0..58b3eb5f8eb 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
@@ -20,12 +20,10 @@
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveHadoopConfig;
 import org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSinkOptions;
 import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
 
-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.thrift.TException;
 
 import lombok.extern.slf4j.Slf4j;
@@ -44,11 +42,14 @@ public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
     private final ReadonlyConfig readonlyConfig;
 
     public HiveSinkAggregatedCommitter(
-            ReadonlyConfig readonlyConfig, Table table, FileSystemUtils fileSystemUtils) {
-        super(fileSystemUtils);
+            ReadonlyConfig readonlyConfig,
+            String dbName,
+            String tableName,
+            HiveHadoopConfig hadoopConf) {
+        super(hadoopConf);
         this.readonlyConfig = readonlyConfig;
-        this.dbName = table.getDbName();
-        this.tableName = table.getTableName();
+        this.dbName = dbName;
+        this.tableName = tableName;
         this.abortDropPartitionMetadata =
                 readonlyConfig.get(HiveSinkOptions.ABORT_DROP_PARTITION_METADATA);
     }
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/BaseHiveOptions.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/BaseHiveOptions.java
index b25494b0035..efed4e91c58 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/BaseHiveOptions.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/BaseHiveOptions.java
@@ -19,9 +19,9 @@
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
 
-public class BaseHiveOptions extends BaseSourceConfig {
+public class BaseHiveOptions extends BaseSourceConfigOptions {
 
     public static final Option<String> TABLE_NAME =
             Options.key("table_name")
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 49fbe1f1234..f7a2e5c06bf 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -36,7 +36,6 @@
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory;
 import org.apache.seatunnel.connectors.seatunnel.hive.commit.HiveSinkAggregatedCommitter;
@@ -71,31 +70,29 @@
 public class HiveSink
         implements SeaTunnelSink<
                         SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>,
                 SupportMultiTableSink {
-    private final Table tableInformation;
+
+    // Since Table might contain some unserializable fields, we need to make it transient
+    // And use getTableInformation to get the Table object
+    private transient Table tableInformation;
     private final CatalogTable catalogTable;
     private final ReadonlyConfig readonlyConfig;
     private final HiveHadoopConfig hiveHadoopConfig;
-    private final FileSystemUtils fileSystemUtils;
     private final FileSinkConfig fileSinkConfig;
-    private final WriteStrategy writeStrategy;
+    private transient WriteStrategy writeStrategy;
     private String jobId;
 
     public HiveSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
         this.readonlyConfig = readonlyConfig;
         this.catalogTable = catalogTable;
-        this.tableInformation = HiveTableUtils.getTableInfo(readonlyConfig);
-        this.hiveHadoopConfig = parseHiveHadoopConfig(readonlyConfig, tableInformation);
-        this.fileSinkConfig =
-                generateFileSinkConfig(readonlyConfig, tableInformation, catalogTable);
-        this.writeStrategy =
-                WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig);
-        this.fileSystemUtils = new FileSystemUtils(hiveHadoopConfig);
-        this.writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
-        this.writeStrategy.setFileSystemUtils(fileSystemUtils);
+        this.tableInformation = getTableInformation();
+        this.hiveHadoopConfig = parseHiveHadoopConfig(readonlyConfig);
+        this.fileSinkConfig = generateFileSinkConfig(readonlyConfig, catalogTable);
+        this.writeStrategy = getWriteStrategy();
     }
 
     private FileSinkConfig generateFileSinkConfig(
-            ReadonlyConfig readonlyConfig, Table tableInformation, CatalogTable catalogTable) {
+            ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
+        Table tableInformation = getTableInformation();
         Config pluginConfig = readonlyConfig.toConfig();
         List<String> sinkFields =
                 tableInformation.getSd().getCols().stream()
@@ -171,7 +168,11 @@ public String getPluginName() {
     public Optional<SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo>>
             createAggregatedCommitter() {
         return Optional.of(
-                new HiveSinkAggregatedCommitter(readonlyConfig, tableInformation, fileSystemUtils));
+                new HiveSinkAggregatedCommitter(
+                        readonlyConfig,
+                        getTableInformation().getDbName(),
+                        getTableInformation().getTableName(),
+                        hiveHadoopConfig));
     }
 
     @Override
@@ -182,13 +183,13 @@ public void setJobContext(JobContext jobContext) {
     @Override
     public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> restoreWriter(
             SinkWriter.Context context, List<FileSinkState> states) {
-        return new HiveSinkWriter(writeStrategy, hiveHadoopConfig, context, jobId, states);
+        return new HiveSinkWriter(getWriteStrategy(), hiveHadoopConfig, context, jobId, states);
     }
 
     @Override
     public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> createWriter(
             SinkWriter.Context context) {
-        return new HiveSinkWriter(writeStrategy, hiveHadoopConfig, context, jobId);
+        return new HiveSinkWriter(getWriteStrategy(), hiveHadoopConfig, context, jobId);
     }
 
     @Override
@@ -206,8 +207,8 @@ public Optional<Serializer<FileSinkState>> getWriterStateSerializer() {
         return Optional.of(new DefaultSerializer<>());
     }
 
-    private HiveHadoopConfig parseHiveHadoopConfig(ReadonlyConfig readonlyConfig, Table table) {
-        String hdfsLocation = tableInformation.getSd().getLocation();
+    private HiveHadoopConfig parseHiveHadoopConfig(ReadonlyConfig readonlyConfig) {
+        String hdfsLocation = getTableInformation().getSd().getLocation();
         HiveHadoopConfig hiveHadoopConfig;
         try {
             URI uri = new URI(hdfsLocation);
@@ -235,6 +236,24 @@ private HiveHadoopConfig parseHiveHadoopConfig(ReadonlyConfig readonlyConfig, Ta
         readonlyConfig
                 .getOptional(HiveSourceOptions.KERBEROS_KEYTAB_PATH)
                 .ifPresent(hiveHadoopConfig::setKerberosKeytabPath);
+        readonlyConfig
+                .getOptional(HiveSourceOptions.REMOTE_USER)
+                .ifPresent(hiveHadoopConfig::setRemoteUser);
         return hiveHadoopConfig;
     }
+
+    private Table getTableInformation() {
+        if (tableInformation == null) {
+            tableInformation = HiveTableUtils.getTableInfo(readonlyConfig);
+        }
+        return tableInformation;
+    }
+
+    private WriteStrategy getWriteStrategy() {
+        if (writeStrategy == null) {
+            writeStrategy = WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig);
+            writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
+        }
+        return writeStrategy;
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
index 449b43567d5..f1b702f2bad 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
@@ -27,7 +27,6 @@
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
@@ -80,7 +79,7 @@ private ReadonlyConfig generateCurrentReadonlyConfig(
                         tableName -> {
                             String replacedPath =
                                     replaceCatalogTableInPath(tableName, catalogTable);
-                            configMap.put(BaseSinkConfig.FILE_PATH.key(), replacedPath);
+                            configMap.put(HiveSinkOptions.TABLE_NAME.key(), replacedPath);
                         });
 
         return ReadonlyConfig.fromMap(new HashMap<>(configMap));
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
index f1f5afd52b9..8db25232b03 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
@@ -26,7 +26,7 @@
 import org.apache.seatunnel.api.source.SupportParallelism;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
 import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.hive.source.config.MultipleTableHiveSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.hive.source.reader.MultipleTableHiveSourceReader;
@@ -50,7 +50,7 @@ public HiveSource(ReadonlyConfig readonlyConfig) {
 
     @Override
     public String getPluginName() {
-        return FileSystemType.LOCAL.getFileSystemPluginName();
+        return HiveConstants.CONNECTOR_NAME;
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
index 613c96a2031..83fedd502e2 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
@@ -24,8 +24,7 @@
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
-import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
 import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
 import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions;
@@ -49,16 +48,17 @@ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
 
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(HiveSourceOptions.TABLE_NAME)
-                .required(HiveSourceOptions.METASTORE_URI)
+                .optional(HiveSourceOptions.TABLE_NAME)
+                .optional(HiveSourceOptions.METASTORE_URI)
                 .optional(HiveSourceOptions.TABLE_CONFIGS)
                 .optional(HiveSourceOptions.HIVE_SITE_PATH)
                 .optional(
-                        HdfsSourceConfig.HDFS_SITE_PATH,
-                        HdfsSourceConfig.KERBEROS_PRINCIPAL,
-                        HdfsSourceConfig.KERBEROS_KEYTAB_PATH)
-                .optional(BaseSourceConfig.READ_PARTITIONS)
-                .optional(BaseSourceConfig.READ_COLUMNS)
+                        HdfsSourceConfigOptions.HDFS_SITE_PATH,
+                        HdfsSourceConfigOptions.REMOTE_USER,
+                        HdfsSourceConfigOptions.KERBEROS_PRINCIPAL,
+                        HdfsSourceConfigOptions.KERBEROS_KEYTAB_PATH)
+                .optional(HdfsSourceConfigOptions.READ_PARTITIONS)
+                .optional(HdfsSourceConfigOptions.READ_COLUMNS)
                 .build();
     }
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
index 6643bfb15c0..c9355aaf3f1 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hive.source.config;
 
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -28,11 +31,10 @@
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
 import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
 import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
@@ -55,6 +57,11 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+
+import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FIELD_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_FORMAT_TYPE;
+import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.ROW_DELIMITER;
 
 @Getter
 public class HiveSourceConfig implements Serializable {
@@ -71,13 +78,13 @@ public class HiveSourceConfig implements Serializable {
     @SneakyThrows
     public HiveSourceConfig(ReadonlyConfig readonlyConfig) {
         readonlyConfig
-                .getOptional(BaseSourceConfig.READ_PARTITIONS)
+                .getOptional(HdfsSourceConfigOptions.READ_PARTITIONS)
                 .ifPresent(this::validatePartitions);
         this.table = HiveTableUtils.getTableInfo(readonlyConfig);
         this.hiveHadoopConfig = parseHiveHadoopConfig(readonlyConfig, table);
         this.fileFormat = HiveTableUtils.parseFileFormat(table);
-        this.readStrategy = parseReadStrategy(readonlyConfig, fileFormat, hiveHadoopConfig);
-        this.filePaths = parseFilePaths(table, hiveHadoopConfig, readStrategy);
+        this.readStrategy = parseReadStrategy(table, readonlyConfig, fileFormat, hiveHadoopConfig);
+        this.filePaths = parseFilePaths(table, readStrategy);
         this.catalogTable =
                 parseCatalogTable(
                         readonlyConfig,
@@ -108,11 +115,45 @@ private void validatePartitions(List<String> partitionsList) {
     }
 
     private ReadStrategy parseReadStrategy(
+            Table table,
             ReadonlyConfig readonlyConfig,
             FileFormat fileFormat,
             HiveHadoopConfig hiveHadoopConfig) {
+
         ReadStrategy readStrategy = ReadStrategyFactory.of(fileFormat.name());
-        readStrategy.setPluginConfig(readonlyConfig.toConfig());
+        Config config = readonlyConfig.toConfig();
+
+        switch (fileFormat) {
+            case TEXT:
+                // if the file format is text, we set the delim.
+                Map<String, String> parameters = table.getSd().getSerdeInfo().getParameters();
+                config =
+                        config.withValue(
+                                        FIELD_DELIMITER.key(),
+                                        ConfigValueFactory.fromAnyRef(
+                                                parameters.get("field.delim")))
+                                .withValue(
+                                        ROW_DELIMITER.key(),
+                                        ConfigValueFactory.fromAnyRef(parameters.get("line.delim")))
+                                .withValue(
+                                        FILE_FORMAT_TYPE.key(),
+                                        ConfigValueFactory.fromAnyRef(FileFormat.TEXT.name()));
+                break;
+            case ORC:
+                config =
+                        config.withValue(
+                                FILE_FORMAT_TYPE.key(),
+                                ConfigValueFactory.fromAnyRef(FileFormat.ORC.name()));
+                break;
+            case PARQUET:
+                config =
+                        config.withValue(
+                                FILE_FORMAT_TYPE.key(),
+                                ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.name()));
+                break;
+            default:
+        }
+        readStrategy.setPluginConfig(config);
         readStrategy.init(hiveHadoopConfig);
         return readStrategy;
     }
@@ -125,22 +166,24 @@ private HiveHadoopConfig parseHiveHadoopConfig(ReadonlyConfig readonlyConfig, Ta
                         readonlyConfig.get(HiveSourceOptions.METASTORE_URI),
                         readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH));
         readonlyConfig
-                .getOptional(HdfsSourceConfig.HDFS_SITE_PATH)
+                .getOptional(HdfsSourceConfigOptions.HDFS_SITE_PATH)
                 .ifPresent(hiveHadoopConfig::setHdfsSitePath);
         readonlyConfig
-                .getOptional(HdfsSourceConfig.KERBEROS_PRINCIPAL)
+                .getOptional(HdfsSourceConfigOptions.KERBEROS_PRINCIPAL)
                 .ifPresent(hiveHadoopConfig::setKerberosPrincipal);
         readonlyConfig
-                .getOptional(HdfsSourceConfig.KERBEROS_KEYTAB_PATH)
+                .getOptional(HdfsSourceConfigOptions.KERBEROS_KEYTAB_PATH)
                 .ifPresent(hiveHadoopConfig::setKerberosKeytabPath);
+        readonlyConfig
+                .getOptional(HdfsSourceConfigOptions.REMOTE_USER)
+                .ifPresent(hiveHadoopConfig::setRemoteUser);
         return hiveHadoopConfig;
     }
 
-    private List<String> parseFilePaths(
-            Table table, HiveHadoopConfig hiveHadoopConfig, ReadStrategy readStrategy) {
+    private List<String> parseFilePaths(Table table, ReadStrategy readStrategy) {
         String hdfsPath = parseHdfsPath(table);
         try {
-            return readStrategy.getFileNamesByPath(hiveHadoopConfig, hdfsPath);
+            return readStrategy.getFileNamesByPath(hdfsPath);
         } catch (Exception e) {
             String errorMsg = String.format("Get file list from this path [%s] failed", hdfsPath);
             throw new FileConnectorException(
@@ -214,7 +257,7 @@ private CatalogTable parseCatalogTableFromRemotePath(
         CatalogTable catalogTable = buildEmptyCatalogTable(readonlyConfig, table);
         try {
             SeaTunnelRowType seaTunnelRowTypeInfo =
-                    readStrategy.getSeaTunnelRowTypeInfo(hiveHadoopConfig, filePaths.get(0));
+                    readStrategy.getSeaTunnelRowTypeInfo(filePaths.get(0));
             return CatalogTableUtil.newCatalogTable(catalogTable, seaTunnelRowTypeInfo);
         } catch (FileConnectorException e) {
             String errorMsg =
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
index b75c9cc336e..b3c463d8042 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
@@ -20,7 +20,6 @@
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopLoginFactory;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
 import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions;
@@ -51,38 +50,27 @@ private HiveMetaStoreProxy(ReadonlyConfig readonlyConfig) {
         String metastoreUri = readonlyConfig.get(HiveSourceOptions.METASTORE_URI);
         HiveConf hiveConf = new HiveConf();
         hiveConf.set("hive.metastore.uris", metastoreUri);
-        if (readonlyConfig.getOptional(HiveSourceOptions.KERBEROS_PRINCIPAL).isPresent()
-                && readonlyConfig.getOptional(HiveSourceOptions.KERBEROS_KEYTAB_PATH).isPresent()) {
-            String principal = readonlyConfig.get(HiveSourceOptions.KERBEROS_PRINCIPAL);
-            String keytabPath = readonlyConfig.get(HiveSourceOptions.KERBEROS_KEYTAB_PATH);
-            Configuration configuration = new Configuration();
-            FileSystemUtils.doKerberosAuthentication(configuration, principal, keytabPath);
-        }
         try {
             if (StringUtils.isNotEmpty(readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH))) {
                 String hiveSitePath = readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH);
                 hiveConf.addResource(new File(hiveSitePath).toURI().toURL());
             }
-            if (HiveMetaStoreProxyUtils.enableKerberos(config)) {
+            if (HiveMetaStoreProxyUtils.enableKerberos(readonlyConfig)) {
                 this.hiveMetaStoreClient =
                         HadoopLoginFactory.loginWithKerberos(
                                 new Configuration(),
-                                TypesafeConfigUtils.getConfig(
-                                        config,
-                                        BaseSourceConfigOptions.KRB5_PATH.key(),
-                                        BaseSourceConfigOptions.KRB5_PATH.defaultValue()),
-                                config.getString(BaseSourceConfigOptions.KERBEROS_PRINCIPAL.key()),
-                                config.getString(
-                                        BaseSourceConfigOptions.KERBEROS_KEYTAB_PATH.key()),
+                                readonlyConfig.get(BaseSourceConfigOptions.KRB5_PATH),
+                                readonlyConfig.get(BaseSourceConfigOptions.KERBEROS_PRINCIPAL),
+                                readonlyConfig.get(BaseSourceConfigOptions.KERBEROS_KEYTAB_PATH),
                                 (configuration, userGroupInformation) ->
                                         new HiveMetaStoreClient(hiveConf));
                 return;
             }
-            if (HiveMetaStoreProxyUtils.enableRemoteUser(config)) {
+            if (HiveMetaStoreProxyUtils.enableRemoteUser(readonlyConfig)) {
                 this.hiveMetaStoreClient =
                         HadoopLoginFactory.loginWithRemoteUser(
                                 new Configuration(),
-                                readonlyConfig.get(BaseSourceConfigOptions.REMOTE_USER.key()),
+                                readonlyConfig.get(BaseSourceConfigOptions.REMOTE_USER),
                                 (configuration, userGroupInformation) ->
                                         new HiveMetaStoreClient(hiveConf));
                 return;
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtils.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtils.java
index fda221d886f..f1474f7694b 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtils.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtils.java
@@ -17,8 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hive.utils;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
 
 import lombok.experimental.UtilityClass;
@@ -26,11 +25,11 @@
 @UtilityClass
 public class HiveMetaStoreProxyUtils {
 
-    public boolean enableKerberos(Config config) {
+    public boolean enableKerberos(ReadonlyConfig config) {
         boolean kerberosPrincipalEmpty =
-                config.hasPath(BaseSourceConfigOptions.KERBEROS_PRINCIPAL.key());
+                config.getOptional(BaseSourceConfigOptions.KERBEROS_PRINCIPAL).isPresent();
         boolean kerberosKeytabPathEmpty =
-                config.hasPath(BaseSourceConfigOptions.KERBEROS_KEYTAB_PATH.key());
+                config.getOptional(BaseSourceConfigOptions.KERBEROS_KEYTAB_PATH).isPresent();
         if (kerberosKeytabPathEmpty && kerberosPrincipalEmpty) {
             return true;
         }
@@ -43,7 +42,7 @@ public boolean enableKerberos(Config config) {
         throw new IllegalArgumentException("Please set kerberosKeytabPath");
     }
 
-    public boolean enableRemoteUser(Config config) {
-        return config.hasPath(BaseSourceConfigOptions.REMOTE_USER.key());
+    public boolean enableRemoteUser(ReadonlyConfig config) {
+        return config.getOptional(BaseSourceConfigOptions.REMOTE_USER).isPresent();
     }
 }
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTableUtils.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTableUtils.java
index 7b727238a04..e4282db204b 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTableUtils.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTableUtils.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.hive.utils;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
@@ -32,13 +33,14 @@ public class HiveTableUtils {
 
     public static Table getTableInfo(ReadonlyConfig readonlyConfig) {
         String table = readonlyConfig.get(HiveSourceOptions.TABLE_NAME);
-        String[] splits = table.split("\\.");
-        if (splits.length != 2) {
+        TablePath tablePath = TablePath.of(table);
+        if (tablePath.getDatabaseName() == null || tablePath.getTableName() == null) {
             throw new SeaTunnelRuntimeException(
                     HiveConnectorErrorCode.HIVE_TABLE_NAME_ERROR,
                     "Current table name is " + table);
         }
         HiveMetaStoreProxy hiveMetaStoreProxy = HiveMetaStoreProxy.getInstance(readonlyConfig);
-        Table tableInformation = hiveMetaStoreProxy.getTable(splits[0], splits[1]);
+        Table tableInformation =
+                hiveMetaStoreProxy.getTable(tablePath.getDatabaseName(), tablePath.getTableName());
         hiveMetaStoreProxy.close();
         return tableInformation;
     }
diff --git a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtilsTest.java b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtilsTest.java
index ad952112c44..eb0afe9d4ad 100644
--- a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtilsTest.java
+++ b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtilsTest.java
@@ -20,6 +20,8 @@
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
 import org.junit.jupiter.api.Test;
 
 import lombok.SneakyThrows;
@@ -35,7 +37,7 @@ class HiveMetaStoreProxyUtilsTest {
 
     @Test
     void enableKerberos() {
-        Config config = parseConfig("/hive_without_kerberos.conf");
+        ReadonlyConfig config = parseConfig("/hive_without_kerberos.conf");
         assertFalse(HiveMetaStoreProxyUtils.enableKerberos(config));
         assertFalse(HiveMetaStoreProxyUtils.enableRemoteUser(config));
 
@@ -48,9 +50,10 @@ void enableKerberos() {
     }
 
     @SneakyThrows
-    private Config parseConfig(String configFile) {
+    private ReadonlyConfig parseConfig(String configFile) {
         URL resource = HiveMetaStoreProxyUtilsTest.class.getResource(configFile);
         String filePath = Paths.get(resource.toURI()).toString();
-        return ConfigFactory.parseFile(new File(filePath));
+        Config config = ConfigFactory.parseFile(new File(filePath));
+        return ReadonlyConfig.fromConfig(config);
     }
 }
diff --git a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
index 5b46d812012..6d59ff27f56 100644
--- a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
+++ b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
@@ -64,6 +64,7 @@ public void testAllConnectorImplementFactoryWithUpToDateMethod() throws ClassNot
         // hive-exec.jar. We need to check manually.
         List<String> blockList = new ArrayList<>();
         blockList.add("HiveSourceFactory");
+        blockList.add("HiveSinkFactory");
 
         for (TableSourceFactory factory : sourceFactories) {
             if (ReflectionUtils.getDeclaredMethod(
diff --git a/seatunnel-examples/seatunnel-engine-examples/pom.xml b/seatunnel-examples/seatunnel-engine-examples/pom.xml
index 3256bdde885..1f22a56658c 100644
--- a/seatunnel-examples/seatunnel-engine-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-engine-examples/pom.xml
@@ -67,5 +67,55 @@
             <artifactId>connector-assert</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-hive</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>2.3.9</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-1.2-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-web</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>parquet-hadoop-bundle</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>jdk.tools</groupId>
+                    <artifactId>jdk.tools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.pentaho</groupId>
+                    <artifactId>pentaho-aggdesigner-algorithm</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 </project>
diff --git a/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf b/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf
index ec7871359d4..b8aba38656c 100644
--- a/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf
+++ b/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf
@@ -25,15 +25,20 @@ env {
 
 source {
   # This is a example source plugin **only for test and demonstrate the feature source plugin**
-  FakeSource {
+  Hive {
     result_table_name = "fake"
-    parallelism = 1
-    schema = {
-      fields {
-        name = "string"
-        age = "int"
-      }
-    }
+    read_columns = [
+      "column1",
+      "column2",
+      "column3"
+    ]
+    tables_configs =
+    [
+      {
+        table_name = "default.my_csv_table"
+        metastore_uri = "thrift://172.20.123.148:9083"
+      }
+    ]
   }
 }
 
@@ -41,7 +46,9 @@ transform {
 }
 
 sink {
-  console {
+  Hive {
     source_table_name="fake"
+    metastore_uri = "thrift://172.20.123.148:9083"
+    table_name = "default.my_csv_table"
   }
 }
\ No newline at end of file
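
--
Reviewer note, not part of the patch: below is a minimal job-config sketch for
smoke-testing this change end to end. The metastore host, the remote_user value,
and the Console sink are placeholders I chose for illustration; table_name reuses
the value from fake_to_console.conf above, and remote_user assumes the
HdfsSourceConfigOptions.REMOTE_USER key ("remote_user") wired through in this patch.

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Hive {
    result_table_name = "hive_rows"
    # table_name and metastore_uri are only "optional" at the option-rule level
    # now (to allow tables_configs); a single-table job still supplies both.
    table_name = "default.my_csv_table"
    metastore_uri = "thrift://metastore-host:9083"   # placeholder host
    # New in this patch: log in as a plain Hadoop user instead of Kerberos
    # (key assumed from HdfsSourceConfigOptions.REMOTE_USER; value is a placeholder).
    remote_user = "hadoop"
  }
}

sink {
  Console {
    source_table_name = "hive_rows"
  }
}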