Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector-V2] Support TableSourceFactory/TableSinkFactory on kudu #5789

Merged
merged 3 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,8 @@ public Set<KuduSourceSplit> createInputSplits() throws IOException {
/**
 * Rebuilds a {@link KuduScanner} from a serialized scan token.
 *
 * @param token serialized scan token bytes
 * @return the scanner returned by {@code KuduScanToken.deserializeIntoScanner} for the given
 *     token and the enclosing class's {@code kuduClient}
 * @throws IOException propagated from the deserialization call
 */
public KuduScanner scanner(byte[] token) throws IOException {
    final KuduScanner tokenScanner = KuduScanToken.deserializeIntoScanner(token, kuduClient);
    return tokenScanner;
}

/**
 * Exposes the row type held by this instance.
 *
 * @return the current value of the {@code rowTypeInfo} field
 */
public SeaTunnelRowType getRowTypeInfo() {
    return rowTypeInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,13 @@

package org.apache.seatunnel.connectors.seatunnel.kudu.sink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSinkState;
Expand All @@ -52,29 +44,14 @@ public class KuduSink
private KuduSinkConfig kuduSinkConfig;
private SeaTunnelRowType seaTunnelRowType;

@Override
public String getPluginName() {
return "kudu";
public KuduSink(KuduSinkConfig kuduSinkConfig, CatalogTable catalogTable) {
this.kuduSinkConfig = kuduSinkConfig;
this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
}

/**
 * Stores the row type this sink works with.
 *
 * @param seaTunnelRowType row type assigned to the {@code seaTunnelRowType} field
 */
@Override
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
}

@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig);
ConfigValidator.of(config).validate(new KuduSinkFactory().optionRule());
try {
kuduSinkConfig = new KuduSinkConfig(config);
} catch (Exception e) {
throw new KuduConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SINK, ExceptionUtils.getMessage(e)));
}
public String getPluginName() {
return "kudu";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@

package org.apache.seatunnel.connectors.seatunnel.kudu.sink;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;

import com.google.auto.service.AutoService;
Expand Down Expand Up @@ -64,4 +68,12 @@ public OptionRule optionRule() {
KuduSinkConfig.KERBEROS_KEYTAB)
.build();
}

/**
 * Builds the Kudu {@link TableSink} for the given factory context.
 *
 * <p>The {@link KuduSinkConfig} is constructed once, up front, from the context options; the
 * returned {@link TableSink} creates a new {@link KuduSink} from that configuration and the
 * context's catalog table each time it is invoked.
 *
 * @param context factory context supplying the options and the catalog table
 * @return a {@link TableSink} producing {@link KuduSink} instances
 */
@Override
public TableSink createSink(TableSinkFactoryContext context) {
    final ReadonlyConfig options = context.getOptions();
    final CatalogTable targetTable = context.getCatalogTable();
    final KuduSinkConfig sinkConfig = new KuduSinkConfig(options);
    return () -> {
        return new KuduSink(sinkConfig, targetTable);
    };
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,20 @@

package org.apache.seatunnel.connectors.seatunnel.kudu.source;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat;
import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSourceState;
import org.apache.seatunnel.connectors.seatunnel.kudu.util.KuduUtil;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.client.KuduClient;

import com.google.auto.service.AutoService;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;

@Slf4j
@AutoService(SeaTunnelSource.class)
public class KuduSource
Expand All @@ -62,6 +41,12 @@ public class KuduSource
private KuduInputFormat kuduInputFormat;
private KuduSourceConfig kuduSourceConfig;

public KuduSource(KuduSourceConfig kuduSourceConfig, KuduInputFormat kuduInputFormat) {
this.kuduSourceConfig = kuduSourceConfig;
this.kuduInputFormat = kuduInputFormat;
this.rowTypeInfo = kuduInputFormat.getRowTypeInfo();
}

@Override
public Boundedness getBoundedness() {
return Boundedness.BOUNDED;
Expand Down Expand Up @@ -96,57 +81,4 @@ public SourceSplitEnumerator<KuduSourceSplit, KuduSourceState> restoreEnumerator
/**
 * Returns the name of this plugin.
 *
 * @return the constant string {@code "Kudu"}
 */
public String getPluginName() {
return "Kudu";
}

/**
 * Validates the plugin configuration and initializes this source from it.
 *
 * <p>Steps, in order:
 *
 * <ol>
 *   <li>validate the options against {@code KuduSourceFactory#optionRule()};
 *   <li>build the {@link KuduSourceConfig};
 *   <li>resolve the row type, either from the user-declared schema option when present or by
 *       opening the configured table with a short-lived {@link KuduClient} and mapping its
 *       columns;
 *   <li>create the {@link KuduInputFormat} from the configuration and row type.
 * </ol>
 *
 * @param pluginConfig raw plugin configuration
 * @throws KuduConnectorException with {@code CONFIG_VALIDATION_FAILED} if the source
 *     configuration cannot be built, with {@code INIT_KUDU_CLIENT_FAILED} if the table schema
 *     cannot be fetched from Kudu, or with the original error code if a
 *     {@link KuduConnectorException} is raised while fetching or mapping the schema
 */
@Override
public void prepare(Config pluginConfig) {
    ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig);
    ConfigValidator.of(config).validate(new KuduSourceFactory().optionRule());
    try {
        kuduSourceConfig = new KuduSourceConfig(config);
    } catch (Exception e) {
        // This is the source plugin, so report PluginType.SOURCE (the message previously
        // said SINK, which pointed users at the wrong half of their job config).
        throw new KuduConnectorException(
                SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                String.format(
                        "PluginName: %s, PluginType: %s, Message: %s",
                        getPluginName(), PluginType.SOURCE, ExceptionUtils.getMessage(e)));
    }

    if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
        rowTypeInfo = CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
    } else {
        try (KuduClient kuduClient = KuduUtil.getKuduClient(kuduSourceConfig)) {
            rowTypeInfo =
                    getSeaTunnelRowType(
                            kuduClient
                                    .openTable(kuduSourceConfig.getTable())
                                    .getSchema()
                                    .getColumns());
        } catch (KuduConnectorException e) {
            // Already carries a specific error code (e.g. TABLE_SCHEMA_GET_FAILED); do not
            // re-label it as a client-initialization failure.
            throw e;
        } catch (Exception e) {
            throw new KuduConnectorException(KuduConnectorErrorCode.INIT_KUDU_CLIENT_FAILED, e);
        }
    }
    kuduInputFormat = new KuduInputFormat(kuduSourceConfig, rowTypeInfo);
}

/**
 * Converts a list of Kudu column schemas into a {@link SeaTunnelRowType}.
 *
 * <p>Field names are taken from each column's name and field types from
 * {@code KuduTypeMapper.mapping(columnSchemaList, index)}, both in list order.
 *
 * @param columnSchemaList Kudu columns to convert
 * @return row type with one field per column
 * @throws KuduConnectorException with {@code TABLE_SCHEMA_GET_FAILED} if any exception is
 *     raised while reading or mapping the columns
 */
public SeaTunnelRowType getSeaTunnelRowType(List<ColumnSchema> columnSchemaList) {
    List<String> names = new ArrayList<>();
    List<SeaTunnelDataType<?>> types = new ArrayList<>();
    try {
        int index = 0;
        while (index < columnSchemaList.size()) {
            names.add(columnSchemaList.get(index).getName());
            types.add(KuduTypeMapper.mapping(columnSchemaList, index));
            index++;
        }
    } catch (Exception e) {
        String detail =
                String.format(
                        "PluginName: %s, PluginType: %s, Message: %s",
                        "Kudu", PluginType.SOURCE, ExceptionUtils.getMessage(e));
        throw new KuduConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, detail);
    }
    String[] nameArray = names.toArray(new String[0]);
    SeaTunnelDataType<?>[] typeArray = types.toArray(new SeaTunnelDataType<?>[0]);
    return new SeaTunnelRowType(nameArray, typeArray);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,38 @@

package org.apache.seatunnel.connectors.seatunnel.kudu.source;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat;
import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.kudu.util.KuduUtil;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.client.KuduClient;

import com.google.auto.service.AutoService;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import static org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig.MASTER;
import static org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig.TABLE_NAME;

Expand Down Expand Up @@ -63,4 +85,53 @@ public OptionRule optionRule() {
/**
 * Returns the source implementation class associated with this factory.
 *
 * @return {@code KuduSource.class}
 */
public Class<? extends SeaTunnelSource> getSourceClass() {
return KuduSource.class;
}

/**
 * Builds the Kudu {@link TableSource} for the given factory context.
 *
 * <p>The row type is taken from the user-declared schema option when it is present. Otherwise
 * a short-lived {@link KuduClient} is opened, the configured table's columns are read, and
 * they are converted with {@link #getSeaTunnelRowType(List)}. A single
 * {@link KuduInputFormat} is then created and shared by every {@link KuduSource} the returned
 * {@link TableSource} produces.
 *
 * @param context factory context supplying the connector options
 * @return a {@link TableSource} that creates a new {@link KuduSource} on each invocation
 * @throws KuduConnectorException with {@code INIT_KUDU_CLIENT_FAILED} if the schema cannot be
 *     fetched from Kudu, or with the original error code if a {@link KuduConnectorException}
 *     is raised while fetching or mapping the schema
 */
@Override
public <T, SplitT extends SourceSplit, StateT extends Serializable>
        TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
    ReadonlyConfig config = context.getOptions();
    KuduSourceConfig kuduSourceConfig = new KuduSourceConfig(config);
    SeaTunnelRowType rowTypeInfo;
    if (config.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
        rowTypeInfo = CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
    } else {
        try (KuduClient kuduClient = KuduUtil.getKuduClient(kuduSourceConfig)) {
            rowTypeInfo =
                    getSeaTunnelRowType(
                            kuduClient
                                    .openTable(kuduSourceConfig.getTable())
                                    .getSchema()
                                    .getColumns());
        } catch (KuduConnectorException e) {
            // Already carries a specific error code (e.g. TABLE_SCHEMA_GET_FAILED); do not
            // re-label it as a client-initialization failure.
            throw e;
        } catch (Exception e) {
            throw new KuduConnectorException(KuduConnectorErrorCode.INIT_KUDU_CLIENT_FAILED, e);
        }
    }
    KuduInputFormat kuduInputFormat = new KuduInputFormat(kuduSourceConfig, rowTypeInfo);

    // The cast is unchecked: the factory interface is generic, while KuduSource fixes its
    // own row/split/state type arguments.
    return () ->
            (SeaTunnelSource<T, SplitT, StateT>)
                    new KuduSource(kuduSourceConfig, kuduInputFormat);
}

/**
 * Converts a list of Kudu column schemas into a {@link SeaTunnelRowType}.
 *
 * <p>Field names are taken from each column's name and field types from
 * {@code KuduTypeMapper.mapping(columnSchemaList, index)}, both in list order.
 *
 * @param columnSchemaList Kudu columns to convert
 * @return row type with one field per column
 * @throws KuduConnectorException with {@code TABLE_SCHEMA_GET_FAILED} if any exception is
 *     raised while reading or mapping the columns
 */
public static SeaTunnelRowType getSeaTunnelRowType(List<ColumnSchema> columnSchemaList) {
    final String[] fieldNames;
    final SeaTunnelDataType<?>[] fieldTypes;
    try {
        // Everything that touches the list stays inside the try so that any failure,
        // including a null list, is reported as TABLE_SCHEMA_GET_FAILED.
        final int columnCount = columnSchemaList.size();
        fieldNames = new String[columnCount];
        fieldTypes = new SeaTunnelDataType<?>[columnCount];
        for (int index = 0; index < columnCount; index++) {
            fieldNames[index] = columnSchemaList.get(index).getName();
            fieldTypes[index] = KuduTypeMapper.mapping(columnSchemaList, index);
        }
    } catch (Exception e) {
        final String detail =
                String.format(
                        "PluginName: %s, PluginType: %s, Message: %s",
                        "Kudu", PluginType.SOURCE, ExceptionUtils.getMessage(e));
        throw new KuduConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, detail);
    }
    return new SeaTunnelRowType(fieldNames, fieldTypes);
}
}