refactoring python

This commit is contained in:
Bkolb 2026-04-02 13:09:29 +02:00
parent 341a6e61ee
commit ac780a14b5
5 changed files with 163 additions and 140 deletions

View File

@ -1,11 +1,8 @@
import logging import logging
import os import os
from datetime import datetime from datetime import datetime
from functools import wraps
from logging.handlers import RotatingFileHandler from logging.handlers import RotatingFileHandler
import psycopg2
from flask import ( from flask import (
Flask, Flask,
redirect, redirect,
@ -18,16 +15,22 @@ from flask import (
) )
from werkzeug.security import check_password_hash, generate_password_hash from werkzeug.security import check_password_hash, generate_password_hash
from config import Config
from db import get_connection, fetchone_dict, fetchall_dict
from auth import login_required
from permissions import (
admin_required,
get_current_user,
get_current_user_mandant_level,
is_video_allowed_for_level,
)
app = Flask(__name__) app = Flask(__name__)
app.secret_key = os.getenv("SECRET_KEY", "change-this-secret-key") app.config.from_object(Config)
app.secret_key = app.config["SECRET_KEY"]
DB_HOST = os.getenv("DB_HOST", "db")
DB_NAME = os.getenv("DB_NAME", "CertDB")
DB_USER = os.getenv("DB_USER", "CertUser")
DB_PASSWORD = os.getenv("DB_PASSWORD", "CertPWD")
DB_PORT = os.getenv("DB_PORT", "5432")
LOG_DIR = os.getenv("LOG_DIR", "/logs") LOG_DIR = app.config["LOG_DIR"]
os.makedirs(LOG_DIR, exist_ok=True) os.makedirs(LOG_DIR, exist_ok=True)
file_handler = RotatingFileHandler( file_handler = RotatingFileHandler(
@ -44,14 +47,6 @@ app.logger.setLevel(logging.INFO)
app.logger.addHandler(file_handler) app.logger.addHandler(file_handler)
def get_connection():
return psycopg2.connect(
host=DB_HOST,
dbname=DB_NAME,
user=DB_USER,
password=DB_PASSWORD,
port=DB_PORT,
)
def ensure_base_tables(): def ensure_base_tables():
@ -211,108 +206,7 @@ def register_visit(route_name: str) -> int:
return count return count
def get_current_user():
return {
"user_id": session.get("user_id"),
"user_name": session.get("user_name"),
"user_email": session.get("user_email"),
"is_logged_in": bool(session.get("user_id")),
}
def fetchone_dict(cur):
row = cur.fetchone()
if row is None:
return None
columns = [desc[0] for desc in cur.description]
return dict(zip(columns, row))
def fetchall_dict(cur):
rows = cur.fetchall()
columns = [desc[0] for desc in cur.description]
return [dict(zip(columns, row)) for row in rows]
def user_is_admin():
user_id = session.get("user_id")
if not user_id:
return False
conn = get_connection()
cur = conn.cursor()
cur.execute("""
SELECT 1
FROM app_user u
JOIN user_group ug ON ug.user_id = u.id
JOIN app_group g ON g.id = ug.group_id
WHERE u.id = %s
AND ug.mandant_id = 1
AND g.mandant_id = 1
AND g.group_name = 'Administratoren'
LIMIT 1
""", (user_id,))
result = cur.fetchone()
cur.close()
conn.close()
return result is not None
def get_current_user():
return {
"user_id": session.get("user_id"),
"user_name": session.get("user_name"),
"user_email": session.get("user_email"),
"is_logged_in": bool(session.get("user_id")),
"is_admin": user_is_admin() if session.get("user_id") else False,
}
def get_current_user_mandant_level():
user_id = session.get("user_id")
if not user_id:
return None
conn = get_connection()
cur = conn.cursor()
cur.execute("""
SELECT m.level
FROM app_user u
JOIN mandant m ON m.id = u.mandant_id
WHERE u.id = %s
""", (user_id,))
row = cur.fetchone()
cur.close()
conn.close()
if row is None:
return None
return row[0]
def admin_required(view_func):
@wraps(view_func)
def wrapper(*args, **kwargs):
if not session.get("user_id"):
return redirect(url_for("login", next=request.path))
if not user_is_admin():
abort(403)
return view_func(*args, **kwargs)
return wrapper
def login_required(view_func):
@wraps(view_func)
def wrapper(*args, **kwargs):
if not session.get("user_id"):
return redirect(url_for("login", next=request.path))
return view_func(*args, **kwargs)
return wrapper
def render_page(active_page: str, title: str): def render_page(active_page: str, title: str):
@ -435,28 +329,8 @@ def health():
@login_required @login_required
def protected_videos(filename): def protected_videos(filename):
mandant_level = get_current_user_mandant_level() mandant_level = get_current_user_mandant_level()
if mandant_level is None:
abort(403)
basename = os.path.basename(filename) if not is_video_allowed_for_level(filename, mandant_level):
first_char = basename[:1].upper()
# Level 0 und 1: alles erlaubt
if mandant_level in (0, 1):
allowed = True
# Level 2: nur A und B
elif mandant_level == 2:
allowed = first_char in ("A", "B")
# Level 3: nur A
elif mandant_level == 3:
allowed = first_char == "A"
else:
allowed = False
if not allowed:
abort(403) abort(403)
return send_from_directory("/app/images/videos", filename) return send_from_directory("/app/images/videos", filename)

View File

@ -0,0 +1,11 @@
from functools import wraps
from flask import session, redirect, url_for, request
def login_required(view_func):
@wraps(view_func)
def wrapper(*args, **kwargs):
if not session.get("user_id"):
return redirect(url_for("login", next=request.path))
return view_func(*args, **kwargs)
return wrapper

View File

@ -0,0 +1,13 @@
import os
class Config:
SECRET_KEY = os.getenv("SECRET_KEY", "change-this-secret-key")
DB_HOST = os.getenv("DB_HOST", "db")
DB_NAME = os.getenv("DB_NAME", "CertDB")
DB_USER = os.getenv("DB_USER", "CertUser")
DB_PASSWORD = os.getenv("DB_PASSWORD", "CertPWD")
DB_PORT = os.getenv("DB_PORT", "5432")
LOG_DIR = os.getenv("LOG_DIR", "/logs")

View File

@ -0,0 +1,27 @@
import psycopg2
from flask import current_app
def get_connection():
return psycopg2.connect(
host=current_app.config["DB_HOST"],
dbname=current_app.config["DB_NAME"],
user=current_app.config["DB_USER"],
password=current_app.config["DB_PASSWORD"],
port=current_app.config["DB_PORT"],
)
def fetchone_dict(cur):
row = cur.fetchone()
if row is None:
return None
columns = [desc[0] for desc in cur.description]
return dict(zip(columns, row))
def fetchall_dict(cur):
rows = cur.fetchall()
columns = [desc[0] for desc in cur.description]
return [dict(zip(columns, row)) for row in rows]

View File

@ -0,0 +1,98 @@
import os
from functools import wraps
from flask import session, redirect, url_for, request, abort
from db import get_connection
def get_current_user_mandant_level():
user_id = session.get("user_id")
if not user_id:
return None
conn = get_connection()
cur = conn.cursor()
cur.execute("""
SELECT m.level
FROM app_user u
JOIN mandant m ON m.id = u.mandant_id
WHERE u.id = %s
""", (user_id,))
row = cur.fetchone()
cur.close()
conn.close()
if row is None:
return None
return row[0]
def is_video_allowed_for_level(filename: str, mandant_level: int | None) -> bool:
if mandant_level is None:
return False
basename = os.path.basename(filename)
first_char = basename[:1].upper()
if mandant_level in (0, 1):
return True
if mandant_level == 2:
return first_char in ("A", "B")
if mandant_level == 3:
return first_char == "A"
return False
def user_is_admin():
user_id = session.get("user_id")
if not user_id:
return False
conn = get_connection()
cur = conn.cursor()
cur.execute("""
SELECT 1
FROM app_user u
JOIN user_group ug ON ug.user_id = u.id
JOIN app_group g ON g.id = ug.group_id
WHERE u.id = %s
AND ug.mandant_id = 1
AND g.mandant_id = 1
AND g.group_name = 'Administratoren'
LIMIT 1
""", (user_id,))
result = cur.fetchone()
cur.close()
conn.close()
return result is not None
def get_current_user():
return {
"user_id": session.get("user_id"),
"user_name": session.get("user_name"),
"user_email": session.get("user_email"),
"is_logged_in": bool(session.get("user_id")),
"is_admin": user_is_admin() if session.get("user_id") else False,
}
def admin_required(view_func):
@wraps(view_func)
def wrapper(*args, **kwargs):
if not session.get("user_id"):
return redirect(url_for("login", next=request.path))
if not user_is_admin():
abort(403)
return view_func(*args, **kwargs)
return wrapper