Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
* The {@link ReactiveMessageHandlerSpec} extension for {@link ZeroMqMessageHandler}.
*
* @author Artem Bilan
* @author Alessio Matricardi
*
* @since 5.4
*/
Expand Down Expand Up @@ -126,6 +127,19 @@ public ZeroMqMessageHandlerSpec topic(String topic) {
return this;
}

/**
* Specify if the topic that {@link SocketType#PUB} socket is going to use for distributing messages into the
* subscriptions must be wrapped with an additional empty frame.
* It is ignored for all other {@link SocketType}s supported.
* @param wrapTopic true iff the topic must be wrapped with an additional empty frame.
* @return the spec
* @since 6.2.6
*/
public ZeroMqMessageHandlerSpec wrapTopic(boolean wrapTopic) {
this.reactiveMessageHandler.wrapTopic(wrapTopic);
return this;
}

/**
* Specify a {@link Function} to evaluate a topic a {@link SocketType#PUB}
* is going to use for distributing messages into the
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,6 +30,7 @@

/**
* @author Artem Bilan
* @author Alessio Matricardi
*
* @since 5.4
*/
Expand Down Expand Up @@ -108,6 +109,19 @@ public ZeroMqMessageProducerSpec topics(String... topics) {
return this;
}

/**
* Specify if the topic
* that {@link SocketType#SUB} socket is going to receive is wrapped with an additional empty frame.
* It is ignored for all other {@link SocketType}s supported.
* @param wrapTopic true iff the received topic is wrapped with an additional empty frame.
* @return the spec
* @since 6.2.6
*/
public ZeroMqMessageProducerSpec wrapTopic(boolean wrapTopic) {
this.target.wrapTopic(wrapTopic);
return this;
}

/**
* Configure an URL for {@link org.zeromq.ZMQ.Socket#connect(String)}.
* @param connectUrl the URL to connect ZeroMq socket to.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -54,6 +55,7 @@
* When the {@link SocketType#SUB} is used, the received topic is stored in the {@link ZeroMqHeaders#TOPIC}.
*
* @author Artem Bilan
* @author Alessio Matricardi
*
* @since 5.4
*/
Expand Down Expand Up @@ -90,6 +92,8 @@ public class ZeroMqMessageProducer extends MessageProducerSupport {

private volatile Mono<ZMQ.Socket> socketMono;

private volatile boolean wrapTopic = true;

public ZeroMqMessageProducer(ZContext context) {
this(context, SocketType.PAIR);
}
Expand Down Expand Up @@ -189,6 +193,17 @@ public int getBoundPort() {
return this.bindPort.get();
}

/**
* Specify if the topic
* that {@link SocketType#SUB} socket is going to receive is wrapped with an additional empty frame.
* It is ignored for all other {@link SocketType}s supported.
* @param wrapTopic true iff the received topic is wrapped with an additional empty frame.
* @since 6.2.6
*/
public void wrapTopic(boolean wrapTopic) {
this.wrapTopic = wrapTopic;
}

@Override
public String getComponentType() {
return "zeromq:inbound-channel-adapter";
Expand Down Expand Up @@ -284,7 +299,14 @@ private Mono<Message<?>> convertMessage(Mono<ZMsg> msgMono) {
return msgMono.map((msg) -> {
Map<String, Object> headers = null;
if (msg.size() > 1) {
headers = Collections.singletonMap(ZeroMqHeaders.TOPIC, msg.unwrap().getString(ZMQ.CHARSET));
ZFrame frame;
if (this.wrapTopic) {
frame = msg.unwrap();
}
else {
frame = msg.pop();
}
headers = Collections.singletonMap(ZeroMqHeaders.TOPIC, frame.getString(ZMQ.CHARSET));
}
return this.messageMapper.toMessage(msg.getLast().getData(), headers); // NOSONAR
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
* the {@link ZMsg} is sent into a socket as is and it is not destroyed for possible further reusing.
*
* @author Artem Bilan
* @author Alessio Matricardi
*
* @since 5.4
*/
Expand Down Expand Up @@ -88,6 +89,8 @@ public class ZeroMqMessageHandler extends AbstractReactiveMessageHandler

private volatile Disposable socketMonoSubscriber;

private volatile boolean wrapTopic = true;

/**
* Create an instance based on the provided {@link ZContext} and connection string.
* @param context the {@link ZContext} to use for creating sockets.
Expand Down Expand Up @@ -191,6 +194,17 @@ public void setTopicExpression(Expression topicExpression) {
this.topicExpression = topicExpression;
}

/**
* Specify if the topic that {@link SocketType#PUB} socket is going to use for distributing messages into the
* subscriptions must be wrapped with an additional empty frame.
* It is ignored for all other {@link SocketType}s supported.
* @param wrapTopic true iff the topic must be wrapped with an additional empty frame.
* @since 6.2.6
*/
public void wrapTopic(boolean wrapTopic) {
this.wrapTopic = wrapTopic;
}

@Override
public String getComponentType() {
return "zeromq:outbound-channel-adapter";
Expand Down Expand Up @@ -244,7 +258,13 @@ protected Mono<Void> handleMessageInternal(Message<?> message) {
if (socket.base() instanceof Pub) {
String topic = this.topicExpression.getValue(this.evaluationContext, message, String.class);
if (topic != null) {
msg.wrap(new ZFrame(topic));
var frame = new ZFrame(topic);
if (this.wrapTopic) {
msg.wrap(frame);
}
else {
msg.push(frame);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -40,6 +40,7 @@

/**
* @author Artem Bilan
* @author Alessio Matricardi
*
* @since 5.4
*/
Expand Down Expand Up @@ -146,4 +147,46 @@ void testMessageProducerForPubSubReceiveRaw() {
socket.close();
}

@Test
void testMessageProducerForPubSubDisabledWrapTopic() {
String socketAddress = "inproc://messageProducer.test";
ZMQ.Socket socket = CONTEXT.createSocket(SocketType.XPUB);
socket.bind(socketAddress);
socket.setReceiveTimeOut(10_000);

FluxMessageChannel outputChannel = new FluxMessageChannel();

StepVerifier stepVerifier =
StepVerifier.create(outputChannel)
.assertNext((message) ->
assertThat(message.getPayload())
.asInstanceOf(InstanceOfAssertFactories.type(ZMsg.class))
.extracting(ZMsg::pop)
.isEqualTo(new ZFrame("testTopicWithNonWrappedTopic")))
.thenCancel()
.verifyLater();

ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(CONTEXT, SocketType.SUB);
messageProducer.setOutputChannel(outputChannel);
messageProducer.setTopics("test");
messageProducer.setReceiveRaw(true);
messageProducer.setConnectUrl(socketAddress);
messageProducer.setConsumeDelay(Duration.ofMillis(10));
messageProducer.setBeanFactory(mock(BeanFactory.class));
messageProducer.wrapTopic(false);
messageProducer.afterPropertiesSet();
messageProducer.start();

assertThat(socket.recv()).isNotNull();

ZMsg msg = ZMsg.newStringMsg("test");
msg.push(new ZFrame("testTopicWithNonWrappedTopic"));
msg.send(socket);

stepVerifier.verify(Duration.ofSeconds(10));

messageProducer.destroy();
socket.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

/**
* @author Artem Bilan
* @author Alessio Matricardi
*
* @since 5.4
*/
Expand Down Expand Up @@ -150,4 +151,40 @@ void testMessageHandlerForPushPullOverProxy() {
proxy.destroy();
}

@Test
void testMessageHandlerForPubSubDisabledWrapTopic() {
ZMQ.Socket subSocket = CONTEXT.createSocket(SocketType.SUB);
subSocket.setReceiveTimeOut(0);
int port = subSocket.bindToRandomPort("tcp://*");
subSocket.subscribe("test");

ZeroMqMessageHandler messageHandler =
new ZeroMqMessageHandler(CONTEXT, "tcp://localhost:" + port, SocketType.PUB);
messageHandler.setBeanFactory(mock(BeanFactory.class));
messageHandler.setTopicExpression(
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
messageHandler.wrapTopic(false);
messageHandler.afterPropertiesSet();
messageHandler.start();

Message<?> testMessage = MessageBuilder.withPayload("test").setHeader("topic", "testTopic").build();

await().atMost(Duration.ofSeconds(20)).pollDelay(Duration.ofMillis(100))
.untilAsserted(() -> {
subSocket.subscribe("test");
messageHandler.handleMessage(testMessage).subscribe();
ZMsg msg = ZMsg.recvMsg(subSocket);
assertThat(msg).isNotNull();
assertThat(msg.pop().getString(ZMQ.CHARSET)).isEqualTo("testTopic");
Message<?> capturedMessage =
new EmbeddedJsonHeadersMessageMapper().toMessage(msg.getFirst().getData());
assertThat(capturedMessage).isEqualTo(testMessage);
msg.destroy();
});

messageHandler.destroy();
subSocket.close();
}

}