Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
.tox
.cache
.venv
.history
*.egg-info
/.project
/.pydevproject
Expand Down
74 changes: 42 additions & 32 deletions dsmr_parser/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(self, telegram_specification, apply_checksum_validation=True):
object["obis_reference"]: re.compile(object["obis_reference"], re.DOTALL | re.MULTILINE)
for object in self.telegram_specification['objects']
}
self._telegram_encryption_active = None

def parse(self, telegram_data, encryption_key="", authentication_key="", throw_ex=False): # noqa: C901
"""
Expand All @@ -46,38 +47,11 @@ def parse(self, telegram_data, encryption_key="", authentication_key="", throw_e
:raises ParseError:
:raises InvalidChecksumError:
"""

if "general_global_cipher" in self.telegram_specification:
if self.telegram_specification["general_global_cipher"]:
enc_key = unhexlify(encryption_key)
auth_key = unhexlify(authentication_key)
telegram_data = unhexlify(telegram_data)
apdu = XDlmsApduFactory.apdu_from_bytes(apdu_bytes=telegram_data)
if apdu.security_control.security_suite != 0:
logger.warning("Untested security suite")
if apdu.security_control.authenticated and not apdu.security_control.encrypted:
logger.warning("Untested authentication only")
if not apdu.security_control.authenticated and not apdu.security_control.encrypted:
logger.warning("Untested not encrypted or authenticated")
if apdu.security_control.compressed:
logger.warning("Untested compression")
if apdu.security_control.broadcast_key:
logger.warning("Untested broadcast key")
telegram_data = apdu.to_plain_apdu(enc_key, auth_key).decode("ascii")
else:
try:
if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG:
raise RuntimeError("Looks like a general_global_cipher frame "
"but telegram specification is not matching!")
except Exception:
pass
else:
try:
if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG:
raise RuntimeError(
"Looks like a general_global_cipher frame but telegram specification is not matching!")
except Exception:
pass
telegram_data = self.decrypt_telegram_data(
telegram_data=telegram_data,
encryption_key=encryption_key,
authentication_key=authentication_key
)

if self.apply_checksum_validation and self.telegram_specification['checksum_support']:
self.validate_checksum(telegram_data)
Expand Down Expand Up @@ -112,6 +86,42 @@ def parse(self, telegram_data, encryption_key="", authentication_key="", throw_e

return telegram

def decrypt_telegram_data(self, encryption_key, authentication_key, telegram_data):
"""
Check if telegram data is encrypted and decrypt if applicable.
"""
# if self._telegram_encryption_active is False:
# # If encryption is not working, stop trying and logging warnings.
# return telegram_data

if self.telegram_specification.get("general_global_cipher"):
enc_key = unhexlify(encryption_key)
auth_key = unhexlify(authentication_key)
telegram_data = unhexlify(telegram_data)
apdu = XDlmsApduFactory.apdu_from_bytes(apdu_bytes=telegram_data)
if apdu.security_control.security_suite != 0:
logger.warning("Untested security suite")
if apdu.security_control.authenticated and not apdu.security_control.encrypted:
logger.warning("Untested authentication only")
if not apdu.security_control.authenticated and not apdu.security_control.encrypted:
logger.warning("Untested not encrypted or authenticated")
if apdu.security_control.compressed:
logger.warning("Untested compression")
if apdu.security_control.broadcast_key:
logger.warning("Untested broadcast key")
telegram_data = apdu.to_plain_apdu(enc_key, auth_key).decode("ascii")
self._telegram_encryption_active = True
else:
try:
if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG:
logger.warning("Looks like a general_global_cipher frame "
"but telegram specification is not matching!")
except Exception:
pass
self._telegram_encryption_active = False

return telegram_data

@staticmethod
def validate_checksum(telegram):
"""
Expand Down
Empty file added test/clients/__init__.py
Empty file.
21 changes: 21 additions & 0 deletions test/clients/test_filereader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import unittest
import tempfile

from dsmr_parser.clients.filereader import FileReader
from dsmr_parser.telegram_specifications import V5
from test.example_telegrams import TELEGRAM_V5


class FileReaderTest(unittest.TestCase):
def test_read_as_object(self):
with tempfile.NamedTemporaryFile() as file:
with open(file.name, "w") as f:
f.write(TELEGRAM_V5)

telegrams = []
reader = FileReader(file=file.name, telegram_specification=V5)
# Call
for telegram in reader.read_as_object():
telegrams.append(telegram)

self.assertEqual(len(telegrams), 1)
77 changes: 77 additions & 0 deletions test/clients/test_rfxtrx_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from unittest.mock import Mock

import unittest

from dsmr_parser import obis_references as obis
from dsmr_parser.clients.rfxtrx_protocol import create_rfxtrx_dsmr_protocol, PACKETTYPE_DSMR, SUBTYPE_P1
from dsmr_parser.objects import Telegram

TELEGRAM_V2_2 = (
'/ISk5\2MT382-1004\r\n'
'\r\n'
'0-0:96.1.1(00000000000000)\r\n'
'1-0:1.8.1(00001.001*kWh)\r\n'
'1-0:1.8.2(00001.001*kWh)\r\n'
'1-0:2.8.1(00001.001*kWh)\r\n'
'1-0:2.8.2(00001.001*kWh)\r\n'
'0-0:96.14.0(0001)\r\n'
'1-0:1.7.0(0001.01*kW)\r\n'
'1-0:2.7.0(0000.00*kW)\r\n'
'0-0:17.0.0(0999.00*kW)\r\n'
'0-0:96.3.10(1)\r\n'
'0-0:96.13.1()\r\n'
'0-0:96.13.0()\r\n'
'0-1:24.1.0(3)\r\n'
'0-1:96.1.0(000000000000)\r\n'
'0-1:24.3.0(161107190000)(00)(60)(1)(0-1:24.2.1)(m3)\r\n'
'(00001.001)\r\n'
'0-1:24.4.0(1)\r\n'
'!\r\n'
)

OTHER_RF_PACKET = b'\x03\x01\x02\x03'


def encode_telegram_as_RF_packets(telegram):
data = b''

for line in telegram.split('\n'):
packet_data = (line + '\n').encode('ascii')
packet_header = bytes(bytearray([
len(packet_data) + 3, # excluding length byte
PACKETTYPE_DSMR,
SUBTYPE_P1,
0 # seq num (ignored)
]))

data += packet_header + packet_data
# other RF packets can pass by on the line
data += OTHER_RF_PACKET

return data


class RFXtrxProtocolTest(unittest.TestCase):

def setUp(self):
new_protocol, _ = create_rfxtrx_dsmr_protocol('2.2',
telegram_callback=Mock(),
keep_alive_interval=1)
self.protocol = new_protocol()

def test_complete_packet(self):
"""Protocol should assemble incoming lines into complete packet."""

data = encode_telegram_as_RF_packets(TELEGRAM_V2_2)
# send data broken up in two parts
self.protocol.data_received(data[0:200])
self.protocol.data_received(data[200:])

telegram = self.protocol.telegram_callback.call_args_list[0][0][0]
assert isinstance(telegram, Telegram)

assert float(telegram[obis.CURRENT_ELECTRICITY_USAGE].value) == 1.01
assert telegram[obis.CURRENT_ELECTRICITY_USAGE].unit == 'kW'

assert float(telegram[obis.GAS_METER_READING].value) == 1.001
assert telegram[obis.GAS_METER_READING].unit == 'm3'
2 changes: 1 addition & 1 deletion test/test_parse_v5.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def test_parse(self):
telegram = parser.parse(TELEGRAM_V5, throw_ex=True)
except Exception as ex:
assert False, f"parse trigged an exception {ex}"
print('test: ', type(telegram.P1_MESSAGE_HEADER), telegram.P1_MESSAGE_HEADER.__dict__)

# P1_MESSAGE_HEADER (1-3:0.2.8)
assert isinstance(telegram.P1_MESSAGE_HEADER, CosemObject)
assert telegram.P1_MESSAGE_HEADER.unit is None
Expand Down
Loading