From 44bfc40d8343f58d943205faf8fa967752a6afec Mon Sep 17 00:00:00 2001 From: Hiroshi Fujita Date: Tue, 22 Oct 2024 16:38:16 +0000 Subject: [PATCH] Reconfigure the additional API endpoints in a separate blueprint --- app/backend/app.py | 150 +---------------------- app/backend/chat_history/__init__.py | 0 app/backend/chat_history/cosmosdb.py | 176 +++++++++++++++++++++++++++ app/backend/config.py | 1 + 4 files changed, 182 insertions(+), 145 deletions(-) create mode 100644 app/backend/chat_history/__init__.py create mode 100644 app/backend/chat_history/cosmosdb.py diff --git a/app/backend/app.py b/app/backend/app.py index 7a0f4da9f6..e6d4527e58 100644 --- a/app/backend/app.py +++ b/app/backend/app.py @@ -16,7 +16,6 @@ SpeechSynthesizer, ) from azure.core.exceptions import ResourceNotFoundError -from azure.cosmos.aio import ContainerProxy, CosmosClient from azure.identity.aio import ( AzureDeveloperCliCredential, ManagedIdentityCredential, @@ -54,6 +53,7 @@ from approaches.chatreadretrievereadvision import ChatReadRetrieveReadVisionApproach from approaches.retrievethenread import RetrieveThenReadApproach from approaches.retrievethenreadvision import RetrieveThenReadVisionApproach +from chat_history.cosmosdb import chat_history_cosmosdb_bp from config import ( CONFIG_ASK_APPROACH, CONFIG_ASK_VISION_APPROACH, @@ -63,7 +63,6 @@ CONFIG_CHAT_HISTORY_BROWSER_ENABLED, CONFIG_CHAT_HISTORY_COSMOS_ENABLED, CONFIG_CHAT_VISION_APPROACH, - CONFIG_COSMOS_HISTORY_CONTAINER, CONFIG_CREDENTIAL, CONFIG_GPT4V_DEPLOYED, CONFIG_INGESTER, @@ -407,129 +406,6 @@ async def list_uploaded(auth_claims: dict[str, Any]): return jsonify(files), 200 -@bp.post("/chat_history") -@authenticated -async def post_chat_history(auth_claims: Dict[str, Any]): - if not current_app.config[CONFIG_CHAT_HISTORY_COSMOS_ENABLED]: - return jsonify({"error": "Chat history not enabled"}), 405 - - container = cast(ContainerProxy, current_app.config[CONFIG_COSMOS_HISTORY_CONTAINER]) - if not container: - return jsonify({"error": "Chat history not enabled"}), 405 - - entra_oid = auth_claims.get("oid") - if not entra_oid: - return jsonify({"error": "User OID not found"}), 401 - - try: - request_json = await request.get_json() - id = request_json.get("id") - answers = request_json.get("answers") - title = answers[0][0][:50] + "..." if len(answers[0][0]) > 50 else answers[0][0] - - await container.upsert_item({"id": id, "entra_oid": entra_oid, "title": title, "answers": answers}) - - return jsonify({}), 201 - except Exception as error: - return error_response(error, "/chat_history") - - -@bp.post("/chat_history/items") -@authenticated -async def get_chat_history(auth_claims: Dict[str, Any]): - if not current_app.config[CONFIG_CHAT_HISTORY_COSMOS_ENABLED]: - return jsonify({"error": "Chat history not enabled"}), 405 - - container = cast(ContainerProxy, current_app.config[CONFIG_COSMOS_HISTORY_CONTAINER]) - if not container: - return jsonify({"error": "Chat history not enabled"}), 405 - - entra_oid = auth_claims.get("oid") - if not entra_oid: - return jsonify({"error": "User OID not found"}), 401 - - try: - request_json = await request.get_json() - count = request_json.get("count", 20) - continuation_token = request_json.get("continuation_token") - - res = container.query_items( - query="SELECT c.id, c.entra_oid, c.title, c._ts FROM c WHERE c.entra_oid = @entra_oid ORDER BY c._ts DESC", - parameters=[dict(name="@entra_oid", value=entra_oid)], - max_item_count=count, - ) - - # set the continuation token for the next page - pager = res.by_page(continuation_token) - - # Get the first page, and the continuation token - try: - page = await pager.__anext__() - continuation_token = pager.continuation_token # type: ignore - - items = [] - async for item in page: - items.append(item) - - # If there are no page, StopAsyncIteration is raised - except StopAsyncIteration: - items = [] - continuation_token = None - - return jsonify({"items": items, "continuation_token": continuation_token}), 200 - - except Exception as error: - return error_response(error, "/chat_history/items") - - -@bp.get("/chat_history/items/") -@authenticated_path -async def get_chat_history_session(path: str, auth_claims: Dict[str, Any]): - if not current_app.config[CONFIG_CHAT_HISTORY_COSMOS_ENABLED]: - return jsonify({"error": "Chat history not enabled"}), 405 - - container = cast(ContainerProxy, current_app.config[CONFIG_COSMOS_HISTORY_CONTAINER]) - if not container: - return jsonify({"error": "Chat history not enabled"}), 405 - - if not path: - return jsonify({"error": "Invalid path"}), 400 - - entra_oid = auth_claims.get("oid") - if not entra_oid: - return jsonify({"error": "User OID not found"}), 401 - - try: - res = await container.read_item(item=path, partition_key=entra_oid) - return jsonify(res), 200 - except Exception as error: - return error_response(error, f"/chat_history/items/{path}") - - -@bp.delete("/chat_history/items/") -@authenticated_path -async def delete_chat_history_session(path: str, auth_claims: Dict[str, Any]): - if not current_app.config[CONFIG_CHAT_HISTORY_COSMOS_ENABLED]: - return jsonify({"error": "Chat history not enabled"}), 405 - - container = cast(ContainerProxy, current_app.config[CONFIG_COSMOS_HISTORY_CONTAINER]) - if not container: - return jsonify({"error": "Chat history not enabled"}), 405 - - if not path: - return jsonify({"error": "Invalid path"}), 400 - - entra_oid = auth_claims.get("oid") - if not entra_oid: - return jsonify({"error": "User OID not found"}), 401 - - try: - await container.delete_item(item=path, partition_key=entra_oid) - return jsonify({}), 200 - except Exception as error: - return error_response(error, f"/chat_history/items/{path}") - - @bp.before_app_serving async def setup_clients(): # Replace these with your own values, either in environment variables or directly here @@ -585,12 +461,8 @@ async def setup_clients(): USE_SPEECH_INPUT_BROWSER = os.getenv("USE_SPEECH_INPUT_BROWSER", "").lower() == "true" USE_SPEECH_OUTPUT_BROWSER = os.getenv("USE_SPEECH_OUTPUT_BROWSER", "").lower() == "true" USE_SPEECH_OUTPUT_AZURE = os.getenv("USE_SPEECH_OUTPUT_AZURE", "").lower() == "true" - USE_CHAT_HISTORY_BROWSER = os.getenv("USE_CHAT_HISTORY_BROWSER", "").lower() == "true" USE_CHAT_HISTORY_COSMOS = os.getenv("USE_CHAT_HISTORY_COSMOS", "").lower() == "true" - AZURE_COSMOSDB_ACCOUNT = os.getenv("AZURE_COSMOSDB_ACCOUNT") - AZURE_CHAT_HISTORY_DATABASE = os.getenv("AZURE_CHAT_HISTORY_DATABASE") - AZURE_CHAT_HISTORY_CONTAINER = os.getenv("AZURE_CHAT_HISTORY_CONTAINER") # WEBSITE_HOSTNAME is always set by App Service, RUNNING_IN_PRODUCTION is set in main.bicep RUNNING_ON_AZURE = os.getenv("WEBSITE_HOSTNAME") is not None or os.getenv("RUNNING_IN_PRODUCTION") is not None @@ -620,6 +492,9 @@ async def setup_clients(): current_app.logger.info("Setting up Azure credential using AzureDeveloperCliCredential for home tenant") azure_credential = AzureDeveloperCliCredential(process_timeout=60) + # Set the Azure credential in the app config for use in other parts of the app + current_app.config[CONFIG_CREDENTIAL] = azure_credential + # Set up clients for AI Search and Storage search_client = SearchClient( endpoint=f"https://{AZURE_SEARCH_SERVICE}.search.windows.net", @@ -694,21 +569,6 @@ async def setup_clients(): ) current_app.config[CONFIG_INGESTER] = ingester - if USE_CHAT_HISTORY_COSMOS: - current_app.logger.info("USE_CHAT_HISTORY_COSMOS is true, setting up CosmosDB client") - if not AZURE_COSMOSDB_ACCOUNT: - raise ValueError("AZURE_COSMOSDB_ACCOUNT must be set when USE_CHAT_HISTORY_COSMOS is true") - if not AZURE_CHAT_HISTORY_DATABASE: - raise ValueError("AZURE_CHAT_HISTORY_DATABASE must be set when USE_CHAT_HISTORY_COSMOS is true") - if not AZURE_CHAT_HISTORY_CONTAINER: - raise ValueError("AZURE_CHAT_HISTORY_CONTAINER must be set when USE_CHAT_HISTORY_COSMOS is true") - cosmos_client = CosmosClient( - url=f"https://{AZURE_COSMOSDB_ACCOUNT}.documents.azure.com:443/", credential=azure_credential - ) - cosmos_db = cosmos_client.get_database_client(AZURE_CHAT_HISTORY_DATABASE) - cosmos_container = cosmos_db.get_container_client(AZURE_CHAT_HISTORY_CONTAINER) - current_app.config[CONFIG_COSMOS_HISTORY_CONTAINER] = cosmos_container - # Used by the OpenAI SDK openai_client: AsyncOpenAI @@ -723,7 +583,6 @@ async def setup_clients(): current_app.config[CONFIG_SPEECH_SERVICE_VOICE] = AZURE_SPEECH_VOICE # Wait until token is needed to fetch for the first time current_app.config[CONFIG_SPEECH_SERVICE_TOKEN] = None - current_app.config[CONFIG_CREDENTIAL] = azure_credential if OPENAI_HOST.startswith("azure"): api_version = os.getenv("AZURE_OPENAI_API_VERSION") or "2024-03-01-preview" @@ -867,6 +726,7 @@ async def close_clients(): def create_app(): app = Quart(__name__) app.register_blueprint(bp) + app.register_blueprint(chat_history_cosmosdb_bp) if os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING"): app.logger.info("APPLICATIONINSIGHTS_CONNECTION_STRING is set, enabling Azure Monitor") diff --git a/app/backend/chat_history/__init__.py b/app/backend/chat_history/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/app/backend/chat_history/cosmosdb.py b/app/backend/chat_history/cosmosdb.py new file mode 100644 index 0000000000..01a5876eaf --- /dev/null +++ b/app/backend/chat_history/cosmosdb.py @@ -0,0 +1,176 @@ +import os +from typing import Any, Dict, Union, cast + +from azure.cosmos.aio import ContainerProxy, CosmosClient +from azure.identity.aio import AzureDeveloperCliCredential, ManagedIdentityCredential +from quart import Blueprint, current_app, jsonify, request + +from config import ( + CONFIG_CHAT_HISTORY_COSMOS_ENABLED, + CONFIG_COSMOS_HISTORY_CLIENT, + CONFIG_COSMOS_HISTORY_CONTAINER, + CONFIG_CREDENTIAL, +) +from decorators import authenticated, authenticated_path +from error import error_response + +chat_history_cosmosdb_bp = Blueprint("chat_history_cosmos", __name__, static_folder="static") + + +@chat_history_cosmosdb_bp.post("/chat_history") +@authenticated +async def post_chat_history(auth_claims: Dict[str, Any]): + if not current_app.config[CONFIG_CHAT_HISTORY_COSMOS_ENABLED]: + return jsonify({"error": "Chat history not enabled"}), 405 + + container = cast(ContainerProxy, current_app.config[CONFIG_COSMOS_HISTORY_CONTAINER]) + if not container: + return jsonify({"error": "Chat history not enabled"}), 405 + + entra_oid = auth_claims.get("oid") + if not entra_oid: + return jsonify({"error": "User OID not found"}), 401 + + try: + request_json = await request.get_json() + id = request_json.get("id") + answers = request_json.get("answers") + title = answers[0][0][:50] + "..." if len(answers[0][0]) > 50 else answers[0][0] + + await container.upsert_item({"id": id, "entra_oid": entra_oid, "title": title, "answers": answers}) + + return jsonify({}), 201 + except Exception as error: + return error_response(error, "/chat_history") + + +@chat_history_cosmosdb_bp.post("/chat_history/items") +@authenticated +async def get_chat_history(auth_claims: Dict[str, Any]): + if not current_app.config[CONFIG_CHAT_HISTORY_COSMOS_ENABLED]: + return jsonify({"error": "Chat history not enabled"}), 405 + + container = cast(ContainerProxy, current_app.config[CONFIG_COSMOS_HISTORY_CONTAINER]) + if not container: + return jsonify({"error": "Chat history not enabled"}), 405 + + entra_oid = auth_claims.get("oid") + if not entra_oid: + return jsonify({"error": "User OID not found"}), 401 + + try: + request_json = await request.get_json() + count = request_json.get("count", 20) + continuation_token = request_json.get("continuation_token") + + res = container.query_items( + query="SELECT c.id, c.entra_oid, c.title, c._ts FROM c WHERE c.entra_oid = @entra_oid ORDER BY c._ts DESC", + parameters=[dict(name="@entra_oid", value=entra_oid)], + max_item_count=count, + ) + + # set the continuation token for the next page + pager = res.by_page(continuation_token) + + # Get the first page, and the continuation token + try: + page = await pager.__anext__() + continuation_token = pager.continuation_token # type: ignore + + items = [] + async for item in page: + items.append(item) + + # If there are no page, StopAsyncIteration is raised + except StopAsyncIteration: + items = [] + continuation_token = None + + return jsonify({"items": items, "continuation_token": continuation_token}), 200 + + except Exception as error: + return error_response(error, "/chat_history/items") + + +@chat_history_cosmosdb_bp.get("/chat_history/items/") +@authenticated_path +async def get_chat_history_session(path: str, auth_claims: Dict[str, Any]): + if not current_app.config[CONFIG_CHAT_HISTORY_COSMOS_ENABLED]: + return jsonify({"error": "Chat history not enabled"}), 405 + + container = cast(ContainerProxy, current_app.config[CONFIG_COSMOS_HISTORY_CONTAINER]) + if not container: + return jsonify({"error": "Chat history not enabled"}), 405 + + if not path: + return jsonify({"error": "Invalid path"}), 400 + + entra_oid = auth_claims.get("oid") + if not entra_oid: + return jsonify({"error": "User OID not found"}), 401 + + try: + res = await container.read_item(item=path, partition_key=entra_oid) + return jsonify(res), 200 + except Exception as error: + return error_response(error, f"/chat_history/items/{path}") + + +@chat_history_cosmosdb_bp.delete("/chat_history/items/") +@authenticated_path +async def delete_chat_history_session(path: str, auth_claims: Dict[str, Any]): + if not current_app.config[CONFIG_CHAT_HISTORY_COSMOS_ENABLED]: + return jsonify({"error": "Chat history not enabled"}), 405 + + container = cast(ContainerProxy, current_app.config[CONFIG_COSMOS_HISTORY_CONTAINER]) + if not container: + return jsonify({"error": "Chat history not enabled"}), 405 + + if not path: + return jsonify({"error": "Invalid path"}), 400 + + entra_oid = auth_claims.get("oid") + if not entra_oid: + return jsonify({"error": "User OID not found"}), 401 + + try: + await container.delete_item(item=path, partition_key=entra_oid) + return jsonify({}), 200 + except Exception as error: + return error_response(error, f"/chat_history/items/{path}") + + +@chat_history_cosmosdb_bp.before_app_serving +async def setup_clients(): + USE_CHAT_HISTORY_COSMOS = os.getenv("USE_CHAT_HISTORY_COSMOS", "").lower() == "true" + AZURE_COSMOSDB_ACCOUNT = os.getenv("AZURE_COSMOSDB_ACCOUNT") + AZURE_CHAT_HISTORY_DATABASE = os.getenv("AZURE_CHAT_HISTORY_DATABASE") + AZURE_CHAT_HISTORY_CONTAINER = os.getenv("AZURE_CHAT_HISTORY_CONTAINER") + + azure_credential = cast( + Union[AzureDeveloperCliCredential, ManagedIdentityCredential], current_app.config[CONFIG_CREDENTIAL] + ) + + if USE_CHAT_HISTORY_COSMOS: + current_app.logger.info("USE_CHAT_HISTORY_COSMOS is true, setting up CosmosDB client") + if not AZURE_COSMOSDB_ACCOUNT: + raise ValueError("AZURE_COSMOSDB_ACCOUNT must be set when USE_CHAT_HISTORY_COSMOS is true") + if not AZURE_CHAT_HISTORY_DATABASE: + raise ValueError("AZURE_CHAT_HISTORY_DATABASE must be set when USE_CHAT_HISTORY_COSMOS is true") + if not AZURE_CHAT_HISTORY_CONTAINER: + raise ValueError("AZURE_CHAT_HISTORY_CONTAINER must be set when USE_CHAT_HISTORY_COSMOS is true") + cosmos_client = CosmosClient( + url=f"https://{AZURE_COSMOSDB_ACCOUNT}.documents.azure.com:443/", credential=azure_credential + ) + cosmos_db = cosmos_client.get_database_client(AZURE_CHAT_HISTORY_DATABASE) + cosmos_container = cosmos_db.get_container_client(AZURE_CHAT_HISTORY_CONTAINER) + + current_app.config[CONFIG_COSMOS_HISTORY_CLIENT] = cosmos_client + current_app.config[CONFIG_COSMOS_HISTORY_CONTAINER] = cosmos_container + + +@chat_history_cosmosdb_bp.after_app_serving +async def close_clients(): + cosmos_client = cast(CosmosClient, current_app.config.get(CONFIG_COSMOS_HISTORY_CLIENT)) + if cosmos_client: + await cosmos_client.close() diff --git a/app/backend/config.py b/app/backend/config.py index b42738e19c..eaba154116 100644 --- a/app/backend/config.py +++ b/app/backend/config.py @@ -24,4 +24,5 @@ CONFIG_SPEECH_SERVICE_VOICE = "speech_service_voice" CONFIG_CHAT_HISTORY_BROWSER_ENABLED = "chat_history_browser_enabled" CONFIG_CHAT_HISTORY_COSMOS_ENABLED = "chat_history_cosmos_enabled" +CONFIG_COSMOS_HISTORY_CLIENT = "cosmos_history_client" CONFIG_COSMOS_HISTORY_CONTAINER = "cosmos_history_container"