From d66669429c98e75bf31caa4b3e59c7c86edd5c48 Mon Sep 17 00:00:00 2001
From: Ruslan Iushchenko
Date: Mon, 22 Apr 2024 09:01:53 +0200
Subject: [PATCH] #672 Add tests for various scenarios.

---
 README.md                                     | 82 ++++++++++++++++++-
 .../cobol/reader/schema/CobolSchema.scala     |  2 +-
 .../parser/decoders/StringDecodersSpec.scala  |  2 -
 .../spark/cobol/schema/CobolSchema.scala      |  2 +-
 .../spark/cobol/source/DefaultSource.scala    |  2 +-
 .../cobrix/spark/cobol/CobolSchemaSpec.scala  | 52 +++++++++++-
 6 files changed, 132 insertions(+), 10 deletions(-)

diff --git a/README.md b/README.md
index a0050893..ab10050d 100644
--- a/README.md
+++ b/README.md
@@ -32,7 +32,7 @@ Among the motivations for this project, it is possible to highlight:

- Supports REDEFINES, OCCURS and DEPENDING ON fields (e.g. unchecked unions and variable-size arrays)

-- Supports nested structures and arrays (including "flattened" nested names)
+- Supports nested structures and arrays

- Supports HDFS as well as local file systems

@@ -350,8 +350,18 @@ Currently, specifying multiple paths in `load()` is not supported. Use the follo
### Spark SQL schema extraction
This library also provides convenient methods to extract Spark SQL schemas and Cobol layouts from copybooks.

-If you want to extract a Spark SQL schema from a copybook:
+If you want to extract a Spark SQL schema from a copybook by providing the same options you pass to Spark:
```scala
// Same options that you use for spark.read.format("cobol").option()
val options = Map("schema_retention_policy" -> "keep_original")

val cobolSchema = CobolSchema.fromSparkOptions(Seq(copybook), options)
val sparkSchema = cobolSchema.getSparkSchema.toString()

println(sparkSchema)
```
If you want to extract a Spark SQL schema from a copybook using the Cobol parser directly:
```scala
import za.co.absa.cobrix.cobol.parser.CopybookParser
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy
@@ -1397,6 +1407,74 @@ When using `9` 8 refers to the number of digits the number has. Here, the size o
```
You can have decimals when using COMP-3 as well.

### Flattening a schema with GROUPs and OCCURS
Flattening can be helpful when migrating data from mainframe files with fields that have OCCURS (arrays) to relational
databases that do not support nested arrays.

Cobrix has a method that can flatten the schema automatically given a DataFrame produced by `spark-cobol`.

Spark Scala example:
```scala
// The helper lives in the spark-cobol module
import za.co.absa.cobrix.spark.cobol.utils.SparkUtils

val dfFlat = SparkUtils.flattenSchema(df, useShortFieldNames = false)
```

PySpark example:
```python
from pyspark.sql import SparkSession, DataFrame, SQLContext
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
from py4j.java_gateway import java_import

# Get or create a Spark session
spark = SparkSession.builder.appName("flatten_example").getOrCreate()

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("subjects", ArrayType(StringType()), True)
])

# Sample data
data = [
    (1, "Alice", ["Math", "Science"]),
    (2, "Bob", ["History", "Geography"]),
    (3, "Charlie", ["English", "Math", "Physics"])
]

# Create a test DataFrame
df = spark.createDataFrame(data, schema)

# Show the DataFrame before flattening
df.show()

# Flatten the schema using the Cobrix Scala 'SparkUtils.flattenSchema' method
sc = spark.sparkContext
java_import(sc._gateway.jvm, "za.co.absa.cobrix.spark.cobol.utils.SparkUtils")
dfFlatJvm = spark._jvm.SparkUtils.flattenSchema(df._jdf, False)
dfFlat = DataFrame(dfFlatJvm, SQLContext(sc))

# Show the DataFrame after flattening
dfFlat.show(truncate=False)
dfFlat.printSchema()
```

The output looks like this:
```
# Before flattening
+---+-------+------------------------+
|id |name   |subjects                |
+---+-------+------------------------+
|1  |Alice  |[Math, Science]         |
|2  |Bob    |[History, Geography]    |
|3  |Charlie|[English, Math, Physics]|
+---+-------+------------------------+

# After flattening
+---+-------+----------+----------+----------+
|id |name   |subjects_0|subjects_1|subjects_2|
+---+-------+----------+----------+----------+
|1  |Alice  |Math      |Science   |null      |
|2  |Bob    |History   |Geography |null      |
|3  |Charlie|English   |Math      |Physics   |
+---+-------+----------+----------+----------+
```

## Summary of all available options

##### File reading options

diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala
index 234cde8a..4411b91f 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala
@@ -137,4 +137,4 @@ object CobolSchema {
      case None => CodePage.getCodePageByName(codePageName)
    }
  }
-}
\ No newline at end of file
+}
diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/parser/decoders/StringDecodersSpec.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/parser/decoders/StringDecodersSpec.scala
index e0180939..d49636f2 100644
--- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/parser/decoders/StringDecodersSpec.scala
+++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/parser/decoders/StringDecodersSpec.scala
@@ -123,8 +123,6 @@ class StringDecodersSpec extends AnyWordSpec {

      val actual = decodeEbcdicString(bytes, KeepAll, new CodePage500, improvedNullDetection = false)

-      println(actual)
-
      assert(actual == expected)
    }

diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala
index ac892009..9024a070 100644
--- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala
@@ -341,4 +341,4 @@ object CobolSchema {
    CobolSchema.fromBaseReader(CobolReaderSchema.fromReaderParameters(copyBookContents,
readerParameters)) } -} \ No newline at end of file +} diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala index c6447cea..5994763b 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala @@ -112,4 +112,4 @@ class DefaultSource copybookContent, getReaderProperties(parameters, defaultHdfsBlockSize) ) } -} \ No newline at end of file +} diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala index 0229065c..70367b28 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala @@ -16,7 +16,7 @@ package za.co.absa.cobrix.spark.cobol -import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructType} +import org.apache.spark.sql.types.{ArrayType, IntegerType, LongType, StringType, StructType} import org.scalatest.wordspec.AnyWordSpec import org.slf4j.{Logger, LoggerFactory} import za.co.absa.cobrix.cobol.parser.CopybookParser @@ -483,8 +483,6 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { val sparkSchema = cobolSchema.getSparkSchema - sparkSchema.printTreeString() - assert(sparkSchema.fields.length == 3) assert(sparkSchema.fields.head.name == "HEADER") assert(sparkSchema.fields.head.dataType == StringType) @@ -502,6 +500,54 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { assert(seg1.fields(2).name == "SEG3") assert(seg1.fields(2).dataType.isInstanceOf[ArrayType]) } + + "return a schema for a multi-segment copybook" in { + val copybook: String = + """ 01 RECORD. + | 05 HEADER PIC X(5). + | 05 SEGMENT-ID PIC X(2). + | 05 SEG1. + | 10 FIELD1 PIC 9(7). + | 05 SEG2 REDEFINES SEG1. + | 10 FIELD3 PIC X(7). + | 05 SEG3 REDEFINES SEG1. + | 10 FIELD4 PIC S9(7). 
+ |""".stripMargin + + val cobolSchema = CobolSchema.fromSparkOptions(Seq(copybook), + Map( + "segment_field" -> "SEGMENT-ID", + "redefine-segment-id-map:0" -> "SEG1 => 01", + "redefine-segment-id-map:1" -> "SEG2 => 02", + "redefine-segment-id-map:2" -> "SEG3 => 03", + "segment_field" -> "SEGMENT-ID", + "segment_id_level0" -> "TEST", + "generate_record_id" -> "true" + ) + ) + + val sparkSchema = cobolSchema.getSparkSchema + + assert(sparkSchema.fields.length == 9) + assert(sparkSchema.fields.head.name == "File_Id") + assert(sparkSchema.fields.head.dataType == IntegerType) + assert(sparkSchema.fields(1).name == "Record_Id") + assert(sparkSchema.fields(1).dataType == LongType) + assert(sparkSchema.fields(2).name == "Record_Byte_Length") + assert(sparkSchema.fields(2).dataType == IntegerType) + assert(sparkSchema.fields(3).name == "Seg_Id0") + assert(sparkSchema.fields(3).dataType == StringType) + assert(sparkSchema.fields(4).name == "HEADER") + assert(sparkSchema.fields(4).dataType == StringType) + assert(sparkSchema.fields(5).name == "SEGMENT_ID") + assert(sparkSchema.fields(5).dataType == StringType) + assert(sparkSchema.fields(6).name == "SEG1") + assert(sparkSchema.fields(6).dataType.isInstanceOf[StructType]) + assert(sparkSchema.fields(7).name == "SEG2") + assert(sparkSchema.fields(7).dataType.isInstanceOf[StructType]) + assert(sparkSchema.fields(8).name == "SEG3") + assert(sparkSchema.fields(8).dataType.isInstanceOf[StructType]) + } } }