Skip to content

Commit 755dca1

Browse files
committed
fixup
1 parent 6453f97 commit 755dca1

File tree

1 file changed

+56
-42
lines changed

1 file changed

+56
-42
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp

Lines changed: 56 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1433,71 +1433,85 @@ void TTableBucketSpiller::ProcessBucketRestoration() {
14331433
while (NextVectorToProcess != ENextVectorToProcess::None) {
14341434
switch (NextVectorToProcess) {
14351435
case ENextVectorToProcess::KeyAndVals:
1436-
if (StateUi64Adapter.HasRunningAsyncIoOperation()) return;
1436+
if (StateUi64Adapter.IsDataReady()) {
1437+
AppendVector(CurrentBucket.KeyIntVals, StateUi64Adapter.ExtractVector());
1438+
NextVectorToProcess = ENextVectorToProcess::DataIntVals;
1439+
break;
1440+
}
14371441

1438-
if (!StateUi64Adapter.IsDataReady()) {
1442+
if (StateUi64Adapter.IsAcceptingDataRequests()) {
14391443
StateUi64Adapter.RequestNextVector();
1440-
if (StateUi64Adapter.HasRunningAsyncIoOperation()) return;
1444+
break;
14411445
}
1442-
AppendVector(CurrentBucket.KeyIntVals, StateUi64Adapter.ExtractVector());
1443-
NextVectorToProcess = ENextVectorToProcess::DataIntVals;
1444-
break;
1446+
return;
14451447
case ENextVectorToProcess::DataIntVals:
1446-
if (StateUi64Adapter.HasRunningAsyncIoOperation()) return;
1448+
if (StateUi64Adapter.IsDataReady()) {
1449+
AppendVector(CurrentBucket.DataIntVals, StateUi64Adapter.ExtractVector());
1450+
NextVectorToProcess = ENextVectorToProcess::StringsValues;
1451+
break;
1452+
}
14471453

1448-
if (!StateUi64Adapter.IsDataReady()) {
1454+
if (StateUi64Adapter.IsAcceptingDataRequests()) {
14491455
StateUi64Adapter.RequestNextVector();
1450-
if (StateUi64Adapter.HasRunningAsyncIoOperation()) return;
1456+
break;
14511457
}
1452-
AppendVector(CurrentBucket.DataIntVals, StateUi64Adapter.ExtractVector());
1453-
NextVectorToProcess = ENextVectorToProcess::StringsValues;
1454-
break;
1458+
return;
14551459
case ENextVectorToProcess::StringsValues:
1456-
if (StateCharAdapter.HasRunningAsyncIoOperation()) return;
1460+
if (StateCharAdapter.IsDataReady()) {
1461+
AppendVector(CurrentBucket.StringsValues, StateCharAdapter.ExtractVector());
1462+
NextVectorToProcess = ENextVectorToProcess::StringsOffsets;
1463+
break;
1464+
}
14571465

1458-
if (!StateCharAdapter.IsDataReady()) {
1466+
if (StateCharAdapter.IsAcceptingDataRequests()) {
14591467
StateCharAdapter.RequestNextVector();
1460-
if (StateCharAdapter.HasRunningAsyncIoOperation()) return;
1468+
break;
14611469
}
1462-
AppendVector(CurrentBucket.StringsValues, StateCharAdapter.ExtractVector());
1463-
NextVectorToProcess = ENextVectorToProcess::StringsOffsets;
1464-
break;
1470+
return;
14651471
case ENextVectorToProcess::StringsOffsets:
1466-
if (StateUi32Adapter.HasRunningAsyncIoOperation()) return;
1472+
if (StateUi32Adapter.IsDataReady()) {
1473+
AppendVector(CurrentBucket.StringsOffsets, StateUi32Adapter.ExtractVector());
1474+
NextVectorToProcess = ENextVectorToProcess::InterfaceValues;
1475+
break;
1476+
}
14671477

1468-
if (!StateUi32Adapter.IsDataReady()) {
1478+
if (StateUi32Adapter.IsAcceptingDataRequests()) {
14691479
StateUi32Adapter.RequestNextVector();
1470-
if (StateUi32Adapter.HasRunningAsyncIoOperation()) return;
1480+
break;
14711481
}
1472-
AppendVector(CurrentBucket.StringsOffsets, StateUi32Adapter.ExtractVector());
1473-
NextVectorToProcess = ENextVectorToProcess::InterfaceValues;
1474-
break;
1482+
return;
14751483
case ENextVectorToProcess::InterfaceValues:
1476-
if (StateCharAdapter.HasRunningAsyncIoOperation()) return;
1484+
if (StateCharAdapter.IsDataReady()) {
1485+
AppendVector(CurrentBucket.InterfaceValues, StateCharAdapter.ExtractVector());
1486+
NextVectorToProcess = ENextVectorToProcess::InterfaceOffsets;
1487+
break;
1488+
}
14771489

1478-
if (!StateCharAdapter.IsDataReady()) {
1490+
if (StateCharAdapter.IsAcceptingDataRequests()) {
14791491
StateCharAdapter.RequestNextVector();
1480-
if (StateCharAdapter.HasRunningAsyncIoOperation()) return;
1492+
break;
14811493
}
1482-
AppendVector(CurrentBucket.InterfaceValues, StateCharAdapter.ExtractVector());
1483-
NextVectorToProcess = ENextVectorToProcess::InterfaceOffsets;
1484-
break;
1494+
return;
14851495
case ENextVectorToProcess::InterfaceOffsets:
1486-
if (StateUi32Adapter.HasRunningAsyncIoOperation()) return;
1496+
if (StateUi32Adapter.IsDataReady()) {
1497+
AppendVector(CurrentBucket.InterfaceOffsets, StateUi32Adapter.ExtractVector());
1498+
1499+
SpilledBucketsCount--;
1500+
if (SpilledBucketsCount == 0) {
1501+
NextVectorToProcess = ENextVectorToProcess::None;
1502+
State = EState::WaitingForExtraction;
1503+
} else {
1504+
NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
1505+
}
1506+
1507+
break;
1508+
}
14871509

1488-
if (!StateUi32Adapter.IsDataReady()) {
1510+
if (StateUi32Adapter.IsAcceptingDataRequests()) {
14891511
StateUi32Adapter.RequestNextVector();
1490-
if (StateUi32Adapter.HasRunningAsyncIoOperation()) return;
1512+
break;
14911513
}
1492-
AppendVector(CurrentBucket.InterfaceOffsets, StateUi32Adapter.ExtractVector());
1493-
SpilledBucketsCount--;
1494-
if (SpilledBucketsCount == 0) {
1495-
NextVectorToProcess = ENextVectorToProcess::None;
1496-
State = EState::WaitingForExtraction;
1497-
} else {
1498-
NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
1499-
}
1500-
break;
1514+
return;
15011515
default:
15021516
return;
15031517

0 commit comments

Comments
 (0)