Skip to content

Set ending_sequence_number for inactive partitions of datashard (#9636) #9668

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 48 additions & 42 deletions ydb/services/datastreams/datastreams_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,47 @@ namespace NKikimr::NDataStreams::V1 {

return {};
}

void SetShardProperties(::Ydb::DataStreams::V1::Shard* shard,
const ::NKikimrSchemeOp::TPersQueueGroupDescription_TPartition& partition,
const bool autoPartitioningEnabled,
const size_t allShardsCount,
const std::map<ui64, std::pair<ui64, ui64>>& offsets) {
shard->set_shard_id(GetShardName(partition.GetPartitionId()));


const auto& parents = partition.GetParentPartitionIds();
if (parents.size() > 0) {
shard->set_parent_shard_id(GetShardName(parents[0]));
}
if (parents.size() > 1) {
shard->set_adjacent_parent_shard_id(GetShardName(parents[1]));
}

auto* rangeProto = shard->mutable_hash_key_range();
if (autoPartitioningEnabled) {
NYql::NDecimal::TUint128 from = partition.HasKeyRange() && partition.GetKeyRange().HasFromBound()
? NPQ::AsInt<NYql::NDecimal::TUint128>(partition.GetKeyRange().GetFromBound()) + 1: 0;
NYql::NDecimal::TUint128 to = partition.HasKeyRange() && partition.GetKeyRange().HasToBound()
? NPQ::AsInt<NYql::NDecimal::TUint128>(partition.GetKeyRange().GetToBound()): -1;
rangeProto->set_starting_hash_key(Uint128ToDecimalString(from));
rangeProto->set_ending_hash_key(Uint128ToDecimalString(to));
} else {
auto range = RangeFromShardNumber(partition.GetPartitionId(), allShardsCount);
rangeProto->set_starting_hash_key(Uint128ToDecimalString(range.Start));
rangeProto->set_ending_hash_key(Uint128ToDecimalString(range.End));
}

auto it = offsets.find(partition.GetPartitionId());
if (it != offsets.end()) {
auto* rangeProto = shard->mutable_sequence_number_range();
rangeProto->set_starting_sequence_number(TStringBuilder() << it->second.first);

if (::NKikimrPQ::ETopicPartitionStatus::Active != partition.GetStatus()) {
rangeProto->set_ending_sequence_number(TStringBuilder() << it->second.second);
}
}
}
}


Expand Down Expand Up @@ -845,32 +886,7 @@ namespace NKikimr::NDataStreams::V1 {
break;
} else {
auto* shard = description.add_shards();
shard->set_shard_id(shardName);

const auto& parents = partition.GetParentPartitionIds();
if (parents.size() > 0) {
shard->set_parent_shard_id(GetShardName(parents[0]));
}
if (parents.size() > 1) {
shard->set_adjacent_parent_shard_id(GetShardName(parents[1]));
}

auto* rangeProto = shard->mutable_hash_key_range();
if (NPQ::SplitMergeEnabled(pqConfig)) {
NYql::NDecimal::TUint128 from = partition.HasKeyRange() && partition.GetKeyRange().HasFromBound() ? NPQ::AsInt<NYql::NDecimal::TUint128>(partition.GetKeyRange().GetFromBound()) + 1: 0;
NYql::NDecimal::TUint128 to = partition.HasKeyRange() && partition.GetKeyRange().HasToBound() ? NPQ::AsInt<NYql::NDecimal::TUint128>(partition.GetKeyRange().GetToBound()): -1;
rangeProto->set_starting_hash_key(Uint128ToDecimalString(from));
rangeProto->set_ending_hash_key(Uint128ToDecimalString(to));
} else {
auto range = RangeFromShardNumber(partitionId, PQGroup.GetPartitions().size());
rangeProto->set_starting_hash_key(Uint128ToDecimalString(range.Start));
rangeProto->set_ending_hash_key(Uint128ToDecimalString(range.End));
}
auto it = StartEndOffsetsPerPartition.find(partitionId);
if (it != StartEndOffsetsPerPartition.end()) {
auto* rangeProto = shard->mutable_sequence_number_range();
rangeProto->set_starting_sequence_number(TStringBuilder() << it->second.first);
}
SetShardProperties(shard, partition, NPQ::SplitMergeEnabled(pqConfig), PQGroup.GetPartitions().size(), StartEndOffsetsPerPartition);
}
}
}
Expand Down Expand Up @@ -1754,6 +1770,7 @@ namespace NKikimr::NDataStreams::V1 {
std::vector<NKikimrSchemeOp::TPersQueueGroupDescription::TPartition> Shards;
ui32 LeftToRead = 0;
ui32 AllShardsCount = 0;
bool AutoPartitioningEnabled = false;
std::atomic<ui32> GotOffsetResponds;
std::vector<TActorId> Pipes;
};
Expand Down Expand Up @@ -1847,7 +1864,8 @@ namespace NKikimr::NDataStreams::V1 {
}

using TPartition = NKikimrSchemeOp::TPersQueueGroupDescription::TPartition;
const auto& partitions = topicInfo.PQGroupInfo->Description.GetPartitions();
const auto& description = topicInfo.PQGroupInfo->Description;
const auto& partitions = description.GetPartitions();
TString startingShardId = this->GetProtoRequest()->Getexclusive_start_shard_id();
ui64 startingTimepoint{0};
bool onlyOpenShards{true};
Expand Down Expand Up @@ -1895,6 +1913,8 @@ namespace NKikimr::NDataStreams::V1 {
}}
};

AutoPartitioningEnabled = NPQ::SplitMergeEnabled(description.GetPQTabletConfig());

const auto alreadyRead = NextToken.GetAlreadyRead();
if (alreadyRead > (ui32)partitions.size()) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
Expand Down Expand Up @@ -1970,21 +1990,7 @@ namespace NKikimr::NDataStreams::V1 {
void TListShardsActor::SendResponse(const TActorContext& ctx) {
Ydb::DataStreams::V1::ListShardsResult result;
for (auto& shard : Shards) {
auto awsShard = result.Addshards();
// TODO:
// awsShard->set_parent_shard_id("");
// awsShard->set_adjacent_parent_shard_id(prevShardName);
auto range = RangeFromShardNumber(shard.GetPartitionId(), AllShardsCount);
awsShard->mutable_hash_key_range()->set_starting_hash_key(
Uint128ToDecimalString(range.Start));
awsShard->mutable_hash_key_range()->set_ending_hash_key(
Uint128ToDecimalString(range.End));
awsShard->mutable_sequence_number_range()->set_starting_sequence_number(
std::to_string(StartEndOffsetsPerPartition[shard.GetPartitionId()].first));
//TODO: fill it only for closed partitions
//awsShard->mutable_sequence_number_range()->set_ending_sequence_number(
// std::to_string(StartEndOffsetsPerPartition[shard.GetPartitionId()].second));
awsShard->set_shard_id(GetShardName(shard.GetPartitionId()));
SetShardProperties(result.Addshards(), shard, AutoPartitioningEnabled, AllShardsCount, StartEndOffsetsPerPartition);
}
if (LeftToRead > 0) {
TNextToken token(StreamName, NextToken.GetAlreadyRead() + Shards.size(), MaxResults, TInstant::Now().MilliSeconds());
Expand Down
15 changes: 15 additions & 0 deletions ydb/services/datastreams/datastreams_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2756,6 +2756,18 @@ Y_UNIT_TEST_SUITE(DataStreams) {
UNIT_ASSERT_VALUES_EQUAL(description.partitioning_settings().auto_partitioning_settings().partition_write_speed().down_utilization_percent(), 13);
}

{
std::vector<NYDS_V1::TDataRecord> records;
for (ui32 i = 1; i <= 30; ++i) {
TString data = Sprintf("%04u", i);
records.push_back({data, data, ""});
}
auto result = testServer.DataStreamsClient->PutRecords(streamName, records).ExtractValueSync();
Cerr << result.GetResult().DebugString() << Endl;
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

{
ui64 txId = 107;
SplitPartition(*kikimr->GetRuntime(), txId, 1, "a");
Expand All @@ -2776,8 +2788,11 @@ Y_UNIT_TEST_SUITE(DataStreams) {

UNIT_ASSERT_VALUES_EQUAL(description.shards().size(), 5);
UNIT_ASSERT_VALUES_EQUAL(description.shards(0).sequence_number_range().starting_sequence_number(), "0");
UNIT_ASSERT_VALUES_EQUAL(description.shards(0).sequence_number_range().ending_sequence_number(), "");
UNIT_ASSERT_VALUES_EQUAL(description.shards(0).hash_key_range().starting_hash_key(), "0");
UNIT_ASSERT_VALUES_EQUAL(description.shards(0).hash_key_range().ending_hash_key(), "113427455640312821154458202477256070484");
UNIT_ASSERT_VALUES_EQUAL(description.shards(1).sequence_number_range().starting_sequence_number(), "0");
UNIT_ASSERT_VALUES_EQUAL(description.shards(1).sequence_number_range().ending_sequence_number(), "8");
UNIT_ASSERT_VALUES_EQUAL(description.shards(1).hash_key_range().starting_hash_key(), "113427455640312821154458202477256070485");
UNIT_ASSERT_VALUES_EQUAL(description.shards(1).hash_key_range().ending_hash_key(), "226854911280625642308916404954512140969");
UNIT_ASSERT_VALUES_EQUAL(description.shards(2).hash_key_range().starting_hash_key(), "226854911280625642308916404954512140970");
Expand Down
Loading