#795 Add the ability to process EBCDIC files in-place and convert them to VRL format #799

Conversation
Walkthrough

This PR introduces a pluggable EBCDIC processing strategy system for COBOL record transformation. It defines a new `CobolProcessingStrategy` trait with `InPlace` and `ToVariableLength` strategies that determine which processor implementation (`CobolProcessorInPlace` or `CobolProcessorToRdw`) handles the record stream.
Sequence Diagram(s)

```mermaid
sequenceDiagram
participant App as Application
participant Builder as CobolProcessor.Builder
participant Strategy as CobolProcessingStrategy
participant Loader as CobolProcessorLoader
participant Processor as CobolProcessor<br/>(InPlace|ToRdw)
participant StreamProc as StreamProcessor
App->>Builder: withProcessingStrategy(strategy)
Builder->>Builder: store strategy
App->>Builder: load(files)
Builder->>Loader: create with strategy
App->>Loader: save(output)
Loader->>Strategy: match on strategy
alt InPlace
Loader->>Processor: new CobolProcessorInPlace()
else ToVariableLength
Loader->>Processor: new CobolProcessorToRdw()
end
Processor->>StreamProc: select processing method
alt InPlace
StreamProc->>StreamProc: processStreamInPlace()
else ToVariableLength
StreamProc->>StreamProc: processStreamToRdw()
end
StreamProc-->>Processor: record count
Processor-->>Loader: completed
```

```mermaid
sequenceDiagram
participant App as Application
participant SparkBuilder as SparkCobolProcessor.Builder
participant SparkLoader as SparkCobolProcessorLoader
participant CoblProcBase as CobolProcessorBase
participant IndexBuilder as IndexBuilder
participant RDD as RDD[Array[Byte]]
App->>SparkBuilder: withProcessingStrategy(strategy)
App->>SparkBuilder: load(files)
SparkBuilder->>SparkLoader: create with strategy & optional processor
App->>SparkLoader: toRDD()
alt Enable Indexes
SparkLoader->>IndexBuilder: build index from files
IndexBuilder-->>SparkLoader: index metadata
SparkLoader->>CoblProcBase: getRecordExtractor(indexed ranges)
else Disable Indexes
SparkLoader->>SparkLoader: getRecordRdd(whole files)
end
SparkLoader->>RDD: create from record extractors
RDD-->>App: RDD[Array[Byte]]
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches: ✅ Passed checks (5 passed)
JaCoCo code coverage report - 'cobol-parser'

JaCoCo code coverage report - 'spark-cobol'
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (1)
83-89: Remove duplicate validation check. Lines 83-85 and 87-89 both check `rawRecordProcessorOpt.isEmpty`, which is redundant. Apply this diff:

```diff
  if (rawRecordProcessorOpt.isEmpty) {
    throw new IllegalArgumentException("A RawRecordProcessor must be provided.")
  }
- if (rawRecordProcessorOpt.isEmpty) {
-   throw new IllegalArgumentException("A RawRecordProcessor must be provided.")
- }
```
🧹 Nitpick comments (3)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorBase.scala (1)
33-38: Consider stream copying efficiency. When `headerStreamOpt` is `None`, this method creates two copies of `inputStream` (lines 34 and 37). Additionally, calling code in CobolProcessorInPlace.scala (line 49) creates yet another copy. This could be inefficient for large streams. Consider whether all these copies are necessary, or if the logic could be optimized to reduce redundant stream duplication.
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorInPlace.scala (1)
47-60: Potential redundant stream copy. Line 49 creates `dataStream` as a copy of `inputStream`, but line 47's call to `CobolProcessorBase.getRecordExtractor` already creates copies of the input stream internally (lines 34 and 37 of CobolProcessorBase.scala). Consider whether the `dataStream` copy at line 49 is necessary, or if you can pass `inputStream` directly to `processStreamInPlace` to avoid redundant copying.

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala (1)

198-203: Consider documenting the ignoreRedundantOptions parameter. Line 199 adds a `PARAM_GENERATE_RECORD_ID` option and line 201 uses `!ignoreRedundantOptions` for validation. The logic is correct, but it might be helpful to add a comment explaining when redundant options should be ignored vs. validated.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (16)
- README.md (5 hunks)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessingStrategy.scala (1 hunk)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (6 hunks)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorBase.scala (1 hunk)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorInPlace.scala (2 hunks)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorToRdw.scala (1 hunk)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (2 hunks)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (4 hunks)
- cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorBaseSuite.scala (3 hunks)
- cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorInPlaceSuite.scala (1 hunk)
- cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorToRdwSuite.scala (1 hunk)
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala (6 hunks)
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala (2 hunks)
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala (6 hunks)
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (1 hunk)
- spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (13)
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorBase.scala (2)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala (1): `ReaderParameters` (80-135)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1): `recordExtractor` (54-89)

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorToRdwSuite.scala (4)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (8): `processor` (136-138, 140-144, 146-146), `CobolProcessor` (48-206), `builder` (203-205), `withProcessingStrategy` (107-110), `withCopybookContents` (97-100), `build` (55-67)
- cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/mock/ByteStreamMock.scala (1): `ByteStreamMock` (21-55)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorContext.scala (1): `CobolProcessorContext` (21-23)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorToRdw.scala (1): `process` (42-52)

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala (2)
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala (1): `sqlContext` (121-121)
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala (3): `FileUtils` (35-235), `getFiles` (51-53), `getFiles` (60-84)

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (1)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/decoders/BinaryUtils.scala (1): `cobol` (165-209)

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala (1)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/internal/Logging.scala (1): `logger` (33-38)

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (4)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorInPlace.scala (1): `CobolProcessorInPlace` (40-63)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorToRdw.scala (1): `CobolProcessorToRdw` (38-53)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessingStrategy.scala (3): `CobolProcessingStrategy` (21-24), `InPlace` (22-22), `ToVariableLength` (23-23)
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala (3): `withProcessingStrategy` (99-102), `options` (131-134), `save` (144-166)

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorInPlace.scala (2)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorBase.scala (3): `CobolProcessorBase` (30-30, 32-50), `getRecordExtractor` (33-49)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (2): `StreamProcessor` (26-108), `processStreamInPlace` (39-70)

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorToRdw.scala (5)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala (1): `ReaderParameters` (80-135)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorInPlace.scala (1): `process` (44-60)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala (1): `recordExtractor` (54-89)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorBase.scala (3): `CobolProcessorBase` (30-30, 32-50), `getRecordExtractor` (33-49)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (2): `StreamProcessor` (26-108), `processStreamToRdw` (83-106)

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorInPlaceSuite.scala (3)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (7): `processor` (136-138, 140-144, 146-146), `CobolProcessor` (48-206), `withProcessingStrategy` (107-110), `withCopybookContents` (97-100), `build` (55-67)
- cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/mock/ByteStreamMock.scala (1): `ByteStreamMock` (21-55)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorInPlace.scala (1): `process` (44-60)

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala (4)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (10): `processor` (136-138, 140-144, 146-146), `save` (156-200), `withRecordProcessor` (102-105), `builder` (203-205), `withCopybookContents` (97-100), `withProcessingStrategy` (107-110), `load` (69-95), `option` (119-123)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessingStrategy.scala (2): `CobolProcessingStrategy` (21-24), `ToVariableLength` (23-23)
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala (11): `save` (144-166), `withRecordProcessor` (94-97), `SparkCobolProcessor` (55-322), `builder` (180-182), `withCopybookContents` (89-92), `withProcessingStrategy` (99-102), `load` (66-71, 73-87), `option` (119-123), `getParsedCopybook` (168-172), `toRDD` (174-177)
- spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/fixtures/BinaryFileFixture.scala (3): `withTempDirectory` (71-78), `writeBinaryFile` (116-118), `readBinaryFile` (120-122)

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (1)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorContext.scala (1): `CobolProcessorContext` (21-23)

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorBaseSuite.scala (3)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorBase.scala (3): `CobolProcessorBase` (30-30, 32-50), `getRecordExtractor` (33-49)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala (1): `ReaderParameters` (80-135)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/recordformats/RecordFormat.scala (2): `RecordFormat` (21-41), `VariableLength` (24-24)

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala (8)
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (12): `cobol` (47-331, 68-107, 113-133, 138-153, 155-180, 182-204, 206-270, 272-292, 297-302, 307-319, 321-323), `buildIndex` (48-62)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorBase.scala (3): `CobolProcessorBase` (30-30, 32-50), `getRecordExtractor` (33-49)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/entry/SparseIndexEntry.scala (1): `SparseIndexEntry` (19-19)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (3): `CobolParametersParser` (39-1008), `getReaderProperties` (366-453), `parse` (225-311)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala (1): `CobolParameters` (68-111)
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala (7): `schema` (78-80), `source` (105-112), `CobolRelation` (65-113, 115-132), `SerializableConfiguration` (38-56), `getListFilesWithOrder` (122-131), `isRecursiveRetrieval` (100-103)
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala (6): `schema` (101-101, 122-122), `DefaultSource` (38-129, 131-189), `createVariableLengthReader` (180-188), `sqlContext` (121-121)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala (1): `fromReaderParameters` (75-150)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: Spark 2.4.8 on Scala 2.11.12
- GitHub Check: test (2.12.20, 2.12, 3.3.4, 0, 80, 20)
- GitHub Check: Spark 3.5.7 on Scala 2.12.20
- GitHub Check: Spark 3.4.4 on Scala 2.12.20
- GitHub Check: Spark 3.5.7 on Scala 2.13.17
🔇 Additional comments (22)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala (1)
47-47: LGTM! Visibility narrowing aligns with package encapsulation. The change from `private[source]` to `private[cobol]` appropriately restricts the visibility of `IndexBuilder` to the cobol package scope, supporting better encapsulation as index-based processing evolves.

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorInPlaceSuite.scala (1)
1-60: LGTM! Well-structured test for InPlace processing strategy. The test effectively validates the InPlace processing path with:

- Clear arrangement of test data (4 input bytes)
- Explicit strategy selection via `CobolProcessingStrategy.InPlace`
- Simple transformation logic (decrement each byte)
- Comprehensive assertions on count, length, and output values
The test aligns well with the new strategy-based processing architecture.
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessingStrategy.scala (1)
19-24: LGTM! Clean strategy pattern implementation. The abstraction is well-designed:
- Minimal trait definition provides clear type hierarchy
- Two concrete strategies (InPlace and ToVariableLength) clearly express intent
- No unnecessary complexity or premature abstraction
This provides a solid foundation for strategy-driven processing paths throughout the codebase.
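For readers without the diff open, here is a minimal sketch of what such a strategy ADT looks like. The trait and member names come from the PR; modeling them as a sealed trait with case objects is an assumption about the encoding:

```scala
// Minimal sketch of the strategy ADT reviewed above (CobolProcessingStrategy.scala,
// lines 19-24 in the PR); sealed trait + case objects is an assumed encoding.
sealed trait CobolProcessingStrategy

object CobolProcessingStrategy {
  // Process records while keeping the file's original layout.
  case object InPlace extends CobolProcessingStrategy

  // Convert records to variable-length (VRL) output with RDW headers.
  case object ToVariableLength extends CobolProcessingStrategy
}
```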
README.md (2)
1665-1666: LGTM! Documentation clearly explains the new processing strategies. The updated description effectively communicates (see the usage sketch after this list):
- InPlace strategy preserves original format
- ToVariableLength strategy converts to VRL with RDWs
- Clear distinction between the two approaches
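For orientation, this is roughly how a strategy is selected through the builder. The method names appear in the PR's tests; `copybookText` and `myProcessor` are placeholders, and the exact end-to-end flow may differ:

```scala
// Sketch of strategy selection via the builder, based on the PR's test
// suites; whether processing then goes through build() or load()/save()
// depends on the API variant being used.
val processor = CobolProcessor.builder
  .withCopybookContents(copybookText)  // copybook text, defined elsewhere
  .withRecordProcessor(myProcessor)    // a RawRecordProcessor implementation
  .withProcessingStrategy(CobolProcessingStrategy.ToVariableLength) // default is InPlace
  .build()
```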
1747-1774: LGTM! Well-documented experimental RDD feature. The new section provides:
- Clear explanation of the feature's purpose
- Complete working example showing copybook parsing, RDD creation, and field extraction
- Practical use case (extracting segment identifiers)
The example aligns with the new `getParsedCopybook` and `toRDD` methods introduced in SparkCobolProcessor.

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorToRdwSuite.scala (1)
34-58: LGTM! Test correctly validates RDW format conversion. The test properly verifies ToVariableLength processing:
- Transforms 4 single-byte records into 2-byte records with RDW headers
- Expected output: 4 records × (4-byte RDW + 2-byte payload) = 24 bytes
- RDW format validation: `[0, 2, 0, 0]` indicates a big-endian length of 2 bytes

The test aligns with the RDW generation logic in `StreamProcessor.processStreamToRdw`.
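To make the expected header bytes concrete, here is a tiny self-contained sketch of the payload-only RDW convention the test asserts (the helper name `rdwFor` is hypothetical):

```scala
// Hypothetical helper illustrating the RDW convention the test asserts:
// a 4-byte header whose first two bytes are the big-endian payload length
// (payload only, per the Cobrix convention confirmed later in this thread)
// and whose last two bytes are zero (segment descriptor, not spanned).
def rdwFor(payloadLength: Int): Array[Byte] = Array[Byte](
  ((payloadLength >> 8) & 0xFF).toByte, // high byte of the length
  (payloadLength & 0xFF).toByte,        // low byte of the length
  0, 0
)

// For a 2-byte record: rdwFor(2) produces Array[Byte](0, 2, 0, 0)
```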
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala (3)
109-150: LGTM! Comprehensive end-to-end test for ToVariableLength strategy. This test effectively validates:
- RDW header generation with proper format (big-endian, 4-byte headers)
- Round-trip verification by reading the output file back via Spark COBOL format
- JSON representation matches expected transformed values
The validation approach using Spark's COBOL reader provides strong confidence in the RDW format correctness.
153-184: LGTM! RDD generation test validates non-indexed path. The test confirms:
- RDD can be generated directly via the `toRDD` method
- Parsed copybook structure is accessible via `getParsedCopybook`
- Record count and distinct values are correct
- Non-indexed processing works as expected
186-215: LGTM! Index-based RDD generation test validates indexed path. The test verifies:
- Index generation works with `enable_indexes = true` and `input_split_records = 2`
- Record extraction is correct across index boundaries
- Partitioning logic respects the split configuration
This complements the non-indexed test and validates the index-aware processing path.
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorToRdw.scala (1)
42-52: LGTM! Clean implementation of ToVariableLength strategy. The implementation correctly:
- Reuses record extraction via `CobolProcessorBase.getRecordExtractor`
- Delegates to `StreamProcessor.processStreamToRdw` for RDW format conversion
- Avoids unnecessary stream copying (unlike InPlace, which needs to preserve headers)
The simpler stream handling compared to CobolProcessorInPlace is appropriate since RDW conversion doesn't need to preserve original record boundaries.
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala (1)
74-74: LGTM! Refactoring improves code reusability. The extraction of file listing logic to a public companion object method:
- Makes file discovery logic reusable across components (e.g., SparkCobolProcessor)
- Maintains identical functionality while improving organization
- Follows idiomatic Scala patterns for utility methods
- Supports the broader index-aware processing infrastructure
Also applies to: 115-131
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorBaseSuite.scala (1)
26-26: LGTM! Clean test refactoring. The test class has been properly renamed and updated to use the new `CobolProcessorBase.getRecordExtractor` API. All test cases consistently pass the required parameters and maintain good coverage of fixed-length, variable-length, and error scenarios.

Also applies to: 36-36, 49-52, 61-64
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorBase.scala (1)
40-48: LGTM! Clear error handling. The record extractor creation logic is well-structured with a clear error message that guides users to check their configuration.
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala (1)
39-70: LGTM! Header/footer preservation in place. The renamed `processStreamInPlace` method correctly preserves headers and footers from the input stream while processing records in place. The logic properly handles offset calculations and maintains the original format.

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala (1)
225-225: LGTM! Clean conditional validation. The addition of the `validateRedundantOptions` parameter with a default value of `true` preserves backward compatibility while allowing callers to skip redundant option validation when needed. This is useful for scenarios where options are dynamically extended or when using custom processors.

Also applies to: 309-309, 756-756, 949-961
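As a self-contained illustration of why a defaulted flag preserves source compatibility (the method body here is a toy stand-in, not the actual parser logic):

```scala
// Toy stand-in for the defaulted-flag pattern: existing call sites
// compile unchanged; new callers can opt out of the extra validation.
def parse(options: Map[String, String],
          validateRedundantOptions: Boolean = true): Map[String, String] = {
  val known = Set("copybook_contents", "record_format") // toy whitelist
  if (validateRedundantOptions) {
    val redundant = options.keySet.diff(known)
    require(redundant.isEmpty, s"Redundant options: ${redundant.mkString(", ")}")
  }
  options.filter { case (k, _) => known.contains(k) }
}

// parse(opts)                                    // validates, as before
// parse(opts, validateRedundantOptions = false)  // skips the check
```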
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorInPlace.scala (1)
40-43: LGTM! Clear class naming and inheritance. The class rename from `CobolProcessorImpl` to `CobolProcessorInPlace` better reflects its purpose, and extending `CobolProcessorBase` promotes code reuse.

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (2)
53-53: LGTM! Clean strategy pattern implementation. The addition of the processing strategy pattern with a sensible default (`InPlace`) is well-designed. The builder pattern integration is clean and allows flexible processor instantiation.

Also applies to: 63-66, 107-110

154-160: LGTM! Strategy-based processor instantiation. The loader correctly instantiates the appropriate processor implementation based on the selected strategy, as sketched below.
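Concretely, the loader's dispatch presumably resembles this fragment (building on the strategy ADT sketched earlier; constructor signatures are assumptions):

```scala
// Hypothetical fragment mirroring the "match on strategy" step in the
// first sequence diagram; real constructors may take more arguments.
val processor = strategy match {
  case CobolProcessingStrategy.InPlace          => new CobolProcessorInPlace() // keeps original layout
  case CobolProcessingStrategy.ToVariableLength => new CobolProcessorToRdw()   // emits RDW-prefixed output
}
```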
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala (1)
131-132: Public API expansion - verify intent. The helper methods `buildEitherReader`, `createTextReader`, `createFixedLengthReader`, and `createVariableLengthReader` are now public in the companion object. This expands the public API surface. Ensure this is intentional and that these methods are meant to be used externally. If they're only for internal use, consider keeping them private or package-private.
Also applies to: 139-139, 157-157, 167-167, 180-180
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala (3)
63-63: LGTM! Good design: optional processor at load, required at save. Making `rawRecordProcessor` optional at load time but required at `save()` time is a good design decision. This enables use cases like schema inspection via `getParsedCopybook` without requiring a processor upfront.

Also applies to: 99-102, 139-140, 145-147
168-177: LGTM! Useful utility methods added. The `getParsedCopybook` and `toRDD` methods provide valuable utilities for advanced use cases, allowing users to inspect the schema and generate RDDs without persisting to disk.
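A rough sketch of that flow, combining details from the README example and the test suite (the option key and paths are placeholders):

```scala
// Sketch of schema inspection + RDD generation without persisting output.
// Method names (builder, withCopybookContents, load, getParsedCopybook,
// toRDD) come from the PR; the option key and paths are placeholders.
val proc = SparkCobolProcessor.builder
  .withCopybookContents(copybookText)  // copybook text, defined elsewhere
  .option("record_format", "V")        // variable-length input, as in the Spark source
  .load("/path/to/input")

val copybook = proc.getParsedCopybook  // inspect the parsed schema, no processor needed
val records  = proc.toRDD              // RDD[Array[Byte]] of raw records
```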
215-246: LGTM! Well-structured index-aware processing. The `getRecordRdd` method correctly handles both indexed and non-indexed processing paths, properly using `headerStream` for indexed paths and omitting it for non-indexed paths. The logging provides good visibility into the processing logic.
```scala
def processStreamToRdw(copybook: Copybook,
                       options: Map[String, String],
                       recordExtractor: RawRecordExtractor,
                       recordProcessor: RawRecordProcessor,
                       outputStream: OutputStream): Long = {
  var recordCount = 0L

  while (recordExtractor.hasNext) {
    recordCount += 1
    val record = recordExtractor.next()
    val recordSize = record.length

    val ctx = CobolProcessorContext(copybook, options, recordExtractor.offset)

    val updatedRecord = recordProcessor.processRecord(record, ctx)

    val rdw = Array[Byte](((updatedRecord.length >> 8) & 0xFF).toByte, ((updatedRecord.length) & 0xFF).toByte, 0, 0)

    outputStream.write(rdw)
    outputStream.write(updatedRecord)
  }
```
🧩 Analysis chain
Verify RDW length calculation includes RDW header.
Line 99 calculates the RDW using only `updatedRecord.length`. According to IBM mainframe VRL standards, the RDW length field typically includes the 4-byte RDW header itself. This means the length should be `updatedRecord.length + 4`.
Verify that the current calculation matches your intended RDW format specification. If this needs to be compatible with standard mainframe VRL readers, the length should likely include the RDW header.
🏁 Script executed:

```bash
#!/bin/bash
# Check if there are any tests or documentation that clarify the expected RDW format
rg -n -C3 "RDW|rdw_adjustment|Record Descriptor Word" --type=scala
```

Length of output: 131567
🏁 Script executed:

```bash
#!/bin/bash
# Find tests for StreamProcessor.processStreamToRdw method
rg -n "processStreamToRdw|StreamProcessor" --type=scala -A5 -B5 | head -100
```

Length of output: 6151
🏁 Script executed:

```bash
#!/bin/bash
# Search for tests of CobolProcessorToRdw
find . -name "*Test*" -type f -exec grep -l "CobolProcessorToRdw\|processStreamToRdw" {} \;
```

Length of output: 0
🌐 Web query:
IBM mainframe RDW Record Descriptor Word length field includes header standard
💡 Result:
Short answer: The RDW is 4 bytes and the length field (the first two bytes of the RDW) contains the logical record length including those 4 RDW bytes. The second two RDW bytes are the segment/spanning descriptor (zero if not spanned). [1][2]
Sources:
- IBM z/OS SMF header documentation (RDW description). [1]
- Variable-record format / RECFM documentation explaining 4‑byte RDW and that LRECL includes the 4 bytes. [2]
Correct the RDW length calculation to include the 4-byte header.
The RDW length field must include the 4-byte RDW header itself per IBM mainframe standard. The RDW length field contains the logical record length including those 4 RDW bytes. Line 99 should calculate the RDW as:
```scala
val rdw = Array[Byte](
  (((updatedRecord.length + 4) >> 8) & 0xFF).toByte,
  ((updatedRecord.length + 4) & 0xFF).toByte,
  0,
  0
)
```

Without this adjustment, the output will not conform to standard mainframe VRL format and will be incompatible with standard RDW readers.
🤖 Prompt for AI Agents
In cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala around lines 83 to 103, the RDW length calculation currently omits the 4-byte RDW header; update the RDW byte array computation to use (updatedRecord.length + 4) when computing the two length bytes so the RDW length field equals the logical record length including the 4-byte header, then write the RDW and record as before.
yruslan replied:
We have observed both cases:
- when RDW includes the size of the header itself
- when RDW does not include the header size, only the payload.
Historically, Cobrix uses the second option, and loading files whose RDWs include the header size needs an adjustment of -4 (`option("rdw_adjustment", "-4")`). Going to keep this implementation because it matches Cobrix defaults. But if later different types of RDWs are required, we can always add an option of RDW adjustment for the writer.
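For files written with the other convention (RDW length including the header), the reader-side compensation looks like this standard Spark data source usage (paths and copybook are placeholders):

```scala
// Reading a variable-length file whose RDW lengths include the 4-byte
// header: rdw_adjustment = -4 makes Cobrix interpret them as payload-only.
val df = spark.read
  .format("cobol")
  .option("copybook", "/path/to/copybook.cpy") // placeholder path
  .option("record_format", "V")
  .option("rdw_adjustment", "-4")
  .load("/path/to/data")                       // placeholder path
```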
CodeRabbit replied:
@yruslan, thank you for the clarification! That makes perfect sense. Maintaining consistency with Cobrix's historical RDW format (payload-only) ensures backward compatibility and aligns with the existing rdw_adjustment option behavior. The flexibility to add writer-side RDW adjustment options later if needed sounds like a solid approach.
I've noted this convention for future reference.
(\__/)
(='.'=) *noted*
(")_(")
✏️ Learnings added
Learnt from: yruslan
Repo: AbsaOSS/cobrix PR: 799
File: cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala:83-103
Timestamp: 2025-11-13T14:28:55.848Z
Learning: In Cobrix, RDW (Record Descriptor Word) length field contains only the payload size, not including the 4-byte RDW header itself. This is a historical convention. When reading files with RDW that includes the header size, users should use `option("rdw_adjustment", "-4")`.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
Closes #795