Skip to content

Commit

Permalink
Support config tableIdentifier for schema
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Oct 15, 2023
1 parent b1d66c5 commit 3c56bc2
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 7 deletions.
17 changes: 17 additions & 0 deletions docs/en/concept/schema-feature.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ We can use SchemaOptions to define schema, the SchemaOptions contains some confi

```
schema = {
database = "databaseName"
table = "tableName"
comment = "comment"
columns = [
...
]
Expand All @@ -24,6 +27,18 @@ schema = {
}
```

### database

The database name of the table identifier which the schema belongs to.

### table

The table name of the table identifier which the schema belongs to.

### comment

The comment of the CatalogTable which the schema belongs to.

### Columns

Columns is a list of config used to define the column in schema, each column can contains name, type, nullable, defaultValue, comment field.
Expand Down Expand Up @@ -131,6 +146,8 @@ source {
result_table_name = "fake"
row.num = 16
schema {
database = "FakeDatabase"
table = "FakeTable"
columns = [
{
name = id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public static CatalogTable of(
Map<String, String> options,
List<String> partitionKeys,
String comment) {
return new CatalogTable(tableId, tableSchema, options, partitionKeys, comment);
return new CatalogTable(
tableId, tableSchema, options, partitionKeys, comment, tableId.getCatalogName());
}

public static CatalogTable of(
Expand All @@ -67,7 +68,7 @@ private CatalogTable(
Map<String, String> options,
List<String> partitionKeys,
String comment) {
this(tableId, tableSchema, options, partitionKeys, comment, "");
this(tableId, tableSchema, options, partitionKeys, comment, tableId.getCatalogName());
}

private CatalogTable(
Expand Down Expand Up @@ -127,6 +128,9 @@ public String toString() {
+ ", comment='"
+ comment
+ '\''
+ ", catalogName='"
+ catalogName
+ '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public static List<CatalogTable> getCatalogTablesFromConfig(
if (schemaMap.isEmpty()) {
throw new SeaTunnelException("Schema config can not be empty");
}
CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(readonlyConfig);
CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(factoryId, readonlyConfig);
return Collections.singletonList(catalogTable);
}

Expand Down Expand Up @@ -190,19 +190,40 @@ public static SeaTunnelDataType<SeaTunnelRow> convertToDataType(
}
}

// We need to use buildWithConfig(String catalogName, ReadonlyConfig readonlyConfig);
// Since this method will not inject the correct catalogName into CatalogTable
@Deprecated
public static CatalogTable buildWithConfig(ReadonlyConfig readonlyConfig) {
return buildWithConfig("", readonlyConfig);
}

public static CatalogTable buildWithConfig(String catalogName, ReadonlyConfig readonlyConfig) {
if (readonlyConfig.get(TableSchemaOptions.SCHEMA) == null) {
throw new RuntimeException(
"Schema config need option [schema], please correct your config first");
}
TableSchema tableSchema = new ReadonlyConfigParser().parse(readonlyConfig);

ReadonlyConfig schemaConfig =
readonlyConfig
.getOptional(TableSchemaOptions.SCHEMA)
.map(ReadonlyConfig::fromMap)
.orElseThrow(
() -> new IllegalArgumentException("Schema config can't be null"));

TableIdentifier tableIdentifier =
TableIdentifier.of(
catalogName,
schemaConfig.get(TableSchemaOptions.TableIdentifierOptions.DATABASE),
schemaConfig.get(TableSchemaOptions.TableIdentifierOptions.TABLE));

return CatalogTable.of(
// TODO: other table info
TableIdentifier.of("", "", ""),
tableIdentifier,
tableSchema,
new HashMap<>(),
// todo: add partitionKeys?
new ArrayList<>(),
"");
readonlyConfig.get(TableSchemaOptions.TableIdentifierOptions.COMMENT));
}

public static SeaTunnelRowType buildSimpleTextSchema() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,27 @@

public class TableSchemaOptions {

public static class TableIdentifierOptions {

public static final Option<String> DATABASE =
Options.key("database")
.stringType()
.noDefaultValue()
.withDescription("SeaTunnel Schema DataBase Name");

public static final Option<String> TABLE =
Options.key("table")
.stringType()
.noDefaultValue()
.withDescription("SeaTunnel Schema Table Name");

public static final Option<String> COMMENT =
Options.key("comment")
.stringType()
.noDefaultValue()
.withDescription("SeaTunnel Schema Table Comment");
}

public static final Option<Map<String, Object>> SCHEMA =
Options.key("schema")
.type(new TypeReference<Map<String, Object>>() {})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorException;

Expand All @@ -48,6 +49,9 @@ public class AssertCatalogTableRule implements Serializable {
@OptionMark(description = "column rule")
private AssertColumnRule columnRule;

@OptionMark(description = "tableIdentifier rule")
private AssertTableIdentifierRule tableIdentifierRule;

public void checkRule(CatalogTable catalogTable) {
TableSchema tableSchema = catalogTable.getTableSchema();
if (tableSchema == null) {
Expand All @@ -62,6 +66,9 @@ public void checkRule(CatalogTable catalogTable) {
if (columnRule != null) {
columnRule.checkRule(tableSchema.getColumns());
}
if (tableIdentifierRule != null) {
tableIdentifierRule.checkRule(catalogTable.getTableId());
}
}

@Data
Expand Down Expand Up @@ -138,4 +145,24 @@ public void checkRule(List<Column> check) {
}
}
}

@Data
@AllArgsConstructor
public static class AssertTableIdentifierRule implements Serializable {

private TableIdentifier tableIdentifier;

public void checkRule(TableIdentifier actiualTableIdentifier) {
if (actiualTableIdentifier == null) {
throw new AssertConnectorException(CATALOG_TABLE_FAILED, "tableIdentifier is null");
}
if (!actiualTableIdentifier.equals(tableIdentifier)) {
throw new AssertConnectorException(
CATALOG_TABLE_FAILED,
String.format(
"tableIdentifier: %s is not equal to %s",
actiualTableIdentifier, tableIdentifier));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;

Expand All @@ -46,6 +47,10 @@
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.PRIMARY_KEY_COLUMNS;
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.PRIMARY_KEY_NAME;
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.PRIMARY_KEY_RULE;
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TableIdentifierRule.TABLE_IDENTIFIER_CATALOG_NAME;
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TableIdentifierRule.TABLE_IDENTIFIER_DATABASE_NAME;
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TableIdentifierRule.TABLE_IDENTIFIER_RULE;
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TableIdentifierRule.TABLE_IDENTIFIER_TABLE_NAME;

public class AssertCatalogTableRuleParser {

Expand All @@ -55,6 +60,7 @@ public AssertCatalogTableRule parseCatalogTableRule(Config catalogTableRule) {
parsePrimaryKeyRule(catalogTableRule).ifPresent(tableRule::setPrimaryKeyRule);
parseConstraintKeyRule(catalogTableRule).ifPresent(tableRule::setConstraintKeyRule);
parseColumnRule(catalogTableRule).ifPresent(tableRule::setColumnRule);
parseTableIdentifierRule(catalogTableRule).ifPresent(tableRule::setTableIdentifierRule);
return tableRule;
}

Expand Down Expand Up @@ -156,4 +162,18 @@ private Optional<AssertCatalogTableRule.AssertConstraintKeyRule> parseConstraint
.collect(Collectors.toList());
return Optional.of(new AssertCatalogTableRule.AssertConstraintKeyRule(constraintKeys));
}

private Optional<AssertCatalogTableRule.AssertTableIdentifierRule> parseTableIdentifierRule(
Config catalogTableRule) {
if (!catalogTableRule.hasPath(TABLE_IDENTIFIER_RULE)) {
return Optional.empty();
}
Config tableIdentifierRule = catalogTableRule.getConfig(TABLE_IDENTIFIER_RULE);
TableIdentifier tableIdentifier =
TableIdentifier.of(
tableIdentifierRule.getString(TABLE_IDENTIFIER_CATALOG_NAME),
tableIdentifierRule.getString(TABLE_IDENTIFIER_DATABASE_NAME),
tableIdentifierRule.getString(TABLE_IDENTIFIER_TABLE_NAME));
return Optional.of(new AssertCatalogTableRule.AssertTableIdentifierRule(tableIdentifier));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ public class AssertConfig {
public static final String COLUMN_DEFAULT_VALUE = "default_value";
public static final String COLUMN_COMMENT = "comment";

public static class TableIdentifierRule {
public static final String TABLE_IDENTIFIER_RULE = "table_identifier_rule";

public static final String TABLE_IDENTIFIER_CATALOG_NAME = "catalog_name";
public static final String TABLE_IDENTIFIER_DATABASE_NAME = "database_name";
public static final String TABLE_IDENTIFIER_TABLE_NAME = "table_name";
}

public static final Option<String> COMMENT =
Options.key("comment")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class FakeSource
public FakeSource() {}

public FakeSource(ReadonlyConfig readonlyConfig) {
this.catalogTable = CatalogTableUtil.buildWithConfig(readonlyConfig);
this.catalogTable = CatalogTableUtil.buildWithConfig(getPluginName(), readonlyConfig);
this.fakeConfig = FakeConfig.buildWithConfig(readonlyConfig.toConfig());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ source {
FakeSource {
row.num = 100
schema = {
database = "fakeDatabase"
table = "fakeTable"
columns = [
{
name = id
Expand Down Expand Up @@ -63,6 +65,12 @@ sink{
Assert {
rules {
catalog_table_rule {
table_identifier_rule = {
catalog_name = "FakeSource"
database_name = "fakeDatabase"
table_name = "fakeTable"
}

primary_key_rule = {
primary_key_name = "primary key"
primary_key_columns = ["id"]
Expand Down

0 comments on commit 3c56bc2

Please sign in to comment.