#442 Fix the file streamer to return only the exact bytes range it is defined for
yruslan committed Nov 30, 2021
1 parent 9a3a2d7 commit c9c6a41
Showing 5 changed files with 87 additions and 63 deletions.
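
The core of the change is in FileStreamer.next() (see the FileStreamer.scala diff below): the number of bytes to read is now clamped to the remaining maximumBytes budget before reading, instead of the limit being checked only between calls. A minimal sketch of the clamping rule, extracted from the diff (clampBytesToRead is a hypothetical helper name; the commit inlines this logic):

// Sketch: clamp a read request to the streamer's remaining byte range.
// maximumBytes <= 0 means "no limit"; byteIndex - startOffset is the number
// of bytes already consumed from the range.
def clampBytesToRead(numberOfBytes: Int, maximumBytes: Long, byteIndex: Long, startOffset: Long): Int =
  if (maximumBytes > 0)
    Math.min(maximumBytes - byteIndex + startOffset, numberOfBytes.toLong).toInt
  else
    numberOfBytes

For example, with maximumBytes = 10 and 7 bytes already consumed, a request for 7 bytes is clamped to 3, so a streamer can no longer return data belonging to the next index split.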
TextRecordExtractor.scala

@@ -128,18 +128,8 @@ class TextRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRecordExtractor
         // In the latter case
         if (pendingBytesSize <= recordSize && ctx.inputStream.isEndOfStream) {
           // Last record
-          if (pendingBytesSize < recordSize && pendingBytesSize < 3) {
-            // Broken record. This occurs on indexing boundaries since the index upper boundary can include a line break character,
-            // but the record might not contain that line ending character. This results in 1 or 2 characters (depending on
-            // LF or CRLF) from the next record being propagated to the current index, resulting in 1 additional record being
-            // generated.
-            curRecordSize = 0
-            curPayloadSize = 0
-          } else {
-            // There is some info in the record
-            curRecordSize = pendingBytesSize
-            curPayloadSize = pendingBytesSize
-          }
+          curRecordSize = pendingBytesSize
+          curPayloadSize = pendingBytesSize
         } else {
           // This is an error situation - no line breaks between records.
           // Return a record worth of data.
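
The special case removed above was a workaround for the streamer overshooting its range: an index split whose upper boundary landed inside a line ending could receive 1 or 2 stray bytes from the next record, so the extractor dropped any end-of-stream fragment shorter than 3 bytes. That workaround also dropped legitimate 1- and 2-byte records, which the new 1-byte-record test below exercises. With FileStreamer now honoring its exact byte range, the workaround is unnecessary. A rough illustration of the old failure mode (hypothetical offsets):

// Illustration only: 1-byte records with CRLF line endings.
// file bytes : A \r \n B \r \n C ...
// offsets    : 0  1  2  3  4  5  6
// A split ending nominally at offset 3 could still be fed bytes 3-4 by the
// old streamer, producing a phantom 1-2 byte "record" that the extractor
// compensated for by discarding all short end-of-stream fragments, and with
// them real 1-byte records such as the trailing "F" in the new test.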
BufferedFSDataInputStream.scala

@@ -17,11 +17,8 @@
 package za.co.absa.cobrix.spark.cobol.source.streaming
 
 import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}
-import org.slf4j.LoggerFactory
 
 class BufferedFSDataInputStream(filePath: Path, fileSystem: FileSystem, startOffset: Long, bufferSizeInMegabytes: Int, maximumBytes: Long) {
-  private val logger = LoggerFactory.getLogger(this.getClass)
-
   val bytesInMegabyte: Int = 1048576
 
   if (bufferSizeInMegabytes <= 0 || bufferSizeInMegabytes > 1000) {
@@ -30,8 +27,6 @@ class BufferedFSDataInputStream
 
   var in: FSDataInputStream = fileSystem.open(filePath)
   if (startOffset > 0) {
-    val size = fileSystem.getContentSummary(filePath).getLength
-    logger.info(s"Offset: $startOffset, file size: $size")
     in.seek(startOffset)
   }
 
FileStreamer.scala

@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.ContentSummary
 import za.co.absa.cobrix.cobol.reader.common.Constants
 
 /**
-  * This class provides methods for streaming bytes from an HDFS file.
+  * This class provides methods for streaming bytes from a Hadoop file.
   *
   * It is stateful, which means that it stores the offset until which the file has been consumed.
   *
@@ -40,10 +40,10 @@ class FileStreamer
 
   private var byteIndex = startOffset
 
-  // Use a buffer to read the data from HDFS in big chunks
-  private var buferredStream = new BufferedFSDataInputStream(getHDFSPath(filePath), fileSystem, startOffset, Constants.defaultStreamBufferInMB, maximumBytes)
+  // Use a buffer to read the data from Hadoop in big chunks
+  private var bufferedStream = new BufferedFSDataInputStream(getHadoopPath(filePath), fileSystem, startOffset, Constants.defaultStreamBufferInMB, maximumBytes)
 
-  private val fileSize = getHDFSFileSize(getHDFSPath(filePath))
+  private val fileSize = getHadoopFileSize(getHadoopPath(filePath))
 
   override def inputFileName: String = filePath
 
@@ -64,27 +64,34 @@ class FileStreamer
    * @return
    */
  override def next(numberOfBytes: Int): Array[Byte] = {
-    if ((maximumBytes > 0 && byteIndex - startOffset >= maximumBytes) || buferredStream.isClosed) {
+    val actualBytesToRead = if (maximumBytes > 0) {
+      Math.min(maximumBytes - byteIndex + startOffset, numberOfBytes).toInt
+    } else {
+      numberOfBytes
+    }
+
+    if (numberOfBytes <= 0) {
+      new Array[Byte](0)
+    } else if (actualBytesToRead <= 0 || bufferedStream == null || bufferedStream.isClosed) {
+      close()
       new Array[Byte](0)
     } else {
+      val buffer = new Array[Byte](actualBytesToRead)
 
-      val buffer = new Array[Byte](numberOfBytes)
-
-      var readBytes = buferredStream.readFully(buffer, 0, numberOfBytes)
+      val readBytes = bufferedStream.readFully(buffer, 0, actualBytesToRead)
 
       if (readBytes > 0) {
         byteIndex = byteIndex + readBytes
       }
 
       if (readBytes == numberOfBytes) {
         buffer
-      }
-      else {
+      } else {
         logger.warn(s"End of stream reached: Requested $numberOfBytes bytes, received $readBytes.")
         // resize buffer so that the consumer knows how many bytes are there
         close()
-        if (readBytes > 0) {
+        if (readBytes == actualBytesToRead) {
+          buffer
+        } else if (readBytes > 0) {
           val shrunkBuffer = new Array[Byte](readBytes)
           System.arraycopy(buffer, 0, shrunkBuffer, 0, readBytes)
           shrunkBuffer
@@ -96,36 +103,27 @@ class FileStreamer
   }
 
   override def close(): Unit = {
-    if (buferredStream != null && !buferredStream.isClosed) {
-      buferredStream.close()
-      buferredStream = null
+    if (bufferedStream != null && !bufferedStream.isClosed) {
+      bufferedStream.close()
+      bufferedStream = null
     }
   }
 
   /**
-   * Gets an HDFS [[Path]] to the file.
+   * Gets a Hadoop [[Path]] (HDFS, S3, DBFS, etc.) to the file.
    *
    * Throws IllegalArgumentException in case the file does not exist.
    */
-  private def getHDFSPath(path: String) = {
-
-    if (fileSystem == null) {
-      throw new IllegalArgumentException("Null FileSystem instance.")
-    }
-
-    if (path == null) {
-      throw new IllegalArgumentException("Null input file.")
-    }
-
-    val hdfsPath = new Path(path)
-    if (!fileSystem.exists(hdfsPath)) {
-      throw new IllegalArgumentException(s"Inexistent file: $path")
+  private def getHadoopPath(path: String) = {
+    val hadoopPath = new Path(path)
+    if (!fileSystem.exists(hadoopPath)) {
+      throw new IllegalArgumentException(s"File does not exist: $path")
     }
-    hdfsPath
+    hadoopPath
  }
 
-  private def getHDFSFileSize(hdfsPath: Path): Long = {
-    val cSummary: ContentSummary = fileSystem.getContentSummary(hdfsPath)
+  private def getHadoopFileSize(hadoopPath: Path): Long = {
+    val cSummary: ContentSummary = fileSystem.getContentSummary(hadoopPath)
     cSummary.getLength
   }
 }
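
Taken together, next() now guarantees that a streamer constructed with a maximumBytes limit never returns bytes outside its range. A quick usage sketch (hypothetical path; assumes a local Hadoop Configuration and an input file of at least 10 bytes):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

val fs = FileSystem.get(new Configuration())
// A range of exactly 10 bytes starting at offset 0
val streamer = new FileStreamer("/tmp/example.dat", fs, startOffset = 0L, maximumBytes = 10L)
val first  = streamer.next(7) // returns 7 bytes
val second = streamer.next(7) // clamped: returns only the 3 bytes left in the range
val third  = streamer.next(7) // range exhausted: returns an empty array and closes the stream
assert(first.length == 7 && second.length == 3 && third.isEmpty)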
Test13AsciiCrLfText.scala

@@ -191,6 +191,33 @@ class Test13AsciiCrLfText extends WordSpec with SparkTestBase with BinaryFileFixture
       }
     }
 
+    "correctly read text files with double EOL characters and 1-byte records and an index" in {
+      val copybook2 =
+        """         01  ENTITY.
+               05  A  PIC X(1).
+        """
+
+      val text = "A\r\nB\r\nC\nD\nE\nF"
+      withTempBinFile("crlf_empty", ".dat", text.getBytes()) { tmpFileName =>
+        val df = spark
+          .read
+          .format("cobol")
+          .option("copybook_contents", copybook2)
+          .option("pedantic", "true")
+          .option("record_format", "D")
+          .option("input_split_records", 2)
+          .load(tmpFileName)
+
+        val expected = """[{"A":"A"},{"A":"B"},{"A":"C"},{"A":"D"},{"A":"E"},{"A":"F"}]"""
+
+        val count = df.count()
+        val actual = df.orderBy("A").toJSON.collect().mkString("[", ",", "]")
+
+        assert(count == 6)
+        assertEqualsMultiline(actual, expected)
+      }
+    }
+
     "correctly read text files with double EOL characters and the last record is too short" in {
       val copybook2 =
         """         01  ENTITY.
@@ -216,4 +243,32 @@ class Test13AsciiCrLfText extends WordSpec with SparkTestBase with BinaryFileFixture
         assertEqualsMultiline(actual, expected)
       }
     }
+
+    "correctly read text files with indexing and a copybook that has a slightly greater record size" in {
+      val copybook2 =
+        """         01  ENTITY.
+               05  A  PIC X(7).
+        """
+
+      val text = "AAAAA\r\nBBBBB\nCCCCC\nDDDDD\nEEEEE\nFFFFF\nGGGGG\nHHHHH"
+      withTempBinFile("crlf_empty", ".dat", text.getBytes()) { tmpFileName =>
+        val df = spark
+          .read
+          .format("cobol")
+          .option("copybook_contents", copybook2)
+          .option("pedantic", "true")
+          .option("record_format", "D")
+          .option("input_split_records", 2)
+          .load(tmpFileName)
+
+        val expected = """[{"A":"AAAAA"},{"A":"BBBBB"},{"A":"CCCCC"},{"A":"DDDDD"},{"A":"EEEEE"},{"A":"FFFFF"},{"A":"GGGGG"},{"A":"HHHHH"}]"""
+
+        val count = df.count()
+        val actual = df.orderBy("A").toJSON.collect().mkString("[", ",", "]")
+
+        assert(count == 8)
+        assertEqualsMultiline(actual, expected)
+      }
+    }
+
 }
FileStreamerSpec.scala

@@ -41,20 +41,6 @@ class FileStreamerSpec extends FlatSpec with BeforeAndAfter with Matchers {
 
   behavior of classOf[FileStreamer].getName
 
-  it should "throw if file path is null" in {
-    val caught = intercept[IllegalArgumentException] {
-      new FileStreamer(null, FileSystem.get(new Configuration()))
-    }
-    assert(caught.getMessage.toLowerCase.contains("null"))
-  }
-
-  it should "throw if FileSystem is null" in {
-    val caught = intercept[IllegalArgumentException] {
-      new FileStreamer(createTempFile(2).getAbsolutePath, null)
-    }
-    assert(caught.getMessage.toLowerCase.contains("null"))
-  }
-
   it should "throw if file does not exist" in {
     val caught = intercept[IllegalArgumentException] {
       new FileStreamer(new File(TEMP_DIR, "inexistent").getAbsolutePath, FileSystem.get(new Configuration()))
