Skip to content

Commit

Permalink
[Issue #655] Adding send message constraints for message size and bat…
Browse files Browse the repository at this point in the history
…ch size (#760)

* [Issue #655] Adding send message constraints for message size and batch size
  • Loading branch information
jinrongluo authored Feb 13, 2022
1 parent b8891ee commit 85de5c7
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public static void main(String[] args) throws Exception {
EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, CloudEvent.class);
client.init();

for (int i = 0; i < 5; i++) {
for (int i = 0; i < 2; i++) {
CloudEvent event = EventMeshTestUtils.generateCloudEventV1Async();
logger.info("begin send async msg[{}]==================={}", i, event);
client.publish(event, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
Expand Down
9 changes: 7 additions & 2 deletions eventmesh-runtime/conf/eventmesh.properties
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ eventMesh.server.tcp.msgReqnumPerSecond=15000
eventMesh.server.http.msgReqnumPerSecond=15000
eventMesh.server.session.upstreamBufferSize=20

# for single event publish, maximum size allowed per event
eventMesh.server.maxEventSize=1000
# for batch event publish, maximum number of events allowed in one batch
eventMesh.server.maxEventBatchSize=10

# thread number about global scheduler
eventMesh.server.global.scheduler=5
eventMesh.server.tcp.taskHandleExecutorPoolSize=8
Expand All @@ -61,8 +66,8 @@ eventMesh.server.gracefulShutdown.sleepIntervalInMills=1000
eventMesh.server.rebalanceRedirect.sleepIntervalInMills=200

#ip address blacklist
eventmesh.server.blacklist.ipv4=0.0.0.0/8,127.0.0.0/8,169.254.0.0/16,255.255.255.255/32
eventmesh.server.blacklist.ipv6=::/128,::1/128,ff00::/8
eventMesh.server.blacklist.ipv4=0.0.0.0/8,127.0.0.0/8,169.254.0.0/16,255.255.255.255/32
eventMesh.server.blacklist.ipv6=::/128,::1/128,ff00::/8

#connector plugin
eventMesh.connector.plugin.type=standalone
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration {

public int eventMeshBatchMsgRequestNumPerSecond = 20000;

public int eventMeshEventSize = 1000;

public int eventMeshEventBatchSize = 10;

public List<IPAddress> eventMeshIpv4BlackList = Collections.emptyList();

public List<IPAddress> eventMeshIpv6BlackList = Collections.emptyList();
Expand Down Expand Up @@ -266,6 +270,16 @@ public void init() {

}

String eventSize = configurationWrapper.getProp(ConfKeys.KEY_EVENTMESH_SERVER_EVENTSIZE);
if (StringUtils.isNotEmpty(eventSize) && StringUtils.isNumeric(eventSize)) {
eventMeshEventSize = Integer.parseInt(eventSize);
}

String eventBatchSize = configurationWrapper.getProp(ConfKeys.KEY_EVENTMESH_SERVER_EVENT_BATCHSIZE);
if (StringUtils.isNotEmpty(eventBatchSize) && StringUtils.isNumeric(eventBatchSize)) {
eventMeshEventBatchSize = Integer.parseInt(eventBatchSize);
}

String ipv4BlackList = configurationWrapper.getProp(ConfKeys.KEY_EVENTMESH_SERVER_IPV4_BLACK_LIST);
if (StringUtils.isNotEmpty(ipv4BlackList)) {
eventMeshIpv4BlackList = getBlacklist(ipv4BlackList);
Expand Down Expand Up @@ -339,8 +353,12 @@ static class ConfKeys {

public static String KEY_EVENTMESH_SERVER_MSG_REQ_NUM_PER_SECOND = "eventMesh.server.http.msgReqnumPerSecond";

public static String KEY_EVENTMESH_SERVER_IPV4_BLACK_LIST = "eventmesh.server.blacklist.ipv4";
public static String KEY_EVENTMESH_SERVER_EVENTSIZE = "eventMesh.server.maxEventSize";

public static String KEY_EVENTMESH_SERVER_EVENT_BATCHSIZE = "eventMesh.server.maxEventBatchSize";

public static String KEY_EVENTMESH_SERVER_IPV4_BLACK_LIST = "eventMesh.server.blacklist.ipv4";

public static String KEY_EVENTMESH_SERVER_IPV6_BLACK_LIST = "eventmesh.server.blacklist.ipv6";
public static String KEY_EVENTMESH_SERVER_IPV6_BLACK_LIST = "eventMesh.server.blacklist.ipv6";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ public class EventMeshTCPConfiguration extends CommonConfiguration {

public int sleepIntervalInRebalanceRedirectMills = 200;

public int eventMeshEventSize = 1000;

public int eventMeshEventBatchSize = 10;

private TrafficShapingConfig gtc = new TrafficShapingConfig(0, 10_000, 1_000, 2000);
private TrafficShapingConfig ctc = new TrafficShapingConfig(0, 2_000, 1_000, 10_000);

Expand Down Expand Up @@ -155,6 +159,10 @@ public void init() {
sleepIntervalInRebalanceRedirectMills = configurationWrapper.getIntProp(
ConfKeys.KEYS_EVENTMESH_SERVER_REBALANCE_REDIRECT_SLEEP_TIME, sleepIntervalInRebalanceRedirectMills);

eventMeshEventSize = configurationWrapper.getIntProp(ConfKeys.KEYS_EVENTMESH_SERVER_EVENTSIZE, eventMeshEventSize);

eventMeshEventBatchSize = configurationWrapper.getIntProp(
ConfKeys.KEYS_EVENTMESH_SERVER_EVENT_BATCHSIZE, eventMeshEventBatchSize);
}

public TrafficShapingConfig getGtc() {
Expand Down Expand Up @@ -191,6 +199,8 @@ static class ConfKeys {
public static String KEYS_EVENTMESH_SERVER_PUSH_FAIL_ISOLATE_TIME = "eventMesh.server.tcp.pushFailIsolateTimeInMills";
public static String KEYS_EVENTMESH_SERVER_GRACEFUL_SHUTDOWN_SLEEP_TIME = "eventMesh.server.gracefulShutdown.sleepIntervalInMills";
public static String KEYS_EVENTMESH_SERVER_REBALANCE_REDIRECT_SLEEP_TIME = "eventMesh.server.rebalanceRedirect.sleepIntervalInM";
public static String KEYS_EVENTMESH_SERVER_EVENTSIZE = "eventMesh.server.maxEventSize";
public static String KEYS_EVENTMESH_SERVER_EVENT_BATCHSIZE = "eventMesh.server.maxEventBatchSize";
}

public static class TrafficShapingConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -111,6 +112,18 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
String producerGroup = "";
int eventSize = eventList.size();

if (eventSize > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventBatchSize) {
batchMessageLogger.error("Event batch size exceeds the limit: {}",
eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventBatchSize);

responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
sendMessageBatchResponseHeader,
SendMessageBatchResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
"Event batch size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventBatchSize));
asyncContext.onComplete(responseEventMeshCommand);
return;
}

for (CloudEvent event : eventList) {
//validate event
if (StringUtils.isBlank(event.getId())
Expand All @@ -126,6 +139,19 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
return;
}

String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
if (content.length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize) {
batchMessageLogger.error("Event size exceeds the limit: {}",
eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize);

responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
sendMessageBatchResponseHeader,
SendMessageBatchResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(),
"Event size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize));
asyncContext.onComplete(responseEventMeshCommand);
return;
}

String idc = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.IDC)).toString();
String pid = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.PID)).toString();
String sys = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.SYS)).toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

import org.apache.commons.lang3.StringUtils;

import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -158,6 +159,19 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
return;
}

String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
if (content.length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize) {
batchMessageLogger.error("Event size exceeds the limit: {}",
eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize);

responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
sendMessageBatchV2ResponseHeader,
SendMessageBatchV2ResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
"Event size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize));
asyncContext.onComplete(responseEventMeshCommand);
return;
}

//do acl check
if (eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

import org.apache.commons.lang3.StringUtils;

import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -152,6 +153,19 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
return;
}

String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
if (content.length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize) {
httpLogger.error("Event size exceeds the limit: {}",
eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize);

responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
replyMessageResponseHeader,
ReplyMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
"Event size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize));
asyncContext.onComplete(responseEventMeshCommand);
return;
}

EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);

if (!eventMeshProducer.getStarted().get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

import org.apache.commons.lang3.StringUtils;

import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -195,6 +196,19 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
event = CloudEventBuilder.from(event).withExtension(SendMessageRequestBody.TTL, ttl).build();
}

String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
if (content.length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize) {
httpLogger.error("Event size exceeds the limit: {}",
eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize);

responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
sendMessageResponseHeader,
SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
"Event size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize));
asyncContext.onComplete(responseEventMeshCommand);
return;
}

try {
// body
//omsMsg.setBody(sendMessageRequestBody.getContent().getBytes(EventMeshConstants.DEFAULT_CHARSET));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

import org.apache.commons.lang3.StringUtils;

import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -191,6 +192,19 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
return;
}

String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
if (content.length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize) {
httpLogger.error("Event size exceeds the limit: {}",
eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize);

responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
sendMessageResponseHeader,
SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
"Event size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize));
asyncContext.onComplete(responseEventMeshCommand);
return;
}

EventMeshProducer eventMeshProducer =
eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

import org.apache.commons.lang3.StringUtils;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
Expand Down Expand Up @@ -86,6 +87,12 @@ public void run() {
throw new Exception("event is null");
}

String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
if (content.length() > eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshEventSize) {
throw new Exception("event size exceeds the limit: "
+ eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshEventSize);
}

//do acl check in sending msg
if (eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerSecurityEnable) {
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
Expand Down

0 comments on commit 85de5c7

Please sign in to comment.