13
13
14
14
#include < util/system/unaligned_mem.h>
15
15
16
+ constexpr size_t MAX_REQS_PER_CYCLE = 200 ; // 200 requests take ~0.2ms in EnqueueAll function
17
+
16
18
namespace NKikimr {
17
19
namespace NPDisk {
18
20
@@ -843,9 +845,6 @@ bool TPDisk::ChunkWritePiece(TChunkWrite *evChunkWrite, ui32 pieceShift, ui32 pi
843
845
844
846
guard.Release ();
845
847
846
- LWTRACK (PDiskChunkWritePieceSendToDevice, evChunkWrite->Orbit , PDiskId, evChunkWrite->Owner , chunkIdx,
847
- pieceShift, pieceSize);
848
-
849
848
ui32 bytesAvailable = pieceSize;
850
849
Y_ABORT_UNLESS (evChunkWrite->BytesWritten == pieceShift);
851
850
const ui32 count = evChunkWrite->PartsPtr ->Size ();
@@ -904,6 +903,9 @@ bool TPDisk::ChunkWritePiece(TChunkWrite *evChunkWrite, ui32 pieceShift, ui32 pi
904
903
LOG_INFO (*ActorSystem, NKikimrServices::BS_PDISK, " PDiskId# %" PRIu32 " chunkIdx# %" PRIu32
905
904
" was zero-padded after writing" , (ui32)PDiskId, (ui32)chunkIdx);
906
905
}
906
+ LWTRACK (PDiskChunkWriteLastPieceSendToDevice, evChunkWrite->Orbit , PDiskId, evChunkWrite->Owner , chunkIdx,
907
+ pieceShift, pieceSize);
908
+
907
909
auto traceId = evChunkWrite->SpanStack .GetTraceId ();
908
910
evChunkWrite->Completion ->Orbit = std::move (evChunkWrite->Orbit );
909
911
writer.Flush (evChunkWrite->ReqId , &traceId, evChunkWrite->Completion .Release ());
@@ -946,7 +948,7 @@ void TPDisk::SendChunkReadError(const TIntrusivePtr<TChunkRead>& read, TStringSt
946
948
}
947
949
948
950
TPDisk::EChunkReadPieceResult TPDisk::ChunkReadPiece (TIntrusivePtr<TChunkRead> &read, ui64 pieceCurrentSector,
949
- ui64 pieceSizeLimit, ui64 *reallyReadDiskBytes, NWilson::TTraceId traceId) {
951
+ ui64 pieceSizeLimit, ui64 *reallyReadDiskBytes, NWilson::TTraceId traceId, NLWTrace::TOrbit&& orbit ) {
950
952
if (read->IsReplied ) {
951
953
return ReadPieceResultOk;
952
954
}
@@ -1015,6 +1017,8 @@ TPDisk::EChunkReadPieceResult TPDisk::ChunkReadPiece(TIntrusivePtr<TChunkRead> &
1015
1017
THolder<TCompletionChunkReadPart> completion (new TCompletionChunkReadPart (this , read, bytesToRead,
1016
1018
payloadBytesToRead, payloadOffset, read->FinalCompletion , isTheLastPart, Cfg->UseT1ha0HashInFooter , std::move (span)));
1017
1019
completion->CostNs = DriveModel.TimeForSizeNs (bytesToRead, read->ChunkIdx , TDriveModel::OP_TYPE_READ);
1020
+ LWTRACK (PDiskChunkReadPiecesSendToDevice, orbit, PDiskId);
1021
+ completion->Orbit = std::move (orbit);
1018
1022
Y_ABORT_UNLESS (bytesToRead <= completion->GetBuffer ()->Size ());
1019
1023
ui8 *data = completion->GetBuffer ()->Data ();
1020
1024
BlockDevice->PreadAsync (data, bytesToRead, readOffset, completion.Release (),
@@ -2279,7 +2283,7 @@ void TPDisk::ProcessChunkReadQueue() {
2279
2283
ui64 currentLimit = Min (bufferSize, piece->PieceSizeLimit - size);
2280
2284
ui64 reallyReadDiskBytes;
2281
2285
EChunkReadPieceResult result = ChunkReadPiece (read, piece->PieceCurrentSector + size / Format.SectorSize ,
2282
- currentLimit, &reallyReadDiskBytes, piece->SpanStack .GetTraceId ());
2286
+ currentLimit, &reallyReadDiskBytes, piece->SpanStack .GetTraceId (), std::move (piece-> Orbit ) );
2283
2287
isComplete = (result != ReadPieceResultInProgress);
2284
2288
// Read pieces is sliced previously and it is expected that ChunkReadPiece will read exactly
2285
2289
// currentLimit bytes
@@ -2941,7 +2945,6 @@ bool TPDisk::PreprocessRequest(TRequestBase *request) {
2941
2945
2942
2946
auto result = std::make_unique<TEvChunkWriteResult>(NKikimrProto::OK, ev.ChunkIdx , ev.Cookie ,
2943
2947
GetStatusFlags (ev.Owner , ev.OwnerGroupType ), TString ());
2944
- result->Orbit = std::move (ev.Orbit );
2945
2948
2946
2949
++state.OperationsInProgress ;
2947
2950
++ownerData.InFlight ->ChunkWrites ;
@@ -3185,7 +3188,7 @@ void TPDisk::PushRequestToForseti(TRequestBase *request) {
3185
3188
TChunkWritePiece *piece = new TChunkWritePiece (whole, smallJobCount * smallJobSize, largeJobSize, std::move (span));
3186
3189
piece->EstimateCost (DriveModel);
3187
3190
AddJobToForseti (cbs, piece, request->JobKind );
3188
- LWTRACK (PDiskAddWritePieceToScheduler , request->Orbit , PDiskId, request->ReqId .Id ,
3191
+ LWTRACK (PDiskChunkWriteAddToScheduler , request->Orbit , PDiskId, request->ReqId .Id ,
3189
3192
HPSecondsFloat (HPNow () - request->CreationTime ), request->Owner , request->IsFast ,
3190
3193
request->PriorityClass , whole->TotalSize );
3191
3194
} else if (request->GetType () == ERequestType::RequestChunkRead) {
@@ -3206,7 +3209,8 @@ void TPDisk::PushRequestToForseti(TRequestBase *request) {
3206
3209
// Schedule small job.
3207
3210
auto piece = new TChunkReadPiece (read, idx * smallJobSize,
3208
3211
smallJobSize * Format.SectorSize , false , std::move (span));
3209
- LWTRACK (PDiskChunkReadPieceAddToScheduler, read->Orbit , PDiskId, idx, idx * smallJobSize,
3212
+ read->Orbit .Fork (piece->Orbit );
3213
+ LWTRACK (PDiskChunkReadPieceAddToScheduler, piece->Orbit , PDiskId, idx, idx * smallJobSize * Format.SectorSize ,
3210
3214
smallJobSize * Format.SectorSize );
3211
3215
piece->EstimateCost (DriveModel);
3212
3216
piece->SelfPointer = piece;
@@ -3217,8 +3221,9 @@ void TPDisk::PushRequestToForseti(TRequestBase *request) {
3217
3221
span.Attribute (" is_last_piece" , true );
3218
3222
auto piece = new TChunkReadPiece (read, smallJobCount * smallJobSize,
3219
3223
largeJobSize * Format.SectorSize , true , std::move (span));
3220
- LWTRACK (PDiskChunkReadPieceAddToScheduler, read->Orbit , PDiskId, smallJobCount,
3221
- smallJobCount * smallJobSize, largeJobSize * Format.SectorSize );
3224
+ read->Orbit .Fork (piece->Orbit );
3225
+ LWTRACK (PDiskChunkReadPieceAddToScheduler, piece->Orbit , PDiskId, smallJobCount,
3226
+ smallJobCount * smallJobSize * Format.SectorSize , largeJobSize * Format.SectorSize );
3222
3227
piece->EstimateCost (DriveModel);
3223
3228
piece->SelfPointer = piece;
3224
3229
AddJobToForseti (cbs, piece, request->JobKind );
@@ -3406,7 +3411,14 @@ void TPDisk::ProcessYardInitSet() {
3406
3411
}
3407
3412
3408
3413
void TPDisk::EnqueueAll () {
3414
+ TInstant start = TInstant::Now ();
3415
+
3409
3416
TGuard<TMutex> guard (StateMutex);
3417
+ size_t initialQueueSize = InputQueue.GetWaitingSize ();
3418
+ size_t processedReqs = 0 ;
3419
+ size_t pushedToForsetiReqs = 0 ;
3420
+
3421
+
3410
3422
while (InputQueue.GetWaitingSize () > 0 ) {
3411
3423
TRequestBase* request = InputQueue.Pop ();
3412
3424
AtomicSub (InputQueueCost, request->Cost );
@@ -3449,9 +3461,17 @@ void TPDisk::EnqueueAll() {
3449
3461
} else {
3450
3462
if (PreprocessRequest (request)) {
3451
3463
PushRequestToForseti (request);
3464
+ ++pushedToForsetiReqs;
3452
3465
}
3453
3466
}
3467
+ ++processedReqs;
3468
+ if (processedReqs >= MAX_REQS_PER_CYCLE) {
3469
+ break ;
3470
+ }
3454
3471
}
3472
+
3473
+ double spentTimeMs = (TInstant::Now () - start).MillisecondsFloat ();
3474
+ LWPROBE (PDiskEnqueueAllDetails, PDiskId, initialQueueSize, processedReqs, pushedToForsetiReqs, spentTimeMs);
3455
3475
}
3456
3476
3457
3477
void TPDisk::Update () {
0 commit comments