Commit 5f3ba6f

#710 Move record length parser out of the reader since the record extractor is used instead.
1 parent: 084fb7f

5 files changed: +58, -126 lines

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala
Lines changed: 3 additions & 1 deletion

@@ -20,7 +20,7 @@ import za.co.absa.cobrix.cobol.internal.Logging
 import za.co.absa.cobrix.cobol.parser.Copybook
 import za.co.absa.cobrix.cobol.parser.common.Constants
 import za.co.absa.cobrix.cobol.parser.headerparsers.{RecordHeaderParser, RecordHeaderParserFactory}
-import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.{FixedBlock, FixedLength, VariableBlock}
+import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.{FixedBlock, FixedLength, VariableBlock, VariableLength}
 import za.co.absa.cobrix.cobol.reader.extractors.raw._
 import za.co.absa.cobrix.cobol.reader.extractors.record.RecordHandler
 import za.co.absa.cobrix.cobol.reader.index.IndexGenerator
@@ -77,6 +77,8 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
         Some(new TextFullRecordExtractor(reParams))
       case None if readerProperties.recordFormat == FixedLength && (readerProperties.lengthFieldExpression.nonEmpty || readerProperties.lengthFieldMap.nonEmpty) =>
         Some(new FixedWithRecordLengthExprRawRecordExtractor(reParams, readerProperties))
+      case None if readerProperties.recordFormat == VariableLength && (readerProperties.lengthFieldExpression.nonEmpty || readerProperties.lengthFieldMap.nonEmpty) =>
+        Some(new FixedWithRecordLengthExprRawRecordExtractor(reParams, readerProperties))
       case None if readerProperties.recordFormat == FixedBlock =>
         val fbParams = FixedBlockParameters(readerProperties.recordLength, bdwOpt.get.blockLength, bdwOpt.get.recordsPerBlock)
         FixedBlockParameters.validate(fbParams)
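
The net effect of this hunk is that a VariableLength input (record_format = "V") with a record length field or a length-value mapping now takes the same code path as FixedLength input: both get a FixedWithRecordLengthExprRawRecordExtractor. A minimal sketch of the dispatch, using simplified stand-in types rather than the real Cobrix classes:

sealed trait RecordFormat
case object FixedLength extends RecordFormat
case object VariableLength extends RecordFormat

// Stand-in for za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters,
// reduced to the fields that matter for this dispatch.
case class ReaderParameters(recordFormat: RecordFormat,
                            lengthFieldExpression: Option[String] = None,
                            lengthFieldMap: Map[String, Int] = Map.empty)

// A record length field expression or a length mapping triggers the shared extractor.
def usesLengthField(p: ReaderParameters): Boolean =
  p.lengthFieldExpression.nonEmpty || p.lengthFieldMap.nonEmpty

def extractorName(p: ReaderParameters): Option[String] = p.recordFormat match {
  case FixedLength if usesLengthField(p)    => Some("FixedWithRecordLengthExprRawRecordExtractor")
  case VariableLength if usesLengthField(p) => Some("FixedWithRecordLengthExprRawRecordExtractor")
  case _                                    => None
}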

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReader.scala
Lines changed: 1 addition & 118 deletions

@@ -51,14 +51,7 @@ class VRLRecordReader(cobolSchema: Copybook,
   private var byteIndex = startingFileOffset
   private var recordIndex = startRecordId - 1

-  final private val copyBookRecordSize = cobolSchema.getRecordSize
-  final private val (recordLengthField, lengthFieldExpr) = ReaderParametersValidator.getEitherFieldAndExpression(readerProperties.lengthFieldExpression, readerProperties.lengthFieldMap, cobolSchema)
-  final private val lengthField = recordLengthField.map(_.field)
-  final private val lengthMap = recordLengthField.map(_.valueMap).getOrElse(Map.empty)
-  final private val isLengthMapEmpty = lengthMap.isEmpty
   final private val segmentIdField = ReaderParametersValidator.getSegmentIdField(readerProperties.multisegment, cobolSchema)
-  final private val recordLengthAdjustment = readerProperties.rdwAdjustment
-  final private val useRdw = lengthField.isEmpty && lengthFieldExpr.isEmpty
   final private val minimumRecordLength = readerProperties.minimumRecordLength
   final private val maximumRecordLength = readerProperties.maximumRecordLength

@@ -90,13 +83,7 @@
           None
         }
       case None =>
-        if (useRdw) {
-          fetchRecordUsingRdwHeaders()
-        } else if (lengthField.nonEmpty) {
-          fetchRecordUsingRecordLengthField()
-        } else {
-          fetchRecordUsingRecordLengthFieldExpression(lengthFieldExpr.get)
-        }
+        fetchRecordUsingRdwHeaders()
     }

     binaryData match {

@@ -117,110 +104,6 @@

   def getRecordIndex: Long = recordIndex

-  private def fetchRecordUsingRecordLengthField(): Option[Array[Byte]] = {
-    if (lengthField.isEmpty) {
-      throw new IllegalStateException(s"For variable length reader either RDW record headers or record length field should be provided.")
-    }
-
-    val lengthFieldBlock = lengthField.get.binaryProperties.offset + lengthField.get.binaryProperties.actualSize
-
-    val binaryDataStart = dataStream.next(readerProperties.startOffset + lengthFieldBlock)
-
-    byteIndex += readerProperties.startOffset + lengthFieldBlock
-
-    if (binaryDataStart.length < readerProperties.startOffset + lengthFieldBlock) {
-      return None
-    }
-
-    val recordLength = lengthField match {
-      case Some(lengthAST) => getRecordLengthFromField(lengthAST, binaryDataStart)
-      case None => copyBookRecordSize
-    }
-
-    val restOfDataLength = recordLength - lengthFieldBlock + readerProperties.endOffset
-
-    byteIndex += restOfDataLength
-
-    if (restOfDataLength > 0) {
-      Some(binaryDataStart ++ dataStream.next(restOfDataLength))
-    } else {
-      Some(binaryDataStart)
-    }
-  }
-
-  final private def getRecordLengthFromField(lengthAST: Primitive, binaryDataStart: Array[Byte]): Int = {
-    val length = if (isLengthMapEmpty) {
-      cobolSchema.extractPrimitiveField(lengthAST, binaryDataStart, readerProperties.startOffset) match {
-        case i: Int => i
-        case l: Long => l.toInt
-        case s: String => s.toInt
-        case null => throw new IllegalStateException(s"Null encountered as a record length field (offset: $byteIndex, raw value: ${getBytesAsHexString(binaryDataStart)}).")
-        case _ => throw new IllegalStateException(s"Record length value of the field ${lengthAST.name} must be an integral type.")
-      }
-    } else {
-      cobolSchema.extractPrimitiveField(lengthAST, binaryDataStart, readerProperties.startOffset) match {
-        case i: Int => getRecordLengthFromMapping(i.toString)
-        case l: Long => getRecordLengthFromMapping(l.toString)
-        case s: String => getRecordLengthFromMapping(s)
-        case null => throw new IllegalStateException(s"Null encountered as a record length field (offset: $byteIndex, raw value: ${getBytesAsHexString(binaryDataStart)}).")
-        case _ => throw new IllegalStateException(s"Record length value of the field ${lengthAST.name} must be an integral type.")
-      }
-    }
-    length + recordLengthAdjustment
-  }
-
-  final private def getRecordLengthFromMapping(v: String): Int = {
-    lengthMap.get(v) match {
-      case Some(len) => len
-      case None => throw new IllegalStateException(s"Record length value '$v' is not mapped to a record length.")
-    }
-  }
-
-  final private def getBytesAsHexString(bytes: Array[Byte]): String = {
-    bytes.map("%02X" format _).mkString
-  }
-
-  private def fetchRecordUsingRecordLengthFieldExpression(expr: RecordLengthExpression): Option[Array[Byte]] = {
-    val lengthFieldBlock = expr.requiredBytesToread
-    val evaluator = expr.evaluator
-
-    val binaryDataStart = dataStream.next(readerProperties.startOffset + lengthFieldBlock)
-
-    byteIndex += readerProperties.startOffset + lengthFieldBlock
-
-    if (binaryDataStart.length < readerProperties.startOffset + lengthFieldBlock) {
-      return None
-    }
-
-    expr.fields.foreach{
-      case (name, field) =>
-        val obj = cobolSchema.extractPrimitiveField(field, binaryDataStart, readerProperties.startOffset)
-        try {
-          obj match {
-            case i: Int => evaluator.setValue(name, i)
-            case l: Long => evaluator.setValue(name, l.toInt)
-            case s: String => evaluator.setValue(name, s.toInt)
-            case _ => throw new IllegalStateException(s"Record length value of the field ${field.name} must be an integral type.")
-          }
-        } catch {
-          case ex: NumberFormatException =>
-            throw new IllegalStateException(s"Encountered an invalid value of the record length field. Cannot parse '$obj' as an integer in: ${field.name} = '$obj'.", ex)
-        }
-    }
-
-    val recordLength = evaluator.eval()
-
-    val restOfDataLength = recordLength - lengthFieldBlock + readerProperties.endOffset
-
-    byteIndex += restOfDataLength
-
-    if (restOfDataLength > 0) {
-      Some(binaryDataStart ++ dataStream.next(restOfDataLength))
-    } else {
-      Some(binaryDataStart)
-    }
-  }
-
   private def fetchRecordUsingRdwHeaders(): Option[Array[Byte]] = {
     val rdwHeaderBlock = recordHeaderParser.getHeaderLength
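
Everything deleted above (reading the bytes that cover the length field, decoding it directly or through a value mapping, applying the rdw_adjustment, then fetching the remainder of the record) is now the responsibility of FixedWithRecordLengthExprRawRecordExtractor, leaving the reader with only the RDW header path. A condensed sketch of the read loop that moved, distilled from the deleted methods; the function shape and the decodeLength callback are illustrative, not the Cobrix API:

// Fetch one record whose total length is encoded in a field near its start.
// 'next' pulls n bytes from the stream; 'decodeLength' stands in for
// extractPrimitiveField plus the optional value mapping and length adjustment.
def fetchRecord(next: Int => Array[Byte],
                lengthFieldBlock: Int,
                startOffset: Int,
                endOffset: Int,
                decodeLength: Array[Byte] => Int): Option[Array[Byte]] = {
  val head = next(startOffset + lengthFieldBlock)
  if (head.length < startOffset + lengthFieldBlock) {
    None // the stream ended before a full length-field block could be read
  } else {
    val restOfDataLength = decodeLength(head) - lengthFieldBlock + endOffset
    if (restOfDataLength > 0) Some(head ++ next(restOfDataLength))
    else Some(head)
  }
}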

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReaderSpec.scala
Lines changed: 32 additions & 5 deletions

@@ -20,7 +20,7 @@ import org.scalatest.wordspec.AnyWordSpec
 import za.co.absa.cobrix.cobol.mock.{ByteStreamMock, RecordExtractorMock, RecordHeadersParserMock}
 import za.co.absa.cobrix.cobol.parser.CopybookParser
 import za.co.absa.cobrix.cobol.parser.headerparsers.{RecordHeaderParser, RecordHeaderParserRDW}
-import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor}
+import za.co.absa.cobrix.cobol.reader.extractors.raw.{FixedWithRecordLengthExprRawRecordExtractor, RawRecordContext, RawRecordExtractor}
 import za.co.absa.cobrix.cobol.reader.parameters.{MultisegmentParameters, ReaderParameters}

 class VRLRecordReaderSpec extends AnyWordSpec {

@@ -127,10 +127,17 @@
       0x00, 0x07, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8
     ).map(_.toByte)

+    val streamH = new ByteStreamMock(records)
+    val streamD = new ByteStreamMock(records)
+    val context = RawRecordContext(0, streamH, streamD, CopybookParser.parseSimple(copybookWithFieldLength), null, null, "")
+
+    val readerParameters = ReaderParameters(lengthFieldExpression = Some("LEN"))
+
     val reader = getUseCase(
       copybook = copybookWithFieldLength,
       records = records,
-      lengthFieldExpression = Some("LEN"))
+      lengthFieldExpression = Some("LEN"),
+      recordExtractor = Some(new FixedWithRecordLengthExprRawRecordExtractor(context, readerParameters)))

     assert(reader.hasNext)
     val (segment1, record1) = reader.next()

@@ -163,10 +170,17 @@
       0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xF1, 0xF5, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8
     ).map(_.toByte)

+    val streamH = new ByteStreamMock(records)
+    val streamD = new ByteStreamMock(records)
+    val context = RawRecordContext(0, streamH, streamD, CopybookParser.parseSimple(copybookWithFieldLength), null, null, "")
+
+    val readerParameters = ReaderParameters(lengthFieldExpression = Some("LEN"))
+
     val reader = getUseCase(
       copybook = copybookWithFieldLength,
       records = records,
-      lengthFieldExpression = Some("LEN"))
+      lengthFieldExpression = Some("LEN"),
+      recordExtractor = Some(new FixedWithRecordLengthExprRawRecordExtractor(context, readerParameters)))

     assert(reader.hasNext)
     val (segment1, record1) = reader.next()

@@ -195,12 +209,18 @@
     """

     val records = Array[Byte](0x00)
+    val streamH = new ByteStreamMock(records)
+    val streamD = new ByteStreamMock(records)
+    val context = RawRecordContext(0, streamH, streamD, CopybookParser.parseSimple(copybookWithFieldLength), null, null, "")
+
+    val readerParameters = ReaderParameters(lengthFieldExpression = Some("LEN"))

     val ex = intercept[IllegalStateException] {
       getUseCase(
         copybook = copybookWithFieldLength,
         records = records,
-        lengthFieldExpression = Some("LEN"))
+        lengthFieldExpression = Some("LEN"),
+        recordExtractor = Some(new FixedWithRecordLengthExprRawRecordExtractor(context, readerParameters)))
     }

     assert(ex.getMessage == "The record length field LEN must be an integral type or a value mapping must be specified.")

@@ -220,10 +240,17 @@
       0x00, 0x08, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8
     ).map(_.toByte)

+    val streamH = new ByteStreamMock(records)
+    val streamD = new ByteStreamMock(records)
+    val context = RawRecordContext(0, streamH, streamD, CopybookParser.parseSimple(copybookWithFieldLength), null, null, "")
+
+    val readerParameters = ReaderParameters(lengthFieldExpression = Some("LEN - 1"))
+
     val reader = getUseCase(
       copybook = copybookWithFieldLength,
       records = records,
-      lengthFieldExpression = Some("LEN - 1"))
+      lengthFieldExpression = Some("LEN - 1"),
+      recordExtractor = Some(new FixedWithRecordLengthExprRawRecordExtractor(context, readerParameters)))

     assert(reader.hasNext)
     val (segment1, record1) = reader.next()
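
Since the reader no longer parses length fields itself, each of these tests builds the extractor explicitly and hands it to getUseCase via the recordExtractor parameter, mirroring what VarLenNestedReader does in production. Note that two separate ByteStreamMock instances are created over the same bytes; presumably RawRecordContext consumes its header and data streams independently, so sharing a single stream would exhaust it.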

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala
Lines changed: 20 additions & 0 deletions

@@ -66,6 +66,26 @@ class Test37RecordLengthMappingSpec extends AnyWordSpec with SparkTestBase with
     }
   }

+  "work for simple mappings with format=V" in {
+    withTempBinFile("record_length_mapping", ".tmp", dataSimple) { tempFile =>
+      val expected = """{"SEG_ID":"A","TEXT":"123"},{"SEG_ID":"B","TEXT":"123456"},{"SEG_ID":"C","TEXT":"1234567"}"""
+
+      val df = spark.read
+        .format("cobol")
+        .option("copybook_contents", copybook)
+        .option("record_format", "V")
+        .option("record_length_field", "SEG-ID")
+        .option("input_split_records", "2")
+        .option("pedantic", "true")
+        .option("record_length_map", """{"A":4,"B":7,"C":8}""")
+        .load(tempFile)
+
+      val actual = df.orderBy("SEG_ID").toJSON.collect().mkString(",")
+
+      assert(actual == expected)
+    }
+  }
+
   "work for numeric mappings" in {
     withTempBinFile("record_length_mapping", ".tmp", dataNumeric) { tempFile =>
       val expected = """{"SEG_ID":"1","TEXT":"123"},{"SEG_ID":"2","TEXT":"123456"},{"SEG_ID":"3","TEXT":"1234567"}"""

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test26FixLengthWithIdGeneration.scala
Lines changed: 2 additions & 2 deletions

@@ -56,7 +56,7 @@ class Test26FixLengthWithIdGeneration extends AnyWordSpec with SparkTestBase wit
     // A4
     0xC1.toByte, 0xF3.toByte, 0xF4.toByte,
     // A5
-    0xC1.toByte, 0xF3.toByte, 0xF5.toByte,
+    0xC1.toByte, 0xF3.toByte, 0xF5.toByte
   )

   val binFileContentsLengthExpr: Array[Byte] = Array[Byte](
@@ -77,7 +77,7 @@ class Test26FixLengthWithIdGeneration extends AnyWordSpec with SparkTestBase wit
     // A4
     0xC1.toByte, 0xF2.toByte, 0xF4.toByte,
     // A5
-    0xC1.toByte, 0xF2.toByte, 0xF5.toByte,
+    0xC1.toByte, 0xF2.toByte, 0xF5.toByte
   )

   val expected: String =
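
The removed trailing commas are likely a compatibility fix rather than a style choice: Scala accepts a trailing comma in a multi-line argument list only from 2.12.2 onward, so older Scala versions that a cross-built project may target would reject these lines.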
