Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.cloudevents.SpecVersion;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.v1.CloudEventV1;
import io.cloudevents.rw.CloudEventAttributesWriter;
import io.cloudevents.rw.CloudEventExtensionsWriter;
import io.cloudevents.rw.CloudEventRWException;
Expand All @@ -53,7 +54,7 @@ public ProtonAmqpMessageWriter() {

@Override
public CloudEventAttributesWriter withAttribute(final String name, final String value) throws CloudEventRWException {
if (name.equals("datacontenttype")) {
if (name.equals(CloudEventV1.DATACONTENTTYPE)) {
message.setContentType(value);
} else {
if (applicationProperties == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,21 @@

import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.amqp.impl.AmqpConstants;
import io.cloudevents.core.message.Encoding;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.mock.CSVFormat;
import io.cloudevents.core.test.Data;
import io.cloudevents.core.v03.CloudEventV03;
import io.cloudevents.core.v1.CloudEventV1;
import io.cloudevents.types.Time;

/**
* Tests verifying the behavior of the {@code ProtonAmqpMessageFactory}.
*/
public class ProtonAmqpMessageFactoryTest {

private static final String PREFIX_TEMPLATE = "cloudEvents:%s";
private static final String PREFIX_TEMPLATE = AmqpConstants.CE_PREFIX + "%s";
private static final String DATACONTENTTYPE_NULL = null;
private static final byte[] DATAPAYLOAD_NULL = null;

Expand Down Expand Up @@ -72,10 +75,10 @@ private static Stream<Arguments> binaryTestArguments() {
// V03
Arguments.of(
properties(
property("specversion", SpecVersion.V03.toString()),
property("id", Data.ID),
property("type", Data.TYPE),
property("source", Data.SOURCE),
property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
property(CloudEventV03.ID, Data.ID),
property(CloudEventV03.TYPE, Data.TYPE),
property(CloudEventV03.SOURCE, Data.SOURCE),
property("ignored", "ignore")
),
DATACONTENTTYPE_NULL,
Expand All @@ -84,13 +87,13 @@ private static Stream<Arguments> binaryTestArguments() {
),
Arguments.of(
properties(
property("specversion", SpecVersion.V03.toString()),
property("id", Data.ID),
property("type", Data.TYPE),
property("source", Data.SOURCE.toString()),
property("schemaurl", Data.DATASCHEMA.toString()),
property("subject", Data.SUBJECT),
property("time", Time.writeTime(Data.TIME)),
property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
property(CloudEventV03.ID, Data.ID),
property(CloudEventV03.TYPE, Data.TYPE),
property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
property(CloudEventV03.SCHEMAURL, Data.DATASCHEMA.toString()),
property(CloudEventV03.SUBJECT, Data.SUBJECT),
property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
property("ignored", "ignore")
),
Data.DATACONTENTTYPE_JSON,
Expand All @@ -99,13 +102,13 @@ private static Stream<Arguments> binaryTestArguments() {
),
Arguments.of(
properties(
property("specversion", SpecVersion.V03.toString()),
property("id", Data.ID),
property("type", Data.TYPE),
property("source", Data.SOURCE.toString()),
property("schemaurl", Data.DATASCHEMA.toString()),
property("subject", Data.SUBJECT),
property("time", Time.writeTime(Data.TIME)),
property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
property(CloudEventV03.ID, Data.ID),
property(CloudEventV03.TYPE, Data.TYPE),
property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
property(CloudEventV03.SCHEMAURL, Data.DATASCHEMA.toString()),
property(CloudEventV03.SUBJECT, Data.SUBJECT),
property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
property("astring", "aaa"),
property("aboolean", "true"),
property("anumber", "10"),
Expand All @@ -117,12 +120,12 @@ private static Stream<Arguments> binaryTestArguments() {
),
Arguments.of(
properties(
property("specversion", SpecVersion.V03.toString()),
property("id", Data.ID),
property("type", Data.TYPE),
property("source", Data.SOURCE.toString()),
property("subject", Data.SUBJECT),
property("time", Time.writeTime(Data.TIME)),
property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
property(CloudEventV03.ID, Data.ID),
property(CloudEventV03.TYPE, Data.TYPE),
property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
property(CloudEventV03.SUBJECT, Data.SUBJECT),
property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
property("ignored", "ignored")
),
Data.DATACONTENTTYPE_XML,
Expand All @@ -131,12 +134,12 @@ private static Stream<Arguments> binaryTestArguments() {
),
Arguments.of(
properties(
property("specversion", SpecVersion.V03.toString()),
property("id", Data.ID),
property("type", Data.TYPE),
property("source", Data.SOURCE.toString()),
property("subject", Data.SUBJECT),
property("time", Time.writeTime(Data.TIME)),
property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
property(CloudEventV03.ID, Data.ID),
property(CloudEventV03.TYPE, Data.TYPE),
property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
property(CloudEventV03.SUBJECT, Data.SUBJECT),
property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
property("ignored", "ignored")
),
Data.DATACONTENTTYPE_TEXT,
Expand All @@ -146,10 +149,10 @@ private static Stream<Arguments> binaryTestArguments() {
// V1
Arguments.of(
properties(
property("specversion", SpecVersion.V1.toString()),
property("id", Data.ID),
property("type", Data.TYPE),
property("source", Data.SOURCE.toString()),
property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
property(CloudEventV1.ID, Data.ID),
property(CloudEventV1.TYPE, Data.TYPE),
property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
property("ignored", "ignored")
),
DATACONTENTTYPE_NULL,
Expand All @@ -158,13 +161,13 @@ private static Stream<Arguments> binaryTestArguments() {
),
Arguments.of(
properties(
property("specversion", SpecVersion.V1.toString()),
property("id", Data.ID),
property("type", Data.TYPE),
property("source", Data.SOURCE.toString()),
property("dataschema", Data.DATASCHEMA.toString()),
property("subject", Data.SUBJECT),
property("time", Time.writeTime(Data.TIME)),
property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
property(CloudEventV1.ID, Data.ID),
property(CloudEventV1.TYPE, Data.TYPE),
property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
property(CloudEventV1.DATASCHEMA, Data.DATASCHEMA.toString()),
property(CloudEventV1.SUBJECT, Data.SUBJECT),
property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
property("ignored", "ignored")
),
Data.DATACONTENTTYPE_JSON,
Expand All @@ -173,13 +176,13 @@ private static Stream<Arguments> binaryTestArguments() {
),
Arguments.of(
properties(
property("specversion", SpecVersion.V1.toString()),
property("id", Data.ID),
property("type", Data.TYPE),
property("source", Data.SOURCE.toString()),
property("dataschema", Data.DATASCHEMA.toString()),
property("subject", Data.SUBJECT),
property("time", Time.writeTime(Data.TIME)),
property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
property(CloudEventV1.ID, Data.ID),
property(CloudEventV1.TYPE, Data.TYPE),
property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
property(CloudEventV1.DATASCHEMA, Data.DATASCHEMA.toString()),
property(CloudEventV1.SUBJECT, Data.SUBJECT),
property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
property("astring", "aaa"),
property("aboolean", "true"),
property("anumber", "10"),
Expand All @@ -191,12 +194,12 @@ private static Stream<Arguments> binaryTestArguments() {
),
Arguments.of(
properties(
property("specversion", SpecVersion.V1.toString()),
property("id", Data.ID),
property("type", Data.TYPE),
property("source", Data.SOURCE.toString()),
property("subject", Data.SUBJECT),
property("time", Time.writeTime(Data.TIME)),
property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
property(CloudEventV1.ID, Data.ID),
property(CloudEventV1.TYPE, Data.TYPE),
property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
property(CloudEventV1.SUBJECT, Data.SUBJECT),
property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
property("ignored", "ignored")
),
Data.DATACONTENTTYPE_XML,
Expand All @@ -205,12 +208,12 @@ private static Stream<Arguments> binaryTestArguments() {
),
Arguments.of(
properties(
property("specversion", SpecVersion.V1.toString()),
property("id", Data.ID),
property("type", Data.TYPE),
property("source", Data.SOURCE.toString()),
property("subject", Data.SUBJECT),
property("time", Time.writeTime(Data.TIME)),
property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
property(CloudEventV1.ID, Data.ID),
property(CloudEventV1.TYPE, Data.TYPE),
property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
property(CloudEventV1.SUBJECT, Data.SUBJECT),
property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
property("ignored", "ignored")
),
Data.DATACONTENTTYPE_TEXT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import io.cloudevents.CloudEventData;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.v1.CloudEventV1;
import io.cloudevents.rw.CloudEventDataMapper;
import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.rw.CloudEventWriter;
Expand Down Expand Up @@ -56,10 +57,10 @@ public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> w
// in order to complete the visit in one loop
this.forEachHeader((key, value) -> {
if (isContentTypeHeader(key)) {
visitor.withAttribute("datacontenttype", toCloudEventsValue(value));
visitor.withAttribute(CloudEventV1.DATACONTENTTYPE, toCloudEventsValue(value));
} else if (isCloudEventsHeader(key)) {
String name = toCloudEventsKey(key);
if (name.equals("specversion")) {
if (name.equals(CloudEventV1.SPECVERSION)) {
return;
}
if (this.version.getAllAttributes().contains(name)) {
Expand Down
32 changes: 16 additions & 16 deletions core/src/main/java/io/cloudevents/core/v1/CloudEventBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,13 @@ public CloudEventBuilder withTime(OffsetDateTime time) {
@Override
public CloudEvent build() {
if (id == null) {
throw createMissingAttributeException("id");
throw createMissingAttributeException(CloudEventV1.ID);
}
if (source == null) {
throw createMissingAttributeException("source");
throw createMissingAttributeException(CloudEventV1.SOURCE);
}
if (type == null) {
throw createMissingAttributeException("type");
throw createMissingAttributeException(CloudEventV1.TYPE);
}

return new CloudEventV1(id, source, type, datacontenttype, dataschema, subject, time, this.data, this.extensions);
Expand All @@ -140,34 +140,34 @@ public CloudEventBuilder newBuilder() {
@Override
public CloudEventBuilder withAttribute(String name, String value) throws CloudEventRWException {
switch (name) {
case "id":
case CloudEventV1.ID:
withId(value);
return this;
case "source":
case CloudEventV1.SOURCE:
try {
withSource(new URI(value));
} catch (URISyntaxException e) {
throw CloudEventRWException.newInvalidAttributeValue("source", value, e);
throw CloudEventRWException.newInvalidAttributeValue(CloudEventV1.SOURCE, value, e);
}
return this;
case "type":
case CloudEventV1.TYPE:
withType(value);
return this;
case "datacontenttype":
case CloudEventV1.DATACONTENTTYPE:
withDataContentType(value);
return this;
case "dataschema":
case CloudEventV1.DATASCHEMA:
try {
withDataSchema(new URI(value));
} catch (URISyntaxException e) {
throw CloudEventRWException.newInvalidAttributeValue("dataschema", value, e);
throw CloudEventRWException.newInvalidAttributeValue(CloudEventV1.DATASCHEMA, value, e);
}
return this;
case "subject":
case CloudEventV1.SUBJECT:
withSubject(value);
return this;
case "time":
withTime(Time.parseTime("time", value));
case CloudEventV1.TIME:
withTime(Time.parseTime(CloudEventV1.TIME, value));
return this;
}
throw CloudEventRWException.newInvalidAttributeName(name);
Expand All @@ -176,10 +176,10 @@ public CloudEventBuilder withAttribute(String name, String value) throws CloudEv
@Override
public CloudEventBuilder withAttribute(String name, URI value) throws CloudEventRWException {
switch (name) {
case "source":
case CloudEventV1.SOURCE:
withSource(value);
return this;
case "dataschema":
case CloudEventV1.DATASCHEMA:
withDataSchema(value);
return this;
}
Expand All @@ -188,7 +188,7 @@ public CloudEventBuilder withAttribute(String name, URI value) throws CloudEvent

@Override
public CloudEventBuilder withAttribute(String name, OffsetDateTime value) throws CloudEventRWException {
if ("time".equals(name)) {
if (CloudEventV1.TIME.equals(name)) {
withTime(value);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.cloudevents.http.impl;

import io.cloudevents.core.message.impl.MessageUtils;
import io.cloudevents.core.v1.CloudEventV1;

import java.util.Collections;
import java.util.Map;
Expand All @@ -30,12 +31,12 @@ private CloudEventsHeaders() {}
public static final String CE_PREFIX = "ce-";

public static final Map<String, String> ATTRIBUTES_TO_HEADERS = Collections.unmodifiableMap(MessageUtils.generateAttributesToHeadersMapping(v -> {
if (v.equals("datacontenttype")) {
if (v.equals(CloudEventV1.DATACONTENTTYPE)) {
return CONTENT_TYPE;
}
return CE_PREFIX + v;
}));

public static final String SPEC_VERSION = ATTRIBUTES_TO_HEADERS.get("specversion");
public static final String SPEC_VERSION = ATTRIBUTES_TO_HEADERS.get(CloudEventV1.SPECVERSION);

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package io.cloudevents.kafka.impl;

import io.cloudevents.core.message.impl.MessageUtils;
import io.cloudevents.core.v1.CloudEventV1;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

Expand All @@ -37,13 +39,13 @@ public class KafkaHeaders {

protected static final Map<String, String> ATTRIBUTES_TO_HEADERS = MessageUtils.generateAttributesToHeadersMapping(
v -> {
if (v.equals("datacontenttype")) {
if (v.equals(CloudEventV1.DATACONTENTTYPE)) {
return CONTENT_TYPE;
}
return CE_PREFIX + v;
});

public static final String SPEC_VERSION = ATTRIBUTES_TO_HEADERS.get("specversion");
public static final String SPEC_VERSION = ATTRIBUTES_TO_HEADERS.get(CloudEventV1.SPECVERSION);

public static String getParsedKafkaHeader(Headers headers, String key) {
Header h = headers.lastHeader(key);
Expand Down
Loading