2121package org .apache .metron .common .configuration .manager ;
2222
2323import org .apache .curator .framework .CuratorFramework ;
24- import org .apache .curator .framework .recipes .cache .TreeCache ;
24+ import org .apache .curator .framework .recipes .cache .NodeCache ;
2525import org .apache .curator .utils .CloseableUtils ;
26- import org .apache .metron .common .Constants ;
2726import org .apache .metron .common .utils .JSONUtils ;
2827
2928import java .io .ByteArrayInputStream ;
3029import java .io .IOException ;
31- import java .util .ArrayList ;
32- import java .util .List ;
30+ import java .util .Collections ;
31+ import java .util .HashMap ;
3332import java .util .Map ;
3433import java .util .Optional ;
35- import java .util .concurrent .ConcurrentHashMap ;
3634
3735import static org .apache .commons .lang .ArrayUtils .isNotEmpty ;
3836
@@ -48,44 +46,18 @@ public class ZkConfigurationManager implements ConfigurationManager {
4846 private CuratorFramework zookeeperClient ;
4947
5048 /**
51- * A cache of the values stored in Zookeeper.
49+ * The configuration values under management. Maps the path to the configuration values
50+ * in Zookeeper to the cache of its values.
5251 */
53- private TreeCache zookeeperCache ;
54-
55- /**
56- * The configuration values under management
57- */
58- private Map <String , byte []> values ;
59-
60- /**
61- * The paths within Zookeeper that we care about.
62- */
63- private List <String > paths ;
64-
65- /**
66- * All configuration values must live under this root path.
67- */
68- private String rootPath ;
52+ private Map <String , NodeCache > valuesCache ;
6953
7054 /**
7155 * @param zookeeperClient The client used to communicate with Zookeeper. The client is not
7256 * closed. It must be managed externally.
7357 */
7458 public ZkConfigurationManager (CuratorFramework zookeeperClient ) {
75- this (zookeeperClient , Constants .ZOOKEEPER_TOPOLOGY_ROOT );
76- }
77-
78- /**
79- * @param zookeeperClient The client used to communicate with Zookeeper. The client is not
80- * closed. It must be managed externally.
81- * @param rootPath The root of all configuration paths in Zookeeper that should be
82- * monitored for configuration values.
83- */
84- public ZkConfigurationManager (CuratorFramework zookeeperClient , String rootPath ) {
8559 this .zookeeperClient = zookeeperClient ;
86- this .paths = new ArrayList <>();
87- this .values = new ConcurrentHashMap <>();
88- this .rootPath = rootPath ;
60+ this .valuesCache = Collections .synchronizedMap (new HashMap <>());
8961 }
9062
9163 /**
@@ -94,15 +66,16 @@ public ZkConfigurationManager(CuratorFramework zookeeperClient, String rootPath)
9466 */
9567 @ Override
9668 public ZkConfigurationManager with (String zookeeperPath ) {
97- paths .add (zookeeperPath );
69+ NodeCache cache = new NodeCache (zookeeperClient , zookeeperPath );
70+ valuesCache .put (zookeeperPath , cache );
9871 return this ;
9972 }
10073
10174 /**
10275 * Open a connection to Zookeeper and retrieve the initial configuration value.
10376 */
10477 @ Override
105- public ZkConfigurationManager open () throws IOException {
78+ public synchronized ZkConfigurationManager open () throws IOException {
10679 try {
10780 doOpen ();
10881 } catch (Exception e ) {
@@ -113,51 +86,21 @@ public ZkConfigurationManager open() throws IOException {
11386 }
11487
11588 private void doOpen () throws Exception {
116-
117- // the cache which will remain synced with zookeeper
118- zookeeperCache = new TreeCache (zookeeperClient , rootPath );
119- zookeeperCache .getListenable ().addListener ((client , event ) -> {
120-
121- // is there data that was added or changed?
122- if (event != null && event .getData () != null ) {
123- String pathAffected = event .getData ().getPath ();
124-
125- switch (event .getType ()) {
126- case NODE_ADDED :
127- case NODE_UPDATED :
128- paths .stream ()
129- .filter (path -> path .equals (pathAffected ))
130- .forEach (path -> values .put (path , event .getData ().getData ()));
131- break ;
132-
133- case NODE_REMOVED :
134- paths .stream ()
135- .filter (path -> path .equals (pathAffected ))
136- .forEach (path -> values .remove (path ));
137- break ;
138- }
139- }
140- });
141-
142- // initialize the values
143- for (String path : paths ) {
144- fetch (path ).ifPresent (val -> values .put (path , val ));
89+ for (NodeCache cache : valuesCache .values ()) {
90+ cache .start (true );
14591 }
146-
147- // start the cache
148- zookeeperCache .start ();
14992 }
15093
15194 /**
15295 * Retrieve the configuration object.
15396 */
15497 @ Override
155- public <T > Optional <T > get (String key , Class <T > clazz ) throws IOException {
98+ public synchronized <T > Optional <T > get (String key , Class <T > clazz ) throws IOException {
15699 T result = null ;
157100
158- byte [] val = values .get (key );
159- if (isNotEmpty (val )) {
160- result = deserialize (val , clazz );
101+ NodeCache cache = valuesCache .get (key );
102+ if (cache != null && cache . getCurrentData () != null && isNotEmpty (cache . getCurrentData (). getData () )) {
103+ result = deserialize (cache . getCurrentData (). getData () , clazz );
161104 }
162105
163106 return Optional .ofNullable (result );
@@ -169,22 +112,10 @@ public <T> Optional<T> get(String key, Class<T> clazz) throws IOException {
169112 * Does not close the zookeeperClient that was passed in to the constructor.
170113 */
171114 @ Override
172- public void close () {
173- CloseableUtils .closeQuietly (zookeeperCache );
174- }
175-
176- /**
177- * Retrieves the initial configuration value from Zookeeper
178- * @param zookeeperPath The path in Zookeeper where the value is stored.
179- */
180- private Optional <byte []> fetch (String zookeeperPath ) throws Exception {
181- byte [] result = null ;
182-
183- if (zookeeperClient .checkExists ().forPath (zookeeperPath ) != null ) {
184- result = zookeeperClient .getData ().forPath (zookeeperPath );
115+ public synchronized void close () {
116+ for (NodeCache cache : valuesCache .values ()) {
117+ CloseableUtils .closeQuietly (cache );
185118 }
186-
187- return Optional .ofNullable (result );
188119 }
189120
190121 /**
0 commit comments