Skip to content

Revert "Choose partition for topic split/merge" #1243

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 1 addition & 17 deletions ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,6 @@ struct TEvPQ {
EvCacheProxyForgetRead,
EvGetFullDirectReadData,
EvProvideDirectReadInfo,
EvCheckPartitionStatusRequest,
EvCheckPartitionStatusResponse,
EvEnd
};

Expand Down Expand Up @@ -493,21 +491,19 @@ struct TEvPQ {
};

struct TEvChangeOwner : public TEventLocal<TEvChangeOwner, EvChangeOwner> {
explicit TEvChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const TActorId& sender, const bool force, const bool registerIfNotExists)
explicit TEvChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const TActorId& sender, const bool force)
: Cookie(cookie)
, Owner(owner)
, PipeClient(pipeClient)
, Sender(sender)
, Force(force)
, RegisterIfNotExists(registerIfNotExists)
{}

ui64 Cookie;
TString Owner;
TActorId PipeClient;
TActorId Sender;
bool Force;
bool RegisterIfNotExists;
};

struct TEvPipeDisconnected : public TEventLocal<TEvPipeDisconnected, EvPipeDisconnected> {
Expand Down Expand Up @@ -993,18 +989,6 @@ struct TEvPQ {
struct TEvProvideDirectReadInfo : public TEventLocal<TEvProvideDirectReadInfo, EvProvideDirectReadInfo> {
};

struct TEvCheckPartitionStatusRequest : public TEventPB<TEvCheckPartitionStatusRequest, NKikimrPQ::TEvCheckPartitionStatusRequest, EvCheckPartitionStatusRequest> {
TEvCheckPartitionStatusRequest() = default;

TEvCheckPartitionStatusRequest(ui32 partitionId) {
Record.SetPartition(partitionId);
}
};

struct TEvCheckPartitionStatusResponse : public TEventPB<TEvCheckPartitionStatusResponse, NKikimrPQ::TEvCheckPartitionStatusResponse, EvCheckPartitionStatusResponse> {
};


};

} //NKikimr
21 changes: 1 addition & 20 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1796,7 +1796,7 @@ void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& conf
{
Config = config;
PartitionConfig = GetPartitionConfig(Config, Partition);
PartitionGraph = MakePartitionGraph(Config);
PartitionGraph.Rebuild(Config);
TopicConverter = topicConverter;
NewPartition = false;

Expand Down Expand Up @@ -2616,23 +2616,4 @@ void TPartition::Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext
Send(ev->Sender, response.Release());
}

void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) {
auto& record = ev->Get()->Record;

if (Partition != record.GetPartition()) {
LOG_INFO_S(
ctx, NKikimrServices::PERSQUEUE,
"TEvCheckPartitionStatusRequest for wrong partition " << record.GetPartition() << "." <<
" Topic: \"" << TopicName() << "\"." <<
" Partition: " << Partition << "."
);
return;
}

auto response = MakeHolder<TEvPQ::TEvCheckPartitionStatusResponse>();
response->Record.SetStatus(PartitionConfig ? PartitionConfig->GetStatus() : NKikimrPQ::ETopicPartitionStatus::Active);

Send(ev->Sender, response.Release());
}

} // namespace NKikimr::NPQ
5 changes: 1 addition & 4 deletions ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class TPartition : public TActorBootstrapped<TPartition> {

void ReplyGetClientOffsetOk(const TActorContext& ctx, const ui64 dst, const i64 offset, const TInstant writeTimestamp, const TInstant createTimestamp);
void ReplyOk(const TActorContext& ctx, const ui64 dst);
void ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& ownerCookie, ui64 seqNo);
void ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& ownerCookie);

void ReplyWrite(const TActorContext& ctx, ui64 dst, const TString& sourceId, ui64 seqNo, ui16 partNo, ui16 totalParts, ui64 offset, TInstant writeTimestamp, bool already, ui64 maxSeqNo, TDuration partitionQuotedTime, TDuration topicQuotedTime, TDuration queueTime, TDuration writeTime);

Expand Down Expand Up @@ -344,7 +344,6 @@ class TPartition : public TActorBootstrapped<TPartition> {
// void DestroyReadSession(const TReadSessionKey& key);

void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx);

TString LogPrefix() const;

Expand Down Expand Up @@ -482,7 +481,6 @@ class TPartition : public TActorBootstrapped<TPartition> {
HFuncTraced(TEvPQ::TEvTxRollback, Handle);
HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle);
HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle);
HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle);
HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
Expand Down Expand Up @@ -540,7 +538,6 @@ class TPartition : public TActorBootstrapped<TPartition> {
HFuncTraced(TEvPQ::TEvTxRollback, Handle);
HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle);
HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle);
HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle);
HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/partition_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon
case NKikimrProto::NODATA:
Partition()->Config = Partition()->TabletConfig;
Partition()->PartitionConfig = GetPartitionConfig(Partition()->Config, Partition()->Partition);
Partition()->PartitionGraph = MakePartitionGraph(Partition()->Config);
Partition()->PartitionGraph.Rebuild(Partition()->Config);
break;

case NKikimrProto::ERROR:
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/persqueue/partition_sourcemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace NKikimr::NPQ {

IActor* CreateRequester(TActorId parent, TPartitionSourceManager::TPartitionId partition, ui64 tabletId);
bool IsResearchRequires(const TPartitionGraph::Node* node);
bool IsResearchRequires(std::optional<const TPartitionGraph::Node*> node);

//
// TPartitionSourceManager
Expand Down Expand Up @@ -37,7 +37,7 @@ void TPartitionSourceManager::ScheduleBatch() {

PendingSourceIds = std::move(UnknownSourceIds);

for(const auto* parent : node->HierarhicalParents) {
for(const auto* parent : node.value()->HierarhicalParents) {
PendingCookies.insert(++Cookie);

TActorId actorId = PartitionRequester(parent->Id, parent->TabletId);
Expand Down Expand Up @@ -141,7 +141,7 @@ void TPartitionSourceManager::Handle(TEvPQ::TEvSourceIdResponse::TPtr& ev, const
}
}

const TPartitionSourceManager::TPartitionNode* TPartitionSourceManager::GetPartitionNode() const {
TPartitionSourceManager::TPartitionNode TPartitionSourceManager::GetPartitionNode() const {
return Partition.PartitionGraph.GetPartition(Partition.Partition);
}

Expand Down Expand Up @@ -185,7 +185,7 @@ TSourceIdStorage& TPartitionSourceManager::GetSourceIdStorage() const {

bool TPartitionSourceManager::HasParents() const {
auto node = Partition.PartitionGraph.GetPartition(Partition.Partition);
return node && !node->Parents.empty();
return node && !node.value()->Parents.empty();
}

TActorId TPartitionSourceManager::PartitionRequester(TPartitionId id, ui64 tabletId) {
Expand Down Expand Up @@ -484,8 +484,8 @@ IActor* CreateRequester(TActorId parent, TPartitionSourceManager::TPartitionId p
return new TSourceIdRequester(parent, partition, tabletId);
}

bool IsResearchRequires(const TPartitionGraph::Node* node) {
return node && !node->Parents.empty();
bool IsResearchRequires(std::optional<const TPartitionGraph::Node*> node) {
return node && !node.value()->Parents.empty();
}

NKikimrPQ::TEvSourceIdResponse::EState Convert(TSourceIdInfo::EState value) {
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/persqueue/partition_sourcemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class TPartition;

class TPartitionSourceManager {
private:
using TPartitionNode = TPartitionGraph::Node;
using TPartitionNode = std::optional<const TPartitionGraph::Node *>;

public:
using TPartitionId = ui32;
Expand Down Expand Up @@ -96,7 +96,7 @@ class TPartitionSourceManager {
private:
TPartitionSourceManager& Manager;

const TPartitionNode* Node;
TPartitionNode Node;
TSourceIdWriter SourceIdWriter;
THeartbeatEmitter HeartbeatEmitter;
};
Expand Down Expand Up @@ -125,7 +125,7 @@ class TPartitionSourceManager {
void FinishBatch(const TActorContext& ctx);
bool RequireEnqueue(const TString& sourceId);

const TPartitionNode* GetPartitionNode() const;
TPartitionNode GetPartitionNode() const;
TSourceIdStorage& GetSourceIdStorage() const;
bool HasParents() const;

Expand Down
23 changes: 6 additions & 17 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,14 @@ static const ui32 MAX_INLINE_SIZE = 1000;

static constexpr NPersQueue::NErrorCode::EErrorCode InactivePartitionErrorCode = NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_IS_FULL;

void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie, ui64 seqNo) {
void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie) {
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ReplyOwnerOk. Partition: " << Partition);

THolder<TEvPQ::TEvProxyResponse> response = MakeHolder<TEvPQ::TEvProxyResponse>(dst);
NKikimrClient::TResponse& resp = *response->Response;
resp.SetStatus(NMsgBusProxy::MSTATUS_OK);
resp.SetErrorCode(NPersQueue::NErrorCode::OK);
auto* r = resp.MutablePartitionResponse()->MutableCmdGetOwnershipResult();
r->SetOwnerCookie(cookie);
r->SetStatus(PartitionConfig ? PartitionConfig->GetStatus() : NKikimrPQ::ETopicPartitionStatus::Active);
r->SetSeqNo(seqNo);

resp.MutablePartitionResponse()->MutableCmdGetOwnershipResult()->SetOwnerCookie(cookie);
ctx.Send(Tablet, response.Release());
}

Expand Down Expand Up @@ -150,12 +146,8 @@ void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, c
auto &owner = ev->Owner;
auto it = Owners.find(owner);
if (it == Owners.end()) {
if (ev->RegisterIfNotExists) {
Owners[owner];
it = Owners.find(owner);
} else {
return ReplyError(ctx, ev->Cookie, NPersQueue::NErrorCode::SOURCEID_DELETED, "SourceId isn't registered");
}
Owners[owner];
it = Owners.find(owner);
}
if (it->second.NeedResetOwner || ev->Force) { //change owner
Y_ABORT_UNLESS(ReservedSize >= it->second.ReservedSize);
Expand Down Expand Up @@ -354,13 +346,10 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
if (!already && partNo + 1 == totalParts && !writeResponse.Msg.HeartbeatVersion)
++offset;
} else if (response.IsOwnership()) {
const auto& r = response.GetOwnership();
const TString& ownerCookie = r.OwnerCookie;
const TString& ownerCookie = response.GetOwnership().OwnerCookie;
auto it = Owners.find(TOwnerInfo::GetOwnerFromOwnerCookie(ownerCookie));
if (it != Owners.end() && it->second.OwnerCookie == ownerCookie) {
auto sit = SourceIdStorage.GetInMemorySourceIds().find(NSourceIdEncoding::EncodeSimple(it->first));
auto seqNo = sit == SourceIdStorage.GetInMemorySourceIds().end() ? 0 : sit->second.SeqNo;
ReplyOwnerOk(ctx, response.GetCookie(), ownerCookie, seqNo);
ReplyOwnerOk(ctx, response.GetCookie(), ownerCookie);
} else {
ReplyError(ctx, response.GetCookie(), NPersQueue::NErrorCode::WRONG_COOKIE, "new GetOwnership request is dropped already");
}
Expand Down
35 changes: 1 addition & 34 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1230,7 +1230,6 @@ void TPersQueue::Handle(TEvPQ::TEvInitComplete::TPtr& ev, const TActorContext& c
}

ProcessSourceIdRequests(partitionId);
ProcessCheckPartitionStatusRequests(partitionId);
if (allInitialized) {
SourceIdRequests.clear();
}
Expand Down Expand Up @@ -2049,8 +2048,7 @@ void TPersQueue::HandleGetOwnershipRequest(const ui64 responseCookie, const TAct
it->second = TPipeInfo::ForOwner(partActor, owner, it->second.ServerActors);

InitResponseBuilder(responseCookie, 1, COUNTER_LATENCY_PQ_GET_OWNERSHIP);
THolder<TEvPQ::TEvChangeOwner> event = MakeHolder<TEvPQ::TEvChangeOwner>(responseCookie, owner, pipeClient, sender,
req.GetCmdGetOwnership().GetForce(), req.GetCmdGetOwnership().GetRegisterIfNotExists());
THolder<TEvPQ::TEvChangeOwner> event = MakeHolder<TEvPQ::TEvChangeOwner>(responseCookie, owner, pipeClient, sender, req.GetCmdGetOwnership().GetForce());
ctx.Send(partActor, event.Release());
}

Expand Down Expand Up @@ -3917,37 +3915,6 @@ void TPersQueue::ProcessSourceIdRequests(ui32 partitionId) {
}
}

void TPersQueue::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) {
auto& record = ev->Get()->Record;
auto it = Partitions.find(record.GetPartition());
if (it == Partitions.end()) {
LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Unknown partition " << record.GetPartition());

auto response = THolder<TEvPQ::TEvCheckPartitionStatusResponse>();
response->Record.SetStatus(NKikimrPQ::ETopicPartitionStatus::Deleted);
Send(ev->Sender, response.Release());

return;
}

if (it->second.InitDone) {
Forward(ev, it->second.Actor);
} else {
CheckPartitionStatusRequests[record.GetPartition()].push_back(ev);
}
}

void TPersQueue::ProcessCheckPartitionStatusRequests(ui32 partitionId) {
auto sit = CheckPartitionStatusRequests.find(partitionId);
if (sit != CheckPartitionStatusRequests.end()) {
auto it = Partitions.find(partitionId);
for (auto& r : sit->second) {
Forward(r, it->second.Actor);
}
CheckPartitionStatusRequests.erase(partitionId);
}
}

TString TPersQueue::LogPrefix() const {
return TStringBuilder() << SelfId() << " ";
}
Expand Down
4 changes: 0 additions & 4 deletions ydb/core/persqueue/pq_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,6 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx);
void ProcessSourceIdRequests(ui32 partitionId);

void Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx);
void ProcessCheckPartitionStatusRequests(ui32 partitionId);

TString LogPrefix() const;

static constexpr const char * KeyConfig() { return "_config"; }
Expand Down Expand Up @@ -408,7 +405,6 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
bool UseMediatorTimeCast = true;

THashMap<ui32, TVector<TEvPQ::TEvSourceIdRequest::TPtr>> SourceIdRequests;
THashMap<ui32, TVector<TEvPQ::TEvCheckPartitionStatusRequest::TPtr>> CheckPartitionStatusRequests;
TMaybe<ui64> TabletGeneration;
};

Expand Down
9 changes: 5 additions & 4 deletions ydb/core/persqueue/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,23 +147,24 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTrans
TabletConfig = txBody.GetTabletConfig();
BootstrapConfig = txBody.GetBootstrapConfig();

TPartitionGraph graph = MakePartitionGraph(TabletConfig);
TPartitionGraph graph;
graph.Rebuild(TabletConfig);

for (const auto& p : TabletConfig.GetPartitions()) {
auto node = graph.GetPartition(p.GetPartitionId());
if (!node) {
// Old configuration format without AllPartitions. Split/Merge is not supported.
continue;
}
if (node->Children.empty()) {
for (const auto* r : node->Parents) {
if (node.value()->Children.empty()) {
for (const auto* r : node.value()->Parents) {
if (extractTabletId != r->TabletId) {
Senders.insert(r->TabletId);
}
}
}

for (const auto* r : node->Children) {
for (const auto* r : node.value()->Children) {
if (r->Children.empty()) {
if (extractTabletId != r->TabletId) {
Receivers.insert(r->TabletId);
Expand Down
Loading