Skip to content

Commit 32f44bf

Browse files
committed
run_code works
1 parent 6b74a8b commit 32f44bf

File tree

2 files changed

+90
-27
lines changed

2 files changed

+90
-27
lines changed

azure_functions_worker/dispatcher.py

Lines changed: 42 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import asyncio
99
import concurrent.futures
10+
import json
1011
import logging
1112
import os
1213
import platform
@@ -62,6 +63,7 @@
6263
from .utils.app_setting_manager import get_python_appsetting_state
6364
from .utils.common import get_app_setting, is_envvar_true, validate_script_file_name
6465
from .utils.dependency import DependencyManager
66+
from .utils.kernel_manager import KernelHandler
6567
from .utils.tracing import marshall_exception_trace
6668
from .utils.wrappers import disable_feature_by
6769
from .version import VERSION
@@ -601,9 +603,14 @@ async def _handle__invocation_request(self, request):
601603
current_task.set_azure_invocation_id(invocation_id)
602604

603605
try:
606+
# Execute things here
607+
604608
fi: functions.FunctionInfo = self._functions.get_function(
605609
function_id)
606610
assert fi is not None
611+
logger.info("Starting kernel handler...")
612+
kernel_handler = KernelHandler()
613+
logger.info("Kernel handler successfully started!")
607614

608615
function_invocation_logs: List[str] = [
609616
'Received FunctionInvocationRequest',
@@ -655,36 +662,44 @@ async def _handle__invocation_request(self, request):
655662

656663
fi_context = self._get_context(invoc_request, fi.name,
657664
fi.directory)
665+
666+
logger.info("Starting execution using kernel...")
667+
code_json = invoc_request.trigger_metadata.get('mcptoolargs').json
668+
logger.info("Full request: %s, Received code from request: %s, Type of object: %s", invoc_request, code_json, type(code_json))
669+
code = json.dumps(code_json)
670+
logger.info("Parsed code from request: %s", code)
671+
call_result = kernel_handler.run_code(code)
672+
logger.info("Execution completed successfully! Result: %s", call_result)
658673

659674
# Use local thread storage to store the invocation ID
660675
# for a customer's threads
661-
fi_context.thread_local_storage.invocation_id = invocation_id
662-
if fi.requires_context:
663-
args['context'] = fi_context
664-
665-
if fi.output_types:
666-
for name in fi.output_types:
667-
args[name] = bindings.Out()
668-
669-
if fi.is_async:
670-
if self._azure_monitor_available or self._otel_libs_available:
671-
self.configure_opentelemetry(fi_context)
672-
673-
call_result = \
674-
await self._run_async_func(fi_context, fi.func, args)
675-
else:
676-
call_result = await self._loop.run_in_executor(
677-
self._sync_call_tp,
678-
self._run_sync_func,
679-
invocation_id, fi_context, fi.func, args)
680-
681-
if call_result is not None and not fi.has_return:
682-
raise RuntimeError(
683-
f'function {fi.name!r} without a $return binding'
684-
'returned a non-None value')
685-
686-
if http_v2_enabled:
687-
http_coordinator.set_http_response(invocation_id, call_result)
676+
# fi_context.thread_local_storage.invocation_id = invocation_id
677+
# if fi.requires_context:
678+
# args['context'] = fi_context
679+
680+
# if fi.output_types:
681+
# for name in fi.output_types:
682+
# args[name] = bindings.Out()
683+
684+
# if fi.is_async:
685+
# if self._azure_monitor_available or self._otel_libs_available:
686+
# self.configure_opentelemetry(fi_context)
687+
688+
# call_result = \
689+
# await self._run_async_func(fi_context, fi.func, args)
690+
# else:
691+
# call_result = await self._loop.run_in_executor(
692+
# self._sync_call_tp,
693+
# self._run_sync_func,
694+
# invocation_id, fi_context, fi.func, args)
695+
696+
# if call_result is not None and not fi.has_return:
697+
# raise RuntimeError(
698+
# f'function {fi.name!r} without a $return binding'
699+
# 'returned a non-None value')
700+
701+
# if http_v2_enabled:
702+
# http_coordinator.set_http_response(invocation_id, call_result)
688703

689704
output_data = []
690705
cache_enabled = self._function_data_cache_enabled
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import os
2+
import jupyter_client
3+
from ipykernel.kernelspec import install as install_ipykernel
4+
from jupyter_client.kernelspec import KernelSpecManager
5+
6+
class KernelHandler:
7+
def __init__(self, kernel_name="func-kernel"):
8+
self.kernel_name = kernel_name
9+
self._ensure_kernel_registered()
10+
self.km = jupyter_client.KernelManager(kernel_name=self.kernel_name)
11+
self.km.start_kernel()
12+
self.kc = self.km.client()
13+
self.kc.start_channels()
14+
self._wait_for_ready()
15+
16+
def _ensure_kernel_registered(self):
17+
ksm = KernelSpecManager()
18+
try:
19+
ksm.get_kernel_spec(self.kernel_name)
20+
except jupyter_client.kernelspec.NoSuchKernel:
21+
print(f"Kernel '{self.kernel_name}' not found. Installing...")
22+
install_ipykernel(user=True, kernel_name=self.kernel_name, display_name="Function App Kernel")
23+
print(f"Kernel '{self.kernel_name}' installed.")
24+
25+
def _wait_for_ready(self):
26+
self.kc.wait_for_ready(timeout=10)
27+
28+
def run_code(self, code):
29+
msg_id = self.kc.execute(code)
30+
result = ""
31+
while True:
32+
msg = self.kc.get_iopub_msg(timeout=5)
33+
if msg['parent_header'].get('msg_id') != msg_id:
34+
continue
35+
msg_type = msg['msg_type']
36+
if msg_type == 'stream':
37+
result += msg['content']['text']
38+
elif msg_type == 'execute_result':
39+
result += msg['content']['data'].get('text/plain', '')
40+
elif msg_type == 'error':
41+
result += "\n".join(msg['content']['traceback'])
42+
elif msg_type == 'status' and msg['content']['execution_state'] == 'idle':
43+
break
44+
return result
45+
46+
def shutdown(self):
47+
self.kc.stop_channels()
48+
self.km.shutdown_kernel()

0 commit comments

Comments
 (0)