[SPARK-38237][SQL][SS] Allow ClusteredDistribution to require full clustering keys #35574


Closed · wants to merge 10 commits

Conversation

Contributor

@c21 c21 commented Feb 19, 2022

What changes were proposed in this pull request?

This PR allows ClusteredDistribution (used by operators such as window and aggregate) to require the full set of clustering keys. Traditionally, an operator requiring ClusteredDistribution can be satisfied by HashPartitioning on a subset of the clustering keys. This behavior can lead to data skewness (see comments raised in #35552). Although we have various ways to deal with data skewness in such cases, such as adding repartition(), disabling bucketing, or adding a custom AQE rule, there are still cases we cannot handle, e.g. data skewness within the same stage (join(t1.x = t2.x) followed by window(t1.x, t1.y)). The newly introduced config spark.sql.requireAllClusterKeysForDistribution makes ClusteredDistribution require partitioning on the full set of clustering keys, so an extra shuffle is added in such cases.
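
To make the scenario concrete, here is a rough sketch (not from the PR; the tables t1/t2 and columns x, y are illustrative) of how the skew arises and what the config changes:

// A rough sketch, assuming tables t1/t2 with columns x, y exist.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// With the config off (default): the join output is hash-partitioned on x,
// which also satisfies the window's ClusteredDistribution(x, y), so no shuffle
// is added between the two operators and skew on x carries into the window.
// With the config on: the window requires the full keys (x, y), so an extra
// shuffle on (x, y) is inserted between the join and the window.
spark.conf.set("spark.sql.requireAllClusterKeysForDistribution", "true")

val joined = spark.table("t1").join(spark.table("t2"), "x")
val windowed = joined.withColumn(
  "rn", row_number().over(Window.partitionBy("x", "y").orderBy("y")))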

Why are the changes needed?

Allows users to work around data skewness when data is partitioned on a subset of the clustering keys.

Does this PR introduce any user-facing change?

Yes, the added config, which is disabled by default.

How was this patch tested?

Added unit tests in DataFrameWindowFunctionsSuite.scala and DistributionSuite.scala.
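
Roughly, the added tests assert that an extra shuffle shows up in the executed plan when the config is on. A condensed sketch of that style of check (assuming a suite with SparkSession test implicits in scope; not the verbatim test code):

import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
import org.apache.spark.sql.internal.SQLConf

withSQLConf(
    SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
    SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION.key -> "true") {
  val df = Seq((1, 2), (1, 3)).toDF("x", "y")
    .repartition($"x") // child partitioned on a subset (x) of the window keys
    .withColumn("rn", row_number().over(Window.partitionBy("x", "y").orderBy("y")))
  val shuffles = df.queryExecution.executedPlan.collect {
    case s: ShuffleExchangeExec => s
  }
  // expect one shuffle from repartition plus one on the full keys (x, y)
  assert(shuffles.size == 2)
}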

Contributor Author

c21 commented Feb 19, 2022

@@ -1453,6 +1455,57 @@ class DataFrameAggregateSuite extends QueryTest
    val df = Seq(1).toDF("id").groupBy(Stream($"id" + 1, $"id" + 2): _*).sum("id")
    checkAnswer(df, Row(2, 3, 1))
  }

  test("SPARK-38237: require all cluster keys for child required distribution") {
Contributor Author

This is the same unit test added in #35552, credited to @HeartSaVioR. I may change the test query to a window query instead of an aggregate, to make it more convincing.

Contributor

Thanks! Please feel free to adjust the test case or build on it to derive additional test cases.

case (p: HashPartitioning, d: ClusteredDistribution) =>
  if (conf.getConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_SOLE_PARTITION) &&
      requiredChildDistributions.size == 1 && !p.isPartitionedOnFullKeys(d)) {
    // Add an extra shuffle for `ClusteredDistribution` even though its child
Contributor

@sigmod sigmod Feb 19, 2022

The if branch actually means the requirement is not satisfied.

Either adding HashClusteredDistribution back or adding a boolean to ClusteredDistribution seems neater to me, because there are other existing call sites of satisfies, and there may be more in the future, where we don't want to replicate special logic, e.g.
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ValidateRequirements.scala#L34-L50

I'd prefer adding HashClusteredDistribution back, as it's conceptually a very different requirement from ClusteredDistribution. Dynamic dispatch is generally preferable to if-else for polymorphism. But that's a secondary issue.

Contributor Author

@c21 c21 Feb 19, 2022

because there are other existing call sites of satisfies, and there may be more in the future, where we don't want to replicate special logic, e.g.
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ValidateRequirements.scala#L34-L50

This is a good point, thanks for pointing it out @sigmod. I am changing the approach to rewrite the logic inside HashPartitioning.satisfies0, as also suggested by @sunchao in #35552 (comment).

Contributor

@sigmod sigmod Feb 22, 2022

Thanks, @c21!

It's probably discussed in another comment thread, but I'd like to reiterate that although it's currently a global config per query, the examples I mentioned show that it's a physical plan alternative that shouldn't be ignored by default.

The planner probably eventually wants to make a more automatic decision between HashClusteredDistribution and ClusteredDistribution at each operator level (with per-operator hints, heuristics, or stats). By then, we will still need to bring HashClusteredDistribution back anyway, regardless of how it is named and where the dispatch goes (a dedicated class or a boolean field in ClusteredDistribution).

Contributor

@sigmod sigmod Feb 22, 2022

For instance, if we have HashClusteredDistribution, we can put the following small heuristic here:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala#L48

if (partitionSpec.exists(_.dataType.isInstanceOf[BooleanType])) {
  HashClusteredDistribution(partitionSpec) :: Nil
} else {
  ClusteredDistribution(partitionSpec) :: Nil
}

Happy to learn what an alternative implementation for such a heuristic would look like.

Member

Can you explain what this heuristic means? Why do we make an exception when any of the partition keys is boolean-typed?

Contributor Author

Does the rule create a correctness issue?
The rule changed the input shuffle of w without re-considering the downstream operators/shuffles of w, such that the distribution requirement of a downstream operator of w may not actually be met by the newly injected ShuffleExchangeExec.

@sigmod - yes, this is mainly to show that we can do the same thing with physical plan rules. We can either put the logic inside EnsureRequirements, or run EnsureRequirements again inside the new rule (we did a similar thing in OptimizeSkewedJoin). I mainly wanted to point out that there are alternative options to achieve the same effect.

Contributor

@sigmod sigmod Feb 23, 2022

We can either put the logic inside EnsureRequirements,
or run EnsureRequirements again inside the new rule

Thanks for elaborating. OK, it's doable, but it feels more complex than necessary and maybe riskier than simply expressing a requirement. Since there is already a requirement-matching framework in EnsureRequirements, expressing a different requirement seems neater/more natural to me.

Member

if (partitionSpec.exists(_.dataType.isInstanceOf[BooleanType])) {
  HashClusteredDistribution(partitionSpec) :: Nil
} else {
  ClusteredDistribution(partitionSpec) :: Nil
}

I'm not sure this is a good example, since you also need to check child.outputPartitioning, right? Otherwise, the partition keys in the child's output partitioning may not contain a boolean at all, but you'd still require a full partition key match, which may not be optimal. For instance:

WindowExec(partitionSpec=(x: bool, y: int, z: long))
  - child partitioned by y, z 

So in the end, it seems like this is not only expressing requirements but also doing some sort of matching similar to what's done in EnsureRequirements.

Contributor

So in the end, it seems like this is not only expressing requirements but also doing some sort of matching

If we want to achieve such plans, I think we can (should?) still capture such things in requirements and satisfies, instead of baking complexities into EnsureRequirements or yet another physical rule. E.g.,

case class ClusteredDistribution(
    clustering: Seq[Expression],
    containing: Set[Expression], /* a satisfying partitioning must contain these expressions */
    disallowedCombinations: Set[Set[Expression]], /* disallow these combinations */
    requiredNumPartitions: Option[Int] = None)

where the latter two specify which subset of possibilities is considered. There may be other ways to express the same thing. Then, @sunchao, the requirement to capture what you want looks like:
ClusteredDistribution({x, y, z}, {}, {{x}})

If we have that, HashClusteredDistribution is just a special case and not needed, because
HashClusteredDistribution({x, y, z}) = ClusteredDistribution({x, y, z}, {x, y, z}, {})

Without this augmented ClusteredDistribution, HashClusteredDistribution is the only thing we can play with.
That is to say, we need more expressive power for ClusteredDistribution, either through more fields or through the previous HashClusteredDistribution :-)
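
A hypothetical sketch of how a satisfies check could consume such an augmented shape (illustrative only, not actual Spark API; real matching would use semanticEquals rather than plain set operations):

import org.apache.spark.sql.catalyst.expressions.Expression

// Illustrative stand-in for the augmented distribution proposed above.
case class AugmentedClusteredDistribution(
    clustering: Seq[Expression],
    containing: Set[Expression],
    disallowedCombinations: Set[Set[Expression]],
    requiredNumPartitions: Option[Int] = None)

def satisfiesAugmented(
    partitioningKeys: Set[Expression],
    d: AugmentedClusteredDistribution): Boolean = {
  partitioningKeys.subsetOf(d.clustering.toSet) &&       // the usual subset rule
    d.containing.subsetOf(partitioningKeys) &&           // must contain these keys
    !d.disallowedCombinations.contains(partitioningKeys) // reject disallowed combos
}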

Member

If we want to achieve such plans, I think we can (should?) still capture such things in requirements and satisfies, instead of baking complexities into EnsureRequirements or yet another physical rule.

Yes, agreed. It's better to avoid case analysis in EnsureRequirements or other rules.

On the augmented ClusteredDistribution, I'm not sure whether we'll need containing and disallowedCombinations for broader scenarios, or whether simple knobs like spark.sql.requireAllClusterKeysForHashPartition are sufficient for the majority of use cases. But as you demonstrated, I think we can just keep ClusteredDistribution and evolve it to be more expressive if there are strong requirements in the future.

@c21 c21 changed the title [SPARK-38237][SQL][SS] Allow operator with ClusteredDistribution to require full clustering keys [SPARK-38237][SQL][SS] Allow HashPartitioning to satisfy ClusteredDistribution only with full clustering keys Feb 19, 2022
Contributor

@HeartSaVioR HeartSaVioR left a comment

Looks OK to me. We might be able to tune further, e.g. with a threshold on the number of partitions, but we can add that later whenever we find concrete needs.

 * [[ClusteredDistribution]].
 */
def isPartitionedOnFullKeys(distribution: ClusteredDistribution): Boolean = {
  expressions.length == distribution.clustering.length &&
Contributor

The condition is more restrictive than what we explain in the config (e.g. is the ordering important here?), but I'm fine with this if we are all OK with it, as my proposal is technically the same.

Contributor Author

@HeartSaVioR - this is a good point. I am more inclined to the more restrictive condition, as hash(x1, x2) will send rows to different partitions than hash(x2, x1), and one ordering can potentially have data skew while the other does not. Before I update the doc, cc more folks for comments: @cloud-fan and @sunchao.
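
For illustration, a quick sketch (assuming spark.implicits._ is in scope) showing that key order changes the value of Spark's Murmur3-based hash() function, and hence the partition ID a row would map to:

import org.apache.spark.sql.functions.hash

val df = Seq((1, 2)).toDF("x1", "x2")
// hash(x1, x2) and hash(x2, x1) generally produce different values, so the
// same row can land in a different partition depending on key order.
df.select(hash($"x1", $"x2").as("h12"), hash($"x2", $"x1").as("h21")).show()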

Member

@sunchao sunchao Feb 20, 2022

I'm not sure ordering is important here: is it common for data skewness to be introduced by changing the order of the hash keys? I'd be surprised if murmur3 hash exhibited this kind of property.

This also makes the optimization harder to kick in (imagine users having to carefully align join or aggregation keys to the same order as the bucket keys of the table). It is also a behavior change for bucketed joins, since currently Spark is more relaxed and will reorder the hash keys w.r.t. the join keys in EnsureRequirements.reorderJoinPredicates.

Contributor

Now I'm also in favor of the more restrictive condition. With it, end users can change the order of keys to tune their query further as a last resort if simply turning the config on isn't performant enough. We expect that changing the order of the hash keys would change the partition IDs, right?

The scenario in which end users will turn on this config is a major point. They wouldn't turn it on before trying to run the query. (This config is marked as internal, and it's disabled by default.) They would turn it on after running the query and seeing Spark behave badly. One can argue that they could add a repartition manually in their code/SQL statement, which makes sense in general, but there are counter-arguments: 1) they may have more than a few queries, 2) the queries could be machine/tool-generated.

Member

I'm fine with the stricter condition. It looks harmless and will be less likely to bring unexpected cases of data skewness. And I think such a config is intended to give users more control, so ordering might be part of that too.

// `ClusteredDistribution`. Opting in to this feature by enabling
// "spark.sql.requireAllClusterKeysForHashPartition" can help avoid potential data
// skewness for some jobs.
isPartitionedOnFullKeys(c)
Contributor

If we end up with strict ordering, we could note in the method doc of isPartitionedOnFullKeys that it also requires exact order, and replace the condition in StatefulOpClusteredDistribution with isPartitionedOnFullKeys. I'm wondering whether we would care about ordering for the cases we described.

partitioning.expressions.zip(distribution.clustering).forall {
  case (l, r) => l.semanticEquals(r)
}
partitioning.isPartitionedOnFullKeys(distribution)
Contributor

@HeartSaVioR HeartSaVioR Feb 20, 2022

Although it is beyond the scope of this PR, the same thing applies here. Would we need to require a strict order of keys? Just curious.

Contributor Author

I think we probably need strict order here, as it feels safer to require ordering, and this is the original behavior from when join required HashClusteredDistribution. It essentially restores the join's behavior to what it was with HashClusteredDistribution.

case ClusteredDistribution(requiredClustering, _) =>
  expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
case c @ ClusteredDistribution(requiredClustering, _) =>
  if (SQLConf.get.requireAllClusterKeysForHashPartition) {
Contributor

@HeartSaVioR HeartSaVioR Feb 20, 2022

It seems worth thinking about the trade-off of having a flag on ClusteredDistribution vs. checking the SQL config here. For the former, we may need to change all the spots initializing ClusteredDistribution, but it also means we are open to finer-grained control in the future (additional mixed conditions and per-operator configurations). For the latter, it's probably the simplest change, but here we have no idea about the operator, so in the future we are restricted to applying the change in a global manner.

I don't have a strong preference, as I'm talking about extensibility which might never happen (or might happen sooner than expected). Just my 2 cents.

Contributor Author

@c21 c21 Feb 20, 2022

I also thought about the pros and cons of these two approaches: 1) change the behavior of HashPartitioning, vs 2) change the behavior of ClusteredDistribution. I am more inclined to 1) for the following reasons:

ClusteredDistribution's current definition is pretty clean and flexible, so let's not move backward.

/**
 * Represents data where tuples that share the same values for the `clustering`
 * [[Expression Expressions]] will be co-located in the same partition.
 */
case class ClusteredDistribution

As long as data is partitioned such that tuples/rows having the same values for the clustering keys land in the same partition, the partitioning can satisfy ClusteredDistribution. It tolerates both full keys and a subset of keys, so it's flexible enough to work for a range of operators: aggregate, window, join (together with the recently introduced ShuffleSpec for co-partitioning). It does not have any implicit requirement on hash expressions or the hash function (so it avoids the drawback of HashClusteredDistribution). Partitionings other than HashPartitioning can satisfy ClusteredDistribution (e.g. RangePartitioning and DataSourcePartitioning). Adding a flag such as requiresFullKeysMatch to ClusteredDistribution would make every Partitioning implementation unnecessarily more complicated, as this is currently just a problem for HashPartitioning.

HashPartitioning can flexibly decide by itself when it should satisfy ClusteredDistribution, either with a subset of keys (current behavior) or full keys (with the config introduced in this PR). This leaves other Partitionings (RangePartitioning and DataSourcePartitioning) and ClusteredDistribution untouched. Indeed, this is just a local decision made by HashPartitioning, which I think is more flexible and extensible. In the future, if another Partitioning has a similar requirement, e.g. DataSourcePartitioning, similar logic can be introduced locally inside DataSourcePartitioning.satisfies0() without any intrusive interface change.
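
Condensed from the diffs shown earlier in this thread, option 1) boils down to a branch local to HashPartitioning.satisfies0, roughly:

case c @ ClusteredDistribution(requiredClustering, _) =>
  if (SQLConf.get.requireAllClusterKeysForHashPartition) {
    // full keys, in the same order
    expressions.length == requiredClustering.length &&
      expressions.zip(requiredClustering).forall {
        case (l, r) => l.semanticEquals(r)
      }
  } else {
    // current behavior: any subset of the clustering keys
    expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
  }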

For the latter, it's probably the simplest change, but here we have no idea about the operator, so in the future we are restricted to applying the change in a global manner.

That's true, but I think the granularity is tricky to decide, so let's start with the solution that best keeps our interface clean. We can discuss later if there is a strong requirement. One could further argue that a user wanting finer-grained control would want to specify the exact operator in the query (e.g. the query has 3 aggregates, and they only want to enable the feature for 1 of them).

Member

I'm also inclined to option 1) for now, and agree with the points @c21 raised above.

As a Spark developer, I was originally confused when seeing both HashClusteredDistribution and ClusteredDistribution, and had to navigate the code base and reason about their behavior differences. Combined with the newly introduced config, a developer would now have to remember to check the value of the config and choose HashClusteredDistribution or ClusteredDistribution accordingly, which is extra burden. In addition, it's better to have a separate StatefulOpClusteredDistribution dedicated to SS use cases, as it makes them more distinctive.

Of course, having a separate HashClusteredDistribution opens up more opportunities for it to evolve separately. But I'd suggest only considering that when we have some concrete ideas. So far, I don't see anything that can't be done with ClusteredDistribution alone.

Contributor

As a Spark developer, I was originally confused when seeing both HashClusteredDistribution and ClusteredDistribution, and had to navigate the code base and reason about their behavior differences.

The classdocs of the two classes were very clear about the differences. The confusion may come from the structure, where the actual requirement check for a distribution lives in the partitioning instead of the distribution (I'm not an expert in this part; it may have to be this way), but as long as the implementation matches the classdoc, it is pretty clear.

Combined with the newly introduced config, a developer would now have to remember to check the value of the config and choose HashClusteredDistribution or ClusteredDistribution accordingly, which is extra burden.

Well, I'd say it is more of a burden if we have to expect two different requirements from ClusteredDistribution. Once we understand the difference between HashClusteredDistribution and ClusteredDistribution, it is obvious that we can easily infer the behavior from which class is used.

Contributor

@HeartSaVioR HeartSaVioR Feb 21, 2022

We already made one exception (spark.sql.requireAllClusterKeysForCoPartition), and we are going to make a broader exception here (spark.sql.requireAllClusterKeysForHashPartition).
(EDIT: shouldn't we want this for data source partitioning as well?)

There have been valid cases supporting these exceptions, and they are not edge cases arising from odd data distributions. That said, this is going to be a valid requirement for a distribution. In other words, there are known cases where ClusteredDistribution alone does not solve the problem nicely. It tries hard to eliminate as many shuffles as possible (assuming that shuffle is evil), but it does nothing for the cases where a shuffle does help.

So I don't think adding a flag to ClusteredDistribution messes up the interface and structure. I think it is the opposite. We are making exceptions to the requirement of ClusteredDistribution: requiring full keys is not one of the requirements of ClusteredDistribution as of now (even with this PR), right? We haven't documented it, and it now depends completely on the implementation of HashPartitioning. If someone starts to look into ClusteredDistribution, it is likely they will miss the case. It is also possible we miss the config when implementing DataSourcePartitioning against ClusteredDistribution. I said "trade-off" because the config pinpoints the issue and brings a small fix which may be preferable to some, but my preference is making it clearer.

If we are skeptical about addressing this in ClusteredDistribution because we don't want the requirement of ClusteredDistribution to be extended further, that is really a good rationale for reviving HashClusteredDistribution, because the requirement is a 100% fit for what we are doing. The only difference is that data source partitioning wouldn't satisfy that requirement in any way, which may be considered a major downside according to the roadmap, so yes, it makes data source partitioning second class for some cases. If we are against that, please make sure ClusteredDistribution covers, by definition, everything HashClusteredDistribution could cover on its own.

Contributor Author

I'm concerned that requiring full keys is a valid requirement we shouldn't simply drop, at least from the inputs I see.

Just to make sure we are on the same page: operators such as aggregate and window have been using ClusteredDistribution for years. So the problem of data skew in those operators is a long-standing problem we are now aiming to fix, not a regression from recent PRs.

We are technically changing ClusteredDistribution, and we need to make that clear. Otherwise this is also going to be undocumented. Instead of just documenting it, I prefer having the flag explicitly, so that any partitioning would know the detailed requirement very clearly. You don't need to remember the config when you work on DataSourcePartitioning; ClusteredDistribution will provide it for you.

Hmm, maybe I am thinking too much from my own perspective, but I am still not very convinced this is a problem for ClusteredDistribution. Hashing on a subset of keys causing data skew seems to me a problem for HashPartitioning only. Other Partitionings, such as RangePartitioning or DataSourcePartitioning, can partition data very differently from HashPartitioning, or may not use a hash at all. So they might have very different causes of data skew beyond a subset of keys (e.g. a suboptimal sampling algorithm in RangePartitioning causing a bad choice of partition boundaries, or a suboptimal user-defined DataSourcePartitioning causing skew). I am somewhat worried that introducing a flag such as requiresFullKeysMatch in ClusteredDistribution might be useful only for HashPartitioning, not for other Partitioning classes. Once we introduce the flag, it's hard to change or remove it, because developers or users depending on DataSourcePartitioning might be broken if we change ClusteredDistribution again. So I just want to make sure we are very cautious about it.

Contributor

@HeartSaVioR HeartSaVioR Feb 22, 2022

Just to make sure we are on the same page: operators such as aggregate and window have been using ClusteredDistribution for years. So the problem of data skew in those operators is a long-standing problem we are now aiming to fix, not a regression from recent PRs.

I agree with this, but the difference is that we dropped HashClusteredDistribution, so we have fewer tools to leverage. In the SPIP doc we said we would "unify" the two classes, but what we actually did was "remove" HashClusteredDistribution without an alternative.

Regarding the problem, there are two perspectives: 1) data skew, and 2) an insufficient number of partitions. 2) applies to any partitioning.

Let's think from the end user's perspective. They run a batch query and expect Spark to finish it as quickly as possible (or be resource-efficient). Spark provides a general config, the default number of shuffle partitions, which defines the general parallelism whenever a shuffle is introduced.

The point is when that config takes effect. The more output partitionings ClusteredDistribution can match, the fewer shuffles kick in, while end users may expect that "a set of known operators introduces shuffles, which adjust parallelism according to the config I set". For example, in the case of ClusteredDistribution with DataSourcePartitioning, the answer could hypothetically be "nowhere": the max parallelism could be tied to the source's parallelism, and nothing except a manual change to the query could help, since there may be no shuffle at all. AQE won't help with the parallelism/skew issue within a stage.

Shuffle may not be something we should try hard to eliminate at all costs. We also need to think about when a shuffle would likely help. We can't leverage stats in physical execution, so my fallback is a heuristic, like the different view of and resolution to this problem in #35574 (comment).

Contributor

@HeartSaVioR HeartSaVioR Feb 22, 2022

In a totally ideal world, the previous stage could calculate the cardinality of all required grouping keys for the next stage, and once the previous stage finishes, the query executor could decide to split the stage based on the difference between the cardinality and the desired threshold on the number of partitions (or even the number of values bound to each key). This is totally ideal, and I don't know whether it is even technically feasible. But if we agree that this is ideal, we are in agreement that shuffle is not always evil.

Contributor Author

Shuffle may not be something we should try hard to eliminate at all costs. We also need to think about when a shuffle would likely help. We can't leverage stats in physical execution, so my fallback is a heuristic, like the different view of and resolution to this problem in #35574 (comment).

Instead, having a minimum threshold on the number of partitions doesn't sound crazy to me. The threshold could be a heuristic, or a config (a number, or a ratio relative to the default number of shuffle partitions), or the default number of shuffle partitions itself if we don't want to introduce another config (though that may be too high to use as a minimum).

@HeartSaVioR - first, I agree with you that a shuffle is sometimes good to have, so I guess this PR is aiming for the same goal: add a proper shuffle on the full clustering keys based on a config, right? Just for my understanding, are you proposing above to have a config setting the minimum threshold on the number of partitions for all queries that need a shuffle? Can you elaborate on how the query, and which part of it, would be rewritten if it violates the config? With cardinality stats from CBO and AQE (we don't collect cardinality stats in AQE for now), we could potentially give some hints in the query plan during logical planning. But this approach still sounds a little too high-level to me without the details of the algorithm.

Contributor

@HeartSaVioR HeartSaVioR Feb 22, 2022

I guess this PR is aiming for the same goal: add a proper shuffle on the full clustering keys based on a config, right?

Yes, but we still have an argument about why it is needed. I wouldn't say it is due to HashPartitioning, as I mentioned two different perspectives. Even if the hash function of HashPartitioning is somewhat inefficient at properly distributing certain grouping keys and data, that only contributes to data skew. In any partitioning, the number of partitions is the physical limit of parallelism. It is ClusteredDistribution that prevents two operators from having a shuffle in between, coupling operators in the same stage with the same partitioning and parallelism.
(Please correct me if AQE can decide to split a single stage into multiple stages, injecting shuffles in between.)

Considering the two perspectives, there are multiple cases of child partitioning that each physical node may need to deal with:

  1. clustering keys are fully covered, with a sufficient number of partitions (ideal)
  2. clustering keys are fully covered, but with an insufficient number of partitions
  3. only a subset of clustering keys is covered, with an insufficient number of partitions
  4. only a subset of clustering keys is covered, but with a sufficient number of partitions

Since we are going with a manual/heuristic way of dealing with this, I would like to see the approach that addresses the most cases with the fewest side effects. That is the main reason I tried to think about alternatives.

Requiring full clustering keys can deal with 3) and 4), where 4) may be skewed (a good case for a shuffle) or not (a shuffle may not be needed), so the benefit of a shuffle there is conditional. Requiring a minimum threshold on the number of partitions can deal with 2) and 3), which is good in general to ensure minimum parallelism for grouping/joining operators. It misses case 4), but as just mentioned, the benefit of a shuffle in 4) is conditional. In addition, it is no longer bound only to data skew, and hence applies to any partitioning.

Implementation-wise, I imagine it is simple, as we just add another constraint to ClusteredDistribution. If the required number of partitions exists, it strictly follows that number; otherwise we compare the numPartitions of the partitioning against the threshold defined in ClusteredDistribution. The threshold could be optional if we doubt there is a good default value that works for the majority of queries.

For sure, these constraints can be used together to cover as many cases as possible. Requiring full clustering keys is still needed to deal with 4). If we are not sure whether this applies only to HashPartitioning or to other partitionings as well, and want to defer the decision to make this an official constraint of ClusteredDistribution, I'd agree with deferring that decision until we deal with DataSourcePartitioning.
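
A hypothetical sketch of that threshold constraint (shape and names are illustrative; nothing like this is implemented in this PR):

// Illustrative only: a minimum-partitions constraint alongside the exact one.
case class MinPartitionsConstraint(
    requiredNumPartitions: Option[Int],
    minNumPartitions: Option[Int]) {

  // Strictly follow an exact requirement if present; otherwise enforce the
  // optional minimum threshold against the child's actual partition count.
  def numPartitionsSatisfied(actual: Int): Boolean = requiredNumPartitions match {
    case Some(exact) => actual == exact
    case None => minNumPartitions.forall(actual >= _)
  }
}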

Contributor

HeartSaVioR commented Feb 21, 2022

Thinking out loud, there could be more ideas to solve this. One rough idea:

The loose requirement of ClusteredDistribution aims to avoid shuffles as much as possible, even if the number of partitions is quite small. If we could inject a shuffle at runtime (or, even more ambitiously, adaptively scale a running stage) based on stats, we could be very adaptive, but I wouldn't expect that to happen in the near future. Instead, having a minimum threshold on the number of partitions doesn't sound crazy to me. The threshold could be a heuristic, or a config (a number, or a ratio relative to the default number of shuffle partitions), or the default number of shuffle partitions itself if we don't want to introduce another config (though that may be too high to use as a minimum).

Rationalization: ClusteredDistribution has a requirement for an exact number of partitions, but if I checked correctly, nothing uses it except AQE (and it is only used for the physical shuffle node). We simply consider the current partitioning ideal whenever it satisfies the distribution requirement. Adjusting the default number of shuffle partitions won't take effect since there is no shuffle, and AQE doesn't help either. Having a minimum threshold on the number of partitions would introduce a shuffle in many cases where there is an insufficient number of partitions. It still doesn't solve the case where the child is partitioned on a sub-group of the keys with plenty of partitions but still skewed. But that is really an unusual case we don't have a good way to pinpoint, and it is probably unavoidable to have end users handle it manually.

Contributor

HeartSaVioR commented Feb 22, 2022

Another rough idea, in parallel: let's consider the physical nodes in the same stage. Unless the stage contains the source node, the stage must have been preceded by a shuffle, based on the required child distribution of the first physical node (and the required child distributions of the remaining nodes are satisfied by the output partitioning). If all nodes in the same stage require ClusteredDistributions but with different grouping keys (including more relaxed cases), it might be a sensible expectation that picking the largest number of grouping keys in the stage to represent the required child distribution for "this stage" would bring less skew (at least the cardinality should go up). We are going to introduce a shuffle anyway, so let's make the unavoidable shuffle (heuristically) most effective.

This only works from the second stage onward; if the required child distribution is satisfied by the source node, there is no chance to inject a shuffle. The earlier rough idea could enforce injecting a shuffle if the number of partitions at the source node is too low, and after that this idea would take effect.

EDIT: never mind. The relaxed condition is one-way, not bi-directional, so it's not going to work.

@@ -407,6 +407,17 @@ object SQLConf {
    .booleanConf
    .createWithDefault(true)

  val REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION =
Member

We should mention that the ordering is also required, if it is part of the condition.

Contributor Author

@viirya - sure, will change it later once we all reach consensus in the comments.

Contributor

cloud-fan commented Feb 23, 2022

Sorry, I have not read all the discussion in this PR; just my 2 cents in case these points haven't been mentioned yet:

  1. It looks neater to have a special Distribution for this case (maybe add a boolean flag to ClusteredDistribution). Then we don't need to look at a runtime config in several Partitioning implementations, which can easily be missed when adding a new Partitioning that can satisfy ClusteredDistribution in the future.
  2. It's more flexible to have a special Distribution, as we can use it in only some of the operators in a query plan, not all of them. With a config, we must make a global decision for the entire query plan.

Contributor Author

c21 commented Feb 23, 2022

Hi all, I changed the PR to the approach of adding a boolean field inside ClusteredDistribution, based on feedback from the quorum. Could you take a look when you have time? Thanks.

cc @cloud-fan, @sunchao, @viirya, @HeartSaVioR, @sigmod.

val expressions = ordering.map(_.child)
if (requiredAllClusterKeys) {
  // Checks that `RangePartitioning` is partitioned on exactly the same clustering
  // keys as `ClusteredDistribution`.
Contributor

This is less strict than the previous HashClusteredDistribution, but looks fine.

Contributor Author

Yes, it's less strict. Previously we didn't allow RangePartitioning to satisfy HashClusteredDistribution. I think it should be fine too.

def isShuffleExecByRequirement(
    plan: ShuffleExchangeExec,
    desiredClusterColumns: Seq[String],
    desiredNumPartitions: Int): Boolean = plan match {
Contributor

Do we need to check the number of partitions? I think this test should focus on whether the shuffle is added or not.

Contributor Author

@cloud-fan - I think we don't need to check the number of partitions here, unless @HeartSaVioR you have a strong opinion on this? Removed it for now.

Contributor

The relaxed check is OK for me, as long as the partition keys are asserted.

Just curious: is it due to the possibility that the number of partitions differs from the config, or just that we don't need to be strict?

Contributor

Oh well, we don't strictly require the number of partitions here. The check came from testing a stateful operator, which should strictly require the number of partitions (with AQE disabled). It seems better to remove the check.


withSQLConf(
    SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
    SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION.key -> "false") {
Contributor

hmm, shall we test when this conf is true?

Contributor Author

@cloud-fan - my bad, fixed.

 */
case class ClusteredDistribution(
    clustering: Seq[Expression],
    requiredNumPartitions: Option[Int] = None) extends Distribution {
    requiredNumPartitions: Option[Int] = None,
    requiredAllClusterKeys: Boolean = SQLConf.get.getConf(
Member

I think we should call this requireAllClusterKeys?

Contributor Author

@sunchao - sure, updated.

Contributor

@HeartSaVioR HeartSaVioR Feb 24, 2022

+1, and if we are open to rearranging fields, it seems better to group this with the partition expressions. Just my 2 cents.

Contributor Author

@HeartSaVioR - I agree that looks better. I originally put the new field after clustering, but found several other places needing code changes. Updated now.

Contributor

Yeah, we can move forward since at least two of us think it's better. :)

 */
def areAllClusterKeysMatched(expressions: Seq[Expression]): Boolean = {
  expressions.length == clustering.length &&
    expressions.zip(clustering).forall {
Member

@sunchao sunchao Feb 24, 2022

For aggregate or window, I'm not sure whether we have any reordering mechanism similar to join's. If not, this could be very limited: for instance, if users have GROUP BY x, y, z while the data distribution is y, z, x, they would have to rewrite their queries to match the distribution.

Contributor Author

This goes back to the same discussion here: #35574 (comment). I am more inclined to require the same ordering, but if the quorum of folks here thinks we should relax it, I am also fine. cc @cloud-fan.

Member

Ah, I forgot this was discussed already (and I participated in the thread too... 😓). I'm fine with the stricter ordering to start with.

@c21 c21 changed the title [SPARK-38237][SQL][SS] Allow HashPartitioning to satisfy ClusteredDistribution only with full clustering keys [SPARK-38237][SQL][SS] Allow ClusteredDistribution to require full clustering keys Feb 24, 2022
Contributor Author

c21 commented Feb 25, 2022

Addressed all comments, and the PR is ready for review again, thanks.
cc @cloud-fan, @sunchao, @viirya, @HeartSaVioR, @sigmod.

Contributor

@HeartSaVioR HeartSaVioR left a comment

+1. Thanks for driving this work through, and for your patience!

I'd like to defer sign-off to the other committers since I don't have strong expertise in this area.

 */
case class ClusteredDistribution(
    clustering: Seq[Expression],
    requireAllClusterKeys: Boolean = SQLConf.get.getConf(
Contributor

@cloud-fan cloud-fan Feb 25, 2022

nit: it's less breaking to put the new parameter at the end, so that some caller-side code can remain unchanged.

Contributor Author

@c21 c21 Feb 25, 2022

@cloud-fan - I agree with the point about keeping caller-side code unchanged. It just feels more coherent for others reading the code to put clustering and requireAllClusterKeys together; this was also raised in #35574 (comment) by @HeartSaVioR. I am curious: would adding the field in the middle break external libraries depending on Spark? Otherwise, reviewers have already paid the cost of reviewing this PR, so I'm not sure how important it is to change the caller-side code back. I just want to understand more, and I am open to changing it back.

Contributor

OK, maybe it's not a big deal.

Contributor

@cloud-fan cloud-fan left a comment

LGTM except for one comment

Member

@sunchao sunchao left a comment

LGTM too

.doc("When true, the planner requires all the clustering keys as the partition keys " +
"(with same ordering) of the children, to eliminate the shuffle for the operator that " +
"requires its children be clustered distributed, such as AGGREGATE and WINDOW node. " +
"This is to avoid data kews which can lead to significant performance regression if " +
Member

typo: kews -> skews

Contributor Author

@viirya - ah my bad, fixed.

Contributor

@sigmod sigmod left a comment

LGTM. Thanks, @c21!

@sunchao sunchao closed this in dc153f5 Feb 25, 2022
Member

sunchao commented Feb 25, 2022

Committed to master, thanks @c21!

Contributor Author

c21 commented Feb 25, 2022

Thank you @sunchao, @HeartSaVioR, @cloud-fan, @viirya and @sigmod for discussion and review!
