Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ydb/core/kqp/opt/logical/kqp_opt_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
true, // defaultWatermarksMode
true); // syncActor
} else {
output = DqRewriteAggregate(node, ctx, TypesCtx, false, KqpCtx.Config->HasOptEnableOlapPushdown() || KqpCtx.Config->HasOptUseFinalizeByKey(), KqpCtx.Config->HasOptUseFinalizeByKey());
NDq::TSpillingSettings spillingSettings(KqpCtx.Config->GetEnabledSpillingNodes());
output = DqRewriteAggregate(node, ctx, TypesCtx, false, KqpCtx.Config->HasOptEnableOlapPushdown() || KqpCtx.Config->HasOptUseFinalizeByKey(), KqpCtx.Config->HasOptUseFinalizeByKey(), spillingSettings.IsAggregationSpillingEnabled());
}
if (output) {
DumpAppliedRule("RewriteAggregate", node.Ptr(), output.Cast().Ptr(), ctx);
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
}

TMaybeNode<TExprBase> ExpandAggregatePhase(TExprBase node, TExprContext& ctx) {
auto output = ExpandAggregatePeepholeImpl(node.Ptr(), ctx, TypesCtx, KqpCtx.Config->HasOptUseFinalizeByKey(), false);
NDq::TSpillingSettings spillingSettings(KqpCtx.Config->GetEnabledSpillingNodes());
auto output = ExpandAggregatePeepholeImpl(node.Ptr(), ctx, TypesCtx, KqpCtx.Config->HasOptUseFinalizeByKey(), false, spillingSettings.IsAggregationSpillingEnabled());
DumpAppliedRule("ExpandAggregatePhase", node.Ptr(), output, ctx);
return TExprBase(output);
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/provider/yql_kikimr_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ ui64 ParseEnableSpillingNodes(const TString &v) {
if (s.empty()) {
throw yexception() << "Empty value item";
}
auto value = FromString<NYql::TDqSettings::EEnabledSpillingNodes>(s);
auto value = FromString<NDq::EEnabledSpillingNodes>(s);
res |= ui64(value);
}
return res;
Expand Down
43 changes: 43 additions & 0 deletions ydb/core/kqp/ut/olap/aggregations_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,49 @@ Y_UNIT_TEST_SUITE(KqpOlapAggregations) {
}
}

// Checks that the explained query AST contains "WideCombiner" exactly when
// aggregation spilling is enabled, while block channels are forced on.
// Runs twice: once with AllowSpilling == true and once with false.
Y_UNIT_TEST_TWIN(DisableBlockEngineInAggregationWithSpilling, AllowSpilling) {
    auto settings = TKikimrSettings()
        .SetWithSampleTables(false);

    // Force block channels and pick the spilling mode for this twin.
    auto& tableService = *settings.AppConfig.MutableTableServiceConfig();
    tableService.SetBlockChannelsMode(NKikimrConfig::TTableServiceConfig_EBlockChannelsMode_BLOCK_CHANNELS_FORCE);
    tableService.SetEnableSpillingNodes(AllowSpilling ? "Aggregation" : "None");

    TKikimrRunner kikimr(settings);

    TLocalHelper(kikimr).CreateTestOlapTable();
    auto client = kikimr.GetQueryClient();

    // Numeric argument triples forwarded to WriteTestData, in the original order.
    constexpr int kBatches[][3] = {
        {10000, 3000000, 1000},
        {11000, 3001000, 1000},
        {12000, 3002000, 1000},
        {13000, 3003000, 1000},
        {14000, 3004000, 1000},
        {20000, 2000000, 7000},
        {30000, 1000000, 11000},
    };
    for (const auto& batch : kBatches) {
        WriteTestData(kikimr, "/Root/olapStore/olapTable", batch[0], batch[1], batch[2]);
    }

    TString query = R"(
--!syntax_v1
SELECT
COUNT(*)
FROM `/Root/olapStore/olapTable`
GROUP BY level
)";

    auto res = StreamExplainQuery(query, client);
    UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());

    auto plan = CollectStreamResult(res);

    // The AST is both the thing inspected and the failure message.
    const auto& ast = plan.QueryStats->Getquery_ast();
    const bool hasWideCombiner = ast.Contains("WideCombiner");
    UNIT_ASSERT_C(hasWideCombiner == AllowSpilling, ast);
}

Y_UNIT_TEST_TWIN(CountAllPushdown, UseLlvm) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2461,6 +2461,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
appConfig.MutableTableServiceConfig()->SetBlockChannelsMode(blockChannelsMode);
appConfig.MutableTableServiceConfig()->SetEnableSpillingNodes("None");
auto settings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetWithSampleTables(true);
Expand Down
12 changes: 6 additions & 6 deletions ydb/library/yql/core/yql_aggregate_expander.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ TExprNode::TPtr TAggregateExpander::ExpandAggregateWithFullOutput()

HaveDistinct = AnyOf(AggregatedColumns->ChildrenList(),
[](const auto& child) { return child->ChildrenSize() == 3; });
EffectiveCompact = (HaveDistinct && CompactForDistinct && !TypesCtx.IsBlockEngineEnabled()) || ForceCompact || HasSetting(*settings, "compact");
EffectiveCompact = (HaveDistinct && CompactForDistinct && !UseBlocks) || ForceCompact || HasSetting(*settings, "compact");
for (const auto& trait : Traits) {
auto mergeLambda = trait->Child(5);
if (mergeLambda->Tail().IsCallable("Void")) {
Expand Down Expand Up @@ -67,7 +67,7 @@ TExprNode::TPtr TAggregateExpander::ExpandAggregateWithFullOutput()
return GeneratePhases();
}

if (TypesCtx.IsBlockEngineEnabled()) {
if (UseBlocks) {
if (Suffix == "Combine") {
auto ret = TryGenerateBlockCombine();
if (ret) {
Expand Down Expand Up @@ -2785,7 +2785,7 @@ TExprNode::TPtr TAggregateExpander::GeneratePhases() {
streams.push_back(SerializeIdxSet(indicies));
}

if (TypesCtx.IsBlockEngineEnabled()) {
if (UseBlocks) {
for (ui32 i = 0; i < unionAllInputs.size(); ++i) {
unionAllInputs[i] = Ctx.Builder(Node->Pos())
.Callable("Map")
Expand All @@ -2806,7 +2806,7 @@ TExprNode::TPtr TAggregateExpander::GeneratePhases() {
}

auto settings = cleanOutputSettings;
if (TypesCtx.IsBlockEngineEnabled()) {
if (UseBlocks) {
settings = AddSetting(*settings, Node->Pos(), "many_streams", Ctx.NewList(Node->Pos(), std::move(streams)), Ctx);
}

Expand Down Expand Up @@ -2839,7 +2839,7 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombine() {
}

TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalize() {
if (UsePartitionsByKeys || !TypesCtx.IsBlockEngineEnabled()) {
if (UsePartitionsByKeys || !UseBlocks) {
return nullptr;
}

Expand Down Expand Up @@ -2934,7 +2934,7 @@ TExprNode::TPtr ExpandAggregatePeephole(const TExprNode::TPtr& node, TExprContex
return ret;
}
}
return ExpandAggregatePeepholeImpl(node, ctx, typesCtx, false, typesCtx.IsBlockEngineEnabled());
return ExpandAggregatePeepholeImpl(node, ctx, typesCtx, false, typesCtx.IsBlockEngineEnabled(), false);
}

} // namespace NYql
10 changes: 7 additions & 3 deletions ydb/library/yql/core/yql_aggregate_expander.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace NYql {
class TAggregateExpander {
public:
TAggregateExpander(bool usePartitionsByKeys, const bool useFinalizeByKeys, const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx,
bool forceCompact = false, bool compactForDistinct = false, bool usePhases = false)
bool forceCompact = false, bool compactForDistinct = false, bool usePhases = false, bool allowSpilling = false)
: Node(node)
, Ctx(ctx)
, TypesCtx(typesCtx)
Expand All @@ -25,6 +25,7 @@ class TAggregateExpander {
, HaveSessionSetting(false)
, OriginalRowType(nullptr)
, RowType(nullptr)
, UseBlocks(typesCtx.IsBlockEngineEnabled() && !allowSpilling)
{
PreMap = Ctx.Builder(node->Pos())
.Lambda()
Expand Down Expand Up @@ -115,6 +116,7 @@ class TAggregateExpander {
const TStructExprType* RowType;
TVector<const TItemExprType*> RowItems;
TExprNode::TPtr PreMap;
bool UseBlocks;

TExprNode::TListType InitialColumnNames;
TExprNode::TListType FinalColumnNames;
Expand All @@ -130,8 +132,10 @@ class TAggregateExpander {
std::unordered_map<std::string_view, TExprNode::TPtr> UdfWasChanged;
};

inline TExprNode::TPtr ExpandAggregatePeepholeImpl(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const bool useFinalizeByKey, const bool useBlocks) {
TAggregateExpander aggExpander(!useFinalizeByKey && !useBlocks, useFinalizeByKey, node, ctx, typesCtx, true);
inline TExprNode::TPtr ExpandAggregatePeepholeImpl(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx,
const bool useFinalizeByKey, const bool useBlocks, const bool allowSpilling) {
TAggregateExpander aggExpander(!useFinalizeByKey && !useBlocks, useFinalizeByKey, node, ctx, typesCtx,
true, false, false, allowSpilling);
return aggExpander.ExpandAggregate();
}

Expand Down
28 changes: 28 additions & 0 deletions ydb/library/yql/dq/common/dq_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,34 @@ enum class EHashJoinMode {
GraceAndSelf /* "graceandself" */,
};

// Bit flags identifying the kinds of compute nodes for which spilling can be
// enabled. Individual values are OR-ed together into a ui64 mask; `None` is the
// empty mask and `All` has every bit set, so it matches any flag.
// NOTE(review): the quoted names in the trailing comments presumably drive the
// generated string <-> enum conversion used by FromString<EEnabledSpillingNodes>;
// keep them in sync with the enumerator names — confirm against the build rules.
enum class EEnabledSpillingNodes : ui64 {
None = 0ULL /* "None" */,
GraceJoin = 1ULL /* "GraceJoin" */,
Aggregation = 2ULL /* "Aggregation" */,
All = ~0ULL /* "All" */,
};

// Immutable view over a bitmask of EEnabledSpillingNodes flags.
// A default-constructed instance has an empty mask, i.e. nothing is enabled.
class TSpillingSettings {
public:
    TSpillingSettings() = default;

    // `mask` is a bitwise OR of EEnabledSpillingNodes values.
    explicit TSpillingSettings(ui64 mask)
        : Mask(mask)
    {}

    // True when at least one bit is set in the mask.
    operator bool() const {
        return Mask != 0;
    }

    // True when the GraceJoin bit is set in the mask.
    bool IsGraceJoinSpillingEnabled() const {
        return HasNode(EEnabledSpillingNodes::GraceJoin);
    }

    // True when the Aggregation bit is set in the mask.
    bool IsAggregationSpillingEnabled() const {
        return HasNode(EEnabledSpillingNodes::Aggregation);
    }

private:
    // Tests whether any bit of `node` is present in the mask.
    bool HasNode(EEnabledSpillingNodes node) const {
        return (Mask & static_cast<ui64>(node)) != 0;
    }

    const ui64 Mask = 0;
};

} // namespace NYql::NDq

IOutputStream& operator<<(IOutputStream& stream, const NYql::NDq::TTxId& txId);
5 changes: 3 additions & 2 deletions ydb/library/yql/dq/opt/dq_opt_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ using namespace NYql::NNodes;
namespace NYql::NDq {

TExprBase DqRewriteAggregate(TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typesCtx, bool compactForDistinct,
bool usePhases, const bool useFinalizeByKey)
bool usePhases, const bool useFinalizeByKey, const bool allowSpilling)
{
if (!node.Maybe<TCoAggregateBase>()) {
return node;
}
TAggregateExpander aggExpander(!typesCtx.IsBlockEngineEnabled() && !useFinalizeByKey, useFinalizeByKey, node.Ptr(), ctx, typesCtx, false, compactForDistinct, usePhases);
TAggregateExpander aggExpander(!typesCtx.IsBlockEngineEnabled() && !useFinalizeByKey,
useFinalizeByKey, node.Ptr(), ctx, typesCtx, false, compactForDistinct, usePhases, allowSpilling);
auto result = aggExpander.ExpandAggregate();
YQL_ENSURE(result);

Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/dq/opt/dq_opt_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ namespace NYql {

namespace NYql::NDq {

NNodes::TExprBase DqRewriteAggregate(NNodes::TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typesCtx, bool compactForDistinct, bool usePhases, const bool useFinalizeByKey);
NNodes::TExprBase DqRewriteAggregate(NNodes::TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typesCtx,
bool compactForDistinct, bool usePhases, const bool useFinalizeByKey, const bool allowSpilling);

NNodes::TExprBase DqRewriteTakeSortToTopSort(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parents);

Expand Down
17 changes: 0 additions & 17 deletions ydb/library/yql/dq/tasks/dq_task_program.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,6 @@

namespace NYql::NDq {

// Wrapper over a bitmask of TDqConfiguration::EEnabledSpillingNodes flags.
// A default-constructed instance has an empty mask, i.e. nothing is enabled.
class TSpillingSettings {
public:
TSpillingSettings() = default;
// `mask` is stored as-is and cannot be changed afterwards (the member is const).
explicit TSpillingSettings(ui64 mask) : Mask(mask) {};

// True when at least one bit is set in the mask.
operator bool() const {
return Mask;
}

// True when the GraceJoin bit is set in the mask.
bool IsGraceJoinSpillingEnabled() const {
return Mask & ui64(TDqConfiguration::EEnabledSpillingNodes::GraceJoin);
}

private:
const ui64 Mask = 0;
};

const TStructExprType* CollectParameters(NNodes::TCoLambda program, TExprContext& ctx);

TString BuildProgram(NNodes::TCoLambda program, const TStructExprType& paramsType,
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ TDqConfiguration::TDqConfiguration() {
if (s.empty()) {
throw yexception() << "Empty value item";
}
auto value = FromString<EEnabledSpillingNodes>(s);
auto value = FromString<NDq::EEnabledSpillingNodes>(s);
res |= ui64(value);
}
return res;
Expand Down
6 changes: 0 additions & 6 deletions ydb/library/yql/providers/dq/common/yql_dq_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,6 @@ struct TDqSettings {
File /* "file" */,
};

enum class EEnabledSpillingNodes : ui64 {
None = 0ULL /* None */,
GraceJoin = 1ULL /* "GraceJoin" */,
All = ~0ULL /* "All" */,
};

struct TDefault {
static constexpr ui32 MaxTasksPerStage = 20U;
static constexpr ui32 MaxTasksPerOperation = 70U;
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/providers/dq/opt/logical_optimize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ class TDqsLogicalOptProposalTransformer : public TOptimizeTransformerBase {
bool syncActor = Config->ComputeActorType.Get() != "async";
return NHopping::RewriteAsHoppingWindow(node, ctx, input.Cast(), analyticsHopping, lateArrivalDelay, defaultWatermarksMode, syncActor);
} else {
return DqRewriteAggregate(node, ctx, TypesCtx, true, Config->UseAggPhases.Get().GetOrElse(false), Config->UseFinalizeByKey.Get().GetOrElse(false));
NDq::TSpillingSettings spillingSettings(Config->GetEnabledSpillingNodes());
return DqRewriteAggregate(node, ctx, TypesCtx, true, Config->UseAggPhases.Get().GetOrElse(false), Config->UseFinalizeByKey.Get().GetOrElse(false), spillingSettings.IsAggregationSpillingEnabled());
}
}
return node;
Expand Down
28 changes: 28 additions & 0 deletions ydb/library/yql/tests/sql/dq_file/part18/canondata/result.json
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,34 @@
}
],
"test.test[aggregate-aggregate_with_deep_aggregated_column--Results]": [],
"test.test[aggregate-disable_blocks_with_spilling--Analyze]": [
{
"checksum": "7887cfe87307d36449cd6afe65636dd1",
"size": 4615,
"uri": "https://{canondata_backend}/1775059/60aa9c77d2376aa1beb6e616fcbdc82d0b2724be/resource.tar.gz#test.test_aggregate-disable_blocks_with_spilling--Analyze_/plan.txt"
}
],
"test.test[aggregate-disable_blocks_with_spilling--Debug]": [
{
"checksum": "c3bb2f21048ee6f5a0e7846fd28f481d",
"size": 2423,
"uri": "https://{canondata_backend}/1775059/60aa9c77d2376aa1beb6e616fcbdc82d0b2724be/resource.tar.gz#test.test_aggregate-disable_blocks_with_spilling--Debug_/opt.yql_patched"
}
],
"test.test[aggregate-disable_blocks_with_spilling--Plan]": [
{
"checksum": "7887cfe87307d36449cd6afe65636dd1",
"size": 4615,
"uri": "https://{canondata_backend}/1775059/60aa9c77d2376aa1beb6e616fcbdc82d0b2724be/resource.tar.gz#test.test_aggregate-disable_blocks_with_spilling--Plan_/plan.txt"
}
],
"test.test[aggregate-disable_blocks_with_spilling--Results]": [
{
"checksum": "42f51df2ad014764141d891357b0b6b6",
"size": 915,
"uri": "https://{canondata_backend}/1775059/60aa9c77d2376aa1beb6e616fcbdc82d0b2724be/resource.tar.gz#test.test_aggregate-disable_blocks_with_spilling--Results_/results.txt"
}
],
"test.test[aggregate-group_by_column_alias_reuse-default.txt-Analyze]": [
{
"checksum": "bf546487bcd475b8555f2a7883d1f6a0",
Expand Down
14 changes: 14 additions & 0 deletions ydb/library/yql/tests/sql/sql2yql/canondata/result.json
Original file line number Diff line number Diff line change
Expand Up @@ -1868,6 +1868,13 @@
"uri": "https://{canondata_backend}/1942415/e6af6d354a98ef890e03fc9f0ff5926afc11a26b/resource.tar.gz#test_sql2yql.test_aggregate-dedup_state_keys_/sql.yql"
}
],
"test_sql2yql.test[aggregate-disable_blocks_with_spilling]": [
{
"checksum": "e1c9df055ae7de78e0d0364ec949dec4",
"size": 1398,
"uri": "https://{canondata_backend}/1936947/cdbc6e86b3a08f513dc20af9f537f10f6b930f5d/resource.tar.gz#test_sql2yql.test_aggregate-disable_blocks_with_spilling_/sql.yql"
}
],
"test_sql2yql.test[aggregate-ensure_count]": [
{
"checksum": "680e664bf810c0f13951de38d3cf94f7",
Expand Down Expand Up @@ -21342,6 +21349,13 @@
"uri": "https://{canondata_backend}/1880306/64654158d6bfb1289c66c626a8162239289559d0/resource.tar.gz#test_sql_format.test_aggregate-dedup_state_keys_/formatted.sql"
}
],
"test_sql_format.test[aggregate-disable_blocks_with_spilling]": [
{
"checksum": "ed1c0334420d2ec08b8ccc4020e4fb6b",
"size": 88,
"uri": "https://{canondata_backend}/1920236/3d99d8b2ede4d290229a75d3c17d5a932a859473/resource.tar.gz#test_sql_format.test_aggregate-disable_blocks_with_spilling_/formatted.sql"
}
],
"test_sql_format.test[aggregate-ensure_count]": [
{
"checksum": "7a2ea2eeaf67cc395330f6718ce49635",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
in Input input.txt

providers dq
pragma dq.SpillingEngine="file";
pragma dq.EnableSpillingNodes="Aggregation";
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pragma BlockEngine='force';
select count(key) from plato.Input group by key;
Loading