forked from HIP-Labs/HIP-Subnet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
frontend.py
136 lines (113 loc) · 4.01 KB
/
frontend.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import time
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import RedirectResponse, JSONResponse
from sqlalchemy.exc import NoResultFound
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
from hip.miner.db import Option, Task, get_db
# ASGI application object; the route decorators and the static-file mount in
# this module all register against it.
app = FastAPI()
# Request body accepted by POST /api/answer.
class Answer(BaseModel):
    # Answer text supplied by the client; required (no default).
    answer: str = Field(..., title="The answer to the question")
    # Identifier of the task being answered; post_answer compares it to Task.id.
    id: str = Field(..., title="The id of the question")
# Wire representation of a task: the response model of the GET task endpoints
# and the request body of POST /api/task.
class TaskDTO(BaseModel):
    id: str
    label: str
    type: str
    # Option texts, one entry per Option row linked to the task.
    options: List[str]
    value: str
    image: str
    # The endpoints compare this against int(time.time()), i.e. it is treated
    # as a Unix timestamp in seconds.
    expiry: int
    # The empty string marks a task as not answered yet: get_tasks filters on
    # Task.answer == "" and post_task always stores answer="".
    answer: str

    class Config:
        # pydantic v1-style settings: allow reading values from ORM objects
        # and populating fields by their Python names.
        orm_mode = True
        allow_population_by_field_name = True
@app.get("/api/tasks", response_model=List[TaskDTO])
async def get_tasks(db: Session = Depends(get_db)):
    """List the tasks that are still open and have not been answered.

    As a side effect, tasks whose expiry lies more than five seconds in the
    past are deleted before the listing is built.

    Args:
        db: SQLAlchemy session provided by the ``get_db`` dependency.

    Returns:
        A list of ``TaskDTO`` objects, one per task whose expiry is at or
        after the current time and whose answer is the empty string.
    """
    current_time = int(time.time())

    # Remove expired tasks with a grace period of 5 seconds. Rows are deleted
    # one at a time through the session rather than with a bulk DELETE.
    for expired in db.query(Task).filter(Task.expiry < current_time - 5).all():
        db.delete(expired)
    db.commit()

    # Non-expired tasks that still have an empty answer.
    tasks = (
        db.query(Task)
        .filter(Task.expiry >= current_time, Task.answer == "")
        .all()
    )
    if not tasks:
        # Nothing to list; also avoids issuing an IN query with no values.
        return []

    # Load the options of every listed task with a single IN query instead of
    # one query per task, then group them by task id. Keys are stringified so
    # the grouping does not depend on Task.id and Option.task_id sharing the
    # exact same Python type. Within a task the options keep the order in
    # which the database returns them (no ORDER BY is applied).
    # NOTE(review): the IN list grows with the number of open tasks;
    # presumably that number stays small -- confirm.
    task_ids = [task.id for task in tasks]
    options_by_task = {}
    for option in db.query(Option).filter(Option.task_id.in_(task_ids)).all():
        options_by_task.setdefault(str(option.task_id), []).append(
            str(option.option)
        )

    # Convert tasks to DTOs, coercing every column to the DTO field type.
    return [
        TaskDTO(
            id=str(task.id),
            label=str(task.label),
            type=str(task.type),
            options=options_by_task.get(str(task.id), []),
            value=str(task.value),
            image=str(task.image),
            expiry=int(task.expiry),  # type: ignore
            answer=str(task.answer),
        )
        for task in tasks
    ]
@app.get("/api/task/{task_id}", response_model=TaskDTO)
async def get_task(task_id: int, db: Session = Depends(get_db)):
    # Return the task with the given id together with its option texts, or
    # respond with 404 when no such task exists.
    # NOTE(review): task_id is declared as int here while TaskDTO.id and
    # Answer.id are str -- confirm which type Task.id actually has.
    record = db.query(Task).filter(Task.id == task_id).one_or_none()
    if record is None:
        raise HTTPException(status_code=404, detail="Task not found")

    option_rows = db.query(Option).filter(Option.task_id == record.id).all()

    # Column values are handed to the DTO as-is; any conversion to the
    # declared field types is left to pydantic.
    fields = {
        "id": record.id,
        "label": record.label,
        "type": record.type,
        "options": [row.option for row in option_rows],
        "value": record.value,
        "image": record.image,
        "expiry": record.expiry,
        "answer": record.answer,
    }
    return TaskDTO(**fields)
@app.post("/api/answer")
async def post_answer(answer: Answer, db: Session = Depends(get_db)):
    """Store the submitted answer on the task it refers to.

    Expired tasks are purged first, so an answer for a task whose expiry has
    already passed is rejected as not found.

    Args:
        answer: Request body carrying the task id and the answer text.
        db: SQLAlchemy session provided by the ``get_db`` dependency.

    Returns:
        A 200 JSON response confirming that the answer was stored.

    Raises:
        HTTPException: 404 if no task with the given id exists.
    """
    # Remove all expired tasks (no grace period here, unlike GET /api/tasks).
    db.query(Task).filter(Task.expiry < int(time.time())).delete()
    db.commit()

    task = db.query(Task).filter(Task.id == answer.id).first()
    if task is None:
        # A missing task used to be printed to stdout while the client still
        # received "Answer updated"; report it the way get_task does instead.
        raise HTTPException(status_code=404, detail="Task not found")

    task.answer = answer.answer  # type: ignore
    db.commit()  # Persist the updated answer.
    return JSONResponse(
        status_code=200, content={"status": "success", "message": "Answer updated"}
    )
@app.post("/api/task")
async def post_task(task: TaskDTO, db: Session = Depends(get_db)):
    """Persist a new task together with its options in one transaction.

    The ``answer`` field of the request body is ignored: the task is always
    stored with an empty answer.

    Args:
        task: Task description, including the list of option texts.
        db: SQLAlchemy session provided by the ``get_db`` dependency.

    Returns:
        A 200 JSON response confirming that the task was added.

    Raises:
        Exception: Whatever the session raises while writing is re-raised
            after the transaction has been rolled back.
    """
    db_task = Task(
        id=task.id,
        label=task.label,
        type=task.type,
        value=task.value,
        image=task.image,
        expiry=task.expiry,
        answer="",
    )
    try:
        db.add(db_task)
        # Emit the task INSERT before the options are queued so the parent row
        # precedes its children, while staying inside the same transaction.
        db.flush()
        # Create the associated options.
        db.add_all(
            [
                Option(task_id=task.id, option=option_text)
                for option_text in task.options
            ]
        )
        # Single commit: the task and its options are stored together or not
        # at all, so a failure cannot leave a task without options behind.
        db.commit()
    except Exception:
        db.rollback()
        raise
    return JSONResponse(
        status_code=200, content={"status": "success", "message": "Task added"}
    )
@app.get("/")
async def root():
    # Requests for the bare root URL are sent on to the static index page.
    index_url = "/index.html"
    return RedirectResponse(url=index_url)
# Mount the "public" directory at the root path. This is done after the API
# routes have been defined so that they are registered ahead of the mount.
# NOTE(review): "public" is a relative path, so it is presumably resolved
# against the process working directory -- confirm how the app is launched.
app.mount("/", StaticFiles(directory="public"), name="public")