Skip to content

Commit 7fc6b11

Browse files
committed
feat: add http engine support
Signed-off-by: moxiaoying <1159230165@qq.com>
1 parent 619aaa7 commit 7fc6b11

File tree

14 files changed

+194
-163
lines changed

14 files changed

+194
-163
lines changed

pom.xml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@
5151
<nexus-staging-maven-plugin.version>1.7.0</nexus-staging-maven-plugin.version>
5252
<spotbugs-maven-plugin.version>4.8.6.2</spotbugs-maven-plugin.version>
5353
<spotless-maven-plugin.version>2.43.0</spotless-maven-plugin.version>
54+
<apache-http-client.version>5.3.1</apache-http-client.version>
55+
<okhttp.version>4.12.0</okhttp.version>
56+
<vertx.version>4.5.10</vertx.version>
57+
<netty.version>4.1.112.Final</netty.version>
5458
</properties>
5559

5660
<dependencyManagement>
@@ -62,6 +66,13 @@
6266
<type>pom</type>
6367
<scope>import</scope>
6468
</dependency>
69+
<dependency>
70+
<groupId>io.netty</groupId>
71+
<artifactId>netty-transport-native-epoll</artifactId>
72+
<classifier>linux-x86_64</classifier>
73+
<version>${netty.version}</version>
74+
<scope>test</scope>
75+
</dependency>
6576
</dependencies>
6677
</dependencyManagement>
6778

@@ -92,6 +103,12 @@
92103
<version>${junit.version}</version>
93104
<scope>test</scope>
94105
</dependency>
106+
<dependency>
107+
<groupId>org.junit.jupiter</groupId>
108+
<artifactId>junit-jupiter-params</artifactId>
109+
<version>${junit.version}</version>
110+
<scope>test</scope>
111+
</dependency>
95112
<dependency>
96113
<groupId>org.apache.logging.log4j</groupId>
97114
<artifactId>log4j-slf4j-impl</artifactId>

pulsar-admin-api/pom.xml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,11 @@
1010
</parent>
1111

1212
<artifactId>pulsar-admin-api</artifactId>
13-
13+
<dependencies>
14+
<dependency>
15+
<groupId>io.github.openfacade</groupId>
16+
<artifactId>http-facade</artifactId>
17+
<version>${http-facade.version}</version>
18+
</dependency>
19+
</dependencies>
1420
</project>

pulsar-admin-api/src/main/java/io/github/protocol/pulsar/admin/api/Configuration.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.github.protocol.pulsar.admin.api;
22

3+
import io.github.openfacade.http.HttpClientEngine;
4+
35
public class Configuration {
46
public String host = "localhost";
57

@@ -9,6 +11,8 @@ public class Configuration {
911

1012
public TlsConfig tlsConfig;
1113

14+
public HttpClientEngine engine;
15+
1216
public Configuration() {
1317
}
1418

@@ -31,4 +35,9 @@ public Configuration tlsConfig(TlsConfig tlsConfig) {
3135
this.tlsConfig = tlsConfig;
3236
return this;
3337
}
38+
39+
public Configuration engine(HttpClientEngine engine) {
40+
this.engine = engine;
41+
return this;
42+
}
3443
}

pulsar-admin/pom.xml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,23 @@
2222
<artifactId>http-facade</artifactId>
2323
<version>${http-facade.version}</version>
2424
</dependency>
25+
<dependency>
26+
<groupId>org.apache.httpcomponents.client5</groupId>
27+
<artifactId>httpclient5</artifactId>
28+
<version>${apache-http-client.version}</version>
29+
<scope>test</scope>
30+
</dependency>
31+
<dependency>
32+
<groupId>com.squareup.okhttp3</groupId>
33+
<artifactId>okhttp</artifactId>
34+
<version>${okhttp.version}</version>
35+
<scope>test</scope>
36+
</dependency>
37+
<dependency>
38+
<groupId>io.vertx</groupId>
39+
<artifactId>vertx-web-client</artifactId>
40+
<version>${vertx.version}</version>
41+
</dependency>
2542
</dependencies>
2643

2744
<build>

pulsar-admin/src/main/java/io/github/protocol/pulsar/admin/jdk/InnerHttpClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public class InnerHttpClient {
2929

3030
public InnerHttpClient(Configuration conf) {
3131
HttpClientConfig.Builder clientConfigBuilder = new HttpClientConfig.Builder();
32-
clientConfigBuilder.engine(HttpClientEngine.Java);
32+
clientConfigBuilder.engine(conf.engine == null ? HttpClientEngine.Java : conf.engine);
3333
if (conf.tlsEnabled) {
3434
TlsConfig.Builder tlsConfigBuilder = new TlsConfig.Builder();
3535
io.github.protocol.pulsar.admin.api.TlsConfig tlsConfig = conf.tlsConfig;
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package io.github.protocol.pulsar.admin.jdk;
2+
3+
import io.github.embedded.pulsar.core.EmbeddedPulsarServer;
4+
import io.github.openfacade.http.HttpClientEngine;
5+
import io.github.protocol.pulsar.admin.api.Configuration;
6+
import org.junit.jupiter.api.AfterAll;
7+
import org.junit.jupiter.api.BeforeAll;
8+
import org.junit.jupiter.api.TestInstance;
9+
import org.junit.jupiter.params.provider.Arguments;
10+
11+
import java.util.ArrayList;
12+
import java.util.List;
13+
import java.util.stream.Stream;
14+
15+
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
16+
public class BaseTest {
17+
protected EmbeddedPulsarServer server;
18+
protected List<PulsarAdmin> pulsarAdmins;
19+
20+
@BeforeAll
21+
public void setup() throws Exception {
22+
server = new EmbeddedPulsarServer();
23+
server.start();
24+
pulsarAdmins = initPulsarAdmins();
25+
}
26+
27+
@AfterAll
28+
public void teardown() throws Exception {
29+
server.close();
30+
}
31+
32+
protected int getPort() {
33+
return server.getWebPort();
34+
}
35+
36+
protected List<PulsarAdmin> initPulsarAdmins() {
37+
List<PulsarAdmin> pulsarAdmins = new ArrayList<>();
38+
for (HttpClientEngine engine: HttpClientEngine.values()) {
39+
// async and jetty client has dependency conflicts with embedded pulsar core, skip it.
40+
if (engine.equals(HttpClientEngine.Async) || engine.equals(HttpClientEngine.Jetty)) {
41+
continue;
42+
}
43+
Configuration conf = new Configuration();
44+
conf.host("localhost");
45+
conf.port(getPort());
46+
conf.engine(engine);
47+
PulsarAdminImpl pulsarAdmin = new PulsarAdminImpl(conf);
48+
pulsarAdmins.add(pulsarAdmin);
49+
}
50+
return pulsarAdmins;
51+
}
52+
53+
protected Stream<Arguments> providePulsarAdmins() {
54+
return pulsarAdmins.stream().map(pulsarAdmin -> Arguments.arguments(pulsarAdmin));
55+
}
56+
}
Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,18 @@
11
package io.github.protocol.pulsar.admin.jdk;
22

3-
import io.github.embedded.pulsar.core.EmbeddedPulsarServer;
4-
import org.junit.jupiter.api.AfterAll;
5-
import org.junit.jupiter.api.BeforeAll;
63
import org.junit.jupiter.api.Test;
74

8-
public class BrokersTest {
9-
private static final EmbeddedPulsarServer SERVER = new EmbeddedPulsarServer();
10-
11-
@BeforeAll
12-
public static void setup() throws Exception {
13-
SERVER.start();
14-
}
15-
16-
@AfterAll
17-
public static void teardown() throws Exception {
18-
SERVER.close();
19-
}
5+
public class BrokersTest extends BaseTest{
206

217
@Test
228
public void testHealthCheckV1() throws PulsarAdminException {
23-
PulsarAdmin pulsarAdmin = PulsarAdmin.builder().port(SERVER.getWebPort()).build();
9+
PulsarAdmin pulsarAdmin = PulsarAdmin.builder().port(getPort()).build();
2410
pulsarAdmin.brokers().healthcheck(TopicVersion.V1);
2511
}
2612

2713
@Test
2814
public void testHealthCheckV2() throws PulsarAdminException {
29-
PulsarAdmin pulsarAdmin = PulsarAdmin.builder().port(SERVER.getWebPort()).build();
15+
PulsarAdmin pulsarAdmin = PulsarAdmin.builder().port(getPort()).build();
3016
pulsarAdmin.brokers().healthcheck(TopicVersion.V2);
3117
}
3218
}
Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,15 @@
11
package io.github.protocol.pulsar.admin.jdk;
22

3-
import io.github.embedded.pulsar.core.EmbeddedPulsarServer;
4-
import org.junit.jupiter.api.AfterAll;
53
import org.junit.jupiter.api.Assertions;
6-
import org.junit.jupiter.api.BeforeAll;
74
import org.junit.jupiter.api.Test;
85

96
import java.util.Arrays;
107

11-
public class ClustersTest {
12-
13-
private static final EmbeddedPulsarServer SERVER = new EmbeddedPulsarServer();
14-
15-
@BeforeAll
16-
public static void setup() throws Exception {
17-
SERVER.start();
18-
}
19-
20-
@AfterAll
21-
public static void teardown() throws Exception {
22-
SERVER.close();
23-
}
24-
8+
public class ClustersTest extends BaseTest{
259
@Test
2610
public void getClustersTest() throws PulsarAdminException {
2711
Assertions.assertEquals(Arrays.asList("standalone"),
28-
PulsarAdmin.builder().port(SERVER.getWebPort()).build().clusters().getClusters());
12+
PulsarAdmin.builder().port(getPort()).build().clusters().getClusters());
2913
}
3014

3115
}

pulsar-admin/src/test/java/io/github/protocol/pulsar/admin/jdk/NamespacesTest.java

Lines changed: 21 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,24 @@
11
package io.github.protocol.pulsar.admin.jdk;
22

33
import com.google.common.collect.ImmutableMap;
4-
import io.github.embedded.pulsar.core.EmbeddedPulsarServer;
5-
import org.junit.jupiter.api.AfterAll;
64
import org.junit.jupiter.api.Assertions;
7-
import org.junit.jupiter.api.BeforeAll;
8-
import org.junit.jupiter.api.Test;
5+
import org.junit.jupiter.params.ParameterizedTest;
6+
import org.junit.jupiter.params.provider.MethodSource;
97

108
import java.util.Arrays;
119
import java.util.HashSet;
1210
import java.util.TreeMap;
1311

14-
public class NamespacesTest {
15-
16-
private static final EmbeddedPulsarServer SERVER = new EmbeddedPulsarServer();
17-
12+
public class NamespacesTest extends BaseTest {
1813
private static final String CLUSTER_STANDALONE = "standalone";
1914

2015
private static final TenantInfo initialTenantInfo = (new TenantInfo.TenantInfoBuilder())
2116
.adminRoles(new HashSet<>(0))
2217
.allowedClusters(new HashSet<>(Arrays.asList(CLUSTER_STANDALONE))).build();
2318

24-
private static PulsarAdmin pulsarAdmin;
25-
26-
@BeforeAll
27-
public static void setup() throws Exception {
28-
SERVER.start();
29-
pulsarAdmin = PulsarAdmin.builder().port(SERVER.getWebPort()).build();
30-
}
31-
32-
@AfterAll
33-
public static void teardown() throws Exception {
34-
SERVER.close();
35-
}
36-
37-
@Test
38-
public void namespaceTest() throws PulsarAdminException {
19+
@ParameterizedTest
20+
@MethodSource("providePulsarAdmins")
21+
public void namespaceTest(PulsarAdmin pulsarAdmin) throws PulsarAdminException {
3922
String tenant = RandomUtil.randomString();
4023
String namespace = RandomUtil.randomString();
4124
pulsarAdmin.tenants().createTenant(tenant, initialTenantInfo);
@@ -52,8 +35,9 @@ public void namespaceTest() throws PulsarAdminException {
5235
Assertions.assertEquals(Arrays.asList(), pulsarAdmin.namespaces().getTenantNamespaces(tenant));
5336
}
5437

55-
@Test
56-
public void namespacesBacklogQuotaTest() throws PulsarAdminException, InterruptedException {
38+
@ParameterizedTest
39+
@MethodSource("providePulsarAdmins")
40+
public void namespacesBacklogQuotaTest(PulsarAdmin pulsarAdmin) throws PulsarAdminException, InterruptedException {
5741
String tenant = RandomUtil.randomString();
5842
String namespace = RandomUtil.randomString();
5943
pulsarAdmin.tenants().createTenant(tenant, initialTenantInfo);
@@ -86,8 +70,9 @@ public void namespacesBacklogQuotaTest() throws PulsarAdminException, Interrupte
8670
new TreeMap<>(pulsarAdmin.namespaces().getBacklogQuotaMap(tenant, namespace)));
8771
}
8872

89-
@Test
90-
public void namespacesClearBacklogTest() throws PulsarAdminException {
73+
@ParameterizedTest
74+
@MethodSource("providePulsarAdmins")
75+
public void namespacesClearBacklogTest(PulsarAdmin pulsarAdmin) throws PulsarAdminException {
9176
String tenant = RandomUtil.randomString();
9277
String namespace = RandomUtil.randomString();
9378
pulsarAdmin.tenants().createTenant(tenant, initialTenantInfo);
@@ -97,8 +82,9 @@ public void namespacesClearBacklogTest() throws PulsarAdminException {
9782
tenant, namespace, RandomUtil.randomString(), false);
9883
}
9984

100-
@Test
101-
public void namespacesRetentionTest() throws PulsarAdminException {
85+
@ParameterizedTest
86+
@MethodSource("providePulsarAdmins")
87+
public void namespacesRetentionTest(PulsarAdmin pulsarAdmin) throws PulsarAdminException {
10288
String tenant = RandomUtil.randomString();
10389
String namespace = RandomUtil.randomString();
10490
pulsarAdmin.tenants().createTenant(tenant, initialTenantInfo);
@@ -113,8 +99,9 @@ public void namespacesRetentionTest() throws PulsarAdminException {
11399
Assertions.assertNull(pulsarAdmin.namespaces().getRetention(tenant, namespace));
114100
}
115101

116-
@Test
117-
public void namespacesMessageTTLTest() throws PulsarAdminException {
102+
@ParameterizedTest
103+
@MethodSource("providePulsarAdmins")
104+
public void namespacesMessageTTLTest(PulsarAdmin pulsarAdmin) throws PulsarAdminException {
118105
String tenant = RandomUtil.randomString();
119106
String namespace = RandomUtil.randomString();
120107
pulsarAdmin.tenants().createTenant(tenant, initialTenantInfo);
@@ -126,8 +113,9 @@ public void namespacesMessageTTLTest() throws PulsarAdminException {
126113
Assertions.assertNull(pulsarAdmin.namespaces().getNamespaceMessageTTL(tenant, namespace));
127114
}
128115

129-
@Test
130-
public void namespacesCompactionThresholdTest() throws PulsarAdminException {
116+
@ParameterizedTest
117+
@MethodSource("providePulsarAdmins")
118+
public void namespacesCompactionThresholdTest(PulsarAdmin pulsarAdmin) throws PulsarAdminException {
131119
String tenant = RandomUtil.randomString();
132120
String namespace = RandomUtil.randomString();
133121
pulsarAdmin.tenants().createTenant(tenant, initialTenantInfo);

0 commit comments

Comments
 (0)