Skip to content

Commit

Permalink
LocalFile sink support multiple table
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Dec 1, 2023
1 parent f051154 commit 94c288d
Show file tree
Hide file tree
Showing 12 changed files with 400 additions and 39 deletions.
14 changes: 13 additions & 1 deletion docs/en/connector-v2/sink/LocalFile.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ By default, we use 2PC commit to ensure `exactly-once`

### path [string]

The target dir path is required.
The target dir path is required, you can inject the upstream CatalogTable into the path by using: `${database_name}`, `${table_name}` and `${schema_name}`.

### custom_filename [boolean]

Expand Down Expand Up @@ -237,6 +237,18 @@ LocalFile {

```

For multiple table

```bash

LocalFile {
path = "/tmp/hive/warehouse/${table_name}"
file_format_type = "parquet"
sink_columns = ["name","age"]
}

```

## Changelog

### 2.2.0-beta 2022-09-26
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,18 @@ public void validate(OptionRule rule) {
List<RequiredOption> requiredOptions = rule.getRequiredOptions();
for (RequiredOption requiredOption : requiredOptions) {
validate(requiredOption);
requiredOption
.getOptions()
.forEach(
option -> {
if (SingleChoiceOption.class.isAssignableFrom(option.getClass())) {
validateSingleChoice(option);
}
});

for (Option<?> option : requiredOption.getOptions()) {
if (SingleChoiceOption.class.isAssignableFrom(option.getClass())) {
// is required option and not match condition, skip validate
if (isConditionOption(requiredOption)
&& !matchCondition(
(RequiredOption.ConditionalRequiredOptions) requiredOption)) {
continue;
}
validateSingleChoice(option);
}
}
}

for (Option option : rule.getOptionalOptions()) {
Expand All @@ -74,15 +78,15 @@ void validateSingleChoice(Option option) {
Object o = singleChoiceOption.defaultValue();
if (o != null && !optionValues.contains(o)) {
throw new OptionValidationException(
"These options(%s) are SingleChoiceOption, the defaultValue(%s) must be one of the optionValues.",
getOptionKeys(Arrays.asList(singleChoiceOption)), o);
"These options(%s) are SingleChoiceOption, the defaultValue(%s) must be one of the optionValues(%s).",
getOptionKeys(Arrays.asList(singleChoiceOption)), o, optionValues);
}

Object value = config.get(option);
if (value != null && !optionValues.contains(value)) {
throw new OptionValidationException(
"These options(%s) are SingleChoiceOption, the value(%s) must be one of the optionValues.",
getOptionKeys(Arrays.asList(singleChoiceOption)), value);
"These options(%s) are SingleChoiceOption, the value(%s) must be one of the optionValues(%s).",
getOptionKeys(Arrays.asList(singleChoiceOption)), value, optionValues);
}
}

Expand All @@ -99,7 +103,7 @@ void validate(RequiredOption requiredOption) {
validate((RequiredOption.ExclusiveRequiredOptions) requiredOption);
return;
}
if (requiredOption instanceof RequiredOption.ConditionalRequiredOptions) {
if (isConditionOption(requiredOption)) {
validate((RequiredOption.ConditionalRequiredOptions) requiredOption);
return;
}
Expand Down Expand Up @@ -181,8 +185,7 @@ void validate(RequiredOption.ExclusiveRequiredOptions exclusiveRequiredOptions)
}

void validate(RequiredOption.ConditionalRequiredOptions conditionalRequiredOptions) {
Expression expression = conditionalRequiredOptions.getExpression();
boolean match = validate(expression);
boolean match = matchCondition(conditionalRequiredOptions);
if (!match) {
return;
}
Expand All @@ -193,7 +196,8 @@ void validate(RequiredOption.ConditionalRequiredOptions conditionalRequiredOptio
}
throw new OptionValidationException(
"There are unconfigured options, the options(%s) are required because [%s] is true.",
getOptionKeys(absentOptions), expression.toString());
getOptionKeys(absentOptions),
conditionalRequiredOptions.getExpression().toString());
}

private boolean validate(Expression expression) {
Expand Down Expand Up @@ -222,4 +226,14 @@ private <T> boolean validate(Condition<T> condition) {
return match || validate(condition.getNext());
}
}

private boolean isConditionOption(RequiredOption requiredOption) {
return requiredOption instanceof RequiredOption.ConditionalRequiredOptions;
}

private boolean matchCondition(
RequiredOption.ConditionalRequiredOptions conditionalRequiredOptions) {
Expression expression = conditionalRequiredOptions.getExpression();
return validate(expression);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,9 @@ public static CatalogTable buildWithConfig(String catalogName, ReadonlyConfig re
schemaConfig.get(
TableSchemaOptions.TableIdentifierOptions.SCHEMA_FIRST));
} else {
tablePath = TablePath.EMPTY;
Optional<String> resultTableNameOptional =
readonlyConfig.getOptional(CommonOptions.RESULT_TABLE_NAME);
tablePath = resultTableNameOptional.map(TablePath::of).orElse(TablePath.EMPTY);
}

return CatalogTable.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public void testSingleChoiceOptionDefaultValueValidator() {
config.put(SINGLE_CHOICE_TEST.key(), "A");
Executable executable = () -> validate(config, optionRule);
assertEquals(
"ErrorCode:[API-02], ErrorDescription:[Option item validate failed] - These options('single_choice_test') are SingleChoiceOption, the defaultValue(M) must be one of the optionValues.",
"ErrorCode:[API-02], ErrorDescription:[Option item validate failed] - These options('single_choice_test') are SingleChoiceOption, the defaultValue(M) must be one of the optionValues([A, B, C]).",
assertThrows(OptionValidationException.class, executable).getMessage());
}

Expand All @@ -290,7 +290,7 @@ public void testSingleChoiceOptionValueValidator() {
config.put(SINGLE_CHOICE_VALUE_TEST.key(), "N");
executable = () -> validate(config, optionRule);
assertEquals(
"ErrorCode:[API-02], ErrorDescription:[Option item validate failed] - These options('single_choice_test') are SingleChoiceOption, the value(N) must be one of the optionValues.",
"ErrorCode:[API-02], ErrorDescription:[Option item validate failed] - These options('single_choice_test') are SingleChoiceOption, the value(N) must be one of the optionValues([A, B, C]).",
assertThrows(OptionValidationException.class, executable).getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,94 @@

package org.apache.seatunnel.connectors.seatunnel.file.local.sink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
import org.apache.seatunnel.connectors.seatunnel.file.local.sink.writter.LocalFileSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory;

import java.util.List;
import java.util.Optional;

import com.google.auto.service.AutoService;
public class LocalFileSink
implements SeaTunnelSink<
SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>,
SupportMultiTableSink {

@AutoService(SeaTunnelSink.class)
public class LocalFileSink extends BaseFileSink {
private final HadoopConf hadoopConf;
private final FileSystemUtils fileSystemUtils;
private final FileSinkConfig fileSinkConfig;
private final WriteStrategy writeStrategy;
private String jobId;

public LocalFileSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
this.hadoopConf = new LocalFileHadoopConf();
this.fileSinkConfig =
new FileSinkConfig(readonlyConfig.toConfig(), catalogTable.getSeaTunnelRowType());
this.writeStrategy =
WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig);
this.fileSystemUtils = new FileSystemUtils(hadoopConf);
this.writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
this.writeStrategy.setFileSystemUtils(fileSystemUtils);
}

@Override
public String getPluginName() {
return FileSystemType.LOCAL.getFileSystemPluginName();
public void setJobContext(JobContext jobContext) {
this.jobId = jobContext.getJobId();
}

@Override
public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> restoreWriter(
SinkWriter.Context context, List<FileSinkState> states) {
return new LocalFileSinkWriter(writeStrategy, hadoopConf, context, jobId, states);
}

@Override
public Optional<SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo>>
createAggregatedCommitter() {
return Optional.of(new FileSinkAggregatedCommitter(fileSystemUtils));
}

@Override
public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> createWriter(
SinkWriter.Context context) {
return new LocalFileSinkWriter(writeStrategy, hadoopConf, context, jobId);
}

@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
super.prepare(pluginConfig);
hadoopConf = new LocalFileHadoopConf();
public Optional<Serializer<FileCommitInfo>> getCommitInfoSerializer() {
return Optional.of(new DefaultSerializer<>());
}

@Override
public Optional<Serializer<FileAggregatedCommitInfo>> getAggregatedCommitInfoSerializer() {
return Optional.of(new DefaultSerializer<>());
}

@Override
public Optional<Serializer<FileSinkState>> getWriterStateSerializer() {
return Optional.of(new DefaultSerializer<>());
}

@Override
public String getPluginName() {
return FileSystemType.LOCAL.getFileSystemPluginName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,32 @@

package org.apache.seatunnel.connectors.seatunnel.file.local.sink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.SinkReplaceNameConstant;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;

import com.google.auto.service.AutoService;

@AutoService(Factory.class)
public class LocalFileSinkFactory implements TableSinkFactory {
public class LocalFileSinkFactory
implements TableSinkFactory<
SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo> {
@Override
public String factoryIdentifier() {
return FileSystemType.LOCAL.getFileSystemPluginName();
Expand Down Expand Up @@ -82,4 +97,70 @@ public OptionRule optionRule() {
.optional(BaseSinkConfig.TIME_FORMAT)
.build();
}

@Override
public TableSink<SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>
createSink(TableSinkFactoryContext context) {
ReadonlyConfig readonlyConfig = context.getOptions();
CatalogTable catalogTable = context.getCatalogTable();

ReadonlyConfig finalReadonlyConfig =
generateCurrentReadonlyConfig(readonlyConfig, catalogTable);
return () -> new LocalFileSink(finalReadonlyConfig, catalogTable);
}

// replace the table name in sink config's path
private ReadonlyConfig generateCurrentReadonlyConfig(
ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
// Copy the config to avoid modifying the original config
Config config = readonlyConfig.toConfig();

if (config.hasPath(BaseSinkConfig.FILE_PATH.key())) {
String replacedPath =
replaceCatalogTableInPath(
config.getString(BaseSinkConfig.FILE_PATH.key()), catalogTable);
config =
config.withValue(
BaseSinkConfig.FILE_PATH.key(),
ConfigValueFactory.fromAnyRef(replacedPath));
}

if (config.hasPath(BaseSinkConfig.TMP_PATH.key())) {
String replacedPath =
replaceCatalogTableInPath(
config.getString(BaseSinkConfig.TMP_PATH.key()), catalogTable);
config =
config.withValue(
BaseSinkConfig.TMP_PATH.key(),
ConfigValueFactory.fromAnyRef(replacedPath));
}

return ReadonlyConfig.fromConfig(config);
}

private String replaceCatalogTableInPath(String originString, CatalogTable catalogTable) {
String path = originString;
TableIdentifier tableIdentifier = catalogTable.getTableId();
if (tableIdentifier != null) {
if (tableIdentifier.getDatabaseName() != null) {
path =
path.replace(
SinkReplaceNameConstant.REPLACE_DATABASE_NAME_KEY,
tableIdentifier.getDatabaseName());
}
if (tableIdentifier.getSchemaName() != null) {
path =
path.replace(
SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY,
tableIdentifier.getSchemaName());
}
if (tableIdentifier.getTableName() != null) {
path =
path.replace(
SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY,
tableIdentifier.getTableName());
}
}
return path;
}
}
Loading

0 comments on commit 94c288d

Please sign in to comment.