53 changes: 49 additions & 4 deletions README.md
@@ -1668,15 +1668,17 @@ The EBCDIC processor allows processing files by replacing value of fields without

The processing does not require Spark. A processing application can have only the COBOL parser as a dependency (`cobol-parser`).
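For example, a minimal sbt dependency sketch (the group id `za.co.absa.cobrix` is the project's published one; the version placeholder is an assumption — substitute the release you actually use):
```scala
// Hypothetical sbt snippet: only the cobol-parser module is needed, no Spark.
// Replace <cobrix-version> with an actual Cobrix release.
libraryDependencies += "za.co.absa.cobrix" %% "cobol-parser" % "<cobrix-version>"
```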

-Here is an example usage:
+Here is an example usage (using streams of bytes):
```scala
val is = new FSStream(inputFile)
val os = new FileOutputStream(outputFile)
val copybookContents = "...some copybook..."
-val builder = CobolProcessor.builder(copybookContents)
+
+val builder = CobolProcessor.builder
+  .withCopybookContents(copybookContents)

val processor = new RawRecordProcessor {
-  override def processRecord(copybook: Copybook, options: Map[String, String], record: Array[Byte], offset: Long): Array[Byte] = {
+  override def processRecord(record: Array[Byte], ctx: CobolProcessorContext): Array[Byte] = {
    // The transformation logic goes here
    val value = ctx.copybook.getFieldValueByName("some_field", record, 0)
    // Change the field value
@@ -1688,10 +1690,53 @@ val processor = new RawRecordProcessor {
  }
}

-builder.build().process(is, os)(processor)
+val count = builder.build().process(is, os)(processor)
```
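In the new API, `process` returns the number of records written (a `Long`), which the example captures in `count`.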

Here is an example usage (using paths):
```scala
val count = CobolProcessor.builder
  .withCopybookContents(copybookContents)
  .withRecordProcessor { (record: Array[Byte], ctx: CobolProcessorContext) =>
    // The transformation logic goes here
    val value = ctx.copybook.getFieldValueByName("some_field", record, 0)
    // Change the field value
    // val newValue = ...
    // Write the changed value back
    ctx.copybook.setFieldValueByName("some_field", record, newValue, 0)
    // Return the changed record
    record
  }
  .load(inputFile)
  .save(outputFile)
```


## EBCDIC Spark Processor (experimental)
This allows in-place processing of data in parallel while retaining the original format, using RDDs under the hood.

Here is an example usage:
```scala
import za.co.absa.cobrix.spark.cobol.SparkCobolProcessor

val copybookContents = "...some copybook..."

SparkCobolProcessor.builder
  .withCopybookContents(copybookContents)
  .withRecordProcessor { (record: Array[Byte], ctx: CobolProcessorContext) =>
    // The transformation logic goes here
    val value = ctx.copybook.getFieldValueByName("some_field", record, 0)
    // Change the field value
    // val newValue = ...
    // Write the changed value back
    ctx.copybook.setFieldValueByName("some_field", record, newValue, 0)
    // Return the changed record
    record
  }
  .load(inputPath)
  .save(outputPath)
```
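Note that Spark ships the record processor to executor nodes, so it must be serializable; see the `SerializableRawRecordProcessor` trait below.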

## EBCDIC Writer (experimental)

Cobrix's EBCDIC writer is an experimental feature that allows writing Spark DataFrames as EBCDIC mainframe files.
CobolProcessor.scala
@@ -16,14 +16,13 @@

package za.co.absa.cobrix.cobol.processor

-import za.co.absa.cobrix.cobol.processor.impl.{ArrayOfAnyHandler, StreamProcessor}
-import za.co.absa.cobrix.cobol.reader.VarLenNestedReader
-import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor
+import za.co.absa.cobrix.cobol.parser.Copybook
+import za.co.absa.cobrix.cobol.processor.impl.CobolProcessorImpl
import za.co.absa.cobrix.cobol.reader.parameters.{CobolParametersParser, Parameters, ReaderParameters}
import za.co.absa.cobrix.cobol.reader.schema.CobolSchema
-import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
+import za.co.absa.cobrix.cobol.reader.stream.{FSStream, SimpleStream}

-import java.io.OutputStream
+import java.io.{BufferedInputStream, BufferedOutputStream, FileOutputStream, OutputStream}
import scala.collection.mutable


@@ -47,32 +46,58 @@ trait CobolProcessor {
}

object CobolProcessor {
-  class CobolProcessorBuilder(copybookContents: String) {
+  class CobolProcessorBuilder {
    private val caseInsensitiveOptions = new mutable.HashMap[String, String]()
+    private var copybookContentsOpt: Option[String] = None
+    private var rawRecordProcessorOpt: Option[RawRecordProcessor] = None

    def build(): CobolProcessor = {
+      if (copybookContentsOpt.isEmpty) {
+        throw new IllegalArgumentException("Copybook contents must be provided.")
+      }
+
      val readerParameters = getReaderParameters
      val cobolSchema = getCobolSchema(readerParameters)

-      new CobolProcessor {
-        override def process(inputStream: SimpleStream,
-                             outputStream: OutputStream)
-                            (rawRecordProcessor: RawRecordProcessor): Long = {
-          val recordExtractor = getRecordExtractor(readerParameters, inputStream)
-
-          val dataStream = inputStream.copyStream()
-          try {
-            StreamProcessor.processStream(cobolSchema.copybook,
-              caseInsensitiveOptions.toMap,
-              dataStream,
-              recordExtractor,
-              rawRecordProcessor,
-              outputStream)
-          } finally {
-            dataStream.close()
-          }
-        }
-      }
+      new CobolProcessorImpl(readerParameters, cobolSchema.copybook, copybookContentsOpt.get, caseInsensitiveOptions.toMap)
    }

+    def load(path: String): CobolProcessorLoader = {
+      val file = new java.io.File(path)
+      if (!file.exists) {
+        throw new IllegalArgumentException(s"Path $path does not exist.")
+      }
+
+      if (file.isDirectory) {
+        throw new IllegalArgumentException(s"Path $path should be a file, not a directory.")
+      }
+
+      if (copybookContentsOpt.isEmpty) {
+        throw new IllegalArgumentException("Copybook contents must be provided.")
+      }
+
+      if (rawRecordProcessorOpt.isEmpty) {
+        throw new IllegalArgumentException("A RawRecordProcessor must be provided.")
+      }
+
+      val readerParameters = getReaderParameters
+      val cobolSchema = getCobolSchema(readerParameters)
+
+      new CobolProcessorLoader(path, copybookContentsOpt.get, cobolSchema.copybook, rawRecordProcessorOpt.get, readerParameters, caseInsensitiveOptions.toMap)
+    }
+
+    def withCopybookContents(copybookContents: String): CobolProcessorBuilder = {
+      copybookContentsOpt = Option(copybookContents)
+      this
+    }
+
+    def withRecordProcessor(processor: RawRecordProcessor): CobolProcessorBuilder = {
+      rawRecordProcessorOpt = Option(processor)
+      this
+    }

    /**
@@ -100,7 +125,7 @@
    }

    private[processor] def getCobolSchema(readerParameters: ReaderParameters): CobolSchema = {
-      CobolSchema.fromReaderParameters(Seq(copybookContents), readerParameters)
+      CobolSchema.fromReaderParameters(Seq(copybookContentsOpt.get), readerParameters)
    }

    private[processor] def getReaderParameters: ReaderParameters = {
@@ -109,25 +134,60 @@
      CobolParametersParser.getReaderProperties(cobolParameters, None)
    }

-    private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = {
-      val dataStream = inputStream.copyStream()
-      val headerStream = inputStream.copyStream()
-
-      val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)
-
-      reader.recordExtractor(0, dataStream, headerStream) match {
-        case Some(extractor) => extractor
-        case None =>
-          throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " +
-            "Please check the copybook and the reader parameters."
-          )
-      }
-    }
-
    private[processor] def getOptions: Map[String, String] = caseInsensitiveOptions.toMap
  }

+  class CobolProcessorLoader(fileToProcess: String,
+                             copybookContents: String,
+                             copybook: Copybook,
+                             rawRecordProcessor: RawRecordProcessor,
+                             readerParameters: ReaderParameters,
+                             options: Map[String, String]) {
+    def save(outputFile: String): Long = {
+      val processor = new CobolProcessorImpl(readerParameters, copybook, copybookContents, options)
+
+      val ifs = new FSStream(fileToProcess)
+      val ofs = new BufferedOutputStream(new FileOutputStream(outputFile))
+
+      var originalException: Throwable = null
+
+      val recordCount = try {
+        processor.process(ifs, ofs)(rawRecordProcessor)
+      } catch {
+        case ex: Throwable =>
+          originalException = ex
+          0L
+      } finally {
+        try {
+          ifs.close()
+        } catch {
+          case e: Throwable =>
+            if (originalException != null) {
+              originalException.addSuppressed(e)
+            } else {
+              originalException = e
+            }
+        }
+
+        try {
+          ofs.close()
+        } catch {
+          case e: Throwable =>
+            if (originalException != null) {
+              originalException.addSuppressed(e)
+            } else {
+              originalException = e
+            }
+        }
+      }
+
+      if (originalException != null) throw originalException
+
+      recordCount
+    }
+  }

-  def builder(copybookContent: String): CobolProcessorBuilder = {
-    new CobolProcessorBuilder(copybookContent)
+  def builder: CobolProcessorBuilder = {
+    new CobolProcessorBuilder
  }
}
CobolProcessorContext.scala
@@ -0,0 +1,23 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.cobrix.cobol.processor

import za.co.absa.cobrix.cobol.parser.Copybook

case class CobolProcessorContext(copybook: Copybook,
                                 options: Map[String, String],
                                 currentOffset: Long)
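For illustration, here is a sketch (not part of the PR; the field name and option key are hypothetical) of a processor reading all three context members:

```scala
val processor = new RawRecordProcessor {
  override def processRecord(record: Array[Byte], ctx: CobolProcessorContext): Array[Byte] = {
    // The parsed copybook, caller-supplied options, and the record's current
    // offset all travel together with each record.
    val value = ctx.copybook.getFieldValueByName("SOME_FIELD", record, 0)
    val dryRun = ctx.options.getOrElse("dry_run", "false").toBoolean
    if (!dryRun) {
      // ... transformation logic would go here, e.g. using ctx.currentOffset ...
    }
    record
  }
}
```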
RawRecordProcessor.scala
@@ -16,16 +16,10 @@

package za.co.absa.cobrix.cobol.processor

-import za.co.absa.cobrix.cobol.parser.Copybook
-
/**
 * A trait that defines a processor for raw COBOL records.
 * It provides a method to process a single COBOL record based on the provided copybook and options.
 */
trait RawRecordProcessor {
-  def processRecord(copybook: Copybook,
-                    options: Map[String, String],
-                    record: Array[Byte],
-                    offset: Long): Array[Byte]
-
+  def processRecord(record: Array[Byte], ctx: CobolProcessorContext): Array[Byte]
}
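Since the trait now has a single abstract method, Scala's SAM conversion lets callers pass a plain function literal — which is what the README's `withRecordProcessor { (record, ctx) => ... }` examples rely on. A minimal sketch:

```scala
// An identity (pass-through) processor expressed as a function literal.
val passThrough: RawRecordProcessor =
  (record: Array[Byte], ctx: CobolProcessorContext) => record
```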
SerializableRawRecordProcessor.scala
@@ -0,0 +1,29 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.cobrix.cobol.processor

/**
* A serializable version of RawRecordProcessor for distributed processing in Spark.
*
* Usage patterns:
* - For standalone JVM applications: Use CobolProcessor with RawRecordProcessor
* - For Spark applications: Use SparkCobolProcessor with SerializableRawRecordProcessor
*
* This trait extends Serializable since Spark distributes processing code across the network
* to worker nodes, requiring all components to be serializable.
*/
trait SerializableRawRecordProcessor extends RawRecordProcessor with Serializable
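For illustration, a Spark-safe processor can be defined as a top-level class so that it serializes cleanly (a sketch; the field name and the write-back of the unchanged value are placeholders):

```scala
// A minimal processor that can be shipped to Spark executors.
class MaskFieldProcessor extends SerializableRawRecordProcessor {
  override def processRecord(record: Array[Byte], ctx: CobolProcessorContext): Array[Byte] = {
    // Read a (hypothetical) field and write it back in place, mirroring the
    // README examples; real logic would substitute a transformed value.
    val original = ctx.copybook.getFieldValueByName("SOME_FIELD", record, 0)
    ctx.copybook.setFieldValueByName("SOME_FIELD", record, original, 0)
    record
  }
}
```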