-
Notifications
You must be signed in to change notification settings - Fork 4
/
main.py
91 lines (70 loc) · 3 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
"""
Just for Testing purpose for now.
After completion of Hyperlite server,
It will contain the code which will interact with the driver.
"""
import os
import threading
from hyperlite import event_loop
from hyperlite.request_parser import Parser
from hyperlite.event import Event
from server import Socket
from hyperlite.collection import Collection
from hyperlite.process.process import renderProcess, renderRIDUProcess, DataPipelineProcess, renderProviderProcess
from hyperlite import config
from hyperlite.logger import Log
from storage_engine.provider import Provider, loadCollection
TAG = "Main_Process" # Constant used just for logging
def listenForConnection():
Socket(config.DEFAULT.get('host'), config.DEFAULT.get('port')).listen()
def initMe():
Log.welcome()
Log.c(TAG, "Starting Hyperlite Database")
Log.w(TAG, f"We are running on {config.PLATFORM} Operating System")
Log.i(TAG, f"Database files can be found on {config.DATABASE_PATH} ")
if os.path.exists(config.COLLECTION_PATH):
meta_col = loadCollection(config.COLLECTION_PATH)
Provider.meta_collection = meta_col
Log.i(TAG, "Meta collection found on disk")
else:
meta_col = Collection("hyperlite.col", "MetaData")
Provider.meta_collection = meta_col
Log.w(TAG, "Meta collection file not found so creating new meta collection")
if __name__ == "__main__":
initMe()
# server_process = threading.Thread(target=listenForConnection)
loop_runner = event_loop.LoopRunner()
loop_runner.run()
def manage_loop_status():
if not loop_runner.isRunning:
Log.i(TAG, "EventLoop is stopped, Rerunning EventLoop...")
Event.emmit('loop-rerun')
def onRequest(data):
Log.d(TAG, f"New request - {data}")
loop_runner.loop.query_processes.put(renderRIDUProcess(parsed_data=Parser.parse(data)))
manage_loop_status()
def onSubscription(data):
Log.d(TAG, f"New subscription request - {data}")
# TODO: Adding subscription plan
loop_runner.loop.subscriptions.put()
def onPipeline(data):
Log.d(TAG, f"New data pipeline request - {data}")
loop_runner.loop.query_processes.put(DataPipelineProcess(data))
manage_loop_status()
def onProviderRequest(data):
Log.d(TAG, f"New provider request - {data}")
loop_runner.loop.query_processes.put(renderProviderProcess(data))
manage_loop_status()
def onCollectionChange(collection: Collection):
Log.i(TAG, "Event -> Collection Changed")
for proc in renderProcess(collection):
loop_runner.loop.system_process.put(proc)
for proc in renderProcess(Provider.meta_collection):
loop_runner.loop.system_process.put(proc)
manage_loop_status()
Event.on('request', onRequest)
Event.on('col-change', onCollectionChange)
Event.on('req_sub', onSubscription)
Event.on('req_pipe', onPipeline)
Event.on('req_provider', onProviderRequest)
listenForConnection()