
Conversation

@r3d91ll (Owner) commented Sep 17, 2025

Feature: Dual Embedders with Late Chunking

Summary

This PR implements dual embedder support with mandatory late chunking for ArXiv metadata processing, achieving production-ready throughput of 40+ papers/second.

Conveyance Summary

C = (W·R·H/T)·Ctx^α

This implementation maximizes conveyance through:

  • Late chunking preserves Ctx - avoiding zero-propagation by maintaining context across chunk boundaries
  • Dual embedders optimize H - choice between Jina v4 (quality) and SentenceTransformers (speed)
  • Size-sorted processing improves R - optimal GPU cache locality through abstract size ordering
  • Parallel workflows reduce T - multi-GPU scaling for throughput
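
As a concrete illustration of the formula above, here is a toy calculation. The α = 1.7 default follows the α ∈ [1.5, 2.0] convention cited later in this thread; all input values are invented for illustration, not measurements from this PR.

```python
def conveyance(w: float, r: float, h: float, t: float, ctx: float, alpha: float = 1.7) -> float:
    """Efficiency stance: C = (W*R*H / T) * Ctx**alpha, with alpha applied only to Ctx."""
    if min(w, r, h) <= 0:
        return 0.0  # zero-propagation: any collapsed factor collapses conveyance
    return (w * r * h / t) * ctx ** alpha

# Toy numbers only: hold W, R, H, T fixed and improve Ctx (what late chunking targets)
baseline = conveyance(w=0.8, r=0.7, h=0.6, t=10.0, ctx=0.5)
late_chunked = conveyance(w=0.8, r=0.7, h=0.6, t=10.0, ctx=0.9)
print(f"baseline={baseline:.4f}, late_chunked={late_chunked:.4f}")  # ~2.7x gain from Ctx alone
```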

W/R/H/T Mapping

W (What) - Signal Quality

  • Jina v4 embeddings with 2048 dimensions
  • 32k token context window
  • Late chunking preserves semantic relationships (see the sketch below)
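
A minimal sketch of the late-chunking idea: encode the whole abstract once, then pool token states per chunk window, so every chunk embedding still reflects the surrounding document. The stand-in model, mean pooling, and window sizes are assumptions for illustration; the production path goes through the Jina v4 / SentenceTransformers embedders in core/embedders.

```python
import torch
from transformers import AutoModel, AutoTokenizer

# Stand-in encoder (assumption); the PR's embedders wrap Jina v4 / SentenceTransformers instead.
MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModel.from_pretrained(MODEL_NAME).eval()

def late_chunk_embed(text: str, chunk_tokens: int = 128, overlap: int = 32) -> list[torch.Tensor]:
    """Encode the full text once, then mean-pool token states over sliding chunk windows."""
    inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
    with torch.no_grad():
        token_states = model(**inputs).last_hidden_state[0]  # (seq_len, hidden_dim)
    step = max(chunk_tokens - overlap, 1)
    return [
        token_states[start:start + chunk_tokens].mean(dim=0)  # chunk vector, full-context aware
        for start in range(0, token_states.shape[0], step)
    ]
```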

R (Where) - Relational Positioning

  • Size-sorted processing for GPU cache efficiency
  • ArangoDB graph structure for metadata/chunks/embeddings
  • Unix socket connectivity for lower latency (connection selection sketched below)
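
A minimal sketch of the socket-preferred connection selection, assuming the default socket path quoted in the review comments below; the helper name is illustrative and the real wiring lives in core/database/arango_unix_client.py. Skipping TCP loopback on the same host is where the lower-latency claim comes from.

```python
from pathlib import Path
from urllib.parse import quote

DEFAULT_UNIX_SOCKET = "/tmp/arangodb.sock"   # socket path referenced in the review comments
DEFAULT_HTTP_HOST = "http://localhost:8529"  # TCP fallback

def resolve_arango_hosts(socket_path: str = DEFAULT_UNIX_SOCKET,
                         http_host: str = DEFAULT_HTTP_HOST) -> str:
    """Return an http+unix:// URL when the socket exists, otherwise the plain HTTP endpoint."""
    path = Path(socket_path)
    if path.exists() and path.is_socket():
        # Percent-encode the path, the form expected by requests-unixsocket style clients.
        return f"http+unix://{quote(str(path), safe='')}"
    return http_host
```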

H (Who) - Agent Capability

  • Dual embedder architecture (Jina v4 + SentenceTransformers; see the sketch after this list)
  • Multi-GPU parallel processing
  • Worker-based architecture with configurable parallelism
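
The speed-oriented half of the dual-embedder pair can be sketched directly with the sentence-transformers library; the model name, batch size, and the omission of the late-chunking wrapper are simplifications, not the PR's exact SentenceTransformersEmbedder.

```python
from sentence_transformers import SentenceTransformer

# Small stand-in model (assumption); the quality path would swap in a 2048-dim Jina v4 model.
model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

def embed_batch(texts: list[str], batch_size: int = 24) -> list[list[float]]:
    """Plain batched encoding; the PR's embedder adds mandatory late chunking on top of this."""
    return model.encode(texts, batch_size=batch_size, convert_to_numpy=True).tolist()
```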

T (Time) - Convergence Speed

  • 40+ papers/second throughput achieved
  • 17-hour full corpus processing (2.8M papers)
  • Atomic transactions prevent partial updates (see the sketch below)
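
A minimal sketch of the atomic-write pattern using python-arango stream transactions; the collection names and document shapes are illustrative, and credentials come from the environment as in the test setup.

```python
import os
from arango import ArangoClient

db = ArangoClient(hosts="http://localhost:8529").db(
    "arxiv_repository",                      # illustrative database name
    username="root",
    password=os.environ.get("ARANGO_PASSWORD", ""),
)

def store_paper_atomically(paper: dict, chunks: list[dict], embeddings: list[dict]) -> None:
    """Insert metadata, chunks, and embeddings together so a failure leaves no partial record."""
    txn = db.begin_transaction(write=["arxiv_papers", "arxiv_chunks", "arxiv_embeddings"])
    try:
        txn.collection("arxiv_papers").insert(paper)
        txn.collection("arxiv_chunks").insert_many(chunks)
        txn.collection("arxiv_embeddings").insert_many(embeddings)
        txn.commit_transaction()
    except Exception:
        txn.abort_transaction()
        raise
```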

Performance Evidence

Production run results:

  • Throughput: 48-60 papers/second with dual A6000 GPUs
  • Success rate: 99.93% (2,074 failures out of 2.8M)
  • Processing time: 17 hours for complete ArXiv corpus
  • Memory usage: 7-8GB VRAM per worker with fp16

Tests & Compatibility

Test Coverage

  • tests/test_workflow_arxiv_sorted.py - Workflow integration tests
  • tests/core/embedders/test_benchmark_embedders.py - Embedder performance benchmarks
  • tests/test_unix_socket.py - Unix socket connectivity tests
  • tests/test_resume.py - Checkpoint/resume functionality

Compatibility

  • Python 3.12+
  • CUDA 12.1+ (for GPU acceleration)
  • ArangoDB 3.11+
  • 16GB+ VRAM recommended per GPU

Changes Implemented

Core Features

  • ArXiv metadata ingestion workflows (single/parallel/sorted)
  • Dual embedder architecture with late chunking
  • Unix socket ArangoDB connectivity
  • Centralized logging with structlog
  • Checkpoint/resume support (query-based resume sketched below)
  • Progress monitoring utilities
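
A minimal sketch of the query-based resume behavior: load the keys already stored, then skip them while streaming the metadata file. The AQL text, key sanitization, and field names are illustrative.

```python
import json
from pathlib import Path

def load_processed_keys(db, collection: str = "arxiv_metadata") -> set[str]:
    """Fetch existing document keys so a restarted run skips completed records."""
    return set(db.aql.execute(f"FOR doc IN {collection} RETURN doc._key"))

def stream_unprocessed(metadata_file: Path, processed: set[str]):
    """Yield only records not yet stored (checkpoint/resume behavior)."""
    with metadata_file.open() as fh:
        for line in fh:
            record = json.loads(line)
            key = record["id"].replace("/", "_")  # illustrative key sanitization
            if key not in processed:
                yield record
```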

Key Files

  • core/workflows/workflow_arxiv_*.py - Workflow implementations
  • core/embedders/embedders_*.py - Embedder implementations
  • core/database/arango_unix_client.py - Unix socket support
  • dev-utils/monitor_*.py - Monitoring tools

Fixes #42

🤖 Generated with Claude Code

Co-Authored-By: Claude noreply@anthropic.com

r3d91ll and others added 16 commits September 13, 2025 21:39
- Add SentenceTransformersEmbedder for high-throughput (48+ papers/sec)
- Maintain JinaV4Embedder for sophisticated processing
- Update factory to support both implementations
- Add mandatory late chunking principle to CLAUDE.md
- Create benchmark comparison script
- Both implementations enforce late chunking

Performance targets:
- SentenceTransformers: 48+ papers/second
- Transformers: 8-15 papers/second (current)

Closes #42
- Replace encode_text with encode method for Jina v3 compatibility
- Fix task parameter defaults to use 'retrieval.passage'
- Add fallback for models without encode method
- Ensure both embedders work with factory pattern
- Upgrade sentence-transformers from 2.7.0 to 4.1.0
- Update pyproject.toml dependency to allow v4.1.0
- Fix SentenceTransformersEmbedder to work with Jina v4
- Update task mapping for Jina v4 API (retrieval, text-matching, code)
- Default to Jina v4 model for 2048-dim embeddings
- Verified working with CPU tests (7.6 texts/sec)

Ready for GPU testing to achieve 48+ papers/sec target
…v_daily.py as they are no longer needed for the current pipeline.
…essing

- Introduced `arxiv_pipeline.py` to manage a two-phase processing system:
  - Phase 1: Extraction of PDFs to JSON using GPU-accelerated Docling.
  - Phase 2: Embedding staged JSONs and storing results in ArangoDB using Jina.

- Enhanced logging and error handling throughout the pipeline.
- Implemented metadata enrichment for extracted documents.
- Added configuration management for flexible pipeline setup.
- Refactored existing code to accommodate new architecture, including moving `ArangoDBManager` import to a new module.
… implementation, and filesystem citation extraction. These scripts demonstrated the use of the Academic Citation Toolkit with various document sources and storage systems, but are no longer needed.
…rkflows, phase 4 integration, robust extraction, and simple Tree-sitter extraction. These deletions streamline the test suite by eliminating outdated or redundant tests, ensuring a more maintainable and efficient testing process.
- Introduced a new script `check_arxiv_db.py` for verifying ArangoDB collections related to ArXiv metadata processing.
- Implemented connection handling, collection checks, and detailed statistics reporting.
- Added sample document checks for metadata, chunks, and embeddings collections.
- Removed obsolete `test_mcp_env.sh` script.
- Updated import paths in various test files to reflect the new project structure.
- Created new tests for ArXiv metadata processing with 1000 records and 100k records on dual GPUs.
- Added a test for default configuration validation and workflow execution.
- Enhanced error handling and logging throughout the new tests.
- Fixed worker config to use embedding_batch_size instead of batch_size
- Aligned chunk size parameters (500 tokens, 200 overlap) across all components
- Added FP16 support with Flash Attention 2 detection
- Implemented fast path for abstracts that fit in single chunks (sketched after this commit message)
- Fixed Jina v4 output extraction to use single_vec_emb attribute
- Added comprehensive monitoring utilities

Performance improved from 3.3 to 36+ docs/second with batch size 24
Stable processing of 2.8M ArXiv abstracts with dual GPU utilization

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
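
A sketch of the single-chunk fast path described in this commit. It assumes, per the commit text, that the Jina v4 encode output exposes a single_vec_emb attribute; the wrapper function, tokenizer call, and 500-token threshold are illustrative.

```python
def embed_abstract(embedder, tokenizer, abstract: str, chunk_tokens: int = 500):
    """Skip chunking entirely when the abstract fits within one chunk."""
    if len(tokenizer.encode(abstract)) <= chunk_tokens:
        # Fast path: one forward pass, one vector (commit notes the output carries single_vec_emb).
        output = embedder.encode(abstract)
        return [getattr(output, "single_vec_emb", output)]
    # Slow path: full late-chunking pipeline for longer texts.
    return embedder.embed_batch_with_late_chunking([abstract])
```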
- Reduced progress tracker update frequency (10x fewer updates)
- Reduced monitoring log frequency (log every 1,000 records instead of every 100)
- Queue sizes already optimized to 500 (user change)
- Storage batch size already optimized to 500 (user change)
- Fast path logging already removed (user change)

These optimizations should gain 6-10 docs/second:
- Progress updates: +2-3 docs/sec
- Logging reduction: +1-2 docs/sec
- Queue/batch optimizations: +3-5 docs/sec (already done)

Target: 40+ docs/second (from current 34)

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
- Removed torch.cuda.empty_cache() after every batch (HUGE win)
- Commented out excessive logging in embedders_jina.py hot path
- Disabled performance monitor to eliminate warning spam
- These were adding 300-1500ms overhead per second!

Key changes:
- No more GPU cache clearing (PyTorch manages memory efficiently; see the loop sketch below)
- No more logging 30+ times/second in embedding loop
- No more performance monitor overhead and warnings

Expected gains: +8-10 docs/second
Target: 40-45 docs/second sustained throughput

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
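
A sketch of the hot loop after the cleanup this commit describes: no per-batch cache clearing and no per-batch logging. The loop shape and names are illustrative.

```python
import logging

logger = logging.getLogger(__name__)

def embed_all(embedder, batches):
    """Embedding hot loop: let PyTorch's caching allocator manage GPU memory, log coarsely."""
    results = []
    for i, batch in enumerate(batches):
        results.extend(embedder.embed_batch(batch))
        # Removed: torch.cuda.empty_cache()      (per-batch cache clearing stalls the GPU)
        # Removed: logger.info(...) per batch    (30+ log calls/second in the hot path)
        if i and i % 1000 == 0:
            logger.debug("processed %d batches", i)
    return results
```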
CodeRabbit identified a serious issue where we were creating a full JSON
copy of all records just to measure memory usage. This DOUBLES memory
consumption and can cause OOM errors.

Fixed by:
- Using psutil to get actual process RSS when available (sketched after this commit message)
- Falling back to estimation (2KB per record) instead of serialization
- Avoids creating duplicate copies of large datasets

This prevents OOM errors when processing large batches and reduces
peak memory usage by ~50% during measurement.

Fixes CodeRabbit review comment about memory duplication.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
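
A sketch of the measurement change this commit describes: report process RSS via psutil when available and fall back to a rough per-record estimate, instead of serializing the whole batch just to measure it. The 2 KB-per-record constant follows the commit text.

```python
def batch_memory_mb(records: list[dict]) -> float:
    """Approximate memory footprint without duplicating the batch via json.dumps()."""
    try:
        import psutil
        return psutil.Process().memory_info().rss / (1024 * 1024)  # actual process RSS
    except ImportError:
        return len(records) * 2 * 1024 / (1024 * 1024)  # fallback: ~2 KB per record
```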
- Removed test workflow function from `workflow_arxiv_metadata.py`.
- Enhanced `workflow_arxiv_parallel.py` to support resuming from checkpoints, including loading existing IDs and skipping already processed records.
- Updated logging to include skipped records and improved progress reporting.
- Added a new `notes.md` file with instructions for data integration and graph rebuilding.
- Created `test_resume.py` to validate the resume functionality of the Arxiv workflow.
- Introduced benchmarking tests for dual embedder implementations in `test_benchmark_embedders.py`.
- Added unit tests for Jina v4 embedder in `test_embedders_jina.py`.
- Implemented a test suite for ArXiv Metadata Workflow in `test_workflow_arxiv_metadata.py`.
- Developed a verification script `verify_storage.py` to ensure records are stored correctly in the database.
- Implemented a test script for the size-sorted workflow in `test_sorted_workflow.sh`, verifying the setup and execution of the workflow with a sample dataset.
- Created `test_unix_socket.py` to test Unix socket connections to ArangoDB, comparing performance with HTTP connections and ensuring proper error handling.
- Developed comprehensive unit tests for `ArxivSortedWorkflow` in `test_workflow_arxiv_sorted.py`, covering sorting, processing order storage, checkpoint functionality, and batch processing logic.
- Added unique method tests for `ArxivSortedWorkflow` in `test_workflow_arxiv_sorted_unique.py`, focusing on methods specific to the sorted workflow.
- Included edge case tests in `test_workflow_arxiv_sorted_unique.py` to handle empty abstracts and missing fields.
@coderabbitai bot commented Sep 17, 2025

Walkthrough

Extensive repo update adding ArXiv metadata ingestion workflows (single-/multi-GPU), dual embedders with late chunking, Unix-socket ArangoDB connectivity, centralized logging, configurations/CLIs, and multiple dev/test utilities. Several legacy compatibility layers and ArXiv Postgres tooling/docs are removed. Tests are overhauled; setup scripts and configs added.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| **Docs & Guidance**<br>`CLAUDE.md, core/workflows/arxiv_ingestion.md, docs/prd/dual_embedders_late_chunking_prd.md, notes.md, tools/arxiv/CLAUDE.md (removed), tools/arxiv/README.md (removed)` | Major rewrite emphasizing late chunking, dual embedders, workflows/architecture; added ingestion doc and PRD; removed legacy ArXiv tool docs; minor formatting fixes. |
| **Configs & Defaults**<br>`core/config/workflows/arxiv_metadata_default.yaml, core/tools/arxiv/configs/metadata_pipeline.yaml, config/.gitignore, setup_database.config.example, pyproject.toml` | Added default YAMLs for ArXiv metadata workflows; gitignore for secrets; setup template; dependency bump (sentence-transformers ^4.1.0) and add flash-attn. |
| **Database: Arango Unix Socket & Factory**<br>`core/database/arango/arango_unix_client.py, core/database/arango_unix_client.py, core/database/database_factory.py` | Introduced Unix-socket client(s), HTTP fallback helpers, and factory integration via get_database_for_workflow(prefer_unix=True); connection-info/reporting helpers; error handling. |
| **Embedders: Dual + Late Chunking**<br>`core/embedders/embedders_factory.py, core/embedders/embedders_jina.py, core/embedders/embedders_sentence.py` | Factory selection revised; Jina v4 path standardized to retrieval.passage, batching and late-chunking APIs adjusted; new SentenceTransformers embedder with late chunking and properties; removed Jina test stub. |
| **Logging & Monitoring**<br>`core/logging/logging.py, core/monitoring/__init__.py, core/monitoring/compat_monitor.py (removed), core/monitoring/migration_guide.py (removed)` | Added structlog-based centralized logging; removed legacy monitor exports and compatibility/migration guides; updated public monitoring surface. |
| **Processors Public API Cleanup**<br>`core/processors/__init__.py, core/processors/document_processor.py (removed)` | Streamlined exports to newer chunking strategies; removed deprecated symbols and lazy back-compat; deleted deprecated document processor shim. |
| **Workflows: ArXiv (single/parallel/sorted)**<br>`core/workflows/__init__.py, core/workflows/workflow_arxiv_metadata.py, core/workflows/workflow_arxiv_parallel.py, core/workflows/workflow_arxiv_sorted.py, core/workflows/workflow_arxiv_sorted_simple.py` | Added single-GPU streaming metadata workflow, multi-GPU parallel, and size-sorted variants; atomic DB transactions, checkpoints/resume, progress/metrics; public export for metadata workflow. |
| **ArXiv Tools & Config**<br>`core/tools/arxiv/README.md, core/tools/arxiv/arxiv_manager.py, core/tools/arxiv/arxiv_metadata_config.py, core/tools/arxiv/metadata_processor.py, core/tools/arxiv/configs/...` | Added config class with validations/YAML loader; CLI metadata processor; updated manager import source; README for tools; new pipeline config. |
| **Dev Utilities & Monitors**<br>`dev-utils/check_arxiv_db.py, dev-utils/create_arxiv_repository_db.py, dev-utils/monitor_arxiv_progress.py, dev-utils/monitor_sorted.py, dev-utils/monitor_workflow_logs.py, dev-utils/setup_arxiv_database.py, dev-utils/setup_arxiv_database_simple.py, dev-utils/simple_monitor.py` | New DB provisioning scripts, progress/log monitors, verification tools, and simplified/production DB setup automation. |
| **Setup & Shell Orchestration**<br>`setup_database.sh, tests/arxiv/run_large_scale_test.sh, tests/test_sorted_production.sh, tests/test_sorted_simple.sh, tests/test_sorted_workflow.sh, test_mcp_env.sh (removed)` | Added automated DB setup and test runners; removed old environment check script; assorted CLI flows with env handling. |
| **Tests: ArXiv Workflows & Embedders**<br>`tests/core/embedders/test_benchmark_embedders.py, tests/core/embedders/test_embedders_jina.py, tests/core/workflows/test_workflow_arxiv_metadata.py, tests/arxiv/test_metadata_1000.py, tests/arxiv/test_metadata_100k_dual_gpu.py, tests/test_unix_socket.py, tests/test_resume.py` | New benchmarks, workflow tests, Unix-socket connectivity test, and resume/scale harnesses. |
| **Tests: Adjustments & Removals**<br>`tests/arxiv/acid/test_* (imports/collection tweak), tests/debug_extraction_failures.py (import), tests/test_document_processor.py (imports), tests/arxiv/validate_pipeline.py (import), tests/test_github_* (removed), tests/test_overnight_* (removed), tests/test_phase* (removed), tests/test_treesitter_simple.py (removed)` | Updated imports/targets to new paths; removed legacy GitHub/overnight/phase integration tests. |
| **Removed ArXiv Postgres Tooling**<br>`tools/arxiv/db/* (multiple files removed)` | Deleted Postgres config, OAI harvest, ID export, and compute assessment tools; removed package exports and Alembic config. |

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor CLI as CLI/Runner
  participant CFG as ArxivMetadataConfig
  participant WF as ArxivMetadataWorkflow
  participant DB as DatabaseFactory/Arango
  participant EMB as EmbedderFactory/Embedder

  CLI->>CFG: Load YAML + overrides
  CLI->>WF: Instantiate(config)
  WF->>DB: get_arango (Unix socket preferred)
  WF->>EMB: build embedder (model, fp16, device)
  loop Stream JSONL
    WF->>WF: Collect batch (respect max_records/resume)
    WF->>EMB: embed_batch_with_late_chunking(abstracts)
    WF->>DB: Transaction: insert metadata, chunks, embeddings
    WF-->>WF: Update progress/checkpoint
  end
  WF-->>CLI: WorkflowResult (success, counts, throughput)
sequenceDiagram
  autonumber
  participant Main as Orchestrator
  participant QIN as InputQueue
  participant W0 as Worker[GPU0]
  participant W1 as Worker[GPU1]
  participant QOUT as OutputQueue
  participant ST as StorageThread
  participant DB as ArangoDB

  Main->>QIN: Enqueue metadata batches
  par Parallel workers
    W0->>QIN: Dequeue batch
    W0->>W0: Late-chunk embed
    W0->>QOUT: Emit chunks+embeddings
    W1->>QIN: Dequeue batch
    W1->>W1: Late-chunk embed
    W1->>QOUT: Emit chunks+embeddings
  end
  ST->>QOUT: Consume results
  ST->>DB: Transactional insert (metadata/chunks/embeddings)
  ST-->>Main: Progress updates
sequenceDiagram
  autonumber
  participant Client as get_database_for_workflow
  participant FS as UnixSocket
  participant AHTTP as Arango HTTP
  participant ADB as ArangoClient

  Client->>FS: Detect socket (exists, is_socket)
  alt Socket available
    Client->>ADB: Build URL (http+unix or HTTP fallback)
  else No socket
    Client->>ADB: Use HTTP host (localhost:8529)
  end
  Client->>ADB: Connect (db/user/pass)
  ADB-->>Client: Handle (StandardDatabase)

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120+ minutes

Possibly related issues

  • r3d91ll/HADES#9 — Adds Arango Unix clients and workflow atomicity; this PR implements Unix-socket clients and ArXiv workflows with transactional storage.
  • feat: Implement dual embedders with mandatory late chunking #42 — Dual embedders with mandatory late chunking; this PR adds SentenceTransformersEmbedder, updates Jina embedder, and enforces late chunking.

Possibly related PRs

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| PR Description Sections | ⚠️ Warning | I reviewed the PR body and the provided AI-generated summary of changes; while the PR touches the Conveyance Framework in code/docs, the PR description itself does not include the required sections. There is no explicit Conveyance Summary, no W/R/H/T Mapping, no consolidated Performance Evidence (with metrics/hardware/method), and no Tests & Compatibility section summarizing test coverage, breaking changes, or migration guidance. Therefore, the PR description is missing all four mandated sections. | Please edit the PR description to add the four sections. Suggested template: 1) Conveyance Summary: one-paragraph summary of how this change affects C = (W·R·H/T)·Ctx^α (what improved, why). 2) W/R/H/T Mapping: brief bullets mapping code changes to W, R, H, T and Ctx, with α used. 3) Performance Evidence: dataset, hardware, method, metrics (throughput, latency, memory), before/after numbers and sample logs; note variability. 4) Tests & Compatibility: what tests were added/updated, pass status, known breaking changes (e.g., removed legacy exports), migration notes, config defaults, and dependency changes (e.g., sentence-transformers v4.x, optional flash-attn). After adding these, re-request this check. |
✅ Passed checks (3 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title Check | ✅ Passed | The title "Feature/dual embedders late chunking 42" accurately identifies the primary change in this PR (introducing dual embedders and mandating late chunking), which matches the large embedder, workflow, config, and documentation changes in the diff. It is concise and on-topic, providing a reviewer with a clear sense of the main feature. The branch-style "Feature/" prefix and trailing "42" are minor noise but do not make the title misleading. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 94.29%, which is sufficient. The required threshold is 80.00%. |

@coderabbitai bot left a comment

Actionable comments posted: 47

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (13)
tests/debug_extraction_failures.py (1)

44-46: Prefer core/tools config path with robust fallback

tests/debug_extraction_failures.py lines 44–46 — found core/tools/arxiv/configs/acid_pipeline_phased.yaml in this branch; the hardcoded 'tools/arxiv/...' will 404. Replace with a fallback lookup:

-    config_path = 'tools/arxiv/configs/acid_pipeline_phased.yaml'
-    config = load_config(config_path)
+    candidates = [
+        project_root / 'core' / 'tools' / 'arxiv' / 'configs' / 'acid_pipeline_phased.yaml',
+        project_root / 'tools' / 'arxiv' / 'configs' / 'acid_pipeline_phased.yaml',
+    ]
+    for p in candidates:
+        if p.exists():
+            config_path = p
+            break
+    else:
+        logger.error("Config not found in: %s", candidates)
+        return
+    config = load_config(str(config_path))
tests/arxiv/acid/test_integration.py (1)

27-34: Make this a real pytest test (assert/skip instead of returning booleans).
Returning False doesn’t fail in pytest. Use assertions and pytest.skip when prerequisites are missing.

Apply:

+import pytest
@@
-    if not arango_password:
-        logger.error("ARANGO_PASSWORD environment variable not set")
-        return False
+    if not arango_password:
+        pytest.skip("ARANGO_PASSWORD not set; skipping integration test")
@@
-    return all(status in ['processed', 'already_processed'] for status in results.values())
+    assert all(status in ['processed', 'already_processed'] for status in results.values())

Also applies to: 88-88

core/tools/arxiv/arxiv_manager.py (1)

413-472: Persist within a single transaction and align schema with arxiv_chunks/arxiv_embeddings.
Current writes are non-atomic and skip arxiv_chunks; embeddings schema differs from contract (vector/model/chunk_id).

Apply (abridged) transactional refactor:

-            # Store main paper document
-            self.db_manager.insert_document('arxiv_papers', arxiv_doc)
-
-            # Store chunks with embeddings
-            for chunk in result.chunks:
-                chunk_doc = {
-                    '_key': f"{paper_info.sanitized_id}_chunk_{chunk.chunk_index}",
-                    'paper_id': paper_info.sanitized_id,
-                    'arxiv_id': paper_info.arxiv_id,
-                    'chunk_index': chunk.chunk_index,
-                    'text': chunk.text,
-                    'embedding': chunk.embedding.tolist(),
-                    'start_char': chunk.start_char,
-                    'end_char': chunk.end_char,
-                    'context_window_used': chunk.context_window_used
-                }
-                self.db_manager.insert_document('arxiv_embeddings', chunk_doc)
+            txn = self.db_manager.begin_transaction(
+                write_collections=['arxiv_papers','arxiv_chunks','arxiv_embeddings','arxiv_structures']
+            )
+            # main paper
+            txn.collection('arxiv_papers').insert(arxiv_doc, overwrite=False)
+            # chunks (text)
+            for chunk in result.chunks:
+                chunk_key = f"{paper_info.sanitized_id}_chunk_{chunk.chunk_index}"
+                txn.collection('arxiv_chunks').insert({
+                    '_key': chunk_key,
+                    'paper_id': paper_info.sanitized_id,
+                    'arxiv_id': paper_info.arxiv_id,
+                    'chunk_index': chunk.chunk_index,
+                    'text': chunk.text,
+                    'start_char': chunk.start_char,
+                    'end_char': chunk.end_char,
+                    'context_window_used': chunk.context_window_used
+                })
+                # embeddings (vector-only)
+                txn.collection('arxiv_embeddings').insert({
+                    '_key': f"{chunk_key}_emb",
+                    'paper_id': paper_info.sanitized_id,
+                    'chunk_id': chunk_key,
+                    'arxiv_id': paper_info.arxiv_id,
+                    'vector': (chunk.embedding.tolist() if hasattr(chunk.embedding, 'tolist') else list(chunk.embedding)),
+                    'model': result.processing_metadata.get('embedding_model', 'jina-v4')
+                })
+            txn.commit_transaction()
@@
-                self.db_manager.insert_document('arxiv_structures', structures_doc)
+                txn = txn if 'txn' in locals() else self.db_manager.begin_transaction(write_collections=['arxiv_structures'])
+                txn.collection('arxiv_structures').insert(structures_doc, overwrite=True)
+                txn.commit_transaction()
tests/arxiv/acid/test_acid_compliance.py (2)

396-404: Use seconds for expiresAt to match TTL index semantics.
Arango TTL index expects epoch seconds; switching to ms desynchronizes auto-cleanup and conflicts with code writing seconds elsewhere.

-# Worker 1 acquires lock (TTL needs Unix timestamp in milliseconds)
-expiry_time = int((datetime.now() + timedelta(seconds=2)).timestamp() * 1000)
+# Worker 1 acquires lock (Unix timestamp in seconds)
+expiry_time = int((datetime.now() + timedelta(seconds=2)).timestamp())
@@
-    'expiresAt': int((datetime.now() + timedelta(seconds=2)).timestamp() * 1000)
+    'expiresAt': int((datetime.now() + timedelta(seconds=2)).timestamp())
@@
-    current_time = int(datetime.now().timestamp() * 1000)
+    current_time = int(datetime.now().timestamp())
@@
-    'expiresAt': int((datetime.now() + timedelta(seconds=2)).timestamp() * 1000)
+    'expiresAt': int((datetime.now() + timedelta(seconds=2)).timestamp())

Also applies to: 412-413, 423-427, 441-442


221-313: Normalize expiresAt to milliseconds across tests.

tests/arxiv/acid/test_acid_compliance.py (lines 221–313) sets expiresAt with seconds (int(...timestamp())), while other lock usages in the same file use milliseconds at lines 397, 412, 426, 441 (int(...timestamp() * 1000)). Change the seconds-based assignment to milliseconds (int(...timestamp() * 1000)) to avoid TTL/unit mismatches and flaky tests.

tests/arxiv/acid/test_acid_pipeline.py (4)

176-181: Fix AQL collection name: use arxiv_chunks.
Querying 'chunks' will 404; schema uses arxiv_chunks.

-            chunks_collection = self.processor.db.collection('arxiv_chunks')
-            chunk_count = len(list(self.processor.db.aql.execute(
-                "FOR c IN chunks FILTER c.paper_id == @id RETURN c",
+            chunks_collection = self.processor.db.collection('arxiv_chunks')
+            chunk_count = len(list(self.processor.db.aql.execute(
+                "FOR c IN arxiv_chunks FILTER c.paper_id == @id RETURN c",
                 bind_vars={'id': sanitized_id}
             )))

233-238: Fix AQL collection name: use arxiv_papers.

-            cursor = self.processor.db.aql.execute(
-                "FOR p IN papers FILTER p.arxiv_id == @id RETURN p",
+            cursor = self.processor.db.aql.execute(
+                "FOR p IN arxiv_papers FILTER p.arxiv_id == @id RETURN p",
                 bind_vars={'id': paper_id}
             )

308-315: Use epoch seconds for lock TTL and match manager semantics.
ISO string won’t be honored by TTL index configured for seconds.

-            locks_collection.insert({
+            locks_collection.insert({
                 '_key': sanitized_id,
                 'paper_id': paper_id,
                 'worker_id': 99999,
                 'acquired_at': datetime.now().isoformat(),
-                'expiresAt': (datetime.now() - timedelta(seconds=1)).isoformat()  # Already expired
+                'expiresAt': int((datetime.now() - timedelta(seconds=1)).timestamp())  # Already expired
             })

382-386: Fix OnDemandProcessor Arango config shape.
Pass only DB params with correct keys; current code passes mismatched keys (arango_host vs host).

-            on_demand_config = {
+            on_demand_config = {
                 'cache_root': self.test_pdf_dir,
                 'sqlite_db': self.test_pdf_dir / 'test_cache.db',
-                'arango': self.config
+                'arango': {
+                    'host': self.config.get('arango_host', 'http://127.0.0.1:8529').split(','),
+                    'database': self.config['database'],
+                    'username': self.config['username'],
+                    'password': self.config['password'],
+                }
             }
core/tools/arxiv/arxiv_pipeline.py (1)

2-18: Docstring must tie to Conveyance framework with the correct efficiency formula.

Add the required theory note and correct equation: C = (W·R·H / T) · Ctx^α (α applies only to Ctx).

 """
 ACID Pipeline with Phase-Separated Architecture
 ===============================================
 
@@
 - Full ACID compliance with atomic transactions
+
+Conveyance (Efficiency stance):
+- Efficiency C = (W·R·H / T) · Ctx^α, with α∈[1.5,2.0] (default 1.7) applied only to Ctx.
+- W (What): extraction fidelity & embedding quality; R (Where): DB topology/locks; H (Who): worker/agent throughput; T (Time): end-to-end convergence time.
+- Zero‑propagation: guard against any W/R/H→0 and avoid double-counting T.
 """
tests/arxiv/test_large_scale_processing.py (2)

419-437: Unmatched except and mis-indentation break the module (SyntaxError).

except Exception as e: at Line 437 has no corresponding try:. The test calls are also over‑indented.

Apply:

-        # Run tests
-            self.test_performance_benchmarks(arxiv_ids, sample_size=100)
+        # Run tests
+        try:
+            self.test_performance_benchmarks(arxiv_ids, sample_size=100)
@@
-            self.test_batch_processing(arxiv_ids[:batch_size_limit], batch_size=100)
+            self.test_batch_processing(arxiv_ids[:batch_size_limit], batch_size=100)
@@
-            if error_test_start < error_test_end:
-                self.test_error_recovery(arxiv_ids[error_test_start:error_test_end])
+            if error_test_start < error_test_end:
+                self.test_error_recovery(arxiv_ids[error_test_start:error_test_end])
@@
-            self.test_error_recovery(arxiv_ids[1000:1050])
+            self.test_error_recovery(arxiv_ids[1000:1050])
@@
-            self.test_database_integrity()
+            self.test_database_integrity()
         except Exception as e:
             logger.error(f"Test suite failed: {e}")
             self.test_results['errors'].append({
                 'test': 'suite',
                 'error': str(e)
             })

246-249: Database integrity queries don’t match stored schema/collections.

Collections/fields mismatch the workflows (“arxiv_abstract_embeddings”, chunk.paper_key, embedding field name). Queries currently undercount and misclassify.

Align with workflow schema:

- integrity_checks['embeddings_count'] = db.collection('arxiv_embeddings').count()
+ integrity_checks['embeddings_count'] = db.collection('arxiv_abstract_embeddings').count()
@@
- LET paper = DOCUMENT('arxiv_papers', chunk.paper_id)
+ LET paper = DOCUMENT('arxiv_papers', chunk.paper_key)
@@
- LET chunk = DOCUMENT('arxiv_chunks', embedding.chunk_id)
+ LET chunk = DOCUMENT('arxiv_chunks', embedding.chunk_id)
   FILTER chunk == null
@@
- FILTER e.chunk_id == chunk._id
+ FILTER e.chunk_id == chunk._key
@@
- RETURN LENGTH(embedding.vector)
+ RETURN LENGTH(embedding.embedding)

Optional: replace the “processing_status” aggregation (no status field) with something meaningful, e.g., has_abstract.

Also applies to: 250-257, 262-271, 274-283, 288-295

core/database/arango/arango_unix_client.py (1)

1-200: Missing helper: get_database_for_workflow is imported but not defined

core/database/database_factory.py:61 imports get_database_for_workflow from .arango_unix_client, but core/database/arango/arango_unix_client.py only defines UnixSocketHTTPClient and ArangoUnixClient and does not provide that function — either add def get_database_for_workflow(...) to arango_unix_client.py or update the import to the module that actually implements it.

Comment on lines +9 to +10
# But keep configuration JSON files
!*_config.json No newline at end of file

⚠️ Potential issue

Risk: allowing all *_config.json files to be committed may leak secrets.

Unless guaranteed sanitized, prefer tracking only example config files.

-# But keep configuration JSON files
-!*_config.json
+# Keep only example configs; ignore real config JSONs to prevent secret leaks
+!*_config.example.json
+*_config.json

Consider adding:

+secrets*.json
+*token*.json
+*key*.json
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# But keep configuration JSON files
!*_config.json
# Keep only example configs; ignore real config JSONs to prevent secret leaks
*_config.json
secrets*.json
*token*.json
*key*.json
!*_config.example.json
🤖 Prompt for AI Agents
In config/.gitignore around lines 9-10, the current allow-all rule
"!*_config.json" risks committing secrets; change this to only allow
example/sanitized configs (e.g. "!*_config.example.json" or
"!*_config.sample.json") and ensure raw config filenames remain ignored (add a
pattern like "*_config.json" if needed), update README or a contributing note to
instruct developers to copy example files to create local configs, and remove or
replace the permissive whitelist so only non-sensitive example files are
tracked.

Comment on lines +31 to +33
DEFAULT_UNIX_SOCKET = "/tmp/arangodb.sock"
DEFAULT_HTTP_HOST = "http://localhost:8529"


🛠️ Refactor suggestion

🧩 Analysis chain

Duplicate Unix-socket client exists; unify to one implementation.

This module overlaps with core/database/arango/arango_unix_client.py (different path/name, different approach). Split implementations risk drift and import confusion.

Proposed action: keep one canonical client (prefer the HTTP‑over‑Unix implementation) and re‑export if needed to maintain import paths.

Also applies to: 221-251


🏁 Script executed:

#!/bin/bash
# Locate duplicates and current imports
fd -a --type f 'arango_unix_client.py'
rg -nP '\barango_unix_client\b' -g '!**/dist/**' -g '!**/build/**'

Length of output: 265


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "Searching for arango_unix_client.py files..."
mapfile -t files < <(fd -a --type f 'arango_unix_client.py' || true)

if [ ${#files[@]} -eq 0 ]; then
  echo "No files found."
  exit 0
fi

echo "Found ${#files[@]} file(s):"
printf '%s\n' "${files[@]}"
echo

for f in "${files[@]}"; do
  echo "----- FILE: $f -----"
  stat -c "%n — %s bytes — modified: %y" "$f" || ls -l "$f"
  echo "---- head (first 120 lines) ----"
  sed -n '1,120p' "$f" || true
  echo "---- tail (last 120 lines) ----"
  tail -n 120 "$f" || true
  echo
done

if [ ${#files[@]} -ge 2 ]; then
  echo "=== Diff (unified) between first two files ==="
  diff -u "${files[0]}" "${files[1]}" || true
  echo
fi

echo "=== Search for references to arango_unix_client across repo ==="
rg -n --hidden -uu '\barango_unix_client\b' -S || true

Length of output: 101


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "Working dir: $(pwd)"
tmp=$(mktemp)

# Try fd first, fall back to find
fd -a --type f 'arango_unix_client.py' > "$tmp" 2>/dev/null || true
if [ ! -s "$tmp" ]; then
  find . -type f -name 'arango_unix_client.py' > "$tmp" 2>/dev/null || true
fi

if [ ! -s "$tmp" ]; then
  echo "No arango_unix_client.py files found"
  rm -f "$tmp"
  exit 0
fi

echo "Found files:"
nl -ba "$tmp"
echo

while IFS= read -r file; do
  echo "----- FILE: $file -----"
  stat -c "%n — %s bytes — modified: %y" "$file" || ls -l "$file"
  echo "---- head (first 200 lines) ----"
  sed -n '1,200p' "$file" || true
  echo "---- tail (last 200 lines) ----"
  tail -n 200 "$file" || true
  echo
done < "$tmp"

f1=$(sed -n '1p' "$tmp")
f2=$(sed -n '2p' "$tmp" || true)
if [ -n "$f2" ]; then
  echo "=== Diff (unified) between first two files ==="
  diff -u "$f1" "$f2" || true
  echo
fi

echo "=== Search repository for references to 'arango_unix_client' (tracked files via git) ==="
git ls-files -z 2>/dev/null | xargs -0 rg -n --hidden -uu '\barango_unix_client\b' -S || true

echo "=== Fallback grep across filesystem (excluding common build dirs) ==="
grep -RIn --exclude-dir={dist,build,.git,__pycache__} '\barango_unix_client' . || true

rm -f "$tmp"

Length of output: 86


Unify duplicate Unix-socket Arango client implementations

Two implementations exist: core/database/arango_unix_client.py and core/database/arango/arango_unix_client.py. Consolidate into a single canonical client (prefer the HTTP-over-UNIX implementation), move shared constants there, and re-export a compatibility shim at the secondary path to avoid breaking imports.

🧰 Tools
🪛 Ruff (0.12.2)

31-31: Probable insecure usage of temporary file or directory: "/tmp/arangodb.sock"

(S108)

Comment on lines +45 to +53
# Setup log directory
log_dir = Path(__file__).parent.parent / "logs"
log_dir.mkdir(exist_ok=True)

# Configure Python logging
logging.basicConfig(
level=getattr(logging, log_level),
format='%(message)s'
)

⚠️ Potential issue

Avoid writing under the installed package directory; use user-writable path and set permissions.

Current path resolves to core/logs (often read-only in site-packages).

-        log_dir.mkdir(exist_ok=True)
+        log_dir.mkdir(parents=True, exist_ok=True, mode=0o750)

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In core/logging/logging.py around lines 45 to 53, the current log directory is
created under the package (core/logs) which may be read-only when installed;
change to a user-writable location (e.g., use platform-appropriate user data/log
dir via appdirs/XDG or fallback to ~/.local/share/<app>/logs or
tempfile.gettempdir()), create the directory there with safe permissions
(restrict to user, e.g. 0o700), handle and log any OSError when creating the dir
and fall back to a safe location (stdout or temp dir) if creation fails, and
update logging.basicConfig to point its FileHandler (or use logging.basicConfig
with filename set to a file inside the new user-writable log dir) instead of
writing under the installed package directory.

Comment on lines 10 to 19
FixedSizeChunking,
SemanticChunking
)

__all__ = [
"ChunkingStrategy",
"TokenBasedChunking",
"SemanticChunking",
"SlidingWindowChunking",
"ChunkingStrategyFactory",
# Deprecated - kept for backward compatibility
"DocumentProcessor",
"ProcessingConfig",
"ProcessingResult",
]

# Lazy import for backward compatibility using __getattr__
def __getattr__(name):
"""
Lazy import deprecated classes from workflows.
This provides backward compatibility while encouraging migration
to the new module structure.
"""
import warnings

deprecated_imports = {
"DocumentProcessor": "core.workflows.workflow_pdf",
"ProcessingConfig": "core.workflows.workflow_pdf",
"ProcessingResult": "core.workflows.workflow_pdf",
}

if name in deprecated_imports:
module_path = deprecated_imports[name]
warnings.warn(
f"Importing {name} from core.processors is deprecated. "
f"Please use {module_path} instead.",
DeprecationWarning,
stacklevel=2
)

# Dynamically import from the new location
import importlib
module = importlib.import_module(module_path)
return getattr(module, name)

raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
'ChunkingStrategy',
'ChunkingStrategyFactory',
'FixedSizeChunking',
'SemanticChunking',

⚠️ Potential issue

🧩 Analysis chain

Confirm FixedSizeChunking exists in text.chunking_strategies.

Avoid exporting a non-existent symbol.

Run:


🏁 Script executed:

#!/bin/bash
ast-grep --pattern $'class FixedSizeChunking { $$$ }' || rg -nP 'class\s+FixedSizeChunking\b' core

Length of output: 0


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== repo root: $(pwd) ==="
echo "=== git branch ==="
git rev-parse --abbrev-ref HEAD 2>/dev/null || true
echo

echo "=== Listing core/processors ==="
ls -la core/processors || true
echo

echo "=== core/processors/__init__.py (first 300 lines) ==="
if [ -f core/processors/__init__.py ]; then
  sed -n '1,300p' core/processors/__init__.py
else
  echo "core/processors/__init__.py not found"
fi
echo

echo "=== rg: occurrences of FixedSizeChunking (word match) ==="
rg -n --hidden -S '\bFixedSizeChunking\b' || true
echo

echo "=== rg: class FixedSizeChunking definitions ==="
rg -n --hidden -S 'class\s+FixedSizeChunking\b' || true
echo

echo "=== rg: occurrences of SemanticChunking ==="
rg -n --hidden -S '\bSemanticChunking\b' || true
echo

echo "=== rg: occurrences of ChunkingStrategyFactory / ChunkingStrategy ==="
rg -n --hidden -S '\bChunkingStrategyFactory\b' || true
rg -n --hidden -S '\bChunkingStrategy\b' || true
echo

echo "=== rg: files/modules named chunking_strategies or chunking-strategies ==="
rg -n --hidden -S 'chunking_strategies|chunking-strategies' || true
echo

echo "=== rg: any files under text/ mentioning chunking ==="
rg -n --hidden -S 'chunking' text || true
echo

echo "=== List python files that mention 'Chunking' (case-sensitive) ==="
rg -n --hidden -S 'Chunking' --glob '!**/site-packages/**' || true

Length of output: 5313


Remove FixedSizeChunking from exports in core/processors/__init__.py

core/processors/text/chunking_strategies.py does not define FixedSizeChunking (it defines TokenBasedChunking, SemanticChunking, SlidingWindowChunking, ChunkingStrategyFactory); remove FixedSizeChunking from the import and __all__ in core/processors/__init__.py (lines 11–19) or add/alias the intended class in chunking_strategies.py.

🤖 Prompt for AI Agents
In core/processors/__init__.py around lines 11 to 19, the module exports and
imports reference FixedSizeChunking which does not exist in
core/processors/text/chunking_strategies.py; remove FixedSizeChunking from the
import list and from the __all__ array (or alternatively add/alias the correct
class in chunking_strategies.py), ensuring the exported names match actual
classes (e.g., keep TokenBasedChunking, SemanticChunking, SlidingWindowChunking,
ChunkingStrategyFactory).

Comment on lines +205 to +227
if "jina" not in self.embedder_model.lower():
errors.append(f"Embedder model {self.embedder_model} may not support late chunking")

# Validate GPU settings consistency
if self.use_gpu and not self.gpu_device.startswith("cuda"):
errors.append(f"GPU enabled but device {self.gpu_device} is not CUDA")

# Validate batch size relationships
if self.embedding_batch_size > self.batch_size:
errors.append(f"Embedding batch size ({self.embedding_batch_size}) exceeds record batch size ({self.batch_size})")

# Validate checkpoint interval
if self.checkpoint_interval > 100000:
errors.append(f"Checkpoint interval ({self.checkpoint_interval}) may be too large for recovery")

# Validate throughput feasibility
if self.batch_size < 100 and self.target_throughput > 10:
errors.append(f"Batch size ({self.batch_size}) too small for target throughput ({self.target_throughput} papers/sec)")

# Validate collection names are different
collections = [self.metadata_collection, self.chunks_collection, self.embeddings_collection]
if len(set(collections)) != len(collections):
errors.append("Collection names must be unique")

⚠️ Potential issue

Allow dual embedders; validate expected dims accordingly.

Restricting to “jina” contradicts the dual‑embedder design. Support Sentence-Transformers and validate dims.

-        # Validate embedder model compatibility
-        if "jina" not in self.embedder_model.lower():
-            errors.append(f"Embedder model {self.embedder_model} may not support late chunking")
+        # Validate embedder model compatibility (dual embedders)
+        model_l = self.embedder_model.lower()
+        if not any(k in model_l for k in ("jina", "sentence", "bge", "gte", "mixedbread")):
+            errors.append(f"Unsupported embedder model: {self.embedder_model}")
+
+        # Expected dims check if enabled
+        if self.validate_embeddings:
+            if "jina" in model_l:
+                expected = 2048
+            elif any(k in model_l for k in ("sentence", "bge", "gte", "mixedbread")):
+                expected = 768
+            else:
+                expected = self.expected_embedding_dim
+            if self.expected_embedding_dim != expected:
+                errors.append(f"expected_embedding_dim={self.expected_embedding_dim} "
+                              f"does not match model family expectation ({expected})")

Also applies to: 164-172

Comment on lines +131 to +159
def test_processing_order_storage(self, mock_embedder_factory, mock_db_factory):
"""Test that processing order is stored in database."""
# Setup mocks
mock_db = MagicMock()
mock_order_collection = MagicMock()

mock_db.has_collection.return_value = False
mock_db.create_collection.return_value = mock_order_collection
mock_db.collection.return_value = mock_order_collection
mock_db_factory.get_arango.return_value = mock_db

mock_embedder = MagicMock()
mock_embedder_factory.create_embedder.return_value = mock_embedder

# Create workflow
workflow = ArxivSortedWorkflow(self.config)

# Create sorted records
sorted_records = [
{'id': 'paper4', 'abstract_length': 5, 'record': self.test_metadata[3]},
{'id': 'paper1', 'abstract_length': 15, 'record': self.test_metadata[0]},
]

# Store processing order
workflow._store_processing_order(sorted_records)

# Verify collection was created
mock_db.create_collection.assert_called_once_with('arxiv_processing_order')


⚠️ Potential issue

Tests target removed API _store_processing_order.

Workflow now resumes via query; either update tests to reflect the query-based approach or mark skipped.

+    @unittest.skip("Order persistence removed; resume is now query-based in workflow")
     def test_processing_order_storage(self, mock_embedder_factory, mock_db_factory):
📝 Committable suggestion


Suggested change
def test_processing_order_storage(self, mock_embedder_factory, mock_db_factory):
"""Test that processing order is stored in database."""
# Setup mocks
mock_db = MagicMock()
mock_order_collection = MagicMock()
mock_db.has_collection.return_value = False
mock_db.create_collection.return_value = mock_order_collection
mock_db.collection.return_value = mock_order_collection
mock_db_factory.get_arango.return_value = mock_db
mock_embedder = MagicMock()
mock_embedder_factory.create_embedder.return_value = mock_embedder
# Create workflow
workflow = ArxivSortedWorkflow(self.config)
# Create sorted records
sorted_records = [
{'id': 'paper4', 'abstract_length': 5, 'record': self.test_metadata[3]},
{'id': 'paper1', 'abstract_length': 15, 'record': self.test_metadata[0]},
]
# Store processing order
workflow._store_processing_order(sorted_records)
# Verify collection was created
mock_db.create_collection.assert_called_once_with('arxiv_processing_order')
@unittest.skip("Order persistence removed; resume is now query-based in workflow")
def test_processing_order_storage(self, mock_embedder_factory, mock_db_factory):
"""Test that processing order is stored in database."""
# Setup mocks
mock_db = MagicMock()
mock_order_collection = MagicMock()
mock_db.has_collection.return_value = False
mock_db.create_collection.return_value = mock_order_collection
mock_db.collection.return_value = mock_order_collection
mock_db_factory.get_arango.return_value = mock_db
mock_embedder = MagicMock()
mock_embedder_factory.create_embedder.return_value = mock_embedder
# Create workflow
workflow = ArxivSortedWorkflow(self.config)
# Create sorted records
sorted_records = [
{'id': 'paper4', 'abstract_length': 5, 'record': self.test_metadata[3]},
{'id': 'paper1', 'abstract_length': 15, 'record': self.test_metadata[0]},
]
# Store processing order
workflow._store_processing_order(sorted_records)
# Verify collection was created
mock_db.create_collection.assert_called_once_with('arxiv_processing_order')
🤖 Prompt for AI Agents
In tests/test_workflow_arxiv_sorted.py around lines 131 to 159, the test calls
the removed private API _store_processing_order; update the test to either (A)
exercise the public, query-based resume flow by instantiating
ArxivSortedWorkflow and invoking the public method that triggers the
resume/query behavior, then assert that the Arango client was used to run the
expected query (mock_db.query.assert_called_with(...) or similar) and remove
assertions about create_collection, or (B) if you prefer to keep current intent,
mark this specific test as skipped with pytest.mark.skip(reason="workflow
resumes via query; update test"), ensuring mocks reflect the query-based
interaction instead of a private method call.

Comment on lines +184 to +219
def test_resume_from_checkpoint(self, mock_embedder_factory, mock_db_factory):
"""Test resume functionality from checkpoint."""
# Setup mocks
mock_db = MagicMock()
mock_collection = MagicMock()
mock_order_collection = MagicMock()

# Mock AQL query for resume position
mock_cursor = iter([2]) # Resume from position 2
mock_db.aql.execute.return_value = mock_cursor

mock_db.has_collection.return_value = True
mock_db.collection.side_effect = lambda name: (
mock_collection if name == 'test_arxiv_papers'
else mock_order_collection if name == 'arxiv_processing_order'
else None
)
mock_db_factory.get_arango.return_value = mock_db

mock_embedder = MagicMock()
mock_embedder_factory.create_embedder.return_value = mock_embedder

# Enable resume in config
self.config.resume_from_checkpoint = True

# Create workflow
workflow = ArxivSortedWorkflow(self.config)

# Get resume position
position = workflow._get_resume_position()

# Should resume from position 3 (last completed + 1)
self.assertEqual(position, 3)

# Verify query was executed
mock_db.aql.execute.assert_called_once()

⚠️ Potential issue

Tests target removed API _get_resume_position.

Replace with assertions around _get_unprocessed_records_sorted() or skip for now.

+    @unittest.skip("Resume position method removed; resume handled via query of unprocessed records")
     def test_resume_from_checkpoint(self, mock_embedder_factory, mock_db_factory):
📝 Committable suggestion


Suggested change
def test_resume_from_checkpoint(self, mock_embedder_factory, mock_db_factory):
"""Test resume functionality from checkpoint."""
# Setup mocks
mock_db = MagicMock()
mock_collection = MagicMock()
mock_order_collection = MagicMock()
# Mock AQL query for resume position
mock_cursor = iter([2]) # Resume from position 2
mock_db.aql.execute.return_value = mock_cursor
mock_db.has_collection.return_value = True
mock_db.collection.side_effect = lambda name: (
mock_collection if name == 'test_arxiv_papers'
else mock_order_collection if name == 'arxiv_processing_order'
else None
)
mock_db_factory.get_arango.return_value = mock_db
mock_embedder = MagicMock()
mock_embedder_factory.create_embedder.return_value = mock_embedder
# Enable resume in config
self.config.resume_from_checkpoint = True
# Create workflow
workflow = ArxivSortedWorkflow(self.config)
# Get resume position
position = workflow._get_resume_position()
# Should resume from position 3 (last completed + 1)
self.assertEqual(position, 3)
# Verify query was executed
mock_db.aql.execute.assert_called_once()
@unittest.skip("Resume position method removed; resume handled via query of unprocessed records")
def test_resume_from_checkpoint(self, mock_embedder_factory, mock_db_factory):
"""Test resume functionality from checkpoint."""
# Setup mocks
mock_db = MagicMock()
mock_collection = MagicMock()
mock_order_collection = MagicMock()
# Mock AQL query for resume position
mock_cursor = iter([2]) # Resume from position 2
mock_db.aql.execute.return_value = mock_cursor
mock_db.has_collection.return_value = True
mock_db.collection.side_effect = lambda name: (
mock_collection if name == 'test_arxiv_papers'
else mock_order_collection if name == 'arxiv_processing_order'
else None
)
mock_db_factory.get_arango.return_value = mock_db
mock_embedder = MagicMock()
mock_embedder_factory.create_embedder.return_value = mock_embedder
# Enable resume in config
self.config.resume_from_checkpoint = True
# Create workflow
workflow = ArxivSortedWorkflow(self.config)
# Get resume position
position = workflow._get_resume_position()
# Should resume from position 3 (last completed + 1)
self.assertEqual(position, 3)
# Verify query was executed
mock_db.aql.execute.assert_called_once()
🤖 Prompt for AI Agents
In tests/test_workflow_arxiv_sorted.py around lines 184 to 219, the test calls
the removed private API _get_resume_position; update the test to exercise the
current API by calling _get_unprocessed_records_sorted() (or skip the test if
that behavior is no longer applicable). Change the test to: call
workflow._get_unprocessed_records_sorted(), assert the returned
iterable/sequence reflects the mocked DB state such that processing would resume
after index 2 (e.g., first unprocessed record corresponds to position 3 or the
returned list length/first item matches expectations), and keep the
mock_db.aql.execute assertion to verify the query ran; adjust the
mock_db.aql.execute return value to match the new method's expected result
shape.

Comment on lines +243 to +244
mock_embedder_class.return_value = mock_embedder


⚠️ Potential issue

Undefined name mock_embedder_class.

Use the patched factory (mock_embedder_factory) instead.

-        mock_embedder_class.return_value = mock_embedder
+        mock_embedder_factory.create_embedder.return_value = mock_embedder
📝 Committable suggestion


Suggested change
mock_embedder_class.return_value = mock_embedder
mock_embedder_factory.create_embedder.return_value = mock_embedder
🧰 Tools
🪛 Ruff (0.12.2)

243-243: Undefined name mock_embedder_class

(F821)

🤖 Prompt for AI Agents
In tests/test_workflow_arxiv_sorted.py around lines 243-244, the test references
an undefined name `mock_embedder_class`; replace that usage with the patched
factory `mock_embedder_factory` (i.e., set mock_embedder_factory.return_value =
mock_embedder) so the test uses the correct mock provided by the patching
fixture and remove or rename any lingering references to `mock_embedder_class`.


mock_embedder = MagicMock()
mock_embedder.embed_batch.return_value = [[0.1] * 768] * 2
mock_embedder_class.return_value = mock_embedder

⚠️ Potential issue

Undefined name mock_embedder_class.

Same fix as above.

-        mock_embedder_class.return_value = mock_embedder
+        mock_embedder_factory.create_embedder.return_value = mock_embedder
📝 Committable suggestion


Suggested change
mock_embedder_class.return_value = mock_embedder
mock_embedder_factory.create_embedder.return_value = mock_embedder
🧰 Tools
🪛 Ruff (0.12.2)

352-352: Undefined name mock_embedder_class

(F821)

🤖 Prompt for AI Agents
In tests/test_workflow_arxiv_sorted.py around line 352, the name
mock_embedder_class is undefined; apply the same fix as above by setting
mock_embedder_factory.create_embedder.return_value = mock_embedder so the line
refers to the mock supplied by the patching fixture, and remove any remaining
references to mock_embedder_class so the test uses one consistent mock name
throughout.

Comment on lines +378 to +447
class TestIntegrationWorkflow(unittest.TestCase):
"""Integration tests for the complete workflow."""

@patch('core.workflows.workflow_arxiv_sorted.DatabaseFactory')
@patch('core.workflows.workflow_arxiv_sorted.EmbedderFactory')
def test_end_to_end_processing(self, mock_embedder_factory, mock_db_factory):
"""Test complete workflow from file to database."""
# Create larger test dataset
test_data = []
for i in range(10):
test_data.append({
"id": f"paper_{i}",
"title": f"Paper {i}",
"abstract": "x" * (i * 20 + 10), # Varying lengths
"authors": [f"Author {i}"],
"categories": ["cs.AI"]
})

# Create temp file
temp_file = tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False)
for record in test_data:
temp_file.write(json.dumps(record) + '\n')
temp_file.close()

# Setup mocks
mock_db = MagicMock()
mock_collection = MagicMock()
mock_order_collection = MagicMock()

mock_db.has_collection.return_value = False
mock_db.create_collection.return_value = mock_order_collection
mock_db.collection.side_effect = lambda name: (
mock_collection if name == 'test_papers'
else mock_order_collection if name == 'arxiv_processing_order'
else None
)
mock_db_factory.get_arango.return_value = mock_db

mock_embedder = MagicMock()
mock_embedder.embed_batch.return_value = [[0.1] * 768] * 5
mock_embedder_factory.create_embedder.return_value = mock_embedder

# Configure workflow
config = ArxivMetadataConfig(
metadata_file=Path(temp_file.name),
batch_size=5,
num_workers=1,
embedding_batch_size=5,
gpu_device='cpu',
use_gpu=False,
resume_from_checkpoint=False,
metadata_collection='test_papers',
embedder_model='sentence-transformers/all-MiniLM-L6-v2',
arango_database='test_db'
)

# Run workflow
workflow = ArxivSortedWorkflow(config)
workflow.run()

# Verify processing order collection was created
mock_db.create_collection.assert_called_with('arxiv_processing_order')

# Verify all papers were processed
self.assertEqual(mock_collection.insert_many.call_count, 2) # 10 papers / batch_size 5

# Verify papers were processed in size order
order_inserts = mock_order_collection.insert.call_args_list
positions = [call[0][0]['position'] for call in order_inserts]
self.assertEqual(positions, list(range(10))) # Should be 0-9 in order

⚠️ Potential issue

Integration test still expects the arxiv_processing_order collection to be created.

That contract was removed (resume is now query-based). Update the assertions or gate them behind a feature flag; see the sketch below.
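
A minimal sketch of the adjusted assertions, assuming resume and ordering are handled entirely through an AQL query and no order collection is created; the exact mock calls to assert on depend on the final workflow code:

# With query-based resume there is no arxiv_processing_order collection to assert on.
self.assertEqual(mock_collection.insert_many.call_count, 2)  # 10 papers / batch_size 5
created = [call.args[0] for call in mock_db.create_collection.call_args_list]
self.assertNotIn('arxiv_processing_order', created)          # contract removed
self.assertTrue(mock_db.aql.execute.called)                  # ordering/resume handled via AQL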

@coderabbitai coderabbitai bot left a comment

Review continued from previous batch...

Comment on lines +34 to +47
# Add project root to path
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

# Core infrastructure imports
from core.workflows.workflow_base import WorkflowBase, WorkflowConfig, WorkflowResult
from core.workflows.state import StateManager, CheckpointManager
from core.workflows.storage.storage_local import StorageManager
from core.monitoring.progress_tracker import ProgressTracker, ProgressState
from core.monitoring.performance_monitor import PerformanceMonitor
from core.database.database_factory import DatabaseFactory
from core.embedders.embedders_factory import EmbedderFactory
from core.tools.arxiv.arxiv_metadata_config import ArxivMetadataConfig
from arango.exceptions import DocumentInsertError, CollectionDeleteError

⚠️ Potential issue

Drop sys.path hacks; switch to package‑relative imports

Manipulating sys.path breaks package semantics and causes import fragility. Use relative imports within core.workflows.

-# Add project root to path
-project_root = Path(__file__).parent.parent.parent
-sys.path.insert(0, str(project_root))
-
-# Core infrastructure imports
-from core.workflows.workflow_base import WorkflowBase, WorkflowConfig, WorkflowResult
-from core.workflows.state import StateManager, CheckpointManager
-from core.workflows.storage.storage_local import StorageManager
+from .workflow_base import WorkflowBase, WorkflowConfig, WorkflowResult
+from .state import StateManager, CheckpointManager
+from .storage.storage_local import StorageManager
-from core.monitoring.progress_tracker import ProgressTracker, ProgressState
+from core.monitoring.progress_tracker import ProgressTracker, ProgressState
-from core.monitoring.performance_monitor import PerformanceMonitor
+from core.monitoring.performance_monitor import PerformanceMonitor
-from core.database.database_factory import DatabaseFactory
+from core.database.database_factory import DatabaseFactory
-from core.embedders.embedders_factory import EmbedderFactory
+from core.embedders.embedders_factory import EmbedderFactory
-from core.tools.arxiv.arxiv_metadata_config import ArxivMetadataConfig
+from core.tools.arxiv.arxiv_metadata_config import ArxivMetadataConfig
-from arango.exceptions import DocumentInsertError, CollectionDeleteError
+from arango.exceptions import CollectionDeleteError
📝 Committable suggestion

Suggested change
# Add project root to path
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))
# Core infrastructure imports
from core.workflows.workflow_base import WorkflowBase, WorkflowConfig, WorkflowResult
from core.workflows.state import StateManager, CheckpointManager
from core.workflows.storage.storage_local import StorageManager
from core.monitoring.progress_tracker import ProgressTracker, ProgressState
from core.monitoring.performance_monitor import PerformanceMonitor
from core.database.database_factory import DatabaseFactory
from core.embedders.embedders_factory import EmbedderFactory
from core.tools.arxiv.arxiv_metadata_config import ArxivMetadataConfig
from arango.exceptions import DocumentInsertError, CollectionDeleteError
from .workflow_base import WorkflowBase, WorkflowConfig, WorkflowResult
from .state import StateManager, CheckpointManager
from .storage.storage_local import StorageManager
from core.monitoring.progress_tracker import ProgressTracker, ProgressState
from core.monitoring.performance_monitor import PerformanceMonitor
from core.database.database_factory import DatabaseFactory
from core.embedders.embedders_factory import EmbedderFactory
from core.tools.arxiv.arxiv_metadata_config import ArxivMetadataConfig
from arango.exceptions import CollectionDeleteError
🤖 Prompt for AI Agents
In core/workflows/workflow_arxiv_metadata.py around lines 34 to 47, remove the
sys.path hack (project_root assignment and sys.path.insert) and convert the
absolute imports that reference the local package to package-relative imports
(use single-dot and double-dot relative imports from the current package: e.g.
from .workflow_base import WorkflowBase, WorkflowConfig, WorkflowResult; from
.state import StateManager, CheckpointManager; from .storage.storage_local
import StorageManager; from ..monitoring.progress_tracker import
ProgressTracker, ProgressState; from ..monitoring.performance_monitor import
PerformanceMonitor; from ..database.database_factory import DatabaseFactory;
from ..embedders.embedders_factory import EmbedderFactory; from
..tools.arxiv.arxiv_metadata_config import ArxivMetadataConfig). Keep the
third-party import (arango.exceptions) as-is.

Comment on lines +267 to +368
# Begin transaction
txn = db.begin_transaction(
write=[
self.metadata_config.metadata_collection,
self.metadata_config.chunks_collection,
self.metadata_config.embeddings_collection
]
)

transaction_committed = False

try:
# txn is already the transaction database
txn_db = txn

# Store metadata records
for record in batch:
arxiv_id = record.get('id', '')
if not arxiv_id:
failed += 1
continue

# Sanitize ID for ArangoDB key
sanitized_id = arxiv_id.replace('.', '_').replace('/', '_')

# Prepare metadata document
metadata_doc = {
'_key': sanitized_id,
'arxiv_id': arxiv_id,
'submitter': record.get('submitter'),
'authors': record.get('authors'),
'title': record.get('title'),
'comments': record.get('comments'),
'journal_ref': record.get('journal-ref'),
'doi': record.get('doi'),
'report_no': record.get('report-no'),
'categories': record.get('categories'),
'license': record.get('license'),
'abstract': record.get('abstract'),
'versions': record.get('versions', []),
'update_date': record.get('update_date'),
'authors_parsed': record.get('authors_parsed', []),
'processed_at': datetime.now().isoformat(),
'has_abstract': bool(record.get('abstract'))
}

txn_db.collection(self.metadata_config.metadata_collection).insert(
metadata_doc, overwrite=True
)

# Store chunks and embeddings
for item in chunks_with_embeddings:
record = item['record']
chunk = item['chunk']
arxiv_id = record.get('id', '')
sanitized_id = arxiv_id.replace('.', '_').replace('/', '_')

# Create unique chunk ID
chunk_id = f"{sanitized_id}_chunk_{chunk.chunk_index}"

# Store chunk document
chunk_doc = {
'_key': chunk_id,
'arxiv_id': arxiv_id,
'paper_key': sanitized_id,
'chunk_index': chunk.chunk_index,
'total_chunks': chunk.total_chunks,
'text': chunk.text,
'start_char': chunk.start_char,
'end_char': chunk.end_char,
'start_token': chunk.start_token,
'end_token': chunk.end_token,
'context_window_used': chunk.context_window_used,
'created_at': datetime.now().isoformat()
}

txn_db.collection(self.metadata_config.chunks_collection).insert(
chunk_doc, overwrite=True
)

# Store embedding document
embedding_doc = {
'_key': f"{chunk_id}_emb",
'chunk_id': chunk_id,
'arxiv_id': arxiv_id,
'paper_key': sanitized_id,
'embedding': chunk.embedding.tolist(), # Convert numpy to list
'embedding_dim': len(chunk.embedding),
'model': self.metadata_config.embedder_model,
'created_at': datetime.now().isoformat()
}

txn_db.collection(self.metadata_config.embeddings_collection).insert(
embedding_doc, overwrite=True
)

# Commit transaction
txn.commit_transaction()
transaction_committed = True
successful = len(batch)

except Exception as e:

⚠️ Potential issue

Commit inside per‑record loop; transaction misuse and incorrect success count

You commit the transaction inside the loop, then continue using txn_db. This risks operations on a committed txn and misreports success as len(batch) even on partial failures.

-        # Begin transaction
-        txn = db.begin_transaction(
+        # Begin transaction
+        txn = db.begin_transaction(
             write=[
                 self.metadata_config.metadata_collection,
                 self.metadata_config.chunks_collection,
                 self.metadata_config.embeddings_collection
             ]
         )
-
-        transaction_committed = False
+        transaction_committed = False
@@
-            # Store metadata records
-            for record in batch:
+            # Store metadata records
+            for record in batch:
                 arxiv_id = record.get('id', '')
                 if not arxiv_id:
-                    failed += 1
+                    failed += 1
                     continue
@@
-                txn_db.collection(self.metadata_config.metadata_collection).insert(
-                    metadata_doc, overwrite=True
-                )
-
-                # Store chunks and embeddings
-                for item in chunks_with_embeddings:
-                    record = item['record']
-                    chunk = item['chunk']
-                    arxiv_id = record.get('id', '')
-                    sanitized_id = arxiv_id.replace('.', '_').replace('/', '_')
+                txn_db.collection(self.metadata_config.metadata_collection).insert(
+                    metadata_doc, overwrite=True
+                )
+                # Store chunks and embeddings for THIS record only
+                record_chunks = chunks_by_key.get(sanitized_id, [])
+                for chunk in record_chunks:
                     # Create unique chunk ID
                     chunk_id = f"{sanitized_id}_chunk_{chunk.chunk_index}"
@@
-                    txn_db.collection(self.metadata_config.chunks_collection).insert(
-                        chunk_doc, overwrite=True
-                    )
+                    txn_db.collection(self.metadata_config.chunks_collection).insert(chunk_doc, overwrite=True)
@@
-                    txn_db.collection(self.metadata_config.embeddings_collection).insert(
-                        embedding_doc, overwrite=True
-                    )
-
-                # Commit transaction
-                txn.commit_transaction()
-                transaction_committed = True
-                successful = len(batch)
+                    txn_db.collection(self.metadata_config.embeddings_collection).insert(embedding_doc, overwrite=True)
+                successful += 1
+
+            # Commit transaction AFTER all writes
+            txn.commit_transaction()
+            transaction_committed = True

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 Ruff (0.12.2)

368-368: Do not catch blind exception: Exception

(BLE001)
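
The diff above assumes a chunks_by_key mapping that does not exist in the original snippet; a minimal sketch of how it could be built before the per-record write loop (the item field names follow the code shown above and are otherwise assumptions):

from collections import defaultdict

# Group each record's chunks by its sanitized paper key so the per-record loop
# can write metadata, chunks, and embeddings together and commit once at the end.
chunks_by_key: dict[str, list] = defaultdict(list)
for item in chunks_with_embeddings:
    arxiv_id = item['record'].get('id', '')
    sanitized_id = arxiv_id.replace('.', '_').replace('/', '_')
    chunks_by_key[sanitized_id].append(item['chunk'])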

Comment on lines +645 to +649
cursor = self.db.aql.execute('''
FOR doc IN arxiv_abstract_embeddings
RETURN doc.arxiv_id
''', ttl=300, batch_size=10000) # 5 minute timeout, large batch size


⚠️ Potential issue

Fix: use configured embeddings collection (resume query hardcodes collection).

Using 'arxiv_abstract_embeddings' ignores config and breaks resume for custom collection names.

Apply:

-            cursor = self.db.aql.execute('''
-                FOR doc IN arxiv_abstract_embeddings
-                    RETURN doc.arxiv_id
-            ''', ttl=300, batch_size=10000)  # 5 minute timeout, large batch size
+            cursor = self.db.aql.execute(f'''
+                FOR doc IN {self.metadata_config.embeddings_collection}
+                    RETURN doc.arxiv_id
+            ''', ttl=300, batch_size=10000)
📝 Committable suggestion

Suggested change
cursor = self.db.aql.execute('''
FOR doc IN arxiv_abstract_embeddings
RETURN doc.arxiv_id
''', ttl=300, batch_size=10000) # 5 minute timeout, large batch size
cursor = self.db.aql.execute(f'''
FOR doc IN {self.metadata_config.embeddings_collection}
RETURN doc.arxiv_id
''', ttl=300, batch_size=10000)
🤖 Prompt for AI Agents
In core/workflows/workflow_arxiv_parallel.py around lines 645-649 the AQL
currently hardcodes the collection name 'arxiv_abstract_embeddings', which
ignores configured collection names and breaks resume for custom setups; replace
the hardcoded name with the configured collection variable (e.g.,
self.embeddings_collection or the config value used elsewhere), constructing the
AQL string using that variable (or validated/escaped value) instead of the
literal, or build the query with Python string formatting before passing to
self.db.aql.execute; keep the existing ttl and batch_size options unchanged and
ensure the collection name is validated to avoid injection.
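
If string interpolation of the collection name is a concern, AQL also supports collection bind parameters ('@@name' in the query, an '@name' key in bind_vars), which avoids building the query from configuration values; a sketch, assuming self.metadata_config.embeddings_collection holds the configured name as in the diff above:

# Bind the configured collection name instead of interpolating it into the query.
cursor = self.db.aql.execute(
    '''
    FOR doc IN @@embeddings
        RETURN doc.arxiv_id
    ''',
    bind_vars={'@embeddings': self.metadata_config.embeddings_collection},
    ttl=300,
    batch_size=10000,
)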

Comment on lines +770 to +779
# Update counters
self.processed_count += len(batch)

# Update progress tracker less frequently for better performance
# Only update for larger batches to reduce overhead
if len(batch) >= 100:
self.progress_tracker.update_step(
"database_storage",
completed=len(batch)
)

⚠️ Potential issue

Counting chunks as “records” inflates metrics; count unique papers instead.

processed_count and progress updates should reflect unique arxiv_ids, not number of chunks.

Apply:

-            # Update counters
-            self.processed_count += len(batch)
+            # Update counters by unique papers
+            unique_papers = len(papers_seen)
+            self.processed_count += unique_papers
@@
-            if len(batch) >= 100:
+            if unique_papers >= 1:
                 self.progress_tracker.update_step(
                     "database_storage",
-                    completed=len(batch)
+                    completed=unique_papers
                 )

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In core/workflows/workflow_arxiv_parallel.py around lines 770 to 779, the code
increments processed_count and progress by len(batch) which counts chunks
instead of unique papers; change it to compute the number of unique arxiv_ids in
the batch (e.g. collect arxiv_id from each record and deduplicate) and use that
unique_count to increment self.processed_count and to pass
completed=unique_count to progress_tracker.update_step so metrics reflect unique
papers processed.
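
The diff above references a papers_seen set that is not shown in this snippet; a minimal sketch of computing the unique-paper count directly from the batch, assuming each batch item exposes an arxiv_id field (field name not verified against the final code):

# Count unique papers rather than chunks so throughput metrics are not inflated.
unique_papers = len({item.get('arxiv_id') for item in batch if item.get('arxiv_id')})
self.processed_count += unique_papers
if unique_papers:
    self.progress_tracker.update_step("database_storage", completed=unique_papers)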

Comment on lines +1014 to +1017
def validate_inputs(self, **kwargs) -> bool:
"""Validate workflow inputs."""
return True


🛠️ Refactor suggestion

Validate inputs (Todd’s Law 1).

Implement minimal config/file checks instead of unconditional True to fail fast.

Apply:

-    def validate_inputs(self, **kwargs) -> bool:
-        """Validate workflow inputs."""
-        return True
+    def validate_inputs(self, **kwargs) -> bool:
+        """Validate workflow inputs."""
+        errors = []
+        mf = Path(self.metadata_config.metadata_file)
+        if not mf.exists():
+            errors.append(f"metadata_file missing: {mf}")
+        if self.metadata_config.embedding_batch_size <= 0:
+            errors.append("embedding_batch_size must be > 0")
+        if self.metadata_config.batch_size <= 0:
+            errors.append("batch_size must be > 0")
+        if self.metadata_config.chunk_overlap_tokens >= self.metadata_config.chunk_size_tokens:
+            errors.append("chunk_overlap_tokens must be < chunk_size_tokens")
+        if errors:
+            self.logger.error("input_validation_failed", errors=errors)
+            return False
+        return True

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 Ruff (0.12.2)

1014-1014: Unused method argument: kwargs

(ARG002)

🤖 Prompt for AI Agents
In core/workflows/workflow_arxiv_parallel.py around lines 1014-1017, replace the
unconditional "return True" with concrete validation: ensure expected kwargs
keys exist (e.g., 'config', 'input_file', 'output_dir' or the actual names used
by this workflow), verify that file paths point to existing files/directories
(use os.path.exists/os.path.isfile/os.path.isdir), check basic config sanity
(required keys present and simple type/value checks like positive ints for
worker counts), and on any failure log a clear message via the workflow logger
(self.logger.error) and return False; otherwise return True.

Comment on lines +293 to +305
if dimensions:
stats['embedding_dimensions'] = dimensions
expected_dim = 2048 # Jina v4 expected dimension

print(f"\n🎯 Embedding Validation:")
print(f" Dimensions found: {dimensions}")
print(f" Expected (Jina v4): {expected_dim}")

if all(d == expected_dim for d in dimensions):
print(f" ✅ All embeddings have correct dimensions")
else:
print(f" ⚠️ Dimension mismatch detected!")


⚠️ Potential issue

Hard‑coded expected_dim=2048 will flag valid non‑v4 runs

The ingestion may use v3 or other models; infer expected dims from data or config instead of hard‑coding 2048.

-            if dimensions:
-                stats['embedding_dimensions'] = dimensions
-                expected_dim = 2048  # Jina v4 expected dimension
+            if dimensions:
+                stats['embedding_dimensions'] = dimensions
+                # Derive expected dims per model from data
+                model_dims = list(self.db.aql.execute(f"""
+                FOR e IN {self.embeddings_collection}
+                  LIMIT 1000
+                  RETURN DISTINCT {{model: e.model, dim: e.embedding_dim}}
+                """))
+                expected_dim = model_dims[0]['dim'] if model_dims else dimensions[0]
                 print(f"\n🎯 Embedding Validation:")
                 print(f"   Dimensions found: {dimensions}")
-                print(f"   Expected (Jina v4): {expected_dim}")
+                print(f"   Expected: {expected_dim}")
+                print(f"   Models/dims sample: {model_dims}")

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 Ruff (0.12.2)

297-297: f-string without any placeholders

Remove extraneous f prefix

(F541)


302-302: f-string without any placeholders

Remove extraneous f prefix

(F541)


304-304: f-string without any placeholders

Remove extraneous f prefix

(F541)

Comment on lines +95 to +98
print_info "Loading credentials from .env file..."
export $(grep -v '^#' .env | xargs)
ROOT_PASSWORD="${ARANGO_PASSWORD:-}"
fi

⚠️ Potential issue

Exporting .env via export $(grep …) is unsafe; avoid word‑splitting and code injection

The unquoted command substitution is brittle: values containing spaces or glob characters get split and re-expanded before export. Use POSIX‑safe auto‑export (set -a) instead.

-        export $(grep -v '^#' .env | xargs)
-        ROOT_PASSWORD="${ARANGO_PASSWORD:-}"
+        # shellcheck disable=SC1091
+        set -a
+        . ./.env
+        set +a
+        ROOT_PASSWORD="${ARANGO_PASSWORD:-}"
🧰 Tools
🪛 Shellcheck (0.10.0)

[warning] 96-96: Quote this to prevent word splitting.

(SC2046)

🤖 Prompt for AI Agents
In setup_database.sh around lines 95-98, replace the unsafe "export $(grep -v
'^#' .env | xargs)" pattern with a POSIX-safe parser that reads .env
line-by-line, skips blank/comment lines, splits on the first '=' using IFS and
read -r to avoid word-splitting and code execution, strips optional surrounding
quotes from values, and exports each key=value safely; then assign
ROOT_PASSWORD="${ARANGO_PASSWORD:-}" as before. Ensure the loop uses no eval,
handles empty values, and preserves literal characters by using IFS='=' and read
-r key val.

@r3d91ll r3d91ll merged commit a68e137 into main Sep 17, 2025
1 check passed