Skip to content

Commit

Permalink
[Feature][Connector-V2] Support TableSourceFactory/TableSinkFactory o…
Browse files Browse the repository at this point in the history
…n kudu (#5789)
  • Loading branch information
Carl-Zhou-CN authored Nov 8, 2023
1 parent aec9a93 commit 10e791d
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,8 @@ public Set<KuduSourceSplit> createInputSplits() throws IOException {
public KuduScanner scanner(byte[] token) throws IOException {
return KuduScanToken.deserializeIntoScanner(token, kuduClient);
}

public SeaTunnelRowType getRowTypeInfo() {
return this.rowTypeInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,13 @@

package org.apache.seatunnel.connectors.seatunnel.kudu.sink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSinkState;
Expand All @@ -52,29 +44,14 @@ public class KuduSink
private KuduSinkConfig kuduSinkConfig;
private SeaTunnelRowType seaTunnelRowType;

@Override
public String getPluginName() {
return "kudu";
public KuduSink(KuduSinkConfig kuduSinkConfig, CatalogTable catalogTable) {
this.kuduSinkConfig = kuduSinkConfig;
this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
}

@Override
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
}

@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig);
ConfigValidator.of(config).validate(new KuduSinkFactory().optionRule());
try {
kuduSinkConfig = new KuduSinkConfig(config);
} catch (Exception e) {
throw new KuduConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SINK, ExceptionUtils.getMessage(e)));
}
public String getPluginName() {
return "kudu";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@

package org.apache.seatunnel.connectors.seatunnel.kudu.sink;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;

import com.google.auto.service.AutoService;
Expand Down Expand Up @@ -64,4 +68,12 @@ public OptionRule optionRule() {
KuduSinkConfig.KERBEROS_KEYTAB)
.build();
}

@Override
public TableSink createSink(TableSinkFactoryContext context) {
ReadonlyConfig config = context.getOptions();
CatalogTable catalogTable = context.getCatalogTable();
KuduSinkConfig kuduSinkConfig = new KuduSinkConfig(config);
return () -> new KuduSink(kuduSinkConfig, catalogTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,20 @@

package org.apache.seatunnel.connectors.seatunnel.kudu.source;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat;
import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSourceState;
import org.apache.seatunnel.connectors.seatunnel.kudu.util.KuduUtil;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.client.KuduClient;

import com.google.auto.service.AutoService;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;

@Slf4j
@AutoService(SeaTunnelSource.class)
public class KuduSource
Expand All @@ -62,6 +41,12 @@ public class KuduSource
private KuduInputFormat kuduInputFormat;
private KuduSourceConfig kuduSourceConfig;

public KuduSource(KuduSourceConfig kuduSourceConfig, KuduInputFormat kuduInputFormat) {
this.kuduSourceConfig = kuduSourceConfig;
this.kuduInputFormat = kuduInputFormat;
this.rowTypeInfo = kuduInputFormat.getRowTypeInfo();
}

@Override
public Boundedness getBoundedness() {
return Boundedness.BOUNDED;
Expand Down Expand Up @@ -96,57 +81,4 @@ public SourceSplitEnumerator<KuduSourceSplit, KuduSourceState> restoreEnumerator
public String getPluginName() {
return "Kudu";
}

@Override
public void prepare(Config pluginConfig) {
ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig);
ConfigValidator.of(config).validate(new KuduSourceFactory().optionRule());
try {
kuduSourceConfig = new KuduSourceConfig(config);
} catch (Exception e) {
throw new KuduConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SINK, ExceptionUtils.getMessage(e)));
}

if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
rowTypeInfo = CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
} else {
try (KuduClient kuduClient = KuduUtil.getKuduClient(kuduSourceConfig)) {
rowTypeInfo =
getSeaTunnelRowType(
kuduClient
.openTable(kuduSourceConfig.getTable())
.getSchema()
.getColumns());
} catch (Exception e) {
throw new KuduConnectorException(KuduConnectorErrorCode.INIT_KUDU_CLIENT_FAILED, e);
}
}
kuduInputFormat = new KuduInputFormat(kuduSourceConfig, rowTypeInfo);
}

public SeaTunnelRowType getSeaTunnelRowType(List<ColumnSchema> columnSchemaList) {
ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
ArrayList<String> fieldNames = new ArrayList<>();
try {

for (int i = 0; i < columnSchemaList.size(); i++) {
fieldNames.add(columnSchemaList.get(i).getName());
seaTunnelDataTypes.add(KuduTypeMapper.mapping(columnSchemaList, i));
}

} catch (Exception e) {
throw new KuduConnectorException(
CommonErrorCode.TABLE_SCHEMA_GET_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
"Kudu", PluginType.SOURCE, ExceptionUtils.getMessage(e)));
}
return new SeaTunnelRowType(
fieldNames.toArray(new String[0]),
seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[0]));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,38 @@

package org.apache.seatunnel.connectors.seatunnel.kudu.source;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat;
import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.kudu.util.KuduUtil;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.client.KuduClient;

import com.google.auto.service.AutoService;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import static org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig.MASTER;
import static org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig.TABLE_NAME;

Expand Down Expand Up @@ -63,4 +85,53 @@ public OptionRule optionRule() {
public Class<? extends SeaTunnelSource> getSourceClass() {
return KuduSource.class;
}

@Override
public <T, SplitT extends SourceSplit, StateT extends Serializable>
TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
ReadonlyConfig config = context.getOptions();
KuduSourceConfig kuduSourceConfig = new KuduSourceConfig(config);
SeaTunnelRowType rowTypeInfo;
if (config.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
rowTypeInfo = CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
} else {
try (KuduClient kuduClient = KuduUtil.getKuduClient(kuduSourceConfig)) {
rowTypeInfo =
getSeaTunnelRowType(
kuduClient
.openTable(kuduSourceConfig.getTable())
.getSchema()
.getColumns());
} catch (Exception e) {
throw new KuduConnectorException(KuduConnectorErrorCode.INIT_KUDU_CLIENT_FAILED, e);
}
}
KuduInputFormat kuduInputFormat = new KuduInputFormat(kuduSourceConfig, rowTypeInfo);

return () ->
(SeaTunnelSource<T, SplitT, StateT>)
new KuduSource(kuduSourceConfig, kuduInputFormat);
}

public static SeaTunnelRowType getSeaTunnelRowType(List<ColumnSchema> columnSchemaList) {
ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
ArrayList<String> fieldNames = new ArrayList<>();
try {

for (int i = 0; i < columnSchemaList.size(); i++) {
fieldNames.add(columnSchemaList.get(i).getName());
seaTunnelDataTypes.add(KuduTypeMapper.mapping(columnSchemaList, i));
}

} catch (Exception e) {
throw new KuduConnectorException(
CommonErrorCode.TABLE_SCHEMA_GET_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
"Kudu", PluginType.SOURCE, ExceptionUtils.getMessage(e)));
}
return new SeaTunnelRowType(
fieldNames.toArray(new String[0]),
seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[0]));
}
}

0 comments on commit 10e791d

Please sign in to comment.