Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,26 @@

package io.cloudevents.amqp.impl;

import java.util.Objects;
import java.util.function.BiConsumer;

import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;

import io.cloudevents.SpecVersion;
import io.cloudevents.core.data.BytesCloudEventData;
import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;

import java.util.Objects;
import java.util.function.BiConsumer;

/**
* An AMQP 1.0 message reader that can be read as a <em>CloudEvent</em>.
* <p>
*
*
* This reader reads sections of an AMQP message to construct a CloudEvent representation by doing the following:
* <ul>
* <li> If the content-type property is set for an AMQP message, the value of the property
* is represented as a cloud event datacontenttype attribute.
* <li> If the (mandatory) application-properties of the AMQP message contains attributes and/or extentions,
* this reader will represent each property/extension as a cloud event attribute.
* </ul>
*
*
*/
public final class ProtonAmqpBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl<String, Object> {

Expand All @@ -53,12 +52,12 @@ public final class ProtonAmqpBinaryMessageReader extends BaseGenericBinaryMessag
* The applicationProperties MUST not be {@code null}.
* @param contentType The content-type property of the AMQP message or {@code null} if the message content type is unknown.
* @param payload The message payload or {@code null} if the message does not contain any payload.
*
*
* @throws NullPointerException if the applicationPropereties is {@code null}.
*/
public ProtonAmqpBinaryMessageReader(final SpecVersion version, final ApplicationProperties applicationProperties,
final String contentType, final byte[] payload) {
super(version, payload != null && payload.length > 0 ? new BytesCloudEventData(payload) : null);
public ProtonAmqpBinaryMessageReader(final SpecVersion version, final ApplicationProperties applicationProperties,
final String contentType, final byte[] payload) {
super(version, payload != null && payload.length > 0 ? BytesCloudEventData.wrap(payload) : null);
this.contentType = contentType;
this.applicationProperties = Objects.requireNonNull(applicationProperties);
}
Expand All @@ -70,7 +69,7 @@ protected boolean isContentTypeHeader(final String key) {

/**
* Tests whether the given attribute key is prefixed with <em>cloudEvents:</em>
*
*
* @param key The key to test for the presence of the prefix.
* @return True if the specified key starts with the prefix or
* false otherwise.
Expand All @@ -83,9 +82,9 @@ protected boolean isCloudEventsHeader(final String key) {

/**
* Gets the cloud event attribute key without the preceding prefix.
*
*
* @param key The key containing the AMQP specific prefix.
*
*
* @return The key without the prefix.
*/
@Override
Expand Down Expand Up @@ -119,7 +118,7 @@ protected void forEachHeader(final BiConsumer<String, Object> fn) {
* Gets the cloud event representation of the value.
* <p>
* This method simply returns the string representation of the type of value passed as argument.
*
*
* @param value The value of a CloudEvent attribute or extension.
*
* @return The string representation of the specified value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ public class BytesCloudEventData implements CloudEventData {

private final byte[] value;

/**
* @deprecated use {@link BytesCloudEventData#wrap(byte[])}
*/
public BytesCloudEventData(byte[] value) {
Objects.requireNonNull(value);
this.value = value;
Expand Down Expand Up @@ -38,4 +41,12 @@ public String toString() {
"value=" + Arrays.toString(value) +
'}';
}

/**
* @param value byte array to wrap
* @return byte array wrapped in a {@link BytesCloudEventData}, which implements {@link CloudEventData}.
*/
public static BytesCloudEventData wrap(byte[] value) {
return new BytesCloudEventData(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public BaseCloudEventBuilder(CloudEvent event) {
// to encode data

public SELF withData(byte[] data) {
this.data = new BytesCloudEventData(data);
this.data = BytesCloudEventData.wrap(data);
return this.self;
}

Expand Down
5 changes: 2 additions & 3 deletions core/src/test/java/io/cloudevents/core/mock/CSVFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package io.cloudevents.core.mock;

import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventData;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.data.BytesCloudEventData;
Expand Down Expand Up @@ -60,7 +59,7 @@ public byte[] serialize(CloudEvent event) {
}

@Override
public CloudEvent deserialize(byte[] bytes, CloudEventDataMapper<? extends CloudEventData> mapper) {
public CloudEvent deserialize(byte[] bytes, CloudEventDataMapper mapper) {
String[] splitted = new String(bytes, StandardCharsets.UTF_8).split(Pattern.quote(","));
SpecVersion sv = SpecVersion.parse(splitted[0]);

Expand Down Expand Up @@ -91,7 +90,7 @@ public CloudEvent deserialize(byte[] bytes, CloudEventDataMapper<? extends Cloud
builder.withTime(time);
}
if (data != null) {
builder.withData(mapper.map(new BytesCloudEventData(data)));
builder.withData(mapper.map(BytesCloudEventData.wrap(data)));
}
return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public MockBinaryMessageWriter(SpecVersion version, Map<String, Object> attribut
}

public MockBinaryMessageWriter(SpecVersion version, Map<String, Object> attributes, byte[] data, Map<String, Object> extensions) {
this(version, attributes, new BytesCloudEventData(data), extensions);
this(version, attributes, BytesCloudEventData.wrap(data), extensions);
}

public MockBinaryMessageWriter() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> w
boolean isBase64 = "base64".equals(getOptionalStringNode(this.node, this.p, "datacontentencoding"));
if (node.has("data")) {
if (isBase64) {
data = new BytesCloudEventData(node.remove("data").binaryValue());
data = BytesCloudEventData.wrap(node.remove("data").binaryValue());
} else {
if (JsonFormat.dataIsJsonContentType(contentType)) {
// This solution is quite bad, but i see no alternatives now.
Expand All @@ -99,7 +99,7 @@ public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> w
} else {
JsonNode dataNode = node.remove("data");
assertNodeType(dataNode, JsonNodeType.STRING, "data", "Because content type is not a json, only a string is accepted as data");
data = new BytesCloudEventData(dataNode.asText().getBytes());
data = BytesCloudEventData.wrap(dataNode.asText().getBytes());
}
}
}
Expand All @@ -108,7 +108,7 @@ public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> w
throw MismatchedInputException.from(p, CloudEvent.class, "CloudEvent cannot have both 'data' and 'data_base64' fields");
}
if (node.has("data_base64")) {
data = new BytesCloudEventData(node.remove("data_base64").binaryValue());
data = BytesCloudEventData.wrap(node.remove("data_base64").binaryValue());
} else if (node.has("data")) {
if (JsonFormat.dataIsJsonContentType(contentType)) {
// This solution is quite bad, but i see no alternatives now.
Expand All @@ -117,7 +117,7 @@ public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> w
} else {
JsonNode dataNode = node.remove("data");
assertNodeType(dataNode, JsonNodeType.STRING, "data", "Because content type is not a json, only a string is accepted as data");
data = new BytesCloudEventData(dataNode.asText().getBytes());
data = BytesCloudEventData.wrap(dataNode.asText().getBytes());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public HttpMessageReader(SpecVersion version, Consumer<BiConsumer<String, String
}

public HttpMessageReader(SpecVersion version, Consumer<BiConsumer<String, String>> forEachHeader, byte[] body) {
this(version, forEachHeader, body != null && body.length > 0 ? new BytesCloudEventData(body) : null);
this(version, forEachHeader, body != null && body.length > 0 ? BytesCloudEventData.wrap(body) : null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public final class BinaryRestfulWSMessageReaderImpl extends BaseGenericBinaryMes
private final MultivaluedMap<String, String> headers;

public BinaryRestfulWSMessageReaderImpl(SpecVersion version, MultivaluedMap<String, String> headers, byte[] body) {
super(version, body != null && body.length > 0 ? new BytesCloudEventData(body) : null);
super(version, body != null && body.length > 0 ? BytesCloudEventData.wrap(body) : null);

Objects.requireNonNull(headers);
this.headers = headers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class BinaryVertxMessageReaderImpl extends BaseGenericBinaryMessageReader
private final MultiMap headers;

public BinaryVertxMessageReaderImpl(SpecVersion version, MultiMap headers, Buffer body) {
super(version, body != null && body.length() > 0 ? new BytesCloudEventData(body.getBytes()) : null);
super(version, body != null && body.length() > 0 ? BytesCloudEventData.wrap(body.getBytes()) : null);

Objects.requireNonNull(headers);
this.headers = headers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class KafkaBinaryMessageReaderImpl extends BaseGenericBinaryMessageReader
private final Headers headers;

public KafkaBinaryMessageReaderImpl(SpecVersion version, Headers headers, byte[] payload) {
super(version, payload != null && payload.length > 0 ? new BytesCloudEventData(payload) : null);
super(version, payload != null && payload.length > 0 ? BytesCloudEventData.wrap(payload) : null);

Objects.requireNonNull(headers);
this.headers = headers;
Expand Down