
Conversation

HeartSaVioR
Contributor

@HeartSaVioR HeartSaVioR commented Oct 17, 2025

What changes were proposed in this pull request?

This PR proposes to support changing stateless shuffle partitions upon restart of streaming query.

We don't introduce a new config for this - users can simply do the following to change the number of shuffle partitions:

  • stop the query
  • change the value of spark.sql.shuffle.partitions
  • restart the query to take effect

Note that the number of state partitions is still fixed and is unaffected by this change. That is, the value of spark.sql.shuffle.partitions as of batch 0 determines the number of state partitions, and that number does not change even if the value of the config has changed upon restart.
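The restart semantics above can be sketched as follows. This is an illustrative model only: the names `RestartPlan` and `planOnRestart` are hypothetical and are not Spark's actual internals.

```scala
// Illustrative sketch of the restart semantics: stateless shuffle partitions
// follow the current session value of spark.sql.shuffle.partitions, while the
// number of state partitions stays pinned to the value recorded at batch 0.
// All names here are hypothetical; this is not Spark's actual implementation.
case class RestartPlan(statelessShufflePartitions: Int, statePartitions: Int)

def planOnRestart(statePartitionsAtBatch0: Int, sessionShufflePartitions: Int): RestartPlan =
  RestartPlan(
    statelessShufflePartitions = sessionShufflePartitions, // picks up the changed config
    statePartitions = statePartitionsAtBatch0)             // fixed for the checkpoint's lifetime

// Query originally started with spark.sql.shuffle.partitions=200, restarted with 50:
val plan = planOnRestart(statePartitionsAtBatch0 = 200, sessionShufflePartitions = 50)
assert(plan == RestartPlan(statelessShufflePartitions = 50, statePartitions = 200))
```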

As an implementation detail, this PR adds a new "internal" SQL config, spark.sql.streaming.internal.stateStore.partitions, to distinguish stateless shuffle partitions from stateful shuffle partitions. Unlike other internal configs, which we still expect someone (an admin, perhaps) to use, this config is NOT meant to be user-facing, and no one should set it directly. We add it purely as a compatibility trick, nothing else. We don't support compatibility for this config, and there's no promise it will be available in the future. The PR states this as a WARN in the config description.

That said, the value of the new config is expected to be inherited from spark.sql.shuffle.partitions, assuming no one sets it directly.

To support compatibility, we employ a trick in the offset log: for stateful shuffle partitions, we refer to spark.sql.streaming.internal.stateStore.partitions in the session config, while we keep using spark.sql.shuffle.partitions in the offset log. We handle the rebinding between the two configs so the persistence layer is left unchanged. This way, the query can be both upgraded and downgraded.
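A minimal sketch of that rebinding: only the two config key names come from the PR description; the helper functions below are illustrative, not Spark's actual code.

```scala
// Sketch of the compatibility trick: the offset log keeps the legacy key
// spark.sql.shuffle.partitions so older Spark versions can still read the
// checkpoint, while the session uses the internal key at runtime.
// The helpers below are illustrative, not Spark's actual code.
val sessionKey   = "spark.sql.streaming.internal.stateStore.partitions"
val offsetLogKey = "spark.sql.shuffle.partitions"

// Writing: rebind the internal session key back to the legacy key.
def toOffsetLog(sessionConf: Map[String, String]): Map[String, String] =
  sessionConf.map { case (k, v) => (if (k == sessionKey) offsetLogKey else k) -> v }

// Reading: rebind the legacy key to the internal session key.
def fromOffsetLog(persistedConf: Map[String, String]): Map[String, String] =
  persistedConf.map { case (k, v) => (if (k == offsetLogKey) sessionKey else k) -> v }

val session   = Map(sessionKey -> "200")
val persisted = toOffsetLog(session)
assert(persisted == Map(offsetLogKey -> "200")) // persistence layer unchanged
assert(fromOffsetLog(persisted) == session)     // round-trips on upgrade/downgrade
```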

Why are the changes needed?

Whenever there is a need to change the parallelism of processing (e.g. the input volume changes over time, the size of a static table changes over time, or there is skew in a stream-static join, though AQE may help resolve this a bit), the only official approach was to discard the checkpoint and start a new query, implying a full backfill. (For workloads with a foreachBatch (FEB) sink, advanced (and adventurous) users could change the config in their user function, but that's arguably a hack.) Having to discard the checkpoint is one of the major pains of using Structured Streaming, and we want to address one of its known causes.

Does this PR introduce any user-facing change?

Yes, users can change the number of shuffle partitions for stateless operators upon restart by changing the config spark.sql.shuffle.partitions.

How was this patch tested?

New UTs.

Was this patch authored or co-authored using generative AI tooling?

No.

@HeartSaVioR HeartSaVioR changed the title [WIP][DO-NOT-MERGE][SPARK-XXXXX] Support changing stateless shuffle partitions upon restart of streaming query [WIP][DO-NOT-MERGE][SPARK-53942] Support changing stateless shuffle partitions upon restart of streaming query Oct 17, 2025
@HeartSaVioR
Contributor Author

I'll update the PR description once I've confirmed CI passes.

@HeartSaVioR HeartSaVioR force-pushed the WIP-change-stateless-shuffle-partitions-in-streaming-query branch from b5f2f89 to 9c36853 Compare October 17, 2025 09:53
@HeartSaVioR HeartSaVioR changed the title [WIP][DO-NOT-MERGE][SPARK-53942] Support changing stateless shuffle partitions upon restart of streaming query [SPARK-53942] Support changing stateless shuffle partitions upon restart of streaming query Oct 17, 2025
@HeartSaVioR
Contributor Author

cc. @viirya @anishshri-db Please take a look. Thanks!

@HeartSaVioR HeartSaVioR changed the title [SPARK-53942] Support changing stateless shuffle partitions upon restart of streaming query [SPARK-53942][SS] Support changing stateless shuffle partitions upon restart of streaming query Oct 17, 2025
@HeartSaVioR
Contributor Author

https://github.com/HeartSaVioR/spark/runs/53010070456
It's only failing on the docker integration test, which I don't think is relevant.

}
}

test("SPARK-53942: changing the number of stateless shuffle partitions via config") {
Contributor


Can we add some tests with golden files as well?

Contributor Author


Yeah, that's a good point. Will add.

@anishshri-db
Contributor

@HeartSaVioR - what if the user did not intend to change the num partitions value for the stateful operation within the sink (let's say a FEB sink)? Suppose they have N running streaming queries and they change shuffle partitions, like today, to set this config for the next/new query. Now this value will also apply to all existing queries on restart, whether they intend it or not. Do we think this is a breaking change? Should we guard the feature with a separate flag/config so that users can opt in to this functionality?


package org.apache.spark.sql.execution.streaming.checkpointing

import scala.language.existentials
Contributor


Where is this used?

Contributor Author


It's to explicitly state that we want to use a Scala 2 language feature (existential types) which is going to be dropped in Scala 3.

[error] /Users/jungtaek.lim/WorkArea/ScalaProjects/spark-apache/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala:170:21: inferred existential type org.apache.spark.internal.config.ConfigEntry[_1]( forSome { type _1 >: _$2 with T; type _$2 }), which cannot be expressed by wildcards, should be enabled
[error] by making the implicit value scala.language.existentials visible.
[error] This can be achieved by adding the import clause 'import scala.language.existentials'
[error] or by setting the compiler option -language:existentials.
[error] See the Scaladoc for value scala.language.existentials for a discussion
[error] why the feature should be explicitly enabled.
[error] Applicable -Wconf / @nowarn filters for this fatal warning: msg=<part of the message>, cat=feature-existentials, site=org.apache.spark.sql.execution.streaming.checkpointing.OffsetSeqMetadata.readValueOpt.actualKey
[error]     val actualKey = if (rebindSQLConfsSessionToOffsetLog.contains(confKey)) {
[error]                     ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed

)
}

// Trying to change the number of stateful shuffle partitions, which should be ignored.
Contributor


Can we add tests for a couple more operators like TWS, stream-stream join, etc.?

Contributor Author


I understand we want to have more test coverage, but I think we need a balance, since we split the SQL test suites across multiple modules to deal with GitHub Actions. We know this won't differ among stateful operators. Please let me know if you feel strongly about this.
