Skip to content

Commit 37c9861

Browse files
committed
remove duplicate subscribe in CallBackHandler
1 parent f11396e commit 37c9861

File tree

2 files changed

+13
-60
lines changed

2 files changed

+13
-60
lines changed

helix-core/src/main/java/org/apache/helix/NotificationContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public enum MapKey {
4343
private String _pathChanged;
4444
private String _eventName;
4545
private long _creationTime;
46+
private boolean _isChildChange;
4647

4748
/**
4849
* Get the name associated with the event
@@ -227,4 +228,12 @@ public HelixConstants.ChangeType getChangeType() {
227228
public void setChangeType(HelixConstants.ChangeType changeType) {
228229
this._changeType = changeType;
229230
}
231+
232+
public boolean getIsChildChange() {
233+
return _isChildChange;
234+
}
235+
236+
public void setIsChildChange(boolean cc) {
237+
_isChildChange = cc;
238+
}
230239
}

helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java

Lines changed: 4 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,6 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
118118
nextNotificationType.put(Type.FINALIZE, Arrays.asList(Type.INIT));
119119
}
120120

121-
// processor to handle async zk event resubscription.
122-
private static DedupEventProcessor SubscribeChangeEventProcessor;
123121

124122
private final String _path;
125123
private final Object _listener;
@@ -142,50 +140,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
142140
// indicated whether this CallbackHandler is ready to serve event callback from ZkClient.
143141
private boolean _ready = false;
144142

145-
static {
146-
SubscribeChangeEventProcessor = new DedupEventProcessor<CallbackHandler, SubscribeChangeEvent>(
147-
"Singleton", "CallbackHandler-AsycSubscribe") {
148-
@Override
149-
protected void handleEvent(SubscribeChangeEvent event) {
150-
logger.info("CallbackHandler {}, resubscribe change listener to path: {}, for listener: {}, watchChild: {}",
151-
event.handler._uid, event.path, event.listener, event.watchChild);
152-
try {
153-
if (event.handler.isReady()) {
154-
event.handler.subscribeForChanges(event.callbackType, event.path, event.watchChild);
155-
} else {
156-
logger.info("CallbackHandler is not ready, stop subscribing changes listener to "
157-
+ "path: {} for listener: {} watchChild: {}", event.path, event.listener,
158-
event.listener);
159-
}
160-
} catch (Exception e) {
161-
logger.error("Failed to resubscribe change to path: {} for listener: {}", event.path,
162-
event.listener, e);
163-
}
164-
}
165-
};
166-
167-
SubscribeChangeEventProcessor.start();
168-
}
169-
170-
class SubscribeChangeEvent {
171-
final CallbackHandler handler;
172-
final String path;
173-
final NotificationContext.Type callbackType;
174-
final Object listener;
175-
final boolean watchChild;
176-
177-
SubscribeChangeEvent(CallbackHandler handler, NotificationContext.Type callbackType,
178-
String path, boolean watchChild, Object listener) {
179-
this.handler = handler;
180-
this.path = path;
181-
this.callbackType = callbackType;
182-
this.listener = listener;
183-
this.watchChild = watchChild;
184-
}
185-
}
186-
187-
class CallbackProcessor
188-
extends DedupEventProcessor<NotificationContext.Type, NotificationContext> {
143+
class CallbackProcessor extends DedupEventProcessor<NotificationContext.Type, NotificationContext> {
189144
private CallbackHandler _handler;
190145

191146
public CallbackProcessor(CallbackHandler handler) {
@@ -402,13 +357,9 @@ public void invoke(NotificationContext changeContext) throws Exception {
402357
}
403358
_expectTypes = nextNotificationType.get(type);
404359

405-
if (type == Type.INIT || type == Type.FINALIZE) {
360+
if (type == Type.INIT || type == Type.FINALIZE || changeContext.getIsChildChange()) {
406361
subscribeForChanges(changeContext.getType(), _path, _watchChild);
407-
} else {
408-
// put SubscribeForChange run in async thread to reduce the latency of zk callback handling.
409-
subscribeForChangesAsyn(changeContext.getType(), _path, _watchChild);
410362
}
411-
412363
if (_changeType == IDEAL_STATE) {
413364
IdealStateChangeListener idealStateChangeListener = (IdealStateChangeListener) _listener;
414365
List<IdealState> idealStates = preFetch(_propertyKey);
@@ -598,14 +549,6 @@ private void subscribeDataChange(String path, NotificationContext.Type callbackT
598549
}
599550
}
600551

601-
/** Subscribe Changes in asynchronously */
602-
private void subscribeForChangesAsyn(NotificationContext.Type callbackType, String path,
603-
boolean watchChild) {
604-
SubscribeChangeEvent subscribeEvent =
605-
new SubscribeChangeEvent(this, callbackType, path, watchChild, _listener);
606-
SubscribeChangeEventProcessor.queueEvent(subscribeEvent.handler, subscribeEvent);
607-
}
608-
609552
private void subscribeForChanges(NotificationContext.Type callbackType, String path,
610553
boolean watchChild) {
611554
logger.info("CallbackHandler {} subscribing changes listener to path: {}, callback type: {}, "
@@ -734,6 +677,7 @@ public void handleDataChange(String dataPath, Object data) {
734677
changeContext.setType(NotificationContext.Type.CALLBACK);
735678
changeContext.setPathChanged(dataPath);
736679
changeContext.setChangeType(_changeType);
680+
changeContext.setIsChildChange(false);
737681
enqueueTask(changeContext);
738682
}
739683
} catch (Exception e) {
@@ -796,7 +740,7 @@ public void handleChildChange(String parentPath, List<String> currentChilds) {
796740
changeContext.setType(NotificationContext.Type.CALLBACK);
797741
changeContext.setPathChanged(parentPath);
798742
changeContext.setChangeType(_changeType);
799-
subscribeForChanges(changeContext.getType(), _path, _watchChild);
743+
changeContext.setIsChildChange(true);
800744
enqueueTask(changeContext);
801745
}
802746
}

0 commit comments

Comments
 (0)