A high-performance library for sharing GPU memory objects across processes using IPC mechanisms and the JSON-RPC 2.0 protocol, enabling an architecture that separates models from inference engines.
Shared Tensor is a cross-process communication library designed specifically for deep learning and AI applications. It uses IPC mechanisms and the JSON-RPC 2.0 protocol to provide:
- Efficient GPU Memory Sharing: Cross-process sharing of PyTorch tensors and models
- Remote Function Execution: Easy remote function calls through decorators
- Async/Sync Support: Flexible execution modes for different scenarios
- Model Serving: Deploy machine learning models as independent services
- Distributed Inference: Support for distributed computing in multi-GPU environments
- JSON-RPC 2.0 Protocol: Standardized remote procedure calls
- HTTP Transport: Reliable HTTP-based communication mechanism
- Serialization Optimization: Efficient PyTorch object serialization/deserialization
- Decorator Pattern: Easy function sharing using `@provider.share`
- Auto Discovery: Smart function path resolution and import
- Parameter Passing: Support for complex data type parameters
- Async Execution: `AsyncSharedTensorProvider` supports non-blocking calls
- Task Management: Complete async task status tracking
- Concurrent Processing: Efficient concurrent request handling
- CUDA Support: Native CUDA tensor sharing support
- Device Management: Smart data migration between devices
- Memory Optimization: Efficient GPU memory usage
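The decorator pattern and auto-discovery features above follow a common registration-and-resolution idiom: a decorator records each shared function under a name or its dotted path, and the server later imports the callable from that path. A stdlib-only sketch of that general idea — `Registry`, `share`, and `resolve` are illustrative names, not the library's actual internals:

```python
import importlib

class Registry:
    """Toy function registry illustrating decorator-based sharing."""

    def __init__(self):
        self._registered_functions = {}

    def share(self, name=None):
        """Register a function under an explicit name or its dotted path."""
        def decorator(func):
            key = name or f"{func.__module__}.{func.__qualname__}"
            self._registered_functions[key] = func
            return func
        return decorator

def resolve(path):
    """Resolve a dotted path like 'math.sqrt' to a callable by importing it."""
    module_name, _, attr = path.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, attr)

registry = Registry()

@registry.share(name="add_numbers")
def add_numbers(a, b):
    return a + b

print(registry._registered_functions["add_numbers"](2, 3))  # 5
print(resolve("math.sqrt")(9.0))                            # 3.0
```

In the real library the registered path is what travels over the wire, so the server can import and execute the function without the client shipping code.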
- Python: 3.8+
- Operating System: Linux (recommended)
- PyTorch: 1.12.0+
- CUDA: Optional, for GPU support
```bash
pip install shared-tensor
```

```bash
# Clone the repository
git clone https://github.com/world-sim-dev/shared-tensor.git
cd shared-tensor

# Install dependencies
pip install -r requirements.txt

# Install the package
pip install -e .
```

```bash
# Install with development dependencies
pip install -e ".[dev]"

# Install with test dependencies
pip install -e ".[test]"
```

```bash
# Check core functionality
python -c "import shared_tensor; print('✓ Shared Tensor installed successfully')"
```

```python
from shared_tensor.async_provider import AsyncSharedTensorProvider

# Create provider
provider = AsyncSharedTensorProvider()

# Share a simple function
@provider.share()
def add_numbers(a, b):
    return a + b

# Share a PyTorch function
@provider.share()
def create_tensor(shape):
    import torch
    return torch.zeros(shape)

# Load a PyTorch model
@provider.share()
def load_model():
    ...
```

```bash
# Method 1: Use the command-line tool (single server)
shared-tensor-server

# Method 2: Use torchrun
torchrun --nproc_per_node=4 --no-python shared-tensor-server

# Method 3: Custom configuration
python shared_tensor/server.py
```

```python
import torch
import torch.nn as nn
from shared_tensor.async_provider import AsyncSharedTensorProvider

# Create provider
provider = AsyncSharedTensorProvider()

# Define the model
class SimpleNet(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

# Share the model creation function
@provider.share(name="create_model")
def create_model(input_size=784, hidden_size=128, output_size=10):
    model = SimpleNet(input_size, hidden_size, output_size)
    return model

# Run inference
model = create_model()
with torch.no_grad():
    model(input_data)
```

```python
provider = AsyncSharedTensorProvider(
    server_port: int = 2537 + global_rank,  # local HTTP server port
    verbose_debug: bool = False,            # log parameters in more detail
    poll_interval: float = 1.0,             # status-polling interval (async provider only)
    default_enabled: bool = True            # whether shared-tensor is enabled; re-enable via `export __SHARED_TENSOR_ENABLED__=true`
)

@provider.share(
    name: Optional[str] = None,             # name for logging and debugging; the default cache key when singleton is enabled
    wait: bool = True,                      # return the function's result, or an async handle
    singleton: bool = True,                 # keep only one instance of the function's result
    singleton_key_formatter: Optional[str] = None)  # Python template formatted with the function's parameters; acts as the final cache key
def get_demo_model():
    ...
```

```bash
# Run all tests
python tests/run_tests.py

# Run tests by category
python tests/run_tests.py --category unit
python tests/run_tests.py --category integration
python tests/run_tests.py --category pytorch

# Run only PyTorch-related tests
python tests/run_tests.py --torch-only

# Verbose output
python tests/run_tests.py --verbose

# Check the test environment
python tests/run_tests.py --env-info
```

```bash
# Test tensor serialization
python tests/pytorch_tests/test_tensor_serialization.py

# Test the async system
python tests/integration/test_async_system.py

# Test the client
python tests/integration/test_client.py
```

```
shared-tensor/
├── shared_tensor/        # Core modules
│   ├── server.py         # JSON-RPC server
│   ├── client.py         # Sync client
│   ├── provider.py       # Sync provider
│   ├── async_client.py   # Async client
│   ├── async_provider.py # Async provider
│   ├── async_task.py     # Async task management
│   ├── jsonrpc.py        # JSON-RPC protocol implementation
│   ├── utils.py          # Utility functions
│   └── errors.py         # Exception definitions
├── examples/             # Usage examples
└── tests/                # Test suite
```
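The `singleton` and `singleton_key_formatter` options described in the configuration section amount to caching a function's result under a key built by formatting a template with the call's parameters. A stdlib-only sketch of that pattern — the names and mechanics here are illustrative, not the library's internals:

```python
import functools
import inspect

def singleton_share(key_formatter=None):
    """Cache each result under a key formatted from the call's parameters."""
    def decorator(func):
        cache = {}

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            bound = inspect.signature(func).bind(*args, **kwargs)
            bound.apply_defaults()
            # Default cache key is the function name; a formatter template
            # like "model_{size}" builds one key per parameter combination.
            key = (key_formatter.format(**bound.arguments)
                   if key_formatter else func.__name__)
            if key not in cache:
                cache[key] = func(*args, **kwargs)
            return cache[key]

        wrapper._cache = cache  # exposed only for inspection in this sketch
        return wrapper
    return decorator

calls = []

@singleton_share(key_formatter="model_{size}")
def build_model(size=4):
    calls.append(size)
    return [0.0] * size

build_model(4)
build_model(size=4)   # cache hit: same key "model_4"
build_model(8)        # new key "model_8"
print(calls)          # [4, 8]
```

This is why the formatter acts as the "final cache key": two calls with the same formatted parameters share one instance of the result, which is the desirable behavior when the result is an expensive-to-load model.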
```mermaid
sequenceDiagram
    participant CA as Client App
    participant SC as SharedTensorClient
    participant SS as SharedTensorServer
    participant FE as Function Executor

    Note over CA, FE: Client-Server Communication Flow
    CA->>SC: call_function("model_inference", args)
    SC->>SC: Serialize parameters
    SC->>SS: HTTP POST /jsonrpc<br/>JSON-RPC Request

    Note over SS: Server Processing
    SS->>SS: Parse JSON-RPC request
    SS->>SS: Resolve function path
    SS->>FE: Import & execute function
    FE->>FE: Deserialize parameters
    FE->>FE: Execute function logic
    FE->>SS: Return execution result

    Note over SS: Response Preparation
    SS->>SS: Serialize result
    SS->>SS: Create JSON-RPC response
    SS->>SC: HTTP Response<br/>JSON-RPC Result

    Note over SC: Client Processing
    SC->>SC: Parse response
    SC->>SC: Deserialize result
    SC->>CA: Return final result

    Note over CA, FE: End-to-End Process Complete
```
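The envelopes exchanged in the diagram above follow JSON-RPC 2.0, with serialized parameters carried inside the JSON payload. A stdlib-only sketch of such a round trip — pickle plus base64 stands in here for the library's actual serializer, and the function names are illustrative:

```python
import base64
import json
import pickle

def make_request(method, params, request_id=1):
    """Build a JSON-RPC 2.0 request; params are pickled and base64-encoded
    so arbitrary Python objects survive the JSON transport."""
    blob = base64.b64encode(pickle.dumps(params)).decode("ascii")
    return json.dumps({"jsonrpc": "2.0", "id": request_id,
                       "method": method, "params": {"payload": blob}})

def handle_request(raw, functions):
    """Server side: decode the request, run the function, wrap the result."""
    req = json.loads(raw)
    args = pickle.loads(base64.b64decode(req["params"]["payload"]))
    result = functions[req["method"]](*args)
    return json.dumps({"jsonrpc": "2.0", "id": req["id"], "result": result})

# Toy function table standing in for the server's resolved functions
functions = {"add_numbers": lambda a, b: a + b}

raw = make_request("add_numbers", (2, 3))
response = json.loads(handle_request(raw, functions))
print(response)  # {'jsonrpc': '2.0', 'id': 1, 'result': 5}
```

The `id` field lets the client match each response to its request, which is what makes concurrent in-flight calls over one HTTP connection tractable.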
- Enable verbose logging:

```python
import logging
logging.basicConfig(level=logging.DEBUG)
```

- Use debug mode:

```python
provider = SharedTensorProvider(verbose_debug=True)
```

- Check registered function paths:

```python
provider = SharedTensorProvider()
print(provider._registered_functions)
```

We welcome community contributions! Please follow these steps:

```bash
# Clone the repository
git clone https://github.com/world-sim-dev/shared-tensor.git
cd shared-tensor

# Create a virtual environment
python -m venv venv
source venv/bin/activate

# Install development dependencies
pip install -e ".[dev]"

# Install pre-commit hooks
pre-commit install

# Package & publish
python -m pip install build
python -m build --sdist --wheel
python -m twine upload --repository testpypi dist/*
python -m twine upload dist/*
```

```bash
# Code formatting
black shared_tensor/ tests/ examples/

# Import sorting
isort shared_tensor/ tests/ examples/

# Static checking
flake8 shared_tensor/
mypy shared_tensor/
```

- Fork the project and create a feature branch
- Write code and tests
- Run the complete test suite
- Submit a Pull Request
- New features must include tests
- Maintain test coverage > 90%
- All tests must pass
This project is licensed under the Apache 2.0 License. See the LICENSE file for details.
- PyTorch - Deep learning framework
- JSON-RPC 2.0 - Remote procedure call protocol
- Issues: GitHub Issues
- Documentation: Shared Tensor Documentation
- Source: GitHub Repository
Shared Tensor - Making GPU memory sharing simple and efficient