Skip to content

Commit

Permalink
[Feature] support for table-level primary index expire time (StarRock…
Browse files Browse the repository at this point in the history
…s#30407)

Currently, we use the config `update_cache_expire_sec` to control the expire time of the primary index. This config is cluster-level.
Now we add the property `primary_index_cache_expire_sec` to tables, so that the expire time of the primary index can be controlled at the table level.

E.g. we can create a table with the property `primary_index_cache_expire_sec`:
```
CREATE TABLE `demo` (
  `id` bigint(20) NOT NULL COMMENT "",
  `pass` int NOT NULL,
  `info` int NOT NULL
) ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"enable_persistent_index" = "false",
"replicated_storage" = "true",
"primary_index_cache_expire_sec" = "3600",
"compression" = "LZ4"
);
```

Or, we can use alter table to change this property:
```
alter table demo set ("primary_index_cache_expire_sec" = "3600");
```

Using `show create table`, we can verify that it works:
```
mysql> show create table demo;
+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table                                                                                                                                                                                                                                                                                                                                                                                                       |
+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| demo  | CREATE TABLE `demo` (
  `id` bigint(20) NOT NULL COMMENT "",
  `pass` int(11) NOT NULL COMMENT "",
  `info` int(11) NOT NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"enable_persistent_index" = "false",
"replicated_storage" = "true",
"primary_index_cache_expire_sec" = "3600",
"compression" = "LZ4"
); |
+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
```
The property `primary_index_cache_expire_sec` must not be less than 0, and `primary_index_cache_expire_sec == 0` means that the config `update_cache_expire_sec` is used by default.

Signed-off-by: luohaha <18810541851@163.com>
  • Loading branch information
luohaha authored Sep 6, 2023
1 parent 12d0bf1 commit b795faf
Show file tree
Hide file tree
Showing 38 changed files with 623 additions and 39 deletions.
17 changes: 15 additions & 2 deletions be/src/agent/agent_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,7 @@ void run_update_meta_info_task(const std::shared_ptr<UpdateTabletMetaInfoAgentTa

LOG(INFO) << "get update tablet meta task, signature:" << agent_task_req->signature;

auto update_manager = StorageEngine::instance()->update_manager();
TStatusCode::type status_code = TStatusCode::OK;
std::vector<std::string> error_msgs;

Expand Down Expand Up @@ -743,8 +744,20 @@ void run_update_meta_info_task(const std::shared_ptr<UpdateTabletMetaInfoAgentTa
// If the tablet is applying a rowset right now, removing the primary index from the index cache may fail
// because the primary index is available in the cache.
// But it will be removed from the index cache after the apply is finished.
auto manager = StorageEngine::instance()->update_manager();
manager->index_cache().try_remove_by_key(tablet->tablet_id());
update_manager->index_cache().try_remove_by_key(tablet->tablet_id());
break;
case TTabletMetaType::PRIMARY_INDEX_CACHE_EXPIRE_SEC:
LOG(INFO) << "update tablet:" << tablet->tablet_id()
<< " primary_index_cache_expire_sec:" << tablet_meta_info.primary_index_cache_expire_sec;
tablet->tablet_meta()->set_primary_index_cache_expire_sec(
tablet_meta_info.primary_index_cache_expire_sec);
// update index's expire right now, if pk index is alive
auto index_entry = update_manager->index_cache().get(tablet->tablet_id());
if (index_entry != nullptr) {
index_entry->update_expire_time(MonotonicMillis() +
update_manager->get_index_cache_expire_ms(*tablet));
update_manager->index_cache().release(index_entry);
}
break;
}
}
Expand Down
5 changes: 5 additions & 0 deletions be/src/storage/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class BaseTablet : public std::enable_shared_from_this<BaseTablet> {

// Property encapsulated in TabletMeta
const TabletMetaSharedPtr tablet_meta();
const TabletMetaSharedPtr tablet_meta() const;

void set_tablet_meta(const TabletMetaSharedPtr& tablet_meta) { _tablet_meta = tablet_meta; }

Expand Down Expand Up @@ -142,6 +143,10 @@ inline const TabletMetaSharedPtr BaseTablet::tablet_meta() {
return _tablet_meta;
}

// Const-qualified overload of tablet_meta(): returns the tablet's meta handle (`_tablet_meta`)
// by value, so it can be read through a `const BaseTablet&`.
inline const TabletMetaSharedPtr BaseTablet::tablet_meta() const {
return _tablet_meta;
}

// Returns the tablet UID as reported by the tablet meta (`_tablet_meta->tablet_uid()`).
inline TabletUid BaseTablet::tablet_uid() const {
return _tablet_meta->tablet_uid();
}
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1431,6 +1431,7 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info) {
tablet_info->__set_storage_medium(_data_dir->storage_medium());
tablet_info->__set_path_hash(_data_dir->path_hash());
tablet_info->__set_enable_persistent_index(_tablet_meta->get_enable_persistent_index());
tablet_info->__set_primary_index_cache_expire_sec(_tablet_meta->get_primary_index_cache_expire_sec());
if (_tablet_meta->get_binlog_config() != nullptr) {
tablet_info->__set_binlog_config_version(_tablet_meta->get_binlog_config()->version);
}
Expand Down
8 changes: 6 additions & 2 deletions be/src/storage/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ Status TabletMeta::create(const TCreateTabletReq& request, const TabletUid& tabl
request.tablet_schema, next_unique_id,
request.__isset.enable_persistent_index && request.enable_persistent_index, col_ordinal_to_unique_id,
tablet_uid, request.__isset.tablet_type ? request.tablet_type : TTabletType::TABLET_TYPE_DISK,
request.__isset.compression_type ? request.compression_type : TCompressionType::LZ4_FRAME);
request.__isset.compression_type ? request.compression_type : TCompressionType::LZ4_FRAME,
request.__isset.primary_index_cache_expire_sec ? request.primary_index_cache_expire_sec : 0);

if (request.__isset.binlog_config) {
BinlogConfig binlog_config;
Expand Down Expand Up @@ -87,7 +88,7 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
bool enable_persistent_index,
const std::unordered_map<uint32_t, uint32_t>& col_ordinal_to_unique_id,
const TabletUid& tablet_uid, TTabletType::type tabletType,
TCompressionType::type compression_type)
TCompressionType::type compression_type, int32_t primary_index_cache_expire_sec)
: _tablet_uid(0, 0) {
TabletMetaPB tablet_meta_pb;
tablet_meta_pb.set_table_id(table_id);
Expand All @@ -103,6 +104,7 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
tablet_meta_pb.set_tablet_type(tabletType == TTabletType::TABLET_TYPE_MEMORY ? TabletTypePB::TABLET_TYPE_MEMORY
: TabletTypePB::TABLET_TYPE_DISK);
tablet_meta_pb.set_in_restore_mode(false);
tablet_meta_pb.set_primary_index_cache_expire_sec(primary_index_cache_expire_sec);

TabletSchemaPB* schema = tablet_meta_pb.mutable_schema();
auto st = convert_t_schema_to_pb_schema(tablet_schema, next_unique_id, col_ordinal_to_unique_id, schema,
Expand Down Expand Up @@ -292,6 +294,7 @@ void TabletMeta::init_from_pb(TabletMetaPB* ptablet_meta_pb) {
}

_enable_shortcut_compaction = tablet_meta_pb.enable_shortcut_compaction();
_primary_index_cache_expire_sec = tablet_meta_pb.primary_index_cache_expire_sec();
}

void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
Expand Down Expand Up @@ -350,6 +353,7 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
}

tablet_meta_pb->set_enable_shortcut_compaction(_enable_shortcut_compaction);
tablet_meta_pb->set_primary_index_cache_expire_sec(_primary_index_cache_expire_sec);
}

void TabletMeta::to_json(string* json_string, json2pb::Pb2JsonOptions& options) {
Expand Down
9 changes: 8 additions & 1 deletion be/src/storage/tablet_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ class TabletMeta {
TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id, int32_t schema_hash, uint64_t shard_id,
const TTabletSchema& tablet_schema, uint32_t next_unique_id, bool enable_persistent_index,
const std::unordered_map<uint32_t, uint32_t>& col_ordinal_to_unique_id, const TabletUid& tablet_uid,
TTabletType::type tabletType, TCompressionType::type compression_type);
TTabletType::type tabletType, TCompressionType::type compression_type,
int32_t primary_index_cache_expire_sec);

virtual ~TabletMeta();

Expand Down Expand Up @@ -198,6 +199,11 @@ class TabletMeta {
_enable_persistent_index = enable_persistent_index;
}

// Table-level primary index cache expire time, in seconds, as stored in this meta.
// A value of 0 means no table-level override is set and the cluster-level default applies.
int32_t get_primary_index_cache_expire_sec() const { return _primary_index_cache_expire_sec; }
// Overwrites the table-level primary index cache expire time (in seconds) held by this meta.
// No validation is done here; the value is stored as given.
void set_primary_index_cache_expire_sec(int32_t primary_index_cache_expire_sec) {
_primary_index_cache_expire_sec = primary_index_cache_expire_sec;
}

std::shared_ptr<BinlogConfig> get_binlog_config() { return _binlog_config; }

void set_binlog_config(const BinlogConfig& new_config) {
Expand Down Expand Up @@ -232,6 +238,7 @@ class TabletMeta {
int64_t _creation_time = 0;
int64_t _cumulative_layer_point = 0;
bool _enable_persistent_index = false;
int32_t _primary_index_cache_expire_sec = 0;
TabletUid _tablet_uid;
TabletTypePB _tablet_type = TabletTypePB::TABLET_TYPE_DISK;

Expand Down
16 changes: 8 additions & 8 deletions be/src/storage/tablet_updates.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,7 @@ void TabletUpdates::_apply_column_partial_update_commit(const EditVersionInfo& v
std::lock_guard lg(_index_lock);
// 2. load primary index, using it in finalize step.
auto index_entry = manager->index_cache().get_or_create(tablet_id);
index_entry->update_expire_time(MonotonicMillis() + manager->get_cache_expire_ms());
index_entry->update_expire_time(MonotonicMillis() + manager->get_index_cache_expire_ms(_tablet));
auto& index = index_entry->value();
// empty rowset does not need to load in-memory primary index, so skip it
if (rowset->has_data_files() || _tablet.get_enable_persistent_index()) {
Expand Down Expand Up @@ -1096,7 +1096,7 @@ void TabletUpdates::_apply_normal_rowset_commit(const EditVersionInfo& version_i
std::lock_guard lg(_index_lock);
// 2. load index
auto index_entry = manager->index_cache().get_or_create(tablet_id);
index_entry->update_expire_time(MonotonicMillis() + manager->get_cache_expire_ms());
index_entry->update_expire_time(MonotonicMillis() + manager->get_index_cache_expire_ms(_tablet));
auto& index = index_entry->value();

auto failure_handler = [&](const std::string& msg, bool remove_update_state) {
Expand Down Expand Up @@ -1856,7 +1856,7 @@ void TabletUpdates::_apply_compaction_commit(const EditVersionInfo& version_info
// 1. load index
std::lock_guard lg(_index_lock);
auto index_entry = manager->index_cache().get_or_create(tablet_id);
index_entry->update_expire_time(MonotonicMillis() + manager->get_cache_expire_ms());
index_entry->update_expire_time(MonotonicMillis() + manager->get_index_cache_expire_ms(_tablet));
auto& index = index_entry->value();
auto st = index.load(&_tablet);
manager->index_cache().update_object_size(index_entry, index.memory_usage());
Expand Down Expand Up @@ -3173,7 +3173,7 @@ Status TabletUpdates::link_from(Tablet* base_tablet, int64_t request_version, Ch
}

auto index_entry = update_manager->index_cache().get_or_create(tablet_id);
index_entry->update_expire_time(MonotonicMillis() + update_manager->get_cache_expire_ms());
index_entry->update_expire_time(MonotonicMillis() + update_manager->get_index_cache_expire_ms(_tablet));
auto& index = index_entry->value();
index.unload();
update_manager->index_cache().release(index_entry);
Expand Down Expand Up @@ -3378,7 +3378,7 @@ Status TabletUpdates::convert_from(const std::shared_ptr<Tablet>& base_tablet, i

auto update_manager = StorageEngine::instance()->update_manager();
auto index_entry = update_manager->index_cache().get_or_create(tablet_id);
index_entry->update_expire_time(MonotonicMillis() + update_manager->get_cache_expire_ms());
index_entry->update_expire_time(MonotonicMillis() + update_manager->get_index_cache_expire_ms(_tablet));
auto& index = index_entry->value();
index.unload();
update_manager->index_cache().release(index_entry);
Expand Down Expand Up @@ -3681,7 +3681,7 @@ Status TabletUpdates::reorder_from(const std::shared_ptr<Tablet>& base_tablet, i

auto update_manager = StorageEngine::instance()->update_manager();
auto index_entry = update_manager->index_cache().get_or_create(tablet_id);
index_entry->update_expire_time(MonotonicMillis() + update_manager->get_cache_expire_ms());
index_entry->update_expire_time(MonotonicMillis() + update_manager->get_index_cache_expire_ms(_tablet));
auto& index = index_entry->value();
index.unload();
update_manager->index_cache().release(index_entry);
Expand Down Expand Up @@ -4105,7 +4105,7 @@ Status TabletUpdates::load_snapshot(const SnapshotMeta& snapshot_meta, bool rest
auto manager = StorageEngine::instance()->update_manager();
auto& index_cache = manager->index_cache();
auto index_entry = index_cache.get_or_create(tablet_id);
index_entry->update_expire_time(MonotonicMillis() + manager->get_cache_expire_ms());
index_entry->update_expire_time(MonotonicMillis() + manager->get_index_cache_expire_ms(_tablet));
index_entry->value().unload();
index_cache.release(index_entry);

Expand Down Expand Up @@ -4406,7 +4406,7 @@ Status TabletUpdates::get_rss_rowids_by_pk_unlock(Tablet* tablet, const Column&
}
auto manager = StorageEngine::instance()->update_manager();
auto index_entry = manager->index_cache().get_or_create(tablet->tablet_id());
index_entry->update_expire_time(MonotonicMillis() + manager->get_cache_expire_ms());
index_entry->update_expire_time(MonotonicMillis() + manager->get_index_cache_expire_ms(*tablet));
bool enable_persistent_index = tablet->get_enable_persistent_index();
auto& index = index_entry->value();
auto st = index.load(tablet);
Expand Down
10 changes: 9 additions & 1 deletion be/src/storage/update_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ Status UpdateManager::init() {
return Status::OK();
}

// Returns the primary index cache expire time for `tablet`, in milliseconds.
//
// A positive table-level `primary_index_cache_expire_sec` stored in the tablet meta takes
// precedence; otherwise (value <= 0) the manager-wide `_cache_expire_ms` is returned.
int64_t UpdateManager::get_index_cache_expire_ms(const Tablet& tablet) const {
    const int32_t tablet_index_cache_expire_sec = tablet.tablet_meta()->get_primary_index_cache_expire_sec();
    if (tablet_index_cache_expire_sec > 0) {
        // Widen to 64 bits *before* multiplying: an int32 * 1000 product overflows (undefined
        // behaviour) for any setting above 2'147'483 seconds (~24.8 days).
        return static_cast<int64_t>(tablet_index_cache_expire_sec) * 1000;
    }
    return _cache_expire_ms;
}

// Reads a delete vector from the meta store by forwarding to TabletMetaManager::get_del_vector,
// splitting `tsid` into its tablet id and segment id. The callee's Status is returned unchanged.
Status UpdateManager::get_del_vec_in_meta(KVStore* meta, const TabletSegmentId& tsid, int64_t version,
                                          DelVector* delvec, int64_t* latest_version) {
    const auto tablet_id = tsid.tablet_id;
    const auto segment_id = tsid.segment_id;
    return TabletMetaManager::get_del_vector(meta, tablet_id, segment_id, version, delvec, latest_version);
}
Expand Down Expand Up @@ -525,7 +533,7 @@ Status UpdateManager::on_rowset_finished(Tablet* tablet, Rowset* rowset) {
if (st.ok()) {
auto index_entry = _index_cache.get_or_create(tablet->tablet_id());
st = index_entry->value().load(tablet);
index_entry->update_expire_time(MonotonicMillis() + _cache_expire_ms);
index_entry->update_expire_time(MonotonicMillis() + get_index_cache_expire_ms(*tablet));
_index_cache.update_object_size(index_entry, index_entry->value().memory_usage());
if (st.ok()) {
_index_cache.release(index_entry);
Expand Down
2 changes: 2 additions & 0 deletions be/src/storage/update_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ class UpdateManager {

int64_t get_cache_expire_ms() const { return _cache_expire_ms; }

int64_t get_index_cache_expire_ms(const Tablet& tablet) const;

Status get_del_vec_in_meta(KVStore* meta, const TabletSegmentId& tsid, int64_t version, DelVector* delvec,
int64_t* latest_version);

Expand Down
2 changes: 1 addition & 1 deletion be/test/storage/persistent_index_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1354,7 +1354,7 @@ void build_persistent_index_from_tablet(size_t N) {

auto manager = StorageEngine::instance()->update_manager();
auto index_entry = manager->index_cache().get_or_create(tablet->tablet_id());
index_entry->update_expire_time(MonotonicMillis() + manager->get_cache_expire_ms());
index_entry->update_expire_time(MonotonicMillis() + manager->get_index_cache_expire_ms(*tablet));
auto& primary_index = index_entry->value();
st = primary_index.load(tablet.get());
if (!st.ok()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,8 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_TTL) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_SIZE) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_FOREIGN_KEY_CONSTRAINT) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_UNIQUE_CONSTRAINT));
properties.containsKey(PropertyAnalyzer.PROPERTIES_UNIQUE_CONSTRAINT) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_PRIMARY_INDEX_CACHE_EXPIRE_SEC));

olapTable = (OlapTable) db.getTable(tableName);
if (olapTable.isCloudNativeTable()) {
Expand All @@ -605,6 +606,9 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_BUCKET_SIZE)) {
schemaChangeHandler.updateTableMeta(db, tableName, properties,
TTabletMetaType.BUCKET_SIZE);
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_PRIMARY_INDEX_CACHE_EXPIRE_SEC)) {
schemaChangeHandler.updateTableMeta(db, tableName, properties,
TTabletMetaType.PRIMARY_INDEX_CACHE_EXPIRE_SEC);
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_ENABLE) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_TTL) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_SIZE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,9 @@ protected void runPendingJob() throws AlterCancelException {
CreateReplicaTask createReplicaTask = new CreateReplicaTask(backendId, dbId, tableId, partitionId,
shadowIdxId, shadowTabletId, shadowShortKeyColumnCount, 0, Partition.PARTITION_INIT_VERSION,
originKeysType, TStorageType.COLUMN, storageMedium, copiedShadowSchema, bfColumns, bfFpp,
countDownLatch, indexes, table.isInMemory(), table.enablePersistentIndex(),
TTabletType.TABLET_TYPE_LAKE, table.getCompressionType(), copiedSortKeyIdxes);
countDownLatch, indexes, table.isInMemory(), table.enablePersistentIndex(),
table.primaryIndexCacheExpireSec(), TTabletType.TABLET_TYPE_LAKE, table.getCompressionType(),
copiedSortKeyIdxes);

Long baseTabletId = partitionIndexTabletMap.row(partitionId).get(shadowIdxId).get(shadowTabletId);
assert baseTabletId != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ protected void runPendingJob() throws AlterCancelException {
tbl.getCopiedIndexes(),
tbl.isInMemory(),
tbl.enablePersistentIndex(),
tbl.primaryIndexCacheExpireSec(),
tabletType, tbl.getCompressionType(), index.getSortKeyIdxes());
createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), baseSchemaHash);
batchTask.addTask(createReplicaTask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1591,6 +1591,12 @@ public void updateTableMeta(Database db, String tableName, Map<String, String> p
if (bucketSize == olapTable.getAutomaticBucketSize()) {
return;
}
} else if (metaType == TTabletMetaType.PRIMARY_INDEX_CACHE_EXPIRE_SEC) {
int primaryIndexCacheExpireSec = Integer.parseInt(properties.get(
PropertyAnalyzer.PROPERTIES_PRIMARY_INDEX_CACHE_EXPIRE_SEC));
if (primaryIndexCacheExpireSec == olapTable.primaryIndexCacheExpireSec()) {
return;
}
} else {
LOG.warn("meta type: {} does not support", metaType);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ protected void runPendingJob() throws AlterCancelException {
copiedShadowSchema, bfColumns, bfFpp, countDownLatch, indexes,
tbl.isInMemory(),
tbl.enablePersistentIndex(),
tbl.primaryIndexCacheExpireSec(),
tbl.getPartitionInfo().getTabletType(partitionId),
tbl.getCompressionType(), copiedSortKeyIdxes);
createReplicaTask.setBaseTablet(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,7 @@ protected void createReplicas(OlapTable localTbl, Partition restorePart) {
localTbl.getCopiedIndexes(),
localTbl.isInMemory(),
localTbl.enablePersistentIndex(),
localTbl.primaryIndexCacheExpireSec(),
localTbl.getPartitionInfo().getTabletType(restorePart.getId()),
localTbl.getCompressionType(), indexMeta.getSortKeyIdxes());
task.setInRestoreMode(true);
Expand Down
Loading

0 comments on commit b795faf

Please sign in to comment.