Skip to content

Commit

Permalink
FakeSource support generate different CatalogTable for MultipleTable
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Nov 14, 2023
1 parent 72be666 commit 7e9094a
Show file tree
Hide file tree
Showing 19 changed files with 794 additions and 512 deletions.
120 changes: 70 additions & 50 deletions docs/en/connector-v2/source/FakeSource.md

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,25 @@

package org.apache.seatunnel.connectors.seatunnel.fake.config;

import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import java.util.List;
import java.util.Map;

public class FakeOption {

public static final Option<List<SeaTunnelRow>> ROWS =
public static final Option<List<Map<String, Object>>> TABLES_CONFIGS =
Options.key("tables_configs")
.type(new TypeReference<List<Map<String, Object>>>() {})
.noDefaultValue()
.withDescription("The multiple table config list of fake source");

public static final Option<List<Map<String, Object>>> ROWS =
Options.key("rows")
.listType(SeaTunnelRow.class)
.type(new TypeReference<List<Map<String, Object>>>() {})
.noDefaultValue()
.withDescription("The row list of fake data output per degree of parallelism");
public static final Option<Integer> ROW_NUM =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.fake.config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;

import org.apache.commons.collections4.CollectionUtils;

import com.google.common.collect.Lists;
import lombok.Getter;

import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;

public class MultipleTableFakeSourceConfig implements Serializable {

private static final long serialVersionUID = 1L;

@Getter private List<FakeConfig> fakeConfigs;

public MultipleTableFakeSourceConfig(ReadonlyConfig fakeSourceRootConfig) {
if (fakeSourceRootConfig.getOptional(FakeOption.TABLES_CONFIGS).isPresent()) {
parseFromConfigs(fakeSourceRootConfig);
} else {
parseFromConfig(fakeSourceRootConfig);
}
// validate
assert fakeConfigs != null;
if (fakeConfigs.size() > 1) {
List<String> tableNames =
fakeConfigs.stream()
.map(FakeConfig::getCatalogTable)
.map(catalogTable -> catalogTable.getTableId().toTablePath().toString())
.collect(Collectors.toList());
if (CollectionUtils.size(tableNames) != new HashSet<>(tableNames).size()) {
throw new IllegalArgumentException("table name: " + tableNames + " must be unique");
}
}
}

private void parseFromConfigs(ReadonlyConfig readonlyConfig) {
List<ReadonlyConfig> readonlyConfigs =
readonlyConfig.getOptional(FakeOption.TABLES_CONFIGS).get().stream()
.map(ReadonlyConfig::fromMap)
.collect(Collectors.toList());
// Use the config outside if it's not set in sub config
fakeConfigs =
readonlyConfigs.stream()
.map(FakeConfig::buildWithConfig)
.collect(Collectors.toList());
}

private void parseFromConfig(ReadonlyConfig readonlyConfig) {
FakeConfig fakeConfig = FakeConfig.buildWithConfig(readonlyConfig);
fakeConfigs = Lists.newArrayList(fakeConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.fake.source;

import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
Expand All @@ -32,27 +32,28 @@
import org.apache.seatunnel.connectors.seatunnel.fake.utils.FakeDataRandomUtils;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;

import org.apache.commons.lang3.RandomUtils;

import java.io.IOException;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class FakeDataGenerator {
private final SeaTunnelRowType rowType;
private final CatalogTable catalogTable;
private final FakeConfig fakeConfig;
private final JsonDeserializationSchema jsonDeserializationSchema;
private final FakeDataRandomUtils fakeDataRandomUtils;
private String tableId;

public FakeDataGenerator(SeaTunnelRowType rowType, FakeConfig fakeConfig) {
this.rowType = rowType;
public FakeDataGenerator(FakeConfig fakeConfig) {
this.catalogTable = fakeConfig.getCatalogTable();
this.tableId = catalogTable.getTableId().toTablePath().toString();
this.fakeConfig = fakeConfig;
this.jsonDeserializationSchema =
fakeConfig.getFakeRows() == null
? null
: new JsonDeserializationSchema(false, false, rowType);
: new JsonDeserializationSchema(
false, false, catalogTable.getSeaTunnelRowType());
this.fakeDataRandomUtils = new FakeDataRandomUtils(fakeConfig);
}

Expand All @@ -63,46 +64,43 @@ private SeaTunnelRow convertRow(FakeConfig.RowData rowData) {
if (rowData.getKind() != null) {
seaTunnelRow.setRowKind(RowKind.valueOf(rowData.getKind()));
}
seaTunnelRow.setTableId(tableId);
return seaTunnelRow;
} catch (IOException e) {
throw new FakeConnectorException(CommonErrorCodeDeprecated.JSON_OPERATION_FAILED, e);
}
}

private SeaTunnelRow randomRow() {
SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
String[] fieldNames = rowType.getFieldNames();
SeaTunnelDataType<?>[] fieldTypes = rowType.getFieldTypes();
List<Object> randomRow = new ArrayList<>(fieldNames.length);
for (SeaTunnelDataType<?> fieldType : fieldTypes) {
randomRow.add(randomColumnValue(fieldType));
}
SeaTunnelRow row = new SeaTunnelRow(randomRow.toArray());
if (!fakeConfig.getTableIdentifiers().isEmpty()) {
row.setTableId(
fakeConfig
.getTableIdentifiers()
.get(RandomUtils.nextInt(0, fakeConfig.getTableIdentifiers().size()))
.toTablePath()
.toString());
}
return row;
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(randomRow.toArray());
seaTunnelRow.setTableId(tableId);
return seaTunnelRow;
}

/**
* @param rowNum The number of pieces of data to be generated by the current task
* @param output Data collection and distribution
* @return The generated data
*/
public void collectFakedRows(int rowNum, Collector<SeaTunnelRow> output) {
public List<SeaTunnelRow> generateFakedRows(int rowNum) {
// Use manual configuration data preferentially
List<SeaTunnelRow> seaTunnelRows = new ArrayList<>();
if (fakeConfig.getFakeRows() != null) {
for (FakeConfig.RowData rowData : fakeConfig.getFakeRows()) {
output.collect(convertRow(rowData));
seaTunnelRows.add(convertRow(rowData));
}
} else {
for (int i = 0; i < rowNum; i++) {
output.collect(randomRow());
seaTunnelRows.add(randomRow());
}
}
return seaTunnelRows;
}

@SuppressWarnings("magicnumber")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,12 @@
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
import org.apache.seatunnel.connectors.seatunnel.fake.config.MultipleTableFakeSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeSourceState;

import org.apache.commons.collections4.CollectionUtils;

import com.google.common.collect.Lists;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
Expand All @@ -47,14 +42,10 @@ public class FakeSource
SupportColumnProjection {

private JobContext jobContext;
private CatalogTable catalogTable;
private FakeConfig fakeConfig;

public FakeSource() {}
private final MultipleTableFakeSourceConfig multipleTableFakeSourceConfig;

public FakeSource(ReadonlyConfig readonlyConfig) {
this.catalogTable = CatalogTableUtil.buildWithConfig(getPluginName(), readonlyConfig);
this.fakeConfig = FakeConfig.buildWithConfig(readonlyConfig.toConfig());
this.multipleTableFakeSourceConfig = new MultipleTableFakeSourceConfig(readonlyConfig);
}

@Override
Expand All @@ -66,43 +57,32 @@ public Boundedness getBoundedness() {

@Override
public List<CatalogTable> getProducedCatalogTables() {
// If tableNames is empty, means this is only one catalogTable, return the original
// catalogTable
if (CollectionUtils.isEmpty(fakeConfig.getTableIdentifiers())) {
return Lists.newArrayList(catalogTable);
}
// Otherwise, return the catalogTables with the tableNames
return fakeConfig.getTableIdentifiers().stream()
.map(
tableIdentifier ->
CatalogTable.of(
TableIdentifier.of(
getPluginName(), tableIdentifier.toTablePath()),
catalogTable.getTableSchema(),
catalogTable.getOptions(),
catalogTable.getPartitionKeys(),
catalogTable.getComment()))
return multipleTableFakeSourceConfig.getFakeConfigs().stream()
.map(FakeConfig::getCatalogTable)
.collect(Collectors.toList());
}

@Override
public SourceSplitEnumerator<FakeSourceSplit, FakeSourceState> createEnumerator(
SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) throws Exception {
return new FakeSourceSplitEnumerator(enumeratorContext, fakeConfig, Collections.emptySet());
SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) {
return new FakeSourceSplitEnumerator(
enumeratorContext, multipleTableFakeSourceConfig, Collections.emptySet());
}

@Override
public SourceSplitEnumerator<FakeSourceSplit, FakeSourceState> restoreEnumerator(
SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext,
FakeSourceState checkpointState) {
return new FakeSourceSplitEnumerator(
enumeratorContext, fakeConfig, checkpointState.getAssignedSplits());
enumeratorContext,
multipleTableFakeSourceConfig,
checkpointState.getAssignedSplits());
}

@Override
public SourceReader<SeaTunnelRow, FakeSourceSplit> createReader(
SourceReader.Context readerContext) {
return new FakeSourceReader(readerContext, catalogTable.getSeaTunnelRowType(), fakeConfig);
return new FakeSourceReader(readerContext, multipleTableFakeSourceConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.SPLIT_READ_INTERVAL;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.STRING_FAKE_MODE;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.STRING_TEMPLATE;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TABLES_CONFIGS;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TIME_HOUR_TEMPLATE;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TIME_MINUTE_TEMPLATE;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TIME_SECOND_TEMPLATE;
Expand All @@ -70,7 +71,8 @@ public String factoryIdentifier() {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(TableSchemaOptions.SCHEMA)
.optional(TABLES_CONFIGS)
.optional(TableSchemaOptions.SCHEMA)
.optional(STRING_FAKE_MODE)
.conditional(STRING_FAKE_MODE, FakeOption.FakeMode.TEMPLATE, STRING_TEMPLATE)
.optional(TINYINT_FAKE_MODE)
Expand Down
Loading

0 comments on commit 7e9094a

Please sign in to comment.