Skip to content

Commit

Permalink
[Enhancement] Support replication from another cluster with compactio…
Browse files Browse the repository at this point in the history
…n enabled in shared-data mode (StarRocks#54787)

Signed-off-by: xiangguangyxg <xiangguangyxg@gmail.com>
  • Loading branch information
xiangguangyxg authored Feb 7, 2025
1 parent 6085037 commit 5f4c5cb
Show file tree
Hide file tree
Showing 15 changed files with 140 additions and 84 deletions.
10 changes: 8 additions & 2 deletions be/src/agent/agent_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,10 @@ void run_remote_snapshot_task(const std::shared_ptr<RemoteSnapshotAgentTaskReque
MemTracker* prev_tracker = tls_thread_status.set_mem_tracker(GlobalEnv::GetInstance()->replication_mem_tracker());
DeferOp op([prev_tracker] { tls_thread_status.set_mem_tracker(prev_tracker); });

const TRemoteSnapshotRequest& remote_snapshot_req = agent_task_req->task_req;
TRemoteSnapshotRequest& remote_snapshot_req = agent_task_req->task_req;
if (remote_snapshot_req.data_version == 0) {
remote_snapshot_req.__set_data_version(remote_snapshot_req.visible_version);
}

// Return result to fe
TStatus task_status;
Expand Down Expand Up @@ -1046,7 +1049,10 @@ void run_replicate_snapshot_task(const std::shared_ptr<ReplicateSnapshotAgentTas
MemTracker* prev_tracker = tls_thread_status.set_mem_tracker(GlobalEnv::GetInstance()->replication_mem_tracker());
DeferOp op([prev_tracker] { tls_thread_status.set_mem_tracker(prev_tracker); });

const TReplicateSnapshotRequest& replicate_snapshot_req = agent_task_req->task_req;
TReplicateSnapshotRequest& replicate_snapshot_req = agent_task_req->task_req;
if (replicate_snapshot_req.data_version == 0) {
replicate_snapshot_req.__set_data_version(replicate_snapshot_req.visible_version);
}

TStatusCode::type status_code = TStatusCode::OK;
std::vector<std::string> error_msgs;
Expand Down
24 changes: 15 additions & 9 deletions be/src/storage/lake/replication_txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,32 +70,33 @@ Status ReplicationTxnManager::remote_snapshot(const TRemoteSnapshotRequest& requ
LOG(INFO) << "Tablet " << request.tablet_id << " already made remote snapshot"
<< ", txn_id: " << request.transaction_id << ", tablet_id: " << request.tablet_id
<< ", src_tablet_id: " << request.src_tablet_id
<< ", visible_version: " << request.visible_version
<< ", visible_version: " << request.visible_version << ", data_version: " << request.data_version
<< ", snapshot_version: " << request.src_visible_version;
return Status::OK();
}
}

std::vector<Version> missed_versions;
for (auto v = request.visible_version + 1; v <= request.src_visible_version; ++v) {
for (auto v = request.data_version + 1; v <= request.src_visible_version; ++v) {
missed_versions.emplace_back(v, v);
}
if (UNLIKELY(missed_versions.empty())) {
LOG(WARNING) << "Remote snapshot tablet skipped, no missing version"
<< ", txn_id: " << request.transaction_id << ", tablet_id: " << request.tablet_id
<< ", src_tablet_id: " << request.src_tablet_id << ", visible_version: " << request.visible_version
<< ", data_version: " << request.data_version
<< ", snapshot_version: " << request.src_visible_version;
return Status::Corruption("No missing version");
}

LOG(INFO) << "Start make remote snapshot, txn_id: " << request.transaction_id
<< ", tablet_id: " << request.tablet_id << ", src_tablet_id: " << request.src_tablet_id
<< ", visible_version: " << request.visible_version
<< ", visible_version: " << request.visible_version << ", data_version: " << request.data_version
<< ", snapshot_version: " << request.src_visible_version << ", missed_versions: ["
<< (request.visible_version + 1) << " ... " << request.src_visible_version << "]";
<< (request.data_version + 1) << " ... " << request.src_visible_version << "]";

Status status;
if (request.visible_version <= 1) { // Make full snapshot
if (request.data_version <= 1) { // Make full snapshot
src_snapshot_info->incremental_snapshot = false;
status = make_remote_snapshot(request, nullptr, nullptr, &src_snapshot_info->backend,
&src_snapshot_info->snapshot_path);
Expand All @@ -107,7 +108,7 @@ Status ReplicationTxnManager::remote_snapshot(const TRemoteSnapshotRequest& requ
LOG(INFO) << "Failed to make incremental snapshot: " << status << ". switch to fully snapshot"
<< ", txn_id: " << request.transaction_id << ", tablet_id: " << request.tablet_id
<< ", src_tablet_id: " << request.src_tablet_id
<< ", visible_version: " << request.visible_version
<< ", visible_version: " << request.visible_version << ", data_version: " << request.data_version
<< ", snapshot_version: " << request.src_visible_version;
src_snapshot_info->incremental_snapshot = false;
status = make_remote_snapshot(request, nullptr, nullptr, &src_snapshot_info->backend,
Expand All @@ -118,7 +119,7 @@ Status ReplicationTxnManager::remote_snapshot(const TRemoteSnapshotRequest& requ
if (!status.ok()) {
LOG(WARNING) << "Failed to make remote snapshot: " << status << ", txn_id: " << request.transaction_id
<< ", tablet_id: " << request.tablet_id << ", src_tablet_id: " << request.src_tablet_id
<< ", visible_version: " << request.visible_version
<< ", visible_version: " << request.visible_version << ", data_version: " << request.data_version
<< ", snapshot_version: " << request.src_visible_version;
return status;
}
Expand All @@ -131,7 +132,7 @@ Status ReplicationTxnManager::remote_snapshot(const TRemoteSnapshotRequest& requ
<< src_snapshot_info->backend.be_port << ":" << src_snapshot_info->snapshot_path
<< ", txn_id: " << request.transaction_id << ", tablet_id: " << request.tablet_id
<< ", src_tablet_id: " << request.src_tablet_id << ", visible_version: " << request.visible_version
<< ", snapshot_version: " << request.src_visible_version
<< ", data_version: " << request.data_version << ", snapshot_version: " << request.src_visible_version
<< ", incremental_snapshot: " << src_snapshot_info->incremental_snapshot;

auto txn_log = std::make_shared<TxnLog>();
Expand All @@ -143,6 +144,7 @@ Status ReplicationTxnManager::remote_snapshot(const TRemoteSnapshotRequest& requ
txn_meta->set_txn_state(ReplicationTxnStatePB::TXN_SNAPSHOTED);
txn_meta->set_tablet_id(request.tablet_id);
txn_meta->set_visible_version(request.visible_version);
txn_meta->set_data_version(request.data_version);
txn_meta->set_src_backend_host(src_snapshot_info->backend.host);
txn_meta->set_src_backend_port(src_snapshot_info->backend.be_port);
txn_meta->set_src_snapshot_path(src_snapshot_info->snapshot_path);
Expand Down Expand Up @@ -171,7 +173,7 @@ Status ReplicationTxnManager::replicate_snapshot(const TReplicateSnapshotRequest
LOG(INFO) << "Tablet " << request.tablet_id << " already replicated remote snapshot"
<< ", txn_id: " << request.transaction_id << ", tablet_id: " << request.tablet_id
<< ", src_tablet_id: " << request.src_tablet_id
<< ", visible_version: " << request.visible_version
<< ", visible_version: " << request.visible_version << ", data_version: " << request.data_version
<< ", snapshot_version: " << request.src_visible_version;
return Status::OK();
}
Expand All @@ -188,6 +190,7 @@ Status ReplicationTxnManager::replicate_snapshot(const TReplicateSnapshotRequest
<< status << ", txn_id: " << request.transaction_id << ", tablet_id: " << request.tablet_id
<< ", src_tablet_id: " << request.src_tablet_id
<< ", visible_version: " << request.visible_version
<< ", data_version: " << request.data_version
<< ", snapshot_version: " << request.src_visible_version;
continue;
}
Expand All @@ -198,6 +201,7 @@ Status ReplicationTxnManager::replicate_snapshot(const TReplicateSnapshotRequest
<< ", keys_type: " << KeysType_Name(tablet_metadata->schema().keys_type())
<< ", txn_id: " << request.transaction_id << ", tablet_id: " << request.tablet_id
<< ", src_tablet_id: " << request.src_tablet_id << ", visible_version: " << request.visible_version
<< ", data_version: " << request.data_version
<< ", snapshot_version: " << request.src_visible_version;

return status;
Expand Down Expand Up @@ -344,6 +348,7 @@ Status ReplicationTxnManager::replicate_remote_snapshot(const TReplicateSnapshot
LOG(WARNING) << "Transaction is aborted, txn_id: " << request.transaction_id
<< ", tablet_id: " << request.tablet_id << ", src_tablet_id: " << request.src_tablet_id
<< ", visible_version: " << request.visible_version
<< ", data_version: " << request.data_version
<< ", snapshot_version: " << request.src_visible_version;
return Status::InternalError("Transaction is aborted");
}
Expand Down Expand Up @@ -387,6 +392,7 @@ Status ReplicationTxnManager::replicate_remote_snapshot(const TReplicateSnapshot
txn_meta->set_txn_state(ReplicationTxnStatePB::TXN_REPLICATED);
txn_meta->set_tablet_id(request.tablet_id);
txn_meta->set_visible_version(request.visible_version);
txn_meta->set_data_version(request.data_version);
txn_meta->set_src_backend_host(src_snapshot_info.backend.host);
txn_meta->set_src_backend_port(src_snapshot_info.backend.be_port);
txn_meta->set_src_snapshot_path(src_snapshot_info.snapshot_path);
Expand Down
65 changes: 40 additions & 25 deletions be/src/storage/lake/txn_log_applier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,23 +308,29 @@ class PrimaryKeyTxnLogApplier : public TxnLogApplier {
}

Status apply_replication_log(const TxnLogPB_OpReplication& op_replication, int64_t txn_id) {
if (op_replication.txn_meta().txn_state() != ReplicationTxnStatePB::TXN_REPLICATED) {
const auto& txn_meta = op_replication.txn_meta();

if (txn_meta.txn_state() != ReplicationTxnStatePB::TXN_REPLICATED) {
LOG(WARNING) << "Fail to apply replication log, invalid txn meta state: "
<< ReplicationTxnStatePB_Name(op_replication.txn_meta().txn_state());
return Status::Corruption("Invalid txn meta state: " +
ReplicationTxnStatePB_Name(op_replication.txn_meta().txn_state()));
<< ReplicationTxnStatePB_Name(txn_meta.txn_state());
return Status::Corruption("Invalid txn meta state: " + ReplicationTxnStatePB_Name(txn_meta.txn_state()));
}
if (op_replication.txn_meta().snapshot_version() != _new_version) {
LOG(WARNING) << "Fail to apply replication log, mismatched snapshot version and new version"
<< ", snapshot version: " << op_replication.txn_meta().snapshot_version()
<< ", new version: " << _new_version;
return Status::Corruption("mismatched snapshot version and new version");

if (txn_meta.data_version() == 0) {
if (txn_meta.snapshot_version() != _new_version) {
LOG(WARNING) << "Fail to apply replication log, mismatched snapshot version and new version"
<< ", snapshot version: " << txn_meta.snapshot_version()
<< ", base version: " << txn_meta.visible_version() << ", new version: " << _new_version;
return Status::Corruption("mismatched snapshot version and new version");
}
} else if (txn_meta.snapshot_version() - txn_meta.data_version() + txn_meta.visible_version() != _new_version) {
LOG(WARNING) << "Fail to apply replication log, mismatched version, snapshot version: "
<< txn_meta.snapshot_version() << ", data version: " << txn_meta.data_version()
<< ", old version: " << txn_meta.visible_version() << ", new version: " << _new_version;
return Status::Corruption("mismatched version");
}

if (op_replication.txn_meta().incremental_snapshot()) {
DCHECK(_new_version - _base_version == op_replication.op_writes_size())
<< ", base_version: " << _base_version << ", new_version: " << _new_version
<< ", op_write_size: " << op_replication.op_writes_size();
if (txn_meta.incremental_snapshot()) {
for (const auto& op_write : op_replication.op_writes()) {
RETURN_IF_ERROR(apply_write_log(op_write, txn_id));
}
Expand Down Expand Up @@ -582,26 +588,35 @@ class NonPrimaryKeyTxnLogApplier : public TxnLogApplier {
}

Status apply_replication_log(const TxnLogPB_OpReplication& op_replication) {
if (op_replication.txn_meta().txn_state() != ReplicationTxnStatePB::TXN_REPLICATED) {
const auto& txn_meta = op_replication.txn_meta();

if (txn_meta.txn_state() != ReplicationTxnStatePB::TXN_REPLICATED) {
LOG(WARNING) << "Fail to apply replication log, invalid txn meta state: "
<< ReplicationTxnStatePB_Name(op_replication.txn_meta().txn_state());
return Status::Corruption("Invalid txn meta state: " +
ReplicationTxnStatePB_Name(op_replication.txn_meta().txn_state()));
<< ReplicationTxnStatePB_Name(txn_meta.txn_state());
return Status::Corruption("Invalid txn meta state: " + ReplicationTxnStatePB_Name(txn_meta.txn_state()));
}
if (op_replication.txn_meta().snapshot_version() != _new_version) {
LOG(WARNING) << "Fail to apply replication log, mismatched snapshot version and new version"
<< ", snapshot version: " << op_replication.txn_meta().snapshot_version()
<< ", new version: " << _new_version;
return Status::Corruption("mismatched snapshot version and new version");

if (txn_meta.data_version() == 0) {
if (txn_meta.snapshot_version() != _new_version) {
LOG(WARNING) << "Fail to apply replication log, mismatched snapshot version and new version"
<< ", snapshot version: " << txn_meta.snapshot_version()
<< ", base version: " << txn_meta.visible_version() << ", new version: " << _new_version;
return Status::Corruption("mismatched snapshot version and new version");
}
} else if (txn_meta.snapshot_version() - txn_meta.data_version() + txn_meta.visible_version() != _new_version) {
LOG(WARNING) << "Fail to apply replication log, mismatched version, snapshot version: "
<< txn_meta.snapshot_version() << ", data version: " << txn_meta.data_version()
<< ", old version: " << txn_meta.visible_version() << ", new version: " << _new_version;
return Status::Corruption("mismatched version");
}

if (op_replication.txn_meta().incremental_snapshot()) {
if (txn_meta.incremental_snapshot()) {
for (const auto& op_write : op_replication.op_writes()) {
RETURN_IF_ERROR(apply_write_log(op_write));
}
LOG(INFO) << "Apply incremental replication log finish. tablet_id: " << _tablet.id()
<< ", base_version: " << _metadata->version() << ", new_version: " << _new_version
<< ", txn_id: " << op_replication.txn_meta().txn_id();
<< ", txn_id: " << txn_meta.txn_id();
} else {
auto old_rowsets = std::move(*_metadata->mutable_rowsets());
_metadata->mutable_rowsets()->Clear();
Expand All @@ -615,7 +630,7 @@ class NonPrimaryKeyTxnLogApplier : public TxnLogApplier {

LOG(INFO) << "Apply full replication log finish. tablet_id: " << _tablet.id()
<< ", base_version: " << _metadata->version() << ", new_version: " << _new_version
<< ", txn_id: " << op_replication.txn_meta().txn_id();
<< ", txn_id: " << txn_meta.txn_id();
}

if (op_replication.has_source_schema()) {
Expand Down
Loading

0 comments on commit 5f4c5cb

Please sign in to comment.