Resolve conflicts
ruanwenjun committed Apr 8, 2024
1 parent 9c1c46e commit 24da46a
Showing 9 changed files with 91 additions and 141 deletions.
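Two themes run through the hunks below: the file connector's option holders move into *Options classes (BaseSourceConfigOptions, HdfsSourceConfigOptions) while per-connector config classes delegate shared parsing to base classes, and the Hive connector passes the plain HadoopConf around instead of a pre-built FileSystemUtils, additionally deriving text delimiters from the Hive table's SerDe properties and gaining an optional remote user.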
@@ -18,20 +18,13 @@
package org.apache.seatunnel.connectors.seatunnel.file.local.source.config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;

import lombok.Getter;

-import java.io.Serializable;
-import java.util.List;

@Getter
public class LocalFileSourceConfig extends BaseFileSourceConfig {

@@ -48,77 +41,6 @@ public String getPluginName() {
}

public LocalFileSourceConfig(ReadonlyConfig readonlyConfig) {
-        validateConfig(readonlyConfig);
-        this.fileFormat = readonlyConfig.get(LocalFileSourceOptions.FILE_FORMAT_TYPE);
-        this.localFileHadoopConf = new LocalFileHadoopConf();
-        this.readStrategy = ReadStrategyFactory.of(readonlyConfig, localFileHadoopConf);
-        this.filePaths = parseFilePaths(readonlyConfig);
-        this.catalogTable = parseCatalogTable(readonlyConfig);
-    }
-
-    private void validateConfig(ReadonlyConfig readonlyConfig) {
-        if (!readonlyConfig.getOptional(LocalFileSourceOptions.FILE_PATH).isPresent()) {
-            throw new FileConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            FileSystemType.LOCAL.getFileSystemPluginName(),
-                            PluginType.SOURCE,
-                            LocalFileSourceOptions.FILE_PATH + " is required"));
-        }
-        if (!readonlyConfig.getOptional(LocalFileSourceOptions.FILE_FORMAT_TYPE).isPresent()) {
-            throw new FileConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            FileSystemType.LOCAL.getFileSystemPluginName(),
-                            PluginType.SOURCE,
-                            LocalFileSourceOptions.FILE_FORMAT_TYPE.key() + " is required"));
-        }
-    }
-
-    private List<String> parseFilePaths(ReadonlyConfig readonlyConfig) {
-        String rootPath = null;
-        try {
-            rootPath = readonlyConfig.get(LocalFileSourceOptions.FILE_PATH);
-            return readStrategy.getFileNamesByPath(localFileHadoopConf, rootPath);
-        } catch (Exception ex) {
-            String errorMsg = String.format("Get file list from this path [%s] failed", rootPath);
-            throw new FileConnectorException(
-                    FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, ex);
-        }
-    }
-
-    private CatalogTable parseCatalogTable(ReadonlyConfig readonlyConfig) {
-        final CatalogTable catalogTable;
-        if (readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
-            catalogTable =
-                    CatalogTableUtil.buildWithConfig(
-                            FileSystemType.LOCAL.getFileSystemPluginName(), readonlyConfig);
-        } else {
-            catalogTable = CatalogTableUtil.buildSimpleTextTable();
-        }
-        if (CollectionUtils.isEmpty(filePaths)) {
-            return catalogTable;
-        }
-        switch (fileFormat) {
-            case CSV:
-            case TEXT:
-            case JSON:
-            case EXCEL:
-                readStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
-                return CatalogTableUtil.newCatalogTable(
-                        catalogTable, readStrategy.getActualSeaTunnelRowTypeInfo());
-            case ORC:
-            case PARQUET:
-                return CatalogTableUtil.newCatalogTable(
-                        catalogTable,
-                        readStrategy.getSeaTunnelRowTypeInfo(
-                                localFileHadoopConf, filePaths.get(0)));
-            default:
-                throw new FileConnectorException(
-                        FileConnectorErrorCode.FORMAT_NOT_SUPPORT,
-                        "SeaTunnel does not supported this file format: [" + fileFormat + "]");
-        }
+        super(readonlyConfig);
}
}
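Taken together, the hunks above collapse LocalFileSourceConfig into a thin subclass of BaseFileSourceConfig. A sketch of the resulting file, reconstructed from this diff; the two retained overrides are an assumption based on the imports that survive (FileSystemType, HadoopConf, LocalFileHadoopConf):

package org.apache.seatunnel.connectors.seatunnel.file.local.source.config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;

import lombok.Getter;

@Getter
public class LocalFileSourceConfig extends BaseFileSourceConfig {

    // Assumed hook: the base class asks subclasses for their Hadoop configuration.
    @Override
    public HadoopConf getHadoopConfig() {
        return new LocalFileHadoopConf();
    }

    @Override
    public String getPluginName() {
        return FileSystemType.LOCAL.getFileSystemPluginName();
    }

    public LocalFileSourceConfig(ReadonlyConfig readonlyConfig) {
        // Path/format validation, file listing and catalog-table parsing
        // now all happen in BaseFileSourceConfig.
        super(readonlyConfig);
    }
}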
@@ -18,13 +18,12 @@
package org.apache.seatunnel.connectors.seatunnel.hive.commit;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;

-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;

@@ -44,8 +43,8 @@ public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
private final ReadonlyConfig readonlyConfig;

public HiveSinkAggregatedCommitter(
-            ReadonlyConfig readonlyConfig, Table table, FileSystemUtils fileSystemUtils) {
-        super(fileSystemUtils);
+            ReadonlyConfig readonlyConfig, Table table, HadoopConf hadoopConf) {
+        super(hadoopConf);
this.readonlyConfig = readonlyConfig;
this.dbName = table.getDbName();
this.tableName = table.getTableName();
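The committer now receives the serializable HadoopConf and forwards it to the base class instead of accepting a ready-made FileSystemUtils. A minimal sketch of the intent, assuming FileSinkAggregatedCommitter builds its file-system helper internally (its body is not part of this diff):

// Hypothetical illustration only, not a verbatim excerpt of FileSinkAggregatedCommitter.
public class CommitterSketch {
    private final FileSystemUtils fileSystemUtils;

    public CommitterSketch(HadoopConf hadoopConf) {
        // Same FileSystemUtils(HadoopConf) constructor the old HiveSink called directly;
        // building it here means only the plain configuration object has to travel.
        this.fileSystemUtils = new FileSystemUtils(hadoopConf);
    }
}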
@@ -19,9 +19,9 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;

-public class BaseHiveOptions extends BaseSourceConfig {
+public class BaseHiveOptions extends BaseSourceConfigOptions {

public static final Option<String> TABLE_NAME =
Options.key("table_name")
@@ -36,7 +36,6 @@
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory;
import org.apache.seatunnel.connectors.seatunnel.hive.commit.HiveSinkAggregatedCommitter;
@@ -75,7 +74,6 @@ public class HiveSink
private final CatalogTable catalogTable;
private final ReadonlyConfig readonlyConfig;
private final HiveHadoopConfig hiveHadoopConfig;
-    private final FileSystemUtils fileSystemUtils;
private final FileSinkConfig fileSinkConfig;
private final WriteStrategy writeStrategy;
private String jobId;
@@ -89,9 +87,7 @@ public HiveSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
generateFileSinkConfig(readonlyConfig, tableInformation, catalogTable);
this.writeStrategy =
WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig);
-        this.fileSystemUtils = new FileSystemUtils(hiveHadoopConfig);
        this.writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
-        this.writeStrategy.setFileSystemUtils(fileSystemUtils);
}

private FileSinkConfig generateFileSinkConfig(
@@ -171,7 +167,8 @@ public String getPluginName() {
public Optional<SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo>>
createAggregatedCommitter() {
return Optional.of(
-                new HiveSinkAggregatedCommitter(readonlyConfig, tableInformation, fileSystemUtils));
+                new HiveSinkAggregatedCommitter(
+                        readonlyConfig, tableInformation, hiveHadoopConfig));
}

@Override
@@ -24,8 +24,7 @@
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
-import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions;

@@ -49,16 +48,16 @@ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
-                .required(HiveSourceOptions.TABLE_NAME)
-                .required(HiveSourceOptions.METASTORE_URI)
+                .optional(HiveSourceOptions.TABLE_NAME)
+                .optional(HiveSourceOptions.METASTORE_URI)
.optional(HiveSourceOptions.TABLE_CONFIGS)
.optional(HiveSourceOptions.HIVE_SITE_PATH)
.optional(
-                        HdfsSourceConfig.HDFS_SITE_PATH,
-                        HdfsSourceConfig.KERBEROS_PRINCIPAL,
-                        HdfsSourceConfig.KERBEROS_KEYTAB_PATH)
-                .optional(BaseSourceConfig.READ_PARTITIONS)
-                .optional(BaseSourceConfig.READ_COLUMNS)
+                        HdfsSourceConfigOptions.HDFS_SITE_PATH,
+                        HdfsSourceConfigOptions.KERBEROS_PRINCIPAL,
+                        HdfsSourceConfigOptions.KERBEROS_KEYTAB_PATH)
+                .optional(HdfsSourceConfigOptions.READ_PARTITIONS)
+                .optional(HdfsSourceConfigOptions.READ_COLUMNS)
.build();
}
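table_name and metastore_uri drop from required to optional in the same rule that carries TABLE_CONFIGS, which suggests per-table settings can now supply them (for example, multi-table reads); the runtime check that one of the two forms is actually present is not visible in this diff.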

@@ -17,6 +17,9 @@

package org.apache.seatunnel.connectors.seatunnel.hive.source.config;

+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;

import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -28,11 +31,10 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
@@ -55,6 +57,11 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;

+import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FIELD_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_FORMAT_TYPE;
+import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.ROW_DELIMITER;

@Getter
public class HiveSourceConfig implements Serializable {
@@ -71,13 +78,13 @@ public class HiveSourceConfig implements Serializable {
@SneakyThrows
public HiveSourceConfig(ReadonlyConfig readonlyConfig) {
readonlyConfig
-                .getOptional(BaseSourceConfig.READ_PARTITIONS)
+                .getOptional(HdfsSourceConfigOptions.READ_PARTITIONS)
.ifPresent(this::validatePartitions);
this.table = HiveTableUtils.getTableInfo(readonlyConfig);
this.hiveHadoopConfig = parseHiveHadoopConfig(readonlyConfig, table);
this.fileFormat = HiveTableUtils.parseFileFormat(table);
-        this.readStrategy = parseReadStrategy(readonlyConfig, fileFormat, hiveHadoopConfig);
-        this.filePaths = parseFilePaths(table, hiveHadoopConfig, readStrategy);
+        this.readStrategy = parseReadStrategy(table, readonlyConfig, fileFormat, hiveHadoopConfig);
+        this.filePaths = parseFilePaths(table, readStrategy);
this.catalogTable =
parseCatalogTable(
readonlyConfig,
@@ -108,11 +115,45 @@ private void validatePartitions(List<String> partitionsList) {
}

private ReadStrategy parseReadStrategy(
+            Table table,
ReadonlyConfig readonlyConfig,
FileFormat fileFormat,
HiveHadoopConfig hiveHadoopConfig) {

ReadStrategy readStrategy = ReadStrategyFactory.of(fileFormat.name());
-        readStrategy.setPluginConfig(readonlyConfig.toConfig());
+        Config config = readonlyConfig.toConfig();
+
+        switch (fileFormat) {
+            case TEXT:
+                // if the file format is text, we set the delim.
+                Map<String, String> parameters = table.getSd().getSerdeInfo().getParameters();
+                config =
+                        config.withValue(
+                                        FIELD_DELIMITER.key(),
+                                        ConfigValueFactory.fromAnyRef(
+                                                parameters.get("field.delim")))
+                                .withValue(
+                                        ROW_DELIMITER.key(),
+                                        ConfigValueFactory.fromAnyRef(parameters.get("line.delim")))
+                                .withValue(
+                                        FILE_FORMAT_TYPE.key(),
+                                        ConfigValueFactory.fromAnyRef(FileFormat.TEXT.name()));
+                break;
+            case ORC:
+                config =
+                        config.withValue(
+                                FILE_FORMAT_TYPE.key(),
+                                ConfigValueFactory.fromAnyRef(FileFormat.ORC.name()));
+                break;
+            case PARQUET:
+                config =
+                        config.withValue(
+                                FILE_FORMAT_TYPE.key(),
+                                ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.name()));
+                break;
+            default:
+        }
+        readStrategy.setPluginConfig(config);
readStrategy.init(hiveHadoopConfig);
return readStrategy;
}
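The interesting branch is TEXT: rather than trusting configured delimiters, the source copies field.delim and line.delim from the Hive table's SerDe properties, so files written with non-default delimiters parse correctly. The same logic isolated as a hypothetical helper (the method name is mine; the option keys and metastore calls are exactly those used above):

// Hypothetical extraction of the TEXT branch, for readability only.
private static Config withHiveTextDelimiters(Config config, Table table) {
    // SerDe properties of the Hive table, e.g. field.delim = "\u0001".
    Map<String, String> serdeProps = table.getSd().getSerdeInfo().getParameters();
    return config.withValue(
                    FIELD_DELIMITER.key(),
                    ConfigValueFactory.fromAnyRef(serdeProps.get("field.delim")))
            .withValue(
                    ROW_DELIMITER.key(),
                    ConfigValueFactory.fromAnyRef(serdeProps.get("line.delim")))
            .withValue(
                    FILE_FORMAT_TYPE.key(),
                    ConfigValueFactory.fromAnyRef(FileFormat.TEXT.name()));
}

One caveat, not addressed in the hunk: a table whose SerDe omits these keys yields null here, presumably falling back to the option defaults downstream.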
@@ -125,22 +166,24 @@ private HiveHadoopConfig parseHiveHadoopConfig(ReadonlyConfig readonlyConfig, Table table) {
readonlyConfig.get(HiveSourceOptions.METASTORE_URI),
readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH));
readonlyConfig
-                .getOptional(HdfsSourceConfig.HDFS_SITE_PATH)
+                .getOptional(HdfsSourceConfigOptions.HDFS_SITE_PATH)
                .ifPresent(hiveHadoopConfig::setHdfsSitePath);
        readonlyConfig
-                .getOptional(HdfsSourceConfig.KERBEROS_PRINCIPAL)
+                .getOptional(HdfsSourceConfigOptions.KERBEROS_PRINCIPAL)
                .ifPresent(hiveHadoopConfig::setKerberosPrincipal);
        readonlyConfig
-                .getOptional(HdfsSourceConfig.KERBEROS_KEYTAB_PATH)
+                .getOptional(HdfsSourceConfigOptions.KERBEROS_KEYTAB_PATH)
                .ifPresent(hiveHadoopConfig::setKerberosKeytabPath);
+        readonlyConfig
+                .getOptional(HdfsSourceConfigOptions.REMOTE_USER)
+                .ifPresent(hiveHadoopConfig::setRemoteUser);
return hiveHadoopConfig;
}
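A new optional remote_user setting is threaded into HiveHadoopConfig alongside the existing Kerberos options, presumably the user name applied when the read strategy's init(...) opens the HDFS connection; the consuming side is not part of the visible hunks.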

-    private List<String> parseFilePaths(
-            Table table, HiveHadoopConfig hiveHadoopConfig, ReadStrategy readStrategy) {
+    private List<String> parseFilePaths(Table table, ReadStrategy readStrategy) {
String hdfsPath = parseHdfsPath(table);
try {
-            return readStrategy.getFileNamesByPath(hiveHadoopConfig, hdfsPath);
+            return readStrategy.getFileNamesByPath(hdfsPath);
} catch (Exception e) {
String errorMsg = String.format("Get file list from this path [%s] failed", hdfsPath);
throw new FileConnectorException(
@@ -214,7 +257,7 @@ private CatalogTable parseCatalogTableFromRemotePath(
CatalogTable catalogTable = buildEmptyCatalogTable(readonlyConfig, table);
try {
SeaTunnelRowType seaTunnelRowTypeInfo =
-                    readStrategy.getSeaTunnelRowTypeInfo(hiveHadoopConfig, filePaths.get(0));
+                    readStrategy.getSeaTunnelRowTypeInfo(filePaths.get(0));
return CatalogTableUtil.newCatalogTable(catalogTable, seaTunnelRowTypeInfo);
} catch (FileConnectorException e) {
String errorMsg =
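Across the last hunks the HiveHadoopConfig argument disappears from getFileNamesByPath and getSeaTunnelRowTypeInfo: the strategy is bound to its Hadoop configuration once, in init(...). A sketch of the resulting call pattern, assembled from names that appear in the hunks above (not a verbatim excerpt):

private SeaTunnelRowType resolveRowType(
        FileFormat fileFormat, Config config, HiveHadoopConfig hiveHadoopConfig, String hdfsPath)
        throws Exception {
    ReadStrategy readStrategy = ReadStrategyFactory.of(fileFormat.name());
    readStrategy.setPluginConfig(config); // delimiters/format patched in as shown earlier
    readStrategy.init(hiveHadoopConfig);  // Hadoop configuration bound exactly once here...
    List<String> filePaths = readStrategy.getFileNamesByPath(hdfsPath);
    return readStrategy.getSeaTunnelRowTypeInfo(filePaths.get(0)); // ...so these calls omit it
}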
Expand Down
Loading
