Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ ydb/core/transfer/ut/large TransferLarge.Transfer100KM_10P_RowTable_TopicAutoPar
ydb/core/transfer/ut/large TransferLarge.Transfer1KM_1KP_RowTable_TopicAutoPartitioning
ydb/core/tx/conveyor_composite/ut CompositeConveyorTests.TestUniformDistribution
ydb/core/tx/datashard/ut_incremental_backup IncrementalBackup.ComplexRestoreBackupCollection+WithIncremental
ydb/core/tx/datashard/ut_incremental_backup IncrementalBackup.E2EBackupCollection
ydb/core/tx/datashard/ut_incremental_backup IncrementalBackup.MultiRestore
ydb/core/tx/datashard/ut_incremental_backup IncrementalBackup.SimpleRestoreBackupCollection+WithIncremental
ydb/core/tx/schemeshard/ut_background_cleaning TSchemeshardBackgroundCleaningTest.SchemeshardBackgroundCleaningTestCreateCleanManyTables
Expand Down
27 changes: 14 additions & 13 deletions ydb/core/protos/counters_schemeshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -499,28 +499,28 @@ enum EPercentileCounters {
Ranges: { Value: 1000000 Name: "1000000" },
Ranges: { Value: 10000000 Name: "10000000" },
Ranges: { Value: 100000000 Name: "100000000" },
Ranges: { Value: 1000000000 Name: "1000000000" },
Ranges: { Value: 1000000000 Name: "1000000000" }
}];

COUNTER_STATS_BATCH_LATENCY = 5 [(CounterOpts) = {
Name: "StatsBatchLatency",
Ranges: { Value: 1000 Name: "1 ms" }
Ranges: { Value: 10000 Name: "10 ms" }
Ranges: { Value: 50000 Name: "50 ms" }
Ranges: { Value: 100000 Name: "100 ms" }
Ranges: { Value: 200000 Name: "200 ms" }
Ranges: { Value: 500000 Name: "500 ms" }
Ranges: { Value: 1000 Name: "1 ms" },
Ranges: { Value: 10000 Name: "10 ms" },
Ranges: { Value: 50000 Name: "50 ms" },
Ranges: { Value: 100000 Name: "100 ms" },
Ranges: { Value: 200000 Name: "200 ms" },
Ranges: { Value: 500000 Name: "500 ms" },
Ranges: { Value: 1000000 Name: "1000 ms" }
}];

COUNTER_PQ_STATS_BATCH_LATENCY = 6 [(CounterOpts) = {
Name: "PQStatsBatchLatency",
Ranges: { Value: 1000 Name: "1 ms" }
Ranges: { Value: 10000 Name: "10 ms" }
Ranges: { Value: 50000 Name: "50 ms" }
Ranges: { Value: 100000 Name: "100 ms" }
Ranges: { Value: 200000 Name: "200 ms" }
Ranges: { Value: 500000 Name: "500 ms" }
Ranges: { Value: 1000 Name: "1 ms" },
Ranges: { Value: 10000 Name: "10 ms" },
Ranges: { Value: 50000 Name: "50 ms" },
Ranges: { Value: 100000 Name: "100 ms" },
Ranges: { Value: 200000 Name: "200 ms" },
Ranges: { Value: 500000 Name: "500 ms" },
Ranges: { Value: 1000000 Name: "1000 ms" }
}];
}
Expand Down Expand Up @@ -655,4 +655,5 @@ enum ETxTypes {
TXTYPE_LOGIN_FINALIZE = 100 [(TxTypeOpts) = {Name: "TxLoginFinalize"}];

TXTYPE_PROGRESS_INCREMENTAL_RESTORE = 101 [(TxTypeOpts) = {Name: "TxProgressIncrementalRestore"}];
TXTYPE_INCREMENTAL_RESTORE_SHARD_RESPONSE = 102 [(TxTypeOpts) = {Name: "TxIncrementalRestoreShardResponse"}];
}
34 changes: 34 additions & 0 deletions ydb/core/protos/tx_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1676,6 +1676,40 @@ message TEvRecomputeKMeansResponse {
optional NKikimrIndexBuilder.TMeteringStats MeteringStats = 12;
}

// Incremental restore messages
message TEvIncrementalRestoreRequest {
optional uint64 TxId = 1;
optional uint64 TableId = 2;
optional uint64 OperationId = 3;
optional uint32 IncrementalIdx = 4;
optional bytes StartKey = 5;
optional bytes EndKey = 6;
optional string BackupPath = 7;
optional uint64 RestoreTimestamp = 8;
optional uint64 ShardIdx = 9;
optional uint64 BackupCollectionPathId = 10;
optional uint64 SourcePathId = 11;
}

message TEvIncrementalRestoreResponse {
enum Status {
SUCCESS = 0;
RETRY = 1;
ERROR = 2;
}

optional uint64 TxId = 1;
optional uint64 TableId = 2;
optional uint64 OperationId = 3;
optional uint32 IncrementalIdx = 4;
optional Status RestoreStatus = 5;
optional string ErrorMessage = 6;
optional uint64 ProcessedRows = 7;
optional uint64 ProcessedBytes = 8;
optional bytes LastProcessedKey = 9;
optional uint64 ShardIdx = 10;
}

message TEvPrefixKMeansRequest {
optional uint64 Id = 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class TCreateIncrementalRestoreSrcUnit : public TExecutionUnit {
DataShard.GetUserTables().at(tableId),
dstTablePathId,
txId,
DataShard.GetCurrentSchemeShardId(), // Pass SchemeShard TabletID
{});
}

Expand Down Expand Up @@ -252,7 +253,13 @@ class TCreateIncrementalRestoreSrcUnit : public TExecutionUnit {


void Handle(TEvIncrementalRestoreScan::TEvFinished::TPtr& ev, TOperation::TPtr op, const TActorContext& ctx) {
Y_UNUSED(ev, op, ctx);
LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD,
"IncrementalRestoreScan finished for txId: " << ev->Get()->TxId
<< " at DataShard: " << DataShard.TabletID());

// Additional completion handling can be added here if needed
// (e.g., updating operation status, sending additional notifications)

ResetWaiting(op);
}

Expand Down
12 changes: 12 additions & 0 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "datashard_impl.h"
#include "datashard_txs.h"
#include "datashard_locks_db.h"
#include "datashard_incremental_restore.h"
#include "memory_state_migration.h"
#include "probes.h"

Expand Down Expand Up @@ -4415,6 +4416,17 @@ void TDataShard::Handle(TEvDataShard::TEvCancelRestore::TPtr& ev, const TActorCo
}
}

void TDataShard::Handle(TEvDataShard::TEvIncrementalRestoreRequest::TPtr& ev, const TActorContext& ctx)
{
LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD,
"Handle TEvIncrementalRestoreRequest at tablet " << TabletID()
<< " operationId: " << ev->Get()->Record.GetOperationId()
<< " tableId: " << ev->Get()->Record.GetTableId()
<< " shardIdx: " << ev->Get()->Record.GetShardIdx());

Execute(new TTxIncrementalRestore(this, ev), ctx);
}

void TDataShard::Handle(TEvDataShard::TEvGetS3Upload::TPtr& ev, const TActorContext& ctx)
{
Execute(new TTxGetS3Upload(this, ev), ctx);
Expand Down
72 changes: 72 additions & 0 deletions ydb/core/tx/datashard/datashard.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,10 @@ namespace TEvDataShard {
EvRecomputeKMeansRequest,
EvRecomputeKMeansResponse,

// Incremental restore events
EvIncrementalRestoreRequest,
EvIncrementalRestoreResponse,

EvEnd
};

Expand Down Expand Up @@ -1548,6 +1552,74 @@ namespace TEvDataShard {
TEvDataShard::EvPrefixKMeansResponse> {
};

// Incremental restore event classes
struct TEvIncrementalRestoreRequest
: public TEventPB<TEvIncrementalRestoreRequest,
NKikimrTxDataShard::TEvIncrementalRestoreRequest,
TEvDataShard::EvIncrementalRestoreRequest> {
TEvIncrementalRestoreRequest() = default;

TEvIncrementalRestoreRequest(ui64 txId, ui64 tableId, ui64 operationId, ui32 incrementalIdx,
const TString& backupPath, ui64 restoreTimestamp) {
Record.SetTxId(txId);
Record.SetTableId(tableId);
Record.SetOperationId(operationId);
Record.SetIncrementalIdx(incrementalIdx);
Record.SetBackupPath(backupPath);
Record.SetRestoreTimestamp(restoreTimestamp);
}

TEvIncrementalRestoreRequest(ui64 txId, ui64 tableId, ui64 operationId, ui32 incrementalIdx,
const TString& startKey, const TString& endKey,
const TString& backupPath, ui64 restoreTimestamp) {
Record.SetTxId(txId);
Record.SetTableId(tableId);
Record.SetOperationId(operationId);
Record.SetIncrementalIdx(incrementalIdx);
Record.SetStartKey(startKey);
Record.SetEndKey(endKey);
Record.SetBackupPath(backupPath);
Record.SetRestoreTimestamp(restoreTimestamp);
}
};

struct TEvIncrementalRestoreResponse
: public TEventPB<TEvIncrementalRestoreResponse,
NKikimrTxDataShard::TEvIncrementalRestoreResponse,
TEvDataShard::EvIncrementalRestoreResponse> {
TEvIncrementalRestoreResponse() = default;

TEvIncrementalRestoreResponse(ui64 txId, ui64 tableId, ui64 operationId, ui32 incrementalIdx,
NKikimrTxDataShard::TEvIncrementalRestoreResponse::Status status, const TString& errorMessage = "") {
Record.SetTxId(txId);
Record.SetTableId(tableId);
Record.SetOperationId(operationId);
Record.SetIncrementalIdx(incrementalIdx);
Record.SetRestoreStatus(status);
if (!errorMessage.empty()) {
Record.SetErrorMessage(errorMessage);
}
}

TEvIncrementalRestoreResponse(ui64 txId, ui64 tableId, ui64 operationId, ui32 incrementalIdx,
NKikimrTxDataShard::TEvIncrementalRestoreResponse::Status status, ui64 processedRows, ui64 processedBytes,
const TString& lastProcessedKey = "", const TString& errorMessage = "") {
Record.SetTxId(txId);
Record.SetTableId(tableId);
Record.SetOperationId(operationId);
Record.SetIncrementalIdx(incrementalIdx);
Record.SetRestoreStatus(status);
Record.SetProcessedRows(processedRows);
Record.SetProcessedBytes(processedBytes);
if (!lastProcessedKey.empty()) {
Record.SetLastProcessedKey(lastProcessedKey);
}
if (!errorMessage.empty()) {
Record.SetErrorMessage(errorMessage);
}
}
};

struct TEvKqpScan
: public TEventPB<TEvKqpScan,
NKikimrTxDataShard::TEvKqpScan,
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ class TDataShard
class TTxCdcStreamEmitHeartbeats;
class TTxUpdateFollowerReadEdge;
class TTxRemoveSchemaSnapshots;
class TTxIncrementalRestore;
class TTxCleanupUncommitted;
class TTxDataCleanup;
class TTxCompleteDataCleanup;
Expand Down Expand Up @@ -1411,6 +1412,8 @@ class TDataShard

void Handle(TEvIncrementalRestoreScan::TEvFinished::TPtr& ev, const TActorContext& ctx);

void Handle(TEvDataShard::TEvIncrementalRestoreRequest::TPtr& ev, const TActorContext& ctx);

void Handle(TEvDataShard::TEvForceDataCleanup::TPtr& ev, const TActorContext& ctx);

void HandleByReplicationSourceOffsetsServer(STATEFN_SIG);
Expand Down Expand Up @@ -3255,6 +3258,7 @@ class TDataShard
HFunc(TEvPrivate::TEvStatisticsScanFinished, Handle);
HFuncTraced(TEvPrivate::TEvRemoveSchemaSnapshots, Handle);
HFunc(TEvIncrementalRestoreScan::TEvFinished, Handle);
HFunc(TEvDataShard::TEvIncrementalRestoreRequest, Handle);
HFunc(TEvDataShard::TEvForceDataCleanup, Handle);
default:
if (!HandleDefaultEvents(ev, SelfId())) {
Expand Down
40 changes: 40 additions & 0 deletions ydb/core/tx/datashard/datashard_incremental_restore.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#include "datashard_incremental_restore.h"
#include "datashard_impl.h"

namespace NKikimr {
namespace NDataShard {

TDataShard::TTxIncrementalRestore::TTxIncrementalRestore(TDataShard* self, TEvDataShard::TEvIncrementalRestoreRequest::TPtr& ev)
: TBase(self)
, Event(ev)
{}

bool TDataShard::TTxIncrementalRestore::Execute(TTransactionContext&, const TActorContext& ctx) {
const auto& record = Event->Get()->Record;

LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD,
"TTxIncrementalRestore at tablet " << Self->TabletID()
<< " operationId: " << record.GetOperationId()
<< " shardIdx: " << record.GetShardIdx());

// DataShard just acknowledges the request
// Actual incremental restore work happens via change senders
return true;
}

void TDataShard::TTxIncrementalRestore::Complete(const TActorContext& ctx) {
auto response = MakeHolder<TEvDataShard::TEvIncrementalRestoreResponse>();
const auto& record = Event->Get()->Record;

response->Record.SetTxId(record.GetTxId());
response->Record.SetTableId(record.GetTableId());
response->Record.SetOperationId(record.GetOperationId());
response->Record.SetIncrementalIdx(record.GetIncrementalIdx());
response->Record.SetShardIdx(record.GetShardIdx());
response->Record.SetRestoreStatus(NKikimrTxDataShard::TEvIncrementalRestoreResponse::SUCCESS);

ctx.Send(Event->Sender, response.Release());
}

} // namespace NDataShard
} // namespace NKikimr
20 changes: 20 additions & 0 deletions ydb/core/tx/datashard/datashard_incremental_restore.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#pragma once
#include "datashard_impl.h"

namespace NKikimr {
namespace NDataShard {

// Forward declaration - implementation is in datashard_incremental_restore.cpp
class TDataShard::TTxIncrementalRestore : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
public:
TTxIncrementalRestore(TDataShard* self, TEvDataShard::TEvIncrementalRestoreRequest::TPtr& ev);

bool Execute(TTransactionContext&, const TActorContext& ctx) override;
void Complete(const TActorContext& ctx) override;

private:
TEvDataShard::TEvIncrementalRestoreRequest::TPtr Event;
};

} // namespace NDataShard
} // namespace NKikimr
Loading