Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions lewis/adapters/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import inspect
import re
import socket
from collections import deque

from scanf import scanf_compile

Expand All @@ -47,7 +48,22 @@ def __init__(self, sock, target, stream_server):
self._set_logging_context(target)
self.log.info("Client connected from %s:%s", *sock.getpeername())

self._send_event_message_queue = deque([])
initial_message = self._target.initial_message()
if initial_message:
self.unsolicited_reply(initial_message)

def send_event_message(self, message):
event_message = self._target.event_message(message)
if event_message:
self.unsolicited_reply(event_message)

def process(self, msec):

# if there are any event messages to send, send them
while len(self._send_event_message_queue) > 0:
self.send_event_message(self._send_event_message_queue.popleft())

if not self._buffer:
return

Expand Down Expand Up @@ -151,6 +167,7 @@ def __init__(self, host, port, target, device_lock):
self.log.info("Listening on %s:%s", host, port)

self._accepted_connections = []
self._device_event_message = None

def handle_accept(self):
pair = self.accept()
Expand Down Expand Up @@ -178,7 +195,12 @@ def close(self):

def process(self, msec):
for handler in self._accepted_connections:
if self._device_event_message:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps I am missing something but where does self._device_event_message get set?

handler.send_event_message(self._device_event_message)
handler.process(msec)
# after processing all handlers, clear the event message to not repeat sending
if self._device_event_message:
self._device_event_message = None


class PatternMatcher:
Expand Down Expand Up @@ -856,3 +878,16 @@ def handle_error(self, request, error):
:param request: The request that resulted in the error.
:param error: The exception that was raised.
"""

def initial_message(self):
"""
Override this method to send an initial message when a new client connects.
"""
return None

def event_message(self, message):
"""
Override this method to handle and custom process at the interface level an event message.
returning None inhibts the message going out
"""
return message
2 changes: 2 additions & 0 deletions lewis/core/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ def __init__(self, device, adapters=(), device_builder=None, control_server=None

self._device = device
self._adapters = AdapterCollection(*adapters)
# WIP to send event messages THIS IS VERY CRUDE to let device know of adapters
self._device._adapters = self._adapters

self._speed = 1.0 # Multiplier for delta t
self._cycle_delay = 0.1 # Target time between cycles
Expand Down
26 changes: 26 additions & 0 deletions lewis/devices/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,32 @@ def _initialize_data(self):
"""
pass

def stream_event_message(self, message):
"""Call this method in the device to send periodic or other event messages that were not solicited

Args:
message ([str]): a message to send to active clients

# TODO: make it handle binary and str
"""

# determine if can send message and then send it
if self._adapters._adapters["stream"].is_running and getattr(
self._adapters._adapters["stream"].interface, "handler", None
):
try:
stream_server = self._adapters._adapters["stream"]._server
# send message to all connected stream handlers
for handler in stream_server._accepted_connections:
if handler.connected:
handler._send_event_message_queue.append(message)
# else:
# self.log.debug(
# f"Device _stream_event_message, Handler:{handler} is not connected"
# )
except Exception as e:
self.log.error(f"Device _stream_event_message: Exception: {e}")

def _get_final_state_handlers(self, overrides):
states = self._get_state_handlers()

Expand Down
182 changes: 182 additions & 0 deletions lewis/examples/simple_eventing_state_device/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
# -*- coding: utf-8 -*-
# *********************************************************************
# lewis - a library for creating hardware device simulators
# Copyright (C) 2016-2020 European Spallation Source ERIC
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# *********************************************************************

from collections import OrderedDict

from lewis.adapters.stream import Cmd, StreamInterface, Var
from lewis.core.statemachine import State
from lewis.devices import StateMachineDevice
from datetime import datetime


class VerySimpleStateDeviceWithEvents(StateMachineDevice):
param = 10

def _initialize_data(self):
self.connected = False

self.send_update_after = 10 # seconds
self.send_update_time_remainig = self.send_update_after
self.send_updates_enabled = False

def do_send_updates_enable(self):
self.send_updates_enabled = True
return f"ALIVE messages will start within {self.send_update_after}"

def do_send_updates_disable(self):
self.send_updates_enabled = False
return f"ALIVE messages will stop ------------------------------"

def do_send_updates_enable_value(self, state):
self.send_updates_enabled = state
return None

def _get_state_handlers(self):
return {"disconnected": State(), "connected": State()}

def _get_initial_state(self):
return "disconnected"

def _get_transition_handlers(self):
return OrderedDict(
[
(("disconnected", "connected"), lambda: self.connected),
(("connected", "disconnected"), lambda: not self.connected),
]
)

def event_handler(self, message):
"""Publishes a message to all listening stream clients

Args:
message ([str]): an event
"""
self.stream_event_message(message)

def process(self, dt):

# Simple scheduled event , determine when to send a periodic message
self.send_update_time_remainig -= dt
if self.send_update_time_remainig <= 0:
if self.send_updates_enabled:
self.event_handler(f"ALIVE:{datetime.now().isoformat()}")
# reset the periodic timer
self.send_update_time_remainig = self.send_update_after
return super().process(dt=dt)


class VerySimpleStateDeviceWithEventsInterface1(StreamInterface):
"""
A very simple device with TCP-stream interface that events and publishes a welcome message on connect

The device has only one parameter, which can be set to an arbitrary
value through this interface, The interface consists of the following commands which can be invoked via telnet.


to use this simulated device with lewis-control to manual send unsolicited responses/simulated events

`lewis -r localhost:10000 -k lewis.examples simple_state_eventing_device\r\n"

in a seperate terminal

`lewis-control -r localhost:10000 device event_handler test`


To connect:

$ telnet host port

After that, typing either of the commands and pressing enter sends them to the server.

The commands are:

- ``V``: Returns the parameter as part of a verbose message.
- ``V=something``: Sets the parameter to ``something``.
- ``P``: Returns the device parameter unmodified.
- ``P=something``: Exactly the same as ``V=something``.
- ``R`` or ``r``: Returns the number 4.
- ``START`` start regularly sending ALIVE messages regularly default 10s
- ``STOP`` stop sending ALIVE messages

- ``HELP`` Displays valid commands commands

"""

commands = {
Cmd("get_param", pattern="^V$", return_mapping="The value is {}".format),
Cmd("set_param", pattern="^V=(.+)$", argument_mappings=(int,)),
Cmd(
"show_help",
pattern="^HELP$",
),
Cmd(
"do_send_updates_enable",
pattern="^START$",
),
Cmd(
"do_send_updates_disable",
pattern="^STOP$",
),
Var(
"param",
read_pattern="^P$",
write_pattern="^P=(.+)$",
doc="One of the only parameters.",
),
Cmd(lambda: 4, pattern="^R$(?i)", doc='"Random" number (4).'),
}

in_terminator = "\r\n"
out_terminator = "\r\n"

readtimeout = 60000 # ms https://lewis.readthedocs.io/en/latest/api/adapters/stream.html?highlight=readtimeout#lewis.adapters.stream.StreamInterface

# TODO: Timeout handler

def initial_message(self):
return (
"r\n\r\nWelcome to the Simple Eventing Device\r\n\r\n"
" use HELP to display the commands available\r\n"
)

def show_help(self):
"""Returns the valid commands and other notes"""
return self.__doc__.replace("\n", "\r\n")

def get_param(self):
"""Returns the device parameter."""
return self.device.param

def set_param(self, new_param):
"""Set the device parameter, does not return anything."""
self.device.param = new_param

def handle_error(self, request, error):
return "An error occurred: " + repr(error)


setups = dict(
disconnected=dict(
device_type=VerySimpleStateDeviceWithEvents,
parameters=dict(
# override_initial_state="disconnected",
override_initial_data=dict(param=20),
),
)
)