Staging #123

Merged

gkorland merged 31 commits into main from staging on Aug 25, 2025

Conversation

gkorland (Contributor) commented Aug 25, 2025

Summary by CodeRabbit

  • New Features

    • Interactive confirmation for destructive queries and improved streaming query flow with end-to-end timing.
  • Refactor/Performance

    • Major migration of graph, loader, and query pipelines to async with parallel tasks for faster, more scalable responses.
  • UI/Style

    • Updated left toolbar icons for a cleaner, modern look.
  • Documentation

    • Python examples updated to demonstrate async usage for the Postgres loader.
  • Chores

    • Removed a legacy import-time ontology generation module.

vercel bot commented Aug 25, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

Project: queryweaver | Deployment: Ready | Preview: Preview | Comments: Comment | Updated (UTC): Aug 25, 2025 2:19pm

coderabbitai bot (Contributor) commented Aug 25, 2025

Walkthrough

Broad async migration: many loader, graph, agent, and route functions converted to async/await; ThreadPoolExecutor replaced by asyncio tasks in query flow; FalkorDB initialized via asyncio/redis pool; GTM exposure moved to Jinja globals; ontology module removed; tests and docs updated for async usage.
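
A minimal sketch of the thread-pool-to-tasks pattern this walkthrough describes, using stand-in coroutines rather than the repo's actual agents:

import asyncio

async def get_relevancy(question: str) -> bool:
    await asyncio.sleep(0.1)  # stands in for the LLM relevancy call
    return True               # True = on-topic

async def find_tables(question: str) -> list:
    await asyncio.sleep(0.2)  # stands in for the embedding/graph lookup
    return ["users", "orders"]

async def pipeline(question: str) -> list:
    # Schedule both stages at once instead of submitting to a ThreadPoolExecutor.
    relevancy = asyncio.create_task(get_relevancy(question))
    finder = asyncio.create_task(find_tables(question))
    if not await relevancy:
        finder.cancel()  # off-topic: abandon the graph search early
        return []
    return await finder

print(asyncio.run(pipeline("Which customers ordered last week?")))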

Changes

Cohort / File(s) Summary of changes
App boot & template GTM
api/app_factory.py, api/routes/auth.py
Removed middleware setting request.state.google_tag_manager_id; added templates.env.globals["google_tag_manager_id"] and invoked setup_oauth_handlers during app creation.
Async FalkorDB init
api/extensions.py
Switched import to falkordb.asyncio; use redis.asyncio.ConnectionPool.from_url(...) when FALKORDB_URL set; fallback to local FalkorDB otherwise; added pool error handling.
Async graph core & helpers
api/graph.py
get_db_description → async; added async helpers (_query_graph, _find_tables, etc.); reworked find to an embedding-driven, parallel async pipeline; return shape changed to List[List[Any]].
Async agents
api/agents/relevancy_agent.py
RelevancyAgent.get_answer converted to async def get_answer(...).
Async loaders: core
api/loaders/graph_loader.py
load_to_graph converted to async def; internal graph queries awaited.
Async DB loaders
api/loaders/mysql_loader.py, api/loaders/postgres_loader.py
Loader methods converted to async; load_to_graph and graph.delete() are now awaited; the Postgres refresh derives its prefix differently; Postgres methods are no longer @staticmethod.
Async format loaders
api/loaders/csv_loader.py, api/loaders/json_loader.py, api/loaders/odata_loader.py
load methods converted to async def (kept static where applicable); they now await load_to_graph(...).
Async routes: graphs & database
api/routes/graphs.py, api/routes/database.py
Graph routes migrated to async/await; replaced thread executor with asyncio tasks (concurrent relevancy/find), added per-stage timing and destructive-operation confirmation streaming; database connect route now awaits loaders.
Auth async updates
api/auth/user_management.py, api/auth/oauth_handlers.py
ensure_user_in_organizations and update_identity_last_login → async; OAuth handlers now await calls and optionally call provider-specific callback handlers on app.state.
OAuth setup
api/auth/oauth_handlers.py, api/app_factory.py
setup_oauth_handlers imported and invoked from app factory; OAuth error handler improved to detect token/oauth errors, clear session, and redirect to /.
Tests & docs
tests/test_mysql_loader.py, tests/test_postgres_loader.py, docs/postgres_loader.md
Tests updated to run loader coroutines via asyncio.run(...); docs Python examples updated to await graph.query(...).
UI icons
app/templates/components/left_toolbar.j2
Replaced two inline SVGs with Lucide-style stroke icons (markup adjusted).
Removed side-effect module
onthology.py
Deleted ontology generation module and its import-time side effects.

Sequence Diagram(s)

sequenceDiagram
  actor User
  participant API as FastAPI /graphs/query
  participant Relevancy as RelevancyAgent (async)
  participant Finder as find() pipeline (async)
  participant Analysis as AnalysisAgent
  participant DB as FalkorDB (async)
  participant Formatter as ResponseFormatterAgent

  User->>API: POST /graphs/query (question)
  rect rgba(200,230,255,0.3)
    note right of API: start relevancy & finder concurrently
    API->>Relevancy: get_answer() (awaitable)
    API->>Finder: find(...) (awaitable)
  end

  Relevancy-->>API: relevance result
  alt Off-topic
    API-->>Finder: cancel task
    API-->>User: stream off-topic reason
  else On-topic
    Finder-->>API: relevant tables/paths
    API->>Analysis: generate SQL
    Analysis-->>API: SQL + flags
    alt Destructive SQL
      API-->>User: stream destructive_confirmation
      User-->>API: confirm/cancel
      alt Confirm
        API->>DB: execute SQL (await)
        DB-->>API: results
      else Cancel
        API-->>User: cancellation message
      end
    else Non-destructive
      API->>DB: execute SQL (await)
      DB-->>API: results
    end
    API->>Formatter: format response
    Formatter-->>API: ai_response
    API-->>User: stream results & ai_response
  end
  note over API: per-stage and total timing logged
sequenceDiagram
  participant App as App Factory
  participant Templates as Jinja2
  participant Env as Environment

  App->>App: remove add_template_globals middleware
  App->>App: invoke setup_oauth_handlers(app, app.state.oauth)
  Env-->>Templates: GOOGLE_TAG_MANAGER_ID
  App->>Templates: templates.env.globals["google_tag_manager_id"]=value
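
The destructive-SQL branch in the first diagram maps naturally onto an async generator that yields a confirmation event before executing anything. A sketch with illustrative event names (not the app's actual wire format):

import asyncio
import json
from typing import AsyncIterator

async def execute(sql: str) -> list:
    await asyncio.sleep(0)  # stands in for the awaited FalkorDB/SQL execution
    return []

async def stream_query(sql: str, destructive: bool, confirmed: bool) -> AsyncIterator[str]:
    # Event names here are illustrative, not the app's real protocol.
    if destructive and not confirmed:
        yield json.dumps({"type": "destructive_confirmation", "sql": sql})
        return  # the client re-sends the request with an explicit confirmation
    yield json.dumps({"type": "results", "rows": await execute(sql)})

async def main():
    async for chunk in stream_query("DROP TABLE users", destructive=True, confirmed=False):
        print(chunk)

asyncio.run(main())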

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60–90 minutes

Possibly related PRs

  • Async graph worker #98 — Large overlapping async refactor across agents, loaders, graph functions, and FalkorDB async imports.
  • Staging #95 — Related FastAPI/async migration touching app factory, OAuth, and async loader changes.
  • Staging #104 — Overlaps OAuth initialization and handler changes (setup_oauth_handlers and callback handling).

Suggested reviewers

  • swilly22
  • galshubeli

Poem

I twitch my ears at tasks that wait,
Async burrows hum — no blocking fate.
Tags tucked in templates, pools by the door,
Pipelines hop parallel, ontology's no more.
A carrot of logs, and icons that gleam — hippity hop, deploy the dream! 🐇✨

github-actions bot commented Aug 25, 2025

Dependency Review

✅ No vulnerabilities, license issues, or OpenSSF Scorecard issues found.

Scanned Files

None

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
api/routes/auth.py (1)

284-285: Avoid broad exception; catch HTTP client errors explicitly.

This satisfies pylint and prevents masking unrelated bugs.

Apply this diff:

-            except Exception as e:
-                logging.error("Error revoking Google tokens: %s", e)
+            except httpx.HTTPError as e:
+                logging.error("Network error revoking Google tokens: %s", e)
🧹 Nitpick comments (6)
api/routes/auth.py (6)

269-276: Send revoke token in form body and add a client timeout.

Headers declare form-encoded content, but the token is sent as a query param. Posting it in the body matches Google’s expectations and avoids proxy logging of query strings. A small timeout prevents hangs.

Apply this diff:

-                async with httpx.AsyncClient() as client:
+                async with httpx.AsyncClient(timeout=10.0) as client:
                     for token in tokens_to_revoke:
                         resp = await client.post(
                             "https://oauth2.googleapis.com/revoke",
-                            params={"token": token},
-                            headers={"content-type": "application/x-www-form-urlencoded"},
+                            data={"token": token},
+                            headers={"content-type": "application/x-www-form-urlencoded"},
                         )

209-209: Trim trailing whitespace (pylint: trailing-whitespace).

Minor lint fix.

Apply this diff:

-        
+

34-36: Disable Jinja auto-reload in production via env flag.

Auto-reload is great for dev but adds overhead in prod. Make it configurable to satisfy performance best practice while keeping current behavior by default.

Apply this diff:

-        auto_reload=True,
+        auto_reload=os.getenv("JINJA_AUTO_RELOAD", "1") == "1",

87-87: Add short docstrings to silence pylint warnings and improve clarity.

Pylint flagged missing docstrings for these route handlers. Minimal one-liners keep lint green.

Apply this diff:

 @auth_router.get("/", response_class=HTMLResponse)
 async def home(request: Request) -> HTMLResponse:
+    """Render landing or chat depending on authentication state."""
     user_info, is_authenticated_flag = await validate_and_cache_user(request)
 @auth_router.get("/login", response_class=RedirectResponse)
 async def login_page(_: Request) -> RedirectResponse:
+    """Redirect to Google login to start the OAuth flow."""
     return RedirectResponse(url="/login/google", status_code=status.HTTP_302_FOUND)
 @auth_router.get("/login/google", name="google.login", response_class=RedirectResponse)
 async def login_google(request: Request) -> RedirectResponse:
+    """Start Google OAuth authorization."""
     google = _get_provider_client(request, "google")
 @auth_router.get("/login/google/authorized", response_class=RedirectResponse)
 async def google_authorized(request: Request) -> RedirectResponse:
+    """Handle Google OAuth callback and persist session."""
     try:
 @auth_router.get("/login/google/callback", response_class=RedirectResponse)
 async def google_callback_compat(request: Request) -> RedirectResponse:
+    """Legacy-compatible path; forwards to /login/google/authorized."""
     qs = f"?{request.url.query}" if request.url.query else ""
 @auth_router.get("/login/github",  name="github.login", response_class=RedirectResponse)
 async def login_github(request: Request) -> RedirectResponse:
+    """Start GitHub OAuth authorization."""
     github = _get_provider_client(request, "github")
 @auth_router.get("/login/github/authorized", response_class=RedirectResponse)
 async def github_authorized(request: Request) -> RedirectResponse:
+    """Handle GitHub OAuth callback and persist session."""
     try:
 @auth_router.get("/login/github/callback", response_class=RedirectResponse)
 async def github_callback_compat(request: Request) -> RedirectResponse:
+    """Legacy-compatible path; forwards to /login/github/authorized."""
     qs = f"?{request.url.query}" if request.url.query else ""

Also applies to: 114-114, 119-119, 135-135, 173-173, 180-180, 196-196, 247-247


185-191: Deduplicate repeated localhost/127.0.0.1 hint.

The warning block appears in both Google and GitHub flows. Consider a tiny helper to keep messages consistent.

Example:

def _warn_localhost_mismatch(request: Request, provider: str):
    if not os.getenv("OAUTH_BASE_URL") and "127.0.0.1" in str(request.base_url):
        logging.warning(
            "OAUTH_BASE_URL not set and base URL is 127.0.0.1; "
            "if your %s OAuth app uses 'http://localhost:5000', "
            "set OAUTH_BASE_URL=http://localhost:5000 to avoid redirect_uri mismatch.",
            provider,
        )

Then call _warn_localhost_mismatch(request, "Google") and _warn_localhost_mismatch(request, "GitHub").

Also applies to: 123-130


39-39: Optional Refactor: Normalize and Bind GTM ID at Init

Verification complete—no legacy request.state.google_tag_manager_id references or add_template_globals middleware remain, and all templates now use the new google_tag_manager_id global.

• File needing optional update:

  • api/routes/auth.py, around line 39

• Current code:

templates.env.globals["google_tag_manager_id"] = os.getenv("GOOGLE_TAG_MANAGER_ID")

• Recommended diff to strip empty values and allow runtime overrides for testing:

-templates.env.globals["google_tag_manager_id"] = os.getenv("GOOGLE_TAG_MANAGER_ID")
+from os import getenv
+
+# Normalize empty strings to None and support runtime overrides
+google_tag_manager_id = getenv("GOOGLE_TAG_MANAGER_ID", "").strip() or None
+templates.env.globals["google_tag_manager_id"] = google_tag_manager_id

This change ensures:

  1. Empty or unset environment values become None, enabling simple truthiness checks in Jinja (no more '' vs. None confusion).
  2. Late binding at app-init makes it easier to override in tests or at runtime if needed.
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a9dd3b3 and 03d6a7b.

📒 Files selected for processing (2)
  • api/app_factory.py (0 hunks)
  • api/routes/auth.py (1 hunks)
💤 Files with no reviewable changes (1)
  • api/app_factory.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Adhere to pylint standards across all Python files (repository uses make lint)

Files:

  • api/routes/auth.py
🧠 Learnings (1)
📓 Common learnings
Learnt from: CR
PR: FalkorDB/QueryWeaver#0
File: .github/copilot-instructions.md:0-0
Timestamp: 2025-08-24T17:15:21.337Z
Learning: Applies to api/app_factory.py : Maintain the application factory and OAuth setup in `api/app_factory.py` (core configuration in the top of the file)
🪛 GitHub Actions: Pylint
api/routes/auth.py

[warning] 209-209: Pylint: Trailing whitespace (trailing-whitespace).


[warning] 87-87: Pylint: Missing function or method docstring (missing-function-docstring).


[warning] 114-114: Pylint: Missing function or method docstring (missing-function-docstring).


[warning] 119-119: Pylint: Missing function or method docstring (missing-function-docstring).


[warning] 135-135: Pylint: Missing function or method docstring (missing-function-docstring).


[warning] 173-173: Pylint: Missing function or method docstring (missing-function-docstring).


[warning] 180-180: Pylint: Missing function or method docstring (missing-function-docstring).


[warning] 196-196: Pylint: Missing function or method docstring (missing-function-docstring).


[warning] 247-247: Pylint: Missing function or method docstring (missing-function-docstring).


[warning] 284-284: Pylint: Catching too general exception Exception (broad-exception-caught).

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (18)
api/agents/relevancy_agent.py (2)

61-69: Call BaseAgent.__init__ to fix pylint W0231 and ensure base state is initialized.

Pylint flagged W0231; add super().__init__() at the start of __init__.

Apply this diff:

 class RelevancyAgent(BaseAgent):
     # pylint: disable=too-few-public-methods
     """Agent for determining relevancy of queries to database schema."""

     def __init__(self, queries_history: list, result_history: list):
         """Initialize the relevancy agent with query and result history."""
+        super().__init__()
         if result_history is None:
             self.messages = []
         else:
             self.messages = []
             for query, result in zip(queries_history[:-1], result_history):
                 self.messages.append({"role": "user", "content": query})
                 self.messages.append({"role": "assistant", "content": result})

71-86: Async method calls sync LLM (blocks event loop); switch to litellm.acompletion and request JSON.

get_answer is async but uses the synchronous completion, which will block the loop under load. Use acompletion and await, and enforce JSON to make parse_response robust.

Apply this diff:

-from litellm import completion
+from litellm import acompletion
@@
-        completion_result = completion(
-            model=Config.COMPLETION_MODEL,
-            messages=self.messages,
-            temperature=0,
-        )
+        completion_result = await acompletion(
+            model=Config.COMPLETION_MODEL,
+            messages=self.messages,
+            temperature=0,
+            response_format={"type": "json_object"},
+        )
api/loaders/mysql_loader.py (4)

472-476: Incorrect PyMySQL cursor creation.

pymysql.connect(...).cursor(dictionary=True) is not valid for PyMySQL. Use DictCursor (imported via from pymysql.cursors import DictCursor).

Apply this diff:

-            cursor = conn.cursor(dictionary=True)
+            cursor = conn.cursor(DictCursor)

519-539: Duplicate and partially swallowing MySQL errors; consolidate and re-raise.

There are two except pymysql.MySQLError blocks; the first swallows errors and may return None. Consolidate, rollback, close, and raise a clear exception.

Apply this diff:

-        except pymysql.MySQLError as e:
-            # Rollback in case of error
-            if 'conn' in locals():
-                conn.rollback()
-                cursor.close()
-                conn.close()
-        except pymysql.MySQLError as e:
-            # Rollback in case of error
-            if 'conn' in locals():
-                conn.rollback()
-                cursor.close()
-                conn.close()
-            raise Exception(f"MySQL query execution error: {str(e)}") from e
+        except pymysql.MySQLError as e:
+            # Rollback in case of error
+            if 'conn' in locals():
+                try:
+                    conn.rollback()
+                finally:
+                    cursor.close()
+                    conn.close()
+            raise Exception(f"MySQL query execution error: {str(e)}") from e

406-454: Ensure all async calls to refresh_graph_schema and get_db_description are properly awaited

Several call sites in api/routes/graphs.py (including one in its confirmation generator) invoke async functions without await, which results in coroutine objects being assigned or unpacked and will raise at runtime.

Please update the following locations to use await (inside an async def):

• Streaming-style route around lines 457–463:

-    refresh_result = loader_class.refresh_graph_schema(graph_id, db_url)
-    refresh_success, refresh_message = refresh_result
+    refresh_result = await loader_class.refresh_graph_schema(graph_id, db_url)
+    refresh_success, refresh_message = refresh_result

• Second streaming block around lines 591–597:

-    refresh_success, refresh_message = (
-        loader_class.refresh_graph_schema(graph_id, db_url)
-    )
+    refresh_success, refresh_message = await loader_class.refresh_graph_schema(graph_id, db_url)

• Standard POST handler around lines 684–688:

-    success, message = loader_class.refresh_graph_schema(graph_id, db_url)
+    success, message = await loader_class.refresh_graph_schema(graph_id, db_url)

• Async confirmation generator at line 559:

-    db_description, db_url = get_db_description(graph_id)
+    db_description, db_url = await get_db_description(graph_id)

• Final sync-style handler at line 667 (ensure the enclosing function is async def):

-    _, db_url = get_db_description(graph_id)
+    _, db_url = await get_db_description(graph_id)

With these changes, you’ll avoid unpacking coroutine objects and ensure the async logic executes as intended.


160-166: Await load_to_graph and convert MySQLLoader.load to async

Confirmed that load_to_graph is declared as an async function in api/loaders/graph_loader.py:12, so calling it without await simply returns a coroutine that never runs. To fix:

• In api/loaders/mysql_loader.py, change load to an async function and await the call to load_to_graph:

-    def load(prefix: str, connection_url: str) -> Tuple[bool, str]:
+    async def load(prefix: str, connection_url: str) -> Tuple[bool, str]:
@@
-        # Load data into graph
-        load_to_graph(prefix + "_" + db_name, entities, relationships,
-                     db_name=db_name, db_url=connection_url)
+        # Load data into graph
+        await load_to_graph(
+            prefix + "_" + db_name,
+            entities,
+            relationships,
+            db_name=db_name,
+            db_url=connection_url,
+        )

• Update the sole caller in your orchestration (e.g., refresh_graph_schema) to await the loader:

-    success, message = MySQLLoader.load(prefix, db_url)
+    success, message = await MySQLLoader.load(prefix, db_url)

• Note: the same pattern occurs in other loaders (odata_loader.py, postgres_loader.py, json_loader.py, csv_loader.py). Each one must await load_to_graph, and their parent load methods must be marked async def.

api/auth/user_management.py (1)

49-81: Logic bug: had_other_identities always true after MERGE; compute count excluding the just-created identity.

EXISTS((user)<-[:AUTHENTICATES]-(:Identity)) also matches the identity merged earlier, so it doesn’t tell if the user already had identities. Use an OPTIONAL MATCH + COUNT and mark “other identities” as count > 1.

Apply this diff to the Cypher:

-        // Ensure relationship exists
-        MERGE (identity)-[:AUTHENTICATES]->(user)
-
-        // Return results with flags to determine if this was a new user/identity
-        RETURN
-            identity,
-            user,
-            identity.created_at = identity.last_login AS is_new_identity,
-            EXISTS((user)<-[:AUTHENTICATES]-(:Identity)) AS had_other_identities
+        // Ensure relationship exists
+        MERGE (identity)-[:AUTHENTICATES]->(user)
+
+        // Compute flags; an existing user will have >1 identities after adding a new one
+        WITH identity, user
+        OPTIONAL MATCH (user)<-[:AUTHENTICATES]-(i:Identity)
+        WITH identity, user, COUNT(i) AS identity_count
+        RETURN
+            identity,
+            user,
+            identity.created_at = identity.last_login AS is_new_identity,
+            identity_count > 1 AS had_other_identities

Also applies to: 83-93

api/loaders/postgres_loader.py (2)

98-101: Ensure PostgresLoader.load is declared async and awaits load_to_graph

Verified that load_to_graph is defined as an async def in api/loaders/graph_loader.py and therefore must be awaited when called. The current PostgresLoader.load remains a synchronous method and invokes load_to_graph without awaiting it—this will return a coroutine rather than execute the load.

Please update api/loaders/postgres_loader.py as follows:

  • Change the loader signature from def load to async def load.
  • Prefix the call to load_to_graph with await.

Suggested diff:

--- a/api/loaders/postgres_loader.py
+++ b/api/loaders/postgres_loader.py
@@
-    def load(prefix: str, connection_url: str) -> Tuple[bool, str]:
+    async def load(prefix: str, connection_url: str) -> Tuple[bool, str]:
         entities, relationships = extract_entities_and_rels(connection_url)
         for db_name in entities:
             # Load data into graph
-            load_to_graph(prefix + "_" + db_name, entities, relationships,
-                         db_name=db_name, db_url=connection_url)
+            await load_to_graph(
+                prefix + "_" + db_name,
+                entities,
+                relationships,
+                db_name=db_name,
+                db_url=connection_url
+            )
         return True, "Data loaded successfully"

367-415: Await the asynchronous refresh_graph_schema calls in api/routes/graphs.py

The static method refresh_graph_schema is declared as async def in both MySQL and Postgres loaders, so calling it without await returns a coroutine rather than the expected (bool, str) tuple. In your GraphQL/FastAPI routes, all invocations must be awaited inside an async def context; otherwise you’ll get a runtime “cannot unpack coroutine” or unexecuted coroutine.

Attention is needed at least in these locations:

  • api/routes/graphs.py line 460
  • api/routes/graphs.py line 594
  • api/routes/graphs.py line 685

Proposed fixes:

--- a/api/routes/graphs.py
+++ b/api/routes/graphs.py
@@ -458,7 +458,7 @@ async def some_graph_refresh_endpoint(...):
-        refresh_result = loader_class.refresh_graph_schema(graph_id, db_url)
+        refresh_result = await loader_class.refresh_graph_schema(graph_id, db_url)

@@ -592,7 +592,7 @@ async def another_graph_task(...):
-        loader_class.refresh_graph_schema(graph_id, db_url)
+        await loader_class.refresh_graph_schema(graph_id, db_url)

@@ -683,7 +683,7 @@ async def refresh_graph_schema(request: Request, graph_id: str):
-        success, message = loader_class.refresh_graph_schema(graph_id, db_url)
+        success, message = await loader_class.refresh_graph_schema(graph_id, db_url)

These changes ensure the coroutines returned by refresh_graph_schema are actually run and their results properly unpacked.

docs/postgres_loader.md (1)

99-112: Fix Python snippet: await used outside of an async context

The sample uses await graph.query(...) without being inside an async function. Readers will copy/paste and hit a SyntaxError. Wrap the snippet with asyncio.run(...) (or show an async function).

Apply this diff to the Python snippet:

-from api.loaders.postgres_loader import PostgreSQLLoader
-from api.extensions import db
-
-# Load PostgreSQL schema into graph
-graph_id = "customer_db_schema"
-connection_url = "postgresql://postgres:password@localhost:5432/customers"
-
-success, message = PostgreSQLLoader.load(graph_id, connection_url)
-
-if success:
-    # The schema is now available in the graph database
-    graph = db.select_graph(graph_id)
-
-    # Query for all tables
-    result = await graph.query("MATCH (t:Table) RETURN t.name")
-    print("Tables:", [record[0] for record in result.result_set])
+import asyncio
+from api.loaders.postgres_loader import PostgreSQLLoader
+from api.extensions import db
+
+# Load PostgreSQL schema into graph
+graph_id = "customer_db_schema"
+connection_url = "postgresql://postgres:password@localhost:5432/customers"
+
+success, message = PostgreSQLLoader.load(graph_id, connection_url)
+
+async def main():
+    if success:
+        # The schema is now available in the graph database
+        graph = db.select_graph(graph_id)
+        # Query for all tables
+        result = await graph.query("MATCH (t:Table) RETURN t.name")
+        print("Tables:", [record[0] for record in result.result_set])
+
+asyncio.run(main())
api/loaders/graph_loader.py (3)

90-113: Column embeddings batching: avoid double sources of truth

Relying on external col_descriptions to align with table_info["columns"] can drift. Prefer deriving descriptions from the same columns dict you iterate later to keep order consistent.

Follow-up available: I can provide a batched-by-columns refactor that derives the embed list directly from table_info["columns"] to guarantee index alignment.
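
For reference, a sketch of that refactor, assuming embedding_model.embed accepts a list of strings and returns vectors in the same order (the Cypher here is illustrative, not the loader's exact statement):

from typing import Any, Dict

async def load_column_embeddings(graph: Any, embedding_model: Any,
                                 table_name: str, table_info: Dict[str, Any]) -> None:
    # Derive the batch from the same dict that is iterated afterwards, so
    # the vector order cannot drift from the column order.
    ordered = list(table_info["columns"].items())  # freeze iteration order once
    descriptions = [info.get("description") or "" for _, info in ordered]
    vectors = embedding_model.embed(descriptions)
    for (col_name, _info), vector in zip(ordered, vectors):
        await graph.query(
            "MATCH (t:Table {name: $table}) "
            "MERGE (c:Column {name: $name})-[:BELONGS_TO]->(t) "
            "SET c.embedding = vecf32($embedding)",
            {"table": table_name, "name": col_name, "embedding": vector},
        )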


120-147: Bug: idx = 0 overwrite leads to wrong embedding selection for per-column path

Inside the per-column embedding branch, idx is reset to 0 then used to index embed_columns[idx]. This is brittle and obscures intent; use a dedicated variable for the single-column embedding.

Apply this diff to make per-column path correct and explicit:

-            if not batch_flag:
-                embed_columns = []
-                embedding_result = embedding_model.embed(col_info["description"])
-                embed_columns.extend(embedding_result)
-                idx = 0
+            embedding_value = None
+            if not batch_flag:
+                desc = col_info.get("description") or ""
+                # Single embedding for this column
+                embedding_result = embedding_model.embed(desc) if desc else [[0.0] * vec_len]
+                embedding_value = embedding_result[0]
@@
-                    "embedding": embed_columns[idx],
+                    "embedding": (embedding_value if not batch_flag else embed_columns[idx]),

12-19: Critical: update all call sites to await async load_to_graph

Since load_to_graph is now defined with async def, every invocation must be prefixed with await (or otherwise scheduled) to ensure the coroutine actually executes. Currently all call sites simply call load_to_graph(…) and silently return a coroutine.

Please update the following locations:

  • api/loaders/postgres_loader.py (around line 99):
    change

    load_to_graph(prefix + "_" + db_name, entities, relationships, …

    to

    await load_to_graph(prefix + "_" + db_name, entities, relationships, …
  • api/loaders/json_loader.py (around line 69):
    change

    load_to_graph(graph_id, data["tables"], relationships, …

    to

    await load_to_graph(graph_id, data["tables"], relationships, …
  • api/loaders/odata_loader.py (around line 26):
    change

    load_to_graph(graph_id, entities, relationships, …

    to

    await load_to_graph(graph_id, entities, relationships, …
  • api/loaders/mysql_loader.py (around line 161):
    change

    load_to_graph(prefix + "_" + db_name, entities, relationships, …

    to

    await load_to_graph(prefix + "_" + db_name, entities, relationships, …
  • api/loaders/csv_loader.py (around line 198):
    change

    load_to_graph(graph_id, tables, relationships, …

    to

    await load_to_graph(graph_id, tables, relationships, …

Also remove or update the commented-out calls in csv_loader.py (lines 422–424) to avoid confusion.

All of these callers must be inside an async function or otherwise run on the event loop; ensure their enclosing contexts support await. Failing to do so will prevent any graph loading logic from running.

api/routes/graphs.py (5)

460-466: Await async refresh_graph_schema (now async in loaders) or handle both sync/async

Without awaiting, schema refresh won’t run and will silently fail in async loaders.

Apply this diff:

-                        refresh_result = loader_class.refresh_graph_schema(
-                            graph_id, db_url)
+                        refresh_result = (
+                            await loader_class.refresh_graph_schema(graph_id, db_url)
+                            if asyncio.iscoroutinefunction(loader_class.refresh_graph_schema)
+                            else loader_class.refresh_graph_schema(graph_id, db_url)
+                        )

556-569: Await get_db_description in confirmation path

This function is async; missing await will return a coroutine and break subsequent usage.

Apply this diff:

-        if confirmation == "CONFIRM":
+        if confirmation == "CONFIRM":
             try:
-                db_description, db_url = get_db_description(graph_id)
+                db_description, db_url = await get_db_description(graph_id)

575-596: Await (or handle) async refresh_graph_schema in confirmation path

Same issue as in the main pipeline.

Apply this diff:

-                if is_schema_modifying:
+                if is_schema_modifying:
@@
-                    refresh_success, refresh_message = (
-                        loader_class.refresh_graph_schema(graph_id, db_url)
-                    )
+                    refresh_success, refresh_message = (
+                        await loader_class.refresh_graph_schema(graph_id, db_url)
+                        if asyncio.iscoroutinefunction(loader_class.refresh_graph_schema)
+                        else loader_class.refresh_graph_schema(graph_id, db_url)
+                    )

667-674: Await get_db_description in refresh endpoint

Missing await here will break URL retrieval.

Apply this diff:

-        _, db_url = get_db_description(graph_id)
+        _, db_url = await get_db_description(graph_id)

684-692: Await (or handle) async refresh_graph_schema in refresh endpoint

Ensure compatibility with async loader implementations.

Apply this diff:

-        success, message = loader_class.refresh_graph_schema(graph_id, db_url)
+        success, message = (
+            await loader_class.refresh_graph_schema(graph_id, db_url)
+            if asyncio.iscoroutinefunction(loader_class.refresh_graph_schema)
+            else loader_class.refresh_graph_schema(graph_id, db_url)
+        )
🧹 Nitpick comments (20)
api/extensions.py (1)

5-16: Async FalkorDB import is good; unify error handling and avoid double getenv.

  • Keeping a single try/except around both URL and host/port paths makes failures consistent.
  • Reuse the local url variable instead of calling os.getenv twice.

Apply this diff:

 import os

-from falkordb.asyncio import FalkorDB
+from falkordb.asyncio import FalkorDB

 # Connect to FalkorDB
-url = os.getenv("FALKORDB_URL", None)
-if url is None:
-    try:
-        db = FalkorDB(host="localhost", port=6379)
-    except Exception as e:
-        raise ConnectionError(f"Failed to connect to FalkorDB: {e}") from e
-else:
-    db = FalkorDB.from_url(os.getenv("FALKORDB_URL"))
+url = os.getenv("FALKORDB_URL")
+try:
+    db = FalkorDB.from_url(url) if url else FalkorDB(host="localhost", port=6379)
+except Exception as e:
+    raise ConnectionError(f"Failed to connect to FalkorDB: {e}") from e
api/agents/relevancy_agent.py (1)

16-23: Fix prompt typos/clarity (user-facing).

Minor spelling/grammar issues: “explict” → “explicit”, and clarify “yourself” as “the assistant”. Improves model guidance and user trust.

Apply this diff:

-• Common tables that can be found in most of the systems considered "On-topic" even if it not explict in the database description.
-• Don't answer questions that related to yourself.
-• Don't answer questions that related to personal information unless it related to data in the schemas.
-• Questions about the user's (first person) defined as "personal" and is Off-topic.
-• Questions about yourself defined as "personal" and is Off-topic.
+• Common tables that can be found in most systems are considered "On-topic" even if not explicit in the database description.
+• Don't answer questions related to the assistant itself.
+• Don't answer questions related to personal information unless it is tied to data in the provided schemas.
+• First‑person questions about the user are considered "personal" and are Off-topic.
+• Questions about the assistant are considered "personal" and are Off-topic.

Also applies to: 33-54

api/auth/user_management.py (2)

100-114: Simplify control flow (fix pylint R1705 “no-else-return”).

The first branch returns; the elif can be if.

Apply this diff:

-            elif is_new_identity and had_other_identities:
+            if is_new_identity and had_other_identities:
                 # New identity for existing user (cross-provider linking)
                 logging.info("NEW IDENTITY LINKED TO EXISTING USER: provider=%s, "
                              "provider_user_id=%s, email=%s, name=%s",
                              provider, provider_user_id, email, name)
                 return True, {"identity": identity, "user": user}

16-41: Consider reducing locals/returns to appease pylint without hurting readability.

Group related values into dicts and funnel returns through one exit to address R0914/R0911 warnings, if you want a clean lint.

If helpful, I can follow up with a PR-ready diff refactor that reduces locals and unifies returns.

Also applies to: 95-125

api/loaders/graph_loader.py (4)

36-51: Index creation: add IF NOT EXISTS and log errors instead of printing

Creating indices repeatedly can error on reruns. Prefer guarding with "IF NOT EXISTS" (if your FalkorDB version supports it) and replace prints with structured logging. Also consider centralizing the similarity function in Config to keep it consistent with query-time settings.

Apply this diff to improve logging (safe regardless of server support), and optionally add IF NOT EXISTS if supported:

-    try:
-        # Create vector indices
-        await graph.query(
+    try:
+        # Create vector indices
+        await graph.query(
             """
-            CREATE VECTOR INDEX FOR (t:Table) ON (t.embedding)
+            CREATE VECTOR INDEX FOR (t:Table) ON (t.embedding)
             OPTIONS {dimension:$size, similarityFunction:'euclidean'}
         """,
             {"size": vec_len},
         )
 
-        await graph.query(
+        await graph.query(
             """
-            CREATE VECTOR INDEX FOR (c:Column) ON (c.embedding)
+            CREATE VECTOR INDEX FOR (c:Column) ON (c.embedding)
             OPTIONS {dimension:$size, similarityFunction:'euclidean'}
         """,
             {"size": vec_len},
         )
-        await graph.query("CREATE INDEX FOR (p:Table) ON (p.name)")
+        await graph.query("CREATE INDEX FOR (p:Table) ON (p.name)")
     except Exception as e:
-        print(f"Error creating vector indices: {str(e)}")
+        import logging
+        logging.exception("Error creating indices: %s", e)

Optional (only if supported by your server):

  • Append IF NOT EXISTS to the CREATE statements to make reruns idempotent.

51-51: Nit: alias p is arbitrary in CREATE INDEX

This is fine as-is, but consider using t for consistency with other Table aliases in this file.


73-88: Harden embedding assumptions and missing descriptions

If table_info["description"] is missing/empty, embedding_result[0] will raise. Guard and default to an empty string (or skip embedding) to avoid loader failures.

Apply this diff to guard:

-        table_desc = table_info["description"]
-        embedding_result = embedding_model.embed(table_desc)
+        table_desc = table_info.get("description") or ""
+        embedding_result = embedding_model.embed(table_desc) if table_desc else [[0.0] * vec_len]

162-181: Batch relationship creation with UNWIND for performance

Creating relationships one by one incurs many round trips. You can accumulate a list and UNWIND it to create multiple relationships in a single query.

I can draft an UNWIND-based version if you want this optimized.
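
For a sense of the shape, a sketch of the UNWIND batching, assuming a fixed relationship type (Cypher cannot parameterize types; :REFERENCES is a placeholder) and (source, target) table-name pairs already present in the graph:

from typing import Any, List, Tuple

async def create_relationships(graph: Any, pairs: List[Tuple[str, str]]) -> None:
    # One round trip for all relationships instead of one query per edge.
    await graph.query(
        """
        UNWIND $pairs AS pair
        MATCH (s:Table {name: pair[0]}), (t:Table {name: pair[1]})
        MERGE (s)-[:REFERENCES]->(t)
        """,
        {"pairs": [list(p) for p in pairs]},
    )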

api/routes/graphs.py (4)

7-9: Remove unused concurrent.futures imports

These are no longer used in the async flow and will trigger pylint warnings.

Apply this diff:

-from concurrent.futures import ThreadPoolExecutor
-from concurrent.futures import TimeoutError as FuturesTimeoutError

360-376: Analysis step synchronous call is fine, but consider wrapping for cancellation

get_analysis is sync; during client disconnects the generator may be cancelled. Optional: run it in a thread executor tied to the request to make it cancellable.
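
For example, a sketch using asyncio.to_thread (note that cancelling the task abandons the await, though the worker thread itself still runs to completion):

import asyncio

def get_analysis(question: str) -> str:
    # Stand-in for the synchronous AnalysisAgent.get_analysis call.
    return f"SELECT 1 -- {question}"

async def analysis_step(question: str) -> str:
    # Offloading keeps the event loop responsive during the blocking call.
    return await asyncio.to_thread(get_analysis, question)

print(asyncio.run(analysis_step("top customers")))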


24-26: Content streaming format

You’re sending chunked JSON parts with a custom delimiter using application/json. Consider text/plain or a custom media type if clients misinterpret the stream as a single JSON document. Optional only.


61-83: Loader selection defaults

Defaulting unknown schemes to Postgres keeps backward compatibility. Optionally log a warning when defaulting to help diagnose mismatched URLs.
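
A sketch of what a logged fallback could look like (the scheme map and names are illustrative):

import logging
from urllib.parse import urlparse

def loader_name_for(db_url: str) -> str:
    loaders = {"mysql": "MySQLLoader", "postgresql": "PostgresLoader"}
    scheme = urlparse(db_url).scheme
    if scheme not in loaders:
        # Log only the scheme; the URL may embed credentials.
        logging.warning("Unknown URL scheme %r; defaulting to the Postgres loader", scheme)
    return loaders.get(scheme, "PostgresLoader")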

api/graph.py (8)

15-15: Avoid configuring logging in a library module

logging.basicConfig in a library can interfere with application logging configuration. Move this to your app entrypoint or guard it.

Apply this diff:

-logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
+# Configure logging in the application entrypoint, not in the library.

39-55: Add LIMIT and handle multiple Database nodes deterministically

If multiple :Database nodes exist, returning the first row is ambiguous. Limit to 1 explicitly.

Apply this diff:

-    query_result = await graph.query(
+    query_result = await graph.query(
         """
         MATCH (d:Database)
-        RETURN d.description, d.url
+        RETURN d.description, d.url
+        LIMIT 1
         """
     )

91-111: Add labels to speed up matches and avoid accidental pattern captures

Explicitly match columns:Column to reduce accidental matches.

Apply this diff:

-        MATCH (node)-[:BELONGS_TO]-(columns)
+        MATCH (node)-[:BELONGS_TO]-(columns:Column)

127-142: Clarify relationship directions and labels for column-based lookup

Be explicit on labels and directions to avoid cross-label traversals.

Apply this diff:

-        CALL db.idx.vector.queryNodes('Column','embedding',3,vecf32($embedding))
+        CALL db.idx.vector.queryNodes('Column','embedding',3,vecf32($embedding))
         YIELD node, score
-        MATCH (node)-[:BELONGS_TO]-(table)-[:BELONGS_TO]-(columns)
+        MATCH (node:Column)-[:BELONGS_TO]->(table:Table)<-[:BELONGS_TO]-(columns:Column)

186-239: allShortestPaths can be expensive; keep a cap and consider early LIMIT

Path length up to 9 can explode on dense graphs. If possible, reduce the upper bound or post-filter with a LIMIT to prevent heavy scans on large schemas.
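
One possible shape, with a lower hop cap and an explicit LIMIT (parameter values are illustrative):

from typing import Any

async def table_paths(graph: Any, src: str, dst: str, max_hops: int = 4) -> list:
    # Cap hop count and row count up front; tune max_hops to the deepest
    # join chain you expect. The hop bound is inlined because Cypher does
    # not allow parameters inside variable-length patterns.
    result = await graph.query(
        f"""
        MATCH (a:Table {{name: $src}}), (b:Table {{name: $dst}})
        MATCH p = allShortestPaths((a)-[*..{max_hops}]-(b))
        RETURN p LIMIT 25
        """,
        {"src": src, "dst": dst},
    )
    return result.result_set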


287-295: Fix line-too-long and readability when building descriptions_text

Break into multiple lines to satisfy pylint and improve clarity.

Apply this diff:

-    descriptions_text = [desc.description for desc in descriptions.tables_descriptions] + [desc.description for desc in descriptions.columns_descriptions]
+    descriptions_text = (
+        [desc.description for desc in descriptions.tables_descriptions]
+        + [desc.description for desc in descriptions.columns_descriptions]
+    )

103-103: Trim trailing whitespace to satisfy pylint C0303

Pylint flagged trailing whitespace. Remove extra spaces or stray blanks on this line.


346-346: Add final newline to satisfy pylint C0304

Ensure there is a newline at end-of-file.

Apply this diff:

-    return list(unique_tables.values())
\ No newline at end of file
+    return list(unique_tables.values())
+
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 03d6a7b and ebd1006.

📒 Files selected for processing (10)
  • api/agents/relevancy_agent.py (1 hunks)
  • api/auth/user_management.py (4 hunks)
  • api/extensions.py (1 hunks)
  • api/graph.py (4 hunks)
  • api/loaders/graph_loader.py (5 hunks)
  • api/loaders/mysql_loader.py (2 hunks)
  • api/loaders/postgres_loader.py (2 hunks)
  • api/routes/graphs.py (8 hunks)
  • docs/postgres_loader.md (1 hunks)
  • onthology.py (0 hunks)
💤 Files with no reviewable changes (1)
  • onthology.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Adhere to pylint standards across all Python files (repository uses make lint)

Files:

  • api/agents/relevancy_agent.py
  • api/auth/user_management.py
  • api/loaders/mysql_loader.py
  • api/extensions.py
  • api/loaders/graph_loader.py
  • api/loaders/postgres_loader.py
  • api/graph.py
  • api/routes/graphs.py
🧬 Code graph analysis (6)
api/agents/relevancy_agent.py (2)
api/agents/follow_up_agent.py (1)
  • get_answer (52-73)
api/agents/taxonomy_agent.py (1)
  • get_answer (45-60)
api/loaders/mysql_loader.py (2)
api/routes/graphs.py (1)
  • refresh_graph_schema (657-704)
api/loaders/postgres_loader.py (1)
  • refresh_graph_schema (368-414)
api/loaders/graph_loader.py (1)
api/utils.py (1)
  • generate_db_description (12-69)
api/loaders/postgres_loader.py (2)
api/routes/graphs.py (1)
  • refresh_graph_schema (657-704)
api/loaders/mysql_loader.py (1)
  • refresh_graph_schema (407-453)
api/graph.py (1)
api/config.py (2)
  • Config (47-137)
  • embed (18-31)
api/routes/graphs.py (4)
api/agents/relevancy_agent.py (2)
  • RelevancyAgent (57-90)
  • get_answer (71-90)
api/agents/analysis_agent.py (2)
  • AnalysisAgent (9-201)
  • get_analysis (14-49)
app/ts/modules/config.ts (1)
  • MESSAGE_DELIMITER (5-5)
api/graph.py (2)
  • get_db_description (39-54)
  • find (241-327)
🪛 GitHub Actions: Pylint
api/agents/relevancy_agent.py

[warning] 61-61: W0231: init method from base class 'BaseAgent' is not called (super-init-not-called)

api/auth/user_management.py

[warning] 16-16: R0914: Too many local variables (17/15) (too-many-locals)


[warning] 100-100: R1705: Unnecessary "elif" after "return", remove the leading "el" from "elif" (no-else-return)


[warning] 16-16: R0911: Too many return statements (9/6) (too-many-return-statements)

api/graph.py

[warning] 103-103: C0303: Trailing whitespace (trailing-whitespace)


[warning] 289-289: C0301: Line too long (154/100) (line-too-long)


[warning] 346-346: C0304: Final newline missing (missing-final-newline)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: e2e-tests
  • GitHub Check: e2e-tests
🔇 Additional comments (9)
api/auth/user_management.py (1)

16-16: Ensure all async calls are properly awaited

It looks like ensure_user_in_organizations was converted to an async function but its call sites in api/auth/oauth_handlers.py still invoke it synchronously. Specifically:

  • api/auth/oauth_handlers.py:37
  • api/auth/oauth_handlers.py:65

Both currently read:

_, _ = ensure_user_in_organizations(
    user_id,
    email,
    …
)

They should be updated to:

- _, _ = ensure_user_in_organizations(
+ _, _ = await ensure_user_in_organizations(
      user_id,
      email,
      …
  )

I didn’t find any usages of update_identity_last_login outside its definition (only the async signature in user_management.py) — please double-check if it’s invoked elsewhere and, if so, prefix those calls with await as well.

api/loaders/graph_loader.py (1)

55-66: LGTM: database node creation aligns with get_db_description expectations

The properties description and url match what get_db_description reads later.

api/routes/graphs.py (6)

299-303: Nice: timing instrumentation at pipeline start

End-to-end timing is useful for latency tracking.


324-335: Good: concurrent relevancy + graph find orchestration

Starting both tasks and awaiting relevancy first is a clean optimization.


94-106: LGTM: namespacing and filtering user graphs

Async db.list_graphs() is used correctly, and IDs are properly scoped.


233-241: Robust JSON file handling

Good error handling on json.JSONDecodeError.


378-396: Destructive operation messaging is clear

The operation-specific messaging is explicit and user-friendly.


295-353: Cancellation of find_task on off-topic is well-handled

Catching CancelledError prevents log noise.

api/graph.py (1)

323-327: Return shape consistency

find now returns List[List[Any]] consistent with the downstream AnalysisAgent._format_schema expectations. Looks good.

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (10)
api/loaders/graph_loader.py (2)

120-147: Guard against embed/column index mismatch and replace print with logging

If embed_columns length drifts from the enumerated columns (data source mismatch), embed_columns[idx] will IndexError. Add a safe fallback and use logging.

-            await graph.query(
+            # Safe select the embedding vector
+            if idx < len(embed_columns):
+                embedding_vector = embed_columns[idx]
+            else:
+                # Fallback: compute per-column embedding
+                single = await asyncio.to_thread(embedding_model.embed, col_info["description"])
+                embedding_vector = single[0]
+            await graph.query(
@@
-                    "embedding": embed_columns[idx],
+                    "embedding": embedding_vector,
                 },
             )

Also change the earlier exception print (Line 111) to logging:

-            except Exception as e:
-                print(f"Error creating embeddings: {str(e)}")
+            except Exception:
+                logging.exception("Error creating embeddings for table=%s", table_name)

12-19: Ensure async load_to_graph is awaited everywhere and suppress the lint warning

The newly async load_to_graph must be awaited at every call site to avoid silently dropped coroutines, and Pylint’s R0913 (“too many arguments”) should be disabled on the definition.

• In api/loaders/graph_loader.py (line 12):

-async def load_to_graph(
+async def load_to_graph(  # pylint: disable=too-many-arguments

• In api/loaders/csv_loader.py (line 198):

-    load_to_graph(graph_id, tables, relationships, db_name=db_name)
+    await load_to_graph(graph_id, tables, relationships, db_name=db_name)

• In api/loaders/postgres_loader.py (line 99):

-            load_to_graph(prefix + "_" + db_name, entities, relationships,
+            await load_to_graph(prefix + "_" + db_name, entities, relationships,

• In api/loaders/json_loader.py (line 69):

-        load_to_graph(graph_id, data["tables"], relationships, db_name=data["database"])
+        await load_to_graph(graph_id, data["tables"], relationships, db_name=data["database"])

• In api/loaders/odata_loader.py (line 26):

-        load_to_graph(graph_id, entities, relationships, db_name="ERP system")
+        await load_to_graph(graph_id, entities, relationships, db_name="ERP system")

• In api/loaders/mysql_loader.py (line 161):

-            load_to_graph(prefix + "_" + db_name, entities, relationships,
+            await load_to_graph(prefix + "_" + db_name, entities, relationships,

These changes are critical to ensure proper async execution and to silence the argument‐count lint error.

api/loaders/mysql_loader.py (3)

161-163: Bug: async graph load is never awaited; schema is not loaded

load_to_graph is async, but MySQLLoader.load is synchronous and doesn’t await it. This will silently skip the actual load.

Apply the following changes:

  • Make MySQLLoader.load async and await load_to_graph.
  • Update callers accordingly (see additional note on routes below).
-    def load(prefix: str, connection_url: str) -> Tuple[bool, str]:
+    async def load(prefix: str, connection_url: str) -> Tuple[bool, str]:
@@
-            load_to_graph(prefix + "_" + db_name, entities, relationships,
-                         db_name=db_name, db_url=connection_url)
+            await load_to_graph(
+                prefix + "_" + db_name,
+                entities,
+                relationships,
+                db_name=db_name,
+                db_url=connection_url,
+            )

Follow-up in this file at refresh (Line 439):

-            success, message = MySQLLoader.load(prefix, db_url)
+            success, message = await MySQLLoader.load(prefix, db_url)

I can propagate the signature change across the repo on your signal.


519-531: Duplicate exception handler; first swallows errors and makes second unreachable

Two consecutive except pymysql.MySQLError as e blocks exist. The first one rolls back and closes but does not raise, masking errors; the second is unreachable. Collapse into a single handler that rolls back, cleans up, and raises.

-        except pymysql.MySQLError as e:
-            # Rollback in case of error
-            if 'conn' in locals():
-                conn.rollback()
-                cursor.close()
-                conn.close()
-        except pymysql.MySQLError as e:
+        except pymysql.MySQLError as e:
             # Rollback in case of error
             if 'conn' in locals():
                 conn.rollback()
                 cursor.close()
                 conn.close()
             raise Exception(f"MySQL query execution error: {str(e)}") from e

438-446: Ensure all calls to refresh_graph_schema are awaited

The refresh_graph_schema method on both MySQLLoader and PostgresLoader has been converted to an async static method. Any invocation that omits await will return a coroutine rather than executing the schema refresh, leading to silent failures or runtime errors.

Please update the following call sites in api/routes/graphs.py to await the async method:

  • Lines 458–462

-        refresh_result = loader_class.refresh_graph_schema(
-            graph_id, db_url)
+        refresh_result = await loader_class.refresh_graph_schema(
+            graph_id, db_url)
         refresh_success, refresh_message = refresh_result

  • Lines 592–595

-        refresh_success, refresh_message = (
-            loader_class.refresh_graph_schema(graph_id, db_url)
-        )
+        refresh_success, refresh_message = await (
+            loader_class.refresh_graph_schema(graph_id, db_url)
+        )

  • Lines 683–687

-        success, message = loader_class.refresh_graph_schema(graph_id, db_url)
+        success, message = await loader_class.refresh_graph_schema(graph_id, db_url)

After these changes, all schema-refresh routes and streaming handlers will properly execute the async loader logic.

api/agents/relevancy_agent.py (2)

61-70: Initialize BaseAgent; fix pylint W0231

Pylint warns that the base class __init__ isn’t called. Either call super().__init__() or disable W0231 if BaseAgent intentionally doesn't require init. Prefer calling super.

     def __init__(self, queries_history: list, result_history: list):
         """Initialize the relevancy agent with query and result history."""
+        super().__init__()
         if result_history is None:
             self.messages = []
         else:
             self.messages = []
             for query, result in zip(queries_history[:-1], result_history):
                 self.messages.append({"role": "user", "content": query})
                 self.messages.append({"role": "assistant", "content": result})

71-90: Async method but blocking completion; also prefer JSON response_format

  • Method is async but performs blocking I/O via completion(...), which can stall the event loop. Either switch to an async client (e.g., acompletion) or offload with asyncio.to_thread.
  • Add response_format={"type": "json_object"} for more robust parsing.
+import asyncio
@@
-        completion_result = completion(
+        completion_result = await asyncio.to_thread(
+            completion,
             model=Config.COMPLETION_MODEL,
             messages=self.messages,
+            response_format={"type": "json_object"},
             temperature=0,
-        )
+        )

If litellm exposes an async acompletion, prefer that directly.
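For reference, a minimal sketch of the fully async path, assuming the pinned litellm version ships acompletion with the same keyword arguments as completion (recent releases do, but verify against the lockfile):

from litellm import acompletion

# Async variant; assumes acompletion mirrors completion's signature.
completion_result = await acompletion(
    model=Config.COMPLETION_MODEL,
    messages=self.messages,
    response_format={"type": "json_object"},
    temperature=0,
)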

api/loaders/postgres_loader.py (2)

368-389: Async conversion is correct, but you’re still doing blocking I/O inside the event loop

  • refresh_graph_schema is now async and correctly awaits graph.delete(). However, PostgresLoader.load uses psycopg2 (blocking) and is invoked directly here, which will block the event loop during schema reloads triggered by routes.
  • Also, callers must now await this method; several call sites currently don’t (see api/routes/graphs.py comments).

Apply this diff to offload the blocking load() to a background thread (better yet, hoist import asyncio to the module top instead of importing inside the function):

@@
     @staticmethod
     async def refresh_graph_schema(graph_id: str, db_url: str) -> Tuple[bool, str]:
@@
-            success, message = PostgresLoader.load(prefix, db_url)
+            import asyncio
+            success, message = await asyncio.to_thread(PostgresLoader.load, prefix, db_url)

And ensure all call sites await this async method (see route fixes in api/routes/graphs.py).


98-104: Await and propagate the async load_to_graph call in all loader classes

load_to_graph was changed to an async def (api/loaders/graph_loader.py:12) and therefore returns a coroutine. All callers in synchronous load methods currently drop the coroutine, preventing any data from actually loading. You must:

  • Update each loader’s load method signature to be async def load(...) instead of def load(...).
  • Prepend await to every load_to_graph(...) invocation.

Impacted files and locations:

  • api/loaders/postgres_loader.py (line 67): change def load → async def load
  • api/loaders/postgres_loader.py (lines 98–100): add await before load_to_graph
  • api/loaders/mysql_loader.py (line 159): change def load → async def load and await on line 161
  • api/loaders/odata_loader.py (line 24): change def load → async def load and await on line 26
  • api/loaders/csv_loader.py (line 196): change def load → async def load and await on line 198
  • api/loaders/json_loader.py (line 67): change def load → async def load and await on line 69

Example diff for PostgresLoader:

@@ api/loaders/postgres_loader.py
     @staticmethod
-    def load(prefix: str, connection_url: str) -> Tuple[bool, str]:
+    async def load(prefix: str, connection_url: str) -> Tuple[bool, str]:
@@
-            load_to_graph(prefix + "_" + db_name, entities, relationships,
-                         db_name=db_name, db_url=connection_url)
+            await load_to_graph(prefix + "_" + db_name, entities, relationships,
+                                db_name=db_name, db_url=connection_url)

Apply the same signature and call-site changes in MySQLLoader, ODataLoader, CSVLoader, and JSONLoader so that each loader properly awaits the async graph load.

api/routes/graphs.py (1)

460-463: Async API misuse: missing awaits for async functions (and a sync DB call blocking the event loop)

  • get_db_description is async — missing await in confirm flow.
  • Postgres/MySQL refresh_graph_schema is async — missing await in both streaming path and refresh endpoint.
  • loader_class.execute_sql_query uses psycopg2 (blocking). Calling it directly inside async code blocks the event loop.

Apply these diffs:

@@ streaming path after query_results
-                    query_results = loader_class.execute_sql_query(answer_an["sql_query"], db_url)
+                    query_results = await asyncio.to_thread(
+                        loader_class.execute_sql_query, answer_an["sql_query"], db_url
+                    )
@@ schema refresh inside streaming path
-                        refresh_result = loader_class.refresh_graph_schema(
-                            graph_id, db_url)
-                        refresh_success, refresh_message = refresh_result
+                        refresh_success, refresh_message = await loader_class.refresh_graph_schema(
+                            graph_id, db_url
+                        )
@@ confirm_destructive_operation()
-                db_description, db_url = get_db_description(graph_id)
+                db_description, db_url = await get_db_description(graph_id)
@@ confirmed execution path
-                query_results = loader_class.execute_sql_query(sql_query, db_url)
+                query_results = await asyncio.to_thread(
+                    loader_class.execute_sql_query, sql_query, db_url
+                )
@@ confirmed refresh after destructive op
-                    refresh_success, refresh_message = (
-                        loader_class.refresh_graph_schema(graph_id, db_url)
-                    )
+                    refresh_success, refresh_message = await loader_class.refresh_graph_schema(
+                        graph_id, db_url
+                    )
@@ manual refresh endpoint
-        _, db_url = get_db_description(graph_id)
+        _, db_url = await get_db_description(graph_id)
@@
-        success, message = loader_class.refresh_graph_schema(graph_id, db_url)
+        success, message = await loader_class.refresh_graph_schema(graph_id, db_url)

This fixes runtime errors and prevents event loop stalls.

Also applies to: 559-560, 579-586, 594-596, 667-667, 685-686

♻️ Duplicate comments (1)
api/routes/graphs.py (1)

667-703: This route must await async helpers

Same awaits issue in the manual refresh endpoint (also noted above). See combined diff in the earlier comment.

🧹 Nitpick comments (20)
api/extensions.py (2)

5-15: Async client import: confirm sync-safe construction and reuse env var

  • Importing from falkordb.asyncio implies async methods (e.g., Graph.query). Verify that FalkorDB(...) and FalkorDB.from_url(...) are synchronous factories; otherwise, construction should be moved into an async initializer to avoid creating a client at import time that requires awaiting.
  • Minor: reuse the already-read url instead of calling os.getenv again.

Apply this small nit and consider lazy/async initialization if needed:

-    db = FalkorDB.from_url(os.getenv("FALKORDB_URL"))
+    db = FalkorDB.from_url(url)

Optionally refactor to lazy init:

+from typing import Optional
+_db: Optional[FalkorDB] = None
+
+def get_db() -> FalkorDB:
+    global _db
+    if _db is not None:
+        return _db
+    url = os.getenv("FALKORDB_URL")
+    if url:
+        _db = FalkorDB.from_url(url)
+        return _db
+    try:
+        _db = FalkorDB(host="localhost", port=6379)
+        return _db
+    except Exception as e:
+        raise ConnectionError(f"Failed to connect to FalkorDB: {e}") from e

If FalkorDB.from_url is an async coroutine in this driver, we should instead expose async def get_db_async() -> FalkorDB and await construction. Do you want me to push that change?


11-13: Broader exception bucket at import-time connection

Catching a broad Exception at import time can mask configuration issues and make debugging harder. Prefer narrowing to the client’s specific exception type (if exposed), or move the connection into a function so errors surface at call time rather than at module import.

api/loaders/graph_loader.py (4)

36-52: Use logging and avoid swallowing index-creation failures

Replace print with logging and include exception context. Current catch hides which statement failed if multiple run back-to-back.

+import logging
@@
-    except Exception as e:
-        print(f"Error creating vector indices: {str(e)}")
+    except Exception:
+        logging.exception("Error creating vector indices for graph_id=%s", graph_id)

55-65: generate_db_description is synchronous and network-bound; don’t block the event loop

If generate_db_description performs a blocking HTTP call, wrap it in a thread off the event loop or migrate to an async completion. This path will be hot for large loads.

+import asyncio
@@
-    db_des = generate_db_description(db_name=db_name, table_names=list(entities.keys()))
+    db_des = await asyncio.to_thread(
+        generate_db_description, db_name=db_name, table_names=list(entities.keys())
+    )

67-89: Embedding calls inside async function should be offloaded to threads

Both table and column embeddings are CPU/IO-bound. Use asyncio.to_thread to avoid blocking the loop during long runs.

-        embedding_result = embedding_model.embed(table_desc)
+        embedding_result = await asyncio.to_thread(embedding_model.embed, table_desc)

And in the batch path below (Line 108):

-                    embedding_result = embedding_model.embed(batch)
+                    embedding_result = await asyncio.to_thread(embedding_model.embed, batch)

160-184: Use logging for relationship creation warnings

Swap print for logging to preserve stack and context.

-            except Exception as e:
-                print(f"Warning: Could not create relationship: {str(e)}")
+            except Exception:
+                logging.exception("Could not create relationship %s between %s.%s -> %s.%s",
+                                  rel_name, source_table, source_field, target_table, target_field)
api/loaders/mysql_loader.py (2)

17-18: Library-level logging.basicConfig

Calling logging.basicConfig in a library module affects global logging for consumers. Prefer configuring logging in the application entrypoint.

-logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
+# Configure logging in the app entrypoint; keep module-level loggers lightweight.
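A minimal sketch of the split, assuming the entrypoint is the app factory (adapt to wherever the app is actually bootstrapped):

# api/loaders/mysql_loader.py — module-level logger only, no global config
import logging
logger = logging.getLogger(__name__)

# application entrypoint — configure handlers/format exactly once
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)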

472-475: Cursor creation style inconsistency

Elsewhere you use DictCursor; here you pass dictionary=True. PyMySQL’s cursor() takes a cursor class (dictionary=True is mysql-connector’s API), so this call is likely to raise TypeError at runtime. Use DictCursor here as well.

-            cursor = conn.cursor(dictionary=True)
+            cursor = conn.cursor(DictCursor)
api/agents/relevancy_agent.py (1)

9-54: Prompt typos and clarity nits

Small edits improve clarity and reduce ambiguity for the model (“explicit” spelling, phrasing around personal info).

-• Common tables that can be found in most of the systems considered "On-topic" even if it not explict in the database description.
-• Don't answer questions that related to yourself.
-• Don't answer questions that related to personal information unless it related to data in the schemas.
-• Questions about the user's (first person) defined as "personal" and is Off-topic.
-• Questions about yourself defined as "personal" and is Off-topic.
+• Common tables that are found in most systems are considered "On-topic" even if not explicitly mentioned in the database description.
+• Don't answer questions related to yourself.
+• Don't answer questions related to personal information unless it is part of the database schema.
+• Questions about the user's (first person) life are "personal" and are Off-topic.
+• Questions about yourself are "personal" and are Off-topic.
app/templates/components/left_toolbar.j2 (1)

41-53: Decorative SVGs should be hidden from screen readers

Add aria-hidden and focusable to reduce verbosity for assistive tech. Keeps the button’s label/aria-label as the accessible name.

-                <svg xmlns="http://www.w3.org/2000/svg" width="24" height="24" viewBox="0 0 24 24" fill="none" stroke="currentColor"
+                <svg xmlns="http://www.w3.org/2000/svg" width="24" height="24" viewBox="0 0 24 24" fill="none" stroke="currentColor"
+                     aria-hidden="true" focusable="false"
                      stroke-width="2" stroke-linecap="round" stroke-linejoin="round"
                      class="lucide lucide-sliders-horizontal-icon lucide-sliders-horizontal">
@@
-                <svg xmlns="http://www.w3.org/2000/svg" width="24" height="24" viewBox="0 0 24 24" fill="none" stroke="currentColor"
+                <svg xmlns="http://www.w3.org/2000/svg" width="24" height="24" viewBox="0 0 24 24" fill="none" stroke="currentColor"
+                     aria-hidden="true" focusable="false"
                      stroke-width="2" stroke-linecap="round" stroke-linejoin="round"
                      class="lucide lucide-waypoints-icon lucide-waypoints">

Also applies to: 59-69

api/graph.py (6)

287-295: Long line violates pylint C0301; split for readability

-    descriptions_text = [desc.description for desc in descriptions.tables_descriptions] + [desc.description for desc in descriptions.columns_descriptions]
+    descriptions_text = (
+        [desc.description for desc in descriptions.tables_descriptions]
+        + [desc.description for desc in descriptions.columns_descriptions]
+    )

342-344: Replace print with logging and avoid mutating possibly immutable rows

  • Use logging instead of print.
  • Some drivers return tuples; assigning table_info[2]/[3] could raise. You already catch exceptions; logging will help debug.
-        except Exception as e:
-            print(f"Error: {table_info}, Exception: {e}")
+        except Exception as e:
+            logging.warning("Error normalizing table info %r: %s", table_info, e)

39-55: Blocking SDK calls inside async function (LLM completion and embeddings)

completion(...) and Config.EMBEDDING_MODEL.embed(...) are synchronous. They will block the event loop and hurt concurrency, especially with high-latency models.

Two lightweight options:

  • Switch to async SDK if available (e.g., litellm.acompletion / async embeddings).
  • Or offload to a thread:
-    completion_result = completion(
+    completion_result = await asyncio.to_thread(
+        completion,
         model=Config.COMPLETION_MODEL,
         response_format=Descriptions,
         messages=[...],
         temperature=0,
-    )
+    )
@@
-    embedding_results = Config.EMBEDDING_MODEL.embed(descriptions_text)
+    embedding_results = await asyncio.to_thread(
+        Config.EMBEDDING_MODEL.embed, descriptions_text
+    )

I can push a follow-up patch wiring litellm’s async APIs if desired.

Also applies to: 266-286, 290-295


103-103: Lint nits: trailing whitespace and missing final newline

Pylint warnings:

  • Line 103: trailing whitespace (C0303)
  • End of file: missing final newline (C0304)

Trim the trailing spaces and add a newline at EOF.

Also applies to: 346-346


127-142: Vector index query limits (top-k=3) are quite tight

Returning only 3 matches per embedding may be too restrictive. If recall is a concern, consider making k configurable (e.g., via Config) and testing k=5–10.


56-76: _query_graph: consider passing timeout through callers and adding error handling

Optional: surface timeouts per query type and wrap with try/except to attach the offending query to logs.
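For example, a minimal sketch (the helper name matches the one above, but the exact signature here is an assumption):

import logging

async def _query_graph(graph, query, params=None, timeout=None):
    """Run a graph query, logging the offending query text on failure."""
    try:
        return await graph.query(query, params=params, timeout=timeout)
    except Exception:
        logging.exception("Graph query failed: %s", query)
        raise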

api/auth/user_management.py (1)

16-39: High complexity and broad exception catches — tolerable for now, but worth a cleanup pass

  • Pylint flags too-many-locals/branches/statements and broad-exception-caught. Code is readable but could be split into helpers (validate inputs, build params, execute merge, interpret result).
  • Replace broad Exception catches with narrower exceptions where possible.

Happy to draft a refactor that:

  • Extracts validation into small functions.
  • Narrows exceptions (e.g., ValueError, KeyError, AttributeError) and a final Exception block for safety.
  • Adds unit tests for new user, new identity, existing identity paths.

Also applies to: 119-125, 141-160
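To make the shape concrete, a sketch with hypothetical helper names (none of these exist in the file yet):

def _validate_identity_fields(user_id, email):
    """Fail fast so the main flow needs fewer branches and returns."""
    if not user_id or not email:
        raise ValueError("user_id and email are required")

# ensure_user_in_organizations then reduces to roughly:
#     _validate_identity_fields(user_id, email)
#     params = _build_merge_params(user_id, email, name, provider, picture)
#     result = await _execute_identity_merge(organizations_graph, params)
#     return _interpret_merge_result(result)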

api/routes/graphs.py (2)

7-9: Remove unused ThreadPoolExecutor imports

You no longer use ThreadPoolExecutor/TimeoutError. Dropping them will avoid lint warnings later.

-from concurrent.futures import ThreadPoolExecutor
-from concurrent.futures import TimeoutError as FuturesTimeoutError

61-83: get_database_type_and_loader: consider supporting common SQLAlchemy-style URLs

Optional: accept postgres+psycopg2:// and mysql+pymysql:// prefixes to improve compatibility.

-    if db_url_lower.startswith('postgresql://') or db_url_lower.startswith('postgres://'):
+    if (db_url_lower.startswith('postgresql://')
+        or db_url_lower.startswith('postgres://')
+        or db_url_lower.startswith('postgresql+')
+        or db_url_lower.startswith('postgres+')):
         return 'postgresql', PostgresLoader
-    elif db_url_lower.startswith('mysql://'):
+    elif db_url_lower.startswith('mysql://') or db_url_lower.startswith('mysql+'):
         return 'mysql', MySQLLoader
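Equivalently, str.startswith accepts a tuple of prefixes, which keeps the condition flat:

if db_url_lower.startswith(("postgresql://", "postgres://", "postgresql+", "postgres+")):
    return 'postgresql', PostgresLoader
if db_url_lower.startswith(("mysql://", "mysql+")):
    return 'mysql', MySQLLoader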
api/loaders/postgres_loader.py (1)

55-65: _serialize_value: minor duplication

First two branches both call isoformat(). You can merge datetime/date/time handling.

-        if isinstance(value, (datetime.date, datetime.datetime)):
-            return value.isoformat()
-        if isinstance(value, datetime.time):
+        if isinstance(value, (datetime.date, datetime.datetime, datetime.time)):
             return value.isoformat()
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 03d6a7b and 4f235ce.

📒 Files selected for processing (11)
  • api/agents/relevancy_agent.py (1 hunks)
  • api/auth/user_management.py (4 hunks)
  • api/extensions.py (1 hunks)
  • api/graph.py (4 hunks)
  • api/loaders/graph_loader.py (5 hunks)
  • api/loaders/mysql_loader.py (2 hunks)
  • api/loaders/postgres_loader.py (2 hunks)
  • api/routes/graphs.py (8 hunks)
  • app/templates/components/left_toolbar.j2 (1 hunks)
  • docs/postgres_loader.md (1 hunks)
  • onthology.py (0 hunks)
💤 Files with no reviewable changes (1)
  • onthology.py
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Adhere to pylint standards across all Python files (repository uses make lint)

Files:

  • api/loaders/graph_loader.py
  • api/agents/relevancy_agent.py
  • api/extensions.py
  • api/loaders/mysql_loader.py
  • api/auth/user_management.py
  • api/loaders/postgres_loader.py
  • api/routes/graphs.py
  • api/graph.py
app/**

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Keep the TypeScript frontend sources in app/ and build them before production runs

Files:

  • app/templates/components/left_toolbar.j2
🧬 Code graph analysis (6)
api/loaders/graph_loader.py (1)
api/utils.py (1)
  • generate_db_description (12-69)
api/agents/relevancy_agent.py (2)
api/agents/follow_up_agent.py (1)
  • get_answer (52-73)
api/agents/taxonomy_agent.py (1)
  • get_answer (45-60)
api/loaders/mysql_loader.py (2)
api/routes/graphs.py (1)
  • refresh_graph_schema (657-704)
api/loaders/postgres_loader.py (1)
  • refresh_graph_schema (368-414)
api/loaders/postgres_loader.py (2)
api/routes/graphs.py (1)
  • refresh_graph_schema (657-704)
api/loaders/mysql_loader.py (1)
  • refresh_graph_schema (407-453)
api/routes/graphs.py (3)
api/agents/relevancy_agent.py (2)
  • RelevancyAgent (57-90)
  • get_answer (71-90)
api/agents/analysis_agent.py (2)
  • AnalysisAgent (9-201)
  • get_analysis (14-49)
api/graph.py (2)
  • get_db_description (39-54)
  • find (241-327)
api/graph.py (1)
api/config.py (2)
  • Config (47-137)
  • embed (18-31)
🪛 GitHub Actions: Pylint
api/loaders/graph_loader.py

[warning] 12-12: R0913: Too many arguments (6/5) (too-many-arguments)

api/agents/relevancy_agent.py

[warning] 61-61: W0231: init method from base class 'BaseAgent' is not called (super-init-not-called)

api/auth/user_management.py

[warning] 16-16: R0914: Too many local variables (17/15) (too-many-locals)


[warning] 100-100: R1705: Unnecessary elif after return (no-else-return)


[warning] 16-16: R0911: Too many return statements (9/6) (too-many-return-statements)


[warning] 157-157: W0718: Catching too general exception (broad-exception-caught)


[warning] 275-275: W0718: Catching too general exception (broad-exception-caught)


[warning] 221-221: W0718: Catching too general exception (broad-exception-caught)


[warning] 196-196: W0718: Catching too general exception (broad-exception-caught)


[warning] 266-266: W0718: Catching too general exception (broad-exception-caught)


[warning] 169-169: R1702: Too many nested blocks (7/5) (too-many-nested-blocks)


[warning] 162-162: R0911: Too many return statements (9/6) (too-many-return-statements)


[warning] 162-162: R0912: Too many branches (17/12) (too-many-branches)


[warning] 162-162: R0915: Too many statements (67/50) (too-many-statements)


[warning] 317-317: W0707: Raise HTTPException without from e (raise-missing-from)

api/graph.py

[warning] 103-103: C0303: Trailing whitespace (trailing-whitespace)


[warning] 289-289: C0301: Line too long (154/100)


[warning] 346-346: C0304: Final newline missing (missing-final-newline)

🔇 Additional comments (9)
docs/postgres_loader.md (1)

110-112: Confirm Python driver result shape

Example accesses result.result_set. Some FalkorDB Python clients return rows directly (iterables), others wrap in an object. Please confirm and adjust to avoid confusing users.

Would you like me to update the docs once you confirm the actual return structure?
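Until the shape is confirmed, the docs could hedge with a defensive access pattern; a sketch assuming the async client (drop the await for the sync one):

result = await graph.query("MATCH (t:Table) RETURN t.name")
rows = getattr(result, "result_set", result)  # QueryResult object or plain rows
for row in rows:
    print(row)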

api/agents/relevancy_agent.py (1)

71-71: No missing awaits for RelevancyAgent.get_answer found

I audited all call-sites of the newly async RelevancyAgent.get_answer and confirmed:

• The only live invocation in api/routes/graphs.py (around line 327) is wrapped in an asyncio.create_task and subsequently awaited via await relevancy_task.
• The commented-out call in api/loaders/csv_loader.py is not active code.
• No other references to agent_rel.get_answer( exist outside its definition in api/agents/relevancy_agent.py.

Since every active invocation is properly awaited, no changes are required here.

app/templates/components/left_toolbar.j2 (1)

1-12: Icon swap looks good

Switch to stroke-based Lucide icons improves visual consistency. No functional issues spotted.

api/auth/user_management.py (1)

83-91: Async graph queries: good migration

Switching to await organizations_graph.query(...) aligns with the project’s async shift. Looks correct.

Also applies to: 148-151

api/routes/graphs.py (3)

324-335: Good use of concurrent tasks with cancellation on off-topic

Running relevancy and find concurrently and cancelling find when off-topic is the right shape. Logging is clear and helpful.
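For readers following along, the pattern in play is roughly this (illustrative names and result shape, not the exact route code):

import asyncio
import contextlib

relevancy_task = asyncio.create_task(agent_rel.get_answer(user_query))
find_task = asyncio.create_task(find(graph_id, user_query))

relevancy = await relevancy_task
if relevancy.get("status") == "off-topic":  # result shape is assumed here
    find_task.cancel()  # don't pay for a search we won't use
    with contextlib.suppress(asyncio.CancelledError):
        await find_task
else:
    tables = await find_task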


378-417: Destructive operation confirmation UX: solid

Clear messaging and halting the pipeline until confirmation is received is good. Nice touch logging total time up to the halt.


310-318: Timing logs are helpful

End-to-end and phase timings are valuable for observability. Keep them — they’ll pay dividends in prod.

Also applies to: 509-528
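One small refinement worth considering: time.perf_counter() is monotonic and better suited to interval measurement than time.time():

import logging
import time

start = time.perf_counter()
result = await find_task
logging.info("find() phase took %.2fs", time.perf_counter() - start)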

api/loaders/postgres_loader.py (1)

66-109: load(): safe and straightforward

Synchronous extract via psycopg2 and then load_to_graph is clear. Assuming load_to_graph matches sync/async reality (see earlier verification), this is OK.

api/graph.py (1)

241-256: Return type and helper naming are consistent

find returns a flat List[List[Any]] and _get_unique_tables documents and implements deduplication across sources. Looks consistent with downstream agent formatting.

Also applies to: 327-346

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
api/extensions.py (1)

11-14: Avoid broad except Exception; catch specific exceptions to satisfy pylint (W0718) and improve signal

Catching Exception is flagged by pylint and obscures actionable failures. Narrow the set to expected failures from client construction.

Apply this diff to use specific exceptions (pairs with the import suggested above):

-    try:
-        db = FalkorDB(host="localhost", port=6379)
-    except Exception as e:
-        raise ConnectionError(f"Failed to connect to FalkorDB: {e}") from e
+    try:
+        db = FalkorDB(host="localhost", port=6379)
+    except (ValueError, RedisError, OSError) as exc:
+        raise ConnectionError(f"Failed to initialize FalkorDB on localhost: {exc}") from exc
🧹 Nitpick comments (2)
api/extensions.py (2)

1-1: Docstring nit: update to reflect module purpose

The current docstring references “text2sql,” which seems outdated for a module that initializes the FalkorDB client.

Apply this diff for clarity:

-"""Extensions for the text2sql library"""
+"""FalkorDB client initialization and connection utilities (async)."""

5-6: Update Redis imports for stable path and precise exception handling

The project’s Pipfile.lock pins redis to version 5.3.1, which ships the redis.asyncio.connection submodule. We can safely import ConnectionPool from there and add RedisError for narrow exception handling, satisfying pylint W0718.

• File: api/extensions.py
• Lines: 5–6

-from falkordb.asyncio import FalkorDB
-from redis.asyncio import ConnectionPool
+from falkordb.asyncio import FalkorDB
+from redis.asyncio.connection import ConnectionPool
+from redis.exceptions import RedisError
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 4f235ce and df3cb25.

📒 Files selected for processing (1)
  • api/extensions.py (2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Adhere to pylint standards across all Python files (repository uses make lint)

Files:

  • api/extensions.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: e2e-tests
  • GitHub Check: e2e-tests
🔇 Additional comments (1)
api/extensions.py (1)

16-17: Add error handling around pool/db initialization and guard empty FALKORDB_URL

Before calling into Redis and FalkorDB, validate that url is non-empty and wrap initialization in a narrow try/except. Also, double-check whether db.select_graph() is synchronous—if it’s async, all call sites must be awaited.

• In api/extensions.py (around lines 16–17):
– Validate url isn’t an empty string.
– Wrap ConnectionPool.from_url and FalkorDB(...) in a try/except catching ValueError, RedisError, and OSError.

• Review all db.select_graph(…) usages to confirm sync/async semantics. Current matches (likely false positives if select_graph is sync):
– api/graph.py:41, 257
– api/routes/graphs.py:124
– api/loaders/mysql_loader.py:426
– api/loaders/graph_loader.py:30
– api/loaders/postgres_loader.py:387
– api/auth/user_management.py:42, 142

Apply this diff in api/extensions.py:

-    pool = ConnectionPool.from_url(url)
-    db = FalkorDB(connection_pool=pool)
+    # Validate non-empty URL to avoid confusing "from_url('')" errors
+    if not url or not url.strip():
+        raise ConnectionError("FALKORDB_URL is set but empty.")
+    try:
+        pool = ConnectionPool.from_url(url)
+        db = FalkorDB(connection_pool=pool)
+    except (ValueError, RedisError, OSError) as exc:
+        # Normalize into a ConnectionError to avoid leaking lower-level details upstream
+        raise ConnectionError(f"Invalid FALKORDB_URL or failed to initialize FalkorDB: {exc}") from exc

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (13)
api/auth/oauth_handlers.py (2)

26-49: Do not ignore ensure_user_in_organizations result; returning True on failure can create auth inconsistencies

ensure_user_in_organizations returns (is_new_identity, result). result is None on failure. The current code discards both values and unconditionally returns True, which could report a “successful” callback even if persisting the identity failed. This is high-impact in an auth flow.

Apply this diff to validate the outcome and remove the broad try/except (also fixes the Pylint W0718 warning at Line 46):

-        try:
-            user_id = user_info.get("id")
-            email = user_info.get("email")
-            name = user_info.get("name")
-
-            # Validate required fields
-            if not user_id or not email:
-                logging.error("Missing required fields from Google OAuth response")
-                return False
-
-            # Check if identity exists in Organizations graph, create if new
-            _, _ = await ensure_user_in_organizations(
-                user_id,
-                email,
-                name,
-                "google",
-                user_info.get("picture"),
-            )
-
-            return True
-        except Exception as exc:  # capture exception for logging
-            logging.error("Error handling Google OAuth callback: %s", exc)
-            return False
+        user_id = user_info.get("id")
+        email = user_info.get("email")
+        name = user_info.get("name")
+
+        # Validate required fields
+        if not user_id or not email:
+            logging.error("Missing required fields from Google OAuth response")
+            return False
+
+        is_new_identity, result = await ensure_user_in_organizations(
+            user_id,
+            email,
+            name,
+            "google",
+            user_info.get("picture"),
+        )
+
+        if result is None:
+            logging.error(
+                "Failed to ensure Google identity in Organizations (user_id=%s, email=%s)",
+                user_id, email
+            )
+            return False
+
+        if is_new_identity:
+            logging.info("New Google identity/relationship created for email=%s", email)
+        return True

65-76: Handle ensure_user_in_organizations outcome for GitHub and use correct avatar field

  • Same correctness risk as the Google handler: currently returns True even if persistence fails.
  • GitHub’s profile image field is avatar_url, not picture. Falling back to picture is harmless, but avatar_url should be preferred.

Apply this diff:

-        try:
-            user_id = user_info.get("id")
-            email = user_info.get("email")
-            name = user_info.get("name") or user_info.get("login")
-
-            # Validate required fields
-            if not user_id or not email:
-                logging.error("Missing required fields from GitHub OAuth response")
-                return False
-
-            # Check if identity exists in Organizations graph, create if new
-            _, _ = await ensure_user_in_organizations(
-                user_id,
-                email,
-                name,
-                "github",
-                user_info.get("picture"),
-            )
-
-            return True
-        except Exception as exc:  # capture exception for logging
-            logging.error("Error handling GitHub OAuth callback: %s", exc)
-            return False
+        user_id = user_info.get("id")
+        email = user_info.get("email")
+        name = user_info.get("name") or user_info.get("login")
+        picture = user_info.get("avatar_url") or user_info.get("picture")
+
+        # Validate required fields
+        if not user_id or not email:
+            logging.error("Missing required fields from GitHub OAuth response")
+            return False
+
+        is_new_identity, result = await ensure_user_in_organizations(
+            user_id,
+            email,
+            name,
+            "github",
+            picture,
+        )
+
+        if result is None:
+            logging.error(
+                "Failed to ensure GitHub identity in Organizations (user_id=%s, email=%s)",
+                user_id, email
+            )
+            return False
+
+        if is_new_identity:
+            logging.info("New GitHub identity/relationship created for email=%s", email)
+        return True
api/loaders/odata_loader.py (3)

64-71: Prevent NoneType errors when reading property Annotations

Accessing .get("String") and taking len() can raise when the attribute is missing. Also, avoid repeated findall() calls.

Apply this diff:

-                        col_des = entity_name
-                        if len(prop.findall("edm:Annotation", namespaces)) > 0:
-                            if len(prop.findall("edm:Annotation", namespaces)[0].get("String")) > 0:
-                                col_des = prop.findall("edm:Annotation", namespaces)[0].get(
-                                    "String"
-                                )
+                        col_des = entity_name
+                        annotations = prop.findall("edm:Annotation", namespaces)
+                        if annotations:
+                            ann_val = annotations[0].get("String")
+                            if ann_val:
+                                col_des = ann_val

77-92: Fix entity description extraction and spelling; avoid broad except

description[0].get("String") may be None; replace() would then fail. Also fix “Primery” → “Primary” and avoid a bare except.

Apply this diff:

-            description = entity_type.findall("edm:Annotation", namespaces)
-            if len(description) > 0:
-                entities[entity_name]["description"] = (
-                    description[0].get("String").replace("'", "\\'")
-                )
-            else:
-                try:
-                    entities[entity_name]["description"] = (
-                        entity_name
-                        + " with Primery key: "
-                        + entity_type.find("edm:Key/edm:PropertyRef", namespaces).attrib["Name"]
-                    )
-                except:
-                    print(f"Error parsing description for entity {entity_name}")
-                    entities[entity_name]["description"] = entity_name
+            annotations = entity_type.findall("edm:Annotation", namespaces)
+            desc_val = annotations[0].get("String") if annotations else None
+            if desc_val:
+                entities[entity_name]["description"] = desc_val.replace("'", "\\'")
+            else:
+                try:
+                    key_ref = entity_type.find("edm:Key/edm:PropertyRef", namespaces)
+                    key_name = key_ref.attrib["Name"] if key_ref is not None else None
+                    if key_name:
+                        entities[entity_name]["description"] = (
+                            f"{entity_name} with Primary key: {key_name}"
+                        )
+                    else:
+                        entities[entity_name]["description"] = entity_name
+                except Exception as e:
+                    print(f"Error parsing description for entity {entity_name}: {e}")
+                    entities[entity_name]["description"] = entity_name

101-108: Guard NavigationProperty Type access

rel.get("Type") can be None; .startswith will raise. Add a None check before using string operations.

Apply this diff:

-                raw_type = rel.get("Type")  # e.g., 'Collection(Priority.OData.ABILITYVALUES)'
+                raw_type = rel.get("Type")  # e.g., 'Collection(Priority.OData.ABILITYVALUES)'
+                if not raw_type:
+                    continue
api/loaders/csv_loader.py (4)

17-29: CSV route will 500: NotImplementedError raised from async load path

load_graph() awaits CSVLoader.load; raising NotImplementedError here will bubble up and cause a 500. Return a (False, message) tuple instead to keep the API contract and return a 400/415.

Apply this minimal safety patch:

-        raise NotImplementedError("CSVLoader is not implemented yet")
+        return False, "CSVLoader is not implemented yet"

Optionally, update the route to translate this to HTTP 501 Not Implemented.


171-196: Fix wrong dictionary key when initializing relationships list

Inside the loop, you are checking if table not in relationships: but then initializing relationships[table_name] (undefined/outer var). This breaks relationship building.

Apply this diff:

-                            if table not in relationships:
-                                relationships[table_name] = []
+                            if table not in relationships:
+                                relationships[table] = []

109-126: Indexing next row via df.to_dict(...)[idx+1] is unsafe and O(n) each time

This can raise IndexError on the last row and converts the whole DF to dict on every iteration. Consider computing related field via a stable key or by grouping, or guard the access.

I can propose a vectorized/groupby-based approach that avoids per-row dict conversion and idx+1 reliance. Want a patch?
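As a starting point, a minimal sketch of the safer pattern (the column name is an assumption about the CSV layout):

rows = df.to_dict("records")  # convert once, not per iteration
for current_row, next_row in zip(rows, rows[1:]):  # pairwise; never runs past the end
    related_field = next_row.get("Field")  # 'Field' is an assumed column name
    # ... build the relationship from current_row and related_field ...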


78-87: Potential UnboundLocalError: target_table used before assignment

In the else: branch (when field is NaN), you reference target_table which is only set inside the 'Related' block. Guard or restructure.

Apply this diff to guard usage:

-                    tables[target_table]["col_descriptions"].append(field_desc)
-                    tables[target_table]["columns"][field] = {
+                    if 'target_table' in locals() and target_table:
+                        tables[target_table]["col_descriptions"].append(field_desc)
+                        tables[target_table]["columns"][field] = {
                         ...
-                    }
+                        }

This is a stopgap; a proper refactor is recommended.
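A cleaner alternative to the locals() check is to reset the name explicitly on each row, so the guard becomes an ordinary None test:

target_table = None  # reset per row before the 'Related' branch may set it
# ... 'Related' branch assigns target_table ...
if target_table is not None:
    tables[target_table]["col_descriptions"].append(field_desc)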

api/routes/graphs.py (2)

50-59: Avoid mutable default for Pydantic field; use Field(default_factory=list)

Defaulting chat: list = [] risks shared state. Use Field.

Apply this diff (and import Field):

-from pydantic import BaseModel
+from pydantic import BaseModel, Field
@@
 class ConfirmRequest(BaseModel):
@@
-    confirmation: str = ""
-    chat: list = []
+    confirmation: str = ""
+    chat: list = Field(default_factory=list)

226-253: Implement proper 501 response for unimplemented CSV loader

Verified that the CSV branch in api/routes/graphs.py still invokes CSVLoader.load, but the loader’s implementation immediately raises NotImplementedError, leading to an uncaught 500 error (api/loaders/csv_loader.py).
To ensure correct HTTP semantics, catch the NotImplementedError and translate it into a 501 Not Implemented response.

Locations to update:

  • api/routes/graphs.py around line 252
  • api/loaders/csv_loader.py (confirmation of NotImplementedError)

Suggested diff in api/routes/graphs.py:

-        elif filename.endswith(".csv"):
-            csv_data = content.decode("utf-8")
-            graph_id = request.state.user_id + "_" + filename.replace(".csv", "")
-            success, result = await CSVLoader.load(graph_id, csv_data)
+        elif filename.endswith(".csv"):
+            csv_data = content.decode("utf-8")
+            graph_id = request.state.user_id + "_" + filename.replace(".csv", "")
+            try:
+                success, result = await CSVLoader.load(graph_id, csv_data)
+            except NotImplementedError as exc:
+                raise HTTPException(status_code=501, detail="CSV loader not implemented") from exc
tests/test_mysql_loader.py (1)

120-143: Ensure async mocks for load_to_graph in MySQL and Postgres loader tests

Both tests/test_mysql_loader.py and tests/test_postgres_loader.py currently patch load_to_graph with a plain MagicMock, but the production loader implementations await load_to_graph(...). A MagicMock isn’t awaitable and will error at runtime (“object MagicMock can’t be used in ‘await’ expression”). Update both tests to use AsyncMock and verify it was awaited:

• In tests/test_mysql_loader.py (around lines 118–122):

-from unittest.mock import patch, MagicMock
+from unittest.mock import patch, MagicMock, AsyncMock
@@
-    @patch('api.loaders.mysql_loader.load_to_graph')
+    @patch('api.loaders.mysql_loader.load_to_graph', new_callable=AsyncMock)
     def test_successful_load(self, mock_load_to_graph, mock_connect):
@@
-        mock_load_to_graph.assert_called_once()
+        mock_load_to_graph.assert_awaited_once()

• In tests/test_postgres_loader.py (around lines 22–26):

-from unittest.mock import patch, MagicMock
+from unittest.mock import patch, MagicMock, AsyncMock
@@
-    @patch("api.loaders.postgres_loader.load_to_graph")
+    @patch("api.loaders.postgres_loader.load_to_graph", new_callable=AsyncMock)
     def test_successful_load(self, mock_load_to_graph, mock_connect):
@@
-        mock_load_to_graph.assert_called_once()
+        mock_load_to_graph.assert_awaited_once()

Even though the Postgres test is currently skipped, aligning it with the async behavior prevents future surprises and maintains consistency.

api/loaders/mysql_loader.py (1)

128-170: Avoid blocking the event loop: wrap synchronous PyMySQL I/O in a thread or switch to an async driver.

pymysql.connect(...), cursor operations, and fetches are synchronous; in an async FastAPI app this will block the event loop and hurt concurrency. Minimal fix: run the blocking section in a worker thread via asyncio.to_thread. Longer-term: consider aiomysql/asyncmy.

Apply this diff within load:

         try:
-            # Parse connection URL
-            conn_params = MySQLLoader._parse_mysql_url(connection_url)
-
-            # Connect to MySQL database
-            conn = pymysql.connect(**conn_params)
-            cursor = conn.cursor(DictCursor)
-
-            # Get database name
-            db_name = conn_params['database']
-
-            # Get all table information
-            entities = MySQLLoader.extract_tables_info(cursor, db_name)
-
-            # Get all relationship information
-            relationships = MySQLLoader.extract_relationships(cursor, db_name)
-
-            # Close database connection
-            cursor.close()
-            conn.close()
+            # Parse connection URL
+            conn_params = MySQLLoader._parse_mysql_url(connection_url)
+
+            # Perform blocking DB I/O off the event loop
+            def _fetch_schema(cp):
+                conn = pymysql.connect(**cp)
+                try:
+                    cursor = conn.cursor(DictCursor)
+                    db_name_local = cp['database']
+                    entities_local = MySQLLoader.extract_tables_info(cursor, db_name_local)
+                    relationships_local = MySQLLoader.extract_relationships(cursor, db_name_local)
+                    return db_name_local, entities_local, relationships_local
+                finally:
+                    try:
+                        cursor.close()
+                    except Exception:
+                        pass
+                    conn.close()
+
+            db_name, entities, relationships = await asyncio.to_thread(_fetch_schema, conn_params)

Additionally, add this import outside the shown range:

import asyncio
♻️ Duplicate comments (1)
api/routes/graphs.py (1)

144-145: Fixed: await-precedence bug on graph.query result_set

The corrected form (await the query call, then access .result_set) resolves the earlier runtime error.

🧹 Nitpick comments (20)
api/auth/oauth_handlers.py (5)

22-25: Add explicit return type hints for the async handlers

Clarifies intent and helps static analysis.

-    async def handle_google_callback(_request: Request,
-                                     _token: Dict[str, Any],
-                                     user_info: Dict[str, Any]):
+    async def handle_google_callback(_request: Request,
+                                     _token: Dict[str, Any],
+                                     user_info: Dict[str, Any]) -> bool:
@@
-    async def handle_github_callback(_request: Request,
-                                     _token: Dict[str, Any],
-                                     user_info: Dict[str, Any]):
+    async def handle_github_callback(_request: Request,
+                                     _token: Dict[str, Any],
+                                     user_info: Dict[str, Any]) -> bool:

Also applies to: 50-53


16-18: Annotate setup function return type for completeness

-def setup_oauth_handlers(app: FastAPI, oauth: OAuth):
+def setup_oauth_handlers(app: FastAPI, oauth: OAuth) -> None:

55-62: GitHub email can be missing; optionally fetch primary email with the token

GitHub often omits email unless scopes/user settings allow it. If email is missing, try the /user/emails endpoint before failing. Example:

if not email:
    try:
        github = app.state.oauth.github
        resp = await github.get("user/emails", token=_token)
        emails = resp.json()
        primary = next((e for e in emails if e.get("primary") and e.get("verified")), None)
        email = (primary or (emails[0] if emails else {})).get("email")
    except Exception:  # keep narrow if you know the client exceptions
        email = None

If you adopt this, keep the existing validation (fail if still missing).


31-34: Optional: normalize email casing before persistence

To avoid duplicate identities differing only by case, normalize to lower-case when passing to ensure_user_in_organizations.

Before changing, confirm the database enforces case-insensitive uniqueness on User.email. If not, a migration or constraint may be needed.

Also applies to: 59-62


46-48: Avoid broad exception handling in OAuth handlers

No custom exception classes were found in the codebase, so we should either remove these broad try/except blocks entirely or—if you’d like to keep them for defensive logging—narrow them to built-in exception types only.

Locations to update:

  • api/auth/oauth_handlers.py, lines 46–48 (Google OAuth)
  • api/auth/oauth_handlers.py, lines 74–76 (GitHub OAuth)

Option A (recommended): remove the try/except blocks entirely, allowing exceptions to bubble up or be handled by higher-level error middleware.

Option B (fallback): catch only built-ins and log the full stack trace:

-        except Exception as exc:  # capture exception for logging
-            logging.error("Error handling Google OAuth callback: %s", exc)
-            return False
+        except (TypeError, ValueError, RuntimeError) as exc:  # narrow expected failures
+            logging.exception("Error handling Google OAuth callback: %s", exc)
+            return False  # pylint: disable=broad-exception-caught

Repeat the same change for the GitHub handler.

api/loaders/odata_loader.py (2)

111-113: Avoid KeyError if entity not present; use safe dict access

When a target entity is missing, entities.get(... )["columns"] will KeyError. Use .get('columns', {}) defaults.

Apply this diff:

-                source_fields = entities.get(entity_name, {})["columns"]
-                target_fields = entities.get(target_entity, {})["columns"]
+                source_fields = entities.get(entity_name, {}).get("columns", {})
+                target_fields = entities.get(target_entity, {}).get("columns", {})

71-73: Prefer logging over print for error reporting

Replace print with logging and include exception details for better observability. If you prefer, I can submit a follow-up patch to wire Python logging here.

api/loaders/json_loader.py (3)

55-57: Use .get to avoid KeyError if 'foreign_keys' missing

Some schemas may omit foreign_keys; defensive access improves robustness.

Apply this diff:

-            for fk_name, fk_info in tqdm.tqdm(
-                table_info["foreign_keys"].items(), "Create Foreign Key relationships"
-            ):
+            for fk_name, fk_info in tqdm.tqdm(
+                table_info.get("foreign_keys", {}).items(),
+                "Create Foreign Key relationships"
+            ):

38-45: Replace prints with logging and propagate validation details to caller/logs

Printing in APIs makes tracing harder. Consider logging errors and returning a short, user-safe message while logging details.

Apply this diff:

-            if not validation_errors:
-                print("✅ Schema is valid.")
-            else:
-                print("❌ Schema validation failed with the following issues:")
-                for error in validation_errors:
-                    print(f" - {error}")
+            if not validation_errors:
+                logging.info("Schema is valid.")
+            else:
+                for error in validation_errors:
+                    logging.error("Schema validation error: %s", error)

Note: add import logging at the top of the file if not already present.


69-71: Guard missing database key in payload

If data["database"] is absent, this will KeyError. Use a safe default or raise a clear error.

Apply this diff:

-        await load_to_graph(graph_id, data["tables"], relationships, db_name=data["database"])
+        db_name = data.get("database") or "TBD"
+        await load_to_graph(graph_id, data["tables"], relationships, db_name=db_name)
api/routes/graphs.py (4)

3-9: Remove unused concurrent.futures imports post-async migration

ThreadPoolExecutor and FuturesTimeoutError are now unused; keeping them will trigger lint warnings.

Apply this diff:

-import asyncio
+import asyncio
 import json
 import logging
 import time
-from concurrent.futures import ThreadPoolExecutor
-from concurrent.futures import TimeoutError as FuturesTimeoutError

361-363: Avoid blocking the event loop: offload synchronous AI analysis to a thread

AnalysisAgent.get_analysis appears synchronous and can block the loop (LLM/network). Offload via asyncio.to_thread to keep streaming responsive.

Apply this diff:

-            answer_an = agent_an.get_analysis(
-                queries_history[-1], result, db_description, instructions
-            )
+            answer_an = await asyncio.to_thread(
+                agent_an.get_analysis, queries_history[-1], result, db_description, instructions
+            )

444-451: Avoid blocking on synchronous SQL execution

If execute_sql_query is sync (DB driver/requests), use asyncio.to_thread to prevent blocking other requests.

Apply this diff:

-                    query_results = loader_class.execute_sql_query(answer_an["sql_query"], db_url)
+                    query_results = await asyncio.to_thread(
+                        loader_class.execute_sql_query, answer_an["sql_query"], db_url
+                    )

494-501: Offload response formatting if it’s synchronous

Formatting may involve LLM calls; to_thread keeps the event loop free.

Apply this diff:

-                    user_readable_response = response_agent.format_response(
-                        user_query=queries_history[-1],
-                        sql_query=answer_an["sql_query"],
-                        query_results=query_results,
-                        db_description=db_description
-                    )
+                    user_readable_response = await asyncio.to_thread(
+                        response_agent.format_response,
+                        user_query=queries_history[-1],
+                        sql_query=answer_an["sql_query"],
+                        query_results=query_results,
+                        db_description=db_description
+                    )
tests/test_postgres_loader.py (3)

44-44: Fix pylint line-too-long (C0301) by wrapping asyncio.run call

Split the long line to satisfy the 100-char limit.

Apply this diff:

-        success, message = asyncio.run(PostgresLoader.load(self.test_graph_id, self.test_connection_url))
+        success, message = asyncio.run(
+            PostgresLoader.load(self.test_graph_id, self.test_connection_url)
+        )

59-59: Wrap second asyncio.run call to satisfy linter

Same adjustment here.

Apply this diff:

-        success, message = asyncio.run(PostgresLoader.load(self.test_graph_id, self.test_connection_url))
+        success, message = asyncio.run(
+            PostgresLoader.load(self.test_graph_id, self.test_connection_url)
+        )

24-26: When un-skipping, patch async functions with AsyncMock

load_to_graph is async; patch with AsyncMock to avoid “object is not awaitable” when you re-enable the test.

Example change (when removing @skip):

from unittest.mock import AsyncMock
@patch("api.loaders.postgres_loader.load_to_graph", new_callable=AsyncMock)
tests/test_mysql_loader.py (2)

114-117: Fix Pylint line-too-long by splitting the asyncio.run call.

This addresses CI warning C0301 (limit is 100 chars).

Apply this diff:

-        success, message = asyncio.run(MySQLLoader.load("test_prefix", "mysql://user:pass@host:3306/db"))
+        url = "mysql://user:pass@host:3306/db"
+        success, message = asyncio.run(MySQLLoader.load("test_prefix", url))

3-3: Optional: prefer pytest-asyncio over asyncio.run in tests.

Using pytest.mark.asyncio avoids nested-loop issues if a session loop is introduced later, and reads cleaner.

Example:

import pytest

@patch('pymysql.connect')
@pytest.mark.asyncio
async def test_connection_error(mock_connect):
    mock_connect.side_effect = Exception("Connection failed")
    success, message = await MySQLLoader.load("test_prefix", "mysql://user:pass@host:3306/db")
    assert not success
    assert "Error loading MySQL schema" in message
api/routes/database.py (1)

46-55: Simplify dead exception handling in database routes

Both PostgresLoader.load and MySQLLoader.load now catch all database errors internally and return (False, message) instead of raising. The inner try/except (ValueError, ConnectionError) blocks in api/routes/database.py can be removed. Rely on the returned success flag and the existing outer error handling.

Locations to update:

  • api/routes/database.py: lines 46–55 (around the PostgresLoader.load call)
  • api/routes/database.py: lines 58–66 (around the MySQLLoader.load call)

Suggested diff:

         if url.startswith("postgres://") or url.startswith("postgresql://"):
-            try:
-                # Attempt to connect/load using the PostgreSQL loader
-                success, result = await PostgresLoader.load(request.state.user_id, url)
-            except (ValueError, ConnectionError) as e:
-                logging.error("PostgreSQL connection error: %s", str(e))
-                raise HTTPException(
-                    status_code=500,
-                    detail="Failed to connect to PostgreSQL database",
-                )
+            # Attempt to connect/load using the PostgreSQL loader
+            success, result = await PostgresLoader.load(request.state.user_id, url)
 
         elif url.startswith("mysql://"):
-            try:
-                # Attempt to connect/load using the MySQL loader
-                success, result = await MySQLLoader.load(request.state.user_id, url)
-            except (ValueError, ConnectionError) as e:
-                logging.error("MySQL connection error: %s", str(e))
-                raise HTTPException(
-                    status_code=500, detail="Failed to connect to MySQL database"
-                )
+            # Attempt to connect/load using the MySQL loader
+            success, result = await MySQLLoader.load(request.state.user_id, url)
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between df3cb25 and 603608e.

📒 Files selected for processing (12)
  • api/auth/oauth_handlers.py (2 hunks)
  • api/extensions.py (2 hunks)
  • api/graph.py (4 hunks)
  • api/loaders/csv_loader.py (2 hunks)
  • api/loaders/json_loader.py (2 hunks)
  • api/loaders/mysql_loader.py (5 hunks)
  • api/loaders/odata_loader.py (2 hunks)
  • api/loaders/postgres_loader.py (5 hunks)
  • api/routes/database.py (2 hunks)
  • api/routes/graphs.py (14 hunks)
  • tests/test_mysql_loader.py (3 hunks)
  • tests/test_postgres_loader.py (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • api/extensions.py
  • api/graph.py
  • api/loaders/postgres_loader.py
🧰 Additional context used
📓 Path-based instructions (2)
tests/test_*.py

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Name and place unit tests as tests/test_*.py

Files:

  • tests/test_mysql_loader.py
  • tests/test_postgres_loader.py
**/*.py

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Adhere to pylint standards across all Python files (repository uses make lint)

Files:

  • tests/test_mysql_loader.py
  • api/routes/database.py
  • api/loaders/json_loader.py
  • api/auth/oauth_handlers.py
  • api/loaders/odata_loader.py
  • api/loaders/csv_loader.py
  • api/loaders/mysql_loader.py
  • tests/test_postgres_loader.py
  • api/routes/graphs.py
🧬 Code graph analysis (9)
tests/test_mysql_loader.py (1)
api/loaders/mysql_loader.py (2)
  • MySQLLoader (20-538)
  • load (128-170)
api/routes/database.py (2)
api/loaders/postgres_loader.py (2)
  • PostgresLoader (18-491)
  • load (67-108)
api/loaders/mysql_loader.py (2)
  • load (128-170)
  • MySQLLoader (20-538)
api/loaders/json_loader.py (2)
api/loaders/base_loader.py (1)
  • load (11-16)
api/loaders/graph_loader.py (1)
  • load_to_graph (12-184)
api/auth/oauth_handlers.py (1)
api/auth/user_management.py (1)
  • ensure_user_in_organizations (16-124)
api/loaders/odata_loader.py (2)
api/loaders/base_loader.py (1)
  • load (11-16)
api/loaders/graph_loader.py (1)
  • load_to_graph (12-184)
api/loaders/csv_loader.py (2)
api/loaders/base_loader.py (1)
  • load (11-16)
api/loaders/graph_loader.py (1)
  • load_to_graph (12-184)
api/loaders/mysql_loader.py (3)
api/loaders/postgres_loader.py (2)
  • load (67-108)
  • refresh_graph_schema (368-414)
api/loaders/graph_loader.py (1)
  • load_to_graph (12-184)
api/routes/graphs.py (1)
  • refresh_graph_schema (657-704)
tests/test_postgres_loader.py (1)
api/loaders/postgres_loader.py (2)
  • PostgresLoader (18-491)
  • load (67-108)
api/routes/graphs.py (8)
api/loaders/json_loader.py (2)
  • JSONLoader (23-71)
  • load (27-71)
api/loaders/csv_loader.py (2)
  • load (17-233)
  • CSVLoader (13-233)
api/loaders/mysql_loader.py (2)
  • load (128-170)
  • refresh_graph_schema (407-453)
api/loaders/odata_loader.py (2)
  • load (17-28)
  • ODataLoader (11-135)
api/loaders/postgres_loader.py (2)
  • load (67-108)
  • refresh_graph_schema (368-414)
api/agents/relevancy_agent.py (2)
  • RelevancyAgent (57-90)
  • get_answer (71-90)
api/agents/analysis_agent.py (2)
  • AnalysisAgent (9-201)
  • get_analysis (14-49)
api/graph.py (2)
  • get_db_description (39-54)
  • find (241-330)
🪛 GitHub Actions: Pylint
tests/test_mysql_loader.py

[warning] 114-114: C0301: Line too long (105/100) (line-too-long).

api/auth/oauth_handlers.py

[warning] 46-46: W0718: Catching too general exception (broad-exception-caught).


[warning] 74-74: W0718: Catching too general exception (broad-exception-caught).

tests/test_postgres_loader.py

[warning] 44-44: C0301: Line too long (105/100) (line-too-long).

🪛 Ruff (0.12.2)
api/routes/graphs.py

240-240: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🔇 Additional comments (13)
api/auth/oauth_handlers.py (2)

37-43: Async migration looks correct

Awaiting ensure_user_in_organizations aligns with the async refactor in user_management.py and avoids blocking the event loop.

Also applies to: 65-71


78-81: Ensure OAuth callback handlers are actually invoked and handle a False return

I ran a repository-wide search for any invocation of your new handlers:

rg -nP 'google_callback_handler|github_callback_handler' -C3
rg -nP 'google_callback_handler\(' -C3
rg -nP 'github_callback_handler\(' -C3

and only found the assignments in api/auth/oauth_handlers.py (lines 79–80). No call sites were detected. Please:

  • Confirm that your OAuth callback routes (e.g. in api/auth/oauth_routes.py or similar) actually invoke
    await app.state.google_callback_handler(request) and
    await app.state.github_callback_handler(request).
  • Verify each route properly checks for a False result from these handlers and returns the intended auth error or redirect.
  • If the handlers are called indirectly or under alias, ensure those paths also branch on the boolean return.

Without explicit call sites, it’s unclear whether a failed callback will ever trigger your error handling.

api/loaders/odata_loader.py (1)

16-28: Async migration verified and all ODataLoader.load calls are awaited

The ripgrep search confirms that the only invocation of ODataLoader.load in api/routes/graphs.py (line 246) is already awaited. No remaining synchronous-style calls were found, so the async refactor is complete.

api/loaders/json_loader.py (1)

27-27: Async migration LGTM

Changing JSONLoader.load to async and awaiting load_to_graph is consistent with the refactor.

api/routes/graphs.py (5)

101-101: LGTM: await db.list_graphs()

Async call correctly awaited after migration.


297-333: Good async task orchestration with cancellation on off-topic

Starting find() and the relevancy check concurrently yields a clear latency win; the cancellation path is correct and logged.
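
For reference, the pattern amounts to roughly this sketch (the names find and relevancy_agent follow the PR; signatures, the streaming plumbing, and the actual off-topic check are simplified assumptions):

import asyncio

async def run_query(graph_id: str, question: str):
    # Start both coroutines immediately so they overlap.
    find_task = asyncio.create_task(find(graph_id, question))
    relevancy_task = asyncio.create_task(relevancy_agent.get_answer(question))

    # If the relevancy agent flags the question as off-topic,
    # cancel the more expensive graph search right away.
    if not await relevancy_task:
        find_task.cancel()
        try:
            await find_task
        except asyncio.CancelledError:
            pass  # expected: the search was cancelled on purpose
        return None
    return await find_task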


559-559: LGTM: get_db_description awaited in confirmation path

Matches async refactor; prevents subtle bugs.


593-595: LGTM: refresh_graph_schema awaited

Async loader refresh correctly awaited to avoid race conditions.


667-686: LGTM: manual schema refresh uses awaited helpers

Both get_db_description and refresh_graph_schema are properly awaited.

api/routes/database.py (1)

48-49: Async migration looks correct — awaited loader calls are aligned with the new async API.

Good job on making the route coroutine-safe by awaiting PostgresLoader.load(...) and MySQLLoader.load(...).

Also applies to: 60-61

api/loaders/mysql_loader.py (3)

128-138: API change to async is consistent and non-breaking at call sites.

Signature async def load(...) -> Tuple[bool, str] matches the updated routes and tests. The docstring remains accurate.


161-163: LGTM: awaiting load_to_graph is correct after the async migration.

This keeps the graph writes coroutine-friendly and consistent with graph_loader.


407-417: LGTM: async refresh path awaits graph deletion and reuse of load.

Awaiting graph.delete() and MySQLLoader.load(...) ensures proper sequencing without blocking.

Also applies to: 427-427, 439-440
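
In effect the refresh reduces to this sequencing (a simplified sketch; the real method carries error handling and extra parameters):

from api.loaders.mysql_loader import MySQLLoader

async def refresh(graph, graph_id: str, url: str):
    # Drop the stale graph first, then rebuild it from the live schema;
    # awaiting both steps guarantees the delete completes before the reload.
    await graph.delete()
    return await MySQLLoader.load(graph_id, url)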


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
api/app_factory.py (1)

89-105: Avoid catch‑all Exception heuristics for OAuth; narrow to Authlib errors.

String-matching "token"/"oauth" on every Exception can mask unrelated failures and complicate debugging. Handle Authlib errors explicitly and re-raise everything else; let HTTPException pass through untouched.

-    @app.exception_handler(Exception)
-    async def handle_oauth_error(request: Request, exc: Exception):
-        """Handle OAuth-related errors gracefully"""
-        # Check if it's an OAuth-related error
-        if "token" in str(exc).lower() or "oauth" in str(exc).lower():
-            logging.warning("OAuth error occurred: %s", exc)
-            request.session.clear()
-            return RedirectResponse(url="/", status_code=302)
-
-        # If it's an HTTPException, re-raise so FastAPI handles it properly
-        if isinstance(exc, HTTPException):
-            raise exc
-
-        # For other errors, let them bubble up
-        raise exc
+    @app.exception_handler(Exception)
+    async def handle_oauth_error(request: Request, exc: Exception):
+        """Handle OAuth-related errors gracefully without masking unrelated issues."""
+        # Let FastAPI handle HTTPExceptions as usual
+        if isinstance(exc, HTTPException):
+            raise exc
+        # Handle Authlib OAuth errors explicitly
+        if isinstance(exc, AuthlibBaseError):
+            logging.warning("OAuth error occurred: %s", exc)
+            request.session.clear()
+            return RedirectResponse(url="/", status_code=302)
+        # Everything else: bubble up
+        raise exc

And add this import near the top:

from authlib.common.errors import AuthlibBaseError

Optional nit: consider status.HTTP_302_FOUND here for consistency with other modules.

🧹 Nitpick comments (2)
api/routes/auth.py (1)

39-39: Make GTM global resilient (avoid None in templates).

Default to empty string so templates don’t accidentally render “None” and conditionals stay clean.

-templates.env.globals["google_tag_manager_id"] = os.getenv("GOOGLE_TAG_MANAGER_ID")
+templates.env.globals["google_tag_manager_id"] = os.getenv("GOOGLE_TAG_MANAGER_ID") or ""
api/app_factory.py (1)

24-24: Silence Pylint R0903 for middleware class (matches pipeline warning).

This class intentionally has a single public method by Starlette design. Add a targeted disable to keep CI green.

-class SecurityMiddleware(BaseHTTPMiddleware):
+class SecurityMiddleware(BaseHTTPMiddleware):  # pylint: disable=too-few-public-methods
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 603608e and a8fa395.

📒 Files selected for processing (2)
  • api/app_factory.py (2 hunks)
  • api/routes/auth.py (3 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Adhere to pylint standards across all Python files (repository uses make lint)

Files:

  • api/routes/auth.py
  • api/app_factory.py
api/app_factory.py

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Maintain the application factory and OAuth setup in api/app_factory.py (core configuration in the top of the file)

Files:

  • api/app_factory.py
🧠 Learnings (1)
📚 Learning: 2025-08-24T17:15:21.337Z
Learnt from: CR
PR: FalkorDB/QueryWeaver#0
File: .github/copilot-instructions.md:0-0
Timestamp: 2025-08-24T17:15:21.337Z
Learning: Applies to api/app_factory.py : Maintain the application factory and OAuth setup in `api/app_factory.py` (core configuration in the top of the file)

Applied to files:

  • api/app_factory.py
🧬 Code graph analysis (1)
api/app_factory.py (1)
api/auth/oauth_handlers.py (1)
  • setup_oauth_handlers (16-80)
🪛 GitHub Actions: Pylint
api/app_factory.py

[warning] 24-24: Pylint: R0903 Too few public methods (1/2) (too-few-public-methods).

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: e2e-tests
  • GitHub Check: e2e-tests
🔇 Additional comments (2)
api/app_factory.py (2)

14-14: Good separation of concerns: centralized OAuth handler wiring.

Importing setup_oauth_handlers here keeps OAuth wiring in the app factory as per guidelines.


87-87: Correct initialization order.

Calling setup_oauth_handlers after init_auth ensures app.state.oauth exists before wiring handlers.
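
In sketch form, the factory ordering looks like this (setup_oauth_handlers and init_auth are the repo's functions; init_auth's import path and the rest of the factory body are elided assumptions):

from fastapi import FastAPI

from api.auth.oauth_handlers import setup_oauth_handlers


def create_app() -> FastAPI:
    app = FastAPI()
    init_auth(app)             # registers the OAuth client on app.state.oauth
    setup_oauth_handlers(app)  # wires handlers that depend on app.state.oauth
    return app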

Comment on lines +164 to +169
# Call the registered Google callback handler if it exists to store user data.
handler = getattr(request.app.state, "google_callback_handler", None)
if handler:
    # call the registered handler (await if async)
    await handler(request, token, user_info)


💡 Verification agent

🧩 Analysis chain

Harden OAuth callback handler invocation (don’t break login on handler errors; support sync/async).

Right now any exception inside the provider-specific handler will bubble up (the outer except only catches AuthlibBaseError), turning a successful OAuth login into a 500. The code also always awaits the handler; if a sync handler is ever plugged in, the await will fail. Wrap the call, detect whether the handler is a coroutine function, and log non-fatal False returns.

-        handler = getattr(request.app.state, "google_callback_handler", None)
-        if handler:
-            # call the registered handler (await if async)
-            await handler(request, token, user_info)
+        handler = getattr(request.app.state, "google_callback_handler", None)
+        if handler:
+            try:
+                if inspect.iscoroutinefunction(handler):
+                    ok = await handler(request, token, user_info)
+                else:
+                    ok = handler(request, token, user_info)
+                if ok is False:
+                    logging.warning("Google callback handler returned False; continuing login flow.")
+            except Exception as exc:
+                logging.error("Google callback handler error: %s", exc)
-        handler = getattr(request.app.state, "github_callback_handler", None)
-        if handler:
-            # call the registered handler (await if async)
-            await handler(request, token, user_info)
+        handler = getattr(request.app.state, "github_callback_handler", None)
+        if handler:
+            try:
+                if inspect.iscoroutinefunction(handler):
+                    ok = await handler(request, token, user_info)
+                else:
+                    ok = handler(request, token, user_info)
+                if ok is False:
+                    logging.warning("GitHub callback handler returned False; continuing login flow.")
+            except Exception as exc:
+                logging.error("GitHub callback handler error: %s", exc)

Add this import near the other imports:

import inspect

Verification idea:

  • Attempt login with a handler that raises, confirm the flow redirects home and session persists user_info; check logs for the error message.

Also applies to: 244-249


🏁 Script executed:

#!/usr/bin/env bash
set -euo pipefail

# Show context around Google callback handler invocation
rg -nC5 "google_callback_handler" api/routes/auth.py

# Show context around GitHub callback handler invocation
rg -nC5 "github_callback_handler" api/routes/auth.py

# Verify if 'inspect' is already imported
rg -n "import inspect" api/routes/auth.py

Length of output: 1243


Harden OAuth callback handler invocation to be non-breaking and support sync/async

The current code at both the Google and GitHub OAuth callback sites will:

  • Bubble up any exception from the registered handler (only AuthlibBaseError is caught elsewhere), causing a 500 on login success.
  • Always use await handler(...), which fails if the handler is a regular (sync) function.
  • Never log or handle a handler’s non-fatal “I don’t want to proceed” signal (e.g. returning False).

We should wrap each handler call in a try/except Exception, detect whether it’s a coroutine, call it appropriately, and log a warning if it returns False. Also add the missing import inspect.

Locations to update:

  • api/routes/auth.py: lines 164–169 (Google handler)
  • api/routes/auth.py: lines 244–249 (GitHub handler)
  • Top of api/routes/auth.py: add import inspect

Suggested diff:

--- a/api/routes/auth.py
+++ b/api/routes/auth.py
@@ -1,6 +1,7 @@
 import time
+import inspect
 from fastapi import status, Request
 from fastapi.responses import RedirectResponse
 from authlib.integrations.starlette_client import OAuthError

@@ -161,7 +162,17 @@ async def auth_google_callback(request: Request):
         request.session["token_validated_at"] = time.time()

-        handler = getattr(request.app.state, "google_callback_handler", None)
-        if handler:
-            # call the registered handler (await if async)
-            await handler(request, token, user_info)
+        handler = getattr(request.app.state, "google_callback_handler", None)
+        if handler:
+            try:
+                if inspect.iscoroutinefunction(handler):
+                    ok = await handler(request, token, user_info)
+                else:
+                    ok = handler(request, token, user_info)
+                if ok is False:
+                    logging.warning(
+                        "Google callback handler returned False; continuing login flow."
+                    )
+            except Exception as exc:
+                logging.error("Google callback handler error: %s", exc)

         return RedirectResponse(url="/", status_code=status.HTTP_302_FOUND)
@@ -241,7 +252,17 @@ async def auth_github_callback(request: Request):
         request.session["token_validated_at"] = time.time()

-        handler = getattr(request.app.state, "github_callback_handler", None)
-        if handler:
-            # call the registered handler (await if async)
-            await handler(request, token, user_info)
+        handler = getattr(request.app.state, "github_callback_handler", None)
+        if handler:
+            try:
+                if inspect.iscoroutinefunction(handler):
+                    ok = await handler(request, token, user_info)
+                else:
+                    ok = handler(request, token, user_info)
+                if ok is False:
+                    logging.warning(
+                        "GitHub callback handler returned False; continuing login flow."
+                    )
+            except Exception as exc:
+                logging.error("GitHub callback handler error: %s", exc)

         return RedirectResponse(url="/", status_code=status.HTTP_302_FOUND)

This ensures that handler errors no longer break the OAuth flow, sync handlers are supported, and any intentional “stop” signals from handlers are surfaced in logs.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In api/routes/auth.py around lines 164–169 (Google handler) and 244–249 (GitHub
handler), wrap the registered callback invocation in a try/except Exception, add
import inspect at the top, call the handler without awaiting first (result =
handler(request, token, user_info) or appropriate args), if
inspect.isawaitable(result) then await it, log any Exception as a warning
instead of letting it bubble, and if the handler returns False log a warning
that the handler signaled to stop; use request.app.logger.warning for warnings.

@gkorland gkorland merged commit cd16fa3 into main Aug 25, 2025
14 of 16 checks passed
This was referenced Aug 30, 2025
