From 5f4c5cbc73e5e0688e532cade46d3f47fb9ad9c9 Mon Sep 17 00:00:00 2001 From: xiangguangyxg <110401425+xiangguangyxg@users.noreply.github.com> Date: Fri, 7 Feb 2025 10:12:50 +0800 Subject: [PATCH] [Enhancement] Support replication from another cluster with compaction enabled in shared-data mode (#54787) Signed-off-by: xiangguangyxg --- be/src/agent/agent_task.cpp | 10 ++- .../storage/lake/replication_txn_manager.cpp | 24 ++++--- be/src/storage/lake/txn_log_applier.cpp | 65 ++++++++++++------- .../lake/replication_txn_manager_test.cpp | 9 +++ .../starrocks/replication/ReplicationJob.java | 43 +++++++----- .../ReplicationTxnCommitAttachment.java | 2 +- .../starrocks/task/RemoteSnapshotTask.java | 6 +- .../starrocks/task/ReplicateSnapshotTask.java | 10 ++- .../transaction/DatabaseTransactionMgr.java | 8 +-- .../transaction/LakeTableTxnLogApplier.java | 3 +- .../transaction/OlapTableTxnLogApplier.java | 12 ++-- .../replication/ReplicationJobTest.java | 26 ++++---- .../replication/ReplicationMgrTest.java | 3 +- gensrc/proto/types.proto | 1 + gensrc/thrift/AgentService.thrift | 2 + 15 files changed, 140 insertions(+), 84 deletions(-) diff --git a/be/src/agent/agent_task.cpp b/be/src/agent/agent_task.cpp index 628601d91283e1..b7c5ad9a224683 100644 --- a/be/src/agent/agent_task.cpp +++ b/be/src/agent/agent_task.cpp @@ -999,7 +999,10 @@ void run_remote_snapshot_task(const std::shared_ptrreplication_mem_tracker()); DeferOp op([prev_tracker] { tls_thread_status.set_mem_tracker(prev_tracker); }); - const TRemoteSnapshotRequest& remote_snapshot_req = agent_task_req->task_req; + TRemoteSnapshotRequest& remote_snapshot_req = agent_task_req->task_req; + if (remote_snapshot_req.data_version == 0) { + remote_snapshot_req.__set_data_version(remote_snapshot_req.visible_version); + } // Return result to fe TStatus task_status; @@ -1046,7 +1049,10 @@ void run_replicate_snapshot_task(const std::shared_ptrreplication_mem_tracker()); DeferOp op([prev_tracker] { tls_thread_status.set_mem_tracker(prev_tracker); }); - const TReplicateSnapshotRequest& replicate_snapshot_req = agent_task_req->task_req; + TReplicateSnapshotRequest& replicate_snapshot_req = agent_task_req->task_req; + if (replicate_snapshot_req.data_version == 0) { + replicate_snapshot_req.__set_data_version(replicate_snapshot_req.visible_version); + } TStatusCode::type status_code = TStatusCode::OK; std::vector error_msgs; diff --git a/be/src/storage/lake/replication_txn_manager.cpp b/be/src/storage/lake/replication_txn_manager.cpp index f485650126b4c6..61628a3d314ed3 100644 --- a/be/src/storage/lake/replication_txn_manager.cpp +++ b/be/src/storage/lake/replication_txn_manager.cpp @@ -70,32 +70,33 @@ Status ReplicationTxnManager::remote_snapshot(const TRemoteSnapshotRequest& requ LOG(INFO) << "Tablet " << request.tablet_id << " already made remote snapshot" << ", txn_id: " << request.transaction_id << ", tablet_id: " << request.tablet_id << ", src_tablet_id: " << request.src_tablet_id - << ", visible_version: " << request.visible_version + << ", visible_version: " << request.visible_version << ", data_version: " << request.data_version << ", snapshot_version: " << request.src_visible_version; return Status::OK(); } } std::vector missed_versions; - for (auto v = request.visible_version + 1; v <= request.src_visible_version; ++v) { + for (auto v = request.data_version + 1; v <= request.src_visible_version; ++v) { missed_versions.emplace_back(v, v); } if (UNLIKELY(missed_versions.empty())) { LOG(WARNING) << "Remote snapshot tablet skipped, no missing version" << ", txn_id: " << request.transaction_id << ", tablet_id: " << request.tablet_id << ", src_tablet_id: " << request.src_tablet_id << ", visible_version: " << request.visible_version + << ", data_version: " << request.data_version << ", snapshot_version: " << request.src_visible_version; return Status::Corruption("No missing version"); } LOG(INFO) << "Start make remote snapshot, txn_id: " << request.transaction_id << ", tablet_id: " << request.tablet_id << ", src_tablet_id: " << request.src_tablet_id - << ", visible_version: " << request.visible_version + << ", visible_version: " << request.visible_version << ", data_version: " << request.data_version << ", snapshot_version: " << request.src_visible_version << ", missed_versions: [" - << (request.visible_version + 1) << " ... " << request.src_visible_version << "]"; + << (request.data_version + 1) << " ... " << request.src_visible_version << "]"; Status status; - if (request.visible_version <= 1) { // Make full snapshot + if (request.data_version <= 1) { // Make full snapshot src_snapshot_info->incremental_snapshot = false; status = make_remote_snapshot(request, nullptr, nullptr, &src_snapshot_info->backend, &src_snapshot_info->snapshot_path); @@ -107,7 +108,7 @@ Status ReplicationTxnManager::remote_snapshot(const TRemoteSnapshotRequest& requ LOG(INFO) << "Failed to make incremental snapshot: " << status << ". switch to fully snapshot" << ", txn_id: " << request.transaction_id << ", tablet_id: " << request.tablet_id << ", src_tablet_id: " << request.src_tablet_id - << ", visible_version: " << request.visible_version + << ", visible_version: " << request.visible_version << ", data_version: " << request.data_version << ", snapshot_version: " << request.src_visible_version; src_snapshot_info->incremental_snapshot = false; status = make_remote_snapshot(request, nullptr, nullptr, &src_snapshot_info->backend, @@ -118,7 +119,7 @@ Status ReplicationTxnManager::remote_snapshot(const TRemoteSnapshotRequest& requ if (!status.ok()) { LOG(WARNING) << "Failed to make remote snapshot: " << status << ", txn_id: " << request.transaction_id << ", tablet_id: " << request.tablet_id << ", src_tablet_id: " << request.src_tablet_id - << ", visible_version: " << request.visible_version + << ", visible_version: " << request.visible_version << ", data_version: " << request.data_version << ", snapshot_version: " << request.src_visible_version; return status; } @@ -131,7 +132,7 @@ Status ReplicationTxnManager::remote_snapshot(const TRemoteSnapshotRequest& requ << src_snapshot_info->backend.be_port << ":" << src_snapshot_info->snapshot_path << ", txn_id: " << request.transaction_id << ", tablet_id: " << request.tablet_id << ", src_tablet_id: " << request.src_tablet_id << ", visible_version: " << request.visible_version - << ", snapshot_version: " << request.src_visible_version + << ", data_version: " << request.data_version << ", snapshot_version: " << request.src_visible_version << ", incremental_snapshot: " << src_snapshot_info->incremental_snapshot; auto txn_log = std::make_shared(); @@ -143,6 +144,7 @@ Status ReplicationTxnManager::remote_snapshot(const TRemoteSnapshotRequest& requ txn_meta->set_txn_state(ReplicationTxnStatePB::TXN_SNAPSHOTED); txn_meta->set_tablet_id(request.tablet_id); txn_meta->set_visible_version(request.visible_version); + txn_meta->set_data_version(request.data_version); txn_meta->set_src_backend_host(src_snapshot_info->backend.host); txn_meta->set_src_backend_port(src_snapshot_info->backend.be_port); txn_meta->set_src_snapshot_path(src_snapshot_info->snapshot_path); @@ -171,7 +173,7 @@ Status ReplicationTxnManager::replicate_snapshot(const TReplicateSnapshotRequest LOG(INFO) << "Tablet " << request.tablet_id << " already replicated remote snapshot" << ", txn_id: " << request.transaction_id << ", tablet_id: " << request.tablet_id << ", src_tablet_id: " << request.src_tablet_id - << ", visible_version: " << request.visible_version + << ", visible_version: " << request.visible_version << ", data_version: " << request.data_version << ", snapshot_version: " << request.src_visible_version; return Status::OK(); } @@ -188,6 +190,7 @@ Status ReplicationTxnManager::replicate_snapshot(const TReplicateSnapshotRequest << status << ", txn_id: " << request.transaction_id << ", tablet_id: " << request.tablet_id << ", src_tablet_id: " << request.src_tablet_id << ", visible_version: " << request.visible_version + << ", data_version: " << request.data_version << ", snapshot_version: " << request.src_visible_version; continue; } @@ -198,6 +201,7 @@ Status ReplicationTxnManager::replicate_snapshot(const TReplicateSnapshotRequest << ", keys_type: " << KeysType_Name(tablet_metadata->schema().keys_type()) << ", txn_id: " << request.transaction_id << ", tablet_id: " << request.tablet_id << ", src_tablet_id: " << request.src_tablet_id << ", visible_version: " << request.visible_version + << ", data_version: " << request.data_version << ", snapshot_version: " << request.src_visible_version; return status; @@ -344,6 +348,7 @@ Status ReplicationTxnManager::replicate_remote_snapshot(const TReplicateSnapshot LOG(WARNING) << "Transaction is aborted, txn_id: " << request.transaction_id << ", tablet_id: " << request.tablet_id << ", src_tablet_id: " << request.src_tablet_id << ", visible_version: " << request.visible_version + << ", data_version: " << request.data_version << ", snapshot_version: " << request.src_visible_version; return Status::InternalError("Transaction is aborted"); } @@ -387,6 +392,7 @@ Status ReplicationTxnManager::replicate_remote_snapshot(const TReplicateSnapshot txn_meta->set_txn_state(ReplicationTxnStatePB::TXN_REPLICATED); txn_meta->set_tablet_id(request.tablet_id); txn_meta->set_visible_version(request.visible_version); + txn_meta->set_data_version(request.data_version); txn_meta->set_src_backend_host(src_snapshot_info.backend.host); txn_meta->set_src_backend_port(src_snapshot_info.backend.be_port); txn_meta->set_src_snapshot_path(src_snapshot_info.snapshot_path); diff --git a/be/src/storage/lake/txn_log_applier.cpp b/be/src/storage/lake/txn_log_applier.cpp index 1502bf984f0f32..8901d84a7444b8 100644 --- a/be/src/storage/lake/txn_log_applier.cpp +++ b/be/src/storage/lake/txn_log_applier.cpp @@ -308,23 +308,29 @@ class PrimaryKeyTxnLogApplier : public TxnLogApplier { } Status apply_replication_log(const TxnLogPB_OpReplication& op_replication, int64_t txn_id) { - if (op_replication.txn_meta().txn_state() != ReplicationTxnStatePB::TXN_REPLICATED) { + const auto& txn_meta = op_replication.txn_meta(); + + if (txn_meta.txn_state() != ReplicationTxnStatePB::TXN_REPLICATED) { LOG(WARNING) << "Fail to apply replication log, invalid txn meta state: " - << ReplicationTxnStatePB_Name(op_replication.txn_meta().txn_state()); - return Status::Corruption("Invalid txn meta state: " + - ReplicationTxnStatePB_Name(op_replication.txn_meta().txn_state())); + << ReplicationTxnStatePB_Name(txn_meta.txn_state()); + return Status::Corruption("Invalid txn meta state: " + ReplicationTxnStatePB_Name(txn_meta.txn_state())); } - if (op_replication.txn_meta().snapshot_version() != _new_version) { - LOG(WARNING) << "Fail to apply replication log, mismatched snapshot version and new version" - << ", snapshot version: " << op_replication.txn_meta().snapshot_version() - << ", new version: " << _new_version; - return Status::Corruption("mismatched snapshot version and new version"); + + if (txn_meta.data_version() == 0) { + if (txn_meta.snapshot_version() != _new_version) { + LOG(WARNING) << "Fail to apply replication log, mismatched snapshot version and new version" + << ", snapshot version: " << txn_meta.snapshot_version() + << ", base version: " << txn_meta.visible_version() << ", new version: " << _new_version; + return Status::Corruption("mismatched snapshot version and new version"); + } + } else if (txn_meta.snapshot_version() - txn_meta.data_version() + txn_meta.visible_version() != _new_version) { + LOG(WARNING) << "Fail to apply replication log, mismatched version, snapshot version: " + << txn_meta.snapshot_version() << ", data version: " << txn_meta.data_version() + << ", old version: " << txn_meta.visible_version() << ", new version: " << _new_version; + return Status::Corruption("mismatched version"); } - if (op_replication.txn_meta().incremental_snapshot()) { - DCHECK(_new_version - _base_version == op_replication.op_writes_size()) - << ", base_version: " << _base_version << ", new_version: " << _new_version - << ", op_write_size: " << op_replication.op_writes_size(); + if (txn_meta.incremental_snapshot()) { for (const auto& op_write : op_replication.op_writes()) { RETURN_IF_ERROR(apply_write_log(op_write, txn_id)); } @@ -582,26 +588,35 @@ class NonPrimaryKeyTxnLogApplier : public TxnLogApplier { } Status apply_replication_log(const TxnLogPB_OpReplication& op_replication) { - if (op_replication.txn_meta().txn_state() != ReplicationTxnStatePB::TXN_REPLICATED) { + const auto& txn_meta = op_replication.txn_meta(); + + if (txn_meta.txn_state() != ReplicationTxnStatePB::TXN_REPLICATED) { LOG(WARNING) << "Fail to apply replication log, invalid txn meta state: " - << ReplicationTxnStatePB_Name(op_replication.txn_meta().txn_state()); - return Status::Corruption("Invalid txn meta state: " + - ReplicationTxnStatePB_Name(op_replication.txn_meta().txn_state())); + << ReplicationTxnStatePB_Name(txn_meta.txn_state()); + return Status::Corruption("Invalid txn meta state: " + ReplicationTxnStatePB_Name(txn_meta.txn_state())); } - if (op_replication.txn_meta().snapshot_version() != _new_version) { - LOG(WARNING) << "Fail to apply replication log, mismatched snapshot version and new version" - << ", snapshot version: " << op_replication.txn_meta().snapshot_version() - << ", new version: " << _new_version; - return Status::Corruption("mismatched snapshot version and new version"); + + if (txn_meta.data_version() == 0) { + if (txn_meta.snapshot_version() != _new_version) { + LOG(WARNING) << "Fail to apply replication log, mismatched snapshot version and new version" + << ", snapshot version: " << txn_meta.snapshot_version() + << ", base version: " << txn_meta.visible_version() << ", new version: " << _new_version; + return Status::Corruption("mismatched snapshot version and new version"); + } + } else if (txn_meta.snapshot_version() - txn_meta.data_version() + txn_meta.visible_version() != _new_version) { + LOG(WARNING) << "Fail to apply replication log, mismatched version, snapshot version: " + << txn_meta.snapshot_version() << ", data version: " << txn_meta.data_version() + << ", old version: " << txn_meta.visible_version() << ", new version: " << _new_version; + return Status::Corruption("mismatched version"); } - if (op_replication.txn_meta().incremental_snapshot()) { + if (txn_meta.incremental_snapshot()) { for (const auto& op_write : op_replication.op_writes()) { RETURN_IF_ERROR(apply_write_log(op_write)); } LOG(INFO) << "Apply incremental replication log finish. tablet_id: " << _tablet.id() << ", base_version: " << _metadata->version() << ", new_version: " << _new_version - << ", txn_id: " << op_replication.txn_meta().txn_id(); + << ", txn_id: " << txn_meta.txn_id(); } else { auto old_rowsets = std::move(*_metadata->mutable_rowsets()); _metadata->mutable_rowsets()->Clear(); @@ -615,7 +630,7 @@ class NonPrimaryKeyTxnLogApplier : public TxnLogApplier { LOG(INFO) << "Apply full replication log finish. tablet_id: " << _tablet.id() << ", base_version: " << _metadata->version() << ", new_version: " << _new_version - << ", txn_id: " << op_replication.txn_meta().txn_id(); + << ", txn_id: " << txn_meta.txn_id(); } if (op_replication.has_source_schema()) { diff --git a/be/test/storage/lake/replication_txn_manager_test.cpp b/be/test/storage/lake/replication_txn_manager_test.cpp index e46806051a275b..fcec0b534531d6 100644 --- a/be/test/storage/lake/replication_txn_manager_test.cpp +++ b/be/test/storage/lake/replication_txn_manager_test.cpp @@ -233,6 +233,7 @@ TEST_P(LakeReplicationTxnManagerTest, test_remote_snapshot_no_missing_versions) remote_snapshot_request.__set_tablet_type(TTabletType::TABLET_TYPE_LAKE); remote_snapshot_request.__set_schema_hash(_schema_hash); remote_snapshot_request.__set_visible_version(_version); + remote_snapshot_request.__set_data_version(_version); remote_snapshot_request.__set_src_token(ExecEnv::GetInstance()->token()); remote_snapshot_request.__set_src_tablet_id(_src_tablet_id); remote_snapshot_request.__set_src_tablet_type(TTabletType::TABLET_TYPE_DISK); @@ -254,6 +255,7 @@ TEST_P(LakeReplicationTxnManagerTest, test_remote_snapshot_no_versions) { remote_snapshot_request.__set_tablet_type(TTabletType::TABLET_TYPE_DISK); remote_snapshot_request.__set_schema_hash(_schema_hash); remote_snapshot_request.__set_visible_version(_version); + remote_snapshot_request.__set_data_version(_version); remote_snapshot_request.__set_src_token(ExecEnv::GetInstance()->token()); remote_snapshot_request.__set_src_tablet_id(_src_tablet_id); remote_snapshot_request.__set_src_tablet_type(TTabletType::TABLET_TYPE_DISK); @@ -275,6 +277,7 @@ TEST_P(LakeReplicationTxnManagerTest, test_replicate_snapshot_failed) { remote_snapshot_request.__set_tablet_type(TTabletType::TABLET_TYPE_LAKE); remote_snapshot_request.__set_schema_hash(_schema_hash); remote_snapshot_request.__set_visible_version(_version); + remote_snapshot_request.__set_data_version(_version); remote_snapshot_request.__set_src_token(ExecEnv::GetInstance()->token()); remote_snapshot_request.__set_src_tablet_id(_src_tablet_id); remote_snapshot_request.__set_src_tablet_type(TTabletType::TABLET_TYPE_DISK); @@ -297,6 +300,7 @@ TEST_P(LakeReplicationTxnManagerTest, test_replicate_snapshot_failed) { replicate_snapshot_request.__set_tablet_type(TTabletType::TABLET_TYPE_LAKE); replicate_snapshot_request.__set_schema_hash(_schema_hash); replicate_snapshot_request.__set_visible_version(_version); + replicate_snapshot_request.__set_data_version(_version); replicate_snapshot_request.__set_src_token(ExecEnv::GetInstance()->token()); replicate_snapshot_request.__set_src_tablet_id(_src_tablet_id); replicate_snapshot_request.__set_src_tablet_type(TTabletType::TABLET_TYPE_DISK); @@ -323,6 +327,7 @@ TEST_P(LakeReplicationTxnManagerTest, test_publish_failed) { remote_snapshot_request.__set_tablet_type(TTabletType::TABLET_TYPE_LAKE); remote_snapshot_request.__set_schema_hash(_schema_hash); remote_snapshot_request.__set_visible_version(_version); + remote_snapshot_request.__set_data_version(_version); remote_snapshot_request.__set_src_token(ExecEnv::GetInstance()->token()); remote_snapshot_request.__set_src_tablet_id(_src_tablet_id); remote_snapshot_request.__set_src_tablet_type(TTabletType::TABLET_TYPE_DISK); @@ -355,6 +360,7 @@ TEST_P(LakeReplicationTxnManagerTest, test_run_normal) { remote_snapshot_request.__set_tablet_type(TTabletType::TABLET_TYPE_LAKE); remote_snapshot_request.__set_schema_hash(_schema_hash); remote_snapshot_request.__set_visible_version(_version); + remote_snapshot_request.__set_data_version(_version); remote_snapshot_request.__set_src_token(ExecEnv::GetInstance()->token()); remote_snapshot_request.__set_src_tablet_id(_src_tablet_id); remote_snapshot_request.__set_src_tablet_type(TTabletType::TABLET_TYPE_DISK); @@ -374,6 +380,7 @@ TEST_P(LakeReplicationTxnManagerTest, test_run_normal) { replicate_snapshot_request.__set_tablet_type(TTabletType::TABLET_TYPE_LAKE); replicate_snapshot_request.__set_schema_hash(_schema_hash); replicate_snapshot_request.__set_visible_version(_version); + replicate_snapshot_request.__set_data_version(_version); replicate_snapshot_request.__set_src_token(ExecEnv::GetInstance()->token()); replicate_snapshot_request.__set_src_tablet_id(_src_tablet_id); replicate_snapshot_request.__set_src_tablet_type(TTabletType::TABLET_TYPE_DISK); @@ -421,6 +428,7 @@ TEST_P(LakeReplicationTxnManagerTest, test_run_normal_encrypted) { remote_snapshot_request.__set_tablet_type(TTabletType::TABLET_TYPE_LAKE); remote_snapshot_request.__set_schema_hash(_schema_hash); remote_snapshot_request.__set_visible_version(_version); + remote_snapshot_request.__set_data_version(_version); remote_snapshot_request.__set_src_token(ExecEnv::GetInstance()->token()); remote_snapshot_request.__set_src_tablet_id(_src_tablet_id); remote_snapshot_request.__set_src_tablet_type(TTabletType::TABLET_TYPE_DISK); @@ -440,6 +448,7 @@ TEST_P(LakeReplicationTxnManagerTest, test_run_normal_encrypted) { replicate_snapshot_request.__set_tablet_type(TTabletType::TABLET_TYPE_LAKE); replicate_snapshot_request.__set_schema_hash(_schema_hash); replicate_snapshot_request.__set_visible_version(_version); + replicate_snapshot_request.__set_data_version(_version); replicate_snapshot_request.__set_src_token(ExecEnv::GetInstance()->token()); replicate_snapshot_request.__set_src_tablet_id(_src_tablet_id); replicate_snapshot_request.__set_src_tablet_type(TTabletType::TABLET_TYPE_DISK); diff --git a/fe/fe-core/src/main/java/com/starrocks/replication/ReplicationJob.java b/fe/fe-core/src/main/java/com/starrocks/replication/ReplicationJob.java index f1ac77bffcb0ee..916e83ef2f7256 100644 --- a/fe/fe-core/src/main/java/com/starrocks/replication/ReplicationJob.java +++ b/fe/fe-core/src/main/java/com/starrocks/replication/ReplicationJob.java @@ -147,6 +147,9 @@ private static class PartitionInfo { @SerializedName(value = "version") private final long version; + @SerializedName(value = "dataVersion") + private final long dataVersion; + @SerializedName(value = "srcVersion") private final long srcVersion; @@ -156,10 +159,11 @@ private static class PartitionInfo { @SerializedName(value = "indexInfos") private final Map indexInfos; - public PartitionInfo(long partitionId, long version, long srcVersion, long srcVersionEpoch, - Map indexInfos) { + public PartitionInfo(long partitionId, long version, long dataVersion, + long srcVersion, long srcVersionEpoch, Map indexInfos) { this.partitionId = partitionId; this.version = version; + this.dataVersion = dataVersion; this.srcVersion = srcVersion; this.srcVersionEpoch = srcVersionEpoch; this.indexInfos = indexInfos; @@ -173,6 +177,10 @@ public long getVersion() { return version; } + public long getDataVersion() { + return dataVersion; + } + public long getSrcVersion() { return srcVersion; } @@ -653,13 +661,13 @@ private static TableInfo initTableInfo(TTableReplicationRequest request) throws throw new MetaNotFoundException("Partition " + tPartitionInfo.partition_id + " in table " + table.getName() + " in database " + db.getFullName() + " not found"); } - Preconditions.checkState(partition.getCommittedVersion() == partition.getVisibleVersion(), + Preconditions.checkState(partition.getCommittedDataVersion() == partition.getDataVersion(), "Partition " + tPartitionInfo.partition_id + " in table " + table.getName() + " in database " + db.getFullName() + " publish version not finished"); - Preconditions.checkState(partition.getVisibleVersion() <= tPartitionInfo.src_version, - "Target visible version: " + partition.getVisibleVersion() - + " is larger than source visible version: " + tPartitionInfo.src_version); - if (partition.getVisibleVersion() == tPartitionInfo.src_version) { + Preconditions.checkState(partition.getDataVersion() <= tPartitionInfo.src_version, + "Target data version: " + partition.getDataVersion() + + " is larger than source data version: " + tPartitionInfo.src_version); + if (partition.getDataVersion() == tPartitionInfo.src_version) { continue; } PartitionInfo partitionInfo = initPartitionInfo(olapTable, tPartitionInfo, partition); @@ -708,7 +716,7 @@ private static PartitionInfo initPartitionInfo(OlapTable olapTable, TPartitionRe IndexInfo indexInfo = initIndexInfo(olapTable, tIndexInfo, index); indexInfos.put(indexInfo.getIndexId(), indexInfo); } - return new PartitionInfo(tPartitionInfo.partition_id, partition.getVisibleVersion(), + return new PartitionInfo(tPartitionInfo.partition_id, partition.getVisibleVersion(), partition.getDataVersion(), tPartitionInfo.src_version, tPartitionInfo.src_version_epoch, indexInfos); } @@ -765,13 +773,13 @@ private static Map initPartitionInfos(OlapTable table, Olap Map partitionInfos = Maps.newHashMap(); for (PhysicalPartition physicalPartition : table.getPhysicalPartitions()) { PhysicalPartition srcPartition = srcTable.getPhysicalPartition(physicalPartition.getName()); - Preconditions.checkState(physicalPartition.getCommittedVersion() == physicalPartition.getVisibleVersion(), + Preconditions.checkState(physicalPartition.getCommittedDataVersion() == physicalPartition.getDataVersion(), "Partition " + physicalPartition.getName() + " in table " + table.getName() + " publish version not finished"); - Preconditions.checkState(physicalPartition.getVisibleVersion() <= srcPartition.getVisibleVersion(), - "Target visible version: " + physicalPartition.getVisibleVersion() - + " is larger than source visible version: " + srcPartition.getVisibleVersion()); - if (physicalPartition.getVisibleVersion() == srcPartition.getVisibleVersion()) { + Preconditions.checkState(physicalPartition.getDataVersion() <= srcPartition.getDataVersion(), + "Target data version: " + physicalPartition.getDataVersion() + + " is larger than source data version: " + srcPartition.getDataVersion()); + if (physicalPartition.getDataVersion() == srcPartition.getDataVersion()) { continue; } PartitionInfo partitionInfo = initPartitionInfo(table, srcTable, physicalPartition, srcPartition, @@ -792,8 +800,8 @@ private static PartitionInfo initPartitionInfo(OlapTable table, OlapTable srcTab IndexInfo indexInfo = initIndexInfo(table, srcTable, index, srcIndex, srcSystemInfoService); indexInfos.put(indexInfo.getIndexId(), indexInfo); } - return new PartitionInfo(partition.getId(), partition.getVisibleVersion(), srcPartition.getVisibleVersion(), - srcPartition.getVersionEpoch(), indexInfos); + return new PartitionInfo(partition.getId(), partition.getVisibleVersion(), partition.getDataVersion(), + srcPartition.getDataVersion(), srcPartition.getVersionEpoch(), indexInfos); } private static IndexInfo initIndexInfo(OlapTable table, OlapTable srcTable, MaterializedIndex index, @@ -934,7 +942,8 @@ private void sendRemoteSnapshotTasks() { tableId, partitionInfo.getPartitionId(), indexInfo.getIndexId(), tabletInfo.getTabletId(), getTabletType(tableType), transactionId, indexInfo.getSchemaHash(), partitionInfo.getVersion(), - srcToken, tabletInfo.getSrcTabletId(), getTabletType(srcTableType), + partitionInfo.getDataVersion(), srcToken, + tabletInfo.getSrcTabletId(), getTabletType(srcTableType), indexInfo.getSrcSchemaHash(), partitionInfo.getSrcVersion(), replicaInfo.getSrcBackends(), Config.replication_transaction_timeout_sec); @@ -975,7 +984,7 @@ private void sendReplicateSnapshotTasks() throws Exception { ReplicateSnapshotTask task = new ReplicateSnapshotTask(replicaInfo.getBackendId(), databaseId, tableId, partitionInfo.getPartitionId(), indexInfo.getIndexId(), tabletInfo.getTabletId(), getTabletType(tableType), transactionId, - indexInfo.getSchemaHash(), partitionInfo.getVersion(), + indexInfo.getSchemaHash(), partitionInfo.getVersion(), partitionInfo.getDataVersion(), srcToken, tabletInfo.getSrcTabletId(), getTabletType(srcTableType), indexInfo.getSrcSchemaHash(), partitionInfo.getSrcVersion(), flippedSrcSnapshotInfos, encryptionMeta); diff --git a/fe/fe-core/src/main/java/com/starrocks/replication/ReplicationTxnCommitAttachment.java b/fe/fe-core/src/main/java/com/starrocks/replication/ReplicationTxnCommitAttachment.java index c1543c10e96e26..a47f765b8b8e12 100644 --- a/fe/fe-core/src/main/java/com/starrocks/replication/ReplicationTxnCommitAttachment.java +++ b/fe/fe-core/src/main/java/com/starrocks/replication/ReplicationTxnCommitAttachment.java @@ -30,7 +30,7 @@ */ public class ReplicationTxnCommitAttachment extends TxnCommitAttachment { @SerializedName("partitionVersions") - private Map partitionVersions; // The version of partitions + private Map partitionVersions; // The data version of partitions, not the visible version @SerializedName("partitionVersionEpochs") private Map partitionVersionEpochs; // The version epoch of partitions diff --git a/fe/fe-core/src/main/java/com/starrocks/task/RemoteSnapshotTask.java b/fe/fe-core/src/main/java/com/starrocks/task/RemoteSnapshotTask.java index 360bc67ba9e8eb..72cb66c5b5fc7f 100644 --- a/fe/fe-core/src/main/java/com/starrocks/task/RemoteSnapshotTask.java +++ b/fe/fe-core/src/main/java/com/starrocks/task/RemoteSnapshotTask.java @@ -27,6 +27,7 @@ public class RemoteSnapshotTask extends AgentTask { private final TTabletType tabletType; private final int schemaHash; private final long visibleVersion; + private final long dataVersion; private final String srcToken; private final long srcTabletId; @@ -38,7 +39,7 @@ public class RemoteSnapshotTask extends AgentTask { private final int timeoutSec; public RemoteSnapshotTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId, - TTabletType tabletType, long transactionId, int schemaHash, long visibleVersion, + TTabletType tabletType, long transactionId, int schemaHash, long visibleVersion, long dataVersion, String srcToken, long srcTabletId, TTabletType srcTabletType, int srcSchemaHash, long srcVisibleVersion, List srcBackends, int timeoutSec) { super(null, backendId, TTaskType.REMOTE_SNAPSHOT, dbId, tableId, partitionId, indexId, tabletId, tabletId, @@ -47,6 +48,7 @@ public RemoteSnapshotTask(long backendId, long dbId, long tableId, long partitio this.tabletType = tabletType; this.schemaHash = schemaHash; this.visibleVersion = visibleVersion; + this.dataVersion = dataVersion; this.srcToken = srcToken; this.srcTabletId = srcTabletId; this.srcTabletType = srcTabletType; @@ -66,6 +68,7 @@ public TRemoteSnapshotRequest toThrift() { request.setTablet_type(tabletType); request.setSchema_hash(schemaHash); request.setVisible_version(visibleVersion); + request.setData_version(dataVersion); request.setSrc_token(srcToken); request.setSrc_tablet_id(srcTabletId); @@ -85,6 +88,7 @@ public String toString() { sb.append(", tablet id: ").append(tabletId).append(", tablet type: ").append(tabletType); sb.append(", schema hash: ").append(schemaHash); sb.append(", visible version: ").append(visibleVersion); + sb.append(", data version: ").append(dataVersion); sb.append(", src token: ").append(srcToken).append(", src tablet id: ").append(srcTabletId); sb.append(", src tablet type:").append(srcTabletType).append(", src schema hash: ").append(srcSchemaHash); sb.append(", src visible version: ").append(srcVisibleVersion); diff --git a/fe/fe-core/src/main/java/com/starrocks/task/ReplicateSnapshotTask.java b/fe/fe-core/src/main/java/com/starrocks/task/ReplicateSnapshotTask.java index ca4c26553a04a2..d77744c30c23a5 100644 --- a/fe/fe-core/src/main/java/com/starrocks/task/ReplicateSnapshotTask.java +++ b/fe/fe-core/src/main/java/com/starrocks/task/ReplicateSnapshotTask.java @@ -27,6 +27,7 @@ public class ReplicateSnapshotTask extends AgentTask { private final TTabletType tabletType; private final int schemaHash; private final long visibleVersion; + private final long dataVersion; private final String srcToken; private final long srcTabletId; @@ -37,15 +38,16 @@ public class ReplicateSnapshotTask extends AgentTask { private final byte[] encryptionMeta; public ReplicateSnapshotTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId, - TTabletType tabletType, long transactionId, int schemaHash, long visibleVersion, String srcToken, - long srcTabletId, TTabletType srcTabletType, int srcSchemaHash, long srcVisibleVersion, - List srcSnapshotInfos, byte[] encryptionMeta) { + TTabletType tabletType, long transactionId, int schemaHash, long visibleVersion, long dataVersion, + String srcToken, long srcTabletId, TTabletType srcTabletType, int srcSchemaHash, + long srcVisibleVersion, List srcSnapshotInfos, byte[] encryptionMeta) { super(null, backendId, TTaskType.REPLICATE_SNAPSHOT, dbId, tableId, partitionId, indexId, tabletId, tabletId, System.currentTimeMillis()); this.transactionId = transactionId; this.tabletType = tabletType; this.schemaHash = schemaHash; this.visibleVersion = visibleVersion; + this.dataVersion = dataVersion; this.srcToken = srcToken; this.srcTabletId = srcTabletId; this.srcTabletType = srcTabletType; @@ -65,6 +67,7 @@ public TReplicateSnapshotRequest toThrift() { request.setTablet_type(tabletType); request.setSchema_hash(schemaHash); request.setVisible_version(visibleVersion); + request.setData_version(dataVersion); request.setSrc_token(srcToken); request.setSrc_tablet_id(srcTabletId); @@ -84,6 +87,7 @@ public String toString() { sb.append(", tablet id: ").append(tabletId).append(", tablet type: ").append(tabletType); sb.append(", schema hash: ").append(schemaHash); sb.append(", visible version: ").append(visibleVersion); + sb.append(", data version: ").append(dataVersion); sb.append(", src token: ").append(srcToken).append(", src tablet id: ").append(srcTabletId); sb.append(", src tablet type:").append(srcTabletType).append(", src schema hash: ").append(srcSchemaHash); sb.append(", src visible version: ").append(srcVisibleVersion); diff --git a/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java index 16c1faf01d6cd3..a128d187684b51 100644 --- a/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java @@ -1347,10 +1347,10 @@ protected void unprotectedCommitPreparedTransaction(TransactionState transaction (ReplicationTxnCommitAttachment) transactionState .getTxnCommitAttachment(); Map partitionVersions = replicationTxnAttachment.getPartitionVersions(); - long newVersion = partitionVersions.get(partitionCommitInfo.getPhysicalPartitionId()); - long versionDiff = newVersion - partition.getVisibleVersion(); - partitionCommitInfo.setVersion(newVersion); - partitionCommitInfo.setDataVersion(partition.getDataVersion() + versionDiff); + long newDataVersion = partitionVersions.get(partitionCommitInfo.getPhysicalPartitionId()); + long dataVersionDiff = newDataVersion - partition.getDataVersion(); + partitionCommitInfo.setVersion(partition.getCommittedVersion() + dataVersionDiff); + partitionCommitInfo.setDataVersion(newDataVersion); Map partitionVersionEpochs = replicationTxnAttachment.getPartitionVersionEpochs(); if (partitionVersionEpochs != null) { long newVersionEpoch = partitionVersionEpochs.get(partitionCommitInfo.getPhysicalPartitionId()); diff --git a/fe/fe-core/src/main/java/com/starrocks/transaction/LakeTableTxnLogApplier.java b/fe/fe-core/src/main/java/com/starrocks/transaction/LakeTableTxnLogApplier.java index 180621d29bcba8..9af53319e83fbe 100644 --- a/fe/fe-core/src/main/java/com/starrocks/transaction/LakeTableTxnLogApplier.java +++ b/fe/fe-core/src/main/java/com/starrocks/transaction/LakeTableTxnLogApplier.java @@ -50,9 +50,8 @@ public void applyCommitLog(TransactionState txnState, TableCommitInfo commitInfo // The version of a replication transaction may not continuously if (txnState.getSourceType() == TransactionState.LoadJobSourceType.REPLICATION) { - long versionDiff = partitionCommitInfo.getVersion() - partition.getNextVersion(); partition.setNextVersion(partitionCommitInfo.getVersion() + 1); - partition.setNextDataVersion(partition.getNextDataVersion() + versionDiff + 1); + partition.setNextDataVersion(partitionCommitInfo.getDataVersion() + 1); } else { partition.setNextVersion(partition.getNextVersion() + 1); if (txnState.getSourceType() != TransactionState.LoadJobSourceType.LAKE_COMPACTION) { diff --git a/fe/fe-core/src/main/java/com/starrocks/transaction/OlapTableTxnLogApplier.java b/fe/fe-core/src/main/java/com/starrocks/transaction/OlapTableTxnLogApplier.java index 2d5c211c4e5525..0fbde9af8ebc13 100644 --- a/fe/fe-core/src/main/java/com/starrocks/transaction/OlapTableTxnLogApplier.java +++ b/fe/fe-core/src/main/java/com/starrocks/transaction/OlapTableTxnLogApplier.java @@ -65,23 +65,21 @@ public void applyCommitLog(TransactionState txnState, TableCommitInfo commitInfo } // The version of a replication transaction may not continuously if (txnState.getSourceType() == TransactionState.LoadJobSourceType.REPLICATION) { - long versionDiff = partitionCommitInfo.getVersion() - partition.getNextVersion(); partition.setNextVersion(partitionCommitInfo.getVersion() + 1); - partition.setNextDataVersion(partition.getNextDataVersion() + versionDiff + 1); } else if (txnState.isVersionOverwrite()) { // overwrite empty partition, it's next version will less than overwrite version // otherwise, it's next version will not change if (partitionCommitInfo.getVersion() + 1 > partition.getNextVersion()) { partition.setNextVersion(partitionCommitInfo.getVersion() + 1); - partition.setNextDataVersion(partition.getNextVersion()); } } else if (partitionCommitInfo.isDoubleWrite()) { partition.setNextVersion(partitionCommitInfo.getVersion() + 1); - partition.setNextDataVersion(partition.getNextVersion()); } else { partition.setNextVersion(partition.getNextVersion() + 1); - partition.setNextDataVersion(partition.getNextDataVersion() + 1); } + // data version == visible version in shared-nothing mode + partition.setNextDataVersion(partition.getNextVersion()); + LOG.debug("partition[{}] next version[{}]", partitionId, partition.getNextVersion()); } } @@ -180,7 +178,7 @@ public void applyVisibleLog(TransactionState txnState, TableCommitInfo commitInf if (txnState.isVersionOverwrite()) { if (partition.getVisibleVersion() < version) { partition.updateVisibleVersion(version, versionTime, txnState.getTransactionId()); - partition.setDataVersion(partitionCommitInfo.getDataVersion()); + partition.setDataVersion(version); // data version == visible version in shared-nothing mode if (partitionCommitInfo.getVersionEpoch() > 0) { partition.setVersionEpoch(partitionCommitInfo.getVersionEpoch()); } @@ -188,7 +186,7 @@ public void applyVisibleLog(TransactionState txnState, TableCommitInfo commitInf } } else { partition.updateVisibleVersion(version, versionTime, txnState.getTransactionId()); - partition.setDataVersion(partitionCommitInfo.getDataVersion()); + partition.setDataVersion(version); // data version == visible version in shared-nothing mode if (partitionCommitInfo.getVersionEpoch() > 0) { partition.setVersionEpoch(partitionCommitInfo.getVersionEpoch()); } diff --git a/fe/fe-core/src/test/java/com/starrocks/replication/ReplicationJobTest.java b/fe/fe-core/src/test/java/com/starrocks/replication/ReplicationJobTest.java index 1237da40506a5f..fd0e064cfa4c76 100644 --- a/fe/fe-core/src/test/java/com/starrocks/replication/ReplicationJobTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/replication/ReplicationJobTest.java @@ -23,6 +23,7 @@ import com.starrocks.common.jmockit.Deencapsulation; import com.starrocks.leader.LeaderImpl; import com.starrocks.server.GlobalStateMgr; +import com.starrocks.server.RunMode; import com.starrocks.sql.analyzer.AnalyzeTestUtil; import com.starrocks.sql.ast.CreateTableStmt; import com.starrocks.system.Backend; @@ -68,7 +69,7 @@ public class ReplicationJobTest { @BeforeClass public static void beforeClass() throws Exception { - UtFrameUtils.createMinStarRocksCluster(); + UtFrameUtils.createMinStarRocksCluster(RunMode.SHARED_NOTHING); AnalyzeTestUtil.init(); starRocksAssert = new StarRocksAssert(AnalyzeTestUtil.getConnectContext()); starRocksAssert.withDatabase("test").useDatabase("test"); @@ -76,13 +77,13 @@ public static void beforeClass() throws Exception { db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb("test"); String sql = "create table single_partition_duplicate_key (key1 int, key2 varchar(10))\n" + - "distributed by hash(key1) buckets 1\n" + - "properties('replication_num' = '1'); "; + "distributed by hash(key1) buckets 1\n" + + "properties('replication_num' = '1'); "; CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseStmtWithNewParser(sql, - AnalyzeTestUtil.getConnectContext()); + AnalyzeTestUtil.getConnectContext()); StarRocksAssert.utCreateTableWithRetry(createTableStmt); table = (OlapTable) GlobalStateMgr.getCurrentState().getLocalMetastore() - .getTable(db.getFullName(), "single_partition_duplicate_key"); + .getTable(db.getFullName(), "single_partition_duplicate_key"); srcTable = DeepCopy.copyWithGson(table, OlapTable.class); partition = table.getPartitions().iterator().next(); @@ -106,16 +107,16 @@ public void setUp() throws Exception { srcPartition.getDefaultPhysicalPartition().setNextDataVersion(99); job = new ReplicationJob(null, "test_token", db.getId(), table, srcTable, - GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo()); + GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo()); } @Test public void testJobId() { ReplicationJob jobWithoutId = new ReplicationJob(null, "test_token", db.getId(), table, srcTable, - GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo()); + GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo()); Assert.assertFalse(jobWithoutId.getJobId().isEmpty()); ReplicationJob jobWithId = new ReplicationJob("fake_id", "test_token", db.getId(), table, srcTable, - GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo()); + GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo()); Assert.assertEquals("fake_id", jobWithId.getJobId()); } @@ -141,7 +142,7 @@ public void testNormal() throws Exception { job.finishRemoteSnapshotTask((RemoteSnapshotTask) task, request); Deencapsulation.invoke(new LeaderImpl(), "finishRemoteSnapshotTask", - (RemoteSnapshotTask) task, request); + (RemoteSnapshotTask) task, request); ((RemoteSnapshotTask) task).toThrift(); task.toString(); } @@ -155,7 +156,7 @@ public void testNormal() throws Exception { job.finishReplicateSnapshotTask((ReplicateSnapshotTask) task, request); Deencapsulation.invoke(new LeaderImpl(), "finishReplicateSnapshotTask", - (ReplicateSnapshotTask) task, request); + (ReplicateSnapshotTask) task, request); ((ReplicateSnapshotTask) task).toThrift(); task.toString(); } @@ -165,8 +166,9 @@ public void testNormal() throws Exception { Assert.assertEquals(partition.getDefaultPhysicalPartition().getCommittedVersion(), srcPartition.getDefaultPhysicalPartition().getVisibleVersion()); + // data version == visible version in shared-nothing mode Assert.assertEquals(partition.getDefaultPhysicalPartition().getCommittedDataVersion(), - srcPartition.getDefaultPhysicalPartition().getDataVersion()); + srcPartition.getDefaultPhysicalPartition().getVisibleVersion()); } @Test @@ -349,7 +351,7 @@ public void testInitializedByThrift() { tabletInfo.replica_replication_infos = new ArrayList(); TReplicaReplicationInfo replicaInfo = new TReplicaReplicationInfo(); Backend backend = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getBackends().iterator() - .next(); + .next(); replicaInfo.src_backend = new TBackend(backend.getHost(), backend.getBePort(), backend.getHttpPort()); tabletInfo.replica_replication_infos.add(replicaInfo); } diff --git a/fe/fe-core/src/test/java/com/starrocks/replication/ReplicationMgrTest.java b/fe/fe-core/src/test/java/com/starrocks/replication/ReplicationMgrTest.java index 3b406bb61b7707..a2f012312e47c2 100644 --- a/fe/fe-core/src/test/java/com/starrocks/replication/ReplicationMgrTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/replication/ReplicationMgrTest.java @@ -25,6 +25,7 @@ import com.starrocks.common.proc.ReplicationsProcNode; import com.starrocks.leader.LeaderImpl; import com.starrocks.server.GlobalStateMgr; +import com.starrocks.server.RunMode; import com.starrocks.sql.analyzer.AnalyzeTestUtil; import com.starrocks.sql.ast.CreateTableStmt; import com.starrocks.system.Backend; @@ -71,7 +72,7 @@ public class ReplicationMgrTest { @BeforeClass public static void beforeClass() throws Exception { - UtFrameUtils.createMinStarRocksCluster(); + UtFrameUtils.createMinStarRocksCluster(RunMode.SHARED_DATA); AnalyzeTestUtil.init(); starRocksAssert = new StarRocksAssert(AnalyzeTestUtil.getConnectContext()); starRocksAssert.withDatabase("test").useDatabase("test"); diff --git a/gensrc/proto/types.proto b/gensrc/proto/types.proto index 0d3305ec993374..f074a966251121 100644 --- a/gensrc/proto/types.proto +++ b/gensrc/proto/types.proto @@ -134,6 +134,7 @@ enum IndexType { optional string src_snapshot_path = 7; optional int64 snapshot_version = 8; optional bool incremental_snapshot = 9; + optional int64 data_version = 10; } // Used to store additional information about a txn when it is finished/visible diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index 24d878a0289dd1..360ad1dd9c7a4c 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -392,6 +392,7 @@ struct TRemoteSnapshotRequest { 12: optional Types.TVersion src_visible_version 13: optional list src_backends 14: optional i32 timeout_sec + 15: optional Types.TVersion data_version } struct TReplicateSnapshotRequest { @@ -409,6 +410,7 @@ struct TRemoteSnapshotRequest { 12: optional Types.TVersion src_visible_version 13: optional list src_snapshot_infos 14: optional binary encryption_meta + 15: optional Types.TVersion data_version } enum TTabletMetaType {