Skip to content

Commit 9d1fb43

Browse files
committed
#719 Improve the error message about file size not divisible by record size.
1 parent 070938d commit 9d1fb43

File tree

5 files changed

+80
-39
lines changed

5 files changed

+80
-39
lines changed

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,19 @@ private[source] object CobolScanners extends Logging {
9595
val recordSize = reader.getRecordSize
9696

9797
sourceDirs.foreach(sourceDir => {
98-
if (!debugIgnoreFileSize && areThereNonDivisibleFiles(sourceDir, conf, recordSize)) {
99-
throw new IllegalArgumentException(s"There are some files in $sourceDir that are NOT DIVISIBLE by the RECORD SIZE calculated from the copybook ($recordSize bytes per record). Check the logs for the names of the files.")
98+
if (!debugIgnoreFileSize) {
99+
val nonDivisibleFiles = getNonDivisibleFiles(sourceDir, conf, recordSize)
100+
101+
if (nonDivisibleFiles.nonEmpty) {
102+
nonDivisibleFiles.head match {
103+
case (name, size) =>
104+
if (nonDivisibleFiles.length > 1) {
105+
throw new IllegalArgumentException(s"Multiple file sizes are NOT DIVISIBLE by the RECORD SIZE calculated from the copybook ($recordSize bytes per record). Example file: $name size ($size bytes).")
106+
} else {
107+
throw new IllegalArgumentException(s"File $name size ($size bytes) is NOT DIVISIBLE by the RECORD SIZE calculated from the copybook ($recordSize bytes per record).")
108+
}
109+
}
110+
}
100111
}
101112
})
102113

@@ -164,15 +175,14 @@ private[source] object CobolScanners extends Logging {
164175
recordParser(reader, records)
165176
}
166177

167-
private def areThereNonDivisibleFiles(sourceDir: String, hadoopConfiguration: Configuration, divisor: Int): Boolean = {
168-
178+
private def getNonDivisibleFiles(sourceDir: String, hadoopConfiguration: Configuration, divisor: Int): Seq[(String, Long)] = {
169179
val fileSystem = new Path(sourceDir).getFileSystem(hadoopConfiguration)
170180

171181
if (FileUtils.getNumberOfFilesInDir(sourceDir, fileSystem) < FileUtils.THRESHOLD_DIR_LENGTH_FOR_SINGLE_FILE_CHECK) {
172-
FileUtils.findAndLogAllNonDivisibleFiles(sourceDir, divisor, fileSystem) > 0
182+
FileUtils.findAndLogAllNonDivisibleFiles(sourceDir, divisor, fileSystem)
173183
}
174184
else {
175-
FileUtils.findAndLogFirstNonDivisibleFile(sourceDir, divisor, fileSystem)
185+
FileUtils.findAndLogFirstNonDivisibleFile(sourceDir, divisor, fileSystem).toSeq
176186
}
177187
}
178-
}
188+
}

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ object FileUtils extends Logging {
186186
/**
187187
* Finds the first file that is non-divisible by a given divisor and logs its name.
188188
*/
189-
def findAndLogFirstNonDivisibleFile(sourceDir: String, divisor: Long, fileSystem: FileSystem): Boolean = {
189+
def findAndLogFirstNonDivisibleFile(sourceDir: String, divisor: Long, fileSystem: FileSystem): Option[(String, Long)] = {
190190

191191
val allFiles = expandDirectories(fileSystem, fileSystem.globStatus(new Path(sourceDir), hiddenFileFilter))
192192

@@ -197,13 +197,13 @@ object FileUtils extends Logging {
197197
logger.error(s"File ${firstNonDivisibleFile.get.getPath} size (${firstNonDivisibleFile.get.getLen}) IS NOT divisible by $divisor.")
198198
}
199199

200-
firstNonDivisibleFile.isDefined
200+
firstNonDivisibleFile.map(status => (status.getPath.toString, status.getLen))
201201
}
202202

203203
/**
204204
* Finds all the files that are not divisible by a given divisor and logs their names.
205205
*/
206-
def findAndLogAllNonDivisibleFiles(sourceDir: String, divisor: Long, fileSystem: FileSystem): Long = {
206+
def findAndLogAllNonDivisibleFiles(sourceDir: String, divisor: Long, fileSystem: FileSystem): Seq[(String, Long)] = {
207207

208208
val allFiles = expandDirectories(fileSystem, fileSystem.globStatus(new Path(sourceDir), hiddenFileFilter))
209209

@@ -213,7 +213,7 @@ object FileUtils extends Logging {
213213
allNonDivisibleFiles.foreach(file => logger.error(s"File ${file.getPath} size (${file.getLen}) IS NOT divisible by $divisor."))
214214
}
215215

216-
allNonDivisibleFiles.length
216+
allNonDivisibleFiles.map(status => (status.getPath.toString, status.getLen))
217217
}
218218

219219
private def isNonDivisible(fileStatus: FileStatus, divisor: Long) = fileStatus.getLen % divisor != 0

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test27RecordLengthSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class Test27RecordLengthSpec extends AnyWordSpec with SparkTestBase with BinaryF
7878
val ex = intercept[IllegalArgumentException] {
7979
getDataFrame(tmpFileName, Map("record_length" -> "7")).collect()
8080
}
81-
assert(ex.getMessage.contains("are NOT DIVISIBLE by the RECORD SIZE"))
81+
assert(ex.getMessage.contains("NOT DIVISIBLE by the RECORD SIZE"))
8282
}
8383
}
8484
}

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test07IgnoreHiddenFiles.scala

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,43 +30,70 @@ class Test07IgnoreHiddenFiles extends AnyFunSuite with BinaryFileFixture with Sp
3030

3131
test("Test findAndLogFirstNonDivisibleFile() finds a file") {
3232
withTempDirectory("testHidden1") { tmpDir =>
33-
createFileSize1(Files.createFile(Paths.get(tmpDir, "a")))
34-
assert(FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem))
35-
assert(FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem) == 1)
33+
createFileSize1(Files.createFile(Paths.get(tmpDir, "a-file.dat")))
34+
35+
val nonDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem)
36+
val nonDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem)
37+
38+
assert(nonDivisibleFileOpt.isDefined)
39+
assert(nonDivisibleFileOpt.get._1.endsWith("a-file.dat"))
40+
assert(nonDivisibleFileOpt.get._2 == 1)
41+
42+
assert(nonDivisibleFiles.length == 1)
43+
assert(nonDivisibleFiles.head._2 == 1)
44+
assert(nonDivisibleFiles.head._1.endsWith("a-file.dat"))
3645
}
3746
}
3847

3948
test("Test findAndLogFirstNonDivisibleFile() ignores a hidden file") {
4049
withTempDirectory("testHidden1") { tmpDir =>
4150
createFileSize1(Files.createFile(Paths.get(tmpDir, ".a")))
42-
assert(!FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem))
43-
assert(FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem) == 0)
51+
val nonDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem)
52+
val nonDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem)
53+
54+
assert(nonDivisibleFileOpt.isEmpty)
55+
assert(nonDivisibleFiles.isEmpty)
4456
}
4557
}
4658

4759
test("Test findAndLogFirstNonDivisibleFile() ignores a hidden file in a nested dir") {
4860
withTempDirectory("testHidden3") { tmpDir =>
4961
Files.createDirectory(Paths.get(tmpDir, "dir1"))
5062
createFileSize1(Files.createFile(Paths.get(tmpDir, "dir1", ".b2")))
51-
assert(!FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem))
52-
assert(FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem) == 0)
63+
val nonDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem)
64+
val nonDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem)
65+
66+
assert(nonDivisibleFileOpt.isEmpty)
67+
assert(nonDivisibleFiles.isEmpty)
5368
}
5469
}
5570

5671
test("Test findAndLogFirstNonDivisibleFile() ignores a hidden dir") {
5772
withTempDirectory("testHidden4") { tmpDir =>
5873
Files.createDirectory(Paths.get(tmpDir, ".dir2"))
5974
createFileSize1(Files.createFile(Paths.get(tmpDir, ".dir2", "c1")))
60-
assert(!FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem))
61-
assert(FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem) == 0)
75+
val nonDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem)
76+
val nonDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem)
77+
78+
assert(nonDivisibleFileOpt.isEmpty)
79+
assert(nonDivisibleFiles.isEmpty)
6280
}
6381
}
6482

6583
test("Test findAndLogFirstNonDivisibleFile() works with globbing") {
6684
withTempDirectory("testHidden1") { tmpDir =>
67-
createFileSize1(Files.createFile(Paths.get(tmpDir, "a")))
68-
assert(FileUtils.findAndLogFirstNonDivisibleFile(s"$tmpDir/*", 2, fileSystem))
69-
assert(FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem) == 1)
85+
createFileSize1(Files.createFile(Paths.get(tmpDir, "a.dat")))
86+
87+
val nonDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(tmpDir, 2, fileSystem)
88+
val nonDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(tmpDir, 2, fileSystem)
89+
90+
assert(nonDivisibleFileOpt.isDefined)
91+
assert(nonDivisibleFileOpt.get._1.endsWith("a.dat"))
92+
assert(nonDivisibleFileOpt.get._2 == 1)
93+
94+
assert(nonDivisibleFiles.length == 1)
95+
assert(nonDivisibleFiles.head._1.endsWith("a.dat"))
96+
assert(nonDivisibleFiles.head._2 == 1)
7097
}
7198
}
7299

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtilsSpec.scala

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ class FileUtilsSpec extends AnyFlatSpec with BeforeAndAfterAll with BeforeAndAft
127127
}
128128

129129
it should "return the number of files inside a directory" in {
130-
131130
val length = 10
132131
val numFiles = 5
133132

@@ -137,52 +136,54 @@ class FileUtilsSpec extends AnyFlatSpec with BeforeAndAfterAll with BeforeAndAft
137136
}
138137

139138
it should "return 0 if there are no files inside a directory" in {
140-
141139
assertResult(0)(FileUtils.getNumberOfFilesInDir(controlledLengthFilesDir.getAbsolutePath, fileSystem))
142140
}
143141

144142
it should "return 1 if there source is actually a file" in {
145-
146143
val aFile = getRandomFileToBeWritten
147144
produceFileOfLength(aFile, 10)
148145

149146
assertResult(1)(FileUtils.getNumberOfFilesInDir(aFile.getAbsolutePath, fileSystem))
150147
}
151148

152149
it should "return the file itself if non-divisible and if asked for first file" in {
153-
154150
val aFile = getRandomFileToBeWritten
155151

156152
val divisor = 10
157153
produceFileOfLength(aFile, divisor + 1)
158154

159-
assertResult(true)(FileUtils.findAndLogFirstNonDivisibleFile(aFile.getAbsolutePath, divisor, fileSystem))
155+
val notDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(aFile.getAbsolutePath, divisor, fileSystem)
156+
157+
assert(notDivisibleFileOpt.isDefined)
158+
assert(notDivisibleFileOpt.get._2 == 11)
160159
}
161160

162161
it should "return the file itself if non-divisible and if asked for multiple files" in {
163-
164162
val aFile = getRandomFileToBeWritten
165163

166164
val divisor = 10
167165
produceFileOfLength(aFile, divisor + 1)
168166

169-
assertResult(1)(FileUtils.findAndLogAllNonDivisibleFiles(aFile.getAbsolutePath, divisor, fileSystem))
167+
val notDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(aFile.getAbsolutePath, divisor, fileSystem)
168+
169+
assert(notDivisibleFiles.length == 1)
170+
assert(notDivisibleFiles.head._2 == 11)
170171
}
171172

172173
it should "return true if found first non-divisible file" in {
173-
174174
val divisor = 10
175175

176176
produceFileOfLength(getRandomFileToBeWritten, divisor)
177177
produceFileOfLength(getRandomFileToBeWritten, divisor * 2)
178178
produceFileOfLength(getRandomFileToBeWritten, divisor * 3)
179179
produceFileOfLength(getRandomFileToBeWritten, divisor + 1) // non-divisible
180180

181-
assertResult(true)(FileUtils.findAndLogFirstNonDivisibleFile(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem))
181+
val notDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem)
182+
183+
assert(notDivisibleFileOpt.isDefined)
182184
}
183185

184186
it should "return number of non-divisible files" in {
185-
186187
val divisor = 10
187188

188189
produceFileOfLength(getRandomFileToBeWritten, divisor)
@@ -191,24 +192,25 @@ class FileUtilsSpec extends AnyFlatSpec with BeforeAndAfterAll with BeforeAndAft
191192
produceFileOfLength(getRandomFileToBeWritten, divisor * 4 + 1) // non-divisible
192193
produceFileOfLength(getRandomFileToBeWritten, divisor * 5 + 1) // non-divisible
193194

194-
assertResult(2)(FileUtils.findAndLogAllNonDivisibleFiles(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem))
195+
val notDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem)
196+
197+
assert(notDivisibleFiles.length == 2)
195198
}
196199

197200
it should "return false if no files are non-divisible by expected divisor" in {
198-
199201
val divisor = 10
200202

201203
produceFileOfLength(getRandomFileToBeWritten, divisor)
202204
produceFileOfLength(getRandomFileToBeWritten, divisor * 2)
203205
produceFileOfLength(getRandomFileToBeWritten, divisor * 3)
204206
produceFileOfLength(getRandomFileToBeWritten, divisor * 4)
205207

206-
assertResult(false)(FileUtils.findAndLogFirstNonDivisibleFile(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem))
208+
val notDivisibleFileOpt = FileUtils.findAndLogFirstNonDivisibleFile(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem)
207209

210+
assert(notDivisibleFileOpt.isEmpty)
208211
}
209212

210213
it should "return 0 if no files are non-divisible by expected divisor" in {
211-
212214
val divisor = 10
213215

214216
produceFileOfLength(getRandomFileToBeWritten, divisor)
@@ -217,7 +219,9 @@ class FileUtilsSpec extends AnyFlatSpec with BeforeAndAfterAll with BeforeAndAft
217219
produceFileOfLength(getRandomFileToBeWritten, divisor * 4) // divisible
218220
produceFileOfLength(getRandomFileToBeWritten, divisor * 5) // divisible
219221

220-
assertResult(0)(FileUtils.findAndLogAllNonDivisibleFiles(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem))
222+
val notDivisibleFiles = FileUtils.findAndLogAllNonDivisibleFiles(controlledLengthFilesDir.getAbsolutePath, divisor, fileSystem)
223+
224+
assert(notDivisibleFiles.isEmpty)
221225
}
222226

223227
private def getRandomFileToBeWritten: File = new File(controlledLengthFilesDir, UUID.randomUUID().toString)

0 commit comments

Comments
 (0)