@@ -404,13 +404,21 @@ private Set<StripedDataStreamer> checkStreamers() throws IOException {
404404 LOG .debug ("newly failed streamers: " + newFailed );
405405 }
406406 if (failCount > (numAllBlocks - numDataBlocks )) {
407+ closeAllStreamers ();
407408 throw new IOException ("Failed: the number of failed blocks = "
408409 + failCount + " > the number of parity blocks = "
409410 + (numAllBlocks - numDataBlocks ));
410411 }
411412 return newFailed ;
412413 }
413414
415+ private void closeAllStreamers () {
416+ // The write has failed, Close all the streamers.
417+ for (StripedDataStreamer streamer : streamers ) {
418+ streamer .close (true );
419+ }
420+ }
421+
414422 private void handleCurrentStreamerFailure (String err , Exception e )
415423 throws IOException {
416424 currentPacket = null ;
@@ -670,6 +678,8 @@ private void checkStreamerFailures(boolean isNeedFlushAllPackets)
670678 newFailed = waitCreatingStreamers (healthySet );
671679 if (newFailed .size () + failedStreamers .size () >
672680 numAllBlocks - numDataBlocks ) {
681+ // The write has failed, Close all the streamers.
682+ closeAllStreamers ();
673683 throw new IOException (
674684 "Data streamers failed while creating new block streams: "
675685 + newFailed + ". There are not enough healthy streamers." );
@@ -1169,32 +1179,32 @@ void setClosed() {
11691179
11701180 @ Override
11711181 protected synchronized void closeImpl () throws IOException {
1172- if (isClosed ()) {
1173- exceptionLastSeen .check (true );
1174-
1175- // Writing to at least {dataUnits} replicas can be considered as success,
1176- // and the rest of data can be recovered.
1177- final int minReplication = ecPolicy .getNumDataUnits ();
1178- int goodStreamers = 0 ;
1179- final MultipleIOException .Builder b = new MultipleIOException .Builder ();
1180- for (final StripedDataStreamer si : streamers ) {
1181- try {
1182- si .getLastException ().check (true );
1183- goodStreamers ++;
1184- } catch (IOException e ) {
1185- b .add (e );
1182+ try {
1183+ if (isClosed ()) {
1184+ exceptionLastSeen .check (true );
1185+
1186+ // Writing to at least {dataUnits} replicas can be considered as
1187+ // success, and the rest of data can be recovered.
1188+ final int minReplication = ecPolicy .getNumDataUnits ();
1189+ int goodStreamers = 0 ;
1190+ final MultipleIOException .Builder b = new MultipleIOException .Builder ();
1191+ for (final StripedDataStreamer si : streamers ) {
1192+ try {
1193+ si .getLastException ().check (true );
1194+ goodStreamers ++;
1195+ } catch (IOException e ) {
1196+ b .add (e );
1197+ }
11861198 }
1187- }
1188- if ( goodStreamers < minReplication ) {
1189- final IOException ioe = b . build ();
1190- if ( ioe != null ) {
1191- throw ioe ;
1199+ if ( goodStreamers < minReplication ) {
1200+ final IOException ioe = b . build ();
1201+ if ( ioe != null ) {
1202+ throw ioe ;
1203+ }
11921204 }
1205+ return ;
11931206 }
1194- return ;
1195- }
11961207
1197- try {
11981208 try {
11991209 // flush from all upper layers
12001210 flushBuffer ();
0 commit comments