Skip to content

Commit c09bd6c

Browse files
authored
Read session param for CommitOffset RPC to 24-4 (#16697)
1 parent 4e8794d commit c09bd6c

File tree

8 files changed

+288
-3
lines changed

8 files changed

+288
-3
lines changed

ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp

Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,265 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
621621
UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::BAD_REQUEST, status.GetStatus(), "The consumer cannot commit an offset for inactive, read-to-the-end partitions.");
622622
}
623623

624+
Y_UNIT_TEST(PartitionSplit_DistributedTxCommit_CheckSessionResetAfterCommit) {
625+
TTopicSdkTestSetup setup = CreateSetup();
626+
TTopicClient client = setup.MakeClient();
627+
628+
TCreateTopicSettings createSettings;
629+
createSettings
630+
.BeginConfigurePartitioningSettings()
631+
.MinActivePartitions(1)
632+
.MaxActivePartitions(100)
633+
.BeginConfigureAutoPartitioningSettings()
634+
.UpUtilizationPercent(2)
635+
.DownUtilizationPercent(1)
636+
.StabilizationWindow(TDuration::Seconds(2))
637+
.Strategy(EAutoPartitioningStrategy::ScaleUp)
638+
.EndConfigureAutoPartitioningSettings()
639+
.EndConfigurePartitioningSettings()
640+
.BeginAddConsumer()
641+
.ConsumerName(TEST_CONSUMER);
642+
643+
client.CreateTopic(TEST_TOPIC, createSettings).Wait();
644+
645+
auto msg = TString(1_MB, 'a');
646+
647+
auto writeSession_1 = CreateWriteSession(client, "producer-1", 0, std::string{TEST_TOPIC}, false);
648+
auto writeSession_2 = CreateWriteSession(client, "producer-2", 0, std::string{TEST_TOPIC}, false);
649+
auto seqNo = 1;
650+
{
651+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
652+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
653+
Sleep(TDuration::Seconds(5));
654+
auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
655+
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 1);
656+
}
657+
658+
{
659+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
660+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
661+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
662+
UNIT_ASSERT(writeSession_2->Write(Msg(msg, seqNo++)));
663+
Sleep(TDuration::Seconds(15));
664+
auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
665+
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3);
666+
}
667+
668+
auto writeSession_3 = CreateWriteSession(client, "producer-2", 1, std::string{TEST_TOPIC}, false);
669+
UNIT_ASSERT(writeSession_3->Write(Msg(TStringBuilder() << "message-" << seqNo, seqNo++)));
670+
UNIT_ASSERT(writeSession_3->Write(Msg(TStringBuilder() << "message-" << seqNo, seqNo++)));
671+
672+
auto reader = client.CreateReadSession(
673+
TReadSessionSettings()
674+
.AutoPartitioningSupport(true)
675+
.AppendTopics(TTopicReadSettings(TEST_TOPIC))
676+
.ConsumerName(TEST_CONSUMER));
677+
678+
TInstant deadlineTime = TInstant::Now() + TDuration::Seconds(5);
679+
680+
auto commitSent = false;
681+
while(deadlineTime > TInstant::Now()) {
682+
for (auto event : reader->GetEvents(false)) {
683+
if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&event)) {
684+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
685+
auto& messages = x->GetMessages();
686+
for (size_t i = 0u; i < messages.size(); ++i) {
687+
auto& message = messages[i];
688+
message.Commit();
689+
Cerr << "SESSION EVENT READ SeqNo: " << message.GetSeqNo() << Endl << Flush;
690+
// check we get this SeqNo two times
691+
if (message.GetSeqNo() == 6) {
692+
if (!commitSent) {
693+
commitSent = true;
694+
Sleep(TDuration::MilliSeconds(300));
695+
auto status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0).GetValueSync();
696+
UNIT_ASSERT(status.IsSuccess());
697+
} else {
698+
return;
699+
}
700+
}
701+
}
702+
UNIT_ASSERT(writeSession_3->Write(Msg(TStringBuilder() << "message-" << seqNo, seqNo++)));
703+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) {
704+
x->Confirm();
705+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
706+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&event)) {
707+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
708+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>(&event)) {
709+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
710+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&event)) {
711+
x->Confirm();
712+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
713+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>(&event)) {
714+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
715+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TEndPartitionSessionEvent>(&event)) {
716+
x->Confirm();
717+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
718+
} else if (auto* sessionClosedEvent = std::get_if<NYdb::NTopic::TSessionClosedEvent>(&event)) {
719+
Cerr << sessionClosedEvent->DebugString() << Endl << Flush;
720+
} else {
721+
Cerr << "SESSION EVENT unhandled \n";
722+
}
723+
}
724+
Sleep(TDuration::MilliSeconds(250));
725+
}
726+
727+
UNIT_ASSERT(false);
728+
}
729+
730+
Y_UNIT_TEST(PartitionSplit_DistributedTxCommit_CheckOffsetCommitForDifferentCases_NotSplitedTopic) {
731+
TTopicSdkTestSetup setup = CreateSetup();
732+
TTopicClient client = setup.MakeClient();
733+
734+
TCreateTopicSettings createSettings;
735+
createSettings
736+
.BeginConfigurePartitioningSettings()
737+
.MinActivePartitions(1)
738+
.MaxActivePartitions(100)
739+
.BeginConfigureAutoPartitioningSettings()
740+
.UpUtilizationPercent(2)
741+
.DownUtilizationPercent(1)
742+
.StabilizationWindow(TDuration::Seconds(2))
743+
.Strategy(EAutoPartitioningStrategy::ScaleUp)
744+
.EndConfigureAutoPartitioningSettings()
745+
.EndConfigurePartitioningSettings()
746+
.BeginAddConsumer()
747+
.ConsumerName(TEST_CONSUMER);
748+
749+
client.CreateTopic(TEST_TOPIC, createSettings).Wait();
750+
751+
auto msg = TString(1_MB, 'a');
752+
753+
auto writeSession_1 = CreateWriteSession(client, "producer-1", 0, std::string{TEST_TOPIC}, false);
754+
auto seqNo = 1;
755+
{
756+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
757+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
758+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
759+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
760+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
761+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
762+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
763+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
764+
writeSession_1->Close();
765+
Sleep(TDuration::Seconds(15));
766+
auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
767+
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 1);
768+
}
769+
770+
auto writeSession_2 = CreateWriteSession(client, "producer-1", 0, std::string{TEST_TOPIC}, false);
771+
auto reader = client.CreateReadSession(
772+
TReadSessionSettings()
773+
.AutoPartitioningSupport(true)
774+
.AppendTopics(TTopicReadSettings(TEST_TOPIC))
775+
.ConsumerName(TEST_CONSUMER));
776+
777+
TInstant deadlineTime = TInstant::Now() + TDuration::Seconds(5);
778+
779+
auto commitSent = false;
780+
TString readSessionId = "";
781+
while(deadlineTime > TInstant::Now()) {
782+
for (auto event : reader->GetEvents(false)) {
783+
if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&event)) {
784+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
785+
auto& messages = x->GetMessages();
786+
for (size_t i = 0u; i < messages.size(); ++i) {
787+
auto& message = messages[i];
788+
789+
if (commitSent) {
790+
// read session not changed
791+
UNIT_ASSERT_EQUAL(readSessionId, message.GetPartitionSession()->GetReadSessionId());
792+
}
793+
794+
// check we NOT get this SeqNo two times
795+
if (message.GetSeqNo() == 6) {
796+
if (!commitSent) {
797+
commitSent = true;
798+
Sleep(TDuration::MilliSeconds(300));
799+
800+
readSessionId = message.GetPartitionSession()->GetReadSessionId();
801+
TCommitOffsetSettings commitSettings {.ReadSessionId_ = message.GetPartitionSession()->GetReadSessionId()};
802+
auto status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 8, commitSettings).GetValueSync();
803+
UNIT_ASSERT(status.IsSuccess());
804+
805+
{
806+
auto describeConsumerSettings = TDescribeConsumerSettings().IncludeStats(true);
807+
auto result = client.DescribeConsumer(TEST_TOPIC, TEST_CONSUMER, describeConsumerSettings).GetValueSync();
808+
UNIT_ASSERT(result.IsSuccess());
809+
810+
auto description = result.GetConsumerDescription();
811+
812+
auto stats = description.GetPartitions().at(0).GetPartitionConsumerStats();
813+
UNIT_ASSERT(stats);
814+
UNIT_ASSERT(stats->GetCommittedOffset() == 8);
815+
}
816+
817+
// must be ignored, because commit to past
818+
TCommitOffsetSettings commitToPastSettings {.ReadSessionId_ = message.GetPartitionSession()->GetReadSessionId()};
819+
auto commitToPastStatus = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0, commitToPastSettings).GetValueSync();
820+
UNIT_ASSERT(commitToPastStatus.IsSuccess());
821+
822+
{
823+
auto describeConsumerSettings = TDescribeConsumerSettings().IncludeStats(true);
824+
auto result = client.DescribeConsumer(TEST_TOPIC, TEST_CONSUMER, describeConsumerSettings).GetValueSync();
825+
UNIT_ASSERT(result.IsSuccess());
826+
827+
auto description = result.GetConsumerDescription();
828+
829+
auto stats = description.GetPartitions().at(0).GetPartitionConsumerStats();
830+
UNIT_ASSERT(stats);
831+
UNIT_ASSERT(stats->GetCommittedOffset() == 8);
832+
}
833+
834+
TCommitOffsetSettings commitSettingsWrongSession {.ReadSessionId_ = "random_session"};
835+
auto statusWrongSession = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0, commitSettingsWrongSession).GetValueSync();
836+
UNIT_ASSERT(!statusWrongSession.IsSuccess());
837+
838+
{
839+
auto describeConsumerSettings = TDescribeConsumerSettings().IncludeStats(true);
840+
auto result = client.DescribeConsumer(TEST_TOPIC, TEST_CONSUMER, describeConsumerSettings).GetValueSync();
841+
UNIT_ASSERT(result.IsSuccess());
842+
843+
auto description = result.GetConsumerDescription();
844+
845+
auto stats = description.GetPartitions().at(0).GetPartitionConsumerStats();
846+
UNIT_ASSERT(stats);
847+
UNIT_ASSERT(stats->GetCommittedOffset() == 8);
848+
}
849+
850+
} else {
851+
UNIT_ASSERT(false);
852+
}
853+
} else {
854+
message.Commit();
855+
}
856+
}
857+
UNIT_ASSERT(writeSession_2->Write(Msg(msg, seqNo++)));
858+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) {
859+
x->Confirm();
860+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
861+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&event)) {
862+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
863+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>(&event)) {
864+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
865+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&event)) {
866+
x->Confirm();
867+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
868+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>(&event)) {
869+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
870+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TEndPartitionSessionEvent>(&event)) {
871+
x->Confirm();
872+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
873+
} else if (auto* sessionClosedEvent = std::get_if<NYdb::NTopic::TSessionClosedEvent>(&event)) {
874+
Cerr << sessionClosedEvent->DebugString() << Endl << Flush;
875+
} else {
876+
Cerr << "SESSION EVENT unhandled \n";
877+
}
878+
}
879+
Sleep(TDuration::MilliSeconds(250));
880+
}
881+
}
882+
624883
Y_UNIT_TEST(ControlPlane_CreateAlterDescribe) {
625884
auto autoscalingTestTopic = "autoscalit-topic";
626885
TTopicSdkTestSetup setup = CreateSetup();

ydb/public/api/protos/ydb_topic.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -751,6 +751,9 @@ message CommitOffsetRequest {
751751

752752
// Processed offset.
753753
int64 offset = 5;
754+
755+
// Read session identifier from StreamRead RPC.
756+
string read_session_id = 6;
754757
}
755758

756759
// Commit offset response sent from server to client.

ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_impl.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,7 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
599599
template <bool V = UseMigrationProtocol, class = std::enable_if_t<!V>>
600600
TPartitionStreamImpl(ui64 partitionStreamId,
601601
TString topicPath,
602+
TString readSessionId,
602603
i64 partitionId,
603604
i64 assignId,
604605
i64 readOffset,
@@ -611,6 +612,7 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
611612
{
612613
TAPartitionStream<false>::PartitionSessionId = partitionStreamId;
613614
TAPartitionStream<false>::TopicPath = std::move(topicPath);
615+
TAPartitionStream<false>::ReadSessionId = std::move(readSessionId);
614616
TAPartitionStream<false>::PartitionId = static_cast<ui64>(partitionId);
615617
MaxCommittedOffset = static_cast<ui64>(readOffset);
616618
}
@@ -1296,6 +1298,7 @@ class TSingleClusterReadSessionImpl : public TEnableSelfContext<TSingleClusterRe
12961298
const TString Database;
12971299
const TString SessionId;
12981300
const TString ClusterName;
1301+
TString ReadSessionId;
12991302
TLog Log;
13001303
ui64 NextPartitionStreamId;
13011304
ui64 PartitionStreamIdStep;

ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_impl.ipp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -954,6 +954,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
954954
LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Server session id: " << msg.session_id());
955955

956956
RetryState = nullptr;
957+
ReadSessionId = msg.session_id();
957958

958959
// Successful init. Do nothing.
959960
ContinueReadingDataImpl();
@@ -1205,6 +1206,7 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
12051206
Y_UNUSED(deferred);
12061207

12071208
RetryState = nullptr;
1209+
ReadSessionId = msg.session_id();
12081210

12091211
LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Server session id: " << msg.session_id());
12101212

@@ -1304,8 +1306,12 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
13041306
Y_ABORT_UNLESS(Lock.IsLocked());
13051307

13061308
auto partitionStream = MakeIntrusive<TPartitionStreamImpl<false>>(
1307-
NextPartitionStreamId, msg.partition_session().path(), msg.partition_session().partition_id(),
1308-
msg.partition_session().partition_session_id(), msg.committed_offset(),
1309+
NextPartitionStreamId,
1310+
msg.partition_session().path(),
1311+
ReadSessionId,
1312+
msg.partition_session().partition_id(),
1313+
msg.partition_session().partition_session_id(),
1314+
msg.committed_offset(),
13091315
SelfContext);
13101316
NextPartitionStreamId += PartitionStreamIdStep;
13111317

ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,9 @@ class TTopicClient::TImpl : public TClientImplCommon<TTopicClient::TImpl> {
323323
request.set_partition_id(partitionId);
324324
request.set_consumer(consumerName);
325325
request.set_offset(offset);
326+
if (settings.ReadSessionId_) {
327+
request.set_read_session_id(*settings.ReadSessionId_);
328+
}
326329

327330
return RunSimple<Ydb::Topic::V1::TopicService, Ydb::Topic::CommitOffsetRequest, Ydb::Topic::CommitOffsetResponse>(
328331
std::move(request),

ydb/public/sdk/cpp/client/ydb_topic/include/control_plane.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -749,6 +749,8 @@ struct TDescribePartitionSettings: public TOperationRequestSettings<TDescribePar
749749
};
750750

751751
// Settings for commit offset request.
752-
struct TCommitOffsetSettings : public TOperationRequestSettings<TCommitOffsetSettings> {};
752+
struct TCommitOffsetSettings : public TOperationRequestSettings<TCommitOffsetSettings> {
753+
FLUENT_SETTING_OPTIONAL(TString, ReadSessionId);
754+
};
753755

754756
} // namespace NYdb::NTopic

ydb/public/sdk/cpp/client/ydb_topic/include/read_events.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ struct TPartitionSession: public TThrRefBase, public TPrintable<TPartitionSessio
3232
return TopicPath;
3333
}
3434

35+
//! Read session id.
36+
const TString GetReadSessionId() const {
37+
return ReadSessionId;
38+
}
39+
3540
//! Partition id.
3641
ui64 GetPartitionId() const {
3742
return PartitionId;
@@ -40,6 +45,7 @@ struct TPartitionSession: public TThrRefBase, public TPrintable<TPartitionSessio
4045
protected:
4146
ui64 PartitionSessionId;
4247
TString TopicPath;
48+
TString ReadSessionId;
4349
ui64 PartitionId;
4450
};
4551

ydb/services/persqueue_v1/actors/commit_offset_actor.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,9 @@ void TCommitOffsetActor::Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TAc
127127
commit->SetClientId(ClientId);
128128
commit->SetOffset(client_req->offset());
129129
commit->SetStrict(true);
130+
if (!client_req->read_session_id().empty()) {
131+
commit->SetSessionId(client_req->read_session_id());
132+
}
130133

131134
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "strict CommitOffset, partition " << client_req->partition_id()
132135
<< " committing to position " << client_req->offset() /*<< " prev " << CommittedOffset

0 commit comments

Comments
 (0)