33#include < ydb/public/sdk/cpp/client/ydb_topic/topic.h>
44#include < ydb/public/sdk/cpp/client/ydb_table/table.h>
55#include < ydb/public/sdk/cpp/client/ydb_persqueue_public/ut/ut_utils/ut_utils.h>
6+ #include < ydb/core/cms/console/console.h>
67#include < ydb/core/keyvalue/keyvalue_events.h>
78#include < ydb/core/persqueue/key.h>
89#include < ydb/core/persqueue/blob.h>
@@ -37,8 +38,14 @@ class TFixture : public NUnitTest::TBaseFixture {
3738 void Write (const TString& message, NTable::TTransaction* tx = nullptr );
3839 };
3940
41+ struct TFeatureFlags {
42+ bool EnablePQConfigTransactionsAtSchemeShard = true ;
43+ };
44+
4045 void SetUp (NUnitTest::TTestContext&) override ;
4146
47+ void NotifySchemeShard (const TFeatureFlags& flags);
48+
4249 NTable::TSession CreateTableSession ();
4350 NTable::TTransaction BeginTx (NTable::TSession& session);
4451 void CommitTx (NTable::TTransaction& tx, EStatus status = EStatus::SUCCESS);
@@ -62,6 +69,8 @@ class TFixture : public NUnitTest::TBaseFixture {
6269 std::optional<size_t > maxPartitionCount = std::nullopt );
6370 void DescribeTopic (const TString& path);
6471
72+ void AddConsumer (const TString& topic, const TVector<TString>& consumers);
73+
6574 void WriteToTopicWithInvalidTxId (bool invalidTxId);
6675
6776 TTopicWriteSessionPtr CreateTopicWriteSession (const TString& topicPath,
@@ -95,6 +104,8 @@ class TFixture : public NUnitTest::TBaseFixture {
95104 NYdb::EStatus status);
96105 void CloseTopicWriteSession (const TString& topicPath,
97106 const TString& messageGroupId);
107+ void CloseTopicReadSession (const TString& topicPath,
108+ const TString& consumerName);
98109
99110 enum EEndOfTransaction {
100111 Commit,
@@ -175,6 +186,8 @@ class TFixture : public NUnitTest::TBaseFixture {
175186 ui64 tabletId,
176187 const NPQ::TWriteId& writeId);
177188
189+ ui64 GetSchemeShardTabletId (const TActorId& actorId);
190+
178191 std::unique_ptr<TTopicSdkTestSetup> Setup;
179192 std::unique_ptr<TDriver> Driver;
180193
@@ -192,11 +205,27 @@ void TFixture::SetUp(NUnitTest::TTestContext&)
192205{
193206 NKikimr::Tests::TServerSettings settings = TTopicSdkTestSetup::MakeServerSettings ();
194207 settings.SetEnableTopicServiceTx (true );
208+
195209 Setup = std::make_unique<TTopicSdkTestSetup>(TEST_CASE_NAME, settings);
196210
197211 Driver = std::make_unique<TDriver>(Setup->MakeDriver ());
198212}
199213
214+ void TFixture::NotifySchemeShard (const TFeatureFlags& flags)
215+ {
216+ auto request = std::make_unique<NConsole::TEvConsole::TEvConfigNotificationRequest>();
217+ *request->Record .MutableConfig () = *Setup->GetServer ().ServerSettings .AppConfig ;
218+ request->Record .MutableConfig ()->MutableFeatureFlags ()->SetEnablePQConfigTransactionsAtSchemeShard (flags.EnablePQConfigTransactionsAtSchemeShard );
219+
220+ auto & runtime = Setup->GetRuntime ();
221+ auto actorId = runtime.AllocateEdgeActor ();
222+
223+ ui64 ssId = GetSchemeShardTabletId (actorId);
224+
225+ runtime.SendToPipe (ssId, actorId, request.release ());
226+ runtime.GrabEdgeEvent <NConsole::TEvConsole::TEvConfigNotificationResponse>();
227+ }
228+
200229NTable::TSession TFixture::CreateTableSession ()
201230{
202231 NTable::TTableClient client (GetDriver ());
@@ -323,6 +352,20 @@ void TFixture::CreateTopic(const TString& path,
323352 Setup->CreateTopic (path, consumer, partitionCount, maxPartitionCount);
324353}
325354
355+ void TFixture::AddConsumer (const TString& path,
356+ const TVector<TString>& consumers)
357+ {
358+ NTopic::TTopicClient client (GetDriver ());
359+ NTopic::TAlterTopicSettings settings;
360+
361+ for (const auto & consumer : consumers) {
362+ settings.BeginAddConsumer (consumer);
363+ }
364+
365+ auto result = client.AlterTopic (path, settings).GetValueSync ();
366+ UNIT_ASSERT_C (result.IsSuccess (), result.GetIssues ().ToString ());
367+ }
368+
326369void TFixture::DescribeTopic (const TString& path)
327370{
328371 Setup->DescribeTopic (path);
@@ -645,6 +688,13 @@ void TFixture::CloseTopicWriteSession(const TString& topicPath,
645688 TopicWriteSessions.erase (key);
646689}
647690
691+ void TFixture::CloseTopicReadSession (const TString& topicPath,
692+ const TString& consumerName)
693+ {
694+ Y_UNUSED (consumerName);
695+ TopicReadSessions.erase (topicPath);
696+ }
697+
648698void TFixture::WriteToTopic (const TString& topicPath,
649699 const TString& messageGroupId,
650700 const TString& message,
@@ -763,6 +813,37 @@ void TFixture::WaitForSessionClose(const TString& topicPath,
763813 UNIT_ASSERT (context.AckCount <= context.WriteCount );
764814}
765815
816+ ui64 TFixture::GetSchemeShardTabletId (const TActorId& actorId)
817+ {
818+ auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
819+ navigate->DatabaseName = " /Root" ;
820+
821+ NSchemeCache::TSchemeCacheNavigate::TEntry entry;
822+ entry.Path = SplitPath (" /Root" );
823+ entry.SyncVersion = true ;
824+ entry.ShowPrivatePath = true ;
825+ entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
826+
827+ navigate->ResultSet .push_back (std::move (entry));
828+ // navigate->UserToken = "root@builtin";
829+ navigate->Cookie = 12345 ;
830+
831+ auto & runtime = Setup->GetRuntime ();
832+
833+ runtime.Send (MakeSchemeCacheID (), actorId,
834+ new TEvTxProxySchemeCache::TEvNavigateKeySet (navigate.release ()),
835+ 0 ,
836+ true );
837+ auto response = runtime.GrabEdgeEvent <TEvTxProxySchemeCache::TEvNavigateKeySetResult>();
838+
839+ UNIT_ASSERT_VALUES_EQUAL (response->Request ->Cookie , 12345 );
840+ UNIT_ASSERT_VALUES_EQUAL (response->Request ->ErrorCount , 0 );
841+
842+ auto & front = response->Request ->ResultSet .front ();
843+
844+ return front.Self ->Info .GetSchemeshardId ();
845+ }
846+
766847ui64 TFixture::GetTopicTabletId (const TActorId& actorId, const TString& topicPath, ui32 partition)
767848{
768849 auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
@@ -1998,6 +2079,41 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_38, TFixture)
19982079 WriteMessagesInTx (0 , 1 );
19992080}
20002081
2082+ Y_UNIT_TEST_F (ReadRuleGeneration, TFixture)
2083+ {
2084+ // There was a server
2085+ NotifySchemeShard ({.EnablePQConfigTransactionsAtSchemeShard = false });
2086+
2087+ // Users have created their own topic on it
2088+ CreateTopic (TEST_TOPIC);
2089+
2090+ // And they wrote their messages into it
2091+ WriteToTopic (TEST_TOPIC, TEST_MESSAGE_GROUP_ID, " message-1" );
2092+ WriteToTopic (TEST_TOPIC, TEST_MESSAGE_GROUP_ID, " message-2" );
2093+ WriteToTopic (TEST_TOPIC, TEST_MESSAGE_GROUP_ID, " message-3" );
2094+
2095+ // And he had a consumer
2096+ AddConsumer (TEST_TOPIC, {" consumer-1" });
2097+
2098+ // We read messages from the topic and committed offsets
2099+ auto messages = ReadFromTopic (TEST_TOPIC, " consumer-1" , TDuration::Seconds (2 ));
2100+ UNIT_ASSERT_VALUES_EQUAL (messages.size (), 3 );
2101+ CloseTopicReadSession (TEST_TOPIC, " consumer-1" );
2102+
2103+ // And then the Logbroker team turned on the feature flag
2104+ NotifySchemeShard ({.EnablePQConfigTransactionsAtSchemeShard = true });
2105+
2106+ // Users continued to write to the topic
2107+ WriteToTopic (TEST_TOPIC, TEST_MESSAGE_GROUP_ID, " message-4" );
2108+
2109+ // Users have added new consumers
2110+ AddConsumer (TEST_TOPIC, {" consumer-2" });
2111+
2112+ // And they wanted to continue reading their messages
2113+ messages = ReadFromTopic (TEST_TOPIC, " consumer-1" , TDuration::Seconds (2 ));
2114+ UNIT_ASSERT_VALUES_EQUAL (messages.size (), 1 );
2115+ }
2116+
20012117}
20022118
20032119}
0 commit comments