-
Notifications
You must be signed in to change notification settings - Fork 86
#780 Add spark-cobol options to the raw record extractor interface
#781
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
140942e
bc8c400
d547863
836f65b
773a222
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,132 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /* | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Copyright 2018 ABSA Group Limited | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * you may not use this file except in compliance with the License. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * You may obtain a copy of the License at | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Unless required by applicable law or agreed to in writing, software | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * See the License for the specific language governing permissions and | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * limitations under the License. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| package za.co.absa.cobrix.cobol.processor | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import za.co.absa.cobrix.cobol.processor.impl.{ArrayOfAnyHandler, StreamProcessor} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import za.co.absa.cobrix.cobol.reader.VarLenNestedReader | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import za.co.absa.cobrix.cobol.reader.parameters.{CobolParametersParser, Parameters, ReaderParameters} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import za.co.absa.cobrix.cobol.reader.schema.CobolSchema | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import za.co.absa.cobrix.cobol.reader.stream.SimpleStream | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import java.io.OutputStream | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import scala.collection.mutable | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * A trait that defines a processor for raw COBOL data streams. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * It provides a method to process a COBOL file or a stream, provided record processor. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| trait CobolProcessor { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Processes the input stream of COBOL records and writes the output to the specified output stream. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param inputStream the input stream containing raw COBOL records. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param outputStream the output stream where processed records will be written. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param rawRecordProcessor the processor that processes each raw record. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def process(inputStream: SimpleStream, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| outputStream: OutputStream) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| (rawRecordProcessor: RawRecordProcessor): Unit | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| object CobolProcessor { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| class CobolProcessorBuilder(copybookContents: String) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private val caseInsensitiveOptions = new mutable.HashMap[String, String]() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def build(): CobolProcessor = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val readerParameters = getReaderParameters | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val cobolSchema = getCobolSchema(readerParameters) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| new CobolProcessor { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| override def process(inputStream: SimpleStream, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| outputStream: OutputStream) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| (rawRecordProcessor: RawRecordProcessor): Unit = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val recordExtractor = getRecordExtractor(readerParameters, inputStream) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val dataStream = inputStream.copyStream() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| StreamProcessor.processStream(cobolSchema.copybook, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| caseInsensitiveOptions.toMap, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| dataStream, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| recordExtractor, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| rawRecordProcessor, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| outputStream) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } finally { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| dataStream.close() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Adds a single option to the builder. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param key the option key. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param value the option value. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @return this builder instance for method chaining. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def option(key: String, value: String): CobolProcessorBuilder = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| require(key.trim.nonEmpty, "Option key must not be empty or whitespace-only") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| caseInsensitiveOptions += (key.trim.toLowerCase -> value) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Adds multiple options to the builder. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param options a map of option key-value pairs. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @return this builder instance for method chaining. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def options(options: Map[String, String]): CobolProcessorBuilder = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| caseInsensitiveOptions ++= options.map(kv => (kv._1.toLowerCase(), kv._2)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private[processor] def getCobolSchema(readerParameters: ReaderParameters): CobolSchema = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| CobolSchema.fromReaderParameters(Seq(copybookContents), readerParameters) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private[processor] def getReaderParameters: ReaderParameters = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val cobolParameters = CobolParametersParser.parse(new Parameters(caseInsensitiveOptions.toMap)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| CobolParametersParser.getReaderProperties(cobolParameters, None) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val dataStream = inputStream.copyStream() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val headerStream = inputStream.copyStream() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| reader.recordExtractor(0, dataStream, headerStream) match { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case Some(extractor) => extractor | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case None => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " + | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "Please check the copybook and the reader parameters." | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+111
to
+124
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Align Change the signature to accept streams created by - private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = {
- val dataStream = inputStream.copyStream()
- val headerStream = inputStream.copyStream()
-
- val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)
-
- reader.recordExtractor(0, dataStream, headerStream) match {
- case Some(extractor) => extractor
- case None =>
- throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " +
- "Please check the copybook and the reader parameters."
- )
- }
- }
+ private[processor] def getRecordExtractor(readerParameters: ReaderParameters,
+ dataStream: SimpleStream,
+ headerStream: SimpleStream): RawRecordExtractor = {
+ val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)
+ reader.recordExtractor(0, dataStream, headerStream) match {
+ case Some(extractor) => extractor
+ case None =>
+ try headerStream.close() finally dataStream.close()
+ throw new IllegalArgumentException(
+ s"Cannot create a record extractor for the given reader parameters. Please check the copybook and the reader parameters."
+ )
+ }
+ }📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private[processor] def getOptions: Map[String, String] = caseInsensitiveOptions.toMap | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def builder(copybookContent: String): CobolProcessorBuilder = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| new CobolProcessorBuilder(copybookContent) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Close all opened streams; avoid leaking header/data streams created for the extractor
process()closes only thedataStreamit creates, but the extractor’s owndataStreamandheaderStreamcreated ingetRecordExtractor()are never closed. Either create both streams inprocess()and pass them intogetRecordExtractor(...), or return the streams fromgetRecordExtractor(...)soprocess()can close them in afinally.Apply:
🤖 Prompt for AI Agents