@@ -94,6 +94,47 @@ namespace NKikimr::NDataStreams::V1 {
9494
9595 return {};
9696 }
97+
98+ void SetShardProperties (::Ydb::DataStreams::V1::Shard* shard,
99+ const ::NKikimrSchemeOp::TPersQueueGroupDescription_TPartition& partition,
100+ const bool autoPartitioningEnabled,
101+ const size_t allShardsCount,
102+ const std::map<ui64, std::pair<ui64, ui64>>& offsets) {
103+ shard->set_shard_id (GetShardName (partition.GetPartitionId ()));
104+
105+
106+ const auto & parents = partition.GetParentPartitionIds ();
107+ if (parents.size () > 0 ) {
108+ shard->set_parent_shard_id (GetShardName (parents[0 ]));
109+ }
110+ if (parents.size () > 1 ) {
111+ shard->set_adjacent_parent_shard_id (GetShardName (parents[1 ]));
112+ }
113+
114+ auto * rangeProto = shard->mutable_hash_key_range ();
115+ if (autoPartitioningEnabled) {
116+ NYql::NDecimal::TUint128 from = partition.HasKeyRange () && partition.GetKeyRange ().HasFromBound ()
117+ ? NPQ::AsInt<NYql::NDecimal::TUint128>(partition.GetKeyRange ().GetFromBound ()) + 1 : 0 ;
118+ NYql::NDecimal::TUint128 to = partition.HasKeyRange () && partition.GetKeyRange ().HasToBound ()
119+ ? NPQ::AsInt<NYql::NDecimal::TUint128>(partition.GetKeyRange ().GetToBound ()): -1 ;
120+ rangeProto->set_starting_hash_key (Uint128ToDecimalString (from));
121+ rangeProto->set_ending_hash_key (Uint128ToDecimalString (to));
122+ } else {
123+ auto range = RangeFromShardNumber (partition.GetPartitionId (), allShardsCount);
124+ rangeProto->set_starting_hash_key (Uint128ToDecimalString (range.Start ));
125+ rangeProto->set_ending_hash_key (Uint128ToDecimalString (range.End ));
126+ }
127+
128+ auto it = offsets.find (partition.GetPartitionId ());
129+ if (it != offsets.end ()) {
130+ auto * rangeProto = shard->mutable_sequence_number_range ();
131+ rangeProto->set_starting_sequence_number (TStringBuilder () << it->second .first );
132+
133+ if (::NKikimrPQ::ETopicPartitionStatus::Active != partition.GetStatus ()) {
134+ rangeProto->set_ending_sequence_number (TStringBuilder () << it->second .second );
135+ }
136+ }
137+ }
97138 }
98139
99140
@@ -845,32 +886,7 @@ namespace NKikimr::NDataStreams::V1 {
845886 break ;
846887 } else {
847888 auto * shard = description.add_shards ();
848- shard->set_shard_id (shardName);
849-
850- const auto & parents = partition.GetParentPartitionIds ();
851- if (parents.size () > 0 ) {
852- shard->set_parent_shard_id (GetShardName (parents[0 ]));
853- }
854- if (parents.size () > 1 ) {
855- shard->set_adjacent_parent_shard_id (GetShardName (parents[1 ]));
856- }
857-
858- auto * rangeProto = shard->mutable_hash_key_range ();
859- if (NPQ::SplitMergeEnabled (pqConfig)) {
860- NYql::NDecimal::TUint128 from = partition.HasKeyRange () && partition.GetKeyRange ().HasFromBound () ? NPQ::AsInt<NYql::NDecimal::TUint128>(partition.GetKeyRange ().GetFromBound ()) + 1 : 0 ;
861- NYql::NDecimal::TUint128 to = partition.HasKeyRange () && partition.GetKeyRange ().HasToBound () ? NPQ::AsInt<NYql::NDecimal::TUint128>(partition.GetKeyRange ().GetToBound ()): -1 ;
862- rangeProto->set_starting_hash_key (Uint128ToDecimalString (from));
863- rangeProto->set_ending_hash_key (Uint128ToDecimalString (to));
864- } else {
865- auto range = RangeFromShardNumber (partitionId, PQGroup.GetPartitions ().size ());
866- rangeProto->set_starting_hash_key (Uint128ToDecimalString (range.Start ));
867- rangeProto->set_ending_hash_key (Uint128ToDecimalString (range.End ));
868- }
869- auto it = StartEndOffsetsPerPartition.find (partitionId);
870- if (it != StartEndOffsetsPerPartition.end ()) {
871- auto * rangeProto = shard->mutable_sequence_number_range ();
872- rangeProto->set_starting_sequence_number (TStringBuilder () << it->second .first );
873- }
889+ SetShardProperties (shard, partition, NPQ::SplitMergeEnabled (pqConfig), PQGroup.GetPartitions ().size (), StartEndOffsetsPerPartition);
874890 }
875891 }
876892 }
@@ -1754,6 +1770,7 @@ namespace NKikimr::NDataStreams::V1 {
17541770 std::vector<NKikimrSchemeOp::TPersQueueGroupDescription::TPartition> Shards;
17551771 ui32 LeftToRead = 0 ;
17561772 ui32 AllShardsCount = 0 ;
1773+ bool AutoPartitioningEnabled = false ;
17571774 std::atomic<ui32> GotOffsetResponds;
17581775 std::vector<TActorId> Pipes;
17591776 };
@@ -1847,7 +1864,8 @@ namespace NKikimr::NDataStreams::V1 {
18471864 }
18481865
18491866 using TPartition = NKikimrSchemeOp::TPersQueueGroupDescription::TPartition;
1850- const auto & partitions = topicInfo.PQGroupInfo ->Description .GetPartitions ();
1867+ const auto & description = topicInfo.PQGroupInfo ->Description ;
1868+ const auto & partitions = description.GetPartitions ();
18511869 TString startingShardId = this ->GetProtoRequest ()->Getexclusive_start_shard_id ();
18521870 ui64 startingTimepoint{0 };
18531871 bool onlyOpenShards{true };
@@ -1895,6 +1913,8 @@ namespace NKikimr::NDataStreams::V1 {
18951913 }}
18961914 };
18971915
1916+ AutoPartitioningEnabled = NPQ::SplitMergeEnabled (description.GetPQTabletConfig ());
1917+
18981918 const auto alreadyRead = NextToken.GetAlreadyRead ();
18991919 if (alreadyRead > (ui32)partitions.size ()) {
19001920 return ReplyWithError (Ydb::StatusIds::BAD_REQUEST, static_cast <size_t >(NYds::EErrorCodes::INVALID_ARGUMENT),
@@ -1970,21 +1990,7 @@ namespace NKikimr::NDataStreams::V1 {
19701990 void TListShardsActor::SendResponse (const TActorContext& ctx) {
19711991 Ydb::DataStreams::V1::ListShardsResult result;
19721992 for (auto & shard : Shards) {
1973- auto awsShard = result.Addshards ();
1974- // TODO:
1975- // awsShard->set_parent_shard_id("");
1976- // awsShard->set_adjacent_parent_shard_id(prevShardName);
1977- auto range = RangeFromShardNumber (shard.GetPartitionId (), AllShardsCount);
1978- awsShard->mutable_hash_key_range ()->set_starting_hash_key (
1979- Uint128ToDecimalString (range.Start ));
1980- awsShard->mutable_hash_key_range ()->set_ending_hash_key (
1981- Uint128ToDecimalString (range.End ));
1982- awsShard->mutable_sequence_number_range ()->set_starting_sequence_number (
1983- std::to_string (StartEndOffsetsPerPartition[shard.GetPartitionId ()].first ));
1984- // TODO: fill it only for closed partitions
1985- // awsShard->mutable_sequence_number_range()->set_ending_sequence_number(
1986- // std::to_string(StartEndOffsetsPerPartition[shard.GetPartitionId()].second));
1987- awsShard->set_shard_id (GetShardName (shard.GetPartitionId ()));
1993+ SetShardProperties (result.Addshards (), shard, AutoPartitioningEnabled, AllShardsCount, StartEndOffsetsPerPartition);
19881994 }
19891995 if (LeftToRead > 0 ) {
19901996 TNextToken token (StreamName, NextToken.GetAlreadyRead () + Shards.size (), MaxResults, TInstant::Now ().MilliSeconds ());
0 commit comments