Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[performance](move-memtable) only call _select_streams when necessary #35576

Merged
merged 3 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion be/src/vec/sink/delta_writer_v2_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ std::shared_ptr<DeltaWriterV2> DeltaWriterV2Map::get_or_create(
return _map.at(tablet_id);
}
std::shared_ptr<DeltaWriterV2> writer = creator();
_map[tablet_id] = writer;
if (writer != nullptr) {
_map[tablet_id] = writer;
}
return writer;
}

Expand Down
12 changes: 8 additions & 4 deletions be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,17 +416,21 @@ Status VTabletWriterV2::write(Block& input_block) {

// For each tablet, send its input_rows from block to delta writer
for (const auto& [tablet_id, rows] : rows_for_tablet) {
Streams streams;
RETURN_IF_ERROR(_select_streams(tablet_id, rows.partition_id, rows.index_id, streams));
RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows, streams));
RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows));
}

return Status::OK();
}

Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block, int64_t tablet_id,
const Rows& rows, const Streams& streams) {
const Rows& rows) {
auto delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id, [&]() {
Streams streams;
auto st = _select_streams(tablet_id, rows.partition_id, rows.index_id, streams);
if (!st.ok()) [[unlikely]] {
LOG(WARNING) << st << ", load_id=" << print_id(_load_id);
return std::unique_ptr<DeltaWriterV2>(nullptr);
}
WriteRequest req {
.tablet_id = tablet_id,
.txn_id = _txn_id,
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/writer/vtablet_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class VTabletWriterV2 final : public AsyncResultWriter {
RowsForTablet& rows_for_tablet);

Status _write_memtable(std::shared_ptr<vectorized::Block> block, int64_t tablet_id,
const Rows& rows, const Streams& streams);
const Rows& rows);

Status _select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id,
Streams& streams);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ suite("test_writer_v2_fault_injection", "nonConcurrent") {
// VTabletWriterV2 tablet_location is null
load_with_injection("VTabletWriterV2._build_tablet_node_mapping.tablet_location_null", "unknown tablet location")
// VTabletWriterV2 location is null
load_with_injection("VTabletWriterV2._select_streams.location_null", "unknown tablet location")
load_with_injection("VTabletWriterV2._select_streams.location_null", "failed to open DeltaWriter for tablet")
// VTabletWriterV2 cancel
load_with_injection("VTabletWriterV2.close.cancel", "load cancel")
// DeltaWriterV2 stream_size is 0
load_with_injection("DeltaWriterV2.init.stream_size", "failed to find tablet schema")

sql """ set enable_memtable_on_sink_node=false """
}
}
Loading