Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions api/src/main/java/io/cloudevents/rw/CloudEventContextReader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.cloudevents.rw;

import javax.annotation.ParametersAreNonnullByDefault;

/**
* Represents an object that can be read as CloudEvent context attributes and extensions.
* <p>
* An object (in particular, buffered objects) can implement both this interface and {@link CloudEventReader}.
*/
@ParametersAreNonnullByDefault
public interface CloudEventContextReader {

/**
* Visit self attributes using the provided writer
*
* @param writer Attributes writer
* @throws CloudEventRWException if something went wrong during the visit.
*/
void readAttributes(CloudEventAttributesWriter writer) throws CloudEventRWException;

/**
* Visit self extensions using the provided writer
*
* @param visitor Extensions writer
* @throws CloudEventRWException if something went wrong during the visit.
*/
void readExtensions(CloudEventExtensionsWriter visitor) throws CloudEventRWException;

}
20 changes: 3 additions & 17 deletions api/src/main/java/io/cloudevents/rw/CloudEventReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import javax.annotation.ParametersAreNonnullByDefault;

/**
* Represents an object that can be read as CloudEvent
* Represents an object that can be read as CloudEvent.
* <p>
* The read may consume this object, hence it's not safe to invoke it multiple times, unless it's explicitly allowed by the implementer.
*/
@ParametersAreNonnullByDefault
public interface CloudEventReader {
Expand All @@ -42,20 +44,4 @@ default <V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R>
*/
<V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R> writerFactory, @Nullable CloudEventDataMapper mapper) throws CloudEventRWException;

/**
* Visit self attributes using the provided writer
*
* @param writer Attributes writer
* @throws CloudEventRWException if something went wrong during the visit.
*/
void readAttributes(CloudEventAttributesWriter writer) throws CloudEventRWException;

/**
* Visit self extensions using the provided writer
*
* @param visitor Extensions writer
* @throws CloudEventRWException if something went wrong during the visit.
*/
void readExtensions(CloudEventExtensionsWriter visitor) throws CloudEventRWException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.Map;
import java.util.Set;

public abstract class BaseCloudEvent implements CloudEvent, CloudEventReader {
public abstract class BaseCloudEvent implements CloudEvent, CloudEventReader, CloudEventContextReader {

private final CloudEventData data;
protected final Map<String, Object> extensions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import io.cloudevents.CloudEvent;
import io.cloudevents.rw.*;

public class CloudEventReaderAdapter implements CloudEventReader {
public class CloudEventReaderAdapter implements CloudEventReader, CloudEventContextReader {

private CloudEvent event;
private final CloudEvent event;

CloudEventReaderAdapter(CloudEvent event) {
this.event = event;
Expand Down
22 changes: 21 additions & 1 deletion core/src/main/java/io/cloudevents/core/impl/CloudEventUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventData;
import io.cloudevents.lang.Nullable;
import io.cloudevents.rw.CloudEventContextReader;
import io.cloudevents.rw.CloudEventDataMapper;
import io.cloudevents.rw.CloudEventReader;

Expand All @@ -29,7 +30,9 @@ private CloudEventUtils() {}

/**
* Convert a {@link CloudEvent} to a {@link CloudEventReader}. This method provides a default implementation
* for CloudEvent that doesn't implement CloudEventVisitable
* for CloudEvent that doesn't implement CloudEventVisitable.
* <p>
* It's safe to use the returned {@link CloudEventReader} multiple times.
*
* @param event the event to convert
* @return the visitable implementation
Expand All @@ -42,6 +45,23 @@ public static CloudEventReader toVisitable(CloudEvent event) {
}
}

/**
* Convert a {@link CloudEvent} to a {@link CloudEventContextReader}. This method provides a default implementation
* for {@link CloudEvent} that doesn't implement {@link CloudEventContextReader}.
* <p>
* It's safe to use the returned {@link CloudEventReader} multiple times.
*
* @param event the event to convert
* @return the context reader implementation
*/
public static CloudEventContextReader toContextReader(CloudEvent event) {
if (event instanceof CloudEventContextReader) {
return (CloudEventContextReader) event;
} else {
return new CloudEventReaderAdapter(event);
}
}

/**
* Get the data contained in {@code event} and map it using the provided mapper.
*/
Expand Down
24 changes: 3 additions & 21 deletions core/src/main/java/io/cloudevents/core/message/MessageReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,6 @@ default <V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R>
*/
<V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R> writerFactory, @Nullable CloudEventDataMapper mapper) throws CloudEventRWException, IllegalStateException;

/**
* Visit the message attributes as binary encoded event using the provided visitor.
*
* @param writer Attributes visitor
* @throws CloudEventRWException if something went wrong during the visit.
* @throws IllegalStateException if the message is not in binary encoding.
*/
void readAttributes(CloudEventAttributesWriter writer) throws CloudEventRWException, IllegalStateException;

/**
* Visit the message extensions as binary encoded event using the provided visitor.
*
* @param visitor Extensions visitor
* @throws CloudEventRWException if something went wrong during the visit.
* @throws IllegalStateException if the message is not in binary encoding.
*/
void readExtensions(CloudEventExtensionsWriter visitor) throws CloudEventRWException, IllegalStateException;

/**
* Visit the message as structured encoded event using the provided visitor
*
Expand All @@ -80,15 +62,15 @@ default <V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R>
Encoding getEncoding();

/**
* Visit the event using a {@link MessageWriter}. This method allows to transcode an event from one transport to another without
* Read the content of this object using a {@link MessageWriter}. This method allows to transcode an event from one transport to another without
* converting it to {@link CloudEvent}. The resulting encoding will be the same as the original encoding.
*
* @param visitor the MessageVisitor accepting this Message
* @return The return value of the MessageVisitor
* @throws CloudEventRWException if something went wrong during the visit.
* @throws IllegalStateException if the message has an unknown encoding.
* @throws IllegalStateException if the message has an unknown encoding.
*/
default <BV extends CloudEventWriter<R>, R> R visit(MessageWriter<BV, R> visitor) throws CloudEventRWException, IllegalStateException {
default <BV extends CloudEventWriter<R>, R> R read(MessageWriter<BV, R> visitor) throws CloudEventRWException, IllegalStateException {
switch (getEncoding()) {
case BINARY:
return this.read((CloudEventWriterFactory<BV, R>) visitor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@

import io.cloudevents.CloudEventData;
import io.cloudevents.SpecVersion;
import io.cloudevents.rw.*;
import io.cloudevents.rw.CloudEventDataMapper;
import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.rw.CloudEventWriter;
import io.cloudevents.rw.CloudEventWriterFactory;

import java.util.Objects;
import java.util.function.BiConsumer;

/**
* This class implements a Binary {@link io.cloudevents.core.message.MessageReader}, providing common logic to most protocol bindings
* which supports both Binary and Structured mode.
* This class implements a Binary {@link io.cloudevents.core.message.MessageReader},
* providing common logic to most protocol bindings which supports both Binary and Structured mode.
* <p>
* Content-type is handled separately using a key not prefixed with CloudEvents header prefix.
*
* @param <HK> Header key type
Expand Down Expand Up @@ -74,36 +78,6 @@ public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> w
return visitor.end();
}

@Override
public void readAttributes(CloudEventAttributesWriter writer) throws RuntimeException {
this.forEachHeader((key, value) -> {
if (isContentTypeHeader(key)) {
writer.withAttribute("datacontenttype", toCloudEventsValue(value));
} else if (isCloudEventsHeader(key)) {
String name = toCloudEventsKey(key);
if (name.equals("specversion")) {
return;
}
if (this.version.getAllAttributes().contains(name)) {
writer.withAttribute(name, toCloudEventsValue(value));
}
}
});
}

@Override
public void readExtensions(CloudEventExtensionsWriter visitor) throws RuntimeException {
// Grab from headers the attributes and extensions
this.forEachHeader((key, value) -> {
if (isCloudEventsHeader(key)) {
String name = toCloudEventsKey(key);
if (!this.version.getAllAttributes().contains(name)) {
visitor.withExtension(name, toCloudEventsValue(value));
}
}
});
}

protected abstract boolean isContentTypeHeader(HK key);

protected abstract boolean isCloudEventsHeader(HK key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import io.cloudevents.core.message.Encoding;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.rw.*;
import io.cloudevents.rw.CloudEventDataMapper;
import io.cloudevents.rw.CloudEventWriter;
import io.cloudevents.rw.CloudEventWriterFactory;

public abstract class BaseStructuredMessageReader implements MessageReader {

Expand All @@ -32,14 +34,4 @@ public Encoding getEncoding() {
public <V extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<V, R> writerFactory, CloudEventDataMapper mapper) {
throw MessageUtils.generateWrongEncoding(Encoding.BINARY, Encoding.STRUCTURED);
}

@Override
public void readAttributes(CloudEventAttributesWriter writer) throws RuntimeException {
throw MessageUtils.generateWrongEncoding(Encoding.BINARY, Encoding.STRUCTURED);
}

@Override
public void readExtensions(CloudEventExtensionsWriter visitor) throws RuntimeException {
throw MessageUtils.generateWrongEncoding(Encoding.BINARY, Encoding.STRUCTURED);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
import io.cloudevents.core.message.Encoding;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.message.StructuredMessageWriter;
import io.cloudevents.rw.*;
import io.cloudevents.rw.CloudEventDataMapper;
import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.rw.CloudEventWriter;
import io.cloudevents.rw.CloudEventWriterFactory;

public class UnknownEncodingMessageReader implements MessageReader {
@Override
Expand All @@ -33,16 +36,6 @@ public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> w
throw new IllegalStateException("Unknown encoding");
}

@Override
public void readAttributes(CloudEventAttributesWriter writer) throws CloudEventRWException {
throw new IllegalStateException("Unknown encoding");
}

@Override
public void readExtensions(CloudEventExtensionsWriter visitor) throws CloudEventRWException {
throw new IllegalStateException("Unknown encoding");
}

@Override
public <T> T read(StructuredMessageWriter<T> visitor) throws CloudEventRWException, IllegalStateException {
throw new IllegalStateException("Unknown encoding");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ public CloudEventBuilder(io.cloudevents.CloudEvent event) {
@Override
protected void setAttributes(io.cloudevents.CloudEvent event) {
if (event.getSpecVersion() == SpecVersion.V03) {
CloudEventUtils.toVisitable(event).readAttributes(this);
CloudEventUtils.toContextReader(event).readAttributes(this);
} else {
CloudEventUtils.toVisitable(event).readAttributes(new V1ToV03AttributesConverter(this));
CloudEventUtils.toContextReader(event).readAttributes(new V1ToV03AttributesConverter(this));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ public CloudEventBuilder(io.cloudevents.CloudEvent event) {
@Override
protected void setAttributes(io.cloudevents.CloudEvent event) {
if (event.getSpecVersion() == SpecVersion.V1) {
CloudEventUtils.toVisitable(event).readAttributes(this);
CloudEventUtils.toContextReader(event).readAttributes(this);
} else {
CloudEventUtils.toVisitable(event).readAttributes(new V03ToV1AttributesConverter(this));
CloudEventUtils.toContextReader(event).readAttributes(new V03ToV1AttributesConverter(this));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import java.util.HashMap;
import java.util.Map;

public class MockBinaryMessageWriter extends BaseBinaryMessageReader implements MessageReader, CloudEventWriterFactory<MockBinaryMessageWriter, MockBinaryMessageWriter>, CloudEventWriter<MockBinaryMessageWriter> {
public class MockBinaryMessageWriter extends BaseBinaryMessageReader implements MessageReader, CloudEventContextReader, CloudEventWriterFactory<MockBinaryMessageWriter, MockBinaryMessageWriter>, CloudEventWriter<MockBinaryMessageWriter> {

private SpecVersion version;
private Map<String, Object> attributes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,16 +154,6 @@ public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> w
}
}

@Override
public void readAttributes(CloudEventAttributesWriter writer) throws CloudEventRWException {
// no-op no need for that
}

@Override
public void readExtensions(CloudEventExtensionsWriter visitor) throws CloudEventRWException {
// no-op no need for that
}

private String getStringNode(ObjectNode objNode, JsonParser p, String attributeName) throws JsonProcessingException {
String val = getOptionalStringNode(objNode, p, attributeName);
if (val == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
import io.cloudevents.CloudEventData;
import io.cloudevents.core.impl.CloudEventUtils;
import io.cloudevents.rw.CloudEventAttributesWriter;
import io.cloudevents.rw.CloudEventContextReader;
import io.cloudevents.rw.CloudEventExtensionsWriter;
import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.rw.CloudEventReader;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -104,7 +104,7 @@ public void serialize(CloudEvent value, JsonGenerator gen, SerializerProvider pr

// Serialize attributes
try {
CloudEventReader visitable = CloudEventUtils.toVisitable(value);
CloudEventContextReader visitable = CloudEventUtils.toContextReader(value);
FieldsSerializer serializer = new FieldsSerializer(gen, provider);
visitable.readAttributes(serializer);
visitable.readExtensions(serializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public static MessageReader createReaderFromMultimap(Map<String, List<String>> h
* @param sendBody a function that sends body (e.g. sets HTTP status code, content-length and writes the bytes into output stream).
* @return a message writer
*/
public static MessageWriter createWriter(BiConsumer<String, String> putHeader, Consumer<byte[]> sendBody) {
public static HttpMessageWriter createWriter(BiConsumer<String, String> putHeader, Consumer<byte[]> sendBody) {
return new HttpMessageWriter(putHeader, sendBody);
}

Expand Down
Loading