Skip to content

Commit

Permalink
Minor fix to statchecker pending
Browse files Browse the repository at this point in the history
  • Loading branch information
rraks committed Oct 17, 2019
1 parent f2d41c7 commit 9804713
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 23 deletions.
2 changes: 1 addition & 1 deletion scripts/docker_env.env
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
MONGO_INITDB_ROOT_USERNAME=vidiot
MONGO_INITDB_ROOT_PASSWORD=vidiot
MONGO_URL=db
HTTP_IP=10.156.14.138
HTTP_IP=0.0.0.0
LB_IP=lb
HTTP_PORT=5000
MQTT_PORT=1883
Expand Down
1 change: 1 addition & 0 deletions scripts/start_all_tmux.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ sed -i s/#allow_anonymous\ true/allow_anonymous\ true/ /etc/mosquitto/mosquitto.
/opt/nginx/sbin/nginx
redis-server --daemonize yes
mosquitto -d
nohup influxd > /dev/null 2>&1 &
# Project and root directories
PROJ_DIR=/vidiot/src

Expand Down
1 change: 1 addition & 0 deletions src/celeryLBmain.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(self, mqtt_ip, mqtt_port, mqtt_uname, mqtt_passwd):
("db/dist/ffmpeg/respawn", 1),
("archive/delete", 1), ("archive/add", 1),
("request/allstreams", 1),
("request/origin/streams", 1),
("db/origin/ffmpeg/respawn", 1),
("origin/stat", 1), ("dist/stat", 1),
("origin/add", 1), ("origin/delete", 1),
Expand Down
46 changes: 24 additions & 22 deletions src/originstatchecker.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,29 @@ def __init__(self, tsDBParams, statPageURL, mqtt_ip, mqtt_port, mqttTopics,
""" Origin Server IP Address, currently LAN IP """
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
self.origin_IP = str(s.getsockname()[0])
self.origin_ID = os.environ["ORIGIN_ID"]
if(self.origin_ID is None):
self.ORIGIN_IP = str(s.getsockname()[0])
self.ORIGIN_ID = os.environ["ORIGIN_ID"]
if(self.ORIGIN_ID is None):
sys.exit(0)

s.close()
''' MQTT Backend '''
self.mqttServerParams = {}
self.mqttServerParams["url"] = mqtt_ip
self.mqttServerParams["port"] = mqtt_port
self.mqttServerParams["port"] = int(mqtt_port)
self.mqttServerParams["timeout"] = 60
self.mqttServerParams["topic"] = mqttTopics
self.mqttServerParams["username"] = mqtt_uname
self.mqttServerParams["password"] = mqtt_passwd
self.mqttServerParams["onMessage"] = self.on_message
self.mqttc = MQTTPubSub(self.mqttServerParams)
self.mqttc.run()
self.numClients = 0
''' InfluxDB '''
self.influxClient = InfluxDBClient(
tsDBParams["url"], tsDBParams["port"],
tsDBParams["uname"], tsDBParams["pwd"], tsDBParams["appName"])
print("Initalization done")

def addNewStream(self, stream_id, stream_ip):
with self.dictLock:
Expand All @@ -69,18 +71,18 @@ def deleteStream(self, streamId):
del self.rS[streamId]

def on_message(self, client, userdata, message):
msg = str(message.payload.decode("utf-8"))
topic = str(message.topic.decode("utf-8"))
msg = message.payload
topic = message.topic
msgDict = json.loads(msg)
print(msgDict)
try:
if isinstance(msgDict, list):
for stream in msgDict:
if stream["origin_ip"] == self.origin_IP:
if stream["origin_id"] == self.ORIGIN_ID:
if topic == "origin/ffmpeg/kill":
self.deleteStream(stream["stream_id"])
else:
if msgDict["origin_ip"] == self.origin_IP:
if msgDict["origin_id"] == self.ORIGIN_ID:
if topic == "origin/ffmpeg/stream/stat/spawn":
self.addNewStream(msgDict["stream_id"],
msgDict["stream_ip"])
Expand All @@ -96,7 +98,7 @@ def on_message(self, client, userdata, message):
stream["stream_ip"])
self.startFlag = True
except Exception as e:
print(e)
print("Couldn't decode response", e)

def stat(self):
while(True):
Expand All @@ -109,7 +111,7 @@ def stat(self):
stats = (stats["rtmp"]["server"]["application"][1]
["live"]["stream"])
except Exception as e:
print(e)
print("Couldn't decode response", e)

if isinstance(stats, collections.OrderedDict):
statList.append(stats)
Expand All @@ -127,15 +129,15 @@ def stat(self):
self.rS[stat["name"]]["InBW"] = int(stat["bw_in"])
self.rS[stat["name"]]["status"] = 1
except Exception as e:
print(e)
print("Couldn't read stat ", e)
time.sleep(0.5)

def resetrevived(self, streamId):
with self.dictLock:
try:
self.rS[streamId]["revived"] = 0
except Exception as e:
print(e)
print("Couldn't revive ", e)

def checkStat(self):
while(True):
Expand All @@ -160,7 +162,7 @@ def pub(self):
streamId = stream
streamIp = self.rS[stream]["stream_ip"]
streamDict = {"stream_ip": streamIp,
"stream_id": streamId, "origin_id": self.origin_ID}
"stream_id": streamId, "origin_id": self.ORIGIN_ID}
print("Publishing")
print(streamDict)
self.mqttc.publish("db/origin/ffmpeg/respawn",
Expand All @@ -172,7 +174,7 @@ def logger(self):
while(True):
''' TODO: Send status on a per stream basis here '''
self.mqttc.publish("origin/stat",
json.dumps({"origin_id": self.origin_ID,
json.dumps({"origin_id": self.ORIGIN_ID,
"num_clients": str(self.numClients)}))
epochTime = int(time.time()) * 1000000000
self.logDataFlag = False
Expand Down Expand Up @@ -208,9 +210,9 @@ def logger(self):
time.sleep(30)

def start(self):
self.mqttc.run()
time.sleep(0.5)
self.mqttc.publish("request/origin/streams", json.dumps({"origin_ip": self.origin_IP}))
time.sleep(1)
self.mqttc.publish("request/origin/streams", json.dumps({"origin_id": self.ORIGIN_ID}))
print("Requesting all streams belonging to ", self.ORIGIN_ID)
while(self.startFlag is False):
time.sleep(0.5)
''' Start the stat thread '''
Expand Down Expand Up @@ -238,7 +240,7 @@ def start(self):
''' For printing '''
obj = self.rS[stream]
except Exception as e:
print(e)
print("Error in loop ", e)
time.sleep(2)


Expand All @@ -248,10 +250,10 @@ def start(self):
mqtt_port = os.environ["MQTT_PORT"]
mqtt_uname = os.environ["MQTT_UNAME"]
mqtt_passwd = os.environ["MQTT_PASSWD"]
mqttTopics = [("origin/ffmpeg/stream/stat/spawn", 0),
("origin/ffmpeg/kill", 0),
("lb/request/origin/streams", 0),
("origin/ffmpeg/killall", 0)]
mqttTopics = [("origin/ffmpeg/stream/stat/spawn", 1),
("origin/ffmpeg/kill", 1),
("lb/request/origin/streams", 1),
("origin/ffmpeg/killall", 1)]
''' TODO: Parameterize tsdb params '''
tsDBParams = {"url": "127.0.0.1", "port": 8086,
"uname": "root", "pwd": "root", "appName": "statter"}
Expand Down

0 comments on commit 9804713

Please sign in to comment.