@@ -613,7 +613,12 @@ void reset() {
613613
614614 MsgHandlerFactoryRegistryItem item = _hdlrFtyRegistry .get (msgType );
615615 if (item .factory () != null ) {
616- item .factory ().reset ();
616+ try {
617+ item .factory ().reset ();
618+ } catch (Exception ex ) {
619+ LOG .error ("Failed to reset the factory {} of message type {}." , item .factory ().toString (),
620+ msgType , ex );
621+ }
617622 }
618623 }
619624 // threads pool specific to STATE_TRANSITION.Key specific pool are not shut down.
@@ -788,8 +793,8 @@ public void onMessage(String instanceName, List<Message> messages,
788793 List <MessageHandler > nonStateTransitionHandlers = new ArrayList <>();
789794 List <NotificationContext > nonStateTransitionContexts = new ArrayList <>();
790795
791- // message read
792- List <Message > readMsgs = new ArrayList <>();
796+ // message to be updated in ZK
797+ List <Message > msgsToBeUpdated = new ArrayList <>();
793798
794799 String sessionId = manager .getSessionId ();
795800 List <String > curResourceNames =
@@ -804,40 +809,71 @@ public void onMessage(String instanceName, List<Message> messages,
804809 // skip the following operations for the no-op messages.
805810 continue ;
806811 }
807- // create message handlers, if handlers not found, leave its state as NEW
812+
808813 NotificationContext msgWorkingContext = changeContext .clone ();
814+ MessageHandler msgHandler = null ;
809815 try {
810- MessageHandler msgHandler = createMessageHandler (message , msgWorkingContext );
811- if (msgHandler == null ) {
812- // Failed to create message handler, skip processing this message in this callback.
813- // The same message process will be retried in the next round.
814- continue ;
815- }
816- if (message .getMsgType ().equals (MessageType .STATE_TRANSITION .name ()) || message .getMsgType ()
817- .equals (MessageType .STATE_TRANSITION_CANCELLATION .name ())) {
818- if (validateAndProcessStateTransitionMessage (message , instanceName , manager ,
819- stateTransitionHandlers , msgHandler )) {
820- // Need future process by triggering state transition
821- String msgTarget =
822- getMessageTarget (message .getResourceName (), message .getPartitionName ());
823- stateTransitionHandlers .put (msgTarget , msgHandler );
824- stateTransitionContexts .put (msgTarget , msgWorkingContext );
825- } else {
826- // skip the following operations for the invalid/expired state transition messages.
827- continue ;
828- }
816+ // create message handlers, if handlers not found but no exception, leave its state as NEW
817+ msgHandler = createMessageHandler (message , msgWorkingContext );
818+ } catch (Exception ex ) {
819+ // Failed to create message handler and there is an Exception.
820+ int remainingRetryCount = message .getRetryCount ();
821+ LOG .error (
822+ "Exception happens when creating Message Handler for message {}. Current remaining retry count is {}." ,
823+ message .getMsgId (), remainingRetryCount );
824+ // Set the message retry count to avoid infinite retrying.
825+ message .setRetryCount (remainingRetryCount - 1 );
826+ message .setExecuteSessionId (sessionId );
827+ // continue processing in the next section where handler object is double-checked.
828+ }
829+
830+ if (msgHandler == null ) {
831+ // Note that we are re-using the retry count of Message that was originally designed to control
832+ // timeout retries. So it is not checked before the first try in order to ensure consistent
833+ // behavior. It is possible that we introduce a new behavior for this method. But it requires
834+ // us to split the configuration item so as to avoid confusion.
835+ if (message .getRetryCount () < 0 ) {
836+ // If no more retry count remains, then mark the message to be UNPROCESSABLE.
837+ String errorMsg = String
838+ .format ("No available message Handler found!"
839+ + " Stop processing message %s since it has a negative remaining retry count %d!" ,
840+ message .getMsgId (), message .getRetryCount ());
841+ updateUnprocessableMessage (message , null , errorMsg , manager );
842+ msgsToBeUpdated .add (message );
829843 } else {
830- // Need future process non state transition messages by triggering the handler
831- nonStateTransitionHandlers .add (msgHandler );
832- nonStateTransitionContexts .add (msgWorkingContext );
844+ // Skip processing this message in this callback. The same message process will be retried
845+ // in the next round.
846+ LOG .warn ("There is no existing handler for message {}."
847+ + " Skip processing it for now. Will retry on the next callback." ,
848+ message .getMsgId ());
833849 }
834- } catch (Exception e ) {
835- handleUnprocessableMessage (message , e , e .getMessage (), accessor , instanceName , manager );
836850 continue ;
837851 }
838852
839- // Update the processed message objects
840- readMsgs .add (markReadMessage (message , msgWorkingContext , manager ));
853+ if (message .getMsgType ().equals (MessageType .STATE_TRANSITION .name ()) || message .getMsgType ()
854+ .equals (MessageType .STATE_TRANSITION_CANCELLATION .name ())) {
855+ if (validateAndProcessStateTransitionMessage (message , manager , stateTransitionHandlers ,
856+ msgHandler )) {
857+ // Need future process by triggering state transition
858+ String msgTarget =
859+ getMessageTarget (message .getResourceName (), message .getPartitionName ());
860+ stateTransitionHandlers .put (msgTarget , msgHandler );
861+ stateTransitionContexts .put (msgTarget , msgWorkingContext );
862+ } else {
863+ // Skip the following operations for the invalid/expired state transition messages.
864+ // Also remove the message since it might block the other state transition messages.
865+ removeMessageFromZK (accessor , message , instanceName );
866+ continue ;
867+ }
868+ } else {
869+ // Need future process non state transition messages by triggering the handler
870+ nonStateTransitionHandlers .add (msgHandler );
871+ nonStateTransitionContexts .add (msgWorkingContext );
872+ }
873+
874+ // Update the normally processed messages
875+ msgsToBeUpdated .add (markReadMessage (message , msgWorkingContext , manager ));
876+
841877 // batch creation of all current state meta data
842878 // do it for non-controller and state transition messages only
843879 if (!message .isControlerMsg () && message .getMsgType ()
@@ -869,25 +905,34 @@ public void onMessage(String instanceName, List<Message> messages,
869905 try {
870906 accessor .createChildren (createCurStateKeys , metaCurStates );
871907 } catch (Exception e ) {
872- LOG .error ("fail to create cur-state znodes for messages: " + readMsgs , e );
908+ LOG .error ("fail to create cur-state znodes for messages: " + msgsToBeUpdated , e );
873909 }
874910 }
875911
876- // update message state to READ in batch and schedule tasks for all read messages
877- if (readMsgs .size () > 0 ) {
878- updateMessageState (readMsgs , accessor , instanceName );
912+ // update message state in batch and schedule tasks for all read messages
913+ updateMessageState (msgsToBeUpdated , accessor , instanceName );
879914
880- for (Map .Entry <String , MessageHandler > handlerEntry : stateTransitionHandlers .entrySet ()) {
881- MessageHandler handler = handlerEntry .getValue ();
882- NotificationContext context = stateTransitionContexts .get (handlerEntry .getKey ());
883- scheduleTaskForMessage (instanceName , accessor , handler , context );
915+ for (Map .Entry <String , MessageHandler > handlerEntry : stateTransitionHandlers .entrySet ()) {
916+ MessageHandler handler = handlerEntry .getValue ();
917+ NotificationContext context = stateTransitionContexts .get (handlerEntry .getKey ());
918+ if (!scheduleTaskForMessage (instanceName , accessor , handler , context )) {
919+ try {
920+ // Record error state to the message handler.
921+ handler .onError (new HelixException (String
922+ .format ("Failed to schedule the task for executing message handler for %s." ,
923+ handler ._message .getMsgId ())), MessageHandler .ErrorCode .ERROR ,
924+ MessageHandler .ErrorType .FRAMEWORK );
925+ } catch (Exception ex ) {
926+ LOG .error ("Failed to trigger onError method of the message handler for {}" ,
927+ handler ._message .getMsgId (), ex );
928+ }
884929 }
930+ }
885931
886- for (int i = 0 ; i < nonStateTransitionHandlers .size (); i ++) {
887- MessageHandler handler = nonStateTransitionHandlers .get (i );
888- NotificationContext context = nonStateTransitionContexts .get (i );
889- scheduleTaskForMessage (instanceName , accessor , handler , context );
890- }
932+ for (int i = 0 ; i < nonStateTransitionHandlers .size (); i ++) {
933+ MessageHandler handler = nonStateTransitionHandlers .get (i );
934+ NotificationContext context = nonStateTransitionContexts .get (i );
935+ scheduleTaskForMessage (instanceName , accessor , handler , context );
891936 }
892937 }
893938
@@ -993,67 +1038,78 @@ private boolean checkAndProcessNoOpMessage(Message message, String instanceName,
9931038 * Preprocess the state transition message to validate if the request is valid.
9951040 * If no operation needs to be triggered, discard the message.
9951040 * @param message
996- * @param instanceName
9971041 * @param manager
9981042 * @param stateTransitionHandlers
9991043 * @param createHandler
10001044 * @return True if the requested state transition is valid, and need to schedule the transition.
10011045 * False if no more operation is required.
10021046 */
1003- private boolean validateAndProcessStateTransitionMessage (Message message , String instanceName ,
1004- HelixManager manager , Map <String , MessageHandler > stateTransitionHandlers ,
1005- MessageHandler createHandler ) {
1006- HelixDataAccessor accessor = manager .getHelixDataAccessor ();
1007-
1047+ private boolean validateAndProcessStateTransitionMessage (Message message , HelixManager manager ,
1048+ Map <String , MessageHandler > stateTransitionHandlers , MessageHandler createHandler ) {
10081049 String messageTarget = getMessageTarget (message .getResourceName (), message .getPartitionName ());
1009- if (message .getMsgType ().equals (MessageType .STATE_TRANSITION .name ())
1010- && isStateTransitionInProgress (messageTarget )) {
1011- String taskId = _messageTaskMap .get (messageTarget );
1012- Message msg = _taskMap .get (taskId ).getTask ().getMessage ();
1013- // If there is another state transition for same partition is going on,
1014- // discard the message. Controller will resend if this is a valid message
1015- String errMsg = String .format (
1016- "Another state transition for %s:%s is in progress with msg: %s, p2p: %s, read: %d, current:%d. Discarding %s->%s message" ,
1017- message .getResourceName (), message .getPartitionName (), msg .getMsgId (),
1018- msg .isRelayMessage (), msg .getReadTimeStamp (), System .currentTimeMillis (),
1019- message .getFromState (), message .getToState ());
1020- handleUnprocessableMessage (message , null /* exception */ , errMsg , accessor , instanceName ,
1021- manager );
1022- return false ;
1023- }
1024- if (createHandler instanceof HelixStateTransitionHandler ) {
1025- // We only check to state if there is no ST task scheduled/executing.
1026- HelixStateTransitionHandler .StaleMessageValidateResult result =
1027- ((HelixStateTransitionHandler ) createHandler ).staleMessageValidator ();
1028- if (!result .isValid ) {
1029- handleUnprocessableMessage (message , null /* exception */ , result .exception .getMessage (),
1030- accessor , instanceName , manager );
1050+
1051+ try {
1052+ if (message .getMsgType ().equals (MessageType .STATE_TRANSITION .name ())
1053+ && isStateTransitionInProgress (messageTarget )) {
1054+ String taskId = _messageTaskMap .get (messageTarget );
1055+ Message msg = _taskMap .get (taskId ).getTask ().getMessage ();
1056+ // If another state transition for the same partition is in progress,
1057+ // discard the message. Controller will resend if this is a valid message
1058+ String errMsg = String .format (
1059+ "Another state transition for %s:%s is in progress with msg: %s, p2p: %s, read: %d, current:%d. Discarding %s->%s message" ,
1060+ message .getResourceName (), message .getPartitionName (), msg .getMsgId (),
1061+ msg .isRelayMessage (), msg .getReadTimeStamp (), System .currentTimeMillis (),
1062+ message .getFromState (), message .getToState ());
1063+ updateUnprocessableMessage (message , null /* exception */ , errMsg , manager );
10311064 return false ;
10321065 }
1033- }
1034- if (stateTransitionHandlers .containsKey (messageTarget )) {
1035- // If there are 2 messages in same batch about same partition's state transition,
1036- // the later one is discarded
1037- Message duplicatedMessage = stateTransitionHandlers .get (messageTarget )._message ;
1038- String errMsg = String .format (
1039- "Duplicated state transition message: %s. Existing: %s->%s; New (Discarded): %s->%s" ,
1040- message .getMsgId (), duplicatedMessage .getFromState (), duplicatedMessage .getToState (),
1041- message .getFromState (), message .getToState ());
1042- handleUnprocessableMessage (message , null /* exception */ , errMsg , accessor , instanceName ,
1066+ if (createHandler instanceof HelixStateTransitionHandler ) {
1067+ // We only check to state if there is no ST task scheduled/executing.
1068+ HelixStateTransitionHandler .StaleMessageValidateResult result =
1069+ ((HelixStateTransitionHandler ) createHandler ).staleMessageValidator ();
1070+ if (!result .isValid ) {
1071+ updateUnprocessableMessage (message , null /* exception */ , result .exception .getMessage (),
1072+ manager );
1073+ return false ;
1074+ }
1075+ }
1076+ if (stateTransitionHandlers .containsKey (messageTarget )) {
1077+ // If there are 2 messages in same batch about same partition's state transition,
1078+ // the later one is discarded
1079+ Message duplicatedMessage = stateTransitionHandlers .get (messageTarget )._message ;
1080+ String errMsg = String .format (
1081+ "Duplicated state transition message: %s. Existing: %s->%s; New (Discarded): %s->%s" ,
1082+ message .getMsgId (), duplicatedMessage .getFromState (), duplicatedMessage .getToState (),
1083+ message .getFromState (), message .getToState ());
1084+ updateUnprocessableMessage (message , null /* exception */ , errMsg , manager );
1085+ return false ;
1086+ }
1087+ return true ;
1088+ } catch (Exception ex ) {
1089+ updateUnprocessableMessage (message , ex , "State transition validation failed with Exception." ,
10431090 manager );
10441091 return false ;
10451092 }
1046- return true ;
10471093 }
10481094
1049- private void scheduleTaskForMessage (String instanceName , HelixDataAccessor accessor ,
1095+ /**
1096+ * Schedule task to execute the message handler.
1097+ * @param instanceName
1098+ * @param accessor
1099+ * @param handler
1100+ * @param context
1101+ * @return True if the task is scheduled successfully. False otherwise.
1102+ */
1103+ private boolean scheduleTaskForMessage (String instanceName , HelixDataAccessor accessor ,
10501104 MessageHandler handler , NotificationContext context ) {
10511105 Message msg = handler ._message ;
10521106 if (!scheduleTask (new HelixTask (msg , context , handler , this ))) {
10531107 // Remove message if schedule tasks are failed.
10541108 removeMessageFromTaskAndFutureMap (msg );
10551109 removeMessageFromZK (accessor , msg , instanceName );
1110+ return false ;
10561111 }
1112+ return true ;
10571113 }
10581114
10591115 /**
@@ -1151,8 +1207,8 @@ private Message markReadMessage(Message message, NotificationContext context,
11511207 return message ;
11521208 }
11531209
1154- private void handleUnprocessableMessage (Message message , Exception exception , String errorMsg ,
1155- HelixDataAccessor accessor , String instanceName , HelixManager manager ) {
1210+ private void updateUnprocessableMessage (Message message , Exception exception , String errorMsg ,
1211+ HelixManager manager ) {
11561212 String error = "Message " + message .getMsgId () + " cannot be processed: " + message .getRecord ();
11571213 if (exception != null ) {
11581214 LOG .error (error , exception );
@@ -1162,9 +1218,7 @@ private void handleUnprocessableMessage(Message message, Exception exception, St
11621218 _statusUpdateUtil .logError (message , HelixStateMachineEngine .class , errorMsg , manager );
11631219 }
11641220 message .setMsgState (MessageState .UNPROCESSABLE );
1165- removeMessageFromZK (accessor , message , instanceName );
1166- _monitor
1167- .reportProcessedMessage (message , ParticipantMessageMonitor .ProcessedMessageState .DISCARDED );
1221+ _monitor .reportProcessedMessage (message , ProcessedMessageState .FAILED );
11681222 }
11691223
11701224 public MessageHandler createMessageHandler (Message message , NotificationContext changeContext ) {
0 commit comments