Skip to content

Commit

Permalink
Fix duration of UDP connection while searching for controllers
Browse files Browse the repository at this point in the history
  • Loading branch information
MC325 committed Jul 24, 2024
1 parent 206d7e5 commit dcd7e90
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 45 deletions.
79 changes: 35 additions & 44 deletions instec/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,11 @@
from instec.constants import mode, connection


class udp_singleton:
def __new__(cls):
if not hasattr(cls, 'instance'):
cls.instance = super(udp_singleton, cls).__new__(cls)
if not hasattr(cls, 'receiver'):
cls.receiver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
cls.receiver.bind(
((udp_singleton._get_eth_addr()
), 50291))
cls.receiver.settimeout(connection.TIMEOUT)
if not hasattr(cls, 'sender'):
cls.sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
cls.sender.setsockopt(
socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
return cls.instance
class controller:
"""All basic communication functions to interface with the MK2000/MK2000B.
"""
ethernet = []
usb = []

def _get_eth_addr():
if isinstance(connection.IP_ADDRESS, str):
Expand All @@ -48,42 +38,43 @@ def _get_eth_addr():
else:
raise RuntimeError('Unsupported OS')


class controller:
"""All basic communication functions to interface with the MK2000/MK2000B.
"""
ethernet = []
usb = []

def get_ethernet_controllers():
"""Get all controllers connected via Ethernet.
Returns:
List: List of tuples in the form (serial_num, ip)
"""
with (
socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as receiver,
socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sender):

receiver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
receiver.bind((controller._get_eth_addr(), 50291))
receiver.settimeout(connection.TIMEOUT)

sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sender.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sender.sendto(
bytes.fromhex('73C4000001'),
('255.255.255.255', 50290))

controller.ethernet = []
while True:
try:
buffer, addr = receiver.recvfrom(1024)
buffer = buffer.decode()
data = buffer.strip().split(':')
model = data[0]
serial_num = data[1]
if model.startswith('IoT_MK#MK2000'):
controller.ethernet.append((serial_num, addr[0]))
except socket.error:
break
except Exception as error:
raise RuntimeError(
'Did not receive UDP response') from error

controller.udp = udp_singleton()

controller.udp.sender.sendto(
bytes.fromhex('73C4000001'),
('255.255.255.255', 50290))

controller.ethernet = []
while True:
try:
buffer, addr = controller.udp.receiver.recvfrom(1024)
buffer = buffer.decode()
data = buffer.strip().split(':')
model = data[0]
serial_num = data[1]
if model.startswith('IoT_MK#MK2000'):
controller.ethernet.append((serial_num, addr[0]))
except socket.error:
break
except Exception as error:
raise RuntimeError('Did not receive UDP response') from error

return controller.ethernet
return controller.ethernet

def get_usb_controllers():
"""Get all controllers connected via USB.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "instec"
version = "1.1.64"
version = "1.1.65"
authors = [
{ name="Matthew Chen", email="matthew.chen@instec.com" },
]
Expand Down

0 comments on commit dcd7e90

Please sign in to comment.