Skip to content

Commit

Permalink
Multi-address client: optimize client selection (#2979)
Browse files Browse the repository at this point in the history
Motivation:

`selectClient` attracted my attention after I saw it can take up to 10% on the flame graph.

Modifications:
- Change `CachingKeyFactory` to `UrlKeyFactory`;
- Remove an extra layer of caching for `UrlKey`s;
- Eagerly compute `UrlKey.hashCode` because it's used >1 time;
- Remove allocation of `HostAndPort` from the hot path because it's required only to create a new client once;

Result:

Client selection reduced from `63.788 ± 1.268  ns/op` to `14.512 ± 0.233  ns/op`.
  • Loading branch information
idelpivnitskiy authored Jun 19, 2024
1 parent 831346f commit 7f8a7f4
Showing 1 changed file with 27 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018-2019, 2021-2022 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018-2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,13 +17,10 @@

import io.servicetalk.buffer.api.BufferAllocator;
import io.servicetalk.client.api.ClientGroup;
import io.servicetalk.concurrent.api.AsyncCloseable;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.CompositeCloseable;
import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.internal.SubscribableCompletable;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.http.api.DefaultHttpHeadersFactory;
import io.servicetalk.http.api.DefaultStreamingHttpRequestResponseFactory;
Expand Down Expand Up @@ -60,17 +57,14 @@
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.util.Locale;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import javax.annotation.Nullable;

import static io.netty.handler.codec.http.HttpScheme.HTTP;
import static io.netty.handler.codec.http.HttpScheme.HTTPS;
import static io.servicetalk.buffer.api.CharSequences.caseInsensitiveHashCode;
import static io.servicetalk.concurrent.api.AsyncCloseables.newCompositeCloseable;
import static io.servicetalk.concurrent.api.AsyncCloseables.toListenableAsyncCloseable;
import static io.servicetalk.concurrent.api.Single.defer;
import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverCompleteFromSource;
import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY;
import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy;
import static io.servicetalk.http.api.HttpExecutionStrategies.offloadAll;
Expand Down Expand Up @@ -129,8 +123,7 @@ public StreamingHttpClient buildStreaming() {
final HttpExecutionContext executionContext = executionContextBuilder.build();
final ClientFactory clientFactory =
new ClientFactory(builderFactory, executionContext, singleAddressInitializer);
final CachingKeyFactory keyFactory = closeables.prepend(
new CachingKeyFactory(defaultHttpPort, defaultHttpsPort));
final UrlKeyFactory keyFactory = new UrlKeyFactory(defaultHttpPort, defaultHttpsPort);
final HttpHeadersFactory headersFactory = this.headersFactory;
FilterableStreamingHttpClient urlClient = closeables.prepend(
new StreamingUrlHttpClient(executionContext, keyFactory, clientFactory,
Expand All @@ -151,15 +144,15 @@ public StreamingHttpClient buildStreaming() {
}

/**
* Returns a cached {@link UrlKey} or creates a new one based on {@link StreamingHttpRequest} information.
* Creates a {@link UrlKey} based on {@link HttpRequestMetaData} information and rewrites absolute-form URL into a
* relative-form URL with a "host" header.
*/
private static final class CachingKeyFactory implements AsyncCloseable {
private static final class UrlKeyFactory {

private final ConcurrentMap<String, UrlKey> urlKeyCache = new ConcurrentHashMap<>();
private final int defaultHttpPort;
private final int defaultHttpsPort;

CachingKeyFactory(final int defaultHttpPort, final int defaultHttpsPort) {
UrlKeyFactory(final int defaultHttpPort, final int defaultHttpsPort) {
this.defaultHttpPort = defaultHttpPort;
this.defaultHttpsPort = defaultHttpsPort;
}
Expand All @@ -173,11 +166,7 @@ UrlKey get(final HttpRequestMetaData metaData) throws MalformedURLException {
(HTTPS_SCHEME.equals(scheme) ? defaultHttpsPort : defaultHttpPort);
setHostHeader(metaData, host, parsedPort);
metaData.requestTarget(absoluteToRelativeFormRequestTarget(metaData.requestTarget(), scheme, host));

final String key = scheme + ':' + host + ':' + port;
final UrlKey urlKey = urlKeyCache.get(key);
return urlKey != null ? urlKey : urlKeyCache.computeIfAbsent(key, ignore ->
new UrlKey(scheme, HostAndPort.of(host, port)));
return new UrlKey(scheme, host, port);
}

private static String ensureUrlComponentNonNull(@Nullable final String value,
Expand Down Expand Up @@ -213,29 +202,20 @@ private static String absoluteToRelativeFormRequestTarget(final String requestTa
final int questionMarkIdx = requestTarget.indexOf('?', fromIndex);
return questionMarkIdx < 0 ? "/" : '/' + requestTarget.substring(questionMarkIdx);
}

@Override
public Completable closeAsync() {
// Make a best effort to clear the map. Note that we don't attempt to resolve race conditions between
// closing the client and in flight requests adding Keys to the map. We also don't attempt to remove
// from the map if a request fails, or a request is made after the client is closed.
return new SubscribableCompletable() {
@Override
protected void handleSubscribe(final Subscriber subscriber) {
urlKeyCache.clear();
deliverCompleteFromSource(subscriber);
}
};
}
}

private static final class UrlKey {
final String scheme;
final HostAndPort hostAndPort;
final String host;
final int port;
private final int hashCode;

UrlKey(final String scheme, final HostAndPort hostAndPort) {
UrlKey(final String scheme, final String host, final int port) {
this.scheme = scheme;
this.hostAndPort = hostAndPort;
this.host = host;
this.port = port;
// hashCode is required at least once, but may be necessary multiple times for a single selectClient run
this.hashCode = 31 * (caseInsensitiveHashCode(host) + port) + scheme.hashCode();
}

@Override
Expand All @@ -246,14 +226,13 @@ public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}

final UrlKey urlKey = (UrlKey) o;
return scheme.equals(urlKey.scheme) && hostAndPort.equals(urlKey.hostAndPort);
return port == urlKey.port && host.equalsIgnoreCase(urlKey.host) && scheme.equals(urlKey.scheme);
}

@Override
public int hashCode() {
return 31 * hostAndPort.hashCode() + scheme.hashCode();
return hashCode;
}
}

Expand All @@ -276,8 +255,9 @@ private static final class ClientFactory implements Function<UrlKey, FilterableS

@Override
public StreamingHttpClient apply(final UrlKey urlKey) {
final HostAndPort hostAndPort = HostAndPort.of(urlKey.host, urlKey.port);
final SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> builder =
requireNonNull(builderFactory.apply(urlKey.hostAndPort));
requireNonNull(builderFactory.apply(hostAndPort));

setExecutionContext(builder, executionContext);
if (HTTPS_SCHEME.equals(urlKey.scheme)) {
Expand All @@ -287,7 +267,7 @@ public StreamingHttpClient apply(final UrlKey urlKey) {
builder.appendClientFilter(HttpExecutionStrategyUpdater.INSTANCE);

if (singleAddressInitializer != null) {
singleAddressInitializer.initialize(urlKey.scheme, urlKey.hostAndPort, builder);
singleAddressInitializer.initialize(urlKey.scheme, hostAndPort, builder);
}

return builder.buildStreaming();
Expand Down Expand Up @@ -354,19 +334,14 @@ private static final class StreamingUrlHttpClient implements FilterableStreaming
private final HttpExecutionContext executionContext;
private final StreamingHttpRequestResponseFactory reqRespFactory;
private final ClientGroup<UrlKey, FilterableStreamingHttpClient> group;
private final CachingKeyFactory keyFactory;
private final ListenableAsyncCloseable closeable;
private final UrlKeyFactory keyFactory;

StreamingUrlHttpClient(final HttpExecutionContext executionContext,
final CachingKeyFactory keyFactory, final ClientFactory clientFactory,
final UrlKeyFactory keyFactory, final ClientFactory clientFactory,
final StreamingHttpRequestResponseFactory reqRespFactory) {
this.reqRespFactory = requireNonNull(reqRespFactory);
this.group = ClientGroup.from(clientFactory);
this.keyFactory = keyFactory;
CompositeCloseable compositeCloseable = newCompositeCloseable();
compositeCloseable.append(group);
compositeCloseable.append(keyFactory);
closeable = toListenableAsyncCloseable(compositeCloseable);
this.executionContext = requireNonNull(executionContext);
}

Expand Down Expand Up @@ -412,22 +387,22 @@ public StreamingHttpResponseFactory httpResponseFactory() {

@Override
public Completable onClose() {
return closeable.onClose();
return group.onClose();
}

@Override
public Completable onClosing() {
return closeable.onClosing();
return group.onClosing();
}

@Override
public Completable closeAsync() {
return closeable.closeAsync();
return group.closeAsync();
}

@Override
public Completable closeAsyncGracefully() {
return closeable.closeAsyncGracefully();
return group.closeAsyncGracefully();
}

@Override
Expand Down

0 comments on commit 7f8a7f4

Please sign in to comment.