Skip to content

Commit 47cd9f3

Browse files
committed
DESCRIBER
1 parent 8a13540 commit 47cd9f3

29 files changed

+2167
-31
lines changed

ydb/core/persqueue/actor_persqueue_client_iface.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@ class IPersQueueMirrorReaderFactory {
5252
TMaybe<TLog> logger = Nothing()
5353
) const = 0;
5454

55+
virtual NThreading::TFuture<NYdb::NTopic::TDescribeTopicResult> GetTopicDescription(
56+
const NKikimrPQ::TMirrorPartitionConfig& config,
57+
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory
58+
) const = 0;
59+
5560
virtual ~IPersQueueMirrorReaderFactory() = default;
5661

5762
TDeferredActorLogBackend::TSharedAtomicActorSystemPtr GetSharedActorSystem() const {
@@ -120,6 +125,22 @@ class TPersQueueMirrorReaderFactory : public IPersQueueMirrorReaderFactory {
120125
NYdb::NTopic::TTopicClient topicClient(*Driver, clientSettings);
121126
return topicClient.CreateReadSession(settings);
122127
}
128+
129+
NThreading::TFuture<NYdb::NTopic::TDescribeTopicResult> GetTopicDescription(
130+
const NKikimrPQ::TMirrorPartitionConfig& config,
131+
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory
132+
) const override {
133+
NYdb::NTopic::TTopicClientSettings clientSettings = NYdb::NTopic::TTopicClientSettings()
134+
.DiscoveryEndpoint(TStringBuilder() << config.GetEndpoint() << ":" << config.GetEndpointPort())
135+
.DiscoveryMode(NYdb::EDiscoveryMode::Async)
136+
.CredentialsProviderFactory(credentialsProviderFactory)
137+
.SslCredentials(NYdb::TSslCredentials(config.GetUseSecureConnection()));
138+
if (config.HasDatabase()) {
139+
clientSettings.Database(config.GetDatabase());
140+
}
141+
NYdb::NTopic::TTopicClient topicClient(*Driver, clientSettings);
142+
return topicClient.DescribeTopic(config.GetTopic());
143+
}
123144
};
124145

125146
} // namespace NKikimr::NSQS

ydb/core/persqueue/events/internal.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
#include <ydb/library/actors/core/actorid.h>
2121
#include <ydb/core/grpc_services/rpc_calls.h>
2222
#include <ydb/public/api/protos/persqueue_error_codes_v1.pb.h>
23+
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/control_plane.h>
2324
#include <util/generic/maybe.h>
25+
#include <expected>
2426

2527
namespace NYdb::inline Dev {
2628
class ICredentialsProviderFactory;
@@ -204,6 +206,7 @@ struct TEvPQ {
204206
EvExclusiveLockAcquired,
205207
EvReleaseExclusiveLock,
206208
EvRunCompaction,
209+
EvMirrorTopicDescription,
207210
EvEnd
208211
};
209212

@@ -771,6 +774,21 @@ struct TEvPQ {
771774
NKikimr::TTabletCountersBase Counters;
772775
};
773776

777+
struct TEvMirrorTopicDescription : public TEventLocal<TEvMirrorTopicDescription, EvMirrorTopicDescription> {
778+
TEvMirrorTopicDescription(NYdb::NTopic::TDescribeTopicResult description)
779+
: Description(std::move(description))
780+
{
781+
}
782+
783+
TEvMirrorTopicDescription(TString error)
784+
: Description(std::unexpected(std::move(error)))
785+
{
786+
}
787+
788+
std::expected<NYdb::NTopic::TDescribeTopicResult, TString> Description;
789+
};
790+
791+
774792
struct TEvRetryWrite : public TEventLocal<TEvRetryWrite, EvRetryWrite> {
775793
TEvRetryWrite()
776794
{}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
#include "partition_key_range_sequence.h"
2+
3+
#include <util/generic/maybe.h>
4+
#include <util/generic/vector.h>
5+
#include <util/stream/format.h>
6+
#include <util/string/builder.h>
7+
#include <algorithm>
8+
#include <tuple>
9+
10+
namespace NKikimr::NPQ {
11+
namespace {
12+
struct TPoint {
13+
TMaybe<TStringBuf> Bound;
14+
bool StartOfInterval;
15+
ui32 Partition;
16+
17+
friend std::strong_ordering operator<=>(const TPoint& a, const TPoint& b) {
18+
return std::forward_as_tuple(a.SignedBound(), a.StartOfInterval) <=> std::forward_as_tuple(b.SignedBound(), b.StartOfInterval);
19+
}
20+
21+
std::string BoundToString() const {
22+
if (!Bound) {
23+
return StartOfInterval ? "\"-inf\"" : "\"+inf\"";
24+
}
25+
return TStringBuilder() << '"' << HexText(TStringBuf(*Bound)) << '"';
26+
}
27+
28+
constexpr std::tuple<int, TStringBuf> SignedBound() const {
29+
if (Bound) {
30+
return {0, *Bound};
31+
}
32+
if (StartOfInterval) {
33+
return {0, {}};
34+
} else {
35+
return {1, {}};
36+
}
37+
}
38+
};
39+
40+
TMaybe<TString> ValidateBoundsChainImpl(const std::span<const TPartitionKeyRangeView> bounds) {
41+
if (bounds.empty()) {
42+
return "Empty partitions list";
43+
}
44+
45+
TVector<TPoint> points(Reserve(bounds.size() * 2));
46+
for (const auto& set : bounds) {
47+
TPoint start{
48+
.Bound = set.FromBound,
49+
.StartOfInterval = true,
50+
.Partition = set.PartitionId,
51+
};
52+
TPoint end{
53+
.Bound = set.ToBound,
54+
.StartOfInterval = false,
55+
.Partition = set.PartitionId,
56+
};
57+
if (auto c = start <=> end; !(c < 0)) {
58+
return std::format("Partition {} has invalid bounds range: {}-{}", start.Partition, start.BoundToString(), end.BoundToString());
59+
}
60+
points.push_back(std::move(start));
61+
points.push_back(std::move(end));
62+
}
63+
std::sort(points.begin(), points.end());
64+
if (const auto& first = points.front(); !first.StartOfInterval || first.Bound.GetOrElse({}) != ""sv) {
65+
return std::format("First patrition {} doesn't have the lowest bound {}", first.Partition, first.BoundToString());
66+
}
67+
if (const auto& last = points.back(); last.StartOfInterval || last.Bound.Defined()) {
68+
return std::format("Last patrition {} doesn't have the highest bound {}", last.Partition, last.BoundToString());
69+
}
70+
for (size_t i = 0; i + 1 < points.size(); ++i) {
71+
const TPoint& prev = points[i + 0];
72+
const TPoint& next = points[i + 1];
73+
if (prev.SignedBound() == next.SignedBound()) {
74+
if (!prev.StartOfInterval && next.StartOfInterval && prev.Partition != next.Partition) {
75+
// ok, adjacent intervals
76+
} else {
77+
return std::format("Partitions {} and {} have overlapped bounds at point {}", prev.Partition, next.Partition, prev.BoundToString());
78+
}
79+
} else {
80+
if (prev.StartOfInterval && !next.StartOfInterval && prev.Partition == next.Partition) {
81+
// ok, same partition
82+
} else {
83+
if (!prev.StartOfInterval && next.StartOfInterval) {
84+
return std::format("Partitions {} and {} have a bounds gap {}-{} between them", prev.Partition, next.Partition, prev.BoundToString(), next.BoundToString());
85+
} else {
86+
return std::format("Partitions {} and {} bounds overlap at the point {}", prev.Partition, next.Partition, next.BoundToString());
87+
}
88+
}
89+
}
90+
}
91+
return Nothing();
92+
}
93+
} // namespace
94+
95+
std::expected<void, std::string> ValidateKeyRangeSequence(const std::span<const TPartitionKeyRangeView> bounds) {
96+
auto error = ValidateBoundsChainImpl(bounds);
97+
if (error.Empty()) {
98+
return {};
99+
}
100+
return std::unexpected(std::move(*error));
101+
}
102+
103+
} // namespace NKikimr::NPQ
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#pragma once
2+
3+
#include <util/generic/maybe.h>
4+
#include <util/generic/strbuf.h>
5+
#include <expected>
6+
#include <span>
7+
8+
namespace NKikimr::NPQ {
9+
10+
struct TPartitionKeyRangeView {
11+
ui32 PartitionId;
12+
TMaybe<TStringBuf> FromBound;
13+
TMaybe<TStringBuf> ToBound;
14+
};
15+
16+
// checks if set of intetervals is disjoined and covers all (-inf, +inf) range of keys
17+
std::expected<void, std::string> ValidateKeyRangeSequence(const std::span<const TPartitionKeyRangeView> bounds);
18+
19+
} // namespace NKikimr::NPQ
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
#include "partition_key_range_sequence.h"
2+
#include <library/cpp/testing/unittest/registar.h>
3+
#include <span>
4+
5+
using namespace NKikimr::NPQ;
6+
7+
Y_UNIT_TEST_SUITE(TPartitionKeyRangeSequenceTest) {
8+
Y_UNIT_TEST(ValidSequence) {
9+
// Simple valid sequence with 3 partitions
10+
const TPartitionKeyRangeView ranges[] = {
11+
{0, Nothing(), "\x45"},
12+
{1, "\x45", "\x4F"},
13+
{2, "\x4F", Nothing()},
14+
};
15+
auto result = ValidateKeyRangeSequence(std::span<const TPartitionKeyRangeView>(ranges, sizeof(ranges)/sizeof(ranges[0])));
16+
UNIT_ASSERT_C(result.has_value(), result.error());
17+
}
18+
19+
Y_UNIT_TEST(ValidSinglePartition) {
20+
// Single partition covering full range
21+
const TPartitionKeyRangeView ranges[] = {
22+
{0, Nothing(), Nothing()},
23+
};
24+
auto result = ValidateKeyRangeSequence(std::span<const TPartitionKeyRangeView>(ranges, sizeof(ranges)/sizeof(ranges[0])));
25+
UNIT_ASSERT_C(result.has_value(), result.error());
26+
}
27+
28+
Y_UNIT_TEST(InvalidOverlap) {
29+
// Overlapping partitions
30+
const TPartitionKeyRangeView ranges[] = {
31+
{0, Nothing(), "\x4F"},
32+
{1, "\x45", "\x55"},
33+
{2, "\x55", Nothing()},
34+
};
35+
auto result = ValidateKeyRangeSequence(std::span<const TPartitionKeyRangeView>(ranges, sizeof(ranges)/sizeof(ranges[0])));
36+
UNIT_ASSERT(!result.has_value());
37+
UNIT_ASSERT_STRING_CONTAINS(result.error(), "overlap");
38+
}
39+
40+
Y_UNIT_TEST(InvalidOverlapLong) {
41+
// Overlapping partitions
42+
const TPartitionKeyRangeView ranges[] = {
43+
{0, Nothing(), "\x41"},
44+
{1, "\x41", "\x45"},
45+
{2, "\x45", "\x4F"},
46+
{3, "\x4F", "\x55"},
47+
{4, "\x55", Nothing()},
48+
{5, "\x4A", "\x51"},
49+
};
50+
auto result = ValidateKeyRangeSequence(std::span<const TPartitionKeyRangeView>(ranges, sizeof(ranges)/sizeof(ranges[0])));
51+
UNIT_ASSERT(!result.has_value());
52+
UNIT_ASSERT_STRING_CONTAINS(result.error(), "overlap");
53+
}
54+
55+
Y_UNIT_TEST(InvalidContains) {
56+
// Overlapping partitions
57+
const TPartitionKeyRangeView ranges[] = {
58+
{0, Nothing(), Nothing()},
59+
{1, "\x45", "\x55"},
60+
};
61+
auto result = ValidateKeyRangeSequence(std::span<const TPartitionKeyRangeView>(ranges, sizeof(ranges)/sizeof(ranges[0])));
62+
UNIT_ASSERT(!result.has_value());
63+
UNIT_ASSERT_STRING_CONTAINS(result.error(), "overlap");
64+
}
65+
66+
Y_UNIT_TEST(InvalidGap) {
67+
// Gap between partitions
68+
const TPartitionKeyRangeView ranges[] = {
69+
{0, Nothing(), "\x45"},
70+
{1, "\x4F", Nothing()},
71+
};
72+
auto result = ValidateKeyRangeSequence(std::span<const TPartitionKeyRangeView>(ranges, sizeof(ranges)/sizeof(ranges[0])));
73+
UNIT_ASSERT(!result.has_value());
74+
UNIT_ASSERT_STRING_CONTAINS(result.error(), "bounds gap");
75+
}
76+
77+
Y_UNIT_TEST(InvalidOrder) {
78+
// Misordered bounds
79+
const TPartitionKeyRangeView ranges[] = {
80+
{0, "\x4F", "\x45"},
81+
};
82+
auto result = ValidateKeyRangeSequence(std::span<const TPartitionKeyRangeView>(ranges, sizeof(ranges)/sizeof(ranges[0])));
83+
UNIT_ASSERT(!result.has_value());
84+
UNIT_ASSERT_STRING_CONTAINS(result.error(), "invalid bounds range");
85+
}
86+
87+
Y_UNIT_TEST(InvalidFullCoverHi) {
88+
const TPartitionKeyRangeView ranges[] = {
89+
{0, Nothing(), "\x4F"},
90+
{1, "\x4F", "\x55"},
91+
};
92+
auto result = ValidateKeyRangeSequence(std::span<const TPartitionKeyRangeView>(ranges, sizeof(ranges)/sizeof(ranges[0])));
93+
UNIT_ASSERT(!result.has_value());
94+
UNIT_ASSERT_STRING_CONTAINS(result.error(), "doesn't have the highest bound");
95+
}
96+
97+
Y_UNIT_TEST(InvalidFullCoverLo) {
98+
const TPartitionKeyRangeView ranges[] = {
99+
{0, "\x45", "\x4F"},
100+
{1, "\x4F", Nothing()},
101+
};
102+
auto result = ValidateKeyRangeSequence(std::span<const TPartitionKeyRangeView>(ranges, sizeof(ranges)/sizeof(ranges[0])));
103+
UNIT_ASSERT(!result.has_value());
104+
UNIT_ASSERT_STRING_CONTAINS(result.error(), "doesn't have the lowest bound");
105+
}
106+
107+
Y_UNIT_TEST(EmptyInput) {
108+
// Empty input
109+
const TPartitionKeyRangeView ranges[] = {};
110+
auto result = ValidateKeyRangeSequence(std::span<const TPartitionKeyRangeView>(ranges, 0));
111+
UNIT_ASSERT(!result.has_value());
112+
UNIT_ASSERT_STRING_CONTAINS(result.error(), "Empty partitions list");
113+
}
114+
115+
Y_UNIT_TEST(ValidFivePartitions) {
116+
// Valid sequence with 5 partitions using all specified boundaries
117+
const TPartitionKeyRangeView ranges[] = {
118+
{0, Nothing(), "\x41"},
119+
{1, "\x41", "\x45"},
120+
{2, "\x45", "\x4F"},
121+
{3, "\x4F", "\x55"},
122+
{4, "\x55", Nothing()},
123+
};
124+
auto result = ValidateKeyRangeSequence(std::span<const TPartitionKeyRangeView>(ranges, sizeof(ranges)/sizeof(ranges[0])));
125+
UNIT_ASSERT_C(result.has_value(), result.error());
126+
}
127+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
UNITTEST_FOR(ydb/core/persqueue/partition_key_range)
2+
3+
SRCS(partition_key_range_sequence_ut.cpp)
4+
5+
END()

ydb/core/persqueue/partition_key_range/ya.make

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ LIBRARY()
22

33
SRCS(
44
partition_key_range.cpp
5+
partition_key_range_sequence.cpp
56
)
67

78
PEERDIR(
@@ -10,3 +11,5 @@ PEERDIR(
1011
)
1112

1213
END()
14+
15+
RECURSE_FOR_TESTS(ut)

0 commit comments

Comments
 (0)