Skip to content

Commit

Permalink
[refactor](load) add tablet errors when close_wait return error (#9619)
Browse files Browse the repository at this point in the history
  • Loading branch information
pengxiangyu authored May 22, 2022
1 parent 3391de4 commit 75b3707
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 34 deletions.
22 changes: 2 additions & 20 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,7 @@ Status DeltaWriter::close() {
return Status::OK();
}

Status DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors,
bool is_broken) {
Status DeltaWriter::close_wait() {
std::lock_guard<std::mutex> l(_lock);
DCHECK(_is_init)
<< "delta writer is supposed be to initialized before close_wait() being called";
Expand All @@ -303,15 +301,7 @@ Status DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>*
}

// return error if previous flush failed
Status s = _flush_token->wait();
if (!s.ok()) {
#ifndef BE_TEST
PTabletError* tablet_error = tablet_errors->Add();
tablet_error->set_tablet_id(_tablet->tablet_id());
tablet_error->set_msg(s.get_error_msg());
#endif
return s;
}
RETURN_NOT_OK(_flush_token->wait());

// use rowset meta manager to save meta
_cur_rowset = _rowset_writer->build();
Expand All @@ -327,14 +317,6 @@ Status DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>*
return res;
}

#ifndef BE_TEST
if (!is_broken) {
PTabletInfo* tablet_info = tablet_vec->Add();
tablet_info->set_tablet_id(_tablet->tablet_id());
tablet_info->set_schema_hash(_tablet->schema_hash());
}
#endif

_delta_written_success = true;

const FlushStatistic& stat = _flush_token->get_stats();
Expand Down
8 changes: 4 additions & 4 deletions be/src/olap/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ class DeltaWriter {
Status close();
// wait for all memtables to be flushed.
// mem_consumption() should be 0 after this function returns.
Status close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors,
bool is_broken);
Status close_wait();

// abandon current memtable and wait for all pending-flushing memtables to be destructed.
// mem_consumption() should be 0 after this function returns.
Expand All @@ -91,6 +89,8 @@ class DeltaWriter {

int64_t tablet_id() { return _tablet->tablet_id(); }

int32_t schema_hash() { return _tablet->schema_hash(); }

int64_t save_mem_consumption_snapshot();

int64_t get_mem_consumption_snapshot() const;
Expand Down Expand Up @@ -133,4 +133,4 @@ class DeltaWriter {
int64_t _mem_consumption_snapshot = 0;
};

} // namespace doris
} // namespace doris
21 changes: 18 additions & 3 deletions be/src/runtime/tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,29 @@ Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished,
for (auto writer : need_wait_writers) {
// close may return failed, but no need to handle it here.
// tablet_vec will only contains success tablet, and then let FE judge it.
writer->close_wait(
tablet_vec, tablet_errors,
(_broken_tablets.find(writer->tablet_id()) != _broken_tablets.end()));
_close_wait(writer, tablet_vec, tablet_errors);
}
}
return Status::OK();
}

void TabletsChannel::_close_wait(DeltaWriter* writer,
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors) {
Status st = writer->close_wait();
if (st.ok()) {
if (_broken_tablets.find(writer->tablet_id()) == _broken_tablets.end()) {
PTabletInfo* tablet_info = tablet_vec->Add();
tablet_info->set_tablet_id(writer->tablet_id());
tablet_info->set_schema_hash(writer->schema_hash());
}
} else {
PTabletError* tablet_error = tablet_errors->Add();
tablet_error->set_tablet_id(writer->tablet_id());
tablet_error->set_msg(st.get_error_msg());
}
}

Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) {
std::lock_guard<std::mutex> l(_lock);
if (_state == kFinished) {
Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/tablets_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ class TabletsChannel {
// open all writer
Status _open_all_writers(const PTabletWriterOpenRequest& request);

// deal with DeltaWriter close_wait(), add tablet to list for return.
void _close_wait(DeltaWriter* writer,
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
google::protobuf::RepeatedPtrField<PTabletError>* tablet_error);

// id of this load channel
TabletsChannelKey _key;

Expand Down
12 changes: 6 additions & 6 deletions be/test/olap/delta_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ TEST_F(TestDeltaWriter, open) {
EXPECT_NE(delta_writer, nullptr);
res = delta_writer->close();
EXPECT_EQ(Status::OK(), res);
res = delta_writer->close_wait(nullptr, nullptr, false);
res = delta_writer->close_wait();
EXPECT_EQ(Status::OK(), res);
SAFE_DELETE(delta_writer);

Expand All @@ -376,7 +376,7 @@ TEST_F(TestDeltaWriter, open) {
EXPECT_NE(delta_writer, nullptr);
res = delta_writer->close();
EXPECT_EQ(Status::OK(), res);
res = delta_writer->close_wait(nullptr, nullptr, false);
res = delta_writer->close_wait();
EXPECT_EQ(Status::OK(), res);
SAFE_DELETE(delta_writer);

Expand Down Expand Up @@ -475,7 +475,7 @@ TEST_F(TestDeltaWriter, write) {

res = delta_writer->close();
EXPECT_EQ(Status::OK(), res);
res = delta_writer->close_wait(nullptr, nullptr, false);
res = delta_writer->close_wait();
EXPECT_EQ(Status::OK(), res);

// publish version success
Expand Down Expand Up @@ -609,7 +609,7 @@ TEST_F(TestDeltaWriter, vec_write) {

res = delta_writer->close();
ASSERT_TRUE(res.ok());
res = delta_writer->close_wait(nullptr, nullptr, false);
res = delta_writer->close_wait();
ASSERT_TRUE(res.ok());

// publish version success
Expand Down Expand Up @@ -687,7 +687,7 @@ TEST_F(TestDeltaWriter, sequence_col) {

res = delta_writer->close();
EXPECT_EQ(Status::OK(), res);
res = delta_writer->close_wait(nullptr, nullptr, false);
res = delta_writer->close_wait();
EXPECT_EQ(Status::OK(), res);

// publish version success
Expand Down Expand Up @@ -772,7 +772,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {

res = delta_writer->close();
ASSERT_TRUE(res.ok());
res = delta_writer->close_wait(nullptr, nullptr, false);
res = delta_writer->close_wait();
ASSERT_TRUE(res.ok());

// publish version success
Expand Down
2 changes: 1 addition & 1 deletion be/test/olap/engine_storage_migration_task_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) {

res = delta_writer->close();
EXPECT_EQ(Status::OK(), res);
res = delta_writer->close_wait(nullptr, nullptr, false);
res = delta_writer->close_wait();
EXPECT_EQ(Status::OK(), res);

// publish version success
Expand Down

0 comments on commit 75b3707

Please sign in to comment.