Skip to content

Choose partition for topic split/merge #1038

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 38 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
7022834
intermediate
nshestakov Dec 27, 2023
8760054
fix
nshestakov Dec 27, 2023
1517ce6
intermediate
nshestakov Dec 28, 2023
d00bcd1
intermediate
nshestakov Dec 28, 2023
75c275b
Merge branch 'splitMerge2' of github.com:nshestakov/ydb into splitMerge2
nshestakov Dec 28, 2023
16f77fa
revert
nshestakov Dec 28, 2023
9daaade
intermediate
nshestakov Dec 28, 2023
bb55812
compiled
nshestakov Dec 29, 2023
7450494
intermediate
nshestakov Jan 9, 2024
7a02555
Merge branch 'main' into splitMerge_rescue
nshestakov Jan 9, 2024
4ba5c57
work splitmerge disabled
nshestakov Jan 9, 2024
a4220ad
split to few files
nshestakov Jan 10, 2024
aeca961
intermediate
nshestakov Jan 10, 2024
c1d2497
intermediate
nshestakov Jan 10, 2024
3e7db58
fixes
nshestakov Jan 10, 2024
9471f9c
intermediate
nshestakov Jan 11, 2024
a4143ed
xMerge branch 'main' into splitMerge_rescue
nshestakov Jan 12, 2024
b534d87
Merge branch 'main' into splitMerge_rescue
nshestakov Jan 15, 2024
36ebcd6
add column SeqNo
nshestakov Jan 15, 2024
a6e090d
tests
nshestakov Jan 15, 2024
3fb5c20
refactor partition graph
nshestakov Jan 16, 2024
fd620ac
more tests
nshestakov Jan 16, 2024
0547c69
more tests
nshestakov Jan 16, 2024
a434d7b
codestyle
nshestakov Jan 16, 2024
388682a
Merge branch 'main' into splitMerge_rescue
nshestakov Jan 16, 2024
4bbf8c3
fix
nshestakov Jan 17, 2024
8cfdc56
Merge branch 'main' into splitMerge_rescue
nshestakov Jan 17, 2024
8567b65
fix test
nshestakov Jan 17, 2024
f7ef40e
Merge branch 'main' into splitMerge_rescue
nshestakov Jan 17, 2024
d3b4700
Merge branch 'main' into splitMerge_rescue
nshestakov Jan 17, 2024
a9c5410
fix
nshestakov Jan 18, 2024
5152c26
codestyle
nshestakov Jan 19, 2024
c076b8a
fix sourceid registration error
nshestakov Jan 22, 2024
ec43d13
separate CheckPartitionStatus request
nshestakov Jan 22, 2024
75a1475
fix fast path
nshestakov Jan 23, 2024
42edf73
fix test
nshestakov Jan 23, 2024
4bca026
Merge branch 'main' into splitMerge_rescue
nshestakov Jan 23, 2024
6a89a74
Merge branch 'main' into splitMerge_rescue
nshestakov Jan 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ struct TEvPQ {
EvCacheProxyForgetRead,
EvGetFullDirectReadData,
EvProvideDirectReadInfo,
EvCheckPartitionStatusRequest,
EvCheckPartitionStatusResponse,
EvEnd
};

Expand Down Expand Up @@ -491,19 +493,21 @@ struct TEvPQ {
};

struct TEvChangeOwner : public TEventLocal<TEvChangeOwner, EvChangeOwner> {
explicit TEvChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const TActorId& sender, const bool force)
explicit TEvChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const TActorId& sender, const bool force, const bool registerIfNotExists)
: Cookie(cookie)
, Owner(owner)
, PipeClient(pipeClient)
, Sender(sender)
, Force(force)
, RegisterIfNotExists(registerIfNotExists)
{}

ui64 Cookie;
TString Owner;
TActorId PipeClient;
TActorId Sender;
bool Force;
bool RegisterIfNotExists;
};

struct TEvPipeDisconnected : public TEventLocal<TEvPipeDisconnected, EvPipeDisconnected> {
Expand Down Expand Up @@ -989,6 +993,18 @@ struct TEvPQ {
struct TEvProvideDirectReadInfo : public TEventLocal<TEvProvideDirectReadInfo, EvProvideDirectReadInfo> {
};

// Request asking for the status of a single partition.
// The payload is the NKikimrPQ::TEvCheckPartitionStatusRequest protobuf record;
// its Partition field carries the id of the partition being queried.
struct TEvCheckPartitionStatusRequest : public TEventPB<TEvCheckPartitionStatusRequest, NKikimrPQ::TEvCheckPartitionStatusRequest, EvCheckPartitionStatusRequest> {
    // Default constructor leaves the record untouched.
    TEvCheckPartitionStatusRequest() = default;

    // Builds a request for the given partition id.
    // Marked explicit so that a bare ui32 is never silently converted into an event.
    explicit TEvCheckPartitionStatusRequest(ui32 partitionId) {
        Record.SetPartition(partitionId);
    }
};

// Reply to TEvCheckPartitionStatusRequest.
// The payload is the NKikimrPQ::TEvCheckPartitionStatusResponse protobuf record;
// responders fill its Status field before sending.
struct TEvCheckPartitionStatusResponse : public TEventPB<TEvCheckPartitionStatusResponse, NKikimrPQ::TEvCheckPartitionStatusResponse, EvCheckPartitionStatusResponse> {
};


};

} //NKikimr
21 changes: 20 additions & 1 deletion ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1796,7 +1796,7 @@ void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& conf
{
Config = config;
PartitionConfig = GetPartitionConfig(Config, Partition);
PartitionGraph.Rebuild(Config);
PartitionGraph = MakePartitionGraph(Config);
TopicConverter = topicConverter;
NewPartition = false;

Expand Down Expand Up @@ -2616,4 +2616,23 @@ void TPartition::Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext
Send(ev->Sender, response.Release());
}

// Answers a partition status probe.
// A request addressed to a different partition is only logged and gets no reply.
// Otherwise the sender receives the status taken from PartitionConfig, or Active
// when no PartitionConfig is set.
void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) {
    const auto requestedPartition = ev->Get()->Record.GetPartition();

    if (Partition != requestedPartition) {
        LOG_INFO_S(
            ctx, NKikimrServices::PERSQUEUE,
            "TEvCheckPartitionStatusRequest for wrong partition " << requestedPartition << "." <<
            " Topic: \"" << TopicName() << "\"." <<
            " Partition: " << Partition << "."
        );
        return;
    }

    // Fall back to Active when this partition has no dedicated config entry.
    const auto status = PartitionConfig
        ? PartitionConfig->GetStatus()
        : NKikimrPQ::ETopicPartitionStatus::Active;

    auto reply = MakeHolder<TEvPQ::TEvCheckPartitionStatusResponse>();
    reply->Record.SetStatus(status);

    Send(ev->Sender, reply.Release());
}

} // namespace NKikimr::NPQ
5 changes: 4 additions & 1 deletion ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class TPartition : public TActorBootstrapped<TPartition> {

void ReplyGetClientOffsetOk(const TActorContext& ctx, const ui64 dst, const i64 offset, const TInstant writeTimestamp, const TInstant createTimestamp);
void ReplyOk(const TActorContext& ctx, const ui64 dst);
void ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& ownerCookie);
void ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& ownerCookie, ui64 seqNo);

void ReplyWrite(const TActorContext& ctx, ui64 dst, const TString& sourceId, ui64 seqNo, ui16 partNo, ui16 totalParts, ui64 offset, TInstant writeTimestamp, bool already, ui64 maxSeqNo, TDuration partitionQuotedTime, TDuration topicQuotedTime, TDuration queueTime, TDuration writeTime);

Expand Down Expand Up @@ -344,6 +344,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
// void DestroyReadSession(const TReadSessionKey& key);

void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx);

TString LogPrefix() const;

Expand Down Expand Up @@ -481,6 +482,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
HFuncTraced(TEvPQ::TEvTxRollback, Handle);
HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle);
HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle);
HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle);
HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
Expand Down Expand Up @@ -538,6 +540,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
HFuncTraced(TEvPQ::TEvTxRollback, Handle);
HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle);
HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle);
HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle);
HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/partition_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon
case NKikimrProto::NODATA:
Partition()->Config = Partition()->TabletConfig;
Partition()->PartitionConfig = GetPartitionConfig(Partition()->Config, Partition()->Partition);
Partition()->PartitionGraph.Rebuild(Partition()->Config);
Partition()->PartitionGraph = MakePartitionGraph(Partition()->Config);
break;

case NKikimrProto::ERROR:
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/persqueue/partition_sourcemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace NKikimr::NPQ {

IActor* CreateRequester(TActorId parent, TPartitionSourceManager::TPartitionId partition, ui64 tabletId);
bool IsResearchRequires(std::optional<const TPartitionGraph::Node*> node);
bool IsResearchRequires(const TPartitionGraph::Node* node);

//
// TPartitionSourceManager
Expand Down Expand Up @@ -37,7 +37,7 @@ void TPartitionSourceManager::ScheduleBatch() {

PendingSourceIds = std::move(UnknownSourceIds);

for(const auto* parent : node.value()->HierarhicalParents) {
for(const auto* parent : node->HierarhicalParents) {
PendingCookies.insert(++Cookie);

TActorId actorId = PartitionRequester(parent->Id, parent->TabletId);
Expand Down Expand Up @@ -141,7 +141,7 @@ void TPartitionSourceManager::Handle(TEvPQ::TEvSourceIdResponse::TPtr& ev, const
}
}

TPartitionSourceManager::TPartitionNode TPartitionSourceManager::GetPartitionNode() const {
const TPartitionSourceManager::TPartitionNode* TPartitionSourceManager::GetPartitionNode() const {
return Partition.PartitionGraph.GetPartition(Partition.Partition);
}

Expand Down Expand Up @@ -185,7 +185,7 @@ TSourceIdStorage& TPartitionSourceManager::GetSourceIdStorage() const {

bool TPartitionSourceManager::HasParents() const {
auto node = Partition.PartitionGraph.GetPartition(Partition.Partition);
return node && !node.value()->Parents.empty();
return node && !node->Parents.empty();
}

TActorId TPartitionSourceManager::PartitionRequester(TPartitionId id, ui64 tabletId) {
Expand Down Expand Up @@ -484,8 +484,8 @@ IActor* CreateRequester(TActorId parent, TPartitionSourceManager::TPartitionId p
return new TSourceIdRequester(parent, partition, tabletId);
}

bool IsResearchRequires(std::optional<const TPartitionGraph::Node*> node) {
return node && !node.value()->Parents.empty();
bool IsResearchRequires(const TPartitionGraph::Node* node) {
return node && !node->Parents.empty();
}

NKikimrPQ::TEvSourceIdResponse::EState Convert(TSourceIdInfo::EState value) {
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/persqueue/partition_sourcemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class TPartition;

class TPartitionSourceManager {
private:
using TPartitionNode = std::optional<const TPartitionGraph::Node *>;
using TPartitionNode = TPartitionGraph::Node;

public:
using TPartitionId = ui32;
Expand Down Expand Up @@ -96,7 +96,7 @@ class TPartitionSourceManager {
private:
TPartitionSourceManager& Manager;

TPartitionNode Node;
const TPartitionNode* Node;
TSourceIdWriter SourceIdWriter;
THeartbeatEmitter HeartbeatEmitter;
};
Expand Down Expand Up @@ -125,7 +125,7 @@ class TPartitionSourceManager {
void FinishBatch(const TActorContext& ctx);
bool RequireEnqueue(const TString& sourceId);

TPartitionNode GetPartitionNode() const;
const TPartitionNode* GetPartitionNode() const;
TSourceIdStorage& GetSourceIdStorage() const;
bool HasParents() const;

Expand Down
23 changes: 17 additions & 6 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,18 @@ static const ui32 MAX_INLINE_SIZE = 1000;

static constexpr NPersQueue::NErrorCode::EErrorCode InactivePartitionErrorCode = NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_IS_FULL;

void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie) {
void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie, ui64 seqNo) {
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ReplyOwnerOk. Partition: " << Partition);

THolder<TEvPQ::TEvProxyResponse> response = MakeHolder<TEvPQ::TEvProxyResponse>(dst);
NKikimrClient::TResponse& resp = *response->Response;
resp.SetStatus(NMsgBusProxy::MSTATUS_OK);
resp.SetErrorCode(NPersQueue::NErrorCode::OK);
resp.MutablePartitionResponse()->MutableCmdGetOwnershipResult()->SetOwnerCookie(cookie);
auto* r = resp.MutablePartitionResponse()->MutableCmdGetOwnershipResult();
r->SetOwnerCookie(cookie);
r->SetStatus(PartitionConfig ? PartitionConfig->GetStatus() : NKikimrPQ::ETopicPartitionStatus::Active);
r->SetSeqNo(seqNo);

ctx.Send(Tablet, response.Release());
}

Expand Down Expand Up @@ -146,8 +150,12 @@ void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, c
auto &owner = ev->Owner;
auto it = Owners.find(owner);
if (it == Owners.end()) {
Owners[owner];
it = Owners.find(owner);
if (ev->RegisterIfNotExists) {
Owners[owner];
it = Owners.find(owner);
} else {
return ReplyError(ctx, ev->Cookie, NPersQueue::NErrorCode::SOURCEID_DELETED, "SourceId isn't registered");
}
}
if (it->second.NeedResetOwner || ev->Force) { //change owner
Y_ABORT_UNLESS(ReservedSize >= it->second.ReservedSize);
Expand Down Expand Up @@ -346,10 +354,13 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
if (!already && partNo + 1 == totalParts && !writeResponse.Msg.HeartbeatVersion)
++offset;
} else if (response.IsOwnership()) {
const TString& ownerCookie = response.GetOwnership().OwnerCookie;
const auto& r = response.GetOwnership();
const TString& ownerCookie = r.OwnerCookie;
auto it = Owners.find(TOwnerInfo::GetOwnerFromOwnerCookie(ownerCookie));
if (it != Owners.end() && it->second.OwnerCookie == ownerCookie) {
ReplyOwnerOk(ctx, response.GetCookie(), ownerCookie);
auto sit = SourceIdStorage.GetInMemorySourceIds().find(NSourceIdEncoding::EncodeSimple(it->first));
auto seqNo = sit == SourceIdStorage.GetInMemorySourceIds().end() ? 0 : sit->second.SeqNo;
ReplyOwnerOk(ctx, response.GetCookie(), ownerCookie, seqNo);
} else {
ReplyError(ctx, response.GetCookie(), NPersQueue::NErrorCode::WRONG_COOKIE, "new GetOwnership request is dropped already");
}
Expand Down
35 changes: 34 additions & 1 deletion ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1230,6 +1230,7 @@ void TPersQueue::Handle(TEvPQ::TEvInitComplete::TPtr& ev, const TActorContext& c
}

ProcessSourceIdRequests(partitionId);
ProcessCheckPartitionStatusRequests(partitionId);
if (allInitialized) {
SourceIdRequests.clear();
}
Expand Down Expand Up @@ -2048,7 +2049,8 @@ void TPersQueue::HandleGetOwnershipRequest(const ui64 responseCookie, const TAct
it->second = TPipeInfo::ForOwner(partActor, owner, it->second.ServerActors);

InitResponseBuilder(responseCookie, 1, COUNTER_LATENCY_PQ_GET_OWNERSHIP);
THolder<TEvPQ::TEvChangeOwner> event = MakeHolder<TEvPQ::TEvChangeOwner>(responseCookie, owner, pipeClient, sender, req.GetCmdGetOwnership().GetForce());
THolder<TEvPQ::TEvChangeOwner> event = MakeHolder<TEvPQ::TEvChangeOwner>(responseCookie, owner, pipeClient, sender,
req.GetCmdGetOwnership().GetForce(), req.GetCmdGetOwnership().GetRegisterIfNotExists());
ctx.Send(partActor, event.Release());
}

Expand Down Expand Up @@ -3915,6 +3917,37 @@ void TPersQueue::ProcessSourceIdRequests(ui32 partitionId) {
}
}

// Routes a partition status probe that arrived at the tablet.
//  - Unknown partition: replies to the sender right away with status Deleted.
//  - Known partition whose InitDone flag is set: forwards the request to the partition actor.
//  - Known partition that is not initialized yet: parks the request in
//    CheckPartitionStatusRequests under its partition id; that map is drained by
//    ProcessCheckPartitionStatusRequests.
void TPersQueue::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) {
    auto& record = ev->Get()->Record;
    auto it = Partitions.find(record.GetPartition());
    if (it == Partitions.end()) {
        LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Unknown partition " << record.GetPartition());

        // The event must actually be allocated: a default-constructed THolder is empty,
        // and dereferencing it to fill the record would be a null-pointer access.
        auto response = MakeHolder<TEvPQ::TEvCheckPartitionStatusResponse>();
        response->Record.SetStatus(NKikimrPQ::ETopicPartitionStatus::Deleted);
        Send(ev->Sender, response.Release());

        return;
    }

    if (it->second.InitDone) {
        Forward(ev, it->second.Actor);
    } else {
        // Keep the request until the partition reports that initialization has finished.
        CheckPartitionStatusRequests[record.GetPartition()].push_back(ev);
    }
}

// Flushes the status probes that were parked for the given partition:
// every queued request is forwarded to the partition actor and the queue entry is removed.
// Does nothing when no requests are queued for that partition.
void TPersQueue::ProcessCheckPartitionStatusRequests(ui32 partitionId) {
    auto pending = CheckPartitionStatusRequests.find(partitionId);
    if (pending == CheckPartitionStatusRequests.end()) {
        return;
    }

    auto partitionIt = Partitions.find(partitionId);
    for (auto& request : pending->second) {
        Forward(request, partitionIt->second.Actor);
    }

    CheckPartitionStatusRequests.erase(pending);
}

TString TPersQueue::LogPrefix() const {
return TStringBuilder() << SelfId() << " ";
}
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/persqueue/pq_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx);
void ProcessSourceIdRequests(ui32 partitionId);

void Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx);
void ProcessCheckPartitionStatusRequests(ui32 partitionId);

TString LogPrefix() const;

static constexpr const char * KeyConfig() { return "_config"; }
Expand Down Expand Up @@ -405,6 +408,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
bool UseMediatorTimeCast = true;

THashMap<ui32, TVector<TEvPQ::TEvSourceIdRequest::TPtr>> SourceIdRequests;
THashMap<ui32, TVector<TEvPQ::TEvCheckPartitionStatusRequest::TPtr>> CheckPartitionStatusRequests;
TMaybe<ui64> TabletGeneration;
};

Expand Down
9 changes: 4 additions & 5 deletions ydb/core/persqueue/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,24 +147,23 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTrans
TabletConfig = txBody.GetTabletConfig();
BootstrapConfig = txBody.GetBootstrapConfig();

TPartitionGraph graph;
graph.Rebuild(TabletConfig);
TPartitionGraph graph = MakePartitionGraph(TabletConfig);

for (const auto& p : TabletConfig.GetPartitions()) {
auto node = graph.GetPartition(p.GetPartitionId());
if (!node) {
// Old configuration format without AllPartitions. Split/Merge is not supported.
continue;
}
if (node.value()->Children.empty()) {
for (const auto* r : node.value()->Parents) {
if (node->Children.empty()) {
for (const auto* r : node->Parents) {
if (extractTabletId != r->TabletId) {
Senders.insert(r->TabletId);
}
}
}

for (const auto* r : node.value()->Children) {
for (const auto* r : node->Children) {
if (r->Children.empty()) {
if (extractTabletId != r->TabletId) {
Receivers.insert(r->TabletId);
Expand Down
Loading