From dace9c4d226c0cbfa58ecd7168c9ab4cf1794b12 Mon Sep 17 00:00:00 2001 From: Christophe Bornet Date: Mon, 19 Jul 2021 14:44:55 +0200 Subject: [PATCH] Add support for pull and push consumers (#6) * Add handling of QueueDeclare frame, bindings and partial handling of BasicGet * Add RabbitMQ client end-to-end test * Handle no-ack BasicGet * Add handling of BasicConsume, BasicCancel, BasicDeliver and BasicQos frames --- .github/{workflow => workflows}/github-ci.yml | 0 conf/gateway.conf | 1 + pom.xml | 2 +- pulsar-rabbitmq-gw/dependency-reduced-pom.xml | 70 +++ pulsar-rabbitmq-gw/pom.xml | 35 ++ .../oss/pulsar/rabbitmqgw/AMQChannel.java | 493 ++++++++++++++-- .../oss/pulsar/rabbitmqgw/Binding.java | 94 +++ .../oss/pulsar/rabbitmqgw/ConsumerTarget.java | 74 +++ .../oss/pulsar/rabbitmqgw/Exchange.java | 33 +- .../pulsar/rabbitmqgw/GatewayConnection.java | 17 +- .../oss/pulsar/rabbitmqgw/GatewayService.java | 7 + .../oss/pulsar/rabbitmqgw/MessageUtils.java | 66 +++ .../rabbitmqgw/Pre0_10CreditManager.java | 121 ++++ .../rabbitmqgw/ProtocolOutputConverter.java | 510 +++++++++++++++++ .../datastax/oss/pulsar/rabbitmqgw/Queue.java | 181 ++++++ .../oss/pulsar/rabbitmqgw/VirtualHost.java | 79 +++ .../oss/pulsar/rabbitmqgw/AMQChannelTest.java | 541 +++++++++++++++++- .../pulsar/rabbitmqgw/AbstractBaseTest.java | 36 +- .../rabbitmqgw/GatewayConnectionTest.java | 7 +- rabbitmq-tests/pom.xml | 2 +- .../rabbitmqtests/RabbitmqInteropIT.java | 141 ++++- 21 files changed, 2414 insertions(+), 96 deletions(-) rename .github/{workflow => workflows}/github-ci.yml (100%) create mode 100644 conf/gateway.conf create mode 100644 pulsar-rabbitmq-gw/dependency-reduced-pom.xml create mode 100644 pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/Binding.java create mode 100644 pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/ConsumerTarget.java create mode 100644 pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/MessageUtils.java create mode 100644 pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/Pre0_10CreditManager.java create mode 100644 pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/ProtocolOutputConverter.java create mode 100644 pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/Queue.java create mode 100644 pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/VirtualHost.java diff --git a/.github/workflow/github-ci.yml b/.github/workflows/github-ci.yml similarity index 100% rename from .github/workflow/github-ci.yml rename to .github/workflows/github-ci.yml diff --git a/conf/gateway.conf b/conf/gateway.conf new file mode 100644 index 0000000..c5d8489 --- /dev/null +++ b/conf/gateway.conf @@ -0,0 +1 @@ +brokerServiceURL = http://localhost:8080 \ No newline at end of file diff --git a/pom.xml b/pom.xml index 16b8e97..db7ae9e 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ pom 1.0.0-SNAPSHOT DataStax RabbitMQ (R) gateway for Apache Pulsar (R) - Gateway to connect RabbitMQ clients to Apache Pulsar seamlessly + Gateway to seamlessly connect RabbitMQ clients to Apache Pulsar https://github.com/datastax/pulsar-rabbitmq-gw 2021 diff --git a/pulsar-rabbitmq-gw/dependency-reduced-pom.xml b/pulsar-rabbitmq-gw/dependency-reduced-pom.xml new file mode 100644 index 0000000..4d3c2ec --- /dev/null +++ b/pulsar-rabbitmq-gw/dependency-reduced-pom.xml @@ -0,0 +1,70 @@ + + + + pulsar-rabbitmq-gw-parent + com.datastax.oss + 1.0.0-SNAPSHOT + + 4.0.0 + pulsar-rabbitmq-gw + DataStax RabbitMQ (R) gateway for Apache Pulsar (R) + + + + maven-shade-plugin + 2.3 + + + package + + shade + + + + + com.datastax.oss.pulsar.rabbitmqgw.GatewayServiceStarter + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + + + + org.junit.jupiter + junit-jupiter-engine + 5.7.1 + test + + + apiguardian-api + org.apiguardian + + + junit-platform-engine + org.junit.platform + + + junit-jupiter-api + org.junit.jupiter + + + + + + ${project.build.directory} + + + diff --git a/pulsar-rabbitmq-gw/pom.xml b/pulsar-rabbitmq-gw/pom.xml index 6ec7bc4..83d0273 100644 --- a/pulsar-rabbitmq-gw/pom.xml +++ b/pulsar-rabbitmq-gw/pom.xml @@ -78,4 +78,39 @@ test + + + + org.apache.maven.plugins + maven-shade-plugin + 2.3 + + + + package + + shade + + + + + com.datastax.oss.pulsar.rabbitmqgw.GatewayServiceStarter + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + diff --git a/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/AMQChannel.java b/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/AMQChannel.java index e1b36ad..e143b9e 100644 --- a/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/AMQChannel.java +++ b/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/AMQChannel.java @@ -15,24 +15,26 @@ */ package com.datastax.oss.pulsar.rabbitmqgw; -import static org.apache.commons.lang3.StringUtils.isBlank; -import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.qpid.server.transport.util.Functions.hex; +import java.nio.ByteBuffer; import java.util.Base64; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.TypedMessageBuilder; -import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.exchange.ExchangeDefaults; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.messages.ChannelMessages; +import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.ErrorCodes; import org.apache.qpid.server.protocol.v0_8.AMQShortString; import org.apache.qpid.server.protocol.v0_8.FieldTable; @@ -40,13 +42,16 @@ import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame; import org.apache.qpid.server.protocol.v0_8.transport.AMQMethodBody; import org.apache.qpid.server.protocol.v0_8.transport.BasicAckBody; +import org.apache.qpid.server.protocol.v0_8.transport.BasicCancelOkBody; import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties; +import org.apache.qpid.server.protocol.v0_8.transport.BasicGetEmptyBody; import org.apache.qpid.server.protocol.v0_8.transport.ConfirmSelectOkBody; import org.apache.qpid.server.protocol.v0_8.transport.ContentBody; import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody; import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeleteOkBody; import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo; import org.apache.qpid.server.protocol.v0_8.transport.MethodRegistry; +import org.apache.qpid.server.protocol.v0_8.transport.QueueDeclareOkBody; import org.apache.qpid.server.protocol.v0_8.transport.ServerChannelMethodProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,8 +59,26 @@ public class AMQChannel implements ServerChannelMethodProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(AMQChannel.class); + private static final long DEFAULT_HIGH_PREFETCH_LIMIT = 100L; + private static final long DEFAULT_BATCH_LIMIT = 10L; private final int _channelId; + private final Pre0_10CreditManager _creditManager; + /** + * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver + * frame so that value of this represents the last tag sent out + */ + private volatile long _deliveryTag = 0; + + private Queue _defaultQueue; + /** + * This tag is unique per subscription to a queue. The server returns this in response to a + * basic.consume request. + */ + private volatile int _consumerTag; + /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */ + private final Map _tag2SubscriptionTargetMap = + new HashMap(); /** * The current message - which may be partial in the sense that not all frames have been received * yet - which has been received by this channel. As the frames are received the message gets @@ -67,23 +90,14 @@ public class AMQChannel implements ServerChannelMethodProcessor { private final AtomicBoolean _closing = new AtomicBoolean(false); private boolean _confirmOnPublish; private long _confirmedMessageCounter; - private final ConcurrentHashMap exchanges = new ConcurrentHashMap<>(); private final ConcurrentHashMap> producers = new ConcurrentHashMap<>(); public AMQChannel(GatewayConnection connection, int channelId) { _connection = connection; _channelId = channelId; - addStandardExchange( - ExchangeDefaults.DEFAULT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); - addStandardExchange( - ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); - addStandardExchange( - ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS); - addStandardExchange( - ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS); - addStandardExchange( - ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); + _creditManager = + new Pre0_10CreditManager(0L, 0L, DEFAULT_HIGH_PREFETCH_LIMIT, DEFAULT_BATCH_LIMIT); } @Override @@ -176,7 +190,7 @@ public void receiveExchangeDeclare( } else if (!nowait) { _connection.writeFrame(declareOkBody.generateFrame(getChannelId())); } - } else if (exchanges.containsKey(name)) { + } else if (_connection.getVhost().hasExchange(name)) { exchange = getExchange(name); if (!exchange.getType().equals(typeString)) { _connection.sendConnectionClose( @@ -210,7 +224,7 @@ public void receiveExchangeDeclare( } catch (IllegalArgumentException e) { String errorMessage = "Unknown exchange type '" + typeString + "' for exchange '" + exchangeName + "'"; - LOGGER.debug(errorMessage, e); + LOGGER.warn(errorMessage, e); _connection.sendConnectionClose(ErrorCodes.COMMAND_INVALID, errorMessage, getChannelId()); } } @@ -258,13 +272,182 @@ public void receiveExchangeBound( @Override public void receiveQueueDeclare( - AMQShortString queue, + AMQShortString queueStr, boolean passive, boolean durable, boolean exclusive, boolean autoDelete, boolean nowait, - FieldTable arguments) {} + FieldTable arguments) { + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "RECV[" + + _channelId + + "] QueueDeclare[" + + " queue: " + + queueStr + + " passive: " + + passive + + " durable: " + + durable + + " exclusive: " + + exclusive + + " autoDelete: " + + autoDelete + + " nowait: " + + nowait + + " arguments: " + + arguments + + " ]"); + } + + final AMQShortString queueName; + + // if we aren't given a queue name, we create one which we return to the client + if ((queueStr == null) || (queueStr.length() == 0)) { + queueName = AMQShortString.createAMQShortString("tmp_" + UUID.randomUUID()); + } else { + queueName = queueStr; + } + + Queue queue; + + // TODO: do we need to check that the queue already exists with exactly the same + // "configuration"? + + if (passive) { + queue = getQueue(queueName.toString()); + if (queue == null) { + closeChannel( + ErrorCodes.NOT_FOUND, + "Queue: '" + + queueName + + "' not found on VirtualHost '" + + _connection.getVhost().getNamespace() + + "'."); + } else { + // TODO: check exclusive queue access + { + // set this as the default queue on the channel: + setDefaultQueue(queue); + if (!nowait) { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + QueueDeclareOkBody responseBody = + methodRegistry.createQueueDeclareOkBody( + queueName, queue.getQueueDepthMessages(), queue.getConsumerCount()); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Queue " + queueName + " declared successfully"); + } + } + } + } + } else { + try { + LifetimePolicy lifetimePolicy; + ExclusivityPolicy exclusivityPolicy; + + if (exclusive) { + lifetimePolicy = + autoDelete + ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS + : durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE; + exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION; + } else { + lifetimePolicy = + autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT; + exclusivityPolicy = ExclusivityPolicy.NONE; + } + + Map attributes = FieldTable.convertToMap(arguments); + + if (attributes.containsKey(org.apache.qpid.server.model.Queue.EXCLUSIVE)) { + exclusivityPolicy = + ExclusivityPolicy.valueOf( + attributes.get(org.apache.qpid.server.model.Queue.EXCLUSIVE).toString()); + } + if (attributes.containsKey(org.apache.qpid.server.model.Queue.LIFETIME_POLICY)) { + lifetimePolicy = + LifetimePolicy.valueOf( + attributes.get(org.apache.qpid.server.model.Queue.LIFETIME_POLICY).toString()); + } + + queue = getQueue(queueName.toString()); + if (queue != null) { + // TODO; verify if queue is exclusive and opened by another connection + if (queue.isExclusive() != exclusive) { + + closeChannel( + ErrorCodes.ALREADY_EXISTS, + "Cannot re-declare queue '" + + queue.getName() + + "' with different exclusivity (was: " + + queue.isExclusive() + + " requested " + + exclusive + + ")"); + } else if ((autoDelete && queue.getLifetimePolicy() == LifetimePolicy.PERMANENT) + || (!autoDelete + && queue.getLifetimePolicy() + != ((exclusive && !durable) + ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE + : LifetimePolicy.PERMANENT))) { + closeChannel( + ErrorCodes.ALREADY_EXISTS, + "Cannot re-declare queue '" + + queue.getName() + + "' with different lifetime policy (was: " + + queue.getLifetimePolicy() + + " requested autodelete: " + + autoDelete + + ")"); + } else { + setDefaultQueue(queue); + if (!nowait) { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + QueueDeclareOkBody responseBody = + methodRegistry.createQueueDeclareOkBody( + queueName, queue.getQueueDepthMessages(), queue.getConsumerCount()); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Queue " + queueName + " declared successfully"); + } + } + } + } else { + + queue = new Queue(queueName.toString(), lifetimePolicy, exclusivityPolicy); + addQueue(queue); + _connection + .getVhost() + .getExchange(ExchangeDefaults.DEFAULT_EXCHANGE_NAME) + .bind(queue, queue.getName(), _connection); + + setDefaultQueue(queue); + + if (!nowait) { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + QueueDeclareOkBody responseBody = + methodRegistry.createQueueDeclareOkBody( + queueName, queue.getQueueDepthMessages(), queue.getConsumerCount()); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Queue " + queueName + " declared successfully"); + } + } + } + } catch (IllegalArgumentException e) { + String message = String.format("Error creating queue '%s': %s", queueName, e.getMessage()); + _connection.sendConnectionClose(ErrorCodes.INVALID_ARGUMENT, message, getChannelId()); + } catch (PulsarClientException e) { + _connection.sendConnectionClose(ErrorCodes.INTERNAL_ERROR, e.getMessage(), getChannelId()); + } + } + } @Override public void receiveQueueBind( @@ -292,20 +475,133 @@ public void receiveQueueUnbind( public void receiveBasicRecover(boolean requeue, boolean sync) {} @Override - public void receiveBasicQos(long prefetchSize, int prefetchCount, boolean global) {} + public void receiveBasicQos(long prefetchSize, int prefetchCount, boolean global) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "RECV[" + + _channelId + + "] BasicQos[" + + " prefetchSize: " + + prefetchSize + + " prefetchCount: " + + prefetchCount + + " global: " + + global + + " ]"); + } + _creditManager.setCreditLimits(prefetchSize, prefetchCount); + + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createBasicQosOkBody(); + _connection.writeFrame(responseBody.generateFrame(getChannelId())); + } @Override public void receiveBasicConsume( - AMQShortString queue, + AMQShortString queueName, AMQShortString consumerTag, boolean noLocal, boolean noAck, boolean exclusive, boolean nowait, - FieldTable arguments) {} + FieldTable arguments) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "RECV[" + + _channelId + + "] BasicConsume[" + + " queue: " + + queueName + + " consumerTag: " + + consumerTag + + " noLocal: " + + noLocal + + " noAck: " + + noAck + + " exclusive: " + + exclusive + + " nowait: " + + nowait + + " arguments: " + + arguments + + " ]"); + } + + AMQShortString consumerTag1 = consumerTag; + + Queue queue = + queueName == null + ? getDefaultQueue() + : _connection.getVhost().getQueue(queueName.toString()); + + if (queue == null) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("No queue for '" + queueName + "'"); + } + if (queueName != null) { + closeChannel(ErrorCodes.NOT_FOUND, "No such queue, '" + queueName + "'"); + } else { + _connection.sendConnectionClose( + ErrorCodes.NOT_ALLOWED, + "No queue name provided, no default queue defined.", + _channelId); + } + } else { + if (consumerTag1 == null) { + consumerTag1 = AMQShortString.createAMQShortString("sgen_" + getNextConsumerTag()); + } + // TODO: check exclusive queue owned by another connection + if (_tag2SubscriptionTargetMap.containsKey(consumerTag1)) { + _connection.sendConnectionClose( + ErrorCodes.NOT_ALLOWED, "Non-unique consumer tag, '" + consumerTag1 + "'", _channelId); + } else if (queue.hasExclusiveConsumer()) { + _connection.sendConnectionClose( + ErrorCodes.ACCESS_REFUSED, + "Cannot subscribe to queue '" + + queue.getName() + + "' as it already has an existing exclusive consumer", + _channelId); + } else if (exclusive && queue.getConsumerCount() != 0) { + _connection.sendConnectionClose( + ErrorCodes.ACCESS_REFUSED, + "Cannot subscribe to queue '" + + queue.getName() + + "' exclusively as it already has a consumer", + _channelId); + } else { + if (!nowait) { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag1); + _connection.writeFrame(responseBody.generateFrame(_channelId)); + } + ConsumerTarget consumer = new ConsumerTarget(this, consumerTag1, queue); + queue.addConsumer(consumer, exclusive); + _tag2SubscriptionTargetMap.put(consumerTag1, consumer); + } + } + } @Override - public void receiveBasicCancel(AMQShortString consumerTag, boolean noWait) {} + public void receiveBasicCancel(AMQShortString consumerTag, boolean nowait) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "RECV[" + + _channelId + + "] BasicCancel[" + + " consumerTag: " + + consumerTag + + " noWait: " + + nowait + + " ]"); + } + + unsubscribeConsumer(consumerTag); + if (!nowait) { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + BasicCancelOkBody cancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag); + _connection.writeFrame(cancelOkBody.generateFrame(_channelId)); + } + } @Override public void receiveBasicPublish( @@ -330,8 +626,12 @@ public void receiveBasicPublish( + " ]"); } - if (!exchanges.containsKey( - exchangeName == null ? ExchangeDefaults.DEFAULT_EXCHANGE_NAME : exchangeName.toString())) { + if (!_connection + .getVhost() + .hasExchange( + exchangeName == null + ? ExchangeDefaults.DEFAULT_EXCHANGE_NAME + : exchangeName.toString())) { closeChannel(ErrorCodes.NOT_FOUND, "Unknown exchange name: '" + exchangeName + "'"); } @@ -342,7 +642,68 @@ public void receiveBasicPublish( } @Override - public void receiveBasicGet(AMQShortString queue, boolean noAck) {} + public void receiveBasicGet(AMQShortString queueName, boolean noAck) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "RECV[" + + _channelId + + "] BasicGet[" + + " queue: " + + queueName + + " noAck: " + + noAck + + " ]"); + } + + Queue queue = + queueName == null + ? getDefaultQueue() + : _connection.getVhost().getQueue(queueName.toString()); + + if (queue == null) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("No queue for '" + queueName + "'"); + } + if (queueName != null) { + _connection.sendConnectionClose( + ErrorCodes.NOT_FOUND, "No such queue, '" + queueName + "'", _channelId); + + } else { + _connection.sendConnectionClose( + ErrorCodes.NOT_ALLOWED, + "No queue name provided, no default queue defined.", + _channelId); + } + } else { + // TODO: check exclusive queue owned by another connection + if (queue.hasExclusiveConsumer()) { + _connection.sendConnectionClose( + ErrorCodes.ACCESS_REFUSED, + "Cannot subscribe to queue '" + + queue.getName() + + "' as it already has an existing exclusive consumer", + _channelId); + } else { + Message message = queue.receive(noAck); + if (message != null) { + _connection + .getProtocolOutputConverter() + .writeGetOk( + MessageUtils.getMessagePublishInfo(message), + new ContentBody(ByteBuffer.wrap(message.getData())), + MessageUtils.getContentHeaderBody(message), + message.getRedeliveryCount() > 0, + _channelId, + getNextDeliveryTag(), + queue.getQueueDepthMessages()); + } else { + MethodRegistry methodRegistry = _connection.getMethodRegistry(); + BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null); + _connection.writeFrame(responseBody.generateFrame(_channelId)); + } + } + } + } @Override public void receiveChannelFlow(boolean active) {} @@ -509,6 +870,14 @@ private void publishContentBody(ContentBody contentBody) { } } + public long getNextDeliveryTag() { + return ++_deliveryTag; + } + + private int getNextConsumerTag() { + return ++_consumerTag; + } + private void deliverCurrentMessageIfComplete() { // check and deliver if header says body length is zero if (_currentMessage.allContentReceived()) { @@ -543,13 +912,21 @@ private void deliverCurrentMessageIfComplete() { qpidByteBuffer.copyTo(value); messageBuilder.value(value); + messageBuilder.property( + MessageUtils.MESSAGE_PROPERTY_AMQP_IMMEDIATE, String.valueOf(info.isImmediate())); + messageBuilder.property( + MessageUtils.MESSAGE_PROPERTY_AMQP_MANDATORY, String.valueOf(info.isMandatory())); + ContentHeaderBody contentHeader = _currentMessage.getContentHeader(); byte[] bytes = new byte[contentHeader.getSize()]; QpidByteBuffer buf = QpidByteBuffer.wrap(bytes); contentHeader.writePayload(buf); - messageBuilder.property("amqp-headers", Base64.getEncoder().encodeToString(bytes)); - messageBuilder.eventTime(contentHeader.getProperties().getTimestamp()); + messageBuilder.property( + MessageUtils.MESSAGE_PROPERTY_AMQP_HEADERS, Base64.getEncoder().encodeToString(bytes)); + if (contentHeader.getProperties().getTimestamp() > 0) { + messageBuilder.eventTime(contentHeader.getProperties().getTimestamp()); + } messageBuilder .sendAsync() @@ -582,7 +959,7 @@ public boolean isClosing() { return _closing.get() || getConnection().isClosing(); } - private GatewayConnection getConnection() { + public GatewayConnection getConnection() { return _connection; } @@ -594,6 +971,30 @@ public int getChannelId() { return _channelId; } + /** + * Unsubscribe a consumer from a queue. + * + * @param consumerTag + * @return true if the consumerTag had a mapped queue that could be unregistered. + */ + private boolean unsubscribeConsumer(AMQShortString consumerTag) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Unsubscribing consumer '{}' on channel {}", consumerTag, this); + } + + ConsumerTarget target = _tag2SubscriptionTargetMap.remove(consumerTag); + if (target != null) { + target.close(); + return true; + } else { + LOGGER.warn( + "Attempt to unsubscribe consumer with tag '" + + consumerTag + + "' which is not registered."); + } + return false; + } + public void close() { close(0, null); } @@ -622,29 +1023,32 @@ private void messageWithSubject(final LogMessage operationalLogMessage) { logger.info(operationalLogMessage.toString()); } - private void setDefaultQueue(Queue queue) { - // TODO setDefaultQueue + private void setDefaultQueue(Queue queue) { + _defaultQueue = queue; + } + + private Queue getDefaultQueue() { + return _defaultQueue; } private Exchange getExchange(String name) { - return exchanges.get(name); + return _connection.getVhost().getExchange(name); } private void addExchange(Exchange exchange) { - exchanges.put(exchange.getName(), exchange); + _connection.getVhost().addExchange(exchange); } private void deleteExchange(Exchange exchange) { - exchanges.remove(exchange.getName()); + _connection.getVhost().deleteExchange(exchange); + } + + private Queue getQueue(String name) { + return _connection.getVhost().getQueue(name); } - private void addStandardExchange(String directExchangeName, String directExchangeClass) { - addExchange( - new Exchange( - directExchangeName, - Exchange.Type.valueOf(directExchangeClass), - true, - LifetimePolicy.PERMANENT)); + private void addQueue(Queue queue) { + _connection.getVhost().addQueue(queue); } private boolean isReservedExchangeName(String name) { @@ -656,12 +1060,7 @@ private boolean isReservedExchangeName(String name) { private Producer getOrCreateProducerForExchange(String exchangeName, String routingKey) { String vHost = _connection.getNamespace(); - StringBuilder topic = new StringBuilder(isBlank(exchangeName) ? "amq.default" : exchangeName); - if (isNotBlank(routingKey)) { - topic.append(".__").append(routingKey).append("__"); - } - - TopicName topicName = TopicName.get("persistent", NamespaceName.get(vHost), topic.toString()); + TopicName topicName = Exchange.getTopicName(vHost, exchangeName, routingKey); return producers.computeIfAbsent(topicName.toString(), this::createProducer); } diff --git a/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/Binding.java b/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/Binding.java new file mode 100644 index 0000000..738423a --- /dev/null +++ b/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/Binding.java @@ -0,0 +1,94 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.pulsar.rabbitmqgw; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; + +public class Binding { + + private final String vHost; + private final Exchange exchange; + private final Queue queue; + private final List routingKeys = new ArrayList<>(); + private final PulsarClient pulsarClient; + + private Consumer pulsarConsumer; + private volatile MessageId lastReceivedmessageId; + private volatile CompletableFuture> message; + + public Binding(String vHost, Exchange exchange, Queue queue, PulsarClient pulsarClient) { + this.vHost = vHost; + this.exchange = exchange; + this.queue = queue; + this.pulsarClient = pulsarClient; + } + + public Exchange getExchange() { + return exchange; + } + + public CompletableFuture receiveMessageAsync() { + CompletableFuture> messageCompletableFuture = pulsarConsumer.receiveAsync(); + message = + messageCompletableFuture.thenApply( + msg -> { + lastReceivedmessageId = msg.getMessageId(); + return msg; + }); + return message.thenApply(it -> this); + } + + public void addKey(String routingKey) throws PulsarClientException { + routingKeys.add(routingKey); + List topics = + routingKeys + .stream() + .map(key -> Exchange.getTopicName(vHost, exchange.getName(), routingKey).toString()) + .collect(Collectors.toList()); + // TODO: make this part async + if (pulsarConsumer != null) { + // TODO: verify that it also removes the subscription + pulsarConsumer.close(); + } + pulsarConsumer = + pulsarClient + .newConsumer() + .topics(topics) + .subscriptionName(exchange.getName() + "-" + queue.getName() + "-" + UUID.randomUUID()) + .subscribe(); + if (lastReceivedmessageId != null) { + pulsarConsumer.seek(lastReceivedmessageId); + } + receiveMessageAsync().thenAccept(queue::deliverMessage); + } + + public CompletableFuture> getReceive() { + return message; + } + + public CompletableFuture ackMessage(Message message) { + return pulsarConsumer.acknowledgeAsync(message); + } +} diff --git a/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/ConsumerTarget.java b/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/ConsumerTarget.java new file mode 100644 index 0000000..2122eaf --- /dev/null +++ b/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/ConsumerTarget.java @@ -0,0 +1,74 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.pulsar.rabbitmqgw; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.pulsar.client.api.Message; +import org.apache.qpid.server.protocol.v0_8.AMQShortString; +import org.apache.qpid.server.protocol.v0_8.transport.ContentBody; + +public class ConsumerTarget { + + enum State { + OPEN, + CLOSED + } + + private final AMQChannel channel; + private final AMQShortString tag; + private final Queue queue; + private final AtomicReference _state = new AtomicReference<>(State.OPEN); + private CompletableFuture> messageCompletableFuture; + + public ConsumerTarget(AMQChannel channel, AMQShortString tag, Queue queue) { + this.channel = channel; + this.tag = tag; + this.queue = queue; + } + + public void consume() { + if (_state.get() == State.OPEN) { + messageCompletableFuture = queue.receiveAsync(true, 0); + messageCompletableFuture + .thenAccept( + message -> + channel + .getConnection() + .getProtocolOutputConverter() + .writeDeliver( + MessageUtils.getMessagePublishInfo(message), + new ContentBody(ByteBuffer.wrap(message.getData())), + MessageUtils.getContentHeaderBody(message), + message.getRedeliveryCount() > 0, + channel.getChannelId(), + channel.getNextDeliveryTag(), + tag)) + .thenRunAsync(this::consume); + } + } + + public boolean close() { + if (_state.compareAndSet(State.OPEN, State.CLOSED)) { + messageCompletableFuture.cancel(false); + queue.unregisterConsumer(this); + return true; + } else { + return false; + } + } +} diff --git a/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/Exchange.java b/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/Exchange.java index 0de2608..edbc822 100644 --- a/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/Exchange.java +++ b/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/Exchange.java @@ -15,6 +15,14 @@ */ package com.datastax.oss.pulsar.rabbitmqgw; +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + +import java.util.HashMap; +import java.util.Map; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; import org.apache.qpid.server.model.LifetimePolicy; public class Exchange { @@ -30,6 +38,7 @@ enum Type { private final Type type; private final boolean durable; private final LifetimePolicy lifetimePolicy; + private final Map bindings = new HashMap<>(); public Exchange(String name, Type type, boolean durable, LifetimePolicy lifetimePolicy) { this.name = name; @@ -47,6 +56,28 @@ public String getType() { } public boolean hasBindings() { - return false; + return bindings.size() != 0; + } + + public static TopicName getTopicName(String vHost, String exchangeName, String routingKey) { + StringBuilder topic = new StringBuilder(isBlank(exchangeName) ? "amq.default" : exchangeName); + if (isNotBlank(routingKey)) { + topic.append("$$").append(routingKey); + } + return TopicName.get("persistent", NamespaceName.get(vHost), topic.toString()); + } + + public void bind(Queue queue, String routingKey, GatewayConnection connection) + throws PulsarClientException { + String vHost = connection.getNamespace(); + + if (!bindings.containsKey(queue.getName())) { + bindings.put( + queue.getName(), + new Binding(vHost, this, queue, connection.getGatewayService().getPulsarClient())); + } + Binding binding = bindings.get(queue.getName()); + binding.addKey(routingKey); + queue.addBinding(binding); } } diff --git a/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/GatewayConnection.java b/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/GatewayConnection.java index 5ec3595..fe55c10 100644 --- a/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/GatewayConnection.java +++ b/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/GatewayConnection.java @@ -86,6 +86,7 @@ enum ConnectionState { private ChannelHandlerContext ctx; private SocketAddress remoteAddress; private String namespace; + private VirtualHost vhost; // Variables copied from Qpid's AMQPConnection_0_8Impl private ServerDecoder _decoder; @@ -94,6 +95,7 @@ enum ConnectionState { private volatile MethodRegistry _methodRegistry; private volatile ConnectionState _state = ConnectionState.INIT; private final ConcurrentLongHashMap _channelMap = new ConcurrentLongHashMap<>(); + private final ProtocolOutputConverter _protocolOutputConverter; private volatile int _maxFrameSize; private final AtomicBoolean _orderlyClose = new AtomicBoolean(false); private final Map _closingChannelsList = new ConcurrentHashMap<>(); @@ -108,6 +110,7 @@ enum ConnectionState { public GatewayConnection(GatewayService gatewayService) { this.gatewayService = gatewayService; this._networkBufferSize = gatewayService.getConfig().getAmqpNetworkBufferSize(); + this._protocolOutputConverter = new ProtocolOutputConverter(this); } @Override @@ -377,6 +380,7 @@ public void receiveConnectionOpen( // TODO: can vhosts have / in their name ? in that case they could be mapped to tenant+namespace this.namespace = "public/" + (StringUtils.isEmpty(virtualHostStr) ? "default" : virtualHostStr); + this.vhost = this.getGatewayService().getOrCreateVhost(namespace); // TODO: check or create namespace with the admin client ? MethodRegistry methodRegistry = getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(virtualHostName); @@ -411,7 +415,6 @@ public void receiveChannelOpen(int channelId) { LOGGER.debug("Connecting to: {}", namespace); final AMQChannel channel = new AMQChannel(this, channelId); - // channel.create(); addChannel(channel); @@ -719,14 +722,26 @@ private void sendConnectionClose(int channelId, AMQFrame frame) { } } + public boolean isCompressionSupported() { + return true; + } + public MethodRegistry getMethodRegistry() { return _methodRegistry; } + public ProtocolOutputConverter getProtocolOutputConverter() { + return _protocolOutputConverter; + } + public String getNamespace() { return namespace; } + public VirtualHost getVhost() { + return vhost; + } + public GatewayService getGatewayService() { return gatewayService; } diff --git a/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/GatewayService.java b/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/GatewayService.java index e186cf6..d28ab94 100644 --- a/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/GatewayService.java +++ b/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/GatewayService.java @@ -26,6 +26,7 @@ import io.netty.util.concurrent.DefaultThreadFactory; import java.io.Closeable; import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; @@ -54,6 +55,8 @@ public class GatewayService implements Closeable { private static final int numThreads = Runtime.getRuntime().availableProcessors(); + private final ConcurrentHashMap vhosts = new ConcurrentHashMap<>(); + public GatewayService(GatewayConfiguration config) { checkNotNull(config); this.config = config; @@ -142,5 +145,9 @@ public GatewayConfiguration getConfig() { return config; } + public VirtualHost getOrCreateVhost(String namespace) { + return vhosts.computeIfAbsent(namespace, VirtualHost::new); + } + private static final Logger LOG = LoggerFactory.getLogger(GatewayService.class); } diff --git a/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/MessageUtils.java b/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/MessageUtils.java new file mode 100644 index 0000000..e38319a --- /dev/null +++ b/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/MessageUtils.java @@ -0,0 +1,66 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.pulsar.rabbitmqgw; + +import java.util.Base64; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.protocol.v0_8.AMQFrameDecodingException; +import org.apache.qpid.server.protocol.v0_8.AMQShortString; +import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties; +import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody; +import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class MessageUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(AMQChannel.class); + + public static final String MESSAGE_PROPERTY_AMQP_HEADERS = "amqp-headers"; + public static final String MESSAGE_PROPERTY_AMQP_IMMEDIATE = "amqp-immediate"; + public static final String MESSAGE_PROPERTY_AMQP_MANDATORY = "amqp-mandatory"; + + public static MessagePublishInfo getMessagePublishInfo(Message message) { + String localName = TopicName.get(message.getTopicName()).getLocalName(); + String[] split = localName.split("\\$\\$", 2); + String routingKey = ""; + String exchange = split[0]; + if (split.length > 1) { + routingKey = split[1]; + } + return new MessagePublishInfo( + AMQShortString.createAMQShortString(exchange), + Boolean.parseBoolean(message.getProperty(MessageUtils.MESSAGE_PROPERTY_AMQP_IMMEDIATE)), + Boolean.parseBoolean(message.getProperty(MessageUtils.MESSAGE_PROPERTY_AMQP_MANDATORY)), + AMQShortString.createAMQShortString(routingKey)); + } + + public static ContentHeaderBody getContentHeaderBody(Message message) { + try { + String headersProperty = message.getProperty(MessageUtils.MESSAGE_PROPERTY_AMQP_HEADERS); + if (headersProperty != null) { + byte[] headers = Base64.getDecoder().decode(headersProperty); + QpidByteBuffer buf = QpidByteBuffer.wrap(headers); + return ContentHeaderBody.createFromBuffer(buf, headers.length); + } + } catch (AMQFrameDecodingException | IllegalArgumentException e) { + LOGGER.error("Couldn't decode AMQP headers", e); + } + return new ContentHeaderBody(new BasicContentHeaderProperties(), message.getData().length); + } +} diff --git a/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/Pre0_10CreditManager.java b/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/Pre0_10CreditManager.java new file mode 100644 index 0000000..98bf627 --- /dev/null +++ b/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/Pre0_10CreditManager.java @@ -0,0 +1,121 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.pulsar.rabbitmqgw; + +import java.util.concurrent.atomic.LongAdder; +import org.apache.qpid.server.protocol.v0_8.FlowCreditManager_0_8; + +/** + * Copy of package-private org.apache.qpid.server.protocol.v0_8.Pre0_10CreditManager with fixes for + * concurrency + */ +public class Pre0_10CreditManager implements FlowCreditManager_0_8 { + + private final long _highPrefetchLimit; + private final long _batchLimit; + private volatile long _bytesCreditLimit; + private volatile long _messageCreditLimit; + + private final LongAdder _bytesCredit = new LongAdder();; + private final LongAdder _messageCredit = new LongAdder(); + + Pre0_10CreditManager( + long bytesCreditLimit, long messageCreditLimit, long highPrefetchLimit, long batchLimit) { + _bytesCreditLimit = bytesCreditLimit; + _messageCreditLimit = messageCreditLimit; + _bytesCredit.add(bytesCreditLimit); + _messageCredit.add(messageCreditLimit); + _highPrefetchLimit = highPrefetchLimit; + _batchLimit = batchLimit; + } + + void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit) { + long bytesCreditChange = bytesCreditLimit - _bytesCreditLimit; + long messageCreditChange = messageCreditLimit - _messageCreditLimit; + + if (bytesCreditChange != 0L) { + _bytesCredit.add(bytesCreditChange); + } + + if (messageCreditChange != 0L) { + _messageCredit.add(messageCreditChange); + } + + _bytesCreditLimit = bytesCreditLimit; + _messageCreditLimit = messageCreditLimit; + } + + @Override + public void restoreCredit(final long messageCredit, final long bytesCredit) { + _messageCredit.add(messageCredit); + long messageCreditSum = _messageCredit.longValue(); + if (messageCreditSum > _messageCreditLimit) { + throw new IllegalStateException( + String.format( + "Consumer credit accounting error. Restored more credit than we ever had: messageCredit=%d messageCreditLimit=%d", + messageCreditSum, _messageCreditLimit)); + } + + _bytesCredit.add(bytesCredit); + long _bytesCreditSum = _bytesCredit.longValue(); + if (_bytesCreditSum > _bytesCreditLimit) { + throw new IllegalStateException( + String.format( + "Consumer credit accounting error. Restored more credit than we ever had: bytesCredit=%d bytesCreditLimit=%d", + _bytesCreditSum, _bytesCreditLimit)); + } + } + + @Override + public boolean hasCredit() { + return (_bytesCreditLimit == 0L || _bytesCredit.longValue() > 0) + && (_messageCreditLimit == 0L || _messageCredit.longValue() > 0); + } + + @Override + public boolean useCreditForMessage(final long msgSize) { + if (_messageCreditLimit != 0) { + if (_messageCredit.longValue() <= 0) { + return false; + } + } + if (_bytesCreditLimit != 0) { + long _bytesCreditSum = _bytesCredit.longValue(); + if ((_bytesCreditSum < msgSize) && (_bytesCreditSum != _bytesCreditLimit)) { + return false; + } + } + + _messageCredit.decrement(); + _bytesCredit.add(-msgSize); + return true; + } + + @Override + public boolean isNotBytesLimitedAndHighPrefetch() { + return _bytesCreditLimit == 0L && _messageCreditLimit > _highPrefetchLimit; + } + + @Override + public boolean isBytesLimited() { + return _bytesCredit.longValue() != 0; + } + + @Override + public boolean isCreditOverBatchLimit() { + return _messageCredit.longValue() > _batchLimit; + } +} diff --git a/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/ProtocolOutputConverter.java b/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/ProtocolOutputConverter.java new file mode 100644 index 0000000..d1bfb7d --- /dev/null +++ b/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/ProtocolOutputConverter.java @@ -0,0 +1,510 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.pulsar.rabbitmqgw; + +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import org.apache.qpid.server.QpidException; +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.message.MessageContentSource; +import org.apache.qpid.server.protocol.v0_8.AMQShortString; +import org.apache.qpid.server.protocol.v0_8.transport.AMQBody; +import org.apache.qpid.server.protocol.v0_8.transport.AMQDataBlock; +import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame; +import org.apache.qpid.server.protocol.v0_8.transport.AMQMethodBody; +import org.apache.qpid.server.protocol.v0_8.transport.AMQVersionAwareProtocolSession; +import org.apache.qpid.server.protocol.v0_8.transport.BasicCancelOkBody; +import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties; +import org.apache.qpid.server.protocol.v0_8.transport.ContentBody; +import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody; +import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo; +import org.apache.qpid.server.transport.ByteBufferSender; +import org.apache.qpid.server.util.GZIPUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Adapted from {@link org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl} */ +public class ProtocolOutputConverter { + private static final int BASIC_CLASS_ID = 60; + private static final int MESSAGE_COMPRESSION_THRESHOLD_SIZE = 102400; + private final GatewayConnection _connection; + private static final AMQShortString GZIP_ENCODING = + AMQShortString.valueOf(GZIPUtils.GZIP_CONTENT_ENCODING); + + private static final Logger LOGGER = + LoggerFactory.getLogger(org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverter.class); + + public ProtocolOutputConverter(GatewayConnection connection) { + _connection = connection; + } + + public long writeDeliver( + final MessagePublishInfo info, + final ContentBody contentBody, + final ContentHeaderBody contentHeaderBody, + final boolean isRedelivered, + int channelId, + long deliveryTag, + AMQShortString consumerTag) { + AMQBody deliverBody = createEncodedDeliverBody(info, isRedelivered, deliveryTag, consumerTag); + return writeMessageDelivery(contentBody, contentHeaderBody, channelId, deliverBody); + } + + interface DisposableMessageContentSource extends MessageContentSource { + void dispose(); + } + + private long writeMessageDelivery( + ContentBody body, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody) { + + int bodySize = (int) contentHeaderBody.getBodySize(); + boolean msgCompressed = isCompressed(contentHeaderBody); + DisposableMessageContentSource modifiedContent = null; + + boolean compressionSupported = _connection.isCompressionSupported(); + + long length; + if (msgCompressed + && !compressionSupported + && (modifiedContent = inflateIfPossible(body)) != null) { + BasicContentHeaderProperties modifiedProps = + new BasicContentHeaderProperties(contentHeaderBody.getProperties()); + modifiedProps.setEncoding((String) null); + + length = writeMessageDeliveryModified(modifiedContent, channelId, deliverBody, modifiedProps); + } else if (!msgCompressed + && compressionSupported + && contentHeaderBody.getProperties().getEncoding() == null + && bodySize > MESSAGE_COMPRESSION_THRESHOLD_SIZE + && (modifiedContent = deflateIfPossible(body)) != null) { + BasicContentHeaderProperties modifiedProps = + new BasicContentHeaderProperties(contentHeaderBody.getProperties()); + modifiedProps.setEncoding(GZIP_ENCODING); + + length = writeMessageDeliveryModified(modifiedContent, channelId, deliverBody, modifiedProps); + } else { + writeMessageDeliveryUnchanged( + new ModifiedContentSource(body.getPayload()), + channelId, + deliverBody, + contentHeaderBody, + bodySize); + + length = bodySize; + } + + if (modifiedContent != null) { + modifiedContent.dispose(); + } + + return length; + } + + private DisposableMessageContentSource deflateIfPossible(ContentBody source) { + try (QpidByteBuffer contentBuffers = source.getPayload()) { + return new ModifiedContentSource(QpidByteBuffer.deflate(contentBuffers)); + } catch (IOException e) { + LOGGER.warn( + "Unable to compress message payload for consumer with gzip, message will be sent as is", + e); + return null; + } + } + + private DisposableMessageContentSource inflateIfPossible(ContentBody source) { + try (QpidByteBuffer contentBuffers = source.getPayload()) { + return new ModifiedContentSource(QpidByteBuffer.inflate(contentBuffers)); + } catch (IOException e) { + LOGGER.warn( + "Unable to decompress message payload for consumer with gzip, message will be sent as is", + e); + return null; + } + } + + private int writeMessageDeliveryModified( + final MessageContentSource content, + final int channelId, + final AMQBody deliverBody, + final BasicContentHeaderProperties modifiedProps) { + final int bodySize = (int) content.getSize(); + ContentHeaderBody modifiedHeaderBody = new ContentHeaderBody(modifiedProps, bodySize); + writeMessageDeliveryUnchanged(content, channelId, deliverBody, modifiedHeaderBody, bodySize); + return bodySize; + } + + private void writeMessageDeliveryUnchanged( + final MessageContentSource content, + int channelId, + AMQBody deliverBody, + ContentHeaderBody contentHeaderBody, + int bodySize) { + if (bodySize == 0) { + SmallCompositeAMQBodyBlock compositeBlock = + new SmallCompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody); + + writeFrame(compositeBlock); + } else { + int maxFrameBodySize = (int) _connection.getMaxFrameSize() - AMQFrame.getFrameOverhead(); + try (QpidByteBuffer contentByteBuffer = content.getContent()) { + int contentChunkSize = bodySize > maxFrameBodySize ? maxFrameBodySize : bodySize; + QpidByteBuffer body = contentByteBuffer.view(0, contentChunkSize); + writeFrame( + new CompositeAMQBodyBlock( + channelId, deliverBody, contentHeaderBody, new MessageContentSourceBody(body))); + + int writtenSize = contentChunkSize; + while (writtenSize < bodySize) { + contentChunkSize = + (bodySize - writtenSize) > maxFrameBodySize + ? maxFrameBodySize + : bodySize - writtenSize; + QpidByteBuffer chunk = contentByteBuffer.view(writtenSize, contentChunkSize); + writtenSize += contentChunkSize; + writeFrame(new AMQFrame(channelId, new MessageContentSourceBody(chunk))); + } + } + } + } + + private boolean isCompressed(final ContentHeaderBody contentHeaderBody) { + return GZIP_ENCODING.equals(contentHeaderBody.getProperties().getEncoding()); + } + + private static class MessageContentSourceBody implements AMQBody { + public static final byte TYPE = 3; + private final int _length; + private final QpidByteBuffer _content; + + private MessageContentSourceBody(QpidByteBuffer content) { + _content = content; + _length = content.remaining(); + } + + @Override + public byte getFrameType() { + return TYPE; + } + + @Override + public int getSize() { + return _length; + } + + @Override + public long writePayload(final ByteBufferSender sender) { + try { + sender.send(_content); + } finally { + _content.dispose(); + } + return _length; + } + + @Override + public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) + throws QpidException { + throw new UnsupportedOperationException(); + } + + @Override + public String toString() { + return "[" + getClass().getSimpleName() + ", length: " + _length + "]"; + } + } + + public long writeGetOk( + final MessagePublishInfo info, + final ContentBody contentBody, + final ContentHeaderBody contentHeaderBody, + final boolean isRedelivered, + int channelId, + long deliveryTag, + int queueSize) { + AMQBody deliver = createEncodedGetOkBody(info, isRedelivered, deliveryTag, queueSize); + return writeMessageDelivery(contentBody, contentHeaderBody, channelId, deliver); + } + + private AMQBody createEncodedDeliverBody( + MessagePublishInfo info, + boolean isRedelivered, + final long deliveryTag, + final AMQShortString consumerTag) { + + final AMQShortString exchangeName; + final AMQShortString routingKey; + + exchangeName = info.getExchange(); + routingKey = info.getRoutingKey(); + + return new EncodedDeliveryBody( + deliveryTag, routingKey, exchangeName, consumerTag, isRedelivered); + } + + public class EncodedDeliveryBody implements AMQBody { + private final long _deliveryTag; + private final AMQShortString _routingKey; + private final AMQShortString _exchangeName; + private final AMQShortString _consumerTag; + private final boolean _isRedelivered; + private AMQBody _underlyingBody; + + private EncodedDeliveryBody( + long deliveryTag, + AMQShortString routingKey, + AMQShortString exchangeName, + AMQShortString consumerTag, + boolean isRedelivered) { + _deliveryTag = deliveryTag; + _routingKey = routingKey; + _exchangeName = exchangeName; + _consumerTag = consumerTag; + _isRedelivered = isRedelivered; + } + + public AMQBody createAMQBody() { + return _connection + .getMethodRegistry() + .createBasicDeliverBody( + _consumerTag, _deliveryTag, _isRedelivered, _exchangeName, _routingKey); + } + + @Override + public byte getFrameType() { + return AMQMethodBody.TYPE; + } + + @Override + public int getSize() { + if (_underlyingBody == null) { + _underlyingBody = createAMQBody(); + } + return _underlyingBody.getSize(); + } + + @Override + public long writePayload(ByteBufferSender sender) { + if (_underlyingBody == null) { + _underlyingBody = createAMQBody(); + } + return _underlyingBody.writePayload(sender); + } + + @Override + public void handle(final int channelId, final AMQVersionAwareProtocolSession amqProtocolSession) + throws QpidException { + throw new QpidException("This block should never be dispatched!"); + } + + @Override + public String toString() { + return "[" + + getClass().getSimpleName() + + " underlyingBody: " + + String.valueOf(_underlyingBody) + + "]"; + } + } + + private AMQBody createEncodedGetOkBody( + MessagePublishInfo info, boolean isRedelivered, long deliveryTag, int queueSize) { + final AMQShortString exchangeName; + final AMQShortString routingKey; + + exchangeName = info.getExchange(); + routingKey = info.getRoutingKey(); + + return _connection + .getMethodRegistry() + .createBasicGetOkBody(deliveryTag, isRedelivered, exchangeName, routingKey, queueSize); + } + + private AMQBody createEncodedReturnFrame( + MessagePublishInfo messagePublishInfo, int replyCode, AMQShortString replyText) { + + return _connection + .getMethodRegistry() + .createBasicReturnBody( + replyCode, + replyText, + messagePublishInfo.getExchange(), + messagePublishInfo.getRoutingKey()); + } + + /*public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText) + { + + AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText); + + writeMessageDelivery(message, header, channelId, returnFrame); + }*/ + + public void writeFrame(AMQDataBlock block) { + _connection.writeFrame(block); + } + + public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag) { + + BasicCancelOkBody basicCancelOkBody = + _connection.getMethodRegistry().createBasicCancelOkBody(consumerTag); + writeFrame(basicCancelOkBody.generateFrame(channelId)); + } + + public static final class CompositeAMQBodyBlock extends AMQDataBlock { + public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead(); + + private final AMQBody _methodBody; + private final AMQBody _headerBody; + private final AMQBody _contentBody; + private final int _channel; + + public CompositeAMQBodyBlock( + int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody) { + _channel = channel; + _methodBody = methodBody; + _headerBody = headerBody; + _contentBody = contentBody; + } + + @Override + public long getSize() { + return OVERHEAD + + (long) _methodBody.getSize() + + (long) _headerBody.getSize() + + (long) _contentBody.getSize(); + } + + @Override + public long writePayload(final ByteBufferSender sender) { + long size = (new AMQFrame(_channel, _methodBody)).writePayload(sender); + + size += (new AMQFrame(_channel, _headerBody)).writePayload(sender); + + size += (new AMQFrame(_channel, _contentBody)).writePayload(sender); + + return size; + } + + @VisibleForTesting + public AMQBody getMethodBody() { + return _methodBody; + } + + @VisibleForTesting + public AMQBody getHeaderBody() { + return _headerBody; + } + + @VisibleForTesting + public AMQBody getContentBody() { + return _contentBody; + } + + @VisibleForTesting + public int getChannel() { + return _channel; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder + .append("[") + .append(getClass().getSimpleName()) + .append(" methodBody=") + .append(_methodBody) + .append(", headerBody=") + .append(_headerBody) + .append(", contentBody=") + .append(_contentBody) + .append(", channel=") + .append(_channel) + .append("]"); + return builder.toString(); + } + } + + public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock { + public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead(); + + private final AMQBody _methodBody; + private final AMQBody _headerBody; + private final int _channel; + + public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody) { + _channel = channel; + _methodBody = methodBody; + _headerBody = headerBody; + } + + @Override + public long getSize() { + return OVERHEAD + (long) _methodBody.getSize() + (long) _headerBody.getSize(); + } + + @Override + public long writePayload(final ByteBufferSender sender) { + long size = (new AMQFrame(_channel, _methodBody)).writePayload(sender); + size += (new AMQFrame(_channel, _headerBody)).writePayload(sender); + return size; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder + .append(getClass().getSimpleName()) + .append("methodBody=") + .append(_methodBody) + .append(", headerBody=") + .append(_headerBody) + .append(", channel=") + .append(_channel) + .append("]"); + return builder.toString(); + } + } + + private static class ModifiedContentSource implements DisposableMessageContentSource { + private final QpidByteBuffer _buffer; + private final int _size; + + public ModifiedContentSource(final QpidByteBuffer buffer) { + _buffer = buffer; + _size = _buffer.remaining(); + } + + @Override + public void dispose() { + _buffer.dispose(); + } + + @Override + public QpidByteBuffer getContent() { + return getContent(0, (int) getSize()); + } + + @Override + public QpidByteBuffer getContent(final int offset, int length) { + return _buffer.view(offset, length); + } + + @Override + public long getSize() { + return _size; + } + } +} diff --git a/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/Queue.java b/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/Queue.java new file mode 100644 index 0000000..749b824 --- /dev/null +++ b/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/Queue.java @@ -0,0 +1,181 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.pulsar.rabbitmqgw; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.pulsar.client.api.Message; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; + +public class Queue { + + private final String name; + private final LifetimePolicy lifetimePolicy; + private final ExclusivityPolicy exclusivityPolicy; + + private final Map bindings = new ConcurrentHashMap<>(); + private final Set exchangesToPoll = new HashSet<>(); + private final java.util.Queue messageRequests = new ConcurrentLinkedQueue<>(); + private final java.util.Queue pendingBindings = new ConcurrentLinkedQueue<>(); + + private volatile ConsumerTarget _exclusiveSubscriber; + private List consumers = new ArrayList<>(); + + public Queue(String name, LifetimePolicy lifetimePolicy, ExclusivityPolicy exclusivityPolicy) { + this.name = name; + this.lifetimePolicy = lifetimePolicy; + this.exclusivityPolicy = exclusivityPolicy; + } + + public String getName() { + return name; + } + + public void addBinding(Binding binding) { + bindings.put(binding.getExchange().getName(), binding); + exchangesToPoll.add(binding.getExchange().getName()); + } + + public int getQueueDepthMessages() { + // TODO: implement message count in queue ? + return 0; + } + + public int getConsumerCount() { + return consumers.size(); + } + + public boolean isExclusive() { + return exclusivityPolicy != ExclusivityPolicy.NONE; + } + + public LifetimePolicy getLifetimePolicy() { + return lifetimePolicy; + } + + public CompletableFuture> receiveAsync(boolean autoAck, int priority) { + // TODO: support consumer priority + MessageRequest messageRequest = new MessageRequest(autoAck); + messageRequests.add(messageRequest); + deliverMessageIfAvailable(); + return messageRequest.getMessage(); + } + + public Message receive(boolean autoAck) { + Binding binding = getReadyBinding(); + if (binding != null) { + Message message = null; + try { + message = binding.getReceive().get(); + } catch (Exception e) { + // TODO: should not happen. Close connection ? + } + if (autoAck) { + binding.ackMessage(message); + } + binding.receiveMessageAsync().thenAcceptAsync(this::deliverMessage); + return message; + } + return null; + } + + public Binding getReadyBinding() { + return pendingBindings.poll(); + } + + public void deliverMessageIfAvailable() { + Binding binding = getReadyBinding(); + if (binding != null) { + MessageRequest request = messageRequests.poll(); + if (request != null) { + binding + .getReceive() + .thenAccept( + message -> { + request.getMessage().complete(message); + if (request.isAutoAck()) { + binding.ackMessage(message); + } + binding.receiveMessageAsync().thenAcceptAsync(this::deliverMessage); + }); + } + } + } + + public void deliverMessage(Binding binding) { + MessageRequest request; + do { + request = messageRequests.poll(); + } while (request != null && request.getMessage().isDone()); + + if (request != null) { + final MessageRequest req = request; + binding + .getReceive() + .thenAccept( + message -> { + req.getMessage().complete(message); + if (req.isAutoAck()) { + binding.ackMessage(message); + } + binding.receiveMessageAsync().thenAcceptAsync(this::deliverMessage); + }); + } else { + pendingBindings.add(binding); + } + } + + public static class MessageRequest { + private final CompletableFuture> message = new CompletableFuture<>(); + private final boolean autoAck; + + public MessageRequest(boolean autoAck) { + this.autoAck = autoAck; + } + + public CompletableFuture> getMessage() { + return message; + } + + public boolean isAutoAck() { + return autoAck; + } + } + + public void addConsumer(ConsumerTarget consumer, boolean exclusive) { + if (exclusive) { + _exclusiveSubscriber = consumer; + } + consumers.add(consumer); + consumer.consume(); + } + + public void unregisterConsumer(ConsumerTarget consumer) { + consumers.remove(consumer); + _exclusiveSubscriber = null; + } + + public boolean hasExclusiveConsumer() { + return _exclusiveSubscriber != null; + } +} diff --git a/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/VirtualHost.java b/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/VirtualHost.java new file mode 100644 index 0000000..435b391 --- /dev/null +++ b/pulsar-rabbitmq-gw/src/main/java/com/datastax/oss/pulsar/rabbitmqgw/VirtualHost.java @@ -0,0 +1,79 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.pulsar.rabbitmqgw; + +import java.util.HashMap; +import java.util.Map; +import org.apache.qpid.server.exchange.ExchangeDefaults; +import org.apache.qpid.server.model.LifetimePolicy; + +public class VirtualHost { + private final String namespace; + + private final Map exchanges = new HashMap<>(); + private final Map queues = new HashMap<>(); + + public VirtualHost(String namespace) { + this.namespace = namespace; + addStandardExchange( + ExchangeDefaults.DEFAULT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); + addStandardExchange( + ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); + addStandardExchange( + ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS); + addStandardExchange( + ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS); + addStandardExchange( + ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); + } + + public String getNamespace() { + return namespace; + } + + public boolean hasExchange(String name) { + return exchanges.containsKey(name); + } + + public Exchange getExchange(String name) { + return exchanges.get(name); + } + + public void addExchange(Exchange exchange) { + exchanges.put(exchange.getName(), exchange); + } + + public void deleteExchange(Exchange exchange) { + exchanges.remove(exchange.getName()); + } + + public Queue getQueue(String name) { + return queues.get(name); + } + + public void addQueue(Queue queue) { + queues.put(queue.getName(), queue); + } + + private void addStandardExchange(String directExchangeName, String directExchangeClass) { + addExchange( + new Exchange( + directExchangeName, + Exchange.Type.valueOf(directExchangeClass), + true, + LifetimePolicy.PERMANENT)); + } +} diff --git a/pulsar-rabbitmq-gw/src/test/java/com/datastax/oss/pulsar/rabbitmqgw/AMQChannelTest.java b/pulsar-rabbitmq-gw/src/test/java/com/datastax/oss/pulsar/rabbitmqgw/AMQChannelTest.java index 02a5e25..36fa743 100644 --- a/pulsar-rabbitmq-gw/src/test/java/com/datastax/oss/pulsar/rabbitmqgw/AMQChannelTest.java +++ b/pulsar-rabbitmq-gw/src/test/java/com/datastax/oss/pulsar/rabbitmqgw/AMQChannelTest.java @@ -15,38 +15,48 @@ */ package com.datastax.oss.pulsar.rabbitmqgw; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Collections; +import java.util.HashMap; import java.util.concurrent.CompletableFuture; import org.apache.commons.codec.binary.Base64; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerBuilder; -import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.impl.MessageIdImpl; -import org.apache.pulsar.client.impl.ProducerBase; +import org.apache.pulsar.client.impl.MessageImpl; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.exchange.ExchangeDefaults; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.ErrorCodes; import org.apache.qpid.server.protocol.v0_8.AMQShortString; import org.apache.qpid.server.protocol.v0_8.FieldTable; import org.apache.qpid.server.protocol.v0_8.FieldTableFactory; +import org.apache.qpid.server.protocol.v0_8.transport.AMQBody; import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame; import org.apache.qpid.server.protocol.v0_8.transport.BasicAckBody; +import org.apache.qpid.server.protocol.v0_8.transport.BasicCancelBody; +import org.apache.qpid.server.protocol.v0_8.transport.BasicCancelOkBody; +import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeBody; +import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeOkBody; import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties; +import org.apache.qpid.server.protocol.v0_8.transport.BasicDeliverBody; +import org.apache.qpid.server.protocol.v0_8.transport.BasicGetBody; +import org.apache.qpid.server.protocol.v0_8.transport.BasicGetEmptyBody; +import org.apache.qpid.server.protocol.v0_8.transport.BasicGetOkBody; import org.apache.qpid.server.protocol.v0_8.transport.BasicPublishBody; import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseBody; import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseOkBody; @@ -58,13 +68,17 @@ import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeclareOkBody; import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeleteBody; import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeleteOkBody; +import org.apache.qpid.server.protocol.v0_8.transport.QueueDeclareBody; +import org.apache.qpid.server.protocol.v0_8.transport.QueueDeclareOkBody; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; public class AMQChannelTest extends AbstractBaseTest { public static final String TEST_EXCHANGE = "test-exchange"; + public static final String TEST_QUEUE = "test-queue"; public static final byte[] TEST_MESSAGE = "test-message".getBytes(StandardCharsets.UTF_8); + public static final String TEST_CONSUMER_TAG = "test-consumer-tag"; @Test void testReceiveChannelClose() { @@ -175,7 +189,7 @@ void testReceiveExchangeDeclareAlreadyExists() { } @Test - void testReceiveExchangeDeclareAlreadyExistsInvalidType() { + void testReceiveExchangeDeclareAlreadyExistsDifferentType() { openChannel(); sendExchangeDeclare(TEST_EXCHANGE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, false); @@ -234,6 +248,114 @@ void testReceiveExchangeDeleteReservedExchange() { assertIsChannelCloseFrame(frame, ErrorCodes.NOT_ALLOWED); } + @Test + void testReceiveQueueDeclare() { + openChannel(); + + AMQFrame frame = sendQueueDeclare(); + + assertIsQueueDeclareOk(frame); + } + + @Test + void testReceiveQueueDeclareEmptyName() { + openChannel(); + + AMQFrame frame = sendQueueDeclare("", false); + + assertEquals(CHANNEL_ID, frame.getChannel()); + assertTrue(frame.getBodyFrame() instanceof QueueDeclareOkBody); + QueueDeclareOkBody queueDeclareOkBody = (QueueDeclareOkBody) frame.getBodyFrame(); + assertTrue(queueDeclareOkBody.getQueue().toString().startsWith("tmp_")); + } + + @Test + void testReceiveQueueDeclarePassive() { + openChannel(); + sendQueueDeclare(); + + AMQFrame frame = sendQueueDeclare(TEST_QUEUE, true); + + assertIsQueueDeclareOk(frame); + } + + @Test + void testReceiveQueueDeclarePassiveNotFound() { + openChannel(); + + AMQFrame frame = sendQueueDeclare(TEST_QUEUE, true); + + assertIsChannelCloseFrame(frame, ErrorCodes.NOT_FOUND); + } + + @Test + void testReceiveQueueDeclareInvalidExclusivityAttribute() { + openChannel(); + + AMQFrame frame = sendQueueDeclare(TEST_QUEUE, false, null, "invalid"); + + assertIsConnectionCloseFrame(frame, ErrorCodes.INVALID_ARGUMENT); + } + + @Test + void testReceiveQueueDeclareInvalidLifetimePolicyAttribute() { + openChannel(); + + AMQFrame frame = sendQueueDeclare(TEST_QUEUE, false, "invalid", null); + + assertIsConnectionCloseFrame(frame, ErrorCodes.INVALID_ARGUMENT); + } + + @Test + void testReceiveQueueDeclareAlreadyExists() { + openChannel(); + sendQueueDeclare(); + + AMQFrame frame = sendQueueDeclare(); + + assertIsQueueDeclareOk(frame); + } + + @Test + void testReceiveQueueDeclareAlreadyExistsDifferentExclusivity() { + openChannel(); + sendQueueDeclare(); + + QueueDeclareBody queueDeclareBody = + new QueueDeclareBody( + 0, + AMQShortString.createAMQShortString(TEST_QUEUE), + false, + true, + true, + false, + false, + FieldTable.convertToFieldTable(Collections.emptyMap())); + AMQFrame frame = exchangeData(queueDeclareBody.generateFrame(CHANNEL_ID)); + + assertIsChannelCloseFrame(frame, ErrorCodes.ALREADY_EXISTS); + } + + @Test + void testReceiveQueueDeclareAlreadyExistsDifferentLifetime() { + openChannel(); + sendQueueDeclare(); + + QueueDeclareBody queueDeclareBody = + new QueueDeclareBody( + 0, + AMQShortString.createAMQShortString(TEST_QUEUE), + false, + true, + false, + true, + false, + FieldTable.convertToFieldTable(Collections.emptyMap())); + AMQFrame frame = exchangeData(queueDeclareBody.generateFrame(CHANNEL_ID)); + + assertIsChannelCloseFrame(frame, ErrorCodes.ALREADY_EXISTS); + } + @Test void testReceiveBasicPublishExchangeNotFound() { openChannel(); @@ -295,7 +417,9 @@ void testReceiveMessageContentTooLarge() { } @Test - void testReceiveMessagePulsarClientError() { + void testReceiveMessagePulsarClientError() throws Exception { + when(gatewayService.getPulsarClient()).thenThrow(PulsarClientException.class); + openChannel(); sendBasicPublish(); sendMessageHeader(TEST_MESSAGE.length); @@ -307,15 +431,8 @@ void testReceiveMessagePulsarClientError() { @Test void testReceiveMessageSuccess() throws Exception { - PulsarClient pulsarClient = mock(PulsarClient.class); - ProducerBuilder producerBuilder = mock(ProducerBuilder.class); - ProducerBase producer = mock(ProducerBase.class); TypedMessageBuilder messageBuilder = mock(TypedMessageBuilder.class); - when(pulsarClient.newProducer()).thenReturn(producerBuilder); - when(producerBuilder.topic(anyString())).thenReturn(producerBuilder); - when(producerBuilder.create()).thenReturn(producer); - doReturn(pulsarClient).when(gatewayService).getPulsarClient(); when(producer.newMessage()).thenReturn(messageBuilder); when(messageBuilder.sendAsync()) .thenReturn(CompletableFuture.completedFuture(new MessageIdImpl(1, 2, 3))); @@ -332,7 +449,8 @@ void testReceiveMessageSuccess() throws Exception { assertNull(frame); ArgumentCaptor captor = ArgumentCaptor.forClass(String.class); - verify(messageBuilder).property(eq("amqp-headers"), captor.capture()); + verify(messageBuilder) + .property(eq(MessageUtils.MESSAGE_PROPERTY_AMQP_HEADERS), captor.capture()); byte[] bytes = Base64.decodeBase64(captor.getValue()); ContentHeaderBody contentHeaderBody = new ContentHeaderBody(QpidByteBuffer.wrap(bytes), bytes.length); @@ -344,16 +462,9 @@ void testReceiveMessageSuccess() throws Exception { } @Test - void testReceiveMessageConfirm() throws Exception { - PulsarClient pulsarClient = mock(PulsarClient.class); - ProducerBuilder producerBuilder = mock(ProducerBuilder.class); - Producer producer = mock(Producer.class); + void testReceiveMessageConfirm() { TypedMessageBuilder messageBuilder = mock(TypedMessageBuilder.class); - when(pulsarClient.newProducer()).thenReturn(producerBuilder); - when(producerBuilder.topic(anyString())).thenReturn(producerBuilder); - when(producerBuilder.create()).thenReturn(producer); - doReturn(pulsarClient).when(gatewayService).getPulsarClient(); when(producer.newMessage()).thenReturn(messageBuilder); when(messageBuilder.sendAsync()) .thenReturn(CompletableFuture.completedFuture(new MessageIdImpl(1, 2, 3))); @@ -382,11 +493,353 @@ void testReceiveMessageConfirm() throws Exception { assertEquals(2, basicAckBody.getDeliveryTag()); } + @Test + void testReceiveBasicGet() { + openChannel(); + MessageImpl message = mock(MessageImpl.class); + when(message.getData()).thenReturn(TEST_MESSAGE); + when(message.getTopicName()).thenReturn(TEST_EXCHANGE + "$$" + TEST_QUEUE); + when(message.getRedeliveryCount()).thenReturn(2); + + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + props.setContentType("application/json"); + ContentHeaderBody contentHeader = new ContentHeaderBody(props, TEST_MESSAGE.length); + byte[] bytes = new byte[contentHeader.getSize()]; + QpidByteBuffer buf = QpidByteBuffer.wrap(bytes); + contentHeader.writePayload(buf); + String headers = java.util.Base64.getEncoder().encodeToString(bytes); + when(message.getProperty(MessageUtils.MESSAGE_PROPERTY_AMQP_HEADERS)).thenReturn(headers); + + MessageImpl message2 = mock(MessageImpl.class); + when(message2.getData()).thenReturn(TEST_MESSAGE); + when(message2.getTopicName()).thenReturn(TEST_EXCHANGE + "$$" + TEST_QUEUE); + when(message2.getRedeliveryCount()).thenReturn(0); + + when(consumer.receiveAsync()) + .thenReturn( + CompletableFuture.completedFuture(message), + CompletableFuture.completedFuture(message2), + new CompletableFuture<>()); + + sendQueueDeclare(); + + ProtocolOutputConverter.CompositeAMQBodyBlock compositeAMQBodyBlock = sendBasicGet(); + + assertNotNull(compositeAMQBodyBlock); + assertTrue(compositeAMQBodyBlock.getMethodBody() instanceof BasicGetOkBody); + BasicGetOkBody basicGetOkBody = (BasicGetOkBody) compositeAMQBodyBlock.getMethodBody(); + assertEquals(TEST_EXCHANGE, basicGetOkBody.getExchange().toString()); + assertEquals(TEST_QUEUE, basicGetOkBody.getRoutingKey().toString()); + assertEquals(1, basicGetOkBody.getDeliveryTag()); + assertTrue(basicGetOkBody.getRedelivered()); + + assertTrue(compositeAMQBodyBlock.getHeaderBody() instanceof ContentHeaderBody); + ContentHeaderBody contentHeaderBody = (ContentHeaderBody) compositeAMQBodyBlock.getHeaderBody(); + assertEquals(TEST_MESSAGE.length, contentHeaderBody.getBodySize()); + assertEquals("application/json", contentHeaderBody.getProperties().getContentTypeAsString()); + + AMQBody contentBody = compositeAMQBodyBlock.getContentBody(); + ByteBuf byteBuf = Unpooled.buffer(contentBody.getSize()); + contentBody.writePayload(new NettyByteBufferSender(byteBuf)); + assertArrayEquals(TEST_MESSAGE, byteBuf.array()); + + compositeAMQBodyBlock = sendBasicGet(); + assertNotNull(compositeAMQBodyBlock); + assertTrue(compositeAMQBodyBlock.getMethodBody() instanceof BasicGetOkBody); + basicGetOkBody = (BasicGetOkBody) compositeAMQBodyBlock.getMethodBody(); + assertEquals(2, basicGetOkBody.getDeliveryTag()); + assertFalse(basicGetOkBody.getRedelivered()); + } + + @Test + void testReceiveBasicGetEmpty() { + openChannel(); + sendQueueDeclare(); + + AMQFrame frame = sendBasicGet(); + + assertNotNull(frame); + assertEquals(CHANNEL_ID, frame.getChannel()); + assertTrue(frame.getBodyFrame() instanceof BasicGetEmptyBody); + } + + @Test + void testReceiveBasicGetQueueNotFound() { + openChannel(); + + AMQFrame frame = sendBasicGet(); + + assertIsConnectionCloseFrame(frame, ErrorCodes.NOT_FOUND); + } + + @Test + void testReceiveBasicGetQueueNameMissing() { + openChannel(); + + AMQFrame frame = sendBasicGet(""); + + assertIsConnectionCloseFrame(frame, ErrorCodes.NOT_ALLOWED); + } + + @Test + void testReceiveBasicGetDefaultQueue() { + openChannel(); + MessageImpl message = mock(MessageImpl.class); + when(message.getData()).thenReturn(TEST_MESSAGE); + when(message.getTopicName()).thenReturn(TEST_EXCHANGE + "$$" + TEST_QUEUE); + when(message.getRedeliveryCount()).thenReturn(0); + when(consumer.receiveAsync()).thenReturn(CompletableFuture.completedFuture(message)); + sendQueueDeclare(); + + ProtocolOutputConverter.CompositeAMQBodyBlock compositeAMQBodyBlock = sendBasicGet(""); + + assertNotNull(compositeAMQBodyBlock); + } + + @Test + void testReceiveBasicGetExistingExclusiveConsumer() { + openChannel(); + sendQueueDeclare(); + sendBasicConsume(null, TEST_QUEUE, true); + + AMQFrame frame = sendBasicGet(TEST_QUEUE); + + assertIsConnectionCloseFrame(frame, ErrorCodes.ACCESS_REFUSED); + } + + @Test + void testReceiveBasicConsume() { + openChannel(); + sendQueueDeclare(); + + AMQFrame frame = sendBasicConsume(TEST_CONSUMER_TAG, TEST_QUEUE, false); + + assertNotNull(frame); + assertTrue(frame.getBodyFrame() instanceof BasicConsumeOkBody); + assertEquals( + TEST_CONSUMER_TAG, ((BasicConsumeOkBody) frame.getBodyFrame()).getConsumerTag().toString()); + + frame = sendBasicConsume("test-consumer-tag2", TEST_QUEUE, false); + + assertNotNull(frame); + assertTrue(frame.getBodyFrame() instanceof BasicConsumeOkBody); + assertEquals( + "test-consumer-tag2", + ((BasicConsumeOkBody) frame.getBodyFrame()).getConsumerTag().toString()); + } + + @Test + void testReceiveBasicConsumeEmptyConsumerTag() { + openChannel(); + sendQueueDeclare(); + + AMQFrame frame = sendBasicConsume(null, TEST_QUEUE, false); + + assertNotNull(frame); + assertTrue(frame.getBodyFrame() instanceof BasicConsumeOkBody); + assertTrue( + ((BasicConsumeOkBody) frame.getBodyFrame()).getConsumerTag().toString().startsWith("sgen")); + } + + @Test + void testReceiveBasicConsumeQueueNotFound() { + openChannel(); + + AMQFrame frame = sendBasicConsume(null, TEST_QUEUE, false); + + assertIsChannelCloseFrame(frame, ErrorCodes.NOT_FOUND); + } + + @Test + void testReceiveBasicConsumeQueueNameMissing() { + openChannel(); + + AMQFrame frame = sendBasicConsume(null, "", false); + + assertIsConnectionCloseFrame(frame, ErrorCodes.NOT_ALLOWED); + } + + @Test + void testReceiveBasicConsumeExistingExclusiveConsumer() { + openChannel(); + sendQueueDeclare(); + sendBasicConsume(null, TEST_QUEUE, true); + + AMQFrame frame = sendBasicConsume(null, TEST_QUEUE, false); + + assertIsConnectionCloseFrame(frame, ErrorCodes.ACCESS_REFUSED); + } + + @Test + void testReceiveBasicConsumeExistingConsumerTag() { + openChannel(); + sendQueueDeclare(); + sendBasicConsume(TEST_CONSUMER_TAG, TEST_QUEUE, true); + + AMQFrame frame = sendBasicConsume(TEST_CONSUMER_TAG, TEST_QUEUE, false); + + assertIsConnectionCloseFrame(frame, ErrorCodes.NOT_ALLOWED); + } + + @Test + void testReceiveBasicConsumeExclusiveExistingConsumer() { + openChannel(); + sendQueueDeclare(); + sendBasicConsume(null, TEST_QUEUE, false); + + AMQFrame frame = sendBasicConsume(null, TEST_QUEUE, true); + + assertIsConnectionCloseFrame(frame, ErrorCodes.ACCESS_REFUSED); + } + + @Test + void testConsumerDeliver() throws Exception { + MessageImpl message = mock(MessageImpl.class); + when(message.getData()).thenReturn(TEST_MESSAGE); + when(message.getTopicName()).thenReturn(TEST_EXCHANGE + "$$" + TEST_QUEUE); + when(message.getRedeliveryCount()).thenReturn(2); + + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + props.setContentType("application/json"); + ContentHeaderBody contentHeader = new ContentHeaderBody(props, TEST_MESSAGE.length); + byte[] bytes = new byte[contentHeader.getSize()]; + QpidByteBuffer buf = QpidByteBuffer.wrap(bytes); + contentHeader.writePayload(buf); + String headers = java.util.Base64.getEncoder().encodeToString(bytes); + when(message.getProperty(MessageUtils.MESSAGE_PROPERTY_AMQP_HEADERS)).thenReturn(headers); + + MessageImpl message2 = mock(MessageImpl.class); + when(message2.getData()).thenReturn(TEST_MESSAGE); + when(message2.getTopicName()).thenReturn(TEST_EXCHANGE + "$$" + TEST_QUEUE); + when(message2.getRedeliveryCount()).thenReturn(0); + + when(consumer.receiveAsync()) + .thenReturn( + CompletableFuture.completedFuture(message), + CompletableFuture.completedFuture(message2), + new CompletableFuture<>()); + + openChannel(); + sendQueueDeclare(); + sendBasicConsume(null, TEST_QUEUE, false); + + ProtocolOutputConverter.CompositeAMQBodyBlock compositeAMQBodyBlock = null; + + for (int i = 0; i < 100 && compositeAMQBodyBlock == null; i++) { + compositeAMQBodyBlock = channel.readOutbound(); + Thread.sleep(10); + } + + assertNotNull(compositeAMQBodyBlock); + assertTrue( + compositeAMQBodyBlock.getMethodBody() + instanceof ProtocolOutputConverter.EncodedDeliveryBody); + BasicDeliverBody basicDeliverBody = + (BasicDeliverBody) + ((ProtocolOutputConverter.EncodedDeliveryBody) compositeAMQBodyBlock.getMethodBody()) + .createAMQBody(); + assertEquals(TEST_EXCHANGE, basicDeliverBody.getExchange().toString()); + assertEquals(TEST_QUEUE, basicDeliverBody.getRoutingKey().toString()); + assertEquals(1, basicDeliverBody.getDeliveryTag()); + assertTrue(basicDeliverBody.getRedelivered()); + + assertTrue(compositeAMQBodyBlock.getHeaderBody() instanceof ContentHeaderBody); + ContentHeaderBody contentHeaderBody = (ContentHeaderBody) compositeAMQBodyBlock.getHeaderBody(); + assertEquals(TEST_MESSAGE.length, contentHeaderBody.getBodySize()); + assertEquals("application/json", contentHeaderBody.getProperties().getContentTypeAsString()); + + compositeAMQBodyBlock = null; + for (int i = 0; i < 100 && compositeAMQBodyBlock == null; i++) { + compositeAMQBodyBlock = channel.readOutbound(); + Thread.sleep(10); + } + assertNotNull(compositeAMQBodyBlock); + assertTrue( + compositeAMQBodyBlock.getMethodBody() + instanceof ProtocolOutputConverter.EncodedDeliveryBody); + basicDeliverBody = + (BasicDeliverBody) + ((ProtocolOutputConverter.EncodedDeliveryBody) compositeAMQBodyBlock.getMethodBody()) + .createAMQBody(); + assertEquals(2, basicDeliverBody.getDeliveryTag()); + assertFalse(basicDeliverBody.getRedelivered()); + } + + @Test + void testReceiveBasicCancel() throws Exception { + MessageImpl message = mock(MessageImpl.class); + when(message.getData()).thenReturn(TEST_MESSAGE); + when(message.getTopicName()).thenReturn(TEST_EXCHANGE + "$$" + TEST_QUEUE); + when(message.getRedeliveryCount()).thenReturn(0); + + when(consumer.receiveAsync()).thenReturn(CompletableFuture.completedFuture(message)); + + openChannel(); + sendQueueDeclare(); + sendBasicConsume(TEST_CONSUMER_TAG, TEST_QUEUE, false); + + ProtocolOutputConverter.CompositeAMQBodyBlock compositeAMQBodyBlock = null; + + for (int i = 0; i < 100 && compositeAMQBodyBlock == null; i++) { + compositeAMQBodyBlock = channel.readOutbound(); + Thread.sleep(10); + } + + assertNotNull(compositeAMQBodyBlock); + + Object frame = sendBasicCancel(TEST_CONSUMER_TAG); + + long now = System.currentTimeMillis(); + while (System.currentTimeMillis() - now < 5000 && !(frame instanceof AMQFrame)) { + frame = channel.readOutbound(); + } + + assertNotNull(frame); + assertTrue(frame instanceof AMQFrame); + AMQBody body = ((AMQFrame) frame).getBodyFrame(); + assertTrue(body instanceof BasicCancelOkBody); + assertEquals(TEST_CONSUMER_TAG, ((BasicCancelOkBody) body).getConsumerTag().toString()); + + // Check nothing is received anymore + Thread.sleep(1000); + assertNull(channel.readOutbound()); + } + private void openChannel() { openConnection(); sendChannelOpen(); } + private T sendBasicCancel(String consumerTag) { + BasicCancelBody basicConsumeBody = + new BasicCancelBody(AMQShortString.createAMQShortString(consumerTag), false); + return exchangeData(basicConsumeBody.generateFrame(CHANNEL_ID)); + } + + private AMQFrame sendBasicConsume(String consumerTag, String queueName, boolean exclusive) { + BasicConsumeBody basicConsumeBody = + new BasicConsumeBody( + 0, + AMQShortString.createAMQShortString(queueName), + AMQShortString.createAMQShortString(consumerTag), + false, + false, + exclusive, + false, + FieldTable.convertToFieldTable(new HashMap<>())); + return exchangeData(basicConsumeBody.generateFrame(CHANNEL_ID)); + } + + private T sendBasicGet() { + return sendBasicGet(TEST_QUEUE); + } + + private T sendBasicGet(String queueName) { + BasicGetBody basicGetBody = + new BasicGetBody(0, AMQShortString.createAMQShortString(queueName), true); + return exchangeData(basicGetBody.generateFrame(CHANNEL_ID)); + } + private AMQFrame sendChannelClose() { ChannelCloseBody channelCloseBody = new ChannelCloseBody(0, AMQShortString.EMPTY_STRING, 0, 0); return exchangeData(channelCloseBody.generateFrame(CHANNEL_ID)); @@ -407,6 +860,36 @@ private AMQFrame sendExchangeDeclare(String exchange, String type, boolean passi return exchangeData(exchangeDeclareBody.generateFrame(CHANNEL_ID)); } + private AMQFrame sendQueueDeclare() { + return sendQueueDeclare(TEST_QUEUE, false); + } + + private AMQFrame sendQueueDeclare(String queue, boolean passive) { + return sendQueueDeclare(queue, passive, null, null); + } + + private AMQFrame sendQueueDeclare( + String queue, boolean passive, String lifetimePolicy, String exclusivityPolicy) { + HashMap attributes = new HashMap<>(); + if (lifetimePolicy != null) { + attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy); + } + if (exclusivityPolicy != null) { + attributes.put(Queue.EXCLUSIVE, exclusivityPolicy); + } + QueueDeclareBody queueDeclareBody = + new QueueDeclareBody( + 0, + AMQShortString.createAMQShortString(queue), + passive, + true, + false, + false, + false, + FieldTable.convertToFieldTable(attributes)); + return exchangeData(queueDeclareBody.generateFrame(CHANNEL_ID)); + } + private AMQFrame sendExchangeDelete(String exchange, boolean ifUnused) { ExchangeDeleteBody exchangeDeleteBody = new ExchangeDeleteBody(0, AMQShortString.createAMQShortString(exchange), ifUnused, false); @@ -435,6 +918,7 @@ private AMQFrame sendMessageContent() { } private void assertIsChannelCloseFrame(AMQFrame frame, int errorCode) { + assertNotNull(frame); assertEquals(CHANNEL_ID, frame.getChannel()); assertTrue(frame.getBodyFrame() instanceof ChannelCloseBody); ChannelCloseBody channelCloseBody = (ChannelCloseBody) frame.getBodyFrame(); @@ -442,7 +926,16 @@ private void assertIsChannelCloseFrame(AMQFrame frame, int errorCode) { } private void assertIsExchangeDeclareOk(AMQFrame frame) { + assertNotNull(frame); assertEquals(CHANNEL_ID, frame.getChannel()); assertTrue(frame.getBodyFrame() instanceof ExchangeDeclareOkBody); } + + private void assertIsQueueDeclareOk(AMQFrame frame) { + assertNotNull(frame); + assertEquals(CHANNEL_ID, frame.getChannel()); + assertTrue(frame.getBodyFrame() instanceof QueueDeclareOkBody); + QueueDeclareOkBody queueDeclareOkBody = (QueueDeclareOkBody) frame.getBodyFrame(); + assertEquals(TEST_QUEUE, queueDeclareOkBody.getQueue().toString()); + } } diff --git a/pulsar-rabbitmq-gw/src/test/java/com/datastax/oss/pulsar/rabbitmqgw/AbstractBaseTest.java b/pulsar-rabbitmq-gw/src/test/java/com/datastax/oss/pulsar/rabbitmqgw/AbstractBaseTest.java index 1a12f89..3d912cf 100644 --- a/pulsar-rabbitmq-gw/src/test/java/com/datastax/oss/pulsar/rabbitmqgw/AbstractBaseTest.java +++ b/pulsar-rabbitmq-gw/src/test/java/com/datastax/oss/pulsar/rabbitmqgw/AbstractBaseTest.java @@ -17,13 +17,25 @@ import static org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_9; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Matchers.anyListOf; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.embedded.EmbeddedChannel; import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.ConsumerBase; +import org.apache.pulsar.client.impl.ProducerBase; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.protocol.ErrorCodes; import org.apache.qpid.server.protocol.ProtocolVersion; @@ -48,13 +60,32 @@ public class AbstractBaseTest { protected final GatewayService gatewayService = spy(new GatewayService(config)); protected final GatewayConnection connection = new GatewayConnection(gatewayService); protected EmbeddedChannel channel; + protected ProducerBase producer = mock(ProducerBase.class); + protected ConsumerBase consumer = mock(ConsumerBase.class); @BeforeEach - void setup() { + void setup() throws Exception { channel = new EmbeddedChannel(connection, new AMQDataBlockEncoder()); + + PulsarClient pulsarClient = mock(PulsarClient.class); + + ProducerBuilder producerBuilder = mock(ProducerBuilder.class); + when(pulsarClient.newProducer()).thenReturn(producerBuilder); + when(producerBuilder.topic(anyString())).thenReturn(producerBuilder); + when(producerBuilder.create()).thenReturn(producer); + + ConsumerBuilder consumerBuilder = mock(ConsumerBuilder.class); + when(pulsarClient.newConsumer()).thenReturn(consumerBuilder); + when(consumerBuilder.topics(anyListOf(String.class))).thenReturn(consumerBuilder); + when(consumerBuilder.subscriptionName(anyString())).thenReturn(consumerBuilder); + when(consumerBuilder.subscribe()).thenReturn(consumer); + + when(consumer.receiveAsync()).thenReturn(new CompletableFuture<>()); + + doReturn(pulsarClient).when(gatewayService).getPulsarClient(); } - protected AMQFrame exchangeData(AMQDataBlock data) { + protected T exchangeData(AMQDataBlock data) { ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(); data.writePayload(new NettyByteBufferSender(byteBuf)); channel.writeInbound(byteBuf); @@ -138,6 +169,7 @@ protected AMQFrame sendChannelCloseOk() { } protected void assertIsConnectionCloseFrame(AMQFrame frame, int errorCode) { + assertNotNull(frame); assertEquals(0, frame.getChannel()); AMQBody body = frame.getBodyFrame(); assertTrue(body instanceof ConnectionCloseBody); diff --git a/pulsar-rabbitmq-gw/src/test/java/com/datastax/oss/pulsar/rabbitmqgw/GatewayConnectionTest.java b/pulsar-rabbitmq-gw/src/test/java/com/datastax/oss/pulsar/rabbitmqgw/GatewayConnectionTest.java index c45f3a1..0d0954d 100644 --- a/pulsar-rabbitmq-gw/src/test/java/com/datastax/oss/pulsar/rabbitmqgw/GatewayConnectionTest.java +++ b/pulsar-rabbitmq-gw/src/test/java/com/datastax/oss/pulsar/rabbitmqgw/GatewayConnectionTest.java @@ -22,8 +22,6 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; import java.nio.charset.StandardCharsets; import org.apache.qpid.server.protocol.ErrorCodes; import org.apache.qpid.server.protocol.ProtocolVersion; @@ -63,11 +61,8 @@ void testReceiveUnsupportedProtocolHeader() { // Send protocol header for unsupported v1.0 ProtocolInitiation protocolInitiation = new ProtocolInitiation(ProtocolVersion.get((byte) 1, (byte) 0)); - ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(); - protocolInitiation.writePayload(new NettyByteBufferSender(byteBuf)); - channel.writeInbound(byteBuf); - ProtocolInitiation pi = channel.readOutbound(); + ProtocolInitiation pi = exchangeData(protocolInitiation); assertEquals(9, pi.getProtocolMajor()); assertEquals(1, pi.getProtocolMinor()); diff --git a/rabbitmq-tests/pom.xml b/rabbitmq-tests/pom.xml index ebbe5ba..80bedf2 100644 --- a/rabbitmq-tests/pom.xml +++ b/rabbitmq-tests/pom.xml @@ -25,7 +25,7 @@ 4.0.0 rabbitmq-tests jar - DataStax RabbitMQ (R) gateway for Apache Pulsar (R) + Pulsar RabbitMQ integration tests with RabbitMQ client ${project.build.directory} diff --git a/rabbitmq-tests/src/test/java/com/datastax/oss/pulsar/rabbitmqtests/RabbitmqInteropIT.java b/rabbitmq-tests/src/test/java/com/datastax/oss/pulsar/rabbitmqtests/RabbitmqInteropIT.java index 8f15726..272fa0c 100644 --- a/rabbitmq-tests/src/test/java/com/datastax/oss/pulsar/rabbitmqtests/RabbitmqInteropIT.java +++ b/rabbitmq-tests/src/test/java/com/datastax/oss/pulsar/rabbitmqtests/RabbitmqInteropIT.java @@ -15,6 +15,7 @@ */ package com.datastax.oss.pulsar.rabbitmqtests; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -26,13 +27,21 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.GetResponse; +import com.rabbitmq.client.ShutdownSignalException; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.util.Date; +import java.util.HashMap; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.util.PortManager; import org.apache.commons.codec.binary.Base64; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; @@ -44,14 +53,28 @@ public class RabbitmqInteropIT { + public static final String TEST_QUEUE = "test-queue"; + public static final String TEST_CONSUMER_TAG = "test-consumer-tag"; public static final String TEST_MESSAGE = "test-message"; @TempDir public static Path tempDir; private static PulsarCluster cluster; + private static GatewayService gatewayService; + private static ConnectionFactory factory; @BeforeAll public static void before() throws Exception { cluster = new PulsarCluster(tempDir); cluster.start(); + GatewayConfiguration config = new GatewayConfiguration(); + config.setBrokerServiceURL(cluster.getAddress()); + config.setServicePort(Optional.of(PortManager.nextFreePort())); + gatewayService = new GatewayService(config); + gatewayService.start(); + + factory = new ConnectionFactory(); + factory.setVirtualHost("/"); + factory.setHost("localhost"); + factory.setPort(config.getServicePort().get()); } @AfterAll @@ -59,22 +82,14 @@ public static void after() throws Exception { if (cluster != null) { cluster.close(); } + if (gatewayService != null) { + gatewayService.close(); + } } @Test void testRabbitProducerPulsarConsumer() throws Exception { - GatewayConfiguration config = new GatewayConfiguration(); - config.setBrokerServiceURL(cluster.getAddress()); - GatewayService gatewayService = new GatewayService(config); - gatewayService.start(); - - ConnectionFactory factory = new ConnectionFactory(); - factory.setVirtualHost("/"); - factory.setHost("localhost"); - factory.setPort(config.getServicePort().get()); - Connection conn = factory.newConnection(); - Channel channel = conn.createChannel(); PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(cluster.getAddress()).build(); @@ -93,7 +108,7 @@ void testRabbitProducerPulsarConsumer() throws Exception { .build(); channel.basicPublish("", "", properties, TEST_MESSAGE.getBytes(StandardCharsets.UTF_8)); - Message receive = consumer.receive(1, TimeUnit.SECONDS); + Message receive = consumer.receive(10, TimeUnit.SECONDS); assertNotNull(receive); assertEquals(TEST_MESSAGE, receive.getValue()); @@ -110,6 +125,106 @@ void testRabbitProducerPulsarConsumer() throws Exception { consumer.close(); channel.close(); conn.close(); - gatewayService.close(); + } + + @Test + void testPulsarProducerRabbitConsumer() throws Exception { + Connection conn = factory.newConnection(); + Channel channel = conn.createChannel(); + + PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(cluster.getAddress()).build(); + Producer producer = + pulsarClient.newProducer(Schema.STRING).topic("amq.default$$test-queue").create(); + + channel.queueDeclare(TEST_QUEUE, true, false, false, new HashMap<>()); + producer.send(TEST_MESSAGE); + + GetResponse getResponse = null; + for (int i = 0; i < 100 && getResponse == null; i++) { + getResponse = channel.basicGet(TEST_QUEUE, false); + Thread.sleep(10); + } + assertNotNull(getResponse); + assertEquals("amq.default", getResponse.getEnvelope().getExchange()); + assertEquals(TEST_QUEUE, getResponse.getEnvelope().getRoutingKey()); + assertArrayEquals(TEST_MESSAGE.getBytes(StandardCharsets.UTF_8), getResponse.getBody()); + + producer.close(); + channel.close(); + conn.close(); + } + + @Test + void testRabbitProducerRabbitConsumer() throws Exception { + Connection conn1 = factory.newConnection(); + Channel channel1 = conn1.createChannel(); + + Connection conn2 = factory.newConnection(); + Channel channel2 = conn2.createChannel(); + + channel1.queueDeclare(TEST_QUEUE, true, false, false, new HashMap<>()); + + Date now = new Date(); + AMQP.BasicProperties properties = + new AMQP.BasicProperties.Builder() + .contentType("application/octet-stream") + .timestamp(now) + .build(); + channel2.basicPublish( + "", TEST_QUEUE, properties, TEST_MESSAGE.getBytes(StandardCharsets.UTF_8)); + + CountDownLatch consumeOkReceived = new CountDownLatch(1); + CountDownLatch messageReceived = new CountDownLatch(1); + CountDownLatch consumerCanceled = new CountDownLatch(1); + channel1.basicConsume( + TEST_QUEUE, + true, + TEST_CONSUMER_TAG, + new com.rabbitmq.client.Consumer() { + @Override + public void handleConsumeOk(String consumerTag) { + assertEquals(TEST_CONSUMER_TAG, consumerTag); + consumeOkReceived.countDown(); + } + + @Override + public void handleCancelOk(String consumerTag) { + assertEquals(TEST_CONSUMER_TAG, consumerTag); + consumerCanceled.countDown(); + } + + @Override + public void handleCancel(String consumerTag) {} + + @Override + public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {} + + @Override + public void handleRecoverOk(String consumerTag) {} + + @Override + public void handleDelivery( + String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { + assertEquals(TEST_CONSUMER_TAG, consumerTag); + assertEquals("amq.default", envelope.getExchange()); + assertEquals(TEST_QUEUE, envelope.getRoutingKey()); + assertEquals("application/octet-stream", properties.getContentType()); + assertEquals(now.getTime() / 1000, properties.getTimestamp().getTime() / 1000); + assertArrayEquals(TEST_MESSAGE.getBytes(StandardCharsets.UTF_8), body); + messageReceived.countDown(); + } + }); + + assertTrue(consumeOkReceived.await(5, TimeUnit.SECONDS)); + assertTrue(messageReceived.await(5, TimeUnit.SECONDS)); + + channel1.basicCancel(TEST_CONSUMER_TAG); + + assertTrue(consumerCanceled.await(5, TimeUnit.SECONDS)); + + channel1.close(); + conn1.close(); + channel2.close(); + conn2.close(); } }