[SPARK-52228][SS][PYSPARK] Construct the benchmark purposed TWS state server with in-memory state impls and the benchmark code in python #50952
+782
−0
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
This PR proposes to introduce the benchmark tool which can perform performance test with state interactions between TWS state server and Python worker.
Since it requires two processes (JVM and Python) with socket connection between the two, we are not going to follow the benchmark suites we have in SQL module as of now. We leave the tool to run manually. It'd be ideal if we can make this to be standardized with existing benchmark suites as well as running automatically, but this is not an immediate goal.
Why are the changes needed?
It has been very painful to run the benchmark and look into the performance of state interactions. It required adding debug logs and running E2E queries, which is really so much work just to see the numbers.
For example, after this benchmark tool has introduced, we can verify the upcoming improvements w.r.t. state interactions - for example, we still have spots to use Arrow in state interactions, and I think this tool can show the perf benefit for the upcoming fix.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Manually tested.
./dev/make-distribution.sh
cd dist
java -classpath "./jars/*" --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED org.apache.spark.sql.execution.python.streaming.BenchmarkTransformWithStateInPySparkStateServer
cd python
python3 pyspark/sql/streaming/benchmark/benchmark_tws_state_server.py <port that state server use> <state type> <params if required>
For Python process, it is required to install libraries PySpark required first (including numpy since it's used in the benchmark).
Result will be printed out like following (NOTE: I ran the same benchmark 3 times):
https://gist.github.com/HeartSaVioR/fa4805af4d7a4dc9789c8e3437506be1
Was this patch authored or co-authored using generative AI tooling?
No.