Skip to content

#719 Improve the error message about file size not divisible by record size. #720

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,19 @@ private[source] object CobolScanners extends Logging {
val recordSize = reader.getRecordSize

sourceDirs.foreach(sourceDir => {
if (!debugIgnoreFileSize && areThereNonDivisibleFiles(sourceDir, conf, recordSize)) {
throw new IllegalArgumentException(s"There are some files in $sourceDir that are NOT DIVISIBLE by the RECORD SIZE calculated from the copybook ($recordSize bytes per record). Check the logs for the names of the files.")
if (!debugIgnoreFileSize) {
val nonDivisibleFiles = getNonDivisibleFiles(sourceDir, conf, recordSize)

if (nonDivisibleFiles.nonEmpty) {
nonDivisibleFiles.head match {
case (name, size) =>
if (nonDivisibleFiles.length > 1) {
throw new IllegalArgumentException(s"Multiple file sizes are NOT DIVISIBLE by the RECORD SIZE calculated from the copybook ($recordSize bytes per record). Example file: $name size ($size bytes).")
} else {
throw new IllegalArgumentException(s"File $name size ($size bytes) is NOT DIVISIBLE by the RECORD SIZE calculated from the copybook ($recordSize bytes per record).")
}
}
}
}
})

Expand Down Expand Up @@ -164,15 +175,14 @@ private[source] object CobolScanners extends Logging {
recordParser(reader, records)
}

private def areThereNonDivisibleFiles(sourceDir: String, hadoopConfiguration: Configuration, divisor: Int): Boolean = {

private def getNonDivisibleFiles(sourceDir: String, hadoopConfiguration: Configuration, divisor: Int): Seq[(String, Long)] = {
val fileSystem = new Path(sourceDir).getFileSystem(hadoopConfiguration)

if (FileUtils.getNumberOfFilesInDir(sourceDir, fileSystem) < FileUtils.THRESHOLD_DIR_LENGTH_FOR_SINGLE_FILE_CHECK) {
FileUtils.findAndLogAllNonDivisibleFiles(sourceDir, divisor, fileSystem) > 0
FileUtils.findAndLogAllNonDivisibleFiles(sourceDir, divisor, fileSystem)
}
else {
FileUtils.findAndLogFirstNonDivisibleFile(sourceDir, divisor, fileSystem)
FileUtils.findAndLogFirstNonDivisibleFile(sourceDir, divisor, fileSystem).toSeq
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ object FileUtils extends Logging {
/**
* Finds the first file that is non-divisible by a given divisor and logs its name.
*/
def findAndLogFirstNonDivisibleFile(sourceDir: String, divisor: Long, fileSystem: FileSystem): Boolean = {
def findAndLogFirstNonDivisibleFile(sourceDir: String, divisor: Long, fileSystem: FileSystem): Option[(String, Long)] = {

val allFiles = expandDirectories(fileSystem, fileSystem.globStatus(new Path(sourceDir), hiddenFileFilter))

Expand All @@ -197,13 +197,13 @@ object FileUtils extends Logging {
logger.error(s"File ${firstNonDivisibleFile.get.getPath} size (${firstNonDivisibleFile.get.getLen}) IS NOT divisible by $divisor.")
}

firstNonDivisibleFile.isDefined
firstNonDivisibleFile.map(status => (status.getPath.toString, status.getLen))
}

/**
* Finds all the files the are not divisible by a given divisor and logs their names.
*/
def findAndLogAllNonDivisibleFiles(sourceDir: String, divisor: Long, fileSystem: FileSystem): Long = {
def findAndLogAllNonDivisibleFiles(sourceDir: String, divisor: Long, fileSystem: FileSystem): Seq[(String, Long)] = {

val allFiles = expandDirectories(fileSystem, fileSystem.globStatus(new Path(sourceDir), hiddenFileFilter))

Expand All @@ -213,7 +213,7 @@ object FileUtils extends Logging {
allNonDivisibleFiles.foreach(file => logger.error(s"File ${file.getPath} size (${file.getLen}) IS NOT divisible by $divisor."))
}

allNonDivisibleFiles.length
allNonDivisibleFiles.map(status => (status.getPath.toString, status.getLen))
}

private def isNonDivisible(fileStatus: FileStatus, divisor: Long) = fileStatus.getLen % divisor != 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class Test27RecordLengthSpec extends AnyWordSpec with SparkTestBase with BinaryF
val ex = intercept[IllegalArgumentException] {
getDataFrame(tmpFileName, Map("record_length" -> "7")).collect()
}
assert(ex.getMessage.contains("are NOT DIVISIBLE by the RECORD SIZE"))
assert(ex.getMessage.contains("NOT DIVISIBLE by the RECORD SIZE"))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,43 +30,70 @@ class Test07IgnoreHiddenFiles extends AnyFunSuite with BinaryFileFixture with Sp

test("Test findAndLogFirstNonDivisibleFile() finds a file") {
withTempDirectory("testHidden1") { tmpDir =>
createFileSize1(Files.createFile(Paths.get(tmpDir, "a")))
assert(FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem))
assert(FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem) == 1)
createFileSize1(Files.createFile(Paths.get(tmpDir, "a-file.dat")))

val nonDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem)
val nonDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem)

assert(nonDivisibleFileOpt.isDefined)
assert(nonDivisibleFileOpt.get._1.endsWith("a-file.dat"))
assert(nonDivisibleFileOpt.get._2 == 1)

assert(nonDivisibleFiles.length == 1)
assert(nonDivisibleFiles.head._2 == 1)
assert(nonDivisibleFiles.head._1.endsWith("a-file.dat"))
}
}

test("Test findAndLogFirstNonDivisibleFile() ignores a hidden file") {
withTempDirectory("testHidden1") { tmpDir =>
createFileSize1(Files.createFile(Paths.get(tmpDir, ".a")))
assert(!FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem))
assert(FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem) == 0)
val nonDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem)
val nonDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem)

assert(nonDivisibleFileOpt.isEmpty)
assert(nonDivisibleFiles.isEmpty)
}
}

test("Test findAndLogFirstNonDivisibleFile() ignores a hidden file in a nested dir") {
withTempDirectory("testHidden3") { tmpDir =>
Files.createDirectory(Paths.get(tmpDir, "dir1"))
createFileSize1(Files.createFile(Paths.get(tmpDir, "dir1", ".b2")))
assert(!FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem))
assert(FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem) == 0)
val nonDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem)
val nonDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem)

assert(nonDivisibleFileOpt.isEmpty)
assert(nonDivisibleFiles.isEmpty)
}
}

test("Test findAndLogFirstNonDivisibleFile() ignores a hidden dir") {
withTempDirectory("testHidden4") { tmpDir =>
Files.createDirectory(Paths.get(tmpDir, ".dir2"))
createFileSize1(Files.createFile(Paths.get(tmpDir, ".dir2", "c1")))
assert(!FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem))
assert(FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem) == 0)
val nonDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem)
val nonDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem)

assert(nonDivisibleFileOpt.isEmpty)
assert(nonDivisibleFiles.isEmpty)
}
}

test("Test findAndLogFirstNonDivisibleFile() works with globbing") {
withTempDirectory("testHidden1") { tmpDir =>
createFileSize1(Files.createFile(Paths.get(tmpDir, "a")))
assert(FileUtils.findAndLogFirstNonDivisibleFile(s"$tmpDir/*", 2, fileSystem))
assert(FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem) == 1)
createFileSize1(Files.createFile(Paths.get(tmpDir, "a.dat")))

val nonDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem)
val nonDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem)

assert(nonDivisibleFileOpt.isDefined)
assert(nonDivisibleFileOpt.get._1.endsWith("a.dat"))
assert(nonDivisibleFileOpt.get._2 == 1)

assert(nonDivisibleFiles.length == 1)
assert(nonDivisibleFiles.head._1.endsWith("a.dat"))
assert(nonDivisibleFiles.head._2 == 1)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ class FileUtilsSpec extends AnyFlatSpec with BeforeAndAfterAll with BeforeAndAft
}

it should "return the number of files inside a directory" in {

val length = 10
val numFiles = 5

Expand All @@ -137,52 +136,54 @@ class FileUtilsSpec extends AnyFlatSpec with BeforeAndAfterAll with BeforeAndAft
}

it should "return 0 if there are no files inside a directory" in {

assertResult(0)(FileUtils.getNumberOfFilesInDir(controlledLengthFilesDir.getAbsolutePath, fileSystem))
}

it should "return 1 if there source is actually a file" in {

val aFile = getRandomFileToBeWritten
produceFileOfLength(aFile, 10)

assertResult(1)(FileUtils.getNumberOfFilesInDir(aFile.getAbsolutePath, fileSystem))
}

it should "return the file itself if non-divisible and if asked for first file" in {

val aFile = getRandomFileToBeWritten

val divisor = 10
produceFileOfLength(aFile, divisor + 1)

assertResult(true)(FileUtils.findAndLogFirstNonDivisibleFile(aFile.getAbsolutePath, divisor, fileSystem))
val notDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(aFile.getAbsolutePath, divisor, fileSystem)

assert(notDivisibleFileOpt.isDefined)
assert(notDivisibleFileOpt.get._2 == 11)
}

it should "return the file itself if non-divisible and if asked for multiple files" in {

val aFile = getRandomFileToBeWritten

val divisor = 10
produceFileOfLength(aFile, divisor + 1)

assertResult(1)(FileUtils.findAndLogAllNonDivisibleFiles(aFile.getAbsolutePath, divisor, fileSystem))
val notDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(aFile.getAbsolutePath, divisor, fileSystem)

assert(notDivisibleFiles.length == 1)
assert(notDivisibleFiles.head._2 == 11)
}

it should "return true if found first non-divisible file" in {

val divisor = 10

produceFileOfLength(getRandomFileToBeWritten, divisor)
produceFileOfLength(getRandomFileToBeWritten, divisor * 2)
produceFileOfLength(getRandomFileToBeWritten, divisor * 3)
produceFileOfLength(getRandomFileToBeWritten, divisor + 1) // non-divisible

assertResult(true)(FileUtils.findAndLogFirstNonDivisibleFile(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem))
val notDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem)

assert(notDivisibleFileOpt.isDefined)
}

it should "return number of non-divisible files" in {

val divisor = 10

produceFileOfLength(getRandomFileToBeWritten, divisor)
Expand All @@ -191,24 +192,25 @@ class FileUtilsSpec extends AnyFlatSpec with BeforeAndAfterAll with BeforeAndAft
produceFileOfLength(getRandomFileToBeWritten, divisor * 4 + 1) // non-divisible
produceFileOfLength(getRandomFileToBeWritten, divisor * 5 + 1) // non-divisible

assertResult(2)(FileUtils.findAndLogAllNonDivisibleFiles(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem))
val notDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem)

assert(notDivisibleFiles.length == 2)
}

it should "return false if no files are non-divisible by expected divisor" in {

val divisor = 10

produceFileOfLength(getRandomFileToBeWritten, divisor)
produceFileOfLength(getRandomFileToBeWritten, divisor * 2)
produceFileOfLength(getRandomFileToBeWritten, divisor * 3)
produceFileOfLength(getRandomFileToBeWritten, divisor * 4)

assertResult(false)(FileUtils.findAndLogFirstNonDivisibleFile(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem))
val notDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem)

assert(notDivisibleFileOpt.isEmpty)
}

it should "return 0 if no files are non-divisible by expected divisor" in {

val divisor = 10

produceFileOfLength(getRandomFileToBeWritten, divisor)
Expand All @@ -217,7 +219,9 @@ class FileUtilsSpec extends AnyFlatSpec with BeforeAndAfterAll with BeforeAndAft
produceFileOfLength(getRandomFileToBeWritten, divisor * 4) // non-divisible
produceFileOfLength(getRandomFileToBeWritten, divisor * 5) // non-divisible

assertResult(0)(FileUtils.findAndLogAllNonDivisibleFiles(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem))
val notDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem)

assert(notDivisibleFiles.isEmpty)
}

private def getRandomFileToBeWritten: File = new File(controlledLengthFilesDir, UUID.randomUUID().toString)
Expand Down
Loading