Skip to content

WIP #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/base/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ struct TKikimrEvents : TEvents {
ES_LIMITER = 4258,
ES_MEMORY = 4259,
ES_GROUPED_ALLOCATIONS_MANAGER = 4260,
ES_INCREMENTAL_RESTORE_SCAN = 4261,
};
};

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/datashard/cdc_stream_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ class TDataShard::TTxCdcStreamScanProgress
bool pageFault = false;

for (const auto& [k, v] : ev.Rows) {
const auto key = NStreamScan::MakeKey(k.GetCells(), table);
const auto key = NStreamScan::MakeKey(k.GetCells(), table->KeyColumnTypes);
const auto& keyTags = table->KeyColumnIds;

TRowState row(0);
Expand All @@ -299,7 +299,7 @@ class TDataShard::TTxCdcStreamScanProgress
Serialize(body, ERowOp::Upsert, key, keyTags, MakeUpdates(v.GetCells(), valueTags, table));
break;
case NKikimrSchemeOp::ECdcStreamModeRestoreIncrBackup:
if (auto updates = NIncrRestoreHelpers::MakeRestoreUpdates(v.GetCells(), valueTags, table); updates) {
if (auto updates = NIncrRestoreHelpers::MakeRestoreUpdates(v.GetCells(), valueTags, table->Columns); updates) {
Serialize(body, ERowOp::Upsert, key, keyTags, *updates);
} else {
Serialize(body, ERowOp::Erase, key, keyTags, {});
Expand Down
1 change: 0 additions & 1 deletion ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
return proto;
}


Y_UNIT_TEST(SimpleBackup) {
TPortManager portManager;
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
Expand Down
170 changes: 170 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#include "incr_restore_scan.h"

#include <library/cpp/testing/unittest/registar.h>
#include <ydb/core/testlib/test_client.h>
#include <ydb/core/util/testactorsys.h>

namespace NKikimr::NDataShard {

class TDriverMock
: public NTable::IDriver
{
public:
std::optional<NTable::EScan> LastScan;

void Touch(NTable::EScan scan) noexcept {
LastScan = scan;
}
};

class TCbExecutorActor : public TActorBootstrapped<TCbExecutorActor> {
public:
enum EEv {
EvExec = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
EvBoot,
EvExecuted,

EvEnd
};

static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_PRIVATE),
"expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_PRIVATE)");

struct TEvExec : public TEventLocal<TEvExec, EvExec> {
std::function<void()> OnHandle;
bool Async;

TEvExec(std::function<void()> onHandle, bool async = true)
: OnHandle(onHandle)
, Async(async)
{}
};

struct TEvBoot : public TEventLocal<TEvBoot, EvBoot> {};
struct TEvExecuted : public TEventLocal<TEvExecuted, EvExecuted> {};

std::function<void()> OnBootstrap;
TActorId ReplyTo;
TActorId ForwardTo;

void Bootstrap() {
if (OnBootstrap) {
OnBootstrap();
}

Become(&TThis::Serve);
Send(ReplyTo, new TCbExecutorActor::TEvBoot());
}

void Handle(TEvExec::TPtr& ev) {
ev->Get()->OnHandle();
if (!ev->Get()->Async) {
Send(ReplyTo, new TCbExecutorActor::TEvExecuted());
}
}

STATEFN(Serve) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvExec, Handle);
default: Y_ABORT("unexpected");
}
}
};

class TRuntimeCbExecutor {
public:
TRuntimeCbExecutor(TTestActorRuntime& runtime, std::function<void()> onBootstrap = {}, TActorId forwardTo = {})
: Runtime(runtime)
, Sender(runtime.AllocateEdgeActor())
{
auto* executor = new TCbExecutorActor;
executor->OnBootstrap = onBootstrap;
executor->ForwardTo = forwardTo;
executor->ReplyTo = Sender;
Impl = runtime.Register(executor);
Runtime.EnableScheduleForActor(Impl);
Runtime.GrabEdgeEventRethrow<TCbExecutorActor::TEvBoot>(Sender);
}

void AsyncExecute(std::function<void()> cb) {
Runtime.Send(new IEventHandle(Impl, Sender, new TCbExecutorActor::TEvExec(cb), 0, 0), 0);
}

void Execute(std::function<void()> cb) {
Runtime.Send(new IEventHandle(Impl, Sender, new TCbExecutorActor::TEvExec(cb, false), 0, 0), 0);
Runtime.GrabEdgeEventRethrow<TCbExecutorActor::TEvExecuted>(Sender);
}

private:
TTestActorRuntime& Runtime;
TActorId Sender;
TActorId Impl;
};

Y_UNIT_TEST_SUITE(IncrementalRestoreScan) {
Y_UNIT_TEST(Empty) {
TPortManager pm;
Tests::TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false);

Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();
auto sender2 = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE);

TUserTable::TPtr table = new TUserTable;
NTable::TScheme::TTableSchema tableSchema;
table->Columns.emplace(0, TUserTable::TUserColumn(NScheme::TTypeInfo(NScheme::NTypeIds::Uint32), "", "Key", true));
tableSchema.Columns[0] = NTable::TColumn("key", 0, {}, "");
tableSchema.Columns[0].KeyOrder = 0;

table->Columns.emplace(1, TUserTable::TUserColumn(NScheme::TTypeInfo(NScheme::NTypeIds::Bool), "", "__ydb_incrBackupImpl_deleted", false));
tableSchema.Columns[1] = NTable::TColumn("__ydb_incrBackupImpl_deleted", 1, {}, "");
tableSchema.Columns[1].KeyOrder = 1;

auto scheme = NTable::TRowScheme::Make(tableSchema.Columns, NUtil::TSecond());

TPathId sourcePathId{1, 2};
TPathId targetPathId{3, 4};
ui64 txId = 1337;

auto* scan = CreateIncrementalRestoreScan(
sender,
[&](const TActorContext&) {
return sender2;
},
sourcePathId,
table,
targetPathId,
txId,
{}).Release();

TDriverMock driver;

// later we can use driver, scan and scheme ONLY with additional sync, e.g. from actorExec to avoid races
TRuntimeCbExecutor actorExec(runtime, [&]() {
scan->Prepare(&driver, scheme);
});

actorExec.Execute([&]() {
UNIT_ASSERT_EQUAL(scan->Exhausted(), NTable::EScan::Sleep);
});

auto resp = runtime.GrabEdgeEventRethrow<TEvIncrementalRestoreScan::TEvNoMoreData>(sender2);

runtime.Send(new IEventHandle(resp->Sender, sender2, new TEvIncrementalRestoreScan::TEvFinished(), 0, 0), 0);

actorExec.Execute([&]() {
UNIT_ASSERT(driver.LastScan && *driver.LastScan == NTable::EScan::Final);
scan->Finish(NTable::EAbort::None);
});

runtime.GrabEdgeEventRethrow<TEvIncrementalRestoreScan::TEvFinished>(sender);
}
}

} // namespace NKikimr::NDataShard
6 changes: 3 additions & 3 deletions ydb/core/tx/datashard/incr_restore_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@

namespace NKikimr::NDataShard::NIncrRestoreHelpers {

std::optional<TVector<TUpdateOp>> MakeRestoreUpdates(TArrayRef<const TCell> cells, TArrayRef<const TTag> tags, TUserTable::TCPtr table) {
std::optional<TVector<TUpdateOp>> MakeRestoreUpdates(TArrayRef<const TCell> cells, TArrayRef<const TTag> tags, const TMap<ui32, TUserTable::TUserColumn>& columns) {
Y_ABORT_UNLESS(cells.size() >= 1);
TVector<TUpdateOp> updates(::Reserve(cells.size() - 1));

bool foundSpecialColumn = false;
Y_ABORT_UNLESS(cells.size() == tags.size());
for (TPos pos = 0; pos < cells.size(); ++pos) {
const auto tag = tags.at(pos);
auto it = table->Columns.find(tag);
Y_ABORT_UNLESS(it != table->Columns.end());
auto it = columns.find(tag);
Y_ABORT_UNLESS(it != columns.end());
if (it->second.Name == "__ydb_incrBackupImpl_deleted") {
if (const auto& cell = cells.at(pos); !cell.IsNull() && cell.AsValue<bool>()) {
return std::nullopt;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/incr_restore_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ namespace NKikimr::NDataShard::NIncrRestoreHelpers {

using namespace NTable;

std::optional<TVector<TUpdateOp>> MakeRestoreUpdates(TArrayRef<const TCell> cells, TArrayRef<const TTag> tags, TUserTable::TCPtr table);
std::optional<TVector<TUpdateOp>> MakeRestoreUpdates(TArrayRef<const TCell> cells, TArrayRef<const TTag> tags, const TMap<ui32, TUserTable::TUserColumn>& columns);

} // namespace NKikimr::NBackup::NImpl
Loading