-
Notifications
You must be signed in to change notification settings - Fork 46
/
Topics.java
516 lines (462 loc) · 17.1 KB
/
Topics.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package com.aws.greengrass.config;
import com.aws.greengrass.dependency.Context;
import com.aws.greengrass.logging.api.Logger;
import com.aws.greengrass.logging.impl.LogManager;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
public class Topics extends Node implements Iterable<Node> {
/**
 * Direct children of this container, keyed by their case-insensitive name.
 * Backed by a {@link ConcurrentHashMap} and exposed as a public field.
 */
public final Map<CaseInsensitiveString, Node> children = new ConcurrentHashMap<>();
/** Logger shared by all {@code Topics} instances. */
private static final Logger logger = LogManager.getLogger(Topics.class);
/**
 * Build a container node and stamp it with the current wall-clock time.
 *
 * @param c context forwarded to the {@link Node} constructor
 * @param n name of this node
 * @param p parent container ({@link #errorNode} passes {@code null} here)
 */
Topics(Context c, String n, Topics p) {
    super(c, n, p);
    this.modtime = System.currentTimeMillis();
}

/**
 * Build a container node, forwarding an explicit timestamp to the {@link Node} constructor.
 *
 * @param c         context forwarded to the superclass
 * @param n         name of this node
 * @param p         parent container
 * @param timestamp timestamp forwarded to the superclass
 */
Topics(Context c, String n, Topics p, long timestamp) {
    super(c, n, p, timestamp);
}
/**
 * Static factory that delegates to the three-argument constructor.
 *
 * @param c context for the new node
 * @param n name of the new node
 * @param p parent of the new node
 * @return a freshly constructed {@code Topics}
 */
public static Topics of(Context c, String n, Topics p) {
    final Topics created = new Topics(c, n, p);
    return created;
}
/**
 * Build a parentless {@code Topics} that carries an error message in a leaf named {@code "error"}.
 *
 * @param context context for the new node
 * @param name    name of the topics node
 * @param message error message stored in the {@code "error"} leaf
 * @return the new node
 */
public static Topics errorNode(Context context, String name, String message) {
    final Topics holder = new Topics(context, name, null);
    final Topic errorLeaf = holder.createLeafChild("error");
    errorLeaf.withNewerValue(0, message);
    return holder;
}
/**
 * Write this node as {@code <name>:<children>} to the given sink.
 *
 * @param a destination to append to
 * @throws IOException if the sink fails
 */
@Override
public void appendTo(Appendable a) throws IOException {
    appendNameTo(a);
    a.append(':');
    // children is a final, non-null field, so toString() matches String.valueOf(children).
    final String renderedChildren = children.toString();
    a.append(renderedChildren);
}
/**
 * Number of direct children of this node.
 *
 * @return size of the children map
 */
public int size() {
    return this.children.size();
}
/**
 * Recursively copy the modtime and all children of another container into this one.
 * Children are created on demand under the same names, then copied in turn.
 *
 * @param from source node; must be a {@code Topics}
 * @throws NullPointerException     if {@code from} or one of its children is null
 * @throws IllegalArgumentException if {@code from} is not a {@code Topics}
 */
@Override
public void copyFrom(Node from) {
    Objects.requireNonNull(from);
    if (!(from instanceof Topics)) {
        throw new IllegalArgumentException(
                "copyFrom: " + from.getFullName() + " is already a leaf, not a container");
    }
    final Topics source = (Topics) from;
    this.modtime = source.modtime;
    for (Node child : source) {
        Objects.requireNonNull(child);
        final String childName = child.getName();
        if (child instanceof Topic) {
            createLeafChild(childName).copyFrom(child);
        } else {
            createInteriorChild(childName).copyFrom(child);
        }
    }
}
/**
 * Look up a direct child by name (case-insensitive key).
 *
 * @param name child name
 * @return the child, or {@code null} if there is no entry for that name
 */
public Node getChild(String name) {
    final CaseInsensitiveString key = new CaseInsensitiveString(name);
    return children.get(key);
}
/**
 * Create a leaf Topic under this Topics with the given name, passing a timestamp of 0
 * to any newly created Topic. Returns the existing leaf if there already is one.
 *
 * @param name name of the leaf node
 * @return the new or pre-existing leaf
 * @throws IllegalArgumentException if a container already exists under that name
 */
public Topic createLeafChild(String name) {
    return createLeafChild(name, 0L);
}

/**
 * Create a leaf Topic under this Topics with the given name and timestamp.
 * Returns the existing leaf if there already is one.
 *
 * @param name      name of the leaf node
 * @param timestamp timestamp passed to a newly created leaf
 * @return the new or pre-existing leaf
 * @throws IllegalArgumentException if a container already exists under that name
 */
public Topic createLeafChild(String name, long timestamp) {
    return createLeafChild(new CaseInsensitiveString(name), timestamp);
}

/**
 * Shared implementation: insert a new Topic if the key is absent, and queue a
 * {@code childChanged} notification for the insertion on the publish queue.
 */
private Topic createLeafChild(CaseInsensitiveString name, long timestamp) {
    final Node existingOrNew = children.computeIfAbsent(name, key -> {
        final Topic created = new Topic(context, key.toString(), this, timestamp);
        context.runOnPublishQueue(() -> childChanged(WhatHappened.childChanged, created));
        return created;
    });
    if (!(existingOrNew instanceof Topic)) {
        throw new IllegalArgumentException(name + " in "
                + getFullName() + " is already a container, cannot become a leaf");
    }
    return (Topic) existingOrNew;
}
/**
 * Create an interior Topics node with the provided name, using the current time as the
 * timestamp for a newly created node. Returns the existing node if there already is one.
 *
 * @param name name for the new node
 * @return the new or pre-existing container
 * @throws IllegalArgumentException if a leaf already exists under that name
 */
public Topics createInteriorChild(String name) {
    return createInteriorChild(name, System.currentTimeMillis());
}

/**
 * Create an interior Topics node with the provided name and timestamp.
 * Returns the existing node if there already is one.
 *
 * @param name      name for the new node
 * @param timestamp timestamp passed to a newly created node
 * @return the new or pre-existing container
 * @throws IllegalArgumentException if a leaf already exists under that name
 */
public Topics createInteriorChild(String name, long timestamp) {
    return createInteriorChild(new CaseInsensitiveString(name), timestamp);
}

/**
 * Shared implementation: insert a new Topics if the key is absent, and queue an
 * {@code interiorAdded} notification for the insertion on the publish queue.
 */
private Topics createInteriorChild(CaseInsensitiveString name, long timestamp) {
    final Node existingOrNew = children.computeIfAbsent(name, key -> {
        final Topics created = new Topics(context, key.toString(), this, timestamp);
        context.runOnPublishQueue(() -> childChanged(WhatHappened.interiorAdded, created));
        return created;
    });
    if (!(existingOrNew instanceof Topics)) {
        throw new IllegalArgumentException(name + " in "
                + getFullName() + " is already a leaf, cannot become a container");
    }
    return (Topics) existingOrNew;
}
/**
 * Find a direct child that is a container.
 *
 * @param name child name
 * @return the child if it exists and is a {@code Topics}; otherwise {@code null}
 */
public Topics findInteriorChild(String name) {
    final Node candidate = getChild(name);
    if (candidate instanceof Topics) {
        return (Topics) candidate;
    }
    return null;
}
/**
 * Find a direct child that is a leaf.
 *
 * @param name child name
 * @return the child if it exists and is a {@code Topic}; otherwise {@code null}
 */
public Topic findLeafChild(String name) {
    final Node candidate = getChild(name);
    if (candidate instanceof Topic) {
        return (Topic) candidate;
    }
    return null;
}
/**
 * Find, and create if missing, a topic (a name/value pair) in the config
 * file. Never returns null.
 *
 * <p>Missing interior nodes are created via {@link #createInteriorChild(String)} and the
 * leaf via {@link #createLeafChild(String)}.
 *
 * @param path String[] of node names to traverse to find or create the Topic; the last
 *             element names the leaf, so at least one element is required
 * @return the existing or newly created leaf
 * @throws IllegalArgumentException if {@code path} is null or empty, or if a node on the
 *                                  path already exists with the wrong kind (leaf vs container)
 */
public Topic lookup(String... path) {
    if (path == null || path.length == 0) {
        // Without this guard the leaf index below would be -1.
        throw new IllegalArgumentException(
                "lookup requires at least one path element under " + getFullName());
    }
    final int leafIndex = path.length - 1;
    Topics cursor = this;
    for (int i = 0; i < leafIndex; i++) {
        cursor = cursor.createInteriorChild(path[i]);
    }
    return cursor.createLeafChild(path[leafIndex]);
}

/**
 * Find, and create if missing, a topic (a name/value pair) in the config
 * file. Never returns null.
 *
 * @param timestamp timestamp passed to every node created along the way
 * @param path      String[] of node names to traverse to find or create the Topic; the last
 *                  element names the leaf, so at least one element is required
 * @return the existing or newly created leaf
 * @throws IllegalArgumentException if {@code path} is null or empty, or if a node on the
 *                                  path already exists with the wrong kind (leaf vs container)
 */
public Topic lookup(long timestamp, String... path) {
    if (path == null || path.length == 0) {
        // Without this guard the leaf index below would be -1.
        throw new IllegalArgumentException(
                "lookup requires at least one path element under " + getFullName());
    }
    final int leafIndex = path.length - 1;
    Topics cursor = this;
    for (int i = 0; i < leafIndex; i++) {
        cursor = cursor.createInteriorChild(path[i], timestamp);
    }
    return cursor.createLeafChild(path[leafIndex], timestamp);
}
/**
 * Find, and create if missing, a list of topics (name/value pairs) in the
 * config file. Never returns null. Uses the current time as the timestamp
 * for any node that has to be created.
 *
 * @param path String[] of node names to traverse to find or create the Topics
 * @return the container at the end of the path (this node when the path is empty)
 */
public Topics lookupTopics(String... path) {
    final long now = System.currentTimeMillis();
    return lookupTopics(now, path);
}

/**
 * Find, and create if missing, a list of topics (name/value pairs) in the
 * config file. Never returns null.
 *
 * @param timestamp timestamp passed to every node created along the way
 * @param path      String[] of node names to traverse to find or create the Topics
 * @return the container at the end of the path (this node when the path is empty)
 */
public Topics lookupTopics(long timestamp, String... path) {
    Topics cursor = this;
    for (int depth = 0; depth < path.length; depth++) {
        cursor = cursor.createInteriorChild(path[depth], timestamp);
    }
    return cursor;
}
/**
 * Find, but do not create if missing, a topic (a name/value pair) in the
 * config file. Returns null if missing.
 *
 * <p>A null or empty path cannot name a leaf, so it is treated as "missing" and yields
 * {@code null} rather than an index error.
 *
 * @param path String[] of node names to traverse to find the Topic; the last element
 *             names the leaf
 * @return the leaf at the end of the path, or {@code null} if any step is absent or of
 *         the wrong kind
 */
public Topic find(String... path) {
    if (path == null || path.length == 0) {
        return null;
    }
    final int leafIndex = path.length - 1;
    Topics cursor = this;
    for (int i = 0; i < leafIndex; i++) {
        cursor = cursor.findInteriorChild(path[i]);
        if (cursor == null) {
            // An intermediate container is missing (or is a leaf): nothing to find.
            return null;
        }
    }
    return cursor.findLeafChild(path[leafIndex]);
}
/**
 * Find, but do not create if missing, a topic (a name/value pair) in the
 * config file. When the topic exists the result of its {@code getOnce()} is returned;
 * otherwise the supplied default is returned.
 *
 * @param defaultV default value if the Topic was not found
 * @param path     String[] of node names to traverse to find the Topic
 * @return the topic's {@code getOnce()} result, or {@code defaultV} when {@link #find} returns null
 */
public Object findOrDefault(Object defaultV, String... path) {
    final Topic found = find(path);
    return found == null ? defaultV : found.getOnce();
}
/**
 * Find, but do not create if missing, a container in the config file. Returns null if missing.
 *
 * @param path String[] of node names to traverse to the Topics
 * @return the container at the end of the path (this node when the path is empty), or
 *         {@code null} if any step is absent or is not a container
 */
public Topics findTopics(String... path) {
    Topics cursor = this;
    for (String segment : path) {
        if (cursor == null) {
            break;
        }
        cursor = cursor.findInteriorChild(segment);
    }
    return cursor;
}
/**
 * Find, but do not create if missing, a Node (Topic or Topics) in the config file. Returns null if missing.
 *
 * @param path String[] of node names to traverse to find the Node
 * @return this node for an empty path; otherwise the node found at the end of the path,
 *         or {@code null} if any step is absent
 */
public Node findNode(String... path) {
    if (path.length == 0) {
        return this;
    }
    final int lastIndex = path.length - 1;
    Topics cursor = this;
    for (int i = 0; i < lastIndex; i++) {
        cursor = cursor.findInteriorChild(path[i]);
        if (cursor == null) {
            return null;
        }
    }
    return cursor.getChild(path[lastIndex]);
}
/**
 * Add the given map to this Topics tree.
 *
 * <p>Every entry of the map is applied to the child of the same (case-insensitive) name.
 * Children that existed before the call but are absent from the map are removed when the
 * merge behavior resolved for them is {@code REPLACE}. A null map is logged and ignored.
 *
 * @param map           map to merge in
 * @param mergeBehavior mergeBehavior
 */
@SuppressFBWarnings("NP_NULL_ON_SOME_PATH")
public void updateFromMap(Map<String, Object> map, @NonNull UpdateBehaviorTree mergeBehavior) {
    if (map == null) {
        logger.atInfo().kv("node", getFullName()).log("Null map received in updateFromMap(), ignoring.");
        return;
    }
    // Snapshot of the names present before the merge; whatever the map does not mention stays in the set.
    final Set<CaseInsensitiveString> childrenToRemove = new HashSet<>(children.keySet());

    for (Map.Entry<String, Object> entry : map.entrySet()) {
        final CaseInsensitiveString key = new CaseInsensitiveString(entry.getKey());
        childrenToRemove.remove(key);
        updateChild(key, entry.getValue(), mergeBehavior);
    }

    for (CaseInsensitiveString childName : childrenToRemove) {
        final UpdateBehaviorTree childMergeBehavior = mergeBehavior.getChildBehavior(childName.toString());
        // remove the existing child only if its merge behavior is REPLACE
        if (childMergeBehavior.getBehavior() != UpdateBehaviorTree.UpdateBehavior.REPLACE) {
            continue;
        }
        final Node stale = children.get(childName);
        // The child may already be gone from the concurrent map since the snapshot was taken;
        // remove(null) would dereference null.
        if (stale != null) {
            remove(stale);
        }
    }
}
/**
 * Apply one map entry to the child stored under {@code key}.
 *
 * <p>A {@link Map} value targets a container child and any other value targets a leaf.
 * When the existing child is of the other kind it is removed, a child of the right kind is
 * created, and the old child's watchers are re-attached before the value is applied.
 *
 * <p>NOTE(review): the in-place container path creates a missing node via
 * {@code createInteriorChild(String)} (current time), while the replacement path uses
 * {@code mergeBehavior.getTimestampToUse()} — confirm this difference is intended.
 *
 * @param key           case-insensitive child name
 * @param value         new value; a Map means "container"
 * @param mergeBehavior behavior tree of this node; the child's subtree is resolved from it
 */
private void updateChild(CaseInsensitiveString key, Object value,
                         @NonNull UpdateBehaviorTree mergeBehavior) {
    final UpdateBehaviorTree childMergeBehavior = mergeBehavior.getChildBehavior(key.toString());
    final Node existingChild = children.get(key);

    if (value instanceof Map) {
        // Incoming value is a container.
        final boolean existingIsWrongKind = existingChild != null && !(existingChild instanceof Topics);
        if (existingIsWrongKind) {
            remove(existingChild);
            final Topics replacement = createInteriorChild(key.toString(), mergeBehavior.getTimestampToUse());
            for (Watcher carriedOver : existingChild.watchers) {
                replacement.addWatcher(carriedOver);
            }
            replacement.updateFromMap((Map) value, childMergeBehavior);
        } else {
            createInteriorChild(key.toString()).updateFromMap((Map) value, childMergeBehavior);
        }
        return;
    }

    // Incoming value is a leaf.
    final boolean existingIsWrongKind = existingChild != null && !(existingChild instanceof Topic);
    final Topic target;
    if (existingIsWrongKind) {
        remove(existingChild);
        target = createLeafChild(key.toString());
        for (Watcher carriedOver : existingChild.watchers) {
            target.addWatcher(carriedOver);
        }
    } else {
        target = createLeafChild(key.toString());
    }
    target.withNewerValue(childMergeBehavior.getTimestampToUse(), value, false, true);
}
/**
 * Two {@code Topics} are equal when they have the same number of children and every child
 * of this node equals the child stored under the same key in the other node.
 *
 * @param o object to compare with
 * @return {@code true} if {@code o} is a {@code Topics} with equal children
 */
@Override
public boolean equals(Object o) {
    if (!(o instanceof Topics)) {
        return false;
    }
    final Topics other = (Topics) o;
    if (children.size() != other.children.size()) {
        return false;
    }
    for (Map.Entry<CaseInsensitiveString, Node> entry : children.entrySet()) {
        final Object counterpart = other.children.get(entry.getKey());
        if (!Objects.equals(entry.getValue(), counterpart)) {
            return false;
        }
    }
    return true;
}
/**
 * Hash derived solely from the children map.
 *
 * @return hash code of {@code children}
 */
@Override
public int hashCode() {
    // children is a final, non-null field, so this matches Objects.hashCode(children).
    return children.hashCode();
}
/**
 * Iterate over the direct children of this node.
 *
 * @return iterator over the values of the children map
 */
@Override
@Nonnull
public Iterator<Node> iterator() {
    return this.children.values().iterator();
}
/**
 * Forward the callback to every direct child's {@code deepForEachTopic}.
 *
 * @param f callback handed down to the children
 */
@Override
public void deepForEachTopic(Consumer<Topic> f) {
    for (Node child : children.values()) {
        child.deepForEachTopic(f);
    }
}
/**
 * Visit the children first (each with the behavior subtree resolved by its name),
 * then invoke the callback on this node with this level's behavior.
 *
 * @param f    callback receiving a node and the update behavior that applies to it
 * @param tree behavior tree for this node
 */
@Override
public void deepForEach(BiConsumer<Node, UpdateBehaviorTree.UpdateBehavior> f, UpdateBehaviorTree tree) {
    for (Node child : children.values()) {
        final UpdateBehaviorTree childTree = tree.getChildBehavior(child.getName());
        child.deepForEach(f, childTree);
    }
    f.accept(this, tree.getBehavior());
}
/**
 * Remove a node from this node's children.
 *
 * <p>The node is removed only if it is the value currently stored under its name. On
 * success, a {@code removed} event for the node and a {@code childRemoved} event for this
 * container are queued on the publish queue. Otherwise an error is logged.
 *
 * @param n node to remove
 */
public void remove(Node n) {
    final CaseInsensitiveString key = new CaseInsensitiveString(n.getName());
    final boolean wasRemoved = children.remove(key, n);
    if (wasRemoved) {
        context.runOnPublishQueue(() -> {
            n.fire(WhatHappened.removed);
            this.childChanged(WhatHappened.childRemoved, n);
        });
        return;
    }
    logger.atError("config-node-child-remove-error").kv("thisNode", toString()).kv("childNode", n.getName())
            .log();
}
/**
 * Replace this node's content with the provided map and wait for the publish queue to drain.
 * The update runs on the publish queue with {@code REPLACE} behavior and the time at which
 * the queued task executes.
 *
 * @param newValue Map of new values for this topics
 */
public void replaceAndWait(Map<String, Object> newValue) {
    context.runOnPublishQueueAndWait(() -> {
        final UpdateBehaviorTree replaceEverything =
                new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE, System.currentTimeMillis());
        updateFromMap(newValue, replaceEverything);
    });
    context.waitForPublishQueueToClear();
}
/**
 * Notify this node's {@code ChildChanged} watchers of an event, then update bookkeeping.
 *
 * <p>Watcher failures are logged and do not stop the remaining watchers. For a
 * {@code removed} event the removal is fired on every child and processing ends there.
 * For any other event this node's modtime is refreshed and the event is passed to the
 * parent when {@code parentNeedsToKnow()} says so.
 *
 * @param what  kind of change
 * @param child affected child, or {@code null} when the event concerns this node itself
 */
@SuppressWarnings("PMD.AvoidCatchingThrowable")
protected void childChanged(WhatHappened what, Node child) {
    for (Watcher watcher : watchers) {
        if (!(watcher instanceof ChildChanged)) {
            continue;
        }
        try {
            ((ChildChanged) watcher).childChanged(what, child);
        } catch (Throwable t) {
            logger.atError().log("Exception while notifying that {} changed", child, t);
        }
    }

    if (what.equals(WhatHappened.removed)) {
        // This node is going away: tell every child, and do not bubble further.
        for (Node descendant : children.values()) {
            descendant.fire(WhatHappened.removed);
        }
        return;
    }

    final boolean adoptChildTime = child != null && (child.modtime > this.modtime || children.isEmpty());
    if (adoptChildTime) {
        this.modtime = child.modtime;
    } else {
        // Otherwise take the newest modtime among the current children, falling back to the given child.
        final Optional<Node> newestChild =
                children.values().stream().max(Comparator.comparingLong(candidate -> candidate.modtime));
        final Node reference = newestChild.orElse(child);
        if (reference != null) {
            this.modtime = reference.modtime;
        }
    }

    if (parentNeedsToKnow()) {
        parent.childChanged(what, child);
    }
}
/**
 * Raise an event on this node itself by delegating to
 * {@code childChanged(what, null)}; the null child marks the event as not tied to a child.
 *
 * @param what kind of change to announce
 */
@Override
protected void fire(WhatHappened what) {
childChanged(what, null);
}
/**
 * Subscribe to receive updates from this node and its children.
 * When {@code addWatcher} reports true, the listener is called once immediately with
 * {@code initialized} and a null child.
 *
 * @param cc listener
 * @return this
 */
public Topics subscribe(ChildChanged cc) {
    final boolean newlyRegistered = addWatcher(cc);
    if (newlyRegistered) {
        cc.childChanged(WhatHappened.initialized, null);
    }
    return this;
}
/**
 * Convert this subtree into a plain map ordered case-insensitively by child name.
 * Children whose name starts with {@code '_'} are left out.
 *
 * @return map from child name to that child's {@code toPOJO()} result
 */
@Override
public Map<String, Object> toPOJO() {
    final Map<String, Object> map = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
    for (Node child : children.values()) {
        final String childName = child.getName();
        if (childName.startsWith("_")) {
            // Don't save entries whose name starts in '_'
            continue;
        }
        map.put(childName, child.toPOJO());
    }
    return map;
}
/**
 * Whether this node has no direct children.
 *
 * @return {@code true} if the children map is empty
 */
public boolean isEmpty() {
    return this.children.isEmpty();
}
/**
 * Accessor for the context this node holds.
 *
 * @return the {@code context} field
 */
public Context getContext() {
    return context;
}
/**
 * Call a callback on every Topics node in this subtree which has no children.
 * A childless node receives the callback itself; otherwise the search recurses into
 * child containers only, skipping leaf children.
 *
 * @param f callback to be called with the Topics
 */
public void forEachChildlessTopics(Consumer<Topics> f) {
    if (children.isEmpty()) {
        f.accept(this);
        return;
    }
    for (Node child : children.values()) {
        if (child instanceof Topics) {
            ((Topics) child).forEachChildlessTopics(f);
        }
    }
}
}