#788 Add mainframe file processor that runs in Spark via RDDs #789
Conversation
Walkthrough
Updates the record-processing API to use CobolProcessorContext, refactors CobolProcessor to a fluent builder with a load/save workflow, introduces CobolProcessorImpl and CobolProcessorLoader, adds SerializableRawRecordProcessor, and adds a new SparkCobolProcessor with parallel per-file processing. Tests and README are updated accordingly.
Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
autonumber
participant U as User
participant B as CobolProcessor.Builder
participant L as CobolProcessor.Loader
participant I as CobolProcessorImpl
participant SP as StreamProcessor
participant RP as RawRecordProcessor
U->>B: builder().withCopybookContents(...).withRecordProcessor(...).option(s)...
U->>B: load(inputPath)
B-->>U: Loader
U->>L: save(outputPath)
L->>I: create CobolProcessorImpl(...)
L->>I: process(inputStream, outputStream)(RP)
I->>I: getRecordExtractor(...)
I->>SP: processStream(copybook, options, dataStream, extractor, RP, output)
loop for each record
SP->>RP: processRecord(record, CobolProcessorContext(copybook, options, offset))
RP-->>SP: updatedRecord
SP-->>I: write(updatedRecord)
end
I-->>L: count
L-->>U: count
```
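Read as user code, the workflow in the diagram above might look roughly like the sketch below. Method names are taken from the diagram and the walkthrough; the record type (Array[Byte]), the return type, the copybook file name, and the option key are assumptions for illustration, not the PR's exact API.

```scala
import za.co.absa.cobrix.cobol.processor.{CobolProcessor, CobolProcessorContext, RawRecordProcessor}

val copybookContents: String = scala.io.Source.fromFile("my_copybook.cpy").mkString

// Pass-through processor: returns each raw record unchanged.
val passThrough = new RawRecordProcessor {
  override def processRecord(record: Array[Byte], ctx: CobolProcessorContext): Array[Byte] = record
}

// Fluent builder -> load -> save, mirroring the sequence above.
val recordCount = CobolProcessor.builder()
  .withCopybookContents(copybookContents)
  .withRecordProcessor(passThrough)
  .option("record_format", "F")       // illustrative option only
  .load("/data/input/file1.dat")
  .save("/data/output/file1.dat")
```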
```mermaid
sequenceDiagram
autonumber
participant U as User
participant SB as SparkCobolProcessor.Builder
participant SL as SparkCobolProcessor.Loader
participant SC as SparkCobolProcessor (runtime)
participant T as Thread Pool
participant CP as CobolProcessor (per task)
U->>SB: builder(spark).withCopybookContents(...).withRecordProcessor(...).withMultithreaded(n).option(s)
U->>SB: load(Seq[file...])
SB-->>U: Loader
U->>SL: save(outputPath)
SL->>SC: build runtime processor
SC->>T: submit tasks per file (<= n threads)
loop each file
T->>CP: process file via CobolProcessorImpl.process(...)(SerializableRawRecordProcessor)
end
T-->>SC: counts per file
SC-->>U: total count
```
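Similarly, a hedged sketch of the Spark-side flow from the second diagram. Builder and method names follow the diagram; the processor signature, file paths, and thread count are assumptions. The processor extends SerializableRawRecordProcessor so it can be shipped to executors inside the RDD closure.

```scala
import org.apache.spark.sql.SparkSession
import za.co.absa.cobrix.cobol.processor.{CobolProcessorContext, SerializableRawRecordProcessor}
import za.co.absa.cobrix.spark.cobol.SparkCobolProcessor

val spark = SparkSession.builder().appName("mainframe-file-processing").getOrCreate()

// Serializable so the closure capturing it can be sent to executors.
val processor = new SerializableRawRecordProcessor {
  override def processRecord(record: Array[Byte], ctx: CobolProcessorContext): Array[Byte] = record
}

val totalCount = SparkCobolProcessor.builder(spark)
  .withCopybookContents(copybookContents)   // assumed to be defined as in the previous sketch
  .withRecordProcessor(processor)
  .withMultithreaded(4)                     // up to 4 files processed concurrently per task
  .load(Seq("/data/in/file1.dat", "/data/in/file2.dat"))
  .save("/data/out")
```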
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
Possibly related issues
Possibly related PRs
Poem
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 3
🧹 Nitpick comments (1)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/base/BinaryFileFixture.scala (1)
33-37: Prefer Scala lambda over Java anonymous inner class.
The recursive deletion logic is correct, but the Consumer can be written more idiomatically in Scala.
Apply this diff to use a Scala lambda:
```diff
- Files.walk(tmpPath)
-   .sorted(Comparator.reverseOrder())
-   .forEach(new Consumer[Path] {
-     override def accept(f: Path): Unit = Files.delete(f)
-   })
+ Files.walk(tmpPath)
+   .sorted(Comparator.reverseOrder())
+   .forEach((f: Path) => Files.delete(f))
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
- README.md (2 hunks)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (4 hunks)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorContext.scala (1 hunks)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/RawRecordProcessor.scala (1 hunks)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/SerializableRawRecordProcessor.scala (1 hunks)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorImpl.scala (1 hunks)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (2 hunks)
- cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/base/BinaryFileFixture.scala (1 hunks)
- cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala (3 hunks)
- cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorImplSuite.scala (1 hunks)
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala (1 hunks)
- spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala (1 hunks)
- spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/fixtures/BinaryFileFixture.scala (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (10)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorImplSuite.scala (7)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (8): processor (127-129), processor (131-135), processor (137-137), CobolProcessor (48-193), builder (190-192), withCopybookContents (93-96), build (54-63), options (122-125)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorImpl.scala (1): processor (63-76)
- cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/mock/ByteStreamMock.scala (1): ByteStreamMock (21-55)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/recordformats/RecordFormat.scala (2): RecordFormat (21-41), VariableLength (24-24)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedRecordLengthRawRecordExtractor.scala (1): FixedRecordLengthRawRecordExtractor (19-56)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/TextFullRecordExtractor.scala (1): TextFullRecordExtractor (32-196)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala (1): ReaderParameters (80-135)

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/RawRecordProcessor.scala (2)
- cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala (2): processRecord (42-44), processRecord (70-72)
- spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala (2): processRecord (32-34), processRecord (91-93)

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorContext.scala (1)
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala (1): options (118-121)

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/fixtures/BinaryFileFixture.scala (1)
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (1): FileUtils (35-235)

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (4)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorImpl.scala (3): processor (63-76), CobolProcessorImpl (41-77), process (45-61)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (1): CobolParametersParser (39-1008)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/FSStream.scala (1): FSStream (21-62)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala (1): fromReaderParameters (75-150)

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala (2)
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala (6): SparkCobolProcessor (45-242), builder (150-152), load (54-59), load (61-79), withCopybookContents (81-84), save (130-147)
- spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/fixtures/BinaryFileFixture.scala (3): withTempDirectory (71-78), writeBinaryFile (116-120), readBinaryFile (122-124)

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorImpl.scala (4)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (5): processor (127-129), processor (131-135), processor (137-137), CobolProcessor (48-193), options (122-125)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala (1): ReaderParameters (80-135)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1): recordExtractor (54-89)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (2): StreamProcessor (26-71), processStream (39-70)

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala (4)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (12): processor (127-129), processor (131-135), processor (137-137), CobolProcessor (48-193), load (65-91), withCopybookContents (93-96), withRecordProcessor (98-101), option (110-114), options (122-125), save (146-187), builder (190-192), build (54-63)
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala (1): SerializableConfiguration (38-56)
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1): FileStreamer (37-135)
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (1): FileUtils (35-235)

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala (3)
- cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/mock/ByteStreamMock.scala (1): ByteStreamMock (21-55)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (9): builder (190-192), CobolProcessor (48-193), withCopybookContents (93-96), processor (127-129), processor (131-135), processor (137-137), withRecordProcessor (98-101), load (65-91), save (146-187)
- cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/base/BinaryFileFixture.scala (3): withTempDirectory (27-38), writeBinaryFile (40-42), readBinaryFile (44-46)

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (3)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorContext.scala (1): CobolProcessorContext (21-23)
- cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala (2): processRecord (42-44), processRecord (70-72)
- spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala (2): processRecord (32-34), processRecord (91-93)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: Spark 3.5.5 on Scala 2.12.20
- GitHub Check: test (2.12.20, 2.12, 3.3.4, 0, 80, 20)
- GitHub Check: Spark 2.4.8 on Scala 2.11.12
- GitHub Check: Spark 3.5.5 on Scala 2.13.16
- GitHub Check: Spark 3.4.4 on Scala 2.12.20
🔇 Additional comments (6)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorContext.scala (1)
21-23: LGTM! Clean context holder for per-record processing.
The case class appropriately encapsulates copybook, options, and offset into a single context object, simplifying method signatures and making the API more maintainable.
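For orientation, a hedged sketch of what such a three-field context could look like; the field names come from this comment and the diagrams, while the exact types are assumptions (the real definition lives in CobolProcessorContext.scala, lines 21-23).

```scala
import za.co.absa.cobrix.cobol.parser.Copybook

// Assumed shape only, for illustration:
case class CobolProcessorContext(copybook: Copybook,            // parsed copybook of the current file
                                 options: Map[String, String],  // reader options in effect
                                 offset: Long)                  // byte offset of the current record
```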
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (1)
51-53: LGTM! Context-based processing simplifies the API.
The context object is correctly constructed with the current copybook, options, and offset, then passed to the per-record processor. This aligns with the new RawRecordProcessor signature and improves API maintainability.
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/SerializableRawRecordProcessor.scala (1)
19-29: LGTM! Well-documented marker trait for Spark serialization.
The trait appropriately extends Serializable to enable distributed processing in Spark, and the documentation clearly explains the usage patterns for standalone vs. Spark applications.
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorImplSuite.scala (1)
26-79: LGTM! Comprehensive test coverage for record extraction.
The test suite thoroughly validates getRecordExtractor behavior for fixed-length, variable-length, and error cases. The use of ByteStreamMock provides good test isolation.
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala (1)
25-108: LGTM! Comprehensive validation and end-to-end test coverage.
The test suite thoroughly validates builder constraints and includes an end-to-end test that exercises the full Spark-based processing pipeline from input to output verification.
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/RawRecordProcessor.scala (1)
24-24: New processRecord signature applied; ensure documentation is updated.
All code and tests now use processRecord(record, ctx); no occurrences of the old four-parameter signature remain. Verify that release notes or the migration guide document this breaking API change.
Resolved review comments:
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorImpl.scala
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala
- ...k-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/fixtures/BinaryFileFixture.scala
Actionable comments posted: 3
🧹 Nitpick comments (3)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala (3)
163-163: Set numSlices to bound the task count and align with the grouping.
Specify the partition count so there is one partition per file group.
```diff
- val rdd = spark.sparkContext.parallelize(groupedFiles)
+ val rdd = spark.sparkContext.parallelize(groupedFiles, groupedFiles.size)
```
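As a standalone illustration of the point (plain Spark API, not project code), passing numSlices pins the partition count so that each file group maps to exactly one task. The files and filesPerGroup values below are placeholders, and a SparkSession named spark is assumed to be in scope.

```scala
val files: Seq[String] = Seq("a.dat", "b.dat", "c.dat", "d.dat")
val filesPerGroup = 2

// Without numSlices, parallelize falls back to spark.default.parallelism,
// which may spread the groups over a different number of tasks.
val groups: Seq[Seq[String]] = files.grouped(filesPerGroup).toSeq
val rdd = spark.sparkContext.parallelize(groups, groups.size)
assert(rdd.getNumPartitions == groups.size)  // exactly one partition per group
```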
139-141: Remove the unused parameter copybookContents from the executor path.
copybookContents is passed through but never used in executor functions. Drop it to simplify the API.

```diff
- getFileProcessorRdd(listOfFiles, outputPath, copybookContents, cobolProcessor, rawRecordProcessor, sconf, numberOfThreads)
+ getFileProcessorRdd(listOfFiles, outputPath, cobolProcessor, rawRecordProcessor, sconf, numberOfThreads)
```

```diff
- private def getFileProcessorRdd(listOfFiles: Seq[String],
-                                 outputPath: String,
-                                 copybookContents: String,
-                                 cobolProcessor: CobolProcessor,
+ private def getFileProcessorRdd(listOfFiles: Seq[String],
+                                 outputPath: String,
+                                 cobolProcessor: CobolProcessor,
                                  rawRecordProcessor: SerializableRawRecordProcessor,
                                  sconf: SerializableConfiguration,
                                  numberOfThreads: Int
                                 )(implicit spark: SparkSession): RDD[Long] = {
    val groupedFiles = listOfFiles.grouped(numberOfThreads).toSeq
    val rdd = spark.sparkContext.parallelize(groupedFiles)
-   rdd.map(group => {
-     processListOfFiles(group, outputPath, copybookContents, cobolProcessor, rawRecordProcessor, sconf, numberOfThreads)
+   rdd.map(group => {
+     processListOfFiles(group, outputPath, cobolProcessor, rawRecordProcessor, sconf, numberOfThreads)
    })
```

```diff
- private def processListOfFiles(listOfFiles: Seq[String],
-                                outputPath: String,
-                                copybookContents: String,
-                                cobolProcessor: CobolProcessor,
+ private def processListOfFiles(listOfFiles: Seq[String],
+                                outputPath: String,
+                                cobolProcessor: CobolProcessor,
                                 rawRecordProcessor: SerializableRawRecordProcessor,
                                 sconf: SerializableConfiguration,
                                 numberOfThreads: Int
                                ): Long = {
```

Also applies to: 154-161, 165-166, 169-176
118-121: Trim keys in bulk options for consistency with the single-key setter.
Mirror the option() behavior to avoid keys with leading/trailing spaces.

```diff
- caseInsensitiveOptions ++= options.map(kv => (kv._1.toLowerCase(), kv._2))
+ caseInsensitiveOptions ++= options.map { case (k, v) => (k.trim.toLowerCase, v) }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala (1 hunks)
- spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/fixtures/BinaryFileFixture.scala (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/fixtures/BinaryFileFixture.scala
🧰 Additional context used
🧬 Code graph analysis (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala (5)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (12): processor (127-129), processor (131-135), processor (137-137), CobolProcessor (48-193), load (65-91), withCopybookContents (93-96), withRecordProcessor (98-101), option (110-114), options (122-125), save (146-187), builder (190-192), build (54-63)
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala (1): SerializableConfiguration (38-56)
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1): FileStreamer (37-135)
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (1): FileUtils (35-235)
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolBuilder.scala (1): copybookContents (23-25)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: test (2.12.20, 2.12, 3.3.4, 0, 80, 20)
- GitHub Check: Spark 3.5.5 on Scala 2.13.16
- GitHub Check: Spark 2.4.8 on Scala 2.11.12
- GitHub Check: Spark 3.5.5 on Scala 2.12.20
- GitHub Check: Spark 3.4.4 on Scala 2.12.20
🔇 Additional comments (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala (1)
130-144: CobolProcessorImpl is already Serializable: it extends CobolProcessor with Serializable, so capturing it in the RDD closure won't trigger a NotSerializableException.
Likely an incorrect or invalid review comment.
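A generic illustration (not project code) of why this matters: anything captured by an RDD closure must be serializable, otherwise Spark fails the job with a "Task not serializable" error. A SparkSession named spark is assumed to be in scope.

```scala
// Extending Serializable lets Spark ship the captured instance to executors.
class Multiplier(val factor: Int) extends Serializable

val times3 = new Multiplier(3)
val result = spark.sparkContext.parallelize(1 to 5).map(_ * times3.factor).collect()
// result: Array(3, 6, 9, 12, 15)
```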
```scala
val fileName = new Path(inputFIle).getName
val outputPathFileName = new Path(outputPath, fileName).toString
```
Avoid output overwrites: base-name collisions will clobber files
Using only getName for output causes collisions (same filename in different dirs) and data loss, especially with overwrite=true. Generate a unique target (e.g., encode relative path or append a short hash of the full input path) and use it for both the preview path and outputFile.
Example:
val safeName = s"$fileName.${Integer.toHexString(inputFIle.hashCode)}"
val outputPathFileName = new Path(outputPath, safeName).toString
// ...
val outputFile = new Path(outputPath, safeName)Also applies to: 190-191
🤖 Prompt for AI Agents
In spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala around lines 181-183 (and similarly 190-191), the code currently uses only the input file base name, which can collide across different directories; change this to a stable, unique target name by combining the base name with a short deterministic fingerprint of the full input path (for example, append Integer.toHexString(inputFile.hashCode) or a truncated SHA-1 of the full path) and use that single safeName when constructing both the preview path and the outputFile Path so files from different source dirs don't clobber each other; ensure the generated name is filesystem-safe (replace/encode path separators) and update all Path constructors at the referenced lines to use the new safeName.
```scala
Future {
  val hadoopConfig = sconf.value
  val inputFs = new Path(inputFIle).getFileSystem(hadoopConfig)
  val ifs = new FileStreamer(inputFIle, inputFs)
  val outputFile = new Path(outputPath, fileName)
  val outputFs = outputFile.getFileSystem(hadoopConfig)
  val ofs = new BufferedOutputStream(outputFs.create(outputFile, true))

  var originalException: Throwable = null

  val recordCount = try {
    cobolProcessor.process(ifs, ofs)(rawRecordProcessor)
  } catch {
    case ex: Throwable =>
      originalException = ex
      0L
  } finally {
    // Ugly code to ensure no exceptions escape unnoticed.
    try {
      ifs.close()
    } catch {
      case e: Throwable =>
        if (originalException != null) {
          originalException.addSuppressed(e)
        } else {
          originalException = e
        }
    }

    try {
      ofs.close()
    } catch {
      case e: Throwable =>
        if (originalException != null) {
          originalException.addSuppressed(e)
        } else {
          originalException = e
        }
    }
  }

  if (originalException != null) throw originalException

  log.info(s"Writing to $outputFile succeeded!")
  recordCount
}
```
Fix resource leak on stream acquisition failures; create streams inside try/finally and null-check closes
If outputFs.create(...) (or earlier) throws, ifs is never closed because both streams are created before the try. Move acquisition into the try block, keep ifs/ofs as vars, close them with null checks in finally, and ensure the output dir exists.
```diff
Future {
- val hadoopConfig = sconf.value
- val inputFs = new Path(inputFIle).getFileSystem(hadoopConfig)
- val ifs = new FileStreamer(inputFIle, inputFs)
- val outputFile = new Path(outputPath, fileName)
- val outputFs = outputFile.getFileSystem(hadoopConfig)
- val ofs = new BufferedOutputStream(outputFs.create(outputFile, true))
-
- var originalException: Throwable = null
-
- val recordCount = try {
- cobolProcessor.process(ifs, ofs)(rawRecordProcessor)
- } catch {
- case ex: Throwable =>
- originalException = ex
- 0L
- } finally {
- // Ugly code to ensure no exceptions escape unnoticed.
- try {
- ifs.close()
- } catch {
- case e: Throwable =>
- if (originalException != null) {
- originalException.addSuppressed(e)
- } else {
- originalException = e
- }
- }
-
- try {
- ofs.close()
- } catch {
- case e: Throwable =>
- if (originalException != null) {
- originalException.addSuppressed(e)
- } else {
- originalException = e
- }
- }
- }
+ val hadoopConfig = sconf.value
+ val outputFile = new Path(outputPath, fileName)
+ val outputFs = outputFile.getFileSystem(hadoopConfig)
+
+ var ifs: FileStreamer = null
+ var ofs: BufferedOutputStream = null
+ var originalException: Throwable = null
+
+ val recordCount =
+ try {
+ val inputFs = new Path(inputFIle).getFileSystem(hadoopConfig)
+ ifs = new FileStreamer(inputFIle, inputFs)
+ // Ensure the output directory exists
+ outputFs.mkdirs(new Path(outputPath))
+ ofs = new BufferedOutputStream(outputFs.create(outputFile, true))
+ cobolProcessor.process(ifs, ofs)(rawRecordProcessor)
+ } catch {
+ case ex: Throwable =>
+ originalException = ex
+ 0L
+ } finally {
+ // Ensure no exceptions escape unnoticed.
+ try {
+ if (ifs != null) ifs.close()
+ } catch {
+ case e: Throwable =>
+ if (originalException != null) originalException.addSuppressed(e) else originalException = e
+ }
+ try {
+ if (ofs != null) ofs.close()
+ } catch {
+ case e: Throwable =>
+ if (originalException != null) originalException.addSuppressed(e) else originalException = e
+ }
+ }
if (originalException != null) throw originalException
log.info(s"Writing to $outputFile succeeded!")
recordCount
}
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```scala
Future {
  val hadoopConfig = sconf.value
  val outputFile = new Path(outputPath, fileName)
  val outputFs = outputFile.getFileSystem(hadoopConfig)

  var ifs: FileStreamer = null
  var ofs: BufferedOutputStream = null
  var originalException: Throwable = null

  val recordCount =
    try {
      val inputFs = new Path(inputFIle).getFileSystem(hadoopConfig)
      ifs = new FileStreamer(inputFIle, inputFs)
      // Ensure the output directory exists
      outputFs.mkdirs(new Path(outputPath))
      ofs = new BufferedOutputStream(outputFs.create(outputFile, true))
      cobolProcessor.process(ifs, ofs)(rawRecordProcessor)
    } catch {
      case ex: Throwable =>
        originalException = ex
        0L
    } finally {
      // Ensure no exceptions escape unnoticed.
      try {
        if (ifs != null) ifs.close()
      } catch {
        case e: Throwable =>
          if (originalException != null) originalException.addSuppressed(e)
          else originalException = e
      }
      try {
        if (ofs != null) ofs.close()
      } catch {
        case e: Throwable =>
          if (originalException != null) originalException.addSuppressed(e)
          else originalException = e
      }
    }

  if (originalException != null) throw originalException

  log.info(s"Writing to $outputFile succeeded!")
  recordCount
}
```
🤖 Prompt for AI Agents
In spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala around lines 186 to 231, the code currently creates both input and output streams before entering the try block, so if outputFs.create(...) (or earlier) throws, the input stream (ifs) is never closed; refactor by declaring ifs and ofs as vars initialized to null, move the FileStreamer and output stream creation inside the try block, ensure the output directory exists (create parent dirs on outputFs) before creating the output stream, and in the finally block close ifs and ofs only if they are non-null while preserving and suppressing any original exception.
Closes #788
Summary by CodeRabbit
New Features
Refactor
Documentation