37 changes: 37 additions & 0 deletions README.md
@@ -1663,6 +1663,43 @@ The output looks like this:

`common_extended` and `cp037_extended` are code pages that support non-printable characters, which are converted to ASCII codes below 32.
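
One of these code pages can be selected on read via the `ebcdic_code_page` option. A minimal sketch (the paths are placeholders):
```scala
val df = spark.read
  .format("cobol")
  .option("copybook", "/path/to/copybook.cpy")
  .option("ebcdic_code_page", "cp037_extended")
  .load("/path/to/data")
```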

## EBCDIC Writer (experimental)

Cobrix's EBCDIC writer is an experimental feature that allows writing Spark DataFrames as EBCDIC mainframe files.

### Usage
```scala
import org.apache.spark.sql.SaveMode

df.write
  .format("cobol")
  .mode(SaveMode.Overwrite)
  .option("copybook_contents", copybookContents)
  .save("/some/output/path")
```
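
Since the writer produces fixed-length records, the output can be read back with the same copybook. A minimal sketch (the path and copybook variables are the same placeholders as above):
```scala
// Read the records written above back into a DataFrame (sketch).
val dfBack = spark.read
  .format("cobol")
  .option("copybook_contents", copybookContents)
  .option("record_format", "F")
  .load("/some/output/path")
```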

### Current Limitations
The writer is still in its early stages and has several limitations:
- Nested GROUPs are not supported. Only flat copybooks can be used, for example:
```cobol
01 RECORD.
    05 FIELD_1 PIC X(1).
    05 FIELD_2 PIC X(5).
```
- Only `PIC X(n)` fields are supported; numeric types are not (see the casting sketch after this list).
- Only fixed record length output is supported (`record_format = F`).
- `REDEFINES` and `OCCURS` are not supported.
- Only the core EBCDIC encoder is supported; specific EBCDIC code pages are not yet available.
- Save mode `append` is not supported; only `overwrite` is.
- Partitioning by DataFrame fields is not supported.
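
Because only `PIC X(n)` fields are supported, numeric DataFrame columns need to be represented as strings before writing. A minimal casting sketch (ordinary Spark code, not part of the writer itself):
```scala
import org.apache.spark.sql.functions.col

// Cast every column to a string so it can map to a PIC X(n) field.
val stringDf = df.select(df.columns.map(c => col(c).cast("string")): _*)
```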

### Implementation details
Handling of `PIC X(n)`:
- Values are truncated when longer than `n` and right-padded when shorter.
- The padding byte is EBCDIC space `0x40`.
- `null` values in DataFrames are written as `0x00` bytes.

Handling of `FILLER`s:
- FILLER areas are populated with `0x00` bytes.
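
The rules above can be summarised with a small sketch (illustrative only; `toEbcdic` stands in for a code page conversion table and is not an actual Cobrix function):
```scala
// Minimal sketch of the PIC X(n) handling described above.
def encodePicX(value: String, length: Int, toEbcdic: Char => Byte): Array[Byte] = {
  if (value == null) {
    Array.fill[Byte](length)(0x00.toByte)             // nulls are written as 0x00 bytes
  } else {
    val buf = Array.fill[Byte](length)(0x40.toByte)   // pre-fill with EBCDIC spaces
    value.take(length).zipWithIndex.foreach {         // truncate values longer than n
      case (c, i) => buf(i) = toEbcdic(c)
    }
    buf                                               // shorter values stay space-padded
  }
}
```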

## Performance Analysis

Performance tests were performed on synthetic datasets. The setup and results are as follows.
@@ -23,7 +23,7 @@ import za.co.absa.cobrix.cobol.parser.CopybookParser.CopybookAST
import za.co.absa.cobrix.cobol.parser.ast.datatype._
import za.co.absa.cobrix.cobol.parser.ast.{Group, Primitive}
import za.co.absa.cobrix.cobol.parser.common.Constants
import za.co.absa.cobrix.cobol.parser.decoders.{DecoderSelector, EncoderSelector}
import za.co.absa.cobrix.cobol.parser.decoders.DecoderSelector
import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat.FloatingPointFormat
import za.co.absa.cobrix.cobol.parser.encoding.codepage.CodePage
import za.co.absa.cobrix.cobol.parser.encoding._
@@ -17,7 +17,8 @@
package za.co.absa.cobrix.cobol.parser.ast

import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, CobolType, Decimal, Integral}
import za.co.absa.cobrix.cobol.parser.decoders.{BinaryUtils, DecoderSelector, EncoderSelector}
import za.co.absa.cobrix.cobol.parser.decoders.{BinaryUtils, DecoderSelector}
import za.co.absa.cobrix.cobol.parser.encoding.EncoderSelector

/** An abstraction of the statements describing fields of primitive data types in the COBOL copybook
*
@@ -20,9 +20,9 @@ import za.co.absa.cobrix.cobol.parser.CopybookParser.CopybookAST
import za.co.absa.cobrix.cobol.parser.ast.datatype.AlphaNumeric
import za.co.absa.cobrix.cobol.parser.ast.{Group, Primitive, Statement}
import za.co.absa.cobrix.cobol.parser.common.Constants
import za.co.absa.cobrix.cobol.parser.decoders.{DecoderSelector, EncoderSelector}
import za.co.absa.cobrix.cobol.parser.decoders.DecoderSelector
import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat.FloatingPointFormat
import za.co.absa.cobrix.cobol.parser.encoding.Encoding
import za.co.absa.cobrix.cobol.parser.encoding.{EncoderSelector, Encoding}
import za.co.absa.cobrix.cobol.parser.encoding.codepage.CodePage
import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy.StringTrimmingPolicy

@@ -14,13 +14,13 @@
* limitations under the License.
*/

package za.co.absa.cobrix.cobol.parser.decoders
package za.co.absa.cobrix.cobol.parser.encoding

import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, CobolType}
import za.co.absa.cobrix.cobol.parser.encoding.codepage.{CodePage, CodePageCommon}
import za.co.absa.cobrix.cobol.parser.encoding.{ASCII, EBCDIC, Encoding}

import java.nio.charset.{Charset, StandardCharsets}
import java.util

object EncoderSelector {
type Encoder = Any => Array[Byte]
@@ -36,7 +36,7 @@ object EncoderSelector {
}
}

/** Gets a decoder function for a string data type. Encoder is chosen depending on whether input encoding is EBCDIC or ASCII */
/** Gets an encoder function for a string data type. The encoder is chosen depending on whether the output encoding is EBCDIC or ASCII. */
private def getStringEncoder(encoding: Encoding,
ebcdicCodePage: CodePage,
asciiCharset: Charset,
@@ -69,6 +69,9 @@ object EncoderSelector {
var i = 0
val buf = new Array[Byte](length)

// PIC X fields are space-filled on mainframe. Use EBCDIC space 0x40.
util.Arrays.fill(buf, 0x40.toByte)

while (i < string.length && i < length) {
val asciiByte = string(i).toByte
buf(i) = conversionTable((asciiByte + 256) % 256)
@@ -246,9 +246,25 @@ object CobolParametersParser extends Logging {
recordFormatDefined
}

val copybookPaths = params.get(PARAM_MULTI_COPYBOOK_PATH) match {
case Some(paths) =>
paths.split(',')
.map(_.trim)
.filter(_.nonEmpty)
.toSeq
case None => Seq.empty[String]
}

val copybookPathOpt = params.get(PARAM_COPYBOOK_PATH).map(_.trim).filter(_.nonEmpty)
if (copybookPathOpt.nonEmpty && copybookPaths.nonEmpty) {
throw new IllegalArgumentException(
s"Options '$PARAM_COPYBOOK_PATH' (single path) and '$PARAM_MULTI_COPYBOOK_PATH' (comma-separated list) are mutually exclusive. Use only one."
)
}

val cobolParameters = CobolParameters(
getParameter(PARAM_COPYBOOK_PATH, params),
params.getOrElse(PARAM_MULTI_COPYBOOK_PATH, "").split(','),
copybookPathOpt,
copybookPaths,
getParameter(PARAM_COPYBOOK_CONTENTS, params),
paths,
recordFormat,
@@ -20,8 +20,9 @@ import org.scalatest.funsuite.AnyFunSuite
import za.co.absa.cobrix.cobol.parser.CopybookParser
import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, CobolType}
import za.co.absa.cobrix.cobol.parser.ast.{BinaryProperties, Group, Primitive}
import za.co.absa.cobrix.cobol.parser.decoders.{DecoderSelector, EncoderSelector}
import za.co.absa.cobrix.cobol.parser.encoding.EBCDIC
import za.co.absa.cobrix.cobol.parser.decoders.DecoderSelector
import za.co.absa.cobrix.cobol.parser.encoding.{EBCDIC, EncoderSelector}
import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy

class BinaryExtractorSpec extends AnyFunSuite {

@@ -217,4 +218,32 @@
assert(fields2.isInstanceOf[Primitive])
assert(fields2.asInstanceOf[Primitive].encode.isEmpty)
}

test("Test padding when setting field value by name") {
val fieldName1: String = "COMPANY.SHORT-NAME"
val newValue1: String = "NEWN"
val copybook2 = CopybookParser.parseTree(copyBookContents, stringTrimmingPolicy = StringTrimmingPolicy.KeepAll)
copybook2.setFieldValueByName(fieldName1, bytes, newValue1, startOffset)
val result1: Any = copybook2.getFieldValueByName(fieldName1, bytes, startOffset)
assert(result1.asInstanceOf[String] === "NEWN      ")

val fieldName2: String = "COMPANY.COMPANY-ID-NUM"
val fields2 = copybook2.getFieldByName(fieldName2)
assert(fields2.isInstanceOf[Primitive])
assert(fields2.asInstanceOf[Primitive].encode.isEmpty)
}

test("Test truncating when setting field value by name") {
val fieldName1: String = "COMPANY.SHORT-NAME"
val newValue1: String = "NEWNAME_TEST123345"
val copybook2 = CopybookParser.parseTree(copyBookContents, stringTrimmingPolicy = StringTrimmingPolicy.KeepAll)
copybook2.setFieldValueByName(fieldName1, bytes, newValue1, startOffset)
val result1: Any = copybook2.getFieldValueByName(fieldName1, bytes, startOffset)
assert(result1.asInstanceOf[String] === "NEWNAME_TE")

val fieldName2: String = "COMPANY.COMPANY-ID-NUM"
val fields2 = copybook2.getFieldByName(fieldName2)
assert(fields2.isInstanceOf[Primitive])
assert(fields2.asInstanceOf[Primitive].encode.isEmpty)
}
}
@@ -17,23 +17,27 @@
package za.co.absa.cobrix.spark.cobol.source

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider, SchemaRelationProvider}
import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}
import za.co.absa.cobrix.cobol.internal.Logging
import za.co.absa.cobrix.cobol.reader.parameters.{CobolParameters, CobolParametersParser, Parameters}
import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser._
import za.co.absa.cobrix.cobol.reader.parameters.{CobolParameters, CobolParametersParser, Parameters}
import za.co.absa.cobrix.cobol.reader.schema.CobolSchema
import za.co.absa.cobrix.spark.cobol.reader._
import za.co.absa.cobrix.spark.cobol.source.copybook.CopybookContentLoader
import za.co.absa.cobrix.spark.cobol.source.parameters._
import za.co.absa.cobrix.spark.cobol.utils.{BuildProperties, SparkUtils}
import za.co.absa.cobrix.spark.cobol.writer.{RawBinaryOutputFormat, RecordCombinerSelector}

/**
* This class represents a Cobol data source.
*/
class DefaultSource
extends RelationProvider
with SchemaRelationProvider
with CreatableRelationProvider
with DataSourceRegister
with ReaderFactory
with Logging {
@@ -44,6 +48,7 @@ class DefaultSource
createRelation(sqlContext, parameters, null)
}

/** Reader relation */
override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation = {
CobolParametersValidator.validateOrThrow(parameters, sqlContext.sparkSession.sparkContext.hadoopConfiguration)

@@ -58,6 +63,70 @@ class DefaultSource
cobolParameters.debugIgnoreFileSize)(sqlContext)
}

/** Writer relation */
override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {
val outSqlContext = sqlContext
val path = parameters.getOrElse("path",
throw new IllegalArgumentException("Path is required for this data source."))

val cobolParameters = CobolParametersParser.parse(new Parameters(parameters))
CobolParametersValidator.checkSanity(cobolParameters)

val readerParameters = CobolParametersParser.getReaderProperties(cobolParameters, None)

val outputPath = new Path(path)
val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
val fs = outputPath.getFileSystem(hadoopConf)

mode match {
case SaveMode.Overwrite =>
if (fs.exists(outputPath)) {
fs.delete(outputPath, true)
}
case SaveMode.Append =>
if (fs.exists(outputPath)) {
throw new IllegalArgumentException(
s"Save mode '$mode' is not supported by the 'spark-cobol' data source at the moment. " +
"Please use 'Overwrite' mode to write data to a file or folder."
)
}
case SaveMode.ErrorIfExists =>
if (fs.exists(outputPath)) {
throw new IllegalArgumentException(
s"Path '$path' already exists; SaveMode.ErrorIfExists prevents overwriting."
)
}
case SaveMode.Ignore =>
if (fs.exists(outputPath)) {
// Skip the write entirely
return new BaseRelation {
override val sqlContext: SQLContext = outSqlContext
override def schema: StructType = data.schema
}
}
case _ =>
}

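// Build the COBOL schema from the copybook and combine the DataFrame rows into raw record bytes.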
val copybookContent = CopybookContentLoader.load(cobolParameters, sqlContext.sparkContext.hadoopConfiguration)
val cobolSchema = CobolSchema.fromReaderParameters(copybookContent, readerParameters)
val combiner = RecordCombinerSelector.selectCombiner(cobolSchema, readerParameters)
val rdd = combiner.combine(data, cobolSchema, readerParameters)

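// Write the records through the Hadoop API using the raw binary output format.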
rdd.map(bytes => (NullWritable.get(), new BytesWritable(bytes)))
.saveAsNewAPIHadoopFile(
path,
classOf[NullWritable],
classOf[BytesWritable],
classOf[RawBinaryOutputFormat]
)

new BaseRelation {
override def sqlContext: SQLContext = outSqlContext
override def schema: StructType = data.schema
}
}


//TODO fix with the correct implementation once the correct Reader hierarchy is put in place.
override def buildReader(spark: SparkSession, parameters: Map[String, String]): FixedLenReader = null
