Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ build/*
**/cache/**
*.pdf
*.csv
site/*
site/*
tool_cache/*
18 changes: 18 additions & 0 deletions extract_ths_tool_cache/compress.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/bash
# Compress the tool_cache directory and split the gzipped archive into
# fixed-size parts (so each part stays under repo/transfer size limits).
# Counterpart script: extract.sh re-assembles and unpacks the parts.

# Fail on any error, on unset variables, and on failures inside pipes —
# without this, a failing tar would still print the success message.
set -euo pipefail

# Target folder to archive, output filename prefix, and per-part size limit.
TARGET_DIR="tool_cache"
OUTPUT_PREFIX="extract_ths_tool_cache/tool_cache.tar.gz.part_"
SIZE_LIMIT="50m"

# Fail early when there is nothing to compress.
if [ ! -d "$TARGET_DIR" ]; then
    echo "错误:目标文件夹 $TARGET_DIR 不存在"
    exit 1
fi

# Make sure the destination directory of the parts exists.
mkdir -p "$(dirname "$OUTPUT_PREFIX")"

echo "正在压缩并切分 $TARGET_DIR ..."

# tar -c: create, -z: gzip, -f -: write archive to stdout
# split -b: cut by size, -: read from stdin
# --exclude is placed BEFORE the positional target for portability
# (some tar implementations ignore options that follow file arguments).
tar --exclude='.DS_Store' -czf - "$TARGET_DIR" | split -b "$SIZE_LIMIT" - "$OUTPUT_PREFIX"

echo "压缩完成!生成的切分文件如下:"
ls -lh "${OUTPUT_PREFIX}"*
19 changes: 19 additions & 0 deletions extract_ths_tool_cache/extract.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/bin/bash
# Re-assemble the split archive parts produced by compress.sh and
# unpack them, restoring the tool_cache directory.

# Fail on any error, on unset variables, and on failures inside pipes —
# without this, a corrupt archive would still print the success message.
set -euo pipefail

# Directory restored by the archive and the prefix of the split parts.
TARGET_DIR="tool_cache"
INPUT_PREFIX="extract_ths_tool_cache/tool_cache.tar.gz.part_"

# Abort when no parts are present.
if ! ls "${INPUT_PREFIX}"* 1> /dev/null 2>&1; then
    echo "错误:未发现分卷文件 ${INPUT_PREFIX}*"
    exit 1
fi

echo "正在合并并解压文件..."

# cat: concatenate all parts (shell glob expands them in lexical order,
# matching the part_aa, part_ab, ... naming produced by split)
# tar -xzf -: decompress and extract from stdin
cat "${INPUT_PREFIX}"* | tar -xzf -

echo "解压完成!文件夹 '$TARGET_DIR' 已还原。"
103 changes: 103 additions & 0 deletions extract_ths_tool_cache/merge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import os
import json
import shutil

# Source / destination project roots (Python strings need no backslash
# escaping of spaces).
# NOTE(review): machine-specific absolute paths — adjust before running
# this script on another machine.
SRC_ROOT_BASE = "/Users/tsc/研究工作/金融ASIO/代码/finance-mcp_3"
DST_ROOT_BASE = "/Users/tsc/研究工作/金融ASIO/代码/finance-mcp"

# Top-level folders to merge from SRC_ROOT_BASE into DST_ROOT_BASE.
TARGET_FOLDERS = ["tool_cache", "cache"]

def load_json_data(file_path):
    """Load JSON from *file_path* and normalize the result to a list.

    Returns a list of samples (a single top-level object is wrapped in a
    one-element list), or None when the file cannot be read or parsed —
    a skip message is printed in that case.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as fp:
            parsed = json.load(fp)
    except Exception as e:
        print(f" [跳过] 无法解析 JSON: {file_path}. 错误: {e}")
        return None
    if isinstance(parsed, list):
        return parsed
    return [parsed]

def save_json_data(file_path, data):
    """Write *data* to *file_path* as pretty-printed UTF-8 JSON.

    Non-ASCII characters are written verbatim (ensure_ascii=False) with
    a 2-space indent, matching load_json_data's expectations.
    """
    serialized = json.dumps(data, ensure_ascii=False, indent=2)
    with open(file_path, 'w', encoding='utf-8') as fp:
        fp.write(serialized)

def get_code(sample):
    """Extract the dedup key from a sample: sample["tool_args"]["code"].

    Returns None when the sample does not have the expected structure
    (e.g. it is not a dict, or "tool_args" maps to a non-dict value).
    """
    try:
        # Path: sample -> tool_args -> code
        return sample.get("tool_args", {}).get("code")
    except (AttributeError, TypeError):
        # Narrowed from a bare ``except:`` — only swallow structural
        # errors, never KeyboardInterrupt/SystemExit or unrelated bugs.
        return None

def merge_json_files(src_file, dst_file):
    """Merge samples from *src_file* into *dst_file*, deduplicated by code.

    Samples whose code already exists in the destination are skipped;
    new samples are appended and the destination file is rewritten only
    when at least one sample was actually added.

    Returns True on success, False when either file failed to parse
    (in which case no merge is performed).
    """
    src_data = load_json_data(src_file)
    dst_data = load_json_data(dst_file)

    if src_data is None or dst_data is None:
        return False  # read error — do not merge

    # Codes already present in the destination. One get_code call per
    # sample; the original comprehension evaluated it twice per sample.
    existing_codes = set()
    for sample in dst_data:
        code = get_code(sample)
        if code is not None:
            existing_codes.add(code)

    initial_count = len(dst_data)
    for sample in src_data:
        code = get_code(sample)
        if code and code not in existing_codes:
            dst_data.append(sample)
            existing_codes.add(code)

    new_added = len(dst_data) - initial_count
    if new_added > 0:
        save_json_data(dst_file, dst_data)
        print(f" [合并完成] {os.path.basename(dst_file)}: 新增了 {new_added} 条 code 样本")
    else:
        print(f" [无需合并] {os.path.basename(dst_file)}: 未发现新 code")
    return True

def start_recursive_merge():
    """Recursively merge TARGET_FOLDERS from SRC_ROOT_BASE into DST_ROOT_BASE.

    For every file under each source folder: a file with no counterpart
    at the destination is copied verbatim; a file that already exists is
    deep-merged via merge_json_files (dedup by code). Hidden files such
    as .DS_Store are skipped. Missing destination directories are created.
    """
    for target in TARGET_FOLDERS:
        src_folder = os.path.join(SRC_ROOT_BASE, target)
        dst_folder = os.path.join(DST_ROOT_BASE, target)

        if not os.path.exists(src_folder):
            print(f"源文件夹不存在,跳过: {src_folder}")
            continue

        print(f"\n>>> 正在扫描目录: {target}")

        # Walk every sub-directory under the source folder.
        for root, dirs, files in os.walk(src_folder):
            # Path of the current directory relative to the source root,
            # mirrored under the destination root.
            rel_path = os.path.relpath(root, src_folder)
            target_dst_dir = os.path.join(dst_folder, rel_path)

            # 1. Create the destination sub-directory when missing.
            #    exist_ok=True avoids a crash if it appears between the
            #    check and the makedirs call.
            if not os.path.exists(target_dst_dir):
                os.makedirs(target_dst_dir, exist_ok=True)
                print(f"创建新目录: {target_dst_dir}")

            # 2. Process every file in the current directory.
            for filename in files:
                # Skip hidden files (e.g. .DS_Store).
                if filename.startswith('.'):
                    continue

                src_file_path = os.path.join(root, filename)
                dst_file_path = os.path.join(target_dst_dir, filename)

                if not os.path.exists(dst_file_path):
                    # No counterpart at the destination: copy as-is.
                    shutil.copy2(src_file_path, dst_file_path)
                    # Bug fix: this log line previously printed a literal
                    # placeholder instead of the copied file's name.
                    print(f" [新文件] 已复制: {rel_path}/{filename}")
                else:
                    # Name collision: deep-merge the JSON contents.
                    merge_json_files(src_file_path, dst_file_path)

if __name__ == "__main__":
    # Script entry point: run the recursive merge over all target folders.
    print("开始深度合并任务...")
    start_recursive_merge()
    print("\n所有任务已结束。")
Binary file not shown.
Binary file not shown.
Binary file added extract_ths_tool_cache/tool_cache.tar.gz.part_ac
Binary file not shown.
156 changes: 156 additions & 0 deletions finance_mcp/config/ths_local.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
# Flow definitions serving locally cached 同花顺 (THS) stock data.
# Each flow wraps ReadLocalThsOp with a different tag; the op answers
# from pre-crawled JSON files under tool_cache/ instead of live crawling,
# which is why enable_cache is false (the op keeps its own cache).
# Every flow shares the same input schema: a required A-share stock code.
flow:
  crawl_ths_company:
    flow_content: |
      ReadLocalThsOp(tag="company")
    enable_cache: false
    cache_expire_hours: 1
    description: "通过A股股票代码获取公司资料信息,例如:详细情况,高管介绍,发行相关,参控股公司,最后返回和query相关的信息。"
    input_schema:
      code:
        type: string
        description: "stock code"
        required: true

  crawl_ths_holder:
    flow_content: |
      ReadLocalThsOp(tag="holder")
    enable_cache: false
    cache_expire_hours: 1
    description: "通过A股股票代码获取股东研究信息,例如:股东人数、十大流通股东、十大股东、十大债券持有人、控股层级关系,最后返回和query相关的信息。"
    input_schema:
      code:
        type: string
        description: "stock code"
        required: true

  crawl_ths_operate:
    flow_content: |
      ReadLocalThsOp(tag="operate")
    enable_cache: false
    cache_expire_hours: 1
    description: "通过A股股票代码获取经营分析信息,例如:主营介绍、运营业务数据、主营构成分析、主要客户及供应商、董事会经营评述、产品价格,最后返回和query相关的信息。"
    input_schema:
      code:
        type: string
        description: "stock code"
        required: true

  crawl_ths_equity:
    flow_content: |
      ReadLocalThsOp(tag="equity")
    enable_cache: false
    cache_expire_hours: 1
    description: "通过A股股票代码获取股本结构信息,例如:解禁时间表、总股本构成、A股结构图、历次股本变动,最后返回和query相关的信息。"
    input_schema:
      code:
        type: string
        description: "stock code"
        required: true

  crawl_ths_capital:
    flow_content: |
      ReadLocalThsOp(tag="capital")
    enable_cache: false
    cache_expire_hours: 1
    description: "通过A股股票代码获取资本运作信息,例如:募集资金来源、项目投资、收购兼并、股权投资、参股IPO、股权转让、关联交易、质押解冻,最后返回和query相关的信息。"
    input_schema:
      code:
        type: string
        description: "stock code"
        required: true

  crawl_ths_worth:
    flow_content: |
      ReadLocalThsOp(tag="worth")
    enable_cache: false
    cache_expire_hours: 1
    description: "通过A股股票代码获取盈利预测信息,例如:业绩预测、业绩预测详表、研报评级,最后返回和query相关的信息。"
    input_schema:
      code:
        type: string
        description: "stock code"
        required: true

  crawl_ths_news:
    flow_content: |
      ReadLocalThsOp(tag="news")
    enable_cache: false
    cache_expire_hours: 1
    description: "通过A股股票代码获取新闻公告信息,例如:新闻与股价联动、公告列表、热点新闻列表、研报列表,最后返回和query相关的信息。"
    input_schema:
      code:
        type: string
        description: "stock code"
        required: true

  crawl_ths_concept:
    flow_content: |
      ReadLocalThsOp(tag="concept")
    enable_cache: false
    cache_expire_hours: 1
    description: "通过A股股票代码获取概念题材信息,例如:常规概念、其他概念、题材要点、概念对比,最后返回和query相关的信息。"
    input_schema:
      code:
        type: string
        description: "stock code"
        required: true

  crawl_ths_position:
    flow_content: |
      ReadLocalThsOp(tag="position")
    enable_cache: false
    cache_expire_hours: 1
    description: "通过A股股票代码获取主力持仓信息,例如:机构持股汇总、机构持股明细、被举牌情况、IPO获配机构,最后返回和query相关的信息。"
    input_schema:
      code:
        type: string
        description: "stock code"
        required: true

  crawl_ths_finance:
    flow_content: |
      ReadLocalThsOp(tag="finance")
    enable_cache: false
    cache_expire_hours: 1
    description: "通过A股股票代码获取财务分析信息,例如:财务诊断、财务指标、指标变动说明、资产负债构成、财务报告、杜邦分析,最后返回和query相关的信息。"
    input_schema:
      code:
        type: string
        description: "stock code"
        required: true

  crawl_ths_bonus:
    flow_content: |
      ReadLocalThsOp(tag="bonus")
    enable_cache: false
    cache_expire_hours: 1
    description: "通过A股股票代码获取分红融资信息,例如:分红诊断、分红情况、增发机构获配明细、增发概况、配股概况,最后返回和query相关的信息。"
    input_schema:
      code:
        type: string
        description: "stock code"
        required: true

  crawl_ths_event:
    flow_content: |
      ReadLocalThsOp(tag="event")
    enable_cache: false
    cache_expire_hours: 1
    description: "通过A股股票代码获取公司大事信息,例如:高管持股变动、股东持股变动、担保明细、违规处理、机构调研、投资者互动,最后返回和query相关的信息。"
    input_schema:
      code:
        type: string
        description: "stock code"
        required: true

  crawl_ths_field:
    flow_content: |
      ReadLocalThsOp(tag="field")
    enable_cache: false
    cache_expire_hours: 1
    description: "通过A股股票代码获取行业对比信息,例如:行业地位、行业新闻,最后返回和query相关的信息。"
    input_schema:
      code:
        type: string
        description: "stock code"
        required: true
2 changes: 1 addition & 1 deletion finance_mcp/core/agent/conduct_research_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def build_tool_call(self) -> ToolCall:
)

async def async_execute(self):
"""Run the multi-step research loop and produce a final answer.
"""Run the multistep research loop and produce a final answer.

The method performs the following high-level steps:

Expand Down
2 changes: 2 additions & 0 deletions finance_mcp/core/crawl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
"""

from .crawl4ai_op import Crawl4aiOp, Crawl4aiLongTextOp
from .read_local_ths_op import ReadLocalThsOp
from .ths_url_op import ThsUrlOp

__all__ = [
"Crawl4aiOp",
"Crawl4aiLongTextOp",
"ThsUrlOp",
"ReadLocalThsOp",
]
64 changes: 64 additions & 0 deletions finance_mcp/core/crawl/read_local_ths_op.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import json
from pathlib import Path
from typing import Dict

from flowllm.core.context import C
from flowllm.core.op import BaseAsyncOp
from loguru import logger


@C.register_op()
class ReadLocalThsOp(BaseAsyncOp):
    """Serve pre-crawled THS tool results from local JSON cache files.

    On first use of a tag, loads every ``tool_cache/crawl_ths_{tag}*.json``
    file into a ``code -> tool_result`` mapping; subsequent requests are
    answered from that in-memory, class-level cache.
    """

    # Class-level cache shared by all instances: {tag: {code: tool_result}}.
    # Fix: previously annotated Dict[...] but initialized to None and
    # lazily replaced in __init__; a plain class-level dict is both
    # correctly typed and simpler.
    _cache: Dict[str, Dict[str, str]] = {}

    def __init__(self, tag: str = "", **kwargs):
        super().__init__(**kwargs)
        # Category of THS page this op serves (e.g. "company", "holder").
        self.tag: str = tag

    def _load_cache(self) -> Dict[str, str]:
        """Load all crawl_ths_{tag}*.json files and build a code->tool_result mapping."""
        tool_cache_dir = Path("tool_cache")
        pattern = f"crawl_ths_{self.tag}*.json"
        matching_files = list(tool_cache_dir.glob(pattern))

        total_files = len(matching_files)
        logger.info(f"Found {total_files} files matching pattern '{pattern}'")

        result_dict: Dict[str, str] = {}
        for idx, file_path in enumerate(matching_files, 1):
            logger.info(f"Loading file [{idx}/{total_files}]: {file_path.name}")

            # Robustness fix: a single corrupt or misshapen file should not
            # abort the whole cache load — log it, skip it, keep going.
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)

                file_records = 0
                for item in data:
                    code = item['tool_args']['code']
                    result_dict[code] = item['tool_result']
                    file_records += 1
            except (json.JSONDecodeError, KeyError, TypeError) as e:
                logger.warning(f" → Skipping {file_path.name}: {e}")
                continue

            logger.info(f" → Processed {file_records} records from {file_path.name}")

        logger.info(f"✓ Successfully loaded {len(result_dict)} total records for tag={self.tag}")
        return result_dict

    async def async_execute(self):
        """Answer with the cached tool_result for self.context.code."""
        # Lazily populate the per-tag cache on first use.
        if self.tag not in ReadLocalThsOp._cache:
            ReadLocalThsOp._cache[self.tag] = self._load_cache()

        # Get the stock code from the flow context.
        code = self.context.code
        if not code:
            self.context.response.answer = f"No code={code} found in context."
            logger.info(self.context.response.answer)
            return

        # Look the code up in the per-tag cache.
        tool_result = ReadLocalThsOp._cache[self.tag].get(code)
        if tool_result is None:
            # Fix: previously the answer was silently set to None on a
            # cache miss; report the miss explicitly instead.
            self.context.response.answer = f"No cached result for code={code} (tag={self.tag})."
            logger.info(self.context.response.answer)
            return
        self.context.response.answer = tool_result
Loading
Loading