Skip to content

Commit

Permalink
Resolve conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Apr 10, 2024
1 parent 9c1c46e commit e8fe75f
Show file tree
Hide file tree
Showing 15 changed files with 171 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,13 @@
package org.apache.seatunnel.connectors.seatunnel.file.local.source.config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;

import lombok.Getter;

import java.io.Serializable;
import java.util.List;

@Getter
public class LocalFileSourceConfig extends BaseFileSourceConfig {

Expand All @@ -48,77 +41,6 @@ public String getPluginName() {
}

public LocalFileSourceConfig(ReadonlyConfig readonlyConfig) {
validateConfig(readonlyConfig);
this.fileFormat = readonlyConfig.get(LocalFileSourceOptions.FILE_FORMAT_TYPE);
this.localFileHadoopConf = new LocalFileHadoopConf();
this.readStrategy = ReadStrategyFactory.of(readonlyConfig, localFileHadoopConf);
this.filePaths = parseFilePaths(readonlyConfig);
this.catalogTable = parseCatalogTable(readonlyConfig);
}

private void validateConfig(ReadonlyConfig readonlyConfig) {
if (!readonlyConfig.getOptional(LocalFileSourceOptions.FILE_PATH).isPresent()) {
throw new FileConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
FileSystemType.LOCAL.getFileSystemPluginName(),
PluginType.SOURCE,
LocalFileSourceOptions.FILE_PATH + " is required"));
}
if (!readonlyConfig.getOptional(LocalFileSourceOptions.FILE_FORMAT_TYPE).isPresent()) {
throw new FileConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
FileSystemType.LOCAL.getFileSystemPluginName(),
PluginType.SOURCE,
LocalFileSourceOptions.FILE_FORMAT_TYPE.key() + " is required"));
}
}

private List<String> parseFilePaths(ReadonlyConfig readonlyConfig) {
String rootPath = null;
try {
rootPath = readonlyConfig.get(LocalFileSourceOptions.FILE_PATH);
return readStrategy.getFileNamesByPath(localFileHadoopConf, rootPath);
} catch (Exception ex) {
String errorMsg = String.format("Get file list from this path [%s] failed", rootPath);
throw new FileConnectorException(
FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, ex);
}
}

private CatalogTable parseCatalogTable(ReadonlyConfig readonlyConfig) {
final CatalogTable catalogTable;
if (readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
catalogTable =
CatalogTableUtil.buildWithConfig(
FileSystemType.LOCAL.getFileSystemPluginName(), readonlyConfig);
} else {
catalogTable = CatalogTableUtil.buildSimpleTextTable();
}
if (CollectionUtils.isEmpty(filePaths)) {
return catalogTable;
}
switch (fileFormat) {
case CSV:
case TEXT:
case JSON:
case EXCEL:
readStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
return CatalogTableUtil.newCatalogTable(
catalogTable, readStrategy.getActualSeaTunnelRowTypeInfo());
case ORC:
case PARQUET:
return CatalogTableUtil.newCatalogTable(
catalogTable,
readStrategy.getSeaTunnelRowTypeInfo(
localFileHadoopConf, filePaths.get(0)));
default:
throw new FileConnectorException(
FileConnectorErrorCode.FORMAT_NOT_SUPPORT,
"SeaTunnel does not supported this file format: [" + fileFormat + "]");
}
super(readonlyConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveHadoopConfig;
import org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;

import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;

Expand All @@ -44,8 +43,8 @@ public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
private final ReadonlyConfig readonlyConfig;

public HiveSinkAggregatedCommitter(
ReadonlyConfig readonlyConfig, Table table, FileSystemUtils fileSystemUtils) {
super(fileSystemUtils);
ReadonlyConfig readonlyConfig, Table table, HiveHadoopConfig hadoopConf) {
super(hadoopConf);
this.readonlyConfig = readonlyConfig;
this.dbName = table.getDbName();
this.tableName = table.getTableName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;

public class BaseHiveOptions extends BaseSourceConfig {
public class BaseHiveOptions extends BaseSourceConfigOptions {

public static final Option<String> TABLE_NAME =
Options.key("table_name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory;
import org.apache.seatunnel.connectors.seatunnel.hive.commit.HiveSinkAggregatedCommitter;
Expand Down Expand Up @@ -71,11 +70,10 @@ public class HiveSink
implements SeaTunnelSink<
SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>,
SupportMultiTableSink {
private final Table tableInformation;
private final transient Table tableInformation;
private final CatalogTable catalogTable;
private final ReadonlyConfig readonlyConfig;
private final HiveHadoopConfig hiveHadoopConfig;
private final FileSystemUtils fileSystemUtils;
private final FileSinkConfig fileSinkConfig;
private final WriteStrategy writeStrategy;
private String jobId;
Expand All @@ -89,9 +87,7 @@ public HiveSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
generateFileSinkConfig(readonlyConfig, tableInformation, catalogTable);
this.writeStrategy =
WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig);
this.fileSystemUtils = new FileSystemUtils(hiveHadoopConfig);
this.writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
this.writeStrategy.setFileSystemUtils(fileSystemUtils);
}

private FileSinkConfig generateFileSinkConfig(
Expand Down Expand Up @@ -171,7 +167,8 @@ public String getPluginName() {
public Optional<SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo>>
createAggregatedCommitter() {
return Optional.of(
new HiveSinkAggregatedCommitter(readonlyConfig, tableInformation, fileSystemUtils));
new HiveSinkAggregatedCommitter(
readonlyConfig, tableInformation, hiveHadoopConfig));
}

@Override
Expand Down Expand Up @@ -235,6 +232,9 @@ private HiveHadoopConfig parseHiveHadoopConfig(ReadonlyConfig readonlyConfig, Ta
readonlyConfig
.getOptional(HiveSourceOptions.KERBEROS_KEYTAB_PATH)
.ifPresent(hiveHadoopConfig::setKerberosKeytabPath);
readonlyConfig
.getOptional(HiveSourceOptions.REMOTE_USER)
.ifPresent(hiveHadoopConfig::setRemoteUser);
return hiveHadoopConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
Expand Down Expand Up @@ -80,7 +79,7 @@ private ReadonlyConfig generateCurrentReadonlyConfig(
tableName -> {
String replacedPath =
replaceCatalogTableInPath(tableName, catalogTable);
configMap.put(BaseSinkConfig.FILE_PATH.key(), replacedPath);
configMap.put(HiveSinkOptions.TABLE_NAME.key(), replacedPath);
});

return ReadonlyConfig.fromMap(new HashMap<>(configMap));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.hive.source.config.MultipleTableHiveSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.hive.source.reader.MultipleTableHiveSourceReader;
Expand All @@ -50,7 +50,7 @@ public HiveSource(ReadonlyConfig readonlyConfig) {

@Override
public String getPluginName() {
return FileSystemType.LOCAL.getFileSystemPluginName();
return HiveConstants.CONNECTOR_NAME;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions;

Expand All @@ -49,16 +48,17 @@ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(HiveSourceOptions.TABLE_NAME)
.required(HiveSourceOptions.METASTORE_URI)
.optional(HiveSourceOptions.TABLE_NAME)
.optional(HiveSourceOptions.METASTORE_URI)
.optional(HiveSourceOptions.TABLE_CONFIGS)
.optional(HiveSourceOptions.HIVE_SITE_PATH)
.optional(
HdfsSourceConfig.HDFS_SITE_PATH,
HdfsSourceConfig.KERBEROS_PRINCIPAL,
HdfsSourceConfig.KERBEROS_KEYTAB_PATH)
.optional(BaseSourceConfig.READ_PARTITIONS)
.optional(BaseSourceConfig.READ_COLUMNS)
HdfsSourceConfigOptions.HDFS_SITE_PATH,
HdfsSourceConfigOptions.REMOTE_USER,
HdfsSourceConfigOptions.KERBEROS_PRINCIPAL,
HdfsSourceConfigOptions.KERBEROS_KEYTAB_PATH)
.optional(HdfsSourceConfigOptions.READ_PARTITIONS)
.optional(HdfsSourceConfigOptions.READ_COLUMNS)
.build();
}

Expand Down
Loading

0 comments on commit e8fe75f

Please sign in to comment.