Skip to content

Commit b3366ff

Browse files
moltobdhoomakethu
authored andcommitted
Asyncio client support for python 3 branch (#72)
* Remerge of changes in clean feature branch. * Excluded async tests due to twisted dependencies in imports. Adapted synchronous tests to new standard socket library by adding more required mocks. * Removed twisted dependency.
1 parent 5124afc commit b3366ff

File tree

11 files changed

+776
-357
lines changed

11 files changed

+776
-357
lines changed

pymodbus/client/async.py

Lines changed: 8 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -33,108 +33,27 @@ def process():
3333
reactor.run()
3434
"""
3535
from twisted.internet import defer, protocol
36+
from pymodbus.client import async_twisted
3637
from pymodbus.factory import ClientDecoder
37-
from pymodbus.exceptions import ConnectionException
3838
from pymodbus.transaction import ModbusSocketFramer
3939
from pymodbus.transaction import FifoTransactionManager
4040
from pymodbus.transaction import DictTransactionManager
4141
from pymodbus.client.common import ModbusClientMixin
42-
from twisted.python.failure import Failure
4342

4443
#---------------------------------------------------------------------------#
4544
# Logging
4645
#---------------------------------------------------------------------------#
4746
import logging
47+
4848
_logger = logging.getLogger(__name__)
4949

5050

5151
#---------------------------------------------------------------------------#
5252
# Connected Client Protocols
5353
#---------------------------------------------------------------------------#
54-
class ModbusClientProtocol(protocol.Protocol, ModbusClientMixin):
55-
'''
56-
This represents the base modbus client protocol. All the application
57-
layer code is deferred to a higher level wrapper.
58-
'''
59-
60-
def __init__(self, framer=None):
61-
''' Initializes the framer module
62-
63-
:param framer: The framer to use for the protocol
64-
'''
65-
self._connected = False
66-
self.framer = framer or ModbusSocketFramer(ClientDecoder())
67-
if isinstance(self.framer, ModbusSocketFramer):
68-
self.transaction = DictTransactionManager(self)
69-
else: self.transaction = FifoTransactionManager(self)
70-
71-
def connectionMade(self):
72-
''' Called upon a successful client connection.
73-
'''
74-
_logger.debug("Client connected to modbus server")
75-
self._connected = True
76-
77-
def connectionLost(self, reason):
78-
''' Called upon a client disconnect
79-
80-
:param reason: The reason for the disconnect
81-
'''
82-
_logger.debug("Client disconnected from modbus server: %s" % reason)
83-
self._connected = False
84-
for tid in list(self.transaction):
85-
self.transaction.getTransaction(tid).errback(Failure(
86-
ConnectionException('Connection lost during request')))
87-
88-
def dataReceived(self, data):
89-
''' Get response, check for valid message, decode result
90-
91-
:param data: The data returned from the server
92-
'''
93-
self.framer.processIncomingPacket(data, self._handleResponse)
94-
95-
def execute(self, request):
96-
''' Starts the producer to send the next request to
97-
consumer.write(Frame(request))
98-
'''
99-
request.transaction_id = self.transaction.getNextTID()
100-
packet = self.framer.buildPacket(request)
101-
self.transport.write(packet)
102-
return self._buildResponse(request.transaction_id)
103-
104-
def _handleResponse(self, reply):
105-
''' Handle the processed response and link to correct deferred
106-
107-
:param reply: The reply to process
108-
'''
109-
if reply is not None:
110-
tid = reply.transaction_id
111-
handler = self.transaction.getTransaction(tid)
112-
if handler:
113-
handler.callback(reply)
114-
else: _logger.debug("Unrequested message: " + str(reply))
115-
116-
def _buildResponse(self, tid):
117-
''' Helper method to return a deferred response
118-
for the current request.
119-
120-
:param tid: The transaction identifier for this response
121-
:returns: A defer linked to the latest request
122-
'''
123-
if not self._connected:
124-
return defer.fail(Failure(
125-
ConnectionException('Client is not connected')))
126-
127-
d = defer.Deferred()
128-
self.transaction.addTransaction(d, tid)
129-
return d
13054

131-
#----------------------------------------------------------------------#
132-
# Extra Functions
133-
#----------------------------------------------------------------------#
134-
#if send_failed:
135-
# if self.retry > 0:
136-
# deferLater(clock, self.delay, send, message)
137-
# self.retry -= 1
55+
# Backwards compatibility.
56+
ModbusClientProtocol = async_twisted.ModbusClientProtocol
13857

13958

14059
#---------------------------------------------------------------------------#
@@ -154,7 +73,8 @@ def __init__(self, framer=None):
15473
self.framer = framer or ModbusSocketFramer(ClientDecoder())
15574
if isinstance(self.framer, ModbusSocketFramer):
15675
self.transaction = DictTransactionManager(self)
157-
else: self.transaction = FifoTransactionManager(self)
76+
else:
77+
self.transaction = FifoTransactionManager(self)
15878

15979
def datagramReceived(self, data, params):
16080
''' Get response, check for valid message, decode result
@@ -184,7 +104,8 @@ def _handleResponse(self, reply):
184104
handler = self.transaction.getTransaction(tid)
185105
if handler:
186106
handler.callback(reply)
187-
else: _logger.debug("Unrequested message: " + str(reply))
107+
else:
108+
_logger.debug("Unrequested message: " + str(reply))
188109

189110
def _buildResponse(self, tid):
190111
''' Helper method to return a deferred response

pymodbus/client/async_asyncio.py

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
"""Asynchronous framework adapter for asyncio."""
2+
import asyncio
3+
from pymodbus.client.async_common import AsyncModbusClientMixin
4+
import logging
5+
6+
_logger = logging.getLogger(__name__)
7+
8+
9+
class ModbusClientProtocol(asyncio.Protocol, AsyncModbusClientMixin):
10+
"""Asyncio specific implementation of asynchronous modubus client protocol."""
11+
12+
#: Factory that created this instance.
13+
factory = None
14+
15+
def connection_made(self, transport):
16+
self.transport = transport
17+
self._connectionMade()
18+
19+
if self.factory:
20+
self.factory.protocol_made_connection(self)
21+
22+
def connection_lost(self, reason):
23+
self.transport = None
24+
self._connectionLost(reason)
25+
26+
if self.factory:
27+
self.factory.protocol_lost_connection(self)
28+
29+
def data_received(self, data):
30+
self._dataReceived(data)
31+
32+
def create_future(self):
33+
return asyncio.Future()
34+
35+
def resolve_future(self, f, result):
36+
f.set_result(result)
37+
38+
def raise_future(self, f, exc):
39+
f.set_exception(exc)
40+
41+
42+
class ReconnectingAsyncioModbusTcpClient(object):
43+
"""Client to connect to modbus device repeatedly over TCP/IP."""
44+
45+
#: Reconnect delay in milli seconds.
46+
delay_ms = 0
47+
48+
#: Maximum delay in milli seconds before reconnect is attempted.
49+
DELAY_MAX_MS = 1000 * 60 * 5
50+
51+
def __init__(self, protocol_class=None, loop=None):
52+
#: Protocol used to talk to modbus device.
53+
self.protocol_class = protocol_class or ModbusClientProtocol
54+
#: Current protocol instance.
55+
self.protocol = None
56+
#: Event loop to use.
57+
self.loop = loop or asyncio.get_event_loop()
58+
59+
self.host = None
60+
self.port = 0
61+
62+
self.connected = False
63+
self.reset_delay()
64+
65+
def reset_delay(self):
66+
"""Resets wait before next reconnect to minimal period."""
67+
self.delay_ms = 100
68+
69+
@asyncio.coroutine
70+
def start(self, host, port=502):
71+
# force reconnect if required:
72+
self.stop()
73+
74+
_logger.debug('Connecting to %s:%s.' % (host, port))
75+
self.host = host
76+
self.port = port
77+
78+
yield from self._connect()
79+
80+
def stop(self):
81+
# prevent reconnect:
82+
self.host = None
83+
84+
if self.connected:
85+
if self.protocol:
86+
if self.protocol.transport:
87+
self.protocol.transport.close()
88+
89+
def _create_protocol(self):
90+
"""Factory function to create initialized protocol instance."""
91+
protocol = self.protocol_class()
92+
protocol.factory = self
93+
return protocol
94+
95+
@asyncio.coroutine
96+
def _connect(self):
97+
_logger.debug('Connecting.')
98+
try:
99+
yield from self.loop.create_connection(self._create_protocol, self.host, self.port)
100+
_logger.info('Connected to %s:%s.' % (self.host, self.port))
101+
except Exception as ex:
102+
_logger.warning('Failed to connect: %s' % ex)
103+
asyncio.async(self._reconnect(), loop=self.loop)
104+
105+
def protocol_made_connection(self, protocol):
106+
"""Protocol notification of successful connection."""
107+
_logger.info('Protocol made connection.')
108+
if not self.connected:
109+
self.connected = True
110+
self.protocol = protocol
111+
else:
112+
_logger.error('Factory protocol connect callback called while connected.')
113+
114+
def protocol_lost_connection(self, protocol):
115+
"""Protocol notification of lost connection."""
116+
if self.connected:
117+
_logger.info('Protocol lost connection.')
118+
if protocol is not self.protocol:
119+
_logger.error('Factory protocol callback called from unexpected protocol instance.')
120+
121+
self.connected = False
122+
self.protocol = None
123+
if self.host:
124+
asyncio.async(self._reconnect(), loop=self.loop)
125+
else:
126+
_logger.error('Factory protocol disconnect callback called while not connected.')
127+
128+
@asyncio.coroutine
129+
def _reconnect(self):
130+
_logger.debug('Waiting %d ms before next connection attempt.' % self.delay_ms)
131+
yield from asyncio.sleep(self.delay_ms / 1000)
132+
self.delay_ms = min(2 * self.delay_ms, self.DELAY_MAX_MS)
133+
yield from self._connect()

pymodbus/client/async_common.py

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
"""Common logic of asynchronous client."""
2+
from pymodbus.factory import ClientDecoder
3+
from pymodbus.exceptions import ConnectionException
4+
from pymodbus.transaction import ModbusSocketFramer
5+
from pymodbus.transaction import FifoTransactionManager
6+
from pymodbus.transaction import DictTransactionManager
7+
from pymodbus.client.common import ModbusClientMixin
8+
9+
10+
#---------------------------------------------------------------------------#
11+
# Logging
12+
#---------------------------------------------------------------------------#
13+
import logging
14+
15+
_logger = logging.getLogger(__name__)
16+
17+
#---------------------------------------------------------------------------#
18+
# Connected Client Protocols
19+
#---------------------------------------------------------------------------#
20+
class AsyncModbusClientMixin(ModbusClientMixin):
21+
"""Abstract asynchronous protocol running high level modbus logic on top
22+
of asynchronous loop.
23+
24+
Behavior specific to an asynchronous framework like Twisted or asyncio is
25+
implemented in a derived class.
26+
"""
27+
28+
transport = None
29+
30+
def __init__(self, framer=None):
31+
''' Initializes the framer module
32+
33+
:param framer: The framer to use for the protocol.
34+
'''
35+
self._connected = False
36+
self.framer = framer or ModbusSocketFramer(ClientDecoder())
37+
38+
if isinstance(self.framer, ModbusSocketFramer):
39+
self.transaction = DictTransactionManager(self)
40+
else:
41+
self.transaction = FifoTransactionManager(self)
42+
43+
def _connectionMade(self):
44+
''' Called upon a successful client connection.
45+
'''
46+
_logger.debug("Client connected to modbus server")
47+
self._connected = True
48+
49+
def _connectionLost(self, reason):
50+
''' Called upon a client disconnect
51+
52+
:param reason: The reason for the disconnect
53+
'''
54+
_logger.debug("Client disconnected from modbus server: %s" % reason)
55+
self._connected = False
56+
for tid in list(self.transaction):
57+
self.raise_future(self.transaction.getTransaction(tid), ConnectionException('Connection lost during request'))
58+
59+
def _dataReceived(self, data):
60+
''' Get response, check for valid message, decode result
61+
62+
:param data: The data returned from the server
63+
'''
64+
self.framer.processIncomingPacket(data, self._handleResponse)
65+
66+
def execute(self, request):
67+
''' Starts the producer to send the next request to
68+
consumer.write(Frame(request))
69+
'''
70+
request.transaction_id = self.transaction.getNextTID()
71+
packet = self.framer.buildPacket(request)
72+
self.transport.write(packet)
73+
return self._buildResponse(request.transaction_id)
74+
75+
def _handleResponse(self, reply):
76+
''' Handle the processed response and link to correct deferred
77+
78+
:param reply: The reply to process
79+
'''
80+
if reply is not None:
81+
tid = reply.transaction_id
82+
handler = self.transaction.getTransaction(tid)
83+
if handler:
84+
self.resolve_future(handler, reply)
85+
else:
86+
_logger.debug("Unrequested message: " + str(reply))
87+
88+
def _buildResponse(self, tid):
89+
''' Helper method to return a deferred response
90+
for the current request.
91+
92+
:param tid: The transaction identifier for this response
93+
:returns: A defer linked to the latest request
94+
'''
95+
f = self.create_future()
96+
if not self._connected:
97+
self.raise_future(f, ConnectionException('Client is not connected'))
98+
else:
99+
self.transaction.addTransaction(f, tid)
100+
return f
101+
102+
def create_future(self):
103+
raise NotImplementedError()
104+
105+
def resolve_future(self, f, result):
106+
raise NotImplementedError()
107+
108+
def raise_future(self, f, exc):
109+
raise NotImplementedError()
110+
111+
112+
#---------------------------------------------------------------------------#
113+
# Exported symbols
114+
#---------------------------------------------------------------------------#
115+
__all__ = [
116+
"AsyncModbusClientMixin",
117+
]
118+
#----------------------------------------------------------------------#

0 commit comments

Comments
 (0)