AICertification/app/flask-postgres/app/security.py
2026-04-03 09:43:57 +02:00

124 lines
2.9 KiB
Python

from functools import wraps
from flask import session, redirect, url_for, request, abort
from db import get_connection
def get_current_user_mandant_level():
user_id = session.get("user_id")
if not user_id:
return None
conn = get_connection()
cur = conn.cursor()
cur.execute("""
SELECT m.level
FROM app_user u
JOIN mandant m ON m.id = u.mandant_id
WHERE u.id = %s
""", (user_id,))
row = cur.fetchone()
cur.close()
conn.close()
if row is None:
return None
return row[0]
def user_is_admin():
user_id = session.get("user_id")
if not user_id:
return False
conn = get_connection()
cur = conn.cursor()
cur.execute("""
SELECT 1
FROM app_user u
JOIN user_group ug ON ug.user_id = u.id
JOIN app_group g ON g.id = ug.group_id
WHERE u.id = %s
AND ug.mandant_id = 1
AND g.mandant_id = 1
AND g.group_name = 'Administratoren'
LIMIT 1
""", (user_id,))
result = cur.fetchone()
cur.close()
conn.close()
return result is not None
def get_current_user():
country = session.get("country", "DE") # Default DE
return {
"user_id": session.get("user_id"),
"user_name": session.get("user_name"),
"user_email": session.get("user_email"),
"mandant_id": session.get("mandant_id"),
"is_logged_in": bool(session.get("user_id")),
"is_admin": user_is_admin() if session.get("user_id") else False,
"is_user_admin": user_is_user_admin() if session.get("user_id") else False,
"country": country,
}
def admin_required(view_func):
@wraps(view_func)
def wrapper(*args, **kwargs):
if not session.get("user_id"):
return redirect(url_for("login", next=request.path))
if not user_is_admin():
abort(403)
return view_func(*args, **kwargs)
return wrapper
def user_is_user_admin():
user_id = session.get("user_id")
current_mandant_id = session.get("mandant_id")
if not user_id or not current_mandant_id:
return False
conn = get_connection()
cur = conn.cursor()
cur.execute("""
SELECT 1
FROM user_group ug
JOIN app_group g ON g.id = ug.group_id
WHERE ug.user_id = %s
AND ug.mandant_id = %s
AND g.mandant_id = %s
AND g.group_name = 'Useradministration'
LIMIT 1
""", (user_id, current_mandant_id, current_mandant_id))
result = cur.fetchone()
cur.close()
conn.close()
return result is not None
def user_admin_required(view_func):
@wraps(view_func)
def wrapper(*args, **kwargs):
if not session.get("user_id"):
return redirect(url_for("login", next=request.path))
if not user_is_user_admin():
abort(403)
return view_func(*args, **kwargs)
return wrapper