Skip to content

get rid of wide fields in wide combiner. Not used in llvm #6536

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

95 changes: 55 additions & 40 deletions ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,24 @@ struct TCombinerNodes {
}
}

void ExtractValues(TComputationContext& ctx, NUdf::TUnboxedValue** from, NUdf::TUnboxedValue* to) const {
for (ui32 i = 0U; i < ItemNodes.size(); ++i) {
if (from[i]) {
to[i] = std::move(*(from[i]));
}
}
}

void ExtractValues(TComputationContext& ctx, NUdf::TUnboxedValue* from, NUdf::TUnboxedValue** to) const {
for (size_t i = 0, j = 0; i != ItemNodes.size(); ++i) {
if (IsInputItemNodeUsed(i)) {
*to[i] = std::move(from[j++]);
} else {
to[i] = nullptr;
}
}
}

void ProcessItem(TComputationContext& ctx, NUdf::TUnboxedValue* keys, NUdf::TUnboxedValue* state) const {
if (keys) {
std::fill_n(keys, KeyResultNodes.size(), NUdf::TUnboxedValuePod());
Expand Down Expand Up @@ -346,16 +364,16 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
enum class ETasteResult: i8 {
Init = -1,
Update,
Skip
ConsumeRawData,
ExtractRawData
};
TSpillingSupportState(
TMemoryUsageInfo* memInfo, size_t wideFieldsIndex,
TMemoryUsageInfo* memInfo,
const TMultiType* usedInputItemType, const TMultiType* keyAndStateType, ui32 keyWidth, size_t itemNodesSize,
const THashFunc& hash, const TEqualsFunc& equal, bool allowSpilling, TComputationContext& ctx
)
: TBase(memInfo)
, InMemoryProcessingState(memInfo, keyWidth, keyAndStateType->GetElementsCount() - keyWidth, hash, equal)
, WideFieldsIndex(wideFieldsIndex)
, UsedInputItemType(usedInputItemType)
, KeyAndStateType(keyAndStateType)
, KeyWidth(keyWidth)
Expand All @@ -380,7 +398,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
bool IsProcessingRequired() const {
if (InputStatus != EFetchResult::Finish) return true;

return HasDataForProcessing;
return HasRawDataToExtract || HasDataForProcessing;
}

bool UpdateAndWait() {
Expand Down Expand Up @@ -424,10 +442,19 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
return isNew ? ETasteResult::Init : ETasteResult::Update;
}
if (GetMode() == EOperatingMode::ProcessSpilled) {
if (HasRawDataToExtract) {
// Tongue not used here.
Throat = BufferForUsedInputItems.data();
HasRawDataToExtract = false;
HasDataForProcessing = true;
return ETasteResult::ExtractRawData;
}
HasDataForProcessing = false;
// while restoration we process buckets one by one starting from the first in a queue
bool isNew = SpilledBuckets.front().InMemoryProcessingState->TasteIt();
Throat = SpilledBuckets.front().InMemoryProcessingState->Throat;
Tongue = SpilledBuckets.front().InMemoryProcessingState->Tongue;
BufferForUsedInputItems.resize(0);
return isNew ? ETasteResult::Init : ETasteResult::Update;
}

Expand All @@ -445,9 +472,13 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {

// Corresponding bucket is spilled, we don't need a key anymore, full input will be spilled
BufferForKeyAndState.resize(0);
TryToSpillRawData(bucket, bucketId);
// Prepare space for raw data
MKQL_ENSURE(BufferForUsedInputItems.size() == 0, "Internal logic error");
BufferForUsedInputItems.resize(ItemNodesSize);
BufferForUsedInputItemsBucketId = bucketId;
Throat = BufferForUsedInputItems.data();

return ETasteResult::Skip;
return ETasteResult::ConsumeRawData;
}

NUdf::TUnboxedValuePod* Extract() {
Expand All @@ -472,25 +503,6 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
BufferForKeyAndState.resize(0);
}

// Copies data from WideFields to local and tries to spill it using suitable bucket.
// if the bucket is already busy, then the buffer will wait for the next iteration.
void TryToSpillRawData(TSpilledBucket& bucket, size_t bucketId) {
auto **fields = Ctx.WideFields.data() + WideFieldsIndex;
MKQL_ENSURE(BufferForUsedInputItems.empty(), "Internal logic error");

for (size_t i = 0; i < ItemNodesSize; ++i) {
if (fields[i]) {
BufferForUsedInputItems.push_back(*fields[i]);
}
}
if (bucket.AsyncWriteOperation.has_value()) {
BufferForUsedInputItemsBucketId = bucketId;
return;
}
bucket.AsyncWriteOperation = bucket.SpilledData->WriteWideItem(BufferForUsedInputItems);
BufferForUsedInputItems.resize(0);
}

bool FlushSpillingBuffersAndWait() {
UpdateSpillingBuckets();

Expand Down Expand Up @@ -620,8 +632,14 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
}
AsyncReadOperation = std::nullopt;
}

auto& bucket = SpilledBuckets.front();
if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) return false;
if (HasDataForProcessing) {
Tongue = bucket.InMemoryProcessingState->Tongue;
Throat = bucket.InMemoryProcessingState->Throat;
return false;
}
//recover spilled state
while(!bucket.SpilledState->Empty()) {
RecoverState = true;
Expand Down Expand Up @@ -651,17 +669,11 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
if (AsyncReadOperation) {
return true;
}
auto **fields = Ctx.WideFields.data() + WideFieldsIndex;
for (size_t i = 0, j = 0; i < ItemNodesSize; ++i) {
if (fields[i]) {
fields[i] = &(BufferForUsedInputItems[j++]);
}
}

Tongue = bucket.InMemoryProcessingState->Tongue;
Throat = bucket.InMemoryProcessingState->Throat;

HasDataForProcessing = true;
HasRawDataToExtract = true;
return false;
}
bucket.BucketState = TSpilledBucket::EBucketState::InMemory;
Expand Down Expand Up @@ -725,8 +737,9 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {

bool HasDataForProcessing = false;

bool HasRawDataToExtract = false;

TState InMemoryProcessingState;
const size_t WideFieldsIndex;
const TMultiType* const UsedInputItemType;
const TMultiType* const KeyAndStateType;
const size_t KeyWidth;
Expand Down Expand Up @@ -1237,6 +1250,7 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
, AllowSpilling(allowSpilling)
{}

// MARK: DoCAlculate
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this mark?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, forgot to remove it 😢

EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
if (!state.HasValue()) {
MakeState(ctx, state);
Expand All @@ -1246,14 +1260,12 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
auto **fields = ctx.WideFields.data() + WideFieldsIndex;

while (true) {
for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i)
fields[i] = Nodes.GetUsedInputItemNodePtrOrNull(ctx, i);

if (ptr->UpdateAndWait()) {
return EFetchResult::Yield;
}

if (ptr->InputStatus != EFetchResult::Finish) {
for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i)
fields[i] = Nodes.GetUsedInputItemNodePtrOrNull(ctx, i);
switch (ptr->InputStatus = Flow->FetchValues(ctx, fields)) {
case EFetchResult::One:
break;
Expand All @@ -1274,7 +1286,11 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
case TSpillingSupportState::ETasteResult::Update:
Nodes.ProcessItem(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue), static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
break;
case TSpillingSupportState::ETasteResult::Skip:
case TSpillingSupportState::ETasteResult::ConsumeRawData:
Nodes.ExtractValues(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
break;
case TSpillingSupportState::ETasteResult::ExtractRawData:
Nodes.ExtractValues(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Throat), fields);
break;
}
continue;
Expand Down Expand Up @@ -1553,8 +1569,7 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
#endif
private:
void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
state = ctx.HolderFactory.Create<TSpillingSupportState>(WideFieldsIndex,
UsedInputItemType, KeyAndStateType,
state = ctx.HolderFactory.Create<TSpillingSupportState>(UsedInputItemType, KeyAndStateType,
Nodes.KeyNodes.size(),
Nodes.ItemNodes.size(),
#ifdef MKQL_DISABLE_CODEGEN
Expand Down
Loading