Skip to content

Conversation

@r3d91ll
Copy link
Owner

@r3d91ll r3d91ll commented Sep 14, 2025

no real e2e work has been done here. at the start we determined that there was still work to be done. this is just clean up and requires no review we just need to merge to main so that we can do work in another part of the directory. we will close this and reopen it latter when we are ready for e2e testing.

Summary by CodeRabbit

  • New Features
    • Introduced unified configuration management, modular extractors and embedders (incl. Jina v4), workflow orchestration with checkpointing, storage backends, and comprehensive monitoring (performance, progress, compatibility).
    • Added database connection factory for ArangoDB, PostgreSQL, and Redis with pooling.
  • Refactor
    • Reorganized public APIs: processors now routed through workflows; chunking strategies exposed under processors.text.
    • Deprecated legacy framework and MCP server components.
  • Documentation
    • Added focused core module READMEs; removed outdated top-level and ADR documents.
  • Chores
    • Added repository-wide PR review configuration and ignored scratch directory.

r3d91ll and others added 20 commits September 13, 2025 14:11
## Changes
- Created core/embedders/ module with clean structure
  - embedders_base.py: Base class and interface
  - embedders_factory.py: Factory pattern for embedder creation
  - embedders_jina.py: Jina v4 embedder implementation
  - Proper __init__.py with auto-registration

- Created core/extractors/ module with clean structure
  - extractors_base.py: Base class and interface
  - extractors_factory.py: Factory with format detection
  - extractors_docling.py: GPU-accelerated PDF extraction
  - extractors_latex.py: LaTeX source extraction
  - extractors_code.py: Code extraction
  - extractors_treesitter.py: Tree-sitter based extraction
  - extractors_robust.py: Fallback extraction

## Backward Compatibility
- Added embedders_compat.py with deprecation warnings
- Updated imports in arxiv_pipeline.py to use new structure
- All existing functionality preserved

## Testing
- Created and ran test_phase1_migration.py
- All tests passed successfully
- Factory patterns working correctly
- Auto-registration functioning

This completes Phase 1 of the core restructure (Issue #35)

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
- Test EmbedderFactory model type detection logic
- Test ExtractorFactory format detection and mapping
- Test late chunking algorithm (overlap calculations)
- Test factory registration and retrieval
- All 6 tests passing successfully

These tests cover the most critical custom algorithms in Phase 1.
## Fixes Applied

### JinaV4Embedder
- Now properly inherits from EmbedderBase
- Added required interface methods: embed_single()
- Added required properties: embedding_dimension, max_sequence_length, supports_late_chunking, supports_multimodal
- Fixed device_map issue - now loads model then moves to device

### CodeExtractor
- Now properly inherits from ExtractorBase
- Fixed import path for TreeSitterExtractor (relative import)
- Added config parameter to constructor with super().__init__(config)
- Changed extract() to return ExtractionResult instead of dict/None
- Added extract_batch() method
- Added supported_formats property
- Improved error handling with specific exception types

These changes ensure all extractors and embedders follow the base class contracts
and work correctly with the factory pattern.

Addresses all critical issues raised by @coderabbitai in PR #36
## Changes

### Workflows Module (`core/workflows/`)
- Created workflows module for orchestration and pipelines
- Added workflow_base.py with WorkflowBase, WorkflowConfig, WorkflowResult
- Moved document_processor.py → workflow_pdf.py
- Moved generic_document_processor.py → workflow_pdf_batch.py
- Created state/ subdirectory for state management
  - Moved state_manager.py to workflows/state/
- Created storage/ subdirectory for storage backends
  - Added storage_base.py defining storage interface
  - Moved storage.py → storage_local.py

### Database Module (`core/database/`)
- Created database subdirectories: arango/ and postgres/
- Moved arango_db_manager.py → arango/arango_client.py
- Added DatabaseFactory for centralized connection management
- Supports ArangoDB (with Unix socket), PostgreSQL, and Redis

### Processors Module (`core/processors/`)
- Created text/ subdirectory for text processing
- Moved chunking_strategies.py → text/chunking_strategies.py
- Added backward compatibility with deprecation warnings

## Testing
- Created comprehensive Phase 2 tests (12 tests, all passing)
- Verified module imports and structure
- Tested backward compatibility
- No circular import issues

## Benefits
- Clear separation of concerns
- Workflows handle orchestration
- Database access centralized through factory
- Storage integrated with workflows
- Text processing properly categorized

This completes Phase 2 of the core restructure (Issue #35)

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
- Enable auto-review on all branches (not just main)
- Configure review settings and tools
- Add path filters and ignores
- Include custom instructions for restructure project
- Set up language-specific settings for Python

This allows CodeRabbit to review PRs between feature branches,
which is essential for our multi-phase restructure workflow.
- Fix invalid regex pattern 'feature/**/*' to 'feature/.*'
- Change tools from boolean to object format with 'enabled' property
- Remove invalid glob patterns from base_branches

This fixes the parsing errors reported by CodeRabbit.
- Correct formula: C = (W · R · H / T) · Ctx^α (α only on Ctx)
- Maps W/R/H/T to practical code aspects
- Includes Todd's 3 Laws for data/reliability/ownership
- Requires quantitative Conveyance Scorecard in reviews
- Forces PR descriptions to include Conveyance analysis
- Default α = 1.7, weights = 0.25 each
- Implements zero-propagation gate for critical failures

This turns CodeRabbit into a Conveyance Framework enforcer.
- Fixed base_branches to use regex anchors (^main$, ^feature/.*$)
- Changed 'draft' to 'drafts' (correct schema key)
- Removed invalid 'path_ignores', using '!' exclusions in path_filters
- Moved instructions to path_instructions (correct location)
- Removed unsupported top-level keys (language_settings, pr_description)
- Added pre_merge_checks for PR description validation

Most importantly:
- FIXED the Conveyance formula representation
- Correct: C = (W · R · H / T) · Ctx^α
- Wrong: WHERE × WHAT × CONVEYANCE × TIME
- Added red flags to catch formula errors
- Emphasized that T divides (not multiplies)
- Clarified that C is the outcome, not an input
- Fix PostgreSQL connection string builder to normalize keys (database→dbname, username→user)
- Replace insecure /dev/shm staging path with cross-platform tempfile solution
- Add atomic checkpoint writes to prevent corruption
- Use lazy __getattr__ for backward compatibility imports
- Ensure proper cleanup of temp files on failure

These changes improve security, reliability, and cross-platform compatibility.
feat: Phase 2 - Database and Workflows Consolidation
Implements unified configuration management and monitoring infrastructure
with full Conveyance Framework integration.

## Configuration Module (core/config/)
- BaseConfig with Context scoring (L/I/A/G components)
- Hierarchical configuration loading with priority resolution
- Centralized ConfigManager with factory patterns
- JSON schema validation and type conversion

## Monitoring Module (core/monitoring/)
- MetricsCollector base class with Conveyance calculation
- PerformanceMonitor for CPU/GPU/memory tracking
- ProgressTracker with ETA and Context coherence
- Backward compatibility layer for existing tools

## Theory Integration
- Direct Conveyance Framework implementation: C = (W·R·H/T)·Ctx^α
- Automatic Context scoring: Ctx = wL·L + wI·I + wA·A + wG·G
- Real-time performance optimization feedback
- Information Reconstructionism principles throughout

## Benefits
- Unified configuration across all components
- Consistent monitoring with theory connection
- Thread-safe singleton patterns
- Seamless migration from existing tools
- Comprehensive examples and migration guide

This completes the core infrastructure consolidation, providing
a solid foundation for all HADES components with deep theoretical
integration and production-ready implementations.
Change 'paths' to 'path' in path_instructions to match schema requirements
- Fix Pydantic version compatibility in config_base.py
  - Handle both v1 and v2 field requirement detection
  - Add fallback for missing is_required() method
  - Handle zero required fields case

- Fix ConfigSource dataclass initialization
  - Make exists and readable fields non-init with defaults
  - Prevents constructor errors

- Improve exception chaining
  - Add 'from e' to preserve stack traces

- Update PR description with required sections
  - Add Conveyance Summary
  - Add W/R/H/T Mapping with metrics
  - Add Performance Evidence with before/after
  - Add Tests & Compatibility details
feat: Phase 3 - Configuration and Monitoring Consolidation (Issue #35)
Complete import updates and integration testing for the core restructure.

## Import Updates
- Updated all imports to use new module structure
- Fixed 20+ files using old core.framework imports
- Updated test files to use new paths
- Maintained all functionality

## Backward Compatibility
- Created comprehensive compatibility layers in core/framework/
- All old imports work with deprecation warnings
- Smooth migration path for existing code
- Zero breaking changes

## Import Mappings Applied
- core.framework.embedders → core.embedders
- core.framework.extractors → core.extractors
- core.framework.storage → core.workflows.storage
- core.processors.document_processor → core.workflows.workflow_pdf
- core.utils.state_manager → core.workflows.state

## Testing
- Created comprehensive integration test suite
- Verified new imports work correctly
- Confirmed backward compatibility
- All critical paths tested

This completes the integration phase of the core restructure,
ensuring all code works seamlessly with the new module structure.
- Add safe fallbacks for missing compat modules using SimpleNamespace
- Add StorageManager alias for backward compatibility
- Implement best-effort database connection with fallback
- Remove shebang from test files per Ruff
- Auto-fix numerous Ruff linter issues:
  - Use X | Y for type annotations (UP007)
  - Use timezone-aware datetime objects
  - Add 'from e' to exception chains (B904)
  - Remove unused variables (F841)
  - Fix bare except clauses (E722)
  - Clean up unused imports

These changes improve code quality, error handling, and compatibility.
Following the Acheron Protocol, moved deprecated directories to timestamped
archives before E2E testing:

- core/framework/ → Acheron/core/framework_2025-09-13_17-59-57/
  All compatibility shims and old framework code (20 files)

- core/mcp_server/ → Acheron/core/mcp_server_2025-09-13_17-59-57/
  MCP server implementation (9 files) - to be reconsidered later

- core/search/ → Acheron/core/search_2025-09-13_17-59-57/
  Empty search module (just __pycache__)

Preserved core/graph/ as it will be the next development phase after
the restructure is validated.

Clean core/ structure now contains only:
- config/     (Phase 3 - Configuration)
- database/   (Phase 2 - Database interfaces)
- embedders/  (Phase 1 - Embedders)
- extractors/ (Phase 1 - Extractors)
- graph/      (Next phase - Graph building)
- monitoring/ (Phase 3 - Monitoring)
- processors/ (Reorganized processing)
- utils/      (Minimal utilities)
- workflows/  (Phase 2 - Orchestration)

Ready for E2E testing with clean module structure.
feat: Phase 4 - Integration and Testing (Issue #35)
… ingestion pipeline

- Deleted `store_in_arango.py`, `verify_arango.py`, and `verify_github_in_arango.py` scripts as part of the transition to a new architecture.
- Introduced `e2e_ingestion.py` for processing papers from PostgreSQL to ArangoDB, validating the new core structure.
- Added `simple_e2e_test.py` for direct PDF processing tests, ensuring extractor and embedder functionality.
- Updated logging and error handling throughout the new pipeline for improved monitoring and debugging.
…erformance tracking

- Introduced core monitoring functionalities including progress tracking, resource monitoring, and performance metrics.
- Implemented various monitoring patterns such as real-time dashboards, distributed monitoring, and alerting mechanisms.
- Developed workflows module to orchestrate complex document processing pipelines with state management and error recovery features.
- Added support for batch processing, parallel execution, and conditional routing within workflows.
- Enhanced documentation for both modules, providing comprehensive usage examples and configuration options.
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Sep 14, 2025

Walkthrough

Adds a new core architecture: configuration system (BaseConfig/Loader/Manager), monitoring (metrics/performance/progress), embedders/extractors factories and bases, workflows (base/PDF/batch/state/storage), and a database factory. Refactors/relocates processors to workflows with deprecation shims. Removes legacy core/framework and MCP server. Updates exports, adds .coderabbit.yaml, ignores scratch/, and replaces extensive docs.

Changes

Cohort / File(s) Summary
Repo config
/.coderabbit.yaml, /.gitignore
Adds CodeRabbit review config with tools, path filters, instructions, and pre-merge checks. Ignores scratch/*.
Top-level docs removal
/README.md, /MIGRATION_README.md, docs/*
Removes main README and numerous ADR/PRD/methodology/issues/theory docs.
Core README docs
/core/README.md, /core/config/README.md, /core/embedders/README.md, /core/extractors/README.md, /core/monitoring/README.md, /core/workflows/README.md
Adds new documentation for core subsystems: infra, config, embedders, extractors, monitoring, workflows.
Config system
/core/config/__init__.py, /core/config/config_base.py, /core/config/config_loader.py, /core/config/config_manager.py
Introduces Pydantic-based BaseConfig, schema/env hierarchical loader, centralized manager with scopes, caching, and global accessors. Public APIs and defaults added.
Database layer
/core/database/__init__.py, /core/database/arango/__init__.py, /core/database/database_factory.py
Adds DatabaseFactory (Arango/Postgres/Redis, pools, env creds). Re-exports Arango manager/retry via new arango package (with optional Unix client). Updates public all.
Embedders
/core/embedders/__init__.py, /core/embedders/embedders_base.py, /core/embedders/embedders_factory.py, /core/embedders/embedders_jina.py
Introduces base interface, config dataclass, factory with auto-registration, and updates JinaV4Embedder to EmbedderBase with properties and single/batch APIs.
Extractors
/core/extractors/__init__.py, /core/extractors/extractors_base.py, /core/extractors/extractors_factory.py, /core/extractors/extractors_docling.py, /core/extractors/extractors_code.py
Adds base result/config classes, factory with type detection/auto-reg, refactors Docling/Code extractors to new config/result protocol, adds batch APIs and supported formats.
Monitoring
/core/monitoring/__init__.py, /core/monitoring/metrics_base.py, /core/monitoring/performance_monitor.py, /core/monitoring/progress_tracker.py, /core/monitoring/compat_monitor.py, /core/monitoring/migration_guide.py
Adds typed metrics, performance/system/GPU monitor, progress tracking, legacy compatibility layer, helpers, and migration utilities. Exposes factory functions and conveyance helpers.
Workflows
/core/workflows/__init__.py, /core/workflows/workflow_base.py, /core/workflows/workflow_pdf.py, /core/workflows/workflow_pdf_batch.py, /core/workflows/state/__init__.py, /core/workflows/storage/__init__.py, /core/workflows/storage/storage_base.py
Adds workflow base/config/result with checkpointing; PDF workflow implementing extraction→chunking→embedding; batch wiring import updates; state/storage package initializers and abstract storage base.
Processors refactor
/core/processors/__init__.py, /core/processors/document_processor.py, /core/processors/text/__init__.py
Shifts public API to text chunking; adds lazy deprecations mapping to workflows; document_processor becomes deprecation shim re-exporting workflow_pdf.
Legacy framework removal
/core/framework/__init__.py, /core/framework/base_processor.py, /core/framework/config.py, /core/framework/extractors/__init__.py, /core/framework/logging.py, /core/framework/metrics.py, /core/framework/requirements_framework.txt
Removes legacy framework facades, base processor, config, logging, metrics, and related exports.
MCP server removal
/core/mcp_server/*
Removes MCP server code, launcher, config, and requirements.
Utils API removal
/core/utils/__init__.py
Deletes utils package re-exports (state, preflight, batch processors).

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor App
  participant CM as ConfigManager
  participant CL as ConfigLoader
  participant FS as Filesystem
  participant ENV as Environment

  App->>CM: get(name, instance_id?, overrides?)
  CM->>CL: discover_sources(name)
  CL->>FS: check CWD/home/base_dir for *.yaml/json
  FS-->>CL: source list
  CM->>CL: load_hierarchical(name, config_class, schema?, env_prefix)
  CL->>FS: read files (YAML/JSON)
  CL->>ENV: read HADES_* variables
  CL-->>CM: merged dict (validated)
  CM-->>App: Config instance (cached)
Loading
sequenceDiagram
  autonumber
  actor Caller
  participant WF as DocumentProcessor (workflow_pdf)
  participant EX as ExtractorFactory/Extractors
  participant CK as Chunkers
  participant EM as EmbedderFactory/Embedders

  Caller->>WF: process_document(pdf_path, latex_path?)
  WF->>EX: create_for_file(pdf_path) / LaTeX optional
  EX-->>WF: ExtractionResult (text, tables, equations, ...)
  WF->>CK: choose strategy (late|token) and chunk text
  CK-->>WF: chunks
  WF->>EM: create(model_name)
  WF->>EM: embed(chunks)
  EM-->>WF: embeddings
  WF-->>Caller: ProcessingResult (extraction, chunks+emb, timings)
Loading

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120–180 minutes

Possibly related issues

  • r3d91ll/HADES#9 — Adds DatabaseFactory and PDF processing with atomic patterns; aligns with ACID processing described.
  • r3d91ll/HADES#13 — Introduces CodeExtractor, factories, and generic processor/workflows required for GitHub integration.

Possibly related PRs

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Pr Description Sections ⚠️ Warning I reviewed the PR description provided in the PR metadata and it does not include the required sections specified by the custom check: Conveyance Summary, W/R/H/T Mapping, Performance Evidence, and Tests & Compatibility. The current description states only that this is cleanup and that e2e work will come later, with no structured evidence or compatibility/testing notes. Therefore, the check cannot pass as the mandatory sections are missing. Please edit the PR description to add the four sections. Include: (1) Conveyance Summary explaining the intent and impact of the cleanup; (2) W/R/H/T Mapping describing how the changes affect WHERE/WHAT/WHO/TIME (e.g., path refactors, API moves, time/latency implications); (3) Performance Evidence with any measurements or a statement that no runtime impact is expected and why; (4) Tests & Compatibility outlining test coverage, breaking changes, deprecations, and migration guidance. Once added, re-run the check to pass.
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The title "Feature/core restructure phase5 e2e" references the primary activity (a core restructure) and therefore is related to the changeset; however it reads like a branch name, lacks a clear human‑readable verb phrase, and the "e2e" token is misleading because the PR description states no end‑to‑end work was completed.
Docstring Coverage ✅ Passed Docstring coverage is 98.37% which is sufficient. The required threshold is 80.00%.
✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feature/core-restructure-phase5-e2e

Comment @coderabbitai help to get the list of available commands and usage tips.

@r3d91ll r3d91ll merged commit 14c9696 into main Sep 14, 2025
1 check was pending
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 23

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (7)
core/workflows/workflow_pdf_batch.py (1)

512-522: Unify document_id across collections (use sanitized_id).

Paper doc stores the raw document_id while chunks/embeddings use sanitized_id. This breaks joins and downstream queries.

-                paper_doc = {
-                    '_key': sanitized_id,
-                    'document_id': document_id,
+                paper_doc = {
+                    '_key': sanitized_id,
+                    'document_id': sanitized_id,
                     'metadata': doc.get('metadata', {}),
                     'status': 'PROCESSED',
                     'processing_date': datetime.now().isoformat(),
                     'num_chunks': len(chunks),
                     'has_latex': extracted.get('has_latex', False)
                 }

If you must preserve the original ID, add it as source_document_id.

+                paper_doc['source_document_id'] = document_id
core/extractors/extractors_docling.py (3)

191-193: Over-broad “empty” check blocks valid fallback paths.

Matching substring “empty” can catch unrelated errors and skip fallback. Tighten to specific messages.

-            if any(x in error_msg for x in ["empty", "Invalid PDF header", "Cannot read PDF file"]):
+            if any(x in error_msg for x in [
+                "Text file is empty",
+                "File is empty",
+                "Invalid PDF header",
+                "Cannot read PDF file",
+            ]):

317-326: Data loss: structures not exposed at top level; missing version field.

Downstream expects tables/equations/images/figures at top level; otherwise they become empty by default. Add them and set a stable version.

-        return {
-            'full_text': full_text,
-            'markdown': full_text,  # Also provide as 'markdown' key
-            'structures': structures,
-            'metadata': {
-                'extractor': 'docling_v2',
-                'num_pages': num_pages,
-                'processing_time': None  # Docling doesn't provide this
-            }
-        }
+        return {
+            'full_text': full_text,
+            'markdown': full_text,
+            # Expose commonly used structures at top level to avoid silent loss
+            'tables': structures.get('tables', []),
+            'equations': structures.get('equations', []),
+            'images': structures.get('images', []),
+            'figures': structures.get('images', []),  # alias for compatibility
+            'structures': structures,
+            'metadata': {
+                'extractor': 'docling_v2',
+                'num_pages': num_pages,
+                'processing_time': None
+            },
+            'version': 'docling_v2',
+        }

352-361: Align fallback return schema with top-level keys and version.

Ensure downstream consumers receive expected fields.

-                return {
-                    'full_text': full_text,
-                    'markdown': markdown_text,  # Add markdown key
-                    'structures': {},
-                    'metadata': {
-                        'extractor': 'pymupdf_fallback',
-                        'num_pages': num_pages,
-                        'warning': 'Using fallback extractor - structures not extracted'
-                    }
-                }
+                return {
+                    'full_text': full_text,
+                    'markdown': markdown_text,
+                    'tables': [],
+                    'equations': [],
+                    'images': [],
+                    'figures': [],
+                    'structures': {},
+                    'metadata': {
+                        'extractor': 'pymupdf_fallback',
+                        'num_pages': num_pages,
+                        'warning': 'Using fallback extractor - structures not extracted'
+                    },
+                    'version': 'pymupdf_fallback',
+                }
core/embedders/embedders_jina.py (3)

132-136: Align method signature with base class and respect configured batch size

EmbedderBase.embed_texts accepts batch_size: Optional[int] = None. Here it’s int = 4, which breaks substitutability and ignores self.config.batch_size.

Apply:

-    def embed_texts(self, 
-                    texts: List[str], 
-                    task: str = "retrieval",
-                    batch_size: int = 4) -> np.ndarray:
+    def embed_texts(self,
+                    texts: List[str],
+                    task: str = "retrieval",
+                    batch_size: Optional[int] = None) -> np.ndarray:
@@
-        all_embeddings = []
+        all_embeddings = []
+        batch_size = batch_size or getattr(self.config, "batch_size", 4)

1015-1035: Fix test harness and formula example

  • The constructor here uses device=..., which current class signature doesn't accept; use the normalized config.
  • The example function uses “conveyance” as an input and omits T and α; this contradicts the mandated formula.

Apply:

-    embedder = JinaV4Embedder(device="cuda" if torch.cuda.is_available() else "cpu")
+    embedder = JinaV4Embedder(EmbeddingConfig(
+        model_name="jinaai/jina-embeddings-v4",
+        device="cuda" if torch.cuda.is_available() else "cpu"
+    ))
@@
-    code = [
-        "def calculate_information(where, what, conveyance):\n    return where * what * conveyance"
-    ]
+    code = [
+        "def efficiency(W, R, H, T, Ctx, alpha=1.7):\n    return (W*R*H/T) * (Ctx**alpha)"
+    ]

66-103: Fix: initialize base class and set self.config; restore backward-compatible constructor

File: core/embedders/embedders_jina.py Lines: 66-103

  • Missing call to super().init(...) and setting self.config; this will break downstream calls (e.g., get_model_info()).
  • Constructor drops legacy usage (e.g., JinaV4Embedder(device="cuda")); normalize inputs and accept kwargs.
-    def __init__(self, config=None):
+    def __init__(self, config: Optional[EmbeddingConfig] = None, **kwargs):
@@
-        # Handle both old-style params and new config object
-        if config is None:
-            config = {}
-        elif hasattr(config, 'device'):
-            # It's an EmbeddingConfig object - extract values
-            old_config = config
-            config = {
-                'device': old_config.device,
-                'use_fp16': old_config.use_fp16,
-                'batch_size': old_config.batch_size,
-                'chunk_size_tokens': 1000,
-                'chunk_overlap_tokens': 200
-            }
-        elif not isinstance(config, dict):
-            # Old-style single param (device)
-            config = {'device': str(config)}
-
-        # Remove config_path handling since we have the config already
-        device = None
-        use_fp16 = None
-        chunk_size_tokens = None
-        chunk_overlap_tokens = None
-        # Config is already processed above, no need to load from file
+        # Normalize to EmbeddingConfig and keep backward-compat kwargs (e.g., device="cuda")
+        if isinstance(config, EmbeddingConfig):
+            cfg_dict = {
+                "model_name": config.model_name,
+                "device": config.device,
+                "batch_size": config.batch_size,
+                "max_seq_length": config.max_seq_length,
+                "use_fp16": config.use_fp16,
+                "chunk_size_tokens": config.chunk_size_tokens,
+                "chunk_overlap_tokens": config.chunk_overlap_tokens,
+            }
+        elif isinstance(config, dict) or config is None:
+            cfg_dict = dict(config or {})
+        else:
+            # Legacy single positional param treated as device
+            cfg_dict = {"device": str(config)}
+
+        # Merge explicit kwargs (take precedence)
+        cfg_dict.update(kwargs)
+
+        model_name = cfg_dict.get("model_name", "jinaai/jina-embeddings-v4")
+        device = cfg_dict.get("device") or ("cuda" if torch.cuda.is_available() else "cpu")
+        self.config = EmbeddingConfig(
+            model_name=model_name,
+            device=device,
+            batch_size=cfg_dict.get("batch_size", 32),
+            max_seq_length=cfg_dict.get("max_seq_length", 32768),
+            use_fp16=bool(cfg_dict.get("use_fp16", True)),
+            chunk_size_tokens=cfg_dict.get("chunk_size_tokens", 1000),
+            chunk_overlap_tokens=cfg_dict.get("chunk_overlap_tokens", 200),
+        )
+        super().__init__(self.config)
@@
-        # Set parameters with config as defaults, overridable by arguments
-        self.device = device or config.get('device', 'cuda')
-        self.model_name = config.get('model_name', 'jinaai/jina-embeddings-v4')
-        self.chunk_size_tokens = chunk_size_tokens or config.get('chunk_size_tokens', 1000)
-        self.chunk_overlap_tokens = chunk_overlap_tokens or config.get('chunk_overlap_tokens', 200)
-        use_fp16 = use_fp16 if use_fp16 is not None else config.get('use_fp16', True)
+        # Set instance parameters from normalized config
+        self.device = self.config.device
+        self.model_name = self.config.model_name
+        self.chunk_size_tokens = self.config.chunk_size_tokens or 1000
+        self.chunk_overlap_tokens = self.config.chunk_overlap_tokens or 200
+        use_fp16 = self.config.use_fp16
@@
-        dtype = torch.float16 if (use_fp16 and self.device.startswith("cuda")) else torch.float32
+        dtype = torch.float16 if (use_fp16 and str(self.device).startswith("cuda")) else torch.float32

Comment on lines +9 to +14
base_branches:
- "^main$"
- "^feature/.*$"
- "^develop$"
- "^release/.*$"
- "^hotfix/.*$"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Confirm branch regexes cover your flow.

If you use bugfix/, chore/, or docs/ branches, add them.


🏁 Script executed:

#!/bin/bash
# Verify which remote branches are NOT matched by the configured patterns.
patterns=('^main$' '^feature/.*$' '^develop$' '^release/.*$' '^hotfix/.*$')
git ls-remote --heads origin | awk '{print $2}' | sed 's#refs/heads/##' | python - "$@" << 'PY'
import re,sys
pats=[re.compile(p) for p in sys.argv[1:]]
unmatched=[]
for line in sys.stdin:
  b=line.strip()
  if not any(p.match(b) for p in pats):
    unmatched.append(b)
if unmatched:
  print("Unmatched branches:")
  print("\n".join(sorted(set(unmatched))))
else:
  print("All remote branches match configured patterns.")
PY "${patterns[@]}"

Length of output: 193


🏁 Script executed:

#!/bin/bash
set -euo pipefail
patterns=('^main$' '^feature/.*$' '^develop$' '^release/.*$' '^hotfix/.*$')
git ls-remote --heads origin 2>/dev/null | awk '{print $2}' | sed 's#refs/heads/##' > /tmp/branches.txt || true
if [ ! -s /tmp/branches.txt ]; then
  echo "git ls-remote returned no branches (maybe origin not set or network disabled)."
  exit 0
fi
unmatched=()
while IFS= read -r b; do
  matched=0
  for p in "${patterns[@]}"; do
    if [[ "$b" =~ $p ]]; then matched=1; break; fi
  done
  if [ $matched -eq 0 ]; then unmatched+=("$b"); fi
done < /tmp/branches.txt
if [ ${#unmatched[@]} -gt 0 ]; then
  echo "Unmatched branches:"
  printf "%s\n" "${unmatched[@]}" | sort -u
else
  echo "All remote branches match configured patterns."
fi

Length of output: 281


Add missing branch patterns to .coderabbit.yaml

git ls-remote returned these remote branches that do not match the configured base_branches (file: .coderabbit.yaml, lines 9–14):

  • coderabbitai/docstrings/429692f
  • coderabbitai/docstrings/67e8628
  • coderabbitai/docstrings/7fc37dd
  • coderabbitai/docstrings/91d9a2f
  • coderabbitai/docstrings/ce6c7ae
  • experiement/word2vec_TPB
  • experiement_word2vec_lineage
  • pr-17
  • rebuild_safe
  • separate_pdf_proc_from_arxiv

Update or broaden base_branches to cover them. Suggested additions:

  • ^coderabbitai/docstrings/.*$
  • ^experiement.*$
  • ^pr-.*$
  • ^rebuild_safe$
  • ^separate_pdf_proc_from_arxiv$
    (also add ^bugfix/.$, ^chore/.$, ^docs/.*$ if those flows are used)

Comment on lines +487 to +493
if self.staging_directory is not None:
# Staging directory can be created if it doesn't exist
try:
self.staging_directory.mkdir(parents=True, exist_ok=True)
except OSError as e:
errors.append(f"Cannot create staging directory {self.staging_directory}: {e}")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid side effects in validation (creates directories during validate).

Creating directories in validate_semantics() can mutate FS on mere load/score calls (e.g., get_context_score). Validate should be read‑only.
Apply:

-        if self.staging_directory is not None:
-            # Staging directory can be created if it doesn't exist
-            try:
-                self.staging_directory.mkdir(parents=True, exist_ok=True)
-            except OSError as e:
-                errors.append(f"Cannot create staging directory {self.staging_directory}: {e}")
+        if self.staging_directory is not None:
+            if not self.staging_directory.exists():
+                errors.append(f"Staging directory does not exist: {self.staging_directory}")
+            elif not self.staging_directory.is_dir():
+                errors.append(f"Staging directory path is not a directory: {self.staging_directory}")

Optionally expose a separate helper (outside validation) to provision directories.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if self.staging_directory is not None:
# Staging directory can be created if it doesn't exist
try:
self.staging_directory.mkdir(parents=True, exist_ok=True)
except OSError as e:
errors.append(f"Cannot create staging directory {self.staging_directory}: {e}")
if self.staging_directory is not None:
if not self.staging_directory.exists():
errors.append(f"Staging directory does not exist: {self.staging_directory}")
elif not self.staging_directory.is_dir():
errors.append(f"Staging directory path is not a directory: {self.staging_directory}")
🤖 Prompt for AI Agents
In core/config/config_base.py around lines 487-493, the validate_semantics()
currently creates the staging directory (side effect) which must be avoided;
change this to only perform read-only checks (e.g., if staging_directory is not
None then verify that it exists and is a directory and is writable, appending
errors if not) and remove the try/except that calls mkdir; add or expose a
separate non-validation helper (e.g., provision_staging_directory or
ensure_directories_exist) elsewhere in the class or module that performs the
mkdir(parents=True, exist_ok=True) and handles OSError, and call that helper
from runtime provisioning code paths (not from validate_semantics).

Comment on lines +317 to +324
# Build cache key
cache_key = name
if instance_id:
cache_key = f"{name}:{instance_id}"
if overrides:
override_hash = hash(frozenset(overrides.items()))
cache_key = f"{cache_key}:{override_hash}"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Cache key may raise TypeError for unhashable overrides (dict/list). Use stable hash.

-        if overrides:
-            override_hash = hash(frozenset(overrides.items()))
-            cache_key = f"{cache_key}:{override_hash}"
+        if overrides:
+            # Stable hash from JSON representation; handles nested/unhashable types.
+            override_hash = hashlib.sha1(
+                json.dumps(overrides, sort_keys=True, default=str).encode("utf-8")
+            ).hexdigest()
+            cache_key = f"{cache_key}:{override_hash}"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Build cache key
cache_key = name
if instance_id:
cache_key = f"{name}:{instance_id}"
if overrides:
override_hash = hash(frozenset(overrides.items()))
cache_key = f"{cache_key}:{override_hash}"
# Build cache key
cache_key = name
if instance_id:
cache_key = f"{name}:{instance_id}"
if overrides:
# Stable hash from JSON representation; handles nested/unhashable types.
override_hash = hashlib.sha1(
json.dumps(overrides, sort_keys=True, default=str).encode("utf-8")
).hexdigest()
cache_key = f"{cache_key}:{override_hash}"
🤖 Prompt for AI Agents
In core/config/config_manager.py around lines 317-324, the current override_hash
calculation uses hash(frozenset(overrides.items())) which raises TypeError for
unhashable values; replace it with a stable, deterministic hash by serializing
overrides to a canonical string (e.g. json.dumps(overrides, sort_keys=True,
separators=(',', ':'), default=str)) and computing a cryptographic hash (e.g.
hashlib.sha256(...).hexdigest()), import hashlib and json at top, and use that
hex digest in the cache_key construction so lists/dicts and nested/unhashable
values no longer cause errors.

Comment on lines 8 to 13
from .database_factory import DatabaseFactory

__all__ = ['ArangoDBManager', 'retry_with_backoff']
# Import from subdirectories
from .arango import ArangoDBManager, retry_with_backoff

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Re-export wiring looks good.

Relative imports used correctly; infra does not depend on experiments/.

If callers previously imported from deprecated paths, run to spot breakages:


🏁 Script executed:

#!/bin/bash
# Find legacy imports replaced by this re-export surface
rg -nP -C2 '(from\s+core\.database\.(arango_db_manager|arango_client)\s+import|import\s+core\.database\.(arango_db_manager|arango_client))'

Length of output: 1702


Update legacy imports to use core.database re-exports

Found direct imports of core.database.arango_db_manager; update callers to import the re-exported symbols from core.database.

  • tools/arxiv/arxiv_manager.py:31 — from core.database.arango_db_manager import ArangoDBManager
  • tools/arxiv/utils/check_papers.py:21 — from core.database.arango_db_manager import ArangoDBManager
  • core/workflows/workflow_pdf_batch.py:412 — from core.database.arango_db_manager import ArangoDBManager
  • tests/test_github_treesitter.py:163 — from core.database.arango_db_manager import ArangoDBManager
  • tests/test_c_files.py:129 — from core.database.arango_db_manager import ArangoDBManager

Replace with:
from core.database import ArangoDBManager
(and import retry_with_backoff from core.database where required)

Comment on lines +1 to +6
"""
ArangoDB Database Interface
Provides optimized ArangoDB connections including Unix socket support
for improved performance and atomic transaction handling.
"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Docstring: explicitly connect to the efficiency equation and atomicity claim.

Per guidelines, tie to W/R/H/T and ensure the “atomic transaction handling” statement matches the public API (see insert_document in arango_client.py which is non-transactional by default).

-"""
-ArangoDB Database Interface
-
-Provides optimized ArangoDB connections including Unix socket support
-for improved performance and atomic transaction handling.
-"""
+"""
+ArangoDB Database Interface
+
+Provides optimized ArangoDB connections, including Unix socket support,
+with transactional primitives for atomic operations.
+
+Theory link (Efficiency stance):
+  C = (W · R · H / T) · Ctx^α (α only on Ctx; default 1.7).
+This module advances R via topology‑aware connectivity and reduces T via low‑latency sockets and pooling,
+while H is leveraged through safe factory/manager abstractions.
+"""

Follow‑up: Ensure all write operations exposed by ArangoDBManager are atomic by default (begin/commit or AQL transaction). Today, insert_document(..., overwrite=False) performs a bare insert. Either:

  • wrap it in a transaction by default, or
  • clearly document it as non‑atomic and provide insert_document_atomic(...).

Would you like a patch to add an atomic insert/update API around begin_transaction?

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
"""
ArangoDB Database Interface
Provides optimized ArangoDB connections including Unix socket support
for improved performance and atomic transaction handling.
"""
"""
ArangoDB Database Interface
Provides optimized ArangoDB connections, including Unix socket support,
with transactional primitives for atomic operations.
Theory link (Efficiency stance):
C = (W · R · H / T) · Ctx^α (α only on Ctx; default 1.7).
This module advances R via topologyaware connectivity and reduces T via lowlatency sockets and pooling,
while H is leveraged through safe factory/manager abstractions.
"""

Comment on lines +7 to +8
Workflows orchestrate the interaction between extractors, embedders, storage systems, and monitoring components, implementing the complete Conveyance Framework pipeline. They manage the flow of information through all dimensions (W×R×H×T) while maintaining Context coherence.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix Conveyance equation usage (don’t multiply by T).

“W×R×H×T” implies multiplying Time; per guidelines T divides. Align wording.

-Workflows orchestrate the interaction between extractors, embedders, storage systems, and monitoring components, implementing the complete Conveyance Framework pipeline. They manage the flow of information through all dimensions (W×R×H×T) while maintaining Context coherence.
+Workflows orchestrate the interaction between extractors, embedders, storage systems, and monitoring components, implementing the complete Conveyance Framework pipeline. They manage the flow across W, R, and H while reducing T (i.e., efficiency ∝ W·R·H/T), maintaining Context coherence.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Workflows orchestrate the interaction between extractors, embedders, storage systems, and monitoring components, implementing the complete Conveyance Framework pipeline. They manage the flow of information through all dimensions (W×R×H×T) while maintaining Context coherence.
Workflows orchestrate the interaction between extractors, embedders, storage systems, and monitoring components, implementing the complete Conveyance Framework pipeline. They manage the flow across W, R, and H while reducing T (i.e., efficiency ∝ W·R·H/T), maintaining Context coherence.
🤖 Prompt for AI Agents
In core/workflows/README.md around lines 7 to 8, the Conveyance equation is
incorrectly written as “W×R×H×T” which multiplies by Time; update the wording to
reflect that Time (T) divides rather than multiplies. Replace “W×R×H×T” with a
correct expression such as “W×R×H ÷ T” or “(W×R×H) / T” and adjust the
surrounding sentence to say the workflows manage flow across dimensions W×R×H
while dividing by T (temporal normalization) to maintain Context coherence.

Comment on lines +79 to +407
### PDFWorkflow

Specialized workflow for PDF processing:

```python
from core.workflows import PDFWorkflow

# Initialize workflow
workflow = PDFWorkflow(
config={
"extraction": {
"type": "docling",
"use_ocr": True,
"chunk_size": 1000
},
"embedding": {
"model": "jinaai/jina-embeddings-v3",
"device": "cuda",
"batch_size": 32
},
"storage": {
"type": "arango",
"database": "academy_store",
"collection": "papers"
}
}
)

# Process single PDF
result = workflow.process("research_paper.pdf")

print(f"Success: {result.success}")
print(f"Chunks: {len(result.chunks)}")
print(f"Processing time: {result.duration:.2f}s")

# Process batch
results = workflow.process_batch(
pdf_files,
num_workers=8,
show_progress=True
)
```

### BatchWorkflow

High-throughput batch processing:

```python
from core.workflows import BatchWorkflow

workflow = BatchWorkflow(
max_workers=32,
batch_size=24,
use_gpu=True,
checkpoint_interval=100 # Save state every 100 items
)

# Process large dataset with checkpointing
results = workflow.process_dataset(
input_dir="/data/papers",
output_dir="/data/processed",
resume_from_checkpoint=True # Resume if interrupted
)

print(f"Processed: {results.total_processed}")
print(f"Failed: {results.total_failed}")
print(f"Throughput: {results.items_per_minute:.2f} items/min")
```

### StateManager

Pipeline state tracking and recovery:

```python
from core.workflows.state import StateManager

state = StateManager(
checkpoint_dir="/tmp/pipeline_state",
save_interval=60 # Save every 60 seconds
)

# Track processing state
state.mark_started("doc_123")
state.mark_completed("doc_123", result_data)

# Check status
if state.is_completed("doc_123"):
print("Already processed")

# Get pending items
pending = state.get_pending()
print(f"{len(pending)} items pending")

# Recovery after crash
state.load_checkpoint()
resumed_items = state.get_in_progress()
for item in resumed_items:
reprocess(item)
```

## Workflow Patterns

### Basic Pipeline

```python
from core.workflows import PDFWorkflow

# Simple sequential processing
workflow = PDFWorkflow()

for pdf_file in pdf_files:
try:
result = workflow.process(pdf_file)
print(f"Processed: {pdf_file}")
except Exception as e:
print(f"Failed: {pdf_file} - {e}")
```

### Parallel Processing

```python
from core.workflows import ParallelWorkflow
from concurrent.futures import ProcessPoolExecutor

workflow = ParallelWorkflow(
num_workers=16,
chunk_size=10 # Items per worker
)

# Process in parallel
with ProcessPoolExecutor(max_workers=16) as executor:
futures = workflow.submit_batch(executor, documents)
results = workflow.collect_results(futures)

print(f"Success rate: {results.success_rate:.2%}")
```

### Phased Execution

```python
from core.workflows import PhasedWorkflow

# Separate extraction and embedding phases
workflow = PhasedWorkflow(
phases=["extraction", "embedding", "storage"]
)

# Phase 1: Extract all documents
with workflow.phase("extraction") as phase:
extracted = phase.process_all(
documents,
workers=32, # CPU-intensive
batch_size=24
)

# Phase 2: Generate embeddings
with workflow.phase("embedding") as phase:
embeddings = phase.process_all(
extracted,
workers=8, # GPU-limited
batch_size=32
)

# Phase 3: Store results
with workflow.phase("storage") as phase:
phase.process_all(embeddings, workers=4)
```

### Error Recovery

```python
from core.workflows import ResilientWorkflow

workflow = ResilientWorkflow(
max_retries=3,
retry_delay=5.0,
fallback_strategies=["docling", "pypdf", "ocr"]
)

# Process with automatic retry and fallback
results = workflow.process_with_recovery(
documents,
on_error="continue", # or "stop", "retry"
save_failed="/tmp/failed_docs"
)

# Review failed items
for failed in results.failed_items:
print(f"Failed: {failed.path}")
print(f"Error: {failed.error}")
print(f"Attempts: {failed.attempts}")
```

### Streaming Workflow

```python
from core.workflows import StreamingWorkflow

# Process documents as stream
workflow = StreamingWorkflow(
buffer_size=100,
flush_interval=30 # Flush every 30 seconds
)

# Stream processing
for document in workflow.stream_process(document_source):
# Results available immediately
print(f"Processed: {document.id}")

# Periodic flush to storage
if workflow.should_flush():
workflow.flush_to_storage()
```

## Advanced Features

### Custom Components

```python
from core.workflows import WorkflowBase
from core.extractors import ExtractorBase
from core.embedders import EmbedderBase

class CustomWorkflow(WorkflowBase):
"""Workflow with custom components."""

def create_extractor(self) -> ExtractorBase:
"""Create custom extractor."""
if self.config.get("use_custom_extractor"):
return MyCustomExtractor(self.config.extraction)
return super().create_extractor()

def create_embedder(self) -> EmbedderBase:
"""Create custom embedder."""
if self.config.get("use_custom_embedder"):
return MyCustomEmbedder(self.config.embedding)
return super().create_embedder()

def post_process(self, result: WorkflowResult) -> WorkflowResult:
"""Custom post-processing."""
# Add custom metadata
result.metadata["processed_by"] = "CustomWorkflow"
result.metadata["version"] = "1.0.0"

# Custom validation
if not self.validate_result(result):
raise ValueError("Result validation failed")

return result
```

### Conditional Routing

```python
from core.workflows import ConditionalWorkflow

workflow = ConditionalWorkflow()

# Define routing rules
workflow.add_route(
condition=lambda doc: doc.page_count > 100,
handler=LargeDocumentWorkflow()
)

workflow.add_route(
condition=lambda doc: doc.has_equations,
handler=MathDocumentWorkflow()
)

workflow.add_route(
condition=lambda doc: doc.is_scanned,
handler=OCRWorkflow()
)

# Process with automatic routing
for document in documents:
result = workflow.process(document)
print(f"Processed via: {result.workflow_used}")
```

### Pipeline Composition

```python
from core.workflows import PipelineComposer

# Compose complex pipeline
pipeline = PipelineComposer()

pipeline.add_stage("preprocessing", PreprocessWorkflow())
pipeline.add_stage("extraction", ExtractionWorkflow())
pipeline.add_stage("enrichment", EnrichmentWorkflow())
pipeline.add_stage("embedding", EmbeddingWorkflow())
pipeline.add_stage("storage", StorageWorkflow())

# Add transformations between stages
pipeline.add_transform(
from_stage="extraction",
to_stage="enrichment",
transform=lambda x: enrich_with_metadata(x)
)

# Execute pipeline
results = pipeline.execute(documents)
```

### Workflow Monitoring

```python
from core.workflows import MonitoredWorkflow
from core.monitoring import WorkflowMonitor

# Workflow with integrated monitoring
workflow = MonitoredWorkflow(
base_workflow=PDFWorkflow(),
monitor=WorkflowMonitor(
metrics_file="workflow_metrics.json",
log_interval=10
)
)

# Process with monitoring
results = workflow.process_batch(documents)

# Get performance metrics
metrics = workflow.get_metrics()
print(f"Average processing time: {metrics.avg_time:.2f}s")
print(f"Throughput: {metrics.throughput:.2f} docs/min")
print(f"Success rate: {metrics.success_rate:.2%}")
print(f"Resource usage: {metrics.resource_usage}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Verify referenced classes exist or mark examples as aspirational.

Docs reference many workflow classes (PDFWorkflow, BatchWorkflow, ParallelWorkflow, PhasedWorkflow, ResilientWorkflow, StreamingWorkflow, PipelineComposer, MonitoredWorkflow, CheckpointedWorkflow, TransactionalWorkflow, MemoryOptimizedWorkflow, GPUOptimizedWorkflow, CachedWorkflow, WorkflowFactory, StateManager). Ensure they’re implemented/exported or annotate “coming soon”.


🏁 Script executed:

#!/bin/bash
# Verify classes referenced by README exist somewhere under core/workflows
rg -nP --glob 'core/workflows/**' -C1 \
  -e 'class\s+(PDFWorkflow|BatchWorkflow|ParallelWorkflow|PhasedWorkflow|ResilientWorkflow|StreamingWorkflow|PipelineComposer|MonitoredWorkflow|CheckpointedWorkflow|TransactionalWorkflow|MemoryOptimizedWorkflow|GPUOptimizedWorkflow|CachedWorkflow|WorkflowFactory|WorkflowBase|StateManager)\b' || true

Length of output: 293


Implement the referenced workflow classes or mark these README examples as “coming soon”.

Verification: only WorkflowBase (core/workflows/workflow_base.py) and StateManager (core/workflows/state/state_manager.py) are present; the README (core/workflows/README.md, lines 79–407) references the following missing classes: PDFWorkflow, BatchWorkflow, ParallelWorkflow, PhasedWorkflow, ResilientWorkflow, StreamingWorkflow, PipelineComposer, MonitoredWorkflow, CheckpointedWorkflow, TransactionalWorkflow, MemoryOptimizedWorkflow, GPUOptimizedWorkflow, CachedWorkflow, WorkflowFactory. Either implement/export those classes under core/workflows or annotate/remove the examples to avoid misleading users.

Comment on lines +171 to +183
def get_storage_info(self) -> Dict[str, Any]:
"""
Get information about the storage backend.
Returns:
Dictionary with storage metadata
"""
return {
"type": self.storage_type,
"supports_metadata": self.supports_metadata,
"supports_streaming": self.supports_streaming,
"config": self.config
} No newline at end of file
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Redact secrets in get_storage_info().

Returning raw config can leak credentials (e.g., passwords, tokens, keys). Redact sensitive fields before exposing/logging.

     def get_storage_info(self) -> Dict[str, Any]:
         """
         Get information about the storage backend.
 
         Returns:
             Dictionary with storage metadata
         """
-        return {
+        def _redact(cfg: Dict[str, Any]) -> Dict[str, Any]:
+            redacted = {}
+            for k, v in cfg.items():
+                if any(s in k.lower() for s in ("password","secret","token","key","access_key","secret_key","credential")):
+                    redacted[k] = "***"
+                else:
+                    redacted[k] = v
+            return redacted
+
+        return {
             "type": self.storage_type,
             "supports_metadata": self.supports_metadata,
             "supports_streaming": self.supports_streaming,
-            "config": self.config
+            "config": _redact(self.config)
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def get_storage_info(self) -> Dict[str, Any]:
"""
Get information about the storage backend.
Returns:
Dictionary with storage metadata
"""
return {
"type": self.storage_type,
"supports_metadata": self.supports_metadata,
"supports_streaming": self.supports_streaming,
"config": self.config
}
def get_storage_info(self) -> Dict[str, Any]:
"""
Get information about the storage backend.
Returns:
Dictionary with storage metadata
"""
def _redact(cfg: Dict[str, Any]) -> Dict[str, Any]:
redacted = {}
for k, v in cfg.items():
if any(s in k.lower() for s in ("password", "secret", "token", "key", "access_key", "secret_key", "credential")):
redacted[k] = "***"
else:
redacted[k] = v
return redacted
return {
"type": self.storage_type,
"supports_metadata": self.supports_metadata,
"supports_streaming": self.supports_streaming,
"config": _redact(self.config)
}
🤖 Prompt for AI Agents
In core/workflows/storage/storage_base.py around lines 171 to 183,
get_storage_info() currently returns the raw self.config which can leak secrets;
instead create a deep-copied redacted version of the config and return that.
Implement a small recursive sanitizer that walks dicts/lists and for any key
(case-insensitive) matching common secret names (e.g., "password", "pass",
"token", "secret", "api_key", "apikey", "access_key", "secret_key",
"private_key", "credentials", "auth", "key") replaces the value with a constant
mask like "***REDACTED***" (and leaves structure intact), ensure the original
self.config is not mutated, and return the sanitized config in the "config"
field of the returned dict.

Comment on lines +303 to 308
from core.extractors import CodeExtractor
WORKER_DOCLING = CodeExtractor()
else:
# Use RobustExtractor for PDFs
from core.framework.extractors.robust_extractor import RobustExtractor
from core.extractors import RobustExtractor
WORKER_DOCLING = RobustExtractor(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Use relative imports for intra-package modules under core/.

Guideline: use relative imports within the package.

-        from core.extractors import CodeExtractor
+        from ..extractors import CodeExtractor
@@
-        from core.extractors import RobustExtractor
+        from ..extractors import RobustExtractor
@@
-    from core.embedders import JinaV4Embedder
+    from ..embedders import JinaV4Embedder

Additionally, update the Arango import (outside the changed hunk) to be relative as well:

-    from core.database.arango_db_manager import ArangoDBManager
+    from ..database.arango_db_manager import ArangoDBManager

Also applies to: 408-409

🤖 Prompt for AI Agents
In core/workflows/workflow_pdf_batch.py around lines 303 to 308, the code uses
absolute imports like "from core.extractors import CodeExtractor" and "from
core.extractors import RobustExtractor"; change these to relative package
imports ("from .extractors import CodeExtractor" and "from .extractors import
RobustExtractor") to follow intra-package import guidelines. Also update the
Arango import referenced around lines 408-409 to a relative form (e.g., "from
.arango_module import ...") so all imports within the core package are relative
and consistent.

# Performance settings
cache_embeddings: bool = True
use_ramfs_staging: bool = True
staging_dir: str = '/dev/shm/document_staging'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Document security implications of /dev/shm usage.

While using /dev/shm for staging provides performance benefits, it has security implications (shared memory, potential for information leakage). Consider adding a configuration option to use a regular temp directory for security-sensitive deployments.

Would you like me to generate a more secure staging directory implementation that validates permissions and provides options for both performance and security-focused deployments?

 # Performance settings
 cache_embeddings: bool = True
 use_ramfs_staging: bool = True
-staging_dir: str = '/dev/shm/document_staging'
+staging_dir: str = '/dev/shm/document_staging'  # Warning: shared memory location
+secure_staging: bool = False  # Use secure temp directory instead of /dev/shm
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
staging_dir: str = '/dev/shm/document_staging'
# Performance settings
cache_embeddings: bool = True
use_ramfs_staging: bool = True
staging_dir: str = '/dev/shm/document_staging' # Warning: shared memory location
secure_staging: bool = False # Use secure temp directory instead of /dev/shm
🧰 Tools
🪛 Ruff (0.12.2)

68-68: Probable insecure usage of temporary file or directory: "/dev/shm/document_staging"

(S108)

🤖 Prompt for AI Agents
In core/workflows/workflow_pdf.py around line 68, the hardcoded staging_dir =
'/dev/shm/document_staging' poses security risks; change it to be configurable
(via env var or app config flag) so deployments can choose between performance
(/dev/shm) and security (regular temp dir). Implement validation: if using
/dev/shm ensure the directory exists, is owned by the running user and has
restrictive permissions (e.g., 0700), otherwise fall back or error; if using a
regular temp dir, create it with secure permissions and a unique name (use
tempfile-style behavior). Document the option in code comments and ensure any
failure to validate permissions raises a clear error so operators can choose the
secure alternative.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants