@@ -368,7 +368,7 @@ class TDqOutputHashPartitionConsumerBlock : public IDqOutputConsumer {
368368 YQL_ENSURE (false , " Consume() called on wide block stream" );
369369 }
370370
371- void WideConsume (TUnboxedValue* values, ui32 count) final {
371+ void WideConsume (TUnboxedValue values[] , ui32 count) final {
372372 YQL_ENSURE (!IsWaitingFlag_);
373373 YQL_ENSURE (count == OutputWidth_);
374374
@@ -379,7 +379,7 @@ class TDqOutputHashPartitionConsumerBlock : public IDqOutputConsumer {
379379
380380 TVector<const arrow::Datum*> datums;
381381 datums.reserve (count - 1 );
382- for (ui32 i = 0 ; i + 1 < count; ++i) {
382+ for (ui32 i = 0 ; i < count - 1 ; ++i) {
383383 datums.push_back (&TArrowBlock::From (values[i]).GetDatum ());
384384 }
385385
@@ -404,9 +404,10 @@ class TDqOutputHashPartitionConsumerBlock : public IDqOutputConsumer {
404404 if (src->is_scalar ()) {
405405 output.emplace_back (*src);
406406 } else {
407- IArrayBuilder::TArrayDataItem dataItem;
408- dataItem.Data = src->array ().get ();
409- dataItem.StartOffset = 0 ;
407+ IArrayBuilder::TArrayDataItem dataItem {
408+ .Data = src->array ().get (),
409+ .StartOffset = 0 ,
410+ };
410411 Builders_[j]->AddMany (&dataItem, 1 , indexes, outputBlockLen);
411412 output.emplace_back (Builders_[j]->Build (true ));
412413 }
@@ -418,11 +419,14 @@ class TDqOutputHashPartitionConsumerBlock : public IDqOutputConsumer {
418419 }
419420
420421 void DoConsume (TVector<std::unique_ptr<TArgsDechunker>>&& outputData) const {
422+ Y_ENSURE (outputData.size () == Outputs_.size ());
423+
421424 while (!outputData.empty ()) {
422425 bool hasData = false ;
423426 for (size_t i = 0 ; i < Outputs_.size (); ++i) {
424427 if (Outputs_[i]->IsFull ()) {
425428 IsWaitingFlag_ = true ;
429+ Y_ENSURE (OutputData_.empty ());
426430 OutputData_ = std::move (outputData);
427431 return ;
428432 }
@@ -474,7 +478,7 @@ class TDqOutputHashPartitionConsumerBlock : public IDqOutputConsumer {
474478 return !IsWaitingFlag_;
475479 }
476480
477- size_t GetHashPartitionIndex (const arrow::Datum** values, ui64 blockIndex) {
481+ size_t GetHashPartitionIndex (const arrow::Datum* values[] , ui64 blockIndex) {
478482 ui64 hash = 0 ;
479483 for (size_t keyId = 0 ; keyId < KeyColumns_.size (); keyId++) {
480484 const ui32 columnIndex = KeyColumns_[keyId].Index ;
@@ -512,7 +516,7 @@ class TDqOutputHashPartitionConsumerBlock : public IDqOutputConsumer {
512516 if (blockType->GetShape () == NMiniKQL::TBlockType::EShape::Many) {
513517 auto itemType = blockType->GetItemType ();
514518 YQL_ENSURE (!itemType->IsPg (), " pg types are not supported yet" );
515- Builders_.emplace_back (MakeArrayBuilder (helper, itemType, *NYql::NUdf::GetYqlMemoryPool (), maxBlockLen, nullptr ));
519+ Builders_.emplace_back (MakeArrayBuilder (helper, itemType, *NYql::NUdf::GetYqlMemoryPool (), maxBlockLen, nullptr , {. MinFillPercentage = 100 } ));
516520 } else {
517521 Builders_.emplace_back ();
518522 }
0 commit comments