Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions api/src/main/java/io/cloudevents/rw/CloudEventDataMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,10 @@ public interface CloudEventDataMapper<R extends CloudEventData> {
*/
R map(CloudEventData data) throws CloudEventRWException;

/**
* No-op identity mapper which can be used as default when no mapper is provided.
*/
static CloudEventDataMapper<CloudEventData> identity() {
return d -> d;
}
}
6 changes: 3 additions & 3 deletions api/src/main/java/io/cloudevents/rw/CloudEventReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package io.cloudevents.rw;

import io.cloudevents.lang.Nullable;
import io.cloudevents.CloudEventData;

import javax.annotation.ParametersAreNonnullByDefault;

Expand All @@ -36,12 +36,12 @@ public interface CloudEventReader {
* @throws CloudEventRWException if something went wrong during the read.
*/
default <V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R> writerFactory) throws CloudEventRWException {
return read(writerFactory, null);
return read(writerFactory, CloudEventDataMapper.identity());
}

/**
* Like {@link CloudEventReader#read(CloudEventWriterFactory)}, but providing a mapper for {@link io.cloudevents.CloudEventData} to be invoked when the data field is available.
*/
<V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R> writerFactory, @Nullable CloudEventDataMapper mapper) throws CloudEventRWException;
<V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R> writerFactory, CloudEventDataMapper<? extends CloudEventData> mapper) throws CloudEventRWException;

}
4 changes: 2 additions & 2 deletions core/src/main/java/io/cloudevents/core/CloudEventUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public static CloudEventContextReader toContextReader(CloudEvent event) {
* @return the reader implementation
*/
public static CloudEvent toEvent(CloudEventReader reader) throws CloudEventRWException {
return toEvent(reader, null);
return toEvent(reader, CloudEventDataMapper.identity());
}

/**
Expand All @@ -86,7 +86,7 @@ public static CloudEvent toEvent(CloudEventReader reader) throws CloudEventRWExc
* @param mapper the mapper to use when reading the data
* @return the reader implementation
*/
public static CloudEvent toEvent(CloudEventReader reader, @Nullable CloudEventDataMapper<?> mapper) throws CloudEventRWException {
public static CloudEvent toEvent(CloudEventReader reader, CloudEventDataMapper<?> mapper) throws CloudEventRWException {
return reader.read(CloudEventBuilder::fromSpecVersion, mapper);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package io.cloudevents.core.format;

import io.cloudevents.CloudEvent;
import io.cloudevents.lang.Nullable;
import io.cloudevents.CloudEventData;
import io.cloudevents.rw.CloudEventDataMapper;

import javax.annotation.ParametersAreNonnullByDefault;
Expand Down Expand Up @@ -61,7 +61,7 @@ default CloudEvent deserialize(byte[] bytes) throws EventDeserializationExceptio
/**
* Like {@link EventFormat#deserialize(byte[])}, but allows a mapper that maps the parsed {@link io.cloudevents.CloudEventData} to another one.
*/
CloudEvent deserialize(byte[] bytes, @Nullable CloudEventDataMapper mapper) throws EventDeserializationException;
CloudEvent deserialize(byte[] bytes, CloudEventDataMapper<? extends CloudEventData> mapper) throws EventDeserializationException;

/**
* @return the set of content types this event format can deserialize. These content types are used
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ public Set<String> getExtensionNames() {
return this.extensions.keySet();
}

public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory, CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException {
public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory, CloudEventDataMapper<? extends CloudEventData> mapper) throws CloudEventRWException, IllegalStateException {
CloudEventWriter<V> visitor = writerFactory.create(this.getSpecVersion());
this.readAttributes(visitor);
this.readExtensions(visitor);

if (this.data != null) {
return visitor.end(mapper != null ? mapper.map(this.data) : this.data);
return visitor.end(mapper.map(this.data));
}

return visitor.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package io.cloudevents.core.impl;

import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventData;
import io.cloudevents.rw.*;

public class CloudEventReaderAdapter implements CloudEventReader, CloudEventContextReader {
Expand All @@ -29,13 +30,13 @@ public CloudEventReaderAdapter(CloudEvent event) {
}

@Override
public <V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R> writerFactory, CloudEventDataMapper mapper) throws RuntimeException {
public <V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R> writerFactory, CloudEventDataMapper<? extends CloudEventData> mapper) throws RuntimeException {
CloudEventWriter<R> visitor = writerFactory.create(event.getSpecVersion());
this.readAttributes(visitor);
this.readExtensions(visitor);

if (event.getData() != null) {
return visitor.end(mapper != null ? mapper.map(event.getData()) : event.getData());
return visitor.end(mapper.map(event.getData()));
}

return visitor.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventData;
import io.cloudevents.core.CloudEventUtils;
import io.cloudevents.lang.Nullable;
import io.cloudevents.rw.*;

import javax.annotation.ParametersAreNonnullByDefault;
Expand All @@ -39,13 +38,13 @@ public interface MessageReader extends StructuredMessageReader, CloudEventReader
* @throws IllegalStateException if the message is not in binary encoding.
*/
default <V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R> writerFactory) throws CloudEventRWException, IllegalStateException {
return read(writerFactory, null);
return read(writerFactory, CloudEventDataMapper.identity());
}

/**
* Like {@link MessageReader#read(CloudEventWriterFactory)}, but providing a mapper for {@link io.cloudevents.CloudEventData} to be invoked when the data field is available.
*/
<V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R> writerFactory, @Nullable CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException;
<V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R> writerFactory, CloudEventDataMapper<? extends CloudEventData> mapper) throws CloudEventRWException, IllegalStateException;

/**
* Visit the message as structured encoded event using the provided visitor
Expand Down Expand Up @@ -89,7 +88,7 @@ default <BV extends CloudEventWriter<R>, R> R read(MessageWriter<BV, R> visitor)
* @throws IllegalStateException if the message has an unknown encoding.
*/
default CloudEvent toEvent() throws CloudEventRWException, IllegalStateException {
return toEvent(null);
return toEvent(CloudEventDataMapper.identity());
}

/**
Expand All @@ -99,7 +98,7 @@ default CloudEvent toEvent() throws CloudEventRWException, IllegalStateException
* @throws CloudEventRWException if something went wrong during the visit.
* @throws IllegalStateException if the message has an unknown encoding.
*/
default CloudEvent toEvent(@Nullable CloudEventDataMapper<? extends CloudEventData> mapper) throws CloudEventRWException, IllegalStateException {
default CloudEvent toEvent(CloudEventDataMapper<? extends CloudEventData> mapper) throws CloudEventRWException, IllegalStateException {
switch (getEncoding()) {
case BINARY:
return CloudEventUtils.toEvent(this, mapper);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.cloudevents.CloudEventData;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
import io.cloudevents.lang.Nullable;
import io.cloudevents.rw.CloudEventDataMapper;
import io.cloudevents.rw.CloudEventRWException;

Expand All @@ -45,7 +44,7 @@ default CloudEvent toEvent() throws CloudEventRWException, IllegalStateException
return this.read(EventFormat::deserialize);
}

default CloudEvent toEvent(@Nullable CloudEventDataMapper<? extends CloudEventData> mapper) throws CloudEventRWException, IllegalStateException {
default CloudEvent toEvent(CloudEventDataMapper<? extends CloudEventData> mapper) throws CloudEventRWException, IllegalStateException {
return this.read((format, value) -> format.deserialize(value, mapper));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ protected BaseGenericBinaryMessageReaderImpl(SpecVersion version, CloudEventData
}

@Override
public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory, CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException {
public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory, CloudEventDataMapper<? extends CloudEventData> mapper) throws CloudEventRWException, IllegalStateException {
CloudEventWriter<V> visitor = writerFactory.create(this.version);

// Grab from headers the attributes and extensions
Expand All @@ -72,7 +72,7 @@ public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> w

// Set the payload
if (this.body != null) {
return visitor.end(mapper != null ? mapper.map(this.body) : this.body);
return visitor.end(mapper.map(this.body));
}

return visitor.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package io.cloudevents.core.message.impl;

import io.cloudevents.CloudEventData;
import io.cloudevents.core.message.Encoding;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.rw.CloudEventDataMapper;
Expand All @@ -31,7 +32,7 @@ public Encoding getEncoding() {
}

@Override
public <V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R> writerFactory, CloudEventDataMapper mapper) {
public <V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R> writerFactory, CloudEventDataMapper<? extends CloudEventData> mapper) {
throw MessageUtils.generateWrongEncoding(Encoding.BINARY, Encoding.STRUCTURED);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package io.cloudevents.core.message.impl;

import io.cloudevents.CloudEventData;
import io.cloudevents.core.message.Encoding;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.message.StructuredMessageWriter;
Expand All @@ -32,7 +33,7 @@ public Encoding getEncoding() {
}

@Override
public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory, CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException {
public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory, CloudEventDataMapper<? extends CloudEventData> mapper) throws CloudEventRWException, IllegalStateException {
throw new IllegalStateException("Unknown encoding");
}

Expand Down
9 changes: 3 additions & 6 deletions core/src/test/java/io/cloudevents/core/mock/CSVFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package io.cloudevents.core.mock;

import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventData;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.data.BytesCloudEventData;
Expand Down Expand Up @@ -59,7 +60,7 @@ public byte[] serialize(CloudEvent event) {
}

@Override
public CloudEvent deserialize(byte[] bytes, CloudEventDataMapper mapper) {
public CloudEvent deserialize(byte[] bytes, CloudEventDataMapper<? extends CloudEventData> mapper) {
String[] splitted = new String(bytes, StandardCharsets.UTF_8).split(Pattern.quote(","));
SpecVersion sv = SpecVersion.parse(splitted[0]);

Expand Down Expand Up @@ -90,11 +91,7 @@ public CloudEvent deserialize(byte[] bytes, CloudEventDataMapper mapper) {
builder.withTime(time);
}
if (data != null) {
if (mapper != null) {
builder.withData(mapper.map(new BytesCloudEventData(data)));
} else {
builder.withData(data);
}
builder.withData(mapper.map(new BytesCloudEventData(data)));
}
return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public MockBinaryMessageWriter(CloudEvent event) {
}

@Override
public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory, CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException {
public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory, CloudEventDataMapper<? extends CloudEventData> mapper) throws CloudEventRWException, IllegalStateException {
if (version == null) {
throw new IllegalStateException("MockBinaryMessage is empty");
}
Expand All @@ -72,7 +72,7 @@ public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> w
this.readExtensions(visitor);

if (this.data != null) {
return visitor.end(mapper != null ? mapper.map(this.data) : this.data);
return visitor.end(mapper.map(this.data));
}

return visitor.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public JsonMessage(JsonParser p, ObjectNode node) {
}

@Override
public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory, CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException {
public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory, CloudEventDataMapper<? extends CloudEventData> mapper) throws CloudEventRWException, IllegalStateException {
try {
SpecVersion specVersion = SpecVersion.parse(getStringNode(this.node, this.p, "specversion"));
CloudEventWriter<V> visitor = writerFactory.create(specVersion);
Expand Down Expand Up @@ -144,7 +144,7 @@ public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> w
});

if (data != null) {
return visitor.end(mapper != null ? mapper.map(data) : data);
return visitor.end(mapper.map(data));
}
return visitor.end();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventData;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.format.EventDeserializationException;
import io.cloudevents.core.format.EventFormat;
Expand Down Expand Up @@ -85,15 +86,15 @@ public CloudEvent deserialize(byte[] bytes) throws EventDeserializationException
}

@Override
public CloudEvent deserialize(byte[] bytes, CloudEventDataMapper mapper) throws EventDeserializationException {
public CloudEvent deserialize(byte[] bytes, CloudEventDataMapper<? extends CloudEventData> mapper) throws EventDeserializationException {
CloudEvent deserialized = this.deserialize(bytes);
if (deserialized.getData() == null) {
return deserialized;
}
try {
return CloudEventBuilder.from(deserialized).withData(
mapper != null ? mapper.map(deserialized.getData()) : deserialized.getData()
).build();
return CloudEventBuilder.from(deserialized)
.withData(mapper.map(deserialized.getData()))
.build();
} catch (CloudEventRWException e) {
throw new EventDeserializationException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void deserializerShouldWork() {

@Test
public void deserializerWithMapper() {
CloudEventDataMapper mapper = data -> MyCloudEventData.fromStringBytes(data.toBytes());
CloudEventDataMapper<MyCloudEventData> mapper = data -> MyCloudEventData.fromStringBytes(data.toBytes());

CloudEventDeserializer deserializer = new CloudEventDeserializer();
HashMap<String, Object> config = new HashMap<>();
Expand Down