Skip to content

Commit 3830e7d

Browse files
committed
more fixes
1 parent 2b9da06 commit 3830e7d

File tree

6 files changed

+19
-16
lines changed

6 files changed

+19
-16
lines changed

ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,8 @@ class TTopicReader: public TBaseProxyActor<TTopicReader> {
205205
x->Confirm();
206206
return WaitEvent(ev->Get()->Sender, ev->Get()->Cookie);
207207
} else if (auto* x = std::get_if<TReadSessionEvent::TEndPartitionSessionEvent>(&*event)) {
208-
// do nothing.
208+
x->Confirm();
209+
return WaitEvent(ev->Get()->Sender, ev->Get()->Cookie);
209210
} else if (auto* x = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&*event)) {
210211
if (AutoCommit) {
211212
DeferredCommit.Add(*x);

ydb/public/sdk/cpp/client/ydb_topic/impl/event_handlers.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ class TGracefulReleasingSimpleDataHandlers : public TThrRefBase {
7676
}
7777
}
7878

79-
void OnEndPartitionStream(TReadSessionEvent::TEndPartitionSessionEvent&) {
79+
void OnEndPartitionStream(TReadSessionEvent::TEndPartitionSessionEvent& event) {
80+
event.Confirm();
8081
}
8182

8283
void OnPartitionStreamClosed(TReadSessionEvent::TPartitionSessionClosedEvent& event) {
@@ -139,7 +140,8 @@ TReadSessionSettings::TEventHandlers& TReadSessionSettings::TEventHandlers::Simp
139140
StopPartitionSessionHandler([](TReadSessionEvent::TStopPartitionSessionEvent& event) {
140141
event.Confirm();
141142
});
142-
EndPartitionSessionHandler([](TReadSessionEvent::TEndPartitionSessionEvent&) {
143+
EndPartitionSessionHandler([](TReadSessionEvent::TEndPartitionSessionEvent& event) {
144+
event.Confirm();
143145
});
144146
CommitOffsetAcknowledgementHandler([](TReadSessionEvent::TCommitOffsetAcknowledgementEvent&){});
145147
PartitionSessionClosedHandler([](TReadSessionEvent::TPartitionSessionClosedEvent&){});

ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_impl.ipp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,7 @@ inline void TSingleClusterReadSessionImpl<false>::InitImpl(TDeferredActions<fals
482482
auto& init = *req.mutable_init_request();
483483

484484
init.set_consumer(Settings.ConsumerName_);
485-
init.set_autoscaling_support(Settings.AutoscalingSupport_.GetOrElse(false));
485+
init.set_autoscaling_support(Settings.AutoscalingSupport_);
486486

487487
for (const TTopicReadSettings& topic : Settings.Topics_) {
488488
auto* topicSettings = init.add_topics_read_settings();
@@ -1888,7 +1888,7 @@ bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::AllParentSessionsHasBe
18881888

18891889
template<bool UseMigrationProtocol>
18901890
void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ConfirmPartitionStreamEnd(TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, const std::vector<ui32>& childIds) {
1891-
ReadingFinishedData.insert(partitionStream->GetAssignId()); // Check
1891+
ReadingFinishedData.insert(partitionStream->GetPartitionSessionId());
18921892
for (auto& [_, s] : PartitionStreams) {
18931893
for (auto partitionId : childIds) {
18941894
if (s->GetPartitionId() == partitionId) {
@@ -2145,12 +2145,12 @@ TReadSessionEventsQueue<UseMigrationProtocol>::GetEventImpl(size_t& maxByteSize,
21452145
TParent::Events.pop();
21462146

21472147
if constexpr (!UseMigrationProtocol) {
2148-
// if (std::holds_alternative<TReadSessionEvent::TEndPartitionSessionEvent>(*event)) {
2149-
// auto& e = std::get<TReadSessionEvent::TEndPartitionSessionEvent>(*event);
2150-
// if (auto session = frontCbContext->LockShared()) {
2151-
// session->SetReadingFinished(partitionStream->GetPartitionSessionId(), e.GetChildPartitionIds());
2152-
// }
2153-
// }
2148+
if (std::holds_alternative<TReadSessionEvent::TPartitionSessionClosedEvent>(*event)) {
2149+
auto& e = std::get<TReadSessionEvent::TPartitionSessionClosedEvent>(*event);
2150+
if (auto session = frontCbContext->LockShared()) {
2151+
session->UnregisterPartition(e.GetPartitionSession()->GetPartitionId(), e.GetPartitionSession()->GetPartitionSessionId());
2152+
}
2153+
}
21542154
}
21552155
}
21562156

ydb/public/sdk/cpp/client/ydb_topic/include/read_session.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ struct TReadSessionSettings: public TRequestSettings<TReadSessionSettings> {
194194
FLUENT_SETTING_DEFAULT(TDuration, ConnectTimeout, TDuration::Seconds(30));
195195

196196
//! AutoscalingSupport.
197-
FLUENT_SETTING_OPTIONAL(bool, AutoscalingSupport);
197+
FLUENT_SETTING_DEFAULT(bool, AutoscalingSupport, false);
198198

199199
//! Log.
200200
FLUENT_SETTING_OPTIONAL(TLog, Log);

ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -794,8 +794,8 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
794794
[&](TReadSessionEvent::TStopPartitionSessionEvent& event) {
795795
event.Confirm();
796796
},
797-
[&](TReadSessionEvent::TEndPartitionSessionEvent&) {
798-
// do nothing
797+
[&](TReadSessionEvent::TEndPartitionSessionEvent& event) {
798+
event.Confirm();
799799
},
800800
[&](TReadSessionEvent::TPartitionSessionStatusEvent&) {
801801
UNIT_FAIL("Test does not support lock sessions yet");

ydb/public/sdk/cpp/examples/topic_reader/eventloop/main.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,8 @@ int main(int argc, const char* argv[]) {
104104
startPartitionSessionEvent->Confirm();
105105
} else if (auto* stopPartitionSessionEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&*event)) {
106106
stopPartitionSessionEvent->Confirm();
107-
} else if (auto* stopPartitionSessionEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TEndPartitionSessionEvent>(&*event)) {
108-
// commit messages
107+
} else if (auto* endPartitionSessionEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TEndPartitionSessionEvent>(&*event)) {
108+
endPartitionSessionEvent->Confirm();
109109
} else if (auto* closeSessionEvent = std::get_if<NYdb::NTopic::TSessionClosedEvent>(&*event)) {
110110
break;
111111
}

0 commit comments

Comments
 (0)