|
1 | 1 | #include "controller.h"
|
2 | 2 | #include "controller_impl.h"
|
3 | 3 |
|
| 4 | +#include <ydb/core/base/appdata.h> |
4 | 5 | #include <ydb/core/discovery/discovery.h>
|
5 | 6 | #include <ydb/core/engine/minikql/flat_local_tx_factory.h>
|
6 | 7 |
|
@@ -60,6 +61,8 @@ STFUNC(TController::StateWork) {
|
60 | 61 | HFunc(TEvPrivate::TEvProcessQueues, Handle);
|
61 | 62 | HFunc(TEvPrivate::TEvRemoveWorker, Handle);
|
62 | 63 | HFunc(TEvPrivate::TEvDescribeTargetsResult, Handle);
|
| 64 | + HFunc(TEvPrivate::TEvRequestCreateStream, Handle); |
| 65 | + HFunc(TEvPrivate::TEvRequestDropStream, Handle); |
63 | 66 | HFunc(TEvDiscovery::TEvDiscoveryData, Handle);
|
64 | 67 | HFunc(TEvDiscovery::TEvError, Handle);
|
65 | 68 | HFunc(TEvService::TEvStatus, Handle);
|
@@ -148,13 +151,53 @@ void TController::Handle(TEvPrivate::TEvAssignStreamName::TPtr& ev, const TActor
|
148 | 151 | RunTxAssignStreamName(ev, ctx);
|
149 | 152 | }
|
150 | 153 |
|
| 154 | +template <typename TEvent> |
| 155 | +void ProcessLimiterQueue(TDeque<TActorId>& requested, THashSet<TActorId>& inflight, ui32 limit, const TActorContext& ctx) { |
| 156 | + while (!requested.empty() && inflight.size() < limit) { |
| 157 | + const auto& actorId = requested.front(); |
| 158 | + ctx.Send(actorId, new TEvent()); |
| 159 | + inflight.insert(actorId); |
| 160 | + requested.pop_front(); |
| 161 | + } |
| 162 | +} |
| 163 | + |
| 164 | +void TController::ProcessCreateStreamQueue(const TActorContext& ctx) { |
| 165 | + const auto& limits = AppData()->ReplicationConfig.GetSchemeOperationLimits(); |
| 166 | + ProcessLimiterQueue<TEvPrivate::TEvAllowCreateStream>(RequestedCreateStream, InflightCreateStream, limits.GetInflightCreateStreamLimit(), ctx); |
| 167 | +} |
| 168 | + |
| 169 | +void TController::ProcessDropStreamQueue(const TActorContext& ctx) { |
| 170 | + const auto& limits = AppData()->ReplicationConfig.GetSchemeOperationLimits(); |
| 171 | + ProcessLimiterQueue<TEvPrivate::TEvAllowDropStream>(RequestedDropStream, InflightDropStream, limits.GetInflightDropStreamLimit(), ctx); |
| 172 | +} |
| 173 | + |
| 174 | +void TController::Handle(TEvPrivate::TEvRequestCreateStream::TPtr& ev, const TActorContext& ctx) { |
| 175 | + CLOG_T(ctx, "Handle " << ev->Get()->ToString()); |
| 176 | + |
| 177 | + RequestedCreateStream.push_back(ev->Sender); |
| 178 | + ProcessCreateStreamQueue(ctx); |
| 179 | +} |
| 180 | + |
151 | 181 | void TController::Handle(TEvPrivate::TEvCreateStreamResult::TPtr& ev, const TActorContext& ctx) {
|
152 | 182 | CLOG_T(ctx, "Handle " << ev->Get()->ToString());
|
| 183 | + |
| 184 | + InflightCreateStream.erase(ev->Sender); |
| 185 | + ProcessCreateStreamQueue(ctx); |
153 | 186 | RunTxCreateStreamResult(ev, ctx);
|
154 | 187 | }
|
155 | 188 |
|
| 189 | +void TController::Handle(TEvPrivate::TEvRequestDropStream::TPtr& ev, const TActorContext& ctx) { |
| 190 | + CLOG_T(ctx, "Handle " << ev->Get()->ToString()); |
| 191 | + |
| 192 | + RequestedDropStream.push_back(ev->Sender); |
| 193 | + ProcessDropStreamQueue(ctx); |
| 194 | +} |
| 195 | + |
156 | 196 | void TController::Handle(TEvPrivate::TEvDropStreamResult::TPtr& ev, const TActorContext& ctx) {
|
157 | 197 | CLOG_T(ctx, "Handle " << ev->Get()->ToString());
|
| 198 | + |
| 199 | + InflightDropStream.erase(ev->Sender); |
| 200 | + ProcessDropStreamQueue(ctx); |
158 | 201 | RunTxDropStreamResult(ev, ctx);
|
159 | 202 | }
|
160 | 203 |
|
|
0 commit comments