Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ocpp library as an event based message handler #168 #171

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions examples/v16/central_system_asgi_stateful.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from datetime import datetime

try:
import uvicorn
import websockets
except ModuleNotFoundError:
print("This example relies on the 'uvicorn' and 'websockets' packages.")
print("Please install it by running: ")
print()
print(" $ pip install uvicorn websockets")
import sys

sys.exit(1)
from ocpp.routing import on, after
from ocpp.asgi_routing import Router
from ocpp.v16.enums import Action, RegistrationStatus
from ocpp.v16 import call_result


# Stateful Router class
class ChargingStation(Router):
@on(Action.BootNotification)
def on_boot_notification(self, **kwargs):
return call_result.BootNotificationPayload(
current_time=datetime.utcnow().isoformat(),
interval=10,
status=RegistrationStatus.accepted,
)

@after(Action.BootNotification)
async def after_boot_notification(self, **kwargs):
pass

@on(Action.MeterValues)
def on_meter_values(self, **kwargs):
return call_result.MeterValuesPayload()


# Stateful Router class acting as Central System
class CentralSystem(Router):

charging_stations = {}

async def on_connect(self, context):
self.charging_stations[context.id] = ChargingStation(context=context)

async def on_disconnect(self, context):
del self.charging_stations[context.id]

async def on_receive(self, message, context):
await self.charging_stations[context.id].route_message(message)


if __name__ == "__main__":
central_system = CentralSystem()
headers = [("Sec-WebSocket-Protocol", "ocpp1.6")]
uvicorn.run(central_system, host="0.0.0.0", port=9000, headers=headers)
48 changes: 48 additions & 0 deletions examples/v16/central_system_asgi_stateless.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from datetime import datetime

try:
import uvicorn
import websockets
except ModuleNotFoundError:
print("This example relies on the 'uvicorn' and 'websockets' packages.")
print("Please install it by running: ")
print()
print(" $ pip install uvicorn websockets")
import sys

sys.exit(1)
from ocpp.routing import on, after
from ocpp.asgi_routing import Router, Context
from ocpp.v16.enums import Action, RegistrationStatus
from ocpp.v16 import call_result


# Stateless Router class
class ProvisioningRouter(Router):
@on(Action.BootNotification)
def on_boot_notification(self, event: dict, context: Context):
id = context.id # we can identify the charging_station by "id" in context
return call_result.BootNotificationPayload(
current_time=datetime.utcnow().isoformat(),
interval=10,
status=RegistrationStatus.accepted,
)

@after(Action.BootNotification)
async def after_boot_notification(self, event: dict, context: Context):
pass


# Stateless Router class
class MeterValuesRouter(Router):
@on(Action.MeterValues)
def on_meter_values(self, event: dict, context: Context):
return call_result.MeterValuesPayload()


if __name__ == "__main__":
router = Router(stateless=True)
router.include_router(ProvisioningRouter())
router.include_router(MeterValuesRouter())
headers = [("Sec-WebSocket-Protocol", "ocpp1.6")]
uvicorn.run(router, host="0.0.0.0", port=9000, headers=headers)
141 changes: 141 additions & 0 deletions ocpp/asgi_routing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# ASGIRouter can also be excluded from ocpp-library

from enum import Enum

from ocpp.routing import Router as RouterBase, Context, Connection, OCPPAdapter
from ocpp.v201 import call_result as v201_call_result, call as v201_call
from ocpp.v20 import call_result as v20_call_result, call as v20_call
from ocpp.v16 import call_result as v16_call_result, call as v16_call

from typing import MutableMapping, Any, Callable, Awaitable

Scope = MutableMapping[str, Any]
Message = MutableMapping[str, Any]

Receive = Callable[[], Awaitable[Message]]
Send = Callable[[Message], Awaitable[None]]

ASGIApp = Callable[[Scope, Receive, Send], Awaitable[None]]


class Subprotocols(str, Enum):
ocpp16 = "ocpp1.6"
ocpp20 = "ocpp2.0"
ocpp201 = "ocpp2.0.1"


ocpp_adapters = {
Subprotocols.ocpp201: OCPPAdapter(
call=v201_call, call_result=v201_call_result, ocpp_version="2.0.1"
),
Subprotocols.ocpp20: OCPPAdapter(
call=v20_call, call_result=v20_call_result, ocpp_version="2.0"
),
Subprotocols.ocpp16: OCPPAdapter(
call=v16_call, call_result=v16_call_result, ocpp_version="1.6"
),
}


class ASGIConnection(Connection):
"""Connection for sending and receiving messages."""

def __init__(self, send: Send, receive: Receive):
self._send = send
# self._receive is not set as receive happens via ASGI interface

async def send(self, message: str):
await self._send({"type": "websocket.send", "text": message})

async def recv(self) -> str:
raise NotImplementedError


def websocket_context(scope: Scope, receive: Receive, send: Send) -> Context:
id = scope["path"].strip("/")
subprotocols = scope["subprotocols"][0]
# Pick the highest matching subprotocol
if Subprotocols.ocpp201 in subprotocols:
ocpp_adapter = ocpp_adapters[Subprotocols.ocpp201]
elif Subprotocols.ocpp20 in subprotocols:
ocpp_adapter = ocpp_adapters[Subprotocols.ocpp201]
elif Subprotocols.ocpp16 in subprotocols:
ocpp_adapter = ocpp_adapters[Subprotocols.ocpp16]
else:
raise ValueError
context = Context(
connection=ASGIConnection(send, receive),
id=id,
ocpp_adapter=ocpp_adapter,
)
return context


class Router(RouterBase):
"""ASGI compatible router."""

async def __call__(
self, scope: Scope, receive: Receive, send: Send
) -> None:
"""ASGI signature handler.

Args:
scope (Scope): ASGI scope
receive (Receive): ASGI handle for receiving messages
send (Send): ASGI handle for sending messages
"""
if scope["type"] == "lifespan":
await self._lifecycle_handler(scope, receive, send)
elif scope["type"] == "websocket":
await self._websocket_handler(scope, receive, send)
elif scope["type"] == "http":
await self._http_handler(scope, receive, send)
else:
raise ValueError(f'Unsupported ASGI scope type: {scope["type"]}')

async def _lifecycle_handler(self, scope, receive, send):
event = await receive()
if event["type"] == "lifespan.startup":
await send({"type": "lifespan.startup.complete"})
elif event["type"] == "lifespan.shutdown":
await send({"type": "lifespan.shutdown.complete"})

async def _websocket_handler(self, scope, receive, send):
while True:
event = await receive()
context = websocket_context(scope, receive, send)
if event["type"] == "websocket.receive":
if "text" not in event:
# OCPP-J message is never binary.
raise ValueError
message = event["text"]
await self.on_receive(message, context=context)
elif event["type"] == "websocket.connect":
await self.on_connect(context)
await send({"type": "websocket.accept"})
elif event["type"] == "websocket.disconnect":
await self.on_disconnect(context)
await send({"type": "websocket.close"})

async def _http_handler(self, scope, receive, send):
# ASGI http.request event type handling can be extended in subclasses
# as interface between "Service" and "ASGI server" depends about
# the used service (e.g. AWS).
pass

async def on_connect(self, context):
if not self._stateless:
# Actual implementation for stateful router missing
raise NotImplementedError

async def on_disconnect(self, context):
if not self._stateless:
# Actual implementation for stateful router missing
raise NotImplementedError

async def on_receive(self, message, context):
if self._stateless:
await self.route_message(message, context=context)
else:
# Actual implementation for stateful router missing
raise NotImplementedError
Loading