Skip to content

Commit

Permalink
[improvement](partial update)add more logs for partial update (#35802)
Browse files Browse the repository at this point in the history
To make us to pinpoint the issues of partial update easier
  • Loading branch information
zhannngchen authored and dataroaring committed Jun 4, 2024
1 parent b9947e2 commit 74e444a
Show file tree
Hide file tree
Showing 14 changed files with 148 additions and 12 deletions.
44 changes: 34 additions & 10 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,8 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
// use for partial update
PartialUpdateReadPlan read_plan_ori;
PartialUpdateReadPlan read_plan_update;
int64_t conflict_rows = 0;
int64_t new_generated_rows = 0;

std::map<RowsetId, RowsetSharedPtr> rsid_to_rowset;
rsid_to_rowset[rowset_id] = rowset;
Expand Down Expand Up @@ -733,6 +735,7 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
// of the including columns in the current row into a new row.
delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON},
row_id);
++conflict_rows;
continue;
}
if (is_partial_update && rowset_writer != nullptr) {
Expand Down Expand Up @@ -760,11 +763,14 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
loc.row_id);
delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON},
row_id);
++conflict_rows;
++new_generated_rows;
continue;
}
// when st = ok
delete_bitmap->add({loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
loc.row_id);
++conflict_rows;
}
remaining -= num_read;
}
Expand All @@ -789,10 +795,23 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
read_plan_ori, read_plan_update, rsid_to_rowset, &block));
RETURN_IF_ERROR(sort_block(block, ordered_block));
RETURN_IF_ERROR(rowset_writer->flush_single_block(&ordered_block));
if (new_generated_rows != rowset_writer->num_rows()) {
LOG(WARNING) << "partial update correctness warning: conflict new generated rows ("
<< new_generated_rows << ") not equal to the new flushed rows ("
<< rowset_writer->num_rows() << "), tablet: " << tablet_id();
}
LOG(INFO) << "calc segment delete bitmap for partial update, tablet: " << tablet_id()
<< " rowset: " << rowset_id << " seg_id: " << seg->id()
<< " dummy_version: " << end_version + 1 << " rows: " << seg->num_rows()
<< " conflict rows: " << conflict_rows
<< " new generated rows: " << new_generated_rows
<< " bimap num: " << delete_bitmap->delete_bitmap.size()
<< " cost: " << watch.get_elapse_time_us() << "(us)";
return Status::OK();
}
LOG(INFO) << "calc segment delete bitmap, tablet: " << tablet_id() << " rowset: " << rowset_id
<< " seg_id: " << seg->id() << " dummy_version: " << end_version + 1
<< " rows: " << seg->num_rows()
<< " rows: " << seg->num_rows() << " conflict rows: " << conflict_rows
<< " bitmap num: " << delete_bitmap->delete_bitmap.size()
<< " cost: " << watch.get_elapse_time_us() << "(us)";
return Status::OK();
Expand Down Expand Up @@ -1193,15 +1212,6 @@ Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInf
<< ", calc delete bitmap: " << watch.get_elapse_time_us() - t3 << ")";
}

size_t total_rows = std::accumulate(
segments.begin(), segments.end(), 0,
[](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); });
LOG(INFO) << "[Publish] construct delete bitmap tablet: " << self->tablet_id()
<< ", rowset_ids to add: " << rowset_ids_to_add.size()
<< ", rowset_ids to del: " << rowset_ids_to_del.size()
<< ", cur version: " << cur_version << ", transaction_id: " << txn_id << ","
<< ss.str() << " , total rows: " << total_rows;

if (config::enable_merge_on_write_correctness_check && rowset->num_rows() != 0) {
// only do correctness check if the rowset has at least one row written
// check if all the rowset has ROWSET_SENTINEL_MARK
Expand All @@ -1225,14 +1235,28 @@ Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInf
RETURN_IF_ERROR(transient_rs_writer->flush());
RowsetSharedPtr transient_rowset;
RETURN_IF_ERROR(transient_rs_writer->build(transient_rowset));
auto old_segments = rowset->num_segments();
rowset->rowset_meta()->merge_rowset_meta(*transient_rowset->rowset_meta());
auto new_segments = rowset->num_segments();
ss << ", partial update flush rowset (old segment num: " << old_segments
<< ", new segment num: " << new_segments << ")";

// update the shared_ptr to new bitmap, which is consistent with current rowset.
txn_info->delete_bitmap = delete_bitmap;

// erase segment cache cause we will add a segment to rowset
SegmentLoader::instance()->erase_segments(rowset->rowset_id(), rowset->num_segments());
}

size_t total_rows = std::accumulate(
segments.begin(), segments.end(), 0,
[](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); });
LOG(INFO) << "[Publish] construct delete bitmap tablet: " << self->tablet_id()
<< ", rowset_ids to add: " << rowset_ids_to_add.size()
<< ", rowset_ids to del: " << rowset_ids_to_del.size()
<< ", cur version: " << cur_version << ", transaction_id: " << txn_id << ","
<< ss.str() << " , total rows: " << total_rows;

RETURN_IF_ERROR(self->save_delete_bitmap(txn_info, txn_id, delete_bitmap,
transient_rs_writer.get(), cur_rowset_ids));
return Status::OK();
Expand Down
18 changes: 18 additions & 0 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,24 @@ Status CompactionMixin::modify_rowsets() {
if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION &&
_tablet->tablet_state() == TABLET_RUNNING &&
_stats.merged_rows != missed_rows_size) {
std::stringstream ss;
ss << "cumulative compaction: the merged rows(" << _stats.merged_rows
<< ") is not equal to missed rows(" << missed_rows_size
<< ") in rowid conversion, tablet_id: " << _tablet->tablet_id()
<< ", table_id:" << _tablet->table_id();
if (missed_rows_size == 0) {
ss << ", debug info: ";
DeleteBitmap subset_map(_tablet->tablet_id());
for (auto rs : _input_rowsets) {
_tablet->tablet_meta()->delete_bitmap().subset(
{rs->rowset_id(), 0, 0},
{rs->rowset_id(), rs->num_segments(), version.second + 1},
&subset_map);
ss << "(rowset id: " << rs->rowset_id()
<< ", delete bitmap cardinality: " << subset_map.cardinality() << ")";
}
ss << ", version[0-" << version.second + 1 << "]";
}
std::string err_msg = fmt::format(
"cumulative compaction: the merged rows({}) is not equal to missed "
"rows({}) in rowid conversion, tablet_id: {}, table_id:{}",
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ class BaseBetaRowsetWriter : public RowsetWriter {

int64_t num_rows() const override { return _segment_creator.num_rows_written(); }

// for partial update
int64_t num_rows_updated() const override { return _segment_creator.num_rows_updated(); }
int64_t num_rows_deleted() const override { return _segment_creator.num_rows_deleted(); }
int64_t num_rows_new_added() const override { return _segment_creator.num_rows_new_added(); }
int64_t num_rows_filtered() const override { return _segment_creator.num_rows_filtered(); }

RowsetId rowset_id() override { return _context.rowset_id; }
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ class BetaRowsetWriterV2 : public RowsetWriter {

int64_t num_rows() const override { return _segment_creator.num_rows_written(); }

// for partial update
int64_t num_rows_updated() const override { return _segment_creator.num_rows_updated(); }
int64_t num_rows_deleted() const override { return _segment_creator.num_rows_deleted(); }
int64_t num_rows_new_added() const override { return _segment_creator.num_rows_new_added(); }
int64_t num_rows_filtered() const override { return _segment_creator.num_rows_filtered(); }

RowsetId rowset_id() override { return _context.rowset_id; }
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/rowset/rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ class RowsetWriter {

virtual int64_t num_rows() const = 0;

virtual int64_t num_rows_updated() const = 0;
virtual int64_t num_rows_deleted() const = 0;
virtual int64_t num_rows_new_added() const = 0;
virtual int64_t num_rows_filtered() const = 0;

virtual RowsetId rowset_id() = 0;
Expand Down
6 changes: 6 additions & 0 deletions be/src/olap/rowset/segment_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ Status SegmentFlusher::_flush_segment_writer(
std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer, TabletSchemaSPtr flush_schema,
int64_t* flush_size) {
uint32_t row_num = writer->num_rows_written();
_num_rows_updated += writer->num_rows_updated();
_num_rows_deleted += writer->num_rows_deleted();
_num_rows_new_added += writer->num_rows_new_added();
_num_rows_filtered += writer->num_rows_filtered();

if (row_num == 0) {
Expand Down Expand Up @@ -287,6 +290,9 @@ Status SegmentFlusher::_flush_segment_writer(
Status SegmentFlusher::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
TabletSchemaSPtr flush_schema, int64_t* flush_size) {
uint32_t row_num = writer->num_rows_written();
_num_rows_updated += writer->num_rows_updated();
_num_rows_deleted += writer->num_rows_deleted();
_num_rows_new_added += writer->num_rows_new_added();
_num_rows_filtered += writer->num_rows_filtered();

if (row_num == 0) {
Expand Down
11 changes: 11 additions & 0 deletions be/src/olap/rowset/segment_creator.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ class SegmentFlusher {

int64_t num_rows_written() const { return _num_rows_written; }

// for partial update
int64_t num_rows_updated() const { return _num_rows_updated; }
int64_t num_rows_deleted() const { return _num_rows_deleted; }
int64_t num_rows_new_added() const { return _num_rows_new_added; }
int64_t num_rows_filtered() const { return _num_rows_filtered; }

Status close();
Expand Down Expand Up @@ -155,6 +159,9 @@ class SegmentFlusher {

// written rows by add_block/add_row
std::atomic<int64_t> _num_rows_written = 0;
std::atomic<int64_t> _num_rows_updated = 0;
std::atomic<int64_t> _num_rows_new_added = 0;
std::atomic<int64_t> _num_rows_deleted = 0;
std::atomic<int64_t> _num_rows_filtered = 0;
};

Expand All @@ -176,6 +183,10 @@ class SegmentCreator {

int64_t num_rows_written() const { return _segment_flusher.num_rows_written(); }

// for partial update
int64_t num_rows_updated() const { return _segment_flusher.num_rows_updated(); }
int64_t num_rows_deleted() const { return _segment_flusher.num_rows_deleted(); }
int64_t num_rows_new_added() const { return _segment_flusher.num_rows_new_added(); }
int64_t num_rows_filtered() const { return _segment_flusher.num_rows_filtered(); }

// Flush a block into a single segment, with pre-allocated segment_id.
Expand Down
12 changes: 11 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,11 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
}
}
std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
// locate rows in base data

// locate rows in base data
int64_t num_rows_updated = 0;
int64_t num_rows_new_added = 0;
int64_t num_rows_deleted = 0;
int64_t num_rows_filtered = 0;
for (size_t block_pos = row_pos; block_pos < row_pos + num_rows; block_pos++) {
// block segment
Expand Down Expand Up @@ -507,6 +510,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
error_column);
}
}
++num_rows_new_added;
has_default_or_nullable = true;
use_default_or_null_flag.emplace_back(true);
continue;
Expand Down Expand Up @@ -537,9 +541,11 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
_mow_context->delete_bitmap->add(
{_opts.rowset_ctx->rowset_id, _segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
segment_pos);
++num_rows_deleted;
} else {
_mow_context->delete_bitmap->add(
{loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON}, loc.row_id);
++num_rows_updated;
}
}
CHECK_EQ(use_default_or_null_flag.size(), num_rows);
Expand All @@ -554,6 +560,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
RETURN_IF_ERROR(fill_missing_columns(mutable_full_columns, use_default_or_null_flag,
has_default_or_nullable, segment_start_pos, block));
full_block.set_columns(std::move(mutable_full_columns));

// row column should be filled here
if (_tablet_schema->store_row_column()) {
// convert block to row store format
Expand All @@ -578,6 +585,9 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
num_rows));
}

_num_rows_updated += num_rows_updated;
_num_rows_deleted += num_rows_deleted;
_num_rows_new_added += num_rows_new_added;
_num_rows_filtered += num_rows_filtered;
if (_tablet_schema->has_sequence_col() && !have_input_seq_column) {
DCHECK_NE(seq_column, nullptr);
Expand Down
11 changes: 11 additions & 0 deletions be/src/olap/rowset/segment_v2/segment_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,13 @@ class SegmentWriter {

size_t get_inverted_index_file_size() const { return _inverted_index_file_size; }
uint32_t num_rows_written() const { return _num_rows_written; }

// for partial update
int64_t num_rows_updated() const { return _num_rows_updated; }
int64_t num_rows_deleted() const { return _num_rows_deleted; }
int64_t num_rows_new_added() const { return _num_rows_new_added; }
int64_t num_rows_filtered() const { return _num_rows_filtered; }

uint32_t row_count() const { return _row_count; }

Status finalize(uint64_t* segment_file_size, uint64_t* index_size);
Expand Down Expand Up @@ -213,6 +219,11 @@ class SegmentWriter {
bool _has_key = true;
// _num_rows_written means row count already written in this current column group
uint32_t _num_rows_written = 0;

/** for partial update stats **/
int64_t _num_rows_updated = 0;
int64_t _num_rows_new_added = 0;
int64_t _num_rows_deleted = 0;
// number of rows filtered in strict mode partial update
int64_t _num_rows_filtered = 0;
// _row_count means total row count of this segment
Expand Down
12 changes: 11 additions & 1 deletion be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,11 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
}
}
std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
// locate rows in base data

// locate rows in base data
int64_t num_rows_updated = 0;
int64_t num_rows_new_added = 0;
int64_t num_rows_deleted = 0;
int64_t num_rows_filtered = 0;
for (size_t block_pos = data.row_pos; block_pos < data.row_pos + data.num_rows; block_pos++) {
// block segment
Expand Down Expand Up @@ -442,6 +445,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
error_column);
}
}
++num_rows_new_added;
has_default_or_nullable = true;
use_default_or_null_flag.emplace_back(true);
continue;
Expand Down Expand Up @@ -472,9 +476,11 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
_mow_context->delete_bitmap->add(
{_opts.rowset_ctx->rowset_id, _segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
segment_pos);
++num_rows_deleted;
} else {
_mow_context->delete_bitmap->add(
{loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON}, loc.row_id);
++num_rows_updated;
}
}
CHECK_EQ(use_default_or_null_flag.size(), data.num_rows);
Expand All @@ -488,6 +494,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
auto mutable_full_columns = full_block.mutate_columns();
RETURN_IF_ERROR(_fill_missing_columns(mutable_full_columns, use_default_or_null_flag,
has_default_or_nullable, segment_start_pos, data.block));

// row column should be filled here
if (_tablet_schema->store_row_column()) {
// convert block to row store format
Expand All @@ -512,6 +519,9 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
data.num_rows));
}

_num_rows_updated += num_rows_updated;
_num_rows_deleted += num_rows_deleted;
_num_rows_new_added += num_rows_new_added;
_num_rows_filtered += num_rows_filtered;
if (_tablet_schema->has_sequence_col() && !have_input_seq_column) {
DCHECK_NE(seq_column, nullptr);
Expand Down
11 changes: 11 additions & 0 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ class VerticalSegmentWriter {
}
[[nodiscard]] size_t inverted_index_file_size() const { return _inverted_index_file_size; }
[[nodiscard]] uint32_t num_rows_written() const { return _num_rows_written; }

// for partial update
[[nodiscard]] int64_t num_rows_updated() const { return _num_rows_updated; }
[[nodiscard]] int64_t num_rows_deleted() const { return _num_rows_deleted; }
[[nodiscard]] int64_t num_rows_new_added() const { return _num_rows_new_added; }
[[nodiscard]] int64_t num_rows_filtered() const { return _num_rows_filtered; }
[[nodiscard]] uint32_t row_count() const { return _row_count; }
[[nodiscard]] uint32_t segment_id() const { return _segment_id; }
Expand Down Expand Up @@ -178,8 +183,14 @@ class VerticalSegmentWriter {

// _num_rows_written means row count already written in this current column group
uint32_t _num_rows_written = 0;

/** for partial update stats **/
int64_t _num_rows_updated = 0;
int64_t _num_rows_new_added = 0;
int64_t _num_rows_deleted = 0;
// number of rows filtered in strict mode partial update
int64_t _num_rows_filtered = 0;

// _row_count means total row count of this segment
// In vertical compaction row count is recorded when key columns group finish
// and _num_rows_written will be updated in value column group
Expand Down
11 changes: 11 additions & 0 deletions be/src/olap/rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,17 @@ Status BaseRowsetBuilder::submit_calc_delete_bitmap_task() {
// of the delete bitmap. This operation is resource-intensive, and we need to minimize
// the number of times it occurs. Therefore, we skip this operation here.
if (_partial_update_info->is_partial_update) {
// for partial update, the delete bitmap calculation is done while append_block()
// we print it's summarize logs here before commit.
LOG(INFO) << fmt::format(
"partial update calc delete bitmap summary before commit: tablet({}), txn_id({}), "
"rowset_ids({}), cur max_version({}), bitmap num({}), num rows updated({}), num "
"rows new added({}), num rows deleted({}), total rows({})",
tablet()->tablet_id(), _req.txn_id, _rowset_ids.size(),
rowset_writer()->context().mow_context->max_version,
_delete_bitmap->delete_bitmap.size(), rowset_writer()->num_rows_updated(),
rowset_writer()->num_rows_new_added(), rowset_writer()->num_rows_deleted(),
rowset_writer()->num_rows());
return Status::OK();
}

Expand Down
Loading

0 comments on commit 74e444a

Please sign in to comment.