Skip to content

Commit

Permalink
[improve][client] Add maxConnectionsPerHost and connectionMaxIdleSeco…
Browse files Browse the repository at this point in the history
…nds to PulsarAdminBuilder (#22541)
  • Loading branch information
lhotari authored Aug 8, 2024
1 parent 3560ddb commit 3e7dbb4
Show file tree
Hide file tree
Showing 22 changed files with 675 additions and 220 deletions.
20 changes: 11 additions & 9 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,8 @@ The Apache Software License, Version 2.0
- io.opentelemetry.instrumentation-opentelemetry-runtime-telemetry-java17-1.33.3-alpha.jar
- io.opentelemetry.instrumentation-opentelemetry-runtime-telemetry-java8-1.33.3-alpha.jar
- io.opentelemetry.semconv-opentelemetry-semconv-1.25.0-alpha.jar
* Spotify completable-futures
- com.spotify-completable-futures-0.3.6.jar

BSD 3-clause "New" or "Revised" License
* Google auth library
Expand Down Expand Up @@ -580,15 +582,15 @@ CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
- org.glassfish.hk2-osgi-resource-locator-1.0.3.jar
- org.glassfish.hk2.external-aopalliance-repackaged-2.6.1.jar
* Jersey
- org.glassfish.jersey.containers-jersey-container-servlet-2.41.jar
- org.glassfish.jersey.containers-jersey-container-servlet-core-2.41.jar
- org.glassfish.jersey.core-jersey-client-2.41.jar
- org.glassfish.jersey.core-jersey-common-2.41.jar
- org.glassfish.jersey.core-jersey-server-2.41.jar
- org.glassfish.jersey.ext-jersey-entity-filtering-2.41.jar
- org.glassfish.jersey.media-jersey-media-json-jackson-2.41.jar
- org.glassfish.jersey.media-jersey-media-multipart-2.41.jar
- org.glassfish.jersey.inject-jersey-hk2-2.41.jar
- org.glassfish.jersey.containers-jersey-container-servlet-2.42.jar
- org.glassfish.jersey.containers-jersey-container-servlet-core-2.42.jar
- org.glassfish.jersey.core-jersey-client-2.42.jar
- org.glassfish.jersey.core-jersey-common-2.42.jar
- org.glassfish.jersey.core-jersey-server-2.42.jar
- org.glassfish.jersey.ext-jersey-entity-filtering-2.42.jar
- org.glassfish.jersey.media-jersey-media-json-jackson-2.42.jar
- org.glassfish.jersey.media-jersey-media-multipart-2.42.jar
- org.glassfish.jersey.inject-jersey-hk2-2.42.jar
* Mimepull -- org.jvnet.mimepull-mimepull-1.9.15.jar

Eclipse Distribution License 1.0 -- ../licenses/LICENSE-EDL-1.0.txt
Expand Down
13 changes: 7 additions & 6 deletions distribution/shell/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ The Apache Software License, Version 2.0
- avro-1.11.3.jar
- avro-protobuf-1.11.3.jar
* RE2j -- re2j-1.7.jar
* Spotify completable-futures -- completable-futures-0.3.6.jar

BSD 3-clause "New" or "Revised" License
* JSR305 -- jsr305-3.0.2.jar -- ../licenses/LICENSE-JSR305.txt
Expand Down Expand Up @@ -446,12 +447,12 @@ CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
- aopalliance-repackaged-2.6.1.jar
- osgi-resource-locator-1.0.3.jar
* Jersey
- jersey-client-2.41.jar
- jersey-common-2.41.jar
- jersey-entity-filtering-2.41.jar
- jersey-media-json-jackson-2.41.jar
- jersey-media-multipart-2.41.jar
- jersey-hk2-2.41.jar
- jersey-client-2.42.jar
- jersey-common-2.42.jar
- jersey-entity-filtering-2.42.jar
- jersey-media-json-jackson-2.42.jar
- jersey-media-multipart-2.42.jar
- jersey-hk2-2.42.jar
* Mimepull -- mimepull-1.9.15.jar

Eclipse Distribution License 1.0 -- ../licenses/LICENSE-EDL-1.0.txt
Expand Down
9 changes: 8 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ flexible messaging model and an intuitive client API.</description>
<netty-iouring.version>0.0.24.Final</netty-iouring.version>
<jetty.version>9.4.54.v20240208</jetty.version>
<conscrypt.version>2.5.2</conscrypt.version>
<jersey.version>2.41</jersey.version>
<jersey.version>2.42</jersey.version>
<athenz.version>1.10.50</athenz.version>
<prometheus.version>0.16.0</prometheus.version>
<vertx.version>4.5.8</vertx.version>
Expand Down Expand Up @@ -266,6 +266,7 @@ flexible messaging model and an intuitive client API.</description>
<opentelemetry.semconv.version>1.25.0-alpha</opentelemetry.semconv.version>
<picocli.version>4.7.5</picocli.version>
<re2j.version>1.7</re2j.version>
<completable-futures.version>0.3.6</completable-futures.version>
<failsafe.version>3.3.2</failsafe.version>

<!-- test dependencies -->
Expand Down Expand Up @@ -665,6 +666,12 @@ flexible messaging model and an intuitive client API.</description>
<version>${re2j.version}</version>
</dependency>

<dependency>
<groupId>com.spotify</groupId>
<artifactId>completable-futures</artifactId>
<version>${completable-futures.version}</version>
</dependency>

<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,4 +336,30 @@ PulsarAdminBuilder authentication(String authPluginClassName, Map<String, String
* requests
*/
PulsarAdminBuilder acceptGzipCompression(boolean acceptGzipCompression);

/**
* Configures the maximum number of connections that the client library will establish with a single host.
* <p>
* By default, the connection pool maintains up to 16 connections to a single host. This method allows you to
* modify this default behavior and limit the number of connections.
* <p>
* This setting can be useful in scenarios where you want to limit the resources used by the client library,
* or control the level of parallelism for operations so that a single client does not overwhelm
* the Pulsar cluster with too many concurrent connections.
*
* @param maxConnectionsPerHost the maximum number of connections to establish per host. Set to <= 0 to disable
* the limit.
* @return the PulsarAdminBuilder instance, allowing for method chaining
*/
PulsarAdminBuilder maxConnectionsPerHost(int maxConnectionsPerHost);

/**
* Sets the maximum idle time for a pooled connection. If a connection is idle for more than the specified
* amount of seconds, it will be released back to the connection pool.
* Defaults to 25 seconds.
*
* @param connectionMaxIdleSeconds the maximum idle time, in seconds, for a pooled connection
* @return the PulsarAdminBuilder instance
*/
PulsarAdminBuilder connectionMaxIdleSeconds(int connectionMaxIdleSeconds);
}
5 changes: 5 additions & 0 deletions pulsar-client-admin-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
<include>com.google.guava:guava</include>
<include>com.google.code.gson:gson</include>
<include>com.google.re2j:re2j</include>
<include>com.spotify:completable-futures</include>
<include>com.fasterxml.jackson.*:*</include>
<include>io.netty:*</include>
<include>io.netty.incubator:*</include>
Expand Down Expand Up @@ -192,6 +193,10 @@
<exclude>com.google.protobuf.*</exclude>
</excludes>
</relocation>
<relocation>
<pattern>com.spotify.futures</pattern>
<shadedPattern>org.apache.pulsar.shade.com.spotify.futures</shadedPattern>
</relocation>
<relocation>
<pattern>com.fasterxml.jackson</pattern>
<shadedPattern>org.apache.pulsar.shade.com.fasterxml.jackson</shadedPattern>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.asynchttpclient.Dsl.post;
import static org.asynchttpclient.Dsl.put;
import com.google.gson.Gson;
import io.netty.handler.codec.http.HttpHeaders;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
Expand All @@ -41,6 +40,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.Functions;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionDefinition;
Expand All @@ -54,10 +54,8 @@
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.FunctionStatsImpl;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.AsyncCompletionHandlerBase;
import org.asynchttpclient.HttpResponseBodyPart;
import org.asynchttpclient.HttpResponseStatus;
import org.asynchttpclient.RequestBuilder;
import org.asynchttpclient.request.body.multipart.ByteArrayPart;
import org.asynchttpclient.request.body.multipart.FilePart;
Expand All @@ -70,12 +68,14 @@
public class FunctionsImpl extends ComponentResource implements Functions {

private final WebTarget functions;
private final AsyncHttpClient asyncHttpClient;
private final AsyncHttpRequestExecutor asyncHttpRequestExecutor;

public FunctionsImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient, long requestTimeoutMs) {
public FunctionsImpl(WebTarget web, Authentication auth,
AsyncHttpRequestExecutor asyncHttpRequestExecutor,
long requestTimeoutMs) {
super(auth, requestTimeoutMs);
this.functions = web.path("/admin/v3/functions");
this.asyncHttpClient = asyncHttpClient;
this.asyncHttpRequestExecutor = asyncHttpRequestExecutor;
}

@Override
Expand Down Expand Up @@ -171,8 +171,7 @@ public CompletableFuture<Void> createFunctionAsync(FunctionConfig functionConfig
// If the function code is built in, we don't need to submit here
builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
}
asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build())
.toCompletableFuture()
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build())
.thenAccept(response -> {
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
future.completeExceptionally(
Expand Down Expand Up @@ -263,8 +262,7 @@ public CompletableFuture<Void> updateFunctionAsync(
builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
}

asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build())
.toCompletableFuture()
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build())
.thenAccept(response -> {
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
future.completeExceptionally(
Expand Down Expand Up @@ -464,7 +462,7 @@ public CompletableFuture<Void> uploadFunctionAsync(String sourceFile, String pat
.addBodyPart(new FilePart("data", new File(sourceFile), MediaType.APPLICATION_OCTET_STREAM))
.addBodyPart(new StringPart("path", path, MediaType.TEXT_PLAIN));

asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).toCompletableFuture()
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build())
.thenAccept(response -> {
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
future.completeExceptionally(
Expand Down Expand Up @@ -543,55 +541,31 @@ private CompletableFuture<Void> downloadFileAsync(String destinationPath, WebTar

RequestBuilder builder = get(target.getUri().toASCIIString());

CompletableFuture<HttpResponseStatus> statusFuture =
asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build(),
new AsyncHandler<HttpResponseStatus>() {
private HttpResponseStatus status;

@Override
public State onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
status = responseStatus;
if (status.getStatusCode() != Response.Status.OK.getStatusCode()) {
return State.ABORT;
}
return State.CONTINUE;
}

@Override
public State onHeadersReceived(HttpHeaders headers) throws Exception {
return State.CONTINUE;
}
CompletableFuture<org.asynchttpclient.Response> responseFuture =
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build(),
() -> new AsyncCompletionHandlerBase() {

@Override
public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
os.write(bodyPart.getBodyByteBuffer());
return State.CONTINUE;
}
});

@Override
public HttpResponseStatus onCompleted() throws Exception {
return status;
}

@Override
public void onThrowable(Throwable t) {
}
}).toCompletableFuture();

statusFuture
.whenComplete((status, throwable) -> {
responseFuture
.whenComplete((response, throwable) -> {
try {
os.close();
} catch (IOException e) {
future.completeExceptionally(getApiException(e));
}
})
.thenAccept(status -> {
if (status.getStatusCode() < 200 || status.getStatusCode() >= 300) {
.thenAccept(response -> {
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
future.completeExceptionally(
getApiException(Response
.status(status.getStatusCode())
.entity(status.getStatusText())
.status(response.getStatusCode())
.entity(response.getStatusText())
.build()));
} else {
future.complete(null);
Expand Down Expand Up @@ -700,7 +674,7 @@ public CompletableFuture<Void> putFunctionStateAsync(
.path("state").path(state.getKey()).getUri().toASCIIString());
builder.addBodyPart(new StringPart("state", objectWriter()
.writeValueAsString(state), MediaType.APPLICATION_JSON));
asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build())
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build())
.toCompletableFuture()
.thenAccept(response -> {
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
Expand Down Expand Up @@ -740,7 +714,7 @@ public CompletableFuture<Void> updateOnWorkerLeaderAsync(String tenant, String n
.addBodyPart(new ByteArrayPart("functionMetaData", functionMetaData))
.addBodyPart(new StringPart("delete", Boolean.toString(delete)));

asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build())
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build())
.toCompletableFuture()
.thenAccept(response -> {
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
Expand Down
Loading

0 comments on commit 3e7dbb4

Please sign in to comment.