PynneX requires Python 3.10 or higher, plus a running asyncio
event loop for any async usage.
- Usage Guide
- Table of Contents
PynneX depends on Python’s asyncio
. You must have a running event loop (e.g., via asyncio.run(...)
) for certain features like async listeners or cross-thread calls. If there’s no existing loop, PynneX raises a RuntimeError
instead of silently creating one, ensuring concurrency remains predictable. Typically, you’ll handle it in one of two ways:
- Inside asyncio.run(...):
For example:
async def main():
# Create objects, connect emitters/listeners
# Perform your logic
...
asyncio.run(main()) # Launches the event loop and runs 'main'
@nx_with_worker
Decorator:(aliases:@with_worker
)
When you decorate a class with @nx_with_worker
, PynneX automatically creates a dedicated worker thread with its own event loop. This worker loop is isolated to that thread, so any async usage in the main thread still requires its own loop or asyncio.run(...)
. You can queue tasks into the worker without manually configuring additional event loops.
PynneX implements an emitter-listener pattern, allowing loose coupling between components. Core ideas:
- Emitter: An event source that can be emitted.
- Listener: A function/method that responds to an emitter.
- Connection: A link binding emitters to listeners.
Use @nx_with_emitters
on a class and @nx_emitter
on a method to define an emitter:
aliases for @nx_with_emitters
:
@with_emitters
@with_signals
@with_publishers
aliases for @nx_emitter
:
@emitter
@signal
@publisher
from pynnex import nx_with_emitters, nx_emitter
@nx_with_emitters
class Button:
@nx_emitter
def clicked(self):
"""Emitted when the button is clicked."""
pass
def click(self):
self.clicked.emit()
Emit emitters using .emit()
:
self.clicked.emit() # No args
self.data_ready.emit(value, timestamp) # With args
Use @nx_listener
to mark a method as a listener:
@nx_listener
def on_clicked(self):
print("Button clicked")
Use @nx_listener
to mark a method as a listener:
from pynnex import nx_listener
@nx_with_emitters
class Display:
@nx_listener
def on_clicked(self):
print("Button was clicked!")
Listeners can be async, integrating seamlessly with asyncio
:
@nx_with_emitters
class DataProcessor:
@nx_listener
async def on_data_ready(self, data):
# Perform async operations
await asyncio.sleep(1)
print("Data processed:", data)
nx_property
provides thread-safe, event-loop-aware properties that can emit emitters on change:
from pynnex.contrib.extensions.property import nx_property
@nx_with_emitters
class Model:
@nx_emitter
def value_changed(self):
pass
@nx_property(notify=value_changed)
def value(self):
return self._value
@value.setter
def value(self, new_val):
self._value = new_val
- Accessing or modifying
value
from another thread is safely queued to the owner's event loop. - If
notify
is set, the specified emitter is emitted when the property changes.
By default, when you connect()
an emitter to a listener, AUTO_CONNECTION
is used. It automatically chooses:
- Direct Connection if emitter and listener share the same thread and listener is synchronous.
- Queued Connection if crossing threads or listener is async, queuing the call in the listener's event loop.
You can force a specific connection type using the connection_type
parameter:
from pynnex.core import NxConnectionType
# Force a direct connection
emitter.connect(receiver, receiver.on_listener, connection_type=NxConnectionType.DIRECT_CONNECTION)
# Force a queued connection
emitter.connect(receiver, receiver.on_listener, connection_type=NxConnectionType.QUEUED_CONNECTION)
Thread Safety
PynneX ensures thread-safe emitter emissions. Emitters can be emitted from any thread. Listeners execute in their designated event loop (often the thread they were created in).
Async Integration
Most PynneX features integrate with Python’s asyncio
. Typically you’ll have an async def main():
function that sets up your objects and emitters, then call asyncio.run(main())
to start the loop. If you use @nx_with_worker
, that worker thread and event loop is automatically managed.
async def main():
# Create objects and connect emitters/listeners
# Emit emitters and await async listener completions indirectly
await asyncio.sleep(1)
asyncio.run(main())
- Naming Conventions:
- Emitters: value_changed, data_ready, operation_completed
- Listeners: on_value_changed, on_data_ready
- Disconnect Unused Emitters: Before destroying objects or changing system states, disconnect emitters to prevent unwanted listener executions.
- Error Handling in Listeners: Handle exceptions in listeners to prevent crashes:
@nx_listener
def on_data_received(self, data):
try:
self.process_data(data)
except Exception as e:
logger.error(f"Error processing: {e}")
- Resource Cleanup: Disconnect emitters before cleanup or shutdown to ensure no pending queued calls to cleaned-up resources.
The @nx_with_worker
decorator creates an object with its own thread and event loop, enabling you to queue async tasks and offload work:
Basic Worker
from pynnex import nx_with_worker, nx_emitter
@nx_with_worker
class BackgroundWorker:
def __init__(self):
self.started.connect(self.on_started)
@nx_emitter
def work_done(self):
pass
@listener
async def on_started(self, *args, **kwargs):
# Initialize resources and start processing
await self.heavy_task(10)
async def heavy_task(self, data):
await asyncio.sleep(2) # Simulate heavy computation
self.work_done.emit(data * 2)
worker = BackgroundWorker()
worker.start()
worker.stop()
** Key Points for Workers**
- Connect to
started
signal to initialize resources and start processing - Use
queue_task(coro)
to run async tasks in the worker's event loop - Use
stop()
to request a graceful shutdown - Worker states: CREATED -> STARTING -> STARTED -> STOPPING -> STOPPED
Passing Arguments to Worker
Arguments passed to start()
are forwarded to the started
signal handlers:
@nx_with_worker
class ConfigurableWorker:
def __init__(self):
self.started.connect(self.on_started)
@listener
async def on_started(self, config=None):
# Use config here
print(f"Worker started with config: {config}")
worker = ConfigurableWorker()
worker.start(config={'threads': 4})
PynneX allows you to:
- Define emitters and listeners easily.
- Connect them across threads and async contexts without manual synchronization.
- Use worker threads for background tasks with seamless emitter/listener integration.
- Manage properties thread-safely with
nx_property
.
This ensures a clean, maintainable, and scalable architecture for event-driven Python applications.