CloudEvents SDK for Python - A complete, type-safe implementation of the CloudEvents specification with zero dependencies.
pip install pyeventcloud
from pyeventcloud import CloudEvent, create_event
# Create with auto-generated ID
event = create_event(
source="https://example.com/source",
type="com.example.event",
data={"message": "Hello CloudEvents!"}
)
# Create with explicit ID
event = CloudEvent(
id="custom-id-123",
source="https://example.com/source",
type="com.example.event",
specversion="1.0",
datacontenttype="application/json",
data={"key": "value"}
)
# Access attributes directly
print(event.id) # custom-id-123
print(event.source) # https://example.com/source
print(event.data) # {'key': 'value'}
from pyeventcloud import to_json, from_json
# Serialize to JSON
json_str = to_json(event)
# Deserialize from JSON
event = from_json(json_str)
# Batch operations
from pyeventcloud import to_json_batch, from_json_batch
events = [event1, event2, event3]
json_str = to_json_batch(events)
events = from_json_batch(json_str)
from pyeventcloud import HTTPBinding, JSONFormatter
binding = HTTPBinding()
formatter = JSONFormatter()
# Structured content mode (event in body)
body, headers = binding.to_structured(event, formatter)
# body is bytes, headers is dict[str, str]
# Content-Type: application/cloudevents+json
# Binary content mode (attributes in headers)
body, headers = binding.to_binary(event)
# headers["ce-id"], headers["ce-source"], etc.
# body contains event data
# Parse from HTTP request
event = binding.from_structured(request.body, request.headers, formatter)
event = binding.from_binary(request.body, request.headers)
from pyeventcloud import KafkaBinding, JSONFormatter
binding = KafkaBinding()
formatter = JSONFormatter()
# Structured content mode
message = binding.to_structured(event, formatter)
# Returns: KafkaMessage with value, headers, key
# Binary content mode
message = binding.to_binary(event)
# message["value"]: bytes
# message["headers"]: list[tuple[str, bytes]]
# message["key"]: bytes | None
# 'partitionkey' extension maps to Kafka message key
event = CloudEvent(
id="123",
source="app",
type="order.created",
partitionkey="customer-456" # Maps to Kafka partition key
)
message = binding.to_binary(event)
# message["key"] == b"customer-456"
from pyeventcloud import CloudEvent, register_extension
# Built-in partitionkey extension
event = CloudEvent(
id="123",
source="app",
type="event.type",
partitionkey="partition-1" # Built-in extension
)
# Custom extensions
event = CloudEvent(
id="123",
source="app",
type="event.type",
customextension="value" # Any custom extension
)
# Access extensions
print(event.extensions["customextension"]) # "value"
# Register custom extension
class MyExtension:
name = "myext"
attributes = {"myfield": str}
def validate(self, event):
value = event.extensions.get("myfield")
if value and not isinstance(value, str):
raise ValidationError("myfield must be a string")
def get_attributes(self, event):
return {"myfield": event.extensions.get("myfield")}
register_extension(MyExtension())
from pyeventcloud import JSONFormatter, from_dict
formatter = JSONFormatter()
# Custom factory for event creation
def custom_factory(data: dict) -> CloudEvent:
# Add metadata or transform data
data["id"] = f"prefix-{data['id']}"
return from_dict(data)
event = formatter.deserialize(json_str, event_factory=custom_factory)
CloudEvent
- Main CloudEvent classcreate_event()
- Factory function with auto-generated IDfrom_dict()
- Create CloudEvent from dictionary
JSONFormatter
- JSON serialization/deserializationto_json()
- Serialize single event to JSONfrom_json()
- Deserialize single event from JSONto_json_batch()
- Serialize multiple eventsfrom_json_batch()
- Deserialize multiple events
HTTPBinding
- HTTP protocol binding (structured & binary modes)KafkaBinding
- Kafka protocol binding (structured & binary modes)EventSerializer
- Protocol for custom serializers
PartitioningExtension
- Built-in partitionkey extensionregister_extension()
- Register custom extensionget_extension()
- Get registered extensionExtensionRegistry
- Extension registry class
CloudEventError
- Base exceptionValidationError
- Validation failuresSerializationError
- Serialization/deserialization errorsBindingError
- Protocol binding errorsExtensionError
- Extension-related errors
PyEventCloud validates events immediately when created, raising clear errors:
# Invalid event raises ValidationError immediately
try:
event = CloudEvent(
id="", # Empty ID
source="https://example.com",
type="event.type"
)
except ValidationError as e:
print(e) # "id must be a non-empty string"
print(e.attribute) # "id"
Bindings are completely decoupled from formats via the EventSerializer
protocol:
# Any serializer implementing the protocol works
binding = HTTPBinding()
body, headers = binding.to_structured(event, JSONFormatter())
body, headers = binding.to_structured(event, AvroFormatter()) # Future
body, headers = binding.to_structured(event, ProtobufFormatter()) # Future
# Install uv if you haven't already
curl -LsSf https://astral.sh/uv/install.sh | sh
# Clone the repository
git clone https://github.com/plugarut/pyeventcloud.git
cd pyeventcloud
# Install dependencies
uv sync --dev
# Run all tests
uv run pytest
# Run with coverage
uv run pytest --cov=src/pyeventcloud --cov-report=term-missing
# Run type checking
uv run mypy src/
# Run specific test file
uv run pytest tests/unit/test_event.py -v
# Build distribution packages
uv build
# Check build artifacts
ls -lh dist/
Contributions are welcome! Please:
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Ensure all tests pass and mypy is happy
- Submit a pull request
Apache License 2.0 - See LICENSE for details.
- Inspired by the CloudEvents Python SDK
- Implements CloudEvents v1.0
- HTTP binding follows HTTP Protocol Binding
- Kafka binding follows Kafka Protocol Binding