Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Replace String with WebTarget in authenticationStage #23924

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -80,6 +81,18 @@ public class SaslAuthenticateTest extends ProducerConsumerBase {

private static String localHostname = "localhost";
private Authentication authSasl;
private Map<String, String> clientSaslConfig;


@Override
protected void doInitConf() throws Exception {
super.doInitConf();
conf.setWebServicePortTls(Optional.of(0));
conf.setBrokerServicePortTls(Optional.of(0));
conf.setTlsKeyFilePath(BROKER_KEY_FILE_PATH);
conf.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH);
conf.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
}

@BeforeClass
public static void startMiniKdc() throws Exception {
Expand Down Expand Up @@ -172,7 +185,7 @@ protected void setup() throws Exception {
isTcpLookup = false;

// Client config
Map<String, String> clientSaslConfig = new HashMap<>();
clientSaslConfig = new HashMap<>();
clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
clientSaslConfig.put("serverType", "broker");
log.info("set client jaas section name: PulsarClient");
Expand Down Expand Up @@ -223,6 +236,46 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testClientWithTLSTransportAndSaslAuth() throws Exception {
@Cleanup
PulsarAdmin pulsarAdmin = PulsarAdmin.builder()
.serviceHttpUrl(pulsar.getWebServiceAddressTls())
.authentication(AuthenticationFactory.create(AuthenticationSasl.class.getName(), clientSaslConfig))
.tlsTrustCertsFilePath(CA_CERT_FILE_PATH).build();
pulsarAdmin.tenants().getTenants();

try (PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsar.getWebServiceAddressTls())
.authentication(AuthenticationFactory.create(AuthenticationSasl.class.getName(), clientSaslConfig))
.tlsTrustCertsFilePath(CA_CERT_FILE_PATH).build()) {
@Cleanup
Producer<byte[]> ignoredProducer =
pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic-sasl")
.create();

@Cleanup
Consumer<byte[]> ignoredConsumer =
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic-sasl")
.subscriptionName("my-sub").subscribe();
}

try (PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrlTls())
.authentication(AuthenticationFactory.create(AuthenticationSasl.class.getName(), clientSaslConfig))
.tlsTrustCertsFilePath(CA_CERT_FILE_PATH).build()) {
@Cleanup
Producer<byte[]> ignoredProducer =
pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic-sasl")
.create();

@Cleanup
Consumer<byte[]> ignoredConsumer =
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic-sasl")
.subscriptionName("my-sub").subscribe();
}
}

// Test could verify with kerberos configured.
@Test
public void testProducerAndConsumerPassed() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public CompletableFuture<Builder> requestAsync(final WebTarget target) {
AuthenticationDataProvider authData = auth.getAuthData(target.getUri().getHost());

if (authData.hasDataForHttp()) {
auth.authenticationStage(target.getUri().toString(), authData, null, authFuture);
auth.authenticationStage(target, authData, null, authFuture);
} else {
authFuture.complete(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ private Set<Entry<String, String>> getAuthHeaders(WebTarget target) throws Excep
String targetUrl = target.getUri().toString();
if (auth.getAuthMethodName().equalsIgnoreCase(SaslConstants.AUTH_METHOD_NAME)) {
CompletableFuture<Map<String, String>> authFuture = new CompletableFuture<>();
auth.authenticationStage(targetUrl, authData, null, authFuture);
auth.authenticationStage(target, authData, null, authFuture);
return auth.newRequestHeader(targetUrl, authData, authFuture.get());
} else if (authData.hasDataForHttp()) {
return auth.newRequestHeader(targetUrl, authData, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.Functions;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.FunctionState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import javax.ws.rs.core.Response;
import org.apache.pulsar.client.admin.Packages;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.packages.management.core.common.PackageMetadata;
import org.apache.pulsar.packages.management.core.common.PackageName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,14 @@
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.admin.Transactions;
import org.apache.pulsar.client.admin.Worker;
import org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector;
import org.apache.pulsar.client.admin.internal.http.AsyncHttpConnectorProvider;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.internal.http.AsyncHttpConnector;
import org.apache.pulsar.client.internal.http.AsyncHttpConnectorProvider;
import org.apache.pulsar.common.net.ServiceURI;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
Expand Down Expand Up @@ -124,8 +125,10 @@ public PulsarAdminImpl(String serviceUrl, ClientConfigurationData clientConfigDa
clientConfigData.setServiceUrl(serviceUrl);
}

PulsarServiceNameResolver pulsarServiceNameResolver = new PulsarServiceNameResolver();
pulsarServiceNameResolver.updateServiceUrl(serviceUrl);
AsyncHttpConnectorProvider asyncConnectorProvider = new AsyncHttpConnectorProvider(clientConfigData,
clientConfigData.getAutoCertRefreshSeconds(), acceptGzipCompression);
clientConfigData.getAutoCertRefreshSeconds(), acceptGzipCompression, pulsarServiceNameResolver);

ClientConfig httpConfig = new ClientConfig();
httpConfig.property(ClientProperties.FOLLOW_REDIRECTS, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Sink;
import org.apache.pulsar.client.admin.Sinks;
import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.functions.UpdateOptionsImpl;
import org.apache.pulsar.common.io.ConnectorDefinition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Source;
import org.apache.pulsar.client.admin.Sources;
import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.functions.UpdateOptionsImpl;
import org.apache.pulsar.common.io.ConnectorDefinition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.admin.internal.http;
package org.apache.pulsar.client.internal.http;

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
Expand Down Expand Up @@ -44,6 +44,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.util.FutureUtil;
import org.asynchttpclient.Request;
Expand Down Expand Up @@ -174,9 +176,11 @@ public void testShouldStopRetriesWhenTimeoutOccurs() throws IOException, Executi
Executor delayedExecutor = runnable -> {
scheduledExecutor.schedule(runnable, requestTimeout, TimeUnit.MILLISECONDS);
};
PulsarServiceNameResolver pulsarServiceNameResolver = new PulsarServiceNameResolver();
pulsarServiceNameResolver.updateServiceUrl(conf.getServiceUrl());
@Cleanup
AsyncHttpConnector connector = new AsyncHttpConnector(5000, requestTimeout,
requestTimeout, 0, conf, false) {
requestTimeout, 0, conf, false, pulsarServiceNameResolver) {
@Override
protected CompletableFuture<Response> oneShot(InetSocketAddress host, ClientRequest request) {
// delay the response to simulate a timeout
Expand Down Expand Up @@ -214,7 +218,7 @@ public void failure(Throwable failure) {
}

@Test
void testMaxRedirects() {
void testMaxRedirects() throws InvalidServiceURL {
// Redirect to itself to test max redirects
server.stubFor(get(urlEqualTo("/admin/v2/clusters"))
.willReturn(aResponse()
Expand All @@ -224,9 +228,11 @@ void testMaxRedirects() {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("http://localhost:" + server.port());

PulsarServiceNameResolver pulsarServiceNameResolver = new PulsarServiceNameResolver();
pulsarServiceNameResolver.updateServiceUrl(conf.getServiceUrl());
@Cleanup
AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
5000, 0, conf, false);
5000, 0, conf, false, pulsarServiceNameResolver);

Request request = new RequestBuilder("GET")
.setUrl("http://localhost:" + server.port() + "/admin/v2/clusters")
Expand All @@ -243,21 +249,21 @@ void testMaxRedirects() {
}

@Test
void testRelativeRedirect() throws ExecutionException, InterruptedException {
void testRelativeRedirect() throws ExecutionException, InterruptedException, InvalidServiceURL {
doTestRedirect("path2");
}

@Test
void testAbsoluteRedirect() throws ExecutionException, InterruptedException {
void testAbsoluteRedirect() throws ExecutionException, InterruptedException, InvalidServiceURL {
doTestRedirect("/path2");
}

@Test
void testUrlRedirect() throws ExecutionException, InterruptedException {
void testUrlRedirect() throws ExecutionException, InterruptedException, InvalidServiceURL {
doTestRedirect("http://localhost:" + server.port() + "/path2");
}

private void doTestRedirect(String location) throws InterruptedException, ExecutionException {
private void doTestRedirect(String location) throws InterruptedException, ExecutionException, InvalidServiceURL {
server.stubFor(get(urlEqualTo("/path1"))
.willReturn(aResponse()
.withStatus(301)
Expand All @@ -270,9 +276,11 @@ private void doTestRedirect(String location) throws InterruptedException, Execut
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("http://localhost:" + server.port());

PulsarServiceNameResolver pulsarServiceNameResolver = new PulsarServiceNameResolver();
pulsarServiceNameResolver.updateServiceUrl(conf.getServiceUrl());
@Cleanup
AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
5000, 0, conf, false);
5000, 0, conf, false, pulsarServiceNameResolver);

Request request = new RequestBuilder("GET")
.setUrl("http://localhost:" + server.port() + "/path1")
Expand All @@ -283,7 +291,7 @@ private void doTestRedirect(String location) throws InterruptedException, Execut
}

@Test
void testRedirectWithBody() throws ExecutionException, InterruptedException {
void testRedirectWithBody() throws ExecutionException, InterruptedException, InvalidServiceURL {
server.stubFor(post(urlEqualTo("/path1"))
.willReturn(aResponse()
.withStatus(307)
Expand All @@ -296,9 +304,12 @@ void testRedirectWithBody() throws ExecutionException, InterruptedException {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("http://localhost:" + server.port());

PulsarServiceNameResolver pulsarServiceNameResolver = new PulsarServiceNameResolver();
pulsarServiceNameResolver.updateServiceUrl(conf.getServiceUrl());
@Cleanup
AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
5000, 0, conf, false);
5000, 0, conf, false, pulsarServiceNameResolver);


Request request = new RequestBuilder("POST")
.setUrl("http://localhost:" + server.port() + "/path1")
Expand All @@ -310,7 +321,7 @@ void testRedirectWithBody() throws ExecutionException, InterruptedException {
}

@Test
void testMaxConnections() throws ExecutionException, InterruptedException {
void testMaxConnections() throws ExecutionException, InterruptedException, InvalidServiceURL {
server.stubFor(post(urlEqualTo("/concurrency-test"))
.willReturn(aResponse()
.withTransformers("concurrency-test")));
Expand All @@ -320,9 +331,11 @@ void testMaxConnections() throws ExecutionException, InterruptedException {
conf.setConnectionsPerBroker(maxConnections);
conf.setServiceUrl("http://localhost:" + server.port());

PulsarServiceNameResolver pulsarServiceNameResolver = new PulsarServiceNameResolver();
pulsarServiceNameResolver.updateServiceUrl(conf.getServiceUrl());
@Cleanup
AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
5000, 0, conf, false);
5000, 0, conf, false, pulsarServiceNameResolver);

Request request = new RequestBuilder("POST")
.setUrl("http://localhost:" + server.port() + "/concurrency-test")
Expand Down
6 changes: 6 additions & 0 deletions pulsar-client-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@
<artifactId>opentelemetry-api</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.ws.rs.client.WebTarget;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
Expand Down Expand Up @@ -85,14 +86,27 @@ default AuthenticationDataProvider getAuthData(String brokerHostName) throws Pul
/**
* An authentication Stage.
* when authentication complete, passed-in authFuture will contains authentication related http request headers.
* @deprecated Use {@link #authenticationStage(WebTarget, AuthenticationDataProvider, Map, CompletableFuture)} instead.
*/
@Deprecated
default void authenticationStage(String requestUrl,
AuthenticationDataProvider authData,
Map<String, String> previousResHeaders,
CompletableFuture<Map<String, String>> authFuture) {
authFuture.complete(null);
}

/**
* An authentication Stage.
* when authentication complete, passed-in authFuture will contains authentication related http request headers.
*/
default void authenticationStage(WebTarget webTarget,
AuthenticationDataProvider authData,
Map<String, String> previousResHeaders,
CompletableFuture<Map<String, String>> authFuture) {
authenticationStage(webTarget.getUri().toString(), authData, previousResHeaders, authFuture);
}

/**
* Add an authenticationStage that will complete along with authFuture.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,18 +302,17 @@ private Map<String, String> getHeaders(Response response) {
}

@Override
public void authenticationStage(String requestUrl,
AuthenticationDataProvider authData,
public void authenticationStage(WebTarget webTarget, AuthenticationDataProvider authData,
Map<String, String> previousResHeaders,
CompletableFuture<Map<String, String>> authFuture) {
// a new request for sasl auth
Builder builder = newRequestBuilder(client.target(requestUrl), authData, previousResHeaders);
Builder builder = newRequestBuilder(webTarget, authData, previousResHeaders);
builder.async().get(new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
if (response.getStatus() == HTTP_UNAUTHORIZED) {
// sasl auth on going
authenticationStage(requestUrl, authData, getHeaders(response), authFuture);
authenticationStage(webTarget, authData, getHeaders(response), authFuture);
return;
}

Expand Down
Loading
Loading