PyEventCloud

CloudEvents SDK for Python - A complete, type-safe implementation of the CloudEvents specification with zero dependencies.

Installation

pip install pyeventcloud

Quick Start

Creating CloudEvents

from pyeventcloud import CloudEvent, create_event

# Create with auto-generated ID
event = create_event(
    source="https://example.com/source",
    type="com.example.event",
    data={"message": "Hello CloudEvents!"}
)

# Create with explicit ID
event = CloudEvent(
    id="custom-id-123",
    source="https://example.com/source",
    type="com.example.event",
    specversion="1.0",
    datacontenttype="application/json",
    data={"key": "value"}
)

# Access attributes directly
print(event.id)        # custom-id-123
print(event.source)    # https://example.com/source
print(event.data)      # {'key': 'value'}

JSON Serialization

from pyeventcloud import to_json, from_json

# Serialize to JSON
json_str = to_json(event)

# Deserialize from JSON
event = from_json(json_str)

# Batch operations
from pyeventcloud import to_json_batch, from_json_batch

# event1, event2, event3 are CloudEvent instances created as shown earlier
events = [event1, event2, event3]
json_str = to_json_batch(events)
events = from_json_batch(json_str)
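
For orientation, the sketch below uses only the standard library to show the wire shape that the CloudEvents JSON format defines for a single event and for a batch. It does not call pyeventcloud, and it makes no claim about the key order or whitespace that to_json() and to_json_batch() actually produce.

```python
import json

# A structured-mode CloudEvent is a JSON object. The four required
# context attributes are specversion, id, source, and type.
event_dict = {
    "specversion": "1.0",
    "id": "custom-id-123",
    "source": "https://example.com/source",
    "type": "com.example.event",
    "datacontenttype": "application/json",
    "data": {"key": "value"},
}

# Single event: one JSON object (media type application/cloudevents+json).
single = json.dumps(event_dict)

# Batch: a JSON array of event objects
# (media type application/cloudevents-batch+json).
batch = json.dumps([event_dict, {**event_dict, "id": "custom-id-124"}])

decoded = json.loads(batch)
print(len(decoded))      # 2
print(decoded[1]["id"])  # custom-id-124
```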

HTTP Binding

from pyeventcloud import HTTPBinding, JSONFormatter

binding = HTTPBinding()
formatter = JSONFormatter()

# Structured content mode (event in body)
body, headers = binding.to_structured(event, formatter)
# body is bytes, headers is dict[str, str]
# Content-Type: application/cloudevents+json

# Binary content mode (attributes in headers)
body, headers = binding.to_binary(event)
# headers["ce-id"], headers["ce-source"], etc.
# body contains event data

# Parse from HTTP request (`request` stands for your web framework's request object)
event = binding.from_structured(request.body, request.headers, formatter)
event = binding.from_binary(request.body, request.headers)
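
To make the binary content mode concrete, here is a minimal standard-library sketch of the mapping described by the CloudEvents HTTP binding: context attributes become `ce-` prefixed headers, `datacontenttype` becomes `Content-Type`, and the data goes in the body. The helper `to_binary_headers` is hypothetical and is not how HTTPBinding is implemented; it also skips the percent-encoding of header values that the specification requires for some characters.

```python
import json


def to_binary_headers(attributes: dict, data: object) -> tuple:
    """Hypothetical helper: split an event into (body, headers)."""
    headers = {}
    for name, value in attributes.items():
        if name == "datacontenttype":
            # The data content type travels as the regular Content-Type header.
            headers["content-type"] = value
        else:
            # Every other context attribute gets the ce- prefix.
            headers[f"ce-{name}"] = value
    # Assumes JSON data; other content types would need their own encoding.
    body = json.dumps(data).encode("utf-8")
    return body, headers


body, headers = to_binary_headers(
    {
        "specversion": "1.0",
        "id": "custom-id-123",
        "source": "https://example.com/source",
        "type": "com.example.event",
        "datacontenttype": "application/json",
    },
    {"key": "value"},
)
print(headers["ce-id"])         # custom-id-123
print(headers["content-type"])  # application/json
print(body)                     # b'{"key": "value"}'
```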

Kafka Binding

from pyeventcloud import KafkaBinding, JSONFormatter

binding = KafkaBinding()
formatter = JSONFormatter()

# Structured content mode
message = binding.to_structured(event, formatter)
# Returns: KafkaMessage with value, headers, key

# Binary content mode
message = binding.to_binary(event)
# message["value"]: bytes
# message["headers"]: list[tuple[str, bytes]]
# message["key"]: bytes | None

# 'partitionkey' extension maps to Kafka message key
event = CloudEvent(
    id="123",
    source="app",
    type="order.created",
    partitionkey="customer-456"  # Maps to Kafka partition key
)
message = binding.to_binary(event)
# message["key"] == b"customer-456"
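
The same idea applies to Kafka. The sketch below is a standard-library illustration of the binary-mode mapping in the CloudEvents Kafka binding, where attributes become `ce_` prefixed record headers and `partitionkey` supplies the record key. The function `to_kafka_binary` is hypothetical, the returned plain dict only mimics the message shape shown above, and keeping `partitionkey` as a header in addition to the key is an assumption of this sketch.

```python
def to_kafka_binary(attributes: dict, value: bytes) -> dict:
    """Hypothetical helper: build a Kafka-style message from event attributes."""
    headers = []
    key = None
    for name, attr in attributes.items():
        if name == "partitionkey":
            # The partitioning extension supplies the record key.
            key = attr.encode("utf-8")
        if name == "datacontenttype":
            headers.append(("content-type", attr.encode("utf-8")))
        else:
            # Kafka binary mode uses the ce_ prefix (underscore, not hyphen).
            headers.append((f"ce_{name}", attr.encode("utf-8")))
    return {"value": value, "headers": headers, "key": key}


message = to_kafka_binary(
    {
        "specversion": "1.0",
        "id": "123",
        "source": "app",
        "type": "order.created",
        "partitionkey": "customer-456",
    },
    b"",
)
print(message["key"])         # b'customer-456'
print(message["headers"][1])  # ('ce_id', b'123')
```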

Extensions

from pyeventcloud import CloudEvent, ValidationError, register_extension

# Built-in partitionkey extension
event = CloudEvent(
    id="123",
    source="app",
    type="event.type",
    partitionkey="partition-1"  # Built-in extension
)

# Custom extensions
event = CloudEvent(
    id="123",
    source="app",
    type="event.type",
    customextension="value"  # Any custom extension
)

# Access extensions
print(event.extensions["customextension"])  # "value"

# Register custom extension
class MyExtension:
    name = "myext"
    attributes = {"myfield": str}

    def validate(self, event):
        value = event.extensions.get("myfield")
        if value is not None and not isinstance(value, str):
            raise ValidationError("myfield must be a string")

    def get_attributes(self, event):
        return {"myfield": event.extensions.get("myfield")}

register_extension(MyExtension())

Factory Pattern

from pyeventcloud import CloudEvent, JSONFormatter, from_dict

formatter = JSONFormatter()

# Custom factory for event creation
def custom_factory(data: dict) -> CloudEvent:
    # Add metadata or transform data
    data["id"] = f"prefix-{data['id']}"
    return from_dict(data)

event = formatter.deserialize(json_str, event_factory=custom_factory)
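
As a rough illustration of what a factory hook does, the standalone sketch below parses JSON and then hands the resulting dictionary to a caller-supplied callable. Both `deserialize` and `prefixed` are hypothetical stand-ins written for this example; they are not pyeventcloud functions, and the events here are plain dicts rather than CloudEvent objects.

```python
import json
from typing import Any, Callable


def deserialize(
    payload: str,
    event_factory: Callable[[dict], Any] = dict,
) -> Any:
    """Hypothetical deserializer: parse JSON, then delegate construction."""
    return event_factory(json.loads(payload))


def prefixed(data: dict) -> dict:
    # Return a modified copy instead of mutating the parsed input.
    return {**data, "id": f"prefix-{data['id']}"}


event = deserialize(
    '{"id": "123", "source": "app", "type": "order.created"}',
    event_factory=prefixed,
)
print(event["id"])  # prefix-123
```

Keeping construction behind a callable means the parsing code never needs to know which event class, or which transformation, the caller wants.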

API Reference

Core

  • CloudEvent - Main CloudEvent class
  • create_event() - Factory function with auto-generated ID
  • from_dict() - Create CloudEvent from dictionary

Formats

  • JSONFormatter - JSON serialization/deserialization
  • to_json() - Serialize single event to JSON
  • from_json() - Deserialize single event from JSON
  • to_json_batch() - Serialize multiple events
  • from_json_batch() - Deserialize multiple events

Bindings

  • HTTPBinding - HTTP protocol binding (structured & binary modes)
  • KafkaBinding - Kafka protocol binding (structured & binary modes)
  • EventSerializer - Protocol for custom serializers

Extensions

  • PartitioningExtension - Built-in partitionkey extension
  • register_extension() - Register custom extension
  • get_extension() - Get registered extension
  • ExtensionRegistry - Extension registry class

Exceptions

  • CloudEventError - Base exception
  • ValidationError - Validation failures
  • SerializationError - Serialization/deserialization errors
  • BindingError - Protocol binding errors
  • ExtensionError - Extension-related errors

Design Philosophy

Fail-Fast Validation

PyEventCloud validates each event at construction time and raises a ValidationError that names the offending attribute:

from pyeventcloud import CloudEvent, ValidationError

# Invalid event raises ValidationError immediately
try:
    event = CloudEvent(
        id="",  # Empty ID
        source="https://example.com",
        type="event.type"
    )
except ValidationError as e:
    print(e)  # "id must be a non-empty string"
    print(e.attribute)  # "id"
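
The fail-fast idea can be sketched without the library. The example below is a minimal stand-in, assuming a dataclass that checks its required attributes in `__post_init__`; `SketchEvent` and the local `ValidationError` are hypothetical names for illustration and do not reflect how CloudEvent is implemented internally.

```python
from dataclasses import dataclass


class ValidationError(ValueError):
    """Stand-in exception that records which attribute failed."""

    def __init__(self, message: str, attribute: str) -> None:
        super().__init__(message)
        self.attribute = attribute


@dataclass(frozen=True)
class SketchEvent:
    id: str
    source: str
    type: str
    specversion: str = "1.0"

    def __post_init__(self) -> None:
        # Validate at construction so an invalid event can never exist.
        for name in ("id", "source", "type"):
            value = getattr(self, name)
            if not isinstance(value, str) or not value:
                raise ValidationError(f"{name} must be a non-empty string", name)


try:
    SketchEvent(id="", source="https://example.com", type="event.type")
except ValidationError as exc:
    print(exc)            # id must be a non-empty string
    print(exc.attribute)  # id
```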

Format Decoupling

Bindings are completely decoupled from formats via the EventSerializer protocol:

# Any serializer implementing the protocol works
binding = HTTPBinding()
body, headers = binding.to_structured(event, JSONFormatter())

# Future formatters would plug in the same way:
# body, headers = binding.to_structured(event, AvroFormatter())
# body, headers = binding.to_structured(event, ProtobufFormatter())
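
One way to picture this decoupling is with `typing.Protocol`. The sketch below is an assumption-laden illustration: the protocol name `SerializerSketch`, its `content_type` attribute, and the `serialize` method are hypothetical and may not match the real EventSerializer interface. Events are plain dicts to keep the example self-contained.

```python
import json
from typing import Any, Protocol


class SerializerSketch(Protocol):
    """Hypothetical shape of a format plug-in."""

    content_type: str

    def serialize(self, event: dict) -> bytes: ...

    def deserialize(self, payload: bytes) -> dict: ...


class JSONSketch:
    """Satisfies SerializerSketch structurally, with no inheritance."""

    content_type = "application/cloudevents+json"

    def serialize(self, event: dict) -> bytes:
        return json.dumps(event).encode("utf-8")

    def deserialize(self, payload: bytes) -> dict:
        return json.loads(payload)


def to_structured(event: dict, serializer: SerializerSketch) -> tuple:
    # The binding only needs bytes and a content type; it never inspects
    # which format produced them.
    return serializer.serialize(event), {"content-type": serializer.content_type}


sample: Any = {"specversion": "1.0", "id": "1", "source": "app", "type": "t"}
body, headers = to_structured(sample, JSONSketch())
print(headers["content-type"])  # application/cloudevents+json
```

Because the binding depends only on the protocol, a new format can be added without touching binding code.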

Development

Setup

# Install uv if you haven't already
curl -LsSf https://astral.sh/uv/install.sh | sh

# Clone the repository
git clone https://github.com/plugarut/pyeventcloud.git
cd pyeventcloud

# Install dependencies
uv sync --dev

Running Tests

# Run all tests
uv run pytest

# Run with coverage
uv run pytest --cov=src/pyeventcloud --cov-report=term-missing

# Run type checking
uv run mypy src/

# Run specific test file
uv run pytest tests/unit/test_event.py -v

Building

# Build distribution packages
uv build

# Check build artifacts
ls -lh dist/

Contributing

Contributions are welcome! Please:

  1. Fork the repository
  2. Create a feature branch
  3. Add tests for new functionality
  4. Ensure all tests pass and mypy is happy
  5. Submit a pull request

License

Apache License 2.0 - See LICENSE for details.
