Skip to content

feat: add execution_id support for async stack #377

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 19 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
2f81ad8
feat: add execution_id support for async stack
taeold Jun 15, 2025
2ef096d
refactor: move exception logging to crash handler for cleaner code
taeold Jun 15, 2025
d03052f
refactor: improve code organization based on feedback
taeold Jun 15, 2025
565e22c
fix: preserve execution context for exception handlers
taeold Jun 15, 2025
5278a0d
style: apply black and isort formatting
taeold Jun 15, 2025
8be2d2c
refactor: clean up async tests and remove redundant comments
taeold Jun 15, 2025
69919d7
chore: remove uv.lock from version control
taeold Jun 15, 2025
12b1f46
style: fix black formatting
taeold Jun 15, 2025
c473a2d
fix: skip async execution_id tests on Python 3.7
taeold Jun 15, 2025
8479649
refactor: reuse _enable_execution_id_logging from main module
taeold Jun 15, 2025
a04783e
chore: more cleanup.
taeold Jun 15, 2025
f260dcf
test: remove unnecessary pragma no cover for sync_wrapper
taeold Jun 15, 2025
2c4e880
test: improve coverage by removing unnecessary pragma no cover annota…
taeold Jun 15, 2025
d43f109
style: fix black formatting
taeold Jun 15, 2025
55a44f3
style: fix isort import ordering
taeold Jun 15, 2025
f216979
test: add back pragma no cover for genuinely hard-to-test edge cases
taeold Jun 15, 2025
054bbc4
refactor: simplify async decorator by removing dead code branch
taeold Jun 15, 2025
cffd0fa
fix: exclude async-specific code from py37 coverage
taeold Jun 16, 2025
b57b668
fix: improve async execution ID context propagation using contextvars
taeold Jun 16, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .coveragerc-py37
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,9 @@ exclude_lines =
# Don't complain about async-specific imports and code
from functions_framework.aio import
from functions_framework._http.asgi import
from functions_framework._http.gunicorn import UvicornApplication
from functions_framework._http.gunicorn import UvicornApplication

# Exclude async-specific classes and functions in execution_id.py
class AsgiMiddleware:
async def __call__
def set_execution_context_async
8 changes: 6 additions & 2 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,12 @@ def pytest_ignore_collect(collection_path, config):
if sys.version_info >= (3, 8):
return None

# Skip test_aio.py and test_asgi.py entirely on Python 3.7
if collection_path.name in ["test_aio.py", "test_asgi.py"]:
# Skip test_aio.py, test_asgi.py, and test_execution_id_async.py entirely on Python 3.7
if collection_path.name in [
"test_aio.py",
"test_asgi.py",
"test_execution_id_async.py",
]:
return True

return None
Expand Down
10 changes: 10 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,13 @@ functions_framework = ["py.typed"]

[tool.setuptools.package-dir]
"" = "src"

[dependency-groups]
dev = [
"black>=23.3.0",
"build>=1.1.1",
"isort>=5.11.5",
"pretend>=1.0.9",
"pytest>=7.4.4",
"pytest-asyncio>=0.21.2",
]
105 changes: 90 additions & 15 deletions src/functions_framework/aio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,26 @@
# limitations under the License.

import asyncio
import contextvars
import functools
import inspect
import json
import logging
import os
import re
import sys
import traceback

from typing import Any, Awaitable, Callable, Dict, Tuple, Union

from cloudevents.http import from_http
from cloudevents.http.event import CloudEvent

from functions_framework import _function_registry
from functions_framework import (
_enable_execution_id_logging,
_function_registry,
execution_id,
)
from functions_framework.exceptions import (
FunctionsFrameworkException,
MissingSourceException,
Expand Down Expand Up @@ -51,6 +61,21 @@
_FUNCTION_STATUS_HEADER_FIELD = "X-Google-Status"
_CRASH = "crash"


async def _crash_handler(request, exc):
logger = logging.getLogger()
tb_lines = traceback.format_exception(type(exc), exc, exc.__traceback__)
tb_text = "".join(tb_lines)
error_msg = (
f"Exception on {request.url.path} [{request.method}]\n{tb_text}".rstrip()
)

logger.error(error_msg)

headers = {_FUNCTION_STATUS_HEADER_FIELD: _CRASH}
return Response("Internal Server Error", status_code=500, headers=headers)


CloudEventFunction = Callable[[CloudEvent], Union[None, Awaitable[None]]]
HTTPFunction = Callable[[Request], Union[HTTPResponse, Awaitable[HTTPResponse]]]

Expand Down Expand Up @@ -96,29 +121,27 @@ def wrapper(*args, **kwargs):
return wrapper


async def _crash_handler(request, exc):
headers = {_FUNCTION_STATUS_HEADER_FIELD: _CRASH}
return Response(f"Internal Server Error: {exc}", status_code=500, headers=headers)


def _http_func_wrapper(function, is_async):
def _http_func_wrapper(function, is_async, enable_id_logging=False):
@execution_id.set_execution_context_async(enable_id_logging)
@functools.wraps(function)
async def handler(request):
if is_async:
result = await function(request)
else:
# TODO: Use asyncio.to_thread when we drop Python 3.8 support
# Python 3.8 compatible version of asyncio.to_thread
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, function, request)
ctx = contextvars.copy_context()
result = await loop.run_in_executor(None, ctx.run, function, request)
if isinstance(result, str):
return Response(result)
elif isinstance(result, dict):
return JSONResponse(result)
elif isinstance(result, tuple) and len(result) == 2:
# Support Flask-style tuple response
content, status_code = result
return Response(content, status_code=status_code)
if isinstance(content, dict):
return JSONResponse(content, status_code=status_code)
else:
return Response(content, status_code=status_code)
elif result is None:
raise HTTPException(status_code=500, detail="No response returned")
else:
Expand All @@ -127,7 +150,8 @@ async def handler(request):
return handler


def _cloudevent_func_wrapper(function, is_async):
def _cloudevent_func_wrapper(function, is_async, enable_id_logging=False):
@execution_id.set_execution_context_async(enable_id_logging)
@functools.wraps(function)
async def handler(request):
data = await request.body()
Expand All @@ -138,6 +162,7 @@ async def handler(request):
raise HTTPException(
400, detail=f"Bad Request: Got CloudEvent exception: {repr(e)}"
)

if is_async:
await function(event)
else:
Expand All @@ -154,6 +179,45 @@ async def _handle_not_found(request: Request):
raise HTTPException(status_code=404, detail="Not Found")


def _configure_app_execution_id_logging():
import logging
import logging.config

class AsyncExecutionIdHandler(logging.StreamHandler):
def emit(self, record):
context = execution_id.execution_context_var.get(None)

log_entry = {
"message": self.format(record),
"severity": record.levelname,
}

if context and context.execution_id:
log_entry["logging.googleapis.com/labels"] = {
"execution_id": context.execution_id
}

if context and context.span_id:
log_entry["logging.googleapis.com/spanId"] = context.span_id

try:
self.stream.write(json.dumps(log_entry) + "\n")
self.stream.flush()
except Exception:
super().emit(record)

root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)

for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler)

handler = AsyncExecutionIdHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(message)s"))
handler.setLevel(logging.NOTSET)
root_logger.addHandler(handler)


def create_asgi_app(target=None, source=None, signature_type=None):
"""Create an ASGI application for the function.

Expand All @@ -175,14 +239,19 @@ def create_asgi_app(target=None, source=None, signature_type=None):
)

source_module, spec = _function_registry.load_function_module(source)

enable_id_logging = _enable_execution_id_logging()
if enable_id_logging:
_configure_app_execution_id_logging()

spec.loader.exec_module(source_module)
function = _function_registry.get_user_function(source, source_module, target)
signature_type = _function_registry.get_func_signature_type(target, signature_type)

is_async = inspect.iscoroutinefunction(function)
routes = []
if signature_type == _function_registry.HTTP_SIGNATURE_TYPE:
http_handler = _http_func_wrapper(function, is_async)
http_handler = _http_func_wrapper(function, is_async, enable_id_logging)
routes.append(
Route(
"/",
Expand All @@ -202,7 +271,9 @@ def create_asgi_app(target=None, source=None, signature_type=None):
)
)
elif signature_type == _function_registry.CLOUDEVENT_SIGNATURE_TYPE:
cloudevent_handler = _cloudevent_func_wrapper(function, is_async)
cloudevent_handler = _cloudevent_func_wrapper(
function, is_async, enable_id_logging
)
routes.append(
Route("/{path:path}", endpoint=cloudevent_handler, methods=["POST"])
)
Expand All @@ -222,9 +293,13 @@ def create_asgi_app(target=None, source=None, signature_type=None):
)

exception_handlers = {
500: _crash_handler,
Exception: _crash_handler,
}
app = Starlette(routes=routes, exception_handlers=exception_handlers)

# Apply ASGI middleware for execution ID
app = execution_id.AsgiMiddleware(app)

return app


Expand Down
Loading
Loading