@@ -1161,29 +1161,45 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
11611161 when(mockedFs.open(path)).thenReturn(in)
11621162 when(in.getWrappedStream).thenReturn(dfsIn)
11631163 when(dfsIn.getFileLength).thenReturn(200 )
1164+
11641165 // FileStatus.getLen is more than logInfo fileSize
11651166 var fileStatus = new FileStatus (200 , false , 0 , 0 , 0 , path)
1167+ when(mockedFs.getFileStatus(path)).thenReturn(fileStatus)
11661168 var logInfo = new LogInfo (path.toString, 0 , LogType .EventLogs , Some (" appId" ),
11671169 Some (" attemptId" ), 100 , None , false )
1168- assert(mockedProvider.shouldReloadLog(logInfo, fileStatus))
1170+ var reader = EventLogFileReader .getEventLogReader(mockedFs, path)
1171+ assert(reader.isDefined)
1172+ assert(mockedProvider.shouldReloadLog(logInfo, reader.get))
11691173
11701174 fileStatus = new FileStatus ()
11711175 fileStatus.setPath(path)
1176+ when(mockedFs.getFileStatus(path)).thenReturn(fileStatus)
11721177 // DFSInputStream.getFileLength is more than logInfo fileSize
11731178 logInfo = new LogInfo (path.toString, 0 , LogType .EventLogs , Some (" appId" ),
11741179 Some (" attemptId" ), 100 , None , false )
1175- assert(mockedProvider.shouldReloadLog(logInfo, fileStatus))
1180+ reader = EventLogFileReader .getEventLogReader(mockedFs, path)
1181+ assert(reader.isDefined)
1182+ assert(mockedProvider.shouldReloadLog(logInfo, reader.get))
1183+
11761184 // DFSInputStream.getFileLength is equal to logInfo fileSize
11771185 logInfo = new LogInfo (path.toString, 0 , LogType .EventLogs , Some (" appId" ),
11781186 Some (" attemptId" ), 200 , None , false )
1179- assert(! mockedProvider.shouldReloadLog(logInfo, fileStatus))
1187+ reader = EventLogFileReader .getEventLogReader(mockedFs, path)
1188+ assert(reader.isDefined)
1189+ assert(! mockedProvider.shouldReloadLog(logInfo, reader.get))
1190+
11801191 // in.getWrappedStream returns other than DFSInputStream
11811192 val bin = mock(classOf [BufferedInputStream ])
11821193 when(in.getWrappedStream).thenReturn(bin)
1183- assert(! mockedProvider.shouldReloadLog(logInfo, fileStatus))
1194+ reader = EventLogFileReader .getEventLogReader(mockedFs, path)
1195+ assert(reader.isDefined)
1196+ assert(! mockedProvider.shouldReloadLog(logInfo, reader.get))
1197+
11841198 // fs.open throws exception
11851199 when(mockedFs.open(path)).thenThrow(new IOException (" Throwing intentionally" ))
1186- assert(! mockedProvider.shouldReloadLog(logInfo, fileStatus))
1200+ reader = EventLogFileReader .getEventLogReader(mockedFs, path)
1201+ assert(reader.isDefined)
1202+ assert(! mockedProvider.shouldReloadLog(logInfo, reader.get))
11871203 }
11881204
11891205 test(" log cleaner with the maximum number of log files" ) {
0 commit comments