|
| 1 | +# Copyright 2018-Present The CloudEvents Authors |
| 2 | +# |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License"); you may |
| 4 | +# not use this file except in compliance with the License. You may obtain |
| 5 | +# a copy of the License at |
| 6 | +# |
| 7 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +# |
| 9 | +# Unless required by applicable law or agreed to in writing, software |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| 11 | +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| 12 | +# License for the specific language governing permissions and limitations |
| 13 | +# under the License. |
| 14 | +# |
| 15 | +# Licensed under the Apache License, Version 2.0 (the "License"); you may |
| 16 | +# not use this file except in compliance with the License. You may obtain |
| 17 | +# a copy of the License at |
| 18 | +# |
| 19 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 20 | +# |
| 21 | +# Unless required by applicable law or agreed to in writing, software |
| 22 | +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| 23 | +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| 24 | +# License for the specific language governing permissions and limitations |
| 25 | +# under the License. |
| 26 | +import json |
| 27 | +import typing |
| 28 | + |
| 29 | +from cloudevents import exceptions as cloud_exceptions |
| 30 | +from cloudevents.abstract import AnyCloudEvent |
| 31 | +from cloudevents.http import is_binary |
| 32 | +from cloudevents.http.mappings import _marshaller_by_format, _obj_by_version |
| 33 | +from cloudevents.http.util import _json_or_string |
| 34 | +from cloudevents.sdk import converters, marshaller, types |
| 35 | + |
| 36 | + |
| 37 | +def to_json( |
| 38 | + event: AnyCloudEvent, |
| 39 | + data_marshaller: types.MarshallerType = None, |
| 40 | +) -> typing.Union[str, bytes]: |
| 41 | + """ |
| 42 | + Converts given `event` to a JSON string. |
| 43 | +
|
| 44 | + :param event: A CloudEvent to be converted into a JSON string. |
| 45 | + :param data_marshaller: Callable function which will cast `event.data` |
| 46 | + into a JSON string. |
| 47 | + :returns: A JSON string representing the given event. |
| 48 | + """ |
| 49 | + return to_structured(event, data_marshaller=data_marshaller)[1] |
| 50 | + |
| 51 | + |
| 52 | +def from_json( |
| 53 | + event_type: typing.Type[AnyCloudEvent], |
| 54 | + data: typing.Union[str, bytes], |
| 55 | + data_unmarshaller: types.UnmarshallerType = None, |
| 56 | +) -> AnyCloudEvent: |
| 57 | + """ |
| 58 | + Parses JSON string `data` into a CloudEvent. |
| 59 | +
|
| 60 | + :param data: JSON string representation of a CloudEvent. |
| 61 | + :param data_unmarshaller: Callable function that casts `data` to a |
| 62 | + Python object. |
| 63 | + :param event_type: A concrete type of the event into which the data is |
| 64 | + deserialized. |
| 65 | + :returns: A CloudEvent parsed from the given JSON representation. |
| 66 | + """ |
| 67 | + return from_http( |
| 68 | + headers={}, |
| 69 | + data=data, |
| 70 | + data_unmarshaller=data_unmarshaller, |
| 71 | + event_type=event_type, |
| 72 | + ) |
| 73 | + |
| 74 | + |
| 75 | +def from_http( |
| 76 | + event_type: typing.Type[AnyCloudEvent], |
| 77 | + headers: typing.Dict[str, str], |
| 78 | + data: typing.Union[str, bytes, None], |
| 79 | + data_unmarshaller: types.UnmarshallerType = None, |
| 80 | +) -> AnyCloudEvent: |
| 81 | + """ |
| 82 | + Parses CloudEvent `data` and `headers` into an instance of a given `event_type`. |
| 83 | +
|
| 84 | + The method supports both binary and structured representations. |
| 85 | +
|
| 86 | + :param headers: The HTTP request headers. |
| 87 | + :param data: The HTTP request body. If set to None, "" or b'', the returned |
| 88 | + event's `data` field will be set to None. |
| 89 | + :param data_unmarshaller: Callable function to map data to a python object |
| 90 | + e.g. lambda x: x or lambda x: json.loads(x) |
| 91 | + :param event_type: The actual type of CloudEvent to deserialize the event to. |
| 92 | + :returns: A CloudEvent instance parsed from the passed HTTP parameters of |
| 93 | + the specified type. |
| 94 | + """ |
| 95 | + if data is None or data == b"": |
| 96 | + # Empty string will cause data to be marshalled into None |
| 97 | + data = "" |
| 98 | + |
| 99 | + if not isinstance(data, (str, bytes, bytearray)): |
| 100 | + raise cloud_exceptions.InvalidStructuredJSON( |
| 101 | + "Expected json of type (str, bytes, bytearray), " |
| 102 | + f"but instead found type {type(data)}" |
| 103 | + ) |
| 104 | + |
| 105 | + headers = {key.lower(): value for key, value in headers.items()} |
| 106 | + if data_unmarshaller is None: |
| 107 | + data_unmarshaller = _json_or_string |
| 108 | + |
| 109 | + marshall = marshaller.NewDefaultHTTPMarshaller() |
| 110 | + |
| 111 | + if is_binary(headers): |
| 112 | + specversion = headers.get("ce-specversion", None) |
| 113 | + else: |
| 114 | + try: |
| 115 | + raw_ce = json.loads(data) |
| 116 | + except json.decoder.JSONDecodeError: |
| 117 | + raise cloud_exceptions.MissingRequiredFields( |
| 118 | + "Failed to read specversion from both headers and data. " |
| 119 | + f"The following can not be parsed as json: {data}" |
| 120 | + ) |
| 121 | + if hasattr(raw_ce, "get"): |
| 122 | + specversion = raw_ce.get("specversion", None) |
| 123 | + else: |
| 124 | + raise cloud_exceptions.MissingRequiredFields( |
| 125 | + "Failed to read specversion from both headers and data. " |
| 126 | + f"The following deserialized data has no 'get' method: {raw_ce}" |
| 127 | + ) |
| 128 | + |
| 129 | + if specversion is None: |
| 130 | + raise cloud_exceptions.MissingRequiredFields( |
| 131 | + "Failed to find specversion in HTTP request" |
| 132 | + ) |
| 133 | + |
| 134 | + event_handler = _obj_by_version.get(specversion, None) |
| 135 | + |
| 136 | + if event_handler is None: |
| 137 | + raise cloud_exceptions.InvalidRequiredFields( |
| 138 | + f"Found invalid specversion {specversion}" |
| 139 | + ) |
| 140 | + |
| 141 | + event = marshall.FromRequest( |
| 142 | + event_handler(), headers, data, data_unmarshaller=data_unmarshaller |
| 143 | + ) |
| 144 | + attrs = event.Properties() |
| 145 | + attrs.pop("data", None) |
| 146 | + attrs.pop("extensions", None) |
| 147 | + attrs.update(**event.extensions) |
| 148 | + |
| 149 | + if event.data == "" or event.data == b"": |
| 150 | + # TODO: Check binary unmarshallers to debug why setting data to "" |
| 151 | + # returns an event with data set to None, but structured will return "" |
| 152 | + data = None |
| 153 | + else: |
| 154 | + data = event.data |
| 155 | + return event_type.create(attrs, data) |
| 156 | + |
| 157 | + |
| 158 | +def _to_http( |
| 159 | + event: AnyCloudEvent, |
| 160 | + format: str = converters.TypeStructured, |
| 161 | + data_marshaller: types.MarshallerType = None, |
| 162 | +) -> typing.Tuple[dict, typing.Union[bytes, str]]: |
| 163 | + """ |
| 164 | + Returns a tuple of HTTP headers/body dicts representing this Cloud Event. |
| 165 | +
|
| 166 | + :param format: The encoding format of the event. |
| 167 | + :param data_marshaller: Callable function that casts event.data into |
| 168 | + either a string or bytes. |
| 169 | + :returns: (http_headers: dict, http_body: bytes or str) |
| 170 | + """ |
| 171 | + if data_marshaller is None: |
| 172 | + data_marshaller = _marshaller_by_format[format] |
| 173 | + |
| 174 | + if event["specversion"] not in _obj_by_version: |
| 175 | + raise cloud_exceptions.InvalidRequiredFields( |
| 176 | + f"Unsupported specversion: {event['specversion']}" |
| 177 | + ) |
| 178 | + |
| 179 | + event_handler = _obj_by_version[event["specversion"]]() |
| 180 | + for attribute_name in event: |
| 181 | + event_handler.Set(attribute_name, event[attribute_name]) |
| 182 | + event_handler.data = event.data |
| 183 | + |
| 184 | + return marshaller.NewDefaultHTTPMarshaller().ToRequest( |
| 185 | + event_handler, format, data_marshaller=data_marshaller |
| 186 | + ) |
| 187 | + |
| 188 | + |
| 189 | +def to_structured( |
| 190 | + event: AnyCloudEvent, |
| 191 | + data_marshaller: types.MarshallerType = None, |
| 192 | +) -> typing.Tuple[dict, typing.Union[bytes, str]]: |
| 193 | + """ |
| 194 | + Returns a tuple of HTTP headers/body dicts representing this Cloud Event. |
| 195 | +
|
| 196 | + If event.data is a byte object, body will have a `data_base64` field instead of |
| 197 | + `data`. |
| 198 | +
|
| 199 | + :param event: The event to be converted. |
| 200 | + :param data_marshaller: Callable function to cast event.data into |
| 201 | + either a string or bytes |
| 202 | + :returns: (http_headers: dict, http_body: bytes or str) |
| 203 | + """ |
| 204 | + return _to_http(event=event, data_marshaller=data_marshaller) |
| 205 | + |
| 206 | + |
| 207 | +def to_binary( |
| 208 | + event: AnyCloudEvent, data_marshaller: types.MarshallerType = None |
| 209 | +) -> typing.Tuple[dict, typing.Union[bytes, str]]: |
| 210 | + """ |
| 211 | + Returns a tuple of HTTP headers/body dicts representing this Cloud Event. |
| 212 | +
|
| 213 | + Uses Binary conversion format. |
| 214 | +
|
| 215 | + :param event: The event to be converted. |
| 216 | + :param data_marshaller: Callable function to cast event.data into |
| 217 | + either a string or bytes. |
| 218 | + :returns: (http_headers: dict, http_body: bytes or str) |
| 219 | + """ |
| 220 | + return _to_http( |
| 221 | + event=event, |
| 222 | + format=converters.TypeBinary, |
| 223 | + data_marshaller=data_marshaller, |
| 224 | + ) |
0 commit comments