Skip to content

Commit

Permalink
[Enhancement] Introduce dataVersion, versionEpoch and versionTxnType …
Browse files Browse the repository at this point in the history
…to partition (StarRocks#46507)

Signed-off-by: xiangguangyxg <xiangguangyxg@gmail.com>
  • Loading branch information
xiangguangyxg authored Jul 26, 2024
1 parent 9058b0a commit 656a47c
Show file tree
Hide file tree
Showing 33 changed files with 532 additions and 102 deletions.
53 changes: 36 additions & 17 deletions be/src/exec/schema_scanner/schema_partitions_meta_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ SchemaScanner::ColumnDesc SchemaPartitionsMetaScanner::_s_columns[] = {
{"VISIBLE_VERSION", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(int64_t), false},
{"VISIBLE_VERSION_TIME", TypeDescriptor::from_logical_type(TYPE_DATETIME), sizeof(DateTimeValue), true},
{"NEXT_VERSION", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(int64_t), false},
{"DATA_VERSION", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(int64_t), false},
{"VERSION_EPOCH", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(int64_t), false},
{"VERSION_TXN_TYPE", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), false},
{"PARTITION_KEY", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), false},
{"PARTITION_VALUE", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), false},
{"DISTRIBUTION_KEY", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), false},
Expand Down Expand Up @@ -106,7 +109,7 @@ Status SchemaPartitionsMetaScanner::fill_chunk(ChunkPtr* chunk) {
const TPartitionMetaInfo& info = _partitions_meta_response.partitions_meta_infos[_partitions_meta_index];
const auto& slot_id_to_index_map = (*chunk)->get_slot_id_to_index_map();
for (const auto& [slot_id, index] : slot_id_to_index_map) {
if (slot_id < 1 || slot_id > 25) {
if (slot_id < 1 || slot_id > _column_num) {
return Status::InternalError(fmt::format("invalid slot id:{}", slot_id));
}
ColumnPtr column = (*chunk)->get_column_by_slot_id(slot_id);
Expand Down Expand Up @@ -162,40 +165,56 @@ Status SchemaPartitionsMetaScanner::fill_chunk(ChunkPtr* chunk) {
break;
}
case 9: {
// DATA_VERSION
fill_column_with_slot<TYPE_BIGINT>(column.get(), (void*)&info.data_version);
break;
}
case 10: {
// VERSION_EPOCH
fill_column_with_slot<TYPE_BIGINT>(column.get(), (void*)&info.version_epoch);
break;
}
case 11: {
// VERSION_TXN_TYPE
Slice version_txn_type = Slice(to_string(info.version_txn_type));
fill_column_with_slot<TYPE_VARCHAR>(column.get(), (void*)&version_txn_type);
break;
}
case 12: {
// PARTITION_KEY
Slice partition_key = Slice(info.partition_key);
fill_column_with_slot<TYPE_VARCHAR>(column.get(), (void*)&partition_key);
break;
}
case 10: {
case 13: {
// PARTITION_VALUE
Slice partition_value = Slice(info.partition_value);
fill_column_with_slot<TYPE_VARCHAR>(column.get(), (void*)&partition_value);
break;
}
case 11: {
case 14: {
// DISTRIBUTION_KEY
Slice distribution_key = Slice(info.distribution_key);
fill_column_with_slot<TYPE_VARCHAR>(column.get(), (void*)&distribution_key);
break;
}
case 12: {
case 15: {
// BUCKETS
fill_column_with_slot<TYPE_INT>(column.get(), (void*)&info.buckets);
break;
}
case 13: {
case 16: {
// REPLICATION_NUM
fill_column_with_slot<TYPE_INT>(column.get(), (void*)&info.replication_num);
break;
}
case 14: {
case 17: {
// STORAGE_MEDIUM
Slice storage_medium = Slice(info.storage_medium);
fill_column_with_slot<TYPE_VARCHAR>(column.get(), (void*)&storage_medium);
break;
}
case 15: {
case 18: {
// COOLDOWN_TIME
if (info.cooldown_time > 0) {
DateTimeValue ts;
Expand All @@ -206,7 +225,7 @@ Status SchemaPartitionsMetaScanner::fill_chunk(ChunkPtr* chunk) {
}
break;
}
case 16: {
case 19: {
// LAST_CONSISTENCY_CHECK_TIME
if (info.last_consistency_check_time > 0) {
DateTimeValue ts;
Expand All @@ -217,48 +236,48 @@ Status SchemaPartitionsMetaScanner::fill_chunk(ChunkPtr* chunk) {
}
break;
}
case 17: {
case 20: {
// IS_IN_MEMORY
fill_column_with_slot<TYPE_BOOLEAN>(column.get(), (void*)&info.is_in_memory);
break;
}
case 18: {
case 21: {
// IS_TEMP
fill_column_with_slot<TYPE_BOOLEAN>(column.get(), (void*)&info.is_temp);
break;
}
case 19: {
case 22: {
// DATA_SIZE
Slice data_size = Slice(info.data_size);
fill_column_with_slot<TYPE_VARCHAR>(column.get(), (void*)&data_size);
break;
}
case 20: {
case 23: {
// ROW_COUNT
fill_column_with_slot<TYPE_BIGINT>(column.get(), (void*)&info.row_count);
break;
}
case 21: {
case 24: {
// ENABLE_DATACACHE
fill_column_with_slot<TYPE_BOOLEAN>(column.get(), (void*)&info.enable_datacache);
break;
}
case 22: {
case 25: {
// AVG_CS
fill_column_with_slot<TYPE_DOUBLE>(column.get(), (void*)&info.avg_cs);
break;
}
case 23: {
case 26: {
// P50_CS
fill_column_with_slot<TYPE_DOUBLE>(column.get(), (void*)&info.p50_cs);
break;
}
case 24: {
case 27: {
// MAX_CS
fill_column_with_slot<TYPE_DOUBLE>(column.get(), (void*)&info.max_cs);
break;
}
case 25: {
case 28: {
// STORAGE_PATH
Slice storage_path = Slice(info.storage_path);
fill_column_with_slot<TYPE_VARCHAR>(column.get(), (void*)&storage_path);
Expand Down
99 changes: 93 additions & 6 deletions fe/fe-core/src/main/java/com/starrocks/catalog/Partition.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import com.starrocks.catalog.MaterializedIndex.IndexState;
import com.starrocks.common.FeConstants;
import com.starrocks.persist.gson.GsonPostProcessable;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.transaction.TransactionType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -119,13 +121,31 @@ public enum PartitionState {
private volatile long visibleVersion;
@SerializedName(value = "visibleVersionTime")
private volatile long visibleVersionTime;
@SerializedName(value = "nextVersion")
private volatile long nextVersion;

/*
* in shared-nothing mode, data version is always equals to visible version
* in shared-data mode, compactions increase visible version but not data version
*/
@SerializedName(value = "dataVersion")
private volatile long dataVersion;
@SerializedName(value = "nextDataVersion")
private volatile long nextDataVersion;

/*
* if the visible version and version epoch are unchanged, the data is unchanged
*/
@SerializedName(value = "versionEpoch")
private volatile long versionEpoch;
@SerializedName(value = "versionTxnType")
private volatile TransactionType versionTxnType;

/**
* ID of the transaction that has committed current visible version.
* Just for tracing the txn log, no need to persist.
*/
private volatile long visibleTxnId = -1;
@SerializedName(value = "nextVersion")
private volatile long nextVersion;

private volatile long lastVacuumTime = 0;

Expand All @@ -146,7 +166,11 @@ public Partition(long id, String name,
this.visibleVersion = PARTITION_INIT_VERSION;
this.visibleVersionTime = System.currentTimeMillis();
// PARTITION_INIT_VERSION == 1, so the first load version is 2 !!!
this.nextVersion = PARTITION_INIT_VERSION + 1;
this.nextVersion = this.visibleVersion + 1;
this.dataVersion = this.visibleVersion;
this.nextDataVersion = this.nextVersion;
this.versionEpoch = this.nextVersionEpoch();
this.versionTxnType = TransactionType.TXN_NORMAL;

this.distributionInfo = distributionInfo;
}
Expand All @@ -169,6 +193,10 @@ public Partition shallowCopy() {
partition.visibleVersion = this.visibleVersion;
partition.visibleVersionTime = this.visibleVersionTime;
partition.nextVersion = this.nextVersion;
partition.dataVersion = this.dataVersion;
partition.nextDataVersion = this.nextDataVersion;
partition.versionEpoch = this.versionEpoch;
partition.versionTxnType = this.versionTxnType;
partition.distributionInfo = this.distributionInfo;
partition.shardGroupId = this.shardGroupId;
partition.idToSubPartition = Maps.newHashMap(this.idToSubPartition);
Expand Down Expand Up @@ -353,6 +381,46 @@ public long getCommittedVersion() {
return this.nextVersion - 1;
}

public long getDataVersion() {
return dataVersion;
}

public void setDataVersion(long dataVersion) {
this.dataVersion = dataVersion;
}

public long getNextDataVersion() {
return nextDataVersion;
}

public void setNextDataVersion(long nextDataVersion) {
this.nextDataVersion = nextDataVersion;
}

public long getCommittedDataVersion() {
return this.nextDataVersion - 1;
}

public long getVersionEpoch() {
return versionEpoch;
}

public void setVersionEpoch(long versionEpoch) {
this.versionEpoch = versionEpoch;
}

public long nextVersionEpoch() {
return GlobalStateMgr.getCurrentState().getGtidGenerator().nextGtid();
}

public TransactionType getVersionTxnType() {
return versionTxnType;
}

public void setVersionTxnType(TransactionType versionTxnType) {
this.versionTxnType = versionTxnType;
}

public MaterializedIndex getIndex(long indexId) {
if (baseIndex.getId() == indexId) {
return baseIndex;
Expand Down Expand Up @@ -527,10 +595,16 @@ public String toString() {
}
}

buffer.append("committedVersion: ").append(visibleVersion).append("; ");
buffer.append("committedVersionHash: ").append(0).append("; ");
buffer.append("visibleVersion: ").append(visibleVersion).append("; ");
buffer.append("committedVersion: ").append(getCommittedVersion()).append("; ");
buffer.append("nextVersion: ").append(nextVersion).append("; ");

buffer.append("dataVersion: ").append(dataVersion).append("; ");
buffer.append("committedDataVersion: ").append(getCommittedDataVersion()).append("; ");

buffer.append("versionEpoch: ").append(versionEpoch).append("; ");
buffer.append("versionTxnType: ").append(versionTxnType).append("; ");

buffer.append("distribution_info.type: ").append(distributionInfo.getType().name()).append("; ");
buffer.append("distribution_info: ").append(distributionInfo.toString());

Expand All @@ -556,9 +630,22 @@ public void setMinRetainVersion(long minRetainVersion) {
public String generatePhysicalPartitionName(long physicalParitionId) {
return this.name + '_' + physicalParitionId;
}

@Override
public void gsonPostProcess() throws IOException {
if (dataVersion == 0) {
dataVersion = visibleVersion;
}
if (nextDataVersion == 0) {
nextDataVersion = nextVersion;
}
if (versionEpoch == 0) {
versionEpoch = nextVersionEpoch();
}
if (versionTxnType == null) {
versionTxnType = TransactionType.TXN_NORMAL;
}

for (PhysicalPartitionImpl subPartition : idToSubPartition.values()) {
if (subPartition.getName() == null) {
subPartition.setName(generatePhysicalPartitionName(subPartition.getId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.starrocks.catalog;

import com.starrocks.catalog.MaterializedIndex.IndexExtState;
import com.starrocks.transaction.TransactionType;

import java.util.List;

Expand Down Expand Up @@ -55,6 +56,16 @@ public interface PhysicalPartition {
public long getNextVersion();
public void setNextVersion(long nextVersion);
public long getCommittedVersion();
public long getDataVersion();
public void setDataVersion(long dataVersion);
public long getNextDataVersion();
public void setNextDataVersion(long nextDataVersion);
public long getCommittedDataVersion();
public long getVersionEpoch();
public void setVersionEpoch(long versionEpoch);
public long nextVersionEpoch();
public TransactionType getVersionTxnType();
public void setVersionTxnType(TransactionType versionTxnType);
public long getVisibleTxnId();

// materialized index interface
Expand Down
Loading

0 comments on commit 656a47c

Please sign in to comment.