
EOFException when reading large file (~1.5 GB) #442

Closed
A-SMDev opened this issue Nov 28, 2021 · 9 comments

A-SMDev commented Nov 28, 2021

Describe the bug

Hello,
I have a problem when I start processing a large file, but not with a smaller file.

Thanks in advance.

To Reproduce

Env = Databricks 9.1 LTS (includes Apache Spark 3.1.2, Scala 2.12)

val df = spark
.read
.format("cobol")
.option("copybook", copybookPath)
.option("encoding", "ascii")
.option("is_text", "true")
.option("schema_retention_policy", "collapse_root")
.option("drop_value_fillers", "false")
.load(dataPath)
df.count()

or with
.option("record_format", "D")

Error Log

Job aborted due to stage failure.
Caused by: EOFException: Attempted to seek or read past the end of the file
at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.AbfsInputStream.seek(AbfsInputStream.java:365)
at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:62)
at za.co.absa.cobrix.spark.cobol.source.streaming.BufferedFSDataInputStream.<init>(BufferedFSDataInputStream.scala:30)
at za.co.absa.cobrix.spark.cobol.source.streaming.FileStreamer.<init>(FileStreamer.scala:44)
at za.co.absa.cobrix.spark.cobol.source.scanners.CobolScanners$.$anonfun$buildScanForVarLenIndex$2(CobolScanners.scala:52)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:757)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:81)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:81)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:91)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:813)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1605)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:816)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2828)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2775)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2769)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2769)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1305)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1305)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1305)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3036)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2977)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2965)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.io.EOFException: Attempted to seek or read past the end of the file
at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.AbfsInputStream.seek(AbfsInputStream.java:365)
at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:62)
at za.co.absa.cobrix.spark.cobol.source.streaming.BufferedFSDataInputStream.<init>(BufferedFSDataInputStream.scala:30)
at za.co.absa.cobrix.spark.cobol.source.streaming.FileStreamer.<init>(FileStreamer.scala:44)
at za.co.absa.cobrix.spark.cobol.source.scanners.CobolScanners$.$anonfun$buildScanForVarLenIndex$2(CobolScanners.scala:52)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:757)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:81)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:81)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:91)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:813)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1605)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:816)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

A-SMDev added the bug (Something isn't working) label on Nov 28, 2021
yruslan (Collaborator) commented Nov 29, 2021

Hi, this is a very interesting case. The way data files are processed is:

  1. Cobrix creates an index for each input file, containing the starting file offset of each index entry.
  2. The records of each index block are then processed in parallel.

Not sure how it happens, but it looks like the file appears longer at the time the index is generated, so when it comes to the actual processing the worker cannot seek to an index offset because it is greater than the length of the file.
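
To make the failure mode concrete, here is a minimal, hypothetical sketch of that two-step flow (this is not Cobrix's actual code; IndexEntry, buildIndex and processBlock are made-up names, and plain java.io stands in for the Hadoop FileSystem API):

// Conceptual sketch only, not Cobrix internals.
import java.io.RandomAccessFile

final case class IndexEntry(startOffset: Long, length: Long)

// Step 1: build an index of starting offsets by splitting the file into fixed-size blocks.
def buildIndex(path: String, blockSize: Long): Seq[IndexEntry] = {
  val fileLen = new java.io.File(path).length()
  (0L until fileLen by blockSize).map { off =>
    IndexEntry(off, math.min(blockSize, fileLen - off))
  }
}

// Step 2: process each index block independently (in Spark this would run per partition).
// If the file is shorter at processing time than it was when the index was built, reading an
// out-of-range block fails: readFully below throws java.io.EOFException (the Hadoop ABFS
// stream in the log above fails already at seek).
def processBlock(path: String, entry: IndexEntry): Long = {
  val in = new RandomAccessFile(path, "r")
  try {
    in.seek(entry.startOffset)          // RandomAccessFile tolerates this; AbfsInputStream.seek does not
    val buf = new Array[Byte](entry.length.toInt)
    in.readFully(buf)                   // throws EOFException if the block lies past the end of the file
    buf.count(_ == '\n'.toByte).toLong  // stand-in for parsing the records in this block
  } finally in.close()
}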

As a workaround, try using an older version of Cobrix, say 2.2.3, with the following options:

val df = spark
  .read
  .format("cobol")
  .option("copybook", copybookPath)
  .option("encoding", "ascii")
  .option("is_text", "true")
  .option("is_record_sequence", "false")
  .option("schema_retention_policy", "collapse_root")
  .option("drop_value_fillers", "false")
  .load(dataPath)

Let me know if it works. Older versions of Cobrix used native Spark tools for splitting text files. This is no longer used, in favor of the indexing approach, so switching to 2.2.3 or 2.2.0 might work, but not anything older, since older versions support only HDFS.

In order to actually reproduce and fix the bug, please answer the following questions:

  1. How many files and what's the actual size of dataPath?
  2. Which storage type is used ('hdfs://', 's3://', ...)?
  3. You are using Databricks on Azure, right?

A-SMDev (Author) commented Nov 29, 2021

Hi, thanks for your quick response.

I will test with these recommendations, and here are my answers to your questions:

  1. How many files and what's the actual size of dataPath? For the moment 1, and its size is 1.15 GB; in the long term it will be ~100 files.
  2. Which storage type is used ('hdfs://', 's3://', ...)? It's ADFS, but we use a file mount (so dbfs:/).
  3. You are using Databricks on Azure, right? Yes, we use Azure.

A-SMDev (Author) commented Nov 29, 2021

I just did the test: I no longer get the error, but I have no output data.

Output

df: org.apache.spark.sql.DataFrame = [ENT......]
res0: Long = 0

Warning when running code

ANTLR Tool version 4.7.2 used for code generation does not match the current runtime version 4.8
ANTLR Runtime version 4.7.2 used for parser compilation does not match the current runtime version 4.8
(the two warnings above are repeated four times)
copybookPath: String = dbfs:/mnt/....

yruslan (Collaborator) commented Nov 29, 2021

Great, thanks for the quick response! I will try to reproduce on a 2 GB text file on Databricks on AWS. Hopefully I will hit the same issue.

yruslan (Collaborator) commented Nov 29, 2021

df: org.apache.spark.sql.DataFrame = [ENT......]
res0: Long = 0

This is VERY strange

yruslan (Collaborator) commented Nov 29, 2021

I was able to reproduce an error on DBFS, but not the same as the one you have. Probably because I'm on AWS and you are on Azure. Will look into it further.

A workaround is to disable indexing:

val df = spark
  .read
  .format("cobol")
  .option("copybook", copybookPath)
  .option("record_format", "D")
  .option("enable_indexes", "false")
  .option("drop_value_fillers", "false")
  .load(dataPath)

The processing will be slower. We will try to fix the issue on dbfs/aws; hopefully the fix will also work on dbfs/azure.

A-SMDev (Author) commented Nov 29, 2021

Thank you for the response.
Indeed it works, but it's very slow.

[screenshot attached]

I'll wait for the fix!

yruslan (Collaborator) commented Dec 2, 2021

This should be fixed in 2.4.5, which should be available in Maven Central in a couple of hours.

Please let me know if this fixes the issue on your end.
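
For reference, a hedged example of how 2.4.5 could be pulled in once it is published; the coordinates below are the usual spark-cobol artifact for Scala 2.12 (matching Databricks 9.1 LTS) and should be verified on Maven Central:

// sbt (coordinates assumed; check Maven Central for the exact artifact):
libraryDependencies += "za.co.absa.cobrix" %% "spark-cobol" % "2.4.5"

// or when attaching the library to a Databricks cluster / using spark-submit:
// --packages za.co.absa.cobrix:spark-cobol_2.12:2.4.5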

A-SMDev (Author) commented Dec 6, 2021

Hi,
Thank you for your help. Indeed, it works.
