Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ydb/core/protos/replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@ package NKikimrReplication;
option java_package = "ru.yandex.kikimr.proto";

message TReplicationDefaults {
message TSchemeOperationLimits {
optional uint32 InflightCreateStreamLimit = 1 [default = 1];
optional uint32 InflightDropStreamLimit = 2 [default = 1];
}

optional int32 RetentionPeriodSeconds = 1 [default = 86400]; // 1d
optional TSchemeOperationLimits SchemeOperationLimits = 2;
}

message TStaticCredentials {
Expand Down
43 changes: 43 additions & 0 deletions ydb/core/tx/replication/controller/controller.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "controller.h"
#include "controller_impl.h"

#include <ydb/core/base/appdata.h>
#include <ydb/core/discovery/discovery.h>
#include <ydb/core/engine/minikql/flat_local_tx_factory.h>

Expand Down Expand Up @@ -60,6 +61,8 @@ STFUNC(TController::StateWork) {
HFunc(TEvPrivate::TEvProcessQueues, Handle);
HFunc(TEvPrivate::TEvRemoveWorker, Handle);
HFunc(TEvPrivate::TEvDescribeTargetsResult, Handle);
HFunc(TEvPrivate::TEvRequestCreateStream, Handle);
HFunc(TEvPrivate::TEvRequestDropStream, Handle);
HFunc(TEvDiscovery::TEvDiscoveryData, Handle);
HFunc(TEvDiscovery::TEvError, Handle);
HFunc(TEvService::TEvStatus, Handle);
Expand Down Expand Up @@ -148,13 +151,53 @@ void TController::Handle(TEvPrivate::TEvAssignStreamName::TPtr& ev, const TActor
RunTxAssignStreamName(ev, ctx);
}

template <typename TEvent>
void ProcessLimiterQueue(TDeque<TActorId>& requested, THashSet<TActorId>& inflight, ui32 limit, const TActorContext& ctx) {
while (!requested.empty() && inflight.size() < limit) {
const auto& actorId = requested.front();
ctx.Send(actorId, new TEvent());
inflight.insert(actorId);
requested.pop_front();
}
}

void TController::ProcessCreateStreamQueue(const TActorContext& ctx) {
const auto& limits = AppData()->ReplicationConfig.GetSchemeOperationLimits();
ProcessLimiterQueue<TEvPrivate::TEvAllowCreateStream>(RequestedCreateStream, InflightCreateStream, limits.GetInflightCreateStreamLimit(), ctx);
}

void TController::ProcessDropStreamQueue(const TActorContext& ctx) {
const auto& limits = AppData()->ReplicationConfig.GetSchemeOperationLimits();
ProcessLimiterQueue<TEvPrivate::TEvAllowDropStream>(RequestedDropStream, InflightDropStream, limits.GetInflightDropStreamLimit(), ctx);
}

void TController::Handle(TEvPrivate::TEvRequestCreateStream::TPtr& ev, const TActorContext& ctx) {
CLOG_T(ctx, "Handle " << ev->Get()->ToString());

RequestedCreateStream.push_back(ev->Sender);
ProcessCreateStreamQueue(ctx);
}

void TController::Handle(TEvPrivate::TEvCreateStreamResult::TPtr& ev, const TActorContext& ctx) {
CLOG_T(ctx, "Handle " << ev->Get()->ToString());

InflightCreateStream.erase(ev->Sender);
ProcessCreateStreamQueue(ctx);
RunTxCreateStreamResult(ev, ctx);
}

void TController::Handle(TEvPrivate::TEvRequestDropStream::TPtr& ev, const TActorContext& ctx) {
CLOG_T(ctx, "Handle " << ev->Get()->ToString());

RequestedDropStream.push_back(ev->Sender);
ProcessDropStreamQueue(ctx);
}

void TController::Handle(TEvPrivate::TEvDropStreamResult::TPtr& ev, const TActorContext& ctx) {
CLOG_T(ctx, "Handle " << ev->Get()->ToString());

InflightDropStream.erase(ev->Sender);
ProcessDropStreamQueue(ctx);
RunTxDropStreamResult(ev, ctx);
}

Expand Down
12 changes: 12 additions & 0 deletions ydb/core/tx/replication/controller/controller_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <ydb/library/actors/core/interconnect.h>
#include <ydb/library/yverify_stream/yverify_stream.h>

#include <util/generic/deque.h>
#include <util/generic/hash.h>
#include <util/generic/hash_set.h>

Expand Down Expand Up @@ -83,6 +84,8 @@ class TController
void Handle(TEvPrivate::TEvProcessQueues::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvRemoveWorker::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvDescribeTargetsResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvRequestCreateStream::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvRequestDropStream::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx);
void Handle(TEvService::TEvStatus::TPtr& ev, const TActorContext& ctx);
Expand All @@ -103,6 +106,8 @@ class TController
void RemoveWorker(const TWorkerId& id, const TActorContext& ctx);
bool MaybeRemoveWorker(const TWorkerId& id, const TActorContext& ctx);
void UpdateLag(const TWorkerId& id, TDuration lag);
void ProcessCreateStreamQueue(const TActorContext& ctx);
void ProcessDropStreamQueue(const TActorContext& ctx);

// local transactions
class TTxInitSchema;
Expand Down Expand Up @@ -178,6 +183,13 @@ class TController
bool ProcessQueuesScheduled = false;
static constexpr ui32 ProcessBatchLimit = 100;

// create stream limiter
TDeque<TActorId> RequestedCreateStream;
THashSet<TActorId> InflightCreateStream;
// drop stream limiter
TDeque<TActorId> RequestedDropStream;
THashSet<TActorId> InflightDropStream;

}; // TController

}
16 changes: 16 additions & 0 deletions ydb/core/tx/replication/controller/private_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ struct TEvPrivate {
EvAlterDstResult,
EvRemoveWorker,
EvDescribeTargetsResult,
EvRequestCreateStream,
EvAllowCreateStream,
EvRequestDropStream,
EvAllowDropStream,

EvEnd,
};
Expand Down Expand Up @@ -221,6 +225,18 @@ struct TEvPrivate {
TString ToString() const override;
};

struct TEvRequestCreateStream: public TEventLocal<TEvRequestCreateStream, EvRequestCreateStream> {
};

struct TEvAllowCreateStream: public TEventLocal<TEvAllowCreateStream, EvAllowCreateStream> {
};

struct TEvRequestDropStream: public TEventLocal<TEvRequestDropStream, EvRequestDropStream> {
};

struct TEvAllowDropStream: public TEventLocal<TEvAllowDropStream, EvAllowDropStream> {
};

}; // TEvPrivate

}
24 changes: 23 additions & 1 deletion ydb/core/tx/replication/controller/stream_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,24 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
.AddAttribute("__async_replication", NJson::WriteJson(attrs, false));
}

void RequestPermission() {
Send(Parent, new TEvPrivate::TEvRequestCreateStream());
Become(&TThis::StateRequestPermission);
}

STATEFN(StateRequestPermission) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvPrivate::TEvAllowCreateStream, Handle);
default:
return StateBase(ev);
}
}

void Handle(TEvPrivate::TEvAllowCreateStream::TPtr& ev) {
LOG_T("Handle " << ev->Get()->ToString());
CreateStream();
}

void CreateStream() {
switch (Kind) {
case TReplication::ETargetKind::Table:
Expand Down Expand Up @@ -103,6 +121,10 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
LOG_T("Handle " << ev->Get()->ToString());
auto& result = ev->Get()->Result;

if (result.GetStatus() == NYdb::EStatus::ALREADY_EXISTS) {
return Reply(NYdb::TStatus(NYdb::EStatus::SUCCESS, NYql::TIssues()));
}

if (!result.IsSuccess()) {
if (IsRetryableError(result)) {
LOG_D("Retry CreateConsumer");
Expand Down Expand Up @@ -155,7 +177,7 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
}

void Bootstrap() {
CreateStream();
RequestPermission();
}

STATEFN(StateBase) {
Expand Down
29 changes: 27 additions & 2 deletions ydb/core/tx/replication/controller/stream_remover.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,24 @@
namespace NKikimr::NReplication::NController {

class TStreamRemover: public TActorBootstrapped<TStreamRemover> {
void RequestPermission() {
Send(Parent, new TEvPrivate::TEvRequestDropStream());
Become(&TThis::StateRequestPermission);
}

STATEFN(StateRequestPermission) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvPrivate::TEvAllowDropStream, Handle);
default:
return StateBase(ev);
}
}

void Handle(TEvPrivate::TEvAllowDropStream::TPtr& ev) {
LOG_T("Handle " << ev->Get()->ToString());
DropStream();
}

void DropStream() {
switch (Kind) {
case TReplication::ETargetKind::Table:
Expand All @@ -26,7 +44,8 @@ class TStreamRemover: public TActorBootstrapped<TStreamRemover> {
switch (ev->GetTypeRewrite()) {
hFunc(TEvYdbProxy::TEvAlterTableResponse, Handle);
sFunc(TEvents::TEvWakeup, DropStream);
sFunc(TEvents::TEvPoison, PassAway);
default:
return StateBase(ev);
}
}

Expand Down Expand Up @@ -77,7 +96,13 @@ class TStreamRemover: public TActorBootstrapped<TStreamRemover> {
}

void Bootstrap() {
DropStream();
RequestPermission();
}

STATEFN(StateBase) {
switch (ev->GetTypeRewrite()) {
sFunc(TEvents::TEvPoison, PassAway);
}
}

private:
Expand Down