Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package io.github.protocol.pulsar.admin.api;

import io.github.openfacade.http.HttpClientEngine;
import io.github.openfacade.http.HttpClientConfig;
import io.github.openfacade.http.ReactorHttpClientConfig;

public class Configuration {
public String host = "localhost";

public int port;

public boolean tlsEnabled;
public HttpClientConfig httpClientConfig;

public TlsConfig tlsConfig;

public HttpClientEngine engine;
public ReactorHttpClientConfig reactorHttpClientConfig;

public Configuration() {
}
Expand All @@ -26,18 +25,13 @@ public Configuration port(int port) {
return this;
}

public Configuration tlsEnabled(boolean tlsEnabled) {
this.tlsEnabled = tlsEnabled;
return this;
}

public Configuration tlsConfig(TlsConfig tlsConfig) {
this.tlsConfig = tlsConfig;
public Configuration httpClientConfig(HttpClientConfig httpClientConfig) {
this.httpClientConfig = httpClientConfig;
return this;
}

public Configuration engine(HttpClientEngine engine) {
this.engine = engine;
public Configuration reactorHttpClientConfig(ReactorHttpClientConfig reactorHttpClientConfig) {
this.reactorHttpClientConfig = reactorHttpClientConfig;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -1,28 +1,17 @@
package io.github.protocol.pulsar.admin.reactive;

import io.github.openfacade.http.ReactorHttpClient;
import io.github.openfacade.http.ReactorHttpClientFactory;
import io.github.protocol.pulsar.admin.api.Configuration;
import io.netty.handler.ssl.SslContext;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientSecurityUtils;

public class InnerReactiveClient {
private final HttpClient httpClient;
private final ReactorHttpClient httpClient;

private final String httpPrefix;

public InnerReactiveClient(Configuration conf) {
HttpClient client = HttpClient.create();

if (conf.tlsEnabled) {
client = client.secure(spec -> {
SslContext context = SslContextUtil.build(conf.tlsConfig);
if (conf.tlsConfig.hostnameVerifyDisabled) {
spec.sslContext(context)
.handlerConfigurator(HttpClientSecurityUtils.HOSTNAME_VERIFICATION_CONFIGURER);
} else {
spec.sslContext(context);
}
});
ReactorHttpClient client = ReactorHttpClientFactory.createReactorHttpClient(conf.reactorHttpClientConfig);
if (conf.reactorHttpClientConfig == null || conf.reactorHttpClientConfig.tlsConfig() == null) {
this.httpPrefix = "https://" + conf.host + ":" + conf.port;
} else {
this.httpPrefix = "http://" + conf.host + ":" + conf.port;
Comment on lines 15 to 17
Copy link

Copilot AI Aug 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The conditional logic is inverted. When TLS config is null, it should use HTTP (not HTTPS), and when TLS config exists, it should use HTTPS. The current logic assigns HTTPS prefix when TLS config is null.

Suggested change
this.httpPrefix = "https://" + conf.host + ":" + conf.port;
} else {
this.httpPrefix = "http://" + conf.host + ":" + conf.port;
this.httpPrefix = "http://" + conf.host + ":" + conf.port;
} else {
this.httpPrefix = "https://" + conf.host + ":" + conf.port;

Copilot uses AI. Check for mistakes.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.github.protocol.pulsar.admin.reactive;

import io.github.protocol.pulsar.admin.api.TlsConfig;
import io.github.openfacade.http.TlsConfig;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
Expand All @@ -17,36 +17,36 @@ public static SslContext build(TlsConfig config) {
try {
SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();

if (config.keyStorePath != null && config.keyStorePassword != null) {
if (config.keyStorePath() != null && config.keyStorePassword() != null) {
KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
try (FileInputStream keyStoreInputStream = new FileInputStream(config.keyStorePath)) {
keyStore.load(keyStoreInputStream, config.keyStorePassword);
try (FileInputStream keyStoreInputStream = new FileInputStream(config.keyStorePath())) {
keyStore.load(keyStoreInputStream, config.keyStorePassword());
}
String defaultKeyAlgorithm = KeyManagerFactory.getDefaultAlgorithm();
KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(defaultKeyAlgorithm);
keyManagerFactory.init(keyStore, config.keyStorePassword);
keyManagerFactory.init(keyStore, config.keyStorePassword());
sslContextBuilder.keyManager(keyManagerFactory);
}

if (config.verifyDisabled) {
if (config.verifyDisabled()) {
sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
} else if (config.trustStorePath != null && config.trustStorePassword != null) {
} else if (config.trustStorePath() != null && config.trustStorePassword() != null) {
KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
try (FileInputStream trustStoreInputStream = new FileInputStream(config.trustStorePath)) {
trustStore.load(trustStoreInputStream, config.trustStorePassword);
try (FileInputStream trustStoreInputStream = new FileInputStream(config.trustStorePath())) {
trustStore.load(trustStoreInputStream, config.trustStorePassword());
}
String defaultTrustAlgorithm = TrustManagerFactory.getDefaultAlgorithm();
TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(defaultTrustAlgorithm);
trustManagerFactory.init(trustStore);
sslContextBuilder.trustManager(trustManagerFactory);
}

if (config.versions != null) {
sslContextBuilder.protocols(config.versions);
if (config.versions() != null) {
sslContextBuilder.protocols(config.versions());
}

if (config.cipherSuites != null) {
sslContextBuilder.ciphers(Arrays.asList(config.cipherSuites));
if (config.cipherSuites() != null) {
sslContextBuilder.ciphers(Arrays.asList(config.cipherSuites()));
}

return sslContextBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import io.github.openfacade.http.HttpClient;
import io.github.openfacade.http.HttpClientConfig;
import io.github.openfacade.http.HttpClientEngine;
import io.github.openfacade.http.HttpClientFactory;
import io.github.openfacade.http.HttpMethod;
import io.github.openfacade.http.HttpRequest;
import io.github.openfacade.http.HttpResponse;
import io.github.openfacade.http.HttpSchema;
import io.github.openfacade.http.TlsConfig;
import io.github.openfacade.http.UrlBuilder;
import io.github.protocol.pulsar.admin.api.Configuration;
import io.github.protocol.pulsar.admin.common.JacksonService;
Expand All @@ -28,21 +25,15 @@ public class InnerHttpClient {
private UrlBuilder templateUrlBuilder;

public InnerHttpClient(Configuration conf) {
HttpClientConfig.Builder clientConfigBuilder = new HttpClientConfig.Builder();
clientConfigBuilder.engine(conf.engine == null ? HttpClientEngine.Java : conf.engine);
if (conf.tlsEnabled) {
TlsConfig.Builder tlsConfigBuilder = new TlsConfig.Builder();
io.github.protocol.pulsar.admin.api.TlsConfig tlsConfig = conf.tlsConfig;
tlsConfigBuilder.cipherSuites(tlsConfig.cipherSuites);
tlsConfigBuilder.hostnameVerifyDisabled(tlsConfig.hostnameVerifyDisabled);
tlsConfigBuilder.keyStore(tlsConfig.keyStorePath, tlsConfig.keyStorePassword);
tlsConfigBuilder.trustStore(tlsConfig.trustStorePath, tlsConfig.trustStorePassword);
tlsConfigBuilder.verifyDisabled(tlsConfig.verifyDisabled);
clientConfigBuilder.tlsConfig(tlsConfigBuilder.build());
boolean tlsEnable;
if (conf.httpClientConfig == null || conf.httpClientConfig.tlsConfig() == null) {
tlsEnable = false;
} else {
tlsEnable = true;
}
this.client = HttpClientFactory.createHttpClient(clientConfigBuilder.build());
this.client = HttpClientFactory.createHttpClient(conf.httpClientConfig);
templateUrlBuilder = new UrlBuilder();
templateUrlBuilder.setHttpSchema(conf.tlsEnabled ? HttpSchema.HTTPS : HttpSchema.HTTP).setHost(conf.host)
templateUrlBuilder.setHttpSchema(tlsEnable ? HttpSchema.HTTPS : HttpSchema.HTTP).setHost(conf.host)
.setPort(conf.port);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.github.protocol.pulsar.admin.jdk;

import io.github.protocol.pulsar.admin.api.TlsConfig;
import io.github.openfacade.http.HttpClientConfig;
import io.github.openfacade.http.ReactorHttpClientConfig;

public interface PulsarAdminBuilder {
PulsarAdmin build();
Expand All @@ -9,8 +10,7 @@ public interface PulsarAdminBuilder {

PulsarAdminBuilder port(int port);

PulsarAdminBuilder tlsEnabled(boolean useSsl);

PulsarAdminBuilder tlsConfig(TlsConfig tlsConfig);
PulsarAdminBuilder httpClientConfig(HttpClientConfig httpClientConfig);

PulsarAdminBuilder reactorHttpClientConfig(ReactorHttpClientConfig reactorHttpClientConfig);
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package io.github.protocol.pulsar.admin.jdk;

import io.github.openfacade.http.HttpClientConfig;
import io.github.openfacade.http.ReactorHttpClientConfig;
import io.github.protocol.pulsar.admin.api.Configuration;
import io.github.protocol.pulsar.admin.api.TlsConfig;

public class PulsarAdminBuilderImpl implements PulsarAdminBuilder {
private final Configuration conf;
Expand All @@ -28,14 +29,14 @@ public PulsarAdminBuilder port(int port) {
}

@Override
public PulsarAdminBuilder tlsEnabled(boolean useSsl) {
this.conf.tlsEnabled(useSsl);
public PulsarAdminBuilder httpClientConfig(HttpClientConfig httpClientConfig) {
this.conf.httpClientConfig(httpClientConfig);
return this;
}

@Override
public PulsarAdminBuilder tlsConfig(TlsConfig tlsConfig) {
this.conf.tlsConfig(tlsConfig);
public PulsarAdminBuilder reactorHttpClientConfig(ReactorHttpClientConfig reactorHttpClientConfig) {
this.conf.reactorHttpClientConfig(reactorHttpClientConfig);
return this;
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.github.protocol.pulsar.admin.jdk;

import io.github.embedded.pulsar.core.EmbeddedPulsarServer;
import io.github.openfacade.http.HttpClientConfig;
import io.github.openfacade.http.HttpClientEngine;
import io.github.protocol.pulsar.admin.api.Configuration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestInstance;
Expand Down Expand Up @@ -40,12 +40,13 @@ protected List<PulsarAdmin> initPulsarAdmins() {
if (engine.equals(HttpClientEngine.Async) || engine.equals(HttpClientEngine.Jetty)) {
continue;
}
Configuration conf = new Configuration();
conf.host("localhost");
conf.port(getPort());
conf.engine(engine);
PulsarAdminImpl pulsarAdmin = new PulsarAdminImpl(conf);
pulsarAdmins.add(pulsarAdmin);
PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder();
pulsarAdminBuilder.host("localhost");
pulsarAdminBuilder.port(getPort());
HttpClientConfig.Builder clientBuilder = new HttpClientConfig.Builder();
clientBuilder.engine(engine);
pulsarAdminBuilder.httpClientConfig(clientBuilder.build());
pulsarAdmins.add(pulsarAdminBuilder.build());
}
return pulsarAdmins;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
package io.github.protocol.pulsar.admin.jdk;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

public class BrokersTest extends BaseTest{

@Test
public void testHealthCheckV1() throws PulsarAdminException {
PulsarAdmin pulsarAdmin = PulsarAdmin.builder().port(getPort()).build();
@MethodSource("providePulsarAdmins")
@ParameterizedTest
public void testHealthCheckV1(PulsarAdmin pulsarAdmin) throws PulsarAdminException {
pulsarAdmin.brokers().healthcheck(TopicVersion.V1);
}

@Test
public void testHealthCheckV2() throws PulsarAdminException {
PulsarAdmin pulsarAdmin = PulsarAdmin.builder().port(getPort()).build();
@MethodSource("providePulsarAdmins")
@ParameterizedTest
public void testHealthCheckV2(PulsarAdmin pulsarAdmin) throws PulsarAdminException {
pulsarAdmin.brokers().healthcheck(TopicVersion.V2);
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package io.github.protocol.pulsar.admin.jdk;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.Arrays;

public class ClustersTest extends BaseTest{
@Test
public void getClustersTest() throws PulsarAdminException {
Assertions.assertEquals(Arrays.asList("standalone"),
PulsarAdmin.builder().port(getPort()).build().clusters().getClusters());
}

@MethodSource("providePulsarAdmins")
@ParameterizedTest
public void getClustersTest(PulsarAdmin pulsarAdmin) throws PulsarAdminException {
Assertions.assertEquals(Arrays.asList("standalone"), pulsarAdmin.clusters().getClusters());
}
}
Loading