Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 41 additions & 15 deletions ex_app/lib/main.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
# SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors
# SPDX-License-Identifier: AGPL-3.0-or-later
import concurrent.futures
import os
import traceback
from contextlib import asynccontextmanager
from json import JSONDecodeError
from threading import Event
import asyncio

import httpx
from niquests import RequestException
import json
from fastapi import FastAPI
from nc_py_api import NextcloudApp, NextcloudException
Expand Down Expand Up @@ -39,6 +40,9 @@
fast_app = FastAPI(lifespan=http_mcp_app.lifespan)

app_enabled = Event()
TRIGGER = Event()
IDLE_POLLING_INTERVAL = 5
IDLE_POLLING_INTERVAL_WITH_TRIGGER = 5 * 60

LOCALE_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "locale")
current_translator = ContextVar("current_translator")
Expand All @@ -52,9 +56,14 @@ async def lifespan(app: FastAPI):
async with exapp_lifespan(app):
async with http_mcp_app.lifespan(app):
yield

@asynccontextmanager
async def exapp_lifespan(app: FastAPI):
set_handlers(app, enabled_handler)
set_handlers(
app,
enabled_handler,
trigger_handler=trigger_handler,
)
start_bg_task()
nc = NextcloudApp()
if nc.enabled_state:
Expand Down Expand Up @@ -135,29 +144,23 @@ async def background_thread_task():
try:
response = nc.providers.task_processing.next_task([provider.id], [provider.task_type])
if not response or not 'task' in response:
await asyncio.sleep(2)
await wait_for_task()
continue
except (NextcloudException, httpx.RequestError, JSONDecodeError) as e:
except (NextcloudException, RequestException, JSONDecodeError) as e:
tb_str = ''.join(traceback.format_exception(e))
log(nc, LogLvl.WARNING, "Error fetching the next task " + tb_str)
await asyncio.sleep(5)
await wait_for_task(5)
continue
except (
httpx.RemoteProtocolError,
httpx.ReadError,
httpx.LocalProtocolError,
httpx.PoolTimeout,
) as e:
except RequestException as e:
log(nc, LogLvl.DEBUG, "Ignored error during task polling")
await asyncio.sleep(2)
await wait_for_task(2)
continue

task = response["task"]
log(nc, LogLvl.INFO, 'New Task incoming')
log(nc, LogLvl.DEBUG, str(task))
log(nc, LogLvl.INFO, str({'input': task['input']['input'], 'confirmation': task['input']['confirmation'], 'conversation_token': '<skipped>'}))
asyncio.create_task(handle_task(task, nc))
await asyncio.sleep(5)


async def handle_task(task, nc: NextcloudApp):
Expand All @@ -171,7 +174,7 @@ async def handle_task(task, nc: NextcloudApp):
log(nc, LogLvl.ERROR, "Error: " + tb_str)
try:
nc.providers.task_processing.report_result(task["id"], error_message=str(e))
except (NextcloudException, httpx.RequestError) as net_err:
except (NextcloudException, RequestException) as net_err:
tb_str = ''.join(traceback.format_exception(net_err))
log(nc, LogLvl.WARNING, "Network error in reporting the error: " + tb_str)
return
Expand All @@ -180,7 +183,7 @@ async def handle_task(task, nc: NextcloudApp):
task["id"],
output,
)
except (NextcloudException, httpx.RequestError, JSONDecodeError) as e:
except (NextcloudException, RequestException, JSONDecodeError) as e:
tb_str = ''.join(traceback.format_exception(e))
log(nc, LogLvl.ERROR, "Network error trying to report the task result: " + tb_str)

Expand All @@ -190,6 +193,29 @@ def start_bg_task():
loop = asyncio.get_event_loop()
loop.create_task(background_thread_task())

# Trigger event is available starting with nextcloud v33
def trigger_handler(providerId: str):
global TRIGGER
TRIGGER.set()

# Waits for interval seconds or IDLE_POLLING_INTERVAL seconds
# but can return earlier when TRIGGER event is received from nextcloud
# if the trigger event is received, IDLE_POLLING_INTERVAL is set to IDLE_POLLING_INTERVAL_WITH_TRIGGER
async def wait_for_task(interval = None):
global TRIGGER
global IDLE_POLLING_INTERVAL
global IDLE_POLLING_INTERVAL_WITH_TRIGGER
if interval is None:
interval = IDLE_POLLING_INTERVAL
# Call TRIGGER.wait() in a separate thread
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
was_event = await loop.run_in_executor(pool, TRIGGER.wait, interval)
if was_event:
IDLE_POLLING_INTERVAL = IDLE_POLLING_INTERVAL_WITH_TRIGGER
TRIGGER.clear()


APP.mount("/mcp", http_mcp_app)

if __name__ == "__main__":
Expand Down
Loading
Loading