Skip to content

Commit 67ec8a8

Browse files
Merge 7b18bc1 into 2d98fad
2 parents 2d98fad + 7b18bc1 commit 67ec8a8

File tree

3 files changed

+63
-61
lines changed

3 files changed

+63
-61
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp

Lines changed: 63 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1160,7 +1160,7 @@ bool TTable::TryToReduceMemoryAndWait() {
11601160
TableBucketsSpillers[largestBucketIndex].SpillBucket(std::move(TableBuckets[largestBucketIndex]));
11611161
TableBuckets[largestBucketIndex] = TTableBucket{};
11621162

1163-
return TableBucketsSpillers[largestBucketIndex].HasRunningAsyncIoOperation();
1163+
return TableBucketsSpillers[largestBucketIndex].IsProcessingSpilling();
11641164
}
11651165

11661166
void TTable::UpdateSpilling() {
@@ -1319,12 +1319,6 @@ TTableBucket&& TTableBucketSpiller::ExtractBucket() {
13191319
return std::move(CurrentBucket);
13201320
}
13211321

1322-
bool TTableBucketSpiller::HasRunningAsyncIoOperation() const {
1323-
return StateUi64Adapter.HasRunningAsyncIoOperation()
1324-
|| StateUi32Adapter.HasRunningAsyncIoOperation()
1325-
|| StateCharAdapter.HasRunningAsyncIoOperation();
1326-
}
1327-
13281322
bool TTableBucketSpiller::IsInMemory() const {
13291323
return State == EState::InMemory;
13301324
}
@@ -1358,37 +1352,37 @@ void TTableBucketSpiller::ProcessBucketSpilling() {
13581352
while (NextVectorToProcess != ENextVectorToProcess::None) {
13591353
switch (NextVectorToProcess) {
13601354
case ENextVectorToProcess::KeyAndVals:
1361-
if (StateUi64Adapter.HasRunningAsyncIoOperation() || !StateUi64Adapter.IsAcceptingData()) return;
1355+
if (!StateUi64Adapter.IsAcceptingData()) return;
13621356

13631357
StateUi64Adapter.AddData(std::move(CurrentBucket.KeyIntVals));
13641358
NextVectorToProcess = ENextVectorToProcess::DataIntVals;
13651359
break;
13661360
case ENextVectorToProcess::DataIntVals:
1367-
if (StateUi64Adapter.HasRunningAsyncIoOperation() || !StateUi64Adapter.IsAcceptingData()) return;
1361+
if (!StateUi64Adapter.IsAcceptingData()) return;
13681362

13691363
StateUi64Adapter.AddData(std::move(CurrentBucket.DataIntVals));
13701364
NextVectorToProcess = ENextVectorToProcess::StringsValues;
13711365
break;
13721366
case ENextVectorToProcess::StringsValues:
1373-
if (StateCharAdapter.HasRunningAsyncIoOperation() || !StateCharAdapter.IsAcceptingData()) return;
1367+
if (!StateCharAdapter.IsAcceptingData()) return;
13741368

13751369
StateCharAdapter.AddData(std::move(CurrentBucket.StringsValues));
13761370
NextVectorToProcess = ENextVectorToProcess::StringsOffsets;
13771371
break;
13781372
case ENextVectorToProcess::StringsOffsets:
1379-
if (StateUi32Adapter.HasRunningAsyncIoOperation() || !StateUi32Adapter.IsAcceptingData()) return;
1373+
if (!StateUi32Adapter.IsAcceptingData()) return;
13801374

13811375
StateUi32Adapter.AddData(std::move(CurrentBucket.StringsOffsets));
13821376
NextVectorToProcess = ENextVectorToProcess::InterfaceValues;
13831377
break;
13841378
case ENextVectorToProcess::InterfaceValues:
1385-
if (StateCharAdapter.HasRunningAsyncIoOperation() || !StateCharAdapter.IsAcceptingData()) return;
1379+
if (!StateCharAdapter.IsAcceptingData()) return;
13861380

13871381
StateCharAdapter.AddData(std::move(CurrentBucket.InterfaceValues));
13881382
NextVectorToProcess = ENextVectorToProcess::InterfaceOffsets;
13891383
break;
13901384
case ENextVectorToProcess::InterfaceOffsets:
1391-
if (StateUi32Adapter.HasRunningAsyncIoOperation() || !StateUi32Adapter.IsAcceptingData()) return;
1385+
if (!StateUi32Adapter.IsAcceptingData()) return;
13921386

13931387
StateUi32Adapter.AddData(std::move(CurrentBucket.InterfaceOffsets));
13941388
NextVectorToProcess = ENextVectorToProcess::None;
@@ -1433,71 +1427,85 @@ void TTableBucketSpiller::ProcessBucketRestoration() {
14331427
while (NextVectorToProcess != ENextVectorToProcess::None) {
14341428
switch (NextVectorToProcess) {
14351429
case ENextVectorToProcess::KeyAndVals:
1436-
if (StateUi64Adapter.HasRunningAsyncIoOperation()) return;
1430+
if (StateUi64Adapter.IsDataReady()) {
1431+
AppendVector(CurrentBucket.KeyIntVals, StateUi64Adapter.ExtractVector());
1432+
NextVectorToProcess = ENextVectorToProcess::DataIntVals;
1433+
break;
1434+
}
14371435

1438-
if (!StateUi64Adapter.IsDataReady()) {
1436+
if (StateUi64Adapter.IsAcceptingDataRequests()) {
14391437
StateUi64Adapter.RequestNextVector();
1440-
if (StateUi64Adapter.HasRunningAsyncIoOperation()) return;
1438+
break;
14411439
}
1442-
AppendVector(CurrentBucket.KeyIntVals, StateUi64Adapter.ExtractVector());
1443-
NextVectorToProcess = ENextVectorToProcess::DataIntVals;
1444-
break;
1440+
return;
14451441
case ENextVectorToProcess::DataIntVals:
1446-
if (StateUi64Adapter.HasRunningAsyncIoOperation()) return;
1442+
if (StateUi64Adapter.IsDataReady()) {
1443+
AppendVector(CurrentBucket.DataIntVals, StateUi64Adapter.ExtractVector());
1444+
NextVectorToProcess = ENextVectorToProcess::StringsValues;
1445+
break;
1446+
}
14471447

1448-
if (!StateUi64Adapter.IsDataReady()) {
1448+
if (StateUi64Adapter.IsAcceptingDataRequests()) {
14491449
StateUi64Adapter.RequestNextVector();
1450-
if (StateUi64Adapter.HasRunningAsyncIoOperation()) return;
1450+
break;
14511451
}
1452-
AppendVector(CurrentBucket.DataIntVals, StateUi64Adapter.ExtractVector());
1453-
NextVectorToProcess = ENextVectorToProcess::StringsValues;
1454-
break;
1452+
return;
14551453
case ENextVectorToProcess::StringsValues:
1456-
if (StateCharAdapter.HasRunningAsyncIoOperation()) return;
1454+
if (StateCharAdapter.IsDataReady()) {
1455+
AppendVector(CurrentBucket.StringsValues, StateCharAdapter.ExtractVector());
1456+
NextVectorToProcess = ENextVectorToProcess::StringsOffsets;
1457+
break;
1458+
}
14571459

1458-
if (!StateCharAdapter.IsDataReady()) {
1460+
if (StateCharAdapter.IsAcceptingDataRequests()) {
14591461
StateCharAdapter.RequestNextVector();
1460-
if (StateCharAdapter.HasRunningAsyncIoOperation()) return;
1462+
break;
14611463
}
1462-
AppendVector(CurrentBucket.StringsValues, StateCharAdapter.ExtractVector());
1463-
NextVectorToProcess = ENextVectorToProcess::StringsOffsets;
1464-
break;
1464+
return;
14651465
case ENextVectorToProcess::StringsOffsets:
1466-
if (StateUi32Adapter.HasRunningAsyncIoOperation()) return;
1466+
if (StateUi32Adapter.IsDataReady()) {
1467+
AppendVector(CurrentBucket.StringsOffsets, StateUi32Adapter.ExtractVector());
1468+
NextVectorToProcess = ENextVectorToProcess::InterfaceValues;
1469+
break;
1470+
}
14671471

1468-
if (!StateUi32Adapter.IsDataReady()) {
1472+
if (StateUi32Adapter.IsAcceptingDataRequests()) {
14691473
StateUi32Adapter.RequestNextVector();
1470-
if (StateUi32Adapter.HasRunningAsyncIoOperation()) return;
1474+
break;
14711475
}
1472-
AppendVector(CurrentBucket.StringsOffsets, StateUi32Adapter.ExtractVector());
1473-
NextVectorToProcess = ENextVectorToProcess::InterfaceValues;
1474-
break;
1476+
return;
14751477
case ENextVectorToProcess::InterfaceValues:
1476-
if (StateCharAdapter.HasRunningAsyncIoOperation()) return;
1478+
if (StateCharAdapter.IsDataReady()) {
1479+
AppendVector(CurrentBucket.InterfaceValues, StateCharAdapter.ExtractVector());
1480+
NextVectorToProcess = ENextVectorToProcess::InterfaceOffsets;
1481+
break;
1482+
}
14771483

1478-
if (!StateCharAdapter.IsDataReady()) {
1484+
if (StateCharAdapter.IsAcceptingDataRequests()) {
14791485
StateCharAdapter.RequestNextVector();
1480-
if (StateCharAdapter.HasRunningAsyncIoOperation()) return;
1486+
break;
14811487
}
1482-
AppendVector(CurrentBucket.InterfaceValues, StateCharAdapter.ExtractVector());
1483-
NextVectorToProcess = ENextVectorToProcess::InterfaceOffsets;
1484-
break;
1488+
return;
14851489
case ENextVectorToProcess::InterfaceOffsets:
1486-
if (StateUi32Adapter.HasRunningAsyncIoOperation()) return;
1490+
if (StateUi32Adapter.IsDataReady()) {
1491+
AppendVector(CurrentBucket.InterfaceOffsets, StateUi32Adapter.ExtractVector());
1492+
1493+
SpilledBucketsCount--;
1494+
if (SpilledBucketsCount == 0) {
1495+
NextVectorToProcess = ENextVectorToProcess::None;
1496+
State = EState::WaitingForExtraction;
1497+
} else {
1498+
NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
1499+
}
1500+
1501+
break;
1502+
}
14871503

1488-
if (!StateUi32Adapter.IsDataReady()) {
1504+
if (StateUi32Adapter.IsAcceptingDataRequests()) {
14891505
StateUi32Adapter.RequestNextVector();
1490-
if (StateUi32Adapter.HasRunningAsyncIoOperation()) return;
1506+
break;
14911507
}
1492-
AppendVector(CurrentBucket.InterfaceOffsets, StateUi32Adapter.ExtractVector());
1493-
SpilledBucketsCount--;
1494-
if (SpilledBucketsCount == 0) {
1495-
NextVectorToProcess = ENextVectorToProcess::None;
1496-
State = EState::WaitingForExtraction;
1497-
} else {
1498-
NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
1499-
}
1500-
break;
1508+
return;
15011509
default:
15021510
return;
15031511

ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,6 @@ class TTableBucketSpiller {
113113
void Update();
114114
// Flushes all the data from inner spillers. Should be called when no more data is expected for spilling.
115115
void Finalize();
116-
// Checks if spillers are waiting for any running async operation. No calls other than update are allowed when the method returns true.
117-
bool HasRunningAsyncIoOperation() const;
118116
// Is bucket in memory. False if spilled.
119117
bool IsInMemory() const;
120118
// Is bucket loaded to memory but still owned by spilled.

ydb/library/yql/minikql/computation/mkql_vector_spiller_adapter.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,6 @@ class TVectorSpillerAdapter {
2929
{
3030
}
3131

32-
bool HasRunningAsyncIoOperation() const {
33-
return ReadOperation.has_value() && !ReadOperation->HasValue() || WriteOperation.has_value() && !WriteOperation->HasValue();
34-
}
35-
3632
///Returns current stete of the adapter
3733
EState GetState() const {
3834
return State;

0 commit comments

Comments
 (0)