|
26 | 26 |
|
27 | 27 | import org.apache.helix.CurrentStateChangeListener; |
28 | 28 | import org.apache.helix.HelixDataAccessor; |
29 | | -import org.apache.helix.HelixManager; |
30 | | -import org.apache.helix.HelixManagerFactory; |
31 | | -import org.apache.helix.InstanceType; |
32 | 29 | import org.apache.helix.NotificationContext; |
33 | 30 | import org.apache.helix.PropertyKey; |
34 | 31 | import org.apache.helix.PropertyType; |
@@ -467,23 +464,29 @@ public void testCurrentStatePathLeakingByAsycRemoval() throws Exception { |
467 | 464 | cs.setSessionId(jobSessionId); |
468 | 465 | cs.setStateModelDefRef(db0.getStateModelDefRef()); |
469 | 466 |
|
| 467 | + Map<String, List<String>> rpWatchPaths = ZkTestHelper.getZkWatch(rpManager.getZkClient()); |
| 468 | + Assert.assertFalse(rpWatchPaths.get("dataWatches").contains(jobKey.getPath())); |
| 469 | + |
470 | 470 | LOG.info("add job"); |
471 | | - boolean rtJob = false; |
472 | 471 | for (int i = 0; i < mJobUpdateCnt; i++) { |
473 | | - rtJob = jobAccesor.setProperty(jobKey, cs); |
| 472 | + jobAccesor.setProperty(jobKey, cs); |
474 | 473 | } |
475 | 474 |
|
| 475 | + Map<String, Set<String>> listenersByZkPath = ZkTestHelper.getListenersByZkPath(ZK_ADDR); |
| 476 | + Assert.assertTrue(listenersByZkPath.keySet().contains(jobKey.getPath())); |
| 477 | + rpWatchPaths = ZkTestHelper.getZkWatch(rpManager.getZkClient()); |
| 478 | + Assert.assertTrue(rpWatchPaths.get("dataWatches").contains(jobKey.getPath())); |
| 479 | + |
476 | 480 | LOG.info("remove job"); |
477 | | - rtJob = jobParticipant.getZkClient().delete(jobKey.getPath()); |
| 481 | + jobParticipant.getZkClient().delete(jobKey.getPath()); |
478 | 482 |
|
479 | 483 | // validate the job watch is not leaked. |
480 | 484 | Thread.sleep(5000); |
481 | 485 |
|
482 | | - Map<String, Set<String>> listenersByZkPath = ZkTestHelper.getListenersByZkPath(ZK_ADDR); |
483 | | - boolean jobKeyExists = listenersByZkPath.keySet().contains(jobKey.getPath()); |
484 | | - Assert.assertFalse(jobKeyExists); |
| 486 | + listenersByZkPath = ZkTestHelper.getListenersByZkPath(ZK_ADDR); |
| 487 | + Assert.assertFalse(listenersByZkPath.keySet().contains(jobKey.getPath())); |
485 | 488 |
|
486 | | - Map<String, List<String>> rpWatchPaths = ZkTestHelper.getZkWatch(rpManager.getZkClient()); |
| 489 | + rpWatchPaths = ZkTestHelper.getZkWatch(rpManager.getZkClient()); |
487 | 490 | List<String> existWatches = rpWatchPaths.get("existWatches"); |
488 | 491 | Assert.assertTrue(existWatches.isEmpty()); |
489 | 492 |
|
|
0 commit comments