Skip to content

Commit 5a80073

Browse files
authored
Merge pull request #914 from eight-nines/webhook
[ISSUE #865] EM-Webhook: repair exception handling && send webhook CloudEvents
2 parents fc832a0 + c5fdea0 commit 5a80073

File tree

26 files changed

+1198
-634
lines changed

26 files changed

+1198
-634
lines changed

eventmesh-admin/eventmesh-admin-rocketmq/src/main/java/org/apache/eventmesh/admin/rocketmq/util/JsonUtils.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.fasterxml.jackson.annotation.JsonInclude;
2323
import com.fasterxml.jackson.core.JsonProcessingException;
2424
import com.fasterxml.jackson.databind.DeserializationFeature;
25+
import com.fasterxml.jackson.databind.JsonNode;
2526
import com.fasterxml.jackson.databind.ObjectMapper;
2627
import com.fasterxml.jackson.databind.SerializationFeature;
2728

@@ -69,4 +70,12 @@ public static <T> T deserialize(Class<T> clazz, String json) throws IOException
6970

7071
return objectMapper.readValue(json, clazz);
7172
}
73+
74+
public static JsonNode getJsonNode(String json) throws IOException {
75+
if (json == null || json.length() == 0) {
76+
return null;
77+
}
78+
79+
return objectMapper.readTree(json);
80+
}
7281
}

eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,17 +47,19 @@ public class AsyncPublishInstance {
4747
public static final int MESSAGE_SIZE = 1;
4848

4949
public static void main(String[] args) throws Exception {
50-
50+
5151
Properties properties = Utils.readPropertiesFile(ExampleConstants.CONFIG_FILE_NAME);
5252
final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP);
5353
final String eventMeshHttpPort = properties.getProperty(ExampleConstants.EVENTMESH_HTTP_PORT);
54-
54+
5555
// if has multi value, can config as: 127.0.0.1:10105;127.0.0.2:10105
5656
String eventMeshIPPort = ExampleConstants.DEFAULT_EVENTMESH_IP_PORT;
5757
if (StringUtils.isNotBlank(eventMeshIp) || StringUtils.isNotBlank(eventMeshHttpPort)) {
5858
eventMeshIPPort = eventMeshIp + ":" + eventMeshHttpPort;
5959
}
6060

61+
// Both the producer and consumer require an instance of EventMeshHttpClientConfig class
62+
// that specifies the configuration of EventMesh HTTP client.
6163
EventMeshHttpClientConfig eventMeshClientConfig = EventMeshHttpClientConfig.builder()
6264
.liteEventMeshAddr(eventMeshIPPort)
6365
.producerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_PRODUCER_GROUP)

eventmesh-runtime/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ dependencies {
6666
implementation project(":eventmesh-trace-plugin:eventmesh-trace-zipkin")
6767

6868
implementation project(":eventmesh-webhook:eventmesh-webhook-admin")
69+
implementation project(":eventmesh-webhook:eventmesh-webhook-api")
6970
implementation project(":eventmesh-webhook:eventmesh-webhook-receive")
7071

7172
testImplementation "org.mockito:mockito-core"

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@
1818
package org.apache.eventmesh.runtime.admin.controller;
1919

2020
import org.apache.eventmesh.admin.rocketmq.controller.AdminController;
21+
import org.apache.eventmesh.runtime.admin.handler.DeleteWebHookConfigHandler;
22+
import org.apache.eventmesh.runtime.admin.handler.InsertWebHookConfigHandler;
2123
import org.apache.eventmesh.runtime.admin.handler.QueryRecommendEventMeshHandler;
24+
import org.apache.eventmesh.runtime.admin.handler.QueryWebHookConfigByIdHandler;
25+
import org.apache.eventmesh.runtime.admin.handler.QueryWebHookConfigByManufacturerHandler;
2226
import org.apache.eventmesh.runtime.admin.handler.RedirectClientByIpPortHandler;
2327
import org.apache.eventmesh.runtime.admin.handler.RedirectClientByPathHandler;
2428
import org.apache.eventmesh.runtime.admin.handler.RedirectClientBySubSystemHandler;
@@ -28,6 +32,7 @@
2832
import org.apache.eventmesh.runtime.admin.handler.ShowClientBySystemHandler;
2933
import org.apache.eventmesh.runtime.admin.handler.ShowClientHandler;
3034
import org.apache.eventmesh.runtime.admin.handler.ShowListenClientByTopicHandler;
35+
import org.apache.eventmesh.runtime.admin.handler.UpdateWebHookConfigHandler;
3136
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
3237

3338
import java.io.IOException;
@@ -63,6 +68,11 @@ public void start() throws IOException {
6368
server.createContext("/clientManage/redirectClientByIpPort", new RedirectClientByIpPortHandler(eventMeshTCPServer));
6469
server.createContext("/clientManage/showListenClientByTopic", new ShowListenClientByTopicHandler(eventMeshTCPServer));
6570
server.createContext("/eventMesh/recommend", new QueryRecommendEventMeshHandler(eventMeshTCPServer));
71+
server.createContext("/webhook/insertWebHookConfig", new InsertWebHookConfigHandler());
72+
server.createContext("/webhook/updateWebHookConfig", new UpdateWebHookConfigHandler());
73+
server.createContext("/webhook/deleteWebHookConfig", new DeleteWebHookConfigHandler());
74+
server.createContext("/webhook/queryWebHookConfigById", new QueryWebHookConfigByIdHandler());
75+
server.createContext("/webhook/queryWebHookConfigByManufacturer", new QueryWebHookConfigByManufacturerHandler());
6676

6777
adminController = new AdminController();
6878
adminController.run(server);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package org.apache.eventmesh.runtime.admin.handler;
2+
3+
import org.apache.eventmesh.admin.rocketmq.util.JsonUtils;
4+
import org.apache.eventmesh.admin.rocketmq.util.NetUtils;
5+
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
6+
import org.apache.eventmesh.webhook.admin.AdminWebHookConfigOperationManage;
7+
import org.apache.eventmesh.webhook.api.WebHookConfig;
8+
import org.apache.eventmesh.webhook.api.WebHookConfigOperation;
9+
10+
import java.io.IOException;
11+
import java.io.OutputStream;
12+
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
15+
16+
import com.sun.net.httpserver.HttpExchange;
17+
import com.sun.net.httpserver.HttpHandler;
18+
19+
public class DeleteWebHookConfigHandler implements HttpHandler {
20+
21+
public Logger logger = LoggerFactory.getLogger(this.getClass());
22+
23+
@Override
24+
public void handle(HttpExchange httpExchange) throws IOException {
25+
httpExchange.sendResponseHeaders(200, 0);
26+
27+
// get requestBody and resolve to WebHookConfig
28+
String requestBody = NetUtils.parsePostBody(httpExchange);
29+
WebHookConfig webHookConfig = JsonUtils.toObject(requestBody, WebHookConfig.class);
30+
31+
AdminWebHookConfigOperationManage manage = new AdminWebHookConfigOperationManage();
32+
try (OutputStream out = httpExchange.getResponseBody()) {
33+
WebHookConfigOperation operation = manage.getHookConfigOperationManage();
34+
Integer code = operation.deleteWebHookConfig(webHookConfig); // operating result
35+
String result = 1 == code ? "deleteWebHookConfig Succeed!" : "deleteWebHookConfig Failed!";
36+
out.write(result.getBytes());
37+
} catch (Exception e) {
38+
logger.error("get WebHookConfigOperation implementation Failed.", e);
39+
}
40+
}
41+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package org.apache.eventmesh.runtime.admin.handler;
2+
3+
import org.apache.eventmesh.admin.rocketmq.HttpMethod;
4+
import org.apache.eventmesh.admin.rocketmq.request.TopicCreateRequest;
5+
import org.apache.eventmesh.admin.rocketmq.util.JsonUtils;
6+
import org.apache.eventmesh.admin.rocketmq.util.NetUtils;
7+
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
8+
import org.apache.eventmesh.webhook.admin.AdminWebHookConfigOperationManage;
9+
import org.apache.eventmesh.webhook.api.WebHookConfig;
10+
import org.apache.eventmesh.webhook.api.WebHookConfigOperation;
11+
12+
import org.apache.http.Consts;
13+
14+
import java.io.IOException;
15+
import java.io.InputStreamReader;
16+
import java.io.OutputStream;
17+
18+
import org.slf4j.Logger;
19+
import org.slf4j.LoggerFactory;
20+
21+
import com.sun.net.httpserver.HttpExchange;
22+
import com.sun.net.httpserver.HttpHandler;
23+
24+
public class InsertWebHookConfigHandler implements HttpHandler {
25+
26+
public Logger logger = LoggerFactory.getLogger(this.getClass());
27+
28+
@Override
29+
public void handle(HttpExchange httpExchange) throws IOException {
30+
httpExchange.sendResponseHeaders(200, 0);
31+
32+
// get requestBody and resolve to WebHookConfig
33+
String requestBody = NetUtils.parsePostBody(httpExchange);
34+
WebHookConfig webHookConfig = JsonUtils.toObject(requestBody, WebHookConfig.class);
35+
36+
AdminWebHookConfigOperationManage manage = new AdminWebHookConfigOperationManage();
37+
try (OutputStream out = httpExchange.getResponseBody()) {
38+
WebHookConfigOperation operation = manage.getHookConfigOperationManage();
39+
Integer code = operation.insertWebHookConfig(webHookConfig); // operating result
40+
String result = 1 == code ? "insertWebHookConfig Succeed!" : "insertWebHookConfig Failed!";
41+
out.write(result.getBytes());
42+
} catch (Exception e) {
43+
logger.error("get WebHookConfigOperation implementation Failed.", e);
44+
}
45+
}
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package org.apache.eventmesh.runtime.admin.handler;
2+
3+
import org.apache.eventmesh.admin.rocketmq.util.JsonUtils;
4+
import org.apache.eventmesh.admin.rocketmq.util.NetUtils;
5+
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
6+
import org.apache.eventmesh.webhook.admin.AdminWebHookConfigOperationManage;
7+
import org.apache.eventmesh.webhook.api.WebHookConfig;
8+
import org.apache.eventmesh.webhook.api.WebHookConfigOperation;
9+
10+
import java.io.IOException;
11+
import java.io.OutputStream;
12+
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
15+
16+
import com.sun.net.httpserver.HttpExchange;
17+
import com.sun.net.httpserver.HttpHandler;
18+
19+
public class QueryWebHookConfigByIdHandler implements HttpHandler {
20+
21+
public Logger logger = LoggerFactory.getLogger(this.getClass());
22+
23+
@Override
24+
public void handle(HttpExchange httpExchange) throws IOException {
25+
httpExchange.sendResponseHeaders(200, 0);
26+
httpExchange.getResponseHeaders().add("Content-Type", "application/json");
27+
28+
// get requestBody and resolve to WebHookConfig
29+
String requestBody = NetUtils.parsePostBody(httpExchange);
30+
WebHookConfig webHookConfig = JsonUtils.toObject(requestBody, WebHookConfig.class);
31+
32+
AdminWebHookConfigOperationManage manage = new AdminWebHookConfigOperationManage();
33+
try (OutputStream out = httpExchange.getResponseBody()) {
34+
WebHookConfigOperation operation = manage.getHookConfigOperationManage();
35+
WebHookConfig result = operation.queryWebHookConfigById(webHookConfig); // operating result
36+
out.write(JsonUtils.toJson(result).getBytes());
37+
} catch (Exception e) {
38+
logger.error("get WebHookConfigOperation implementation Failed.", e);
39+
}
40+
}
41+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package org.apache.eventmesh.runtime.admin.handler;
2+
3+
import org.apache.eventmesh.admin.rocketmq.util.JsonUtils;
4+
import org.apache.eventmesh.admin.rocketmq.util.NetUtils;
5+
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
6+
import org.apache.eventmesh.webhook.admin.AdminWebHookConfigOperationManage;
7+
import org.apache.eventmesh.webhook.api.WebHookConfig;
8+
import org.apache.eventmesh.webhook.api.WebHookConfigOperation;
9+
10+
import java.io.IOException;
11+
import java.io.OutputStream;
12+
import java.util.List;
13+
14+
import org.slf4j.Logger;
15+
import org.slf4j.LoggerFactory;
16+
17+
import com.fasterxml.jackson.databind.JsonNode;
18+
import com.sun.net.httpserver.HttpExchange;
19+
import com.sun.net.httpserver.HttpHandler;
20+
21+
public class QueryWebHookConfigByManufacturerHandler implements HttpHandler {
22+
23+
public Logger logger = LoggerFactory.getLogger(this.getClass());
24+
25+
@Override
26+
public void handle(HttpExchange httpExchange) throws IOException {
27+
httpExchange.sendResponseHeaders(200, 0);
28+
httpExchange.getResponseHeaders().add("Content-Type", "application/json");
29+
30+
// get requestBody and resolve to WebHookConfig
31+
String requestBody = NetUtils.parsePostBody(httpExchange);
32+
JsonNode node = JsonUtils.getJsonNode(requestBody);
33+
WebHookConfig webHookConfig = JsonUtils.toObject(node.get("webHookConfig").toString(), WebHookConfig.class);
34+
Integer pageNum = Integer.parseInt(node.get("pageNum").toString());
35+
Integer pageSize = Integer.parseInt(node.get("pageSize").toString());
36+
37+
AdminWebHookConfigOperationManage manage = new AdminWebHookConfigOperationManage();
38+
try (OutputStream out = httpExchange.getResponseBody()) {
39+
WebHookConfigOperation operation = manage.getHookConfigOperationManage();
40+
List<WebHookConfig> result = operation.queryWebHookConfigByManufacturer(webHookConfig, pageNum, pageSize); // operating result
41+
out.write(JsonUtils.toJson(result).getBytes());
42+
} catch (Exception e) {
43+
logger.error("get WebHookConfigOperation implementation Failed.", e);
44+
}
45+
}
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package org.apache.eventmesh.runtime.admin.handler;
2+
3+
import org.apache.eventmesh.admin.rocketmq.util.JsonUtils;
4+
import org.apache.eventmesh.admin.rocketmq.util.NetUtils;
5+
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
6+
import org.apache.eventmesh.webhook.admin.AdminWebHookConfigOperationManage;
7+
import org.apache.eventmesh.webhook.api.WebHookConfig;
8+
import org.apache.eventmesh.webhook.api.WebHookConfigOperation;
9+
10+
import java.io.IOException;
11+
import java.io.OutputStream;
12+
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
15+
16+
import com.sun.net.httpserver.HttpExchange;
17+
import com.sun.net.httpserver.HttpHandler;
18+
19+
public class UpdateWebHookConfigHandler implements HttpHandler {
20+
21+
public Logger logger = LoggerFactory.getLogger(this.getClass());
22+
23+
@Override
24+
public void handle(HttpExchange httpExchange) throws IOException {
25+
httpExchange.sendResponseHeaders(200, 0);
26+
27+
// get requestBody and resolve to WebHookConfig
28+
String requestBody = NetUtils.parsePostBody(httpExchange);
29+
WebHookConfig webHookConfig = JsonUtils.toObject(requestBody, WebHookConfig.class);
30+
31+
AdminWebHookConfigOperationManage manage = new AdminWebHookConfigOperationManage();
32+
try (OutputStream out = httpExchange.getResponseBody()) {
33+
WebHookConfigOperation operation = manage.getHookConfigOperationManage();
34+
Integer code = operation.updateWebHookConfig(webHookConfig); // operating result
35+
String result = 1 == code ? "updateWebHookConfig Succeed!" : "updateWebHookConfig Failed!";
36+
out.write(result.getBytes());
37+
} catch (Exception e) {
38+
logger.error("get WebHookConfigOperation implementation Failed.", e);
39+
}
40+
}
41+
}

eventmesh-webhook/eventmesh-webhook-admin/src/main/java/org/apache/eventmesh/webhook/admin/AdminWebHookConfigOperationManage.java

Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,53 +14,69 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17+
1718
package org.apache.eventmesh.webhook.admin;
1819

1920
import org.apache.eventmesh.webhook.api.WebHookConfigOperation;
20-
import org.slf4j.Logger;
21-
import org.slf4j.LoggerFactory;
2221

22+
import java.io.IOException;
23+
import java.io.InputStream;
2324
import java.lang.reflect.Constructor;
2425
import java.lang.reflect.InvocationTargetException;
2526
import java.util.HashMap;
2627
import java.util.Map;
28+
import java.util.Properties;
29+
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
2733

2834
public class AdminWebHookConfigOperationManage {
2935

30-
public Logger logger = LoggerFactory.getLogger(this.getClass());
36+
public Logger logger = LoggerFactory.getLogger(this.getClass());
3137

32-
private static final Map<String, Class<? extends WebHookConfigOperation>> map = new HashMap<>();
38+
private static final Map<String, Class<? extends WebHookConfigOperation>> map = new HashMap<>();
3339

34-
static {
35-
map.put("file", FileWebHookConfigOperation.class);
36-
map.put("nacos", NacosWebHookConfigOperation.class);
37-
}
40+
// todo 单例懒加载
41+
static {
42+
map.put("file", FileWebHookConfigOperation.class);
43+
map.put("nacos", NacosWebHookConfigOperation.class);
44+
}
3845

3946
/**
4047
* Create it in ClientManageController
4148
*
4249
* @return WebHookConfigOperation implementation
4350
*/
44-
public WebHookConfigOperation getHookConfigOperationManage() {
45-
return getHookConfigOperationManage("file","./webhook");
46-
}
51+
public WebHookConfigOperation getHookConfigOperationManage() throws Exception {
52+
Properties configProperties = readConfigFromConfigFile();
53+
String operationMode = configProperties.getProperty("eventMesh.webHook.operationMode");
54+
55+
if (!map.containsKey(operationMode)) {
56+
throw new IllegalStateException("operationMode is not supported.");
57+
}
58+
59+
Constructor<? extends WebHookConfigOperation> constructor = map.get(operationMode).getDeclaredConstructor(Properties.class);
60+
constructor.setAccessible(true);
61+
try {
62+
return constructor.newInstance(configProperties);
63+
} catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
64+
logger.error("can't find WebHookConfigOperation implementation");
65+
throw new Exception("can't find WebHookConfigOperation implementation");
66+
}
67+
}
4768

48-
public WebHookConfigOperation getHookConfigOperationManage(String path) {
49-
return getHookConfigOperationManage("file",path);
69+
/**
70+
* Read webHook related configurations from the global configuration file
71+
*/
72+
public Properties readConfigFromConfigFile() throws IOException {
73+
Properties configProperties;
74+
try (final InputStream inputStream =
75+
WebHookConfigOperation.class.getClassLoader().getResourceAsStream("eventmesh.properties")) {
76+
configProperties = new Properties();
77+
configProperties.load(inputStream);
78+
}
79+
return configProperties;
5080
}
5181

52-
public WebHookConfigOperation getHookConfigOperationManage(String operationMode,String path) {
53-
try {
54-
Constructor<? extends WebHookConfigOperation> constructor = map.get(operationMode).getDeclaredConstructor(String.class);
55-
constructor.setAccessible(true);
56-
try {
57-
return constructor.newInstance(path);
58-
} catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
59-
logger.error("can't find WebHookConfigOperation implementation", e);
60-
}
61-
} catch (NoSuchMethodException e) {
62-
logger.error("getHookConfigOperationManage failed", e);
63-
}
64-
return null;
65-
}
6682
}

0 commit comments

Comments
 (0)