Skip to content

Commit 843e6b7

Browse files
committed
intermediate
1 parent c7d81b6 commit 843e6b7

File tree

5 files changed

+79
-35
lines changed

5 files changed

+79
-35
lines changed

ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,8 @@ TTestReadSession::TTestReadSession(const TString& name, TTopicClient& client, si
209209
auto partitionId = ev.GetPartitionSession()->GetPartitionId();
210210
impl->EndedPartitions.insert(partitionId);
211211
impl->EndedPartitionEvents.push_back(ev);
212+
213+
ev.Confirm();
212214
});
213215

214216

ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,12 @@ TEndPartitionSessionEvent::TEndPartitionSessionEvent(TPartitionSession::TPtr par
349349
, ChildPartitionIds(std::move(childPartitionIds)) {
350350
}
351351

352+
void TEndPartitionSessionEvent::Confirm() {
353+
if (PartitionSession) {
354+
static_cast<TPartitionStreamImpl<false>*>(PartitionSession.Get())->ConfirmEnd(GetChildPartitionIds());
355+
}
356+
}
357+
352358
void JoinIds(TStringBuilder& ret, const std::vector<ui32> ids) {
353359
ret << "[";
354360
for (size_t i = 0; i < ids.size(); ++i) {

ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_impl.h

Lines changed: 47 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -630,6 +630,7 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
630630

631631
void ConfirmCreate(TMaybe<ui64> readOffset, TMaybe<ui64> commitOffset);
632632
void ConfirmDestroy();
633+
void ConfirmEnd(const std::vector<ui32>& childIds);
633634

634635
void StopReading() /*override*/;
635636
void ResumeReading() /*override*/;
@@ -893,7 +894,7 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
893894
} \
894895
/**/
895896

896-
#define DECLARE_TEMPLATE_HANDLER(type_true, type_false, handler_true, handler_false, answer) \
897+
#define DECLARE_TEMPLATE_HANDLER(type_true, type_false, handler_true, handler_false) \
897898
bool operator()(std::conditional_t<UseMigrationProtocol, type_true, type_false>&) { \
898899
if (this->template PushHandler<std::conditional_t<UseMigrationProtocol, type_true, type_false>>( \
899900
std::move(TParent::TBaseHandlersVisitor::Event), \
@@ -905,7 +906,7 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
905906
} \
906907
}(), \
907908
this->Settings.EventHandlers_.CommonHandler_)) { \
908-
return answer; \
909+
return true; \
909910
} \
910911
return false; \
911912
} \
@@ -914,38 +915,64 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
914915
DECLARE_TEMPLATE_HANDLER(typename TAReadSessionEvent<true>::TDataReceivedEvent,
915916
typename TAReadSessionEvent<false>::TDataReceivedEvent,
916917
DataReceivedHandler_,
917-
DataReceivedHandler_,
918-
true);
918+
DataReceivedHandler_);
919919
DECLARE_TEMPLATE_HANDLER(typename TAReadSessionEvent<true>::TCommitAcknowledgementEvent,
920920
typename TAReadSessionEvent<false>::TCommitOffsetAcknowledgementEvent,
921921
CommitAcknowledgementHandler_,
922-
CommitOffsetAcknowledgementHandler_,
923-
true);
922+
CommitOffsetAcknowledgementHandler_);
924923
DECLARE_TEMPLATE_HANDLER(typename TAReadSessionEvent<true>::TCreatePartitionStreamEvent,
925924
typename TAReadSessionEvent<false>::TStartPartitionSessionEvent,
926925
CreatePartitionStreamHandler_,
927-
StartPartitionSessionHandler_,
928-
true);
926+
StartPartitionSessionHandler_);
929927
DECLARE_TEMPLATE_HANDLER(typename TAReadSessionEvent<true>::TDestroyPartitionStreamEvent,
930928
typename TAReadSessionEvent<false>::TStopPartitionSessionEvent,
931929
DestroyPartitionStreamHandler_,
932-
StopPartitionSessionHandler_,
933-
true);
930+
StopPartitionSessionHandler_);
934931
DECLARE_TEMPLATE_HANDLER(typename TAReadSessionEvent<true>::TPartitionStreamStatusEvent,
935932
typename TAReadSessionEvent<false>::TPartitionSessionStatusEvent,
936933
PartitionStreamStatusHandler_,
937-
PartitionSessionStatusHandler_,
938-
true);
934+
PartitionSessionStatusHandler_);
939935
DECLARE_TEMPLATE_HANDLER(typename TAReadSessionEvent<true>::TPartitionStreamClosedEvent,
940936
typename TAReadSessionEvent<false>::TPartitionSessionClosedEvent,
941937
PartitionStreamClosedHandler_,
942-
PartitionSessionClosedHandler_,
943-
true);
938+
PartitionSessionClosedHandler_);
944939
DECLARE_HANDLER(TASessionClosedEvent<UseMigrationProtocol>, SessionClosedHandler_, false); // Not applied
945940

946941
#undef DECLARE_HANDLER
947942
#undef DECLARE_TEMPLATE_HANDLER
948943

944+
using TPartitionSessionClosedEvent = std::conditional_t<UseMigrationProtocol,
945+
TAReadSessionEvent<true>::TPartitionStreamClosedEvent,
946+
TAReadSessionEvent<false>::TPartitionSessionClosedEvent>;
947+
948+
/*
949+
bool operator()(TPartitionSessionClosedEvent&) {
950+
auto specificProvider = [this](){
951+
if constexpr (UseMigrationProtocol) {
952+
return this->Settings.EventHandlers_.PartitionStreamClosedHandler_;
953+
} else {
954+
return this->Settings.EventHandlers_.PartitionSessionClosedHandler_;
955+
}
956+
};
957+
958+
this->template PushCommonHandler<>(
959+
std::move(TParent::TBaseHandlersVisitor::Event),
960+
[specific = specificProvider(),
961+
common = this->Settings.EventHandlers_.CommonHandler_,
962+
cbContext = CbContext](TReadSessionEvent::TEvent& event) {
963+
auto& e = std::get<TPartitionSessionClosedEvent>(event);
964+
if (specific) {
965+
specific(e);
966+
} else if (common) {
967+
common(event);
968+
}
969+
if (auto session = cbContext->LockShared()) {
970+
session->UnregisterPartition(e.GetPartitionSession()->GetPartitionId(), e.GetPartitionSession()->GetPartitionSessionId());
971+
}
972+
});
973+
return specificProvider() || this->Settings.EventHandlers_.CommonHandler_;
974+
}
975+
*/
949976
template<bool E = !UseMigrationProtocol>
950977
constexpr std::enable_if_t<E, bool>
951978
operator()(typename TAReadSessionEvent<false>::TEndPartitionSessionEvent&) {
@@ -954,17 +981,17 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
954981
[specific = this->Settings.EventHandlers_.EndPartitionSessionHandler_,
955982
common = this->Settings.EventHandlers_.CommonHandler_,
956983
cbContext = CbContext](TReadSessionEvent::TEvent& event) {
957-
auto e = std::get<TReadSessionEvent::TEndPartitionSessionEvent>(event);
984+
auto& e = std::get<TReadSessionEvent::TEndPartitionSessionEvent>(event);
958985
if (specific) {
959986
specific(e);
960987
} else if (common) {
961988
common(event);
962989
}
963-
if (auto session = cbContext->LockShared()) {
964-
session->SetReadingFinished(e.GetPartitionSession()->GetPartitionSessionId(), e.GetChildPartitionIds());
965-
}
990+
//if (auto session = cbContext->LockShared()) {
991+
// session->SetReadingFinished(e.GetPartitionSession()->GetPartitionSessionId(), e.GetChildPartitionIds());
992+
//}
966993
});
967-
return false;
994+
return this->Settings.EventHandlers_.EndPartitionSessionHandler_ || this->Settings.EventHandlers_.CommonHandler_;
968995
}
969996

970997
bool Visit() {
@@ -1067,6 +1094,7 @@ class TSingleClusterReadSessionImpl : public TEnableSelfContext<TSingleClusterRe
10671094
void Start();
10681095
void ConfirmPartitionStreamCreate(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, TMaybe<ui64> readOffset, TMaybe<ui64> commitOffset);
10691096
void ConfirmPartitionStreamDestroy(TPartitionStreamImpl<UseMigrationProtocol>* partitionStream);
1097+
void ConfirmPartitionStreamEnd(TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, const std::vector<ui32>& childIds);
10701098
void RequestPartitionStreamStatus(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream);
10711099
void Commit(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, ui64 startOffset, ui64 endOffset);
10721100

@@ -1118,7 +1146,6 @@ class TSingleClusterReadSessionImpl : public TEnableSelfContext<TSingleClusterRe
11181146
void UnregisterPartition(ui32 partitionId, ui64 partitionSessionId);
11191147
std::vector<ui64> GetParentPartitionSessions(ui32 partitionId, ui64 partitionSessionId);
11201148
bool AllParentSessionsHasBeenRead(ui32 partitionId, ui64 partitionSessionId);
1121-
void SetReadingFinished(ui64 partitionSessionId, const std::vector<ui32>& childIds);
11221149

11231150
void SetSelfContext(TPtr ptr) {
11241151
TEnableSelfContext<TSingleClusterReadSessionImpl<UseMigrationProtocol>>::SetSelfContext(std::move(ptr));

ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_impl.ipp

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,13 @@ void TPartitionStreamImpl<UseMigrationProtocol>::ConfirmDestroy() {
8282
}
8383
}
8484

85+
template<bool UseMigrationProtocol>
86+
void TPartitionStreamImpl<UseMigrationProtocol>::ConfirmEnd(const std::vector<ui32>& childIds) {
87+
if (auto sessionShared = CbContext->LockShared()) {
88+
sessionShared->ConfirmPartitionStreamEnd(this, childIds);
89+
}
90+
}
91+
8592
template<bool UseMigrationProtocol>
8693
void TPartitionStreamImpl<UseMigrationProtocol>::StopReading() {
8794
Y_ABORT("Not implemented"); // TODO
@@ -1825,18 +1832,19 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::RegisterParentPartitio
18251832

18261833
template<bool UseMigrationProtocol>
18271834
void TSingleClusterReadSessionImpl<UseMigrationProtocol>::UnregisterPartition(ui32 partitionId, ui64 partitionSessionId) {
1828-
auto it = HierarchyData.find(partitionId);
1829-
if (it != HierarchyData.end()) {
1835+
for (auto it = HierarchyData.begin(); it != HierarchyData.end();) {
18301836
auto& values = it->second;
18311837
for (auto v = values.begin(); v != values.end();) {
1832-
if (v->PartitionSessionId < partitionSessionId) {
1838+
if (v->PartitionId == partitionId && v->PartitionSessionId < partitionSessionId) {
18331839
v = values.erase(v);
18341840
} else {
18351841
++v;
18361842
}
18371843
}
18381844
if (values.empty()) {
1839-
HierarchyData.erase(it);
1845+
it = HierarchyData.erase(it);
1846+
} else {
1847+
++it;
18401848
}
18411849
}
18421850
}
@@ -1879,8 +1887,8 @@ bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::AllParentSessionsHasBe
18791887
}
18801888

18811889
template<bool UseMigrationProtocol>
1882-
void TSingleClusterReadSessionImpl<UseMigrationProtocol>::SetReadingFinished(ui64 partitionSessionId, const std::vector<ui32>& childIds) {
1883-
ReadingFinishedData.insert(partitionSessionId);
1890+
void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ConfirmPartitionStreamEnd(TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, const std::vector<ui32>& childIds) {
1891+
ReadingFinishedData.insert(partitionStream->GetAssignId()); // Check
18841892
for (auto& [_, s] : PartitionStreams) {
18851893
for (auto partitionId : childIds) {
18861894
if (s->GetPartitionId() == partitionId) {
@@ -2137,12 +2145,12 @@ TReadSessionEventsQueue<UseMigrationProtocol>::GetEventImpl(size_t& maxByteSize,
21372145
TParent::Events.pop();
21382146

21392147
if constexpr (!UseMigrationProtocol) {
2140-
if (std::holds_alternative<TReadSessionEvent::TEndPartitionSessionEvent>(*event)) {
2141-
auto& e = std::get<TReadSessionEvent::TEndPartitionSessionEvent>(*event);
2142-
if (auto session = frontCbContext->LockShared()) {
2143-
session->SetReadingFinished(partitionStream->GetPartitionSessionId(), e.GetChildPartitionIds());
2144-
}
2145-
}
2148+
// if (std::holds_alternative<TReadSessionEvent::TEndPartitionSessionEvent>(*event)) {
2149+
// auto& e = std::get<TReadSessionEvent::TEndPartitionSessionEvent>(*event);
2150+
// if (auto session = frontCbContext->LockShared()) {
2151+
// session->SetReadingFinished(partitionStream->GetPartitionSessionId(), e.GetChildPartitionIds());
2152+
// }
2153+
// }
21462154
}
21472155
}
21482156

@@ -2153,9 +2161,6 @@ TReadSessionEventsQueue<UseMigrationProtocol>::GetEventImpl(size_t& maxByteSize,
21532161

21542162
Y_ASSERT(TParent::CloseEvent);
21552163

2156-
// TODO close
2157-
2158-
21592164
return {*TParent::CloseEvent};
21602165
}
21612166

ydb/public/sdk/cpp/client/ydb_topic/include/read_events.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,10 @@ struct TReadSessionEvent {
298298
return ChildPartitionIds;
299299
}
300300

301+
//! Confirm partition session destruction.
302+
//! Confirm has no effect if TPartitionSessionClosedEvent for same partition session with is received.
303+
void Confirm();
304+
301305
private:
302306
std::vector<ui32> AdjacentPartitionIds;
303307
std::vector<ui32> ChildPartitionIds;

0 commit comments

Comments
 (0)