2020
2121package org .apache .metron .common .configuration .manager ;
2222
23+ import com .google .common .cache .Cache ;
24+ import com .google .common .cache .CacheBuilder ;
2325import org .apache .curator .framework .CuratorFramework ;
2426import org .apache .curator .framework .recipes .cache .NodeCache ;
2527import org .apache .curator .utils .CloseableUtils ;
3133import java .util .HashMap ;
3234import java .util .Map ;
3335import java .util .Optional ;
36+ import java .util .concurrent .ExecutionException ;
37+ import java .util .concurrent .TimeUnit ;
3438
3539import static org .apache .commons .lang .ArrayUtils .isNotEmpty ;
40+ import static org .apache .metron .common .utils .ConversionUtils .convert ;
3641
3742/**
3843 * Responsible for managing configuration values that are created, persisted, and updated
@@ -46,66 +51,109 @@ public class ZkConfigurationManager implements ConfigurationManager {
4651 private CuratorFramework zookeeperClient ;
4752
4853 /**
49- * The configuration values under management. Maps the path to the configuration values
50- * in Zookeeper to the cache of its values.
54+ * Maps a zookeeper path to the Curator cache used to retrieve the raw data from Zookeeper.
5155 */
52- private Map <String , NodeCache > valuesCache ;
56+ private Map <String , NodeCache > zookeeperCache ;
57+
58+ /**
59+ * A cache of the deserialized objects stored in Zookeeper. This cache helps limit how
60+ * frequently we need to deserialize the raw data stored in Zookeeper.
61+ */
62+ private Cache <String , Object > configurationCache ;
5363
5464 /**
5565 * @param zookeeperClient The client used to communicate with Zookeeper. The client is not
5666 * closed. It must be managed externally.
5767 */
5868 public ZkConfigurationManager (CuratorFramework zookeeperClient ) {
69+ this (zookeeperClient , 15 , TimeUnit .MINUTES );
70+ }
71+
72+ /**
73+ * @param zookeeperClient The client used to communicate with Zookeeper. The client is not
74+ * closed. It must be managed externally.
75+ */
76+ public ZkConfigurationManager (CuratorFramework zookeeperClient , long cacheExpiration , TimeUnit cacheExpirationUnits ) {
5977 this .zookeeperClient = zookeeperClient ;
60- this .valuesCache = Collections .synchronizedMap (new HashMap <>());
78+ this .zookeeperCache = Collections .synchronizedMap (new HashMap <>());
79+ this .configurationCache = CacheBuilder .newBuilder ()
80+ .expireAfterAccess (cacheExpiration , cacheExpirationUnits )
81+ .build ();
6182 }
6283
6384 /**
6485 * Define the paths within Zookeeper that contains configuration values that need managed.
65- * @param zookeeperPath The Zookeeper path.
86+ * @param zkPath The Zookeeper path.
6687 */
6788 @ Override
68- public ZkConfigurationManager with (String zookeeperPath ) {
69- NodeCache cache = new NodeCache (zookeeperClient , zookeeperPath );
70- valuesCache .put (zookeeperPath , cache );
89+ public <T > ZkConfigurationManager with (String zkPath , Class <T > clazz ) {
90+
91+ NodeCache cache = new NodeCache (zookeeperClient , zkPath );
92+ cache .getListenable ().addListener (() -> configurationCache .invalidate (zkPath ));
93+ zookeeperCache .put (zkPath , cache );
94+
7195 return this ;
7296 }
7397
7498 /**
75- * Open a connection to Zookeeper and retrieve the initial configuration value .
99+ * Open a connection to Zookeeper.
76100 */
77101 @ Override
78102 public synchronized ZkConfigurationManager open () throws IOException {
79103 try {
80- doOpen ();
104+ synchronized (zookeeperCache ) {
105+ for (NodeCache cache : zookeeperCache .values ()) {
106+ cache .start (true );
107+ }
108+ }
81109 } catch (Exception e ) {
82110 throw new IOException (e );
83111 }
84112
85113 return this ;
86114 }
87115
88- private void doOpen () throws Exception {
89- synchronized (valuesCache ) {
90- for (NodeCache cache : valuesCache .values ()) {
91- cache .start (true );
116+ /**
117+ * Retrieve the configuration object.
118+ * @param zkPath A key to identify which configuration value.
119+ * @param clazz The expected type of the configuration value.
120+ * @param <T> The expected type of the configuration value.
121+ */
122+ @ Override
123+ public <T > Optional <T > get (String zkPath , Class <T > clazz ) throws IOException {
124+ try {
125+ Object value = configurationCache .get (zkPath , () -> doGet (zkPath , clazz ));
126+ return Optional .ofNullable (convert (value , clazz ));
127+
128+ } catch (ExecutionException e ) {
129+
130+ // exception used to signal no value exists for this key; cannot use null with a guava cache
131+ if (e .getCause () instanceof ZkPathNotFoundException ) {
132+ return Optional .empty ();
133+
134+ } else {
135+ throw new IOException (e );
92136 }
93137 }
94138 }
95139
96140 /**
97- * Retrieve the configuration object.
141+ * Retrieves the serialized data stored in Zookeeper and deserializes it.
142+ * @param zkPath The key of the configuration value to get.
143+ * @param clazz The type of configuration value expected.
144+ * @param <T> The type of configuration value expected.
145+ * @return The configuration value. If value does not exist, an exception is thrown.
146+ * @throws ZkPathNotFoundException If no such path exists in Zookeeper.
147+ * @throws IOException
98148 */
99- @ Override
100- public synchronized <T > Optional <T > get (String key , Class <T > clazz ) throws IOException {
101- T result = null ;
102-
103- NodeCache cache = valuesCache .get (key );
149+ public synchronized <T > T doGet (String zkPath , Class <T > clazz ) throws ZkPathNotFoundException , IOException {
150+ NodeCache cache = zookeeperCache .get (zkPath );
104151 if (cache != null && cache .getCurrentData () != null && isNotEmpty (cache .getCurrentData ().getData ())) {
105- result = deserialize (cache .getCurrentData ().getData (), clazz );
106- }
152+ return deserialize (cache .getCurrentData ().getData (), clazz );
107153
108- return Optional .ofNullable (result );
154+ } else {
155+ throw new ZkPathNotFoundException (zkPath );
156+ }
109157 }
110158
111159 /**
@@ -115,8 +163,8 @@ public synchronized <T> Optional<T> get(String key, Class<T> clazz) throws IOExc
115163 */
116164 @ Override
117165 public synchronized void close () {
118- synchronized (valuesCache ) {
119- for (NodeCache cache : valuesCache .values ()) {
166+ synchronized (zookeeperCache ) {
167+ for (NodeCache cache : zookeeperCache .values ()) {
120168 CloseableUtils .closeQuietly (cache );
121169 }
122170 }
@@ -130,4 +178,13 @@ public synchronized void close() {
130178 protected static <T > T deserialize (byte [] raw , Class <T > clazz ) throws IOException {
131179 return JSONUtils .INSTANCE .load (new ByteArrayInputStream (raw ), clazz );
132180 }
181+
182+ /**
183+ * Indicates that a zookeeper path does not exist.
184+ */
185+ public static class ZkPathNotFoundException extends Exception {
186+ public ZkPathNotFoundException (String zkPath ) {
187+ super ("zookeeper path does not exist: " + zkPath );
188+ }
189+ }
133190}
0 commit comments