|
125 | 125 | * +--------+-------------------------------------------------+----------------+ |
126 | 126 | * </pre> |
127 | 127 | */ |
128 | | -@SuppressWarnings("checkstyle:LineLength") |
129 | 128 | public class ShuffleChannelHandler extends SimpleChannelInboundHandler<FullHttpRequest> { |
130 | 129 | private final ShuffleChannelHandlerContext handlerCtx; |
131 | 130 |
|
@@ -313,7 +312,6 @@ public void sendMap(ReduceContext reduceContext) { |
313 | 312 | } |
314 | 313 | LOG.trace("Calling sendMapOutput; channel='{}'", reduceContext.ctx.channel().id()); |
315 | 314 | ChannelFuture nextMap = sendMapOutput( |
316 | | - reduceContext.getCtx(), |
317 | 315 | reduceContext.getCtx().channel(), |
318 | 316 | reduceContext.getUser(), mapId, |
319 | 317 | reduceContext.getReduceId(), info); |
@@ -493,8 +491,7 @@ public static ByteBuf shuffleHeaderToBytes(ShuffleHeader header) throws IOExcept |
493 | 491 | return wrappedBuffer(dob.getData(), 0, dob.getLength()); |
494 | 492 | } |
495 | 493 |
|
496 | | - protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, |
497 | | - String user, String mapId, int reduce, |
| 494 | + protected ChannelFuture sendMapOutput(Channel ch, String user, String mapId, int reduce, |
498 | 495 | MapOutputInfo mapOutputInfo) |
499 | 496 | throws IOException { |
500 | 497 | final IndexRecord info = mapOutputInfo.indexRecord; |
@@ -678,30 +675,36 @@ static class ReduceMapFileCount implements ChannelFutureListener { |
678 | 675 |
|
679 | 676 | @Override |
680 | 677 | public void operationComplete(ChannelFuture future) throws Exception { |
681 | | - LOG.trace("SendMap operation complete; mapsToWait='{}', channel='{}'", this.reduceContext.getMapsToWait().get(), future.channel().id()); |
| 678 | + LOG.trace("SendMap operation complete; mapsToWait='{}', channel='{}'", |
| 679 | + this.reduceContext.getMapsToWait().get(), future.channel().id()); |
682 | 680 | if (!future.isSuccess()) { |
683 | | - LOG.error("Future is unsuccessful. channel='{}' Cause: ", future.channel().id(), future.cause()); |
| 681 | + LOG.error("Future is unsuccessful. channel='{}' Cause: ", |
| 682 | + future.channel().id(), future.cause()); |
684 | 683 | future.channel().close(); |
685 | 684 | return; |
686 | 685 | } |
687 | 686 | int waitCount = this.reduceContext.getMapsToWait().decrementAndGet(); |
688 | 687 | if (waitCount == 0) { |
689 | | - ChannelFuture lastContentFuture = future.channel().writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); |
| 688 | + ChannelFuture lastContentFuture = |
| 689 | + future.channel().writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); |
690 | 690 | handler.handlerCtx.metrics.operationComplete(future); |
691 | 691 |
|
692 | 692 | // Let the idle timer handler close keep-alive connections |
693 | 693 | if (reduceContext.getKeepAlive()) { |
694 | | - LOG.trace("SendMap operation complete, keeping alive the connection; channel='{}'", future.channel().id()); |
| 694 | + LOG.trace("SendMap operation complete, keeping alive the connection; channel='{}'", |
| 695 | + future.channel().id()); |
695 | 696 | ChannelPipeline pipeline = future.channel().pipeline(); |
696 | 697 | ShuffleHandler.TimeoutHandler timeoutHandler = |
697 | 698 | (ShuffleHandler.TimeoutHandler)pipeline.get(TIMEOUT_HANDLER); |
698 | 699 | timeoutHandler.setEnabledTimeout(true); |
699 | 700 | } else { |
700 | | - LOG.trace("SendMap operation complete, closing connection; channel='{}'", future.channel().id()); |
| 701 | + LOG.trace("SendMap operation complete, closing connection; channel='{}'", |
| 702 | + future.channel().id()); |
701 | 703 | lastContentFuture.addListener(ChannelFutureListener.CLOSE); |
702 | 704 | } |
703 | 705 | } else { |
704 | | - LOG.trace("SendMap operation complete, waitCount > 0, invoking sendMap with reduceContext; channel='{}'", |
| 706 | + LOG.trace("SendMap operation complete, waitCount > 0, " + |
| 707 | + "invoking sendMap with reduceContext; channel='{}'", |
705 | 708 | future.channel().id()); |
706 | 709 | handler.sendMap(reduceContext); |
707 | 710 | } |
|
0 commit comments