Skip to content

Commit 34ea9dc

Browse files
committed
Support an emergency one-to-one split/merge in SchemeShard (#17642)
1 parent 81ecefd commit 34ea9dc

File tree

6 files changed

+313
-6
lines changed

6 files changed

+313
-6
lines changed

ydb/core/protos/flat_scheme_op.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1397,6 +1397,7 @@ message TSplitMergeTablePartitions {
13971397
repeated TSplitBoundary SplitBoundary = 5; // Points of split (there will be N+1 parts)
13981398
optional uint64 SchemeshardId = 6; // Only needed if TableId is used instead of path
13991399
optional uint64 TableOwnerId = 7;
1400+
optional bool AllowOneToOneSplitMerge = 8; // Allow a special 1-to-1 split/merge for emergencies
14001401
}
14011402

14021403
message TUserAttribute {

ydb/core/tx/schemeshard/schemeshard__monitoring.cpp

Lines changed: 148 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,19 @@
55
#include <ydb/core/tx/datashard/range_ops.h>
66
#include <ydb/core/tx/tx_proxy/proxy.h>
77

8+
#include <library/cpp/protobuf/json/proto2json.h>
9+
810
#include <library/cpp/html/pcdata/pcdata.h>
911
#include <util/string/cast.h>
1012

1113
// Parses a tablet id from a CGI parameter.
// Accepts either a decimal number or a "0x"-prefixed hexadecimal number.
// The result starts out as InvalidTabletId; the Try* helpers are presumed to
// leave it untouched on a parse failure, so that value is what callers see
// for malformed input (NOTE(review): confirm against util/string/cast.h).
static ui64 TryParseTabletId(TStringBuf tabletIdParam) {
    ui64 parsed = ui64(NKikimr::NSchemeShard::InvalidTabletId);
    const bool isHex = tabletIdParam.StartsWith("0x");
    if (isHex) {
        // Skip the "0x" prefix before parsing the hexadecimal digits.
        TryIntFromString<16>(tabletIdParam.substr(2), parsed);
    } else {
        TryFromString(tabletIdParam, parsed);
    }
    return parsed;
}
1722

1823
namespace NKikimr {
@@ -79,6 +84,7 @@ struct TCgi {
7984
static const TParam BuildIndexId;
8085
static const TParam UpdateCoordinatorsConfig;
8186
static const TParam UpdateCoordinatorsConfigDryRun;
87+
static const TParam Action;
8288

8389
struct TPages {
8490
static constexpr TStringBuf MainPage = "Main";
@@ -91,6 +97,10 @@ struct TCgi {
9197
static constexpr TStringBuf ShardInfoByShardIdx = "ShardInfoByShardIdx";
9298
static constexpr TStringBuf BuildIndexInfo = "BuildIndexInfo";
9399
};
100+
101+
struct TActions {
102+
static constexpr TStringBuf SplitOneToOne = "SplitOneToOne";
103+
};
94104
};
95105

96106
const TCgi::TParam TCgi::TabletID = TStringBuf("TabletID");
@@ -111,6 +121,7 @@ const TCgi::TParam TCgi::Page = TStringBuf("Page");
111121
const TCgi::TParam TCgi::BuildIndexId = TStringBuf("BuildIndexId");
112122
const TCgi::TParam TCgi::UpdateCoordinatorsConfig = TStringBuf("UpdateCoordinatorsConfig");
113123
const TCgi::TParam TCgi::UpdateCoordinatorsConfigDryRun = TStringBuf("UpdateCoordinatorsConfigDryRun");
124+
const TCgi::TParam TCgi::Action = TStringBuf("Action");
114125

115126

116127
class TUpdateCoordinatorsConfigActor : public TActorBootstrapped<TUpdateCoordinatorsConfigActor> {
@@ -231,6 +242,93 @@ class TUpdateCoordinatorsConfigActor : public TActorBootstrapped<TUpdateCoordina
231242
THashMap<ui64, const TItem*> InFlight;
232243
};
233244

245+
// Helper actor that serves the "SplitOneToOne" monitoring action.
//
// Flow:
//   1. Bootstrap asks the tx proxy for a fresh TxId.
//   2. On TEvAllocateTxIdResult it builds an internal
//      ESchemeOpSplitMergeTablePartitions transaction for the single source
//      shard with AllowOneToOneSplitMerge set, and forwards it to the
//      SchemeShard through the per-node leader pipe cache (with subscription).
//   3. The SchemeShard's reply is rendered as JSON and sent to the sender of
//      the original HTTP request; a pipe delivery problem is reported as 502.
//
// Exactly one reply is sent to the HTTP requester, after which the actor dies.
class TMonitoringShardSplitOneToOne : public TActorBootstrapped<TMonitoringShardSplitOneToOne> {
public:
    // ev            - the original monitoring HTTP request (ownership is taken)
    // schemeShardId - tablet id of the SchemeShard that receives the proposal
    // pathId        - path id of the table owning the shard
    // shardId       - tablet id of the shard to split one-to-one
    TMonitoringShardSplitOneToOne(NMon::TEvRemoteHttpInfo::TPtr&& ev, ui64 schemeShardId, const TPathId& pathId, TTabletId shardId)
        : Ev(std::move(ev))
        , SchemeShardId(schemeShardId)
        , PathId(pathId)
        , ShardId(shardId)
    {}

    // Starts the flow by requesting a transaction id from the tx proxy.
    void Bootstrap() {
        Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId);
        Become(&TThis::StateWaitTxId);
    }

    STFUNC(StateWaitTxId) {
        switch (ev->GetTypeRewrite()) {
            hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, Handle);
        }
    }

    // Builds the split/merge proposal with the allocated TxId and forwards it
    // to the SchemeShard via the pipe cache.
    void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) {
        TxId = ev->Get()->TxId;

        auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(TxId, SchemeShardId);

        auto& modifyScheme = *propose->Record.AddTransaction();
        modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpSplitMergeTablePartitions);
        modifyScheme.SetInternal(true);

        auto& info = *modifyScheme.MutableSplitMergeTablePartitions();
        info.SetTableOwnerId(PathId.OwnerId);
        info.SetTableLocalId(PathId.LocalPathId);
        info.AddSourceTabletId(ui64(ShardId));
        info.SetAllowOneToOneSplitMerge(true);

        // PipeCache is remembered so PassAway() can unlink from it later.
        PipeCache = MakePipePerNodeCacheID(EPipePerNodeCache::Leader);
        Send(PipeCache, new TEvPipeCache::TEvForward(propose.Release(), SchemeShardId, /* subscribe */ true));
        Become(&TThis::StateWaitProposed);
    }

    STFUNC(StateWaitProposed) {
        switch (ev->GetTypeRewrite()) {
            hFunc(TEvSchemeShard::TEvModifySchemeTransactionResult, Handle);
            hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
        }
    }

    // Converts the SchemeShard reply to JSON and returns it to the requester.
    // If serialization throws, a 500 response is sent instead.
    void Handle(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& ev) {
        TString text;
        try {
            NProtobufJson::Proto2Json(ev->Get()->Record, text, {
                .EnumMode = NProtobufJson::TProto2JsonConfig::EnumName,
                .FieldNameMode = NProtobufJson::TProto2JsonConfig::FieldNameSnakeCaseDense,
                .MapAsObject = true,
            });
        } catch (const std::exception&) {
            Send(Ev->Sender, new NMon::TEvRemoteBinaryInfoRes(
                "HTTP/1.1 500 Internal Error\r\nConnection: Close\r\n\r\nUnexpected failure to serialize the response\r\n"));
            PassAway();
            // Must stop here: falling through would send a second reply and
            // call PassAway() a second time.
            return;
        }

        Send(Ev->Sender, new NMon::TEvRemoteJsonInfoRes(text));
        PassAway();
    }

    // The pipe to the SchemeShard broke before a reply arrived.
    void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr&) {
        Send(Ev->Sender, new NMon::TEvRemoteBinaryInfoRes(
            "HTTP/1.1 502 Bad Gateway\r\nConnection: Close\r\n\r\nSchemeShard tablet disconnected\r\n"));
        PassAway();
    }

    // Unlinks from the pipe cache (if it was ever used) before dying.
    void PassAway() override {
        if (PipeCache) {
            Send(PipeCache, new TEvPipeCache::TEvUnlink(0));
        }
        TActorBootstrapped::PassAway();
    }

private:
    NMon::TEvRemoteHttpInfo::TPtr Ev;   // original HTTP request; Ev->Sender receives the reply
    ui64 SchemeShardId;
    TPathId PathId;
    TTabletId ShardId;
    ui64 TxId = 0;
    TActorId PipeCache;                 // unset until the proposal has been forwarded
};
331+
234332
struct TSchemeShard::TTxMonitoring : public NTabletFlatExecutor::TTransactionBase<TSchemeShard> {
235333
NMon::TEvRemoteHttpInfo::TPtr Ev;
236334
TStringStream Answer;
@@ -242,11 +340,18 @@ struct TSchemeShard::TTxMonitoring : public NTabletFlatExecutor::TTransactionBas
242340
{
243341
}
244342

343+
TTxType GetTxType() const override { return TXTYPE_MONITORING; }
344+
245345
bool Execute(NTabletFlatExecutor::TTransactionContext &txc, const TActorContext &ctx) override {
246346
Y_UNUSED(txc);
247347

248348
const TCgiParameters& cgi = Ev->Get()->Cgi();
249349

350+
if (cgi.Has(TCgi::Action)) {
351+
HandleAction(cgi.Get(TCgi::Action), cgi, ctx);
352+
return true;
353+
}
354+
250355
const TString page = cgi.Has(TCgi::Page) ? cgi.Get(TCgi::Page) : ToString(TCgi::TPages::MainPage);
251356

252357
if (page == TCgi::TPages::AdminRequest) {
@@ -311,7 +416,7 @@ struct TSchemeShard::TTxMonitoring : public NTabletFlatExecutor::TTransactionBas
311416
}
312417

313418
void Complete(const TActorContext &ctx) override {
314-
if (Answer) {
419+
if (Ev && Answer) {
315420
ctx.Send(Ev->Sender, new NMon::TEvRemoteHttpInfoRes(Answer.Str()));
316421
}
317422
}
@@ -1364,7 +1469,44 @@ struct TSchemeShard::TTxMonitoring : public NTabletFlatExecutor::TTransactionBas
13641469
}
13651470
}
13661471

1367-
TTxType GetTxType() const override { return TXTYPE_MONITORING; }
1472+
private:
1473+
void SendBadRequest(const TString& details, const TActorContext& ctx) {
1474+
ctx.Send(Ev->Sender, new NMon::TEvRemoteBinaryInfoRes(
1475+
TStringBuilder() << "HTTP/1.1 400 Bad Request\r\nConnection: Close\r\n\r\n" << details << "\r\n"));
1476+
}
1477+
1478+
private:
1479+
void HandleAction(const TString& action, const TCgiParameters& cgi, const TActorContext& ctx) {
1480+
if (Ev->Get()->Method != HTTP_METHOD_POST) {
1481+
SendBadRequest("Action requires a POST method", ctx);
1482+
return;
1483+
}
1484+
1485+
if (action == TCgi::TActions::SplitOneToOne) {
1486+
TTabletId tabletId = TTabletId(TryParseTabletId(cgi.Get(TCgi::ShardID)));
1487+
TShardIdx shardIdx = Self->GetShardIdx(tabletId);
1488+
if (!shardIdx) {
1489+
SendBadRequest("Cannot find the specified shard", ctx);
1490+
return;
1491+
}
1492+
auto* info = Self->ShardInfos.FindPtr(shardIdx);
1493+
if (!info) {
1494+
SendBadRequest("Cannot find the specified shard info", ctx);
1495+
return;
1496+
}
1497+
TPathId pathId = info->PathId;
1498+
auto* table = Self->Tables.FindPtr(pathId);
1499+
if (!table) {
1500+
SendBadRequest("Cannot find the specified shard's table", ctx);
1501+
return;
1502+
}
1503+
1504+
ctx.Register(new TMonitoringShardSplitOneToOne(std::move(Ev), Self->TabletID(), pathId, tabletId));
1505+
return;
1506+
}
1507+
1508+
SendBadRequest("Action not supported", ctx);
1509+
}
13681510
};
13691511

13701512
bool TSchemeShard::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext &ctx) {

ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -704,6 +704,75 @@ class TSplitMerge: public TSubOperation {
704704
return true;
705705
}
706706

707+
bool AllocateDstForOneToOne(
708+
const NKikimrSchemeOp::TSplitMergeTablePartitions& info,
709+
TTxId txId,
710+
const TPathId& pathId,
711+
const TVector<ui64>& srcPartitionIdxs,
712+
const TTableInfo::TCPtr tableInfo,
713+
TTxState& op,
714+
const TChannelsBindings& channels,
715+
TString& errStr,
716+
TOperationContext& context)
717+
{
718+
Y_UNUSED(errStr);
719+
720+
// 1 source shard is split/merged into 1 shard
721+
Y_ABORT_UNLESS(srcPartitionIdxs.size() == 1);
722+
Y_ABORT_UNLESS(info.SplitBoundarySize() == 0);
723+
724+
TString firstRangeBegin;
725+
if (srcPartitionIdxs[0] != 0) {
726+
// Take the end of previous shard
727+
firstRangeBegin = tableInfo->GetPartitions()[srcPartitionIdxs[0]-1].EndOfRange;
728+
} else {
729+
TVector<TCell> firstKey;
730+
ui32 keyColCount = 0;
731+
for (const auto& col : tableInfo->Columns) {
732+
if (col.second.IsKey()) {
733+
++keyColCount;
734+
}
735+
}
736+
// Or start from (NULL, NULL, .., NULL)
737+
firstKey.resize(keyColCount);
738+
firstRangeBegin = TSerializedCellVec::Serialize(firstKey);
739+
}
740+
741+
op.SplitDescription = std::make_shared<NKikimrTxDataShard::TSplitMergeDescription>();
742+
// Fill src shards
743+
TString prevRangeEnd = firstRangeBegin;
744+
for (ui64 pi : srcPartitionIdxs) {
745+
auto* srcRange = op.SplitDescription->AddSourceRanges();
746+
auto shardIdx = tableInfo->GetPartitions()[pi].ShardIdx;
747+
srcRange->SetShardIdx(ui64(shardIdx.GetLocalId()));
748+
srcRange->SetTabletID(ui64(context.SS->ShardInfos[shardIdx].TabletID));
749+
srcRange->SetKeyRangeBegin(prevRangeEnd);
750+
TString rangeEnd = tableInfo->GetPartitions()[pi].EndOfRange;
751+
srcRange->SetKeyRangeEnd(rangeEnd);
752+
prevRangeEnd = rangeEnd;
753+
}
754+
755+
// Fill dst shard
756+
TShardInfo datashardInfo = TShardInfo::DataShardInfo(txId, pathId);
757+
datashardInfo.BindedChannels = channels;
758+
759+
auto idx = context.SS->RegisterShardInfo(datashardInfo);
760+
761+
ui64 lastSrcPartition = srcPartitionIdxs.back();
762+
TString lastRangeEnd = tableInfo->GetPartitions()[lastSrcPartition].EndOfRange;
763+
764+
TTxState::TShardOperation dstShardOp(idx, ETabletType::DataShard, TTxState::CreateParts);
765+
dstShardOp.RangeEnd = lastRangeEnd;
766+
op.Shards.push_back(dstShardOp);
767+
768+
auto* dstRange = op.SplitDescription->AddDestinationRanges();
769+
dstRange->SetShardIdx(ui64(idx.GetLocalId()));
770+
dstRange->SetKeyRangeBegin(firstRangeBegin);
771+
dstRange->SetKeyRangeEnd(lastRangeEnd);
772+
773+
return true;
774+
}
775+
707776
THolder<TProposeResponse> Propose(const TString&, TOperationContext& context) override {
708777
const TTabletId ssId = context.SS->SelfTabletId();
709778

@@ -928,6 +997,12 @@ class TSplitMerge: public TSubOperation {
928997
result->SetError(NKikimrScheme::StatusInvalidParameter, errStr);
929998
return result;
930999
}
1000+
} else if (srcPartitionIdxs.size() == 1 && dstCount == 1 && info.GetAllowOneToOneSplitMerge()) {
1001+
// This is one-to-one split/merge
1002+
if (!AllocateDstForOneToOne(info, OperationId.GetTxId(), path.Base()->PathId, srcPartitionIdxs, tableInfo, op, channelsBinding, errStr, context)) {
1003+
setResultError(NKikimrScheme::StatusInvalidParameter, errStr);
1004+
return result;
1005+
}
9311006
} else {
9321007
result->SetError(NKikimrScheme::StatusInvalidParameter, "Invalid request: only 1->N or N->1 are supported");
9331008
return result;

ydb/core/tx/schemeshard/ut_split_merge/ut_split_merge.cpp

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,53 @@ Y_UNIT_TEST_SUITE(TSchemeShardSplitBySizeTest) {
6666

6767
}
6868

69+
// Issues two one-to-one split requests for the same shard, with SchemeShard
// reboots in between. The first is expected to be accepted, the second to be
// rejected with StatusMultipleModifications; afterwards the table must still
// have a single partition.
Y_UNIT_TEST(ConcurrentSplitOneToOne) {
    TTestBasicRuntime runtime;

    TTestEnvOptions opts;
    opts.EnableBackgroundCompaction(false);

    TTestEnv env(runtime, opts);

    ui64 txId = 100;

    TestCreateTable(runtime, ++txId, "/MyRoot", R"(
        Name: "Table"
        Columns { Name: "Key" Type: "Utf8"}
        Columns { Name: "Value" Type: "Utf8"}
        KeyColumnNames: ["Key", "Value"]
    )");
    env.TestWaitNotification(runtime, txId);
    TestDescribeResult(DescribePath(runtime, "/MyRoot/Table", true),
                       {NLs::PartitionKeys({""})});

    // Both requests use the same text: one source shard, one-to-one allowed.
    const TString splitRequest = R"(
        SourceTabletId: 72075186233409546
        AllowOneToOneSplitMerge: true
    )";

    // Hold back tablet creation requests to Hive while the splits are proposed.
    TVector<THolder<IEventHandle>> suppressed;
    auto prevObserver = SetSuppressObserver(runtime, suppressed, TEvHive::TEvCreateTablet::EventType);

    const ui64 firstSplitTxId = ++txId;
    TestSplitTable(runtime, firstSplitTxId, "/MyRoot/Table", splitRequest);

    RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor());

    // A second split of the same shard must be rejected while the first one is in flight.
    const ui64 secondSplitTxId = ++txId;
    TestSplitTable(runtime, secondSplitTxId, "/MyRoot/Table", splitRequest,
                   {NKikimrScheme::StatusMultipleModifications});

    WaitForSuppressed(runtime, suppressed, 2, prevObserver);

    RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor());

    env.TestWaitNotification(runtime, {firstSplitTxId, secondSplitTxId});
    env.TestWaitTabletDeletion(runtime, TTestTxConfig::FakeHiveTablets); //delete src

    TestDescribeResult(DescribePath(runtime, "/MyRoot/Table", true),
                       {NLs::PartitionKeys({""})});
}
115+
69116
Y_UNIT_TEST(Split10Shards) {
70117
TTestBasicRuntime runtime;
71118

0 commit comments

Comments
 (0)