Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions api/src/main/java/io/cloudevents/CloudEventData.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.cloudevents;

/**
Expand Down
40 changes: 40 additions & 0 deletions api/src/main/java/io/cloudevents/rw/CloudEventDataMapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.cloudevents.rw;

import io.cloudevents.CloudEventData;

import javax.annotation.ParametersAreNonnullByDefault;

/**
* Interface to convert a {@link CloudEventData} instance to another one.
*/
@FunctionalInterface
@ParametersAreNonnullByDefault
public interface CloudEventDataMapper {

/**
* Map {@code data} to another {@link CloudEventData} instance.
*
* @param data the input data
* @return The new data
* @throws CloudEventRWException is anything goes wrong while mapping the input data
*/
CloudEventData map(CloudEventData data) throws CloudEventRWException;

}
14 changes: 13 additions & 1 deletion api/src/main/java/io/cloudevents/rw/CloudEventReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@

package io.cloudevents.rw;

import io.cloudevents.lang.Nullable;

import javax.annotation.ParametersAreNonnullByDefault;

/**
* Represents an object that can be read as CloudEvent
*/
@ParametersAreNonnullByDefault
public interface CloudEventReader {

/**
Expand All @@ -28,7 +33,14 @@ public interface CloudEventReader {
* @param writerFactory a factory that generates a visitor starting from the SpecVersion of the event
* @throws CloudEventRWException if something went wrong during the visit.
*/
<V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R> writerFactory) throws CloudEventRWException;
default <V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R> writerFactory) throws CloudEventRWException {
return read(writerFactory, null);
}

/**
* Like {@link CloudEventReader#read(CloudEventWriterFactory)}, but providing a mapper for {@link io.cloudevents.CloudEventData} to be invoked when the data field is available.
*/
<V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R> writerFactory, @Nullable CloudEventDataMapper mapper) throws CloudEventRWException;

/**
* Visit self attributes using the provided writer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,4 +261,22 @@ static CloudEventBuilder fromSpecVersion(@Nonnull SpecVersion version) {
);
}

/**
* Create a new builder starting from the values of the provided event.
*
* @param event event to copy values from
* @return the new builder
*/
static CloudEventBuilder from(@Nonnull CloudEvent event) {
switch (event.getSpecVersion()) {
case V1:
return CloudEventBuilder.v1(event);
case V03:
return CloudEventBuilder.v03(event);
}
throw new IllegalStateException(
"The provided spec version doesn't exist. Please make sure your io.cloudevents deps versions are aligned."
);
}

}
11 changes: 10 additions & 1 deletion core/src/main/java/io/cloudevents/core/format/EventFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package io.cloudevents.core.format;

import io.cloudevents.CloudEvent;
import io.cloudevents.lang.Nullable;
import io.cloudevents.rw.CloudEventDataMapper;

import javax.annotation.ParametersAreNonnullByDefault;
import java.util.Collections;
Expand Down Expand Up @@ -52,7 +54,14 @@ public interface EventFormat {
* @return the deserialized event.
* @throws EventDeserializationException if something goes wrong during deserialization.
*/
CloudEvent deserialize(byte[] bytes) throws EventDeserializationException;
default CloudEvent deserialize(byte[] bytes) throws EventDeserializationException {
return this.deserialize(bytes, null);
}

/**
* Like {@link EventFormat#deserialize(byte[])}, but allows a mapper that maps the parsed {@link io.cloudevents.CloudEventData} to another one.
*/
CloudEvent deserialize(byte[] bytes, @Nullable CloudEventDataMapper mapper) throws EventDeserializationException;

/**
* @return the set of content types this event format can deserialize. These content types are used
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ public Set<String> getExtensionNames() {
return this.extensions.keySet();
}

public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory) throws CloudEventRWException, IllegalStateException {
public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory, CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException {
CloudEventWriter<V> visitor = writerFactory.create(this.getSpecVersion());
this.readAttributes(visitor);
this.readExtensions(visitor);

if (this.data != null) {
return visitor.end(this.data);
return visitor.end(mapper != null ? mapper.map(this.data) : this.data);
}

return visitor.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ public class CloudEventReaderAdapter implements CloudEventReader {
}

@Override
public <V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R> writerFactory) throws RuntimeException {
public <V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R> writerFactory, CloudEventDataMapper mapper) throws RuntimeException {
CloudEventWriter<R> visitor = writerFactory.create(event.getSpecVersion());
this.readAttributes(visitor);
this.readExtensions(visitor);

if (event.getData() != null) {
return visitor.end(event.getData());
return visitor.end(mapper != null ? mapper.map(event.getData()) : event.getData());
}

return visitor.end();
Expand Down
26 changes: 22 additions & 4 deletions core/src/main/java/io/cloudevents/core/message/MessageReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.lang.Nullable;
import io.cloudevents.rw.*;

import javax.annotation.ParametersAreNonnullByDefault;
Expand All @@ -37,7 +37,14 @@ public interface MessageReader extends StructuredMessageReader, CloudEventReader
* @throws CloudEventRWException if something went wrong during the visit.
* @throws IllegalStateException if the message is not in binary encoding.
*/
<V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R> writerFactory) throws CloudEventRWException, IllegalStateException;
default <V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R> writerFactory) throws CloudEventRWException, IllegalStateException {
return read(writerFactory, null);
}

/**
* Like {@link MessageReader#read(CloudEventWriterFactory)}, but providing a mapper for {@link io.cloudevents.CloudEventData} to be invoked when the data field is available.
*/
<V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R> writerFactory, @Nullable CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException;

/**
* Visit the message attributes as binary encoded event using the provided visitor.
Expand Down Expand Up @@ -99,11 +106,22 @@ default <BV extends CloudEventWriter<R>, R> R visit(MessageWriter<BV, R> visitor
* @throws IllegalStateException if the message has an unknown encoding.
*/
default CloudEvent toEvent() throws CloudEventRWException, IllegalStateException {
return toEvent(null);
}

/**
* Translate this message into a {@link CloudEvent} representation.
*
* @return A {@link CloudEvent} with the contents of this message.
* @throws CloudEventRWException if something went wrong during the visit.
* @throws IllegalStateException if the message has an unknown encoding.
*/
default CloudEvent toEvent(@Nullable CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException {
switch (getEncoding()) {
case BINARY:
return this.read(CloudEventBuilder::fromSpecVersion);
return this.read(CloudEventBuilder::fromSpecVersion, mapper);
case STRUCTURED:
return this.read(EventFormat::deserialize);
return this.read((format, value) -> format.deserialize(value, mapper));
default:
throw new IllegalStateException("Unknown encoding");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import io.cloudevents.CloudEvent;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
import io.cloudevents.lang.Nullable;
import io.cloudevents.rw.CloudEventDataMapper;
import io.cloudevents.rw.CloudEventRWException;

import javax.annotation.ParametersAreNonnullByDefault;
Expand All @@ -42,6 +44,10 @@ default CloudEvent toEvent() throws CloudEventRWException, IllegalStateException
return this.read(EventFormat::deserialize);
}

default CloudEvent toEvent(@Nullable CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException {
return this.read((format, value) -> format.deserialize(value, mapper));
}

/**
* Create a generic structured message from a {@link CloudEvent}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ protected BaseGenericBinaryMessageReaderImpl(SpecVersion version, CloudEventData
}

@Override
public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory) throws CloudEventRWException, IllegalStateException {
public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory, CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException {
CloudEventWriter<V> visitor = writerFactory.create(this.version);

// Grab from headers the attributes and extensions
Expand All @@ -68,7 +68,7 @@ public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> w

// Set the payload
if (this.body != null) {
return visitor.end(this.body);
return visitor.end(mapper != null ? mapper.map(this.body) : this.body);
}

return visitor.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@

import io.cloudevents.core.message.Encoding;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.rw.CloudEventAttributesWriter;
import io.cloudevents.rw.CloudEventExtensionsWriter;
import io.cloudevents.rw.CloudEventWriter;
import io.cloudevents.rw.CloudEventWriterFactory;
import io.cloudevents.rw.*;

public abstract class BaseStructuredMessageReader implements MessageReader {

Expand All @@ -32,7 +29,7 @@ public Encoding getEncoding() {
}

@Override
public <V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R> writerFactory) {
public <V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R> writerFactory, CloudEventDataMapper mapper) {
throw MessageUtils.generateWrongEncoding(Encoding.BINARY, Encoding.STRUCTURED);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public Encoding getEncoding() {
}

@Override
public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory) throws CloudEventRWException, IllegalStateException {
public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory, CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException {
throw new IllegalStateException("Unknown encoding");
}

Expand Down
20 changes: 10 additions & 10 deletions core/src/test/java/io/cloudevents/core/mock/CSVFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.data.BytesCloudEventData;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.rw.CloudEventDataMapper;
import io.cloudevents.types.Time;

import java.net.URI;
Expand Down Expand Up @@ -57,7 +59,7 @@ public byte[] serialize(CloudEvent event) {
}

@Override
public CloudEvent deserialize(byte[] bytes) {
public CloudEvent deserialize(byte[] bytes, CloudEventDataMapper mapper) {
String[] splitted = new String(bytes, StandardCharsets.UTF_8).split(Pattern.quote(","));
SpecVersion sv = SpecVersion.parse(splitted[0]);

Expand All @@ -70,7 +72,7 @@ public CloudEvent deserialize(byte[] bytes) {
OffsetDateTime time = splitted[7].equals("null") ? null : Time.parseTime(splitted[7]);
byte[] data = splitted[8].equals("null") ? null : Base64.getDecoder().decode(splitted[8].getBytes());

io.cloudevents.core.v1.CloudEventBuilder builder = CloudEventBuilder.v1()
CloudEventBuilder builder = CloudEventBuilder.fromSpecVersion(sv)
.withId(id)
.withType(type)
.withSource(source);
Expand All @@ -88,15 +90,13 @@ public CloudEvent deserialize(byte[] bytes) {
builder.withTime(time);
}
if (data != null) {
builder.withData(data);
if (mapper != null) {
builder.withData(mapper.map(new BytesCloudEventData(data)));
} else {
builder.withData(data);
}
}
switch (sv) {
case V03:
return CloudEventBuilder.v03(builder.build()).build();
case V1:
return builder.build();
}
return null;
return builder.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public MockBinaryMessageWriter(CloudEvent event) {
}

@Override
public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory) throws CloudEventRWException, IllegalStateException {
public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory, CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException {
if (version == null) {
throw new IllegalStateException("MockBinaryMessage is empty");
}
Expand All @@ -72,7 +72,7 @@ public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> w
this.readExtensions(visitor);

if (this.data != null) {
return visitor.end(this.data);
return visitor.end(mapper != null ? mapper.map(this.data) : this.data);
}

return visitor.end();
Expand Down
Loading