Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ydb/core/grpc_services/rpc_replication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
case NKikimrReplication::TReplicationState::kStandBy:
to.mutable_running();
break;
case NKikimrReplication::TReplicationState::kPaused:
to.mutable_paused();
break;
case NKikimrReplication::TReplicationState::kError:
*to.mutable_error()->mutable_issues() = std::move(*from.MutableError()->MutableIssues());
break;
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1910,6 +1910,10 @@ class TKqpGatewayProxy : public IKikimrGateway {
static_cast<NKikimrReplication::TReplicationState::TDone::EFailoverMode>(done->FailoverMode));
}

if (settings.Settings.StatePaused) {
op.MutableState()->MutablePaused();
}

if (settings.Settings.ConnectionString || settings.Settings.Endpoint || settings.Settings.Database ||
settings.Settings.OAuthToken || settings.Settings.StaticCredentials) {
auto& config = *op.MutableConfig();
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,8 @@ namespace {
auto value = ToString(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value());
if (to_lower(value) == "done") {
dstSettings.EnsureStateDone();
} else if (to_lower(value) == "paused") {
dstSettings.EnsureStatePaused();
} else {
ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
TStringBuilder() << "Unknown replication state: " << value));
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,8 @@ struct TReplicationSettings {
EFailoverMode FailoverMode;
};

struct TStatePaused {};

struct TOAuthToken {
TString Token;
TString TokenSecretName;
Expand All @@ -728,6 +730,7 @@ struct TReplicationSettings {
TMaybe<TOAuthToken> OAuthToken;
TMaybe<TStaticCredentials> StaticCredentials;
TMaybe<TStateDone> StateDone;
TMaybe<TStatePaused> StatePaused;

TOAuthToken& EnsureOAuthToken() {
if (!OAuthToken) {
Expand Down Expand Up @@ -755,6 +758,14 @@ struct TReplicationSettings {

return *StateDone;
}

TStatePaused& EnsureStatePaused() {
if (!StatePaused) {
StatePaused = TStatePaused{};
}

return *StatePaused;
}
};

struct TCreateReplicationSettings {
Expand Down
59 changes: 55 additions & 4 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5620,8 +5620,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
--!syntax_v1
ALTER ASYNC REPLICATION `/Root/replication`
SET (
STATE = "DONE",
FAILOVER_MODE = "FORCE"
STATE = "PAUSED"
);
)";

Expand All @@ -5633,15 +5632,14 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

const auto& desc = result.GetReplicationDescription();
if (desc.GetState() == TReplicationDescription::EState::Done) {
if (desc.GetState() == TReplicationDescription::EState::Paused) {
break;
}

Sleep(TDuration::Seconds(1));
}
}


// Connection string and Endpoint/Database are mutually exclusive
{
auto query = R"(
Expand Down Expand Up @@ -5858,6 +5856,59 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

// alter replication to done state
{
auto query = R"(
--!syntax_v1
ALTER ASYNC REPLICATION `/Root/replication`
SET (
STATE = "DONE",
FAILOVER_MODE = "FORCE"
);
)";

const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

{
Sleep(TDuration::Seconds(10));

const auto result = repl.DescribeReplication("/Root/replication").ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

const auto& desc = result.GetReplicationDescription();
UNIT_ASSERT_VALUES_EQUAL(desc.GetState(), TReplicationDescription::EState::Done);
}

while (true) {
const auto result = repl.DescribeReplication("/Root/replication").ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

const auto& desc = result.GetReplicationDescription();
if (desc.GetState() == TReplicationDescription::EState::Done) {
break;
}

Sleep(TDuration::Seconds(1));
}
}

// alter state and config in done state
{
auto query = R"(
--!syntax_v1
ALTER ASYNC REPLICATION `/Root/replication`
SET (
STATE = "DONE",
FAILOVER_MODE = "FORCE",
CONNECTION_STRING = "grpc://localhost:2135/?database=/Root"
);
)";

const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToOneLineString(), "Replication is in Done state.");
}
}

Y_UNIT_TEST(DropAsyncReplication) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/counters_replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ enum ETxTypes {
TXTYPE_ALTER_DST_RESULT = 12 [(TxTypeOpts) = {Name: "TxAlterDstResult"}];
TXTYPE_DESCRIBE_REPLICATION = 13 [(TxTypeOpts) = {Name: "TxDescribeReplication"}];
TXTYPE_WORKER_ERROR = 14 [(TxTypeOpts) = {Name: "TxWorkerError"}];
TXTYPE_PAUSE_TARGET_RESULT = 15 [(TxTypeOpts) = {Name: "TxPauseTargetResult"}];
}
23 changes: 23 additions & 0 deletions ydb/core/tx/replication/controller/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ STFUNC(TController::StateWork) {
HFunc(TEvPrivate::TEvAssignStreamName, Handle);
HFunc(TEvPrivate::TEvCreateStreamResult, Handle);
HFunc(TEvPrivate::TEvDropStreamResult, Handle);
HFunc(TEvPrivate::TEvPauseTargetResult, Handle);
HFunc(TEvPrivate::TEvCreateDstResult, Handle);
HFunc(TEvPrivate::TEvAlterDstResult, Handle);
HFunc(TEvPrivate::TEvDropDstResult, Handle);
Expand All @@ -64,6 +65,7 @@ STFUNC(TController::StateWork) {
HFunc(TEvService::TEvStatus, Handle);
HFunc(TEvService::TEvWorkerStatus, Handle);
HFunc(TEvService::TEvRunWorker, Handle);
HFunc(TEvService::TEvStopWorker, Handle);
HFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
default:
HandleDefaultEvents(ev, SelfId());
Expand Down Expand Up @@ -152,6 +154,12 @@ void TController::Handle(TEvPrivate::TEvDropStreamResult::TPtr& ev, const TActor
RunTxDropStreamResult(ev, ctx);
}

void TController::Handle(TEvPrivate::TEvPauseTargetResult::TPtr& ev, const TActorContext& ctx) {
CLOG_T(ctx, "Handle " << ev->Get()->ToString());

RunTxPauseTargetResult(ev, ctx);
}

void TController::Handle(TEvPrivate::TEvCreateDstResult::TPtr& ev, const TActorContext& ctx) {
CLOG_T(ctx, "Handle " << ev->Get()->ToString());
RunTxCreateDstResult(ev, ctx);
Expand Down Expand Up @@ -433,6 +441,21 @@ TWorkerInfo* TController::GetOrCreateWorker(const TWorkerId& id, NKikimrReplicat
return &it->second;
}

void TController::Handle(TEvService::TEvStopWorker::TPtr& ev, const TActorContext& ctx) {
CLOG_T(ctx, "Handle " << ev->Get()->ToString());

auto& record = ev->Get()->Record;
const auto id = TWorkerId::Parse(record.GetWorker());

auto it = Workers.find(id);
if (it == Workers.end()) {
return;
}

auto& worker = it->second;
StopWorker(worker.GetSession(), id);
}

void TController::ScheduleProcessQueues() {
if (ProcessQueuesScheduled || (!BootQueue && !StopQueue)) {
return;
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/replication/controller/controller_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class TController
void Handle(TEvPrivate::TEvAssignStreamName::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvCreateStreamResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvDropStreamResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvPauseTargetResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvCreateDstResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvAlterDstResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorContext& ctx);
Expand All @@ -87,6 +88,7 @@ class TController
void Handle(TEvService::TEvStatus::TPtr& ev, const TActorContext& ctx);
void Handle(TEvService::TEvWorkerStatus::TPtr& ev, const TActorContext& ctx);
void Handle(TEvService::TEvRunWorker::TPtr& ev, const TActorContext& ctx);
void Handle(TEvService::TEvStopWorker::TPtr& ev, const TActorContext& ctx);
void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx);

void CreateSession(ui32 nodeId, const TActorContext& ctx);
Expand Down Expand Up @@ -116,6 +118,7 @@ class TController
class TTxCreateDstResult;
class TTxAlterDstResult;
class TTxDropDstResult;
class TTxPauseTargetResult;
class TTxResolveSecretResult;
class TTxWorkerError;

Expand All @@ -131,6 +134,7 @@ class TController
void RunTxAssignStreamName(TEvPrivate::TEvAssignStreamName::TPtr& ev, const TActorContext& ctx);
void RunTxCreateStreamResult(TEvPrivate::TEvCreateStreamResult::TPtr& ev, const TActorContext& ctx);
void RunTxDropStreamResult(TEvPrivate::TEvDropStreamResult::TPtr& ev, const TActorContext& ctx);
void RunTxPauseTargetResult(TEvPrivate::TEvPauseTargetResult::TPtr& ev, const TActorContext& ctx);
void RunTxCreateDstResult(TEvPrivate::TEvCreateDstResult::TPtr& ev, const TActorContext& ctx);
void RunTxAlterDstResult(TEvPrivate::TEvAlterDstResult::TPtr& ev, const TActorContext& ctx);
void RunTxDropDstResult(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorContext& ctx);
Expand Down
13 changes: 13 additions & 0 deletions ydb/core/tx/replication/controller/private_events.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,19 @@ TString TEvPrivate::TEvDropReplication::ToString() const {
<< " }";
}

TEvPrivate::TEvPauseTargetResult::TEvPauseTargetResult(ui64 rid, ui64 tid)
: ReplicationId(rid)
, TargetId(tid)
{
}

TString TEvPrivate::TEvPauseTargetResult::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
<< " ReplicationId: " << ReplicationId
<< " TargetId: " << TargetId
<< " }";
}

TEvPrivate::TEvResolveTenantResult::TEvResolveTenantResult(ui64 rid, const TString& tenant)
: ReplicationId(rid)
, Tenant(tenant)
Expand Down
17 changes: 15 additions & 2 deletions ydb/core/tx/replication/controller/private_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ struct TEvPrivate {
EvDropReplication,
EvResolveTenantResult,
EvUpdateTenantNodes,
EvProcessQueues,
EvStopWorkers,
EvPauseTargetResult,
EvRunWorkers,
EvResolveSecretResult,
EvAlterDstResult,
EvProcessQueues,
EvRemoveWorker,

EvEnd,
};

Expand Down Expand Up @@ -143,6 +145,14 @@ struct TEvPrivate {
TString ToString() const override;
};

struct TEvPauseTargetResult: public TEventLocal<TEvPauseTargetResult, EvPauseTargetResult> {
const ui64 ReplicationId;
const ui64 TargetId;

explicit TEvPauseTargetResult(ui64 rid, ui64 tid);
TString ToString() const override;
};

struct TEvResolveTenantResult: public TEventLocal<TEvResolveTenantResult, EvResolveTenantResult> {
const ui64 ReplicationId;
const TString Tenant;
Expand All @@ -165,6 +175,9 @@ struct TEvPrivate {
struct TEvProcessQueues: public TEventLocal<TEvProcessQueues, EvProcessQueues> {
};

struct TEvStopWorkers: public TEventLocal<TEvStopWorkers, EvStopWorkers> {
};

struct TEvResolveSecretResult: public TEventLocal<TEvResolveSecretResult, EvResolveSecretResult> {
const ui64 ReplicationId;
const TString SecretValue;
Expand Down
17 changes: 17 additions & 0 deletions ydb/core/tx/replication/controller/replication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,11 @@ class TReplication::TImpl {
} else {
return ProgressTargets(ctx);
}
case EState::Pausing:
return ProgressTargets(ctx);
case EState::Done:
case EState::Paused:
return;
case EState::Error:
return;
}
Expand Down Expand Up @@ -185,6 +189,7 @@ class TReplication::TImpl {
ui64 NextTargetId = 1;
THashMap<ui64, THolder<ITarget>> Targets;
THashSet<ui64> PendingAlterTargets;
THashSet<ui64> PendingPauseTargets;
TActorId SecretResolver;
TActorId YdbProxy;
TActorId TenantResolver;
Expand Down Expand Up @@ -329,6 +334,18 @@ bool TReplication::CheckAlterDone() const {
return Impl->State == EState::Ready && Impl->PendingAlterTargets.empty();
}

void TReplication::AddPendingPauseTarget(ui64 id) {
Impl->PendingPauseTargets.insert(id);
}

void TReplication::RemovePendingPauseTarget(ui64 id) {
Impl->PendingPauseTargets.erase(id);
}

bool TReplication::CheckPauseDone() const {
return Impl->State == EState::Pausing && Impl->PendingPauseTargets.empty();
}

}

Y_DECLARE_OUT_SPEC(, NKikimrReplication::TReplicationConfig::TargetCase, stream, value) {
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/replication/controller/replication.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class TReplication: public TSimpleRefCount<TReplication> {
enum class EState: ui8 {
Ready,
Done,
Pausing,
Paused,
Removing,
Error = 255
};
Expand All @@ -37,6 +39,8 @@ class TReplication: public TSimpleRefCount<TReplication> {
Ready,
Alter,
Done,
Pausing,
Paused,
Removing,
Error = 255
};
Expand Down Expand Up @@ -83,11 +87,14 @@ class TReplication: public TSimpleRefCount<TReplication> {

protected:
virtual IActor* CreateWorkerRegistar(const TActorContext& ctx) const = 0;
virtual IActor* CreateWorkerStopper(const TActorContext& ctx) const = 0;
};

friend class TTargetBase;
void AddPendingAlterTarget(ui64 id);
void RemovePendingAlterTarget(ui64 id);
void AddPendingPauseTarget(ui64 id);
void RemovePendingPauseTarget(ui64 id);

struct TDropOp {
TActorId Sender;
Expand Down Expand Up @@ -127,6 +134,7 @@ class TReplication: public TSimpleRefCount<TReplication> {
const TString& GetTenant() const;

bool CheckAlterDone() const;
bool CheckPauseDone() const;

void SetDropOp(const TActorId& sender, const std::pair<ui64, ui32>& opId);
const std::optional<TDropOp>& GetDropOp() const;
Expand Down
Loading