Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ TTestReadSession::TTestReadSession(const TString& name, TTopicClient& client, si
auto partitionId = ev.GetPartitionSession()->GetPartitionId();
impl->EndedPartitions.insert(partitionId);
impl->EndedPartitionEvents.push_back(ev);

ev.Confirm();
});


Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ class TTopicReader: public TBaseProxyActor<TTopicReader> {
x->Confirm();
return WaitEvent(ev->Get()->Sender, ev->Get()->Cookie);
} else if (auto* x = std::get_if<TReadSessionEvent::TEndPartitionSessionEvent>(&*event)) {
// do nothing.
x->Confirm();
return WaitEvent(ev->Get()->Sender, ev->Get()->Cookie);
} else if (auto* x = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&*event)) {
if (AutoCommit) {
DeferredCommit.Add(*x);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1930,8 +1930,8 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) {
TDeferredActions actions;

TPartitionStreamImpl::SignalReadyEvents(stream,
&sessionQueue,
actions);
&sessionQueue,
actions);

UNIT_ASSERT_DATA_EVENT(1);
UNIT_ASSERT_CONTROL_EVENT();
Expand Down
6 changes: 4 additions & 2 deletions ydb/public/sdk/cpp/client/ydb_topic/impl/event_handlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ class TGracefulReleasingSimpleDataHandlers : public TThrRefBase {
}
}

void OnEndPartitionStream(TReadSessionEvent::TEndPartitionSessionEvent&) {
void OnEndPartitionStream(TReadSessionEvent::TEndPartitionSessionEvent& event) {
event.Confirm();
}

void OnPartitionStreamClosed(TReadSessionEvent::TPartitionSessionClosedEvent& event) {
Expand Down Expand Up @@ -139,7 +140,8 @@ TReadSessionSettings::TEventHandlers& TReadSessionSettings::TEventHandlers::Simp
StopPartitionSessionHandler([](TReadSessionEvent::TStopPartitionSessionEvent& event) {
event.Confirm();
});
EndPartitionSessionHandler([](TReadSessionEvent::TEndPartitionSessionEvent&) {
EndPartitionSessionHandler([](TReadSessionEvent::TEndPartitionSessionEvent& event) {
event.Confirm();
});
CommitOffsetAcknowledgementHandler([](TReadSessionEvent::TCommitOffsetAcknowledgementEvent&){});
PartitionSessionClosedHandler([](TReadSessionEvent::TPartitionSessionClosedEvent&){});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,12 @@ TEndPartitionSessionEvent::TEndPartitionSessionEvent(TPartitionSession::TPtr par
, ChildPartitionIds(std::move(childPartitionIds)) {
}

void TEndPartitionSessionEvent::Confirm() {
if (PartitionSession) {
static_cast<TPartitionStreamImpl<false>*>(PartitionSession.Get())->ConfirmEnd(GetChildPartitionIds());
}
}

void JoinIds(TStringBuilder& ret, const std::vector<ui32> ids) {
ret << "[";
for (size_t i = 0; i < ids.size(); ++i) {
Expand Down
124 changes: 95 additions & 29 deletions ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,9 @@ struct TRawPartitionStreamEvent {
template <bool UseMigrationProtocol>
class TRawPartitionStreamEventQueue {
public:
TRawPartitionStreamEventQueue() = default;
TRawPartitionStreamEventQueue(TCallbackContextPtr<UseMigrationProtocol> cbContext)
: CbContext(cbContext) {
};

template <class... Ts>
TRawPartitionStreamEvent<UseMigrationProtocol>& emplace_back(Ts&&... event)
Expand Down Expand Up @@ -550,6 +552,7 @@ class TRawPartitionStreamEventQueue {
std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>>& queue);

private:
TCallbackContextPtr<UseMigrationProtocol> CbContext;
std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>> Ready;
std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>> NotReady;
};
Expand Down Expand Up @@ -583,6 +586,7 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
, AssignId(assignId)
, FirstNotReadOffset(readOffset)
, CbContext(std::move(cbContext))
, EventsQueue(CbContext)
{
TAPartitionStream<true>::PartitionStreamId = partitionStreamId;
TAPartitionStream<true>::TopicPath = std::move(topicPath);
Expand All @@ -603,6 +607,7 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
, AssignId(static_cast<ui64>(assignId))
, FirstNotReadOffset(static_cast<ui64>(readOffset))
, CbContext(std::move(cbContext))
, EventsQueue(CbContext)
{
TAPartitionStream<false>::PartitionSessionId = partitionStreamId;
TAPartitionStream<false>::TopicPath = std::move(topicPath);
Expand All @@ -625,6 +630,7 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {

void ConfirmCreate(TMaybe<ui64> readOffset, TMaybe<ui64> commitOffset);
void ConfirmDestroy();
void ConfirmEnd(const std::vector<ui32>& childIds);

void StopReading() /*override*/;
void ResumeReading() /*override*/;
Expand Down Expand Up @@ -764,6 +770,7 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
TMutex Lock;
};


template <bool UseMigrationProtocol>
class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSettings<UseMigrationProtocol>,
typename TAReadSessionEvent<UseMigrationProtocol>::TEvent,
Expand Down Expand Up @@ -823,7 +830,8 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
}

bool TryApplyCallbackToEventImpl(typename TParent::TEvent& event,
TDeferredActions<UseMigrationProtocol>& deferred);
TDeferredActions<UseMigrationProtocol>& deferred,
TCallbackContextPtr<UseMigrationProtocol>& cbContext);
bool HasDataEventCallback() const;
void ApplyCallbackToEventImpl(TADataReceivedEvent<UseMigrationProtocol>& event,
TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>&& eventsInfo,
Expand Down Expand Up @@ -859,13 +867,19 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti

void ClearAllEvents();

void SetCallbackContext(TCallbackContextPtr<UseMigrationProtocol>& ctx) {
CbContext = ctx;
}

private:
struct THandlersVisitor : public TParent::TBaseHandlersVisitor {
THandlersVisitor(const TAReadSessionSettings<UseMigrationProtocol>& settings,
typename TParent::TEvent& event,
TDeferredActions<UseMigrationProtocol>& deferred)
TDeferredActions<UseMigrationProtocol>& deferred,
TCallbackContextPtr<UseMigrationProtocol>& cbContext)
: TParent::TBaseHandlersVisitor(settings, event)
, Deferred(deferred) {
, Deferred(deferred)
, CbContext(cbContext) {
}

#define DECLARE_HANDLER(type, handler, answer) \
Expand All @@ -880,7 +894,7 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
} \
/**/

#define DECLARE_TEMPLATE_HANDLER(type_true, type_false, handler_true, handler_false, answer) \
#define DECLARE_TEMPLATE_HANDLER(type_true, type_false, handler_true, handler_false) \
bool operator()(std::conditional_t<UseMigrationProtocol, type_true, type_false>&) { \
if (this->template PushHandler<std::conditional_t<UseMigrationProtocol, type_true, type_false>>( \
std::move(TParent::TBaseHandlersVisitor::Event), \
Expand All @@ -892,7 +906,7 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
} \
}(), \
this->Settings.EventHandlers_.CommonHandler_)) { \
return answer; \
return true; \
} \
return false; \
} \
Expand All @@ -901,50 +915,81 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
DECLARE_TEMPLATE_HANDLER(typename TAReadSessionEvent<true>::TDataReceivedEvent,
typename TAReadSessionEvent<false>::TDataReceivedEvent,
DataReceivedHandler_,
DataReceivedHandler_,
true);
DataReceivedHandler_);
DECLARE_TEMPLATE_HANDLER(typename TAReadSessionEvent<true>::TCommitAcknowledgementEvent,
typename TAReadSessionEvent<false>::TCommitOffsetAcknowledgementEvent,
CommitAcknowledgementHandler_,
CommitOffsetAcknowledgementHandler_,
true);
CommitOffsetAcknowledgementHandler_);
DECLARE_TEMPLATE_HANDLER(typename TAReadSessionEvent<true>::TCreatePartitionStreamEvent,
typename TAReadSessionEvent<false>::TStartPartitionSessionEvent,
CreatePartitionStreamHandler_,
StartPartitionSessionHandler_,
true);
StartPartitionSessionHandler_);
DECLARE_TEMPLATE_HANDLER(typename TAReadSessionEvent<true>::TDestroyPartitionStreamEvent,
typename TAReadSessionEvent<false>::TStopPartitionSessionEvent,
DestroyPartitionStreamHandler_,
StopPartitionSessionHandler_,
true);
StopPartitionSessionHandler_);
DECLARE_TEMPLATE_HANDLER(typename TAReadSessionEvent<true>::TPartitionStreamStatusEvent,
typename TAReadSessionEvent<false>::TPartitionSessionStatusEvent,
PartitionStreamStatusHandler_,
PartitionSessionStatusHandler_,
true);
DECLARE_TEMPLATE_HANDLER(typename TAReadSessionEvent<true>::TPartitionStreamClosedEvent,
typename TAReadSessionEvent<false>::TPartitionSessionClosedEvent,
PartitionStreamClosedHandler_,
PartitionSessionClosedHandler_,
true);
PartitionSessionStatusHandler_);
DECLARE_HANDLER(TASessionClosedEvent<UseMigrationProtocol>, SessionClosedHandler_, false); // Not applied

#undef DECLARE_HANDLER
#undef DECLARE_TEMPLATE_HANDLER

bool operator()(std::conditional_t<UseMigrationProtocol, typename TAReadSessionEvent<true>::TPartitionStreamClosedEvent, typename TAReadSessionEvent<false>::TPartitionSessionClosedEvent>&) {
auto specific = [this]() {
if constexpr (UseMigrationProtocol) {
return this->Settings.EventHandlers_.PartitionStreamClosedHandler_;
} else {
return this->Settings.EventHandlers_.PartitionSessionClosedHandler_;
}
}();

if (!specific && !this->Settings.EventHandlers_.CommonHandler_) {
return false;
}

this->template PushCommonHandler<>(
std::move(TParent::TBaseHandlersVisitor::Event),
[specific = specific,
common = this->Settings.EventHandlers_.CommonHandler_,
cbContext = CbContext](auto& event) {
auto& e = std::get<std::conditional_t<UseMigrationProtocol, typename TAReadSessionEvent<true>::TPartitionStreamClosedEvent, typename TAReadSessionEvent<false>::TPartitionSessionClosedEvent>>(event);
if (specific) {
specific(e);
} else if (common) {
common(event);
}
if constexpr (!UseMigrationProtocol) {
if (auto session = cbContext->LockShared()) {
session->UnregisterPartition(e.GetPartitionSession()->GetPartitionId(), e.GetPartitionSession()->GetPartitionSessionId());
}
}
});

return true;
}

template<bool E = !UseMigrationProtocol>
constexpr std::enable_if_t<E, bool>
operator()(typename TAReadSessionEvent<false>::TEndPartitionSessionEvent&) {
if (this->template PushHandler<typename TAReadSessionEvent<false>::TEndPartitionSessionEvent>(
std::move(TParent::TBaseHandlersVisitor::Event),
[this](){
return this->Settings.EventHandlers_.EndPartitionSessionHandler_;
}(),
this->Settings.EventHandlers_.CommonHandler_)) {
if (!this->Settings.EventHandlers_.EndPartitionSessionHandler_ && !this->Settings.EventHandlers_.CommonHandler_) {
return false;
}
return false;
this->template PushCommonHandler<>(
std::move(TParent::TBaseHandlersVisitor::Event),
[specific = this->Settings.EventHandlers_.EndPartitionSessionHandler_,
common = this->Settings.EventHandlers_.CommonHandler_,
cbContext = CbContext](TReadSessionEvent::TEvent& event) {
auto& e = std::get<TReadSessionEvent::TEndPartitionSessionEvent>(event);
if (specific) {
specific(e);
} else if (common) {
common(event);
}
});
return true;
}

bool Visit() {
Expand All @@ -956,6 +1001,7 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
}

TDeferredActions<UseMigrationProtocol>& Deferred;
TCallbackContextPtr<UseMigrationProtocol> CbContext;
};

TADataReceivedEvent<UseMigrationProtocol>
Expand All @@ -964,12 +1010,13 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator); // Assumes that we're under lock.

bool ApplyHandler(TReadSessionEventInfo<UseMigrationProtocol>& eventInfo, TDeferredActions<UseMigrationProtocol>& deferred) {
THandlersVisitor visitor(this->Settings, eventInfo.GetEvent(), deferred);
THandlersVisitor visitor(this->Settings, eventInfo.GetEvent(), deferred, CbContext);
return visitor.Visit();
}

private:
bool HasEventCallbacks;
TCallbackContextPtr<UseMigrationProtocol> CbContext;
};

} // namespace NYdb::NTopic
Expand Down Expand Up @@ -1045,6 +1092,7 @@ class TSingleClusterReadSessionImpl : public TEnableSelfContext<TSingleClusterRe
void Start();
void ConfirmPartitionStreamCreate(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, TMaybe<ui64> readOffset, TMaybe<ui64> commitOffset);
void ConfirmPartitionStreamDestroy(TPartitionStreamImpl<UseMigrationProtocol>* partitionStream);
void ConfirmPartitionStreamEnd(TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, const std::vector<ui32>& childIds);
void RequestPartitionStreamStatus(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream);
void Commit(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, ui64 startOffset, ui64 endOffset);

Expand Down Expand Up @@ -1092,6 +1140,16 @@ class TSingleClusterReadSessionImpl : public TEnableSelfContext<TSingleClusterRe
return Log;
}

void RegisterParentPartition(ui32 partitionId, ui32 parentPartitionId, ui64 parentPartitionSessionId);
void UnregisterPartition(ui32 partitionId, ui64 partitionSessionId);
std::vector<ui64> GetParentPartitionSessions(ui32 partitionId, ui64 partitionSessionId);
bool AllParentSessionsHasBeenRead(ui32 partitionId, ui64 partitionSessionId);

void SetSelfContext(TPtr ptr) {
TEnableSelfContext<TSingleClusterReadSessionImpl<UseMigrationProtocol>>::SetSelfContext(std::move(ptr));
EventsQueue->SetCallbackContext(TEnableSelfContext<TSingleClusterReadSessionImpl<UseMigrationProtocol>>::SelfContext);
}

private:
void BreakConnectionAndReconnectImpl(TPlainStatus&& status, TDeferredActions<UseMigrationProtocol>& deferred);

Expand Down Expand Up @@ -1273,6 +1331,14 @@ class TSingleClusterReadSessionImpl : public TEnableSelfContext<TSingleClusterRe
std::atomic<int> DecompressionTasksInflight = 0;
i64 ReadSizeBudget;
i64 ReadSizeServerDelta = 0;

struct TParentInfo {
ui32 PartitionId;
ui64 PartitionSessionId;
};

std::unordered_map<ui32, std::vector<TParentInfo>> HierarchyData;
std::unordered_set<ui64> ReadingFinishedData;
};

} // namespace NYdb::NTopic
Loading