@@ -298,8 +298,93 @@ public void close() {
298298 session .removeAttribute (Constants .HANDSHAKE_COMPLETE );
299299 // clear the delay queue
300300 queue .clear ();
301- // send a proper ws close
302- Packet packet = Packet .build (Constants .CLOSE_MESSAGE_BYTES , MessageType .CLOSE );
301+ // whether to attempt a nice close or a forceful one
302+ if (WebSocketTransport .isNiceClose ()) {
303+ // send a proper ws close
304+ Packet packet = Packet .build (Constants .CLOSE_MESSAGE_BYTES , MessageType .CLOSE );
305+ WriteFuture writeFuture = session .write (packet );
306+ writeFuture .addListener (new IoFutureListener <WriteFuture >() {
307+
308+ @ Override
309+ public void operationComplete (WriteFuture future ) {
310+ if (future .isWritten ()) {
311+ log .debug ("Close message written" );
312+ // only set on success for now to skip boolean check later
313+ session .setAttribute (Constants .STATUS_CLOSE_WRITTEN , Boolean .TRUE );
314+ }
315+ future .removeListener (this );
316+ }
317+
318+ });
319+ // adjust close routine to allow for flushing
320+ CloseFuture closeFuture = session .closeOnFlush ();
321+ closeFuture .addListener (new IoFutureListener <CloseFuture >() {
322+
323+ public void operationComplete (CloseFuture future ) {
324+ if (future .isClosed ()) {
325+ log .debug ("Connection is closed" );
326+ } else {
327+ log .debug ("Connection is not yet closed" );
328+ }
329+ future .removeListener (this );
330+ }
331+
332+ });
333+ } else {
334+ // force close
335+ CloseFuture closeFuture = session .closeNow ();
336+ closeFuture .addListener (new IoFutureListener <CloseFuture >() {
337+
338+ public void operationComplete (CloseFuture future ) {
339+ if (future .isClosed ()) {
340+ log .debug ("Connection is closed" );
341+ } else {
342+ log .debug ("Connection is not yet closed" );
343+ }
344+ future .removeListener (this );
345+ }
346+
347+ });
348+ }
349+ }
350+ }
351+
352+ /**
353+ * Close with an associated error status.
354+ *
355+ * @param statusCode
356+ * @param errResponse
357+ */
358+ public void close (int statusCode , HandshakeResponse errResponse ) {
359+ log .warn ("Closing connection with status: {}" , statusCode );
360+ // remove handshake flag
361+ session .removeAttribute (Constants .HANDSHAKE_COMPLETE );
362+ // clear the delay queue
363+ queue .clear ();
364+ // send http error response
365+ session .write (errResponse );
366+ // whether to attempt a nice close or a forceful one
367+ if (WebSocketTransport .isNiceClose ()) {
368+ // now send close packet with error code
369+ IoBuffer buf = IoBuffer .allocate (16 );
370+ buf .setAutoExpand (true );
371+ // all errors except 403 will use 1002
372+ buf .putUnsigned ((short ) statusCode );
373+ try {
374+ if (statusCode == 1008 ) {
375+ // if its a 403 forbidden
376+ buf .put ("Policy Violation" .getBytes ("UTF8" ));
377+ } else {
378+ buf .put ("Protocol error" .getBytes ("UTF8" ));
379+ }
380+ } catch (Exception e ) {
381+ // shouldnt be any text encoding issues...
382+ }
383+ buf .flip ();
384+ byte [] errBytes = new byte [buf .remaining ()];
385+ buf .get (errBytes );
386+ // construct the packet
387+ Packet packet = Packet .build (errBytes , MessageType .CLOSE );
303388 WriteFuture writeFuture = session .write (packet );
304389 writeFuture .addListener (new IoFutureListener <WriteFuture >() {
305390
@@ -328,71 +413,22 @@ public void operationComplete(CloseFuture future) {
328413 }
329414
330415 });
331- }
332- }
333-
334- /**
335- * Close with an associated error status.
336- *
337- * @param statusCode
338- * @param errResponse
339- */
340- public void close (int statusCode , HandshakeResponse errResponse ) {
341- log .warn ("Closing connection with status: {}" , statusCode );
342- // remove handshake flag
343- session .removeAttribute (Constants .HANDSHAKE_COMPLETE );
344- // clear the delay queue
345- queue .clear ();
346- // send http error response
347- session .write (errResponse );
348- // now send close packet with error code
349- IoBuffer buf = IoBuffer .allocate (16 );
350- buf .setAutoExpand (true );
351- // all errors except 403 will use 1002
352- buf .putUnsigned ((short ) statusCode );
353- try {
354- if (statusCode == 1008 ) {
355- // if its a 403 forbidden
356- buf .put ("Policy Violation" .getBytes ("UTF8" ));
357- } else {
358- buf .put ("Protocol error" .getBytes ("UTF8" ));
359- }
360- } catch (Exception e ) {
361- // shouldnt be any text encoding issues...
362- }
363- buf .flip ();
364- byte [] errBytes = new byte [buf .remaining ()];
365- buf .get (errBytes );
366- // construct the packet
367- Packet packet = Packet .build (errBytes , MessageType .CLOSE );
368- WriteFuture writeFuture = session .write (packet );
369- writeFuture .addListener (new IoFutureListener <WriteFuture >() {
370-
371- @ Override
372- public void operationComplete (WriteFuture future ) {
373- if (future .isWritten ()) {
374- log .debug ("Close message written" );
375- // only set on success for now to skip boolean check later
376- session .setAttribute (Constants .STATUS_CLOSE_WRITTEN , Boolean .TRUE );
377- }
378- future .removeListener (this );
379- }
380-
381- });
382- // adjust close routine to allow for flushing
383- CloseFuture closeFuture = session .closeOnFlush ();
384- closeFuture .addListener (new IoFutureListener <CloseFuture >() {
416+ } else {
417+ // force close
418+ CloseFuture closeFuture = session .closeNow ();
419+ closeFuture .addListener (new IoFutureListener <CloseFuture >() {
385420
386- public void operationComplete (CloseFuture future ) {
387- if (future .isClosed ()) {
388- log .debug ("Connection is closed" );
389- } else {
390- log .debug ("Connection is not yet closed" );
421+ public void operationComplete (CloseFuture future ) {
422+ if (future .isClosed ()) {
423+ log .debug ("Connection is closed" );
424+ } else {
425+ log .debug ("Connection is not yet closed" );
426+ }
427+ future .removeListener (this );
391428 }
392- future .removeListener (this );
393- }
394429
395- });
430+ });
431+ }
396432 log .debug ("Close complete" );
397433 }
398434
0 commit comments