11#include " data_plane_helpers.h"
2+ #include < ydb/public/sdk/cpp/client/resources/ydb_resources.h>
3+ #include < ydb/public/sdk/cpp/client/ydb_topic/topic.h>
24
35namespace NKikimr ::NPersQueueTests {
46
@@ -51,7 +53,8 @@ namespace NKikimr::NPersQueueTests {
5153 std::optional<ui32> partitionGroup,
5254 std::optional<TString> codec,
5355 std::optional<bool > reconnectOnFailure,
54- THashMap<TString, TString> sessionMeta
56+ THashMap<TString, TString> sessionMeta,
57+ const TString& userAgent
5558 ) {
5659 auto settings = TWriteSessionSettings ().Path (topic).MessageGroupId (sourceId);
5760 if (partitionGroup) settings.PartitionGroupId (*partitionGroup);
@@ -66,6 +69,9 @@ namespace NKikimr::NPersQueueTests {
6669 }
6770 settings.MaxMemoryUsage (1024 *1024 *1024 *1024ll );
6871 settings.Meta_ .Fields = sessionMeta;
72+ if (!userAgent.empty ()) {
73+ settings.Header ({{NYdb::YDB_APPLICATION_NAME, userAgent}});
74+ }
6975 return CreateSimpleWriter (driver, settings);
7076 }
7177
@@ -79,6 +85,21 @@ namespace NKikimr::NPersQueueTests {
7985 return TPersQueueClient (driver, clientSettings).CreateReadSession (TReadSessionSettings (settings).DisableClusterDiscovery (true ));
8086 }
8187
88+ std::shared_ptr<NYdb::NTopic::IReadSession> CreateReader (
89+ NYdb::TDriver& driver,
90+ const NYdb::NTopic::TReadSessionSettings& settings,
91+ std::shared_ptr<NYdb::ICredentialsProviderFactory> creds,
92+ const TString& userAgent
93+ ) {
94+ NYdb::NTopic::TTopicClientSettings clientSettings;
95+ if (creds) clientSettings.CredentialsProviderFactory (creds);
96+ auto readerSettings = settings;
97+ if (!userAgent.empty ()) {
98+ readerSettings.Header ({{NYdb::YDB_APPLICATION_NAME, userAgent}});
99+ }
100+ return NYdb::NTopic::TTopicClient (driver, clientSettings).CreateReadSession (readerSettings);
101+ }
102+
82103 TMaybe<TReadSessionEvent::TDataReceivedEvent> GetNextMessageSkipAssignment (std::shared_ptr<IReadSession>& reader, TDuration timeout) {
83104 while (true ) {
84105 auto future = reader->WaitEvent ();
@@ -99,4 +120,25 @@ namespace NKikimr::NPersQueueTests {
99120 }
100121 return {};
101122 }
123+
124+ TMaybe<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent> GetNextMessageSkipAssignment (std::shared_ptr<NYdb::NTopic::IReadSession>& reader, TDuration timeout) {
125+ while (true ) {
126+ auto future = reader->WaitEvent ();
127+ future.Wait (timeout);
128+
129+ TMaybe<NYdb::NTopic::TReadSessionEvent::TEvent> event = reader->GetEvent (false , 1 );
130+ if (!event)
131+ return {};
132+ if (auto e = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&*event)) {
133+ return *e;
134+ } else if (auto * e = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*event)) {
135+ e->Confirm ();
136+ } else if (auto * e = std::get_if<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&*event)) {
137+ e->Confirm ();
138+ } else if (std::get_if<NYdb::NTopic::TSessionClosedEvent>(&*event)) {
139+ return {};
140+ }
141+ }
142+ return {};
143+ }
102144}
0 commit comments