Skip to content

Commit 994ffaf

Browse files
authored
Test for case of incorrect metering exceeding the reserved topic size (#5043)
1 parent df84217 commit 994ffaf

File tree

6 files changed

+58
-21
lines changed

6 files changed

+58
-21
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ void TPartition::EmplaceResponse(TMessage&& message, const TActorContext& ctx) {
277277
);
278278
}
279279

280-
ui64 TPartition::MeteringDataSize(const TActorContext& /*ctx*/) const {
280+
ui64 TPartition::MeteringDataSize() const {
281281
if (DataKeysBody.size() <= 1) {
282282
// tiny optimization - we do not meter very small queues up to 16MB
283283
return 0;
@@ -296,20 +296,19 @@ ui64 TPartition::ReserveSize() const {
296296
return TopicPartitionReserveSize(Config);
297297
}
298298

299-
ui64 TPartition::StorageSize(const TActorContext& ctx) const {
300-
return std::max<ui64>(MeteringDataSize(ctx), ReserveSize());
299+
ui64 TPartition::StorageSize(const TActorContext&) const {
300+
return std::max<ui64>(MeteringDataSize(), ReserveSize());
301301
}
302302

303-
ui64 TPartition::UsedReserveSize(const TActorContext& ctx) const {
304-
return std::min<ui64>(MeteringDataSize(ctx), ReserveSize());
303+
ui64 TPartition::UsedReserveSize(const TActorContext&) const {
304+
return std::min<ui64>(MeteringDataSize(), ReserveSize());
305305
}
306306

307-
ui64 TPartition::GetUsedStorage(const TActorContext& ctx) {
308-
const auto now = ctx.Now();
307+
ui64 TPartition::GetUsedStorage(const TInstant& now) {
309308
const auto duration = now - LastUsedStorageMeterTimestamp;
310309
LastUsedStorageMeterTimestamp = now;
311310

312-
auto dataSize = MeteringDataSize(ctx);
311+
auto dataSize = MeteringDataSize();
313312
auto reservedSize = ReserveSize();
314313
ui64 size = dataSize > reservedSize ? dataSize - reservedSize : 0;
315314
return size * duration.MilliSeconds() / 1000 / 1_MB; // mb*seconds
@@ -338,7 +337,7 @@ void TPartition::HandleWakeup(const TActorContext& ctx) {
338337
ctx.Schedule(WAKE_TIMEOUT, new TEvents::TEvWakeup());
339338
ctx.Send(Tablet, new TEvPQ::TEvPartitionCounters(Partition, TabletCounters));
340339

341-
ui64 usedStorage = GetUsedStorage(ctx);
340+
ui64 usedStorage = GetUsedStorage(ctx.Now());
342341
if (usedStorage > 0) {
343342
ctx.Send(Tablet, new TEvPQ::TEvMetering(EMeteringJson::UsedStorageV1, usedStorage));
344343
}
@@ -778,7 +777,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
778777

779778
result.SetReadBytesQuota(maxQuota);
780779

781-
result.SetPartitionSize(MeteringDataSize(ctx));
780+
result.SetPartitionSize(MeteringDataSize());
782781
result.SetUsedReserveSize(UsedReserveSize(ctx));
783782

784783
result.SetLastWriteTimestampMs(WriteTimestamp.MilliSeconds());

ydb/core/persqueue/partition.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ class TPartition : public TActorBootstrapped<TPartition> {
8787
public:
8888
const TString& TopicName() const;
8989

90+
ui64 GetUsedStorage(const TInstant& ctx);
91+
9092
private:
9193
static const ui32 MAX_ERRORS_COUNT_TO_STORE = 10;
9294
static const ui32 SCALE_REQUEST_REPEAT_MIN_SECONDS = 60;
@@ -252,8 +254,6 @@ class TPartition : public TActorBootstrapped<TPartition> {
252254
const ui32 maxSize, const ui64 readTimestampMs, ui32* rcount,
253255
ui32* rsize, ui64* insideHeadOffset, ui64 lastOffset);
254256

255-
ui64 GetUsedStorage(const TActorContext& ctx);
256-
257257
TAutoPtr<TEvPersQueue::TEvHasDataInfoResponse> MakeHasDataInfoResponse(ui64 lagSize, const TMaybe<ui64>& cookie, bool readingFinished = false);
258258

259259
void ProcessTxsAndUserActs(const TActorContext& ctx);
@@ -401,7 +401,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
401401
}
402402

403403
// The size of the data realy was persisted in the storage by the partition
404-
ui64 MeteringDataSize(const TActorContext& ctx) const;
404+
ui64 MeteringDataSize() const;
405405
// The size of the storage that was reserved by the partition
406406
ui64 ReserveSize() const;
407407
// The size of the storage that usud by the partition. That included combination of the reserver and realy persisted data.

ydb/core/persqueue/partition_write.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1619,7 +1619,7 @@ bool TPartition::WaitingForPreviousBlobQuota() const {
16191619
return TopicQuotaRequestCookie != 0;
16201620
}
16211621

1622-
bool TPartition::WaitingForSubDomainQuota(const TActorContext& ctx, const ui64 withSize) const {
1622+
bool TPartition::WaitingForSubDomainQuota(const TActorContext& /*ctx*/, const ui64 withSize) const {
16231623
if (!SubDomainOutOfSpace || !AppData()->FeatureFlags.GetEnableTopicDiskSubDomainQuota()) {
16241624
return false;
16251625
}
@@ -1629,7 +1629,7 @@ bool TPartition::WaitingForSubDomainQuota(const TActorContext& ctx, const ui64 w
16291629
return withSize > 0 || Size() > 0;
16301630
}
16311631

1632-
return MeteringDataSize(ctx) + withSize > ReserveSize();
1632+
return MeteringDataSize() + withSize > ReserveSize();
16331633
}
16341634

16351635
void TPartition::RequestBlobQuota(size_t quotaSize)

ydb/core/persqueue/ut/make_config.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
#include "make_config.h"
22

3+
#include <util/datetime/base.h>
4+
35
#include <ydb/core/persqueue/utils.h>
46

57
namespace NKikimr::NPQ::NHelpers {
68

79
NKikimrPQ::TPQTabletConfig MakeConfig(ui64 version,
810
const TVector<TCreateConsumerParams>& consumers,
9-
ui32 partitionsCount)
11+
ui32 partitionsCount,
12+
NKikimrPQ::TPQTabletConfig::EMeteringMode meteringMode)
1013
{
1114
NKikimrPQ::TPQTabletConfig config;
1215

@@ -27,7 +30,9 @@ NKikimrPQ::TPQTabletConfig MakeConfig(ui64 version,
2730
config.SetLocalDC(true);
2831
config.SetYdbDatabasePath("");
2932

30-
config.SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS);
33+
config.SetMeteringMode(meteringMode);
34+
config.MutablePartitionConfig()->SetLifetimeSeconds(TDuration::Hours(24).Seconds());
35+
config.MutablePartitionConfig()->SetWriteSpeedInBytesPerSecond(10 << 20);
3136

3237
Migrate(config);
3338

ydb/core/persqueue/ut/make_config.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ struct TCreateConsumerParams {
2020

2121
NKikimrPQ::TPQTabletConfig MakeConfig(ui64 version,
2222
const TVector<TCreateConsumerParams>& consumers,
23-
ui32 partitionsCount = 1);
23+
ui32 partitionsCount = 1,
24+
NKikimrPQ::TPQTabletConfig::EMeteringMode meteringMode = NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS);
2425

2526
NKikimrPQ::TBootstrapConfig MakeBootstrapConfig();
2627

ydb/core/persqueue/ut/partition_ut.cpp

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ namespace NHelpers {
2828
struct TConfigParams {
2929
ui64 Version = 0;
3030
TVector<TCreateConsumerParams> Consumers;
31+
NKikimrPQ::TPQTabletConfig::EMeteringMode MeteringMode = NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS;
3132
};
3233

3334
struct TCreatePartitionParams {
@@ -320,7 +321,9 @@ TPartition* TPartitionFixture::CreatePartitionActor(const TPartitionId& id,
320321
TabletCounters = counters->GetSecondTabletCounters().Release();
321322

322323
Config = MakeConfig(config.Version,
323-
config.Consumers);
324+
config.Consumers,
325+
1,
326+
config.MeteringMode);
324327
Config.SetLocalDC(true);
325328

326329
NPersQueue::TTopicNamesConverterFactory factory(true, "/Root/PQ", "dc1");
@@ -681,7 +684,9 @@ void TPartitionFixture::SendConfigResponse(const TConfigParams& config)
681684

682685
TString out;
683686
Y_ABORT_UNLESS(MakeConfig(config.Version,
684-
config.Consumers).SerializeToString(&out));
687+
config.Consumers,
688+
1,
689+
config.MeteringMode).SerializeToString(&out));
685690

686691
read->SetValue(out);
687692
}
@@ -930,7 +935,9 @@ void TPartitionFixture::WaitCommitTxDone(const TCommitTxDoneMatcher& matcher)
930935
void TPartitionFixture::SendChangePartitionConfig(const TConfigParams& config)
931936
{
932937
auto event = MakeHolder<TEvPQ::TEvChangePartitionConfig>(TopicConverter, MakeConfig(config.Version,
933-
config.Consumers));
938+
config.Consumers,
939+
1,
940+
config.MeteringMode));
934941
Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
935942
}
936943

@@ -1969,6 +1976,31 @@ Y_UNIT_TEST_F(ShadowPartitionCountersRestore, TPartitionFixture) {
19691976

19701977
}
19711978

1979+
Y_UNIT_TEST_F(GetUsedStorage, TPartitionFixture) {
1980+
auto* actor = CreatePartition({
1981+
.Partition=TPartitionId{2, 10, 100'001},
1982+
.Begin=0, .End=10,
1983+
//
1984+
// partition configuration
1985+
//
1986+
.Config={.Version=1, .Consumers={}, .MeteringMode = NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY}
1987+
},
1988+
//
1989+
// tablet configuration
1990+
//
1991+
{.Version=2, .Consumers={}, .MeteringMode = NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY}
1992+
);
1993+
1994+
auto now = TInstant::Now();
1995+
1996+
// Check integer overflow when reserved size great than used size
1997+
// LOGBROKER-9105
1998+
auto usedStorage = actor->GetUsedStorage(now + TDuration::Minutes(1));
1999+
UNIT_ASSERT_VALUES_EQUAL(0, usedStorage);
2000+
2001+
2002+
} // GetPartitionWriteInfoErrors
2003+
19722004
} // End of suite
19732005

19742006
} // namespace

0 commit comments

Comments
 (0)