-
-
Notifications
You must be signed in to change notification settings - Fork 33
/
_C001_DomoHTTP.py
81 lines (76 loc) · 2.79 KB
/
_C001_DomoHTTP.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
#!/usr/bin/env python3
#############################################################################
################# Domoticz HTTP Controller for RPIEasy ######################
#############################################################################
#
# Only one-way data sending is supported, for obvious reasons.
#
# Copyright (C) 2018-2019 by Alexander Nagy - https://bitekmindenhol.blog.hu/
#
import controller
import rpieGlobals
from helper_domoticz import *
import misc
import urllib.request
from multiprocessing import Process
import base64
class Controller(controller.ControllerProto):
    """Domoticz HTTP controller.

    Builds a Domoticz ``/json.htm`` request from a task value and fires it
    at ``self.controllerip``:``self.controllerport`` in a separate process.
    Data flows one way only: nothing is read back from the response.
    """

    CONTROLLER_ID = 1
    CONTROLLER_NAME = "Domoticz HTTP"

    def __init__(self, controllerindex):
        """Initialise the base controller and enable the IDX/account/password fields."""
        controller.ControllerProto.__init__(self, controllerindex)
        self.usesID = True
        self.usesAccount = True
        self.usesPassword = True

    def senddata(self, idx, sensortype, value, userssi=-1, usebattery=-1, tasknum=-1, changedvalue=-1):
        """Send one task value to Domoticz over HTTP.

        Does nothing when the controller is disabled. An IDX of zero is
        rejected with an error log entry.

        Args:
            idx: Domoticz device index; must convert to a non-zero int.
            sensortype: one of the ``rpieGlobals.SENSOR_TYPE_*`` constants.
            value: sequence of task values; switch and dimmer only use
                ``value[0]``.
            userssi: RSSI passed through ``mapRSSItoDomoticz``.
            usebattery: battery level 0..100; -1 or 255 means "not supplied",
                in which case ``misc.get_battery_value()`` is used instead.
            tasknum: unused by this controller.
            changedvalue: unused by this controller.
        """
        if not self.enabled:
            return
        if int(idx) == 0:
            misc.addLog(rpieGlobals.LOG_LEVEL_ERROR, "Domoticz HTTP : IDX cannot be zero!")
            return
        url = self._buildurl(idx, sensortype, value, userssi, usebattery)
        # str() guards against the port (or address) being stored as a non-string.
        urlstr = "http://" + str(self.controllerip) + ":" + str(self.controllerport) + url + self.getaccountstr()
        # NOTE(review): the logged URL includes the account parameters when set.
        misc.addLog(rpieGlobals.LOG_LEVEL_DEBUG, urlstr)  # sendviahttp
        # Use a separate process so a slow or unreachable server cannot block the caller.
        httpproc = Process(target=self.urlget, args=(urlstr,))
        httpproc.start()

    def _buildurl(self, idx, sensortype, value, userssi, usebattery):
        """Return the path and query part of the Domoticz JSON request."""
        stype = int(sensortype)
        if stype == rpieGlobals.SENSOR_TYPE_SWITCH:
            state = "Off" if int(value[0]) == 0 else "On"
            return "/json.htm?type=command&param=switchlight&idx=" + str(idx) + "&switchcmd=" + state
        if stype == rpieGlobals.SENSOR_TYPE_DIMMER:
            if int(value[0]) == 0:
                state = "Off"
            else:
                state = "Set%20Level&level=" + str(value[0])
            return "/json.htm?type=command&param=switchlight&idx=" + str(idx) + "&switchcmd=" + state
        # Every other sensor type is sent as a generic device update.
        url = "/json.htm?type=command&param=udevice&idx=" + str(idx)
        url += "&nvalue=0&svalue=" + str(formatDomoticzSensorType(sensortype, value))
        url += "&rssi=" + str(mapRSSItoDomoticz(userssi))
        if usebattery != -1 and usebattery != 255:  # battery input 0..100%, 255 means not supported
            battery = usebattery
        else:
            battery = misc.get_battery_value()
        url += "&battery=" + str(battery)
        return url

    def urlget(self, url):
        """Request ``url`` with a 2 second timeout and log an error on failure.

        The response body is ignored; the connection is closed right away.
        """
        try:
            with urllib.request.urlopen(url, None, 2):
                pass
        except Exception as exc:
            # Best effort: report the failure, never raise out of the worker process.
            misc.addLog(
                rpieGlobals.LOG_LEVEL_ERROR,
                "Controller: " + str(self.controllerip) + " connection failed (" + str(exc) + ")",
            )

    def getaccountstr(self):
        """Return the ``&username=...&password=...`` suffix, or "" when both are empty.

        Both values are Base64-encoded from their UTF-8 bytes.
        """
        if self.controlleruser == "" and self.controllerpassword == "":
            return ""
        # NOTE(review): Base64 output may contain '+' and '/', which are not
        # percent-encoded here - confirm the server accepts them in a query string.
        acc = base64.b64encode(bytes(self.controlleruser, "utf-8")).decode("utf-8")
        pw = base64.b64encode(bytes(self.controllerpassword, "utf-8")).decode("utf-8")
        return "&username=" + acc + "&password=" + pw