Skip to content

Commit

Permalink
Resolve conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Apr 9, 2024
1 parent 9c1c46e commit 650e196
Show file tree
Hide file tree
Showing 10 changed files with 93 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,13 @@
package org.apache.seatunnel.connectors.seatunnel.file.local.source.config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;

import lombok.Getter;

import java.io.Serializable;
import java.util.List;

@Getter
public class LocalFileSourceConfig extends BaseFileSourceConfig {

Expand All @@ -48,77 +41,6 @@ public String getPluginName() {
}

public LocalFileSourceConfig(ReadonlyConfig readonlyConfig) {
validateConfig(readonlyConfig);
this.fileFormat = readonlyConfig.get(LocalFileSourceOptions.FILE_FORMAT_TYPE);
this.localFileHadoopConf = new LocalFileHadoopConf();
this.readStrategy = ReadStrategyFactory.of(readonlyConfig, localFileHadoopConf);
this.filePaths = parseFilePaths(readonlyConfig);
this.catalogTable = parseCatalogTable(readonlyConfig);
}

private void validateConfig(ReadonlyConfig readonlyConfig) {
if (!readonlyConfig.getOptional(LocalFileSourceOptions.FILE_PATH).isPresent()) {
throw new FileConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
FileSystemType.LOCAL.getFileSystemPluginName(),
PluginType.SOURCE,
LocalFileSourceOptions.FILE_PATH + " is required"));
}
if (!readonlyConfig.getOptional(LocalFileSourceOptions.FILE_FORMAT_TYPE).isPresent()) {
throw new FileConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
FileSystemType.LOCAL.getFileSystemPluginName(),
PluginType.SOURCE,
LocalFileSourceOptions.FILE_FORMAT_TYPE.key() + " is required"));
}
}

private List<String> parseFilePaths(ReadonlyConfig readonlyConfig) {
String rootPath = null;
try {
rootPath = readonlyConfig.get(LocalFileSourceOptions.FILE_PATH);
return readStrategy.getFileNamesByPath(localFileHadoopConf, rootPath);
} catch (Exception ex) {
String errorMsg = String.format("Get file list from this path [%s] failed", rootPath);
throw new FileConnectorException(
FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, ex);
}
}

private CatalogTable parseCatalogTable(ReadonlyConfig readonlyConfig) {
final CatalogTable catalogTable;
if (readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
catalogTable =
CatalogTableUtil.buildWithConfig(
FileSystemType.LOCAL.getFileSystemPluginName(), readonlyConfig);
} else {
catalogTable = CatalogTableUtil.buildSimpleTextTable();
}
if (CollectionUtils.isEmpty(filePaths)) {
return catalogTable;
}
switch (fileFormat) {
case CSV:
case TEXT:
case JSON:
case EXCEL:
readStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
return CatalogTableUtil.newCatalogTable(
catalogTable, readStrategy.getActualSeaTunnelRowTypeInfo());
case ORC:
case PARQUET:
return CatalogTableUtil.newCatalogTable(
catalogTable,
readStrategy.getSeaTunnelRowTypeInfo(
localFileHadoopConf, filePaths.get(0)));
default:
throw new FileConnectorException(
FileConnectorErrorCode.FORMAT_NOT_SUPPORT,
"SeaTunnel does not supported this file format: [" + fileFormat + "]");
}
super(readonlyConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
package org.apache.seatunnel.connectors.seatunnel.hive.commit;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;

import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;

Expand All @@ -44,8 +43,8 @@ public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
private final ReadonlyConfig readonlyConfig;

public HiveSinkAggregatedCommitter(
ReadonlyConfig readonlyConfig, Table table, FileSystemUtils fileSystemUtils) {
super(fileSystemUtils);
ReadonlyConfig readonlyConfig, Table table, HadoopConf hadoopConf) {
super(hadoopConf);
this.readonlyConfig = readonlyConfig;
this.dbName = table.getDbName();
this.tableName = table.getTableName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;

public class BaseHiveOptions extends BaseSourceConfig {
public class BaseHiveOptions extends BaseSourceConfigOptions {

public static final Option<String> TABLE_NAME =
Options.key("table_name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory;
import org.apache.seatunnel.connectors.seatunnel.hive.commit.HiveSinkAggregatedCommitter;
Expand Down Expand Up @@ -75,7 +74,6 @@ public class HiveSink
private final CatalogTable catalogTable;
private final ReadonlyConfig readonlyConfig;
private final HiveHadoopConfig hiveHadoopConfig;
private final FileSystemUtils fileSystemUtils;
private final FileSinkConfig fileSinkConfig;
private final WriteStrategy writeStrategy;
private String jobId;
Expand All @@ -89,9 +87,7 @@ public HiveSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
generateFileSinkConfig(readonlyConfig, tableInformation, catalogTable);
this.writeStrategy =
WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig);
this.fileSystemUtils = new FileSystemUtils(hiveHadoopConfig);
this.writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
this.writeStrategy.setFileSystemUtils(fileSystemUtils);
}

private FileSinkConfig generateFileSinkConfig(
Expand Down Expand Up @@ -171,7 +167,8 @@ public String getPluginName() {
public Optional<SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo>>
createAggregatedCommitter() {
return Optional.of(
new HiveSinkAggregatedCommitter(readonlyConfig, tableInformation, fileSystemUtils));
new HiveSinkAggregatedCommitter(
readonlyConfig, tableInformation, hiveHadoopConfig));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
Expand Down Expand Up @@ -80,7 +79,7 @@ private ReadonlyConfig generateCurrentReadonlyConfig(
tableName -> {
String replacedPath =
replaceCatalogTableInPath(tableName, catalogTable);
configMap.put(BaseSinkConfig.FILE_PATH.key(), replacedPath);
configMap.put(HiveSinkOptions.TABLE_NAME.key(), replacedPath);
});

return ReadonlyConfig.fromMap(new HashMap<>(configMap));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions;

Expand All @@ -49,16 +48,17 @@ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(HiveSourceOptions.TABLE_NAME)
.required(HiveSourceOptions.METASTORE_URI)
.optional(HiveSourceOptions.TABLE_NAME)
.optional(HiveSourceOptions.METASTORE_URI)
.optional(HiveSourceOptions.TABLE_CONFIGS)
.optional(HiveSourceOptions.HIVE_SITE_PATH)
.optional(
HdfsSourceConfig.HDFS_SITE_PATH,
HdfsSourceConfig.KERBEROS_PRINCIPAL,
HdfsSourceConfig.KERBEROS_KEYTAB_PATH)
.optional(BaseSourceConfig.READ_PARTITIONS)
.optional(BaseSourceConfig.READ_COLUMNS)
HdfsSourceConfigOptions.HDFS_SITE_PATH,
HdfsSourceConfigOptions.REMOTE_USER,
HdfsSourceConfigOptions.KERBEROS_PRINCIPAL,
HdfsSourceConfigOptions.KERBEROS_KEYTAB_PATH)
.optional(HdfsSourceConfigOptions.READ_PARTITIONS)
.optional(HdfsSourceConfigOptions.READ_COLUMNS)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.seatunnel.connectors.seatunnel.hive.source.config;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;

import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
Expand All @@ -28,11 +31,10 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
Expand All @@ -55,6 +57,11 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FIELD_DELIMITER;
import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_FORMAT_TYPE;
import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.ROW_DELIMITER;

@Getter
public class HiveSourceConfig implements Serializable {
Expand All @@ -71,13 +78,13 @@ public class HiveSourceConfig implements Serializable {
@SneakyThrows
public HiveSourceConfig(ReadonlyConfig readonlyConfig) {
readonlyConfig
.getOptional(BaseSourceConfig.READ_PARTITIONS)
.getOptional(HdfsSourceConfigOptions.READ_PARTITIONS)
.ifPresent(this::validatePartitions);
this.table = HiveTableUtils.getTableInfo(readonlyConfig);
this.hiveHadoopConfig = parseHiveHadoopConfig(readonlyConfig, table);
this.fileFormat = HiveTableUtils.parseFileFormat(table);
this.readStrategy = parseReadStrategy(readonlyConfig, fileFormat, hiveHadoopConfig);
this.filePaths = parseFilePaths(table, hiveHadoopConfig, readStrategy);
this.readStrategy = parseReadStrategy(table, readonlyConfig, fileFormat, hiveHadoopConfig);
this.filePaths = parseFilePaths(table, readStrategy);
this.catalogTable =
parseCatalogTable(
readonlyConfig,
Expand Down Expand Up @@ -108,11 +115,45 @@ private void validatePartitions(List<String> partitionsList) {
}

private ReadStrategy parseReadStrategy(
Table table,
ReadonlyConfig readonlyConfig,
FileFormat fileFormat,
HiveHadoopConfig hiveHadoopConfig) {

ReadStrategy readStrategy = ReadStrategyFactory.of(fileFormat.name());
readStrategy.setPluginConfig(readonlyConfig.toConfig());
Config config = readonlyConfig.toConfig();

switch (fileFormat) {
case TEXT:
// if the file format is text, we set the delim.
Map<String, String> parameters = table.getSd().getSerdeInfo().getParameters();
config =
config.withValue(
FIELD_DELIMITER.key(),
ConfigValueFactory.fromAnyRef(
parameters.get("field.delim")))
.withValue(
ROW_DELIMITER.key(),
ConfigValueFactory.fromAnyRef(parameters.get("line.delim")))
.withValue(
FILE_FORMAT_TYPE.key(),
ConfigValueFactory.fromAnyRef(FileFormat.TEXT.name()));
break;
case ORC:
config =
config.withValue(
FILE_FORMAT_TYPE.key(),
ConfigValueFactory.fromAnyRef(FileFormat.ORC.name()));
break;
case PARQUET:
config =
config.withValue(
FILE_FORMAT_TYPE.key(),
ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.name()));
break;
default:
}
readStrategy.setPluginConfig(config);
readStrategy.init(hiveHadoopConfig);
return readStrategy;
}
Expand All @@ -125,22 +166,24 @@ private HiveHadoopConfig parseHiveHadoopConfig(ReadonlyConfig readonlyConfig, Ta
readonlyConfig.get(HiveSourceOptions.METASTORE_URI),
readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH));
readonlyConfig
.getOptional(HdfsSourceConfig.HDFS_SITE_PATH)
.getOptional(HdfsSourceConfigOptions.HDFS_SITE_PATH)
.ifPresent(hiveHadoopConfig::setHdfsSitePath);
readonlyConfig
.getOptional(HdfsSourceConfig.KERBEROS_PRINCIPAL)
.getOptional(HdfsSourceConfigOptions.KERBEROS_PRINCIPAL)
.ifPresent(hiveHadoopConfig::setKerberosPrincipal);
readonlyConfig
.getOptional(HdfsSourceConfig.KERBEROS_KEYTAB_PATH)
.getOptional(HdfsSourceConfigOptions.KERBEROS_KEYTAB_PATH)
.ifPresent(hiveHadoopConfig::setKerberosKeytabPath);
readonlyConfig
.getOptional(HdfsSourceConfigOptions.REMOTE_USER)
.ifPresent(hiveHadoopConfig::setRemoteUser);
return hiveHadoopConfig;
}

private List<String> parseFilePaths(
Table table, HiveHadoopConfig hiveHadoopConfig, ReadStrategy readStrategy) {
private List<String> parseFilePaths(Table table, ReadStrategy readStrategy) {
String hdfsPath = parseHdfsPath(table);
try {
return readStrategy.getFileNamesByPath(hiveHadoopConfig, hdfsPath);
return readStrategy.getFileNamesByPath(hdfsPath);
} catch (Exception e) {
String errorMsg = String.format("Get file list from this path [%s] failed", hdfsPath);
throw new FileConnectorException(
Expand Down Expand Up @@ -214,7 +257,7 @@ private CatalogTable parseCatalogTableFromRemotePath(
CatalogTable catalogTable = buildEmptyCatalogTable(readonlyConfig, table);
try {
SeaTunnelRowType seaTunnelRowTypeInfo =
readStrategy.getSeaTunnelRowTypeInfo(hiveHadoopConfig, filePaths.get(0));
readStrategy.getSeaTunnelRowTypeInfo(filePaths.get(0));
return CatalogTableUtil.newCatalogTable(catalogTable, seaTunnelRowTypeInfo);
} catch (FileConnectorException e) {
String errorMsg =
Expand Down
Loading

0 comments on commit 650e196

Please sign in to comment.