Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1397,6 +1397,7 @@ message TSplitMergeTablePartitions {
repeated TSplitBoundary SplitBoundary = 5; // Points of split (there will be N+1 parts)
optional uint64 SchemeshardId = 6; // Only needed if TableId is used instead of path
optional uint64 TableOwnerId = 7;
optional bool AllowOneToOneSplitMerge = 8; // Allow a special 1-to-1 split/merge for emergencies
}

message TUserAttribute {
Expand Down
154 changes: 148 additions & 6 deletions ydb/core/tx/schemeshard/schemeshard__monitoring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@
#include <ydb/core/tx/datashard/range_ops.h>
#include <ydb/core/tx/tx_proxy/proxy.h>

#include <library/cpp/protobuf/json/proto2json.h>

#include <library/cpp/html/pcdata/pcdata.h>
#include <util/string/cast.h>

static ui64 TryParseTabletId(TStringBuf tabletIdParam) {
if (tabletIdParam.StartsWith("0x"))
return IntFromString<ui64, 16>(tabletIdParam.substr(2));
else
return FromStringWithDefault<ui64>(tabletIdParam, ui64(NKikimr::NSchemeShard::InvalidTabletId));
ui64 tabletId = ui64(NKikimr::NSchemeShard::InvalidTabletId);
if (tabletIdParam.StartsWith("0x")) {
TryIntFromString<16>(tabletIdParam.substr(2), tabletId);
} else {
TryFromString(tabletIdParam, tabletId);
}
return tabletId;
}

namespace NKikimr {
Expand Down Expand Up @@ -79,6 +84,7 @@ struct TCgi {
static const TParam BuildIndexId;
static const TParam UpdateCoordinatorsConfig;
static const TParam UpdateCoordinatorsConfigDryRun;
static const TParam Action;

struct TPages {
static constexpr TStringBuf MainPage = "Main";
Expand All @@ -91,6 +97,10 @@ struct TCgi {
static constexpr TStringBuf ShardInfoByShardIdx = "ShardInfoByShardIdx";
static constexpr TStringBuf BuildIndexInfo = "BuildIndexInfo";
};

struct TActions {
static constexpr TStringBuf SplitOneToOne = "SplitOneToOne";
};
};

const TCgi::TParam TCgi::TabletID = TStringBuf("TabletID");
Expand All @@ -111,6 +121,7 @@ const TCgi::TParam TCgi::Page = TStringBuf("Page");
const TCgi::TParam TCgi::BuildIndexId = TStringBuf("BuildIndexId");
const TCgi::TParam TCgi::UpdateCoordinatorsConfig = TStringBuf("UpdateCoordinatorsConfig");
const TCgi::TParam TCgi::UpdateCoordinatorsConfigDryRun = TStringBuf("UpdateCoordinatorsConfigDryRun");
const TCgi::TParam TCgi::Action = TStringBuf("Action");


class TUpdateCoordinatorsConfigActor : public TActorBootstrapped<TUpdateCoordinatorsConfigActor> {
Expand Down Expand Up @@ -231,6 +242,93 @@ class TUpdateCoordinatorsConfigActor : public TActorBootstrapped<TUpdateCoordina
THashMap<ui64, const TItem*> InFlight;
};

class TMonitoringShardSplitOneToOne : public TActorBootstrapped<TMonitoringShardSplitOneToOne> {
public:
TMonitoringShardSplitOneToOne(NMon::TEvRemoteHttpInfo::TPtr&& ev, ui64 schemeShardId, const TPathId& pathId, TTabletId shardId)
: Ev(std::move(ev))
, SchemeShardId(schemeShardId)
, PathId(pathId)
, ShardId(shardId)
{}

void Bootstrap() {
Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId);
Become(&TThis::StateWaitTxId);
}

STFUNC(StateWaitTxId) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, Handle);
}
}

void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) {
TxId = ev->Get()->TxId;

auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(TxId, SchemeShardId);

auto& modifyScheme = *propose->Record.AddTransaction();
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpSplitMergeTablePartitions);
modifyScheme.SetInternal(true);

auto& info = *modifyScheme.MutableSplitMergeTablePartitions();
info.SetTableOwnerId(PathId.OwnerId);
info.SetTableLocalId(PathId.LocalPathId);
info.AddSourceTabletId(ui64(ShardId));
info.SetAllowOneToOneSplitMerge(true);

PipeCache = MakePipePerNodeCacheID(EPipePerNodeCache::Leader);
Send(PipeCache, new TEvPipeCache::TEvForward(propose.Release(), SchemeShardId, /* subscribe */ true));
Become(&TThis::StateWaitProposed);
}

STFUNC(StateWaitProposed) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvSchemeShard::TEvModifySchemeTransactionResult, Handle);
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
}
}

void Handle(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& ev) {
TString text;
try {
NProtobufJson::Proto2Json(ev->Get()->Record, text, {
.EnumMode = NProtobufJson::TProto2JsonConfig::EnumName,
.FieldNameMode = NProtobufJson::TProto2JsonConfig::FieldNameSnakeCaseDense,
.MapAsObject = true,
});
} catch (const std::exception& e) {
Send(Ev->Sender, new NMon::TEvRemoteBinaryInfoRes(
"HTTP/1.1 500 Internal Error\r\nConnection: Close\r\n\r\nUnexpected failure to serialize the response\r\n"));
PassAway();
}

Send(Ev->Sender, new NMon::TEvRemoteJsonInfoRes(text));
PassAway();
}

void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr&) {
Send(Ev->Sender, new NMon::TEvRemoteBinaryInfoRes(
TStringBuilder() << "HTTP/1.1 502 Bad Gateway\r\nConnection: Close\r\n\r\nSchemeShard tablet disconnected\r\n"));
PassAway();
}

void PassAway() override {
if (PipeCache) {
Send(PipeCache, new TEvPipeCache::TEvUnlink(0));
}
TActorBootstrapped::PassAway();
}

private:
NMon::TEvRemoteHttpInfo::TPtr Ev;
ui64 SchemeShardId;
TPathId PathId;
TTabletId ShardId;
ui64 TxId = 0;
TActorId PipeCache;
};

struct TSchemeShard::TTxMonitoring : public NTabletFlatExecutor::TTransactionBase<TSchemeShard> {
NMon::TEvRemoteHttpInfo::TPtr Ev;
TStringStream Answer;
Expand All @@ -242,11 +340,18 @@ struct TSchemeShard::TTxMonitoring : public NTabletFlatExecutor::TTransactionBas
{
}

TTxType GetTxType() const override { return TXTYPE_MONITORING; }

bool Execute(NTabletFlatExecutor::TTransactionContext &txc, const TActorContext &ctx) override {
Y_UNUSED(txc);

const TCgiParameters& cgi = Ev->Get()->Cgi();

if (cgi.Has(TCgi::Action)) {
HandleAction(cgi.Get(TCgi::Action), cgi, ctx);
return true;
}

const TString page = cgi.Has(TCgi::Page) ? cgi.Get(TCgi::Page) : ToString(TCgi::TPages::MainPage);

if (page == TCgi::TPages::AdminRequest) {
Expand Down Expand Up @@ -311,7 +416,7 @@ struct TSchemeShard::TTxMonitoring : public NTabletFlatExecutor::TTransactionBas
}

void Complete(const TActorContext &ctx) override {
if (Answer) {
if (Ev && Answer) {
ctx.Send(Ev->Sender, new NMon::TEvRemoteHttpInfoRes(Answer.Str()));
}
}
Expand Down Expand Up @@ -1364,7 +1469,44 @@ struct TSchemeShard::TTxMonitoring : public NTabletFlatExecutor::TTransactionBas
}
}

TTxType GetTxType() const override { return TXTYPE_MONITORING; }
private:
void SendBadRequest(const TString& details, const TActorContext& ctx) {
ctx.Send(Ev->Sender, new NMon::TEvRemoteBinaryInfoRes(
TStringBuilder() << "HTTP/1.1 400 Bad Request\r\nConnection: Close\r\n\r\n" << details << "\r\n"));
}

private:
void HandleAction(const TString& action, const TCgiParameters& cgi, const TActorContext& ctx) {
if (Ev->Get()->Method != HTTP_METHOD_POST) {
SendBadRequest("Action requires a POST method", ctx);
return;
}

if (action == TCgi::TActions::SplitOneToOne) {
TTabletId tabletId = TTabletId(TryParseTabletId(cgi.Get(TCgi::ShardID)));
TShardIdx shardIdx = Self->GetShardIdx(tabletId);
if (!shardIdx) {
SendBadRequest("Cannot find the specified shard", ctx);
return;
}
auto* info = Self->ShardInfos.FindPtr(shardIdx);
if (!info) {
SendBadRequest("Cannot find the specified shard info", ctx);
return;
}
TPathId pathId = info->PathId;
auto* table = Self->Tables.FindPtr(pathId);
if (!table) {
SendBadRequest("Cannot find the specified shard's table", ctx);
return;
}

ctx.Register(new TMonitoringShardSplitOneToOne(std::move(Ev), Self->TabletID(), pathId, tabletId));
return;
}

SendBadRequest("Action not supported", ctx);
}
};

bool TSchemeShard::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext &ctx) {
Expand Down
75 changes: 75 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,75 @@ class TSplitMerge: public TSubOperation {
return true;
}

bool AllocateDstForOneToOne(
const NKikimrSchemeOp::TSplitMergeTablePartitions& info,
TTxId txId,
const TPathId& pathId,
const TVector<ui64>& srcPartitionIdxs,
const TTableInfo::TCPtr tableInfo,
TTxState& op,
const TChannelsBindings& channels,
TString& errStr,
TOperationContext& context)
{
Y_UNUSED(errStr);

// 1 source shard is split/merged into 1 shard
Y_ABORT_UNLESS(srcPartitionIdxs.size() == 1);
Y_ABORT_UNLESS(info.SplitBoundarySize() == 0);

TString firstRangeBegin;
if (srcPartitionIdxs[0] != 0) {
// Take the end of previous shard
firstRangeBegin = tableInfo->GetPartitions()[srcPartitionIdxs[0]-1].EndOfRange;
} else {
TVector<TCell> firstKey;
ui32 keyColCount = 0;
for (const auto& col : tableInfo->Columns) {
if (col.second.IsKey()) {
++keyColCount;
}
}
// Or start from (NULL, NULL, .., NULL)
firstKey.resize(keyColCount);
firstRangeBegin = TSerializedCellVec::Serialize(firstKey);
}

op.SplitDescription = std::make_shared<NKikimrTxDataShard::TSplitMergeDescription>();
// Fill src shards
TString prevRangeEnd = firstRangeBegin;
for (ui64 pi : srcPartitionIdxs) {
auto* srcRange = op.SplitDescription->AddSourceRanges();
auto shardIdx = tableInfo->GetPartitions()[pi].ShardIdx;
srcRange->SetShardIdx(ui64(shardIdx.GetLocalId()));
srcRange->SetTabletID(ui64(context.SS->ShardInfos[shardIdx].TabletID));
srcRange->SetKeyRangeBegin(prevRangeEnd);
TString rangeEnd = tableInfo->GetPartitions()[pi].EndOfRange;
srcRange->SetKeyRangeEnd(rangeEnd);
prevRangeEnd = rangeEnd;
}

// Fill dst shard
TShardInfo datashardInfo = TShardInfo::DataShardInfo(txId, pathId);
datashardInfo.BindedChannels = channels;

auto idx = context.SS->RegisterShardInfo(datashardInfo);

ui64 lastSrcPartition = srcPartitionIdxs.back();
TString lastRangeEnd = tableInfo->GetPartitions()[lastSrcPartition].EndOfRange;

TTxState::TShardOperation dstShardOp(idx, ETabletType::DataShard, TTxState::CreateParts);
dstShardOp.RangeEnd = lastRangeEnd;
op.Shards.push_back(dstShardOp);

auto* dstRange = op.SplitDescription->AddDestinationRanges();
dstRange->SetShardIdx(ui64(idx.GetLocalId()));
dstRange->SetKeyRangeBegin(firstRangeBegin);
dstRange->SetKeyRangeEnd(lastRangeEnd);

return true;
}

THolder<TProposeResponse> Propose(const TString&, TOperationContext& context) override {
const TTabletId ssId = context.SS->SelfTabletId();

Expand Down Expand Up @@ -928,6 +997,12 @@ class TSplitMerge: public TSubOperation {
result->SetError(NKikimrScheme::StatusInvalidParameter, errStr);
return result;
}
} else if (srcPartitionIdxs.size() == 1 && dstCount == 1 && info.GetAllowOneToOneSplitMerge()) {
// This is one-to-one split/merge
if (!AllocateDstForOneToOne(info, OperationId.GetTxId(), path.Base()->PathId, srcPartitionIdxs, tableInfo, op, channelsBinding, errStr, context)) {
result->SetError(NKikimrScheme::StatusInvalidParameter, errStr);
return result;
}
} else {
result->SetError(NKikimrScheme::StatusInvalidParameter, "Invalid request: only 1->N or N->1 are supported");
return result;
Expand Down
47 changes: 47 additions & 0 deletions ydb/core/tx/schemeshard/ut_split_merge/ut_split_merge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,53 @@ Y_UNIT_TEST_SUITE(TSchemeShardSplitBySizeTest) {

}

Y_UNIT_TEST(ConcurrentSplitOneToOne) {
TTestBasicRuntime runtime;

TTestEnvOptions opts;
opts.EnableBackgroundCompaction(false);

TTestEnv env(runtime, opts);

ui64 txId = 100;

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "Key" Type: "Utf8"}
Columns { Name: "Value" Type: "Utf8"}
KeyColumnNames: ["Key", "Value"]
)");
env.TestWaitNotification(runtime, txId);
TestDescribeResult(DescribePath(runtime, "/MyRoot/Table", true),
{NLs::PartitionKeys({""})});

TVector<THolder<IEventHandle>> suppressed;
auto prevObserver = SetSuppressObserver(runtime, suppressed, TEvHive::TEvCreateTablet::EventType);

TestSplitTable(runtime, ++txId, "/MyRoot/Table", R"(
SourceTabletId: 72075186233409546
AllowOneToOneSplitMerge: true
)");

RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor());

TestSplitTable(runtime, ++txId, "/MyRoot/Table", R"(
SourceTabletId: 72075186233409546
AllowOneToOneSplitMerge: true
)",
{NKikimrScheme::StatusMultipleModifications});

WaitForSuppressed(runtime, suppressed, 2, prevObserver);

RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor());

env.TestWaitNotification(runtime, {txId-1, txId});
env.TestWaitTabletDeletion(runtime, TTestTxConfig::FakeHiveTablets); //delete src

TestDescribeResult(DescribePath(runtime, "/MyRoot/Table", true),
{NLs::PartitionKeys({""})});
}

Y_UNIT_TEST(Split10Shards) {
TTestBasicRuntime runtime;

Expand Down
Loading
Loading