Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,25 @@

package io.cloudevents.amqp;


import io.cloudevents.SpecVersion;
import io.cloudevents.amqp.impl.AmqpConstants;
import io.cloudevents.amqp.impl.ProtonAmqpBinaryMessageReader;
import io.cloudevents.amqp.impl.ProtonAmqpMessageWriter;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
import io.cloudevents.core.message.impl.MessageUtils;
import io.cloudevents.lang.Nullable;
import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.rw.CloudEventWriter;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.message.Message;

import javax.annotation.ParametersAreNonnullByDefault;

/**
* A factory class providing convenience methods for creating MessageReader and MessageWriter instances based on Qpid Proton.
* A factory class providing convenience methods for creating {@link MessageReader} and {@link MessageWriter} instances based on Qpid Proton {@link Message}.
*/
@ParametersAreNonnullByDefault
public final class ProtonAmqpMessageFactory {
Expand All @@ -41,62 +45,47 @@ private ProtonAmqpMessageFactory() {
}

/**
* Creates a MessageReader to read a proton-based {@link Message}.
* <p>
* This implementation simply calls {@link #createReader(String, ApplicationProperties, byte[])}.
* Creates a {@link MessageReader} to read a proton-based {@link Message}.
* This reader is able to read both binary and structured encoded {@link io.cloudevents.CloudEvent}.
*
* @param message The proton message to read from.
*
* @return A message reader that can read the given proton message to a cloud event representation.
* @param message The proton {@link Message} to read from.
* @return A {@link MessageReader} that can read the given proton {@link Message} to a {@link io.cloudevents.CloudEvent} representation.
* @throws CloudEventRWException if something goes wrong while resolving the {@link SpecVersion} or if the message has unknown encoding
* @see #createReader(String, ApplicationProperties, Section)
*/
public static MessageReader createReader(final Message message) {

final byte[] payload = AmqpConstants.getPayloadAsByteArray(message.getBody());
return createReader(message.getContentType(), message.getApplicationProperties(), payload);
public static MessageReader createReader(final Message message) throws CloudEventRWException {
return createReader(message.getContentType(), message.getApplicationProperties(), message.getBody());
}

/**
* Creates a MessageReader using the content-type property and payload of a proton-based message.
* <p>
* This method simply calls {@link #createReader(String, ApplicationProperties, byte[])}.
* Creates a MessageReader to read using the {@code content-type} property, {@code application-properties} and data payload
* of a proton-based {@link Message}. This reader is able to read both binary and structured encoded {@link io.cloudevents.CloudEvent}.
*
* @param contentType The content-type of the message payload.
* @param payload The message payload in bytes.
* @return A message reader capable of representing a CloudEvent from
* a message <em>content-type</em> property and <em>application-data</em>.
* @param contentType The {@code content-type} of the message payload.
* @param props The {@code application-properties} section of the proton-message containing cloud event metadata (attributes and/or extensions).
* @param body The message body or {@code null} if the message does not contain any body.
* @return A {@link MessageReader} capable of representing a {@link io.cloudevents.CloudEvent} from the {@code application-properties},
* {@code content-type} and payload of a proton message.
* @throws CloudEventRWException if something goes wrong while resolving the {@link SpecVersion} or if the message has unknown encoding
*/
public static MessageReader createReader(final String contentType, final byte[] payload) {
return createReader(contentType, null, payload);
public static MessageReader createReader(final String contentType, final ApplicationProperties props, @Nullable final Section body) throws CloudEventRWException {
final byte[] payload = AmqpConstants.getPayloadAsByteArray(body);
return MessageUtils.parseStructuredOrBinaryMessage(
() -> contentType,
format -> new GenericStructuredMessageReader(format, payload),
() -> AmqpConstants.getApplicationProperty(props, AmqpConstants.APP_PROPERTY_SPEC_VERSION, String.class),
sv -> new ProtonAmqpBinaryMessageReader(sv, props, contentType, payload)
);
}

/**
* Creates a MessageWriter capable of translating both a structured and binary CloudEvent
* to a proton-based AMQP 1.0 representation.
* Creates a {@link MessageWriter} capable of translating both a structured and binary CloudEvent
* to a proton-based AMQP 1.0 {@link Message}.
*
* @return A message writer to read structured and binary cloud event from a proton-based message.
* @return A {@link MessageWriter} to write a {@link io.cloudevents.CloudEvent} to Proton {@link Message} using structured or binary encoding.
*/
public static MessageWriter<CloudEventWriter<Message>, Message> createWriter() {
return new ProtonAmqpMessageWriter<>();
}

/**
* Creates a MessageReader to read using the content-type property, application-propeties and data payload
* of a proton-based message.
*
* @param contentType The content-type of the message payload.
* @param props The application-properties section of the proton-message containing cloud event metadata (attributes and/or extensions).
* @param payload The message payload in bytes or {@code null} if the message does not contain any payload.
* @return A message reader capable of representing a CloudEvent from the application-properties,
* content-type and payload of a proton message.
*/
public static MessageReader createReader(final String contentType, final ApplicationProperties props, final byte[] payload) {

return MessageUtils.parseStructuredOrBinaryMessage(
() -> contentType,
format -> new GenericStructuredMessageReader(format, payload),
() -> AmqpConstants.getApplicationProperty(props, AmqpConstants.APP_PROPERTY_SPEC_VERSION, String.class),
sv -> new ProtonAmqpBinaryMessageReader(sv, props, contentType, payload));

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,6 @@

package io.cloudevents.amqp;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.amqp.impl.AmqpConstants;
Expand All @@ -39,6 +27,19 @@
import io.cloudevents.core.v03.CloudEventV03;
import io.cloudevents.core.v1.CloudEventV1;
import io.cloudevents.types.Time;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;

/**
* Tests verifying the behavior of the {@code ProtonAmqpMessageFactory}.
Expand All @@ -53,7 +54,8 @@ public class ProtonAmqpMessageFactoryTest {
@MethodSource("binaryTestArguments")
public void readBinary(final Map<String, Object> props, final String contentType, final byte[] body,
final CloudEvent event) {
final MessageReader amqpReader = ProtonAmqpMessageFactory.createReader(contentType, new ApplicationProperties(props), body);
final Section bodySection = body != null ? new org.apache.qpid.proton.amqp.messaging.Data(new Binary(body)) : null;
final MessageReader amqpReader = ProtonAmqpMessageFactory.createReader(contentType, new ApplicationProperties(props), bodySection);
assertThat(amqpReader.getEncoding()).isEqualTo(Encoding.BINARY);
assertThat(amqpReader.toEvent()).isEqualTo(event);
}
Expand All @@ -64,7 +66,7 @@ public void readStructured(final CloudEvent event) {
final String contentType = CSVFormat.INSTANCE.serializedContentType() + "; charset=utf8";
final byte[] contentPayload = CSVFormat.INSTANCE.serialize(event);

final MessageReader amqpReader = ProtonAmqpMessageFactory.createReader(contentType, null, contentPayload);
final MessageReader amqpReader = ProtonAmqpMessageFactory.createReader(contentType, null, new org.apache.qpid.proton.amqp.messaging.Data(new Binary(contentPayload)));
assertThat(amqpReader.getEncoding()).isEqualTo(Encoding.STRUCTURED);
assertThat(amqpReader.toEvent()).isEqualTo(event);
}
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@
<link>https://docs.spring.io/spring-framework/docs/current/javadoc-api/</link>
<link>https://vertx.io/docs/apidocs/</link>
<link>https://jakarta.ee/specifications/platform/8/apidocs/</link>
<link>https://qpid.apache.org/releases/qpid-proton-j-0.33.7/api/</link>
</links>
</configuration>
<executions>
Expand Down