Skip to content

Commit b72d4f8

Browse files
authored
[ISSUE #5144] update eventmesh-connector-http module (#5145)
* [ISSUE #5137] update connector runtime v2 module * fix checkStyle error * [ISSUE #5139] update canal connector module * [ISSUE #5141] update eventmesh-admin-server module * [ISSUE #5144] update eventmesh-connector-http module
1 parent f6aa097 commit b72d4f8

File tree

10 files changed

+153
-52
lines changed

10 files changed

+153
-52
lines changed

eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SinkConnectorConfig.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,24 @@ public class SinkConnectorConfig {
3939
// timeunit: ms, default 5000ms
4040
private int idleTimeout = 5000;
4141

42-
// maximum number of HTTP/1 connections a client will pool, default 5
43-
private int maxConnectionPoolSize = 5;
42+
// maximum number of HTTP/1 connections a client will pool, default 50
43+
private int maxConnectionPoolSize = 50;
4444

4545
// retry config
4646
private HttpRetryConfig retryConfig = new HttpRetryConfig();
4747

4848
// webhook config
4949
private HttpWebhookConfig webhookConfig = new HttpWebhookConfig();
5050

51+
private String deliveryStrategy = "ROUND_ROBIN";
52+
53+
private boolean skipDeliverException = false;
54+
55+
// managed pipelining param, default true
56+
private boolean isParallelized = true;
57+
58+
private int parallelism = 2;
59+
5160

5261
/**
5362
* Fill default values if absent (When there are multiple default values for a field)

eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,18 @@ public class SourceConnectorConfig {
4444
*/
4545
private int maxFormAttributeSize = 1024 * 1024;
4646

47-
// protocol, default Common
47+
// max size of the queue, default 1000
48+
private int maxStorageSize = 1000;
49+
50+
// batch size, default 10
51+
private int batchSize = 10;
52+
53+
// protocol, default CloudEvent
4854
private String protocol = "Common";
4955

5056
// extra config, e.g. GitHub secret
5157
private Map<String, String> extraConfig = new HashMap<>();
5258

5359
// data consistency enabled, default true
54-
private boolean dataConsistencyEnabled = false;
60+
private boolean dataConsistencyEnabled = true;
5561
}

eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ public synchronized List<E> fetchRange(int start, int end, boolean removed) {
142142
count++;
143143
}
144144
return items;
145+
145146
}
146147

147148

eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.eventmesh.connector.http.sink;
1919

20+
import org.apache.eventmesh.common.EventMeshThreadFactory;
2021
import org.apache.eventmesh.common.config.connector.Config;
2122
import org.apache.eventmesh.common.config.connector.http.HttpSinkConfig;
2223
import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
@@ -32,6 +33,10 @@
3233

3334
import java.util.List;
3435
import java.util.Objects;
36+
import java.util.concurrent.LinkedBlockingQueue;
37+
import java.util.concurrent.ThreadPoolExecutor;
38+
import java.util.concurrent.TimeUnit;
39+
import java.util.concurrent.atomic.AtomicBoolean;
3540

3641
import lombok.Getter;
3742
import lombok.SneakyThrows;
@@ -45,6 +50,12 @@ public class HttpSinkConnector implements Sink, ConnectorCreateService<Sink> {
4550
@Getter
4651
private HttpSinkHandler sinkHandler;
4752

53+
private ThreadPoolExecutor executor;
54+
55+
private final LinkedBlockingQueue<ConnectRecord> queue = new LinkedBlockingQueue<>(10000);
56+
57+
private final AtomicBoolean isStart = new AtomicBoolean(true);
58+
4859
@Override
4960
public Class<? extends Config> configClass() {
5061
return HttpSinkConfig.class;
@@ -90,11 +101,30 @@ private void doInit() {
90101
} else {
91102
throw new IllegalArgumentException("Max retries must be greater than or equal to 0.");
92103
}
104+
boolean isParallelized = this.httpSinkConfig.connectorConfig.isParallelized();
105+
int parallelism = isParallelized ? this.httpSinkConfig.connectorConfig.getParallelism() : 1;
106+
executor = new ThreadPoolExecutor(parallelism, parallelism, 0L, TimeUnit.MILLISECONDS,
107+
new LinkedBlockingQueue<>(), new EventMeshThreadFactory("http-sink-handler"));
93108
}
94109

95110
@Override
96111
public void start() throws Exception {
97112
this.sinkHandler.start();
113+
for (int i = 0; i < this.httpSinkConfig.connectorConfig.getParallelism(); i++) {
114+
executor.execute(() -> {
115+
while (isStart.get()) {
116+
ConnectRecord connectRecord = null;
117+
try {
118+
connectRecord = queue.poll(2, TimeUnit.SECONDS);
119+
} catch (InterruptedException e) {
120+
throw new RuntimeException(e);
121+
}
122+
if (connectRecord != null) {
123+
sinkHandler.handle(connectRecord);
124+
}
125+
}
126+
});
127+
}
98128
}
99129

100130
@Override
@@ -114,7 +144,18 @@ public void onException(ConnectRecord record) {
114144

115145
@Override
116146
public void stop() throws Exception {
147+
isStart.set(false);
148+
while (!queue.isEmpty()) {
149+
ConnectRecord record = queue.poll();
150+
this.sinkHandler.handle(record);
151+
}
152+
try {
153+
Thread.sleep(50);
154+
} catch (InterruptedException e) {
155+
Thread.currentThread().interrupt();
156+
}
117157
this.sinkHandler.stop();
158+
log.info("All tasks completed, start shut down http sink connector");
118159
}
119160

120161
@Override
@@ -125,8 +166,7 @@ public void put(List<ConnectRecord> sinkRecords) {
125166
log.warn("ConnectRecord data is null, ignore.");
126167
continue;
127168
}
128-
// Handle the ConnectRecord
129-
this.sinkHandler.handle(sinkRecord);
169+
queue.put(sinkRecord);
130170
} catch (Exception e) {
131171
log.error("Failed to sink message via HTTP. ", e);
132172
}

eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,6 @@ public class HttpExportMetadata implements Serializable {
4040

4141
private LocalDateTime receivedTime;
4242

43-
private String httpRecordId;
44-
4543
private String recordId;
4644

4745
private String retriedBy;

eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java

Lines changed: 40 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -30,32 +30,33 @@
3030
import java.util.concurrent.ConcurrentHashMap;
3131
import java.util.stream.Collectors;
3232

33+
import lombok.Getter;
34+
3335
/**
3436
* AbstractHttpSinkHandler is an abstract class that provides a base implementation for HttpSinkHandler.
3537
*/
3638
public abstract class AbstractHttpSinkHandler implements HttpSinkHandler {
3739

40+
@Getter
3841
private final SinkConnectorConfig sinkConnectorConfig;
3942

43+
@Getter
4044
private final List<URI> urls;
4145

46+
private final HttpDeliveryStrategy deliveryStrategy;
47+
48+
private int roundRobinIndex = 0;
49+
4250
protected AbstractHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
4351
this.sinkConnectorConfig = sinkConnectorConfig;
52+
this.deliveryStrategy = HttpDeliveryStrategy.valueOf(sinkConnectorConfig.getDeliveryStrategy());
4453
// Initialize URLs
4554
String[] urlStrings = sinkConnectorConfig.getUrls();
4655
this.urls = Arrays.stream(urlStrings)
4756
.map(URI::create)
4857
.collect(Collectors.toList());
4958
}
5059

51-
public SinkConnectorConfig getSinkConnectorConfig() {
52-
return sinkConnectorConfig;
53-
}
54-
55-
public List<URI> getUrls() {
56-
return urls;
57-
}
58-
5960
/**
6061
* Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed.
6162
*
@@ -65,23 +66,38 @@ public List<URI> getUrls() {
6566
public void handle(ConnectRecord record) {
6667
// build attributes
6768
Map<String, Object> attributes = new ConcurrentHashMap<>();
68-
attributes.put(MultiHttpRequestContext.NAME, new MultiHttpRequestContext(urls.size()));
69-
70-
// send the record to all URLs
71-
for (URI url : urls) {
72-
// convert ConnectRecord to HttpConnectRecord
73-
String type = String.format("%s.%s.%s",
74-
this.sinkConnectorConfig.getConnectorName(), url.getScheme(),
75-
this.sinkConnectorConfig.getWebhookConfig().isActivate() ? "webhook" : "common");
76-
HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type);
77-
78-
// add AttemptEvent to the attributes
79-
HttpAttemptEvent attemptEvent = new HttpAttemptEvent(this.sinkConnectorConfig.getRetryConfig().getMaxRetries() + 1);
80-
attributes.put(HttpAttemptEvent.PREFIX + httpConnectRecord.getHttpRecordId(), attemptEvent);
81-
82-
// deliver the record
83-
deliver(url, httpConnectRecord, attributes, record);
69+
70+
switch (deliveryStrategy) {
71+
case ROUND_ROBIN:
72+
attributes.put(MultiHttpRequestContext.NAME, new MultiHttpRequestContext(1));
73+
URI url = urls.get(roundRobinIndex);
74+
roundRobinIndex = (roundRobinIndex + 1) % urls.size();
75+
sendRecordToUrl(record, attributes, url);
76+
break;
77+
case BROADCAST:
78+
for (URI broadcastUrl : urls) {
79+
attributes.put(MultiHttpRequestContext.NAME, new MultiHttpRequestContext(urls.size()));
80+
sendRecordToUrl(record, attributes, broadcastUrl);
81+
}
82+
break;
83+
default:
84+
throw new IllegalArgumentException("Unknown delivery strategy: " + deliveryStrategy);
8485
}
8586
}
8687

88+
private void sendRecordToUrl(ConnectRecord record, Map<String, Object> attributes, URI url) {
89+
// convert ConnectRecord to HttpConnectRecord
90+
String type = String.format("%s.%s.%s",
91+
this.sinkConnectorConfig.getConnectorName(), url.getScheme(),
92+
this.sinkConnectorConfig.getWebhookConfig().isActivate() ? "webhook" : "common");
93+
HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type);
94+
95+
// add AttemptEvent to the attributes
96+
HttpAttemptEvent attemptEvent = new HttpAttemptEvent(this.sinkConnectorConfig.getRetryConfig().getMaxRetries() + 1);
97+
attributes.put(HttpAttemptEvent.PREFIX + httpConnectRecord.getHttpRecordId(), attemptEvent);
98+
99+
// deliver the record
100+
deliver(url, httpConnectRecord, attributes, record);
101+
}
102+
87103
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.http.sink.handler;
19+
20+
public enum HttpDeliveryStrategy {
21+
ROUND_ROBIN,
22+
BROADCAST
23+
}

eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ private void doInitWebClient() {
9393
.setIdleTimeout(sinkConnectorConfig.getIdleTimeout())
9494
.setIdleTimeoutUnit(TimeUnit.MILLISECONDS)
9595
.setConnectTimeout(sinkConnectorConfig.getConnectionTimeout())
96-
.setMaxPoolSize(sinkConnectorConfig.getMaxConnectionPoolSize());
96+
.setMaxPoolSize(sinkConnectorConfig.getMaxConnectionPoolSize())
97+
.setPipelining(sinkConnectorConfig.isParallelized());
9798
this.webClient = WebClient.create(vertx, options);
9899
}
99100

@@ -108,7 +109,7 @@ private void doInitWebClient() {
108109
*/
109110
@Override
110111
public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConnectRecord, Map<String, Object> attributes,
111-
ConnectRecord connectRecord) {
112+
ConnectRecord connectRecord) {
112113
// create headers
113114
Map<String, Object> extensionMap = new HashMap<>();
114115
Set<String> extensionKeySet = httpConnectRecord.getExtensions().keySet();
@@ -203,6 +204,9 @@ private void tryCallback(HttpConnectRecord httpConnectRecord, Throwable e, Map<S
203204
// failure
204205
record.getCallback().onException(buildSendExceptionContext(record, lastFailedEvent.getLastException()));
205206
}
207+
} else {
208+
log.warn("still have requests to process, size {}|attempt num {}",
209+
multiHttpRequestContext.getRemainingRequests(), attemptEvent.getAttempts());
206210
}
207211
}
208212

0 commit comments

Comments
 (0)