Commit bacf47f

ruanwenjun committed Apr 10, 2024
1 parent 9c1c46e commit bacf47f
Showing 15 changed files with 188 additions and 165 deletions.
LocalFileSourceConfig.java
@@ -18,20 +18,13 @@
 package org.apache.seatunnel.connectors.seatunnel.file.local.source.config;

 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;

 import lombok.Getter;

-import java.io.Serializable;
-import java.util.List;
-
 @Getter
 public class LocalFileSourceConfig extends BaseFileSourceConfig {

@@ -48,77 +41,6 @@ public String getPluginName() {
     }

     public LocalFileSourceConfig(ReadonlyConfig readonlyConfig) {
-        validateConfig(readonlyConfig);
-        this.fileFormat = readonlyConfig.get(LocalFileSourceOptions.FILE_FORMAT_TYPE);
-        this.localFileHadoopConf = new LocalFileHadoopConf();
-        this.readStrategy = ReadStrategyFactory.of(readonlyConfig, localFileHadoopConf);
-        this.filePaths = parseFilePaths(readonlyConfig);
-        this.catalogTable = parseCatalogTable(readonlyConfig);
-    }
-
-    private void validateConfig(ReadonlyConfig readonlyConfig) {
-        if (!readonlyConfig.getOptional(LocalFileSourceOptions.FILE_PATH).isPresent()) {
-            throw new FileConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            FileSystemType.LOCAL.getFileSystemPluginName(),
-                            PluginType.SOURCE,
-                            LocalFileSourceOptions.FILE_PATH + " is required"));
-        }
-        if (!readonlyConfig.getOptional(LocalFileSourceOptions.FILE_FORMAT_TYPE).isPresent()) {
-            throw new FileConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            FileSystemType.LOCAL.getFileSystemPluginName(),
-                            PluginType.SOURCE,
-                            LocalFileSourceOptions.FILE_FORMAT_TYPE.key() + " is required"));
-        }
-    }
-
-    private List<String> parseFilePaths(ReadonlyConfig readonlyConfig) {
-        String rootPath = null;
-        try {
-            rootPath = readonlyConfig.get(LocalFileSourceOptions.FILE_PATH);
-            return readStrategy.getFileNamesByPath(localFileHadoopConf, rootPath);
-        } catch (Exception ex) {
-            String errorMsg = String.format("Get file list from this path [%s] failed", rootPath);
-            throw new FileConnectorException(
-                    FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, ex);
-        }
-    }
-
-    private CatalogTable parseCatalogTable(ReadonlyConfig readonlyConfig) {
-        final CatalogTable catalogTable;
-        if (readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
-            catalogTable =
-                    CatalogTableUtil.buildWithConfig(
-                            FileSystemType.LOCAL.getFileSystemPluginName(), readonlyConfig);
-        } else {
-            catalogTable = CatalogTableUtil.buildSimpleTextTable();
-        }
-        if (CollectionUtils.isEmpty(filePaths)) {
-            return catalogTable;
-        }
-        switch (fileFormat) {
-            case CSV:
-            case TEXT:
-            case JSON:
-            case EXCEL:
-                readStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
-                return CatalogTableUtil.newCatalogTable(
-                        catalogTable, readStrategy.getActualSeaTunnelRowTypeInfo());
-            case ORC:
-            case PARQUET:
-                return CatalogTableUtil.newCatalogTable(
-                        catalogTable,
-                        readStrategy.getSeaTunnelRowTypeInfo(
-                                localFileHadoopConf, filePaths.get(0)));
-            default:
-                throw new FileConnectorException(
-                        FileConnectorErrorCode.FORMAT_NOT_SUPPORT,
-                        "SeaTunnel does not supported this file format: [" + fileFormat + "]");
-        }
-    }
+        super(readonlyConfig);
     }
 }
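Note: the constructor logic deleted above moves into the shared BaseFileSourceConfig base class, so every file-based connector reuses one path-listing and catalog-table flow instead of duplicating it. A minimal sketch of a plausible base-class shape, assuming FILE_PATH and FILE_FORMAT_TYPE live on BaseSourceConfigOptions; the actual base class in the repository may differ:

    // Hedged sketch of the shared base class that absorbs the deleted logic.
    // Field names mirror the removed code; the real implementation may differ.
    public abstract class BaseFileSourceConfig implements Serializable {

        private final FileFormat fileFormat;
        private final ReadStrategy readStrategy;
        private final List<String> filePaths;
        private final CatalogTable catalogTable;

        protected BaseFileSourceConfig(ReadonlyConfig readonlyConfig) {
            this.fileFormat = readonlyConfig.get(BaseSourceConfigOptions.FILE_FORMAT_TYPE);
            this.readStrategy = ReadStrategyFactory.of(readonlyConfig, getHadoopConfig());
            this.filePaths = parseFilePaths(readonlyConfig);
            this.catalogTable = parseCatalogTable(readonlyConfig);
        }

        // Each connector supplies its own Hadoop configuration and plugin name,
        // replacing the hard-coded LocalFileHadoopConf and LOCAL plugin name.
        public abstract HadoopConf getHadoopConfig();

        public abstract String getPluginName();

        private List<String> parseFilePaths(ReadonlyConfig config) {
            String rootPath = config.get(BaseSourceConfigOptions.FILE_PATH);
            try {
                return readStrategy.getFileNamesByPath(getHadoopConfig(), rootPath);
            } catch (Exception ex) {
                throw new RuntimeException(
                        String.format("Get file list from this path [%s] failed", rootPath), ex);
            }
        }

        private CatalogTable parseCatalogTable(ReadonlyConfig config) {
            // Same branching as the deleted method, with getPluginName() in
            // place of the hard-coded LOCAL plugin name (simplified here).
            return CatalogTableUtil.buildWithConfig(getPluginName(), config);
        }
    }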
HiveSinkAggregatedCommitter.java
@@ -20,11 +20,10 @@
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveHadoopConfig;
 import org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSinkOptions;
 import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;

-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.thrift.TException;

@@ -44,8 +43,8 @@ public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
     private final ReadonlyConfig readonlyConfig;

     public HiveSinkAggregatedCommitter(
-            ReadonlyConfig readonlyConfig, Table table, FileSystemUtils fileSystemUtils) {
-        super(fileSystemUtils);
+            ReadonlyConfig readonlyConfig, Table table, HiveHadoopConfig hadoopConf) {
+        super(hadoopConf);
         this.readonlyConfig = readonlyConfig;
         this.dbName = table.getDbName();
         this.tableName = table.getTableName();
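Note: the committer now receives the serializable HiveHadoopConfig rather than a live FileSystemUtils handle, matching the HiveSink changes below: configuration is shipped to the worker and the non-serializable file-system helper is built there. A hedged sketch of what the superclass plausibly does with it (this is an assumption; the real FileSinkAggregatedCommitter may differ):

    // Hypothetical sketch: keep the serializable config, build the helper lazily.
    public class FileSinkAggregatedCommitter {
        protected final HadoopConf hadoopConf;
        protected transient FileSystemUtils fileSystemUtils;

        public FileSinkAggregatedCommitter(HadoopConf hadoopConf) {
            this.hadoopConf = hadoopConf;
        }

        protected FileSystemUtils getFileSystemUtils() {
            if (fileSystemUtils == null) {
                // Constructed on the worker, after deserialization.
                fileSystemUtils = new FileSystemUtils(hadoopConf);
            }
            return fileSystemUtils;
        }
    }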
BaseHiveOptions.java
@@ -19,9 +19,9 @@
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;

-public class BaseHiveOptions extends BaseSourceConfig {
+public class BaseHiveOptions extends BaseSourceConfigOptions {

     public static final Option<String> TABLE_NAME =
             Options.key("table_name")
HiveSink.java
@@ -36,7 +36,6 @@
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory;
 import org.apache.seatunnel.connectors.seatunnel.hive.commit.HiveSinkAggregatedCommitter;

@@ -71,31 +70,31 @@ public class HiveSink
         implements SeaTunnelSink<
                         SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>,
                 SupportMultiTableSink {
-    private final Table tableInformation;

+    // Since Table might contain some unserializable fields, we need to make it transient
+    // And use getTableInformation to get the Table object
+    private transient Table tableInformation;
     private final CatalogTable catalogTable;
     private final ReadonlyConfig readonlyConfig;
     private final HiveHadoopConfig hiveHadoopConfig;
-    private final FileSystemUtils fileSystemUtils;
     private final FileSinkConfig fileSinkConfig;
     private final WriteStrategy writeStrategy;
     private String jobId;

     public HiveSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
         this.readonlyConfig = readonlyConfig;
         this.catalogTable = catalogTable;
-        this.tableInformation = HiveTableUtils.getTableInfo(readonlyConfig);
-        this.hiveHadoopConfig = parseHiveHadoopConfig(readonlyConfig, tableInformation);
-        this.fileSinkConfig =
-                generateFileSinkConfig(readonlyConfig, tableInformation, catalogTable);
+        this.tableInformation = getTableInformation();
+        this.hiveHadoopConfig = parseHiveHadoopConfig(readonlyConfig);
+        this.fileSinkConfig = generateFileSinkConfig(readonlyConfig, catalogTable);
         this.writeStrategy =
                 WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig);
-        this.fileSystemUtils = new FileSystemUtils(hiveHadoopConfig);
         this.writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
-        this.writeStrategy.setFileSystemUtils(fileSystemUtils);
     }

     private FileSinkConfig generateFileSinkConfig(
-            ReadonlyConfig readonlyConfig, Table tableInformation, CatalogTable catalogTable) {
+            ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
+        Table tableInformation = getTableInformation();
         Config pluginConfig = readonlyConfig.toConfig();
         List<String> sinkFields =
                 tableInformation.getSd().getCols().stream()

@@ -171,7 +170,8 @@ public String getPluginName() {
     public Optional<SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo>>
             createAggregatedCommitter() {
         return Optional.of(
-                new HiveSinkAggregatedCommitter(readonlyConfig, tableInformation, fileSystemUtils));
+                new HiveSinkAggregatedCommitter(
+                        readonlyConfig, getTableInformation(), hiveHadoopConfig));
     }

     @Override

@@ -206,8 +206,8 @@ public Optional<Serializer<FileSinkState>> getWriterStateSerializer() {
         return Optional.of(new DefaultSerializer<>());
     }

-    private HiveHadoopConfig parseHiveHadoopConfig(ReadonlyConfig readonlyConfig, Table table) {
-        String hdfsLocation = tableInformation.getSd().getLocation();
+    private HiveHadoopConfig parseHiveHadoopConfig(ReadonlyConfig readonlyConfig) {
+        String hdfsLocation = getTableInformation().getSd().getLocation();
         HiveHadoopConfig hiveHadoopConfig;
         try {
             URI uri = new URI(hdfsLocation);

@@ -235,6 +235,16 @@ private HiveHadoopConfig parseHiveHadoopConfig(ReadonlyConfig readonlyConfig, Table table) {
         readonlyConfig
                 .getOptional(HiveSourceOptions.KERBEROS_KEYTAB_PATH)
                 .ifPresent(hiveHadoopConfig::setKerberosKeytabPath);
+        readonlyConfig
+                .getOptional(HiveSourceOptions.REMOTE_USER)
+                .ifPresent(hiveHadoopConfig::setRemoteUser);
         return hiveHadoopConfig;
     }
+
+    private Table getTableInformation() {
+        if (tableInformation == null) {
+            tableInformation = HiveTableUtils.getTableInfo(readonlyConfig);
+        }
+        return tableInformation;
+    }
 }
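Note: the transient-plus-lazy-getter change above is what lets HiveSink be serialized even though the Thrift Table object is not: the field is dropped during serialization and rebuilt from the metastore on first access afterwards. A self-contained sketch of the pattern (class and member names here are illustrative, not from the commit):

    import java.io.Serializable;

    // Minimal demonstration of the transient + lazy-getter pattern.
    public class LazyHolder implements Serializable {

        private final String tableName;      // serializable state travels normally
        private transient Object tableInfo;  // dropped during serialization

        public LazyHolder(String tableName) {
            this.tableName = tableName;
        }

        public Object getTableInfo() {
            // Null again after deserialization, so rebuild on demand.
            if (tableInfo == null) {
                tableInfo = expensiveLookup(tableName);
            }
            return tableInfo;
        }

        private Object expensiveLookup(String name) {
            return "table:" + name; // stand-in for a real metastore call
        }
    }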
HiveSinkFactory.java
@@ -27,7 +27,6 @@
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;

@@ -80,7 +79,7 @@ private ReadonlyConfig generateCurrentReadonlyConfig(
                 tableName -> {
                     String replacedPath =
                             replaceCatalogTableInPath(tableName, catalogTable);
-                    configMap.put(BaseSinkConfig.FILE_PATH.key(), replacedPath);
+                    configMap.put(HiveSinkOptions.TABLE_NAME.key(), replacedPath);
                 });

         return ReadonlyConfig.fromMap(new HashMap<>(configMap));
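Note: the fix above routes the substituted value to the Hive table_name option instead of the file path, so a multi-table sink resolves per-table names correctly. The body of replaceCatalogTableInPath is not part of this diff; a hedged sketch of the kind of substitution it performs (tokens and signature are assumptions):

    // Hypothetical placeholder substitution for multi-table sinks.
    private String replaceCatalogTableInPath(String template, CatalogTable catalogTable) {
        String database = catalogTable.getTableId().getDatabaseName();
        String table = catalogTable.getTableId().getTableName();
        String result = template;
        if (database != null) {
            result = result.replace("${database_name}", database);
        }
        if (table != null) {
            result = result.replace("${table_name}", table);
        }
        return result;
    }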
HiveSource.java
@@ -26,7 +26,7 @@
 import org.apache.seatunnel.api.source.SupportParallelism;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
 import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.hive.source.config.MultipleTableHiveSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.hive.source.reader.MultipleTableHiveSourceReader;

@@ -50,7 +50,7 @@ public HiveSource(ReadonlyConfig readonlyConfig) {

     @Override
     public String getPluginName() {
-        return FileSystemType.LOCAL.getFileSystemPluginName();
+        return HiveConstants.CONNECTOR_NAME;
     }

     @Override
HiveSourceFactory.java
@@ -24,8 +24,7 @@
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
-import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
 import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
 import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions;

@@ -49,16 +48,17 @@ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(HiveSourceOptions.TABLE_NAME)
-                .required(HiveSourceOptions.METASTORE_URI)
+                .optional(HiveSourceOptions.TABLE_NAME)
+                .optional(HiveSourceOptions.METASTORE_URI)
+                .optional(HiveSourceOptions.TABLE_CONFIGS)
                 .optional(HiveSourceOptions.HIVE_SITE_PATH)
                 .optional(
-                        HdfsSourceConfig.HDFS_SITE_PATH,
-                        HdfsSourceConfig.KERBEROS_PRINCIPAL,
-                        HdfsSourceConfig.KERBEROS_KEYTAB_PATH)
-                .optional(BaseSourceConfig.READ_PARTITIONS)
-                .optional(BaseSourceConfig.READ_COLUMNS)
+                        HdfsSourceConfigOptions.HDFS_SITE_PATH,
+                        HdfsSourceConfigOptions.REMOTE_USER,
+                        HdfsSourceConfigOptions.KERBEROS_PRINCIPAL,
+                        HdfsSourceConfigOptions.KERBEROS_KEYTAB_PATH)
+                .optional(HdfsSourceConfigOptions.READ_PARTITIONS)
+                .optional(HdfsSourceConfigOptions.READ_COLUMNS)
                 .build();
     }
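Note: with TABLE_NAME and METASTORE_URI demoted from required to optional, the option rule alone no longer guarantees a usable configuration; a source can now be declared either with a single table or through the new table_configs list. A hedged sketch of the either/or check the connector would then need to perform itself (the validation code below is an assumption, not shown in this diff):

    // Hypothetical check: at least one of table_name / table_configs must be set.
    private static void validateHiveSourceOptions(ReadonlyConfig config) {
        boolean singleTable = config.getOptional(HiveSourceOptions.TABLE_NAME).isPresent();
        boolean multiTable = config.getOptional(HiveSourceOptions.TABLE_CONFIGS).isPresent();
        if (!singleTable && !multiTable) {
            throw new IllegalArgumentException(
                    "Either table_name or table_configs must be configured for the Hive source");
        }
    }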
(The diffs for the remaining changed files were not loaded in this view.)