a simple python event bus
pip install simple_event_bus
import asyncio
from simple_event_bus import AsyncEventBus, Event, run_simple_event_source_async
app = AsyncEventBus()
tick_list = []
@app.listening("HeartBeat")
async def tick_collector(event: Event) -> None:
print(event.now)
tick_list.append(event)
if len(tick_list) > 5:
await app.publish_event("close_loop")
asyncio.get_event_loop().run_until_complete(run_simple_event_source_async(app))
- EventBus
- AsyncEventBus
- EventBus.run_forever function
- EventBus.publish_event accept Event , EVENT_TYPE and str.
- Event can get current_app
- listening function args check
- add remove method
- allow no param method to listening
- Independent event sources
- add before event listener and after event listener
This project use black, please set
Continuation indent
= 4
Pycharm - File - Settings - Editor - Code Style - Python - Tabs and Indents
Use flake8 to check your code style.
- This project is made by AngusWG/cookiecutter-py-package