-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsecure_app.py
More file actions
69 lines (56 loc) · 2.04 KB
/
secure_app.py
File metadata and controls
69 lines (56 loc) · 2.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import html
import os

from flask import Flask, request, render_template_string, session
import psycopg2
import psycopg2.extras
# This app is the secure version
# Flask application object; the routes below are registered on it.
app = Flask(__name__)
# Flask signs the session cookie with this key, and the session carries the
# tenant_id used to scope database access, so a guessable key would let a
# client forge another tenant's session. Prefer a key supplied through the
# FLASK_SECRET_KEY environment variable.
# NOTE(review): the literal fallback only keeps existing setups working; it is
# not a real secret and should not be relied on outside a local lab.
app.secret_key = os.environ.get("FLASK_SECRET_KEY") or "something-secret"
# DB connection as the limited-privilege application role
def get_db_connection():
    """Open a new PostgreSQL connection tagged with the session's tenant.

    Connects to the ``sql_lab`` database on ``localhost`` as ``app_user`` and
    passes ``-c app.current_tenant=<tenant>`` through the libpq ``options``
    parameter. ``<tenant>`` is the session's ``tenant_id``, or ``0`` when the
    session has none. Reads Flask's ``session``, so it needs an active request.

    Returns:
        A new psycopg2 connection. Nothing here closes it; the caller must.

    NOTE(review): no ``sslmode`` is passed, so this code does not enforce TLS
    on the connection; confirm whether that is intended.
    NOTE(review): the role's password is hard-coded; consider loading it from
    the environment or a secrets store.
    """
    tenant = str(session.get("tenant_id", 0))
    # ``options`` is split into arguments on unescaped whitespace, with
    # backslash as the escape character. Escape both so the tenant value can
    # only ever be the value of this one setting and cannot introduce extra
    # ``-c`` switches. Plain integer tenants pass through unchanged.
    safe_tenant = "".join(
        "\\" + ch if ch == "\\" or ch.isspace() else ch for ch in tenant
    )
    return psycopg2.connect(
        dbname="sql_lab",
        user="app_user",
        password="app_user_pass",
        host="localhost",
        options=f"-c app.current_tenant={safe_tenant}",
    )
@app.route("/", methods=["GET", "POST"])
def login():
    """Show the login form (GET) or check submitted credentials (POST).

    On POST, looks up a ``users`` row whose ``username`` and ``password``
    equal the submitted form fields. On a match, the row's ``tenant_id`` is
    stored in the session and a confirmation message is returned; otherwise
    the response is ``"Invalid credentials"``. Any other method renders the
    login form.

    Returns:
        The response body as a string.

    NOTE(review): the query compares the submitted password directly with the
    stored column, which implies plaintext passwords in the database. Storing
    salted hashes and verifying them in Python would be safer, but needs a
    schema change and is not done here.
    """
    if request.method == "POST":
        username = request.form.get("username")
        password = request.form.get("password")
        # Parameterized query: the values travel separately from the SQL
        # text, so form input cannot alter the statement.
        query = """
            SELECT id, username, tenant_id
            FROM users
            WHERE username = %s AND password = %s
        """
        conn = get_db_connection()
        try:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
                cur.execute(query, (username, password))
                result = cur.fetchone()
        finally:
            # Always release the connection, even if the query raises;
            # otherwise every request leaves one open.
            conn.close()
        if result:
            # Save tenant ID in session
            session["tenant_id"] = result["tenant_id"]
            message = (
                f"Logged in (securely) as {result['username']} "
                f"(Tenant {result['tenant_id']})"
            )
            # A str return value is served as HTML, so escape the stored
            # username before echoing it back to the browser.
            return html.escape(message, quote=False)
        return "Invalid credentials"
    return render_template_string("""
        <form method="POST">
            <input name="username" placeholder="username"><br>
            <input name="password" placeholder="password"><br>
            <button>Login</button>
        </form>
    """)
# Protected example route
@app.route("/profile")
def profile():
    """Return the ``users`` rows the current session is allowed to see.

    Responds with ``"Not logged in"`` when the session has no ``tenant_id``.
    Otherwise selects ``id`` and ``username`` from ``users`` and returns the
    fetched rows formatted into a one-line message.

    The query has no WHERE clause. Tenant filtering is expected to come from
    row-level security in the database, presumably keyed on the
    ``app.current_tenant`` setting applied by ``get_db_connection``; the
    policies themselves are not visible in this file.

    Returns:
        The response body as a string.
    """
    if "tenant_id" not in session:
        return "Not logged in"
    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT id, username FROM users")
            rows = cur.fetchall()
    finally:
        # Always release the connection, even if the query raises.
        conn.close()
    # The row repr contains stored usernames and the response is served as
    # HTML, so escape markup characters before returning it.
    return html.escape(f"Rows visible to your tenant: {rows}", quote=False)
if __name__ == "__main__":
    # Run Flask's built-in server when this file is executed as a script,
    # listening on every interface at port 5001.
    bind_address, bind_port = "0.0.0.0", 5001
    app.run(host=bind_address, port=bind_port)