Skip to content

Add singleton protection #147

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion sense_hat/sense_hat.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import pwd
import array
import fcntl
import threading
from PIL import Image # pillow
from copy import deepcopy

Expand All @@ -20,7 +21,10 @@
from .exceptions import ColourSensorInitialisationError

class SenseHat(object):

# singleton registration variables
_instance = None
_lock = threading.Lock()

SENSE_HAT_FB_NAME = 'RPi-Sense FB'
SENSE_HAT_FB_FBIOGET_GAMMA = 61696
SENSE_HAT_FB_FBIOSET_GAMMA = 61697
Expand All @@ -30,6 +34,14 @@ class SenseHat(object):
SENSE_HAT_FB_GAMMA_USER = 2
SETTINGS_HOME_PATH = '.config/sense_hat'

# Ensure that there is only a single SenseHat object running to prevent sensor read issues
def __new__(cls):
if cls._instance is None:
with cls._lock:
if not cls._instance:
cls._instance = super().__new__(cls)
return cls._instance

def __init__(
self,
imu_settings_file='RTIMULib',
Expand Down
33 changes: 33 additions & 0 deletions tests/test_singleton.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from sense_hat import SenseHat
import unittest
import time
import concurrent.futures


def get_temp(dummy):
sense = SenseHat()
time.sleep(5)
print(dummy)
return round(sense.get_temperature())

class TestSingleton(unittest.TestCase):
def test_singleton(self):
sense1 = SenseHat()
sense2 = SenseHat()
self.assertEqual(sense1.get_temperature(), sense2.get_temperature())
self.assertEqual(sense1, sense2)

@unittest.expectedFailure
def test_multiprocess(self):
executions = ["run1", "run2", "run3", "run4", "run5"]
prev_temp = None
with concurrent.futures.ProcessPoolExecutor() as executor:
for temp in executor.map(get_temp,executions):
if prev_temp:
print(prev_temp, temp)
self.assertEqual(temp, prev_temp)
prev_temp = temp


if __name__ == "__main__":
unittest.main()