[Enhancement] Print compaction task statistics logs for cloud native table (StarRocks#37616)

Signed-off-by: drake_wang <wxl250059@alibaba-inc.com>
wxl24life authored Mar 1, 2024
1 parent e0d75e4 commit 60da7cc
Showing 21 changed files with 408 additions and 144 deletions.
@@ -25,8 +25,6 @@
#include "storage/lake/tablet_manager.h"
#include "storage/storage_engine.h"
#include "types/logical_type.h"
#include "util/metrics.h"
#include "util/starrocks_metrics.h"

namespace starrocks {

1 change: 1 addition & 0 deletions be/src/storage/CMakeLists.txt
@@ -185,6 +185,7 @@ add_library(Storage STATIC
lake/compaction_policy.cpp
lake/compaction_scheduler.cpp
lake/compaction_task.cpp
lake/compaction_task_context.cpp
lake/horizontal_compaction_task.cpp
lake/delta_writer.cpp
lake/vacuum.cpp
4 changes: 2 additions & 2 deletions be/src/storage/lake/compaction_scheduler.cpp
@@ -235,7 +235,7 @@ Status CompactionScheduler::do_compaction(std::unique_ptr<CompactionTaskContext>
context->runs.fetch_add(1, std::memory_order_relaxed);

auto status = Status::OK();
auto task_or = _tablet_mgr->compact(tablet_id, version, txn_id);
auto task_or = _tablet_mgr->compact(context.get());
if (task_or.ok()) {
auto should_cancel = [&]() { return context->callback->has_error() || context->callback->timeout_exceeded(); };
TEST_SYNC_POINT("CompactionScheduler::do_compaction:before_execute_task");
@@ -247,7 +247,7 @@ Status CompactionScheduler::do_compaction(std::unique_ptr<CompactionTaskContext>
return Status::InternalError("Get memory table flush pool failed");
}
}
status.update(task_or.value()->execute(&context->progress, std::move(should_cancel), flush_pool));
status.update(task_or.value()->execute(std::move(should_cancel), flush_pool));
} else {
status.update(task_or.status());
}
28 changes: 2 additions & 26 deletions be/src/storage/lake/compaction_scheduler.h
@@ -20,8 +20,8 @@
#include <memory>

#include "common/status.h"
#include "compaction_task_context.h"
#include "gutil/macros.h"
#include "storage/lake/compaction_task.h"
#include "util/blocking_queue.hpp"
#include "util/stack_trace_mutex.h"

@@ -39,7 +39,7 @@ namespace starrocks::lake {
class CompactRequest;
class CompactResponse;
class CompactionScheduler;
struct CompactionTaskContext;
class CompactionTask;
class TabletManager;

// For every `CompactRequest` a new `CompactionTaskCallback` instance will be created.
@@ -90,30 +90,6 @@ class CompactionTaskCallback
std::vector<std::unique_ptr<CompactionTaskContext>> _contexts;
};

// Context of a single tablet compaction task.
struct CompactionTaskContext : public butil::LinkNode<CompactionTaskContext> {
explicit CompactionTaskContext(int64_t txn_id_, int64_t tablet_id_, int64_t version_,
std::shared_ptr<CompactionTaskCallback> cb_)
: txn_id(txn_id_), tablet_id(tablet_id_), version(version_), callback(std::move(cb_)) {}

#ifndef NDEBUG
~CompactionTaskContext() {
CHECK(next() == this && previous() == this) << "Must remove CompactionTaskContext from list before destructor";
}
#endif

const int64_t txn_id;
const int64_t tablet_id;
const int64_t version;
std::atomic<int64_t> start_time{0};
std::atomic<int64_t> finish_time{0};
std::atomic<bool> skipped{false};
std::atomic<int> runs{0};
Status status;
lake::CompactionTask::Progress progress;
std::shared_ptr<CompactionTaskCallback> callback;
};

struct CompactionTaskInfo {
int64_t txn_id;
int64_t tablet_id;
9 changes: 5 additions & 4 deletions be/src/storage/lake/compaction_task.cpp
@@ -19,13 +19,14 @@

namespace starrocks::lake {

CompactionTask::CompactionTask(int64_t txn_id, VersionedTablet tablet,
std::vector<std::shared_ptr<Rowset>> input_rowsets)
: _txn_id(txn_id),
CompactionTask::CompactionTask(VersionedTablet tablet, std::vector<std::shared_ptr<Rowset>> input_rowsets,
CompactionTaskContext* context)
: _txn_id(context->txn_id),
_tablet(std::move(tablet)),
_input_rowsets(std::move(input_rowsets)),
_mem_tracker(std::make_unique<MemTracker>(MemTracker::COMPACTION, -1,
"Compaction-" + std::to_string(_tablet.metadata()->id()),
GlobalEnv::GetInstance()->compaction_mem_tracker())) {}
GlobalEnv::GetInstance()->compaction_mem_tracker())),
_context(context) {}

} // namespace starrocks::lake
17 changes: 5 additions & 12 deletions be/src/storage/lake/compaction_task.h
@@ -19,6 +19,7 @@
#include <ostream>

#include "common/status.h"
#include "compaction_task_context.h"
#include "runtime/mem_tracker.h"
#include "storage/lake/versioned_tablet.h"

@@ -28,24 +29,15 @@ class Rowset;

class CompactionTask {
public:
class Progress {
public:
int value() const { return _value.load(std::memory_order_acquire); }

void update(int value) { _value.store(value, std::memory_order_release); }

private:
std::atomic<int> _value{0};
};

// CancelFunc is a function that used to tell the compaction task whether the task
// should be cancelled.
using CancelFunc = std::function<bool()>;

explicit CompactionTask(int64_t txn_id, VersionedTablet tablet, std::vector<std::shared_ptr<Rowset>> input_rowsets);
explicit CompactionTask(VersionedTablet tablet, std::vector<std::shared_ptr<Rowset>> input_rowsets,
CompactionTaskContext* context);
virtual ~CompactionTask() = default;

virtual Status execute(Progress* stats, CancelFunc cancel_func, ThreadPool* flush_pool = nullptr) = 0;
virtual Status execute(CancelFunc cancel_func, ThreadPool* flush_pool = nullptr) = 0;

inline static const CancelFunc kNoCancelFn = []() { return false; };
inline static const CancelFunc kCancelledFn = []() { return true; };
@@ -55,6 +47,7 @@ class CompactionTask {
VersionedTablet _tablet;
std::vector<std::shared_ptr<Rowset>> _input_rowsets;
std::unique_ptr<MemTracker> _mem_tracker = nullptr;
CompactionTaskContext* _context;
};

} // namespace starrocks::lake
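
With this refactor a task no longer owns a Progress object or receives a Progress* through execute(); both progress and the new stats are reached through the CompactionTaskContext pointer captured at construction. A hypothetical derived task illustrating the new shape (class name and body are illustrative only, not part of this commit):

#include "storage/lake/compaction_task.h"

namespace starrocks::lake {

// Illustrative sketch: report progress through the shared context instead of
// a Progress* argument to execute().
class ExampleCompactionTask : public CompactionTask {
public:
    using CompactionTask::CompactionTask;

    Status execute(CancelFunc cancel_func, ThreadPool* flush_pool = nullptr) override {
        for (int pct = 0; pct <= 100; pct += 10) {
            if (cancel_func()) {
                return Status::Aborted("compaction task cancelled");
            }
            // ... merge a slice of the input rowsets here ...
            _context->progress.update(pct);  // progress now lives on the context
        }
        return Status::OK();
    }
};

} // namespace starrocks::lake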
61 changes: 61 additions & 0 deletions be/src/storage/lake/compaction_task_context.cpp
@@ -0,0 +1,61 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "storage/lake/compaction_task_context.h"

#include <rapidjson/document.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>

#include "storage/olap_common.h"

namespace starrocks::lake {

static constexpr long TIME_UNIT_NS_PER_SECOND = 1000000000;

void CompactionTaskStats::accumulate(const OlapReaderStatistics& reader_stats) {
io_ns += reader_stats.io_ns;
io_ns_remote += reader_stats.io_ns_remote;
io_ns_local_disk += reader_stats.io_ns_local_disk;
segment_init_ns += reader_stats.segment_init_ns;
column_iterator_init_ns += reader_stats.column_iterator_init_ns;
io_count_local_disk += reader_stats.io_count_local_disk;
io_count_remote += reader_stats.io_count_remote;
compressed_bytes_read += reader_stats.compressed_bytes_read;
}

std::string CompactionTaskStats::to_json_stats() {
rapidjson::Document root;
root.SetObject();
auto& allocator = root.GetAllocator();
// add stats
root.AddMember("reader_total_time_second", rapidjson::Value(reader_time_ns / TIME_UNIT_NS_PER_SECOND), allocator);
root.AddMember("reader_io_second", rapidjson::Value(io_ns / TIME_UNIT_NS_PER_SECOND), allocator);
root.AddMember("reader_io_second_remote", rapidjson::Value(io_ns_remote / TIME_UNIT_NS_PER_SECOND), allocator);
root.AddMember("reader_io_second_local_disk", rapidjson::Value(io_ns_local_disk / TIME_UNIT_NS_PER_SECOND),
allocator);
root.AddMember("reader_io_count_remote", rapidjson::Value(io_count_remote), allocator);
root.AddMember("reader_io_count_local_disk", rapidjson::Value(io_count_local_disk), allocator);
root.AddMember("compressed_bytes_read", rapidjson::Value(compressed_bytes_read), allocator);
root.AddMember("segment_init_second", rapidjson::Value(segment_init_ns / TIME_UNIT_NS_PER_SECOND), allocator);
root.AddMember("column_iterator_init_second", rapidjson::Value(column_iterator_init_ns / TIME_UNIT_NS_PER_SECOND),
allocator);
root.AddMember("segment_write_second", rapidjson::Value(segment_write_ns / TIME_UNIT_NS_PER_SECOND), allocator);

rapidjson::StringBuffer strbuf;
rapidjson::Writer<rapidjson::StringBuffer> writer(strbuf);
root.Accept(writer);
return {strbuf.GetString()};
}
} // namespace starrocks::lake
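
to_json_stats() serializes the accumulated counters into a single-line JSON object, converting each *_ns counter to whole seconds by integer division with TIME_UNIT_NS_PER_SECOND. A minimal sketch of exercising the new type in isolation (the helper name and all values are illustrative, not part of this commit):

#include <iostream>

#include "storage/lake/compaction_task_context.h"

// Illustrative only: fill a CompactionTaskStats with made-up numbers and print
// the JSON line that to_json_stats() produces for them.
void print_example_compaction_stats() {
    starrocks::lake::CompactionTaskStats stats;
    stats.reader_time_ns = 12LL * 1000000000;   // 12s total reader time
    stats.io_ns_remote = 5LL * 1000000000;      // 5s of remote I/O
    stats.io_ns_local_disk = 3LL * 1000000000;  // 3s of local-disk I/O
    stats.io_ns = stats.io_ns_remote + stats.io_ns_local_disk;
    stats.compressed_bytes_read = 100LL << 20;  // 100 MiB of compressed data read
    // Prints something like:
    // {"reader_total_time_second":12,"reader_io_second":8,"reader_io_second_remote":5,
    //  "reader_io_second_local_disk":3,...,"segment_write_second":0}
    std::cout << stats.to_json_stats() << std::endl;
}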
83 changes: 83 additions & 0 deletions be/src/storage/lake/compaction_task_context.h
@@ -0,0 +1,83 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <butil/containers/linked_list.h>

#include <atomic>
#include <memory>
#include <string>

#include "common/status.h"

namespace starrocks {
struct OlapReaderStatistics;
}

namespace starrocks::lake {

class CompactionTaskCallback;
class Progress {
public:
int value() const { return _value.load(std::memory_order_acquire); }

void update(int value) { _value.store(value, std::memory_order_release); }

private:
std::atomic<int> _value{0};
};

struct CompactionTaskStats {
int64_t io_ns = 0;
int64_t io_ns_remote = 0;
int64_t io_ns_local_disk = 0;
int64_t segment_init_ns = 0;
int64_t column_iterator_init_ns = 0;
int64_t io_count_local_disk = 0;
int64_t io_count_remote = 0;
int64_t compressed_bytes_read = 0;
int64_t reader_time_ns = 0;
int64_t segment_write_ns = 0;

void accumulate(const OlapReaderStatistics& reader_stats);
std::string to_json_stats();
};

// Context of a single tablet compaction task.
struct CompactionTaskContext : public butil::LinkNode<CompactionTaskContext> {
explicit CompactionTaskContext(int64_t txn_id_, int64_t tablet_id_, int64_t version_,
std::shared_ptr<CompactionTaskCallback> cb_)
: txn_id(txn_id_), tablet_id(tablet_id_), version(version_), callback(std::move(cb_)) {}

#ifndef NDEBUG
~CompactionTaskContext() {
CHECK(next() == this && previous() == this) << "Must remove CompactionTaskContext from list before destructor";
}
#endif

const int64_t txn_id;
const int64_t tablet_id;
const int64_t version;
std::atomic<int64_t> start_time{0};
std::atomic<int64_t> finish_time{0};
std::atomic<bool> skipped{false};
std::atomic<int> runs{0};
Status status;
Progress progress;
std::shared_ptr<CompactionTaskCallback> callback;
std::unique_ptr<CompactionTaskStats> stats = std::make_unique<CompactionTaskStats>();
};

} // namespace starrocks::lake
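
The CompactionTaskContext struct moves here from compaction_scheduler.h, and Progress moves out of CompactionTask, so the scheduler and the tasks can share them without including each other; the context additionally carries a CompactionTaskStats that each run accumulates into. The code that fills in and prints these statistics lives in files not shown in this excerpt; a purely hypothetical sketch of the flow, with an assumed helper name and log format, might look like:

// Hypothetical sketch (not the commit's actual code): fold a reader's counters
// into the task context and emit the finished task's statistics as one JSON line.
void log_compaction_stats(starrocks::lake::CompactionTaskContext* context,
                          const starrocks::OlapReaderStatistics& reader_stats,
                          int64_t reader_time_ns) {
    context->stats->accumulate(reader_stats);   // add per-reader counters to the task totals
    context->stats->reader_time_ns += reader_time_ns;
    context->progress.update(100);              // mark the task as fully done
    LOG(INFO) << "compaction task finished, tablet_id=" << context->tablet_id
              << ", txn_id=" << context->txn_id
              << ", stats=" << context->stats->to_json_stats();
}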
5 changes: 5 additions & 0 deletions be/src/storage/lake/general_tablet_writer.cpp
@@ -43,6 +43,7 @@ Status HorizontalGeneralTabletWriter::open() {
}

Status HorizontalGeneralTabletWriter::write(const starrocks::Chunk& data, SegmentPB* segment) {
SCOPED_RAW_TIMER(&_stats.segment_write_ns);
if (_seg_writer == nullptr || _seg_writer->estimate_segment_size() >= config::max_segment_file_size ||
_seg_writer->num_rows_written() + data.num_rows() >= INT32_MAX /*TODO: configurable*/) {
RETURN_IF_ERROR(flush_segment_writer(segment));
@@ -58,6 +59,7 @@ Status HorizontalGeneralTabletWriter::flush(SegmentPB* segment) {
}

Status HorizontalGeneralTabletWriter::finish(SegmentPB* segment) {
SCOPED_RAW_TIMER(&_stats.segment_write_ns);
RETURN_IF_ERROR(flush_segment_writer(segment));
_finished = true;
return Status::OK();
@@ -133,6 +135,7 @@ Status VerticalGeneralTabletWriter::open() {

Status VerticalGeneralTabletWriter::write_columns(const Chunk& data, const std::vector<uint32_t>& column_indexes,
bool is_key) {
SCOPED_RAW_TIMER(&_stats.segment_write_ns);
const size_t chunk_num_rows = data.num_rows();
if (_segment_writers.empty()) {
DCHECK(is_key);
@@ -204,6 +207,7 @@ Status VerticalGeneralTabletWriter::flush(SegmentPB* segment) {
}

Status VerticalGeneralTabletWriter::flush_columns() {
SCOPED_RAW_TIMER(&_stats.segment_write_ns);
if (_segment_writers.empty()) {
return Status::OK();
}
@@ -219,6 +223,7 @@ Status VerticalGeneralTabletWriter::flush_columns() {
}

Status VerticalGeneralTabletWriter::finish(SegmentPB* segment) {
SCOPED_RAW_TIMER(&_stats.segment_write_ns);
for (auto& segment_writer : _segment_writers) {
uint64_t segment_size = 0;
uint64_t footer_position = 0;
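
Each write/flush/finish path of the tablet writers is now wrapped in SCOPED_RAW_TIMER(&_stats.segment_write_ns), so time spent producing output segments accumulates into the new segment_write_ns counter that to_json_stats() reports as segment_write_second. Conceptually the macro acts as an RAII guard along the lines of this simplified sketch (not the actual StarRocks implementation):

#include <chrono>
#include <cstdint>

// Simplified stand-in for SCOPED_RAW_TIMER: on destruction, add the elapsed
// nanoseconds of the enclosing scope to the counter it was given.
class ScopedRawTimerSketch {
public:
    explicit ScopedRawTimerSketch(int64_t* counter)
            : _counter(counter), _start(std::chrono::steady_clock::now()) {}
    ~ScopedRawTimerSketch() {
        *_counter += std::chrono::duration_cast<std::chrono::nanoseconds>(
                             std::chrono::steady_clock::now() - _start)
                             .count();
    }

private:
    int64_t* _counter;
    std::chrono::steady_clock::time_point _start;
};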