AICertification/app/flask-postgres/app/security.py
2026-04-02 18:20:38 +02:00

79 lines
1.7 KiB
Python

from functools import wraps
from flask import session, redirect, url_for, request, abort
from db import get_connection
def get_current_user_mandant_level():
user_id = session.get("user_id")
if not user_id:
return None
conn = get_connection()
cur = conn.cursor()
cur.execute("""
SELECT m.level
FROM app_user u
JOIN mandant m ON m.id = u.mandant_id
WHERE u.id = %s
""", (user_id,))
row = cur.fetchone()
cur.close()
conn.close()
if row is None:
return None
return row[0]
def user_is_admin():
user_id = session.get("user_id")
if not user_id:
return False
conn = get_connection()
cur = conn.cursor()
cur.execute("""
SELECT 1
FROM app_user u
JOIN user_group ug ON ug.user_id = u.id
JOIN app_group g ON g.id = ug.group_id
WHERE u.id = %s
AND ug.mandant_id = 1
AND g.mandant_id = 1
AND g.group_name = 'Administratoren'
LIMIT 1
""", (user_id,))
result = cur.fetchone()
cur.close()
conn.close()
return result is not None
def get_current_user():
return {
"user_id": session.get("user_id"),
"user_name": session.get("user_name"),
"user_email": session.get("user_email"),
"is_logged_in": bool(session.get("user_id")),
"is_admin": user_is_admin() if session.get("user_id") else False,
}
def admin_required(view_func):
@wraps(view_func)
def wrapper(*args, **kwargs):
if not session.get("user_id"):
return redirect(url_for("login", next=request.path))
if not user_is_admin():
abort(403)
return view_func(*args, **kwargs)
return wrapper