Skip to content

Commit

Permalink
fix: defer watch until the initial processing of list is complete
Browse files Browse the repository at this point in the history
closes: fabric8io#5953

Signed-off-by: Steve Hawkins <shawkins@redhat.com>
  • Loading branch information
shawkins committed May 13, 2024
1 parent 74dac08 commit 7702980
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand Down Expand Up @@ -102,24 +103,28 @@ public T getByKey(String key) {
}

@Override
public void retainAll(Set<String> nextKeys) {
public CompletableFuture<?> retainAll(Set<String> nextKeys, Runnable cacheStateComplete) {
if (synced.compareAndSet(false, true)) {
deferredAdd.stream().map(cache::getByKey).filter(Objects::nonNull)
.forEach(v -> this.processor.distribute(new ProcessorListener.AddNotification<>(v), false));
.forEach(v -> this.processor.distribute(l -> l.add(new ProcessorListener.AddNotification<>(v)), false));
deferredAdd.clear();
}
List<T> current = cache.list();
if (nextKeys.isEmpty() && current.isEmpty()) {
this.processor.distribute(l -> l.getHandler().onNothing(), false);
return;
}
current.forEach(v -> {
String key = cache.getKey(v);
if (!nextKeys.contains(key)) {
cache.remove(v);
this.processor.distribute(new ProcessorListener.DeleteNotification<>(v, true), false);
this.processor.distribute(l -> l.add(new ProcessorListener.DeleteNotification<>(v, true)), false);
}
});
cacheStateComplete.run();
// the processor will execute serially, when this task is done, we know it's all been processed
CompletableFuture<?> result = new CompletableFuture<>();
this.processor.distribute(l -> result.complete(null), false);
return result;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,10 @@ public CompletableFuture<Void> listSyncAndWatch() {
}
Set<String> nextKeys = new ConcurrentSkipListSet<>();
CompletableFuture<Void> theFuture = processList(nextKeys, null).thenCompose(result -> {
store.retainAll(nextKeys);
final String latestResourceVersion = result.getMetadata().getResourceVersion();
lastSyncResourceVersion = latestResourceVersion;
log.debug("Listing items ({}) for {} at v{}", nextKeys.size(), this, latestResourceVersion);
return startWatcher(latestResourceVersion);
return store.retainAll(nextKeys, () -> lastSyncResourceVersion = latestResourceVersion)
.thenCompose(ignored -> startWatcher(latestResourceVersion));
}).thenAccept(w -> {
if (w != null) {
if (!isStopped()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/**
* Extends a {@link Store}, but also has the responsibility of
Expand Down Expand Up @@ -56,12 +57,13 @@ public interface SyncableStore<T> extends Store<T> {
* Retain only the values with keys in the given set
*
* @param nextKeys to retain
* @param cacheStateComplete to run when the cache is up-to-date
*/
void retainAll(Set<String> nextKeys);
CompletableFuture<?> retainAll(Set<String> nextKeys, Runnable cacheStateComplete);

/**
* Process a batch of updates
*
*
* @param items
*/
void update(List<T> items);
Expand Down

0 comments on commit 7702980

Please sign in to comment.