Skip to content

Commit

Permalink
cli get sleep mode
Browse files Browse the repository at this point in the history
  • Loading branch information
michelepagot committed Jan 20, 2021
1 parent 15ebfc8 commit 9eb53c5
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 22 deletions.
16 changes: 11 additions & 5 deletions src/pysds011/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ def main(ctx, port, id):
sensor_id = None
if id:
sensor_id = bytes.fromhex(id)
else:
sensor_id = b'\xff\xff'
ctx.obj = Context(ser=main_ser, id=sensor_id)
log.debug('Process subcommands')

Expand Down Expand Up @@ -108,22 +110,26 @@ def fw_version(ctx):


@main.command()
@click.argument('mode', type=click.Choice(['0', '1']))
@click.argument('mode', type=click.Choice(['0', '1']), required=False)
@click.pass_obj
def sleep(ctx, mode):
"""
Set sleep MODE 1:sleep 0:wakeup
Just 'sleep' without a number result in querying the actual value applied in the sensor
"""
sd = None
exit_val = 0
log.debug('BEGIN mode:%d' % int(mode))
try:
ctx.serial.open()
ctx.serial.flushInput()
sd = driver.SDS011(ctx.serial, log)
if not sd.cmd_set_sleep(mode):
log.error('cmd_set_sleep error')
exit_val = 1
if mode:
log.debug('BEGIN cli mode:%d' % int(mode))
if not sd.cmd_set_sleep(int(mode), id=ctx.id):
log.error('cmd_set_sleep error')
exit_val = 1
else:
click.echo(sd.cmd_get_sleep())
except Exception as e:
log.exception(e)
exit_val = 1
Expand Down
32 changes: 28 additions & 4 deletions src/pysds011/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ def __construct_command(self, cmd, data=[], dest=b'\xff\xff'):
# this method is in charge of 6 bytes
# user needs to provide 13bytes = 1byte for cmd + some (max12) bytes for data
assert len(data) <= 12
self.log.debug("data:" + str(data) + " dest:" + str(dest))
# feel not provided data with zero
data += [0, ]*(12-len(data))
self.log.debug("Resized data:" + str(data))
# calculate the checksum: TODO has the dest to be included?
checksum = (sum(data)+sum(struct.unpack('<BB', dest))+cmd) % 256
# head:AA CommandID:B4 --> are common to all PC->Sensor commands
Expand Down Expand Up @@ -113,7 +115,7 @@ def __read_response(self):
if checksum != d[-2]:
self.log.error('Wrong rep checksum: expected ' + str(checksum) + ' and get ' + str(d[-2]))
return None
#if b'\xab' != d[-1]: # commented as it always result True
# if b'\xab' != d[-1]: # commented as it always result True
# self.log.error('Wrong TAIL ' + str(d[-1]))
# return None
self.__dump(d, '< ')
Expand Down Expand Up @@ -173,11 +175,23 @@ def __process_data(self, d):
def cmd_get_sleep(self):
"""Get active sleep mode
:return: True if it is sleeping
Take care that sensor will not response to it if it is sleeping. So mainly
this API will return 0 or None. None could means:
- I'm sleeping, so I cannot reply
- I'm not sleeping but something went wrong as responding
:return: True if it is sleeping, False if wakeup, None in case of communication error
:rtype: bool
"""
self.ser.write(self.__construct_command(CMD_SLEEP, [0x0, 0x0]))
return True
resp = self.__read_response()
if resp is None:
self.log.error("No sensor response")
return None
if 0 != resp[4] and 1 != resp[4]:
self.log.error("No valid sensor response %d", resp[4])
return None
return 0 == resp[4]

def cmd_set_sleep(self, sleep=1, id=b'\xff\xff'):
"""Set sleep mode
Expand All @@ -189,8 +203,10 @@ def cmd_set_sleep(self, sleep=1, id=b'\xff\xff'):
:return: True is set is ok
:rtype: bool
"""
assert id is not None
self.log.debug('is:%s', str(id))
mode = 0 if sleep else 1
self.log.debug('mode:%d', mode)
self.log.debug('driver mode:%d', mode)
self.ser.write(self.__construct_command(CMD_SLEEP, [0x1, mode], id))
resp = self.__read_response()
return resp is not None
Expand All @@ -203,6 +219,7 @@ def cmd_get_mode(self, id=b'\xff\xff'):
:return: mode if it is ok, None if error
:rtype: int
"""
assert id is not None
self.ser.write(self.__construct_command(CMD_MODE, [0x0, 0x0], id))
resp = self.__read_response()
if resp is None:
Expand All @@ -220,6 +237,7 @@ def cmd_set_mode(self, mode=1, id=b'\xff\xff'):
:return: True is set is ok
:rtype: bool
"""
assert id is not None
self.log.debug('mode:%d', mode)
self.ser.write(self.__construct_command(CMD_MODE, [0x1, mode], id))
resp = self.__read_response()
Expand All @@ -237,6 +255,7 @@ def cmd_firmware_ver(self, id=b'\xff\xff'):
:return: version description dictionary
:rtype: dict
"""
assert id is not None
# 7: CMD_FIRMWARE: not needs any PC->Sensor data
self.ser.write(self.__construct_command(7, dest=id))
d = self.__read_response()
Expand All @@ -251,6 +270,7 @@ def cmd_query_data(self, id=b'\xff\xff'):
:return: dust data as dictionary
:rtype: dict
"""
assert id is not None
self.ser.write(self.__construct_command(CMD_QUERY_DATA, dest=id))
d = self.__read_response()
self.log.debug(d)
Expand All @@ -270,6 +290,8 @@ def cmd_set_id(self, id, new_id):
:return: operation result
:rtype: bool
"""
assert id is not None
assert new_id is not None
self.ser.write(self.__construct_command(CMD_DEVICE_ID, [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, new_id[0], new_id[1]], id))
d = self.__read_response()
if d is None:
Expand All @@ -291,6 +313,7 @@ def cmd_set_working_period(self, period=0, id=b'\xff\xff'):
:return: result
:rtype: bool
"""
assert id is not None
if period > 30:
return False
self.ser.write(self.__construct_command(CMD_WORKING_PERIOD, [0x01, period], id))
Expand All @@ -303,6 +326,7 @@ def cmd_get_working_period(self, id=b'\xff\xff'):
:return: working period in minutes: work 30 seconds and sleep n*60-30 seconds
:rtype: int
"""
assert id is not None
self.ser.write(self.__construct_command(CMD_WORKING_PERIOD, [0x00], id))
d = self.__read_response()
if d is None:
Expand Down
90 changes: 77 additions & 13 deletions tests/test_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,11 @@ def test_cmd_set_sleep_get_only_head():
##################
log = logging.getLogger("SDS011")
sm = SerialMock()

DATA = b'\x06\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
SENSOR_ID = b'\xff\xff'
EXPECTED_DRIVER_WRITE = compose_write(DATA, SENSOR_ID)

sm.test_expect_read(HEAD)

d = SDS011(sm, log)
Expand All @@ -449,9 +454,7 @@ def test_cmd_set_sleep_get_only_head():
# check expectation about what driver should sent to sensor
production_code_write_to_sensor = sm.test_get_write()
assert 1 == len(production_code_write_to_sensor)
DATA = b'\x06\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff'
CHECKSUM = bytes([sum(DATA) % 256])
assert HEAD + CMD_ID + DATA + CHECKSUM + TAIL == production_code_write_to_sensor[0]
assert EXPECTED_DRIVER_WRITE == production_code_write_to_sensor[0]


def test_cmd_set_sleep_wrong_checksum():
Expand All @@ -461,14 +464,15 @@ def test_cmd_set_sleep_wrong_checksum():
log = logging.getLogger("SDS011")
sm = SerialMock()
sm.test_expect_read(HEAD)
DATA_RSP = b'\x06\x01\x00\x00\xab\xcd'
DATA_RSP = b'\x06\x01\x00\x00'
SENSOR_ID_RSP = b'\xab\xcd'
CHECKSUM_RSP = bytes([sum(DATA_RSP) % 256 + 1])
sm.test_expect_read(RSP_ID+DATA_RSP+CHECKSUM_RSP+TAIL)
sm.test_expect_read(RSP_ID+DATA_RSP+SENSOR_ID_RSP+CHECKSUM_RSP+TAIL)
d = SDS011(sm, log)
assert False == d.cmd_set_sleep()


def test_cmd_get_sleep():
def test_cmd_get_sleep_sleepingsensor():
"""
Test correctly processed get sleep command
"""
Expand All @@ -477,19 +481,79 @@ def test_cmd_get_sleep():
##################
log = logging.getLogger("SDS011")
sm = SerialMock()

DATA = b'\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
SENSOR_ID = b'\xff\xff'
EXPECTED_DRIVER_WRITE = compose_write(DATA, SENSOR_ID)
sm.test_expect_read(HEAD)
DATA_RSP = b'\x06\x00\x00\x00\xab\xcd'
CHECKSUM_RSP = bytes([sum(DATA_RSP) % 256])
sm.test_expect_read(RSP_ID+DATA_RSP+CHECKSUM_RSP+TAIL)

DATA_RSP = b'\x06\x00\x00\x00'
SENSOR_ID_RSP = b'\xab\xcd'
sm.test_expect_read(compose_response(DATA_RSP + SENSOR_ID_RSP))
d = SDS011(sm, log)
assert True == d.cmd_get_sleep()

# check expectation about what driver should sent to sensor
production_code_write_to_sensor = sm.test_get_write()
assert 1 == len(production_code_write_to_sensor)
assert EXPECTED_DRIVER_WRITE == production_code_write_to_sensor[0]


def test_cmd_get_sleep_awakesensor():
"""
Test correctly processed get sleep command, sensor is not sleeping
"""
##################
# EXPECTATION
##################
log = logging.getLogger("SDS011")
sm = SerialMock()
DATA = b'\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
SENSOR_ID = b'\xff\xff'
EXPECTED_DRIVER_WRITE = compose_write(DATA, SENSOR_ID)

sm.test_expect_read(HEAD)
DATA_RSP = b'\x06\x00\x01\x00'
SENSOR_ID_RSP = b'\xab\xcd'
sm.test_expect_read(compose_response(DATA_RSP + SENSOR_ID_RSP))
d = SDS011(sm, log)
assert 1 == d.cmd_get_sleep()
assert False == d.cmd_get_sleep()

# check expectation about what driver should sent to sensor
production_code_write_to_sensor = sm.test_get_write()
assert 1 == len(production_code_write_to_sensor)
DATA = b'\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff'
CHECKSUM = bytes([sum(DATA) % 256])
assert HEAD + CMD_ID + DATA + CHECKSUM + TAIL == production_code_write_to_sensor[0]
assert EXPECTED_DRIVER_WRITE == production_code_write_to_sensor[0]


def test_cmd_get_sleep_noresponse():
"""
Test correctly processed get sleep command, sensor is not sleeping
"""
##################
# EXPECTATION
##################
log = logging.getLogger("SDS011")
sm = SerialMock()
d = SDS011(sm, log)
assert d.cmd_get_sleep() is None


def test_cmd_get_sleep_invalid():
"""
Test correctly processed get sleep command, sensor is not sleeping
"""
##################
# EXPECTATION
##################
log = logging.getLogger("SDS011")
sm = SerialMock()

sm.test_expect_read(HEAD)
DATA_RSP = b'\x06\x00\x02\x00' # 2 is not valid status
SENSOR_ID_RSP = b'\xab\xcd'
sm.test_expect_read(compose_response(DATA_RSP + SENSOR_ID_RSP))
d = SDS011(sm, log)
assert d.cmd_get_sleep() is None


def test_cmd_query_data():
Expand Down
25 changes: 25 additions & 0 deletions tests/test_pysds011_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,18 @@ def test_sleep(mocker):
css = mocker.patch('pysds011.driver.SDS011.cmd_set_sleep')
runner = CliRunner()
result = runner.invoke(main, ['sleep', '1'])
css.assert_called_once_with(1, id=None)

assert result.exit_code == 0


def test_sleep_with_id(mocker):
mocker.patch('serial.Serial.open')
mocker.patch('serial.Serial.flushInput')
css = mocker.patch('pysds011.driver.SDS011.cmd_set_sleep')
runner = CliRunner()
result = runner.invoke(main, ['--id', 'ABCD', 'sleep', '1'])
css.assert_called_once_with(1, id=b'\xab\xcd')

assert result.exit_code == 0

Expand Down Expand Up @@ -155,3 +167,16 @@ def test_sleep_driver_error(mocker):
result = runner.invoke(main, ['sleep', '0'])

assert result.exit_code == 1


def test_get_sleep(mocker):
mocker.patch('serial.Serial.open')
mocker.patch('serial.Serial.flushInput')
cgs = mocker.patch('pysds011.driver.SDS011.cmd_get_sleep')
cgs.return_value = 1
runner = CliRunner()
result = runner.invoke(main, ['sleep'])
cgs.assert_called_once_with()

assert '1' in result.output
assert result.exit_code == 0

0 comments on commit 9eb53c5

Please sign in to comment.