Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid a race condition on Fixed record-length file processing. #714

Merged
merged 1 commit into from
Oct 7, 2024

Conversation

vinodkc
Copy link

@vinodkc vinodkc commented Oct 5, 2024

Spark-cobrix connectors processing fixed-length files can encounter race conditions when used concurrently in multiple threads or multiple notebooks sharing the same Spark driver. This will lead to assertion error

Py4JJavaError: An error occurred while calling o2411.saveAsTable.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 22 in stage 3238.0 failed 4 times, most recent failure: Lost task 22.3 in stage 3238.0 (TID 50850) (x.x.x.x executor 27): java.lang.AssertionError: assertion failed: Byte array does not have correct length
at scala.Predef$.assert(Predef.scala:223)
at org.apache.spark.SparkContext.$anonfun$binaryRecords$2(SparkContext.scala:1603)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage24.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)

This is because, in buildScanForFixedLength(...) method , the object sqlContext.sparkContext.hadoopConfiguration is passed to the spark method sqlContext.sparkContext.binaryRecords(...) inside binaryRecords hadoopConfiguration is getting modified like this conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength). When multiple threads try to modify common hadoopConfiguration, there can be a race condition.

Solution: Make a local copy of hadoopConfiguration for each thread, ensuring that modifications are isolated to that thread.

Issue Ref

…ple threads or multiple notebooks sharing the same spark driver
@vinodkc vinodkc requested a review from yruslan as a code owner October 5, 2024 21:58
@yruslan yruslan merged commit 360ecff into AbsaOSS:master Oct 7, 2024
5 of 6 checks passed
@yruslan yruslan mentioned this pull request Oct 10, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants