Skip to content

Commit d23ebbe

Browse files
Publish tx counters (#5534)
1 parent 22b4f2d commit d23ebbe

File tree

10 files changed: +379 additions, -51 deletions

ydb/core/persqueue/events/internal.h

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@
55
#include <ydb/core/base/row_version.h>
66
#include <ydb/core/protos/pqconfig.pb.h>
77
#include <ydb/core/persqueue/blob.h>
8+
#include <ydb/core/persqueue/percentile_counter.h>
89
#include <ydb/core/persqueue/key.h>
910
#include <ydb/core/persqueue/sourceid_info.h>
1011
#include <ydb/core/persqueue/metering_sink.h>
@@ -1080,6 +1081,7 @@ struct TEvPQ {
10801081
ui64 MessagesWrittenTotal;
10811082
ui64 MessagesWrittenGrpc;
10821083
TVector<ui64> MessagesSizes;
1084+
THolder<NPQ::TMultiBucketCounter> InputLags;
10831085
};
10841086

10851087
struct TEvGetWriteInfoError : public TEventLocal<TEvGetWriteInfoError, EvGetWriteInfoError> {

ydb/core/persqueue/partition.cpp

Lines changed: 35 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -202,12 +202,6 @@ TPartition::TPartition(ui64 tabletId, const TPartitionId& partition, const TActo
202202
, InitDone(false)
203203
, NewPartition(newPartition)
204204
, Subscriber(partition, TabletCounters, Tablet)
205-
, WriteCycleSize(0)
206-
, WriteNewSize(0)
207-
, WriteNewSizeInternal(0)
208-
, WriteNewSizeUncompressed(0)
209-
, WriteNewMessages(0)
210-
, WriteNewMessagesInternal(0)
211205
, DiskIsFull(false)
212206
, SubDomainOutOfSpace(subDomainOutOfSpace)
213207
, HasDataReqNum(0)
@@ -1018,6 +1012,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorCon
10181012
response->MessagesWrittenTotal = MsgsWrittenTotal.Value();
10191013
response->MessagesWrittenGrpc = MsgsWrittenGrpc.Value();
10201014
response->MessagesSizes = std::move(MessageSize.GetValues());
1015+
response->InputLags = std::move(SupportivePartitionTimeLag);
10211016

10221017
ctx.Send(ev->Sender, response);
10231018
}
@@ -1850,6 +1845,36 @@ void TPartition::RunPersist() {
18501845
AddCmdWriteConfig(PersistRequest->Record);
18511846
}
18521847
if (PersistRequest->Record.CmdDeleteRangeSize() || PersistRequest->Record.CmdWriteSize() || PersistRequest->Record.CmdRenameSize()) {
1848+
// Apply counters
1849+
for (const auto& writeInfo : WriteInfosApplied) {
1850+
// writeTimeLag
1851+
if (InputTimeLag && writeInfo->InputLags) {
1852+
writeInfo->InputLags->UpdateTimestamp(ctx.Now().MilliSeconds());
1853+
for (const auto& values : writeInfo->InputLags->GetValues()) {
1854+
if (values.second)
1855+
InputTimeLag->IncFor(std::ceil(values.first), values.second);
1856+
}
1857+
}
1858+
//MessageSize
1859+
auto i = 0u;
1860+
for (auto range : MessageSize.GetRanges()) {
1861+
if (i >= writeInfo->MessagesSizes.size()) {
1862+
break;
1863+
}
1864+
MessageSize.IncFor(range, writeInfo->MessagesSizes[i++]);
1865+
}
1866+
1867+
// Bytes Written
1868+
BytesWrittenTotal.Inc(writeInfo->BytesWrittenTotal);
1869+
BytesWrittenGrpc.Inc(writeInfo->BytesWrittenGrpc);
1870+
BytesWrittenUncompressed.Inc(writeInfo->BytesWrittenUncompressed);
1871+
// Messages written
1872+
MsgsWrittenTotal.Inc(writeInfo->MessagesWrittenTotal);
1873+
MsgsWrittenGrpc.Inc(writeInfo->MessagesWrittenTotal);
1874+
}
1875+
WriteInfosApplied.clear();
1876+
//Done with counters.
1877+
18531878
ctx.Send(HaveWriteMsg ? BlobCache : Tablet, PersistRequest.Release());
18541879
KVWriteInProgress = true;
18551880
} else {
@@ -2044,7 +2069,7 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event)
20442069
return true;
20452070
}
20462071

2047-
void TPartition::CommitWriteOperations(const TTransaction& t)
2072+
void TPartition::CommitWriteOperations(TTransaction& t)
20482073
{
20492074
if (!t.WriteInfo) {
20502075
return;
@@ -2072,10 +2097,12 @@ void TPartition::CommitWriteOperations(const TTransaction& t)
20722097
.IgnoreQuotaDeadline = true,
20732098
.HeartbeatVersion = std::nullopt,
20742099
}, std::nullopt};
2100+
msg.Internal = true;
20752101
TMessage message(std::move(msg), ctx.Now() - TInstant::Zero());
20762102

20772103
UserActionAndTxPendingCommit.emplace_front(std::move(message));
20782104
}
2105+
WriteInfosApplied.emplace_back(std::move(t.WriteInfo));
20792106
}
20802107

20812108
void TPartition::CommitTransaction(TSimpleSharedPtr<TTransaction>& t)
@@ -2370,7 +2397,7 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE
23702397
return EProcessResult::Continue;
23712398
}
23722399

2373-
void TPartition::ExecImmediateTx(const TTransaction& t)
2400+
void TPartition::ExecImmediateTx(TTransaction& t)
23742401
{
23752402
--ImmediateTxCount;
23762403
auto& record = t.ProposeTransaction->Record;

ydb/core/persqueue/partition.h

Lines changed: 14 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@
2828
namespace NKikimr::NPQ {
2929

3030
static const ui32 MAX_BLOB_PART_SIZE = 500_KB;
31+
static const ui32 DEFAULT_BUCKET_COUNTER_MULTIPLIER = 20;
3132

3233
using TPartitionLabeledCounters = TProtobufTabletLabeledCounters<EPartitionLabeledCounters_descriptor>;
3334

@@ -429,7 +430,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
429430
void Handle(TEvPQ::TEvProcessChangeOwnerRequests::TPtr& ev, const TActorContext& ctx);
430431
void StartProcessChangeOwnerRequests(const TActorContext& ctx);
431432

432-
void CommitWriteOperations(const TTransaction& t);
433+
void CommitWriteOperations(TTransaction& t);
433434

434435
void HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx);
435436
void Handle(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx);
@@ -704,7 +705,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
704705

705706

706707
[[nodiscard]] EProcessResult PreProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx);
707-
void ExecImmediateTx(const TTransaction& tx);
708+
void ExecImmediateTx(TTransaction& tx);
708709

709710
EProcessResult PreProcessRequest(TRegisterMessageGroupMsg& msg);
710711
EProcessResult PreProcessRequest(TDeregisterMessageGroupMsg& msg);
@@ -757,6 +758,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
757758

758759
std::deque<TUserActionAndTransactionEvent> UserActionAndTransactionEvents;
759760
std::deque<TUserActionAndTransactionEvent> UserActionAndTxPendingCommit;
761+
TVector<THolder<TEvPQ::TEvGetWriteInfoResponse>> WriteInfosApplied;
760762

761763
THashMap<ui64, TSimpleSharedPtr<TTransaction>> TransactionsInflight;
762764
THashMap<TActorId, TSimpleSharedPtr<TTransaction>> WriteInfosToTx;
@@ -814,14 +816,17 @@ class TPartition : public TActorBootstrapped<TPartition> {
814816
TSubscriber Subscriber;
815817

816818
TInstant WriteCycleStartTime;
817-
ui32 WriteCycleSize;
819+
ui32 WriteCycleSize = 0;
818820
ui32 WriteCycleSizeEstimate = 0;
819821
ui32 WriteKeysSizeEstimate = 0;
820-
ui32 WriteNewSize;
821-
ui32 WriteNewSizeInternal;
822-
ui64 WriteNewSizeUncompressed;
823-
ui32 WriteNewMessages;
824-
ui32 WriteNewMessagesInternal;
822+
ui32 WriteNewSize = 0;
823+
ui32 WriteNewSizeFull = 0;
824+
ui32 WriteNewSizeInternal = 0;
825+
ui64 WriteNewSizeUncompressed = 0;
826+
ui64 WriteNewSizeUncompressedFull = 0;
827+
828+
ui32 WriteNewMessages = 0;
829+
ui32 WriteNewMessagesInternal = 0;
825830

826831
TInstant CurrentTimestamp;
827832

@@ -860,6 +865,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
860865
NSlidingWindow::TSlidingWindow<NSlidingWindow::TMaxOperation<ui64>> WriteLagMs;
861866
//ToDo - counters.
862867
THolder<TPercentileCounter> InputTimeLag;
868+
THolder<TMultiBucketCounter> SupportivePartitionTimeLag;
863869
TPartitionHistogramWrapper MessageSize;
864870

865871
TPercentileCounter WriteLatency;

ydb/core/persqueue/partition_init.cpp

Lines changed: 15 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,6 @@ namespace NKikimr::NPQ {
88
static const ui32 LEVEL0 = 32;
99
static const TString WRITE_QUOTA_ROOT_PATH = "write-quota";
1010

11-
1211
bool DiskIsFull(TEvKeyValue::TEvResponse::TPtr& ev);
1312
void RequestInfoRange(const TActorContext& ctx, const TActorId& dst, const TPartitionId& partition, const TString& key);
1413
void RequestDataRange(const TActorContext& ctx, const TActorId& dst, const TPartitionId& partition, const TString& key);
@@ -789,7 +788,7 @@ void TPartition::SetupTopicCounters(const TActorContext& ctx) {
789788

790789
subGroup = GetServiceCounters(counters, "pqproxy|writeInfo");
791790
{
792-
std::unique_ptr<TPercentileCounter> percentileCounter(new TPercentileCounter(
791+
std::unique_ptr<TPercentileCounter> percentileCounter(new TPercentileCounter(
793792
subGroup, labels, {{"sensor", "MessageSize" + suffix}}, "Size",
794793
TVector<std::pair<ui64, TString>>{
795794
{1_KB, "1kb"}, {5_KB, "5kb"}, {10_KB, "10kb"},
@@ -869,15 +868,21 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
869868

870869
subgroups.push_back({"name", "topic.write.lag_milliseconds"});
871870

872-
InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
873-
NPersQueue::GetCountersForTopic(counters, IsServerless), {},
874-
subgroups, "bin",
875-
TVector<std::pair<ui64, TString>>{
876-
{100, "100"}, {200, "200"}, {500, "500"},
877-
{1000, "1000"}, {2000, "2000"}, {5000, "5000"},
878-
{10'000, "10000"}, {30'000, "30000"}, {60'000, "60000"},
879-
{180'000,"180000"}, {9'999'999, "999999"}}, true));
871+
if (IsSupportive()) {
872+
SupportivePartitionTimeLag = MakeHolder<TMultiBucketCounter>(
873+
TVector<ui64>{100, 200, 500, 1000, 2000, 5000, 10'000, 30'000, 60'000, 180'000, 9'999'999},
874+
DEFAULT_BUCKET_COUNTER_MULTIPLIER, ctx.Now().MilliSeconds());
875+
} else {
876+
InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
877+
NPersQueue::GetCountersForTopic(counters, IsServerless), {},
878+
subgroups, "bin",
879+
TVector<std::pair<ui64, TString>>{
880+
{100, "100"}, {200, "200"}, {500, "500"},
881+
{1000, "1000"}, {2000, "2000"}, {5000, "5000"},
882+
{10'000, "10000"}, {30'000, "30000"}, {60'000, "60000"},
883+
{180'000,"180000"}, {9'999'999, "999999"}}, true));
880884

885+
}
881886
subgroups.back().second = "topic.write.message_size_bytes";
882887
{
883888
std::unique_ptr<TPercentileCounter> percentileCounter(new TPercentileCounter(

ydb/core/persqueue/partition_types.h

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@ struct TWriteMsg {
2020
TMaybe<ui64> Offset;
2121
TEvPQ::TEvWrite::TMsg Msg;
2222
std::optional<ui64> InitialSeqNo;
23+
bool Internal = false;
2324
};
2425

2526
struct TOwnershipMsg {

ydb/core/persqueue/partition_write.cpp

Lines changed: 35 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -312,7 +312,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
312312
", Offset: " << offset << " is " << (already ? "already written" : "stored on disk")
313313
);
314314

315-
if (PartitionWriteQuotaWaitCounter) {
315+
if (PartitionWriteQuotaWaitCounter && !writeResponse.Internal) {
316316
PartitionWriteQuotaWaitCounter->IncFor(PartitionQuotaWaitTimeForCurrentBlob.MilliSeconds());
317317
}
318318
if (!already && partNo + 1 == totalParts && !writeResponse.Msg.HeartbeatVersion)
@@ -524,15 +524,20 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
524524
"TPartition::HandleWriteResponse writeNewSize# " << WriteNewSize;
525525
);
526526

527+
if (SupportivePartitionTimeLag) {
528+
SupportivePartitionTimeLag->UpdateTimestamp(now.MilliSeconds());
529+
}
527530
if (SplitMergeEnabled(Config)) {
528-
SplitMergeAvgWriteBytes->Update(WriteNewSize, now);
531+
SplitMergeAvgWriteBytes->Update(WriteNewSizeFull, now);
529532
auto needScaling = CheckScaleStatus(ctx);
530533
ChangeScaleStatusIfNeeded(needScaling);
531534
}
532535
WriteCycleSize = 0;
533536
WriteNewSize = 0;
537+
WriteNewSizeFull = 0;
534538
WriteNewSizeInternal = 0;
535539
WriteNewSizeUncompressed = 0;
540+
WriteNewSizeUncompressedFull = 0;
536541
WriteNewMessages = 0;
537542
WriteNewMessagesInternal = 0;
538543
UpdateWriteBufferIsFullState(now);
@@ -1044,14 +1049,17 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
10441049
<< ". Writing seqNo: " << sourceId.UpdatedSeqNo()
10451050
<< ". EndOffset: " << EndOffset << ". CurOffset: " << curOffset << ". Offset: " << poffset
10461051
);
1047-
1048-
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1);
1049-
MsgsDiscarded.Inc();
1050-
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ALREADY].Increment(p.Msg.Data.size());
1051-
BytesDiscarded.Inc(p.Msg.Data.size());
1052+
if (!p.Internal) {
1053+
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1);
1054+
MsgsDiscarded.Inc();
1055+
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ALREADY].Increment(p.Msg.Data.size());
1056+
BytesDiscarded.Inc(p.Msg.Data.size());
1057+
}
10521058
} else {
1053-
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_SMALL_OFFSET].Increment(1);
1054-
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_SMALL_OFFSET].Increment(p.Msg.Data.size());
1059+
if (!p.Internal) {
1060+
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_SMALL_OFFSET].Increment(1);
1061+
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_SMALL_OFFSET].Increment(p.Msg.Data.size());
1062+
}
10551063
}
10561064

10571065
TString().swap(p.Msg.Data);
@@ -1153,14 +1161,17 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
11531161
ctx.Send(Tablet, new TEvents::TEvPoisonPill());
11541162
return false;
11551163
}
1156-
1157-
WriteNewSize += p.Msg.SourceId.size() + p.Msg.Data.size();
1158-
WriteNewSizeInternal += p.Msg.External ? 0 : (p.Msg.SourceId.size() + p.Msg.Data.size());
1159-
WriteNewSizeUncompressed += p.Msg.UncompressedSize + p.Msg.SourceId.size();
1160-
if (p.Msg.PartNo == 0) {
1161-
++WriteNewMessages;
1162-
if (!p.Msg.External)
1163-
++WriteNewMessagesInternal;
1164+
WriteNewSizeFull += p.Msg.SourceId.size() + p.Msg.Data.size();
1165+
WriteNewSizeUncompressedFull += p.Msg.UncompressedSize + p.Msg.SourceId.size();
1166+
if (!p.Internal) {
1167+
WriteNewSize += p.Msg.SourceId.size() + p.Msg.Data.size();
1168+
WriteNewSizeUncompressed += p.Msg.UncompressedSize + p.Msg.SourceId.size();
1169+
WriteNewSizeInternal += p.Msg.External ? 0 : (p.Msg.SourceId.size() + p.Msg.Data.size());
1170+
}
1171+
if (p.Msg.PartNo == 0 && !p.Internal) {
1172+
++WriteNewMessages;
1173+
if (!p.Msg.External)
1174+
++WriteNewMessagesInternal;
11641175
}
11651176

11661177
TMaybe<TPartData> partData;
@@ -1176,13 +1187,14 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
11761187
const ui64 writeLagMs =
11771188
(WriteTimestamp - TInstant::MilliSeconds(p.Msg.CreateTimestamp)).MilliSeconds();
11781189
WriteLagMs.Update(writeLagMs, WriteTimestamp);
1179-
if (InputTimeLag) {
1190+
if (InputTimeLag && !p.Internal) {
11801191
InputTimeLag->IncFor(writeLagMs, 1);
1181-
if (p.Msg.PartNo == 0) {
1182-
MessageSize.IncFor(p.Msg.TotalSize + p.Msg.SourceId.size(), 1);
1183-
}
1192+
} else if (SupportivePartitionTimeLag) {
1193+
SupportivePartitionTimeLag->Insert(writeLagMs, 1);
1194+
}
1195+
if (p.Msg.PartNo == 0 && !p.Internal) {
1196+
MessageSize.IncFor(p.Msg.TotalSize + p.Msg.SourceId.size(), 1);
11841197
}
1185-
11861198
bool lastBlobPart = blob.IsLastPart();
11871199

11881200
//will return compacted tmp blob
@@ -1616,6 +1628,7 @@ void TPartition::BeginAppendHeadWithNewWrites(const TActorContext& ctx)
16161628
WriteCycleSize = 0;
16171629
WriteNewSize = 0;
16181630
WriteNewSizeUncompressed = 0;
1631+
WriteNewSizeUncompressed = 0;
16191632
WriteNewMessages = 0;
16201633
UpdateWriteBufferIsFullState(ctx.Now());
16211634
CurrentTimestamp = ctx.Now();

0 commit comments

Comments (0)