diff --git a/app/flask-postgres/app/app.py b/app/flask-postgres/app/app.py index d0026b4..360a394 100644 --- a/app/flask-postgres/app/app.py +++ b/app/flask-postgres/app/app.py @@ -1,11 +1,8 @@ import logging import os from datetime import datetime -from functools import wraps from logging.handlers import RotatingFileHandler -import psycopg2 - from flask import ( Flask, redirect, @@ -18,16 +15,22 @@ from flask import ( ) from werkzeug.security import check_password_hash, generate_password_hash +from config import Config +from db import get_connection, fetchone_dict, fetchall_dict +from auth import login_required +from permissions import ( + admin_required, + get_current_user, + get_current_user_mandant_level, + is_video_allowed_for_level, +) + app = Flask(__name__) -app.secret_key = os.getenv("SECRET_KEY", "change-this-secret-key") +app.config.from_object(Config) +app.secret_key = app.config["SECRET_KEY"] -DB_HOST = os.getenv("DB_HOST", "db") -DB_NAME = os.getenv("DB_NAME", "CertDB") -DB_USER = os.getenv("DB_USER", "CertUser") -DB_PASSWORD = os.getenv("DB_PASSWORD", "CertPWD") -DB_PORT = os.getenv("DB_PORT", "5432") -LOG_DIR = os.getenv("LOG_DIR", "/logs") +LOG_DIR = app.config["LOG_DIR"] os.makedirs(LOG_DIR, exist_ok=True) file_handler = RotatingFileHandler( @@ -44,14 +47,6 @@ app.logger.setLevel(logging.INFO) app.logger.addHandler(file_handler) -def get_connection(): - return psycopg2.connect( - host=DB_HOST, - dbname=DB_NAME, - user=DB_USER, - password=DB_PASSWORD, - port=DB_PORT, - ) def ensure_base_tables(): @@ -211,108 +206,7 @@ def register_visit(route_name: str) -> int: return count -def get_current_user(): - return { - "user_id": session.get("user_id"), - "user_name": session.get("user_name"), - "user_email": session.get("user_email"), - "is_logged_in": bool(session.get("user_id")), - } -def fetchone_dict(cur): - row = cur.fetchone() - if row is None: - return None - columns = [desc[0] for desc in cur.description] - return dict(zip(columns, row)) - - -def fetchall_dict(cur): - rows = cur.fetchall() - columns = [desc[0] for desc in cur.description] - return [dict(zip(columns, row)) for row in rows] - - -def user_is_admin(): - user_id = session.get("user_id") - if not user_id: - return False - - conn = get_connection() - cur = conn.cursor() - - cur.execute(""" - SELECT 1 - FROM app_user u - JOIN user_group ug ON ug.user_id = u.id - JOIN app_group g ON g.id = ug.group_id - WHERE u.id = %s - AND ug.mandant_id = 1 - AND g.mandant_id = 1 - AND g.group_name = 'Administratoren' - LIMIT 1 - """, (user_id,)) - - result = cur.fetchone() - - cur.close() - conn.close() - - return result is not None - - -def get_current_user(): - return { - "user_id": session.get("user_id"), - "user_name": session.get("user_name"), - "user_email": session.get("user_email"), - "is_logged_in": bool(session.get("user_id")), - "is_admin": user_is_admin() if session.get("user_id") else False, - } - -def get_current_user_mandant_level(): - user_id = session.get("user_id") - if not user_id: - return None - - conn = get_connection() - cur = conn.cursor() - - cur.execute(""" - SELECT m.level - FROM app_user u - JOIN mandant m ON m.id = u.mandant_id - WHERE u.id = %s - """, (user_id,)) - - row = cur.fetchone() - - cur.close() - conn.close() - - if row is None: - return None - - return row[0] - -def admin_required(view_func): - @wraps(view_func) - def wrapper(*args, **kwargs): - if not session.get("user_id"): - return redirect(url_for("login", next=request.path)) - if not user_is_admin(): - abort(403) - return view_func(*args, **kwargs) - return wrapper - - -def login_required(view_func): - @wraps(view_func) - def wrapper(*args, **kwargs): - if not session.get("user_id"): - return redirect(url_for("login", next=request.path)) - return view_func(*args, **kwargs) - return wrapper def render_page(active_page: str, title: str): @@ -435,28 +329,8 @@ def health(): @login_required def protected_videos(filename): mandant_level = get_current_user_mandant_level() - if mandant_level is None: - abort(403) - basename = os.path.basename(filename) - first_char = basename[:1].upper() - - # Level 0 und 1: alles erlaubt - if mandant_level in (0, 1): - allowed = True - - # Level 2: nur A und B - elif mandant_level == 2: - allowed = first_char in ("A", "B") - - # Level 3: nur A - elif mandant_level == 3: - allowed = first_char == "A" - - else: - allowed = False - - if not allowed: + if not is_video_allowed_for_level(filename, mandant_level): abort(403) return send_from_directory("/app/images/videos", filename) diff --git a/app/flask-postgres/app/auth.py b/app/flask-postgres/app/auth.py new file mode 100644 index 0000000..de33463 --- /dev/null +++ b/app/flask-postgres/app/auth.py @@ -0,0 +1,11 @@ +from functools import wraps +from flask import session, redirect, url_for, request + + +def login_required(view_func): + @wraps(view_func) + def wrapper(*args, **kwargs): + if not session.get("user_id"): + return redirect(url_for("login", next=request.path)) + return view_func(*args, **kwargs) + return wrapper diff --git a/app/flask-postgres/app/config.py b/app/flask-postgres/app/config.py new file mode 100644 index 0000000..1b1b256 --- /dev/null +++ b/app/flask-postgres/app/config.py @@ -0,0 +1,13 @@ +import os + + +class Config: + SECRET_KEY = os.getenv("SECRET_KEY", "change-this-secret-key") + + DB_HOST = os.getenv("DB_HOST", "db") + DB_NAME = os.getenv("DB_NAME", "CertDB") + DB_USER = os.getenv("DB_USER", "CertUser") + DB_PASSWORD = os.getenv("DB_PASSWORD", "CertPWD") + DB_PORT = os.getenv("DB_PORT", "5432") + + LOG_DIR = os.getenv("LOG_DIR", "/logs") \ No newline at end of file diff --git a/app/flask-postgres/app/db.py b/app/flask-postgres/app/db.py new file mode 100644 index 0000000..5e7bc06 --- /dev/null +++ b/app/flask-postgres/app/db.py @@ -0,0 +1,27 @@ +import psycopg2 +from flask import current_app + + +def get_connection(): + return psycopg2.connect( + host=current_app.config["DB_HOST"], + dbname=current_app.config["DB_NAME"], + user=current_app.config["DB_USER"], + password=current_app.config["DB_PASSWORD"], + port=current_app.config["DB_PORT"], + ) + + +def fetchone_dict(cur): + row = cur.fetchone() + if row is None: + return None + + columns = [desc[0] for desc in cur.description] + return dict(zip(columns, row)) + + +def fetchall_dict(cur): + rows = cur.fetchall() + columns = [desc[0] for desc in cur.description] + return [dict(zip(columns, row)) for row in rows] \ No newline at end of file diff --git a/app/flask-postgres/app/permissions.py b/app/flask-postgres/app/permissions.py new file mode 100644 index 0000000..586d36f --- /dev/null +++ b/app/flask-postgres/app/permissions.py @@ -0,0 +1,98 @@ +import os +from functools import wraps + +from flask import session, redirect, url_for, request, abort + +from db import get_connection + + +def get_current_user_mandant_level(): + user_id = session.get("user_id") + if not user_id: + return None + + conn = get_connection() + cur = conn.cursor() + + cur.execute(""" + SELECT m.level + FROM app_user u + JOIN mandant m ON m.id = u.mandant_id + WHERE u.id = %s + """, (user_id,)) + + row = cur.fetchone() + + cur.close() + conn.close() + + if row is None: + return None + + return row[0] + + +def is_video_allowed_for_level(filename: str, mandant_level: int | None) -> bool: + if mandant_level is None: + return False + + basename = os.path.basename(filename) + first_char = basename[:1].upper() + + if mandant_level in (0, 1): + return True + if mandant_level == 2: + return first_char in ("A", "B") + if mandant_level == 3: + return first_char == "A" + + return False + + +def user_is_admin(): + user_id = session.get("user_id") + if not user_id: + return False + + conn = get_connection() + cur = conn.cursor() + + cur.execute(""" + SELECT 1 + FROM app_user u + JOIN user_group ug ON ug.user_id = u.id + JOIN app_group g ON g.id = ug.group_id + WHERE u.id = %s + AND ug.mandant_id = 1 + AND g.mandant_id = 1 + AND g.group_name = 'Administratoren' + LIMIT 1 + """, (user_id,)) + + result = cur.fetchone() + + cur.close() + conn.close() + + return result is not None + + +def get_current_user(): + return { + "user_id": session.get("user_id"), + "user_name": session.get("user_name"), + "user_email": session.get("user_email"), + "is_logged_in": bool(session.get("user_id")), + "is_admin": user_is_admin() if session.get("user_id") else False, + } + + +def admin_required(view_func): + @wraps(view_func) + def wrapper(*args, **kwargs): + if not session.get("user_id"): + return redirect(url_for("login", next=request.path)) + if not user_is_admin(): + abort(403) + return view_func(*args, **kwargs) + return wrapper \ No newline at end of file