This repository was archived by the owner on Dec 28, 2025. It is now read-only.
Merged
643 changes: 643 additions & 0 deletions .vibedev/spec/hugegraph-llm/fixed_flow/design.md

Large diffs are not rendered by default.

24 changes: 24 additions & 0 deletions .vibedev/spec/hugegraph-llm/fixed_flow/requirements.md
@@ -0,0 +1,24 @@
## Requirements List

### Core Framework Design

**Core**: Design and implementation of schedule_flow in the Scheduler class

**Acceptance criteria**:
1.1. The core framework should reuse resources wherever possible, avoiding repeated allocation and release (sketched below).
1.2. It should meet the expected request-processing performance targets.
1.3. The overall resource ceiling used by the framework should be configurable.
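
A minimal sketch of what criteria 1.1–1.3 could mean in code (hypothetical: only `SchedulerSingleton.get_instance()` and `schedule_flow(...)` appear in this PR; the pool internals and all other names here are illustrative assumptions):

```python
from queue import Queue, Empty


class Scheduler:
    """Hypothetical, single-threaded sketch of pipeline pooling behind schedule_flow."""

    def __init__(self, max_pipelines_per_flow: int = 4):
        # One pool per flow type, so warm pipelines are reused (criterion 1.1)
        # under a configurable upper bound (criterion 1.3).
        self._pools: dict[str, Queue] = {}
        self._max = max_pipelines_per_flow

    def _build_pipeline(self, flow_name: str):
        # Stand-in for real GPipeline construction.
        return f"<pipeline for {flow_name}>"

    def schedule_flow(self, flow_name: str, *args):
        pool = self._pools.setdefault(flow_name, Queue(maxsize=self._max))
        try:
            pipeline = pool.get_nowait()  # reuse a pooled pipeline if one is free
        except Empty:
            pipeline = self._build_pipeline(flow_name)
        try:
            return pipeline, args  # stand-in for actually running the pipeline
        finally:
            if not pool.full():
                pool.put_nowait(pipeline)  # hand the pipeline back for reuse
```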

### Fixed Workflow Migration

**Core**: Migrate all use cases from the Web Demo
2.1. The behavior of each workflow migrated onto the core framework must stay identical to its pre-migration behavior.

**Completed workflow types**:
- build_vector_index: vector index construction workflow
- graph_extract: graph extraction workflow
- import_graph_data: graph data import workflow
- update_vid_embeddings: vertex-ID embedding update workflow
- get_graph_index_info: graph index info retrieval workflow
- build_schema: schema construction workflow
- prompt_generate: prompt generation workflow
36 changes: 36 additions & 0 deletions .vibedev/spec/hugegraph-llm/fixed_flow/tasks.md
@@ -0,0 +1,36 @@
# HugeGraph-ai Fixed Workflow Framework Design and Use Case Migration

This document breaks the HugeGraph fixed workflow framework design and use case migration down into a series of executable coding tasks.

## 1. schedule_flow Design and Implementation

- [x] **1.1 Build Scheduler framework 1.0**
  - Must be able to reuse already-created Pipelines (pipeline pooling)
  - Use CGraph (a graph-based engine) as the underlying execution engine
  - Keep different Nodes loosely coupled

- [ ] **1.2 Optimize Scheduler framework resource configuration**
  - Let users configure the underlying thread pool parameters
  - Existing workflows can vary slightly with their input, so the same use case may produce different workflows; how should this be resolved?
  - Decouple Node and Operator: the Node owns lifecycle and context, the Operator focuses purely on business logic
  - A Flow only assembles Nodes; all business logic sinks down into Nodes/Operators
  - The Scheduler supports registering multiple Flow types, with a more flexible registration mechanism

- [ ] **1.3 Optimize Scheduler framework resource usage**
  - Control how many Pipelines each PipelineManager holds based on load, enabling dynamic scale-up and scale-down
  - The Node layer supports automatic parameter-area binding and is concurrency-safe
  - An Operator only needs to implement a run(data_json) method; the Node handles scheduling and writing results back (see the sketch after this list)
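
A minimal sketch of the Node/Operator split described in 1.2 and 1.3 (hypothetical class names; only the run(data_json) contract comes from this spec):

```python
import json


class Operator:
    """Business logic only: implement run(data_json), per task 1.3."""

    def run(self, data_json: dict) -> dict:
        raise NotImplementedError


class UpperCaseOperator(Operator):
    """Toy operator used purely for illustration."""

    def run(self, data_json: dict) -> dict:
        return {"text": data_json.get("text", "").upper()}


class Node:
    """Owns lifecycle and context: binds inputs, invokes the operator,
    and writes results back, keeping scheduling out of the Operator."""

    def __init__(self, operator: Operator):
        self._operator = operator

    def execute(self, state: dict) -> None:
        result = self._operator.run(state)  # Node schedules, Operator computes
        state.update(result)  # result write-back is the Node's job


state = {"text": "hello"}
Node(UpperCaseOperator()).execute(state)
print(json.dumps(state))  # {"text": "HELLO"}
```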

## 2. Fixed Workflow Use Case Migration

- [x] **2.1 build_vector_index workflow migration**
- [x] **2.2 graph_extract workflow migration**
- [x] **2.3 import_graph_data workflow migration**
  - Implement the import_graph_data workflow on the Node/Operator mechanism
- [x] **2.4 update_vid_embeddings workflow migration**
  - Implement the update_vid_embeddings workflow on the Node/Operator mechanism
- [x] **2.5 get_graph_index_info workflow migration**
- [x] **2.6 build_schema workflow migration**
  - Implement the build_schema workflow on the Node/Operator mechanism
- [x] **2.7 prompt_generate workflow migration**
  - Implement the prompt_generate workflow on the Node/Operator mechanism
@@ -26,8 +26,7 @@
 from hugegraph_llm.config import huge_settings
 from hugegraph_llm.config import prompt
 from hugegraph_llm.config import resource_path
-from hugegraph_llm.models.llms.init_llm import LLMs
-from hugegraph_llm.operators.llm_op.prompt_generate import PromptGenerate
+from hugegraph_llm.flows.scheduler import SchedulerSingleton
 from hugegraph_llm.utils.graph_index_utils import (
     get_graph_index_info,
     clean_all_graph_index,
@@ -61,27 +60,21 @@ def store_prompt(doc, schema, example_prompt):

 def generate_prompt_for_ui(source_text, scenario, example_name):
     """
-    Handles the UI logic for generating a new prompt. It calls the PromptGenerate operator.
+    Handles the UI logic for generating a new prompt using the new workflow architecture.
     """
     if not all([source_text, scenario, example_name]):
         gr.Warning(
             "Please provide original text, expected scenario, and select an example!"
         )
         return gr.update()
     try:
-        prompt_generator = PromptGenerate(llm=LLMs().get_chat_llm())
-        context = {
-            "source_text": source_text,
-            "scenario": scenario,
-            "example_name": example_name,
-        }
-        result_context = prompt_generator.run(context)
-        # Presents the result of generating prompt
-        generated_prompt = result_context.get(
-            "generated_extract_prompt", "Generation failed. Please check the logs."
+        # using new architecture
+        scheduler = SchedulerSingleton.get_instance()
+        result = scheduler.schedule_flow(
+            "prompt_generate", source_text, scenario, example_name
         )
         gr.Info("Prompt generated successfully!")
-        return generated_prompt
+        return result
     except Exception as e:
         log.error("Error generating Prompt: %s", e, exc_info=True)
         raise gr.Error(f"Error generating Prompt: {e}") from e
71 changes: 71 additions & 0 deletions hugegraph-llm/src/hugegraph_llm/flows/build_schema.py
@@ -0,0 +1,71 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from hugegraph_llm.flows.common import BaseFlow
from hugegraph_llm.state.ai_state import WkFlowInput, WkFlowState
from hugegraph_llm.nodes.llm_node.schema_build import SchemaBuildNode
from hugegraph_llm.utils.log import log

import json
from PyCGraph import GPipeline


class BuildSchemaFlow(BaseFlow):
def __init__(self):
pass

def prepare(
self,
prepared_input: WkFlowInput,
texts=None,
query_examples=None,
few_shot_schema=None,
):
prepared_input.texts = texts
# Optional fields packed into wk_input for SchemaBuildNode
# Keep raw values; node will parse if strings
prepared_input.query_examples = query_examples
prepared_input.few_shot_schema = few_shot_schema
return

def build_flow(self, texts=None, query_examples=None, few_shot_schema=None):
pipeline = GPipeline()
prepared_input = WkFlowInput()
self.prepare(
prepared_input,
texts=texts,
query_examples=query_examples,
few_shot_schema=few_shot_schema,
)

pipeline.createGParam(prepared_input, "wkflow_input")
pipeline.createGParam(WkFlowState(), "wkflow_state")

schema_build_node = SchemaBuildNode()
pipeline.registerGElement(schema_build_node, set(), "schema_build")

return pipeline

def post_deal(self, pipeline=None):
state_json = pipeline.getGParamWithNoEmpty("wkflow_state").to_json()
if "schema" not in state_json:
return ""
res = state_json["schema"]
try:
formatted_schema = json.dumps(res, ensure_ascii=False, indent=2)
return formatted_schema
except (TypeError, ValueError) as e:
log.error("Failed to format schema: %s", e)
return str(res)
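
If build_schema is registered with the scheduler under that name (an assumption; only the prompt_generate registration is visible in this PR), invoking the flow would look roughly like this hypothetical snippet, with the argument order mirroring BuildSchemaFlow.build_flow(texts, query_examples, few_shot_schema):

```python
from hugegraph_llm.flows.scheduler import SchedulerSingleton

scheduler = SchedulerSingleton.get_instance()
# "build_schema" as the flow key and this argument order are assumptions
# inferred from BuildSchemaFlow.build_flow's signature.
schema_json = scheduler.schedule_flow(
    "build_schema",
    ["Alice works at Acme and reports to Bob."],  # texts
    None,  # query_examples (optional)
    None,  # few_shot_schema (optional)
)
print(schema_json)  # pretty-printed schema JSON, or "" if no schema was built
```
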
4 changes: 2 additions & 2 deletions hugegraph-llm/src/hugegraph_llm/flows/build_vector_index.py
@@ -14,13 +14,13 @@
 # limitations under the License.

 from hugegraph_llm.flows.common import BaseFlow
+from hugegraph_llm.nodes.document_node.chunk_split import ChunkSplitNode
+from hugegraph_llm.nodes.index_node.build_vector_index import BuildVectorIndexNode
 from hugegraph_llm.state.ai_state import WkFlowInput

 import json
 from PyCGraph import GPipeline

-from hugegraph_llm.operators.document_op.chunk_split import ChunkSplitNode
-from hugegraph_llm.operators.index_op.build_vector_index import BuildVectorIndexNode
 from hugegraph_llm.state.ai_state import WkFlowState
68 changes: 68 additions & 0 deletions hugegraph-llm/src/hugegraph_llm/flows/get_graph_index_info.py
@@ -0,0 +1,68 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os

from hugegraph_llm.config import huge_settings, llm_settings, resource_path
from hugegraph_llm.flows.common import BaseFlow
from hugegraph_llm.indices.vector_index import VectorIndex
from hugegraph_llm.models.embeddings.init_embedding import model_map
from hugegraph_llm.state.ai_state import WkFlowInput, WkFlowState
from hugegraph_llm.nodes.hugegraph_node.fetch_graph_data import FetchGraphDataNode
from PyCGraph import GPipeline
from hugegraph_llm.utils.embedding_utils import (
get_filename_prefix,
get_index_folder_name,
)


class GetGraphIndexInfoFlow(BaseFlow):
def __init__(self):
pass

def prepare(self, prepared_input: WkFlowInput, *args, **kwargs):
return

def build_flow(self, *args, **kwargs):
pipeline = GPipeline()
prepared_input = WkFlowInput()
self.prepare(prepared_input, *args, **kwargs)
pipeline.createGParam(prepared_input, "wkflow_input")
pipeline.createGParam(WkFlowState(), "wkflow_state")
fetch_node = FetchGraphDataNode()
pipeline.registerGElement(fetch_node, set(), "fetch_node")
return pipeline

def post_deal(self, pipeline=None):
graph_summary_info = pipeline.getGParamWithNoEmpty("wkflow_state").to_json()
folder_name = get_index_folder_name(
huge_settings.graph_name, huge_settings.graph_space
)
index_dir = str(os.path.join(resource_path, folder_name, "graph_vids"))
filename_prefix = get_filename_prefix(
llm_settings.embedding_type,
model_map.get(llm_settings.embedding_type, None),
)
try:
vector_index = VectorIndex.from_index_file(index_dir, filename_prefix)
except FileNotFoundError:
return json.dumps(graph_summary_info, ensure_ascii=False, indent=2)
graph_summary_info["vid_index"] = {
"embed_dim": vector_index.index.d,
"num_vectors": vector_index.index.ntotal,
"num_vids": len(vector_index.properties),
}
return json.dumps(graph_summary_info, ensure_ascii=False, indent=2)
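
For reference, the vid_index fields map directly onto the underlying vector index's attributes; the .d/.ntotal accessors suggest a FAISS index under the hood (an inference, not stated in this diff). A standalone illustration assuming FAISS:

```python
import faiss
import numpy as np

# Build a tiny 128-dimensional index and add 10 zero vectors.
index = faiss.IndexFlatL2(128)
index.add(np.zeros((10, 128), dtype="float32"))

print(index.d)       # 128 -> reported above as "embed_dim"
print(index.ntotal)  # 10  -> reported above as "num_vectors"
```
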
58 changes: 6 additions & 52 deletions hugegraph-llm/src/hugegraph_llm/flows/graph_extract.py
@@ -16,36 +16,17 @@
 import json
 from PyCGraph import GPipeline
 from hugegraph_llm.flows.common import BaseFlow
+from hugegraph_llm.nodes.document_node.chunk_split import ChunkSplitNode
+from hugegraph_llm.nodes.hugegraph_node.schema import SchemaNode
+from hugegraph_llm.nodes.llm_node.extract_info import ExtractNode
 from hugegraph_llm.state.ai_state import WkFlowInput, WkFlowState
-from hugegraph_llm.operators.common_op.check_schema import CheckSchemaNode
-from hugegraph_llm.operators.document_op.chunk_split import ChunkSplitNode
-from hugegraph_llm.operators.hugegraph_op.schema_manager import SchemaManagerNode
-from hugegraph_llm.operators.llm_op.info_extract import InfoExtractNode
-from hugegraph_llm.operators.llm_op.property_graph_extract import (
-    PropertyGraphExtractNode,
-)
-from hugegraph_llm.utils.log import log


 class GraphExtractFlow(BaseFlow):
     def __init__(self):
         pass

-    def _import_schema(
-        self,
-        from_hugegraph=None,
-        from_extraction=None,
-        from_user_defined=None,
-    ):
-        if from_hugegraph:
-            return SchemaManagerNode()
-        elif from_user_defined:
-            return CheckSchemaNode()
-        elif from_extraction:
-            raise NotImplementedError("Not implemented yet")
-        else:
-            raise ValueError("No input data / invalid schema type")

     def prepare(
         self, prepared_input: WkFlowInput, schema, texts, example_prompt, extract_type
     ):
@@ -55,17 +36,7 @@ def prepare(
         prepared_input.split_type = "document"
         prepared_input.example_prompt = example_prompt
+        prepared_input.schema = schema
-        schema = schema.strip()
-        if schema.startswith("{"):
-            try:
-                schema = json.loads(schema)
-                prepared_input.schema = schema
-            except json.JSONDecodeError as exc:
-                log.error("Invalid JSON format in schema. Please check it again.")
-                raise ValueError("Invalid JSON format in schema.") from exc
-        else:
-            log.info("Get schema '%s' from graphdb.", schema)
-            prepared_input.graph_name = schema
         prepared_input.extract_type = extract_type
         return

     def build_flow(self, schema, texts, example_prompt, extract_type):
@@ -76,27 +47,10 @@ def build_flow(self, schema, texts, example_prompt, extract_type):

         pipeline.createGParam(prepared_input, "wkflow_input")
         pipeline.createGParam(WkFlowState(), "wkflow_state")
-        schema = schema.strip()
-        schema_node = None
-        if schema.startswith("{"):
-            try:
-                schema = json.loads(schema)
-                schema_node = self._import_schema(from_user_defined=schema)
-            except json.JSONDecodeError as exc:
-                log.error("Invalid JSON format in schema. Please check it again.")
-                raise ValueError("Invalid JSON format in schema.") from exc
-        else:
-            log.info("Get schema '%s' from graphdb.", schema)
-            schema_node = self._import_schema(from_hugegraph=schema)
+        schema_node = SchemaNode()

         chunk_split_node = ChunkSplitNode()
-        graph_extract_node = None
-        if extract_type == "triples":
-            graph_extract_node = InfoExtractNode()
-        elif extract_type == "property_graph":
-            graph_extract_node = PropertyGraphExtractNode()
-        else:
-            raise ValueError(f"Unsupported extract_type: {extract_type}")
+        graph_extract_node = ExtractNode()
 pipeline.registerGElement(schema_node, set(), "schema_node")
 pipeline.registerGElement(chunk_split_node, set(), "chunk_split")
 pipeline.registerGElement(
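
The extract_type branching deleted from build_flow presumably now lives inside ExtractNode. A hypothetical sketch of that dispatch (operator names taken from the imports this flow used to carry; the actual wiring inside ExtractNode is not shown in this diff):

```python
# Hypothetical: how ExtractNode might pick its operator internally,
# absorbing the if/elif previously inlined in GraphExtractFlow.build_flow.
class ExtractNode:
    _OPERATORS = {
        "triples": "InfoExtract",                  # formerly InfoExtractNode
        "property_graph": "PropertyGraphExtract",  # formerly PropertyGraphExtractNode
    }

    def pick_operator(self, extract_type: str) -> str:
        try:
            return self._OPERATORS[extract_type]
        except KeyError as exc:
            raise ValueError(f"Unsupported extract_type: {extract_type}") from exc


print(ExtractNode().pick_operator("triples"))  # -> "InfoExtract"
```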