Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 8 additions & 87 deletions pymodbus/client/async.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,108 +33,27 @@ def process():
reactor.run()
"""
from twisted.internet import defer, protocol
from pymodbus.client import async_twisted
from pymodbus.factory import ClientDecoder
from pymodbus.exceptions import ConnectionException
from pymodbus.transaction import ModbusSocketFramer
from pymodbus.transaction import FifoTransactionManager
from pymodbus.transaction import DictTransactionManager
from pymodbus.client.common import ModbusClientMixin
from twisted.python.failure import Failure

#---------------------------------------------------------------------------#
# Logging
#---------------------------------------------------------------------------#
import logging

_logger = logging.getLogger(__name__)


#---------------------------------------------------------------------------#
# Connected Client Protocols
#---------------------------------------------------------------------------#
class ModbusClientProtocol(protocol.Protocol, ModbusClientMixin):
'''
This represents the base modbus client protocol. All the application
layer code is deferred to a higher level wrapper.
'''

def __init__(self, framer=None):
''' Initializes the framer module

:param framer: The framer to use for the protocol
'''
self._connected = False
self.framer = framer or ModbusSocketFramer(ClientDecoder())
if isinstance(self.framer, ModbusSocketFramer):
self.transaction = DictTransactionManager(self)
else: self.transaction = FifoTransactionManager(self)

def connectionMade(self):
''' Called upon a successful client connection.
'''
_logger.debug("Client connected to modbus server")
self._connected = True

def connectionLost(self, reason):
''' Called upon a client disconnect

:param reason: The reason for the disconnect
'''
_logger.debug("Client disconnected from modbus server: %s" % reason)
self._connected = False
for tid in list(self.transaction):
self.transaction.getTransaction(tid).errback(Failure(
ConnectionException('Connection lost during request')))

def dataReceived(self, data):
''' Get response, check for valid message, decode result

:param data: The data returned from the server
'''
self.framer.processIncomingPacket(data, self._handleResponse)

def execute(self, request):
''' Starts the producer to send the next request to
consumer.write(Frame(request))
'''
request.transaction_id = self.transaction.getNextTID()
packet = self.framer.buildPacket(request)
self.transport.write(packet)
return self._buildResponse(request.transaction_id)

def _handleResponse(self, reply):
''' Handle the processed response and link to correct deferred

:param reply: The reply to process
'''
if reply is not None:
tid = reply.transaction_id
handler = self.transaction.getTransaction(tid)
if handler:
handler.callback(reply)
else: _logger.debug("Unrequested message: " + str(reply))

def _buildResponse(self, tid):
''' Helper method to return a deferred response
for the current request.

:param tid: The transaction identifier for this response
:returns: A defer linked to the latest request
'''
if not self._connected:
return defer.fail(Failure(
ConnectionException('Client is not connected')))

d = defer.Deferred()
self.transaction.addTransaction(d, tid)
return d

#----------------------------------------------------------------------#
# Extra Functions
#----------------------------------------------------------------------#
#if send_failed:
# if self.retry > 0:
# deferLater(clock, self.delay, send, message)
# self.retry -= 1
# Backwards compatibility.
ModbusClientProtocol = async_twisted.ModbusClientProtocol


#---------------------------------------------------------------------------#
Expand All @@ -154,7 +73,8 @@ def __init__(self, framer=None):
self.framer = framer or ModbusSocketFramer(ClientDecoder())
if isinstance(self.framer, ModbusSocketFramer):
self.transaction = DictTransactionManager(self)
else: self.transaction = FifoTransactionManager(self)
else:
self.transaction = FifoTransactionManager(self)

def datagramReceived(self, data, params):
''' Get response, check for valid message, decode result
Expand Down Expand Up @@ -184,7 +104,8 @@ def _handleResponse(self, reply):
handler = self.transaction.getTransaction(tid)
if handler:
handler.callback(reply)
else: _logger.debug("Unrequested message: " + str(reply))
else:
_logger.debug("Unrequested message: " + str(reply))

def _buildResponse(self, tid):
''' Helper method to return a deferred response
Expand Down
133 changes: 133 additions & 0 deletions pymodbus/client/async_asyncio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
"""Asynchronous framework adapter for asyncio."""
import asyncio
from pymodbus.client.async_common import AsyncModbusClientMixin
import logging

_logger = logging.getLogger(__name__)


class ModbusClientProtocol(asyncio.Protocol, AsyncModbusClientMixin):
"""Asyncio specific implementation of asynchronous modubus client protocol."""

#: Factory that created this instance.
factory = None

def connection_made(self, transport):
self.transport = transport
self._connectionMade()

if self.factory:
self.factory.protocol_made_connection(self)

def connection_lost(self, reason):
self.transport = None
self._connectionLost(reason)

if self.factory:
self.factory.protocol_lost_connection(self)

def data_received(self, data):
self._dataReceived(data)

def create_future(self):
return asyncio.Future()

def resolve_future(self, f, result):
f.set_result(result)

def raise_future(self, f, exc):
f.set_exception(exc)


class ReconnectingAsyncioModbusTcpClient(object):
"""Client to connect to modbus device repeatedly over TCP/IP."""

#: Reconnect delay in milli seconds.
delay_ms = 0

#: Maximum delay in milli seconds before reconnect is attempted.
DELAY_MAX_MS = 1000 * 60 * 5

def __init__(self, protocol_class=None, loop=None):
#: Protocol used to talk to modbus device.
self.protocol_class = protocol_class or ModbusClientProtocol
#: Current protocol instance.
self.protocol = None
#: Event loop to use.
self.loop = loop or asyncio.get_event_loop()

self.host = None
self.port = 0

self.connected = False
self.reset_delay()

def reset_delay(self):
"""Resets wait before next reconnect to minimal period."""
self.delay_ms = 100

@asyncio.coroutine
def start(self, host, port=502):
# force reconnect if required:
self.stop()

_logger.debug('Connecting to %s:%s.' % (host, port))
self.host = host
self.port = port

yield from self._connect()

def stop(self):
# prevent reconnect:
self.host = None

if self.connected:
if self.protocol:
if self.protocol.transport:
self.protocol.transport.close()

def _create_protocol(self):
"""Factory function to create initialized protocol instance."""
protocol = self.protocol_class()
protocol.factory = self
return protocol

@asyncio.coroutine
def _connect(self):
_logger.debug('Connecting.')
try:
yield from self.loop.create_connection(self._create_protocol, self.host, self.port)
_logger.info('Connected to %s:%s.' % (self.host, self.port))
except Exception as ex:
_logger.warning('Failed to connect: %s' % ex)
asyncio.async(self._reconnect(), loop=self.loop)

def protocol_made_connection(self, protocol):
"""Protocol notification of successful connection."""
_logger.info('Protocol made connection.')
if not self.connected:
self.connected = True
self.protocol = protocol
else:
_logger.error('Factory protocol connect callback called while connected.')

def protocol_lost_connection(self, protocol):
"""Protocol notification of lost connection."""
if self.connected:
_logger.info('Protocol lost connection.')
if protocol is not self.protocol:
_logger.error('Factory protocol callback called from unexpected protocol instance.')

self.connected = False
self.protocol = None
if self.host:
asyncio.async(self._reconnect(), loop=self.loop)
else:
_logger.error('Factory protocol disconnect callback called while not connected.')

@asyncio.coroutine
def _reconnect(self):
_logger.debug('Waiting %d ms before next connection attempt.' % self.delay_ms)
yield from asyncio.sleep(self.delay_ms / 1000)
self.delay_ms = min(2 * self.delay_ms, self.DELAY_MAX_MS)
yield from self._connect()
118 changes: 118 additions & 0 deletions pymodbus/client/async_common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
"""Common logic of asynchronous client."""
from pymodbus.factory import ClientDecoder
from pymodbus.exceptions import ConnectionException
from pymodbus.transaction import ModbusSocketFramer
from pymodbus.transaction import FifoTransactionManager
from pymodbus.transaction import DictTransactionManager
from pymodbus.client.common import ModbusClientMixin


#---------------------------------------------------------------------------#
# Logging
#---------------------------------------------------------------------------#
import logging

_logger = logging.getLogger(__name__)

#---------------------------------------------------------------------------#
# Connected Client Protocols
#---------------------------------------------------------------------------#
class AsyncModbusClientMixin(ModbusClientMixin):
"""Abstract asynchronous protocol running high level modbus logic on top
of asynchronous loop.

Behavior specific to an asynchronous framework like Twisted or asyncio is
implemented in a derived class.
"""

transport = None

def __init__(self, framer=None):
''' Initializes the framer module

:param framer: The framer to use for the protocol.
'''
self._connected = False
self.framer = framer or ModbusSocketFramer(ClientDecoder())

if isinstance(self.framer, ModbusSocketFramer):
self.transaction = DictTransactionManager(self)
else:
self.transaction = FifoTransactionManager(self)

def _connectionMade(self):
''' Called upon a successful client connection.
'''
_logger.debug("Client connected to modbus server")
self._connected = True

def _connectionLost(self, reason):
''' Called upon a client disconnect

:param reason: The reason for the disconnect
'''
_logger.debug("Client disconnected from modbus server: %s" % reason)
self._connected = False
for tid in list(self.transaction):
self.raise_future(self.transaction.getTransaction(tid), ConnectionException('Connection lost during request'))

def _dataReceived(self, data):
''' Get response, check for valid message, decode result

:param data: The data returned from the server
'''
self.framer.processIncomingPacket(data, self._handleResponse)

def execute(self, request):
''' Starts the producer to send the next request to
consumer.write(Frame(request))
'''
request.transaction_id = self.transaction.getNextTID()
packet = self.framer.buildPacket(request)
self.transport.write(packet)
return self._buildResponse(request.transaction_id)

def _handleResponse(self, reply):
''' Handle the processed response and link to correct deferred

:param reply: The reply to process
'''
if reply is not None:
tid = reply.transaction_id
handler = self.transaction.getTransaction(tid)
if handler:
self.resolve_future(handler, reply)
else:
_logger.debug("Unrequested message: " + str(reply))

def _buildResponse(self, tid):
''' Helper method to return a deferred response
for the current request.

:param tid: The transaction identifier for this response
:returns: A defer linked to the latest request
'''
f = self.create_future()
if not self._connected:
self.raise_future(f, ConnectionException('Client is not connected'))
else:
self.transaction.addTransaction(f, tid)
return f

def create_future(self):
raise NotImplementedError()

def resolve_future(self, f, result):
raise NotImplementedError()

def raise_future(self, f, exc):
raise NotImplementedError()


#---------------------------------------------------------------------------#
# Exported symbols
#---------------------------------------------------------------------------#
__all__ = [
"AsyncModbusClientMixin",
]
#----------------------------------------------------------------------#
Loading