Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,6 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
// return !HasMemoryForProcessing();
}


void SwitchMode(EOperatingMode mode, TComputationContext& ctx) {
switch(mode) {
case EOperatingMode::InMemory: {
Expand Down Expand Up @@ -816,18 +815,23 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
RightPacker->TablePtr->UpdateSpilling();
}

bool HasRunningAsyncOperation() const {
return LeftPacker->TablePtr->HasRunningAsyncIoOperation() || RightPacker->TablePtr->HasRunningAsyncIoOperation();

bool IsSpillingFinished() const {
return LeftPacker->TablePtr->IsSpillingFinished() && RightPacker->TablePtr->IsSpillingFinished();
}

bool IsProcessingFinished() {
return LeftPacker->TablePtr->IsProcessingFinished() || RightPacker->TablePtr->IsProcessingFinished();
bool IsReadyForSpilledDataProcessing() const {
return LeftPacker->TablePtr->IsSpillingAcceptingDataRequests() && RightPacker->TablePtr->IsSpillingAcceptingDataRequests();
}

// Reports whether the left or the right table still has a spilled bucket
// that is being restored. The left table is queried first; the right table
// is only queried when the left one reports no restoration in progress.
bool IsRestoringSpilledBuckets() const {
    if (LeftPacker->TablePtr->IsRestoringSpilledBuckets()) {
        return true;
    }
    return RightPacker->TablePtr->IsRestoringSpilledBuckets();
}

void DoCalculateWithSpilling(TComputationContext& ctx) {
UpdateSpilling();

if (!HasMemoryForProcessing()) {
if (!HasMemoryForProcessing() && !IsSpillingFinalized) {
bool isWaitingForReduce = TryToReduceMemoryAndWait();
if (isWaitingForReduce) return;
}
Expand All @@ -838,15 +842,16 @@ void DoCalculateWithSpilling(TComputationContext& ctx) {
}

if (!*HaveMoreLeftRows && !*HaveMoreRightRows) {
UpdateSpilling();
if (HasRunningAsyncOperation() || !IsProcessingFinished()) return;
if (!IsSpillingFinished()) return;
if (!IsSpillingFinalized) {
LeftPacker->TablePtr->FinalizeSpilling();
RightPacker->TablePtr->FinalizeSpilling();
IsSpillingFinalized = true;

if (HasRunningAsyncOperation()) return;
UpdateSpilling();
}
if (!IsReadyForSpilledDataProcessing()) return;

SwitchMode(EOperatingMode::ProcessSpilled, ctx);
return;
}
Expand All @@ -855,8 +860,15 @@ void DoCalculateWithSpilling(TComputationContext& ctx) {
EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const* output) {
while (NextBucketToJoin != GraceJoin::NumberOfBuckets) {
UpdateSpilling();
if (IsRestoringSpilledBuckets()) return EFetchResult::Yield;

if (LeftPacker->TablePtr->IsSpilledBucketWaitingForExtraction(NextBucketToJoin)) {
LeftPacker->TablePtr->PrepareBucket(NextBucketToJoin);
}

if (HasRunningAsyncOperation()) return EFetchResult::Yield;
if (RightPacker->TablePtr->IsSpilledBucketWaitingForExtraction(NextBucketToJoin)) {
RightPacker->TablePtr->PrepareBucket(NextBucketToJoin);
}

if (!LeftPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) {
LeftPacker->TablePtr->StartLoadingBucket(NextBucketToJoin);
Expand Down Expand Up @@ -886,8 +898,6 @@ EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const*

NextBucketToJoin++;
} else {
LeftPacker->TablePtr->PrepareBucket(NextBucketToJoin);
RightPacker->TablePtr->PrepareBucket(NextBucketToJoin);
*PartialJoinCompleted = true;
LeftPacker->StartTime = std::chrono::system_clock::now();
RightPacker->StartTime = std::chrono::system_clock::now();
Expand Down
87 changes: 60 additions & 27 deletions ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1147,7 +1147,7 @@ bool TTable::TryToReduceMemoryAndWait() {
i32 largestBucketIndex = 0;
ui64 largestBucketSize = 0;
for (ui32 bucket = 0; bucket < NumberOfBuckets; ++bucket) {
if (TableBucketsSpillers[bucket].HasRunningAsyncIoOperation() || !TableBucketsSpillers[bucket].IsProcessingFinished()) return true;
if (TableBucketsSpillers[bucket].IsProcessingSpilling()) return true;

ui64 bucketSize = GetSizeOfBucket(bucket);
if (bucketSize > largestBucketSize) {
Expand All @@ -1169,39 +1169,50 @@ void TTable::UpdateSpilling() {
}
}

void TTable::FinalizeSpilling() {
MKQL_ENSURE(!HasRunningAsyncIoOperation(), "Internal logic error");
bool TTable::IsSpillingFinished() const {
for (ui64 i = 0; i < NumberOfBuckets; ++i) {
if (TableBucketsSpillers[i].IsProcessingSpilling()) return false;
}
return true;
}

for (ui32 bucket = 0; bucket < NumberOfBuckets; ++bucket) {
if (!TableBucketsSpillers[bucket].IsInMemory()) {
TableBucketsSpillers[bucket].SpillBucket(std::move(TableBuckets[bucket]));
TableBuckets[bucket] = TTableBucket{};
TableBucketsSpillers[bucket].Finalize();
}
bool TTable::IsSpillingAcceptingDataRequests() const {
for (ui64 i = 0; i < NumberOfBuckets; ++i) {
if (TableBucketsSpillers[i].IsInMemory()) continue;

if (!TableBucketsSpillers[i].IsAcceptingDataRequests()) return false;
}
return true;
}

bool TTable::HasRunningAsyncIoOperation() const {
for (ui32 bucket = 0; bucket < NumberOfBuckets; ++bucket) {
if (TableBucketsSpillers[bucket].HasRunningAsyncIoOperation()) return true;
bool TTable::IsRestoringSpilledBuckets() const {
for (ui64 i = 0; i < NumberOfBuckets; ++i) {
if (TableBucketsSpillers[i].IsRestoring()) return true;
}
return false;
}

bool TTable::IsProcessingFinished() const {
void TTable::FinalizeSpilling() {
for (ui32 bucket = 0; bucket < NumberOfBuckets; ++bucket) {
if (!TableBucketsSpillers[bucket].IsProcessingFinished()) return false;
if (!TableBucketsSpillers[bucket].IsInMemory()) {
TableBucketsSpillers[bucket].Finalize();
TableBucketsSpillers[bucket].SpillBucket(std::move(TableBuckets[bucket]));
TableBuckets[bucket] = TTableBucket{};

}
}
return true;
}

// Tells whether the bucket with the given index is in memory, as reported
// by the spiller responsible for that bucket.
bool TTable::IsBucketInMemory(ui32 bucket) const {
    const auto& bucketSpiller = TableBucketsSpillers[bucket];
    return bucketSpiller.IsInMemory();
}

// Tells whether the spiller of the given bucket reports that the bucket
// must be extracted from it.
bool TTable::IsSpilledBucketWaitingForExtraction(ui32 bucket) const {
    const auto& bucketSpiller = TableBucketsSpillers[bucket];
    return bucketSpiller.IsExtractionRequired();
}

void TTable::StartLoadingBucket(ui32 bucket) {
MKQL_ENSURE(!TableBucketsSpillers[bucket].IsInMemory(), "Internal logic error");
if (!TableBucketsSpillers[bucket].IsProcessingFinished()) return;

TableBucketsSpillers[bucket].StartBucketRestoration();
}
Expand Down Expand Up @@ -1280,19 +1291,20 @@ void TTableBucketSpiller::Update() {

if (State == EState::Spilling) {
ProcessBucketSpilling();
} else if (State == EState::Finalizing) {
ProcessFinalizing();
} else if (State == EState::Restoring) {
ProcessBucketRestoration();
}
}

void TTableBucketSpiller::Finalize() {
IsFinalizing = true;
IsFinalizingRequested = true;
}

void TTableBucketSpiller::SpillBucket(TTableBucket&& bucket) {
MKQL_ENSURE(NextVectorToProcess == ENextVectorToProcess::None, "Internal logic error");
State = EState::Spilling;
IsBucketOwnedBySpiller = true;

CurrentBucket = std::move(bucket);
NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
Expand All @@ -1301,9 +1313,9 @@ void TTableBucketSpiller::SpillBucket(TTableBucket&& bucket) {
}

TTableBucket&& TTableBucketSpiller::ExtractBucket() {
MKQL_ENSURE(State == EState::InMemory, "Internal logic error");
MKQL_ENSURE(State == EState::WaitingForExtraction, "Internal logic error");
MKQL_ENSURE(SpilledBucketsCount == 0, "Internal logic error");
IsBucketOwnedBySpiller = false;
State = EState::InMemory;
return std::move(CurrentBucket);
}

Expand All @@ -1318,14 +1330,27 @@ bool TTableBucketSpiller::IsInMemory() const {
}

bool TTableBucketSpiller::IsExtractionRequired() const {
return IsBucketOwnedBySpiller;
return State == EState::WaitingForExtraction;
}

// True exactly when the spiller's state is EState::Spilling.
bool TTableBucketSpiller::IsProcessingSpilling() const {
    const bool inSpillingState = (State == EState::Spilling);
    return inSpillingState;
}

// True exactly when the spiller's state is EState::AcceptingDataRequests.
bool TTableBucketSpiller::IsAcceptingDataRequests() const {
    const bool readyForRequests = (State == EState::AcceptingDataRequests);
    return readyForRequests;
}

// True exactly when the spiller's state is EState::Restoring.
bool TTableBucketSpiller::IsRestoring() const {
    const bool inRestoringState = (State == EState::Restoring);
    return inRestoringState;
}

void TTableBucketSpiller::StartBucketRestoration() {
MKQL_ENSURE(State == EState::Restoring, "Internal logic error");
MKQL_ENSURE(State == EState::AcceptingDataRequests, "Internal logic error");
MKQL_ENSURE(NextVectorToProcess == ENextVectorToProcess::None, "Internal logic error");

NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
State = EState::Restoring;
ProcessBucketRestoration();
}

Expand Down Expand Up @@ -1374,15 +1399,23 @@ void TTableBucketSpiller::ProcessBucketSpilling() {
return;
}
}
if (!HasRunningAsyncIoOperation() && IsFinalizing) {

if (IsFinalizingRequested) {
if (!StateCharAdapter.IsAcceptingData() || !StateUi32Adapter.IsAcceptingData() || !StateUi64Adapter.IsAcceptingData()) return;
State = EState::Finalizing;
StateUi64Adapter.Finalize();
StateUi32Adapter.Finalize();
StateCharAdapter.Finalize();

if (StateCharAdapter.IsAcceptingDataRequests() && StateUi32Adapter.IsAcceptingDataRequests() && StateUi64Adapter.IsAcceptingDataRequests()) {
State = EState::Restoring;
}
ProcessFinalizing();
return;
}
State = EState::AcceptingData;
}

// Moves the spiller to EState::AcceptingDataRequests once all three adapters
// (char, ui32, ui64) report that they accept data requests. If any adapter
// does not, the state is left unchanged. Adapters are checked in the order
// char -> ui32 -> ui64, and checking stops at the first one that is not ready.
void TTableBucketSpiller::ProcessFinalizing() {
    if (!StateCharAdapter.IsAcceptingDataRequests()) {
        return;
    }
    if (!StateUi32Adapter.IsAcceptingDataRequests()) {
        return;
    }
    if (!StateUi64Adapter.IsAcceptingDataRequests()) {
        return;
    }

    State = EState::AcceptingDataRequests;
}

Expand Down Expand Up @@ -1460,7 +1493,7 @@ void TTableBucketSpiller::ProcessBucketRestoration() {
SpilledBucketsCount--;
if (SpilledBucketsCount == 0) {
NextVectorToProcess = ENextVectorToProcess::None;
State = EState::InMemory;
State = EState::WaitingForExtraction;
} else {
NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
}
Expand Down
39 changes: 27 additions & 12 deletions ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,25 +115,35 @@ class TTableBucketSpiller {
void Finalize();
// Checks if spillers are waiting for any running async operation. No calls other than update are allowed when the method returns true.
bool HasRunningAsyncIoOperation() const;

// Is bucket in memory. False if spilled.
bool IsInMemory() const;
// Is bucket loaded to memory but still owned by the spiller.
// ExtractBucket must be called if true.
bool IsExtractionRequired() const;

bool IsProcessingFinished() const {
return NextVectorToProcess == ENextVectorToProcess::None;
}
// Is there any bucket that is being spilled right now.
bool IsProcessingSpilling() const;
// Is spiller ready to start loading new bucket.
bool IsAcceptingDataRequests() const;
// Is there any bucket that is being restored right now.
bool IsRestoring() const;

private:
void ProcessBucketSpilling();
template <class T>
void AppendVector(std::vector<T, TMKQLAllocator<T>>& first, std::vector<T, TMKQLAllocator<T>>&& second) const;
void ProcessBucketRestoration();
void ProcessFinalizing();

private:

enum class EState {
InMemory,
Spilling,
AcceptingData,
Finalizing,
AcceptingDataRequests,
Restoring,
InMemory
WaitingForExtraction
};

enum class ENextVectorToProcess {
Expand All @@ -156,11 +166,9 @@ class TTableBucketSpiller {

ui64 SpilledBucketsCount = 0;

bool IsFinalizing = false;
bool IsFinalizingRequested = false;

TTableBucket CurrentBucket;

bool IsBucketOwnedBySpiller = false;
};


Expand Down Expand Up @@ -286,14 +294,21 @@ class TTable {
// Flushes all the spillers.
void FinalizeSpilling();

// Checks if there any async operation running. If return value is true it's safe to return Yield.
bool HasRunningAsyncIoOperation() const;
// Checks if spilling has any running save operation
bool IsSpillingFinished() const;

// Checks if spilling is ready for requesting buckets for restoration.
bool IsSpillingAcceptingDataRequests() const;

bool IsProcessingFinished() const;
// Checks if spilling has any running load operation
bool IsRestoringSpilledBuckets() const;

// Checks if bucket fully loaded to memory and may be joined.
bool IsBucketInMemory(ui32 bucket) const;

// Checks if extraction of bucket is required
bool IsSpilledBucketWaitingForExtraction(ui32 bucket) const;

// Starts loading spilled bucket to memory.
void StartLoadingBucket(ui32 bucket);

Expand Down