Skip to content

Commit c504b41

Browse files
committed
fix: 修复bm25锁触发方式
1 parent c4b4493 commit c504b41

File tree

11 files changed

+432
-451
lines changed

11 files changed

+432
-451
lines changed

config/core/file_management/indexing/bm25_indexing_config.py

Lines changed: 1 addition & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -13,16 +13,12 @@ class BM25IndexerConfig(AbstractConfig):
1313
# Batch processing configuration
1414
batch_size: int = Field(
1515
default=100,
16-
description="Number of chunks to accumulate before flushing to index"
16+
description="Number of chunks to accumulate before triggering a flush"
1717
)
1818
flush_interval: float = Field(
1919
default=5.0,
2020
description="Time interval (in seconds) to periodically flush pending chunks"
2121
)
22-
immediate_flush_threshold: int = Field(
23-
default=10,
24-
description="If pending chunks <= this threshold, flush immediately instead of waiting. Set to 0 to disable."
25-
)
2622

2723
def build(self):
2824
return BM25Indexer(self)

core/file_management/extractor/graphextractor.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -345,7 +345,7 @@ def is_valid_entity(self, name: str) -> bool:
345345
return False
346346

347347
# Filter pure numbers
348-
if re.match(r'^\d+$', name) or re.match(r'^[\d\s\.,;:!?()\[\]{}""''\-_]+$', name):
348+
if re.match(r'^\d+$', name) or re.match(r'^[\d\s\.,;:!?()\[\]{}""''\\-_]+$', name):
349349
return False
350350

351351
return True

core/file_management/extractor/hipporag2_extractor.py

Lines changed: 85 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -11,6 +11,7 @@
1111
"""
1212

1313
import logging
14+
import re
1415
from typing import List, TYPE_CHECKING, Tuple
1516

1617
from core.file_management.extractor.base import ExtractorBase
@@ -19,7 +20,12 @@
1920
HIPPORAG2_NER_ONE_SHOT_INPUT, HIPPORAG2_NER_ONE_SHOT_OUTPUT,
2021
HIPPORAG2_NER_ONE_SHOT_INPUT_WITH_TYPES, HIPPORAG2_NER_ONE_SHOT_OUTPUT_WITH_TYPES,
2122
HIPPORAG2_NER_PROMPT, HIPPORAG2_NER_PROMPT_WITH_TYPES,
22-
HIPPORAG2_TRIPLE_SYSTEM, HIPPORAG2_TRIPLE_ONE_SHOT_INPUT, HIPPORAG2_TRIPLE_ONE_SHOT_OUTPUT, HIPPORAG2_TRIPLE_PROMPT
23+
HIPPORAG2_TRIPLE_SYSTEM, HIPPORAG2_TRIPLE_ONE_SHOT_INPUT, HIPPORAG2_TRIPLE_ONE_SHOT_OUTPUT, HIPPORAG2_TRIPLE_PROMPT,
24+
HIPPORAG2_NER_SYSTEM_ZH, HIPPORAG2_NER_SYSTEM_WITH_TYPES_ZH,
25+
HIPPORAG2_NER_ONE_SHOT_INPUT_ZH, HIPPORAG2_NER_ONE_SHOT_OUTPUT_ZH,
26+
HIPPORAG2_NER_ONE_SHOT_INPUT_WITH_TYPES_ZH, HIPPORAG2_NER_ONE_SHOT_OUTPUT_WITH_TYPES_ZH,
27+
HIPPORAG2_NER_PROMPT_ZH, HIPPORAG2_NER_PROMPT_WITH_TYPES_ZH,
28+
HIPPORAG2_TRIPLE_SYSTEM_ZH, HIPPORAG2_TRIPLE_ONE_SHOT_INPUT_ZH, HIPPORAG2_TRIPLE_ONE_SHOT_OUTPUT_ZH, HIPPORAG2_TRIPLE_PROMPT_ZH
2329
)
2430
from encapsulation.data_model.schema import Chunk, GraphData
2531

@@ -46,6 +52,25 @@ def __init__(self, config: "HippoRAG2ExtractorConfig"):
4652
self.logger = logging.getLogger(__name__)
4753
self.entity_types = getattr(config, 'entity_types', None) # Optional entity types to extract
4854

55+
def detect_language(self, text: str) -> str:
56+
"""
57+
Detect text language (Chinese or English)
58+
59+
Args:
60+
text: Input text to detect language
61+
62+
Returns:
63+
'zh' for Chinese, 'en' for English
64+
"""
65+
chinese_chars = len(re.findall(r'[\u4e00-\u9fff]', text))
66+
total_chars = len(re.sub(r'\s', '', text))
67+
68+
if total_chars == 0:
69+
return 'en'
70+
71+
chinese_ratio = chinese_chars / total_chars
72+
return 'zh' if chinese_ratio > 0.1 else 'en'
73+
4974
async def extract(self, chunk: Chunk) -> GraphData:
5075
"""
5176
Main extraction method for HippoRAG2
@@ -79,7 +104,7 @@ async def extract_two_stage(self, chunk: Chunk) -> GraphData:
79104
if not entities:
80105
self.logger.warning("No entities extracted, skipping triple extraction")
81106
return GraphData()
82-
107+
print(entities)
83108
# Stage 2: Triple Extraction using extracted entities
84109
triples = await self.extract_triples(chunk.content, entities)
85110

@@ -132,7 +157,6 @@ async def extract_triples(self, text: str, entities: List[Tuple[str, str]]) -> L
132157
# Extract just entity names for the prompt
133158
entity_names = [entity[0] for entity in entities]
134159
prompt = self.build_triple_prompt(text, entity_names)
135-
136160
response = await self.llm.achat([{"role": "user", "content": prompt}])
137161

138162
triples = self.parse_triple_response(response)
@@ -147,6 +171,7 @@ async def extract_triples(self, text: str, entities: List[Tuple[str, str]]) -> L
147171
def build_ner_prompt(self, text: str) -> str:
148172
"""
149173
Build NER prompt - always outputs entity types in TSV format
174+
Supports both Chinese and English
150175
151176
Args:
152177
text: Input text to extract entities from
@@ -158,37 +183,73 @@ def build_ner_prompt(self, text: str) -> str:
158183
- If self.entity_types is specified: uses HIPPORAG2_NER_PROMPT_WITH_TYPES
159184
- If self.entity_types is None: uses HIPPORAG2_NER_PROMPT (LLM auto-determines types)
160185
- Both formats output entity\ttype TSV format
186+
- Language is auto-detected (Chinese or English)
161187
"""
188+
# Detect language
189+
language = self.detect_language(text)
190+
162191
if self.entity_types:
163192
# Use entity type-specific prompt (only extract specified types)
164193
entity_types_str = ', '.join(self.entity_types)
165-
return HIPPORAG2_NER_PROMPT_WITH_TYPES.format(
166-
system=HIPPORAG2_NER_SYSTEM_WITH_TYPES,
167-
entity_types=entity_types_str,
168-
example_input=HIPPORAG2_NER_ONE_SHOT_INPUT_WITH_TYPES,
169-
example_output=HIPPORAG2_NER_ONE_SHOT_OUTPUT_WITH_TYPES,
170-
passage=text
171-
)
194+
if language == 'zh':
195+
return HIPPORAG2_NER_PROMPT_WITH_TYPES_ZH.format(
196+
system=HIPPORAG2_NER_SYSTEM_WITH_TYPES_ZH,
197+
entity_types=entity_types_str,
198+
example_input=HIPPORAG2_NER_ONE_SHOT_INPUT_WITH_TYPES_ZH,
199+
example_output=HIPPORAG2_NER_ONE_SHOT_OUTPUT_WITH_TYPES_ZH,
200+
passage=text
201+
)
202+
else:
203+
return HIPPORAG2_NER_PROMPT_WITH_TYPES.format(
204+
system=HIPPORAG2_NER_SYSTEM_WITH_TYPES,
205+
entity_types=entity_types_str,
206+
example_input=HIPPORAG2_NER_ONE_SHOT_INPUT_WITH_TYPES,
207+
example_output=HIPPORAG2_NER_ONE_SHOT_OUTPUT_WITH_TYPES,
208+
passage=text
209+
)
172210
else:
173211
# Use auto-type prompt (LLM determines entity types)
174-
return HIPPORAG2_NER_PROMPT.format(
175-
system=HIPPORAG2_NER_SYSTEM,
176-
example_input=HIPPORAG2_NER_ONE_SHOT_INPUT,
177-
example_output=HIPPORAG2_NER_ONE_SHOT_OUTPUT,
178-
passage=text
179-
)
212+
if language == 'zh':
213+
return HIPPORAG2_NER_PROMPT_ZH.format(
214+
system=HIPPORAG2_NER_SYSTEM_ZH,
215+
example_input=HIPPORAG2_NER_ONE_SHOT_INPUT_ZH,
216+
example_output=HIPPORAG2_NER_ONE_SHOT_OUTPUT_ZH,
217+
passage=text
218+
)
219+
else:
220+
return HIPPORAG2_NER_PROMPT.format(
221+
system=HIPPORAG2_NER_SYSTEM,
222+
example_input=HIPPORAG2_NER_ONE_SHOT_INPUT,
223+
example_output=HIPPORAG2_NER_ONE_SHOT_OUTPUT,
224+
passage=text
225+
)
180226

181227
def build_triple_prompt(self, text: str, entities: List[str]) -> str:
182-
"""Build triple extraction prompt"""
228+
"""
229+
Build triple extraction prompt
230+
Supports both Chinese and English
231+
"""
183232
entities_str = '\n'.join(entities)
184233

185-
return HIPPORAG2_TRIPLE_PROMPT.format(
186-
system=HIPPORAG2_TRIPLE_SYSTEM,
187-
example_input=HIPPORAG2_TRIPLE_ONE_SHOT_INPUT,
188-
example_output=HIPPORAG2_TRIPLE_ONE_SHOT_OUTPUT,
189-
passage=text,
190-
entities=entities_str
191-
)
234+
# Detect language
235+
language = self.detect_language(text)
236+
237+
if language == 'zh':
238+
return HIPPORAG2_TRIPLE_PROMPT_ZH.format(
239+
system=HIPPORAG2_TRIPLE_SYSTEM_ZH,
240+
example_input=HIPPORAG2_TRIPLE_ONE_SHOT_INPUT_ZH,
241+
example_output=HIPPORAG2_TRIPLE_ONE_SHOT_OUTPUT_ZH,
242+
passage=text,
243+
entities=entities_str
244+
)
245+
else:
246+
return HIPPORAG2_TRIPLE_PROMPT.format(
247+
system=HIPPORAG2_TRIPLE_SYSTEM,
248+
example_input=HIPPORAG2_TRIPLE_ONE_SHOT_INPUT,
249+
example_output=HIPPORAG2_TRIPLE_ONE_SHOT_OUTPUT,
250+
passage=text,
251+
entities=entities_str
252+
)
192253

193254
def parse_ner_response(self, response: str) -> List[Tuple[str, str]]:
194255
"""

core/file_management/indexing/bm25_indexing.py

Lines changed: 11 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,6 @@ def __init__(self, config: "BM25IndexerConfig"):
2929
# Batch processing configuration
3030
self.batch_size = config.batch_size
3131
self.flush_interval = config.flush_interval
32-
self.immediate_flush_threshold = config.immediate_flush_threshold
3332

3433
# Async lock to ensure only one coroutine writes to the index
3534
self._write_lock = asyncio.Lock()
@@ -138,10 +137,12 @@ async def update_index(self, chunks: List[Chunk]) -> List[str]:
138137
"""
139138
Adds chunks to the pending queue for batch processing.
140139
141-
Flush strategies:
142-
1. If pending chunks >= batch_size: flush immediately
143-
2. If pending chunks <= immediate_flush_threshold: flush immediately (for small uploads)
144-
3. Otherwise: wait for periodic flush
140+
This method is NON-BLOCKING - it adds chunks to the queue and returns immediately.
141+
The actual indexing happens in the background flush worker.
142+
143+
Flush trigger strategies:
144+
1. If pending chunks >= batch_size: trigger immediate flush (non-blocking)
145+
2. Otherwise: wait for periodic flush
145146
"""
146147
if not chunks:
147148
return []
@@ -154,19 +155,13 @@ async def update_index(self, chunks: List[Chunk]) -> List[str]:
154155
total_pending = len(self._pending_chunks)
155156
logger.info(f"Added {len(chunks)} chunks to pending queue. Total pending: {total_pending}")
156157

157-
# Strategy 1: Batch size reached - flush immediately
158+
# Strategy 1: Batch size reached - trigger immediate flush (non-blocking)
158159
if total_pending >= self.batch_size:
159-
logger.info(f"Batch size ({self.batch_size}) reached, flushing immediately")
160-
return await self._flush_pending_chunks()
161-
162-
# Strategy 2: Small upload - flush immediately to reduce latency
163-
if self.immediate_flush_threshold > 0 and total_pending <= self.immediate_flush_threshold:
164-
logger.info(f"Small batch ({total_pending} <= {self.immediate_flush_threshold}), flushing immediately")
165-
return await self._flush_pending_chunks()
160+
logger.info(f"Batch size ({self.batch_size}) reached, triggering immediate flush")
161+
# Create a flush task but don't wait for it
162+
asyncio.create_task(self._flush_pending_chunks())
166163

167-
# Strategy 3: Medium batch - wait for periodic flush
168-
logger.info(f"Medium batch ({total_pending}), waiting for periodic flush")
169-
# Return the chunk IDs optimistically (they will be indexed later)
164+
# Return chunk IDs immediately (they will be indexed by background worker)
170165
return [chunk.id for chunk in chunks]
171166

172167
async def shutdown(self):

0 commit comments

Comments
 (0)