Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.cobrix.cobol.processor

import za.co.absa.cobrix.cobol.processor.impl.{ArrayOfAnyHandler, StreamProcessor}
import za.co.absa.cobrix.cobol.reader.VarLenNestedReader
import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor
import za.co.absa.cobrix.cobol.reader.parameters.{CobolParametersParser, Parameters, ReaderParameters}
import za.co.absa.cobrix.cobol.reader.schema.CobolSchema
import za.co.absa.cobrix.cobol.reader.stream.SimpleStream

import java.io.OutputStream
import scala.collection.mutable


/**
* A trait that defines a processor for raw COBOL data streams.
* It provides a method to process a COBOL file or a stream, provided record processor.
*/
trait CobolProcessor {
/**
* Processes the input stream of COBOL records and writes the output to the specified output stream.
*
* @param inputStream the input stream containing raw COBOL records.
* @param outputStream the output stream where processed records will be written.
* @param rawRecordProcessor the processor that processes each raw record.
*/
def process(inputStream: SimpleStream,
outputStream: OutputStream)
(rawRecordProcessor: RawRecordProcessor): Unit

}

object CobolProcessor {
class CobolProcessorBuilder(copybookContents: String) {
private val caseInsensitiveOptions = new mutable.HashMap[String, String]()

def build(): CobolProcessor = {
val readerParameters = getReaderParameters
val cobolSchema = getCobolSchema(readerParameters)

new CobolProcessor {
override def process(inputStream: SimpleStream,
outputStream: OutputStream)
(rawRecordProcessor: RawRecordProcessor): Unit = {
val recordExtractor = getRecordExtractor(readerParameters, inputStream)

val dataStream = inputStream.copyStream()
try {
StreamProcessor.processStream(cobolSchema.copybook,
caseInsensitiveOptions.toMap,
dataStream,
recordExtractor,
rawRecordProcessor,
outputStream)
} finally {
dataStream.close()
}
}
}
Comment on lines +56 to +74
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Close all opened streams; avoid leaking header/data streams created for the extractor

process() closes only the dataStream it creates, but the extractor’s own dataStream and headerStream created in getRecordExtractor() are never closed. Either create both streams in process() and pass them into getRecordExtractor(...), or return the streams from getRecordExtractor(...) so process() can close them in a finally.

Apply:

-        override def process(inputStream: SimpleStream,
-                             outputStream: OutputStream)
-                            (rawRecordProcessor: RawRecordProcessor): Unit = {
-          val recordExtractor = getRecordExtractor(readerParameters, inputStream)
-
-          val dataStream = inputStream.copyStream()
-          try {
-            StreamProcessor.processStream(cobolSchema.copybook,
-              caseInsensitiveOptions.toMap,
-              dataStream,
-              recordExtractor,
-              rawRecordProcessor,
-              outputStream)
-          } finally {
-            dataStream.close()
-          }
-        }
+        override def process(inputStream: SimpleStream,
+                             outputStream: OutputStream)
+                            (rawRecordProcessor: RawRecordProcessor): Unit = {
+          val dataStream   = inputStream.copyStream()
+          val headerStream = inputStream.copyStream()
+          val recordExtractor = getRecordExtractor(readerParameters, dataStream, headerStream)
+          try {
+            StreamProcessor.processStream(
+              cobolSchema.copybook,
+              optionsSnapshot,
+              dataStream,
+              recordExtractor,
+              rawRecordProcessor,
+              outputStream
+            )
+          } finally {
+            try headerStream.close() finally dataStream.close()
+          }
+        }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala
around lines 56 to 74, process(...) currently only closes the dataStream it
creates but not the extractor’s internal dataStream and headerStream, leaking
streams; change the implementation so process() is responsible for creating and
closing all streams: either (A) refactor getRecordExtractor(...) to return both
the extractor and the two streams (or a small case class/tuple containing
extractor, dataStream, headerStream) so process() can register them and close
them in the finally block, or (B) change getRecordExtractor(...) to accept the
already-created data and header streams as parameters; in either case ensure all
streams are closed in the finally (null/Option-safe) and preserve original
exception behavior by closing in reverse-open order and suppressing/adding
suppressed exceptions appropriately.

}

/**
* Adds a single option to the builder.
*
* @param key the option key.
* @param value the option value.
* @return this builder instance for method chaining.
*/
def option(key: String, value: String): CobolProcessorBuilder = {
require(key.trim.nonEmpty, "Option key must not be empty or whitespace-only")
caseInsensitiveOptions += (key.trim.toLowerCase -> value)
this
}

/**
* Adds multiple options to the builder.
*
* @param options a map of option key-value pairs.
* @return this builder instance for method chaining.
*/
def options(options: Map[String, String]): CobolProcessorBuilder = {
caseInsensitiveOptions ++= options.map(kv => (kv._1.toLowerCase(), kv._2))
this
}

private[processor] def getCobolSchema(readerParameters: ReaderParameters): CobolSchema = {
CobolSchema.fromReaderParameters(Seq(copybookContents), readerParameters)
}

private[processor] def getReaderParameters: ReaderParameters = {
val cobolParameters = CobolParametersParser.parse(new Parameters(caseInsensitiveOptions.toMap))

CobolParametersParser.getReaderProperties(cobolParameters, None)
}

private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = {
val dataStream = inputStream.copyStream()
val headerStream = inputStream.copyStream()

val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)

reader.recordExtractor(0, dataStream, headerStream) match {
case Some(extractor) => extractor
case None =>
throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " +
"Please check the copybook and the reader parameters."
)
}
}
Comment on lines +111 to +124
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Align getRecordExtractor with stream lifecycle; close on failure

Change the signature to accept streams created by process() and ensure they’re closed on the error path.

-    private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = {
-      val dataStream = inputStream.copyStream()
-      val headerStream = inputStream.copyStream()
-
-      val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)
-
-      reader.recordExtractor(0, dataStream, headerStream) match {
-        case Some(extractor) => extractor
-        case None =>
-          throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " +
-            "Please check the copybook and the reader parameters."
-          )
-      }
-    }
+    private[processor] def getRecordExtractor(readerParameters: ReaderParameters,
+                                              dataStream: SimpleStream,
+                                              headerStream: SimpleStream): RawRecordExtractor = {
+      val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)
+      reader.recordExtractor(0, dataStream, headerStream) match {
+        case Some(extractor) => extractor
+        case None =>
+          try headerStream.close() finally dataStream.close()
+          throw new IllegalArgumentException(
+            s"Cannot create a record extractor for the given reader parameters. Please check the copybook and the reader parameters."
+          )
+      }
+    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = {
val dataStream = inputStream.copyStream()
val headerStream = inputStream.copyStream()
val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)
reader.recordExtractor(0, dataStream, headerStream) match {
case Some(extractor) => extractor
case None =>
throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " +
"Please check the copybook and the reader parameters."
)
}
}
private[processor] def getRecordExtractor(readerParameters: ReaderParameters,
dataStream: SimpleStream,
headerStream: SimpleStream): RawRecordExtractor = {
val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)
reader.recordExtractor(0, dataStream, headerStream) match {
case Some(extractor) => extractor
case None =>
// Ensure both streams are closed before bubbling up the error
try headerStream.close() finally dataStream.close()
throw new IllegalArgumentException(
s"Cannot create a record extractor for the given reader parameters. Please check the copybook and the reader parameters."
)
}
}
🤖 Prompt for AI Agents
In
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala
around lines 111 to 124, update getRecordExtractor to accept the actual streams
produced by process() (i.e., accept the data/header SimpleStream instances
created upstream rather than creating copies internally) and ensure both streams
are closed on the error path: if reader.recordExtractor returns None, close
dataStream and headerStream (or call their close/copyStream close equivalent)
before throwing the IllegalArgumentException; keep normal success path returning
the extractor without closing the streams.


private[processor] def getOptions: Map[String, String] = caseInsensitiveOptions.toMap
}

def builder(copybookContent: String): CobolProcessorBuilder = {
new CobolProcessorBuilder(copybookContent)
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package za.co.absa.cobrix.cobol.processor
package za.co.absa.cobrix.cobol.processor.impl

import za.co.absa.cobrix.cobol.parser.ast.Group
import za.co.absa.cobrix.cobol.reader.extractors.record.RecordHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package za.co.absa.cobrix.cobol.processor
package za.co.absa.cobrix.cobol.processor.impl

import za.co.absa.cobrix.cobol.parser.ast.Group
import za.co.absa.cobrix.cobol.reader.extractors.record.RecordHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
* limitations under the License.
*/

package za.co.absa.cobrix.cobol.processor
package za.co.absa.cobrix.cobol.processor.impl

import za.co.absa.cobrix.cobol.parser.Copybook
import za.co.absa.cobrix.cobol.processor.RawRecordProcessor
import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor
import za.co.absa.cobrix.cobol.reader.stream.SimpleStream

Expand All @@ -26,22 +27,21 @@ object StreamProcessor {
/**
* Processes a stream of COBOL raw records and writes it back in the same format as the input data.
*
* @param copybook the COBOL copybook that describes the schema of the records.
* @param options arbitrary options used for splitting input data into records. Same as options to 'spark-cobol'. Can contain custom options as well.
* @param inputStream the input stream containing the raw COBOL records.
* @param copybook the COBOL copybook that describes the schema of the records.
* @param options arbitrary options used for splitting input data into records (same as 'spark-cobol' options).
* Keys are lower-cased for case-insensitive handling. Can contain custom options as well.
* @param inputStream the input stream containing the raw COBOL records.
* @param recordExtractor the extractor that extracts raw records from the input stream.
* @param recordProcessor the per-record processing logic implementation.
* @param outputStream the output stream where the processed records will be written.
* @param outputStream the output stream where the processed records will be written.
*/
def processStream(copybook: Copybook,
options: Map[String, String],
inputStream: SimpleStream,
recordExtractor: RawRecordExtractor,
recordProcessor: RawRecordProcessor,
outputStream: OutputStream): Unit = {
var i = 0
while (recordExtractor.hasNext) {
i += 1
val record = recordExtractor.next()
val recordSize = record.length

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package za.co.absa.cobrix.cobol.reader

import za.co.absa.cobrix.cobol.internal.Logging
import za.co.absa.cobrix.cobol.parser.Copybook
import za.co.absa.cobrix.cobol.parser.common.Constants
import za.co.absa.cobrix.cobol.parser.headerparsers.{RecordHeaderParser, RecordHeaderParserFactory}
import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.{FixedBlock, FixedLength, VariableBlock, VariableLength}
Expand All @@ -27,7 +26,6 @@ import za.co.absa.cobrix.cobol.reader.index.IndexGenerator
import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry
import za.co.absa.cobrix.cobol.reader.iterator.{VarLenHierarchicalIterator, VarLenNestedIterator}
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
import za.co.absa.cobrix.cobol.reader.recordheader.{RecordHeaderDecoderBdw, RecordHeaderDecoderRdw, RecordHeaderParameters}
import za.co.absa.cobrix.cobol.reader.schema.CobolSchema
import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
import za.co.absa.cobrix.cobol.reader.validator.ReaderParametersValidator
Expand Down Expand Up @@ -56,15 +54,10 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
def recordExtractor(startingRecordNumber: Long,
dataStream: SimpleStream,
headerStream: SimpleStream): Option[RawRecordExtractor] = {
val rdwParams = RecordHeaderParameters(readerProperties.isRdwBigEndian, readerProperties.rdwAdjustment)

val rdwDecoder = new RecordHeaderDecoderRdw(rdwParams)

val bdwOpt = readerProperties.bdw
val bdwParamsOpt = bdwOpt.map(bdw => RecordHeaderParameters(bdw.isBigEndian, bdw.adjustment))
val bdwDecoderOpt = bdwParamsOpt.map(bdwParams => new RecordHeaderDecoderBdw(bdwParams))

val reParams = RawRecordContext(startingRecordNumber, dataStream, headerStream, cobolSchema.copybook, rdwDecoder, bdwDecoderOpt.getOrElse(rdwDecoder), readerProperties.reAdditionalInfo)
val reParams = RawRecordContext.builder(startingRecordNumber, dataStream, headerStream, cobolSchema.copybook)
.withReaderParams(readerProperties)
.build()

readerProperties.recordExtractor match {
case Some(recordExtractorClass) =>
Expand All @@ -88,6 +81,8 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
!readerProperties.isRecordSequence &&
readerProperties.lengthFieldExpression.isEmpty =>
Some(new VarOccursRecordExtractor(reParams))
case None if readerProperties.recordFormat == FixedLength =>
Some(new FixedRecordLengthRawRecordExtractor(reParams, readerProperties.recordLength))
case None =>
None
}
Expand Down
Loading
Loading