Skip to content

cache: respond to watches in parallel v2 #85

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.envoyproxy.controlplane.server;

import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.Executor;

/**
* Default implementation of {@link ExecutorGroup} which
* always returns {@link MoreExecutors#directExecutor}.
*/
public class DefaultExecutorGroup implements ExecutorGroup {
/**
* Returns the next {@link Executor} to use, which in this case is
* always {@link MoreExecutors#directExecutor}.
*/
@Override
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  /**
   * Returns the next {@link Executor} to use, which in this case is always {@link MoreExecutors#directExecutor()}.
   */

public Executor next() {
return MoreExecutors.directExecutor();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.slf4j.Logger;
Expand All @@ -38,6 +39,7 @@ public class DiscoveryServer {

private final List<DiscoveryServerCallbacks> callbacks;
private final ConfigWatcher configWatcher;
private final ExecutorGroup executorGroup;
private final AtomicLong streamCount = new AtomicLong();

public DiscoveryServer(ConfigWatcher configWatcher) {
Expand All @@ -54,11 +56,25 @@ public DiscoveryServer(DiscoveryServerCallbacks callbacks, ConfigWatcher configW
* @param configWatcher source of configuration updates
*/
public DiscoveryServer(List<DiscoveryServerCallbacks> callbacks, ConfigWatcher configWatcher) {
this(callbacks, configWatcher, new DefaultExecutorGroup());
}

/**
* Creates the server.
* @param callbacks server callbacks
* @param configWatcher source of configuration updates
* @param executorGroup executor group to use for responding stream requests
*/
public DiscoveryServer(List<DiscoveryServerCallbacks> callbacks,
ConfigWatcher configWatcher,
ExecutorGroup executorGroup) {
Preconditions.checkNotNull(callbacks, "callbacks cannot be null");
Preconditions.checkNotNull(configWatcher, "configWatcher cannot be null");
Preconditions.checkNotNull(executorGroup, "executorGroup cannot be null");

this.callbacks = callbacks;
this.configWatcher = configWatcher;
this.executorGroup = executorGroup;
}

/**
Expand Down Expand Up @@ -149,13 +165,14 @@ private StreamObserver<DiscoveryRequest> createRequestHandler(
String defaultTypeUrl) {

long streamId = streamCount.getAndIncrement();
Executor executor = executorGroup.next();

LOGGER.info("[{}] open stream from {}", streamId, defaultTypeUrl);

callbacks.forEach(cb -> cb.onStreamOpen(streamId, defaultTypeUrl));

final DiscoveryRequestStreamObserver requestStreamObserver =
new DiscoveryRequestStreamObserver(defaultTypeUrl, responseObserver, streamId, ads);
new DiscoveryRequestStreamObserver(defaultTypeUrl, responseObserver, streamId, ads, executor);

if (responseObserver instanceof ServerCallStreamObserver) {
((ServerCallStreamObserver) responseObserver).setOnCancelHandler(requestStreamObserver::onCancelled);
Expand All @@ -173,11 +190,15 @@ private class DiscoveryRequestStreamObserver implements StreamObserver<Discovery
private final StreamObserver<DiscoveryResponse> responseObserver;
private final long streamId;
private final boolean ads;
private final Executor executor;

private AtomicLong streamNonce;

public DiscoveryRequestStreamObserver(String defaultTypeUrl, StreamObserver<DiscoveryResponse> responseObserver,
long streamId, boolean ads) {
public DiscoveryRequestStreamObserver(String defaultTypeUrl,
StreamObserver<DiscoveryResponse> responseObserver,
long streamId,
boolean ads,
Executor executor) {
this.defaultTypeUrl = defaultTypeUrl;
this.responseObserver = responseObserver;
this.streamId = streamId;
Expand All @@ -186,6 +207,7 @@ public DiscoveryRequestStreamObserver(String defaultTypeUrl, StreamObserver<Disc
latestResponse = new ConcurrentHashMap<>(Resources.TYPE_URLS.size());
ackedResources = new ConcurrentHashMap<>(Resources.TYPE_URLS.size());
streamNonce = new AtomicLong();
this.executor = executor;
}

@Override
Expand Down Expand Up @@ -238,7 +260,7 @@ public void onNext(DiscoveryRequest request) {
ads,
request,
ackedResources.getOrDefault(typeUrl, Collections.emptySet()),
r -> send(r, typeUrl));
r -> executor.execute(() -> send(r, typeUrl)));
});

return;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.envoyproxy.controlplane.server;

import java.util.concurrent.Executor;

/**
* The {@link ExecutorGroup} is responsible for providing the {@link Executor}'s to use
* via its {@link #next()} method.
*/
public interface ExecutorGroup {
/**
* Returns the next {@link Executor} to use.
*/
Executor next();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to also pass some contextual information about the request stream here? Would there be use-cases where e.g. a particular type would need to have affinity to a specific Executor?

cc: @snowp

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this as well, but there's not much contextual information we can send at the moment

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think having each stream always using the same executor is good enough for now

}