[Feature][Elasticsearch] Support multi-table sink write apache#7041 (a…
CosmosNi authored and Thomas-HuWei committed Jul 10, 2024
1 parent 99390d3 commit 7d53592
Showing 5 changed files with 226 additions and 12 deletions.
ElasticsearchSink.java
@@ -24,6 +24,7 @@
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -47,6 +48,7 @@ public class ElasticsearchSink
ElasticsearchSinkState,
ElasticsearchCommitInfo,
ElasticsearchAggregatedCommitInfo>,
SupportMultiTableSink,
SupportSaveMode {

private ReadonlyConfig config;
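
Note: adding SupportMultiTableSink is the only functional change in this file. It appears to be a pure marker interface: declaring it is how a connector opts into SeaTunnel's multi-table sink mode, and the engine can test for the capability at runtime. A minimal, self-contained sketch of that pattern (every name below is invented for illustration, not SeaTunnel's real API):

// MarkerDemo.java -- illustrative sketch only; not SeaTunnel code.
interface Sink {
    void write(String row);
}

// Marker interface: no methods, it only flags a capability.
interface SupportsMultiTable {}

class EsLikeSink implements Sink, SupportsMultiTable {
    private final String index;

    EsLikeSink(String index) {
        this.index = index;
    }

    @Override
    public void write(String row) {
        System.out.println("index=" + index + " <- " + row);
    }
}

public class MarkerDemo {
    public static void main(String[] args) {
        Sink sink = new EsLikeSink("st_index5");
        // An engine can branch on the capability before fanning multiple tables in:
        if (sink instanceof SupportsMultiTable) {
            sink.write("{\"id\":1}");
        }
    }
}
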
ElasticsearchSinkFactory.java
@@ -17,7 +17,9 @@

package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.SinkReplaceNameConstant;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.connector.TableSink;
@@ -28,6 +30,9 @@

import com.google.auto.service.AutoService;

import java.util.HashMap;
import java.util.Map;

import static org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.HOSTS;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.PASSWORD;
@@ -75,18 +80,56 @@ public OptionRule optionRule() {

@Override
public TableSink createSink(TableSinkFactoryContext context) {
-        String original = context.getOptions().get(INDEX);
-        original =
-                original.replace(
-                        REPLACE_TABLE_NAME_KEY,
-                        context.getCatalogTable().getTableId().getTableName());
+        ReadonlyConfig readonlyConfig = context.getOptions();
+        CatalogTable catalogTable = context.getCatalogTable();
+
+        ReadonlyConfig finalReadonlyConfig =
+                generateCurrentReadonlyConfig(readonlyConfig, catalogTable);
+
+        String original = finalReadonlyConfig.get(INDEX);

CatalogTable newTable =
CatalogTable.of(
TableIdentifier.of(
context.getCatalogTable().getCatalogName(),
context.getCatalogTable().getTablePath().getDatabaseName(),
original),
context.getCatalogTable());
-        return () -> new ElasticsearchSink(context.getOptions(), newTable);
+        return () -> new ElasticsearchSink(finalReadonlyConfig, newTable);
}

private ReadonlyConfig generateCurrentReadonlyConfig(
ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {

Map<String, String> configMap = readonlyConfig.toMap();

readonlyConfig
.getOptional(INDEX)
.ifPresent(
tableName -> {
String replacedPath =
replaceCatalogTableInPath(tableName, catalogTable);
configMap.put(INDEX.key(), replacedPath);
});

return ReadonlyConfig.fromMap(new HashMap<>(configMap));
}

private String replaceCatalogTableInPath(String originTableName, CatalogTable catalogTable) {
String tableName = originTableName;
TableIdentifier tableIdentifier = catalogTable.getTableId();
if (tableIdentifier != null) {
if (tableIdentifier.getSchemaName() != null) {
tableName =
tableName.replace(
SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY,
tableIdentifier.getSchemaName());
}
if (tableIdentifier.getTableName() != null) {
tableName =
tableName.replace(REPLACE_TABLE_NAME_KEY, tableIdentifier.getTableName());
}
}
return tableName;
}
}
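
The factory carries the heart of the feature: generateCurrentReadonlyConfig rewrites the index option against the incoming CatalogTable before the sink is built, so one job-level config fans out into a distinct resolved index per table. A standalone sketch of that substitution (the placeholder literals "${schema_name}" and "${table_name}" are assumptions inferred from the e2e config below, not copied from SinkReplaceNameConstant):

// IndexTemplateSketch.java -- illustrative sketch; constant literals assumed.
public class IndexTemplateSketch {
    static final String REPLACE_SCHEMA_NAME_KEY = "${schema_name}"; // assumed literal
    static final String REPLACE_TABLE_NAME_KEY = "${table_name}";   // assumed literal

    static String resolve(String template, String schemaName, String tableName) {
        String result = template;
        if (schemaName != null) {
            result = result.replace(REPLACE_SCHEMA_NAME_KEY, schemaName);
        }
        if (tableName != null) {
            result = result.replace(REPLACE_TABLE_NAME_KEY, tableName);
        }
        return result;
    }

    public static void main(String[] args) {
        // Mirrors the e2e config below, where index = "${table_name}":
        System.out.println(resolve("${table_name}", null, "st_index5")); // st_index5
        System.out.println(resolve("${table_name}", null, "st_index6")); // st_index6
    }
}
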
ElasticsearchSinkWriter.java
@@ -19,6 +19,7 @@

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -47,9 +48,10 @@
*/
@Slf4j
public class ElasticsearchSinkWriter
-        implements SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo, ElasticsearchSinkState> {
+        implements SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo, ElasticsearchSinkState>,
+                SupportMultiTableSinkWriter<Void> {

-    private final SinkWriter.Context context;
+    private final Context context;

private final int maxBatchSize;

@@ -60,7 +62,7 @@ public class ElasticsearchSinkWriter
private static final long DEFAULT_SLEEP_TIME_MS = 200L;

public ElasticsearchSinkWriter(
-            SinkWriter.Context context,
+            Context context,
CatalogTable catalogTable,
ReadonlyConfig config,
int maxBatchSize,
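
On the writer side, SupportMultiTableSinkWriter<Void> is the companion marker to SupportMultiTableSink; the Void type parameter suggests the Elasticsearch writer shares no resource across tables, each table simply getting its own writer instance (a reading inferred from the signature, not stated in this diff). The switch from SinkWriter.Context to plain Context is cosmetic: the class already implements SinkWriter, so the qualifier was redundant.
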
ElasticsearchIT.java
@@ -32,7 +32,9 @@
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;

import org.apache.commons.io.IOUtils;
@@ -50,6 +52,7 @@
import org.testcontainers.utility.DockerLoggerFactory;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
@@ -176,11 +179,54 @@ public void testElasticsearch(TestContainer container)
Container.ExecResult execResult =
container.executeJob("/elasticsearch/elasticsearch_source_and_sink.conf");
Assertions.assertEquals(0, execResult.getExitCode());
-        List<String> sinkData = readSinkData();
+        List<String> sinkData = readSinkData("st_index2");
        // the query DSL here is: {"range":{"c_int":{"gte":10,"lte":20}}}
Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
}

@DisabledOnContainer(
value = {},
type = {EngineType.SPARK, EngineType.FLINK},
            disabledReason = "Currently SPARK/FLINK do not support multi-table read")
@TestTemplate
public void testElasticsearchWithMultiSink(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
container.executeJob("/elasticsearch/fakesource_to_elasticsearch_multi_sink.conf");
Assertions.assertEquals(0, execResult.getExitCode());
List<String> source5 =
Lists.newArrayList(
"id",
"c_bool",
"c_tinyint",
"c_smallint",
"c_int",
"c_bigint",
"c_float",
"c_double",
"c_decimal",
"c_string");
List<String> source6 =
Lists.newArrayList(
"id",
"c_bool",
"c_tinyint",
"c_smallint",
"c_int",
"c_bigint",
"c_float",
"c_double",
"c_decimal");
List<String> sinkIndexData5 = readMultiSinkData("st_index5", source5);
List<String> sinkIndexData6 = readMultiSinkData("st_index6", source6);
String stIndex5 =
"{\"c_smallint\":2,\"c_string\":\"NEW\",\"c_float\":4.3,\"c_double\":5.3,\"c_decimal\":6.3,\"id\":1,\"c_int\":3,\"c_bigint\":4,\"c_bool\":true,\"c_tinyint\":1}";
String stIndex6 =
"{\"c_smallint\":2,\"c_float\":4.3,\"c_double\":5.3,\"c_decimal\":6.3,\"id\":1,\"c_int\":3,\"c_bigint\":4,\"c_bool\":true,\"c_tinyint\":1}";
Assertions.assertIterableEquals(Lists.newArrayList(stIndex5), sinkIndexData5);
Assertions.assertIterableEquals(Lists.newArrayList(stIndex6), sinkIndexData6);
}
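    // The two expected JSON strings above mirror the single INSERT row defined
    // for each FakeSource table in the new e2e config added by this commit;
    // st_index6 omits c_string because its schema declares no such column.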

@TestTemplate
public void testElasticsearchWithFullType(TestContainer container)
throws IOException, InterruptedException {
@@ -262,7 +308,7 @@ private List<String> readSinkDataWithOutSchema() throws InterruptedException {
return getDocsWithTransformDate(source, "st_index4");
}

-    private List<String> readSinkData() throws InterruptedException {
+    private List<String> readSinkData(String index) throws InterruptedException {
// wait for index refresh
Thread.sleep(2000);
List<String> source =
@@ -281,7 +327,33 @@ private List<String> readSinkData() throws InterruptedException {
"c_int",
"c_date",
"c_timestamp");
-        return getDocsWithTransformTimestamp(source, "st_index2");
+        return getDocsWithTransformTimestamp(source, index);
}

private List<String> readMultiSinkData(String index, List<String> source)
throws InterruptedException {
// wait for index refresh
Thread.sleep(2000);
Map<String, Object> query = new HashMap<>();
query.put("match_all", Maps.newHashMap());

ScrollResult scrollResult = esRestClient.searchByScroll(index, source, query, "1m", 1000);
scrollResult
.getDocs()
.forEach(
x -> {
x.remove("_index");
x.remove("_type");
x.remove("_id");
});
List<String> docs =
scrollResult.getDocs().stream()
.sorted(
Comparator.comparingInt(
o -> Integer.valueOf(o.get("c_int").toString())))
.map(JsonUtils::toJsonString)
.collect(Collectors.toList());
return docs;
}

private List<String> getDocsWithTransformTimestamp(List<String> source, String index) {
elasticsearch/fakesource_to_elasticsearch_multi_sink.conf (new file)
@@ -0,0 +1,95 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

######
###### This config file is a demonstration of batch processing in SeaTunnel config
######

env {
parallelism = 1
job.mode = "BATCH"
#checkpoint.interval = 10000
}

source {
FakeSource {
tables_configs = [
{
schema = {
table = "st_index5"
fields {
id = int
c_bool = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(16, 1)"
c_string = string
}
}
rows = [
{
kind = INSERT
          fields = [1, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW"]
}
]
},
{
schema = {
table = "st_index6"
fields {
id = int
c_bool = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(16, 1)"
}
}
rows = [
{
kind = INSERT
          fields = [1, true, 1, 2, 3, 4, 4.3, 5.3, 6.3]
}
]
}
]
}
}
transform {
}

sink {
Elasticsearch {
hosts = ["https://elasticsearch:9200"]
username = "elastic"
password = "elasticsearch"
tls_verify_certificate = false
tls_verify_hostname = false

index = "${table_name}"
index_type = "st"
"schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
"data_save_mode"="APPEND_DATA"
}
}
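
With index = "${table_name}", each FakeSource table resolves to a same-named Elasticsearch index, so the two rows land in st_index5 and st_index6, which is exactly what testElasticsearchWithMultiSink asserts. schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" lets the sink create each index when it does not already exist.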
