Skip to content
This repository was archived by the owner on Dec 28, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions hugegraph-llm/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ dependencies = [
"apscheduler",
"litellm",
"hugegraph-python-client",
"pycgraph",
"pycgraph==3.2.2",
]

[project.optional-dependencies]
Expand Down Expand Up @@ -97,7 +97,6 @@ allow-direct-references = true

[tool.uv.sources]
hugegraph-python-client = { workspace = true }
pycgraph = { git = "https://github.com/ChunelFeng/CGraph.git", subdirectory = "python", tag = "v3.2.0" }

[tool.mypy]
disable_error_code = ["import-untyped"]
Expand Down
187 changes: 150 additions & 37 deletions hugegraph-llm/src/hugegraph_llm/demo/rag_demo/rag_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,14 @@

# pylint: disable=E1101

import json
import os
from typing import AsyncGenerator, Literal, Optional, Tuple
import re
from typing import AsyncGenerator, Tuple, Literal, Optional

import gradio as gr
from hugegraph_llm.flows.scheduler import SchedulerSingleton
from hugegraph_llm.flows.intent_detector import IntentDetectorSingleton
import pandas as pd
import gradio as gr

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This import gradio as gr is a duplicate of the import on line 25. Please remove it. Additionally, SchedulerSingleton is imported on both line 26 and line 33, which should also be consolidated to avoid redundancy.

from gradio.utils import NamedString
Expand Down Expand Up @@ -153,6 +159,7 @@ def update_ui_configs(

async def rag_answer_streaming(
text: str,
auto_mode: bool,
raw_answer: bool,
vector_only_answer: bool,
graph_only_answer: bool,
Expand Down Expand Up @@ -188,45 +195,132 @@ async def rag_answer_streaming(
try:
# Select the specific streaming workflow
scheduler = SchedulerSingleton.get_instance()
if graph_vector_answer or (graph_only_answer and vector_only_answer):
flow_key = FlowName.RAG_GRAPH_VECTOR
elif vector_only_answer:
flow_key = FlowName.RAG_VECTOR_ONLY
elif graph_only_answer:
flow_key = FlowName.RAG_GRAPH_ONLY
elif raw_answer:
flow_key = FlowName.RAG_RAW

if auto_mode:
intent_detector = IntentDetectorSingleton.get_instance()
result = await intent_detector.detect(
text,
[FlowName.RAG_RAW, FlowName.RAG_VECTOR_ONLY, FlowName.RAG_GRAPH_ONLY, FlowName.RAG_GRAPH_VECTOR],
)
if result["tool_name"] is None or result["tool_name"] == "none":
raise RuntimeError("No suitable flow found")
elif result["tool_name"] in [
FlowName.RAG_RAW,
FlowName.RAG_VECTOR_ONLY,
FlowName.RAG_GRAPH_ONLY,
FlowName.RAG_GRAPH_VECTOR
]:
flow_key = result["tool_name"]
else:
raise RuntimeError("Unsupported flow type")
async for res in scheduler.schedule_stream_flow(
flow_key,
query=text,
vector_search=result["parameters"].get("vector_search", vector_search)
if "parameters" in result
else vector_search,
graph_search=result["parameters"].get("graph_search", graph_search)
if "parameters" in result
else graph_search,
raw_answer=result["parameters"].get("raw_answer", False)
if "parameters" in result
else False,
vector_only_answer=result["parameters"].get("vector_only_answer", False)
if "parameters" in result
else False,
graph_only_answer=result["parameters"].get("graph_only_answer", False)
if "parameters" in result
else False,
graph_vector_answer=result["parameters"].get(
"graph_vector_answer", False
)
if "parameters" in result
else False,
graph_ratio=result["parameters"].get("graph_ratio", graph_ratio)
if "parameters" in result
else graph_ratio,
rerank_method=result["parameters"].get("rerank_method", rerank_method)
if "parameters" in result
else rerank_method,
near_neighbor_first=result["parameters"].get(
"near_neighbor_first", near_neighbor_first
)
if "parameters" in result
else near_neighbor_first,
custom_related_information=result["parameters"].get(
"custom_related_information", custom_related_information
)
if "parameters" in result
else custom_related_information,
answer_prompt=result["parameters"].get("answer_prompt", answer_prompt)
if "parameters" in result
else answer_prompt,
keywords_extract_prompt=result["parameters"].get(
"keywords_extract_prompt", keywords_extract_prompt
)
if "parameters" in result
else keywords_extract_prompt,
gremlin_tmpl_num=result["parameters"].get(
"gremlin_tmpl_num", gremlin_tmpl_num
)
if "parameters" in result
else gremlin_tmpl_num,
gremlin_prompt=result["parameters"].get(
"gremlin_prompt", gremlin_prompt
)
if "parameters" in result
else gremlin_prompt,
):
if res.get("switch_to_bleu"):
gr.Warning(
"Online reranker fails, automatically switches to local bleu rerank."
)
yield (
res.get("raw_answer", ""),
res.get("vector_only_answer", ""),
res.get("graph_only_answer", ""),
res.get("graph_vector_answer", ""),
)
else:
raise RuntimeError("Unsupported flow type")
if graph_vector_answer or (graph_only_answer and vector_only_answer):
flow_key = FlowName.RAG_GRAPH_VECTOR
elif vector_only_answer:
flow_key = FlowName.RAG_VECTOR_ONLY
elif graph_only_answer:
flow_key = FlowName.RAG_GRAPH_ONLY
elif raw_answer:
flow_key = FlowName.RAG_RAW
else:
raise RuntimeError("Unsupported flow type")

async for res in scheduler.schedule_stream_flow(
flow_key,
query=text,
vector_search=vector_search,
graph_search=graph_search,
raw_answer=raw_answer,
vector_only_answer=vector_only_answer,
graph_only_answer=graph_only_answer,
graph_vector_answer=graph_vector_answer,
graph_ratio=graph_ratio,
rerank_method=rerank_method,
near_neighbor_first=near_neighbor_first,
custom_related_information=custom_related_information,
answer_prompt=answer_prompt,
keywords_extract_prompt=keywords_extract_prompt,
gremlin_tmpl_num=gremlin_tmpl_num,
gremlin_prompt=gremlin_prompt,
):
if res.get("switch_to_bleu"):
gr.Warning(
"Online reranker fails, automatically switches to local bleu rerank."
async for res in scheduler.schedule_stream_flow(
flow_key,
query=text,
vector_search=vector_search,
graph_search=graph_search,
raw_answer=raw_answer,
vector_only_answer=vector_only_answer,
graph_only_answer=graph_only_answer,
graph_vector_answer=graph_vector_answer,
graph_ratio=graph_ratio,
rerank_method=rerank_method,
near_neighbor_first=near_neighbor_first,
custom_related_information=custom_related_information,
answer_prompt=answer_prompt,
keywords_extract_prompt=keywords_extract_prompt,
gremlin_tmpl_num=gremlin_tmpl_num,
gremlin_prompt=gremlin_prompt,
):
if res.get("switch_to_bleu"):
gr.Warning(
"Online reranker fails, automatically switches to local bleu rerank."
)
yield (
res.get("raw_answer", ""),
res.get("vector_only_answer", ""),
res.get("graph_only_answer", ""),
res.get("graph_vector_answer", ""),
)
Comment on lines +199 to 323

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic within this try block can be refactored to improve readability and reduce code duplication.

  1. Verbose Parameter Passing: The method of passing parameters to scheduler.schedule_stream_flow in auto_mode is very verbose, with a redundant if "parameters" in result else ... check for every parameter. This can be simplified significantly.
  2. Code Duplication: The async for res in scheduler.schedule_stream_flow(...) loop and its body are duplicated in both the if auto_mode: and else: branches.

Consider refactoring to first determine the flow_key and a flow_params dictionary, and then use a single schedule_stream_flow call and loop. This will make the code much cleaner and easier to maintain.

Here is an example of how it could be refactored:

scheduler = SchedulerSingleton.get_instance()
flow_params = {}
flow_key = ""

if auto_mode:
    intent_detector = IntentDetectorSingleton.get_instance()
    result = await intent_detector.detect(
        text,
        [FlowName.RAG_RAW, FlowName.RAG_VECTOR_ONLY, FlowName.RAG_GRAPH_ONLY, FlowName.RAG_GRAPH_VECTOR],
    )
    tool_name = result.get("tool_name")
    if tool_name is None or tool_name == "none":
        raise RuntimeError("No suitable flow found")
    
    flow_key = tool_name
    if flow_key not in [FlowName.RAG_RAW, FlowName.RAG_VECTOR_ONLY, FlowName.RAG_GRAPH_ONLY, FlowName.RAG_GRAPH_VECTOR]:
        raise RuntimeError("Unsupported flow type")

    params = result.get("parameters", {})
    flow_params = {
        "query": text,
        "vector_search": params.get("vector_search", vector_search),
        "graph_search": params.get("graph_search", graph_search),
        # ... other params
    }
else:
    if graph_vector_answer or (graph_only_answer and vector_only_answer):
        flow_key = FlowName.RAG_GRAPH_VECTOR
    # ... other manual flow selections
    else:
        raise RuntimeError("Unsupported flow type")
    
    flow_params = {
        "query": text,
        "vector_search": vector_search,
        "graph_search": graph_search,
        # ... other params
    }

async for res in scheduler.schedule_stream_flow(flow_key, **flow_params):
    if res.get("switch_to_bleu"):
        gr.Warning(
            "Online reranker fails, automatically switches to local bleu rerank."
        )
    yield (
        res.get("raw_answer", ""),
        res.get("vector_only_answer", ""),
        res.get("graph_only_answer", ""),
        res.get("graph_vector_answer", ""),
    )

yield (
res.get("raw_answer", ""),
res.get("vector_only_answer", ""),
res.get("graph_only_answer", ""),
res.get("graph_vector_answer", ""),
)
except ValueError as e:
log.critical(e)
raise gr.Error(str(e))
Expand Down Expand Up @@ -302,10 +396,18 @@ def create_rag_block():
graph_vector_radio = gr.Radio(
choices=[True, False], value=False, label="Graph-Vector Answer"
)
with gr.Row():
auto_mode = gr.Radio(
choices=[True, False], value=False, label="Auto Mode"
)

def toggle_slider(enable):
    """Mirror a radio selection onto the graph-ratio slider's interactivity.

    :param enable: truthy when the slider should accept user input.
    :return: a Gradio component-update payload toggling ``interactive``.
    """
    slider_update = gr.update(interactive=enable)
    return slider_update

def toggle_manual_options(auto_enabled):
    """Grey out the four manual answer-mode radios while Auto Mode is active.

    :param auto_enabled: truthy when Auto Mode is selected, so manual
        options must become non-interactive.
    :return: a list of four Gradio update payloads, one per manual radio.
    """
    manual_interactive = not auto_enabled
    updates = []
    for _radio_slot in range(4):
        updates.append(gr.update(interactive=manual_interactive))
    return updates

with gr.Column():
with gr.Row():
online_rerank = llm_settings.reranker_type
Expand All @@ -326,6 +428,16 @@ def toggle_slider(enable):
graph_vector_radio.change(
toggle_slider, inputs=graph_vector_radio, outputs=graph_ratio
) # pylint: disable=no-member
auto_mode.change(
toggle_manual_options,
inputs=auto_mode,
outputs=[
graph_vector_radio,
graph_only_radio,
raw_radio,
vector_only_radio,
],
) # pylint: disable=no-member
near_neighbor_first = gr.Checkbox(
value=False,
label="Near neighbor first(Optional)",
Expand All @@ -341,6 +453,7 @@ def toggle_slider(enable):
fn=rag_answer_streaming,
inputs=[
inp,
auto_mode,
raw_radio,
vector_only_radio,
graph_only_radio,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import json
from typing import List, Dict, Optional

from PyCGraph import GPipeline
from pycgraph import GPipeline

from hugegraph_llm.flows.common import BaseFlow
from hugegraph_llm.state.ai_state import WkFlowInput, WkFlowState
Expand Down
2 changes: 1 addition & 1 deletion hugegraph-llm/src/hugegraph_llm/flows/build_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import json

from PyCGraph import GPipeline
from pycgraph import GPipeline

from hugegraph_llm.flows.common import BaseFlow
from hugegraph_llm.state.ai_state import WkFlowInput, WkFlowState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import json

from PyCGraph import GPipeline
from pycgraph import GPipeline

from hugegraph_llm.flows.common import BaseFlow
from hugegraph_llm.nodes.document_node.chunk_split import ChunkSplitNode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import json

from PyCGraph import GPipeline
from pycgraph import GPipeline

from hugegraph_llm.config import huge_settings, index_settings
from hugegraph_llm.flows.common import BaseFlow
Expand Down
2 changes: 1 addition & 1 deletion hugegraph-llm/src/hugegraph_llm/flows/graph_extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.

import json
from PyCGraph import GPipeline
from pycgraph import GPipeline
from hugegraph_llm.flows.common import BaseFlow
from hugegraph_llm.nodes.document_node.chunk_split import ChunkSplitNode
from hugegraph_llm.nodes.hugegraph_node.schema import SchemaNode
Expand Down
2 changes: 1 addition & 1 deletion hugegraph-llm/src/hugegraph_llm/flows/import_graph_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import json

import gradio as gr
from PyCGraph import GPipeline
from pycgraph import GPipeline
from hugegraph_llm.flows.common import BaseFlow
from hugegraph_llm.nodes.hugegraph_node.commit_to_hugegraph import Commit2GraphNode
from hugegraph_llm.nodes.hugegraph_node.schema import SchemaNode
Expand Down
Loading
Loading