Skip to content

Commit

Permalink
[fix](move-memtable) all sinks wait stream close for load timeout (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen authored Jan 25, 2024
1 parent d732824 commit 520293a
Show file tree
Hide file tree
Showing 6 changed files with 5 additions and 13 deletions.
2 changes: 0 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -766,8 +766,6 @@ DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1");
DEFINE_Bool(share_delta_writers, "true");
// timeout for open load stream rpc in ms
DEFINE_Int64(open_load_stream_timeout_ms, "60000"); // 60s
// timeout for load stream close wait in ms
DEFINE_Int64(close_load_stream_timeout_ms, "600000"); // 10 min

// brpc streaming max_buf_size in bytes
DEFINE_Int64(load_stream_max_buf_size, "20971520"); // 20MB
Expand Down
2 changes: 0 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -819,8 +819,6 @@ DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio);
DECLARE_Bool(share_delta_writers);
// timeout for open load stream rpc in ms
DECLARE_Int64(open_load_stream_timeout_ms);
// timeout for load stream close wait in ms
DECLARE_Int64(close_load_stream_timeout_ms);

// brpc streaming max_buf_size in bytes
DECLARE_Int64(load_stream_max_buf_size);
Expand Down
7 changes: 0 additions & 7 deletions be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,13 +316,6 @@ Status LoadStreamStub::close_wait(int64_t timeout_ms) {
if (_is_closed.load()) {
return _check_cancel();
}
// if there are other sinks remaining, let the last sink handle close wait
if (_use_cnt > 0) {
return Status::OK();
}
if (timeout_ms <= 0) {
timeout_ms = config::close_load_stream_timeout_ms;
}
DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0";
std::unique_lock<bthread::Mutex> lock(_close_mutex);
if (!_is_closed.load()) {
Expand Down
1 change: 0 additions & 1 deletion be/src/vec/sink/load_stream_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ class LoadStreamStub {

// wait remote to close stream,
// remote will close stream when it receives CLOSE_LOAD
// if timeout_ms <= 0, will fallback to config::close_load_stream_timeout_ms
Status close_wait(int64_t timeout_ms = 0);

// cancel the stream, abort close_wait, mark _is_closed and _is_cancelled
Expand Down
5 changes: 4 additions & 1 deletion be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {

Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) {
RETURN_IF_ERROR(_init(state, profile));
_timeout_watch.start();
SCOPED_TIMER(_profile->total_time_counter());
SCOPED_TIMER(_open_timer);
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Expand Down Expand Up @@ -550,9 +551,11 @@ Status VTabletWriterV2::close(Status exec_status) {

{
SCOPED_TIMER(_close_load_timer);
auto remain_ms = _state->execution_timeout() * 1000 -
_timeout_watch.elapsed_time() / 1000 / 1000;
for (const auto& [_, streams] : _streams_for_node) {
for (const auto& stream : streams->streams()) {
RETURN_IF_ERROR(stream->close_wait());
RETURN_IF_ERROR(stream->close_wait(remain_ms));
}
}
}
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/sink/writer/vtablet_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ class VTabletWriterV2 final : public AsyncResultWriter {
int64_t _number_output_rows = 0;

MonotonicStopWatch _row_distribution_watch;
MonotonicStopWatch _timeout_watch;

RuntimeProfile::Counter* _input_rows_counter = nullptr;
RuntimeProfile::Counter* _output_rows_counter = nullptr;
Expand Down

0 comments on commit 520293a

Please sign in to comment.