Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,18 @@
package io.cloudevents.amqp;


import javax.annotation.ParametersAreNonnullByDefault;

import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.message.Message;

import io.cloudevents.amqp.impl.AmqpConstants;
import io.cloudevents.amqp.impl.ProtonAmqpBinaryMessageReader;
import io.cloudevents.amqp.impl.ProtonAmqpMessageWriter;
import io.cloudevents.amqp.impl.AmqpConstants;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
import io.cloudevents.core.message.impl.MessageUtils;
import io.cloudevents.core.message.impl.UnknownEncodingMessageReader;
import io.cloudevents.rw.CloudEventWriter;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.message.Message;

import javax.annotation.ParametersAreNonnullByDefault;
/**
* A factory class providing convenience methods for creating MessageReader and MessageWriter instances based on Qpid Proton.
*/
Expand All @@ -46,9 +44,9 @@ private ProtonAmqpMessageFactory() {
* Creates a MessageReader to read a proton-based {@link Message}.
* <p>
* This implementation simply calls {@link #createReader(String, ApplicationProperties, byte[])}.
*
*
* @param message The proton message to read from.
*
*
* @return A message reader that can read the given proton message to a cloud event representation.
*/
public static MessageReader createReader(final Message message) {
Expand All @@ -61,7 +59,7 @@ public static MessageReader createReader(final Message message) {
* Creates a MessageReader using the content-type property and payload of a proton-based message.
* <p>
* This method simply calls {@link #createReader(String, ApplicationProperties, byte[])}.
*
*
* @param contentType The content-type of the message payload.
* @param payload The message payload in bytes.
* @return A message reader capable of representing a CloudEvent from
Expand All @@ -74,7 +72,7 @@ public static MessageReader createReader(final String contentType, final byte[]
/**
* Creates a MessageWriter capable of translating both a structured and binary CloudEvent
* to a proton-based AMQP 1.0 representation.
*
*
* @return A message writer to read structured and binary cloud event from a proton-based message.
*/
public static MessageWriter<CloudEventWriter<Message>, Message> createWriter() {
Expand All @@ -84,7 +82,7 @@ public static MessageWriter<CloudEventWriter<Message>, Message> createWriter() {
/**
* Creates a MessageReader to read using the content-type property, application-propeties and data payload
* of a proton-based message.
*
*
* @param contentType The content-type of the message payload.
* @param props The application-properties section of the proton-message containing cloud event metadata (attributes and/or extensions).
* @param payload The message payload in bytes or {@code null} if the message does not contain any payload.
Expand All @@ -94,12 +92,11 @@ public static MessageWriter<CloudEventWriter<Message>, Message> createWriter() {
public static MessageReader createReader(final String contentType, final ApplicationProperties props, final byte[] payload) {

return MessageUtils.parseStructuredOrBinaryMessage(
() -> contentType,
() -> contentType,
format -> new GenericStructuredMessageReader(format, payload),
() -> AmqpConstants.getApplicationProperty(props, AmqpConstants.APP_PROPERTY_SPEC_VERSION, String.class),
sv -> new ProtonAmqpBinaryMessageReader(sv, props, contentType, payload),
UnknownEncodingMessageReader::new);
sv -> new ProtonAmqpBinaryMessageReader(sv, props, contentType, payload));

}

}
11 changes: 11 additions & 0 deletions api/src/main/java/io/cloudevents/rw/CloudEventRWException.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public enum CloudEventRWExceptionKind {
* Error while converting CloudEventData.
*/
DATA_CONVERSION,
/**
* Invalid content type or spec version
*/
UNKNOWN_ENCODING,
/**
* Other error.
*/
Expand Down Expand Up @@ -146,4 +150,11 @@ public static CloudEventRWException newOther(Throwable cause) {
cause
);
}

public static CloudEventRWException newUnknownEncodingException() {
return new CloudEventRWException(
CloudEventRWExceptionKind.UNKNOWN_ENCODING,
"Could not parse. Unknown encoding. Invalid content type or spec version"
);
}
}
3 changes: 1 addition & 2 deletions core/src/main/java/io/cloudevents/core/message/Encoding.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,5 @@
*/
public enum Encoding {
STRUCTURED,
BINARY,
UNKNOWN
BINARY
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static io.cloudevents.rw.CloudEventRWException.newUnknownEncodingException;

public class MessageUtils {

/**
Expand All @@ -38,8 +40,7 @@ public static MessageReader parseStructuredOrBinaryMessage(
Supplier<String> contentTypeHeaderReader,
Function<EventFormat, MessageReader> structuredMessageFactory,
Supplier<String> specVersionHeaderReader,
Function<SpecVersion, MessageReader> binaryMessageFactory,
Supplier<MessageReader> unknownMessageFactory
Function<SpecVersion, MessageReader> binaryMessageFactory
) {
// Let's try structured mode
String ct = contentTypeHeaderReader.get();
Expand All @@ -57,7 +58,7 @@ public static MessageReader parseStructuredOrBinaryMessage(
return binaryMessageFactory.apply(SpecVersion.parse(specVersionUnparsed));
}

return unknownMessageFactory.get();
throw newUnknownEncodingException();
}

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.cloudevents.core.message.impl;

import io.cloudevents.SpecVersion;
import io.cloudevents.core.mock.CSVFormat;
import io.cloudevents.rw.CloudEventRWException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.stream.Stream;

import static io.cloudevents.SpecVersion.V03;
import static io.cloudevents.SpecVersion.V1;
import static io.cloudevents.core.message.impl.MessageUtils.parseStructuredOrBinaryMessage;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.*;

class MessageUtilsTest {

@Test
void parseStructuredOrBinaryMessage_Exception() {
final CloudEventRWException cloudEventRWException = assertThrows(CloudEventRWException.class, () -> {
parseStructuredOrBinaryMessage(() -> null, eventFormat -> null, () -> null, specVersion -> null);
});
assertThat(cloudEventRWException.getKind())
.isEqualTo(CloudEventRWException.CloudEventRWExceptionKind.UNKNOWN_ENCODING);
}

@Test
void testParseStructuredOrBinaryMessage_StructuredMode() {
MessageUtils.parseStructuredOrBinaryMessage(() -> "application/cloudevents+csv;",
eventFormat -> {
assertTrue(eventFormat instanceof CSVFormat);
return null;
},
() -> null, specVersion -> null);
}

@ParameterizedTest
@MethodSource
void testParseStructuredOrBinaryMessage_BinaryMode(String specVersionHeader, SpecVersion expectedSpecVersion) {
MessageUtils.parseStructuredOrBinaryMessage(() -> null, eventFormat -> null,
() -> specVersionHeader, specVersion -> {
assertEquals(expectedSpecVersion, specVersion);
return null;
});
}

private static Stream<Arguments> testParseStructuredOrBinaryMessage_BinaryMode() {
return Stream.of(
Arguments.of("0.3", V03),
Arguments.of("1.0", V1)
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
import io.cloudevents.core.message.impl.MessageUtils;
import io.cloudevents.core.message.impl.UnknownEncodingMessageReader;
import io.cloudevents.http.impl.CloudEventsHeaders;
import io.cloudevents.http.impl.HttpMessageReader;
import io.cloudevents.http.impl.HttpMessageWriter;
Expand Down Expand Up @@ -80,8 +79,7 @@ public static MessageReader createReader(Consumer<BiConsumer<String, String>> fo
contentType::get,
format -> new GenericStructuredMessageReader(format, body),
specVersion::get,
sv -> new HttpMessageReader(sv, forEachHeader, body),
UnknownEncodingMessageReader::new
sv -> new HttpMessageReader(sv, forEachHeader, body)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
import io.cloudevents.core.message.impl.MessageUtils;
import io.cloudevents.core.message.impl.UnknownEncodingMessageReader;

import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
Expand All @@ -36,8 +35,7 @@ public static MessageReader create(MediaType mediaType, MultivaluedMap<String, S
() -> headers.getFirst(HttpHeaders.CONTENT_TYPE),
format -> new GenericStructuredMessageReader(format, payload),
() -> headers.getFirst(CloudEventsHeaders.SPEC_VERSION),
sv -> new BinaryRestfulWSMessageReaderImpl(sv, headers, payload),
UnknownEncodingMessageReader::new
sv -> new BinaryRestfulWSMessageReaderImpl(sv, headers, payload)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
import io.cloudevents.core.message.impl.MessageUtils;
import io.cloudevents.core.message.impl.UnknownEncodingMessageReader;
import io.cloudevents.http.vertx.impl.BinaryVertxMessageReaderImpl;
import io.cloudevents.http.vertx.impl.CloudEventsHeaders;
import io.cloudevents.http.vertx.impl.VertxWebClientRequestMessageWriterImpl;
Expand Down Expand Up @@ -43,8 +42,7 @@ public static MessageReader createReader(MultiMap headers, Buffer body) throws I
() -> headers.get(HttpHeaders.CONTENT_TYPE),
format -> new GenericStructuredMessageReader(format, body.getBytes()),
() -> headers.get(CloudEventsHeaders.SPEC_VERSION),
sv -> new BinaryVertxMessageReaderImpl(sv, headers, body),
UnknownEncodingMessageReader::new
sv -> new BinaryVertxMessageReaderImpl(sv, headers, body)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ public void configure(Map<String, ?> configs, boolean isKey) {
} else if (encodingConfig != null) {
throw new IllegalArgumentException(ENCODING_CONFIG + " can be of type String or " + Encoding.class.getCanonicalName());
}
if (this.encoding == Encoding.UNKNOWN) {
throw new IllegalArgumentException(ENCODING_CONFIG + " cannot be " + Encoding.UNKNOWN);
}

if (this.encoding == Encoding.STRUCTURED) {
Object eventFormatConfig = configs.get(EVENT_FORMAT_CONFIG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
import io.cloudevents.core.message.impl.MessageUtils;
import io.cloudevents.core.message.impl.UnknownEncodingMessageReader;
import io.cloudevents.kafka.impl.KafkaBinaryMessageReaderImpl;
import io.cloudevents.kafka.impl.KafkaHeaders;
import io.cloudevents.kafka.impl.KafkaProducerMessageWriterImpl;
Expand Down Expand Up @@ -61,8 +60,7 @@ public static MessageReader createReader(Headers headers, byte[] payload) throws
() -> KafkaHeaders.getParsedKafkaHeader(headers, KafkaHeaders.CONTENT_TYPE),
format -> new GenericStructuredMessageReader(format, payload),
() -> KafkaHeaders.getParsedKafkaHeader(headers, KafkaHeaders.SPEC_VERSION),
sv -> new KafkaBinaryMessageReaderImpl(sv, headers, payload),
UnknownEncodingMessageReader::new
sv -> new KafkaBinaryMessageReaderImpl(sv, headers, payload)
);
}

Expand Down