#780 Add spark-cobol options to the raw record extractor interface
#781
Conversation
Walkthrough
Adds a new CobolProcessor builder API, replaces RecordProcessorBuilder, moves some processor classes into an impl package, introduces RawRecordContext builder and an options field propagated through CobolParameters → ReaderParameters → RawRecordContext, and updates tests to use new builders and assert options propagation.
Changes
Sequence Diagram(s)
sequenceDiagram
autonumber
actor User
participant Builder as CobolProcessor.builder
participant Proc as CobolProcessor
participant Reader as VarLenNestedReader / RawRecordContext.Builder
participant SP as impl.StreamProcessor
participant RRE as RawRecordExtractor
participant RRP as RawRecordProcessor
User->>Builder: builder(copybook).option(...).options(...).build()
Builder-->>User: CobolProcessor
User->>Proc: process(inputStream, outputStream)(RRP)
Proc->>Proc: getReaderParameters(options) → ReaderParameters(options)
Proc->>Proc: getCobolSchema(readerParameters)
Proc->>Reader: RawRecordContext.builder(...).withReaderParams(readerParameters).build()
Reader-->>Proc: RawRecordContext (includes options)
Proc->>SP: processStream(schema, options, dataStream, extractor, RRP, outputStream)
SP->>RRE: nextRecord()
RRE->>RRP: raw record + ctx.options
SP-->>Proc: finished
Proc-->>User: done (streams closed)
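For orientation, a minimal usage sketch of the API described in the walkthrough and diagram above. The class and method names are taken from this PR's diff; the import paths, the copybook text, and the option values are assumptions added for illustration.

```scala
// Sketch only: illustrates the new builder API described in the walkthrough.
// Import paths are assumed from the module layout; adjust to the actual packages.
import za.co.absa.cobrix.cobol.parser.Copybook
import za.co.absa.cobrix.cobol.processor.{CobolProcessor, RawRecordProcessor}

object CobolProcessorUsageSketch {
  // Illustrative copybook with a single 2-byte field.
  val copybookContents: String =
    """       01  RECORD.
      |          05  A  PIC X(2).
      |""".stripMargin

  // Options set here are lower-cased by the builder and propagated through
  // CobolParameters -> ReaderParameters -> RawRecordContext, so custom record
  // extractors can read them.
  val processor: CobolProcessor = CobolProcessor.builder(copybookContents)
    .option("record_format", "F")
    .options(Map("record_length" -> "2"))
    .build()

  // The callback receives each raw record together with the configured options.
  val passThrough: RawRecordProcessor = new RawRecordProcessor {
    override def processRecord(copybook: Copybook,
                               options: Map[String, String],
                               record: Array[Byte],
                               offset: Long): Array[Byte] = record
  }

  // With an input SimpleStream and an OutputStream at hand:
  //   processor.process(inputStream, outputStream)(passThrough)
}
```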
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Assessment against linked issues
Assessment against linked issues: Out-of-scope changes
Possibly related PRs
JaCoCo code coverage report - 'cobol-parser'
JaCoCo code coverage report - 'spark-cobol'
Actionable comments posted: 2
🧹 Nitpick comments (17)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (2)
43-49: Remove unused loop counter. Variable i is incremented but never used.
Apply:
-      var i = 0
       while (recordExtractor.hasNext) {
-        i += 1
         val record = recordExtractor.next()
31-31: Clarify options casing in Scaladoc. Document that option keys are lower-cased for case-insensitive handling.
-   * @param options arbitrary options used for splitting input data into records. Same as options to 'spark-cobol'. Can contain custom options as well.
+   * @param options arbitrary options used for splitting input data into records (same as 'spark-cobol' options).
+   *                Keys are lower-cased for case-insensitive handling. Can contain custom options as well.
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test26CustomRecordExtractor.scala (1)
75-76: Good assertions to verify options propagation. These checks validate the end-to-end flow into the extractor.
Optionally reset the captured options before each test to avoid cross-test leakage:
// e.g., at the start of each test block
CustomRecordExtractorMock.options = Map.empty
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)
67-73: Prefer safe default BDW decoder instead of RDW fallback. If bdwDecoderOpt is empty, using RDW as a BDW fallback is surprising. Default to a neutral BDW decoder to keep semantics explicit.
-      .withBdwDecoder(bdwDecoderOpt.getOrElse(rdwDecoder))
+      .withBdwDecoder(bdwDecoderOpt.getOrElse(new RecordHeaderDecoderBdw(RecordHeaderParameters(isBigEndian = true, 0))))
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala (2)
66-67: Doc: note options key casing. Clarify that keys are lower-cased for case-insensitive handling.
-   * @param options             Options passed to 'spark-cobol'.
+   * @param options             Options passed to 'spark-cobol'. Keys are lower-cased.
109-111: Add default for options in CobolParameters
-                            metadataPolicy: MetadataPolicy,
-                            options: Map[String, String]
+                            metadataPolicy: MetadataPolicy,
+                            options: Map[String, String] = Map.empty
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala (1)
134-135: ReaderParameters: options field looks good. Default value and doc are appropriate.
Add a brief doc note on lower-cased keys (mirrors builder behavior):
-   * @param options             Options passed to spark-cobol
+   * @param options             Options passed to spark-cobol (keys are lower-cased)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/CustomRecordExtractorMock.scala (1)
55-59: Global mutable test state: consider reset for isolation. If suites ever run in parallel, the shared var may leak across tests. Reset in each test or mark the suite as non-parallel.
// e.g., add beforeEach in a trait mixed into the suite:
override def beforeEach(): Unit = {
  CustomRecordExtractorMock.options = Map.empty
  super.beforeEach()
}
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/RecordExtractorDebugSpec.scala (1)
35-35: Use Long literal for clarity. Prefer 0L to avoid implicit widening.
-    val ctx = RawRecordContext.builder(0, dataStream, headerStream, null).build()
+    val ctx = RawRecordContext.builder(0L, dataStream, headerStream, null).build()
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VarOccursRecordExtractorSuite.scala (1)
45-45: Unify on Long literals for startingRecordNumber. Align with the Long-typed API for consistency.
-    val rc = RawRecordContext.builder(0, ibs, hbs, copybook).build()
+    val rc = RawRecordContext.builder(0L, ibs, hbs, copybook).build()

-    val rc = RawRecordContext.builder(0, ibs, hbs, copybook).build()
+    val rc = RawRecordContext.builder(0L, ibs, hbs, copybook).build()

-    val rc = RawRecordContext.builder(0, ibs, hbs, copybook).build()
+    val rc = RawRecordContext.builder(0L, ibs, hbs, copybook).build()
Also applies to: 88-88, 144-144
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractorSuite.scala (1)
154-158: Prefer 0L for the Long-typed builder parameter. Minor consistency tweak.
-    RawRecordContext.builder(0, ibs, hbs, copybook)
+    RawRecordContext.builder(0L, ibs, hbs, copybook)
       .withRdwDecoder(rdwDecoder)
       .withBdwDecoder(bdwDecoder)
       .build()
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (1)
84-87: Consider validating empty or whitespace-only keys. While the case-insensitive handling is good, consider adding validation to reject empty or whitespace-only keys to prevent potential issues.
   def option(key: String, value: String): CobolProcessorBuilder = {
+    require(key.trim.nonEmpty, "Option key must not be empty or whitespace-only")
     caseInsensitiveOptions += (key.toLowerCase -> value)
     this
   }
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (5)
25-26: Scaladoc tweaks (minor). Docs read well. Optional: clarify that options is the full spark-cobol options map as received by the reader (no filtering), and whether keys/values are expected to be immutable/case-sensitive.
Also applies to: 32-33
59-63: Validate startingRecordNumber. Guard against negative values early.
-    def withStartingRecordNumber(startingRecordNumber: Long): RawRecordContextBuilder = {
-      context = context.copy(startingRecordNumber = startingRecordNumber)
+    def withStartingRecordNumber(startingRecordNumber: Long): RawRecordContextBuilder = {
+      require(startingRecordNumber >= 0, s"startingRecordNumber must be >= 0, got: $startingRecordNumber")
+      context = context.copy(startingRecordNumber = startingRecordNumber)
       this
     }
79-82: Defensive copy for options. Avoid surprises if a mutable Map is passed and later mutated by the caller.
-    def withOptions(options: Map[String, String]): RawRecordContextBuilder = {
-      context = context.copy(options = options)
+    def withOptions(options: Map[String, String]): RawRecordContextBuilder = {
+      context = context.copy(options = options.toMap)
       this
     }
90-101: Add missing Scaladoc for startingRecordNumber (nit). This overload's Scaladoc omits @param startingRecordNumber.
   /**
-    * Creates a new instance of `RawRecordContextBuilder` with the specified input stream,
+    * Creates a new instance of `RawRecordContextBuilder` with the specified starting record number,
+    * input stream,
     * header stream, and copybook. The builder allows further customization of the
     * `RawRecordContext` before building it.
@@
-    * @param inputStream the main data stream containing the record data.
+    * @param startingRecordNumber the zero-based record number where inputStream is positioned.
+    * @param inputStream the main data stream containing the record data.
111-121: Header stream creation assumption — verify copy semantics. This overload presumes reading from the start and derives headerStream via inputStream.copyStream(). Please confirm copyStream returns a stream positioned at the beginning; if it clones the current position, headers might be misread when inputStream isn't at offset 0. If uncertain, either (a) document the precondition explicitly here, or (b) add an overload taking headerStream, or (c) add a builder setter withHeaderStream for override.
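If option (c) is preferred, a sketch of such a setter in the style of the existing with* methods could look like this (a hypothetical addition, not part of this PR; it assumes the builder's context holds a headerStream field of type SimpleStream):

```scala
// Hypothetical builder setter, mirroring withRdwDecoder/withBdwDecoder: lets callers
// supply their own header stream instead of relying on inputStream.copyStream()
// being positioned at the start of the data.
def withHeaderStream(headerStream: SimpleStream): RawRecordContextBuilder = {
  context = context.copy(headerStream = headerStream)
  this
}
```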
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (22)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (1 hunks)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/RecordProcessorBuilder.scala (0 hunks)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/ArrayOfAnyHandler.scala (1 hunks)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/MapOfAnyHandler.scala (1 hunks)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (1 hunks)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1 hunks)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (2 hunks)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala (2 hunks)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (2 hunks)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala (2 hunks)
- cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala (6 hunks)
- cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/RecordExtractorDebugSpec.scala (1 hunks)
- cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/SparseIndexSpecSpec.scala (6 hunks)
- cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala (1 hunks)
- cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContextFactory.scala (1 hunks)
- cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VarOccursRecordExtractorSuite.scala (3 hunks)
- cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractorSuite.scala (1 hunks)
- cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReaderSpec.scala (5 hunks)
- spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/CustomRecordExtractorMock.scala (2 hunks)
- spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test21VariableOccurs.scala (1 hunks)
- spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test25OccursMappings.scala (1 hunks)
- spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test26CustomRecordExtractor.scala (1 hunks)
💤 Files with no reviewable changes (1)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/RecordProcessorBuilder.scala
🧰 Additional context used
🧬 Code graph analysis (18)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/ArrayOfAnyHandler.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (4)
processor(100-102)processor(104-108)processor(110-127)processor(129-129)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test26CustomRecordExtractor.scala (2)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/CustomRecordExtractorMock.scala (2)
CustomRecordExtractorMock (26-53), CustomRecordExtractorMock (55-59)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Parameters.scala (1)
contains(66-69)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (9)
RawRecordContext(34-43)RawRecordContext(45-122)builder(102-109)builder(119-120)withRdwDecoder(64-67)withBdwDecoder(69-72)withAdditionalInfo(74-77)withOptions(79-82)build(84-86)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala (2)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/policies/MetadataPolicy.scala (2)
MetadataPolicy (21-34), Basic (23-23)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala (1)
options(38-41)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala (1)
options(38-41)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (7)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/ArrayOfAnyHandler.scala (1)
ArrayOfAnyHandler (27-33)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (2)
StreamProcessor (26-66), processStream (37-65)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedRecordLengthRawRecordExtractor.scala (1)
FixedRecordLengthRawRecordExtractor (19-56)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (1)
CobolParametersParser (39-1004)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala (1)
ReaderParameters (80-135)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)
recordExtractor (56-99)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala (1)
fromReaderParameters(75-150)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/RecordExtractorDebugSpec.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (5)
RawRecordContext(34-43)RawRecordContext(45-122)builder(102-109)builder(119-120)build(84-86)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test21VariableOccurs.scala (2)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)
recordExtractor (56-99)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (5)
RawRecordContext(34-43)RawRecordContext(45-122)builder(102-109)builder(119-120)build(84-86)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/SparseIndexSpecSpec.scala (2)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)
recordExtractor (56-99)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (5)
RawRecordContext(34-43)RawRecordContext(45-122)builder(102-109)builder(119-120)build(84-86)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VarOccursRecordExtractorSuite.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (5)
RawRecordContext(34-43)RawRecordContext(45-122)builder(102-109)builder(119-120)build(84-86)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (5)
RawRecordContext(34-43)RawRecordContext(45-122)builder(102-109)builder(119-120)build(84-86)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test25OccursMappings.scala (2)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)
recordExtractor (56-99)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (5)
RawRecordContext(34-43)RawRecordContext(45-122)builder(102-109)builder(119-120)build(84-86)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractorSuite.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (7)
RawRecordContext(34-43)RawRecordContext(45-122)builder(102-109)builder(119-120)withRdwDecoder(64-67)withBdwDecoder(69-72)build(84-86)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContextFactory.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (5)
RawRecordContext(34-43)RawRecordContext(45-122)builder(102-109)builder(119-120)build(84-86)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala (2)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (10)
builder (132-134), CobolProcessor (49-135), processor (100-102), processor (104-108), processor (110-127), processor (129-129), options (95-98), build (53-75), process (55-73), option (84-87)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/recordformats/RecordFormat.scala (2)
RecordFormat (21-41), AsciiText (26-26)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReaderSpec.scala (2)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (5)
RawRecordContext (34-43), RawRecordContext (45-122), builder (102-109), builder (119-120), build (84-86)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/CopybookParser.scala (2)
CopybookParser (42-490), parseSimple (86-104)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Parameters.scala (2)
getOrElse(78-81)getMap(88-88)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (3)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderDecoderBdw.scala (1)
RecordHeaderDecoderBdw (23-90)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderDecoderRdw.scala (1)
RecordHeaderDecoderRdw (23-59)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderParameters.scala (1)
RecordHeaderParameters(20-26)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: Spark 3.5.5 on Scala 2.12.20
- GitHub Check: Spark 3.5.5 on Scala 2.13.16
- GitHub Check: Spark 2.4.8 on Scala 2.11.12
- GitHub Check: Spark 3.4.4 on Scala 2.12.20
- GitHub Check: test (2.12.20, 2.12, 3.3.4, 0, 80, 20)
🔇 Additional comments (24)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/MapOfAnyHandler.scala (1)
17-17: No remaining imports of the old package path; approve changes.
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/ArrayOfAnyHandler.scala (1)
17-17: Package move verified; no direct references to ArrayOfAnyHandler remain.
Ran a repository-wide search for za.co.absa.cobrix.cobol.processor.ArrayOfAnyHandler and found no call sites still importing the old package.
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (1)
17-21: Move/import OK. No issues spotted with the package relocation and new import.
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/CustomRecordExtractorMock.scala (1)
27-31: Capturing ctx.options is correct. This enables tests to assert end-to-end propagation.
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala (1)
172-172: Builder migration looks good. Switch to RawRecordContext.builder(...).build() is correct; using 0L matches the new Long signature.
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test25OccursMappings.scala (1)
70-70: LGTM. Builder usage with 0L is correct.
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/SparseIndexSpecSpec.scala (6)
66-66: LGTM. Consistent builder migration with 0L. No functional changes.
Also applies to: 88-88, 108-108, 151-151, 181-181, 210-210
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test21VariableOccurs.scala (1)
47-47: LGTM. Correct switch to builder and Long literal.
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReaderSpec.scala (2)
93-93: Verify null copybook usage. Passing null for copybook relies on downstream code never dereferencing it. Confirm this is intentional for this test.
130-132: LGTM. New builder overload with single stream + parsed copybook reads clearly and matches the API.
Also applies to: 172-174, 215-217, 266-268
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (2)
306-308: LGTM! Options propagation correctly added to CobolParameters. The addition of params.getMap as the options parameter properly captures all spark-cobol options for propagation to custom record extractors, addressing the primary objective from issue #780.
450-452: LGTM! Options correctly passed through to ReaderParameters. The propagation of parameters.options to ReaderParameters ensures that all spark-cobol options flow through the reader chain to reach the custom record extractors.
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContextFactory.scala (1)
31-38: LGTM! Test helper simplified with builder pattern. The refactoring to use the RawRecordContext.builder pattern appropriately simplifies the test factory method by removing the need to explicitly handle decoders and additional info parameters, which are now managed by the builder's defaults.
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala (2)
28-28: LGTM! Test class renamed to match new API. The renaming from RecordProcessorBuilderSuite to CobolProcessorBuilderSuite correctly reflects the new CobolProcessor API.
37-46: LGTM! Test updated for new builder pattern. The test correctly uses the new CobolProcessor.builder(copybook).build().process() pattern.
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (4)
30-48: Well-designed public API for the new CobolProcessor trait. The trait provides a clean, minimal interface for processing COBOL data streams with proper separation of concerns.
49-75: Solid builder implementation with proper resource management. The builder pattern implementation properly manages stream resources with try-finally and correctly propagates options through to the StreamProcessor.
110-127: LGTM! Record extractor creation properly handles options. The implementation correctly creates the RawRecordContext with the builder pattern for fixed-length records and properly attempts VarLenNestedReader first, which will have access to options through ReaderParameters.
132-135: Clean factory method for builder creation. The companion object provides a clear entry point for creating CobolProcessor instances.
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (5)
20-20: Import expansion is correct. Needed for defaults used in the builder.
45-57: Sensible defaults for RDW/BDW decoders. Big-endian + zero adjustment aligns with typical mainframe RDW/BDW.
64-67: Builder setters for decoders look good. Straightforward and fluent.
Also applies to: 69-72
74-77: Additional info setter is fine.
84-86: Builder build() is fine.
                             additionalInfo: String,
                             options: Map[String, String]
💡 Verification agent
❓ Verification inconclusive
Preserve source-compat by defaulting options; call out bin-compat in release notes.
Adding a parameter to a public case class is a breaking change for existing call sites. Defaulting the new parameter helps source compatibility after upgrade; still document the binary change in release notes/MiMa.
Apply within the case class params:
- additionalInfo: String,
- options: Map[String, String]
+ additionalInfo: String,
+  options: Map[String, String] = Map.empty
Default options parameter for source compatibility
Default the new options parameter to Map.empty to preserve source compatibility; still document the binary-compatibility break in release notes/MiMa.
- additionalInfo: String,
- options: Map[String, String]
+ additionalInfo: String,
+  options: Map[String, String] = Map.empty
🤖 Prompt for AI Agents
In
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala
around lines 41 to 42, the new constructor/parameter list added an options:
Map[String, String] parameter without a default, breaking source compatibility;
add a default value of Map.empty to the options parameter (i.e., options:
Map[String, String] = Map.empty) so existing callers continue to compile, and
then update any call sites or tests that explicitly relied on the non-default
behavior if needed; also ensure the change is noted in release notes/MiMa as a
binary-compatibility break.
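If the build uses sbt-mima-plugin, the binary break can be acknowledged with an exclusion along these lines. This is only a sketch; whether MiMa is configured for this project is an assumption, and the concrete problem types and filters should be taken from the mimaReportBinaryIssues output rather than from this example.

```scala
// build.sbt sketch: acknowledge the RawRecordContext constructor change in MiMa.
// The exact filters must be copied from MiMa's own report for the release.
import com.typesafe.tools.mima.core._

mimaBinaryIssueFilters ++= Seq(
  ProblemFilters.exclude[DirectMissingMethodProblem](
    "za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordContext.this"),
  ProblemFilters.exclude[DirectMissingMethodProblem](
    "za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordContext.copy")
)
```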
    val builder = CobolProcessor.builder(copybook)
      .option("record_format", "D")

    assert(builder.getReaderParameters.recordFormat == RecordFormat.AsciiText)
    assert(builder.getReaderParameters.options.contains("record_format"))
    assert(builder.getOptions.contains("record_format"))
  }
💡 Verification agent
🧩 Analysis chain
Verify options propagation in getReaderParameters test.
The test correctly verifies that options are present in the ReaderParameters. Good coverage of the options propagation feature.
🏁 Script executed:
#!/bin/bash
# Verify that custom record extractors can access options through RawRecordContext
ast-grep --pattern 'class $_ extends RawRecordExtractor {
$$$
}'
Length of output: 0
🏁 Script executed:
#!/bin/bash
# Search for any custom extractors extending RawRecordExtractor
rg -n 'class .* extends RawRecordExtractor' -g '*.scala'
# Search for RawRecordContext usage to confirm option propagation support
rg -n 'RawRecordContext' -g '*.scala'
Length of output: 20103
🏁 Script executed:
#!/bin/bash
set -e
# Search for how CobolProcessor builds RawRecordContext and if it propagates reader parameters options
rg -n 'RawRecordContext.builder' -g 'cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala' -C5
rg -n 'withOptions' -g 'cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala'
rg -n 'options' -g 'cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala' -C5
Length of output: 1597
🏁 Script executed:
#!/bin/bash
# Inspect RawRecordContextBuilder implementation and builder methods
rg -n 'class RawRecordContextBuilder' -g 'cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala' -C10
rg -n 'def builder' -g 'cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala' -C5
Length of output: 6039
🏁 Script executed:
#!/bin/bash
set -e
rg -n 'object CobolProcessor' -g 'cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala' -C10
rg -n 'def builder' -g 'cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala' -C5
Length of output: 3742
Propagate builder options into RawRecordContext
In CobolProcessor.scala (around line 120), call .withOptions(getOptions) on the RawRecordContext.builder(…) before .build() so that any custom RawRecordExtractor can access the configured options via its RawRecordContext.
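A sketch of that change applied to the fixed-length branch quoted earlier in this review. It assumes getOptions exposes the builder's case-insensitive option map, as exercised by the tests above.

```scala
// Fixed-length fallback with the suggested call added (sketch only; the surrounding
// match expression and helpers are the ones quoted earlier in this review).
case None if readerParameters.recordFormat == FixedLength =>
  val dataStream = inputStream.copyStream()
  val ctx = RawRecordContext.builder(dataStream, getCobolSchema(readerParameters).copybook)
    .withReaderParams(readerParameters)
    .withOptions(getOptions) // propagate builder options so custom extractors can see them
    .build()
  new FixedRecordLengthRawRecordExtractor(ctx, readerParameters.recordLength)
```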
🤖 Prompt for AI Agents
In
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala
around line 120, the RawRecordContext is built without propagating the processor
builder options, so custom RawRecordExtractor implementations cannot access
configured options; modify the RawRecordContext.builder(...) chain to call
.withOptions(getOptions) before .build() so the context includes the builder
options and RawRecordExtractor can read them from its RawRecordContext.
Actionable comments posted: 2
♻️ Duplicate comments (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (1)
33-44: Default options to Map.empty for source compatibility. Adding a new parameter to a public case class breaks source for any direct constructors. Defaulting preserves source compatibility (binary will still change — document in release notes/MiMa).
 case class RawRecordContext(
   ...
-  additionalInfo: String,
-  options: Map[String, String]
+  additionalInfo: String,
+  options: Map[String, String] = Map.empty
 )
#!/bin/bash
# Find direct RawRecordContext constructor call sites that may still pass 7 args.
rg -n 'RawRecordContext\\s*\\(' -g '*.scala' -C2
🧹 Nitpick comments (7)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (3)
27-29: Use Locale.ROOT for case-folding. Lowercasing with the default locale can misbehave (e.g., Turkish). Import Locale to use Locale.ROOT.
-import java.io.OutputStream
+import java.io.OutputStream
+import java.util.Locale
 import scala.collection.mutable
84-88: Harden builder.option: locale-safe lowercasing and null checks
- Use Locale.ROOT.
- Guard against null values to avoid NPEs downstream.
-  def option(key: String, value: String): CobolProcessorBuilder = {
-    require(key.trim.nonEmpty, "Option key must not be empty or whitespace-only")
-    caseInsensitiveOptions += (key.trim.toLowerCase -> value)
-    this
-  }
+  def option(key: String, value: String): CobolProcessorBuilder = {
+    require(key != null && key.trim.nonEmpty, "Option key must not be null/empty or whitespace-only")
+    require(value != null, s"Option '$key' value must not be null")
+    caseInsensitiveOptions += (key.trim.toLowerCase(Locale.ROOT) -> value)
+    this
+  }
96-99: Normalize bulk options; skip nulls. Ensure consistent, locale-safe key normalization; drop nulls.
-  def options(options: Map[String, String]): CobolProcessorBuilder = {
-    caseInsensitiveOptions ++= options.map(kv => (kv._1.toLowerCase(), kv._2))
-    this
-  }
+  def options(options: Map[String, String]): CobolProcessorBuilder = {
+    caseInsensitiveOptions ++= options.collect {
+      case (k, v) if k != null && v != null => (k.trim.toLowerCase(Locale.ROOT), v)
+    }
+    this
+  }
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala (4)
37-46: Adopts new builder + explicit build step — LGTM; add an assertion to prove options reach processors. Flow looks correct. To also validate the PR objective end-to-end, assert that options configured on the builder arrive to RawRecordProcessor.
-    val builder = CobolProcessor.builder(copybook)
+    val builder = CobolProcessor.builder(copybook).option("opt_k", "opt_v")

-    val processor = new RawRecordProcessor {
+    var seenOptions: Map[String, String] = Map.empty
+    val processor = new RawRecordProcessor {
       override def processRecord(copybook: Copybook, options: Map[String, String], record: Array[Byte], offset: Long): Array[Byte] = {
-        record.map(v => (v - 1).toByte)
+        seenOptions = options
+        record.map(v => (v - 1).toByte)
       }
     }

     builder.build().process(is, os)(processor)
+    assert(seenOptions.get("opt_k").contains("opt_v"))
58-63: Prefer exercising schema with the same parameters the builder would use. Minor: drive getCobolSchema with the builder's computed ReaderParameters to align with real usage.
-    val cobolSchema = builder.getCobolSchema(ReaderParameters())
+    val cobolSchema = builder.getCobolSchema(builder.getReaderParameters)
68-74: Cache reader parameters; assert exact value. Avoid repeated parsing and assert the expected value directly.
-    assert(builder.getReaderParameters.recordFormat == RecordFormat.AsciiText)
-    assert(builder.getReaderParameters.options.contains("record_format"))
+    val rp = builder.getReaderParameters
+    assert(rp.recordFormat == RecordFormat.AsciiText)
+    assert(rp.options.get("record_format").contains("D"))
     assert(builder.getOptions.contains("record_format"))
78-90: Fix test name grammar and verify options propagation into extractor context (fixed-length path)
- Wording: “a fixed-record-length file”.
- Consider asserting that the fixed-length extractor receives the options from ReaderParameters (the test currently passes options but doesn’t check they arrive to the extractor’s RawRecordContext). If the context isn’t exposed, add a tiny accessor in test scope or cover via a processor-level test.
- "work for an fixed-record-length files" in { + "work for a fixed-record-length file" in {If feasible, add an assertion akin to:
- downcast to FixedRecordLengthRawRecordExtractor and read its context.options, or
- run process with a RawRecordProcessor that inspects options emitted by the extractor path.
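A sketch of the first option. It assumes the extractor's RawRecordContext is exposed to tests (the ctx accessor below is hypothetical, as suggested above), that an input SimpleStream is available (construction omitted), and that the option key/value pair is only illustrative.

```scala
// Sketch: assert the fixed-length extractor's context carries the builder options.
// "ctx" is an assumed test-visible accessor for the extractor's RawRecordContext.
val builder = CobolProcessor.builder(copybook)
  .option("record_format", "F")
  .option("record_length", "2")

val extractor = builder.getRecordExtractor(builder.getReaderParameters, inputStream)

extractor match {
  case fixed: FixedRecordLengthRawRecordExtractor =>
    assert(fixed.ctx.options.get("record_format").contains("F"))
  case other =>
    fail(s"Expected FixedRecordLengthRawRecordExtractor, got ${other.getClass.getName}")
}
```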
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (1 hunks)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (2 hunks)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1 hunks)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (2 hunks)
- cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala (6 hunks)
- cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/RecordExtractorDebugSpec.scala (1 hunks)
- cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VarOccursRecordExtractorSuite.scala (3 hunks)
- cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractorSuite.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala
- cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/RecordExtractorDebugSpec.scala
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala
🧰 Additional context used
🧬 Code graph analysis (5)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala (2)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (10)
builder (135-137), CobolProcessor (49-138), processor (101-103), processor (105-109), processor (111-130), processor (132-132), options (96-99), build (53-75), process (55-73), option (84-88)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala (1)
ReaderParameters(80-135)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VarOccursRecordExtractorSuite.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (5)
RawRecordContext(35-44)RawRecordContext(46-139)builder(119-126)builder(136-137)build(101-103)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (7)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/ArrayOfAnyHandler.scala (1)
ArrayOfAnyHandler (27-33)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (2)
StreamProcessor (26-65), processStream (38-64)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (3)
VarLenNestedReader (42-272), getCobolSchema (89-89), recordExtractor (54-87)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedRecordLengthRawRecordExtractor.scala (1)
FixedRecordLengthRawRecordExtractor (19-56)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (6)
RawRecordContext (35-44), RawRecordContext (46-139), build (101-103), builder (119-126), builder (136-137), withReaderParams (60-73)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (1)
CobolParametersParser (39-1004)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Parameters.scala (1)
Parameters(27-98)
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractorSuite.scala (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (7)
RawRecordContext(35-44)RawRecordContext(46-139)builder(119-126)builder(136-137)withRdwDecoder(81-84)withBdwDecoder(86-89)build(101-103)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (4)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala (1)
ReaderParameters (80-135)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderDecoderBdw.scala (1)
RecordHeaderDecoderBdw (23-90)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderDecoderRdw.scala (1)
RecordHeaderDecoderRdw (23-59)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderParameters.scala (1)
RecordHeaderParameters(20-26)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: Spark 3.4.4 on Scala 2.12.20
- GitHub Check: Spark 3.5.5 on Scala 2.13.16
- GitHub Check: Spark 3.5.5 on Scala 2.12.20
- GitHub Check: Spark 2.4.8 on Scala 2.11.12
- GitHub Check: test (2.12.20, 2.12, 3.3.4, 0, 80, 20)
🔇 Additional comments (13)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (3)
62-73: Good: processing stream lifecycle
dataStream is properly closed in finally. Once the extractor close lifecycle is introduced, consider closing it here as well; for now there's no extractor close() API. Can we confirm all extractor implementations close their internal headerStream when no longer needed? The FixedLength path does; variable-length extractors should, too, to prevent leaks.
101-109: Minor: getReaderParameters now redundant in build path. After snapshotting in build(), this helper is unused there but still useful for tests. No change required.
121-124: Lower-case readerParameters.options keys before storing in RawRecordContext. Apply Locale.ROOT lower-casing to the options map so custom extractors see case-insensitive keys:
 …
-        .withOptions(readerParameters.options)
+        .withOptions(
+          readerParameters.options.map { case (k, v) =>
+            k.toLowerCase(java.util.Locale.ROOT) -> v
+          }
+        )
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractorSuite.scala (1)
154-157: LGTM: migrated to builder API. Builder-based RawRecordContext construction is correct; explicit 0L matches the new Long index.
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VarOccursRecordExtractorSuite.scala (3)
44-44: LGTM: builder usage. Construction via RawRecordContext.builder(0L, ...) aligns with the new API.
87-87: LGTM: builder usage. Consistent migration; no functional changes.
143-143: LGTM: builder usage. Consistent across tests; good.
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala (2)
92-101: Variable-length extractor selection — LGTM. Correctly exercises the VarLen path and asserts TextFullRecordExtractor.
106-116: Unsupported format error path — LGTM. Exception type and message check are appropriate.
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (4)
50-58: Sane defaults in builder state — LGTM. Zero start, default decoders, empty additionalInfo/options are reasonable.
60-73: withReaderParams correctly propagates additionalInfo, RDW/BDW, and options — LGTM. This satisfies the PR objective of making options available to custom extractors via RawRecordContext.
119-137: Builder overload with explicit startingRecordNumber — LGTM. The API is clear and validates non-negative values.
136-137: Convenience builder that derives headerStream — LGTM. Reasonable default to copy the stream for headers.
  def build(): CobolProcessor = {
    new CobolProcessor {
      override def process(inputStream: SimpleStream,
                           outputStream: OutputStream)
                          (rawRecordProcessor: RawRecordProcessor): Unit = {
        val readerParameters = getReaderParameters
        val cobolSchema = getCobolSchema(readerParameters)
        val recordExtractor = getRecordExtractor(readerParameters, inputStream)

        val dataStream = inputStream.copyStream()
        try {
          StreamProcessor.processStream(cobolSchema.copybook,
            caseInsensitiveOptions.toMap,
            dataStream,
            recordExtractor,
            rawRecordProcessor,
            outputStream)
        } finally {
          dataStream.close()
        }
      }
    }
  }
🛠️ Refactor suggestion
Snapshot options at build-time; precompute parameters/schema for immutability and perf
Currently, process() re-parses options and schema on each call and reads from the mutable builder map, so later option(s) mutations affect an already-built processor (surprising, not thread-safe). Snapshot options in build(), precompute readerParameters and cobolSchema, and pass them to getRecordExtractor.
Apply:
- def build(): CobolProcessor = {
- new CobolProcessor {
- override def process(inputStream: SimpleStream,
- outputStream: OutputStream)
- (rawRecordProcessor: RawRecordProcessor): Unit = {
- val readerParameters = getReaderParameters
- val cobolSchema = getCobolSchema(readerParameters)
- val recordExtractor = getRecordExtractor(readerParameters, inputStream)
-
- val dataStream = inputStream.copyStream()
- try {
- StreamProcessor.processStream(cobolSchema.copybook,
- caseInsensitiveOptions.toMap,
- dataStream,
- recordExtractor,
- rawRecordProcessor,
- outputStream)
- } finally {
- dataStream.close()
- }
- }
- }
- }
+ def build(): CobolProcessor = {
+ // Immutable snapshot for this processor instance
+ val optionsSnapshot = caseInsensitiveOptions.toMap
+ val readerParameters = {
+ val cobolParams = CobolParametersParser.parse(new Parameters(optionsSnapshot))
+ CobolParametersParser.getReaderProperties(cobolParams, None)
+ }
+ val cobolSchema = getCobolSchema(readerParameters)
+
+ new CobolProcessor {
+ override def process(inputStream: SimpleStream,
+ outputStream: OutputStream)
+ (rawRecordProcessor: RawRecordProcessor): Unit = {
+ val recordExtractor = getRecordExtractor(readerParameters, cobolSchema, inputStream)
+
+ val dataStream = inputStream.copyStream()
+ try {
+ StreamProcessor.processStream(
+ cobolSchema.copybook,
+ optionsSnapshot,
+ dataStream,
+ recordExtractor,
+ rawRecordProcessor,
+ outputStream
+ )
+ } finally {
+ dataStream.close()
+ }
+ }
+ }
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
-  def build(): CobolProcessor = {
-    new CobolProcessor {
-      override def process(inputStream: SimpleStream,
-                           outputStream: OutputStream)
-                          (rawRecordProcessor: RawRecordProcessor): Unit = {
-        val readerParameters = getReaderParameters
-        val cobolSchema = getCobolSchema(readerParameters)
-        val recordExtractor = getRecordExtractor(readerParameters, inputStream)
-        val dataStream = inputStream.copyStream()
-        try {
-          StreamProcessor.processStream(cobolSchema.copybook,
-            caseInsensitiveOptions.toMap,
-            dataStream,
-            recordExtractor,
-            rawRecordProcessor,
-            outputStream)
-        } finally {
-          dataStream.close()
-        }
-      }
-    }
-  }
+  def build(): CobolProcessor = {
+    // Immutable snapshot for this processor instance
+    val optionsSnapshot = caseInsensitiveOptions.toMap
+    val readerParameters = {
+      val cobolParams = CobolParametersParser.parse(new Parameters(optionsSnapshot))
+      CobolParametersParser.getReaderProperties(cobolParams, None)
+    }
+    val cobolSchema = getCobolSchema(readerParameters)
+    new CobolProcessor {
+      override def process(inputStream: SimpleStream,
+                           outputStream: OutputStream)
+                          (rawRecordProcessor: RawRecordProcessor): Unit = {
+        val recordExtractor = getRecordExtractor(readerParameters, cobolSchema, inputStream)
+        val dataStream = inputStream.copyStream()
+        try {
+          StreamProcessor.processStream(
+            cobolSchema.copybook,
+            optionsSnapshot,
+            dataStream,
+            recordExtractor,
+            rawRecordProcessor,
+            outputStream
+          )
+        } finally {
+          dataStream.close()
+        }
+      }
+    }
+  }
🤖 Prompt for AI Agents
In
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala
around lines 53–75, snapshot mutable builder state at build time by computing
readerParameters and cobolSchema once in build() (and copying
caseInsensitiveOptions to an immutable Map), then return a CobolProcessor whose
process() uses those precomputed values instead of calling
getReaderParameters/getCobolSchema on each invocation; keep getRecordExtractor
but call it with the precomputed readerParameters (and the runtime inputStream)
so later mutations to the builder do not affect an already-built processor and
behavior becomes thread-safe and stable.
  private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = {
    val dataStream = inputStream.copyStream()
    val headerStream = inputStream.copyStream()

    val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)

    reader.recordExtractor(0, dataStream, headerStream) match {
      case Some(extractor) => extractor
      case None if readerParameters.recordFormat == FixedLength =>
        val dataStream = inputStream.copyStream()
        val ctx = RawRecordContext.builder(dataStream, getCobolSchema(readerParameters).copybook)
          .withReaderParams(readerParameters)
          .build()
        new FixedRecordLengthRawRecordExtractor(ctx, readerParameters.recordLength)
      case None =>
        throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " +
          "Please check the copybook and the reader parameters."
        )
    }
  }
🛠️ Refactor suggestion
Reuse streams and close unused header stream; extend signature to avoid re-parsing schema
- Avoid re-creating dataStream in the FixedLength fallback and close the unused headerStream.
- Close both streams when failing to create an extractor.
- Accept cobolSchema to avoid a second schema build.
- private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = {
+ private[processor] def getRecordExtractor(readerParameters: ReaderParameters,
+ cobolSchema: CobolSchema,
+ inputStream: SimpleStream): RawRecordExtractor = {
val dataStream = inputStream.copyStream()
val headerStream = inputStream.copyStream()
val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)
reader.recordExtractor(0, dataStream, headerStream) match {
case Some(extractor) => extractor
case None if readerParameters.recordFormat == FixedLength =>
- val dataStream = inputStream.copyStream()
- val ctx = RawRecordContext.builder(dataStream, getCobolSchema(readerParameters).copybook)
+ // Release the header stream; reuse the data stream for the fixed extractor.
+ headerStream.close()
+ val ctx = RawRecordContext.builder(dataStream, cobolSchema.copybook)
.withReaderParams(readerParameters)
.build()
new FixedRecordLengthRawRecordExtractor(ctx, readerParameters.recordLength)
case None =>
+ headerStream.close()
+ dataStream.close()
throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " +
"Please check the copybook and the reader parameters."
)
}
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
-  private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = {
-    val dataStream = inputStream.copyStream()
-    val headerStream = inputStream.copyStream()
-    val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)
-    reader.recordExtractor(0, dataStream, headerStream) match {
-      case Some(extractor) => extractor
-      case None if readerParameters.recordFormat == FixedLength =>
-        val dataStream = inputStream.copyStream()
-        val ctx = RawRecordContext.builder(dataStream, getCobolSchema(readerParameters).copybook)
-          .withReaderParams(readerParameters)
-          .build()
-        new FixedRecordLengthRawRecordExtractor(ctx, readerParameters.recordLength)
-      case None =>
-        throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " +
-          "Please check the copybook and the reader parameters."
-        )
-    }
-  }
+  private[processor] def getRecordExtractor(readerParameters: ReaderParameters,
+                                            cobolSchema: CobolSchema,
+                                            inputStream: SimpleStream): RawRecordExtractor = {
+    val dataStream = inputStream.copyStream()
+    val headerStream = inputStream.copyStream()
+    val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)
+    reader.recordExtractor(0, dataStream, headerStream) match {
+      case Some(extractor) =>
+        extractor
+      case None if readerParameters.recordFormat == FixedLength =>
+        // Release the header stream; reuse the data stream for the fixed extractor.
+        headerStream.close()
+        val ctx = RawRecordContext.builder(dataStream, cobolSchema.copybook)
+          .withReaderParams(readerParameters)
+          .build()
+        new FixedRecordLengthRawRecordExtractor(ctx, readerParameters.recordLength)
+      case None =>
+        headerStream.close()
+        dataStream.close()
+        throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " +
+          "Please check the copybook and the reader parameters."
+        )
+    }
+  }
🤖 Prompt for AI Agents
In
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala
around lines 111 to 130, the method creates two stream copies but re-creates
dataStream in the FixedLength fallback, never closes the unused headerStream,
and calls getCobolSchema again; change the method signature to accept a
cobolSchema parameter, reuse the already-created dataStream for the FixedLength
fallback (do not recreate it), ensure headerStream is closed when it is not
used, and close both streams in the failure path (use try/finally or a managed
resource pattern) so no streams leak while avoiding a second schema build.
…e other record extractors are created.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)
169-191: Ensure recordExtractor’s streams are closed after index generation (VarLenNestedReader.scala:169–191). RawRecordExtractor has no close() contract and extractors retain dataStream/headerStream, leading to resource leaks. Make RawRecordExtractor extend AutoCloseable (implement close() in all extractors) or explicitly close the underlying streams after calling IndexGenerator.sparseIndexGenerator.
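A sketch of the AutoCloseable option. Whether RawRecordExtractor can be extended this way, and the exact stream fields exposed by RawRecordContext, are assumptions to verify against the actual code.

```scala
// Sketch: a closeable extractor base so index-generation code can release streams.
// Assumes RawRecordExtractor is a trait and that RawRecordContext exposes the
// inputStream and headerStream it was built with.
trait CloseableRawRecordExtractor extends RawRecordExtractor with AutoCloseable {
  protected def ctx: RawRecordContext

  override def close(): Unit = {
    ctx.inputStream.close()
    ctx.headerStream.close()
  }
}

// Caller side (e.g. after IndexGenerator.sparseIndexGenerator returns):
//   extractor match {
//     case c: AutoCloseable => c.close()
//     case _                => // nothing to release
//   }
```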
♻️ Duplicate comments (2)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (2)
52-74: Snapshot options and precompute parameters/schema at build-time (thread-safety + perf). Avoid re-parsing options and schema per process() and builder mutations affecting built processors. This repeats a prior suggestion.
-  def build(): CobolProcessor = {
-    new CobolProcessor {
+  def build(): CobolProcessor = {
+    // Immutable snapshot
+    val optionsSnapshot = caseInsensitiveOptions.toMap
+    val readerParameters = {
+      val cobolParams = CobolParametersParser.parse(new Parameters(optionsSnapshot))
+      CobolParametersParser.getReaderProperties(cobolParams, None)
+    }
+    val cobolSchema = getCobolSchema(readerParameters)
+    new CobolProcessor {
       override def process(inputStream: SimpleStream,
                            outputStream: OutputStream)
                           (rawRecordProcessor: RawRecordProcessor): Unit = {
-        val readerParameters = getReaderParameters
-        val cobolSchema = getCobolSchema(readerParameters)
-        val recordExtractor = getRecordExtractor(readerParameters, inputStream)
+        val recordExtractor = getRecordExtractor(readerParameters, cobolSchema, inputStream)
         val dataStream = inputStream.copyStream()
         try {
           StreamProcessor.processStream(cobolSchema.copybook,
-            caseInsensitiveOptions.toMap,
+            optionsSnapshot,
             dataStream,
             recordExtractor,
             rawRecordProcessor,
             outputStream)
         } finally {
           dataStream.close()
         }
       }
     }
   }
110-123: Manage stream lifecycle and avoid duplicate stream creation in fallback. Close header/data streams on failure and reuse the existing dataStream in the FixedLength fallback; also accept cobolSchema to avoid an extra parse. Mirrors prior feedback.
-  private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = {
-    val dataStream = inputStream.copyStream()
-    val headerStream = inputStream.copyStream()
+  private[processor] def getRecordExtractor(readerParameters: ReaderParameters,
+                                            cobolSchema: CobolSchema,
+                                            inputStream: SimpleStream): RawRecordExtractor = {
+    val dataStream = inputStream.copyStream()
+    val headerStream = inputStream.copyStream()
     val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)
-    reader.recordExtractor(0, dataStream, headerStream) match {
-      case Some(extractor) => extractor
-      case None =>
-        throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " +
-          "Please check the copybook and the reader parameters."
-        )
-    }
+    reader.recordExtractor(0, dataStream, headerStream) match {
+      case Some(extractor) => extractor
+      case None if readerParameters.recordFormat == za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.FixedLength =>
+        // Release header stream; reuse data stream
+        headerStream.close()
+        val ctx = za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordContext
+          .builder(dataStream, cobolSchema.copybook)
+          .withReaderParams(readerParameters)
+          .build()
+        new za.co.absa.cobrix.cobol.reader.extractors.raw.FixedRecordLengthRawRecordExtractor(ctx, readerParameters.recordLength)
+      case None =>
+        try {
+          headerStream.close()
+          dataStream.close()
+        } finally {
+          throw new IllegalArgumentException(
+            "Cannot create a record extractor for the given reader parameters. Please check the copybook and the reader parameters."
+          )
+        }
+    }
   }
🧹 Nitpick comments (3)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1)
74-77: Guard against missing BDW when FixedBlock is selected. Avoid bdwOpt.get throwing NoSuchElementException; fail fast with a clear message.
-      case None if readerProperties.recordFormat == FixedBlock =>
-        val fbParams = FixedBlockParameters(readerProperties.recordLength, bdwOpt.get.blockLength, bdwOpt.get.recordsPerBlock)
+      case None if readerProperties.recordFormat == FixedBlock =>
+        require(bdwOpt.isDefined, "FixedBlock record format requires BDW parameters (bdw).")
+        val fb = bdwOpt.get
+        val fbParams = FixedBlockParameters(readerProperties.recordLength, fb.blockLength, fb.recordsPerBlock)
         FixedBlockParameters.validate(fbParams)
         Some(new FixedBlockRawRecordExtractor(reParams, fbParams))
26-27: Make case-insensitive handling locale-safe and trim keys in bulk options. Use Locale.ROOT to avoid locale traps (e.g., Turkish i) and trim keys in options().
 import java.io.OutputStream
+import java.util.Locale
@@
-  def option(key: String, value: String): CobolProcessorBuilder = {
+  def option(key: String, value: String): CobolProcessorBuilder = {
     require(key.trim.nonEmpty, "Option key must not be empty or whitespace-only")
-    caseInsensitiveOptions += (key.trim.toLowerCase -> value)
+    caseInsensitiveOptions += (key.trim.toLowerCase(Locale.ROOT) -> value)
     this
   }
@@
-  def options(options: Map[String, String]): CobolProcessorBuilder = {
-    caseInsensitiveOptions ++= options.map(kv => (kv._1.toLowerCase(), kv._2))
+    caseInsensitiveOptions ++= options.iterator.map { case (k, v) => (k.trim.toLowerCase(Locale.ROOT), v) }
     this
   }
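A quick illustration of the locale trap this guards against; the key below is an ordinary spark-cobol option name used only as an example, and the behavior shown assumes the JVM default locale is Turkish:

    import java.util.Locale
    // Under a Turkish default locale, 'I'.toLowerCase becomes dotless 'ı' (U+0131),
    // so a stored key would no longer match a plain ASCII lookup.
    val key = "IS_RDW_BIG_ENDIAN"
    val localeSensitive = key.toLowerCase              // "ıs_rdw_bıg_endıan" under tr-TR
    val localeSafe      = key.toLowerCase(Locale.ROOT) // always "is_rdw_big_endian"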
49-50: Minor naming consistency. The parameter is singular (copybookContent) in builder() but plural (copybookContents) in the class ctor. Consider consistent naming to reduce confusion.
Also applies to: 128-130
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (1 hunks)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (2)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala (6)
RawRecordContext(35-44)
RawRecordContext(46-139)
builder(119-126)
builder(136-137)
withReaderParams(60-73)
build(101-103)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/recordformats/RecordFormat.scala (1)
FixedLength(22-22)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (6)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/ArrayOfAnyHandler.scala (1)
ArrayOfAnyHandler(27-33)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (2)
StreamProcessor(26-65)
processStream(38-64)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (2)
VarLenNestedReader(42-274)
recordExtractor(54-89)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (1)
CobolParametersParser(39-1004)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Parameters.scala (1)
Parameters(27-98)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala (1)
fromReaderParameters(75-150)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: test (2.12.20, 2.12, 3.3.4, 0, 80, 20)
- GitHub Check: Spark 2.4.8 on Scala 2.11.12
- GitHub Check: Spark 3.5.5 on Scala 2.13.16
- GitHub Check: Spark 3.4.4 on Scala 2.12.20
- GitHub Check: Spark 3.5.5 on Scala 2.12.20
🔇 Additional comments (3)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (3)
58-60: Good move to the builder and options propagation. Switching to RawRecordContext.builder(...).withReaderParams(readerProperties).build() centralizes RDW/BDW setup and correctly forwards options to custom extractors.
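For reference, a hedged sketch of how a custom extractor could read the forwarded options from the context; the class name, option key, and exact member names below are illustrative rather than copied from this PR:

    // Illustrative only: a custom extractor picking up a pass-through option from RawRecordContext
    class MyRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRecordExtractor {
      // 'my_record_size' is a hypothetical key supplied via a spark-cobol .option(...) call
      private val recordSize = ctx.options.getOrElse("my_record_size", "100").toInt

      override def offset: Long = ctx.inputStream.offset
      override def hasNext: Boolean = !ctx.inputStream.isEndOfStream
      override def next(): Array[Byte] = ctx.inputStream.next(recordSize)
    }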
69-73: Double-check extractor choice for VariableLength + lengthFieldExpression. Using FixedWithRecordLengthExprRawRecordExtractor for VariableLength may be intentional, but the naming is misleading. Please confirm behavior (esp. header handling) matches expectations for VL records with a length expression.
84-85: Nice addition: fixed-length extractor path. Covers FixedLength without a length expression.
Actionable comments posted: 2
♻️ Duplicate comments (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (1)
52-55: Snapshot options at build time; keep runtime behavior deterministic and thread-safe
process() reads caseInsensitiveOptions.toMap from the mutable builder, which can diverge from the options used to compute readerParameters/cobolSchema. Snapshot once in build() and use that snapshot throughout.

-  def build(): CobolProcessor = {
-    val readerParameters = getReaderParameters
-    val cobolSchema = getCobolSchema(readerParameters)
+  def build(): CobolProcessor = {
+    val optionsSnapshot = caseInsensitiveOptions.toMap
+    val readerParameters = {
+      val cobolParams = CobolParametersParser.parse(new Parameters(optionsSnapshot))
+      CobolParametersParser.getReaderProperties(cobolParams, None)
+    }
+    val cobolSchema = getCobolSchema(readerParameters)
     new CobolProcessor {
       override def process(inputStream: SimpleStream,
                            outputStream: OutputStream)
                           (rawRecordProcessor: RawRecordProcessor): Unit = {
-        val recordExtractor = getRecordExtractor(readerParameters, inputStream)
+        // Streams and extractor are created/closed in process()
         val dataStream = inputStream.copyStream()
         try {
           StreamProcessor.processStream(cobolSchema.copybook,
-            caseInsensitiveOptions.toMap,
+            optionsSnapshot,
             dataStream,
             recordExtractor,
             rawRecordProcessor,
             outputStream)
         } finally {
           dataStream.close()
         }
       }
     }
   }

Also applies to: 64-66
🧹 Nitpick comments (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (1)
84-87: Locale-safe lowercasing for option keys

Use Locale.ROOT to avoid locale-specific surprises (e.g., Turkish i).

+import java.util.Locale
@@
-    caseInsensitiveOptions += (key.trim.toLowerCase -> value)
+    caseInsensitiveOptions += (key.trim.toLowerCase(Locale.ROOT) -> value)
@@
-    caseInsensitiveOptions ++= options.map(kv => (kv._1.toLowerCase(), kv._2))
+    caseInsensitiveOptions ++= options.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }

Also applies to: 96-99, 26-27
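For orientation, a small usage sketch of the builder these keys flow into; the option keys shown are plain spark-cobol options chosen only as examples:

    // Example usage of the builder discussed above (illustrative option keys)
    val processor = CobolProcessor.builder(copybookContent)
      .option("record_format", "F")                   // keys are lower-cased internally, so "RECORD_FORMAT" works too
      .options(Map("encoding" -> "ebcdic"))
      .build()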
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (1 hunks)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (3)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (2)
generateIndex(137-192)
isRdwBigEndian(97-97)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala (1)
close(107-112)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala (1)
close(41-47)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (6)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/ArrayOfAnyHandler.scala (1)
ArrayOfAnyHandler(27-33)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (2)
StreamProcessor(26-65)
processStream(38-64)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (3)
VarLenNestedReader(42-274)
getCobolSchema(91-91)
recordExtractor(54-89)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (1)
CobolParametersParser(39-1004)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Parameters.scala (1)
Parameters(27-98)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala (1)
fromReaderParameters(75-150)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: Spark 3.5.5 on Scala 2.12.20
- GitHub Check: Spark 3.5.5 on Scala 2.13.16
- GitHub Check: Spark 2.4.8 on Scala 2.11.12
- GitHub Check: Spark 3.4.4 on Scala 2.12.20
- GitHub Check: test (2.12.20, 2.12, 3.3.4, 0, 80, 20)
🔇 Additional comments (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (1)
166-171: Good fix: ensure streams are closed even on failures

Wrapping reader.generateIndex(...) with try/finally and closing both inputStream and headerStream prevents leaks during index generation. Looks correct and low-risk.
new CobolProcessor {
  override def process(inputStream: SimpleStream,
                       outputStream: OutputStream)
                      (rawRecordProcessor: RawRecordProcessor): Unit = {
    val recordExtractor = getRecordExtractor(readerParameters, inputStream)

    val dataStream = inputStream.copyStream()
    try {
      StreamProcessor.processStream(cobolSchema.copybook,
        caseInsensitiveOptions.toMap,
        dataStream,
        recordExtractor,
        rawRecordProcessor,
        outputStream)
    } finally {
      dataStream.close()
    }
  }
}
Close all opened streams; avoid leaking header/data streams created for the extractor
process() closes only the dataStream it creates, but the extractor’s own dataStream and headerStream created in getRecordExtractor() are never closed. Either create both streams in process() and pass them into getRecordExtractor(...), or return the streams from getRecordExtractor(...) so process() can close them in a finally.
Apply:
- override def process(inputStream: SimpleStream,
- outputStream: OutputStream)
- (rawRecordProcessor: RawRecordProcessor): Unit = {
- val recordExtractor = getRecordExtractor(readerParameters, inputStream)
-
- val dataStream = inputStream.copyStream()
- try {
- StreamProcessor.processStream(cobolSchema.copybook,
- caseInsensitiveOptions.toMap,
- dataStream,
- recordExtractor,
- rawRecordProcessor,
- outputStream)
- } finally {
- dataStream.close()
- }
- }
+ override def process(inputStream: SimpleStream,
+ outputStream: OutputStream)
+ (rawRecordProcessor: RawRecordProcessor): Unit = {
+ val dataStream = inputStream.copyStream()
+ val headerStream = inputStream.copyStream()
+ val recordExtractor = getRecordExtractor(readerParameters, dataStream, headerStream)
+ try {
+ StreamProcessor.processStream(
+ cobolSchema.copybook,
+ optionsSnapshot,
+ dataStream,
+ recordExtractor,
+ rawRecordProcessor,
+ outputStream
+ )
+ } finally {
+ try headerStream.close() finally dataStream.close()
+ }
+      }

Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala
around lines 56 to 74, process(...) currently only closes the dataStream it
creates but not the extractor’s internal dataStream and headerStream, leaking
streams; change the implementation so process() is responsible for creating and
closing all streams: either (A) refactor getRecordExtractor(...) to return both
the extractor and the two streams (or a small case class/tuple containing
extractor, dataStream, headerStream) so process() can register them and close
them in the finally block, or (B) change getRecordExtractor(...) to accept the
already-created data and header streams as parameters; in either case ensure all
streams are closed in the finally (null/Option-safe) and preserve original
exception behavior by closing in reverse-open order and suppressing/adding
suppressed exceptions appropriately.
private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = {
  val dataStream = inputStream.copyStream()
  val headerStream = inputStream.copyStream()

  val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)

  reader.recordExtractor(0, dataStream, headerStream) match {
    case Some(extractor) => extractor
    case None =>
      throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " +
        "Please check the copybook and the reader parameters."
      )
  }
}
🛠️ Refactor suggestion
Align getRecordExtractor with stream lifecycle; close on failure
Change the signature to accept streams created by process() and ensure they’re closed on the error path.
- private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = {
- val dataStream = inputStream.copyStream()
- val headerStream = inputStream.copyStream()
-
- val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)
-
- reader.recordExtractor(0, dataStream, headerStream) match {
- case Some(extractor) => extractor
- case None =>
- throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " +
- "Please check the copybook and the reader parameters."
- )
- }
- }
+ private[processor] def getRecordExtractor(readerParameters: ReaderParameters,
+ dataStream: SimpleStream,
+ headerStream: SimpleStream): RawRecordExtractor = {
+ val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)
+ reader.recordExtractor(0, dataStream, headerStream) match {
+ case Some(extractor) => extractor
+ case None =>
+ try headerStream.close() finally dataStream.close()
+ throw new IllegalArgumentException(
+ s"Cannot create a record extractor for the given reader parameters. Please check the copybook and the reader parameters."
+ )
+ }
+  }

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = {
  val dataStream = inputStream.copyStream()
  val headerStream = inputStream.copyStream()
  val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)
  reader.recordExtractor(0, dataStream, headerStream) match {
    case Some(extractor) => extractor
    case None =>
      throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " +
        "Please check the copybook and the reader parameters."
      )
  }
}
private[processor] def getRecordExtractor(readerParameters: ReaderParameters,
                                          dataStream: SimpleStream,
                                          headerStream: SimpleStream): RawRecordExtractor = {
  val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)
  reader.recordExtractor(0, dataStream, headerStream) match {
    case Some(extractor) => extractor
    case None =>
      // Ensure both streams are closed before bubbling up the error
      try headerStream.close() finally dataStream.close()
      throw new IllegalArgumentException(
        s"Cannot create a record extractor for the given reader parameters. Please check the copybook and the reader parameters."
      )
  }
}
🤖 Prompt for AI Agents
In
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala
around lines 111 to 124, update getRecordExtractor to accept the actual streams
produced by process() (i.e., accept the data/header SimpleStream instances
created upstream rather than creating copies internally) and ensure both streams
are closed on the error path: if reader.recordExtractor returns None, close
dataStream and headerStream (or call their close/copyStream close equivalent)
before throwing the IllegalArgumentException; keep normal success path returning
the extractor without closing the streams.
Closes #780
Summary by CodeRabbit
New Features
Refactor
Tests