Feature/dual embedders late chunking 42 #43
Conversation
- Add SentenceTransformersEmbedder for high-throughput (48+ papers/sec)
- Maintain JinaV4Embedder for sophisticated processing
- Update factory to support both implementations
- Add mandatory late chunking principle to CLAUDE.md
- Create benchmark comparison script
- Both implementations enforce late chunking

Performance targets:
- SentenceTransformers: 48+ papers/second
- Transformers: 8-15 papers/second (current)

Closes #42
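Since the commits below lean on the late-chunking principle, here is a minimal sketch of the idea for reviewers (illustrative only; not the repo's `JinaV4Embedder` or `SentenceTransformersEmbedder`, and the model name and helper are placeholders): encode the full text once, then pool token embeddings per chunk span so every chunk vector is conditioned on the whole document's context.

```python
# Hypothetical late-chunking helper: any HF encoder exposing last_hidden_state
# and a fast tokenizer with offset mapping works for this sketch.
import torch
from transformers import AutoModel, AutoTokenizer

def late_chunk_embed(text, char_spans, model_name="BAAI/bge-base-en-v1.5"):
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModel.from_pretrained(model_name)
    enc = tokenizer(text, return_tensors="pt", return_offsets_mapping=True,
                    truncation=True)
    offsets = enc.pop("offset_mapping")[0]              # (seq_len, 2) char offsets
    with torch.no_grad():
        token_embs = model(**enc).last_hidden_state[0]  # (seq_len, dim)
    chunk_vectors = []
    for start, end in char_spans:
        # Pool only the tokens inside this chunk's character span; the
        # contextualization already happened over the full text above.
        mask = (offsets[:, 0] >= start) & (offsets[:, 1] <= end)
        if mask.any():
            chunk_vectors.append(token_embs[mask].mean(dim=0))
    return chunk_vectors
```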
- Replace encode_text with encode method for Jina v3 compatibility
- Fix task parameter defaults to use 'retrieval.passage'
- Add fallback for models without encode method
- Ensure both embedders work with factory pattern
- Upgrade sentence-transformers from 2.7.0 to 4.1.0
- Update pyproject.toml dependency to allow v4.1.0
- Fix SentenceTransformersEmbedder to work with Jina v4
- Update task mapping for Jina v4 API (retrieval, text-matching, code)
- Default to Jina v4 model for 2048-dim embeddings
- Verified working with CPU tests (7.6 texts/sec)

Ready for GPU testing to achieve 48+ papers/sec target
…v_daily.py as they are no longer needed for the current pipeline.
…essing
- Introduced `arxiv_pipeline.py` to manage a two-phase processing system:
  - Phase 1: Extraction of PDFs to JSON using GPU-accelerated Docling.
  - Phase 2: Embedding staged JSONs and storing results in ArangoDB using Jina.
- Enhanced logging and error handling throughout the pipeline.
- Implemented metadata enrichment for extracted documents.
- Added configuration management for flexible pipeline setup.
- Refactored existing code to accommodate new architecture, including moving `ArangoDBManager` import to a new module.
… implementation, and filesystem citation extraction. These scripts demonstrated the use of the Academic Citation Toolkit with various document sources and storage systems, but are no longer needed.
…rkflows, phase 4 integration, robust extraction, and simple Tree-sitter extraction. These deletions streamline the test suite by eliminating outdated or redundant tests, ensuring a more maintainable and efficient testing process.
- Introduced a new script `check_arxiv_db.py` for verifying ArangoDB collections related to ArXiv metadata processing.
- Implemented connection handling, collection checks, and detailed statistics reporting.
- Added sample document checks for metadata, chunks, and embeddings collections.
- Removed obsolete `test_mcp_env.sh` script.
- Updated import paths in various test files to reflect the new project structure.
- Created new tests for ArXiv metadata processing with 1000 records and 100k records on dual GPUs.
- Added a test for default configuration validation and workflow execution.
- Enhanced error handling and logging throughout the new tests.
- Fixed worker config to use embedding_batch_size instead of batch_size
- Aligned chunk size parameters (500 tokens, 200 overlap) across all components
- Added FP16 support with Flash Attention 2 detection
- Implemented fast path for abstracts that fit in single chunks
- Fixed Jina v4 output extraction to use single_vec_emb attribute
- Added comprehensive monitoring utilities

Performance improved from 3.3 to 36+ docs/second with batch size 24
Stable processing of 2.8M ArXiv abstracts with dual GPU utilization

🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
- Reduced progress tracker update frequency (10x fewer updates)
- Reduced monitoring log frequency (from 100 to 1000 records)
- Queue sizes already optimized to 500 (user change)
- Storage batch size already optimized to 500 (user change)
- Fast path logging already removed (user change)

These optimizations should gain 6-10 docs/second:
- Progress updates: +2-3 docs/sec
- Logging reduction: +1-2 docs/sec
- Queue/batch optimizations: +3-5 docs/sec (already done)

Target: 40+ docs/second (from current 34)

🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
- Removed torch.cuda.empty_cache() after every batch (HUGE win)
- Commented out excessive logging in embedders_jina.py hot path
- Disabled performance monitor to eliminate warning spam
- These were adding 300-1500ms overhead per second!

Key changes:
- No more GPU cache clearing (PyTorch manages memory efficiently)
- No more logging 30+ times/second in embedding loop
- No more performance monitor overhead and warnings

Expected gains: +8-10 docs/second
Target: 40-45 docs/second sustained throughput

🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
CodeRabbit identified a serious issue where we were creating a full JSON copy of all records just to measure memory usage. This DOUBLES memory consumption and can cause OOM errors.

Fixed by:
- Using psutil to get actual process RSS when available
- Falling back to estimation (2KB per record) instead of serialization
- Avoids creating duplicate copies of large datasets

This prevents OOM errors when processing large batches and reduces peak memory usage by ~50% during measurement.

Fixes CodeRabbit review comment about memory duplication.

🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
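A minimal sketch of the shape of that fix (names are illustrative; the repo's actual monitoring code may differ):

```python
# Measure memory without duplicating the batch: prefer the process RSS from
# psutil, fall back to a coarse per-record estimate instead of json.dumps().
def estimate_batch_memory_bytes(records):
    try:
        import psutil
        return psutil.Process().memory_info().rss   # resident set size, in bytes
    except ImportError:
        return len(records) * 2048                  # ~2 KB/record fallback estimate
```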
- Removed test workflow function from `workflow_arxiv_metadata.py`.
- Enhanced `workflow_arxiv_parallel.py` to support resuming from checkpoints, including loading existing IDs and skipping already processed records.
- Updated logging to include skipped records and improved progress reporting.
- Added a new `notes.md` file with instructions for data integration and graph rebuilding.
- Created `test_resume.py` to validate the resume functionality of the Arxiv workflow.
- Introduced benchmarking tests for dual embedder implementations in `test_benchmark_embedders.py`.
- Added unit tests for Jina v4 embedder in `test_embedders_jina.py`.
- Implemented a test suite for ArXiv Metadata Workflow in `test_workflow_arxiv_metadata.py`.
- Developed a verification script `verify_storage.py` to ensure records are stored correctly in the database.
- Implemented a test script for the size-sorted workflow in `test_sorted_workflow.sh`, verifying the setup and execution of the workflow with a sample dataset.
- Created `test_unix_socket.py` to test Unix socket connections to ArangoDB, comparing performance with HTTP connections and ensuring proper error handling.
- Developed comprehensive unit tests for `ArxivSortedWorkflow` in `test_workflow_arxiv_sorted.py`, covering sorting, processing order storage, checkpoint functionality, and batch processing logic.
- Added unique method tests for `ArxivSortedWorkflow` in `test_workflow_arxiv_sorted_unique.py`, focusing on methods specific to the sorted workflow.
- Included edge case tests in `test_workflow_arxiv_sorted_unique.py` to handle empty abstracts and missing fields.
Walkthrough

Extensive repo update adding ArXiv metadata ingestion workflows (single-/multi-GPU), dual embedders with late chunking, Unix-socket ArangoDB connectivity, centralized logging, configurations/CLIs, and multiple dev/test utilities. Several legacy compatibility layers and ArXiv Postgres tooling/docs are removed. Tests are overhauled; setup scripts and configs added.

Changes
Sequence Diagram(s)

sequenceDiagram
autonumber
actor CLI as CLI/Runner
participant CFG as ArxivMetadataConfig
participant WF as ArxivMetadataWorkflow
participant DB as DatabaseFactory/Arango
participant EMB as EmbedderFactory/Embedder
CLI->>CFG: Load YAML + overrides
CLI->>WF: Instantiate(config)
WF->>DB: get_arango (Unix socket preferred)
WF->>EMB: build embedder (model, fp16, device)
loop Stream JSONL
WF->>WF: Collect batch (respect max_records/resume)
WF->>EMB: embed_batch_with_late_chunking(abstracts)
WF->>DB: Transaction: insert metadata, chunks, embeddings
WF-->>WF: Update progress/checkpoint
end
WF-->>CLI: WorkflowResult (success, counts, throughput)
sequenceDiagram
autonumber
participant Main as Orchestrator
participant QIN as InputQueue
participant W0 as Worker[GPU0]
participant W1 as Worker[GPU1]
participant QOUT as OutputQueue
participant ST as StorageThread
participant DB as ArangoDB
Main->>QIN: Enqueue metadata batches
par Parallel workers
W0->>QIN: Dequeue batch
W0->>W0: Late-chunk embed
W0->>QOUT: Emit chunks+embeddings
W1->>QIN: Dequeue batch
W1->>W1: Late-chunk embed
W1->>QOUT: Emit chunks+embeddings
end
ST->>QOUT: Consume results
ST->>DB: Transactional insert (metadata/chunks/embeddings)
ST-->>Main: Progress updates
sequenceDiagram
autonumber
participant Client as get_database_for_workflow
participant FS as UnixSocket
participant AHTTP as Arango HTTP
participant ADB as ArangoClient
Client->>FS: Detect socket (exists, is_socket)
alt Socket available
Client->>ADB: Build URL (http+unix or HTTP fallback)
else No socket
Client->>ADB: Use HTTP host (localhost:8529)
end
Client->>ADB: Connect (db/user/pass)
ADB-->>Client: Handle (StandardDatabase)
Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120+ minutes

Possibly related issues
Possibly related PRs
Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
✅ Passed checks (3 passed)
✨ Finishing touches
🧪 Generate unit tests
Actionable comments posted: 47
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (13)
tests/debug_extraction_failures.py (1)
44-46: Prefer core/tools config path with robust fallback

tests/debug_extraction_failures.py lines 44–46 — found core/tools/arxiv/configs/acid_pipeline_phased.yaml in this branch; the hardcoded 'tools/arxiv/...' will 404. Replace with a fallback lookup:
-    config_path = 'tools/arxiv/configs/acid_pipeline_phased.yaml'
-    config = load_config(config_path)
+    candidates = [
+        project_root / 'core' / 'tools' / 'arxiv' / 'configs' / 'acid_pipeline_phased.yaml',
+        project_root / 'tools' / 'arxiv' / 'configs' / 'acid_pipeline_phased.yaml',
+    ]
+    for p in candidates:
+        if p.exists():
+            config_path = p
+            break
+    else:
+        logger.error("Config not found in: %s", candidates)
+        return
+    config = load_config(str(config_path))

tests/arxiv/acid/test_integration.py (1)
27-34: Make this a real pytest test (assert/skip instead of returning booleans).
Returning False doesn’t fail in pytest. Use assertions and pytest.skip when prerequisites are missing. Apply:
+import pytest
@@
-    if not arango_password:
-        logger.error("ARANGO_PASSWORD environment variable not set")
-        return False
+    if not arango_password:
+        pytest.skip("ARANGO_PASSWORD not set; skipping integration test")
@@
-    return all(status in ['processed', 'already_processed'] for status in results.values())
+    assert all(status in ['processed', 'already_processed'] for status in results.values())

Also applies to: 88-88
core/tools/arxiv/arxiv_manager.py (1)
413-472: Persist within a single transaction and align schema with arxiv_chunks/arxiv_embeddings.
Current writes are non-atomic and skip arxiv_chunks; embeddings schema differs from contract (vector/model/chunk_id). Apply (abridged) transactional refactor:
-        # Store main paper document
-        self.db_manager.insert_document('arxiv_papers', arxiv_doc)
-
-        # Store chunks with embeddings
-        for chunk in result.chunks:
-            chunk_doc = {
-                '_key': f"{paper_info.sanitized_id}_chunk_{chunk.chunk_index}",
-                'paper_id': paper_info.sanitized_id,
-                'arxiv_id': paper_info.arxiv_id,
-                'chunk_index': chunk.chunk_index,
-                'text': chunk.text,
-                'embedding': chunk.embedding.tolist(),
-                'start_char': chunk.start_char,
-                'end_char': chunk.end_char,
-                'context_window_used': chunk.context_window_used
-            }
-            self.db_manager.insert_document('arxiv_embeddings', chunk_doc)
+        txn = self.db_manager.begin_transaction(
+            write_collections=['arxiv_papers','arxiv_chunks','arxiv_embeddings','arxiv_structures']
+        )
+        # main paper
+        txn.collection('arxiv_papers').insert(arxiv_doc, overwrite=False)
+        # chunks (text)
+        for chunk in result.chunks:
+            chunk_key = f"{paper_info.sanitized_id}_chunk_{chunk.chunk_index}"
+            txn.collection('arxiv_chunks').insert({
+                '_key': chunk_key,
+                'paper_id': paper_info.sanitized_id,
+                'arxiv_id': paper_info.arxiv_id,
+                'chunk_index': chunk.chunk_index,
+                'text': chunk.text,
+                'start_char': chunk.start_char,
+                'end_char': chunk.end_char,
+                'context_window_used': chunk.context_window_used
+            })
+            # embeddings (vector-only)
+            txn.collection('arxiv_embeddings').insert({
+                '_key': f"{chunk_key}_emb",
+                'paper_id': paper_info.sanitized_id,
+                'chunk_id': chunk_key,
+                'arxiv_id': paper_info.arxiv_id,
+                'vector': (chunk.embedding.tolist() if hasattr(chunk.embedding, 'tolist') else list(chunk.embedding)),
+                'model': result.processing_metadata.get('embedding_model', 'jina-v4')
+            })
+        txn.commit_transaction()
@@
-        self.db_manager.insert_document('arxiv_structures', structures_doc)
+        txn = txn if 'txn' in locals() else self.db_manager.begin_transaction(write_collections=['arxiv_structures'])
+        txn.collection('arxiv_structures').insert(structures_doc, overwrite=True)
+        txn.commit_transaction()

tests/arxiv/acid/test_acid_compliance.py (2)
396-404: Use seconds for expiresAt to match TTL index semantics.
Arango TTL index expects epoch seconds; switching to ms desynchronizes auto-cleanup and conflicts with code writing seconds elsewhere.

-# Worker 1 acquires lock (TTL needs Unix timestamp in milliseconds)
-expiry_time = int((datetime.now() + timedelta(seconds=2)).timestamp() * 1000)
+# Worker 1 acquires lock (Unix timestamp in seconds)
+expiry_time = int((datetime.now() + timedelta(seconds=2)).timestamp())
@@
-    'expiresAt': int((datetime.now() + timedelta(seconds=2)).timestamp() * 1000)
+    'expiresAt': int((datetime.now() + timedelta(seconds=2)).timestamp())
@@
-current_time = int(datetime.now().timestamp() * 1000)
+current_time = int(datetime.now().timestamp())
@@
-    'expiresAt': int((datetime.now() + timedelta(seconds=2)).timestamp() * 1000)
+    'expiresAt': int((datetime.now() + timedelta(seconds=2)).timestamp())

Also applies to: 412-413, 423-427, 441-442
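For reference, a small python-arango sketch of the seconds-based contract this comment describes (database, collection, and credentials are placeholders; the collection is assumed to exist):

```python
from datetime import datetime, timedelta
from arango import ArangoClient

db = ArangoClient(hosts="http://localhost:8529").db(
    "academy_store", username="root", password="<password>"
)
locks = db.collection("arxiv_locks")
# TTL index: expire documents exactly at the time stored in 'expiresAt'.
locks.add_ttl_index(fields=["expiresAt"], expiry_time=0)
locks.insert({
    "_key": "2401_12345",
    "worker_id": 1,
    # Unix timestamp in SECONDS; a milliseconds value would push expiry far into the future.
    "expiresAt": int((datetime.now() + timedelta(seconds=2)).timestamp()),
})
```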
221-313: Normalize expiresAt to milliseconds across tests.

tests/arxiv/acid/test_acid_compliance.py (lines 221–313) sets expiresAt with seconds (int(...timestamp())), while other lock usages in the same file use milliseconds at lines 397, 412, 426, 441 (int(...timestamp() * 1000)). Change the seconds-based assignment to milliseconds (int(...timestamp() * 1000)) to avoid TTL/unit mismatches and flaky tests.
tests/arxiv/acid/test_acid_pipeline.py (4)
176-181: Fix AQL collection name: use arxiv_chunks.
Querying 'chunks' will 404; schema uses arxiv_chunks.

-        chunks_collection = self.processor.db.collection('arxiv_chunks')
-        chunk_count = len(list(self.processor.db.aql.execute(
-            "FOR c IN chunks FILTER c.paper_id == @id RETURN c",
+        chunks_collection = self.processor.db.collection('arxiv_chunks')
+        chunk_count = len(list(self.processor.db.aql.execute(
+            "FOR c IN arxiv_chunks FILTER c.paper_id == @id RETURN c",
             bind_vars={'id': sanitized_id}
         )))
233-238: Fix AQL collection name: use arxiv_papers.

-        cursor = self.processor.db.aql.execute(
-            "FOR p IN papers FILTER p.arxiv_id == @id RETURN p",
+        cursor = self.processor.db.aql.execute(
+            "FOR p IN arxiv_papers FILTER p.arxiv_id == @id RETURN p",
             bind_vars={'id': paper_id}
         )
308-315: Use epoch seconds for lock TTL and match manager semantics.
ISO string won’t be honored by TTL index configured for seconds.

-        locks_collection.insert({
+        locks_collection.insert({
             '_key': sanitized_id,
             'paper_id': paper_id,
             'worker_id': 99999,
             'acquired_at': datetime.now().isoformat(),
-            'expiresAt': (datetime.now() - timedelta(seconds=1)).isoformat()  # Already expired
+            'expiresAt': int((datetime.now() - timedelta(seconds=1)).timestamp())  # Already expired
         })
382-386: Fix OnDemandProcessor Arango config shape.
Pass only DB params with correct keys; current code passes mismatched keys (arango_host vs host).

-        on_demand_config = {
+        on_demand_config = {
             'cache_root': self.test_pdf_dir,
             'sqlite_db': self.test_pdf_dir / 'test_cache.db',
-            'arango': self.config
+            'arango': {
+                'host': self.config.get('arango_host', 'http://127.0.0.1:8529').split(','),
+                'database': self.config['database'],
+                'username': self.config['username'],
+                'password': self.config['password'],
+            }
         }

core/tools/arxiv/arxiv_pipeline.py (1)
2-18: Docstring must tie to Conveyance framework with the correct efficiency formula.

Add the required theory note and correct equation: C = (W·R·H / T) · Ctx^α (α applies only to Ctx).
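Rendered as LaTeX for clarity, this is the same equation the comment asks for:

$$
C = \frac{W \cdot R \cdot H}{T} \cdot \mathrm{Ctx}^{\alpha}, \qquad \alpha \in [1.5, 2.0]
$$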
""" ACID Pipeline with Phase-Separated Architecture =============================================== @@ - Full ACID compliance with atomic transactions + +Conveyance (Efficiency stance): +- Efficiency C = (W·R·H / T) · Ctx^α, with α∈[1.5,2.0] (default 1.7) applied only to Ctx. +- W (What): extraction fidelity & embedding quality; R (Where): DB topology/locks; H (Who): worker/agent throughput; T (Time): end-to-end convergence time. +- Zero‑propagation: guard against any W/R/H→0 and avoid double-counting T. """tests/arxiv/test_large_scale_processing.py (2)
419-437: Unmatched except and mis-indentation break the module (SyntaxError).
`except Exception as e:` at Line 437 has no corresponding `try:`. The test calls are also over‑indented. Apply:
-        # Run tests
-        self.test_performance_benchmarks(arxiv_ids, sample_size=100)
+        # Run tests
+        try:
+            self.test_performance_benchmarks(arxiv_ids, sample_size=100)
@@
-            self.test_batch_processing(arxiv_ids[:batch_size_limit], batch_size=100)
+            self.test_batch_processing(arxiv_ids[:batch_size_limit], batch_size=100)
@@
-            if error_test_start < error_test_end:
-                self.test_error_recovery(arxiv_ids[error_test_start:error_test_end])
+            if error_test_start < error_test_end:
+                self.test_error_recovery(arxiv_ids[error_test_start:error_test_end])
@@
-            self.test_error_recovery(arxiv_ids[1000:1050])
+            self.test_error_recovery(arxiv_ids[1000:1050])
@@
-            self.test_database_integrity()
+            self.test_database_integrity()
         except Exception as e:
             logger.error(f"Test suite failed: {e}")
             self.test_results['errors'].append({
                 'test': 'suite',
                 'error': str(e)
             })
246-249: Database integrity queries don’t match stored schema/collections.

Collections/fields mismatch the workflows (“arxiv_abstract_embeddings”, chunk.paper_key, embedding field name). Queries currently undercount and misclassify.

Align with workflow schema:
-    integrity_checks['embeddings_count'] = db.collection('arxiv_embeddings').count()
+    integrity_checks['embeddings_count'] = db.collection('arxiv_abstract_embeddings').count()
@@
-        LET paper = DOCUMENT('arxiv_papers', chunk.paper_id)
+        LET paper = DOCUMENT('arxiv_papers', chunk.paper_key)
@@
-        LET chunk = DOCUMENT('arxiv_chunks', embedding.chunk_id)
+        LET chunk = DOCUMENT('arxiv_chunks', embedding.chunk_id)
         FILTER chunk == null
@@
-            FILTER e.chunk_id == chunk._id
+            FILTER e.chunk_id == chunk._key
@@
-        RETURN LENGTH(embedding.vector)
+        RETURN LENGTH(embedding.embedding)

Optional: replace the “processing_status” aggregation (no status field) with something meaningful, e.g., has_abstract.

Also applies to: 250-257, 262-271, 274-283, 288-295
core/database/arango/arango_unix_client.py (1)
1-200: Missing helper: get_database_for_workflow is imported but not defined.

core/database/database_factory.py:61 imports get_database_for_workflow from .arango_unix_client, but core/database/arango/arango_unix_client.py only defines UnixSocketHTTPClient and ArangoUnixClient and does not provide that function — either add def get_database_for_workflow(...) to arango_unix_client.py or update the import to the module that actually implements it.
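A hedged sketch of what such a helper could look like (the `http+unix://` URL form assumes the custom Unix-socket HTTP client added in this PR; defaults and names are illustrative, not the repo's actual API):

```python
import os
from pathlib import Path
from typing import Optional
from urllib.parse import quote

from arango import ArangoClient

DEFAULT_UNIX_SOCKET = "/tmp/arangodb.sock"
DEFAULT_HTTP_HOST = "http://localhost:8529"

def get_database_for_workflow(database: str, username: str = "root",
                              password: Optional[str] = None,
                              socket_path: str = DEFAULT_UNIX_SOCKET):
    """Prefer the Unix socket when present, otherwise fall back to plain HTTP."""
    password = password or os.environ.get("ARANGO_PASSWORD", "")
    sock = Path(socket_path)
    if sock.exists() and sock.is_socket():
        # Percent-encode the socket path into the host component; this only
        # works with a transport that understands http+unix URLs.
        hosts = f"http+unix://{quote(str(sock), safe='')}"
    else:
        hosts = DEFAULT_HTTP_HOST
    client = ArangoClient(hosts=hosts)
    return client.db(database, username=username, password=password)
```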
# But keep configuration JSON files
!*_config.json
Risk: allowing all *_config.json to be committed may leak secrets.
Unless guaranteed sanitized, prefer tracking only example config files.
-# But keep configuration JSON files
-!*_config.json
+# Keep only example configs; ignore real config JSONs to prevent secret leaks
+!*_config.example.json
+*_config.json

Consider adding:
+secrets*.json
+*token*.json
+*key*.json
🤖 Prompt for AI Agents
In config/.gitignore around lines 9-10, the current allow-all rule
"!*_config.json" risks committing secrets; change this to only allow
example/sanitized configs (e.g. "!*_config.example.json" or
"!*_config.sample.json") and ensure raw config filenames remain ignored (add a
pattern like "*_config.json" if needed), update README or a contributing note to
instruct developers to copy example files to create local configs, and remove or
replace the permissive whitelist so only non-sensitive example files are
tracked.
DEFAULT_UNIX_SOCKET = "/tmp/arangodb.sock"
DEFAULT_HTTP_HOST = "http://localhost:8529"
🛠️ Refactor suggestion
🧩 Analysis chain
Duplicate Unix-socket client exists; unify to one implementation.
This module overlaps with core/database/arango/arango_unix_client.py (different path/name, different approach). Split implementations risk drift and import confusion.
Proposed action: keep one canonical client (prefer the HTTP‑over‑Unix implementation) and re‑export if needed to maintain import paths.
Also applies to: 221-251
🏁 Script executed:
#!/bin/bash
# Locate duplicates and current imports
fd -a --type f 'arango_unix_client.py'
rg -nP '\barango_unix_client\b' -g '!**/dist/**' -g '!**/build/**'

Length of output: 265
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "Searching for arango_unix_client.py files..."
mapfile -t files < <(fd -a --type f 'arango_unix_client.py' || true)
if [ ${#files[@]} -eq 0 ]; then
echo "No files found."
exit 0
fi
echo "Found ${#files[@]} file(s):"
printf '%s\n' "${files[@]}"
echo
for f in "${files[@]}"; do
echo "----- FILE: $f -----"
stat -c "%n — %s bytes — modified: %y" "$f" || ls -l "$f"
echo "---- head (first 120 lines) ----"
sed -n '1,120p' "$f" || true
echo "---- tail (last 120 lines) ----"
tail -n 120 "$f" || true
echo
done
if [ ${#files[@]} -ge 2 ]; then
echo "=== Diff (unified) between first two files ==="
diff -u "${files[0]}" "${files[1]}" || true
echo
fi
echo "=== Search for references to arango_unix_client across repo ==="
rg -n --hidden -uu '\barango_unix_client\b' -S || true

Length of output: 101
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "Working dir: $(pwd)"
tmp=$(mktemp)
# Try fd first, fall back to find
fd -a --type f 'arango_unix_client.py' > "$tmp" 2>/dev/null || true
if [ ! -s "$tmp" ]; then
find . -type f -name 'arango_unix_client.py' > "$tmp" 2>/dev/null || true
fi
if [ ! -s "$tmp" ]; then
echo "No arango_unix_client.py files found"
rm -f "$tmp"
exit 0
fi
echo "Found files:"
nl -ba "$tmp"
echo
while IFS= read -r file; do
echo "----- FILE: $file -----"
stat -c "%n — %s bytes — modified: %y" "$file" || ls -l "$file"
echo "---- head (first 200 lines) ----"
sed -n '1,200p' "$file" || true
echo "---- tail (last 200 lines) ----"
tail -n 200 "$file" || true
echo
done < "$tmp"
f1=$(sed -n '1p' "$tmp")
f2=$(sed -n '2p' "$tmp" || true)
if [ -n "$f2" ]; then
echo "=== Diff (unified) between first two files ==="
diff -u "$f1" "$f2" || true
echo
fi
echo "=== Search repository for references to 'arango_unix_client' (tracked files via git) ==="
git ls-files -z 2>/dev/null | xargs -0 rg -n --hidden -uu '\barango_unix_client\b' -S || true
echo "=== Fallback grep across filesystem (excluding common build dirs) ==="
grep -RIn --exclude-dir={dist,build,.git,__pycache__} '\barango_unix_client' . || true
rm -f "$tmp"

Length of output: 86
Unify duplicate Unix-socket Arango client implementations
Two implementations exist: core/database/arango_unix_client.py and core/database/arango/arango_unix_client.py. Consolidate into a single canonical client (prefer the HTTP-over-UNIX implementation), move shared constants there, and re-export a compatibility shim at the secondary path to avoid breaking imports.
🧰 Tools
🪛 Ruff (0.12.2)
31-31: Probable insecure usage of temporary file or directory: "/tmp/arangodb.sock"
(S108)
# Setup log directory
log_dir = Path(__file__).parent.parent / "logs"
log_dir.mkdir(exist_ok=True)

# Configure Python logging
logging.basicConfig(
    level=getattr(logging, log_level),
    format='%(message)s'
)
Avoid writing under the installed package directory; use user-writable path and set permissions.
Current path resolves to core/logs (often read-only in site-packages).
- log_dir.mkdir(exist_ok=True)
+ log_dir.mkdir(parents=True, exist_ok=True, mode=0o750)

Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In core/logging/logging.py around lines 45 to 53, the current log directory is
created under the package (core/logs) which may be read-only when installed;
change to a user-writable location (e.g., use platform-appropriate user data/log
dir via appdirs/XDG or fallback to ~/.local/share/<app>/logs or
tempfile.gettempdir()), create the directory there with safe permissions
(restrict to user, e.g. 0o700), handle and log any OSError when creating the dir
and fall back to a safe location (stdout or temp dir) if creation fails, and
update logging.basicConfig to point its FileHandler (or use logging.basicConfig
with filename set to a file inside the new user-writable log dir) instead of
writing under the installed package directory.
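One way the prompt's suggestion could look in practice (the XDG fallback paths and the `hades` app name are assumptions for illustration):

```python
import logging
import os
import tempfile
from pathlib import Path

def resolve_log_dir(app_name: str = "hades") -> Path:
    """Pick a user-writable log directory with a temp-dir fallback."""
    base = Path(os.environ.get("XDG_STATE_HOME", Path.home() / ".local" / "state"))
    log_dir = base / app_name / "logs"
    try:
        log_dir.mkdir(parents=True, exist_ok=True, mode=0o700)
    except OSError:
        log_dir = Path(tempfile.gettempdir()) / f"{app_name}-logs"
        log_dir.mkdir(parents=True, exist_ok=True, mode=0o700)
    return log_dir

logging.basicConfig(
    level=logging.INFO,
    format="%(message)s",
    filename=str(resolve_log_dir() / "pipeline.log"),
)
```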
| FixedSizeChunking, | ||
| SemanticChunking | ||
| ) | ||
|
|
||
| __all__ = [ | ||
| "ChunkingStrategy", | ||
| "TokenBasedChunking", | ||
| "SemanticChunking", | ||
| "SlidingWindowChunking", | ||
| "ChunkingStrategyFactory", | ||
| # Deprecated - kept for backward compatibility | ||
| "DocumentProcessor", | ||
| "ProcessingConfig", | ||
| "ProcessingResult", | ||
| ] | ||
|
|
||
| # Lazy import for backward compatibility using __getattr__ | ||
| def __getattr__(name): | ||
| """ | ||
| Lazy import deprecated classes from workflows. | ||
| This provides backward compatibility while encouraging migration | ||
| to the new module structure. | ||
| """ | ||
| import warnings | ||
|
|
||
| deprecated_imports = { | ||
| "DocumentProcessor": "core.workflows.workflow_pdf", | ||
| "ProcessingConfig": "core.workflows.workflow_pdf", | ||
| "ProcessingResult": "core.workflows.workflow_pdf", | ||
| } | ||
|
|
||
| if name in deprecated_imports: | ||
| module_path = deprecated_imports[name] | ||
| warnings.warn( | ||
| f"Importing {name} from core.processors is deprecated. " | ||
| f"Please use {module_path} instead.", | ||
| DeprecationWarning, | ||
| stacklevel=2 | ||
| ) | ||
|
|
||
| # Dynamically import from the new location | ||
| import importlib | ||
| module = importlib.import_module(module_path) | ||
| return getattr(module, name) | ||
|
|
||
| raise AttributeError(f"module {__name__!r} has no attribute {name!r}") | ||
| 'ChunkingStrategy', | ||
| 'ChunkingStrategyFactory', | ||
| 'FixedSizeChunking', | ||
| 'SemanticChunking', |
🧩 Analysis chain
Confirm FixedSizeChunking exists in text.chunking_strategies.
Avoid exporting a non-existent symbol.
Run:
🏁 Script executed:
#!/bin/bash
ast-grep --pattern $'class FixedSizeChunking { $$$ }' || rg -nP 'class\s+FixedSizeChunking\b' core

Length of output: 0
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== repo root: $(pwd) ==="
echo "=== git branch ==="
git rev-parse --abbrev-ref HEAD 2>/dev/null || true
echo
echo "=== Listing core/processors ==="
ls -la core/processors || true
echo
echo "=== core/processors/__init__.py (first 300 lines) ==="
if [ -f core/processors/__init__.py ]; then
sed -n '1,300p' core/processors/__init__.py
else
echo "core/processors/__init__.py not found"
fi
echo
echo "=== rg: occurrences of FixedSizeChunking (word match) ==="
rg -n --hidden -S '\bFixedSizeChunking\b' || true
echo
echo "=== rg: class FixedSizeChunking definitions ==="
rg -n --hidden -S 'class\s+FixedSizeChunking\b' || true
echo
echo "=== rg: occurrences of SemanticChunking ==="
rg -n --hidden -S '\bSemanticChunking\b' || true
echo
echo "=== rg: occurrences of ChunkingStrategyFactory / ChunkingStrategy ==="
rg -n --hidden -S '\bChunkingStrategyFactory\b' || true
rg -n --hidden -S '\bChunkingStrategy\b' || true
echo
echo "=== rg: files/modules named chunking_strategies or chunking-strategies ==="
rg -n --hidden -S 'chunking_strategies|chunking-strategies' || true
echo
echo "=== rg: any files under text/ mentioning chunking ==="
rg -n --hidden -S 'chunking' text || true
echo
echo "=== List python files that mention 'Chunking' (case-sensitive) ==="
rg -n --hidden -S 'Chunking' --glob '!**/site-packages/**' || true

Length of output: 5313
Remove FixedSizeChunking from exports in core/processors/__init__.py

core/processors/text/chunking_strategies.py does not define FixedSizeChunking (it defines TokenBasedChunking, SemanticChunking, SlidingWindowChunking, ChunkingStrategyFactory); remove FixedSizeChunking from the import and __all__ in core/processors/__init__.py (lines 11–19) or add/alias the intended class in chunking_strategies.py.
🤖 Prompt for AI Agents
In core/processors/__init__.py around lines 11 to 19, the module exports and
imports reference FixedSizeChunking which does not exist in
core/processors/text/chunking_strategies.py; remove FixedSizeChunking from the
import list and from the __all__ array (or alternatively add/alias the correct
class in chunking_strategies.py), ensuring the exported names match actual
classes (e.g., keep TokenBasedChunking, SemanticChunking, SlidingWindowChunking,
ChunkingStrategyFactory).
| if "jina" not in self.embedder_model.lower(): | ||
| errors.append(f"Embedder model {self.embedder_model} may not support late chunking") | ||
|
|
||
| # Validate GPU settings consistency | ||
| if self.use_gpu and not self.gpu_device.startswith("cuda"): | ||
| errors.append(f"GPU enabled but device {self.gpu_device} is not CUDA") | ||
|
|
||
| # Validate batch size relationships | ||
| if self.embedding_batch_size > self.batch_size: | ||
| errors.append(f"Embedding batch size ({self.embedding_batch_size}) exceeds record batch size ({self.batch_size})") | ||
|
|
||
| # Validate checkpoint interval | ||
| if self.checkpoint_interval > 100000: | ||
| errors.append(f"Checkpoint interval ({self.checkpoint_interval}) may be too large for recovery") | ||
|
|
||
| # Validate throughput feasibility | ||
| if self.batch_size < 100 and self.target_throughput > 10: | ||
| errors.append(f"Batch size ({self.batch_size}) too small for target throughput ({self.target_throughput} papers/sec)") | ||
|
|
||
| # Validate collection names are different | ||
| collections = [self.metadata_collection, self.chunks_collection, self.embeddings_collection] | ||
| if len(set(collections)) != len(collections): | ||
| errors.append("Collection names must be unique") |
Allow dual embedders; validate expected dims accordingly.
Restricting to “jina” contradicts the dual‑embedder design. Support Sentence-Transformers and validate dims.
- # Validate embedder model compatibility
- if "jina" not in self.embedder_model.lower():
- errors.append(f"Embedder model {self.embedder_model} may not support late chunking")
+ # Validate embedder model compatibility (dual embedders)
+ model_l = self.embedder_model.lower()
+ if not any(k in model_l for k in ("jina", "sentence", "bge", "gte", "mixedbread")):
+ errors.append(f"Unsupported embedder model: {self.embedder_model}")
+
+ # Expected dims check if enabled
+ if self.validate_embeddings:
+ if "jina" in model_l:
+ expected = 2048
+ elif any(k in model_l for k in ("sentence", "bge", "gte", "mixedbread")):
+ expected = 768
+ else:
+ expected = self.expected_embedding_dim
+ if self.expected_embedding_dim != expected:
+ errors.append(f"expected_embedding_dim={self.expected_embedding_dim} "
+ f"does not match model family expectation ({expected})")Also applies to: 164-172
| def test_processing_order_storage(self, mock_embedder_factory, mock_db_factory): | ||
| """Test that processing order is stored in database.""" | ||
| # Setup mocks | ||
| mock_db = MagicMock() | ||
| mock_order_collection = MagicMock() | ||
|
|
||
| mock_db.has_collection.return_value = False | ||
| mock_db.create_collection.return_value = mock_order_collection | ||
| mock_db.collection.return_value = mock_order_collection | ||
| mock_db_factory.get_arango.return_value = mock_db | ||
|
|
||
| mock_embedder = MagicMock() | ||
| mock_embedder_factory.create_embedder.return_value = mock_embedder | ||
|
|
||
| # Create workflow | ||
| workflow = ArxivSortedWorkflow(self.config) | ||
|
|
||
| # Create sorted records | ||
| sorted_records = [ | ||
| {'id': 'paper4', 'abstract_length': 5, 'record': self.test_metadata[3]}, | ||
| {'id': 'paper1', 'abstract_length': 15, 'record': self.test_metadata[0]}, | ||
| ] | ||
|
|
||
| # Store processing order | ||
| workflow._store_processing_order(sorted_records) | ||
|
|
||
| # Verify collection was created | ||
| mock_db.create_collection.assert_called_once_with('arxiv_processing_order') | ||
|
|
Tests target removed API _store_processing_order.
Workflow now resumes via query; either update tests to reflect the query-based approach or mark skipped.
+ @unittest.skip("Order persistence removed; resume is now query-based in workflow")
    def test_processing_order_storage(self, mock_embedder_factory, mock_db_factory):
🤖 Prompt for AI Agents
In tests/test_workflow_arxiv_sorted.py around lines 131 to 159, the test calls
the removed private API _store_processing_order; update the test to either (A)
exercise the public, query-based resume flow by instantiating
ArxivSortedWorkflow and invoking the public method that triggers the
resume/query behavior, then assert that the Arango client was used to run the
expected query (mock_db.query.assert_called_with(...) or similar) and remove
assertions about create_collection, or (B) if you prefer to keep current intent,
mark this specific test as skipped with pytest.mark.skip(reason="workflow
resumes via query; update test"), ensuring mocks reflect the query-based
interaction instead of a private method call.
| def test_resume_from_checkpoint(self, mock_embedder_factory, mock_db_factory): | ||
| """Test resume functionality from checkpoint.""" | ||
| # Setup mocks | ||
| mock_db = MagicMock() | ||
| mock_collection = MagicMock() | ||
| mock_order_collection = MagicMock() | ||
|
|
||
| # Mock AQL query for resume position | ||
| mock_cursor = iter([2]) # Resume from position 2 | ||
| mock_db.aql.execute.return_value = mock_cursor | ||
|
|
||
| mock_db.has_collection.return_value = True | ||
| mock_db.collection.side_effect = lambda name: ( | ||
| mock_collection if name == 'test_arxiv_papers' | ||
| else mock_order_collection if name == 'arxiv_processing_order' | ||
| else None | ||
| ) | ||
| mock_db_factory.get_arango.return_value = mock_db | ||
|
|
||
| mock_embedder = MagicMock() | ||
| mock_embedder_factory.create_embedder.return_value = mock_embedder | ||
|
|
||
| # Enable resume in config | ||
| self.config.resume_from_checkpoint = True | ||
|
|
||
| # Create workflow | ||
| workflow = ArxivSortedWorkflow(self.config) | ||
|
|
||
| # Get resume position | ||
| position = workflow._get_resume_position() | ||
|
|
||
| # Should resume from position 3 (last completed + 1) | ||
| self.assertEqual(position, 3) | ||
|
|
||
| # Verify query was executed | ||
| mock_db.aql.execute.assert_called_once() |
Tests target removed API _get_resume_position.
Replace with assertions around _get_unprocessed_records_sorted() or skip for now.
+ @unittest.skip("Resume position method removed; resume handled via query of unprocessed records")
    def test_resume_from_checkpoint(self, mock_embedder_factory, mock_db_factory):
🤖 Prompt for AI Agents
In tests/test_workflow_arxiv_sorted.py around lines 184 to 219, the test calls
the removed private API _get_resume_position; update the test to exercise the
current API by calling _get_unprocessed_records_sorted() (or skip the test if
that behavior is no longer applicable). Change the test to: call
workflow._get_unprocessed_records_sorted(), assert the returned
iterable/sequence reflects the mocked DB state such that processing would resume
after index 2 (e.g., first unprocessed record corresponds to position 3 or the
returned list length/first item matches expectations), and keep the
mock_db.aql.execute assertion to verify the query ran; adjust the
mock_db.aql.execute return value to match the new method's expected result
shape.
mock_embedder_class.return_value = mock_embedder
Undefined name mock_embedder_class.
Use the patched factory (mock_embedder_factory) instead.
- mock_embedder_class.return_value = mock_embedder
+ mock_embedder_factory.create_embedder.return_value = mock_embedder
🧰 Tools
🪛 Ruff (0.12.2)
243-243: Undefined name mock_embedder_class
(F821)
🤖 Prompt for AI Agents
In tests/test_workflow_arxiv_sorted.py around lines 243-244, the test references
an undefined name `mock_embedder_class`; replace that usage with the patched
factory `mock_embedder_factory` (i.e., set mock_embedder_factory.return_value =
mock_embedder) so the test uses the correct mock provided by the patching
fixture and remove or rename any lingering references to `mock_embedder_class`.
mock_embedder = MagicMock()
mock_embedder.embed_batch.return_value = [[0.1] * 768] * 2
mock_embedder_class.return_value = mock_embedder
Undefined name mock_embedder_class.
Same fix as above.
- mock_embedder_class.return_value = mock_embedder
+ mock_embedder_factory.create_embedder.return_value = mock_embedder
+ mock_embedder_factory.create_embedder.return_value = mock_embedder📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| mock_embedder_class.return_value = mock_embedder | |
| mock_embedder_factory.create_embedder.return_value = mock_embedder |
🧰 Tools
🪛 Ruff (0.12.2)
352-352: Undefined name mock_embedder_class
(F821)
🤖 Prompt for AI Agents
In tests/test_workflow_arxiv_sorted.py around line 352, the name
mock_embedder_class is undefined; replace it with the correctly defined
fixture/name used elsewhere (mock_embedder_cls) or explicitly define
mock_embedder_class = mock_embedder before this line so that
mock_embedder_class.return_value = mock_embedder refers to a defined mock;
ensure the test uses the consistent mock variable name throughout.
| class TestIntegrationWorkflow(unittest.TestCase): | ||
| """Integration tests for the complete workflow.""" | ||
|
|
||
| @patch('core.workflows.workflow_arxiv_sorted.DatabaseFactory') | ||
| @patch('core.workflows.workflow_arxiv_sorted.EmbedderFactory') | ||
| def test_end_to_end_processing(self, mock_embedder_factory, mock_db_factory): | ||
| """Test complete workflow from file to database.""" | ||
| # Create larger test dataset | ||
| test_data = [] | ||
| for i in range(10): | ||
| test_data.append({ | ||
| "id": f"paper_{i}", | ||
| "title": f"Paper {i}", | ||
| "abstract": "x" * (i * 20 + 10), # Varying lengths | ||
| "authors": [f"Author {i}"], | ||
| "categories": ["cs.AI"] | ||
| }) | ||
|
|
||
| # Create temp file | ||
| temp_file = tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) | ||
| for record in test_data: | ||
| temp_file.write(json.dumps(record) + '\n') | ||
| temp_file.close() | ||
|
|
||
| # Setup mocks | ||
| mock_db = MagicMock() | ||
| mock_collection = MagicMock() | ||
| mock_order_collection = MagicMock() | ||
|
|
||
| mock_db.has_collection.return_value = False | ||
| mock_db.create_collection.return_value = mock_order_collection | ||
| mock_db.collection.side_effect = lambda name: ( | ||
| mock_collection if name == 'test_papers' | ||
| else mock_order_collection if name == 'arxiv_processing_order' | ||
| else None | ||
| ) | ||
| mock_db_factory.get_arango.return_value = mock_db | ||
|
|
||
| mock_embedder = MagicMock() | ||
| mock_embedder.embed_batch.return_value = [[0.1] * 768] * 5 | ||
| mock_embedder_factory.create_embedder.return_value = mock_embedder | ||
|
|
||
| # Configure workflow | ||
| config = ArxivMetadataConfig( | ||
| metadata_file=Path(temp_file.name), | ||
| batch_size=5, | ||
| num_workers=1, | ||
| embedding_batch_size=5, | ||
| gpu_device='cpu', | ||
| use_gpu=False, | ||
| resume_from_checkpoint=False, | ||
| metadata_collection='test_papers', | ||
| embedder_model='sentence-transformers/all-MiniLM-L6-v2', | ||
| arango_database='test_db' | ||
| ) | ||
|
|
||
| # Run workflow | ||
| workflow = ArxivSortedWorkflow(config) | ||
| workflow.run() | ||
|
|
||
| # Verify processing order collection was created | ||
| mock_db.create_collection.assert_called_with('arxiv_processing_order') | ||
|
|
||
| # Verify all papers were processed | ||
| self.assertEqual(mock_collection.insert_many.call_count, 2) # 10 papers / batch_size 5 | ||
|
|
||
| # Verify papers were processed in size order | ||
| order_inserts = mock_order_collection.insert.call_args_list | ||
| positions = [call[0][0]['position'] for call in order_inserts] | ||
| self.assertEqual(positions, list(range(10))) # Should be 0-9 in order |
Integration test expects arxiv_processing_order collection creation.
That contract was removed (query-based resume). Update assertion or gate behind feature flag.
Review continued from previous batch...
# Add project root to path
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

# Core infrastructure imports
from core.workflows.workflow_base import WorkflowBase, WorkflowConfig, WorkflowResult
from core.workflows.state import StateManager, CheckpointManager
from core.workflows.storage.storage_local import StorageManager
from core.monitoring.progress_tracker import ProgressTracker, ProgressState
from core.monitoring.performance_monitor import PerformanceMonitor
from core.database.database_factory import DatabaseFactory
from core.embedders.embedders_factory import EmbedderFactory
from core.tools.arxiv.arxiv_metadata_config import ArxivMetadataConfig
from arango.exceptions import DocumentInsertError, CollectionDeleteError
Drop sys.path hacks; switch to package‑relative imports
Manipulating sys.path breaks package semantics and causes import fragility. Use relative imports within core.workflows.
-# Add project root to path
-project_root = Path(__file__).parent.parent.parent
-sys.path.insert(0, str(project_root))
-
-# Core infrastructure imports
-from core.workflows.workflow_base import WorkflowBase, WorkflowConfig, WorkflowResult
-from core.workflows.state import StateManager, CheckpointManager
-from core.workflows.storage.storage_local import StorageManager
+from .workflow_base import WorkflowBase, WorkflowConfig, WorkflowResult
+from .state import StateManager, CheckpointManager
+from .storage.storage_local import StorageManager
-from core.monitoring.progress_tracker import ProgressTracker, ProgressState
+from core.monitoring.progress_tracker import ProgressTracker, ProgressState
-from core.monitoring.performance_monitor import PerformanceMonitor
+from core.monitoring.performance_monitor import PerformanceMonitor
-from core.database.database_factory import DatabaseFactory
+from core.database.database_factory import DatabaseFactory
-from core.embedders.embedders_factory import EmbedderFactory
+from core.embedders.embedders_factory import EmbedderFactory
-from core.tools.arxiv.arxiv_metadata_config import ArxivMetadataConfig
+from core.tools.arxiv.arxiv_metadata_config import ArxivMetadataConfig
-from arango.exceptions import DocumentInsertError, CollectionDeleteError
+from arango.exceptions import CollectionDeleteError
🤖 Prompt for AI Agents
In core/workflows/workflow_arxiv_metadata.py around lines 34 to 47, remove the
sys.path hack (project_root assignment and sys.path.insert) and convert the
absolute imports that reference the local package to package-relative imports
(use single-dot and double-dot relative imports from the current package: e.g.
from .workflow_base import WorkflowBase, WorkflowConfig, WorkflowResult; from
.state import StateManager, CheckpointManager; from .storage.storage_local
import StorageManager; from ..monitoring.progress_tracker import
ProgressTracker, ProgressState; from ..monitoring.performance_monitor import
PerformanceMonitor; from ..database.database_factory import DatabaseFactory;
from ..embedders.embedders_factory import EmbedderFactory; from
..tools.arxiv.arxiv_metadata_config import ArxivMetadataConfig). Keep the
third-party import (arango.exceptions) as-is.
# Begin transaction
txn = db.begin_transaction(
    write=[
        self.metadata_config.metadata_collection,
        self.metadata_config.chunks_collection,
        self.metadata_config.embeddings_collection
    ]
)

transaction_committed = False

try:
    # txn is already the transaction database
    txn_db = txn

    # Store metadata records
    for record in batch:
        arxiv_id = record.get('id', '')
        if not arxiv_id:
            failed += 1
            continue

        # Sanitize ID for ArangoDB key
        sanitized_id = arxiv_id.replace('.', '_').replace('/', '_')

        # Prepare metadata document
        metadata_doc = {
            '_key': sanitized_id,
            'arxiv_id': arxiv_id,
            'submitter': record.get('submitter'),
            'authors': record.get('authors'),
            'title': record.get('title'),
            'comments': record.get('comments'),
            'journal_ref': record.get('journal-ref'),
            'doi': record.get('doi'),
            'report_no': record.get('report-no'),
            'categories': record.get('categories'),
            'license': record.get('license'),
            'abstract': record.get('abstract'),
            'versions': record.get('versions', []),
            'update_date': record.get('update_date'),
            'authors_parsed': record.get('authors_parsed', []),
            'processed_at': datetime.now().isoformat(),
            'has_abstract': bool(record.get('abstract'))
        }

        txn_db.collection(self.metadata_config.metadata_collection).insert(
            metadata_doc, overwrite=True
        )

        # Store chunks and embeddings
        for item in chunks_with_embeddings:
            record = item['record']
            chunk = item['chunk']
            arxiv_id = record.get('id', '')
            sanitized_id = arxiv_id.replace('.', '_').replace('/', '_')

            # Create unique chunk ID
            chunk_id = f"{sanitized_id}_chunk_{chunk.chunk_index}"

            # Store chunk document
            chunk_doc = {
                '_key': chunk_id,
                'arxiv_id': arxiv_id,
                'paper_key': sanitized_id,
                'chunk_index': chunk.chunk_index,
                'total_chunks': chunk.total_chunks,
                'text': chunk.text,
                'start_char': chunk.start_char,
                'end_char': chunk.end_char,
                'start_token': chunk.start_token,
                'end_token': chunk.end_token,
                'context_window_used': chunk.context_window_used,
                'created_at': datetime.now().isoformat()
            }

            txn_db.collection(self.metadata_config.chunks_collection).insert(
                chunk_doc, overwrite=True
            )

            # Store embedding document
            embedding_doc = {
                '_key': f"{chunk_id}_emb",
                'chunk_id': chunk_id,
                'arxiv_id': arxiv_id,
                'paper_key': sanitized_id,
                'embedding': chunk.embedding.tolist(),  # Convert numpy to list
                'embedding_dim': len(chunk.embedding),
                'model': self.metadata_config.embedder_model,
                'created_at': datetime.now().isoformat()
            }

            txn_db.collection(self.metadata_config.embeddings_collection).insert(
                embedding_doc, overwrite=True
            )

        # Commit transaction
        txn.commit_transaction()
        transaction_committed = True
        successful = len(batch)

except Exception as e:
Commit inside per‑record loop; transaction misuse and incorrect success count
You commit the transaction inside the loop, then continue using txn_db. This risks operations on a committed txn and misreports success as len(batch) even on partial failures.
- # Begin transaction
- txn = db.begin_transaction(
+ # Begin transaction
+ txn = db.begin_transaction(
write=[
self.metadata_config.metadata_collection,
self.metadata_config.chunks_collection,
self.metadata_config.embeddings_collection
]
)
-
- transaction_committed = False
+ transaction_committed = False
@@
- # Store metadata records
- for record in batch:
+ # Store metadata records
+ for record in batch:
arxiv_id = record.get('id', '')
if not arxiv_id:
- failed += 1
+ failed += 1
continue
@@
- txn_db.collection(self.metadata_config.metadata_collection).insert(
- metadata_doc, overwrite=True
- )
-
- # Store chunks and embeddings
- for item in chunks_with_embeddings:
- record = item['record']
- chunk = item['chunk']
- arxiv_id = record.get('id', '')
- sanitized_id = arxiv_id.replace('.', '_').replace('/', '_')
+ txn_db.collection(self.metadata_config.metadata_collection).insert(
+ metadata_doc, overwrite=True
+ )
+ # Store chunks and embeddings for THIS record only
+ record_chunks = chunks_by_key.get(sanitized_id, [])
+ for chunk in record_chunks:
# Create unique chunk ID
chunk_id = f"{sanitized_id}_chunk_{chunk.chunk_index}"
@@
- txn_db.collection(self.metadata_config.chunks_collection).insert(
- chunk_doc, overwrite=True
- )
+ txn_db.collection(self.metadata_config.chunks_collection).insert(chunk_doc, overwrite=True)
@@
- txn_db.collection(self.metadata_config.embeddings_collection).insert(
- embedding_doc, overwrite=True
- )
-
- # Commit transaction
- txn.commit_transaction()
- transaction_committed = True
- successful = len(batch)
+ txn_db.collection(self.metadata_config.embeddings_collection).insert(embedding_doc, overwrite=True)
+ successful += 1
+
+ # Commit transaction AFTER all writes
+ txn.commit_transaction()
+        transaction_committed = True
Committable suggestion skipped: line range outside the PR's diff.
🧰 Tools
🪛 Ruff (0.12.2)
368-368: Do not catch blind exception: Exception
(BLE001)
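The suggested diff reads from a chunks_by_key mapping that is never built in the shown hunk. A minimal sketch of how it could be derived once, before the per-record loop, from the existing chunks_with_embeddings list (the 'record'/'chunk' item keys come from the code above; the helper itself is hypothetical):

from collections import defaultdict

def group_chunks_by_paper(chunks_with_embeddings):
    """Group embedded chunks by their sanitized ArangoDB paper key."""
    chunks_by_key = defaultdict(list)
    for item in chunks_with_embeddings:
        arxiv_id = item['record'].get('id', '')
        if not arxiv_id:
            continue  # skip chunks whose record carries no usable id
        sanitized_id = arxiv_id.replace('.', '_').replace('/', '_')
        chunks_by_key[sanitized_id].append(item['chunk'])
    return chunks_by_key

Building this once keeps the inner loop proportional to the chunks of the current paper rather than the whole batch, and leaves a single commit_transaction() call after all writes.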
cursor = self.db.aql.execute('''
    FOR doc IN arxiv_abstract_embeddings
    RETURN doc.arxiv_id
''', ttl=300, batch_size=10000)  # 5 minute timeout, large batch size
Fix: use configured embeddings collection (resume query hardcodes collection).
Using 'arxiv_abstract_embeddings' ignores config and breaks resume for custom collection names.
Apply:
- cursor = self.db.aql.execute('''
- FOR doc IN arxiv_abstract_embeddings
- RETURN doc.arxiv_id
- ''', ttl=300, batch_size=10000) # 5 minute timeout, large batch size
+ cursor = self.db.aql.execute(f'''
+ FOR doc IN {self.metadata_config.embeddings_collection}
+ RETURN doc.arxiv_id
+ ''', ttl=300, batch_size=10000)
🤖 Prompt for AI Agents
In core/workflows/workflow_arxiv_parallel.py around lines 645-649 the AQL
currently hardcodes the collection name 'arxiv_abstract_embeddings', which
ignores configured collection names and breaks resume for custom setups; replace
the hardcoded name with the configured collection variable (e.g.,
self.embeddings_collection or the config value used elsewhere), constructing the
AQL string using that variable (or validated/escaped value) instead of the
literal, or build the query with Python string formatting before passing to
self.db.aql.execute; keep the existing ttl and batch_size options unchanged and
ensure the collection name is validated to avoid injection.
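A minimal sketch of that change using a collection bind parameter instead of raw string interpolation (assuming python-arango, where a collection bind variable is written @@coll in the query and supplied under the '@coll' key; the helper name and the regex guard are illustrative):

import re

def fetch_embedded_ids(db, embeddings_collection: str) -> set:
    """Return arxiv_ids already present in the configured embeddings collection."""
    # ArangoDB collection names are restricted; reject anything unexpected early.
    if not re.fullmatch(r'[A-Za-z0-9_\-]+', embeddings_collection):
        raise ValueError(f"Suspicious collection name: {embeddings_collection!r}")
    cursor = db.aql.execute(
        'FOR doc IN @@coll RETURN doc.arxiv_id',
        bind_vars={'@coll': embeddings_collection},
        ttl=300,
        batch_size=10000,
    )
    return set(cursor)

Called as fetch_embedded_ids(self.db, self.metadata_config.embeddings_collection), this keeps the resume query aligned with whatever collection name the config specifies.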
# Update counters
self.processed_count += len(batch)

# Update progress tracker less frequently for better performance
# Only update for larger batches to reduce overhead
if len(batch) >= 100:
    self.progress_tracker.update_step(
        "database_storage",
        completed=len(batch)
    )
Counting chunks as “records” inflates metrics; count unique papers instead.
processed_count and progress updates should reflect unique arxiv_ids, not number of chunks.
Apply:
- # Update counters
- self.processed_count += len(batch)
+ # Update counters by unique papers
+ unique_papers = len(papers_seen)
+ self.processed_count += unique_papers
@@
- if len(batch) >= 100:
+ if unique_papers >= 1:
self.progress_tracker.update_step(
"database_storage",
- completed=len(batch)
+ completed=unique_papers
     )
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In core/workflows/workflow_arxiv_parallel.py around lines 770 to 779, the code
increments processed_count and progress by len(batch) which counts chunks
instead of unique papers; change it to compute the number of unique arxiv_ids in
the batch (e.g. collect arxiv_id from each record and deduplicate) and use that
unique_count to increment self.processed_count and to pass
completed=unique_count to progress_tracker.update_step so metrics reflect unique
papers processed.
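A minimal sketch of the deduplication the prompt describes (assuming each element of batch is the same record dict used in the storage code, with the paper id under 'id'):

def count_unique_papers(batch) -> int:
    """Count distinct arxiv_ids in a batch of chunk-level records."""
    return len({record.get('id') for record in batch if record.get('id')})

# Usage with the surrounding workflow attributes:
# unique_papers = count_unique_papers(batch)
# self.processed_count += unique_papers
# self.progress_tracker.update_step("database_storage", completed=unique_papers)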
def validate_inputs(self, **kwargs) -> bool:
    """Validate workflow inputs."""
    return True
🛠️ Refactor suggestion
Validate inputs (Todd’s Law 1).
Implement minimal config/file checks instead of unconditional True to fail fast.
Apply:
- def validate_inputs(self, **kwargs) -> bool:
- """Validate workflow inputs."""
- return True
+ def validate_inputs(self, **kwargs) -> bool:
+ """Validate workflow inputs."""
+ errors = []
+ mf = Path(self.metadata_config.metadata_file)
+ if not mf.exists():
+ errors.append(f"metadata_file missing: {mf}")
+ if self.metadata_config.embedding_batch_size <= 0:
+ errors.append("embedding_batch_size must be > 0")
+ if self.metadata_config.batch_size <= 0:
+ errors.append("batch_size must be > 0")
+ if self.metadata_config.chunk_overlap_tokens >= self.metadata_config.chunk_size_tokens:
+ errors.append("chunk_overlap_tokens must be < chunk_size_tokens")
+ if errors:
+ self.logger.error("input_validation_failed", errors=errors)
+ return False
+        return True
Committable suggestion skipped: line range outside the PR's diff.
🧰 Tools
🪛 Ruff (0.12.2)
1014-1014: Unused method argument: kwargs
(ARG002)
🤖 Prompt for AI Agents
In core/workflows/workflow_arxiv_parallel.py around lines 1014-1017, replace the
unconditional "return True" with concrete validation: ensure expected kwargs
keys exist (e.g., 'config', 'input_file', 'output_dir' or the actual names used
by this workflow), verify that file paths point to existing files/directories
(use os.path.exists/os.path.isfile/os.path.isdir), check basic config sanity
(required keys present and simple type/value checks like positive ints for
worker counts), and on any failure log a clear message via the workflow logger
(self.logger.error) and return False; otherwise return True.
if dimensions:
    stats['embedding_dimensions'] = dimensions
    expected_dim = 2048  # Jina v4 expected dimension

    print(f"\n🎯 Embedding Validation:")
    print(f"   Dimensions found: {dimensions}")
    print(f"   Expected (Jina v4): {expected_dim}")

    if all(d == expected_dim for d in dimensions):
        print(f"   ✅ All embeddings have correct dimensions")
    else:
        print(f"   ⚠️ Dimension mismatch detected!")
Hard‑coded expected_dim=2048 will flag valid non‑v4 runs
The ingestion may use v3 or other models; infer expected dims from data or config instead of hard‑coding 2048.
- if dimensions:
- stats['embedding_dimensions'] = dimensions
- expected_dim = 2048 # Jina v4 expected dimension
+ if dimensions:
+ stats['embedding_dimensions'] = dimensions
+ # Derive expected dims per model from data
+ model_dims = list(self.db.aql.execute(f"""
+ FOR e IN {self.embeddings_collection}
+ LIMIT 1000
+ RETURN DISTINCT {{model: e.model, dim: e.embedding_dim}}
+ """))
+ print(f"\n🎯 Embedding Validation:")
+ print(f" Dimensions found: {dimensions}")
+ print(f" Models/dims sample: {model_dims}")
+ expected_dim = model_dims[0]['dim'] if model_dims else dimensions[0]
print(f"\n🎯 Embedding Validation:")
print(f" Dimensions found: {dimensions}")
- print(f" Expected (Jina v4): {expected_dim}")
+ print(f" Expected: {expected_dim}")Committable suggestion skipped: line range outside the PR's diff.
🧰 Tools
🪛 Ruff (0.12.2)
297-297: f-string without any placeholders
Remove extraneous f prefix
(F541)
302-302: f-string without any placeholders
Remove extraneous f prefix
(F541)
304-304: f-string without any placeholders
Remove extraneous f prefix
(F541)
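One config-driven alternative to deriving the expected dimension from stored documents, as a sketch (embedder_dimensions is a hypothetical config field, and the model-to-dimension table is illustrative; verify the values against the model cards):

# Illustrative published dimensions; confirm against the embedder model cards.
KNOWN_MODEL_DIMS = {
    'jinaai/jina-embeddings-v3': 1024,
    'jinaai/jina-embeddings-v4': 2048,
}

def expected_dimension(config, dimensions):
    """Prefer an explicit config value, then a known model, then the observed data."""
    explicit = getattr(config, 'embedder_dimensions', None)  # hypothetical field
    if explicit:
        return explicit
    model_dim = KNOWN_MODEL_DIMS.get(getattr(config, 'embedder_model', ''))
    if model_dim:
        return model_dim
    return dimensions[0] if dimensions else None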
    print_info "Loading credentials from .env file..."
    export $(grep -v '^#' .env | xargs)
    ROOT_PASSWORD="${ARANGO_PASSWORD:-}"
fi
Loading .env via unquoted command substitution is unsafe; avoid word-splitting and code injection.
export $(grep …) is brittle. Use POSIX-safe auto-export instead.
- export $(grep -v '^#' .env | xargs)
- ROOT_PASSWORD="${ARANGO_PASSWORD:-}"
+ # shellcheck disable=SC1091
+ set -a
+ . ./.env
+ set +a
+ ROOT_PASSWORD="${ARANGO_PASSWORD:-}"🧰 Tools
🪛 Shellcheck (0.10.0)
[warning] 96-96: Quote this to prevent word splitting.
(SC2046)
🤖 Prompt for AI Agents
In setup_database.sh around lines 95-98, replace the unsafe "export $(grep -v
'^#' .env | xargs)" pattern with a POSIX-safe parser that reads .env
line-by-line, skips blank/comment lines, splits on the first '=' using IFS and
read -r to avoid word-splitting and code execution, strips optional surrounding
quotes from values, and exports each key=value safely; then assign
ROOT_PASSWORD="${ARANGO_PASSWORD:-}" as before. Ensure the loop uses no eval,
handles empty values, and preserves literal characters by using IFS='=' and read
-r key val.
Feature: Dual Embedders with Late Chunking
Summary
This PR implements dual embedder support with mandatory late chunking for ArXiv metadata processing, achieving production-ready throughput of 40+ papers/second.
Conveyance Summary
C = (W·R·H/T)·Ctx^α
This implementation maximizes conveyance through:
W/R/H/T Mapping
W (What) - Signal Quality
R (Where) - Relational Positioning
H (Who) - Agent Capability
T (Time) - Convergence Speed
Performance Evidence
Production run results:
Tests & Compatibility
Test Coverage
tests/test_workflow_arxiv_sorted.py - Workflow integration tests
tests/core/embedders/test_benchmark_embedders.py - Embedder performance benchmarks
tests/test_unix_socket.py - Unix socket connectivity tests
tests/test_resume.py - Checkpoint/resume functionality
Compatibility
Changes Implemented
Core Features
Key Files
core/workflows/workflow_arxiv_*.py - Workflow implementations
core/embedders/embedders_*.py - Embedder implementations
core/database/arango_unix_client.py - Unix socket support
dev-utils/monitor_*.py - Monitoring tools
Fixes #42
🤖 Generated with Claude Code
Co-Authored-By: Claude noreply@anthropic.com