@@ -393,39 +393,40 @@ public void testVectoredIOEndToEnd() throws Exception {
393
393
394
394
try (FSDataInputStream in = fs .open (path (VECTORED_READ_FILE_NAME ))) {
395
395
in .readVectored (fileRanges , value -> pool .getBuffer (true , value ));
396
- // user can perform other computations while waiting for IO.
397
396
for (FileRange res : fileRanges ) {
398
397
dataProcessor .submit (() -> {
399
398
try {
400
- readBufferValidateDataAndReturnToPool (pool , res , countDown );
399
+ readBufferValidateDataAndReturnToPool (res , countDown );
401
400
} catch (Exception e ) {
402
- LOG .error ("Error while process result for {} " , res , e );
401
+ String error = String .format ("Error while processing result for %s" , res );
402
+ LOG .error (error , e );
403
+ ContractTestUtils .fail (error , e );
403
404
}
404
405
});
405
406
}
406
- if (!countDown .await (100 , TimeUnit .SECONDS )) {
407
- throw new AssertionError ("Error while processing vectored io results" );
407
+ // user can perform other computations while waiting for IO.
408
+ if (!countDown .await (VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS , TimeUnit .SECONDS )) {
409
+ ContractTestUtils .fail ("Timeout/Error while processing vectored io results" );
408
410
}
409
411
} finally {
410
- pool .release ();
411
412
HadoopExecutors .shutdown (dataProcessor , LOG , 100 , TimeUnit .SECONDS );
412
413
}
413
414
}
414
415
415
- private void readBufferValidateDataAndReturnToPool (ByteBufferPool pool ,
416
- FileRange res ,
416
+ private void readBufferValidateDataAndReturnToPool (FileRange res ,
417
417
CountDownLatch countDownLatch )
418
418
throws IOException , TimeoutException {
419
419
CompletableFuture <ByteBuffer > data = res .getData ();
420
- ByteBuffer buffer = FutureIO .awaitFuture (data ,
421
- VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS ,
422
- TimeUnit .SECONDS );
423
420
// Read the data and perform custom operation. Here we are just
424
421
// validating it with original data.
425
- assertDatasetEquals ((int ) res .getOffset (), "vecRead" ,
426
- buffer , res .getLength (), DATASET );
427
- // return buffer to pool.
428
- pool .putBuffer (buffer );
422
+ FutureIO .awaitFuture (data .thenAccept (buffer -> {
423
+ assertDatasetEquals ((int ) res .getOffset (),
424
+ "vecRead" , buffer , res .getLength (), DATASET );
425
+ // return buffer to the pool once read.
426
+ pool .putBuffer (buffer );
427
+ }), VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS , TimeUnit .SECONDS );
428
+
429
+ // countdown to notify main thread that processing has been done.
429
430
countDownLatch .countDown ();
430
431
}
431
432
0 commit comments