Skip to content

Commit

Permalink
[Feature] support primary key dump (StarRocks#38297)
Browse files Browse the repository at this point in the history
Signed-off-by: luohaha <18810541851@163.com>
  • Loading branch information
luohaha authored Jan 17, 2024
1 parent 6b2bd0c commit 64a6c83
Show file tree
Hide file tree
Showing 22 changed files with 983 additions and 7 deletions.
46 changes: 46 additions & 0 deletions be/src/fs/fs_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,31 @@ inline StatusOr<int64_t> copy(SequentialFile* src, WritableFile* dest, size_t bu
return ncopy;
}

// copy [offset, offset + size] from src to dest file.
inline StatusOr<int64_t> copy_by_range(RandomAccessFile* src, WritableFile* dest, int64_t offset, int64_t size,
size_t buff_size = 8192) {
char* buf = new char[buff_size];
std::unique_ptr<char[]> guard(buf);
int64_t ncopy = 0;
RETURN_IF_ERROR(src->skip(offset));
while (true) {
ASSIGN_OR_RETURN(auto nread, src->read(buf, buff_size));
if (nread == 0) {
return Status::Corruption("file length no match");
}
if (ncopy + nread < size) {
ncopy += nread;
RETURN_IF_ERROR(dest->append(Slice(buf, nread)));
} else {
// read all we need, and we don't need latest [ncopy + nread - size] data,
// So only keep nread - (ncopy + nread - size) = size - ncopy bytes
RETURN_IF_ERROR(dest->append(Slice(buf, size - ncopy)));
break;
}
}
return size;
}

// copy the file from src path to dest path, it will overwrite the existing files
inline Status copy_file(const std::string& src_path, const std::string& dst_path) {
TEST_ERROR_POINT("fs::copy_file");
Expand All @@ -141,6 +166,27 @@ inline Status copy_file(const std::string& src_path, const std::string& dst_path
return Status::OK();
}

// copy the file range [offset, offset + size] from src path to dest path, it will overwrite the existing files
inline Status copy_file_by_range(const std::string& src_path, const std::string& dst_path, int64_t offset,
int64_t size) {
WritableFileOptions opts{.sync_on_close = true, .mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE};
ASSIGN_OR_RETURN(auto src_fs, FileSystem::CreateSharedFromString(src_path));
ASSIGN_OR_RETURN(auto dst_fs, FileSystem::CreateSharedFromString(dst_path));
ASSIGN_OR_RETURN(auto src_file, src_fs->new_random_access_file(src_path));
ASSIGN_OR_RETURN(auto dst_file, dst_fs->new_writable_file(opts, dst_path));
RETURN_IF_ERROR(copy_by_range(src_file.get(), dst_file.get(), offset, size));
RETURN_IF_ERROR(dst_file->close());
return Status::OK();
}

// copy from src path and append dest path, dest must exist
inline Status copy_append_file(const std::string& src_path, WritableFile* dst_file) {
ASSIGN_OR_RETURN(auto src_fs, FileSystem::CreateSharedFromString(src_path));
ASSIGN_OR_RETURN(auto src_file, src_fs->new_sequential_file(src_path));
RETURN_IF_ERROR(copy(src_file.get(), dst_file));
return Status::OK();
}

inline Status canonicalize(const std::string& path, std::string* real_path) {
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(path));
return fs->canonicalize(path, real_path);
Expand Down
20 changes: 20 additions & 0 deletions be/src/script/script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "io/io_profiler.h"
#include "runtime/exec_env.h"
#include "runtime/mem_tracker.h"
#include "storage/primary_key_dump.h"
#include "storage/storage_engine.h"
#include "storage/tablet.h"
#include "storage/tablet_manager.h"
Expand Down Expand Up @@ -347,6 +348,24 @@ class StorageEngineRef {
return exec_whitelist(strings::Substitute("ls -al $0", tablet->schema_hash_path()));
}

static std::string print_primary_key_dump(int64_t tablet_id) {
auto tablet = get_tablet(tablet_id);
if (!tablet) {
return "tablet not found";
}
if (tablet->updates() == nullptr) {
return "non-pk tablet no support set error";
}
PrimaryKeyDump pkd(tablet.get());
auto st = pkd.dump();
if (st.ok()) {
return "print primary key dump success";
} else {
LOG(ERROR) << "print primary key dump fail, " << st;
return "print primary key dump fail";
}
}

static void bind(ForeignModule& m) {
{
auto& cls = m.klass<TabletBasicInfo>("TabletBasicInfo");
Expand Down Expand Up @@ -501,6 +520,7 @@ class StorageEngineRef {
REG_STATIC_METHOD(StorageEngineRef, submit_manual_compaction_task_for_partition);
REG_STATIC_METHOD(StorageEngineRef, submit_manual_compaction_task_for_tablet);
REG_STATIC_METHOD(StorageEngineRef, get_manual_compaction_status);
REG_STATIC_METHOD(StorageEngineRef, print_primary_key_dump);
REG_STATIC_METHOD(StorageEngineRef, ls_tablet_dir);
}
}
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -217,4 +217,5 @@ add_library(Storage STATIC
lake/lake_persistent_index.cpp
lake/persistent_index_memtable.cpp
lake/local_pk_index_manager.cpp
primary_key_dump.cpp
dictionary_cache_manager.cpp)
78 changes: 78 additions & 0 deletions be/src/storage/persistent_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "storage/chunk_helper.h"
#include "storage/chunk_iterator.h"
#include "storage/persistent_index_tablet_loader.h"
#include "storage/primary_key_dump.h"
#include "storage/primary_key_encoder.h"
#include "storage/rowset/rowset.h"
#include "storage/storage_engine.h"
Expand Down Expand Up @@ -911,6 +912,15 @@ class FixedMutableIndex : public MutableIndex {

bool dump(phmap::BinaryOutputArchive& ar) override { return _map.dump(ar); }

Status pk_dump(PrimaryKeyDump* dump, PrimaryIndexDumpPB* dump_pb) override {
for (const auto& each : _map) {
RETURN_IF_ERROR(dump->add_pindex_kvs(
std::string_view(reinterpret_cast<const char*>(each.first.data), sizeof(KeyType)),
each.second.get_value(), dump_pb));
}
return dump->finish_pindex_kvs(dump_pb);
}

std::vector<std::vector<KVRef>> get_kv_refs_by_shard(size_t nshard, size_t num_entry,
bool with_null) const override {
std::vector<std::vector<KVRef>> ret(nshard);
Expand Down Expand Up @@ -1244,6 +1254,15 @@ class SliceMutableIndex : public MutableIndex {
// return _set.dump(ar);
}

Status pk_dump(PrimaryKeyDump* dump, PrimaryIndexDumpPB* dump_pb) override {
for (const auto& composite_key : _set) {
auto value = UNALIGNED_LOAD64(composite_key.data() + composite_key.size() - kIndexValueSize);
RETURN_IF_ERROR(dump->add_pindex_kvs(
std::string_view(composite_key.data(), composite_key.size() - kIndexValueSize), value, dump_pb));
}
return dump->finish_pindex_kvs(dump_pb);
}

bool load_snapshot(phmap::BinaryInputArchive& ar) override {
size_t size = 0;
if (!ar.load(&size)) {
Expand Down Expand Up @@ -1858,6 +1877,14 @@ bool ShardByLengthMutableIndex::dump(phmap::BinaryOutputArchive& ar_out, std::se
return true;
}

Status ShardByLengthMutableIndex::pk_dump(PrimaryKeyDump* dump, PrimaryIndexDumpPB* dump_pb) {
for (uint32_t i = 0; i < _shards.size(); ++i) {
const auto& shard = _shards[i];
RETURN_IF_ERROR(shard->pk_dump(dump, dump_pb));
}
return Status::OK();
}

static Status checksum_of_file(RandomAccessFile* file, uint64_t offset, uint32_t size, uint32* checksum) {
std::string buff;
raw::stl_string_resize_uninitialized(&buff, size);
Expand Down Expand Up @@ -2473,6 +2500,38 @@ Status ImmutableIndex::_get_in_shard_by_page(size_t shard_idx, size_t n, const S
}
}

Status ImmutableIndex::pk_dump(PrimaryKeyDump* dump, PrimaryIndexDumpPB* dump_pb) {
// put all kvs in one shard
std::vector<std::vector<KVRef>> kvs_by_shard(1);
for (size_t shard_idx = 0; shard_idx < _shards.size(); shard_idx++) {
const auto& shard_info = _shards[shard_idx];
if (shard_info.size == 0) {
// skip empty shard
continue;
}
auto shard = std::make_unique<ImmutableIndexShard>(shard_info.npage);
RETURN_IF_ERROR(_file->read_at_fully(shard_info.offset, shard->pages.data(), shard_info.bytes));
RETURN_IF_ERROR(shard->decompress_pages(_compression_type, shard_info.npage, shard_info.uncompressed_size,
shard_info.bytes));
if (shard_info.key_size != 0) {
RETURN_IF_ERROR(_get_fixlen_kvs_for_shard(kvs_by_shard, shard_idx, 0, &shard));
} else {
RETURN_IF_ERROR(_get_varlen_kvs_for_shard(kvs_by_shard, shard_idx, 0, &shard));
}
}

// read kv from KVRef
for (const auto& each : kvs_by_shard) {
for (const auto& each_kv : each) {
auto value = UNALIGNED_LOAD64(each_kv.kv_pos + each_kv.size - kIndexValueSize);
RETURN_IF_ERROR(dump->add_pindex_kvs(
std::string_view(reinterpret_cast<const char*>(each_kv.kv_pos), each_kv.size - kIndexValueSize),
value, dump_pb));
}
}
return dump->finish_pindex_kvs(dump_pb);
}

Status ImmutableIndex::_get_in_shard(size_t shard_idx, size_t n, const Slice* keys, std::vector<KeyInfo>& keys_info,
IndexValue* values, KeysInfo* found_keys_info, IOStat* stat) const {
const auto& shard_info = _shards[shard_idx];
Expand Down Expand Up @@ -5056,4 +5115,23 @@ Status PersistentIndex::_load_by_loader(TabletLoader* loader) {
return Status::OK();
}

Status PersistentIndex::pk_dump(PrimaryKeyDump* dump, PrimaryIndexMultiLevelPB* dump_pb) {
for (const auto& l2 : _l2_vec) {
PrimaryIndexDumpPB* level = dump_pb->add_primary_index_levels();
level->set_filename(l2->filename());
RETURN_IF_ERROR(l2->pk_dump(dump, level));
}
for (const auto& l1 : _l1_vec) {
PrimaryIndexDumpPB* level = dump_pb->add_primary_index_levels();
level->set_filename(l1->filename());
RETURN_IF_ERROR(l1->pk_dump(dump, level));
}
if (_l0) {
PrimaryIndexDumpPB* level = dump_pb->add_primary_index_levels();
level->set_filename("persistent index l0");
RETURN_IF_ERROR(_l0->pk_dump(dump, level));
}
return Status::OK();
}

} // namespace starrocks
9 changes: 9 additions & 0 deletions be/src/storage/persistent_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ namespace starrocks {
class Tablet;
class Schema;
class Column;
class PrimaryKeyDump;

class TabletLoader {
public:
Expand Down Expand Up @@ -266,6 +267,8 @@ class MutableIndex {

virtual size_t memory_usage() = 0;

virtual Status pk_dump(PrimaryKeyDump* dump, PrimaryIndexDumpPB* dump_pb) = 0;

static StatusOr<std::unique_ptr<MutableIndex>> create(size_t key_size);

static std::tuple<size_t, size_t> estimate_nshard_and_npage(const size_t total_kv_pairs_usage);
Expand Down Expand Up @@ -390,6 +393,8 @@ class ShardByLengthMutableIndex {

static StatusOr<std::unique_ptr<ShardByLengthMutableIndex>> create(size_t key_size, const std::string& path);

Status pk_dump(PrimaryKeyDump* dump, PrimaryIndexDumpPB* dump_pb);

private:
friend class PersistentIndex;
friend class starrocks::lake::LakeLocalPersistentIndex;
Expand Down Expand Up @@ -492,6 +497,8 @@ class ImmutableIndex {
static StatusOr<std::unique_ptr<ImmutableIndex>> load(std::unique_ptr<RandomAccessFile>&& index_rb,
bool load_bf_data);

Status pk_dump(PrimaryKeyDump* dump, PrimaryIndexDumpPB* dump_pb);

private:
friend class PersistentIndex;
friend class starrocks::lake::LakeLocalPersistentIndex;
Expand Down Expand Up @@ -779,6 +786,8 @@ class PersistentIndex {
static void modify_l2_versions(const std::vector<EditVersion>& input_l2_versions,
const EditVersion& output_l2_version, PersistentIndexMetaPB& index_meta);

Status pk_dump(PrimaryKeyDump* dump, PrimaryIndexMultiLevelPB* dump_pb);

protected:
Status _delete_expired_index_file(const EditVersion& l0_version, const EditVersion& l1_version,
const EditVersionWithMerge& min_l2_version);
Expand Down
48 changes: 48 additions & 0 deletions be/src/storage/primary_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "io/io_profiler.h"
#include "runtime/large_int_value.h"
#include "storage/chunk_helper.h"
#include "storage/primary_key_dump.h"
#include "storage/primary_key_encoder.h"
#include "storage/rowset/rowset.h"
#include "storage/rowset/rowset_options.h"
Expand Down Expand Up @@ -70,6 +71,8 @@ class HashIndex {

// just an estimate value for now
virtual std::size_t memory_usage() const = 0;

virtual Status pk_dump(PrimaryKeyDump* dump, PrimaryIndexDumpPB* dump_pb) = 0;
};

#pragma pack(push)
Expand Down Expand Up @@ -224,6 +227,14 @@ class HashIndexImpl : public HashIndex {
std::size_t memory_usage() const final {
return _map.capacity() * (1 + (sizeof(Key) + 3) / 4 * 4 + sizeof(RowIdPack4));
}

Status pk_dump(PrimaryKeyDump* dump, PrimaryIndexDumpPB* dump_pb) override {
for (const auto& kv : _map) {
RETURN_IF_ERROR(dump->add_pindex_kvs(
std::string_view(reinterpret_cast<const char*>(&kv.first), sizeof(Key)), kv.second.value, dump_pb));
}
return dump->finish_pindex_kvs(dump_pb);
}
};

template <size_t S>
Expand Down Expand Up @@ -532,6 +543,15 @@ class FixSliceHashIndex : public HashIndex {
}

std::size_t memory_usage() const final { return _map.capacity() * (1 + S * 4 + sizeof(RowIdPack4)); }

Status pk_dump(PrimaryKeyDump* dump, PrimaryIndexDumpPB* dump_pb) override {
for (const auto& kv : _map) {
RETURN_IF_ERROR(dump->add_pindex_kvs(
std::string_view(reinterpret_cast<const char*>(kv.first.v), sizeof(FixSlice<S>)), kv.second.value,
dump_pb));
}
return dump->finish_pindex_kvs(dump_pb);
}
};

struct StringHasher1 {
Expand Down Expand Up @@ -669,6 +689,14 @@ class SliceHashIndex : public HashIndex {
}
return ret;
}

Status pk_dump(PrimaryKeyDump* dump, PrimaryIndexDumpPB* dump_pb) override {
for (const auto& kv : _map) {
RETURN_IF_ERROR(
dump->add_pindex_kvs(std::string_view(kv.first.data(), kv.first.size()), kv.second, dump_pb));
}
return dump->finish_pindex_kvs(dump_pb);
}
};

class ShardByLengthSliceHashIndex : public HashIndex {
Expand Down Expand Up @@ -860,6 +888,15 @@ class ShardByLengthSliceHashIndex : public HashIndex {
}
return ret;
}

Status pk_dump(PrimaryKeyDump* dump, PrimaryIndexDumpPB* dump_pb) override {
for (const auto& _map : _maps) {
if (_map) {
RETURN_IF_ERROR(_map->pk_dump(dump, dump_pb));
}
}
return Status::OK();
}
};

static std::unique_ptr<HashIndex> create_hash_index(LogicalType key_type, size_t fix_size) {
Expand Down Expand Up @@ -1452,4 +1489,15 @@ void PrimaryIndex::reset_cancel_major_compaction() {
}
}

Status PrimaryIndex::pk_dump(PrimaryKeyDump* dump, PrimaryIndexMultiLevelPB* dump_pb) {
if (_persistent_index != nullptr) {
RETURN_IF_ERROR(_persistent_index->pk_dump(dump, dump_pb));
} else {
PrimaryIndexDumpPB* level = dump_pb->add_primary_index_levels();
level->set_filename("memory primary index");
RETURN_IF_ERROR(_pkey_to_rssid_rowid->pk_dump(dump, level));
}
return Status::OK();
}

} // namespace starrocks
2 changes: 2 additions & 0 deletions be/src/storage/primary_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ class PrimaryIndex {

void reset_cancel_major_compaction();

Status pk_dump(PrimaryKeyDump* dump, PrimaryIndexMultiLevelPB* dump_pb);

protected:
void _set_schema(const Schema& pk_schema);

Expand Down
Loading

0 comments on commit 64a6c83

Please sign in to comment.