Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improvement](partial update)add more logs for partial update #35802

Merged
merged 3 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 34 additions & 10 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,8 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
// use for partial update
PartialUpdateReadPlan read_plan_ori;
PartialUpdateReadPlan read_plan_update;
int64_t conflict_rows = 0;
int64_t new_generated_rows = 0;

std::map<RowsetId, RowsetSharedPtr> rsid_to_rowset;
rsid_to_rowset[rowset_id] = rowset;
Expand Down Expand Up @@ -733,6 +735,7 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
// of the including columns in the current row into a new row.
delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON},
row_id);
++conflict_rows;
continue;
}
if (is_partial_update && rowset_writer != nullptr) {
Expand Down Expand Up @@ -760,11 +763,14 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
loc.row_id);
delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON},
row_id);
++conflict_rows;
++new_generated_rows;
continue;
}
// when st = ok
delete_bitmap->add({loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
loc.row_id);
++conflict_rows;
}
remaining -= num_read;
}
Expand All @@ -789,10 +795,23 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
read_plan_ori, read_plan_update, rsid_to_rowset, &block));
RETURN_IF_ERROR(sort_block(block, ordered_block));
RETURN_IF_ERROR(rowset_writer->flush_single_block(&ordered_block));
if (new_generated_rows != rowset_writer->num_rows()) {
LOG(WARNING) << "partial update correctness warning: conflict new generated rows ("
<< new_generated_rows << ") not equal to the new flushed rows ("
<< rowset_writer->num_rows() << "), tablet: " << tablet_id();
}
LOG(INFO) << "calc segment delete bitmap for partial update, tablet: " << tablet_id()
<< " rowset: " << rowset_id << " seg_id: " << seg->id()
<< " dummy_version: " << end_version + 1 << " rows: " << seg->num_rows()
<< " conflict rows: " << conflict_rows
<< " new generated rows: " << new_generated_rows
<< " bimap num: " << delete_bitmap->delete_bitmap.size()
<< " cost: " << watch.get_elapse_time_us() << "(us)";
return Status::OK();
}
LOG(INFO) << "calc segment delete bitmap, tablet: " << tablet_id() << " rowset: " << rowset_id
<< " seg_id: " << seg->id() << " dummy_version: " << end_version + 1
<< " rows: " << seg->num_rows()
<< " rows: " << seg->num_rows() << " conflict rows: " << conflict_rows
<< " bitmap num: " << delete_bitmap->delete_bitmap.size()
<< " cost: " << watch.get_elapse_time_us() << "(us)";
return Status::OK();
Expand Down Expand Up @@ -1193,15 +1212,6 @@ Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInf
<< ", calc delete bitmap: " << watch.get_elapse_time_us() - t3 << ")";
}

size_t total_rows = std::accumulate(
segments.begin(), segments.end(), 0,
[](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); });
LOG(INFO) << "[Publish] construct delete bitmap tablet: " << self->tablet_id()
<< ", rowset_ids to add: " << rowset_ids_to_add.size()
<< ", rowset_ids to del: " << rowset_ids_to_del.size()
<< ", cur version: " << cur_version << ", transaction_id: " << txn_id << ","
<< ss.str() << " , total rows: " << total_rows;

if (config::enable_merge_on_write_correctness_check && rowset->num_rows() != 0) {
// only do correctness check if the rowset has at least one row written
// check if all the rowset has ROWSET_SENTINEL_MARK
Expand All @@ -1225,14 +1235,28 @@ Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInf
RETURN_IF_ERROR(transient_rs_writer->flush());
RowsetSharedPtr transient_rowset;
RETURN_IF_ERROR(transient_rs_writer->build(transient_rowset));
auto old_segments = rowset->num_segments();
rowset->rowset_meta()->merge_rowset_meta(*transient_rowset->rowset_meta());
auto new_segments = rowset->num_segments();
ss << ", partial update flush rowset (old segment num: " << old_segments
<< ", new segment num: " << new_segments << ")";

// update the shared_ptr to new bitmap, which is consistent with current rowset.
txn_info->delete_bitmap = delete_bitmap;

// erase segment cache cause we will add a segment to rowset
SegmentLoader::instance()->erase_segments(rowset->rowset_id(), rowset->num_segments());
}

size_t total_rows = std::accumulate(
segments.begin(), segments.end(), 0,
[](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); });
LOG(INFO) << "[Publish] construct delete bitmap tablet: " << self->tablet_id()
<< ", rowset_ids to add: " << rowset_ids_to_add.size()
<< ", rowset_ids to del: " << rowset_ids_to_del.size()
<< ", cur version: " << cur_version << ", transaction_id: " << txn_id << ","
<< ss.str() << " , total rows: " << total_rows;

RETURN_IF_ERROR(self->save_delete_bitmap(txn_info, txn_id, delete_bitmap,
transient_rs_writer.get(), cur_rowset_ids));
return Status::OK();
Expand Down
18 changes: 18 additions & 0 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,24 @@ Status CompactionMixin::modify_rowsets() {
if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION &&
_tablet->tablet_state() == TABLET_RUNNING &&
_stats.merged_rows != missed_rows_size) {
std::stringstream ss;
ss << "cumulative compaction: the merged rows(" << _stats.merged_rows
<< ") is not equal to missed rows(" << missed_rows_size
<< ") in rowid conversion, tablet_id: " << _tablet->tablet_id()
<< ", table_id:" << _tablet->table_id();
if (missed_rows_size == 0) {
ss << ", debug info: ";
DeleteBitmap subset_map(_tablet->tablet_id());
for (auto rs : _input_rowsets) {
_tablet->tablet_meta()->delete_bitmap().subset(
{rs->rowset_id(), 0, 0},
{rs->rowset_id(), rs->num_segments(), version.second + 1},
&subset_map);
ss << "(rowset id: " << rs->rowset_id()
<< ", delete bitmap cardinality: " << subset_map.cardinality() << ")";
}
ss << ", version[0-" << version.second + 1 << "]";
}
std::string err_msg = fmt::format(
"cumulative compaction: the merged rows({}) is not equal to missed "
"rows({}) in rowid conversion, tablet_id: {}, table_id:{}",
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ class BaseBetaRowsetWriter : public RowsetWriter {

int64_t num_rows() const override { return _segment_creator.num_rows_written(); }

// for partial update
int64_t num_rows_updated() const override { return _segment_creator.num_rows_updated(); }
int64_t num_rows_deleted() const override { return _segment_creator.num_rows_deleted(); }
int64_t num_rows_new_added() const override { return _segment_creator.num_rows_new_added(); }
int64_t num_rows_filtered() const override { return _segment_creator.num_rows_filtered(); }

RowsetId rowset_id() override { return _context.rowset_id; }
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ class BetaRowsetWriterV2 : public RowsetWriter {

int64_t num_rows() const override { return _segment_creator.num_rows_written(); }

// for partial update
int64_t num_rows_updated() const override { return _segment_creator.num_rows_updated(); }
int64_t num_rows_deleted() const override { return _segment_creator.num_rows_deleted(); }
int64_t num_rows_new_added() const override { return _segment_creator.num_rows_new_added(); }
int64_t num_rows_filtered() const override { return _segment_creator.num_rows_filtered(); }

RowsetId rowset_id() override { return _context.rowset_id; }
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/rowset/rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ class RowsetWriter {

virtual int64_t num_rows() const = 0;

virtual int64_t num_rows_updated() const = 0;
virtual int64_t num_rows_deleted() const = 0;
virtual int64_t num_rows_new_added() const = 0;
virtual int64_t num_rows_filtered() const = 0;

virtual RowsetId rowset_id() = 0;
Expand Down
6 changes: 6 additions & 0 deletions be/src/olap/rowset/segment_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ Status SegmentFlusher::_flush_segment_writer(
std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer, TabletSchemaSPtr flush_schema,
int64_t* flush_size) {
uint32_t row_num = writer->num_rows_written();
_num_rows_updated += writer->num_rows_updated();
_num_rows_deleted += writer->num_rows_deleted();
_num_rows_new_added += writer->num_rows_new_added();
_num_rows_filtered += writer->num_rows_filtered();

if (row_num == 0) {
Expand Down Expand Up @@ -287,6 +290,9 @@ Status SegmentFlusher::_flush_segment_writer(
Status SegmentFlusher::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
TabletSchemaSPtr flush_schema, int64_t* flush_size) {
uint32_t row_num = writer->num_rows_written();
_num_rows_updated += writer->num_rows_updated();
_num_rows_deleted += writer->num_rows_deleted();
_num_rows_new_added += writer->num_rows_new_added();
_num_rows_filtered += writer->num_rows_filtered();

if (row_num == 0) {
Expand Down
11 changes: 11 additions & 0 deletions be/src/olap/rowset/segment_creator.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ class SegmentFlusher {

int64_t num_rows_written() const { return _num_rows_written; }

// for partial update
int64_t num_rows_updated() const { return _num_rows_updated; }
int64_t num_rows_deleted() const { return _num_rows_deleted; }
int64_t num_rows_new_added() const { return _num_rows_new_added; }
int64_t num_rows_filtered() const { return _num_rows_filtered; }

Status close();
Expand Down Expand Up @@ -155,6 +159,9 @@ class SegmentFlusher {

// written rows by add_block/add_row
std::atomic<int64_t> _num_rows_written = 0;
std::atomic<int64_t> _num_rows_updated = 0;
std::atomic<int64_t> _num_rows_new_added = 0;
std::atomic<int64_t> _num_rows_deleted = 0;
std::atomic<int64_t> _num_rows_filtered = 0;
};

Expand All @@ -176,6 +183,10 @@ class SegmentCreator {

int64_t num_rows_written() const { return _segment_flusher.num_rows_written(); }

// for partial update
int64_t num_rows_updated() const { return _segment_flusher.num_rows_updated(); }
int64_t num_rows_deleted() const { return _segment_flusher.num_rows_deleted(); }
int64_t num_rows_new_added() const { return _segment_flusher.num_rows_new_added(); }
int64_t num_rows_filtered() const { return _segment_flusher.num_rows_filtered(); }

// Flush a block into a single segment, with pre-allocated segment_id.
Expand Down
12 changes: 11 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,11 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
}
}
std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
// locate rows in base data

// locate rows in base data
int64_t num_rows_updated = 0;
int64_t num_rows_new_added = 0;
int64_t num_rows_deleted = 0;
int64_t num_rows_filtered = 0;
for (size_t block_pos = row_pos; block_pos < row_pos + num_rows; block_pos++) {
// block segment
Expand Down Expand Up @@ -507,6 +510,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
error_column);
}
}
++num_rows_new_added;
has_default_or_nullable = true;
use_default_or_null_flag.emplace_back(true);
continue;
Expand Down Expand Up @@ -537,9 +541,11 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
_mow_context->delete_bitmap->add(
{_opts.rowset_ctx->rowset_id, _segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
segment_pos);
++num_rows_deleted;
} else {
_mow_context->delete_bitmap->add(
{loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON}, loc.row_id);
++num_rows_updated;
}
}
CHECK_EQ(use_default_or_null_flag.size(), num_rows);
Expand All @@ -554,6 +560,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
RETURN_IF_ERROR(fill_missing_columns(mutable_full_columns, use_default_or_null_flag,
has_default_or_nullable, segment_start_pos, block));
full_block.set_columns(std::move(mutable_full_columns));

// row column should be filled here
if (_tablet_schema->store_row_column()) {
// convert block to row store format
Expand All @@ -578,6 +585,9 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
num_rows));
}

_num_rows_updated += num_rows_updated;
_num_rows_deleted += num_rows_deleted;
_num_rows_new_added += num_rows_new_added;
_num_rows_filtered += num_rows_filtered;
if (_tablet_schema->has_sequence_col() && !have_input_seq_column) {
DCHECK_NE(seq_column, nullptr);
Expand Down
11 changes: 11 additions & 0 deletions be/src/olap/rowset/segment_v2/segment_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,13 @@ class SegmentWriter {

size_t get_inverted_index_file_size() const { return _inverted_index_file_size; }
uint32_t num_rows_written() const { return _num_rows_written; }

// for partial update
int64_t num_rows_updated() const { return _num_rows_updated; }
int64_t num_rows_deleted() const { return _num_rows_deleted; }
int64_t num_rows_new_added() const { return _num_rows_new_added; }
int64_t num_rows_filtered() const { return _num_rows_filtered; }

uint32_t row_count() const { return _row_count; }

Status finalize(uint64_t* segment_file_size, uint64_t* index_size);
Expand Down Expand Up @@ -213,6 +219,11 @@ class SegmentWriter {
bool _has_key = true;
// _num_rows_written means row count already written in this current column group
uint32_t _num_rows_written = 0;

/** for partial update stats **/
int64_t _num_rows_updated = 0;
int64_t _num_rows_new_added = 0;
int64_t _num_rows_deleted = 0;
// number of rows filtered in strict mode partial update
int64_t _num_rows_filtered = 0;
// _row_count means total row count of this segment
Expand Down
12 changes: 11 additions & 1 deletion be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,11 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
}
}
std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
// locate rows in base data

// locate rows in base data
int64_t num_rows_updated = 0;
int64_t num_rows_new_added = 0;
int64_t num_rows_deleted = 0;
int64_t num_rows_filtered = 0;
for (size_t block_pos = data.row_pos; block_pos < data.row_pos + data.num_rows; block_pos++) {
// block segment
Expand Down Expand Up @@ -442,6 +445,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
error_column);
}
}
++num_rows_new_added;
has_default_or_nullable = true;
use_default_or_null_flag.emplace_back(true);
continue;
Expand Down Expand Up @@ -472,9 +476,11 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
_mow_context->delete_bitmap->add(
{_opts.rowset_ctx->rowset_id, _segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
segment_pos);
++num_rows_deleted;
} else {
_mow_context->delete_bitmap->add(
{loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON}, loc.row_id);
++num_rows_updated;
}
}
CHECK_EQ(use_default_or_null_flag.size(), data.num_rows);
Expand All @@ -488,6 +494,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
auto mutable_full_columns = full_block.mutate_columns();
RETURN_IF_ERROR(_fill_missing_columns(mutable_full_columns, use_default_or_null_flag,
has_default_or_nullable, segment_start_pos, data.block));

// row column should be filled here
if (_tablet_schema->store_row_column()) {
// convert block to row store format
Expand All @@ -512,6 +519,9 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
data.num_rows));
}

_num_rows_updated += num_rows_updated;
_num_rows_deleted += num_rows_deleted;
_num_rows_new_added += num_rows_new_added;
_num_rows_filtered += num_rows_filtered;
if (_tablet_schema->has_sequence_col() && !have_input_seq_column) {
DCHECK_NE(seq_column, nullptr);
Expand Down
11 changes: 11 additions & 0 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ class VerticalSegmentWriter {
}
[[nodiscard]] size_t inverted_index_file_size() const { return _inverted_index_file_size; }
[[nodiscard]] uint32_t num_rows_written() const { return _num_rows_written; }

// for partial update
[[nodiscard]] int64_t num_rows_updated() const { return _num_rows_updated; }
[[nodiscard]] int64_t num_rows_deleted() const { return _num_rows_deleted; }
[[nodiscard]] int64_t num_rows_new_added() const { return _num_rows_new_added; }
[[nodiscard]] int64_t num_rows_filtered() const { return _num_rows_filtered; }
[[nodiscard]] uint32_t row_count() const { return _row_count; }
[[nodiscard]] uint32_t segment_id() const { return _segment_id; }
Expand Down Expand Up @@ -178,8 +183,14 @@ class VerticalSegmentWriter {

// _num_rows_written means row count already written in this current column group
uint32_t _num_rows_written = 0;

/** for partial update stats **/
int64_t _num_rows_updated = 0;
int64_t _num_rows_new_added = 0;
int64_t _num_rows_deleted = 0;
// number of rows filtered in strict mode partial update
int64_t _num_rows_filtered = 0;

// _row_count means total row count of this segment
// In vertical compaction row count is recorded when key columns group finish
// and _num_rows_written will be updated in value column group
Expand Down
11 changes: 11 additions & 0 deletions be/src/olap/rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,17 @@ Status BaseRowsetBuilder::submit_calc_delete_bitmap_task() {
// of the delete bitmap. This operation is resource-intensive, and we need to minimize
// the number of times it occurs. Therefore, we skip this operation here.
if (_partial_update_info->is_partial_update) {
// for partial update, the delete bitmap calculation is done while append_block()
// we print it's summarize logs here before commit.
LOG(INFO) << fmt::format(
"partial update calc delete bitmap summary before commit: tablet({}), txn_id({}), "
"rowset_ids({}), cur max_version({}), bitmap num({}), num rows updated({}), num "
"rows new added({}), num rows deleted({}), total rows({})",
tablet()->tablet_id(), _req.txn_id, _rowset_ids.size(),
rowset_writer()->context().mow_context->max_version,
_delete_bitmap->delete_bitmap.size(), rowset_writer()->num_rows_updated(),
rowset_writer()->num_rows_new_added(), rowset_writer()->num_rows_deleted(),
rowset_writer()->num_rows());
return Status::OK();
}

Expand Down
Loading
Loading