This repository was archived by the owner on Dec 28, 2025. It is now read-only.

Conversation

@fantasy-lotus
Collaborator

@fantasy-lotus fantasy-lotus commented Aug 12, 2025

Summary by CodeRabbit

  • New Features

    • Added support for multiple vector database backends: Milvus and Qdrant alongside Faiss.
    • Introduced admin authentication system with login configuration.
    • Enhanced RAG API with reranking methods, neighbor-first option, and custom priority information.
    • Added new Gremlin output types for query results.
  • Configuration

    • Improved type safety across core configurations.
    • Added embedding dimension specifications for LLM models.
    • New vector database configuration options.
  • Documentation

    • Updated setup instructions for vector database backends and NLTK dependencies.
  • Refactoring

    • Redesigned vector index system to support pluggable backends.
    • Restructured workflow system with extensible flow abstraction.

@github-actions

@codecov-ai-reviewer review

@coderabbitai

coderabbitai bot commented Aug 12, 2025

Caution

Review failed

Failed to post review comments

Code Review Report

Overview

This PR introduces a flow-driven workflow architecture (BaseFlow/Scheduler/Node), replaces VectorIndex with a pluggable vector-store abstraction (Faiss/Milvus/Qdrant), adds a new configuration management system (AdminConfig/IndexConfig), and updates the embedding-model and LLM initialization logic to support dimension parameters and multiple vector-database backends.

Change Summary

Change | Files
Core flow architecture | flows/common.py, flows/scheduler.py, flows/build_schema.py, flows/build_vector_index.py, flows/graph_extract.py, flows/import_graph_data.py, flows/prompt_generate.py, flows/rag_flow_*.py, flows/text2gremlin.py, flows/update_vid_embeddings.py, flows/get_graph_index_info.py, flows/utils.py
Node system implementation | nodes/base_node.py, nodes/common_node/merge_rerank_node.py, nodes/document_node/chunk_split.py, nodes/hugegraph_node/[*], nodes/index_node/[*], nodes/llm_node/[*], nodes/util.py
Vector store abstraction | indices/vector_index/base.py, indices/vector_index/faiss_vector_store.py, indices/vector_index/milvus_vector_store.py, indices/vector_index/qdrant_vector_store.py, indices/vector_index.py
Configuration management | config/admin_config.py, config/index_config.py, config/hugegraph_config.py, config/llm_config.py, config/generate.py, config/models/[*]
Embedding model updates | models/embeddings/base.py, models/embeddings/openai.py, models/embeddings/ollama.py, models/embeddings/litellm.py, models/embeddings/init_embedding.py
LLM factory functions | models/llms/init_llm.py, models/llms/ollama.py
Workflow state | state/ai_state.py
Operator updates | operators/gremlin_generate_task.py, operators/kg_construction_task.py, operators/index_op/[*], operators/llm_op/[*], operators/graph_rag_task.py
Demo UI extensions | demo/rag_demo/configs_block.py, demo/rag_demo/rag_block.py, demo/rag_demo/text2gremlin_block.py, demo/rag_demo/other_block.py
Utility functions | utils/vector_index_utils.py, utils/graph_index_utils.py, utils/decorators.py, utils/[*]
API layer | api/rag_api.py, api/models/rag_requests.py
Tests and documentation | tests/config/test_config.py, tests/indices/test_*.py, config.md, README.md, pyproject.toml

Architecture Sequence Diagram

sequenceDiagram
    participant User
    participant Scheduler
    participant Flow
    participant GPipeline
    participant Node
    participant Operator
    participant VectorStore

    User->>Scheduler: schedule_flow(flow_name, ...)
    Scheduler->>Flow: build_flow(...)
    Flow->>GPipeline: create pipeline
    Flow->>Node: register nodes
    Node->>Operator: initialize operators
    Operator->>VectorStore: get vector store instance
    
    GPipeline->>Node: run()
    Node->>Node: node_init()
    Node->>Operator: operator_schedule(data_json)
    Operator->>VectorStore: add/search/remove()
    
    Node-->>GPipeline: return status
    Flow->>Flow: post_deal(pipeline)
    Flow-->>Scheduler: return result
    Scheduler-->>User: return answer
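The diagram corresponds to calls like the following; a minimal usage sketch assuming the SchedulerSingleton API referenced later in this review (the import path and arguments are illustrative):

```python
# Import path assumed from the file list above (flows/scheduler.py).
from hugegraph_llm.flows.scheduler import SchedulerSingleton

# "build_vector_index" and its texts argument follow the pattern used in
# utils/vector_index_utils.py; other flows (e.g. the RAG flows) take
# query-style parameters instead.
scheduler = SchedulerSingleton.get_instance()
result = scheduler.schedule_flow("build_vector_index", ["chunk one", "chunk two"])
print(result)
```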

Code Review Effort Estimate

🎯 5 (Critical) | ⏱️ ~120-150 minutes

Complexity label: Critical

Rationale:

  • Broad scope: more than 200 files modified or added, spanning multiple subsystems
  • Architectural refactor: introduces a brand-new flow-node-operator three-layer architecture, replacing the previous design
  • Heterogeneous changes: new abstraction layers (VectorStoreBase), factory functions, and state management, plus extensive configuration adjustments and parameter-passing chain changes
  • High logic density: multiple Flow classes implement pipeline orchestration, Node classes handle initialization and scheduling, and the vector store supports several backends (Faiss/Milvus/Qdrant)
  • Cross-cutting dependencies: tight coupling among the config system, vector stores, node system, and demo UI
  • Type system changes: Optional removed from LLMConfig and HugeGraphConfig fields, dimension parameters added
  • Public API changes: RAGRequest gains new fields, GremlinGenerateRequest gains client_config, embedding interfaces adjusted

Possibly related PRs

Poem

🐰 Flows run like streams, nodes sit like stones
Vector stores of every kind, no more thorns
Config gains new chapters, the API a fresh gown
A grand architecture upgrade, embracing thought unbound
— a quatrain by a code rabbit

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name | Status | Explanation | Resolution
Docstring Coverage | ⚠️ Warning | Docstring coverage is 22.73%, which is below the required threshold of 80.00%. | Run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name | Status | Explanation
Description Check | ✅ Passed | Check skipped - CodeRabbit's high-level summary is enabled.
Title Check | ✅ Passed | The PR title "feat(llm): vector-db" accurately reflects the core of this change. Per the original summary, the main work is introducing vector-database support (a new index configuration plus multiple vector store implementations such as FaissVectorIndex, MilvusVectorIndex, and QdrantVectorIndex), refactoring the vector index system to support multiple backends, and adding a new workflow orchestration framework. The title concisely conveys the main functional change, and a developer scanning the version history can quickly see that this PR adds vector-database support.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch vector-db

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

@codecov-ai

This comment has been minimized.

Comment on lines +27 to +34
graph_url: str = "127.0.0.1:8080"
graph_name: str = "hugegraph"
graph_user: str = "admin"
graph_pwd: str = "xxx"
graph_space: Optional[str] = None

# graph query config
limit_property: Optional[str] = "False"
max_graph_path: Optional[int] = 10
max_graph_items: Optional[int] = 30
edge_limit_pre_label: Optional[int] = 8
limit_property: str = "False"

Removing Optional typing from required configuration fields like graph_url, graph_name, etc. is a breaking change. This could cause issues for existing configurations that rely on these being nullable. Consider maintaining backward compatibility or providing proper migration documentation.

Suggested change

Original:
graph_url: str = "127.0.0.1:8080"
graph_name: str = "hugegraph"
graph_user: str = "admin"
graph_pwd: str = "xxx"
graph_space: Optional[str] = None
# graph query config
limit_property: Optional[str] = "False"
max_graph_path: Optional[int] = 10
max_graph_items: Optional[int] = 30
edge_limit_pre_label: Optional[int] = 8
limit_property: str = "False"

Suggested:
# Keep Optional for backward compatibility
graph_url: Optional[str] = "127.0.0.1:8080"
graph_name: Optional[str] = "hugegraph"
graph_user: Optional[str] = "admin"
graph_pwd: Optional[str] = "xxx"


Comment on lines 26 to 36

qdrant_host: Optional[str] = os.environ.get("QDRANT_HOST", None)
qdrant_port: int = int(os.environ.get("QDRANT_PORT", "6333"))
qdrant_api_key: Optional[str] = os.environ.get("QDRANT_API_KEY") if os.environ.get("QDRANT_API_KEY") else None

milvus_host: Optional[str] = os.environ.get("MILVUS_HOST", None)
milvus_port: int = int(os.environ.get("MILVUS_PORT", "19530"))
milvus_user: str = os.environ.get("MILVUS_USER", "")
milvus_password: str = os.environ.get("MILVUS_PASSWORD", "")

now_vector_index: str = 'Faiss'

The environment variable access pattern with default fallbacks could be simplified and made more robust. Also, the default vector index should be configurable rather than hardcoded.

Suggested change

Original:
qdrant_host: Optional[str] = os.environ.get("QDRANT_HOST", None)
qdrant_port: int = int(os.environ.get("QDRANT_PORT", "6333"))
qdrant_api_key: Optional[str] = os.environ.get("QDRANT_API_KEY") if os.environ.get("QDRANT_API_KEY") else None
milvus_host: Optional[str] = os.environ.get("MILVUS_HOST", None)
milvus_port: int = int(os.environ.get("MILVUS_PORT", "19530"))
milvus_user: str = os.environ.get("MILVUS_USER", "")
milvus_password: str = os.environ.get("MILVUS_PASSWORD", "")
now_vector_index: str = 'Faiss'

Suggested:
qdrant_host: Optional[str] = os.getenv("QDRANT_HOST")
qdrant_port: int = int(os.getenv("QDRANT_PORT", "6333"))
qdrant_api_key: Optional[str] = os.getenv("QDRANT_API_KEY")
milvus_host: Optional[str] = os.getenv("MILVUS_HOST")
milvus_port: int = int(os.getenv("MILVUS_PORT", "19530"))
milvus_user: str = os.getenv("MILVUS_USER", "")
milvus_password: str = os.getenv("MILVUS_PASSWORD", "")
now_vector_index: str = os.getenv("VECTOR_INDEX_TYPE", "Faiss")


Comment on lines +40 to +42
@abstractmethod
def get_all_properties(self) -> list[str]:
"""

The abstract base class has inconsistent return type annotations. get_all_properties() returns list[str] but should be more flexible to handle different property types as seen in the implementations.

Suggested change

Original:
@abstractmethod
def get_all_properties(self) -> list[str]:
    """

Suggested:
@abstractmethod
def get_all_properties(self) -> List[Any]:
    """
    Get all properties stored in the vector index.
    Returns a list of properties associated with the vectors.
    """


Comment on lines +97 to +110

def _deserialize_property(self, prop) -> str:
    """If input is a string, return as-is. If dict or list, convert to JSON string."""
    if isinstance(prop, str):
        return prop
    return json.dumps(prop)

def _serialize_property(self, prop: str):
    """If input is a JSON string, parse it. Otherwise, return as-is."""
    try:
        return json.loads(prop)
    except (json.JSONDecodeError, TypeError):

The _deserialize_property method has a potential security vulnerability. Using json.dumps/loads on untrusted data without validation could lead to issues. Consider adding input validation and size limits.

Suggested change

Original:
def _deserialize_property(self, prop) -> str:
    """If input is a string, return as-is. If dict or list, convert to JSON string."""
    if isinstance(prop, str):
        return prop
    return json.dumps(prop)

def _serialize_property(self, prop: str):
    """If input is a JSON string, parse it. Otherwise, return as-is."""
    try:
        return json.loads(prop)
    except (json.JSONDecodeError, TypeError):

Suggested:
def _deserialize_property(self, prop) -> str:
    """If input is a string, return as-is. If dict or list, convert to JSON string."""
    if isinstance(prop, str):
        if len(prop) > 65535:  # Milvus VARCHAR limit
            raise ValueError("Property string too long")
        return prop
    try:
        result = json.dumps(prop)
        if len(result) > 65535:
            raise ValueError("Serialized property too long")
        return result
    except (TypeError, ValueError) as e:
        raise ValueError(f"Cannot serialize property: {e}")


Comment on lines +75 to +82
id=i,
vector=vector,
payload={"property": prop},
)
)


Missing proper error handling for Qdrant client operations. Network failures, authentication errors, or API errors should be caught and handled gracefully.

Suggested change

Original:
        id=i,
        vector=vector,
        payload={"property": prop},
    )
)

Suggested (adds error handling around the upsert):
try:
    self.client.upsert(collection_name=self.name, points=points, wait=True)
except Exception as e:
    log.error("Failed to add vectors to Qdrant: %s", e)
    raise RuntimeError(f"Vector addition failed: {e}") from e


Comment on lines 66 to 67
@abstractmethod
def get_texts_embeddings(
self,
texts: List[str]
) -> List[List[float]]:
def get_embedding_dim(
self,
) -> int:
"""Comment"""

The new abstract method get_embedding_dim() is a breaking change that will cause failures in existing embedding implementations that don't implement this method. Consider providing a default implementation or using a different approach.

Suggested change

Original:
@abstractmethod
def get_texts_embeddings(
    self,
    texts: List[str]
) -> List[List[float]]:
def get_embedding_dim(
    self,
) -> int:
    """Comment"""

Suggested:
def get_embedding_dim(self) -> int:
    """Get the embedding dimension. Override this method in implementations."""
    # Default implementation for backward compatibility
    try:
        sample_embedding = self.get_text_embedding("test")
        return len(sample_embedding)
    except Exception:
        raise NotImplementedError("get_embedding_dim must be implemented")


Comment on lines 25 to 32
class OllamaEmbedding(BaseEmbedding):
def __init__(self, model_name: str, host: str = "127.0.0.1", port: int = 11434, **kwargs):
self.model_name = model_name
def __init__(
self,
model: str = 'quentinz/bge-large-zh-v1.5',
embedding_dimension: int = 1024,
host: str = "127.0.0.1",
port: int = 11434,
**kwargs,

The parameter name change from model_name to model is a breaking change. This should be handled with backward compatibility or proper deprecation warnings.

Suggested change

Original:
class OllamaEmbedding(BaseEmbedding):
    def __init__(self, model_name: str, host: str = "127.0.0.1", port: int = 11434, **kwargs):
        self.model_name = model_name
    def __init__(
        self,
        model: str = 'quentinz/bge-large-zh-v1.5',
        embedding_dimension: int = 1024,
        host: str = "127.0.0.1",
        port: int = 11434,
        **kwargs,

Suggested:
    def __init__(
        self,
        model: str = 'quentinz/bge-large-zh-v1.5',
        model_name: Optional[str] = None,  # Deprecated, use 'model'
        embedding_dimension: int = 1024,
        host: str = "127.0.0.1",
        port: int = 11434,
        **kwargs,
    ):
        if model_name is not None:
            import warnings
            warnings.warn("model_name is deprecated, use 'model' instead", DeprecationWarning)
            model = model_name
        self.model = model

Comment on lines 94 to 103
"Faiss": FaissVectorIndex,
"Milvus": MilvusVectorIndex,
"Qdrant": QdrantVectorIndex,
}
ret = mapping.get(vector_index_str)
assert ret
return ret # type: ignore

The get_vector_index_class function uses assertions for validation which should be replaced with proper error handling for production code.

Suggested change

Original:
    "Faiss": FaissVectorIndex,
    "Milvus": MilvusVectorIndex,
    "Qdrant": QdrantVectorIndex,
}
ret = mapping.get(vector_index_str)
assert ret
return ret  # type: ignore

Suggested:
def get_vector_index_class(vector_index_str: str) -> Type[VectorStoreBase]:
    mapping = {
        "Faiss": FaissVectorIndex,
        "Milvus": MilvusVectorIndex,
        "Qdrant": QdrantVectorIndex,
    }
    ret = mapping.get(vector_index_str)
    if ret is None:
        raise ValueError(f"Unsupported vector index type: {vector_index_str}. Supported types: {list(mapping.keys())}")
    return ret


raise gr.Error("Connection to localhost or private network addresses is not allowed.")


def test_litellm_embedding(api_key, api_base, model_name, model_dim) -> int:

Check failure

Code scanning / CodeQL: Full server-side request forgery (Critical)

The full URL of this request depends on a user-provided value. (flagged three times)
openai_embedding_api_base: Optional[str] = os.environ.get("OPENAI_EMBEDDING_BASE_URL", "https://api.openai.com/v1")
openai_embedding_api_key: Optional[str] = os.environ.get("OPENAI_EMBEDDING_API_KEY")
openai_embedding_model: Optional[str] = "text-embedding-3-small"
openai_chat_api_base: str = os.environ.get("OPENAI_BASE_URL", "https://api.openai.com/v1")

Mixed Type Annotations: Lines 35-49 mix Optional[str] with the str | None union syntax. Consider using Optional[...] consistently for compatibility with Python versions below 3.10 (a short sketch follows).
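For illustration, a minimal sketch of the two styles and a consistent form (field names borrowed from the quoted config, not the actual file):

```python
from typing import Optional

# Mixed styles, as seen in the diff:
openai_embedding_model: Optional[str] = "text-embedding-3-small"  # typing.Optional
ollama_embedding_model: str | None = None  # PEP 604 union, requires Python 3.10+

# Consistent, pre-3.10-compatible style:
ollama_embedding_model_dim: Optional[int] = None
```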

"hugegraph-python-client",

# Vector DB backends
"pymilvus==2.5.9",

Version Pinning Risk: Hard-pinning pymilvus==2.5.9 and qdrant-client==1.14.2 may cause compatibility issues. Consider using compatible version ranges (e.g., pymilvus>=2.5.9,<3.0.0) to allow for patch updates and better compatibility.

if admin_settings.admin_token != req.admin_token:
raise generate_response(RAGResponse(status_code=status.HTTP_403_FORBIDDEN, #pylint: disable=E0702
message="Invalid admin_token"))
return generate_response(

Security Best Practice: Good change from raising an exception to returning a proper HTTP response for authentication failures. This prevents potential information leakage through exception traces.

graph_name: Optional[str] = "hugegraph"
graph_user: Optional[str] = "admin"
graph_pwd: Optional[str] = "xxx"
graph_url: str = "127.0.0.1:8080"

Configuration Validation: Removing Optional[] from graph_url, graph_name, etc. makes these required fields, which is good. However, ensure there's proper validation that these are not empty strings and have valid formats (especially for graph_url).
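A minimal sketch of the kind of validation meant here (the helper name and regex are assumptions, not the project's API):

```python
import re

_HOST_PORT_RE = re.compile(r"^[\w.\-]+:\d{1,5}$")  # e.g. 127.0.0.1:8080

# Hypothetical helper; the real BaseConfig may wire validation in differently.
def validate_graph_config(graph_url: str, graph_name: str) -> None:
    if not graph_url or not _HOST_PORT_RE.match(graph_url):
        raise ValueError(f"graph_url must look like 'host:port', got {graph_url!r}")
    if not graph_name or not graph_name.strip():
        raise ValueError("graph_name must be a non-empty string")
```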

ollama_embedding_host: str = "127.0.0.1"
ollama_embedding_port: int = int(11434)
ollama_embedding_model: str | None = None
# Optional dimension setting from vector-db (keeps the vector index dimension consistent)

Environment Variable Parsing: The conditional logic for ollama_embedding_model_dim could be simplified and made more robust. Consider adding validation for the integer conversion:

_env_ollama_dim = os.getenv("OLLAMA_EMBEDDING_MODEL_DIM")
try:
    ollama_embedding_model_dim: Optional[int] = int(_env_ollama_dim) if _env_ollama_dim else None
except (ValueError, TypeError):
    ollama_embedding_model_dim = None

- Remove qianfan.py and references from init_embedding
- Clean LLMConfig: delete QianFan/WenXin fields
- No behavior change for other providers
- Add backend connection test & persistence in Gradio UI
- Support CUR_VECTOR_INDEX env override in IndexConfig
- Lazy import Milvus/Qdrant with user-friendly errors
- LiteLLM/OpenAI/Ollama use unified 'model' field
- Add async batch method usage in gremlin example index builder
- Improve Ollama async batch fallback
- Reformat 42 files to comply with Black code style
- Align with CI workflow requirements
- No functional changes

@github-actions github-actions bot left a comment


license-eye has checked 373 files.

Valid | Invalid | Ignored | Fixed
305 | 1 | 67 | 0

Invalid file list:
  • hugegraph-llm/src/hugegraph_llm/demo/rag_demo/text2gremlin_block.py

Use this command to fix any missing license headers:

```bash
docker run -it --rm -v $(pwd):/github/workspace apache/skywalking-eyes header fix
```


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 32

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (7)
hugegraph-llm/src/hugegraph_llm/middleware/middleware.py (2)

37-43: The time-unit conversion logic is sound, but a later log line uses an inconsistent unit.

The automatic ms-to-s switch and the response-header setting are both correct. However, the log message on line 44 hardcodes the "ms" unit, while process_time has already been converted to seconds whenever it exceeds 1000 ms, so the log reports the wrong unit.

See the note below for the fix.


44-44: Fix the unit mismatch in the log message.

The log message hardcodes the "ms" unit, but the process_time variable may already have been converted to seconds (when the request took more than 1000 ms), so the log prints the wrong unit.

For example, for a request that takes 2000 ms:

  • process_time is converted to 2.0 seconds
  • the log shows "Request process time: 2.00 ms" (wrong)
  • it should show "2.00 s" or "2000.00 ms"

Apply this diff to fix the mismatch:

-        log.info("Request process time: %.2f ms, code=%d", process_time, response.status_code)
+        log.info("Request process time: %.2f %s, code=%d", process_time, unit, response.status_code)

Alternatively, if the log should always use milliseconds, save the original value before converting:

         process_time = (time.perf_counter() - start_time) * 1000  # ms
+        process_time_ms = process_time
         unit = "ms"
         if process_time > 1000:
             process_time /= 1000
             unit = "s"
 
         response.headers["X-Process-Time"] = f"{process_time:.2f} {unit}"
-        log.info("Request process time: %.2f ms, code=%d", process_time, response.status_code)
+        log.info("Request process time: %.2f ms, code=%d", process_time_ms, response.status_code)
hugegraph-llm/src/hugegraph_llm/utils/decorators.py (2)

119-121: The closure-attachment logic is fundamentally flawed

The current implementation has several problems:

  1. Most return values have no __closure__ attribute - only function/closure objects do; plain objects, dicts, lists, etc. do not
  2. setattr(result, "task_id", task_id) does not modify the closure context - even after the __closure__ check, this only adds an attribute to the function object and does not inject task_id into the closure's variable scope
  3. The mechanism is a no-op for most return types

If the goal is to associate the task_id with the result, consider these alternatives:

  • return a tuple or dict containing the task_id
  • use a context manager or thread-local storage
  • require decorated functions to return a specific structure (e.g. an object with a task_id field)

Example alternative (returning a dict that carries the task_id):

def with_task_id(func: Callable) -> Callable:
    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        import uuid
        
        task_id = f"task_{uuid.uuid4()}"
        log.debug("New task created with id: %s", task_id)
        
        result = func(*args, **kwargs)
        
        # If the return value is a dict, attach the task_id
        if isinstance(result, dict):
            result["task_id"] = task_id
        
        return result
    
    return wrapper

109-124: Missing @wraps decorator and async function support

The current decorator has two significant issues:

  1. Missing @wraps(func) - the decorated function loses its original name, docstring, and other metadata, hurting debugging and introspection
  2. No async support - if func is a coroutine function, the decorator will not work correctly

Other decorators in this file (such as log_time and record_rpm) already implement both features. For consistency, with_task_id should follow the same pattern.

Apply this change to add @wraps and async support:

 def with_task_id(func: Callable) -> Callable:
-    def wrapper(*args: Any, **kwargs: Any) -> Any:
+    @wraps(func)
+    async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
         import uuid
-
+        
         task_id = f"task_{uuid.uuid4()}"
         log.debug("New task created with id: %s", task_id)
-
-        # Store the original return value
-        result = func(*args, **kwargs)
-        # Add the task_id to the function's context
-        if hasattr(result, "__closure__") and result.__closure__:
-            # If it's a closure, we can add the task_id to its context
-            setattr(result, "task_id", task_id)
+        
+        result = await func(*args, **kwargs)
+        # TODO: Implement proper task_id tracking mechanism
+        return result
+    
+    @wraps(func)
+    def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
+        import uuid
+        
+        task_id = f"task_{uuid.uuid4()}"
+        log.debug("New task created with id: %s", task_id)
+        
+        result = func(*args, **kwargs)
+        # TODO: Implement proper task_id tracking mechanism
         return result
-
-    return wrapper
+    
+    if asyncio.iscoroutinefunction(func):
+        return async_wrapper
+    return sync_wrapper
hugegraph-llm/src/hugegraph_llm/operators/llm_op/keyword_extract.py (1)

90-94: Don't filter out legitimate single-character keywords

Changing this to len(k.strip()) > 1 drops every single-character keyword. For languages such as Chinese, many valid keywords are single characters, so this will noticeably hurt recall. Filter out only empty strings and keep valid length-1 tokens.

-                for k in re.split(r"[,,]+", match)
-                if len(k.strip()) > 1
+                for k in re.split(r"[,,]+", match)
+                if k.strip()
hugegraph-llm/src/hugegraph_llm/operators/llm_op/answer_synthesize.py (1)

313-316: Add a return type annotation and exception handling to __llm_generate_with_meta_info

__llm_generate_with_meta_info currently lacks a return type annotation; add -> AsyncGenerator[Tuple[int, str, str], None]. It also lacks exception handling; catch exceptions around the agenerate_streaming call so a single failure does not abort the task group:

async def __llm_generate_with_meta_info(self, task_id: int, target_key: str, prompt: str) -> AsyncGenerator[Tuple[int, str, str], None]:
    try:
        async for token in self._llm.agenerate_streaming(prompt=prompt):
            yield task_id, target_key, token
    except Exception as e:
        # Handle/log exception appropriately
        raise

(The type mismatch noted in the FIXME comment stems from BaseLLM.agenerate_streaming being declared as async def returning AsyncGenerator[str, None], so calls produce Coroutine[Any, Any, AsyncGenerator[str, None]]. With the return annotation added, mypy should infer correctly.)

hugegraph-llm/src/hugegraph_llm/nodes/hugegraph_node/graph_query_node.py (1)

105-142: Fix immediately: OperatorList constructed with None dependencies crashes at runtime

Initializing with OperatorList(None, None) sets both the LLM and the Embedding to None. _gremlin_generate_query() then calls example_index_query() and gremlin_generate_synthesize(), and per hugegraph_llm/operators/operator_list.py (e.g. lines 64-143) those operators access self.embedding / self.llm directly. Execution will hit an AttributeError / NoneType access, so the node deterministically crashes on first run - a blocking bug.

Make sure real LLM and Embedding instances are injected when constructing OperatorList (e.g. taken from the context or wk_input), or assign them correctly before operator_schedule; otherwise this node can never return a result.

♻️ Duplicate comments (1)
hugegraph-llm/src/hugegraph_llm/nodes/hugegraph_node/schema.py (1)

18-18: Verify the availability of CStatus in the PyCGraph package.

As with the GPipeline import issue in text2gremlin.py, Pylint reports an import error for CStatus on line 18. See the verification suggestion in text2gremlin.py and check the correct way to import from the PyCGraph package.

🧹 Nitpick comments (55)
hugegraph-llm/src/tests/config/test_config.py (1)

27-28: Add explicit assertions to strengthen the test.

The test currently relies only on the absence of an exception to verify that the NLTK resource is found, with no explicit assertion. Add assertions that the resource path is configured correctly, improving maintainability and readability.

Apply the following diff:

 nltk.data.path.append(resource_path)
-nltk.data.find("corpora/stopwords")
+found_path = nltk.data.find("corpora/stopwords")
+self.assertIsNotNone(found_path)
+self.assertTrue(found_path.startswith(resource_path) or resource_path in found_path)
hugegraph-llm/src/hugegraph_llm/indices/graph_index.py (4)

38-39: Consider adding error handling and a confirmation mechanism

This method performs a destructive operation (dropping all vertices) but lacks:

  1. Exception handling: failures of the Gremlin query are not caught
  2. A confirmation mechanism: production use may warrant confirmation or logging

Suggested improvement:

 def clear_graph(self):
-    self.client.gremlin().exec("g.V().drop()")
+    try:
+        self.client.gremlin().exec("g.V().drop()")
+    except Exception as e:
+        raise RuntimeError(f"Failed to clear graph: {e}") from e

42-43: Reminder: TODO needs implementation

This method is currently an empty stub. It needs to implement adding triples to the graph.

Would you like me to help generate the implementation or open a new issue to track this task?


46-47: Reminder: TODO needs implementation

This method is currently an empty stub. It needs to implement searching for triples.

Would you like me to help generate the implementation or open a new issue to track this task?


49-50: Consider adding error handling and type annotations

Optional improvements:

  1. Add exception handling for clearer error messages
  2. Add a return type annotation for readability
-def execute_gremlin_query(self, query: str):
-    return self.client.gremlin().exec(query)
+def execute_gremlin_query(self, query: str) -> Any:
+    try:
+        return self.client.gremlin().exec(query)
+    except Exception as e:
+        raise RuntimeError(f"Failed to execute Gremlin query: {e}") from e

Requires the import: from typing import Any

hugegraph-llm/src/hugegraph_llm/operators/llm_op/info_extract.py (1)

194-194: Make the hardcoded max_length configurable.

The TODO comment notes that max_length should be read from the config file rather than hardcoded to 256; making it configurable improves flexibility and maintainability (see the sketch below).

Would you like me to generate the implementation that makes max_length configurable?
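A minimal sketch of the configurable variant (the setting name info_extract_max_length is an assumption; the real config key may differ):

```python
from hugegraph_llm.config import llm_settings

# Hypothetical: fall back to the current hardcoded value when the setting
# is absent from the config.
max_length = getattr(llm_settings, "info_extract_max_length", 256)
```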

hugegraph-llm/src/hugegraph_llm/models/llms/ollama.py (2)

32-35: Add error handling and parameter validation to client initialization.

The current implementation creates the Ollama clients directly without handling possible initialization failures. Suggestions:

  1. Wrap client creation in a try-except block
  2. Validate the host and port parameters (e.g. port range check)
  3. Consider logging successful client initialization

Apply this diff to add error handling:

 def __init__(self, model: str, host: str = "127.0.0.1", port: int = 11434, **kwargs):
+    if not (1 <= port <= 65535):
+        raise ValueError(f"Invalid port number: {port}")
     self.model = model
-    self.client = ollama.Client(host=f"http://{host}:{port}", **kwargs)
-    self.async_client = ollama.AsyncClient(host=f"http://{host}:{port}", **kwargs)
+    try:
+        self.client = ollama.Client(host=f"http://{host}:{port}", **kwargs)
+        self.async_client = ollama.AsyncClient(host=f"http://{host}:{port}", **kwargs)
+        log.info("Ollama client initialized successfully for model: %s", model)
+    except Exception as e:
+        log.error("Failed to initialize Ollama client: %s", e)
+        raise

60-60: Use logging instead of print() statements.

The code prints error messages with print() in several places (lines 60, 86, 130) while other code paths use log.info(). For consistency and easier production log management, replace print() with log.error() or log.warning().

Example change:

# Line 60
log.error("Retrying LLM call: %s", e)

# Line 86
log.error("Retrying LLM call: %s", e)

# Line 130
log.error("Retrying LLM call: %s", e)

Also applies to: 86-86, 130-130

hugegraph-llm/src/hugegraph_llm/api/models/rag_requests.py (3)

49-51: Add validation for custom_priority_info.

The custom_priority_info field currently has no length limit or format validation, so overly long or malformed strings can be passed in. Suggestions:

  • Add a max_length constraint (e.g. Field(..., max_length=500))
  • Document the expected format, or provide an example, in the description

Apply this diff to add the length constraint:

-    custom_priority_info: str = Query(
-        "", description="Custom information to prioritize certain results."
-    )
+    custom_priority_info: str = Query(
+        "", 
+        max_length=500,
+        description="Custom information to prioritize certain results (max 500 characters)."
+    )

57-66: Optional: improve the string formatting style.

Using backslash continuations inside description strings is not the best Python style. Prefer parentheses with implicit string concatenation for readability:

Apply this diff:

-    vector_dis_threshold: float = Query(
-        0.9,
-        description="Threshold for vector similarity\
-                                         (results greater than this will be ignored).",
-    )
-    topk_per_keyword: int = Query(
-        1,
-        description="TopK results returned for each keyword \
-                                   extracted from the query, by default only the most similar one is returned.",
-    )
+    vector_dis_threshold: float = Query(
+        0.9,
+        description=(
+            "Threshold for vector similarity "
+            "(results greater than this will be ignored)."
+        ),
+    )
+    topk_per_keyword: int = Query(
+        1,
+        description=(
+            "TopK results returned for each keyword extracted from the query, "
+            "by default only the most similar one is returned."
+        ),
+    )

Note: lines 94-103 in GraphRAGRequest have the same style issue.


185-195: Optional: extract the required placeholders into a class constant.

The required_placeholders list is hardcoded inside the validator method. For maintainability and reuse, extract it as a class-level or module-level constant.

Apply this diff:

 class GremlinGenerateRequest(BaseModel):
+    REQUIRED_PROMPT_PLACEHOLDERS = ["{query}", "{schema}", "{example}", "{vertices}"]
+    
     query: str
     example_num: Optional[int] = Query(
         0, description="Number of Gremlin templates to use.(0 means no templates)"
     )
     ...
     
     @field_validator("gremlin_prompt")
     @classmethod
     def validate_prompt_placeholders(cls, v):
         if v is not None:
-            required_placeholders = ["{query}", "{schema}", "{example}", "{vertices}"]
-            missing = [p for p in required_placeholders if p not in v]
+            missing = [p for p in cls.REQUIRED_PROMPT_PLACEHOLDERS if p not in v]
             if missing:
                 raise ValueError(
                     f"Prompt template is missing required placeholders: {', '.join(missing)}"
                 )
         return v
hugegraph-llm/src/hugegraph_llm/flows/rag_flow_vector_only.py (2)

80-80: Code style: redundant return statement

The bare return on line 80 is redundant; remove it for conciseness.

Apply this diff:

     }
-    return

111-116: Clarify the API return value

post_deal() returns a dict containing four answer types, but in the vector-only flow only vector_only_answer should have content; the other fields (graph_only_answer, graph_vector_answer) will be empty.

State this explicitly in the docstring:

  • in the vector-only flow, only the vector_only_answer field is populated
  • the other fields are kept for API consistency

Optional improvement: add a docstring on the method describing the return structure:

 def post_deal(self, pipeline=None):
+    """
+    Post-process the pipeline results.
+    
+    Args:
+        pipeline: The executed pipeline instance
+        
+    Returns:
+        dict: Contains answer fields. For vector-only flow, only vector_only_answer 
+              will be populated; other fields (graph_only_answer, graph_vector_answer) 
+              are included for API consistency but will be empty.
+    """
     if pipeline is None:
hugegraph-llm/src/hugegraph_llm/operators/llm_op/answer_synthesize.py (1)

76-84: Calling asyncio.run inside an existing event loop can crash

asyncio.run(...) raises RuntimeError when called from within a running event loop (e.g. Gradio, notebooks, web servers). Handle both cases:

-        context = asyncio.run(
-            self.async_generate(
-                context,
-                context_head_str,
-                context_tail_str,
-                vector_result_context,
-                graph_result_context,
-            )
-        )
+        try:
+            loop = asyncio.get_running_loop()
+        except RuntimeError:
+            # No running event loop; asyncio.run is safe
+            context = asyncio.run(
+                self.async_generate(
+                    context,
+                    context_head_str,
+                    context_tail_str,
+                    vector_result_context,
+                    graph_result_context,
+                )
+            )
+        else:
+            # An event loop is already running; use run_until_complete instead
+            context = loop.run_until_complete(
+                self.async_generate(
+                    context,
+                    context_head_str,
+                    context_tail_str,
+                    vector_result_context,
+                    graph_result_context,
+                )
+            )
hugegraph-llm/src/hugegraph_llm/state/ai_state.py (1)

168-199: Add a return type annotation to setup to satisfy mypy

Annotate the return type explicitly to improve readability and pass type checking:

-    def setup(self):
+    def setup(self) -> CStatus:
hugegraph-llm/src/hugegraph_llm/config/hugegraph_config.py (1)

33-33: Define limit_property as a boolean, not a string

The current value is the string "False", which downstream checks can easily misinterpret. Change it to a boolean with default False:

-    limit_property: Optional[str] = "False"
+    limit_property: Optional[bool] = False
hugegraph-llm/src/hugegraph_llm/demo/rag_demo/text2gremlin_block.py (5)

214-232: Scheduler integration looks reasonable; strengthen error reporting

The UI layer already wraps the call in try/except and returns a JSON error, which is good. Consider also logging the flow_name and an input snippet on failure to ease debugging:

-    except Exception as e:  # pylint: disable=broad-except
-        log.error("UI text2gremlin error: %s", e)
+    except Exception as e:  # pylint: disable=broad-except
+        log.error("UI text2gremlin error: %s | flow=text2gremlin, inp_preview=%s", e, str(inp)[:200])

261-297: Minor UI polish: set the Code components' language to json to match the output

The match / tmpl_exec_out / raw_exec_out outputs are all JSON strings; set language="json" uniformly for better highlighting and readability.

-            match = gr.Code(
-                label="Similar Template (TopN)",
-                language="javascript",
+            match = gr.Code(
+                label="Similar Template (TopN)",
+                language="json",
                 elem_classes="code-container-show",
             )
@@
-            tmpl_exec_out = gr.Code(
-                label="Query With Template Output",
-                language="json",
+            tmpl_exec_out = gr.Code(
+                label="Query With Template Output",
+                language="json",
                 elem_classes="code-container-show",
             )
@@
-            raw_exec_out = gr.Code(
-                label="Query Without Template Output",
-                language="json",
+            raw_exec_out = gr.Code(
+                label="Query Without Template Output",
+                language="json",
                 elem_classes="code-container-show",
             )

84-86: Robustness: ensure graph_name/graph_space are non-empty before building the index folder name

huge_settings.graph_name/graph_space are typed Optional. Defaults are provided, but validate explicitly here and log a hint, to avoid None leaking into path construction.

-    folder_name = get_index_folder_name(
-        huge_settings.graph_name, huge_settings.graph_space
-    )
+    if not huge_settings.graph_name:
+        return {"error": "graph_name is not configured"}
+    folder_name = get_index_folder_name(
+        huge_settings.graph_name, huge_settings.graph_space or None
+    )

Also applies to: 98-100, 119-121


339-353: Naming consistency: is_vector_only and get_vertex_only may not mean the same thing

get_vertex_only is passed through as is_vector_only; by name, one reads as "vector only" and the other as "vertex only". Confirm the semantics actually match to avoid misleading readers.

If it really means "return vertices only", use a more fitting parameter name or a separate switch.


183-209: Remove the unused _process_schema function
No call to _process_schema was found in this module; remove it or move it to unit tests / a utility module to improve cohesion.

hugegraph-llm/src/hugegraph_llm/utils/vector_index_utils.py (1)

98-103: Missing error handling after delegating to the scheduler.

scheduler.schedule_flow() may raise RuntimeError (see scheduler.py:102-138), but the exception is not caught here. Add more specific error handling to give users clearer feedback.

Apply the following diff:

 def build_vector_index(input_file, input_text):
     if input_file and input_text:
         raise gr.Error("Please only choose one between file and text.")
     texts = read_documents(input_file, input_text)
     scheduler = SchedulerSingleton.get_instance()
-    return scheduler.schedule_flow("build_vector_index", texts)
+    try:
+        return scheduler.schedule_flow("build_vector_index", texts)
+    except RuntimeError as e:
+        raise gr.Error(f"Failed to build vector index: {e}") from e
hugegraph-llm/src/hugegraph_llm/models/embeddings/init_embedding.py (1)

25-29: Consider the staleness of model_map.

model_map is initialized at module load time with the then-current values of llm_settings. If llm_settings is updated at runtime, model_map will hold stale values. Consider turning it into a function or property that reads the latest config, as sketched below.
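A minimal sketch of the function-based variant (the mapping keys are illustrative; the real map may differ):

```python
from hugegraph_llm.config import llm_settings

# Build the map on each call so it always reflects the current
# llm_settings values rather than a module-load-time snapshot.
def get_model_map() -> dict:
    return {
        "openai": llm_settings.openai_embedding_model,
        "ollama": llm_settings.ollama_embedding_model,
    }
```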

hugegraph-llm/src/hugegraph_llm/flows/build_vector_index.py (2)

31-35: Parameterize hardcoded configuration values.

Lines 33-34 hardcode language and split_type to "zh" and "paragraph". Consider passing these into build_flow as parameters or reading them from config for flexibility.

Apply the following diff to parameterize them:

-    def build_flow(self, texts):
+    def build_flow(self, texts, language="zh", split_type="paragraph"):
         pipeline = GPipeline()
         # prepare for workflow input
         prepared_input = WkFlowInput()
-        self.prepare(prepared_input, texts)
+        prepared_input.texts = texts
+        prepared_input.language = language
+        prepared_input.split_type = split_type

28-29: Remove the empty __init__ method.

The empty __init__ can be removed; Python will fall back to the default constructor.

Apply the following diff:

 class BuildVectorIndexFlow(BaseFlow):
-    def __init__(self):
-        pass
-
     def prepare(self, prepared_input: WkFlowInput, texts):
hugegraph-llm/src/hugegraph_llm/nodes/hugegraph_node/commit_to_hugegraph.py (2)

27-31: Simplify the redundant data-initialization logic

The manual data_json handling in lines 27-29 duplicates BaseNode.node_init(): per base_node.py, super().node_init() already checks self.wk_input.data_json and calls self.context.assign_from_json(), so the manual handling is redundant.

Also, the ternary on line 27, data_json = self.wk_input.data_json if self.wk_input.data_json else None, simplifies to data_json = self.wk_input.data_json.

Remove lines 27-29 and create the operator directly on line 30, letting the base class handle data_json.

Apply this change:

 def node_init(self):
-    data_json = self.wk_input.data_json if self.wk_input.data_json else None
-    if data_json:
-        self.context.assign_from_json(data_json)
     self.commit_to_graph_op = Commit2Graph()
     return super().node_init()

30-30: Add error handling for operator initialization

The Commit2Graph() constructor creates a PyHugeClient and initializes the schema (per the commit_to_hugegraph.py operator), which can fail on connection or configuration errors. Wrap it in a try-except and return an appropriate CStatus error.

Follow the pattern used in fetch_graph_data.py:

def node_init(self):
    try:
        self.commit_to_graph_op = Commit2Graph()
    except Exception as e:
        log.error("Failed to initialize Commit2Graph operator: %s", e)
        return CStatus(-1, f"Failed to initialize Commit2Graph operator: {e}")
    return super().node_init()
hugegraph-llm/src/hugegraph_llm/utils/graph_index_utils.py (1)

80-80: Make the hardcoded limit configurable

A fixed query limit of 20 may not suit every use case. Pass it as an optional parameter or read it from config.

def get_vertex_details(
    vertex_ids: List[str], 
    context: Dict[str, Any],
    limit: int = 20
) -> List[Dict[str, Any]]:
    ...
    gremlin_query = f"g.V({formatted_ids}).limit({limit})"
    ...
hugegraph-llm/src/hugegraph_llm/nodes/hugegraph_node/fetch_graph_data.py (1)

28-29: Type hints inconsistent with the base class

Lines 28-29 declare context and wk_input as Optional[...], while BaseNode annotates these attributes as non-Optional (with default None). The inconsistency can confuse type checkers.

Stay consistent with BaseNode: drop the Optional wrapper and keep only the = None default.

Apply this change:

 class FetchGraphDataNode(BaseNode):
     fetch_graph_data_op: FetchGraphData
-    context: Optional[WkFlowState] = None
-    wk_input: Optional[WkFlowInput] = None
+    context: WkFlowState = None
+    wk_input: WkFlowInput = None

Also remove the now-unnecessary Optional import (if it is only used here).

hugegraph-llm/src/hugegraph_llm/flows/update_vid_embeddings.py (1)

24-31: prepare ignores its parameter

prepare() takes prepared_input but never uses it, while build_flow() creates a fresh WkFlowInput() on line 29. This suggests prepare() is not implemented correctly, or that this flow needs no input preparation.

If this flow genuinely needs no preparation step, document that with a comment in prepare(); otherwise change build_flow() to use the prepared_input passed through prepare().

Option 1: if no preparation is needed, document it:

def prepare(self, prepared_input: WkFlowInput):
    # This flow does not require input preparation
    return CStatus()

Option 2: if prepared_input should be used, change build_flow:

-def build_flow(self):
+def build_flow(self, prepared_input: WkFlowInput = None):
     pipeline = GPipeline()
-    prepared_input = WkFlowInput()
+    if prepared_input is None:
+        prepared_input = WkFlowInput()
     # prepare input data
     self.prepare(prepared_input)
hugegraph-llm/src/hugegraph_llm/nodes/llm_node/keyword_extract_node.py (1)

48-53: Validate that the query is non-empty

Line 49 passes self.wk_input.query to the KeywordExtract constructor. Per the operator implementation, if the text argument is None the operator fetches "query" from the context in run(); if both are None, an assertion failure is triggered.

Add validation in node_init() to ensure at least one source can provide the query.

         extract_template = self.wk_input.keywords_extract_prompt
 
+        # Validate that query will be available
+        if self.wk_input.query is None:
+            log.warning("Query not provided in wk_input, will be read from context during execution")
+        
         self.operator = KeywordExtract(
             text=self.wk_input.query,
             max_keywords=max_keywords,
             language=language,
             extract_template=extract_template,
         )
hugegraph-llm/src/hugegraph_llm/flows/build_example_index.py (2)

30-31: Remove the empty __init__ method

This empty __init__ does nothing; Python automatically falls back to the parent's __init__. Remove it to reduce noise.

 class BuildExampleIndexFlow(BaseFlow):
-    def __init__(self):
-        pass
-
     def prepare(
         self, prepared_input: WkFlowInput, examples: Optional[List[Dict[str, str]]]
     ):

33-37: prepare should return CStatus for consistency

Per the BaseFlow abstract base class and other flow implementations (such as UpdateVidEmbeddingsFlow), prepare() should return a CStatus() to signal the preparation status. The current implementation implicitly returns None, inconsistent with the other flows.

Apply this change:

+from PyCGraph import CStatus
+
 def prepare(
     self, prepared_input: WkFlowInput, examples: Optional[List[Dict[str, str]]]
 ):
     prepared_input.examples = examples
-    return
+    return CStatus()
hugegraph-llm/src/hugegraph_llm/nodes/llm_node/extract_info.py (1)

34-34: Add error handling for LLM initialization

get_chat_llm() can fail due to configuration errors, network issues, and so on. Wrap it in a try-except and return an appropriate error status.

Follow the error-handling pattern of other nodes (such as FetchGraphDataNode).

 def node_init(self):
-    llm = get_chat_llm(llm_settings)
+    try:
+        llm = get_chat_llm(llm_settings)
+    except Exception as e:
+        from hugegraph_llm.utils.log import log
+        log.error("Failed to initialize LLM: %s", e)
+        return CStatus(-1, f"Failed to initialize LLM: {e}")
+        
     if self.wk_input.example_prompt is None:
         return CStatus(-1, "Error occurs when prepare for workflow input")
hugegraph-llm/src/hugegraph_llm/nodes/llm_node/text2gremlin.py (1)

69-71: Avoid duplicated/inflated call_count accounting

This adds +1 manually, while GremlinGenerateSynthesize.run() internally adds another +2, for +3 in total - confusing for observability. Let the operator own the counter and delete this increment.

-        # increase call count for observability
-        prev = data_json.get("call_count", 0) or 0
-        data_json["call_count"] = prev + 1
hugegraph-llm/src/hugegraph_llm/nodes/llm_node/schema_build.py (1)

32-35: Add return type annotations to stay consistent with the BaseNode interface

Declare node_init -> CStatus and operator_schedule -> Dict[str, Any] to aid mypy checks and IDE assistance; the implementation can stay unchanged.

hugegraph-llm/src/hugegraph_llm/nodes/index_node/semantic_id_query_node.py (1)

42-61: Validate the semantic_by input and add type annotations

The value is currently unconstrained, while SemanticIdQuery expects Literal["query","keywords"]. Validate after assignment and either fall back to the default or raise on illegal values; also add return type annotations to node_init/operator_schedule for mypy.

Example (optional):

  • if by not in {"query","keywords"}: by = "keywords"
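A self-contained version of that guard (the helper name is hypothetical):

```python
from typing import Literal

# Fall back to the default instead of passing an illegal value through.
def normalize_semantic_by(by: str) -> Literal["query", "keywords"]:
    return by if by in ("query", "keywords") else "keywords"
```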
hugegraph-llm/src/hugegraph_llm/flows/import_graph_data.py (2)

31-46: prepare's error messages are clear; add type annotations and document the schema shape

  • Add type annotations to prepare/build_flow/post_deal for mypy.
  • schema accepts both a graph name and a JSON string; state this in the docstring, or do lightweight validation here (to avoid SchemaNode failures that are harder to trace back later).

64-67: Make the Gradio dependency optional for non-UI scenarios

gr.Info introduces a hard dependency in environments without Gradio. Suggestions (see the sketch below):

  • Rely primarily on logging, and call gr.Info best-effort (catching ImportError/AttributeError).
  • Or push the notification up to the calling UI layer.
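A minimal sketch of the best-effort variant (the notify helper is hypothetical):

```python
import logging

log = logging.getLogger(__name__)

# Log first, then attempt the Gradio toast only if gradio is importable.
def notify(msg: str) -> None:
    log.info(msg)
    try:
        import gradio as gr
        gr.Info(msg)
    except (ImportError, AttributeError):
        pass  # not running inside a Gradio UI
```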
hugegraph-llm/src/hugegraph_llm/nodes/hugegraph_node/gremlin_execute.py (2)

37-41: Add type annotations to pass mypy and unify the interface

-class GremlinExecuteNode(BaseNode):
-    def node_init(self):
-        return CStatus()
+class GremlinExecuteNode(BaseNode):
+    def node_init(self) -> CStatus:
+        return CStatus()

Likewise, add a return type annotation to operator_schedule:

-    def operator_schedule(self, data_json: Dict[str, Any]):
+    def operator_schedule(self, data_json: Dict[str, Any]) -> Dict[str, Any]:

26-34: The limit-appending logic can stay; optional robustness tweaks

  • Use query = query.strip() to avoid concatenation issues from trailing whitespace.
  • The check is already case-insensitive and covers the common g.V()/g.E() cases; keep it as is.
hugegraph-llm/src/hugegraph_llm/nodes/index_node/gremlin_example_index_query.py (1)

33-44: Keep BaseNode.node_init's base initialization

BaseNode.node_init() merges wk_input.data_json into the context and clears the input. Returning CStatus() directly here silently skips that step whenever an upstream node passes state via data_json, leaving the context missing fields and affecting downstream nodes. Call the base-class logic after the custom initialization.

-        return CStatus()
+        return super().node_init()
hugegraph-llm/src/hugegraph_llm/flows/graph_extract.py (1)

30-45: Consider parameterizing split_type for flexibility.

split_type is currently hardcoded to "document" (line 42). If different splitting strategies may be needed later, pass it in as a parameter.

hugegraph-llm/src/hugegraph_llm/demo/rag_demo/vector_graph_block.py (1)

90-117: Consider simplifying the language-detection logic.

The nested getattr calls on lines 96-100 can be simplified to:

language = getattr(getattr(prompt, "llm_settings", None), "language", None) or getattr(prompt, "language", "EN")

Or extract it into a small helper function for readability.

hugegraph-llm/src/hugegraph_llm/nodes/llm_node/answer_synthesize_node.py (1)

29-52: Move the CStatus import to the top of the file.

Line 50 imports CStatus inside the exception-handling block. It works, but placing it with the other imports at line 18 improves readability and consistency.

Apply this diff:

 from typing import Dict, Any
 from hugegraph_llm.nodes.base_node import BaseNode
 from hugegraph_llm.operators.llm_op.answer_synthesize import AnswerSynthesize
 from hugegraph_llm.utils.log import log
+from PyCGraph import CStatus

And delete the import statement on line 50.

hugegraph-llm/src/hugegraph_llm/nodes/index_node/vector_query_node.py (1)

31-48: Move the CStatus import to the top of the file.

As in answer_synthesize_node.py, line 46 imports CStatus inside the exception handler. Move it to the top with the other imports for consistency.

Apply this diff:

 from typing import Dict, Any
 from hugegraph_llm.config import llm_settings
 from hugegraph_llm.nodes.base_node import BaseNode
 from hugegraph_llm.operators.index_op.vector_index_query import VectorIndexQuery
 from hugegraph_llm.models.embeddings.init_embedding import get_embedding
 from hugegraph_llm.utils.log import log
+from PyCGraph import CStatus

And delete the import statement on line 46.

hugegraph-llm/src/hugegraph_llm/flows/text2gremlin.py (1)

32-66: Extract the magic numbers into constants.

The 10 on line 44 (maximum example_num) and the 5 on line 59 (maximum requested_outputs length) can be class-level or module-level constants for maintainability.

For example:

class Text2GremlinFlow(BaseFlow):
    MAX_EXAMPLE_NUM = 10
    MAX_REQUESTED_OUTPUTS = 5
    
    def prepare(self, ...):
        example_num = max(0, min(self.MAX_EXAMPLE_NUM, example_num))
        ...
        if len(req) > self.MAX_REQUESTED_OUTPUTS:
            req = req[:self.MAX_REQUESTED_OUTPUTS]
hugegraph-llm/src/hugegraph_llm/nodes/hugegraph_node/schema.py (2)

34-47: Make the error message more descriptive.

The message "No input data / invalid schema type" on line 47 could be more specific, for example naming the expected input kinds (from_hugegraph, from_user_defined, or from_extraction).

Suggested wording:

raise ValueError(
    "Schema source must be specified: use from_hugegraph, from_user_defined, or from_extraction"
)

49-63: JSON detection could be more robust.

Line 53 uses startswith("{") to decide whether the value is JSON, a heuristic that can fail at the edges (e.g. JSON with leading whitespace, or a graph name that happens to start with '{').

Consider attempting JSON parsing first and falling back to treating the value as a graph name:

def node_init(self):
    if self.wk_input.schema is None:
        return CStatus(-1, "Schema message is required in SchemaNode")
    self.schema = self.wk_input.schema.strip()
    
    # Try JSON parsing first
    try:
        schema = json.loads(self.schema)
        self.check_schema = self._import_schema(from_user_defined=schema)
    except json.JSONDecodeError:
        # Not JSON, assume it's a graph name
        log.info("Get schema '%s' from graphdb.", self.schema)
        self.schema_manager = self._import_schema(from_hugegraph=self.schema)
    
    return super().node_init()
hugegraph-llm/src/hugegraph_llm/flows/rag_flow_graph_vector.py (1)

73-85: Avoid using or for default fallbacks; it drops legitimate 0/0.0 values

or treats 0/0.0 as falsy and wrongly falls back to the default. Use an explicit None check.
[Suggested change]

-        prepared_input.max_graph_items = (
-            max_graph_items or huge_settings.max_graph_items
-        )
+        prepared_input.max_graph_items = (
+            max_graph_items
+            if max_graph_items is not None
+            else huge_settings.max_graph_items
+        )
-        prepared_input.topk_return_results = (
-            topk_return_results or huge_settings.topk_return_results
-        )
+        prepared_input.topk_return_results = (
+            topk_return_results
+            if topk_return_results is not None
+            else huge_settings.topk_return_results
+        )
-        prepared_input.topk_per_keyword = (
-            topk_per_keyword or huge_settings.topk_per_keyword
-        )
+        prepared_input.topk_per_keyword = (
+            topk_per_keyword
+            if topk_per_keyword is not None
+            else huge_settings.topk_per_keyword
+        )
-        prepared_input.vector_dis_threshold = (
-            vector_dis_threshold or huge_settings.vector_dis_threshold
-        )
+        prepared_input.vector_dis_threshold = (
+            vector_dis_threshold
+            if vector_dis_threshold is not None
+            else huge_settings.vector_dis_threshold
+        )
@@
-        prepared_input.data_json = {
+        prepared_input.data_json = {
             "query": query,
             "vector_search": vector_search,
             "graph_search": graph_search,
-            "max_graph_items": max_graph_items or huge_settings.max_graph_items,
+            "max_graph_items": (
+                max_graph_items
+                if max_graph_items is not None
+                else huge_settings.max_graph_items
+            ),
         }

(Per the project coding guidelines)

Also applies to: 79-83, 98-99

hugegraph-llm/src/hugegraph_llm/flows/scheduler.py (2)

127-131: Reset wkflow_input before reusing a pipeline to avoid stale state

When an existing pipeline is fetched from the manager, overwriting the old wkflow_input in place may leave values that the new parameters do not cover. Reset the input before prepare.
[Suggested change]

-                prepared_input = pipeline.getGParamWithNoEmpty("wkflow_input")
+                prepared_input = pipeline.getGParamWithNoEmpty("wkflow_input")
+                # Optional: reset if available, to avoid stale state
+                try:
+                    from PyCGraph import CStatus  # local import avoids a module-level dependency
+                    prepared_input.reset(CStatus())
+                except Exception:
+                    pass
                 flow.prepare(prepared_input, *args, **kwargs)

(Per the project coding guidelines)


41-43: Unused configuration field max_pipeline

max_pipeline is currently stored but never used. Implement the capacity control (see the sketch below) or remove the field to avoid confusion.

Also applies to: 97-101
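For illustration, one way max_pipeline could actually be enforced; a minimal LRU-style sketch under assumed names, not the scheduler's real structure:

```python
from collections import OrderedDict

class PipelineCache:
    """Hypothetical capacity control for cached pipelines."""

    def __init__(self, max_pipeline: int = 8):
        self.max_pipeline = max_pipeline
        self._cache: "OrderedDict[str, object]" = OrderedDict()

    def get(self, key: str):
        if key in self._cache:
            self._cache.move_to_end(key)  # mark as recently used
            return self._cache[key]
        return None

    def put(self, key: str, pipeline: object) -> None:
        self._cache[key] = pipeline
        self._cache.move_to_end(key)
        if len(self._cache) > self.max_pipeline:
            self._cache.popitem(last=False)  # evict the least recently used
```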

hugegraph-llm/src/hugegraph_llm/demo/rag_demo/rag_block.py (1)

70-85: Confirm the priority among multi-select modes is as intended

Currently, when raw_answer=True and graph/vector modes are also checked, the combined/single retrieval flow takes priority and a pure raw answer is never returned. Confirm this interaction is intended; if raw should win, adjust the branch priority.

hugegraph-llm/src/hugegraph_llm/flows/rag_flow_graph_only.py (2)

136-144: Naming readability: avoid pointing a schema_node variable at a Condition

schema_node = VectorOnlyCondition(...) is easy to confuse with only_schema_node = SchemaNode(). Rename it to schema_condition for clarity.


104-106: Likewise, avoid or fallbacks that drop legitimate 0/0.0 values

Keep this consistent with the graph+vector flow and use an explicit None check.
[Suggested change]

-        prepared_input.vector_dis_threshold = (
-            vector_dis_threshold or huge_settings.vector_dis_threshold
-        )
+        prepared_input.vector_dis_threshold = (
+            vector_dis_threshold
+            if vector_dis_threshold is not None
+            else huge_settings.vector_dis_threshold
+        )
@@
-        prepared_input.data_json = {
+        prepared_input.data_json = {
             "query": query,
             "vector_search": vector_search,
             "graph_search": graph_search,
-            "max_graph_items": max_graph_items or huge_settings.max_graph_items,
+            "max_graph_items": (
+                max_graph_items
+                if max_graph_items is not None
+                else huge_settings.max_graph_items
+            ),
         }

(Per the project coding guidelines)

Also applies to: 112-116


[tool.uv.sources]
hugegraph-python-client = { workspace = true }
pycgraph = { git = "https://github.com/ChunelFeng/CGraph.git", subdirectory = "python", rev = "main", marker = "sys_platform == 'linux'" }

⚠️ Potential issue | 🟠 Major

Pin the PyCGraph Git reference to a stable version.

The pycgraph UV source currently tracks the upstream main branch (rev = "main"), which means:

  • builds drift with upstream commits and are not reproducible;
  • installation breaks if an upstream commit is incompatible or history is rewritten;
  • third-party compliance audits become difficult (there is no concrete source snapshot to reference).

Reference a released tag or an explicit commit hash instead, and note the corresponding version in dependencies. For example:

-pycgraph = { git = "https://github.com/ChunelFeng/CGraph.git", subdirectory = "python", rev = "main", marker = "sys_platform == 'linux'" }
+pycgraph = { git = "https://github.com/ChunelFeng/CGraph.git", subdirectory = "python", rev = "v3.1.2", marker = "sys_platform == 'linux'" }

Or use an official PyPI release (e.g. pycgraph==3.1.2) to keep the release process reproducible and traceable.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

Original:
pycgraph = { git = "https://github.com/ChunelFeng/CGraph.git", subdirectory = "python", rev = "main", marker = "sys_platform == 'linux'" }

Suggested:
pycgraph = { git = "https://github.com/ChunelFeng/CGraph.git", subdirectory = "python", rev = "v3.1.2", marker = "sys_platform == 'linux'" }
🤖 Prompt for AI Agents
In hugegraph-llm/pyproject.toml around line 89, pycgraph currently pins the git
dependency to rev = "main", which makes builds non-reproducible and brittle;
change the git reference to a specific released tag or a commit SHA (e.g., rev =
"vX.Y.Z" or rev = "abcdef1234") or switch to the official PyPI package (e.g.,
pycgraph==X.Y.Z) in dependencies, and ensure the marker and subdirectory remain
correct so the project uses a stable, traceable snapshot.

Comment on lines 34 to 39
raise generate_response(
RAGResponse(
status_code=status.HTTP_403_FORBIDDEN, # pylint: disable=E0702
message="Invalid admin_token",
)
)

⚠️ Potential issue | 🔴 Critical

Fix the type error: a dict cannot be raised

The code tries to raise a dict object, which is invalid in Python. Per the related snippet (rag_exceptions.py), generate_response() returns a dict, not an exception object.

This causes a runtime error: TypeError: exceptions must derive from BaseException

Apply this fix:

-        raise generate_response(
-            RAGResponse(
-                status_code=status.HTTP_403_FORBIDDEN,  # pylint: disable=E0702
-                message="Invalid admin_token",
-            )
-        )
+        return generate_response(
+            RAGResponse(
+                status_code=status.HTTP_403_FORBIDDEN,
+                message="Invalid admin_token",
+            )
+        )
🧰 Tools
🪛 GitHub Actions: Pylint

[error] 34-34: pylint: E0702: Raising dict while only classes or instances are allowed (raising-bad-type)

🤖 Prompt for AI Agents
In hugegraph-llm/src/hugegraph_llm/api/admin_api.py around lines 34 to 39, the
code calls raise generate_response(...) but generate_response() returns a dict,
causing TypeError because you cannot raise a non-BaseException; replace the
raise with a proper return of the generated response (i.e., return
generate_response(...)) or, if you intend to raise an exception, raise a proper
FastAPI HTTPException with the same status_code and message
(HTTPException(status_code=..., detail=...)) so the endpoint returns a valid
response without raising a dict.

Comment on lines 25 to 27
enable_login: Optional[str] = "False"
user_token: Optional[str] = "4321"
admin_token: Optional[str] = "xxxx"

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Note: the default token values are a security risk

The default user_token and admin_token use weak placeholder values ("4321", "xxxx"); if users do not override them in production, this is a security risk.

In addition, enable_login uses the string "False" rather than the boolean False, inviting type confusion (a non-empty string is truthy in boolean contexts).

Suggestions:

  1. Change enable_login to bool with default False
  2. Document clearly that strong tokens must be set in production
  3. Consider checking at initialization whether the defaults are in use and emitting a warning
enable_login: bool = False  # use a boolean type
user_token: Optional[str] = None  # no default; force the user to set it
admin_token: Optional[str] = None  # no default; force the user to set it

I need to verify the security concerns and code quality issues raised in this review. Let me examine the codebase.


---

🏁 Script executed:

```shell
#!/bin/bash
# Examine the admin_config.py file to verify the concerns
cat hugegraph-llm/src/hugegraph_llm/config/admin_config.py | head -50

Length of output: 1024


🏁 Script executed:

#!/bin/bash
# Search for usages of enable_login, user_token, and admin_token throughout the codebase
echo "=== enable_login usages ==="
rg -n "enable_login" --type py -C 2

echo -e "\n=== user_token usages ==="
rg -n "user_token" --type py -C 2

echo -e "\n=== admin_token usages ==="
rg -n "admin_token" --type py -C 2

Length of output: 4713


🏁 Script executed:

```shell
#!/bin/bash
# Check for validation or initialization logic related to these fields
echo "=== Searching for validation checks ==="
rg -n "admin_token|user_token|enable_login" --type py | grep -i "valid\|check\|warn\|assert"

echo -e "\n=== Check initialization patterns ==="
rg -n "def.*init\|class.*Config" hugegraph-llm/src/hugegraph_llm/config/ --type py -A 3
```

Length of output: 181


🏁 Script executed:

```shell
#!/bin/bash
# Check the entire config directory structure and related files
find hugegraph-llm/src/hugegraph_llm/config -type f -name "*.py" | head -20
```

Length of output: 511


🏁 Script executed:

```shell
#!/bin/bash
# Inspect the BaseConfig implementation
sed -n '1,200p' hugegraph-llm/src/hugegraph_llm/config/models/base_config.py
```

Length of output: 5972


Fix the security issue: use strong types and require tokens to be configured explicitly

Verification confirms the security concerns in the original review. The investigation shows:

  1. enable_login type error: app.py:167 compares with enable_login.lower() == "true". The explicit comparison sidesteps the problem for now, but the non-empty string "False" is still truthy in a boolean context and invites hidden logic errors. Switch to bool.

  2. Weak default tokens: the weak placeholders user_token ("4321") and admin_token ("xxxx") are used directly in authentication comparisons at:

    • app.py:52: credentials.credentials != correct_token
    • admin_api.py:33: admin_token != req.admin_token
    • admin_block.py:77: password == admin_token

    If users do not override these values in production, this is a serious security risk.

  3. No validation or warning: BaseConfig does not check for default token values at initialization, nor does it emit a warning.

Recommended change:

```python
enable_login: bool = False  # switch to a boolean type
user_token: Optional[str] = None  # no default; the user must set it
admin_token: Optional[str] = None  # no default; the user must set it
```

Also add validation logic in BaseConfig.__init__() that checks the tokens are set when enable_login=True.

🤖 Prompt for AI Agents
In hugegraph-llm/src/hugegraph_llm/config/admin_config.py around lines 25-27,
replace the current string-typed default values with strong types and no weak
placeholders: change enable_login to a bool default (False) and set user_token
and admin_token to None (no default), and then update BaseConfig.__init__ to
validate: when enable_login is True, assert or raise a clear configuration error
(or log and exit) if user_token or admin_token is None or empty; also emit a
warning during initialization if enable_login is True but tokens are missing to
force explicit configuration in production.
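
A minimal sketch of that validation, assuming a dataclass-shaped config (the real BaseConfig in this PR may be wired differently):

```python
from dataclasses import dataclass
from typing import Optional

# Illustrative only: the field names come from the PR, while the dataclass
# shape and the __post_init__ hook are assumptions about where BaseConfig
# could run its validation.
@dataclass
class AdminConfig:
    enable_login: bool = False        # boolean instead of the string "False"
    user_token: Optional[str] = None  # no weak "4321" default
    admin_token: Optional[str] = None # no weak "xxxx" default

    def __post_init__(self) -> None:
        # Fail fast: login must not be enabled without explicit tokens.
        if self.enable_login and not (self.user_token and self.admin_token):
            raise ValueError(
                "enable_login=True requires user_token and admin_token to be set"
            )
```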

Comment on lines +357 to 362
graph_loading_bt.click(import_graph_data, inputs=[out, input_schema], outputs=[out]).then(
update_vid_embedding
).then(
store_prompt,
inputs=[input_text, input_schema, info_extract_template],
)

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Verify the output handling of update_vid_embedding.

Line 358 calls update_vid_embedding without specifying the outputs parameter. Please confirm whether this is intended, or whether its result needs to be passed to the next handler.

Judging by the usage at line 342, update_vid_embedding does return to outputs=out, so consistent handling is likely needed here.

The following script can verify how update_vid_embedding is used:


🏁 Script executed:

```shell
#!/bin/bash
# Find all call sites of update_vid_embedding
rg -n "update_vid_embedding" --type=py -A 2 -B 2
```

Length of output: 6916


Specify outputs for the chained update_vid_embedding call

In vector_graph_block.py lines 357-362, .then(update_vid_embedding) does not specify outputs, and the function takes no parameters yet would receive the previous step's output, which can raise a TypeError or leave the UI stale. Change it to .then(update_vid_embedding, outputs=[out]) so the result is routed correctly.

🤖 Prompt for AI Agents
In hugegraph-llm/src/hugegraph_llm/demo/rag_demo/vector_graph_block.py around
lines 357 to 362, the chained call uses .then(update_vid_embedding) without
specifying outputs; update_vid_embedding receives the previous step's output
despite having no params which can cause a TypeError or prevent UI updates.
Change the chain to pass the output explicitly by using
.then(update_vid_embedding, outputs=[out]) so the function result is routed to
the correct output component and no unexpected value is injected as an argument.
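
A self-contained sketch of the corrected chain; the handler bodies below are stand-ins, and only the event wiring mirrors the snippet above:

```python
import gradio as gr

# Stand-in handlers that only mimic the real functions' signatures.
def import_graph_data(data, schema):
    return "graph loaded"

def update_vid_embedding():
    # Takes no inputs; with outputs=[out] its return value still reaches the UI.
    return "vid embeddings updated"

def store_prompt(text, schema, template):
    return None

with gr.Blocks() as demo:
    input_text = gr.Textbox(label="Doc")
    input_schema = gr.Textbox(label="Schema")
    info_extract_template = gr.Textbox(label="Template")
    out = gr.Textbox(label="Output")
    graph_loading_bt = gr.Button("Load into HugeGraph")

    graph_loading_bt.click(
        import_graph_data, inputs=[out, input_schema], outputs=[out]
    ).then(
        update_vid_embedding, outputs=[out]  # explicit outputs, no stray argument
    ).then(
        store_prompt, inputs=[input_text, input_schema, info_extract_template]
    )

demo.launch()
```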

Comment on lines +61 to +71
def post_deal(self, pipeline=None):
state_json = pipeline.getGParamWithNoEmpty("wkflow_state").to_json()
if "schema" not in state_json:
return ""
res = state_json["schema"]
try:
formatted_schema = json.dumps(res, ensure_ascii=False, indent=2)
return formatted_schema
except (TypeError, ValueError) as e:
log.error("Failed to format schema: %s", e)
return str(res)

⚠️ Potential issue | 🟠 Major

Avoid double JSON encoding when returning the schema

The schema returned by SchemaBuildNode is most likely already a string (e.g., LLM-generated text or an error message). Calling json.dumps on it here wraps it in another layer of quotes, so the caller receives an escaped "\"...\"", effectively corrupting the original output. Only format the value when it is a dict/list; otherwise return the string directly.

-        try:
-            formatted_schema = json.dumps(res, ensure_ascii=False, indent=2)
-            return formatted_schema
-        except (TypeError, ValueError) as e:
-            log.error("Failed to format schema: %s", e)
-            return str(res)
+        if isinstance(res, (dict, list)):
+            try:
+                return json.dumps(res, ensure_ascii=False, indent=2)
+            except (TypeError, ValueError) as e:
+                log.error("Failed to format schema: %s", e)
+                return str(res)
+        return str(res)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def post_deal(self, pipeline=None):
state_json = pipeline.getGParamWithNoEmpty("wkflow_state").to_json()
if "schema" not in state_json:
return ""
res = state_json["schema"]
try:
formatted_schema = json.dumps(res, ensure_ascii=False, indent=2)
return formatted_schema
except (TypeError, ValueError) as e:
log.error("Failed to format schema: %s", e)
return str(res)
def post_deal(self, pipeline=None):
state_json = pipeline.getGParamWithNoEmpty("wkflow_state").to_json()
if "schema" not in state_json:
return ""
res = state_json["schema"]
if isinstance(res, (dict, list)):
try:
return json.dumps(res, ensure_ascii=False, indent=2)
except (TypeError, ValueError) as e:
log.error("Failed to format schema: %s", e)
return str(res)
return str(res)
🤖 Prompt for AI Agents
In hugegraph-llm/src/hugegraph_llm/flows/build_schema.py around lines 61 to 71,
the code always calls json.dumps on state_json["schema"], which double-encodes
when the schema is already a string; change the logic to check the type first
and only call json.dumps when the value is a dict or list (i.e., structured
JSON), otherwise return the value as a string directly; keep the existing
try/except around json.dumps and fall back to str(res) on error, preserving
ensure_ascii=False and indent=2 for formatted output.
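
The double encoding is easy to reproduce in isolation:

```python
import json

# json.dumps on a value that is already a JSON string wraps it in quotes again.
schema = '{"vertexlabels": []}'  # already serialized, e.g. raw LLM output
print(json.dumps(schema))        # "{\"vertexlabels\": []}"  <- escaped string
print(json.dumps(json.loads(schema), indent=2))  # formats the structure instead
```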

Comment on lines +85 to 88
def __init__(self, llm: BaseLLM, example_prompt: str = prompt.extract_graph_prompt) -> None:
self.llm = llm
self.example_prompt = example_prompt
self.NECESSARY_ITEM_KEYS = {"label", "type", "properties"} # pylint: disable=invalid-name

⚠️ Potential issue | 🟠 Major

The default argument still does not react to config hot updates

prompt.extract_graph_prompt is evaluated into a string at function-definition time, so even if the config or prompt.extract_graph_prompt is updated dynamically later, newly created PropertyGraphExtract instances still receive the old default text, contrary to the behavior the code comments expect.

Defer reading the default until __init__ runs, and allow callers to override it explicitly. For example:

-from typing import List, Any, Dict
+from typing import Any, Dict, List, Optional
@@
-    def __init__(self, llm: BaseLLM, example_prompt: str = prompt.extract_graph_prompt) -> None:
-        self.llm = llm
-        self.example_prompt = example_prompt
+    def __init__(self, llm: BaseLLM, example_prompt: Optional[str] = None) -> None:
+        self.llm = llm
+        self.example_prompt = (
+            prompt.extract_graph_prompt if example_prompt is None else example_prompt
+        )

This way, configuration loaded at any point is correctly picked up by newly created instances.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def __init__(self, llm: BaseLLM, example_prompt: str = prompt.extract_graph_prompt) -> None:
self.llm = llm
self.example_prompt = example_prompt
self.NECESSARY_ITEM_KEYS = {"label", "type", "properties"} # pylint: disable=invalid-name
def __init__(self, llm: BaseLLM, example_prompt: Optional[str] = None) -> None:
self.llm = llm
self.example_prompt = (
prompt.extract_graph_prompt if example_prompt is None else example_prompt
)
self.NECESSARY_ITEM_KEYS = {"label", "type", "properties"} # pylint: disable=invalid-name
🤖 Prompt for AI Agents
In hugegraph-llm/src/hugegraph_llm/operators/llm_op/property_graph_extract.py
around lines 85 to 88, the default prompt is evaluated at function-definition
time which prevents runtime config/hot updates from taking effect; change the
signature to accept example_prompt: Optional[str] = None (import typing.Optional
if needed) and inside __init__ set self.example_prompt = example_prompt if
example_prompt is not None else prompt.extract_graph_prompt so the prompt is
looked up at instance-creation time and external callers can override it.
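
The definition-time pitfall can be demonstrated standalone (the prompt object below is a stand-in, not the real hugegraph_llm config):

```python
# Stand-in for a hot-reloadable config object; not the real hugegraph_llm prompt.
class _Prompt:
    extract_graph_prompt = "v1"

prompt = _Prompt()

def extract_eager(example_prompt: str = prompt.extract_graph_prompt) -> str:
    return example_prompt  # default captured once, at definition time

def extract_lazy(example_prompt=None) -> str:
    # Looked up at call time, so later config updates are visible.
    return prompt.extract_graph_prompt if example_prompt is None else example_prompt

prompt.extract_graph_prompt = "v2"  # simulate a config hot-reload

print(extract_eager())  # "v1" -- stale default
print(extract_lazy())   # "v2" -- picks up the update
```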

# See the License for the specific language governing permissions and
# limitations under the License.

from PyCGraph import GParam, CStatus

⚠️ Potential issue | 🔴 Critical

Fix the PyCGraph import (compatible with both pycgraph and PyCGraph)

Same issue as in nodes/base_node; use a fallback import:

-from PyCGraph import GParam, CStatus
+try:
+    from PyCGraph import GParam, CStatus  # type: ignore[import]
+except Exception:  # pragma: no cover
+    from pycgraph import GParam, CStatus  # type: ignore[import]
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
from PyCGraph import GParam, CStatus
try:
from PyCGraph import GParam, CStatus # type: ignore[import]
except Exception: # pragma: no cover
from pycgraph import GParam, CStatus # type: ignore[import]
🧰 Tools
🪛 GitHub Actions: Pylint

[error] 16-16: pylint: E0611: No name 'GParam' in module 'PyCGraph' (no-name-in-module)

🤖 Prompt for AI Agents
In hugegraph-llm/src/hugegraph_llm/state/ai_state.py around line 16, the import
from PyCGraph may fail depending on package name casing; replace the direct
import with a fallback import that tries from pycgraph first then PyCGraph (or
vice versa) inside a try/except, assigning GParam and CStatus from whichever
import succeeds, and raise a clear ImportError if neither is available; keep
names unchanged and ensure the import block is concise and lints cleanly.

Comment on lines 111 to 113
import uuid

task_id = f"task_{str(uuid.uuid4())[:8]}"

⚠️ Potential issue | 🟠 Major

Truncated task IDs increase the risk of collisions

str(uuid.uuid4())[:8] keeps only 8 characters, drastically weakening the UUID's uniqueness guarantee. Eight hex characters offer only about 4.3 billion possible values; under high concurrency the collision probability rises sharply and can corrupt task tracking.

Use the full UUID, or at least keep a longer prefix (e.g., 16-24 characters), to ensure task IDs remain unique.

Apply this change to use the full UUID:

-    task_id = f"task_{str(uuid.uuid4())[:8]}"
+    task_id = f"task_{uuid.uuid4()}"

Or, if a shorter ID is needed, use at least 16 characters:

-    task_id = f"task_{str(uuid.uuid4())[:8]}"
+    task_id = f"task_{str(uuid.uuid4()).replace('-', '')[:16]}"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
import uuid
task_id = f"task_{str(uuid.uuid4())[:8]}"
import uuid
task_id = f"task_{uuid.uuid4()}"
🤖 Prompt for AI Agents
In hugegraph-llm/src/hugegraph_llm/utils/decorators.py around lines 111 to 113,
the task_id is generated by truncating a UUID to 8 characters which increases
collision risk; change generation to use the full UUID string (e.g., task_id =
f"task_{uuid.uuid4()}" or task_id = f"task_{str(uuid.uuid4())}") or if a shorter
form is required use a longer prefix (at least 16 characters) like task_id =
f"task_{str(uuid.uuid4())[:16]}", ensuring uuid is imported and the surrounding
code that depends on task_id length still functions.
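
A rough birthday-bound estimate (an approximation, not taken from the PR) puts numbers on the truncation risk:

```python
import math

# Birthday-paradox approximation: p ≈ 1 - exp(-n*(n-1) / (2*d)),
# where d is the number of distinct IDs the prefix can express.
def collision_probability(n_tasks: int, hex_chars: int) -> float:
    d = 16 ** hex_chars
    return 1 - math.exp(-n_tasks * (n_tasks - 1) / (2 * d))

print(f"{collision_probability(100_000, 8):.0%}")   # ~69% after 100k tasks
print(f"{collision_probability(100_000, 16):.1e}")  # ~2.7e-10 with 16 chars
```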

Comment on lines 79 to 81
formatted_ids = ", ".join(f"'{vid}'" for vid in vertex_ids)
gremlin_query = f"g.V({formatted_ids}).limit(20)"
result = client.gremlin().exec(gremlin=gremlin_query)["data"]

⚠️ Potential issue | 🟠 Major

Gremlin query injection risk and missing error handling

Line 79 builds the Gremlin query with raw string formatting, which carries an injection risk. Although vertex_ids come from internal context, IDs containing special characters (such as single quotes) can produce query syntax errors or a potential injection attack.

In addition, the gremlin().exec() call on line 81 has no error handling. If the query fails (e.g., due to a network problem or an invalid vertex ID), the exception goes uncaught.

Suggested improvements:

  1. Validate and escape vertex IDs
  2. Add exception handling
  3. Consider making the limit of 20 a configurable parameter
 def get_vertex_details(
     vertex_ids: List[str], context: Dict[str, Any]
 ) -> List[Dict[str, Any]]:
+    from hugegraph_llm.utils.log import log
+    
     if isinstance(context.get("graph_client"), PyHugeClient):
         client = context["graph_client"]
     else:
         url = context.get("url") or "http://localhost:8080"
         graph = context.get("graph") or "hugegraph"
         user = context.get("user") or "admin"
         pwd = context.get("pwd") or "admin"
         gs = context.get("graphspace") or None
         client = PyHugeClient(url, graph, user, pwd, gs)
     if not vertex_ids:
         return []
 
+    # Validate and escape vertex IDs to prevent injection
+    sanitized_ids = []
+    for vid in vertex_ids:
+        # Basic validation - adjust based on your ID format
+        if isinstance(vid, str) and vid:
+            # Escape single quotes
+            sanitized_ids.append(vid.replace("'", "\\'"))
+        else:
+            log.warning(f"Invalid vertex ID skipped: {vid}")
+    
+    if not sanitized_ids:
+        return []
+    
-    formatted_ids = ", ".join(f"'{vid}'" for vid in vertex_ids)
+    formatted_ids = ", ".join(f"'{vid}'" for vid in sanitized_ids)
     gremlin_query = f"g.V({formatted_ids}).limit(20)"
-    result = client.gremlin().exec(gremlin=gremlin_query)["data"]
+    try:
+        result = client.gremlin().exec(gremlin=gremlin_query)["data"]
+    except Exception as e:
+        log.error(f"Failed to fetch vertex details: {e}")
+        return []
     return result
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
formatted_ids = ", ".join(f"'{vid}'" for vid in vertex_ids)
gremlin_query = f"g.V({formatted_ids}).limit(20)"
result = client.gremlin().exec(gremlin=gremlin_query)["data"]
def get_vertex_details(
vertex_ids: List[str], context: Dict[str, Any]
) -> List[Dict[str, Any]]:
from hugegraph_llm.utils.log import log
if isinstance(context.get("graph_client"), PyHugeClient):
client = context["graph_client"]
else:
url = context.get("url") or "http://localhost:8080"
graph = context.get("graph") or "hugegraph"
user = context.get("user") or "admin"
pwd = context.get("pwd") or "admin"
gs = context.get("graphspace") or None
client = PyHugeClient(url, graph, user, pwd, gs)
if not vertex_ids:
return []
# Validate and escape vertex IDs to prevent injection
sanitized_ids = []
for vid in vertex_ids:
# Basic validation - adjust based on your ID format
if isinstance(vid, str) and vid:
# Escape single quotes
sanitized_ids.append(vid.replace("'", "\\'"))
else:
log.warning(f"Invalid vertex ID skipped: {vid}")
if not sanitized_ids:
return []
formatted_ids = ", ".join(f"'{vid}'" for vid in sanitized_ids)
gremlin_query = f"g.V({formatted_ids}).limit(20)"
try:
result = client.gremlin().exec(gremlin=gremlin_query)["data"]
except Exception as e:
log.error(f"Failed to fetch vertex details: {e}")
return []
return result
🤖 Prompt for AI Agents
In hugegraph-llm/src/hugegraph_llm/utils/graph_index_utils.py around lines 79 to
81, the Gremlin string is built by raw formatting which risks injection or
syntax errors and the exec call lacks exception handling; validate/whitelist
vertex_ids (e.g. allow only alphanumeric, dash, underscore or explicitly escape
single quotes), build the gremlin call using safe parameter binding if the
client supports it or otherwise escape/quote each id before joining, make the
limit (20) a configurable argument with a default, and wrap the
client.gremlin().exec(...) call in a try/except to log the error and return a
safe fallback (or re-raise a wrapped exception) so failures are handled
gracefully.
