
Conversation

@yruslan (Collaborator) commented Oct 10, 2025

Closes #788

Summary by CodeRabbit

  • New Features

    • Fluent builder-based CLI for COBOL processing with withCopybookContents(...).load(...).save(...) flow.
    • New Spark-enabled processor with builder API and optional multithreaded parallel file processing.
    • Serializable record-processor support for distributed/Spark usage.
  • Refactor

    • Simplified per-record processing to use a unified processing context and streamlined builder entry point.
  • Documentation

    • README updated with builder examples and an experimental Spark processing section.
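As a quick orientation, here is a minimal sketch of the new flow. The method names come from this PR's summary and walkthrough; the copybook, option key, context field names, and paths are illustrative assumptions:

```scala
import za.co.absa.cobrix.cobol.processor.{CobolProcessor, CobolProcessorContext, RawRecordProcessor}

// Illustrative copybook for a fixed-length 10-byte record
val copybook =
  """       01  RECORD.
    |           05  FIELD1    PIC X(10).
    |""".stripMargin

// The new signature: copybook, options, and file offset arrive via the context
val identityProcessor = new RawRecordProcessor {
  override def processRecord(record: Array[Byte], ctx: CobolProcessorContext): Array[Byte] = {
    // ctx carries the copybook, options, and offset (field names assumed
    // from the sequence diagram below) for transformations that need them
    record
  }
}

// Fluent builder: load(...) returns a loader; save(...) runs the processing
// and returns the number of processed records
val recordCount = CobolProcessor.builder()
  .withCopybookContents(copybook)
  .withRecordProcessor(identityProcessor)
  .option("record_format", "F") // option key shown as an assumption
  .load("/path/to/input.dat")
  .save("/path/to/output.dat")
```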

coderabbitai bot (Contributor) commented Oct 10, 2025

Walkthrough

Updates the record-processing API to use CobolProcessorContext, refactors CobolProcessor to a fluent builder with load/save workflow, introduces CobolProcessorImpl and CobolProcessorLoader, adds SerializableRawRecordProcessor, and adds a new SparkCobolProcessor with parallel per-file processing. Tests and README are updated accordingly.

Changes

• Public API: context + builder
  Files: cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/RawRecordProcessor.scala, cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorContext.scala, cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala, cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala
  Changed the RawRecordProcessor.processRecord signature to (record: Array[Byte], ctx: CobolProcessorContext) and added CobolProcessorContext. CobolProcessor now exposes a no-arg builder with fluent methods (withCopybookContents, withRecordProcessor, option(s), load, save). StreamProcessor constructs and passes a CobolProcessorContext per record.

• Processor implementation and loader
  Files: cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorImpl.scala, cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala
  Added CobolProcessorImpl, which implements the processing logic and getRecordExtractor. Introduced CobolProcessorLoader to encapsulate file-based load/save flows and resource/exception handling; builder.load(...) returns a loader whose save(...) produces processed record counts.

• Serializable processor for Spark
  Files: cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/SerializableRawRecordProcessor.scala
  Added trait SerializableRawRecordProcessor, which extends RawRecordProcessor with Serializable for use in distributed/Spark contexts.

• Stream orchestration changes
  Files: cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala
  StreamProcessor now invokes recordProcessor.processRecord(record, CobolProcessorContext(...)), replacing the prior multi-parameter call; header/footer handling is unchanged.

• Tests & test fixtures (parser)
  Files: cobol-parser/src/test/.../CobolProcessorBuilderSuite.scala, cobol-parser/src/test/.../impl/CobolProcessorImplSuite.scala, cobol-parser/src/test/.../base/BinaryFileFixture.scala
  Tests updated to use builder.withCopybookContents and the new RawRecordProcessor signature; added CobolProcessorImplSuite covering extractor behavior; new BinaryFileFixture provides temporary-file I/O helpers.

• Spark processing feature & tests
  Files: spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala, spark-cobol/src/test/.../SparkCobolProcessorSuite.scala, spark-cobol/src/test/.../source/fixtures/BinaryFileFixture.scala
  New SparkCobolProcessor with a builder (withCopybookContents, withRecordProcessor, withMultithreaded, option(s), load) and a loader whose save parallelizes per-file processing using a thread pool and Futures, aggregating the counts. Added a Spark test suite and binary read/write helpers.

• Docs
  Files: README.md
  Updated examples and documentation to show the builder-based API, the new RawRecordProcessor signature, and the experimental Spark processing path.
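Under the same assumptions as the earlier sketch, the experimental Spark path would look roughly like this; the thread count, paths, and option key are illustrative:

```scala
import org.apache.spark.sql.SparkSession
import za.co.absa.cobrix.cobol.processor.{CobolProcessorContext, SerializableRawRecordProcessor}
import za.co.absa.cobrix.spark.cobol.SparkCobolProcessor

// Defined at top level (not anonymous) so Spark can serialize it to executors
object IdentityProcessor extends SerializableRawRecordProcessor {
  override def processRecord(record: Array[Byte], ctx: CobolProcessorContext): Array[Byte] = record
}

val spark = SparkSession.builder().appName("cobol-processing").getOrCreate()

// Each task processes up to 4 files concurrently via the internal thread pool;
// save(...) aggregates the per-file record counts into a total
val totalCount = SparkCobolProcessor.builder(spark)
  .withCopybookContents(copybook) // same copybook as in the earlier sketch
  .withRecordProcessor(IdentityProcessor)
  .withMultithreaded(4)
  .option("record_format", "F")
  .load(Seq("/data/in/file1.dat", "/data/in/file2.dat"))
  .save("/data/out")
```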

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant U as User
  participant B as CobolProcessor.Builder
  participant L as CobolProcessor.Loader
  participant I as CobolProcessorImpl
  participant SP as StreamProcessor
  participant RP as RawRecordProcessor

  U->>B: builder().withCopybookContents(...).withRecordProcessor(...).option(s)...
  U->>B: load(inputPath)
  B-->>U: Loader
  U->>L: save(outputPath)
  L->>I: create CobolProcessorImpl(...)
  L->>I: process(inputStream, outputStream)(RP)
  I->>I: getRecordExtractor(...)
  I->>SP: processStream(copybook, options, dataStream, extractor, RP, output)
  loop for each record
    SP->>RP: processRecord(record, CobolProcessorContext(copybook, options, offset))
    RP-->>SP: updatedRecord
    SP-->>I: write(updatedRecord)
  end
  I-->>L: count
  L-->>U: count

sequenceDiagram
  autonumber
  participant U as User
  participant SB as SparkCobolProcessor.Builder
  participant SL as SparkCobolProcessor.Loader
  participant SC as SparkCobolProcessor (runtime)
  participant T as Thread Pool
  participant CP as CobolProcessor (per task)

  U->>SB: builder(spark).withCopybookContents(...).withRecordProcessor(...).withMultithreaded(n).option(s)
  U->>SB: load(Seq[file...])
  SB-->>U: Loader
  U->>SL: save(outputPath)
  SL->>SC: build runtime processor
  SC->>T: submit tasks per file (<= n threads)
  loop each file
    T->>CP: process file via CobolProcessorImpl.process(...)(SerializableRawRecordProcessor)
  end
  T-->>SC: counts per file
  SC-->>U: total count

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes


Poem

I nibble bytes and hop through streams,
Builders bloom like carrot dreams.
Context tucked beneath my paw,
Threads hum softly — processing law.
Counted records, tidy night, I thump — delight! 🐇✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)

• Out of Scope Changes Check: ⚠️ Warning
  Explanation: The pull request introduces extensive public-API and builder-pattern changes in the core cobol-parser module (a new RawRecordProcessor signature, CobolProcessorContext, CobolProcessorLoader, and a fluent builder overhaul) that are not described in the linked issue, which focused solely on adding a Spark executor builder in the spark-cobol module.
  Resolution: Split the core API refactoring into a separate pull request, or clearly document how these foundational changes enable the spark-cobol implementation, to keep the scope aligned with the original issue.

✅ Passed checks (4 passed)

• Description Check: ✅ Passed. Check skipped because CodeRabbit's high-level summary is enabled.
• Title Check: ✅ Passed. The title succinctly describes the primary change (adding a Spark-based mainframe file processor that uses RDDs) and directly relates to the core new functionality introduced in this changeset, clearly conveying that a Spark executor solution has been added without extraneous detail. Although it includes the issue number, the wording remains specific and focused on the main feature.
• Linked Issues Check: ✅ Passed. The pull request implements the requested Spark-based builder in the spark-cobol module, adds parallel execution across input files via a fixed thread pool and Futures, and includes comprehensive validation and functional tests to verify copybook and processor configuration, satisfying the objectives of issue #788.
• Docstring Coverage: ✅ Passed. No functions found in the changes; the docstring coverage check was skipped.

github-actions bot commented Oct 10, 2025

JaCoCo code coverage report - 'cobol-parser'

Overall project: 91.75% (-0.44%) 🍏
Files changed: 54.11% 🍏

File coverage:
• CobolProcessorContext.scala: 100%
• CobolProcessorImpl.scala: 94.85% (-5.15%) 🍏
• StreamProcessor.scala: 80.72% 🍏
• CobolProcessor.scala: 69.06% (-27.62%) 🍏

github-actions bot commented Oct 10, 2025

JaCoCo code coverage report - 'spark-cobol'

File coverage: 84.74% 🍏
• SparkCobolProcessor.scala: 84.74% 🍏
Total project coverage: 80.16% 🍏

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 3

🧹 Nitpick comments (1)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/base/BinaryFileFixture.scala (1)

33-37: Prefer Scala lambda over Java anonymous inner class.

The recursive deletion logic is correct, but the Consumer can be written more idiomatically in Scala.

Apply this diff to use a Scala lambda:

-    Files.walk(tmpPath)
-      .sorted(Comparator.reverseOrder())
-      .forEach(new Consumer[Path] {
-        override def accept(f: Path): Unit = Files.delete(f)
-      })
+    Files.walk(tmpPath)
+      .sorted(Comparator.reverseOrder())
+      .forEach((f: Path) => Files.delete(f))
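One caveat with this suggestion, assuming the build matrix shown below (which still includes Scala 2.11.12) applies: SAM conversion for Java functional interfaces such as java.util.function.Consumer is only enabled by default from Scala 2.12 onward (2.11 requires -Xexperimental), so the anonymous-class form may be the safer cross-version choice.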
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9873a3d and 104a8d2.

📒 Files selected for processing (13)
  • README.md (2 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (4 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorContext.scala (1 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/RawRecordProcessor.scala (1 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/SerializableRawRecordProcessor.scala (1 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorImpl.scala (1 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (2 hunks)
  • cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/base/BinaryFileFixture.scala (1 hunks)
  • cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala (3 hunks)
  • cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorImplSuite.scala (1 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala (1 hunks)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala (1 hunks)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/fixtures/BinaryFileFixture.scala (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (10)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorImplSuite.scala (7)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (8)
  • processor (127-129)
  • processor (131-135)
  • processor (137-137)
  • CobolProcessor (48-193)
  • builder (190-192)
  • withCopybookContents (93-96)
  • build (54-63)
  • options (122-125)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorImpl.scala (1)
  • processor (63-76)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/mock/ByteStreamMock.scala (1)
  • ByteStreamMock (21-55)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/recordformats/RecordFormat.scala (2)
  • RecordFormat (21-41)
  • VariableLength (24-24)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedRecordLengthRawRecordExtractor.scala (1)
  • FixedRecordLengthRawRecordExtractor (19-56)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/TextFullRecordExtractor.scala (1)
  • TextFullRecordExtractor (32-196)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala (1)
  • ReaderParameters (80-135)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/RawRecordProcessor.scala (2)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala (2)
  • processRecord (42-44)
  • processRecord (70-72)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala (2)
  • processRecord (32-34)
  • processRecord (91-93)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorContext.scala (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala (1)
  • options (118-121)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/fixtures/BinaryFileFixture.scala (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (1)
  • FileUtils (35-235)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (4)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorImpl.scala (3)
  • processor (63-76)
  • CobolProcessorImpl (41-77)
  • process (45-61)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (1)
  • CobolParametersParser (39-1008)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/FSStream.scala (1)
  • FSStream (21-62)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala (1)
  • fromReaderParameters (75-150)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala (2)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala (6)
  • SparkCobolProcessor (45-242)
  • builder (150-152)
  • load (54-59)
  • load (61-79)
  • withCopybookContents (81-84)
  • save (130-147)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/fixtures/BinaryFileFixture.scala (3)
  • withTempDirectory (71-78)
  • writeBinaryFile (116-120)
  • readBinaryFile (122-124)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorImpl.scala (4)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (5)
  • processor (127-129)
  • processor (131-135)
  • processor (137-137)
  • CobolProcessor (48-193)
  • options (122-125)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala (1)
  • ReaderParameters (80-135)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)
  • recordExtractor (54-89)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (2)
  • StreamProcessor (26-71)
  • processStream (39-70)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala (4)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (12)
  • processor (127-129)
  • processor (131-135)
  • processor (137-137)
  • CobolProcessor (48-193)
  • load (65-91)
  • withCopybookContents (93-96)
  • withRecordProcessor (98-101)
  • option (110-114)
  • options (122-125)
  • save (146-187)
  • builder (190-192)
  • build (54-63)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala (1)
  • SerializableConfiguration (38-56)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)
  • FileStreamer (37-135)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (1)
  • FileUtils (35-235)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala (3)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/mock/ByteStreamMock.scala (1)
  • ByteStreamMock (21-55)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (9)
  • builder (190-192)
  • CobolProcessor (48-193)
  • withCopybookContents (93-96)
  • processor (127-129)
  • processor (131-135)
  • processor (137-137)
  • withRecordProcessor (98-101)
  • load (65-91)
  • save (146-187)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/base/BinaryFileFixture.scala (3)
  • withTempDirectory (27-38)
  • writeBinaryFile (40-42)
  • readBinaryFile (44-46)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (3)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorContext.scala (1)
  • CobolProcessorContext (21-23)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala (2)
  • processRecord (42-44)
  • processRecord (70-72)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala (2)
  • processRecord (32-34)
  • processRecord (91-93)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: Spark 3.5.5 on Scala 2.12.20
  • GitHub Check: test (2.12.20, 2.12, 3.3.4, 0, 80, 20)
  • GitHub Check: Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Spark 3.5.5 on Scala 2.13.16
  • GitHub Check: Spark 3.4.4 on Scala 2.12.20
🔇 Additional comments (6)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorContext.scala (1)

21-23: LGTM! Clean context holder for per-record processing.

The case class appropriately encapsulates copybook, options, and offset into a single context object, simplifying method signatures and making the API more maintainable.

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (1)

51-53: LGTM! Context-based processing simplifies the API.

The context object is correctly constructed with the current copybook, options, and offset, then passed to the per-record processor. This aligns with the new RawRecordProcessor signature and improves API maintainability.

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/SerializableRawRecordProcessor.scala (1)

19-29: LGTM! Well-documented marker trait for Spark serialization.

The trait appropriately extends Serializable to enable distributed processing in Spark, and the documentation clearly explains the usage patterns for standalone vs. Spark applications.
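A rough illustration of the two usage patterns described above (both processors are hypothetical no-ops):

```scala
import za.co.absa.cobrix.cobol.processor.{CobolProcessorContext, RawRecordProcessor, SerializableRawRecordProcessor}

// Standalone usage: any RawRecordProcessor works, including anonymous instances
val localProcessor = new RawRecordProcessor {
  override def processRecord(record: Array[Byte], ctx: CobolProcessorContext): Array[Byte] = record
}

// Spark usage: the processor is shipped to executors, so it must serialize;
// the marker trait makes that requirement explicit in the type
object DistributedProcessor extends SerializableRawRecordProcessor {
  override def processRecord(record: Array[Byte], ctx: CobolProcessorContext): Array[Byte] = record
}
```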

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorImplSuite.scala (1)

26-79: LGTM! Comprehensive test coverage for record extraction.

The test suite thoroughly validates getRecordExtractor behavior for fixed-length, variable-length, and error cases. The use of ByteStreamMock provides good test isolation.

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala (1)

25-108: LGTM! Comprehensive validation and end-to-end test coverage.

The test suite thoroughly validates builder constraints and includes an end-to-end test that exercises the full Spark-based processing pipeline from input to output verification.

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/RawRecordProcessor.scala (1)

24-24: New processRecord signature applied; ensure documentation updated.
All code and tests now use processRecord(record, ctx)—no occurrences of the old four-parameter signature remain. Verify that release notes or the migration guide document this breaking API change.

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 3

🧹 Nitpick comments (3)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala (3)

163-163: Set numSlices to bound task count and align with grouping

Specify the partition count to one per file-group.

-    val rdd = spark.sparkContext.parallelize(groupedFiles)
+    val rdd = spark.sparkContext.parallelize(groupedFiles, groupedFiles.size)

139-141: Remove unused parameter copybookContents in executor path

copybookContents is passed through but never used in executor functions. Drop it to simplify the API.

-          getFileProcessorRdd(listOfFiles, outputPath, copybookContents, cobolProcessor, rawRecordProcessor, sconf, numberOfThreads)
+          getFileProcessorRdd(listOfFiles, outputPath, cobolProcessor, rawRecordProcessor, sconf, numberOfThreads)
-  private def getFileProcessorRdd(listOfFiles: Seq[String],
-                                  outputPath: String,
-                                  copybookContents: String,
-                                  cobolProcessor: CobolProcessor,
+  private def getFileProcessorRdd(listOfFiles: Seq[String],
+                                  outputPath: String,
+                                  cobolProcessor: CobolProcessor,
                                   rawRecordProcessor: SerializableRawRecordProcessor,
                                   sconf: SerializableConfiguration,
                                   numberOfThreads: Int
                                  )(implicit spark: SparkSession): RDD[Long] = {
     val groupedFiles = listOfFiles.grouped(numberOfThreads).toSeq
     val rdd = spark.sparkContext.parallelize(groupedFiles)
-    rdd.map(group => {
-      processListOfFiles(group, outputPath, copybookContents, cobolProcessor, rawRecordProcessor, sconf, numberOfThreads)
+    rdd.map(group => {
+      processListOfFiles(group, outputPath, cobolProcessor, rawRecordProcessor, sconf, numberOfThreads)
     })
-  private def processListOfFiles(listOfFiles: Seq[String],
-                                 outputPath: String,
-                                 copybookContents: String,
-                                 cobolProcessor: CobolProcessor,
+  private def processListOfFiles(listOfFiles: Seq[String],
+                                 outputPath: String,
+                                 cobolProcessor: CobolProcessor,
                                  rawRecordProcessor: SerializableRawRecordProcessor,
                                  sconf: SerializableConfiguration,
                                  numberOfThreads: Int
                                 ): Long = {

Also applies to: 154-161, 165-166, 169-176


118-121: Trim keys in bulk options for consistency with single-key setter

Mirror the option() behavior to avoid keys with leading/trailing spaces.

-      caseInsensitiveOptions ++= options.map(kv => (kv._1.toLowerCase(), kv._2))
+      caseInsensitiveOptions ++= options.map { case (k, v) => (k.trim.toLowerCase, v) }
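For what it's worth, Spark's own DataFrameReader also treats data source option keys case-insensitively, so normalizing keys the same way in both setters keeps this API consistent with the surrounding ecosystem.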
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 104a8d2 and fb9e55c.

📒 Files selected for processing (2)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala (1 hunks)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/fixtures/BinaryFileFixture.scala (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/fixtures/BinaryFileFixture.scala
🧰 Additional context used
🧬 Code graph analysis (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala (5)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (12)
  • processor (127-129)
  • processor (131-135)
  • processor (137-137)
  • CobolProcessor (48-193)
  • load (65-91)
  • withCopybookContents (93-96)
  • withRecordProcessor (98-101)
  • option (110-114)
  • options (122-125)
  • save (146-187)
  • builder (190-192)
  • build (54-63)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala (1)
  • SerializableConfiguration (38-56)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)
  • FileStreamer (37-135)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (1)
  • FileUtils (35-235)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolBuilder.scala (1)
  • copybookContents (23-25)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: test (2.12.20, 2.12, 3.3.4, 0, 80, 20)
  • GitHub Check: Spark 3.5.5 on Scala 2.13.16
  • GitHub Check: Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Spark 3.5.5 on Scala 2.12.20
  • GitHub Check: Spark 3.4.4 on Scala 2.12.20
🔇 Additional comments (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala (1)

130-144: CobolProcessorImpl is already Serializable — it extends CobolProcessor with Serializable, so capturing it in the RDD closure won’t trigger a NotSerializableException.

Likely an incorrect or invalid review comment.

Comment on lines +181 to +183
val fileName = new Path(inputFIle).getName
val outputPathFileName = new Path(outputPath, fileName).toString


⚠️ Potential issue | 🟠 Major

Avoid output overwrites: base-name collisions will clobber files

Using only getName for output causes collisions (same filename in different dirs) and data loss, especially with overwrite=true. Generate a unique target (e.g., encode relative path or append a short hash of the full input path) and use it for both the preview path and outputFile.

Example:

val safeName = s"$fileName.${Integer.toHexString(inputFIle.hashCode)}"
val outputPathFileName = new Path(outputPath, safeName).toString
// ...
val outputFile = new Path(outputPath, safeName)

Also applies to: 190-191
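A sketch of one such fingerprinting helper; the name safeOutputName is hypothetical, and a truncated SHA-1 of the full path is more collision-resistant than hashCode:

```scala
import java.security.MessageDigest

// Hypothetical helper: base name plus a short digest of the full input path,
// so identical file names from different directories get distinct outputs
def safeOutputName(inputPath: String): String = {
  val digest = MessageDigest.getInstance("SHA-1")
    .digest(inputPath.getBytes("UTF-8"))
    .take(4)
    .map(b => f"${b & 0xff}%02x")
    .mkString
  val baseName = inputPath.substring(inputPath.lastIndexOf('/') + 1)
  s"$baseName.$digest"
}

// safeOutputName("/dir1/data.dat") and safeOutputName("/dir2/data.dat") now differ
```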

🤖 Prompt for AI Agents
In
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala
around lines 181-183 (and similarly 190-191), the code currently uses only the
input file base name which can collide across different directories; change this
to a stable, unique target name by combining the base name with a short
deterministic fingerprint of the full input path (for example append
Integer.toHexString(inputFile.hashCode) or a truncated SHA-1 of the full path)
and use that single safeName when constructing both the preview path and the
outputFile Path so files from different source dirs don’t clobber each other;
ensure the generated name is filesystem-safe (replace/encode path separators)
and update all Path constructors at the referenced lines to use the new
safeName.

Comment on lines +186 to +231
Future {
val hadoopConfig = sconf.value
val inputFs = new Path(inputFIle).getFileSystem(hadoopConfig)
val ifs = new FileStreamer(inputFIle, inputFs)
val outputFile = new Path(outputPath, fileName)
val outputFs = outputFile.getFileSystem(hadoopConfig)
val ofs = new BufferedOutputStream(outputFs.create(outputFile, true))

var originalException: Throwable = null

val recordCount = try {
cobolProcessor.process(ifs, ofs)(rawRecordProcessor)
} catch {
case ex: Throwable =>
originalException = ex
0L
} finally {
// Ugly code to ensure no exceptions escape unnoticed.
try {
ifs.close()
} catch {
case e: Throwable =>
if (originalException != null) {
originalException.addSuppressed(e)
} else {
originalException = e
}
}

try {
ofs.close()
} catch {
case e: Throwable =>
if (originalException != null) {
originalException.addSuppressed(e)
} else {
originalException = e
}
}
}

if (originalException != null) throw originalException

log.info(s"Writing to $outputFile succeeded!")
recordCount
}

⚠️ Potential issue | 🔴 Critical

Fix resource leak on stream acquisition failures; create streams inside try/finally and null-check closes

If outputFs.create(...) (or earlier) throws, ifs is never closed because both streams are created before the try. Move acquisition into the try block, keep ifs/ofs as vars, close them with null checks in finally, and ensure the output dir exists.

       Future {
-        val hadoopConfig = sconf.value
-        val inputFs = new Path(inputFIle).getFileSystem(hadoopConfig)
-        val ifs = new FileStreamer(inputFIle, inputFs)
-        val outputFile = new Path(outputPath, fileName)
-        val outputFs = outputFile.getFileSystem(hadoopConfig)
-        val ofs = new BufferedOutputStream(outputFs.create(outputFile, true))
-
-        var originalException: Throwable = null
-
-        val recordCount = try {
-          cobolProcessor.process(ifs, ofs)(rawRecordProcessor)
-        } catch {
-          case ex: Throwable =>
-            originalException = ex
-            0L
-        } finally {
-          // Ugly code to ensure no exceptions escape unnoticed.
-          try {
-            ifs.close()
-          } catch {
-            case e: Throwable =>
-              if (originalException != null) {
-                originalException.addSuppressed(e)
-              } else {
-                originalException = e
-              }
-          }
-
-          try {
-            ofs.close()
-          } catch {
-            case e: Throwable =>
-              if (originalException != null) {
-                originalException.addSuppressed(e)
-              } else {
-                originalException = e
-              }
-          }
-        }
+        val hadoopConfig = sconf.value
+        val outputFile = new Path(outputPath, fileName)
+        val outputFs = outputFile.getFileSystem(hadoopConfig)
+
+        var ifs: FileStreamer = null
+        var ofs: BufferedOutputStream = null
+        var originalException: Throwable = null
+
+        val recordCount =
+          try {
+            val inputFs = new Path(inputFIle).getFileSystem(hadoopConfig)
+            ifs = new FileStreamer(inputFIle, inputFs)
+            // Ensure the output directory exists
+            outputFs.mkdirs(new Path(outputPath))
+            ofs = new BufferedOutputStream(outputFs.create(outputFile, true))
+            cobolProcessor.process(ifs, ofs)(rawRecordProcessor)
+          } catch {
+            case ex: Throwable =>
+              originalException = ex
+              0L
+          } finally {
+            // Ensure no exceptions escape unnoticed.
+            try {
+              if (ifs != null) ifs.close()
+            } catch {
+              case e: Throwable =>
+                if (originalException != null) originalException.addSuppressed(e) else originalException = e
+            }
+            try {
+              if (ofs != null) ofs.close()
+            } catch {
+              case e: Throwable =>
+                if (originalException != null) originalException.addSuppressed(e) else originalException = e
+            }
+          }
 
         if (originalException != null) throw originalException
 
         log.info(s"Writing to $outputFile succeeded!")
         recordCount
       }
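Side note: scala.util.Using would express this acquire/close pattern more tersely, but it only exists from Scala 2.13, and the build matrix above still targets Scala 2.11/2.12, so the explicit null-checked finally blocks are the portable choice here.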
🤖 Prompt for AI Agents
In
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala
around lines 186 to 231, the code currently creates both input and output
streams before entering the try block so if outputFs.create(...) (or earlier)
throws the input stream (ifs) is never closed; refactor by declaring ifs and ofs
as vars initialized to null, move the FileStreamer and output stream creation
inside the try block, ensure the output directory exists (create parent dirs on
outputFs) before creating the output stream, and in the finally block close ifs
and ofs only if they are non-null while preserving and suppressing any original
exception.

@yruslan yruslan merged commit 869eb29 into master Oct 10, 2025
7 checks passed
@yruslan yruslan deleted the feature/788-add-cobol-processor-on-executors branch October 10, 2025 07:34

Labels: none yet
Projects: none yet

Development
Successfully merging this pull request may close these issues:
• Allow EBCDIC raw record processors to run on Spark executors

2 participants