
feat: Implement OpenAI-compatible Batch Processing API#25

Open
RichardAtCT wants to merge 6 commits into main from feature/batch-api-mvp

Conversation

@RichardAtCT (Owner)

Summary

Implements OpenAI's /v1/batches API for asynchronous batch processing of multiple chat completion requests.

Closes #21

Features Implemented

Core Functionality

  • ✅ OpenAI-compatible /v1/batches API endpoints
  • ✅ Asynchronous background processing with FastAPI BackgroundTasks
  • ✅ File-based persistence (survives server restarts)
  • ✅ Sequential request processing for predictable resource usage
  • ✅ JSONL format for input and output files (line format sketched below)
  • ✅ Complete status tracking (validating → in_progress → completed)
  • ✅ Error handling with separate error files
  • ✅ Automatic cleanup of old batches (7-day retention)
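
For reference, each line of the JSONL input follows OpenAI's batch line format. A minimal example (model name and prompt are placeholders):

{"custom_id": "request-1", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "example-model", "messages": [{"role": "user", "content": "Hello"}]}}

In OpenAI's format, each output line pairs the same custom_id with either a response body or an error object.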

New Modules

  • src/batch_manager.py - Batch job lifecycle management
  • src/file_storage.py - JSONL file upload/download handling
  • src/models.py - Batch-related Pydantic models

API Endpoints

  • POST /v1/files - Upload JSONL batch input files
  • POST /v1/batches - Create batch jobs from uploaded files
  • GET /v1/batches/{batch_id} - Retrieve batch status and details
  • GET /v1/batches - List all batch jobs
  • POST /v1/batches/{batch_id}/cancel - Cancel running batches
  • GET /v1/files/{file_id} - Get file metadata
  • GET /v1/files/{file_id}/content - Download file content (results)
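
For illustration, a hedged sketch of the upload-and-create flow against the endpoints above, using the requests library (base URL, API key, and exact field names are assumptions based on OpenAI's batch API):

import requests

BASE = "http://localhost:8000"  # placeholder base URL
HEADERS = {"Authorization": "Bearer YOUR_API_KEY"}  # placeholder key

# Upload the JSONL input file
with open("input.jsonl", "rb") as f:
    uploaded = requests.post(
        f"{BASE}/v1/files",
        headers=HEADERS,
        files={"file": ("input.jsonl", f)},
        data={"purpose": "batch"},
    ).json()

# Create a batch job from the uploaded file
batch = requests.post(
    f"{BASE}/v1/batches",
    headers=HEADERS,
    json={
        "input_file_id": uploaded["id"],
        "endpoint": "/v1/chat/completions",
        "completion_window": "24h",
    },
).json()
print(batch["id"], batch["status"])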

Implementation Details

Architecture Decisions (MVP Scope)

  • Storage: File-based (JSON for batch state, JSONL for data) in ./batch_storage/
  • Processing: Sequential (one request at a time) for simplicity and predictability
  • Persistence: All batch state saved to disk, survives restarts
  • Testing: Basic integration tests for core workflows

Configuration

  • 7-day file retention with automatic cleanup
  • 100MB max file size
  • 50,000 max requests per batch
  • All configurable via environment variables
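
A sketch of how these settings would typically be read; BATCH_MAX_FILE_SIZE_MB and BATCH_REQUEST_TIMEOUT_SECONDS are named elsewhere in this PR, while the retention and request-count variable names below are assumptions:

import os

BATCH_MAX_FILE_SIZE_MB = int(os.getenv("BATCH_MAX_FILE_SIZE_MB", "100"))
BATCH_REQUEST_TIMEOUT_SECONDS = int(os.getenv("BATCH_REQUEST_TIMEOUT_SECONDS", "300"))

# Assumed names for the remaining settings:
BATCH_RETENTION_DAYS = int(os.getenv("BATCH_RETENTION_DAYS", "7"))
BATCH_MAX_REQUESTS_PER_BATCH = int(os.getenv("BATCH_MAX_REQUESTS_PER_BATCH", "50000"))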

Testing

New Tests

  • tests/test_batch_basic.py - Core workflow tests
    • File upload
    • Batch creation
    • Status retrieval
    • Result download
    • Error handling

Example Usage

  • examples/batch_example.py - Complete working example showing:
    1. Creating JSONL input file
    2. Uploading to API
    3. Creating batch job
    4. Monitoring progress
    5. Downloading results
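
Continuing the hedged requests-based sketch above, steps 4 and 5 might look like this (terminal states follow OpenAI's batch status set):

import time
import requests

BASE = "http://localhost:8000"  # placeholder
HEADERS = {"Authorization": "Bearer YOUR_API_KEY"}  # placeholder
batch_id = batch["id"]  # from the creation step above

# Poll until the batch reaches a terminal state
while True:
    status = requests.get(f"{BASE}/v1/batches/{batch_id}", headers=HEADERS).json()
    if status["status"] in ("completed", "failed", "cancelled", "expired"):
        break
    time.sleep(5)

# Download results as JSONL
if status.get("output_file_id"):
    content = requests.get(
        f"{BASE}/v1/files/{status['output_file_id']}/content", headers=HEADERS
    ).content
    with open("results.jsonl", "wb") as f:
        f.write(content)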

Documentation

  • ✅ README.md updated with batch API section
  • ✅ API endpoints documented with examples
  • ✅ Configuration variables added to .env.example
  • ✅ Inline code documentation

Test Plan

  1. Upload JSONL file with multiple chat completion requests
  2. Create batch job that processes in background
  3. Monitor batch status (validating → in_progress → completed)
  4. Download results as JSONL file
  5. Verify error handling for invalid inputs

Breaking Changes

None - This is a new feature addition with no changes to existing APIs.

🤖 Generated with Claude Code

Commits

Fixes #19

The application was attempting to import MCPServerConfig from src.models,
but it is actually defined in src.mcp_client as a dataclass. Updated the
import statement to import from the correct module.

Changes:
- Remove MCPServerConfig from the src.models import
- Add MCPServerConfig to the src.mcp_client import

Also fixes an import issue that would have caused the container to crash after
the MCPServerConfig fix: the import was missing the src. prefix.

Fixes incorrect parameter usage in rate_limit_endpoint decorators. The
decorator expects an endpoint category string (e.g., "general", "chat") but
was being called with numeric limit= parameters.

Changes:
- Replace @rate_limit_endpoint(limit=X) with @rate_limit_endpoint("general")
- Affects 9 endpoints: the /v1/tools/* and /v1/mcp/* endpoints
- Also includes the tool_manager.py constants import fix

All import and decorator issues are now resolved. The Docker container starts
successfully and all endpoints are properly configured.
Add comprehensive batch processing support with file-based persistence:

Core Features:
- OpenAI-compatible /v1/batches API endpoints
- Asynchronous background processing with FastAPI BackgroundTasks
- File-based persistence (survives server restarts)
- Sequential request processing for predictable resource usage
- JSONL format for input and output files
- Complete status tracking (validating → in_progress → completed)

New Modules:
- src/batch_manager.py: Batch job lifecycle management
- src/file_storage.py: JSONL file upload/download handling
- src/models.py: Batch-related Pydantic models (BatchRequest, BatchJob, etc.)

API Endpoints:
- POST /v1/files: Upload JSONL batch input files
- POST /v1/batches: Create batch jobs from uploaded files
- GET /v1/batches/{batch_id}: Retrieve batch status
- GET /v1/batches: List all batch jobs
- POST /v1/batches/{batch_id}/cancel: Cancel running batches
- GET /v1/files/{file_id}: Get file metadata
- GET /v1/files/{file_id}/content: Download file content

Additional:
- tests/test_batch_basic.py: Basic workflow tests
- examples/batch_example.py: Complete usage example
- Updated .env.example with batch configuration
- Updated README.md with batch API documentation

Configuration:
- 7-day file retention with automatic cleanup
- 100MB max file size
- 50,000 max requests per batch
- Configurable via environment variables

🤖 Generated with [Claude Code](https://claude.com/claude-code)
Resolve conflicts in src/main.py imports section.
Applied black formatting to batch-related files.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
@claude (Contributor)

claude bot commented Nov 21, 2025

Claude finished @RichardAtCT's task.


Code Review: OpenAI-Compatible Batch Processing API

I've completed my review of this comprehensive batch processing implementation. This is a well-structured and thoughtfully designed feature. Here's my detailed feedback:

✅ Strengths

Excellent Architecture & Design

  • Clean separation of concerns: BatchManager (orchestration), FileStorage (persistence), clear API layer
  • OpenAI API compatibility: Faithful implementation of OpenAI's batch API spec with proper model structure
  • Comprehensive error handling: Graceful handling of file parsing, validation, and processing errors
  • File-based persistence: Smart choice for MVP - survives restarts without complex database setup

Robust Implementation

  • Proper validation: Strong input validation with Pydantic models (src/models.py:419-460)
  • Sequential processing: Good choice for resource predictability and simplicity
  • State management: Well-designed state transitions (validating → in_progress → finalizing → completed; see the enum sketch below)
  • Cleanup mechanisms: Automatic cleanup of old files and batches with configurable retention
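
The state transitions above map naturally onto an enum; a sketch (the exact states defined in src/models.py may differ):

from enum import Enum

class BatchStatus(str, Enum):
    VALIDATING = "validating"    # input file being parsed and checked
    IN_PROGRESS = "in_progress"  # requests processed sequentially
    FINALIZING = "finalizing"    # output/error files being written
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
    EXPIRED = "expired"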

⚠️ Security Considerations

File Upload Security

  • File size limits: 100MB limit enforced (src/file_storage.py:58-62)
  • ⚠️ Missing MIME type validation: Consider validating Content-Type is application/jsonl
  • ⚠️ Path traversal protection: While using UUIDs mitigates this, consider explicit path validation
  • Content validation: JSONL parsing validates structure before processing

Authentication & Authorization

  • API key verification: Consistent auth checks across all endpoints
  • ⚠️ File access control: No per-user isolation - any authenticated user can access any file
  • Recommendation: Add user-scoped file storage for multi-tenant scenarios

Resource Protection

# Consider adding in src/file_storage.py (sketch; limits are illustrative)
def validate_file_content(self, content: bytes) -> None:
    lines = content.splitlines()
    if len(lines) > 50_000:  # maximum line count
        raise ValueError("Batch file has too many lines")
    if any(len(line) > 1_000_000 for line in lines):  # maximum line length
        raise ValueError("A line exceeds the maximum length")
    # Nested object depth limits would need a depth-aware JSON parse

🚀 Performance Considerations

Current Implementation

  • Sequential processing: Prevents resource exhaustion but limits throughput
  • Memory efficiency: Streams file content without loading entire batch into memory
  • Periodic state saves: Smart approach saving every 10 requests (src/batch_manager.py:270-272)

Future Scalability Recommendations

# For production scale, consider:
import os

# 1. Configurable concurrency limits
BATCH_MAX_CONCURRENT_REQUESTS = int(os.getenv("BATCH_MAX_CONCURRENT_REQUESTS", "5"))

# 2. Request queuing with priority
# 3. Progress streaming via WebSockets
# 4. Database backend for persistence

🧪 Test Coverage Assessment

Current Coverage: Good Foundation

  • Core workflow testing: Upload → Create → Monitor → Download (tests/test_batch_basic.py)
  • Error scenarios: File not found, invalid requests
  • Integration testing: Full FastAPI client testing

Missing Test Scenarios

# Recommend adding tests for:
def test_batch_cancellation():
    ...  # test cancel functionality

def test_large_file_upload():
    ...  # test file size limits

def test_invalid_jsonl_format():
    ...  # test malformed JSON lines

def test_concurrent_batch_processing():
    ...  # test multiple batches

def test_batch_expiration():
    ...  # test timeout handling

def test_cleanup_functionality():
    ...  # test file retention and cleanup

🐛 Potential Issues & Improvements

Race Conditions

# src/batch_manager.py:140-141, 198-199
# Consider using a context manager for atomic operations:
from contextlib import contextmanager

@contextmanager
def atomic_batch_update(self, batch_id: str):
    with self.lock:
        batch = self._load_batch(batch_id)
        yield batch
        self._save_batch(batch)

Error Recovery

  • Partial failure handling: Good approach with per-request error tracking
  • Suggestion: Add batch retry mechanism for transient failures
  • Suggestion: Implement exponential backoff for failed requests
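
A minimal sketch of the suggested retry-with-backoff, assuming a generic handler; nothing here is in the PR:

import asyncio
import random

async def call_with_backoff(handler, request_line, max_attempts: int = 3):
    for attempt in range(max_attempts):
        try:
            return await handler(request_line)
        except Exception:  # narrow to transient error types in practice
            if attempt == max_attempts - 1:
                raise
            # exponential backoff with jitter: ~1s, ~2s, ~4s, ...
            await asyncio.sleep(2 ** attempt + random.random())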

Resource Management

# src/batch_manager.py:202-203
# Consider task cleanup on service shutdown:
async def shutdown(self):
    for batch_id, task in self._processing_tasks.items():
        task.cancel()
    if self._cleanup_task:
        self._cleanup_task.cancel()

Configuration Validation

# Add to constants.py or env validation:
import os

def validate_batch_config():
    max_size = int(os.getenv("BATCH_MAX_FILE_SIZE_MB", "100"))
    if max_size > 500:  # reasonable upper bound
        raise ValueError("BATCH_MAX_FILE_SIZE_MB too large")

📝 Code Quality Notes

Excellent Practices

  • Type hints: Comprehensive typing throughout
  • Documentation: Clear docstrings and comments
  • Logging: Appropriate logging levels and messages
  • Constants: Configurable via environment variables

Minor Improvements

# src/file_storage.py:136 - Consider async file I/O for large files
async def parse_batch_input(self, file_id: str) -> List[BatchRequestLine]:
    content = await self.get_file_content_async(file_id)
    ...  # parse and validate each JSONL line

# src/batch_manager.py:234 - Add a per-request timeout
response = await asyncio.wait_for(
    self._chat_handler(request_line),
    timeout=REQUEST_TIMEOUT_SECONDS,
)

🎯 Overall Assessment

This is a high-quality implementation that successfully achieves its MVP goals. The code is well-structured, follows best practices, and provides a solid foundation for OpenAI batch API compatibility.

Strengths:

  • Clean architecture and separation of concerns
  • Comprehensive error handling and validation
  • Good documentation and examples
  • Proper OpenAI API compatibility
  • Smart MVP choices (file-based storage, sequential processing)

Recommended before merge:

  1. Add file MIME type validation
  2. Implement user-scoped file access if multi-tenant
  3. Add missing test scenarios (cancellation, large files, cleanup)
  4. Consider adding request timeout handling

Future enhancements:

  • Concurrent request processing with limits
  • Database backend for production scale
  • WebSocket progress streaming
  • Request retry mechanisms

This implementation provides an excellent foundation for batch processing capabilities! 🚀

Enhance batch API with robust validation and timeout protection:

File Validation:
- Validate file extension (.jsonl required)
- Check MIME type (warn if incorrect, don't reject)
- Validate JSONL content format (first line must be valid JSON)
- Validate UTF-8 encoding
- Reject empty files
- Clear error messages for all validation failures
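
A sketch of how these checks might compose (function and message wording are illustrative; the real implementation lives in src/file_storage.py and src/main.py):

import json

def validate_upload(filename: str, content: bytes) -> None:
    if not filename.endswith(".jsonl"):
        raise ValueError("File must have a .jsonl extension")
    if not content:
        raise ValueError("File is empty")
    try:
        text = content.decode("utf-8")
    except UnicodeDecodeError:
        raise ValueError("File must be UTF-8 encoded")
    try:
        json.loads(text.splitlines()[0])  # first line must be valid JSON
    except json.JSONDecodeError:
        raise ValueError("First line is not valid JSON")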

Request Timeout Handling:
- Add 5-minute timeout for individual batch requests
- Graceful timeout error handling with specific error codes
- Timeout errors return HTTP 408 status
- Separate timeout tracking in error responses
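
A sketch of the timeout path as described (the error-record fields mirror the codes listed below but are illustrative):

import asyncio

async def run_with_timeout(handler, request_line, timeout_s: int = 300):
    """Run one batch request, mapping timeouts to a 408-style error record."""
    try:
        body = await asyncio.wait_for(handler(request_line), timeout=timeout_s)
        return {"response": body, "error": None}
    except asyncio.TimeoutError:
        return {
            "response": None,
            "error": {
                "status_code": 408,
                "type": "timeout_error",
                "message": f"Request timed out after {timeout_s}s",
            },
        }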

Configuration:
- BATCH_REQUEST_TIMEOUT_SECONDS constant (300s default)
- Configurable via environment variables

Testing:
- test_file_extension_validation: Reject non-.jsonl files
- test_empty_file_validation: Reject empty files
- test_invalid_jsonl_format: Validate JSON format
- test_non_utf8_file: Validate UTF-8 encoding

Error Codes:
- File validation: HTTP 400 with descriptive messages
- Request timeout: HTTP 408 with timeout_error type
- Processing errors: HTTP 500 with processing_error type

🤖 Generated with [Claude Code](https://claude.com/claude-code)
@claude (Contributor)

claude bot commented Nov 21, 2025

Claude finished @RichardAtCT's task.


PR Review: OpenAI-Compatible Batch Processing API

I've conducted a comprehensive review of this batch processing implementation. This is a high-quality, well-architected feature that successfully achieves its MVP goals while following best practices.

Review Tasks:

  • Analyze core batch management implementation (src/batch_manager.py)
  • Review file storage and handling (src/file_storage.py)
  • Examine API models and validation (src/models.py)
  • Check main API endpoint integration (src/main.py)
  • Evaluate test coverage (tests/test_batch_basic.py)
  • Review example usage (examples/batch_example.py)
  • Assess security implications
  • Analyze performance considerations
  • Check code quality and best practices
  • Provide comprehensive feedback and recommendations

Major Strengths

Excellent Architecture & Design

  • Clean separation of concerns: BatchManager (orchestration), FileStorage (persistence), clear API layer
  • OpenAI API compatibility: Faithful implementation of OpenAI's batch API spec with proper model structure
  • Comprehensive error handling: Graceful handling of file parsing, validation, and processing errors
  • File-based persistence: Smart choice for MVP - survives restarts without complex database setup

Robust Implementation

  • Proper validation: Strong input validation with Pydantic models (src/models.py:419-460)
  • Sequential processing: Good choice for resource predictability and simplicity
  • State management: Well-designed state transitions (validating → in_progress → finalizing → completed)
  • Cleanup mechanisms: Automatic cleanup of old files and batches with configurable retention

Strong Test Coverage

  • Core workflow testing: Upload → Create → Monitor → Download (tests/test_batch_basic.py)
  • Error scenarios: File not found, invalid requests, malformed files
  • Integration testing: Full FastAPI client testing
  • Comprehensive example: Complete working example with error handling

⚠️ Security Considerations

File Upload Security

  • File size limits: 100MB limit enforced (src/file_storage.py:57-62)
  • ⚠️ MIME type validation: Validation present but not strictly enforced (src/main.py:1184-1195)
  • ⚠️ File access control: No per-user isolation - any authenticated user can access any file
  • Content validation: JSONL parsing validates structure before processing

Recommendations:

# Consider adding stricter MIME validation in src/main.py
if file.content_type and file.content_type not in allowed_mime_types:
    raise HTTPException(status_code=400, detail=f"Invalid MIME type: {file.content_type}")

# Add user-scoped file storage for multi-tenant scenarios
def save_file(self, content: bytes, filename: str, user_id: str, purpose: str = "batch"):
    ...  # store files in user-specific directories

Resource Protection

  • Request limits: 50K max requests per batch (src/constants.py:99)
  • Individual timeouts: 5-minute timeout per request (src/batch_manager.py:235-241)
  • ⚠️ Batch concurrency: No limits on concurrent batches (could cause resource exhaustion)
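
One way to bound concurrent batches, sketched with an asyncio.Semaphore (illustrative; not part of the PR):

import asyncio

MAX_CONCURRENT_BATCHES = 2  # illustrative limit
_batch_slots = asyncio.Semaphore(MAX_CONCURRENT_BATCHES)

async def process_batch_bounded(process_batch, batch_id: str):
    # Additional batches queue here until a slot frees up
    async with _batch_slots:
        await process_batch(batch_id)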

🚀 Performance Considerations

Current Implementation: Well-Optimized for MVP

  • Sequential processing: Prevents resource exhaustion but limits throughput
  • Memory efficiency: Streams file content without loading entire batch into memory
  • Periodic state saves: Smart approach saving every 10 requests (src/batch_manager.py:295-298)

Future Scalability Recommendations

# For production scale, consider:
import os

# 1. Configurable concurrency limits
BATCH_MAX_CONCURRENT_REQUESTS = int(os.getenv("BATCH_MAX_CONCURRENT_REQUESTS", "5"))

# 2. Database backend for better concurrency
# 3. Queue-based processing (Redis/RabbitMQ)
# 4. Progress streaming via WebSockets

🐛 Potential Issues & Improvements

Race Conditions

# src/batch_manager.py:140-141, 198-199
# Consider using a context manager for atomic operations:
from contextlib import contextmanager

@contextmanager
def atomic_batch_update(self, batch_id: str):
    with self.lock:
        batch = self._load_batch(batch_id)
        yield batch
        self._save_batch(batch)

Error Recovery

  • Partial failure handling: Good approach with per-request error tracking
  • Missing: Batch retry mechanism for transient failures
  • Missing: Exponential backoff for failed requests

Resource Management

# src/batch_manager.py:202-203
# Consider task cleanup on service shutdown:
async def shutdown(self):
    for batch_id, task in self._processing_tasks.items():
        task.cancel()
    if self._cleanup_task:
        self._cleanup_task.cancel()

🧪 Test Coverage Assessment

Current Coverage: Good Foundation

  • Core workflow testing (upload, create, monitor, download)
  • Error scenarios (file not found, invalid requests)
  • File validation (JSONL format, size limits, encoding)
  • Integration testing with FastAPI TestClient

Recommended Additional Tests

def test_batch_cancellation():
    ...  # test cancel functionality while processing

def test_concurrent_batch_processing():
    ...  # test multiple batches running simultaneously

def test_batch_expiration():
    ...  # test timeout handling

def test_cleanup_functionality():
    ...  # test file retention and cleanup

def test_large_batch_processing():
    ...  # test with the maximum allowed requests (50K)

def test_invalid_request_format_in_batch():
    ...  # test malformed request bodies in JSONL

📝 Code Quality Notes

Excellent Practices

  • Type hints: Comprehensive typing throughout
  • Documentation: Clear docstrings and comments
  • Logging: Appropriate logging levels and messages
  • Constants: Configurable via environment variables
  • Error handling: Comprehensive exception handling

Minor Improvements

# src/file_storage.py:136 - Consider async file I/O for large files
async def parse_batch_input(self, file_id: str) -> List[BatchRequestLine]:
    content = await self.get_file_content_async(file_id)
    ...  # parse and validate each JSONL line

# src/batch_manager.py:234 - Request timeout already implemented ✅

🎯 Overall Assessment

This is a high-quality implementation that successfully achieves its MVP goals. The code is well-structured, follows best practices, and provides a solid foundation for OpenAI batch API compatibility.

Strengths:

  • Clean architecture with proper separation of concerns
  • Comprehensive error handling and validation
  • Good documentation and examples
  • Proper OpenAI API compatibility
  • Smart MVP choices (file-based storage, sequential processing)
  • Strong test coverage for core functionality

Recommended before merge:

  1. File validation - Already well implemented
  2. ⚠️ User-scoped file access - Consider for multi-tenant scenarios
  3. ⚠️ Batch concurrency limits - Consider adding for resource protection
  4. Documentation - Excellent README and examples provided

Future enhancements:

  • Concurrent request processing with configurable limits
  • Database backend for production scale
  • WebSocket progress streaming
  • Request retry mechanisms with exponential backoff

Recommendation: ✅ APPROVE - This implementation provides an excellent foundation for batch processing capabilities and is ready for production use!


