Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #4174]Support to subscribe message by tags when using RocketMQ storage plugin. #4175

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public class Constants {

public static final String PROPERTY_MESSAGE_STORE_TIMESTAMP = "storetimestamp";

public static final String MESSAGE_PROP_SEPARATOR = "99";
public static final String MESSAGE_PROP_SEPARATOR = "95";

public static final String EVENTMESH_CONF_HOME = System.getProperty("confPath", System.getenv("confPath"));

Expand Down Expand Up @@ -178,4 +178,6 @@ public class Constants {
* application/cloudevents+json Content-type
*/
public static final String CONTENT_TYPE_CLOUDEVENTS_JSON = "application/cloudevents+json";

public static final String MSG_TAG = "tags";
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public class SubscriptionItem implements Serializable {

private String topic;

private String subExpression;

@JsonDeserialize(converter = SubscriptionModeConverter.class)
private SubscriptionMode mode;

Expand All @@ -43,6 +45,13 @@ public SubscriptionItem(String topic, SubscriptionMode mode, SubscriptionType ty
this.type = type;
}

public SubscriptionItem(String topic, SubscriptionMode mode, SubscriptionType type, String subExpression) {
this.topic = topic;
this.mode = mode;
this.type = type;
this.subExpression = subExpression;
}

public SubscriptionType getType() {
return type;
}
Expand All @@ -67,12 +76,21 @@ public void setMode(SubscriptionMode mode) {
this.mode = mode;
}

public String getSubExpression() {
return subExpression;
}

public void setSubExpression(String subExpression) {
this.subExpression = subExpression;
}

@Override
public String toString() {
return "SubscriptionItem{"
+ "topic=" + topic
+ ", mode=" + mode
+ ", type=" + type
+ ", subExpression=" + subExpression
+ '}';
}

Expand All @@ -85,12 +103,12 @@ public boolean equals(Object o) {
return false;
}
SubscriptionItem that = (SubscriptionItem) o;
return Objects.equal(topic, that.topic) && mode == that.mode && type == that.type;
return Objects.equal(topic, that.topic) && mode == that.mode && type == that.type && Objects.equal(subExpression, that.subExpression);
}

@Override
public int hashCode() {
return Objects.hashCode(topic, mode, type);
return Objects.hashCode(topic, mode, type, subExpression);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ public class ExampleConstants {
public static final String EVENTMESH_HTTP_ASYNC_TEST_TOPIC = "TEST-TOPIC-HTTP-ASYNC";
public static final String EVENTMESH_HTTP_SYNC_TEST_TOPIC = "TEST-TOPIC-HTTP-SYNC";
public static final String EVENTMESH_TCP_ASYNC_TEST_TOPIC = "TEST-TOPIC-TCP-ASYNC";
public static final String EVENTMESH_TCP_ASYNC_TEST_TOPIC_TAG = "TEST-TOPIC-TCP-ASYNC-TAG";
public static final String EVENTMESH_TCP_SYNC_TEST_TOPIC = "TEST-TOPIC-TCP-SYNC";
public static final String EVENTMESH_TCP_BROADCAST_TEST_TOPIC = "TEST-TOPIC-TCP-BROADCAST";
public static final String EVENTMESH_TCP_BROADCAST_TEST_TOPIC_TAG = "TEST-TOPIC-TCP-BROADCAST-TAG";

public static final String DEFAULT_EVENTMESH_TEST_PRODUCER_GROUP = "EventMeshTest-producerGroup";
public static final String DEFAULT_EVENTMESH_TEST_CONSUMER_GROUP = "EventMeshTest-consumerGroup";
Expand All @@ -56,4 +58,6 @@ public class ExampleConstants {
public static final String IDC = "FT";
public static final String SUB_SYS = "1234";
public static final String SERVER_PORT = "server.port";

public static final String TAG_PREFIX = "TEST_MSG_TAG_";
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.common.utils.JsonUtils;

import org.apache.commons.lang3.StringUtils;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
Expand Down Expand Up @@ -109,38 +111,47 @@ public static Package rrResponse(final EventMeshMessage request) {
}

public static EventMeshMessage getEventMeshMessage(String eventMeshTcpSyncTestTopic, String msgType, String msg,
String keys, String keyMsg, String testMessage) {
String keys, String keyMsg, String testMessage, String tag) {
final EventMeshMessage mqmsg = new EventMeshMessage();
mqmsg.setTopic(eventMeshTcpSyncTestTopic);
mqmsg.getProperties().put(msgType, msg);
mqmsg.getProperties().put(UtilsConstants.TTL, DEFAULT_TTL_MS);
mqmsg.getProperties().put(keys, keyMsg);
if (StringUtils.isNotBlank(tag)) {
mqmsg.getProperties().put(Constants.MSG_TAG, tag);
}
mqmsg.getHeaders().put(Constants.DATA_CONTENT_TYPE, "text/plain");
mqmsg.setBody(testMessage);
return mqmsg;
}

public static EventMeshMessage generateSyncRRMqMsg() {
return getEventMeshMessage(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC, UtilsConstants.MSG_TYPE,
"persistent", UtilsConstants.KEYS, generateRandomString(16), "testSyncRR");
"persistent", UtilsConstants.KEYS, generateRandomString(16), "testSyncRR", null);
}

private static EventMeshMessage generateAsyncRRMqMsg() {
return getEventMeshMessage(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC, UtilsConstants.REPLY_TO,
"localhost@ProducerGroup-producerPool-9-access#V1_4_0#CI", UtilsConstants.PROPERTY_MESSAGE_REPLY_TO,
"notnull", "testAsyncRR");
"notnull", "testAsyncRR", null);
}

public static EventMeshMessage generateAsyncEventMqMsg() {
return getEventMeshMessage(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC, UtilsConstants.REPLY_TO,
"localhost@ProducerGroup-producerPool-9-access#V1_4_0#CI", UtilsConstants.PROPERTY_MESSAGE_REPLY_TO,
"notnull", ASYNC_MSG_BODY);
"notnull", ASYNC_MSG_BODY, null);
}

public static EventMeshMessage generateAsyncEventMqMsgWithTag(int i) {
return getEventMeshMessage(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC_TAG, UtilsConstants.REPLY_TO,
"localhost@ProducerGroup-producerPool-9-access#V1_4_0#CI", UtilsConstants.PROPERTY_MESSAGE_REPLY_TO,
"notnull", ASYNC_MSG_BODY, ExampleConstants.TAG_PREFIX + i);
}

public static EventMeshMessage generateBroadcastMqMsg() {
return getEventMeshMessage(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC, UtilsConstants.REPLY_TO,
return getEventMeshMessage(ExampleConstants.EVENTMESH_TCP_BROADCAST_TEST_TOPIC, UtilsConstants.REPLY_TO,
"localhost@ProducerGroup-producerPool-9-access#V1_4_0#CI", UtilsConstants.PROPERTY_MESSAGE_REPLY_TO,
"notnull", ASYNC_MSG_BODY);
"notnull", ASYNC_MSG_BODY, null);
}

private static String generateRandomString(final int length) {
Expand All @@ -166,6 +177,22 @@ public static CloudEvent generateCloudEventV1Async() {
.build();
}

public static CloudEvent generateCloudEventV1AsyncWithTag(int i) {
final Map<String, String> content = new HashMap<>();
content.put(UtilsConstants.CONTENT, ASYNC_MSG_BODY);

return CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSubject(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC_TAG)
.withSource(URI.create("/"))
.withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE)
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
.withData(Objects.requireNonNull(JsonUtils.toJSONString(content)).getBytes(Constants.DEFAULT_CHARSET))
.withExtension(UtilsConstants.TTL, DEFAULT_TTL_MS)
.withExtension(Constants.MSG_TAG, ExampleConstants.TAG_PREFIX + i)
.build();
}

public static CloudEvent generateCloudEventV1SyncRR() {
final Map<String, String> content = new HashMap<>();
content.put(UtilsConstants.CONTENT, "testSyncRR");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ public static void main(String[] args) throws Exception {

ThreadUtils.sleep(1, TimeUnit.SECONDS);
}

for (int i = 0; i < 4; i++) {
CloudEvent event = EventMeshTestUtils.generateCloudEventV1AsyncWithTag(i);
if (log.isInfoEnabled()) {
log.info("begin send async msg[{}] with tag: {}", i, event);
}
client.publish(event, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);

ThreadUtils.sleep(1, TimeUnit.SECONDS);
}
ThreadUtils.sleep(2, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("AsyncPublish failed", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
import org.apache.eventmesh.util.Utils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

Expand All @@ -42,6 +43,15 @@
public class SyncRequest {

public static void main(String[] args) throws Exception {
EventMeshTCPClient<CloudEvent> client = createClient();
try {
publishMsg(client);
} catch (Exception e) {
log.error("SyncRequest failed", e);
}
}

private static EventMeshTCPClient<CloudEvent> createClient() throws IOException {
final Properties properties = Utils.readPropertiesFile(ExampleConstants.CONFIG_FILE_NAME);
final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP);
final int eventMeshTcpPort = Integer.parseInt(properties.getProperty(ExampleConstants.EVENTMESH_TCP_PORT));
Expand All @@ -51,44 +61,43 @@ public static void main(String[] args) throws Exception {
.port(eventMeshTcpPort)
.userAgent(userAgent)
.build();
final EventMeshTCPClient<CloudEvent> client = EventMeshTCPClientFactory.createEventMeshTCPClient(
eventMeshTcpClientConfig, CloudEvent.class);
client.init();
return client;
}

try {

final EventMeshTCPClient<CloudEvent> client = EventMeshTCPClientFactory.createEventMeshTCPClient(
eventMeshTcpClientConfig, CloudEvent.class);
client.init();

final CloudEvent event = EventMeshTestUtils.generateCloudEventV1SyncRR();

if (log.isInfoEnabled()) {
log.info("begin send rr msg: {}", event);
}
private static void publishMsg(EventMeshTCPClient<CloudEvent> client) {
final CloudEvent event = EventMeshTestUtils.generateCloudEventV1SyncRR();
if (log.isInfoEnabled()) {
log.info("begin send req-resp msg: {}", event);
}

final Package response = client.rr(event, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
// check-NPE EventFormat
final EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE);
if (null == eventFormat) {
log.error("eventFormat is null. end the process");
return;
}
final Package response = client.rr(event, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
logResponse(response);
}

final CloudEvent replyEvent = eventFormat
.deserialize(response.getBody().toString().getBytes(StandardCharsets.UTF_8));
private static void logResponse(Package response) {
// check-NPE EventFormat
final EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE);
if (null == eventFormat) {
log.error("eventFormat is null. end the process");
return;
}

// check-NPE CloudEventData
final CloudEventData cloudEventData = replyEvent.getData();
if (null == cloudEventData) {
log.error("replyEvent.data is null. end the process");
return;
}
final CloudEvent replyEvent = eventFormat
.deserialize(response.getBody().toString().getBytes(StandardCharsets.UTF_8));

final String content = new String(cloudEventData.toBytes(), StandardCharsets.UTF_8);
if (log.isInfoEnabled()) {
log.info("receive rr reply: {}|{}", response, content);
}
// check-NPE CloudEventData
final CloudEventData cloudEventData = replyEvent.getData();
if (null == cloudEventData) {
log.error("replyEvent.data is null. end the process");
return;
}

} catch (Exception e) {
log.error("SyncRequest failed", e);
final String content = new String(cloudEventData.toBytes(), StandardCharsets.UTF_8);
if (log.isInfoEnabled()) {
log.info("receive rr reply, response: {}, response's content: {}", response, content);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ public static void main(String[] args) throws Exception {

ThreadUtils.sleep(1, TimeUnit.SECONDS);
}

for (int i = 0; i < 4; i++) {
final EventMeshMessage eventMeshMessage = EventMeshTestUtils.generateAsyncEventMqMsgWithTag(i);

if (log.isInfoEnabled()) {
log.info("begin send async msg[{}] with tag, msg: {}", i, eventMeshMessage);
}
client.publish(eventMeshMessage, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);

ThreadUtils.sleep(1, TimeUnit.SECONDS);
}
ThreadUtils.sleep(2, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("AsyncPublish failed", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.ExampleConstants;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
Expand All @@ -29,6 +30,7 @@
import org.apache.eventmesh.util.Utils;

import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;

Expand Down Expand Up @@ -56,6 +58,9 @@ public static void main(String[] args) throws Exception {

client.subscribe(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC, SubscriptionMode.CLUSTERING,
SubscriptionType.ASYNC);
client.subscribe(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC_TAG,
ExampleConstants.TAG_PREFIX + 1 + "||" + ExampleConstants.TAG_PREFIX + 3,
SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
client.registerSubBusiHandler(new AsyncSubscribe());

client.listen();
Expand All @@ -73,7 +78,11 @@ public Optional<CloudEvent> handle(final CloudEvent msg) {

final String content = new String(msg.getData().toBytes(), StandardCharsets.UTF_8);
if (log.isInfoEnabled()) {
log.info("receive async msg: {}|{}", msg, content);
if (Objects.nonNull(msg.getExtension(Constants.MSG_TAG))) {
log.info("receive async msg, msg:{}, msg's data:{}, tag:{}", msg, content, msg.getExtension(Constants.MSG_TAG));
} else {
log.info("receive async msg, msg:{}, msg's data:{}", msg, content);
}
}
return Optional.empty();
}
Expand Down
Loading