Merged
35 changes: 34 additions & 1 deletion README.md
@@ -1662,7 +1662,8 @@ The output looks like this:
`common_extended`, `cp037_extended` are code pages that support non-printable characters, converting them to ASCII codes below 32.

## EBCDIC Processor (experimental)
The EBCDIC processor allows processing files by replacing value of fields without changing the underlying format.
The EBCDIC processor allows processing files by replacing the values of fields without changing the underlying format (`CobolProcessingStrategy.InPlace`)
or by converting the input to the variable-record-length format with big-endian RDWs (`CobolProcessingStrategy.ToVariableLength`).

The processing does not require Spark. A processing application can have only the COBOL parser as a dependency (`cobol-parser`).
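
For example, an sbt build that needs only the processor can declare a single dependency. A minimal sketch, assuming the usual Cobrix artifact coordinates; check Maven Central for the exact artifact name and current version:

```scala
// build.sbt: only the COBOL parser is needed; no Spark dependency is required for processing
libraryDependencies += "za.co.absa.cobrix" %% "cobol-parser" % "<latest-version>"
```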

@@ -1676,6 +1677,7 @@ val builder = CobolProcessor.builder(copybookContents)

val builder = CobolProcessor.builder
.withCopybookContents("...some copybook...")
.withProcessingStrategy(CobolProcessingStrategy.InPlace) // Or CobolProcessingStrategy.ToVariableLength

val processor = new RawRecordProcessor {
override def processRecord(record: Array[Byte], ctx: CobolProcessorContext): Array[Byte] = {
@@ -1699,6 +1701,7 @@ import za.co.absa.cobrix.cobol.processor.{CobolProcessor, CobolProcessorContext}

val count = CobolProcessor.builder
.withCopybookContents(copybook)
.withProcessingStrategy(CobolProcessingStrategy.InPlace) // Or CobolProcessingStrategy.ToVariableLength
.withRecordProcessor { (record: Array[Byte], ctx: CobolProcessorContext) =>
// The transformation logic goes here
val value = copybook.getFieldValueByName("some_field", record, 0)
@@ -1726,6 +1729,7 @@ val copybookContents = "...some copybook..."

SparkCobolProcessor.builder
.withCopybookContents(copybook)
.withProcessingStrategy(CobolProcessingStrategy.InPlace) // Or CobolProcessingStrategy.ToVariableLength
.withRecordProcessor { (record: Array[Byte], ctx: CobolProcessorContext) =>
// The transformation logic goes here
val value = ctx.copybook.getFieldValueByName("some_field", record, 0)
@@ -1740,6 +1744,35 @@ SparkCobolProcessor.builder
.save(outputPath)
```
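
If `CobolProcessingStrategy.ToVariableLength` was used, the output file carries big-endian RDW headers, so it should be readable back with `spark-cobol` as a variable-length file. A minimal sketch, reusing `copybookContents` and `outputPath` from the example above:

```scala
val df = spark.read
  .format("cobol")
  .option("copybook_contents", copybookContents)
  .option("record_format", "V")
  .load(outputPath)
```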

## EBCDIC Spark raw record RDD generator (experimental)
You can process the raw records of a mainframe file as an `RDD[Array[Byte]]`. This can be useful for custom processing without converting
records to Spark data types. You can still access fields via the parsed copybook.

Example:
```scala
import org.apache.spark.rdd.RDD
import za.co.absa.cobrix.spark.cobol.SparkCobolProcessor

val copybookContents = "...some copybook..."

val rddBuilder = SparkCobolProcessor.builder
.withCopybookContents(copybookContents)
.option("record_format", "F")
.load("s3://bucket/some/path")

// Fetch the parsed copybook and the RDD separately
val copybook = rddBuilder.getParsedCopybook
val rdd: RDD[Array[Byte]] = rddBuilder.toRDD

val segmentRdds: RDD[String] = rdd.map { record =>
  copybook.getFieldValueByName("SEGMENT_ID", record).toString
}

// Print the list of unique segments
segmentRdds.distinct.collect.sorted.foreach(println)
```

## EBCDIC Writer (experimental)

Cobrix's EBCDIC writer is an experimental feature that allows writing Spark DataFrames as EBCDIC mainframe files.
@@ -0,0 +1,24 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.cobrix.cobol.processor

trait CobolProcessingStrategy

object CobolProcessingStrategy {
case object InPlace extends CobolProcessingStrategy
case object ToVariableLength extends CobolProcessingStrategy
}
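
The strategy is a simple two-case ADT. A caller that selects it from configuration might map a string to a strategy as in the following sketch; the helper and its config value names are hypothetical and only illustrate the intended use:

```scala
import za.co.absa.cobrix.cobol.processor.CobolProcessingStrategy

def strategyFromConfig(value: String): CobolProcessingStrategy = value.toLowerCase match {
  case "in_place"           => CobolProcessingStrategy.InPlace          // keep the original record format
  case "to_variable_length" => CobolProcessingStrategy.ToVariableLength // emit records with big-endian RDWs
  case other                => throw new IllegalArgumentException(s"Unknown processing strategy: $other")
}
```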
@@ -17,7 +17,7 @@
package za.co.absa.cobrix.cobol.processor

import za.co.absa.cobrix.cobol.parser.Copybook
import za.co.absa.cobrix.cobol.processor.impl.CobolProcessorImpl
import za.co.absa.cobrix.cobol.processor.impl.{CobolProcessorInPlace, CobolProcessorToRdw}
import za.co.absa.cobrix.cobol.reader.parameters.{CobolParametersParser, Parameters, ReaderParameters}
import za.co.absa.cobrix.cobol.reader.schema.CobolSchema
import za.co.absa.cobrix.cobol.reader.stream.{FSStream, SimpleStream}
@@ -50,6 +50,7 @@ object CobolProcessor {
private val caseInsensitiveOptions = new mutable.HashMap[String, String]()
private var copybookContentsOpt: Option[String] = None
private var rawRecordProcessorOpt: Option[RawRecordProcessor] = None
private var cobolProcessingStrategy: CobolProcessingStrategy = CobolProcessingStrategy.InPlace

def build(): CobolProcessor = {
if (copybookContentsOpt.isEmpty) {
@@ -59,7 +60,10 @@
val readerParameters = getReaderParameters
val cobolSchema = getCobolSchema(readerParameters)

new CobolProcessorImpl(readerParameters, cobolSchema.copybook, copybookContentsOpt.get, caseInsensitiveOptions.toMap)
cobolProcessingStrategy match {
case CobolProcessingStrategy.InPlace => new CobolProcessorInPlace(readerParameters, cobolSchema.copybook, copybookContentsOpt.get, caseInsensitiveOptions.toMap)
case CobolProcessingStrategy.ToVariableLength => new CobolProcessorToRdw(readerParameters, cobolSchema.copybook, copybookContentsOpt.get, caseInsensitiveOptions.toMap)
}
}

def load(path: String): CobolProcessorLoader = {
@@ -87,7 +91,7 @@ object CobolProcessor {
val readerParameters = getReaderParameters
val cobolSchema = getCobolSchema(readerParameters)

new CobolProcessorLoader(path, copybookContentsOpt.get, cobolSchema.copybook, rawRecordProcessorOpt.get, readerParameters, caseInsensitiveOptions.toMap)
new CobolProcessorLoader(path, copybookContentsOpt.get, cobolSchema.copybook, rawRecordProcessorOpt.get, readerParameters, cobolProcessingStrategy, caseInsensitiveOptions.toMap)
}

def withCopybookContents(copybookContents: String): CobolProcessorBuilder = {
@@ -100,6 +104,11 @@
this
}

def withProcessingStrategy(strategy: CobolProcessingStrategy): CobolProcessorBuilder = {
cobolProcessingStrategy = strategy
this
}

/**
* Adds a single option to the builder.
*
@@ -142,9 +151,13 @@
copybook: Copybook,
rawRecordProcessor: RawRecordProcessor,
readerParameters: ReaderParameters,
cobolProcessingStrategy: CobolProcessingStrategy,
options: Map[String, String]) {
def save(outputFile: String): Long = {
val processor = new CobolProcessorImpl(readerParameters, copybook, copybookContents, options)
val processor = cobolProcessingStrategy match {
case CobolProcessingStrategy.InPlace => new CobolProcessorInPlace(readerParameters, copybook, copybookContents, options)
case CobolProcessingStrategy.ToVariableLength => new CobolProcessorToRdw(readerParameters, copybook, copybookContents, options)
}

val ifs = new FSStream(fileToProcess)
val ofs = new BufferedOutputStream(new FileOutputStream(outputFile))
@@ -0,0 +1,50 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.cobrix.cobol.processor.impl

import za.co.absa.cobrix.cobol.processor.CobolProcessor
import za.co.absa.cobrix.cobol.reader.VarLenNestedReader
import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
import za.co.absa.cobrix.cobol.reader.stream.SimpleStream

/**
* Provides common methods for direct EBCDIC-to-EBCDIC data processor implementations.
*
* The processing can be done from inside an RDD, which is why it is serializable.
*/
abstract class CobolProcessorBase extends CobolProcessor with Serializable

object CobolProcessorBase {
def getRecordExtractor(readerParameters: ReaderParameters, copybookContents: String, inputStream: SimpleStream, headerStreamOpt: Option[SimpleStream]): RawRecordExtractor = {
val dataStream = inputStream.copyStream()
val headerStream = headerStreamOpt match {
case Some(stream) => stream
case None => inputStream.copyStream()
}

val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)

reader.recordExtractor(0, dataStream, headerStream) match {
case Some(extractor) => extractor
case None =>
throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " +
"Please check the copybook and the reader parameters."
)
}
}
}
@@ -27,9 +27,8 @@ import java.io.OutputStream

/**
* Implementation of the CobolProcessor trait, responsible for processing COBOL data streams
* by extracting records and applying a user-defined raw record processor.
*
* The processing can be done from inside an RDD so this is why it is serializable.
* by extracting records and applying a user-defined raw record processor. This processor
* retains the original COBOL data format in the output.
*
* Please, do not use this class directly. Use `CobolProcessor.builder()` instead.
*
@@ -38,18 +37,18 @@ import java.io.OutputStream
* @param copybookContents The raw textual representation of the copybook.
* @param options A map of processing options to customize the behavior of the processor (same as for `spark-cobol`).
*/
class CobolProcessorImpl(readerParameters: ReaderParameters,
copybook: Copybook,
copybookContents: String,
options: Map[String, String]) extends CobolProcessor with Serializable {
class CobolProcessorInPlace(readerParameters: ReaderParameters,
copybook: Copybook,
copybookContents: String,
options: Map[String, String]) extends CobolProcessorBase {
override def process(inputStream: SimpleStream,
outputStream: OutputStream)
(rawRecordProcessor: RawRecordProcessor): Long = {
val recordExtractor = getRecordExtractor(readerParameters, inputStream)
val recordExtractor = CobolProcessorBase.getRecordExtractor(readerParameters, copybookContents, inputStream, None)

val dataStream = inputStream.copyStream()
try {
StreamProcessor.processStream(copybook,
StreamProcessor.processStreamInPlace(copybook,
options,
dataStream,
recordExtractor,
@@ -60,18 +59,5 @@ class CobolProcessorImpl(readerParameters: ReaderParameters,
}
}

private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = {
val dataStream = inputStream.copyStream()
val headerStream = inputStream.copyStream()

val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)

reader.recordExtractor(0, dataStream, headerStream) match {
case Some(extractor) => extractor
case None =>
throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " +
"Please check the copybook and the reader parameters."
)
}
}
}
@@ -0,0 +1,53 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.cobrix.cobol.processor.impl

import za.co.absa.cobrix.cobol.parser.Copybook
import za.co.absa.cobrix.cobol.processor.RawRecordProcessor
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
import za.co.absa.cobrix.cobol.reader.stream.SimpleStream

import java.io.OutputStream

/**
* Implementation of the CobolProcessor trait, responsible for processing COBOL data streams
* by extracting records and applying a user-defined raw record processor. This processor
* converts the input to the variable-record-length format with big-endian RDW headers.
*
* Please, do not use this class directly. Use `CobolProcessor.builder()` instead.
*
* @param readerParameters Configuration for record extraction and COBOL file parsing.
* @param copybook The copybook definition used for interpreting COBOL data structures.
* @param copybookContents The raw textual representation of the copybook.
* @param options A map of processing options to customize the behavior of the processor (same as for `spark-cobol`).
*/
class CobolProcessorToRdw(readerParameters: ReaderParameters,
copybook: Copybook,
copybookContents: String,
options: Map[String, String]) extends CobolProcessorBase {
override def process(inputStream: SimpleStream,
outputStream: OutputStream)
(rawRecordProcessor: RawRecordProcessor): Long = {
val recordExtractor = CobolProcessorBase.getRecordExtractor(readerParameters, copybookContents, inputStream, None)

StreamProcessor.processStreamToRdw(copybook,
options,
recordExtractor,
rawRecordProcessor,
outputStream)
}
}
@@ -36,12 +36,12 @@ object StreamProcessor {
* @param outputStream the output stream where the processed records will be written.
* @return The number of records processed.
*/
def processStream(copybook: Copybook,
options: Map[String, String],
inputStream: SimpleStream,
recordExtractor: RawRecordExtractor,
recordProcessor: RawRecordProcessor,
outputStream: OutputStream): Long = {
def processStreamInPlace(copybook: Copybook,
options: Map[String, String],
inputStream: SimpleStream,
recordExtractor: RawRecordExtractor,
recordProcessor: RawRecordProcessor,
outputStream: OutputStream): Long = {
var recordCount = 0L
while (recordExtractor.hasNext) {
recordCount += 1
@@ -68,4 +68,41 @@
}
recordCount
}

/**
* Processes a stream of COBOL raw records and writes them back in the variable-record-length format with big-endian RDW headers.
*
* @param copybook the COBOL copybook that describes the schema of the records.
* @param options arbitrary options used for splitting input data into records (same as 'spark-cobol' options).
* Keys are lower-cased for case-insensitive handling. Can contain custom options as well.
* @param recordExtractor the extractor that extracts raw records from the input stream.
* @param recordProcessor the per-record processing logic implementation.
* @param outputStream the output stream where the processed records will be written.
* @return The number of records processed.
*/
def processStreamToRdw(copybook: Copybook,
options: Map[String, String],
recordExtractor: RawRecordExtractor,
recordProcessor: RawRecordProcessor,
outputStream: OutputStream): Long = {
var recordCount = 0L

while (recordExtractor.hasNext) {
recordCount += 1
val record = recordExtractor.next()
val recordSize = record.length

val ctx = CobolProcessorContext(copybook, options, recordExtractor.offset)

val updatedRecord = recordProcessor.processRecord(record, ctx)

val rdw = Array[Byte](((updatedRecord.length >> 8) & 0xFF).toByte, ((updatedRecord.length) & 0xFF).toByte, 0, 0)

outputStream.write(rdw)
outputStream.write(updatedRecord)
}
Comment on lines +83 to +103
coderabbitai bot (Contributor), Nov 13, 2025

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Verify RDW length calculation includes RDW header.

Line 99 calculates the RDW using only updatedRecord.length. According to IBM mainframe VRL standards, the RDW length field typically includes the 4-byte RDW header itself. This means the length should be updatedRecord.length + 4.

Verify that the current calculation matches your intended RDW format specification. If this needs to be compatible with standard mainframe VRL readers, the length should likely include the RDW header.


🏁 Script executed:

#!/bin/bash
# Check if there are any tests or documentation that clarify the expected RDW format
rg -n -C3 "RDW|rdw_adjustment|Record Descriptor Word" --type=scala

Length of output: 131567




🏁 Script executed:

#!/bin/bash
# Find tests for StreamProcessor.processStreamToRdw method
rg -n "processStreamToRdw|StreamProcessor" --type=scala -A5 -B5 | head -100

Length of output: 6151




🏁 Script executed:

#!/bin/bash
# Search for tests of CobolProcessorToRdw
find . -name "*Test*" -type f -exec grep -l "CobolProcessorToRdw\|processStreamToRdw" {} \;

Length of output: 0




🌐 Web query:

IBM mainframe RDW Record Descriptor Word length field includes header standard

💡 Result:

Short answer: The RDW is 4 bytes and the length field (the first two bytes of the RDW) contains the logical record length including those 4 RDW bytes. The second two RDW bytes are the segment/spanning descriptor (zero if not spanned). [1][2]

Sources:

  • IBM z/OS SMF header documentation (RDW description). [1]
  • Variable-record format / RECFM documentation explaining 4‑byte RDW and that LRECL includes the 4 bytes. [2]

Correct the RDW length calculation to include the 4-byte header.

The RDW length field must include the 4-byte RDW header itself per IBM mainframe standard. The RDW length field contains the logical record length including those 4 RDW bytes. Line 99 should calculate the RDW as:

val rdw = Array[Byte](
  (((updatedRecord.length + 4) >> 8) & 0xFF).toByte, 
  ((updatedRecord.length + 4) & 0xFF).toByte, 
  0, 
  0
)

Without this adjustment, the output will not conform to standard mainframe VRL format and will be incompatible with standard RDW readers.

🤖 Prompt for AI Agents
In
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala
around lines 83 to 103, the RDW length calculation currently omits the 4-byte
RDW header; update the RDW byte array computation to use (updatedRecord.length +
4) when computing the two length bytes so the RDW length field equals the
logical record length including the 4-byte header, then write the RDW and record
as before.

yruslan (Collaborator, Author)
We have observed both cases:

  • when RDW includes the size of the header itself
  • when RDW does not include the header size, only the payload.

Historically, Cobrix uses the second option, and loading files whose RDWs include the header size needs an adjustment of -4 (option("rdw_adjustment", "-4")). Going to keep this implementation because it matches Cobrix defaults. But if different types of RDWs are required later, we can always add an RDW adjustment option for the writer.
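
To make the two conventions concrete, for a 100-byte payload the payload-only RDW used by Cobrix is `00 64 00 00`, while an RDW that also counts its own 4-byte header would be `00 68 00 00`. A small illustrative sketch:

```scala
val payloadLen = 100
// Cobrix convention: the length field holds only the payload size
val cobrixRdw = Array[Byte](((payloadLen >> 8) & 0xFF).toByte, (payloadLen & 0xFF).toByte, 0, 0)          // 00 64 00 00
// IBM-style convention: the length field includes the 4-byte RDW header
val ibmRdw = Array[Byte]((((payloadLen + 4) >> 8) & 0xFF).toByte, ((payloadLen + 4) & 0xFF).toByte, 0, 0) // 00 68 00 00
```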

coderabbitai bot (Contributor)
@yruslan, thank you for the clarification! That makes perfect sense. Maintaining consistency with Cobrix's historical RDW format (payload-only) ensures backward compatibility and aligns with the existing rdw_adjustment option behavior. The flexibility to add writer-side RDW adjustment options later if needed sounds like a solid approach.

I've noted this convention for future reference.

(\__/)
(='.'=)  *noted*
(")_(")

✏️ Learnings added
Learnt from: yruslan
Repo: AbsaOSS/cobrix PR: 799
File: cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala:83-103
Timestamp: 2025-11-13T14:28:55.848Z
Learning: In Cobrix, RDW (Record Descriptor Word) length field contains only the payload size, not including the 4-byte RDW header itself. This is a historical convention. When reading files with RDW that includes the header size, users should use `option("rdw_adjustment", "-4")`.



recordCount
}

}