
Conversation

@yruslan
Collaborator

@yruslan yruslan commented Sep 8, 2025

Closes #780

Summary by CodeRabbit

  • New Features

    • New processor builder API for configuring and running COBOL data processing, with options passed through processing contexts.
    • Builder-based creation for record/reader contexts with options propagation to custom extractors.
  • Refactor

    • Replaced legacy builder with the new processor builder and moved internal helpers into implementation-only namespaces.
    • Improved resource handling during index generation to ensure streams are closed on errors.
  • Tests

    • Tests updated to use the new builders and to assert options are propagated.

@coderabbitai
Contributor

coderabbitai bot commented Sep 8, 2025

Walkthrough

Adds a new CobolProcessor builder API, replaces RecordProcessorBuilder, moves some processor classes into an impl package, introduces RawRecordContext builder and an options field propagated through CobolParameters → ReaderParameters → RawRecordContext, and updates tests to use new builders and assert options propagation.
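For orientation, a minimal usage sketch of the new API, assuming the entry points named above and the RawRecordProcessor signature shown in the tests below (copybookContents, inputStream, and outputStream are placeholders; import paths are assumptions):

import za.co.absa.cobrix.cobol.parser.Copybook
import za.co.absa.cobrix.cobol.processor.{CobolProcessor, RawRecordProcessor}

// An identity processor: returns each raw record unchanged.
val rawRecordProcessor = new RawRecordProcessor {
  override def processRecord(copybook: Copybook,
                             options: Map[String, String],
                             record: Array[Byte],
                             offset: Long): Array[Byte] = record
}

CobolProcessor.builder(copybookContents)  // copybook text as a String
  .option("record_format", "F")           // keys are stored lower-cased
  .options(Map("record_length" -> "10"))
  .build()
  .process(inputStream, outputStream)(rawRecordProcessor)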

Changes

Cohort / File(s) Summary
New public API: CobolProcessor
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala
Adds the CobolProcessor trait, a companion with CobolProcessorBuilder(copybookContents), fluent option/options methods, and build() returning a CobolProcessor that orchestrates reader parameters, schema, and the record extractor, and delegates to StreamProcessor.
Removed legacy builder
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/RecordProcessorBuilder.scala
Deletes the old RecordProcessorBuilder API and its orchestration logic.
Processor internals moved to impl/
cobol-parser/src/main/scala/.../processor/impl/ArrayOfAnyHandler.scala
.../processor/impl/MapOfAnyHandler.scala
.../processor/impl/StreamProcessor.scala
Package relocated to .impl; StreamProcessor gained a RawRecordProcessor import, a doc update, and removal of an unused loop counter.
Parameters: propagate options
.../reader/parameters/CobolParameters.scala
.../reader/parameters/CobolParametersParser.scala
.../reader/parameters/ReaderParameters.scala
Adds options: Map[String,String] to CobolParameters and ReaderParameters; the parser forwards the Spark options map into these fields.
RawRecordContext builder & options
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala and many tests
Introduces RawRecordContext companion builder (entry points and builder methods: .withReaderParams, .withRdwDecoder, .withBdwDecoder, .withOptions, .withStartingRecordNumber, .build()), adds options field on RawRecordContext; call sites updated to use builder (see the sketch after this table).
Reader: builder usage and fixed-length path
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala
Replaces direct RawRecordContext construction with builder usage; adds FixedLength extractor path when readerProperties.recordFormat == FixedLength.
Tests: adapt to new APIs & assert propagation
cobol-parser/src/test/..., spark-cobol/src/test/... (multiple files)
Replace usages of RecordProcessorBuilder with CobolProcessor.builder(...).build(), update RawRecordContext instantiations to builder form, add CustomRecordExtractorMock.options var and assertions verifying options propagation.
Index resource handling
spark-cobol/src/main/scala/.../IndexBuilder.scala
Ensures streams are closed in a try/finally when generating index entries.
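To illustrate the RawRecordContext row above, a hedged sketch of both builder entry points (the stream, decoder, and readerParameters values are placeholders, and the option key is invented for illustration):

import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordContext

// Explicit starting record number, data stream, header stream, and parsed copybook:
val ctx = RawRecordContext.builder(0L, dataStream, headerStream, copybook)
  .withRdwDecoder(rdwDecoder)
  .withBdwDecoder(bdwDecoder)
  .withOptions(Map("my_custom_option" -> "value")) // reaches custom extractors via ctx.options
  .build()

// Shorter form: the header stream is derived from the input stream:
val ctx2 = RawRecordContext.builder(dataStream, copybook)
  .withReaderParams(readerParameters) // copies decoders, additionalInfo, and options
  .build()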

Sequence Diagram(s)

sequenceDiagram
    autonumber
    actor User
    participant Builder as CobolProcessor.builder
    participant Proc as CobolProcessor
    participant Reader as VarLenNestedReader / RawRecordContext.Builder
    participant SP as impl.StreamProcessor
    participant RRE as RawRecordExtractor
    participant RRP as RawRecordProcessor
    User->>Builder: builder(copybook).option(...).options(...).build()
    Builder-->>User: CobolProcessor
    User->>Proc: process(inputStream, outputStream)(RRP)
    Proc->>Proc: getReaderParameters(options) → ReaderParameters(options)
    Proc->>Proc: getCobolSchema(readerParameters)
    Proc->>Reader: RawRecordContext.builder(...).withReaderParams(readerParameters).build()
    Reader-->>Proc: RawRecordContext (includes options)
    Proc->>SP: processStream(schema, options, dataStream, extractor, RRP, outputStream)
    SP->>RRE: nextRecord()
    RRE->>RRP: raw record + ctx.options
    SP-->>Proc: finished
    Proc-->>User: done (streams closed)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Assessment against linked issues

Objective Addressed Explanation
Pass spark-cobol properties to custom record extractors (#780) ✅

Assessment against linked issues: Out-of-scope changes

Code Change Explanation
Introduce new CobolProcessor API and builder (cobol-parser/src/main/scala/.../CobolProcessor.scala) New facade/refactor beyond the single objective of passing options to extractors.
Remove RecordProcessorBuilder file (cobol-parser/src/main/scala/.../RecordProcessorBuilder.scala) Full API removal/replacement not required solely for options propagation.
Move processor classes to .impl package (cobol-parser/src/main/scala/.../processor/impl/*.scala) Package reorganization and namespace changes are broader refactors unrelated to passing options to extractors.

Possibly related PRs

Poem

I nibble through bytes with a twitch and a hop,
Builder in paw, I stitch options on top.
From copybook to extractor the map finds its way,
Headers and streams tidy, then off I will play.
Rabbit-approved processing — hop, hop, hooray! 🐇


@github-actions

github-actions bot commented Sep 8, 2025

JaCoCo code coverage report - 'cobol-parser'

Overall Project 91.64% -0.04% 🍏
Files changed 90.84% 🍏

File Coverage
CobolProcessor.scala 89.56% -10.44% 🍏
StreamProcessor.scala 76.39% 🍏
VarLenNestedReader.scala 69.94% -0.47% 🍏
ArrayOfAnyHandler.scala 18.75% 🍏
MapOfAnyHandler.scala 0% 🍏

@github-actions

github-actions bot commented Sep 8, 2025

JaCoCo code coverage report - 'spark-cobol'

File Coverage [97.06%] 🍏
IndexBuilder.scala 97.06% 🍏
Total Project Coverage 79.79% 🍏

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (17)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (2)

43-49: Remove unused loop counter.

Variable i is incremented but never used.

Apply:

-    var i = 0
     while (recordExtractor.hasNext) {
-      i += 1
       val record = recordExtractor.next()

31-31: Clarify options casing in Scaladoc.

Document that option keys are lower-cased for case-insensitive handling.

-   * @param options arbitrary options used for splitting input data into records. Same as options to 'spark-cobol'. Can contain custom options as well.
+   * @param options arbitrary options used for splitting input data into records (same as 'spark-cobol' options).
+   *                Keys are lower-cased for case-insensitive handling. Can contain custom options as well.
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test26CustomRecordExtractor.scala (1)

75-76: Good assertions to verify options propagation.

These checks validate the end-to-end flow into the extractor.

Optionally reset the captured options before each test to avoid cross-test leakage:

// e.g., at the start of each test block
CustomRecordExtractorMock.options = Map.empty
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)

67-73: Prefer safe default BDW decoder instead of RDW fallback.

If bdwDecoderOpt is empty, using RDW as a BDW fallback is surprising. Default to a neutral BDW decoder to keep semantics explicit.

-      .withBdwDecoder(bdwDecoderOpt.getOrElse(rdwDecoder))
+      .withBdwDecoder(bdwDecoderOpt.getOrElse(new RecordHeaderDecoderBdw(RecordHeaderParameters(isBigEndian = true, 0))))
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala (2)

66-67: Doc: note options key casing.

Clarify that keys are lower-cased for case-insensitive handling.

-  * @param options                 Options passed to 'spark-cobol'.
+  * @param options                 Options passed to 'spark-cobol'. Keys are lower-cased.

109-111: Add default for options in CobolParameters

-                            metadataPolicy:          MetadataPolicy,
-                            options:                 Map[String, String]
+                            metadataPolicy:          MetadataPolicy,
+                            options:                 Map[String, String] = Map.empty
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala (1)

134-135: ReaderParameters: options field looks good.

Default value and doc are appropriate.

Add a brief doc note on lower-cased keys (mirrors builder behavior):

-  * @param options                 Options passed to spark-cobol
+  * @param options                 Options passed to spark-cobol (keys are lower-cased)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/CustomRecordExtractorMock.scala (1)

55-59: Global mutable test state: consider reset for isolation.

If suites ever run in parallel, the shared var may leak across tests. Reset in each test or mark the suite as non-parallel.

// e.g., add beforeEach in a trait mixed into the suite:
override def beforeEach(): Unit = {
  CustomRecordExtractorMock.options = Map.empty
  super.beforeEach()
}
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/RecordExtractorDebugSpec.scala (1)

35-35: Use Long literal for clarity

Prefer 0L to avoid implicit widening.

-    val ctx = RawRecordContext.builder(0, dataStream, headerStream, null).build()
+    val ctx = RawRecordContext.builder(0L, dataStream, headerStream, null).build()
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VarOccursRecordExtractorSuite.scala (1)

45-45: Unify on Long literals for startingRecordNumber

Align with the Long-typed API for consistency.

-      val rc = RawRecordContext.builder(0, ibs, hbs, copybook).build()
+      val rc = RawRecordContext.builder(0L, ibs, hbs, copybook).build()
-      val rc = RawRecordContext.builder(0, ibs, hbs, copybook).build()
+      val rc = RawRecordContext.builder(0L, ibs, hbs, copybook).build()
-    val rc = RawRecordContext.builder(0, ibs, hbs, copybook).build()
+    val rc = RawRecordContext.builder(0L, ibs, hbs, copybook).build()

Also applies to: 88-88, 144-144

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractorSuite.scala (1)

154-158: Prefer 0L for the Long-typed builder parameter

Minor consistency tweak.

-    RawRecordContext.builder(0, ibs, hbs, copybook)
+    RawRecordContext.builder(0L, ibs, hbs, copybook)
       .withRdwDecoder(rdwDecoder)
       .withBdwDecoder(bdwDecoder)
       .build()
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (1)

84-87: Consider validating empty or whitespace-only keys.

While the case-insensitive handling is good, consider adding validation to reject empty or whitespace-only keys to prevent potential issues.

 def option(key: String, value: String): CobolProcessorBuilder = {
+  require(key.trim.nonEmpty, "Option key must not be empty or whitespace-only")
   caseInsensitiveOptions += (key.toLowerCase -> value)
   this
 }
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (5)

25-26: Scaladoc tweaks (minor).

Docs read well. Optional: clarify that options is the full spark-cobol options map as received by the reader (no filtering), and whether keys/values are expected to be immutable/case-sensitive.

Also applies to: 32-33


59-63: Validate startingRecordNumber.

Guard against negative values early.

-    def withStartingRecordNumber(startingRecordNumber: Long): RawRecordContextBuilder = {
-      context = context.copy(startingRecordNumber = startingRecordNumber)
+    def withStartingRecordNumber(startingRecordNumber: Long): RawRecordContextBuilder = {
+      require(startingRecordNumber >= 0, s"startingRecordNumber must be >= 0, got: $startingRecordNumber")
+      context = context.copy(startingRecordNumber = startingRecordNumber)
       this
     }

79-82: Defensive copy for options.

Avoid surprises if a mutable Map is passed and later mutated by the caller.

-    def withOptions(options: Map[String, String]): RawRecordContextBuilder = {
-      context = context.copy(options = options)
+    def withOptions(options: Map[String, String]): RawRecordContextBuilder = {
+      context = context.copy(options = options.toMap)
       this
     }

90-101: Add missing Scaladoc for startingRecordNumber (nit).

This overload’s Scaladoc omits @param startingRecordNumber.

   /**
-    * Creates a new instance of `RawRecordContextBuilder` with the specified input stream,
+    * Creates a new instance of `RawRecordContextBuilder` with the specified starting record number,
+    * input stream,
     * header stream, and copybook. The builder allows further customization of the
     * `RawRecordContext` before building it.
@@
-    * @param inputStream  the main data stream containing the record data.
+    * @param startingRecordNumber the zero-based record number where inputStream is positioned.
+    * @param inputStream  the main data stream containing the record data.

111-121: Header stream creation assumption — verify copy semantics.

This overload presumes reading from the start and derives headerStream via inputStream.copyStream(). Please confirm copyStream returns a stream positioned at the beginning; if it clones the current position, headers might be misread when inputStream isn’t at offset 0. If uncertain, either (a) document the precondition explicitly here, or (b) add an overload taking headerStream, or (c) add a builder setter withHeaderStream for override.
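For option (c), a minimal sketch of such a setter, following the copy-based pattern of the other builder methods (hypothetical; not part of this PR):

    def withHeaderStream(headerStream: SimpleStream): RawRecordContextBuilder = {
      // Overrides the header stream derived via inputStream.copyStream().
      context = context.copy(headerStream = headerStream)
      this
    }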

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 387c8a8 and bc8c400.

📒 Files selected for processing (22)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (1 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/RecordProcessorBuilder.scala (0 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/ArrayOfAnyHandler.scala (1 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/MapOfAnyHandler.scala (1 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (1 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (2 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala (2 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (2 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala (2 hunks)
  • cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala (6 hunks)
  • cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/RecordExtractorDebugSpec.scala (1 hunks)
  • cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/SparseIndexSpecSpec.scala (6 hunks)
  • cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala (1 hunks)
  • cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContextFactory.scala (1 hunks)
  • cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VarOccursRecordExtractorSuite.scala (3 hunks)
  • cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractorSuite.scala (1 hunks)
  • cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReaderSpec.scala (5 hunks)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/CustomRecordExtractorMock.scala (2 hunks)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test21VariableOccurs.scala (1 hunks)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test25OccursMappings.scala (1 hunks)
  • spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test26CustomRecordExtractor.scala (1 hunks)
💤 Files with no reviewable changes (1)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/RecordProcessorBuilder.scala
🧰 Additional context used
🧬 Code graph analysis (18)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/ArrayOfAnyHandler.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (4)
  • processor (100-102)
  • processor (104-108)
  • processor (110-127)
  • processor (129-129)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test26CustomRecordExtractor.scala (2)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/CustomRecordExtractorMock.scala (2)
  • CustomRecordExtractorMock (26-53)
  • CustomRecordExtractorMock (55-59)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Parameters.scala (1)
  • contains (66-69)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (9)
  • RawRecordContext (34-43)
  • RawRecordContext (45-122)
  • builder (102-109)
  • builder (119-120)
  • withRdwDecoder (64-67)
  • withBdwDecoder (69-72)
  • withAdditionalInfo (74-77)
  • withOptions (79-82)
  • build (84-86)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala (2)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/policies/MetadataPolicy.scala (2)
  • MetadataPolicy (21-34)
  • Basic (23-23)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala (1)
  • options (38-41)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala (1)
  • options (38-41)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (7)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/ArrayOfAnyHandler.scala (1)
  • ArrayOfAnyHandler (27-33)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (2)
  • StreamProcessor (26-66)
  • processStream (37-65)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedRecordLengthRawRecordExtractor.scala (1)
  • FixedRecordLengthRawRecordExtractor (19-56)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (1)
  • CobolParametersParser (39-1004)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala (1)
  • ReaderParameters (80-135)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)
  • recordExtractor (56-99)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala (1)
  • fromReaderParameters (75-150)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/RecordExtractorDebugSpec.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (5)
  • RawRecordContext (34-43)
  • RawRecordContext (45-122)
  • builder (102-109)
  • builder (119-120)
  • build (84-86)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test21VariableOccurs.scala (2)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)
  • recordExtractor (56-99)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (5)
  • RawRecordContext (34-43)
  • RawRecordContext (45-122)
  • builder (102-109)
  • builder (119-120)
  • build (84-86)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/SparseIndexSpecSpec.scala (2)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)
  • recordExtractor (56-99)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (5)
  • RawRecordContext (34-43)
  • RawRecordContext (45-122)
  • builder (102-109)
  • builder (119-120)
  • build (84-86)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VarOccursRecordExtractorSuite.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (5)
  • RawRecordContext (34-43)
  • RawRecordContext (45-122)
  • builder (102-109)
  • builder (119-120)
  • build (84-86)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (5)
  • RawRecordContext (34-43)
  • RawRecordContext (45-122)
  • builder (102-109)
  • builder (119-120)
  • build (84-86)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test25OccursMappings.scala (2)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)
  • recordExtractor (56-99)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (5)
  • RawRecordContext (34-43)
  • RawRecordContext (45-122)
  • builder (102-109)
  • builder (119-120)
  • build (84-86)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractorSuite.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (7)
  • RawRecordContext (34-43)
  • RawRecordContext (45-122)
  • builder (102-109)
  • builder (119-120)
  • withRdwDecoder (64-67)
  • withBdwDecoder (69-72)
  • build (84-86)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContextFactory.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (5)
  • RawRecordContext (34-43)
  • RawRecordContext (45-122)
  • builder (102-109)
  • builder (119-120)
  • build (84-86)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala (2)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (10)
  • builder (132-134)
  • CobolProcessor (49-135)
  • processor (100-102)
  • processor (104-108)
  • processor (110-127)
  • processor (129-129)
  • options (95-98)
  • build (53-75)
  • process (55-73)
  • option (84-87)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/recordformats/RecordFormat.scala (2)
  • RecordFormat (21-41)
  • AsciiText (26-26)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReaderSpec.scala (2)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (5)
  • RawRecordContext (34-43)
  • RawRecordContext (45-122)
  • builder (102-109)
  • builder (119-120)
  • build (84-86)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/CopybookParser.scala (2)
  • CopybookParser (42-490)
  • parseSimple (86-104)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Parameters.scala (2)
  • getOrElse (78-81)
  • getMap (88-88)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (3)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderDecoderBdw.scala (1)
  • RecordHeaderDecoderBdw (23-90)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderDecoderRdw.scala (1)
  • RecordHeaderDecoderRdw (23-59)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderParameters.scala (1)
  • RecordHeaderParameters (20-26)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: Spark 3.5.5 on Scala 2.12.20
  • GitHub Check: Spark 3.5.5 on Scala 2.13.16
  • GitHub Check: Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: test (2.12.20, 2.12, 3.3.4, 0, 80, 20)
🔇 Additional comments (24)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/MapOfAnyHandler.scala (1)

17-17: No remaining imports of the old package path; approve changes.

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/ArrayOfAnyHandler.scala (1)

17-17: Package move verified; no direct references to ArrayOfAnyHandler remain
Ran a repository-wide search for za.co.absa.cobrix.cobol.processor.ArrayOfAnyHandler and found no call sites still importing the old package.

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (1)

17-21: Move/import OK.

No issues spotted with the package relocation and new import.

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/CustomRecordExtractorMock.scala (1)

27-31: Capturing ctx.options is correct.

This enables tests to assert end-to-end propagation.
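A minimal sketch of that capture, assuming RawRecordExtractor's conventional members (offset/hasNext/next) and a fixed 2-byte record size purely for illustration; the real mock's record logic differs:

object CustomRecordExtractorMock {
  var options: Map[String, String] = Map.empty
}

class CustomRecordExtractorMock(ctx: RawRecordContext) extends Serializable with RawRecordExtractor {
  // Capture the options passed through the context so tests can assert propagation.
  CustomRecordExtractorMock.options = ctx.options

  override def offset: Long = ctx.inputStream.offset
  override def hasNext: Boolean = !ctx.inputStream.isEndOfStream
  override def next(): Array[Byte] = ctx.inputStream.next(2)
}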

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala (1)

172-172: Builder migration looks good

Switch to RawRecordContext.builder(...).build() is correct; using 0L matches the new Long signature.

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test25OccursMappings.scala (1)

70-70: LGTM

Builder usage with 0L is correct.

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/SparseIndexSpecSpec.scala (1)

66-66: LGTM

Consistent builder migration with 0L. No functional changes.

Also applies to: 88-88, 108-108, 151-151, 181-181, 210-210

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test21VariableOccurs.scala (1)

47-47: LGTM

Correct switch to builder and Long literal.

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReaderSpec.scala (2)

93-93: Verify null copybook usage

Passing null for copybook relies on downstream code never dereferencing it. Confirm this is intentional for this test.


130-132: LGTM

New builder overload with single stream + parsed copybook reads clearly and matches API.

Also applies to: 172-174, 215-217, 266-268

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (2)

306-308: LGTM! Options propagation correctly added to CobolParameters.

The addition of params.getMap as the options parameter properly captures all spark-cobol options for propagation to custom record extractors, addressing the primary objective from issue #780.


450-452: LGTM! Options correctly passed through to ReaderParameters.

The propagation of parameters.options to ReaderParameters ensures that all spark-cobol options flow through the reader chain to reach the custom record extractors.

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContextFactory.scala (1)

31-38: LGTM! Test helper simplified with builder pattern.

The refactoring to use RawRecordContext.builder pattern appropriately simplifies the test factory method by removing the need to explicitly handle decoders and additional info parameters, which are now managed by the builder's defaults.

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala (2)

28-28: LGTM! Test class renamed to match new API.

The renaming from RecordProcessorBuilderSuite to CobolProcessorBuilderSuite correctly reflects the new CobolProcessor API.


37-46: LGTM! Test updated for new builder pattern.

The test correctly uses the new CobolProcessor.builder(copybook).build().process() pattern.

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (4)

30-48: Well-designed public API for the new CobolProcessor trait.

The trait provides a clean, minimal interface for processing COBOL data streams with proper separation of concerns.


49-75: Solid builder implementation with proper resource management.

The builder pattern implementation properly manages stream resources with try-finally and correctly propagates options through to the StreamProcessor.


110-127: LGTM! Record extractor creation properly handles options.

The implementation correctly creates the RawRecordContext with builder pattern for fixed-length records and properly attempts VarLenNestedReader first, which will have access to options through ReaderParameters.


132-135: Clean factory method for builder creation.

The companion object provides a clear entry point for creating CobolProcessor instances.

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (5)

20-20: Import expansion is correct.

Needed for defaults used in the builder.


45-57: Sensible defaults for RDW/BDW decoders.

Big-endian + zero adjustment aligns with typical mainframe RDW/BDW.


64-67: Builder setters for decoders look good.

Straightforward and fluent.

Also applies to: 69-72


74-77: Additional info setter is fine.


84-86: Builder build() is fine.

Comment on lines +41 to +42
                             additionalInfo: String,
                             options: Map[String, String]
Contributor

💡 Verification agent

❓ Verification inconclusive

Preserve source-compat by defaulting options; call out bin-compat in release notes.

Adding a parameter to a public case class is a breaking change for existing call sites. Defaulting the new parameter helps source compatibility after upgrade; still document the binary change in release notes/MiMa.

Apply within the case class params:

-                             additionalInfo: String,
-                             options: Map[String, String]
+                             additionalInfo: String,
+                             options: Map[String, String] = Map.empty

Default options parameter for source compatibility
Default the new options parameter to Map.empty to preserve source compatibility; still document the binary-compatibility break in release notes/MiMa.

- additionalInfo: String,
- options: Map[String, String]
+ additionalInfo: String,
+ options: Map[String, String] = Map.empty
🤖 Prompt for AI Agents
In
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala
around lines 41 to 42, the new constructor/parameter list added an options:
Map[String, String] parameter without a default, breaking source compatibility;
add a default value of Map.empty to the options parameter (i.e., options:
Map[String, String] = Map.empty) so existing callers continue to compile, and
then update any call sites or tests that explicitly relied on the non-default
behavior if needed; also ensure the change is noted in release notes/MiMa as a
binary-compatibility break.

Comment on lines +68 to 74
      val builder = CobolProcessor.builder(copybook)
        .option("record_format", "D")

      assert(builder.getReaderParameters.recordFormat == RecordFormat.AsciiText)
      assert(builder.getReaderParameters.options.contains("record_format"))
      assert(builder.getOptions.contains("record_format"))
    }
Contributor

💡 Verification agent

🧩 Analysis chain

Verify options propagation in getReaderParameters test.

The test correctly verifies that options are present in the ReaderParameters. Good coverage of the options propagation feature.


🏁 Script executed:

#!/bin/bash
# Verify that custom record extractors can access options through RawRecordContext
ast-grep --pattern 'class $_ extends RawRecordExtractor {
  $$$
}'

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Search for any custom extractors extending RawRecordExtractor
rg -n 'class .* extends RawRecordExtractor' -g '*.scala'

# Search for RawRecordContext usage to confirm option propagation support
rg -n 'RawRecordContext' -g '*.scala'

Length of output: 20103


🏁 Script executed:

#!/bin/bash
set -e
# Search for how CobolProcessor builds RawRecordContext and if it propagates reader parameters options
rg -n 'RawRecordContext.builder' -g 'cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala' -C5
rg -n 'withOptions' -g 'cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala'
rg -n 'options' -g 'cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala' -C5

Length of output: 1597


🏁 Script executed:

#!/bin/bash
# Inspect RawRecordContextBuilder implementation and builder methods
rg -n 'class RawRecordContextBuilder' -g 'cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala' -C10
rg -n 'def builder' -g 'cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala' -C5

Length of output: 6039


🏁 Script executed:

#!/bin/bash
set -e
rg -n 'object CobolProcessor' -g 'cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala' -C10
rg -n 'def builder' -g 'cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala' -C5

Length of output: 3742


Propagate builder options into RawRecordContext
In CobolProcessor.scala (around line 120), call .withOptions(getOptions) on the RawRecordContext.builder(…) before .build() so that any custom RawRecordExtractor can access the configured options via its RawRecordContext.
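Concretely, the suggested chain might look like this (a sketch; getOptions is the builder accessor asserted in the test above):

    val ctx = RawRecordContext.builder(dataStream, getCobolSchema(readerParameters).copybook)
      .withOptions(getOptions) // propagate the processor builder's options to custom extractors
      .build()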

🤖 Prompt for AI Agents
In
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala
around line 120, the RawRecordContext is built without propagating the processor
builder options, so custom RawRecordExtractor implementations cannot access
configured options; modify the RawRecordContext.builder(...) chain to call
.withOptions(getOptions) before .build() so the context includes the builder
options and RawRecordExtractor can read them from its RawRecordContext.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (1)

33-44: Default options to Map.empty for source compatibility

Adding a new parameter to a public case class breaks source for any direct constructors. Defaulting preserves source compatibility (binary will still change—document in release notes/MiMa).

 case class RawRecordContext(
   ...
-  additionalInfo: String,
-  options: Map[String, String]
+  additionalInfo: String,
+  options: Map[String, String] = Map.empty
 )
#!/bin/bash
# Find direct RawRecordContext constructor call sites that may still pass 7 args.
rg -n 'RawRecordContext\s*\(' -g '*.scala' -C2
🧹 Nitpick comments (7)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (3)

27-29: Use Locale.ROOT for case-folding

Lowercasing with the default locale can misbehave (e.g., Turkish). Import Locale to use Locale.ROOT.

-import java.io.OutputStream
+import java.io.OutputStream
+import java.util.Locale
 import scala.collection.mutable

84-88: Harden builder.option: locale-safe lowercasing and null checks

  • Use Locale.ROOT.
  • Guard against null values to avoid NPEs downstream.
-    def option(key: String, value: String): CobolProcessorBuilder = {
-      require(key.trim.nonEmpty, "Option key must not be empty or whitespace-only")
-      caseInsensitiveOptions += (key.trim.toLowerCase -> value)
-      this
-    }
+    def option(key: String, value: String): CobolProcessorBuilder = {
+      require(key != null && key.trim.nonEmpty, "Option key must not be null/empty or whitespace-only")
+      require(value != null, s"Option '$key' value must not be null")
+      caseInsensitiveOptions += (key.trim.toLowerCase(Locale.ROOT) -> value)
+      this
+    }

96-99: Normalize bulk options; skip nulls

Ensure consistent, locale-safe key normalization; drop nulls.

-    def options(options: Map[String, String]): CobolProcessorBuilder = {
-      caseInsensitiveOptions ++= options.map(kv => (kv._1.toLowerCase(), kv._2))
-      this
-    }
+    def options(options: Map[String, String]): CobolProcessorBuilder = {
+      caseInsensitiveOptions ++= options.collect {
+        case (k, v) if k != null && v != null => (k.trim.toLowerCase(Locale.ROOT), v)
+      }
+      this
+    }
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala (4)

37-46: Adopts new builder + explicit build step—LGTM; add an assertion to prove options reach processors

Flow looks correct. To also validate PR objective end-to-end, assert that options configured on the builder arrive to RawRecordProcessor.

-      val builder = CobolProcessor.builder(copybook)
+      val builder = CobolProcessor.builder(copybook).option("opt_k", "opt_v")

-      val processor = new RawRecordProcessor {
+      var seenOptions: Map[String, String] = Map.empty
+      val processor = new RawRecordProcessor {
         override def processRecord(copybook: Copybook, options: Map[String, String], record: Array[Byte], offset: Long): Array[Byte] = {
-          record.map(v => (v - 1).toByte)
+          seenOptions = options
+          record.map(v => (v - 1).toByte)
         }
       }
 
       builder.build().process(is, os)(processor)
+      assert(seenOptions.get("opt_k").contains("opt_v"))

58-63: Prefer exercising schema with the same parameters the builder would use

Minor: drive getCobolSchema with builder’s computed ReaderParameters to align with real usage.

-      val cobolSchema = builder.getCobolSchema(ReaderParameters())
+      val cobolSchema = builder.getCobolSchema(builder.getReaderParameters)

68-74: Cache reader parameters; assert exact value

Avoid repeated parsing and assert the expected value directly.

-      assert(builder.getReaderParameters.recordFormat == RecordFormat.AsciiText)
-      assert(builder.getReaderParameters.options.contains("record_format"))
+      val rp = builder.getReaderParameters
+      assert(rp.recordFormat == RecordFormat.AsciiText)
+      assert(rp.options.get("record_format").contains("D"))
       assert(builder.getOptions.contains("record_format"))

78-90: Fix test name grammar and verify options propagation into extractor context (fixed-length path)

  • Wording: “a fixed-record-length file”.
  • Consider asserting that the fixed-length extractor receives the options from ReaderParameters (the test currently passes options but doesn't check they reach the extractor's RawRecordContext). If the context isn't exposed, add a tiny accessor in test scope or cover via a processor-level test.
-    "work for an fixed-record-length files" in {
+    "work for a fixed-record-length file" in {

If feasible, add an assertion akin to:

  • downcast to FixedRecordLengthRawRecordExtractor and read its context.options, or
  • run process with a RawRecordProcessor that inspects options emitted by the extractor path.
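A hedged sketch of the second option, mirroring the processor-level test shown earlier (seenOptions, is, os, and the asserted key are illustrative):

      var seenOptions: Map[String, String] = Map.empty
      val inspector = new RawRecordProcessor {
        override def processRecord(copybook: Copybook,
                                   options: Map[String, String],
                                   record: Array[Byte],
                                   offset: Long): Array[Byte] = {
          seenOptions = options // capture what the fixed-length path passes through
          record
        }
      }
      builder.build().process(is, os)(inspector)
      assert(seenOptions.get("record_format").contains("F"))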
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bc8c400 and d547863.

📒 Files selected for processing (8)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (1 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (2 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (2 hunks)
  • cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala (6 hunks)
  • cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/RecordExtractorDebugSpec.scala (1 hunks)
  • cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VarOccursRecordExtractorSuite.scala (3 hunks)
  • cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractorSuite.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala
  • cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/RecordExtractorDebugSpec.scala
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala
🧰 Additional context used
🧬 Code graph analysis (5)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala (2)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (10)
  • builder (135-137)
  • CobolProcessor (49-138)
  • processor (101-103)
  • processor (105-109)
  • processor (111-130)
  • processor (132-132)
  • options (96-99)
  • build (53-75)
  • process (55-73)
  • option (84-88)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala (1)
  • ReaderParameters (80-135)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VarOccursRecordExtractorSuite.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (5)
  • RawRecordContext (35-44)
  • RawRecordContext (46-139)
  • builder (119-126)
  • builder (136-137)
  • build (101-103)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (7)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/ArrayOfAnyHandler.scala (1)
  • ArrayOfAnyHandler (27-33)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (2)
  • StreamProcessor (26-65)
  • processStream (38-64)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (3)
  • VarLenNestedReader (42-272)
  • getCobolSchema (89-89)
  • recordExtractor (54-87)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedRecordLengthRawRecordExtractor.scala (1)
  • FixedRecordLengthRawRecordExtractor (19-56)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (6)
  • RawRecordContext (35-44)
  • RawRecordContext (46-139)
  • build (101-103)
  • builder (119-126)
  • builder (136-137)
  • withReaderParams (60-73)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (1)
  • CobolParametersParser (39-1004)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Parameters.scala (1)
  • Parameters (27-98)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractorSuite.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (7)
  • RawRecordContext (35-44)
  • RawRecordContext (46-139)
  • builder (119-126)
  • builder (136-137)
  • withRdwDecoder (81-84)
  • withBdwDecoder (86-89)
  • build (101-103)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (4)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala (1)
  • ReaderParameters (80-135)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderDecoderBdw.scala (1)
  • RecordHeaderDecoderBdw (23-90)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderDecoderRdw.scala (1)
  • RecordHeaderDecoderRdw (23-59)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderParameters.scala (1)
  • RecordHeaderParameters (20-26)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Spark 3.5.5 on Scala 2.13.16
  • GitHub Check: Spark 3.5.5 on Scala 2.12.20
  • GitHub Check: Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: test (2.12.20, 2.12, 3.3.4, 0, 80, 20)
🔇 Additional comments (13)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (3)

62-73: Good: processing stream lifecycle

dataStream is properly closed in finally. Once the extractor close lifecycle is introduced, consider closing it here as well; for now there’s no extractor close() API.

Can we confirm all extractor implementations close their internal headerStream when no longer needed? The FixedLength path does; variable-length extractors should, too, to prevent leaks.


101-109: Minor: getReaderParameters now redundant in build path

After snapshotting in build(), this helper is unused there but still useful for tests. No change required.


121-124: Lower-case readerParameters.options keys before storing in RawRecordContext

Apply Locale.ROOT lower-casing to the options map so custom extractors see case-insensitive keys:

-  .withOptions(readerParameters.options)
+  .withOptions(
+    readerParameters.options.map { case (k, v) =>
+      k.toLowerCase(java.util.Locale.ROOT) -> v
+    }
+  )
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractorSuite.scala (1)

154-157: LGTM: migrated to builder API

Builder-based RawRecordContext construction is correct; explicit 0L matches the new Long index.

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VarOccursRecordExtractorSuite.scala (3)

44-44: LGTM: builder usage

Construction via RawRecordContext.builder(0L, ...) aligns with the new API.


87-87: LGTM: builder usage

Consistent migration; no functional changes.


143-143: LGTM: builder usage

Consistent across tests; good.

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala (2)

92-101: Variable-length extractor selection—LGTM

Correctly exercises VarLen path and asserts TextFullRecordExtractor.


106-116: Unsupported format error path—LGTM

Exception type and message check are appropriate.

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (4)

50-58: Sane defaults in builder state—LGTM

Zero start, default decoders, empty additionalInfo/options are reasonable.


60-73: withReaderParams correctly propagates additionalInfo, RDW/BDW, and options—LGTM

This satisfies the PR objective of making options available to custom extractors via RawRecordContext.


119-137: Builder overload with explicit startingRecordNumber—LGTM

The API is clear and validates non-negative values.


136-137: Convenience builder that derives headerStream—LGTM

Reasonable default to copy the stream for headers.

Comment on lines 53 to 75
    def build(): CobolProcessor = {
      new CobolProcessor {
        override def process(inputStream: SimpleStream,
                             outputStream: OutputStream)
                            (rawRecordProcessor: RawRecordProcessor): Unit = {
          val readerParameters = getReaderParameters
          val cobolSchema = getCobolSchema(readerParameters)
          val recordExtractor = getRecordExtractor(readerParameters, inputStream)

          val dataStream = inputStream.copyStream()
          try {
            StreamProcessor.processStream(cobolSchema.copybook,
              caseInsensitiveOptions.toMap,
              dataStream,
              recordExtractor,
              rawRecordProcessor,
              outputStream)
          } finally {
            dataStream.close()
          }
        }
      }
    }
Contributor

🛠️ Refactor suggestion

Snapshot options at build-time; precompute parameters/schema for immutability and perf

Currently, process() re-parses options and schema on each call and reads from the mutable builder map, so later option(s) mutations affect an already-built processor (surprising, not thread-safe). Snapshot options in build(), precompute readerParameters and cobolSchema, and pass them to getRecordExtractor.

Apply:

-    def build(): CobolProcessor = {
-      new CobolProcessor {
-        override def process(inputStream: SimpleStream,
-                             outputStream: OutputStream)
-                            (rawRecordProcessor: RawRecordProcessor): Unit = {
-          val readerParameters = getReaderParameters
-          val cobolSchema = getCobolSchema(readerParameters)
-          val recordExtractor = getRecordExtractor(readerParameters, inputStream)
-
-          val dataStream = inputStream.copyStream()
-          try {
-            StreamProcessor.processStream(cobolSchema.copybook,
-              caseInsensitiveOptions.toMap,
-              dataStream,
-              recordExtractor,
-              rawRecordProcessor,
-              outputStream)
-          } finally {
-            dataStream.close()
-          }
-        }
-      }
-    }
+    def build(): CobolProcessor = {
+      // Immutable snapshot for this processor instance
+      val optionsSnapshot = caseInsensitiveOptions.toMap
+      val readerParameters = {
+        val cobolParams = CobolParametersParser.parse(new Parameters(optionsSnapshot))
+        CobolParametersParser.getReaderProperties(cobolParams, None)
+      }
+      val cobolSchema = getCobolSchema(readerParameters)
+
+      new CobolProcessor {
+        override def process(inputStream: SimpleStream,
+                             outputStream: OutputStream)
+                            (rawRecordProcessor: RawRecordProcessor): Unit = {
+          val recordExtractor = getRecordExtractor(readerParameters, cobolSchema, inputStream)
+
+          val dataStream = inputStream.copyStream()
+          try {
+            StreamProcessor.processStream(
+              cobolSchema.copybook,
+              optionsSnapshot,
+              dataStream,
+              recordExtractor,
+              rawRecordProcessor,
+              outputStream
+            )
+          } finally {
+            dataStream.close()
+          }
+        }
+      }
+    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
    // Before:
    def build(): CobolProcessor = {
      new CobolProcessor {
        override def process(inputStream: SimpleStream,
                             outputStream: OutputStream)
                            (rawRecordProcessor: RawRecordProcessor): Unit = {
          val readerParameters = getReaderParameters
          val cobolSchema = getCobolSchema(readerParameters)
          val recordExtractor = getRecordExtractor(readerParameters, inputStream)
          val dataStream = inputStream.copyStream()
          try {
            StreamProcessor.processStream(cobolSchema.copybook,
              caseInsensitiveOptions.toMap,
              dataStream,
              recordExtractor,
              rawRecordProcessor,
              outputStream)
          } finally {
            dataStream.close()
          }
        }
      }
    }

    // After:
    def build(): CobolProcessor = {
      // Immutable snapshot for this processor instance
      val optionsSnapshot = caseInsensitiveOptions.toMap
      val readerParameters = {
        val cobolParams = CobolParametersParser.parse(new Parameters(optionsSnapshot))
        CobolParametersParser.getReaderProperties(cobolParams, None)
      }
      val cobolSchema = getCobolSchema(readerParameters)
      new CobolProcessor {
        override def process(inputStream: SimpleStream,
                             outputStream: OutputStream)
                            (rawRecordProcessor: RawRecordProcessor): Unit = {
          val recordExtractor = getRecordExtractor(readerParameters, cobolSchema, inputStream)
          val dataStream = inputStream.copyStream()
          try {
            StreamProcessor.processStream(
              cobolSchema.copybook,
              optionsSnapshot,
              dataStream,
              recordExtractor,
              rawRecordProcessor,
              outputStream
            )
          } finally {
            dataStream.close()
          }
        }
      }
    }
🤖 Prompt for AI Agents
In
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala
around lines 53–75, snapshot mutable builder state at build time by computing
readerParameters and cobolSchema once in build() (and copying
caseInsensitiveOptions to an immutable Map), then return a CobolProcessor whose
process() uses those precomputed values instead of calling
getReaderParameters/getCobolSchema on each invocation; keep getRecordExtractor
but call it with the precomputed readerParameters (and the runtime inputStream)
so later mutations to the builder do not affect an already-built processor and
behavior becomes thread-safe and stable.

Comment on lines 111 to 130
    private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = {
      val dataStream = inputStream.copyStream()
      val headerStream = inputStream.copyStream()

      val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)

      reader.recordExtractor(0, dataStream, headerStream) match {
        case Some(extractor) => extractor
        case None if readerParameters.recordFormat == FixedLength =>
          val dataStream = inputStream.copyStream()
          val ctx = RawRecordContext.builder(dataStream, getCobolSchema(readerParameters).copybook)
            .withReaderParams(readerParameters)
            .build()
          new FixedRecordLengthRawRecordExtractor(ctx, readerParameters.recordLength)
        case None =>
          throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " +
            "Please check the copybook and the reader parameters."
          )
      }
    }
Contributor

🛠️ Refactor suggestion

Reuse streams and close unused header stream; extend signature to avoid re-parsing schema

  • Avoid re-creating dataStream in the FixedLength fallback and close the unused headerStream.
  • Close both streams when failing to create an extractor.
  • Accept cobolSchema to avoid a second schema build.
-    private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = {
+    private[processor] def getRecordExtractor(readerParameters: ReaderParameters,
+                                              cobolSchema: CobolSchema,
+                                              inputStream: SimpleStream): RawRecordExtractor = {
       val dataStream = inputStream.copyStream()
       val headerStream = inputStream.copyStream()
 
       val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)
 
       reader.recordExtractor(0, dataStream, headerStream) match {
         case Some(extractor) => extractor
         case None if readerParameters.recordFormat == FixedLength =>
-          val dataStream = inputStream.copyStream()
-          val ctx = RawRecordContext.builder(dataStream, getCobolSchema(readerParameters).copybook)
+          // Release the header stream; reuse the data stream for the fixed extractor.
+          headerStream.close()
+          val ctx = RawRecordContext.builder(dataStream, cobolSchema.copybook)
             .withReaderParams(readerParameters)
             .build()
           new FixedRecordLengthRawRecordExtractor(ctx, readerParameters.recordLength)
         case None =>
+          headerStream.close()
+          dataStream.close()
           throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " +
             "Please check the copybook and the reader parameters."
           )
       }
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
    // Before:
    private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = {
      val dataStream = inputStream.copyStream()
      val headerStream = inputStream.copyStream()
      val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)
      reader.recordExtractor(0, dataStream, headerStream) match {
        case Some(extractor) => extractor
        case None if readerParameters.recordFormat == FixedLength =>
          val dataStream = inputStream.copyStream()
          val ctx = RawRecordContext.builder(dataStream, getCobolSchema(readerParameters).copybook)
            .withReaderParams(readerParameters)
            .build()
          new FixedRecordLengthRawRecordExtractor(ctx, readerParameters.recordLength)
        case None =>
          throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " +
            "Please check the copybook and the reader parameters."
          )
      }
    }

    // After:
    private[processor] def getRecordExtractor(readerParameters: ReaderParameters,
                                              cobolSchema: CobolSchema,
                                              inputStream: SimpleStream): RawRecordExtractor = {
      val dataStream = inputStream.copyStream()
      val headerStream = inputStream.copyStream()
      val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)
      reader.recordExtractor(0, dataStream, headerStream) match {
        case Some(extractor) =>
          extractor
        case None if readerParameters.recordFormat == FixedLength =>
          // Release the header stream; reuse the data stream for the fixed extractor.
          headerStream.close()
          val ctx = RawRecordContext.builder(dataStream, cobolSchema.copybook)
            .withReaderParams(readerParameters)
            .build()
          new FixedRecordLengthRawRecordExtractor(ctx, readerParameters.recordLength)
        case None =>
          headerStream.close()
          dataStream.close()
          throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " +
            "Please check the copybook and the reader parameters."
          )
      }
    }
🤖 Prompt for AI Agents
In
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala
around lines 111 to 130, the method creates two stream copies but re-creates
dataStream in the FixedLength fallback, never closes the unused headerStream,
and calls getCobolSchema again; change the method signature to accept a
cobolSchema parameter, reuse the already-created dataStream for the FixedLength
fallback (do not recreate it), ensure headerStream is closed when it is not
used, and close both streams in the failure path (use try/finally or a managed
resource pattern) so no streams leak while avoiding a second schema build.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)

169-191: Ensure recordExtractor’s streams are closed after index generation (VarLenNestedReader.scala:169–191). RawRecordExtractor has no close() contract and extractors retain dataStream/headerStream, leading to resource leaks. Make RawRecordExtractor extend AutoCloseable (implement close() in all extractors) or explicitly close the underlying streams after calling IndexGenerator.sparseIndexGenerator.
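A hedged sketch of the AutoCloseable route with stand-in types (SimpleStream is a placeholder for the real cobrix trait, and the iteration logic is elided):

// Stand-in for the real stream trait; only close() matters for this sketch.
trait SimpleStream { def close(): Unit }

// Making the extractor contract AutoCloseable lets callers release streams deterministically.
trait RawRecordExtractor extends Iterator[Array[Byte]] with AutoCloseable

class LeakFreeExtractor(dataStream: SimpleStream, headerStream: SimpleStream) extends RawRecordExtractor {
  override def hasNext: Boolean = false                     // sketch: no records
  override def next(): Array[Byte] = Iterator.empty.next()  // sketch: throws NoSuchElementException

  // Nested so the data stream is closed even if closing the header stream throws.
  override def close(): Unit = try headerStream.close() finally dataStream.close()
}

Callers could then close extractors with a plain try/finally, or scala.util.Using on Scala 2.13+.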

♻️ Duplicate comments (2)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (2)

52-74: Snapshot options and precompute parameters/schema at build-time (thread-safety + perf).

Avoid re-parsing options and schema per process() and builder mutations affecting built processors. This repeats a prior suggestion.

-    def build(): CobolProcessor = {
-      new CobolProcessor {
+    def build(): CobolProcessor = {
+      // Immutable snapshot
+      val optionsSnapshot = caseInsensitiveOptions.toMap
+      val readerParameters = {
+        val cobolParams = CobolParametersParser.parse(new Parameters(optionsSnapshot))
+        CobolParametersParser.getReaderProperties(cobolParams, None)
+      }
+      val cobolSchema = getCobolSchema(readerParameters)
+      new CobolProcessor {
         override def process(inputStream: SimpleStream,
                              outputStream: OutputStream)
                             (rawRecordProcessor: RawRecordProcessor): Unit = {
-          val readerParameters = getReaderParameters
-          val cobolSchema = getCobolSchema(readerParameters)
-          val recordExtractor = getRecordExtractor(readerParameters, inputStream)
+          val recordExtractor = getRecordExtractor(readerParameters, cobolSchema, inputStream)
 
           val dataStream = inputStream.copyStream()
           try {
             StreamProcessor.processStream(cobolSchema.copybook,
-              caseInsensitiveOptions.toMap,
+              optionsSnapshot,
               dataStream,
               recordExtractor,
               rawRecordProcessor,
               outputStream)
           } finally {
             dataStream.close()
           }
         }
       }
     }

110-123: Manage stream lifecycle and avoid duplicate stream creation in fallback.

Close header/data streams on failure and reuse existing dataStream in FixedLength fallback; also accept cobolSchema to avoid extra parse. Mirrors prior feedback.

-    private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = {
-      val dataStream = inputStream.copyStream()
-      val headerStream = inputStream.copyStream()
+    private[processor] def getRecordExtractor(readerParameters: ReaderParameters,
+                                              cobolSchema: CobolSchema,
+                                              inputStream: SimpleStream): RawRecordExtractor = {
+      val dataStream = inputStream.copyStream()
+      val headerStream = inputStream.copyStream()
 
       val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)
 
-      reader.recordExtractor(0, dataStream, headerStream) match {
-        case Some(extractor) => extractor
-        case None =>
-          throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " +
-            "Please check the copybook and the reader parameters."
-          )
-      }
+      reader.recordExtractor(0, dataStream, headerStream) match {
+        case Some(extractor) => extractor
+        case None if readerParameters.recordFormat == za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.FixedLength =>
+          // Release header stream; reuse data stream
+          headerStream.close()
+          val ctx = za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordContext
+            .builder(dataStream, cobolSchema.copybook)
+            .withReaderParams(readerParameters)
+            .build()
+          new za.co.absa.cobrix.cobol.reader.extractors.raw.FixedRecordLengthRawRecordExtractor(ctx, readerParameters.recordLength)
+        case None =>
+          // Close both streams (nested, so the second close runs even if the first throws),
+          // then fail with a clear message.
+          try headerStream.close() finally dataStream.close()
+          throw new IllegalArgumentException(
+            "Cannot create a record extractor for the given reader parameters. Please check the copybook and the reader parameters."
+          )
+      }
     }
🧹 Nitpick comments (3)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)

74-77: Guard against missing BDW when FixedBlock is selected.

Avoid bdwOpt.get throwing NoSuchElementException; fail fast with a clear message.

-      case None if readerProperties.recordFormat == FixedBlock =>
-        val fbParams = FixedBlockParameters(readerProperties.recordLength, bdwOpt.get.blockLength, bdwOpt.get.recordsPerBlock)
+      case None if readerProperties.recordFormat == FixedBlock =>
+        require(bdwOpt.isDefined, "FixedBlock record format requires BDW parameters (bdw).")
+        val fb = bdwOpt.get
+        val fbParams = FixedBlockParameters(readerProperties.recordLength, fb.blockLength, fb.recordsPerBlock)
         FixedBlockParameters.validate(fbParams)
         Some(new FixedBlockRawRecordExtractor(reParams, fbParams))
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (2)

26-27: Make case-insensitive handling locale-safe and trim keys in bulk options.

Use Locale.ROOT to avoid locale traps (e.g., Turkish i) and trim keys in options().

 import java.io.OutputStream
+import java.util.Locale
@@
-  def option(key: String, value: String): CobolProcessorBuilder = {
+  def option(key: String, value: String): CobolProcessorBuilder = {
     require(key.trim.nonEmpty, "Option key must not be empty or whitespace-only")
-    caseInsensitiveOptions += (key.trim.toLowerCase -> value)
+    caseInsensitiveOptions += (key.trim.toLowerCase(Locale.ROOT) -> value)
     this
   }
@@
-  def options(options: Map[String, String]): CobolProcessorBuilder = {
-    caseInsensitiveOptions ++= options.map(kv => (kv._1.toLowerCase(), kv._2))
+  def options(options: Map[String, String]): CobolProcessorBuilder = {
+    caseInsensitiveOptions ++= options.iterator.map { case (k, v) => (k.trim.toLowerCase(Locale.ROOT), v) }
     this
   }

Also applies to: 83-87, 95-98
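For illustration, the trap itself (standard JDK behavior, not code from this PR):

import java.util.Locale

val key = "RECORD_ID"
val turkish = key.toLowerCase(new Locale("tr")) // "record_ıd": the dotless ı breaks map lookups
val stable  = key.toLowerCase(Locale.ROOT)      // "record_id": locale-independent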


49-50: Minor naming consistency.

Parameter is singular (copybookContent) in builder() but plural (copybookContents) in class ctor. Consider consistent naming to reduce confusion.

Also applies to: 128-130

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d547863 and 836f65b.

📒 Files selected for processing (2)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (1 hunks)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (2)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (6)
  • RawRecordContext (35-44)
  • RawRecordContext (46-139)
  • builder (119-126)
  • builder (136-137)
  • withReaderParams (60-73)
  • build (101-103)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/recordformats/RecordFormat.scala (1)
  • FixedLength (22-22)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (6)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/ArrayOfAnyHandler.scala (1)
  • ArrayOfAnyHandler (27-33)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (2)
  • StreamProcessor (26-65)
  • processStream (38-64)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (2)
  • VarLenNestedReader (42-274)
  • recordExtractor (54-89)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (1)
  • CobolParametersParser (39-1004)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Parameters.scala (1)
  • Parameters (27-98)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala (1)
  • fromReaderParameters (75-150)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: test (2.12.20, 2.12, 3.3.4, 0, 80, 20)
  • GitHub Check: Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Spark 3.5.5 on Scala 2.13.16
  • GitHub Check: Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Spark 3.5.5 on Scala 2.12.20
🔇 Additional comments (3)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (3)

58-60: Good move to the builder and options propagation.

Switching to RawRecordContext.builder(...).withReaderParams(readerProperties).build() centralizes RDW/BDW setup and correctly forwards options to custom extractors.
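For reference, a minimal usage sketch of the new builder (the option key is made up for illustration; the method names are the ones this PR introduces):

val ctx = RawRecordContext.builder(dataStream, copybook)
  .withReaderParams(readerProperties)           // sets up RDW/BDW decoding from the reader params
  .withOptions(Map("my_custom_key" -> "value")) // becomes visible to custom record extractors
  .build()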


69-73: Double-check extractor choice for VariableLength + lengthFieldExpression.

Using FixedWithRecordLengthExprRawRecordExtractor for VariableLength may be intentional, but the naming is misleading. Please confirm behavior (esp. header handling) matches expectations for VL records with a length expression.


84-85: Nice addition: fixed-length extractor path.

Covers FixedLength without length expression.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (1)

52-55: Snapshot options at build time; keep runtime behavior deterministic and thread-safe

process() reads caseInsensitiveOptions.toMap from the mutable builder, which can diverge from the options used to compute readerParameters/cobolSchema. Snapshot once in build() and use that snapshot throughout.

-    def build(): CobolProcessor = {
-      val readerParameters = getReaderParameters
-      val cobolSchema = getCobolSchema(readerParameters)
+    def build(): CobolProcessor = {
+      val optionsSnapshot = caseInsensitiveOptions.toMap
+      val readerParameters = {
+        val cobolParams = CobolParametersParser.parse(new Parameters(optionsSnapshot))
+        CobolParametersParser.getReaderProperties(cobolParams, None)
+      }
+      val cobolSchema = getCobolSchema(readerParameters)
       new CobolProcessor {
         override def process(inputStream: SimpleStream,
                              outputStream: OutputStream)
                             (rawRecordProcessor: RawRecordProcessor): Unit = {
-          val recordExtractor = getRecordExtractor(readerParameters, inputStream)
+          // Streams and extractor are created/closed in process()
           val dataStream = inputStream.copyStream()
           try {
             StreamProcessor.processStream(cobolSchema.copybook,
-              caseInsensitiveOptions.toMap,
+              optionsSnapshot,
               dataStream,
               recordExtractor,
               rawRecordProcessor,
               outputStream)
           } finally {
             dataStream.close()
           }
         }
       }
     }

Also applies to: 64-66

🧹 Nitpick comments (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (1)

84-87: Locale-safe lowercasing for option keys

Use Locale.ROOT to avoid locale-specific surprises (e.g., Turkish i).

+import java.util.Locale
@@
-      caseInsensitiveOptions += (key.trim.toLowerCase -> value)
+      caseInsensitiveOptions += (key.trim.toLowerCase(Locale.ROOT) -> value)
@@
-      caseInsensitiveOptions ++= options.map(kv => (kv._1.toLowerCase(), kv._2))
+      caseInsensitiveOptions ++= options.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }

Also applies to: 96-99, 26-27

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 836f65b and 773a222.

📒 Files selected for processing (2)
  • cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (1 hunks)
  • spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (3)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (2)
  • generateIndex (137-192)
  • isRdwBigEndian (97-97)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)
  • close (107-112)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (1)
  • close (41-47)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (6)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/ArrayOfAnyHandler.scala (1)
  • ArrayOfAnyHandler (27-33)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (2)
  • StreamProcessor (26-65)
  • processStream (38-64)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (3)
  • VarLenNestedReader (42-274)
  • getCobolSchema (91-91)
  • recordExtractor (54-89)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (1)
  • CobolParametersParser (39-1004)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Parameters.scala (1)
  • Parameters (27-98)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala (1)
  • fromReaderParameters (75-150)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: Spark 3.5.5 on Scala 2.12.20
  • GitHub Check: Spark 3.5.5 on Scala 2.13.16
  • GitHub Check: Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: test (2.12.20, 2.12, 3.3.4, 0, 80, 20)
🔇 Additional comments (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (1)

166-171: Good fix: ensure streams are closed even on failures

Wrapping reader.generateIndex(...) with try/finally and closing both inputStream and headerStream prevents leaks during index generation. Looks correct and low-risk.
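A self-contained sketch of the pattern with stand-in types (generateIndex and the stream trait stand in for the real spark-cobol members; signatures here are assumptions):

trait Stream { def close(): Unit }

def generateIndex(data: Stream, header: Stream): Unit = () // stand-in for the real call

def buildIndexFor(open: () => Stream): Unit = {
  val inputStream  = open()
  val headerStream = open()
  try {
    generateIndex(inputStream, headerStream)
  } finally {
    // Runs on success and on failure; nested so the header stream is closed
    // even if closing the input stream throws.
    try inputStream.close() finally headerStream.close()
  }
}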

Comment on lines +56 to +74
new CobolProcessor {
override def process(inputStream: SimpleStream,
outputStream: OutputStream)
(rawRecordProcessor: RawRecordProcessor): Unit = {
val recordExtractor = getRecordExtractor(readerParameters, inputStream)

val dataStream = inputStream.copyStream()
try {
StreamProcessor.processStream(cobolSchema.copybook,
caseInsensitiveOptions.toMap,
dataStream,
recordExtractor,
rawRecordProcessor,
outputStream)
} finally {
dataStream.close()
}
}
}

⚠️ Potential issue

Close all opened streams; avoid leaking header/data streams created for the extractor

process() closes only the dataStream it creates, but the extractor’s own dataStream and headerStream created in getRecordExtractor() are never closed. Either create both streams in process() and pass them into getRecordExtractor(...), or return the streams from getRecordExtractor(...) so process() can close them in a finally.

Apply:

-        override def process(inputStream: SimpleStream,
-                             outputStream: OutputStream)
-                            (rawRecordProcessor: RawRecordProcessor): Unit = {
-          val recordExtractor = getRecordExtractor(readerParameters, inputStream)
-
-          val dataStream = inputStream.copyStream()
-          try {
-            StreamProcessor.processStream(cobolSchema.copybook,
-              caseInsensitiveOptions.toMap,
-              dataStream,
-              recordExtractor,
-              rawRecordProcessor,
-              outputStream)
-          } finally {
-            dataStream.close()
-          }
-        }
+        override def process(inputStream: SimpleStream,
+                             outputStream: OutputStream)
+                            (rawRecordProcessor: RawRecordProcessor): Unit = {
+          val dataStream   = inputStream.copyStream()
+          val headerStream = inputStream.copyStream()
+          val recordExtractor = getRecordExtractor(readerParameters, dataStream, headerStream)
+          try {
+            StreamProcessor.processStream(
+              cobolSchema.copybook,
+              optionsSnapshot,
+              dataStream,
+              recordExtractor,
+              rawRecordProcessor,
+              outputStream
+            )
+          } finally {
+            try headerStream.close() finally dataStream.close()
+          }
+        }

Committable suggestion skipped: line range outside the PR's diff.
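The "try a.close() finally b.close()" idiom in the diff above guarantees the second close runs even when the first throws, while the first exception still propagates. A self-contained illustration:

val failing = new AutoCloseable { def close(): Unit = throw new RuntimeException("first close failed") }
val ok      = new AutoCloseable { def close(): Unit = println("second close ran") }

try {
  try failing.close() finally ok.close() // prints "second close ran", then rethrows
} catch {
  case e: RuntimeException => println(e.getMessage) // prints "first close failed"
}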

🤖 Prompt for AI Agents
In
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala
around lines 56 to 74, process(...) currently only closes the dataStream it
creates but not the extractor’s internal dataStream and headerStream, leaking
streams; change the implementation so process() is responsible for creating and
closing all streams: either (A) refactor getRecordExtractor(...) to return both
the extractor and the two streams (or a small case class/tuple containing
extractor, dataStream, headerStream) so process() can register them and close
them in the finally block, or (B) change getRecordExtractor(...) to accept the
already-created data and header streams as parameters; in either case ensure all
streams are closed in the finally (null/Option-safe) and preserve original
exception behavior by closing in reverse-open order and suppressing/adding
suppressed exceptions appropriately.

Comment on lines +111 to +124
private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = {
  val dataStream = inputStream.copyStream()
  val headerStream = inputStream.copyStream()

  val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)

  reader.recordExtractor(0, dataStream, headerStream) match {
    case Some(extractor) => extractor
    case None =>
      throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " +
        "Please check the copybook and the reader parameters."
      )
  }
}

🛠️ Refactor suggestion

⚠️ Potential issue

Align getRecordExtractor with stream lifecycle; close on failure

Change the signature to accept streams created by process() and ensure they’re closed on the error path.

-    private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = {
-      val dataStream = inputStream.copyStream()
-      val headerStream = inputStream.copyStream()
-
-      val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)
-
-      reader.recordExtractor(0, dataStream, headerStream) match {
-        case Some(extractor) => extractor
-        case None =>
-          throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " +
-            "Please check the copybook and the reader parameters."
-          )
-      }
-    }
+    private[processor] def getRecordExtractor(readerParameters: ReaderParameters,
+                                              dataStream: SimpleStream,
+                                              headerStream: SimpleStream): RawRecordExtractor = {
+      val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)
+      reader.recordExtractor(0, dataStream, headerStream) match {
+        case Some(extractor) => extractor
+        case None =>
+          try headerStream.close() finally dataStream.close()
+          throw new IllegalArgumentException(
+            s"Cannot create a record extractor for the given reader parameters. Please check the copybook and the reader parameters."
+          )
+      }
+    }
🤖 Prompt for AI Agents
In
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala
around lines 111 to 124, update getRecordExtractor to accept the actual streams
produced by process() (i.e., accept the data/header SimpleStream instances
created upstream rather than creating copies internally) and ensure both streams
are closed on the error path: if reader.recordExtractor returns None, close
dataStream and headerStream (or call their close/copyStream close equivalent)
before throwing the IllegalArgumentException; keep normal success path returning
the extractor without closing the streams.


Development

Successfully merging this pull request may close these issues.

Pass spark-cobol properties to custom record extractors
