Skip to content

Commit

Permalink
Fixed bugs wrt environment variables. Fixed userfunc
Browse files Browse the repository at this point in the history
  • Loading branch information
rraks committed Oct 13, 2019
1 parent 65cd877 commit 1b7760e
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 35 deletions.
1 change: 1 addition & 0 deletions scripts/setenv.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ echo "Setting environment variables"
# Set common environment variable
export LB_IP=10.156.14.138
export LB_PORT=5000
export MQTT_PORT=1883
export ORIGIN_IP=10.156.14.138
export ORIGIN_ID=TestOrigin
export DIST_IP=10.156.14.138
Expand Down
29 changes: 19 additions & 10 deletions src/HTTPserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
''' Environment Variables '''
LB_IP = os.environ["LB_IP"]
LB_PORT = os.environ["LB_PORT"]
MQTT_PORT = os.environ["MQTT_PORT"]
if LB_IP is None or LB_PORT is None:
print("Error! LB_IP and LB_PORT not set")
sys.exit(0)
Expand Down Expand Up @@ -65,14 +66,16 @@ def on_message(client, userdata, message):
TODO: make all messages json
TODO: Add time out
'''
global msg, action
global addusers, delusers, verified, allusers
global allorigins, addorigin, delorigin, alldists
global adddists, deldists, addstreams, allstreams
global delstreams, allarchives, addarchives, delarchives
global stream_link
msg = message.payload.decode("utf-8")
print(msg)
topic = message.topic.decode("utf-8")
print(msg)
print(topic)
if topic == "lbsresponse/rtmp":
stream_link = msg
if topic == "lbsresponse/user/add":
Expand Down Expand Up @@ -113,7 +116,7 @@ def on_message(client, userdata, message):
mqParams = {}
mqParams["url"] = LB_IP
''' TODO: read this port from env variables '''
mqParams["port"] = 1883
mqParams["port"] = int(MQTT_PORT)
mqParams["timeout"] = 60
''' Mqtt list of topics the server users '''
mqParams["topic"] = [
Expand All @@ -129,13 +132,9 @@ def on_message(client, userdata, message):
]
mqParams["onMessage"] = on_message
client = MQTTPubSub(mqParams)
client.run()


''' HTTP App '''
app = Flask(__name__)
app.run(host=LB_IP, port=LB_PORT, debug=True)
print("Starting server on - ", LB_IP + ":" + LB_PORT)



Expand Down Expand Up @@ -235,7 +234,7 @@ def userfunc():
retval = allusers
allusers = ""
if retval:
return Response(json.dumps({"username": json.loads(retval)}),
return Response(retval,
status=200, mimetype='application/json')
else:
return Response(json.dumps({}), status=408,
Expand Down Expand Up @@ -650,10 +649,20 @@ def archivestream():
retval = allarchives
allarchives = ""
if retval:
return Response(json.dumps({"success": " Archive Streams Present: "+str(retval)}), status=200, mimetype='application/json')
return Response(json.dumps({"success": " Archive Streams Present: " +
str(retval)}), status=200,
mimetype='application/json')
else:
return Response(json.dumps({"error": " Operation Failed"}), status=408, mimetype='application/json')
return Response(json.dumps({"error": " Operation Failed"}),
status=408, mimetype='application/json')
else:
return Response(json.dumps({"error": "Request not supported"}), status=405, mimetype='application/json')
return Response(json.dumps({"error": "Request not supported"}),
status=405, mimetype='application/json')
else:
return Response(json.dumps({"error": "Invalid Credentials"}), status=403, mimetype='application/json')


if __name__ == "__main__":
print("Starting server on - ", LB_IP + ":" + LB_PORT)
client.run()
app.run(host=LB_IP, port=int(LB_PORT), debug=True)
17 changes: 9 additions & 8 deletions src/celeryLBmain.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def __init__(self, mqtt_ip, mqtt_port):
# MQTT Params
self.mqParams = {}
self.mqParams["url"] = mqtt_ip
self.mqParams["port"] = mqtt_port
self.mqParams["port"] = int(mqtt_port)
self.mqParams["timeout"] = 60
self.mqParams["topic"] = [("origin/get", 0), ("dist/get", 0),
("archive/get", 0), ("stream/get", 0),
Expand All @@ -44,8 +44,8 @@ def __init__(self, mqtt_ip, mqtt_port):

def on_message(self, client, userdata, message):
''' MQTT Callback function '''
self.msg = str(message.payload.decode("utf-8"))
self.action = str(message.topic.decode("utf-8"))
self.msg = message.payload.decode("UTF-8")
self.action = message.topic

def monitorTaskResult(self, res):
''' Celery task monitor '''
Expand Down Expand Up @@ -179,7 +179,7 @@ def router(self):
args=(res,)).start()

if self.action == "user/get":
res = lbc.GetUsers.delay(self.msg)
res = lbc.GetUsers.delay()
threading.Thread(target=self.monitorTaskResult,
args=(res,)).start()

Expand All @@ -197,17 +197,18 @@ def router(self):
res = lbc.VerifyUser.delay(self.msg)
threading.Thread(target=self.monitorTaskResult,
args=(res,)).start()

else:
if self.action == "idle":
self.action = "idle"
self.msg = ""
continue
time.sleep(0.001)
self.action = "idle"
self.msg = ""
time.sleep(0.01)


def main():
mqtt_ip = os.environ["LB_IP"]
mqtt_port = os.environ["LB_PORT"]
mqtt_port = os.environ["MQTT_PORT"]
if mqtt_ip is None or mqtt_port is None:
print("Error! LB_IP and LB_PORT not set")
sys.exit(0)
Expand Down
12 changes: 8 additions & 4 deletions src/loadbalancercelery.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ def update(self, key, doc):

def findOne(self, doc, args=None):
res = self.collection.find_one(doc, {"_id": 0})
if(res is None):
return {}
return res

def findAll(self, doc=None, args=None):
Expand All @@ -59,10 +61,10 @@ def findAll(self, doc=None, args=None):
if args is not None:
arg.update(args)
if (doc is not None):
res = self.collection.find(doc, arg)
res = list(self.collection.find(doc, arg))
return res
else:
return self.collection.find({}, arg)
return list(self.collection.find({}, arg))

def delete(self, doc):
self.collection.delete_one(doc)
Expand All @@ -75,7 +77,8 @@ def count(self):


''' mongo initializations '''
mongoclient = pymongo.MongoClient('mongodb://localhost:27017/')
''' TODO: Parameterize '''
mongoclient = pymongo.MongoClient('mongodb://localhost:27017/', connect=False)
mongoDB = mongoclient["vid-iot"]

'''
Expand Down Expand Up @@ -480,7 +483,7 @@ def RequestStream(msg):
bestDist = dist
dist = bestDist
resp = {"origin_id": stream["origin_id"], "origin_ip": stream["origin_ip"],
"dist_id": dist["dist_id"], "dist_ip": dist["dist_ip"]<
"dist_id": dist["dist_id"], "dist_ip": dist["dist_ip"],
"stream_id": stream["stream_id"], "stream_ip": stream["stream_ip"]}
userresp = {"stream_id": stream["stream_id"], "rtmp": ffproc["cmd"],
"hls": "http://" + ffproc["to_ip"] +
Expand Down Expand Up @@ -592,6 +595,7 @@ def GetUsers():
Response: HTTPServer.py
'''
users = usersTable.findAll(args={"password": 0})
logger.info("Showing all users")
return {"topic": "lbsresponse/user/all", "msg": users}


Expand Down
4 changes: 2 additions & 2 deletions src/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ def on_message(client, userdata, message):


mqtt_ip = os.environ["LB_IP"]
mqtt_port = os.environ["LB_PORT"]
mqtt_port = os.environ["MQTT_PORT"]
if mqtt_ip is None or mqtt_port is None:
print("Error! LB_IP and LB_PORT not set")
sys.exit(0)
origin_id = os.environ["ORIGIN_ID"]

mqttServerParams = {}
mqttServerParams["url"] = mqtt_ip
mqttServerParams["port"] = mqtt_port
mqttServerParams["port"] = int(mqtt_port)
mqttServerParams["timeout"] = 60
mqttServerParams["topic"] = "+"
mqttServerParams["onMessage"] = on_message
Expand Down
6 changes: 4 additions & 2 deletions src/originffmpegarchiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def __init__(self, origin_id, mqtt_ip, mqtt_port):
self.msg = ""
self.mqParams = {}
self.mqParams["url"] = mqtt_ip
self.mqParams["port"] = mqtt_port
self.mqParams["port"] = int(mqtt_port)
self.mqParams["timeout"] = 60
self.mqParams["topic"] = [("origin/ffmpeg/archive/add", 0),
("origin/ffmpeg/archive/delete", 0)]
Expand Down Expand Up @@ -142,6 +142,8 @@ def router(self):
self.action = "idle"
self.msg = ""
continue
self.action = "idle"
self.msg = ""
time.sleep(0.001)

def on_message(self, client, userdata, message):
Expand Down Expand Up @@ -171,7 +173,7 @@ def monitorTaskResult(self, res):

def main():
mqtt_ip = os.environ["LB_IP"]
mqtt_port = os.environ["LB_PORT"]
mqtt_port = os.environ["MQTT_PORT"]
if mqtt_ip is None or mqtt_port is None:
print("Error! LB_IP and LB_PORT not set")
sys.exit(0)
Expand Down
6 changes: 4 additions & 2 deletions src/originffmpegkiller.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def __init__(self, origin_id, mqtt_ip, mqtt_port):
self.msg = ""
self.mqParams = {}
self.mqParams["url"] = mqtt_ip
self.mqParams["port"] = mqtt_port
self.mqParams["port"] = int(mqtt_port)
self.mqParams["timeout"] = 60
self.mqParams["topic"] = [("origin/ffmpeg/kill", 0),
("origin/ffmpeg/killall", 0)]
Expand All @@ -38,6 +38,8 @@ def router(self):
self.action = "idle"
self.msg = ""
continue
self.action = "idle"
self.msg = ""
time.sleep(0.001)

def on_message(self, client, userdata, message):
Expand Down Expand Up @@ -67,7 +69,7 @@ def monitorTaskResult(self, res):

def main():
mqtt_ip = os.environ["LB_IP"]
mqtt_port = os.environ["LB_PORT"]
mqtt_port = os.environ["MQTT_PORT"]
if mqtt_ip is None or mqtt_port is None:
print("Error! LB_IP and LB_PORT not set")
sys.exit(0)
Expand Down
6 changes: 4 additions & 2 deletions src/originffmpegspawner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def __init__(self, origin_id, mqtt_ip, mqtt_port):
self.msg = ""
self.mqParams = {}
self.mqParams["url"] = mqtt_ip
self.mqParams["port"] = mqtt_port
self.mqParams["port"] = int(mqtt_port)
self.mqParams["timeout"] = 60
self.mqParams["topic"] = [("origin/ffmpeg/dist/respawn", 0),
("origin/ffmpeg/stream/spawn", 0),
Expand Down Expand Up @@ -45,6 +45,8 @@ def router(self):
self.action = "idle"
self.msg = ""
continue
self.action = "idle"
self.msg = ""
time.sleep(0.001)

def on_message(self, client, userdata, message):
Expand Down Expand Up @@ -86,7 +88,7 @@ def run(self):

def main():
mqtt_ip = os.environ["LB_IP"]
mqtt_port = os.environ["LB_PORT"]
mqtt_port = os.environ["MQTT_PORT"]
if mqtt_ip is None or mqtt_port is None:
print("Error! LB_IP and LB_PORT not set")
sys.exit(0)
Expand Down
11 changes: 6 additions & 5 deletions src/originstatchecker.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
class Statter():
"""Statter Class to check status of NGINX based FFMPEG streams"""

def __init__(self, tsDBParams, statPageURL, mqttServer, mqttTopics):
def __init__(self, tsDBParams, statPageURL, mqtt_ip, mqtt_port, mqttTopics):
""" Internal Defs"""
self.statPageURL = statPageURL
''' Registered Streams '''
Expand All @@ -39,8 +39,8 @@ def __init__(self, tsDBParams, statPageURL, mqttServer, mqttTopics):
s.close()
''' MQTT Backend '''
self.mqttServerParams = {}
self.mqttServerParams["url"] = mqttServer
self.mqttServerParams["port"] = 1883
self.mqttServerParams["url"] = mqtt_ip
self.mqttServerParams["port"] = mqtt_port
self.mqttServerParams["timeout"] = 60
self.mqttServerParams["topic"] = mqttTopics
self.mqttServerParams["onMessage"] = self.on_message
Expand Down Expand Up @@ -241,14 +241,15 @@ def start(self):

if __name__ == "__main__":
statPageURL = "http://localhost:8080/stat"
mqttServer = "10.156.14.138"
mqtt_ip = os.environ["LB_IP"]
mqtt_port = os.environ["MQTT_PORT"]
mqttTopics = [("origin/ffmpeg/stream/stat/spawn", 0),
("origin/ffmpeg/kill", 0),
("lb/request/origin/streams", 0),
("origin/ffmpeg/killall", 0)]
tsDBParams = {"url": "127.0.0.1", "port": 8086,
"uname": "root", "pwd": "root", "app": "statter"}
statter = Statter(tsDBParams, statPageURL, mqttServer, mqttTopics)
statter = Statter(tsDBParams, statPageURL, mqtt_ip, mqtt_port, mqttTopics)

statter.start()

0 comments on commit 1b7760e

Please sign in to comment.