|
12 | 12 | # See the License for the specific language governing permissions and
|
13 | 13 | # limitations under the License.
|
14 | 14 |
|
15 |
| -import functools |
| 15 | +import enum |
16 | 16 | import importlib.util
|
| 17 | +import io |
| 18 | +import json |
17 | 19 | import os.path
|
18 | 20 | import pathlib
|
19 | 21 | import sys
|
20 | 22 | import types
|
21 | 23 |
|
| 24 | +import cloudevents.sdk |
| 25 | +import cloudevents.sdk.event |
| 26 | +import cloudevents.sdk.event.v1 |
| 27 | +import cloudevents.sdk.marshaller |
22 | 28 | import flask
|
23 | 29 | import werkzeug
|
24 | 30 |
|
|
35 | 41 | DEFAULT_SIGNATURE_TYPE = "http"
|
36 | 42 |
|
37 | 43 |
|
| 44 | +class _EventType(enum.Enum): |
| 45 | + LEGACY = 1 |
| 46 | + CLOUDEVENT_BINARY = 2 |
| 47 | + CLOUDEVENT_STRUCTURED = 3 |
| 48 | + |
| 49 | + |
38 | 50 | class _Event(object):
|
39 | 51 | """Event passed to background functions."""
|
40 | 52 |
|
@@ -67,38 +79,83 @@ def view_func(path):
|
67 | 79 | return view_func
|
68 | 80 |
|
69 | 81 |
|
70 |
| -def _is_binary_cloud_event(request): |
71 |
| - return ( |
| 82 | +def _get_cloudevent_version(): |
| 83 | + return cloudevents.sdk.event.v1.Event() |
| 84 | + |
| 85 | + |
| 86 | +def _run_legacy_event(function, request): |
| 87 | + event_data = request.get_json() |
| 88 | + if not event_data: |
| 89 | + flask.abort(400) |
| 90 | + event_object = _Event(**event_data) |
| 91 | + data = event_object.data |
| 92 | + context = Context(**event_object.context) |
| 93 | + function(data, context) |
| 94 | + |
| 95 | + |
| 96 | +def _run_binary_cloudevent(function, request, cloudevent_def): |
| 97 | + data = io.BytesIO(request.get_data()) |
| 98 | + http_marshaller = cloudevents.sdk.marshaller.NewDefaultHTTPMarshaller() |
| 99 | + event = http_marshaller.FromRequest( |
| 100 | + cloudevent_def, request.headers, data, json.load |
| 101 | + ) |
| 102 | + |
| 103 | + function(event) |
| 104 | + |
| 105 | + |
| 106 | +def _run_structured_cloudevent(function, request, cloudevent_def): |
| 107 | + data = io.StringIO(request.get_data(as_text=True)) |
| 108 | + m = cloudevents.sdk.marshaller.NewDefaultHTTPMarshaller() |
| 109 | + event = m.FromRequest(cloudevent_def, request.headers, data, json.loads) |
| 110 | + function(event) |
| 111 | + |
| 112 | + |
| 113 | +def _get_event_type(request): |
| 114 | + if ( |
72 | 115 | request.headers.get("ce-type")
|
73 | 116 | and request.headers.get("ce-specversion")
|
74 | 117 | and request.headers.get("ce-source")
|
75 | 118 | and request.headers.get("ce-id")
|
76 |
| - ) |
| 119 | + ): |
| 120 | + return _EventType.CLOUDEVENT_BINARY |
| 121 | + elif request.headers.get("Content-Type") == "application/cloudevents+json": |
| 122 | + return _EventType.CLOUDEVENT_STRUCTURED |
| 123 | + else: |
| 124 | + return _EventType.LEGACY |
77 | 125 |
|
78 | 126 |
|
79 | 127 | def _event_view_func_wrapper(function, request):
|
80 | 128 | def view_func(path):
|
81 |
| - if _is_binary_cloud_event(request): |
82 |
| - # Support CloudEvents in binary content mode, with data being the |
83 |
| - # whole request body and context attributes retrieved from request |
84 |
| - # headers. |
85 |
| - data = request.get_data() |
86 |
| - context = Context( |
87 |
| - eventId=request.headers.get("ce-eventId"), |
88 |
| - timestamp=request.headers.get("ce-timestamp"), |
89 |
| - eventType=request.headers.get("ce-eventType"), |
90 |
| - resource=request.headers.get("ce-resource"), |
| 129 | + if _get_event_type(request) == _EventType.LEGACY: |
| 130 | + _run_legacy_event(function, request) |
| 131 | + else: |
| 132 | + # here for defensive backwards compatibility in case we make a mistake in rollout. |
| 133 | + flask.abort( |
| 134 | + 400, |
| 135 | + description="The FUNCTION_SIGNATURE_TYPE for this function is set to event " |
| 136 | + "but no Google Cloud Functions Event was given. If you are using CloudEvents set " |
| 137 | + "FUNCTION_SIGNATURE_TYPE=cloudevent", |
91 | 138 | )
|
92 |
| - function(data, context) |
| 139 | + |
| 140 | + return "OK" |
| 141 | + |
| 142 | + return view_func |
| 143 | + |
| 144 | + |
| 145 | +def _cloudevent_view_func_wrapper(function, request): |
| 146 | + def view_func(path): |
| 147 | + cloudevent_def = _get_cloudevent_version() |
| 148 | + event_type = _get_event_type(request) |
| 149 | + if event_type == _EventType.CLOUDEVENT_STRUCTURED: |
| 150 | + _run_structured_cloudevent(function, request, cloudevent_def) |
| 151 | + elif event_type == _EventType.CLOUDEVENT_BINARY: |
| 152 | + _run_binary_cloudevent(function, request, cloudevent_def) |
93 | 153 | else:
|
94 |
| - # This is a regular CloudEvent |
95 |
| - event_data = request.get_json() |
96 |
| - if not event_data: |
97 |
| - flask.abort(400) |
98 |
| - event_object = _Event(**event_data) |
99 |
| - data = event_object.data |
100 |
| - context = Context(**event_object.context) |
101 |
| - function(data, context) |
| 154 | + flask.abort( |
| 155 | + 400, |
| 156 | + description="Function was defined with FUNCTION_SIGNATURE_TYPE=cloudevent " |
| 157 | + " but it did not receive a cloudevent as a request.", |
| 158 | + ) |
102 | 159 |
|
103 | 160 | return "OK"
|
104 | 161 |
|
@@ -193,19 +250,27 @@ def create_app(target=None, source=None, signature_type=None):
|
193 | 250 | app.url_map.add(werkzeug.routing.Rule("/<path:path>", endpoint="run"))
|
194 | 251 | app.view_functions["run"] = _http_view_func_wrapper(function, flask.request)
|
195 | 252 | app.view_functions["error"] = lambda: flask.abort(404, description="Not Found")
|
196 |
| - elif signature_type == "event": |
| 253 | + elif signature_type == "event" or signature_type == "cloudevent": |
197 | 254 | app.url_map.add(
|
198 | 255 | werkzeug.routing.Rule(
|
199 |
| - "/", defaults={"path": ""}, endpoint="run", methods=["POST"] |
| 256 | + "/", defaults={"path": ""}, endpoint=signature_type, methods=["POST"] |
200 | 257 | )
|
201 | 258 | )
|
202 | 259 | app.url_map.add(
|
203 |
| - werkzeug.routing.Rule("/<path:path>", endpoint="run", methods=["POST"]) |
| 260 | + werkzeug.routing.Rule( |
| 261 | + "/<path:path>", endpoint=signature_type, methods=["POST"] |
| 262 | + ) |
204 | 263 | )
|
205 |
| - app.view_functions["run"] = _event_view_func_wrapper(function, flask.request) |
| 264 | + |
206 | 265 | # Add a dummy endpoint for GET /
|
207 | 266 | app.url_map.add(werkzeug.routing.Rule("/", endpoint="get", methods=["GET"]))
|
208 | 267 | app.view_functions["get"] = lambda: ""
|
| 268 | + |
| 269 | + # Add the view functions |
| 270 | + app.view_functions["event"] = _event_view_func_wrapper(function, flask.request) |
| 271 | + app.view_functions["cloudevent"] = _cloudevent_view_func_wrapper( |
| 272 | + function, flask.request |
| 273 | + ) |
209 | 274 | else:
|
210 | 275 | raise FunctionsFrameworkException(
|
211 | 276 | "Invalid signature type: {signature_type}".format(
|
|
0 commit comments