
Commit d7f3b26

fix multi-table load repeated failures and retries when meeting a data quality error
1 parent 66e17bc

6 files changed (+145, -6)

be/src/io/fs/multi_table_pipe.cpp (+12, -5)

@@ -193,6 +193,7 @@ Status MultiTablePipe::request_and_exec_plans() {
     request.__set_memtable_on_sink_node(_ctx->memtable_on_sink_node);
     request.__set_user(_ctx->qualified_user);
     request.__set_cloud_cluster(_ctx->cloud_cluster);
+    request.__set_max_filter_ratio(1.0);
     // no need to register new_load_stream_mgr coz it is already done in routineload submit task

     // plan this load
@@ -271,7 +272,6 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
     _number_loaded_rows += state->num_rows_load_success();
     _number_filtered_rows += state->num_rows_load_filtered();
     _number_unselected_rows += state->num_rows_load_unselected();
-    _ctx->error_url = to_load_error_http_path(state->get_error_log_file_path());
     // check filtered ratio for this plan fragment
     int64_t num_selected_rows =
             state->num_rows_load_total() - state->num_rows_load_unselected();
@@ -282,10 +282,17 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
     }

     // if any of the plan fragment exec failed, set the status to the first failed plan
-    if (!status->ok()) {
-        LOG(WARNING)
-                << "plan fragment exec failed. errmsg=" << *status << _ctx->brief();
-        _status = *status;
+    {
+        std::lock_guard<std::mutex> l(_callback_lock);
+        if (!state->get_error_log_file_path().empty()) {
+            _ctx->error_url =
+                    to_load_error_http_path(state->get_error_log_file_path());
+        }
+        if (!status->ok()) {
+            LOG(WARNING) << "plan fragment exec failed. errmsg=" << *status
+                         << _ctx->brief();
+            _status = *status;
+        }
     }

     auto inflight_cnt = _inflight_cnt.fetch_sub(1);
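Why the new lock matters: this callback runs once per finished plan fragment, and fragments of one multi-table load can finish on different threads. Unsynchronized, the writes to `_ctx->error_url` and `_status` race, and a clean fragment (empty error-log path) could clobber the URL recorded by a failing one. Below is a minimal standalone sketch of the pattern, assuming hypothetical `Context` and `FragmentResult` types that stand in for Doris's `StreamLoadContext` and `RuntimeState`:

#include <iostream>
#include <mutex>
#include <string>

// Hypothetical stand-ins for StreamLoadContext / RuntimeState.
struct Context { std::string error_url; };
struct FragmentResult {
    std::string error_log_path;  // empty when the fragment had no bad rows
    bool ok = true;
    std::string errmsg;
};

class CallbackSketch {
public:
    // Invoked once per finished plan fragment, possibly concurrently.
    void on_fragment_finished(Context* ctx, const FragmentResult& res) {
        std::lock_guard<std::mutex> l(_callback_lock);
        // Only record an error URL when this fragment actually produced an
        // error log, so a clean fragment cannot erase a real URL.
        if (!res.error_log_path.empty()) {
            ctx->error_url = res.error_log_path;
        }
        if (!res.ok) {
            std::cerr << "plan fragment exec failed. errmsg=" << res.errmsg << '\n';
            _errmsg = res.errmsg;  // analogous to `_status = *status` in the diff
        }
    }

private:
    std::mutex _callback_lock;  // serializes the read-modify-write above
    std::string _errmsg;
};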

be/src/io/fs/multi_table_pipe.h (+1)

@@ -95,6 +95,7 @@ class MultiTablePipe : public KafkaConsumerPipe {
     std::atomic<int64_t> _number_unselected_rows {0};

     std::mutex _pipe_map_lock;
+    std::mutex _callback_lock;
     std::unordered_map<TUniqueId /*instance id*/, std::shared_ptr<io::StreamLoadPipe>> _pipe_map;

     uint32_t _row_threshold = config::multi_table_batch_plan_threshold;

fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java (+6, -1)

@@ -45,6 +45,7 @@
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.load.routineload.RoutineLoadJob;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.task.LoadTaskInfo;
@@ -384,7 +385,11 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde
         queryGlobals.setNowString(TimeUtils.getDatetimeFormatWithTimeZone().format(LocalDateTime.now()));
         queryGlobals.setTimestampMs(System.currentTimeMillis());
         queryGlobals.setTimeZone(taskInfo.getTimezone());
-        queryGlobals.setLoadZeroTolerance(taskInfo.getMaxFilterRatio() <= 0.0);
+        if (taskInfo instanceof RoutineLoadJob) {
+            queryGlobals.setLoadZeroTolerance(false);
+        } else {
+            queryGlobals.setLoadZeroTolerance(taskInfo.getMaxFilterRatio() <= 0.0);
+        }
         queryGlobals.setNanoSeconds(LocalDateTime.now().getNano());

         params.setQueryGlobals(queryGlobals);
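Taken together, the BE now plans multi-table loads with `max_filter_ratio = 1.0` and the FE never enables `load_zero_tolerance` for routine load jobs, so a bad row is counted and reported through the error URL instead of failing the task and triggering the repeated retries this commit fixes. A rough sketch of the filtered-ratio check as the surrounding BE code describes it (`num_selected_rows = total - unselected`); the standalone function and its name are illustrative, not Doris's actual API:

#include <cstdint>

// Illustrative check: should the load fail because too many of the
// selected rows were filtered out by data-quality (strict mode) checks?
bool exceeds_max_filter_ratio(int64_t num_rows_total, int64_t num_rows_unselected,
                              int64_t num_rows_filtered, double max_filter_ratio) {
    // Rows skipped by a WHERE clause (unselected) do not count against the
    // ratio; only rows rejected for quality reasons (filtered) do.
    int64_t num_selected_rows = num_rows_total - num_rows_unselected;
    if (num_selected_rows <= 0) {
        return false;  // nothing was selected, nothing to judge
    }
    double ratio = static_cast<double>(num_rows_filtered) / num_selected_rows;
    // With max_filter_ratio forced to 1.0 this can never trip, which is what
    // keeps a single bad row from aborting the whole multi-table task.
    return ratio > max_filter_ratio;
}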
New file (+4): expected output for the qt_sql check in the test below.

@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+1	a
+
New file (+2): Kafka payload read by the test below from data/multi_table_load_data_quality.csv. Each line is "<table>|<csv row>"; the second line sends a non-numeric k1 to trigger the data quality error.

@@ -0,0 +1,2 @@
+test_multi_table_load_data_quality|1,a
+test_multi_table_load_data_quality_error|a,a
New file (+120): the regression test.

@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_multi_table_load_data_quality_error","p0") {
+    def kafkaCsvTpoics = [
+        "multi_table_load_data_quality",
+    ]
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // define kafka
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // Create kafka producer
+        def producer = new KafkaProducer<>(props)
+
+        for (String kafkaCsvTopic in kafkaCsvTpoics) {
+            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+    }
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def tableName = "test_multi_table_load_data_quality"
+        def tableName1 = "test_multi_table_load_data_quality_error"
+        def jobName = "test_multi_table_load_data_quality_error"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """ DROP TABLE IF EXISTS ${tableName1} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` int(20) NULL,
+                `k2` string NULL,
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName1} (
+                `k1` int(20) NULL,
+                `k2` string NULL,
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES
+                (
+                    "strict_mode" = "true"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: ${state[0][8].toString()}".toString())
+                log.info("routine load statistic: ${state[0][14].toString()}".toString())
+                log.info("reason of state changed: ${state[0][17].toString()}".toString())
+                log.info("error url: ${state[0][18].toString()}".toString())
+                if (res[0][0] > 0 && state[0][18].toString() != "") {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(1000)
+                count++
+            }
+            qt_sql "select * from ${tableName} order by k1"
+
+        } finally {
+            sql "stop routine load for ${jobName}"
+        }
+    }
+}
