# AICertification/app/flask-postgres/app/app.py
#
# 1355 lines
# 39 KiB
# Python

import hmac
import logging
import os
import posixpath
import re
import shutil
from contextlib import closing
from datetime import datetime
from logging.handlers import RotatingFileHandler
from urllib.parse import urlsplit

from flask import (
    Flask,
    redirect,
    render_template,
    request,
    send_from_directory,
    session,
    url_for,
    abort,
)
from werkzeug.security import check_password_hash, generate_password_hash
from werkzeug.utils import secure_filename

from config import Config, COUNTRY_VAT_LABELS
from db import get_connection, fetchone_dict, fetchall_dict
from auth import login_required
from permissions import (
    is_video_allowed_for_level,
    is_course_allowed_for_level,
    get_allowed_checklist_levels_for_mandant_level,
)
from security import (
    admin_required,
    get_current_user,
    get_current_user_mandant_level,
    user_admin_required,
    contentmanager_required,
)
from logging_config import setup_logging
# Create the Flask application and load every setting from the Config object.
app = Flask(__name__)
app.config.from_object(Config)
# The session data used throughout this module needs a secret key.
app.secret_key = app.config["SECRET_KEY"]
LOG_DIR = app.config["LOG_DIR"]
# Outside of test runs, make sure the log directory exists.
if not app.config.get("TESTING"):
    os.makedirs(LOG_DIR, exist_ok=True)
    # Disabled rotating file-log setup, kept for reference:
    # file_handler = RotatingFileHandler(
    # os.path.join(LOG_DIR, "flask-app.log"),
    # maxBytes=5 * 1024 * 1024,
    # backupCount=5
    # )
    # file_handler.setLevel(logging.INFO)
    # file_handler.setFormatter(
    # logging.Formatter("%(asctime)s %(levelname)s %(message)s")
    # )
    # app.logger.setLevel(logging.INFO)
    # app.logger.addHandler(file_handler)
def format_level(level):
    """Return the display label for a mandant level.

    Known levels (0-3) map to a fixed "<number> - <name>" label; any other
    value is rendered as "<value> - Unbekannt".
    """
    known_labels = {
        0: "0 - Admin",
        1: "1 - Gold",
        2: "2 - Silber",
        3: "3 - Bronze",
    }
    if level in known_labels:
        return known_labels[level]
    return f"{level} - Unbekannt"
def ensure_base_tables():
    """Create the core tables if they do not exist yet.

    Creates ``visits``, ``mandant``, ``app_user``, ``app_group`` and
    ``user_group`` with ``CREATE TABLE IF NOT EXISTS`` and commits once at the
    end. The cursor and connection are closed even when a statement fails.
    """
    # Order matters: tables referenced by a foreign key are created first.
    ddl_statements = (
        """
        CREATE TABLE IF NOT EXISTS visits (
            id SERIAL PRIMARY KEY,
            route_name VARCHAR(100),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS mandant (
            id SERIAL PRIMARY KEY,
            kuerzel VARCHAR(50) NOT NULL,
            name VARCHAR(255) NOT NULL,
            kontakt_email VARCHAR(255),
            level INTEGER NOT NULL DEFAULT 0
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS app_user (
            id SERIAL PRIMARY KEY,
            email VARCHAR(255) NOT NULL UNIQUE,
            name VARCHAR(255) NOT NULL,
            mandant_id INTEGER NOT NULL,
            password_hash VARCHAR(255) NOT NULL,
            last_login TIMESTAMP NULL,
            status INTEGER NOT NULL DEFAULT 0,
            CONSTRAINT fk_app_user_mandant
                FOREIGN KEY (mandant_id)
                REFERENCES mandant(id)
                ON DELETE RESTRICT
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS app_group (
            id SERIAL PRIMARY KEY,
            mandant_id INTEGER NOT NULL,
            group_name VARCHAR(255) NOT NULL,
            CONSTRAINT fk_app_group_mandant
                FOREIGN KEY (mandant_id)
                REFERENCES mandant(id)
                ON DELETE CASCADE
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS user_group (
            user_id INTEGER NOT NULL,
            group_id INTEGER NOT NULL,
            mandant_id INTEGER NOT NULL,
            PRIMARY KEY (user_id, group_id),
            CONSTRAINT fk_user_group_user
                FOREIGN KEY (user_id)
                REFERENCES app_user(id)
                ON DELETE CASCADE,
            CONSTRAINT fk_user_group_group
                FOREIGN KEY (group_id)
                REFERENCES app_group(id)
                ON DELETE CASCADE,
            CONSTRAINT fk_user_group_mandant
                FOREIGN KEY (mandant_id)
                REFERENCES mandant(id)
                ON DELETE CASCADE
        )
        """,
    )
    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        for ddl in ddl_statements:
            cur.execute(ddl)
        conn.commit()
def ensure_default_admin():
    """Make sure the default mandant, admin group and admin user exist.

    Looks up (or creates) the mandant with kuerzel ``KOLB``, the group
    ``Administratoren`` of that mandant and the user ``admin@kolb.cc``, then
    links user and group. Every step is skipped when the row already exists.

    All dependent rows use the id of the mandant that was actually found or
    inserted; a hard-coded id would point at the wrong mandant (or violate the
    foreign key) whenever ``KOLB`` does not happen to have id 1.
    """
    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        cur.execute("SELECT id FROM mandant WHERE kuerzel = %s", ("KOLB",))
        row = cur.fetchone()
        if row:
            mandant_id = row[0]
        else:
            cur.execute("""
                INSERT INTO mandant (kuerzel, name, kontakt_email, level)
                VALUES (%s, %s, %s, %s)
                RETURNING id
            """, ("KOLB", "Kolb Compliance", "info@kolb.cc", 0))
            mandant_id = cur.fetchone()[0]

        cur.execute("""
            SELECT id FROM app_group
            WHERE mandant_id = %s AND group_name = %s
        """, (mandant_id, "Administratoren"))
        group_row = cur.fetchone()
        if group_row:
            admin_group_id = group_row[0]
        else:
            cur.execute("""
                INSERT INTO app_group (mandant_id, group_name)
                VALUES (%s, %s)
                RETURNING id
            """, (mandant_id, "Administratoren"))
            admin_group_id = cur.fetchone()[0]

        cur.execute("SELECT id FROM app_user WHERE email = %s", ("admin@kolb.cc",))
        user_row = cur.fetchone()
        if user_row:
            admin_user_id = user_row[0]
        else:
            # NOTE(review): the initial admin password is hard-coded here; it
            # should be changed right after the first login.
            password_hash = generate_password_hash("topsecret")
            cur.execute("""
                INSERT INTO app_user (email, name, mandant_id, password_hash, status)
                VALUES (%s, %s, %s, %s, %s)
                RETURNING id
            """, ("admin@kolb.cc", "Admin", mandant_id, password_hash, 1))
            admin_user_id = cur.fetchone()[0]

        cur.execute("""
            SELECT 1 FROM user_group
            WHERE user_id = %s AND group_id = %s AND mandant_id = %s
        """, (admin_user_id, admin_group_id, mandant_id))
        if cur.fetchone() is None:
            cur.execute("""
                INSERT INTO user_group (user_id, group_id, mandant_id)
                VALUES (%s, %s, %s)
            """, (admin_user_id, admin_group_id, mandant_id))
        conn.commit()
def register_visit(route_name: str) -> int:
    """Record one visit for *route_name* and return the total visit count.

    Inserts a row into ``visits``, commits, then counts all rows of the table
    (across all routes). The cursor and connection are closed even when a
    statement fails.
    """
    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        cur.execute(
            "INSERT INTO visits (route_name) VALUES (%s)",
            (route_name,),
        )
        conn.commit()
        cur.execute("SELECT COUNT(*) FROM visits")
        return cur.fetchone()[0]
def get_available_courses_for_user():
    """Return the active courses the current session's mandant level may see.

    The mandant level is read from the session; when it is missing it is
    loaded from the ``mandant`` table and cached in ``session["mandant_level"]``.
    All active courses are then loaded (ordered by ``sort_order, code``) and
    filtered with ``is_course_allowed_for_level``.

    A single connection serves both queries and is closed even on errors.
    """
    mandant_level = session.get("mandant_level")
    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        if mandant_level is None:
            cur.execute("""
                SELECT level
                FROM mandant
                WHERE id = %s
            """, (session.get("mandant_id"),))
            row = cur.fetchone()
            # NOTE(review): an unknown mandant falls back to level 0, which
            # format_level() labels "Admin" - confirm this default is intended.
            mandant_level = row[0] if row else 0
            session["mandant_level"] = mandant_level

        cur.execute("""
            SELECT id, code, title, description, video_file, sort_order
            FROM course
            WHERE is_active = TRUE
            ORDER BY sort_order, code
        """)
        all_courses = fetchall_dict(cur)

    return [
        course for course in all_courses
        if is_course_allowed_for_level(course["code"], mandant_level)
    ]
def render_page(active_page: str, title: str):
    """Register a visit for *active_page* and render ``index.html`` for it."""
    total_visits = register_visit(active_page)
    context = {
        "active_page": active_page,
        "page_title": title,
        "visit_count": total_visits,
    }
    return render_template("index.html", **context, **get_current_user())
# Set once the schema and default admin were ensured successfully.
_startup_checks_done = False


@app.before_request
def startup_checks():
    """Ensure the base tables and the default admin exist, once per process.

    The hook fires before every request, but the checks only need to succeed
    once; repeating the DDL and seed queries per request would add several
    database round-trips to every page and static-file hit. The flag is set
    only after both steps succeed, so a failed attempt is retried on the next
    request.
    """
    global _startup_checks_done
    if _startup_checks_done:
        return
    ensure_base_tables()
    ensure_default_admin()
    _startup_checks_done = True
@app.route("/")
@app.route("/home")
def home():
    """Render the landing page (served under both ``/`` and ``/home``)."""
    return render_page(active_page="home", title="Home")
@app.route("/preise")
@login_required
def preise():
    """Render the price page with the VAT label for the session's country."""
    # Sessions without an explicit country are treated as "DE".
    country = session.get("country", "DE")
    context = {
        "page_title": "Preise",
        "active_page": "preise",
        "vat_label": COUNTRY_VAT_LABELS.get(country),
    }
    return render_template("preise2.html", **context, **get_current_user())
@app.route("/allgemein")
def allgemein():
    """Render the general information page (no login required)."""
    context = {"page_title": "Info", "active_page": "allgemein"}
    return render_template("allgemein.html", **context, **get_current_user())
# @app.route("/allgemein")
# @login_required
# def allgemein():
# return render_page("allgemein", "Allgemein")
def _safe_login_redirect_target(target, fallback):
    """Return *target* if it stays on this site, otherwise *fallback*.

    ``next`` comes from the query string or form and is attacker-controlled;
    redirecting to it unchecked would allow sending users to foreign sites.
    Accepted are absolute paths ("/preise") and http(s) URLs whose host equals
    the current request host.
    """
    if not target:
        return fallback
    # Browsers drop control characters and whitespace while parsing URLs,
    # which can turn a harmless-looking value into "//host".
    if any(ch.isspace() or ord(ch) < 32 for ch in target):
        return fallback
    # Browsers treat backslashes like slashes.
    normalized = target.replace("\\", "/")
    if normalized.startswith("//"):
        return fallback
    parts = urlsplit(normalized)
    if parts.scheme or parts.netloc:
        same_site = parts.scheme in ("http", "https") and parts.netloc == request.host
        return target if same_site else fallback
    return target if normalized.startswith("/") else fallback


@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form and authenticate a user on POST.

    On success the user's id, email, name and mandant id are stored in the
    session, ``last_login`` is updated and the browser is redirected to the
    validated ``next`` target (default: the price page). On failure the form
    is rendered again with an error message.
    """
    error_message = ""
    next_url = _safe_login_redirect_target(
        request.args.get("next") or request.form.get("next"),
        url_for("preise"),
    )

    if request.method == "POST":
        email = request.form.get("email", "").strip().lower()
        password = request.form.get("password", "")

        with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
            cur.execute("""
                SELECT id, email, name, mandant_id, password_hash, status
                FROM app_user
                WHERE lower(email) = %s
            """, (email,))
            row = cur.fetchone()

            # NOTE(review): the distinct messages below reveal whether an
            # account exists and its status before the password is checked.
            if not row:
                error_message = "Benutzer nicht gefunden."
            else:
                user_id, user_email, user_name, mandant_id, password_hash, status = row
                if status == 0:
                    error_message = "Benutzer ist noch nicht aktiviert."
                elif status == 2:
                    error_message = "Benutzer ist gesperrt."
                elif status == 3:
                    error_message = "Benutzer ist deaktiviert."
                elif not check_password_hash(password_hash, password):
                    error_message = "Passwort ist falsch."
                else:
                    session["user_id"] = user_id
                    session["user_email"] = user_email
                    session["user_name"] = user_name
                    session["mandant_id"] = mandant_id
                    cur.execute("""
                        UPDATE app_user
                        SET last_login = %s
                        WHERE id = %s
                    """, (datetime.utcnow(), user_id))
                    conn.commit()
                    app.logger.info("Login erfolgreich: %s", user_email)
                    return redirect(next_url)

    return render_template(
        "login.html",
        page_title="Login",
        active_page="login",
        error_message=error_message,
        next_url=next_url,
        **get_current_user()
    )
@app.route("/logout")
def logout():
    """Clear the session, log the logout and redirect to the home page."""
    # Remember who is leaving before the session is wiped.
    departing_user = session.get("user_email", "unknown")
    session.clear()
    app.logger.info("Logout: %s", departing_user)
    home_url = url_for("home")
    return redirect(home_url)
@app.route("/health")
def health():
    """Health check: report whether the database schema can be ensured.

    Returns ``("OK\\n", 200)`` when ``ensure_base_tables`` succeeds and a
    generic 500 response otherwise. The exception is written to the
    application log only; echoing it to the caller could expose database
    details on this unauthenticated endpoint.
    """
    try:
        ensure_base_tables()
    except Exception as exc:  # boundary handler: any failure means "unhealthy"
        app.logger.exception("Healthcheck fehlgeschlagen: %s", exc)
        return "DB Fehler\n", 500
    return "OK\n", 200
@app.route("/videos/<path:filename>")
@login_required
def protected_videos(filename):
    """Serve a video file if the current user's mandant level permits it."""
    level = get_current_user_mandant_level()
    permitted = is_video_allowed_for_level(filename, level)
    if permitted:
        return send_from_directory("/app/images/videos", filename)
    abort(403)
@app.route("/images/<path:filename>")
def serve_image(filename):
    """Serve a file from ``/app/images``, except anything under ``videos/``.

    Videos live in ``/app/images/videos`` and are only available through the
    level-checked ``/videos/`` route. The path is normalised before the check
    so that spellings such as ``./videos/x`` or ``a/../videos/x`` cannot slip
    past a plain prefix test.
    """
    normalized = posixpath.normpath(filename)
    if normalized == "videos" or normalized.startswith("videos/"):
        abort(403)
    return send_from_directory("/app/images", filename)
@app.route("/styles/<path:filename>")
def serve_style(filename):
    """Serve a stylesheet asset from ``/app/styles``."""
    styles_root = "/app/styles"
    return send_from_directory(styles_root, filename)
@app.route("/files/<path:filename>")
def serve_file(filename):
    """Serve a file from ``/app/files``.

    The upload views store each mandant's documents in
    ``/app/files/<mandant_id>/``. This route has no login requirement, so
    without a check anyone could download another mandant's uploads by
    guessing the path. Paths whose first (normalised) segment is numeric are
    therefore only served to a session that belongs to that mandant;
    everything else under ``/app/files`` stays publicly readable.
    """
    first_segment = posixpath.normpath(filename).split("/", 1)[0]
    if first_segment.isdigit() and first_segment != str(session.get("mandant_id")):
        abort(403)
    return send_from_directory("/app/files", filename)
# Temporary helper route.
# NOTE(review): the password travels in the URL (and therefore ends up in
# access logs and browser history) and the access key is hard-coded; remove
# this route once it is no longer needed.
@app.route("/pwd/<password>/<key>")
def generate_pwd_hash(password, key):
    """Return the password hash for *password* when *key* is correct.

    Responds with ``403`` for a wrong key, otherwise with the hash wrapped in
    a ``<pre>`` element.
    """
    # Constant-time comparison; bytes are used because compare_digest rejects
    # non-ASCII str arguments.
    if not hmac.compare_digest(key.encode("utf-8"), b"geheim"):
        return "Forbidden", 403
    return f"<pre>{generate_password_hash(password)}</pre>"
@app.route("/profil")
@login_required
def profil():
    """Render the profile page of the logged-in user.

    Loads the user together with the mandant data and the names of the groups
    the user belongs to within the session's mandant. Responds with 404 when
    the user referenced by the session no longer exists (for example after
    deletion), instead of failing on a missing row.
    """
    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        cur.execute("""
            SELECT
                u.id,
                u.email,
                u.name,
                u.mandant_id,
                u.last_login,
                u.status,
                m.name AS mandant_name,
                m.kuerzel AS mandant_kuerzel,
                m.kontakt_email AS mandant_email,
                m.level AS mandant_level
            FROM app_user u
            JOIN mandant m ON m.id = u.mandant_id
            WHERE u.id = %s
        """, (session["user_id"],))
        profile = fetchone_dict(cur)
        if not profile:
            abort(404)

        cur.execute("""
            SELECT g.group_name
            FROM user_group ug
            JOIN app_group g ON g.id = ug.group_id
            WHERE ug.user_id = %s
              AND ug.mandant_id = %s
            ORDER BY g.group_name
        """, (session["user_id"], session["mandant_id"]))
        gruppen = [row["group_name"] for row in fetchall_dict(cur)]

    profile["mandant_level_label"] = format_level(profile["mandant_level"])
    return render_template(
        "profil.html",
        page_title="Profil",
        active_page="profil",
        profile=profile,
        gruppen=gruppen,
        **get_current_user()
    )
def _admin_mandanten_fetch_all(cur):
    """Load all mandanten ordered by id and attach a ``level_label`` to each."""
    cur.execute("""
        SELECT id, kuerzel, name, kontakt_email, level
        FROM mandant
        ORDER BY id
    """)
    mandanten = fetchall_dict(cur)
    for mandant in mandanten:
        mandant["level_label"] = format_level(mandant["level"])
    return mandanten


def _admin_mandanten_form_id():
    """Return the posted ``id`` as int; abort with 400 if missing or invalid."""
    try:
        return int(request.form.get("id"))
    except (TypeError, ValueError):
        abort(400)


def _admin_mandanten_validate_create(values, admin_password, admin_password2):
    """Return the first validation error of the create form, or ``None``."""
    email_pattern = r"^[^@\s]+@[^@\s]+\.[^@\s]+$"
    # Evaluated top to bottom; the first failing rule supplies the message.
    rules = (
        (not values["kuerzel"], "Kürzel ist ein Pflichtfeld."),
        (not values["name"], "Name ist ein Pflichtfeld."),
        (not values["kontakt_email"], "Kontakt E-Mail ist ein Pflichtfeld."),
        (not re.match(email_pattern, values["kontakt_email"]),
         "Bitte eine gültige Kontakt-E-Mail eingeben."),
        (not values["admin_name"], "Admin User Name ist ein Pflichtfeld."),
        (not values["admin_email"], "Admin User E-Mail ist ein Pflichtfeld."),
        (not re.match(email_pattern, values["admin_email"]),
         "Bitte eine gültige Admin E-Mail-Adresse eingeben."),
        (not admin_password, "Admin Passwort ist ein Pflichtfeld."),
        (not admin_password2, "Bitte Admin Passwort bestätigen."),
        (admin_password != admin_password2,
         "Die beiden Admin-Passwörter stimmen nicht überein."),
        (len(admin_password) < 8,
         "Das Admin Passwort muss mindestens 8 Zeichen lang sein."),
    )
    for failed, message in rules:
        if failed:
            return message
    return None


def _admin_mandanten_create(conn, cur):
    """Create a mandant with its default groups and first admin user.

    Returns ``(error_message, form_values)``; ``error_message`` is ``None``
    when the mandant was created and committed.
    """
    form = request.form
    form_values = {
        "kuerzel": form.get("kuerzel", "").strip(),
        "name": form.get("name", "").strip(),
        "kontakt_email": form.get("kontakt_email", "").strip(),
        "level": form.get("level", "0").strip(),
        "admin_name": form.get("admin_name", "").strip(),
        "admin_email": form.get("admin_email", "").strip().lower(),
    }
    admin_password = form.get("admin_password", "")
    admin_password2 = form.get("admin_password2", "")

    error_message = _admin_mandanten_validate_create(
        form_values, admin_password, admin_password2
    )

    level_value = 0
    if error_message is None:
        try:
            level_value = int(form_values["level"] or 0)
        except ValueError:
            error_message = "Level muss eine ganze Zahl sein."

    if error_message is None:
        cur.execute("""
            SELECT id
            FROM app_user
            WHERE lower(email) = %s
        """, (form_values["admin_email"],))
        if cur.fetchone():
            error_message = "Ein Benutzer mit der Admin E-Mail existiert bereits."

    # Stop before any insert: continuing with a duplicate e-mail would only
    # fail later on the UNIQUE constraint of app_user.email.
    if error_message is not None:
        return error_message, form_values

    cur.execute("""
        INSERT INTO mandant (kuerzel, name, kontakt_email, level)
        VALUES (%s, %s, %s, %s)
        RETURNING id
    """, (form_values["kuerzel"], form_values["name"],
          form_values["kontakt_email"], level_value))
    new_mandant_id = cur.fetchone()[0]

    # Default groups for the new mandant.
    group_ids = []
    for group_name in ("Useradministration", "Contentmanager"):
        cur.execute("""
            INSERT INTO app_group (mandant_id, group_name)
            VALUES (%s, %s)
            RETURNING id
        """, (new_mandant_id, group_name))
        group_ids.append(cur.fetchone()[0])

    # First admin user of the mandant.
    cur.execute("""
        INSERT INTO app_user (email, name, mandant_id, password_hash, status)
        VALUES (%s, %s, %s, %s, %s)
        RETURNING id
    """, (form_values["admin_email"], form_values["admin_name"], new_mandant_id,
          generate_password_hash(admin_password), 1))
    new_admin_user_id = cur.fetchone()[0]

    # Put the admin user into both default groups.
    for group_id in group_ids:
        cur.execute("""
            INSERT INTO user_group (user_id, group_id, mandant_id)
            VALUES (%s, %s, %s)
        """, (new_admin_user_id, group_id, new_mandant_id))
    conn.commit()

    # Create the mandant's file directory.
    os.makedirs(os.path.join(Config.FILES_DIR, str(new_mandant_id)), exist_ok=True)
    return None, form_values


def _admin_mandanten_update(conn, cur):
    """Update kuerzel, name, contact e-mail and level of the posted mandant."""
    mandant_id = _admin_mandanten_form_id()
    try:
        level_value = int(request.form.get("level", "0").strip() or 0)
    except ValueError:
        abort(400)
    cur.execute("""
        UPDATE mandant
        SET kuerzel = %s,
            name = %s,
            kontakt_email = %s,
            level = %s
        WHERE id = %s
    """, (
        request.form.get("kuerzel", "").strip(),
        request.form.get("name", "").strip(),
        request.form.get("kontakt_email", "").strip() or None,
        level_value,
        mandant_id,
    ))
    conn.commit()


def _admin_mandanten_delete(conn, cur):
    """Delete the posted mandant and afterwards remove its file directory.

    The directory is removed only after the delete was committed. The
    ``app_user`` foreign key is declared ``ON DELETE RESTRICT``, so deleting a
    mandant that still has users raises a database error, which propagates.
    """
    mandant_id = _admin_mandanten_form_id()
    mandant_dir = os.path.join(Config.FILES_DIR, str(mandant_id))
    cur.execute("DELETE FROM mandant WHERE id = %s", (mandant_id,))
    conn.commit()
    if os.path.isdir(mandant_dir):
        shutil.rmtree(mandant_dir, ignore_errors=True)


@app.route("/admin/mandanten", methods=["GET", "POST"])
@admin_required
def admin_mandanten():
    """List all mandanten and handle create / update / delete form posts.

    POST requests dispatch on the ``action`` field. A successful action (and
    an unknown one) redirects back to this page; a failed ``create`` renders
    the list again with ``form_error`` and the submitted ``form_values``.
    """
    form_error = None
    form_values = None

    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        if request.method == "POST":
            action = request.form.get("action")
            if action == "create":
                form_error, form_values = _admin_mandanten_create(conn, cur)
            elif action == "update":
                _admin_mandanten_update(conn, cur)
            elif action == "delete":
                _admin_mandanten_delete(conn, cur)
            if form_error is None:
                return redirect(url_for("admin_mandanten"))
        mandanten = _admin_mandanten_fetch_all(cur)

    context = {
        "page_title": "Admin - Mandanten",
        "active_page": "admin",
        "mandanten": mandanten,
    }
    if form_error is not None:
        context["form_error"] = form_error
        context["form_values"] = form_values
    return render_template("admin_mandanten.html", **context, **get_current_user())
@app.route("/useradmin/mandant")
@user_admin_required
def useradmin_mandant():
    """List all users of the session's mandant, ordered by name and e-mail.

    Each user row is extended with ``mandant_level_label``. The cursor and
    connection are closed even when the query fails.
    """
    current_mandant_id = session.get("mandant_id")
    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        cur.execute("""
            SELECT
                u.id,
                u.email,
                u.name,
                u.mandant_id,
                u.last_login,
                u.status,
                m.name AS mandant_name,
                m.kontakt_email AS mandant_email,
                m.level AS mandant_level
            FROM app_user u
            JOIN mandant m ON m.id = u.mandant_id
            WHERE u.mandant_id = %s
            ORDER BY u.name, u.email
        """, (current_mandant_id,))
        users = fetchall_dict(cur)

    for user in users:
        user["mandant_level_label"] = format_level(user["mandant_level"])

    return render_template(
        "useradmin_mandant.html",
        page_title="Useradministration",
        active_page="useradmin",
        users=users,
        **get_current_user()
    )
@app.errorhandler(403)
def forbidden(_error):
    """Render the "no access" page for 403 responses."""
    context = {"page_title": "Kein Zugriff", "active_page": ""}
    body = render_template("403.html", **context, **get_current_user())
    return body, 403
@app.route("/pwdchange", methods=["GET", "POST"])
@login_required
def pwdchange():
    """Let the logged-in user change their own password.

    On POST the three form fields are validated (all present, new passwords
    equal, at least 8 characters), the current password is verified against
    the stored hash and the new hash is saved. The page is rendered with
    either an error or a success message. The cursor and connection are
    closed even when a statement fails.
    """
    error_message = ""
    success_message = ""

    if request.method == "POST":
        current_password = request.form.get("current_password", "")
        new_password = request.form.get("new_password", "")
        confirm_password = request.form.get("confirm_password", "")

        if not current_password or not new_password or not confirm_password:
            error_message = "Bitte alle Felder ausfüllen."
        elif new_password != confirm_password:
            error_message = "Die neuen Passwörter stimmen nicht überein."
        elif len(new_password) < 8:
            error_message = "Das neue Passwort muss mindestens 8 Zeichen lang sein."
        else:
            with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
                cur.execute("""
                    SELECT password_hash
                    FROM app_user
                    WHERE id = %s
                """, (session["user_id"],))
                row = cur.fetchone()
                if row is None:
                    error_message = "Benutzer nicht gefunden."
                elif not check_password_hash(row[0], current_password):
                    error_message = "Das aktuelle Passwort ist falsch."
                else:
                    cur.execute("""
                        UPDATE app_user
                        SET password_hash = %s
                        WHERE id = %s
                    """, (generate_password_hash(new_password), session["user_id"]))
                    conn.commit()
                    success_message = "Passwort wurde erfolgreich geändert."

    return render_template(
        "pwdchange.html",
        page_title="Passwort ändern",
        active_page="profil",
        error_message=error_message,
        success_message=success_message,
        **get_current_user()
    )
@app.route("/useradmin/mandant/new", methods=["GET", "POST"])
@user_admin_required
def useradmin_user_new():
    """Show the "new user" form and create a user in the session's mandant.

    On POST the input is validated; on success the user is inserted, assigned
    to the selected groups (only groups that belong to the same mandant are
    accepted) and the browser is redirected to the user list. On failure the
    form is rendered again with ``form_error`` and the submitted values.

    Non-numeric ``status`` values produce a form error and non-numeric group
    ids are ignored (as in the edit view) instead of causing a server error.
    """
    current_mandant_id = session.get("mandant_id")
    form_error = None
    form_values = {
        "email": "",
        "name": "",
        "status": "1",
        "selected_groups": [],
    }

    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        cur.execute("""
            SELECT id, group_name
            FROM app_group
            WHERE mandant_id = %s
            ORDER BY group_name
        """, (current_mandant_id,))
        gruppen = fetchall_dict(cur)

        if request.method == "POST":
            email = request.form.get("email", "").strip().lower()
            name = request.form.get("name", "").strip()
            password = request.form.get("password", "")
            password2 = request.form.get("password2", "")
            status = request.form.get("status", "1").strip()
            selected_groups = request.form.getlist("group_ids")
            form_values = {
                "email": email,
                "name": name,
                "status": status,
                "selected_groups": selected_groups,
            }

            status_value = None
            try:
                status_value = int(status or 1)
            except ValueError:
                pass

            email_pattern = r"^[^@\s]+@[^@\s]+\.[^@\s]+$"
            if not email:
                form_error = "E-Mail ist ein Pflichtfeld."
            elif not re.match(email_pattern, email):
                form_error = "Bitte eine gültige E-Mail-Adresse eingeben."
            elif not name:
                form_error = "Name ist ein Pflichtfeld."
            elif not password:
                form_error = "Passwort ist ein Pflichtfeld."
            elif not password2:
                form_error = "Bitte Passwort bestätigen."
            elif password != password2:
                form_error = "Die beiden Passwörter stimmen nicht überein."
            elif len(password) < 8:
                form_error = "Das Passwort muss mindestens 8 Zeichen lang sein."
            elif status_value is None:
                form_error = "Ungültiger Status."
            else:
                cur.execute("""
                    SELECT id
                    FROM app_user
                    WHERE lower(email) = %s
                """, (email,))
                if cur.fetchone():
                    form_error = "Ein Benutzer mit dieser E-Mail existiert bereits."
                else:
                    cur.execute("""
                        INSERT INTO app_user (
                            email,
                            name,
                            mandant_id,
                            password_hash,
                            status
                        )
                        VALUES (%s, %s, %s, %s, %s)
                        RETURNING id
                    """, (
                        email,
                        name,
                        current_mandant_id,
                        generate_password_hash(password),
                        status_value,
                    ))
                    new_user_id = cur.fetchone()[0]

                    selected_group_ids = []
                    for gid in selected_groups:
                        try:
                            selected_group_ids.append(int(gid))
                        except ValueError:
                            # Ignore ids that are not numbers.
                            continue

                    if selected_group_ids:
                        # Keep only groups that really belong to this mandant.
                        cur.execute("""
                            SELECT id
                            FROM app_group
                            WHERE mandant_id = %s
                              AND id = ANY(%s)
                        """, (current_mandant_id, selected_group_ids))
                        for (group_id,) in cur.fetchall():
                            cur.execute("""
                                INSERT INTO user_group (user_id, group_id, mandant_id)
                                VALUES (%s, %s, %s)
                            """, (new_user_id, group_id, current_mandant_id))

                    conn.commit()
                    return redirect(url_for("useradmin_mandant"))

    return render_template(
        "useradmin_user_new.html",
        page_title="Neuer User",
        active_page="useradmin",
        gruppen=gruppen,
        form_error=form_error,
        form_values=form_values,
        **get_current_user()
    )
def _set_country_target_is_local(target):
    """Return True if *target* is a path on this site or a URL on this host.

    Both ``next`` and the Referer header are supplied by the client, so they
    must not be used as a redirect target unchecked.
    """
    if not target:
        return False
    # Browsers drop control characters and whitespace while parsing URLs.
    if any(ch.isspace() or ord(ch) < 32 for ch in target):
        return False
    # Browsers treat backslashes like slashes.
    normalized = target.replace("\\", "/")
    if normalized.startswith("//"):
        return False
    parts = urlsplit(normalized)
    if parts.scheme or parts.netloc:
        return parts.scheme in ("http", "https") and parts.netloc == request.host
    return normalized.startswith("/")


@app.route("/set-country")
def set_country():
    """Store the selected country in the session and redirect back.

    Only ``DE``, ``AT`` and ``CH`` are accepted; any other value leaves the
    session unchanged. The redirect goes to ``next``, else to the referrer,
    else to the home page; targets that leave this site fall back to the
    home page.
    """
    country = request.args.get("country", "DE").upper()
    if country in {"DE", "AT", "CH"}:
        session["country"] = country

    home_url = url_for("home")
    next_url = request.args.get("next") or request.referrer or home_url
    if not _set_country_target_is_local(next_url):
        next_url = home_url
    return redirect(next_url)
def _useradmin_assigned_group_ids(cur, user_id, mandant_id):
    """Return the ids (as strings) of the user's groups within the mandant."""
    cur.execute("""
        SELECT group_id
        FROM user_group
        WHERE user_id = %s
          AND mandant_id = %s
    """, (user_id, mandant_id))
    return [str(row[0]) for row in cur.fetchall()]


@app.route("/useradmin/mandant/user/<int:user_id>", methods=["GET", "POST"])
@user_admin_required
def useradmin_user_edit(user_id):
    """Show and process the edit form for a user of the session's mandant.

    Responds with 404 when the user does not exist in this mandant. On POST
    the input is validated; on success e-mail, name and status are updated,
    the password is replaced if a new one was entered and the group
    assignment is rewritten (only groups of the same mandant are accepted).
    The form is then rendered again with a success or error message.
    """
    current_mandant_id = session.get("mandant_id")
    form_error = None
    success_message = None

    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        cur.execute("""
            SELECT id, email, name, mandant_id, status
            FROM app_user
            WHERE id = %s
              AND mandant_id = %s
        """, (user_id, current_mandant_id))
        user = fetchone_dict(cur)
        if not user:
            abort(404)

        cur.execute("""
            SELECT id, group_name
            FROM app_group
            WHERE mandant_id = %s
            ORDER BY group_name
        """, (current_mandant_id,))
        gruppen = fetchall_dict(cur)

        form_values = {
            "email": user["email"],
            "name": user["name"],
            "status": str(user["status"]),
            "selected_groups": _useradmin_assigned_group_ids(
                cur, user_id, current_mandant_id
            ),
        }

        if request.method == "POST":
            email = request.form.get("email", "").strip().lower()
            name = request.form.get("name", "").strip()
            status = request.form.get("status", "1").strip()
            password = request.form.get("password", "")
            password2 = request.form.get("password2", "")
            selected_groups = request.form.getlist("group_ids")
            form_values = {
                "email": email,
                "name": name,
                "status": status,
                "selected_groups": selected_groups,
            }

            email_pattern = r"^[^@\s]+@[^@\s]+\.[^@\s]+$"
            if not email:
                form_error = "E-Mail ist ein Pflichtfeld."
            elif not re.match(email_pattern, email):
                form_error = "Bitte eine gültige E-Mail-Adresse eingeben."
            elif not name:
                form_error = "Name ist ein Pflichtfeld."
            else:
                cur.execute("""
                    SELECT id
                    FROM app_user
                    WHERE lower(email) = %s
                      AND id <> %s
                """, (email, user_id))
                if cur.fetchone():
                    form_error = "Ein anderer Benutzer mit dieser E-Mail existiert bereits."

            # The password is only changed when at least one field was filled.
            if not form_error and (password or password2):
                if not password:
                    form_error = "Bitte neues Passwort eingeben."
                elif not password2:
                    form_error = "Bitte neues Passwort bestätigen."
                elif password != password2:
                    form_error = "Die beiden Passwörter stimmen nicht überein."
                elif len(password) < 8:
                    form_error = "Das Passwort muss mindestens 8 Zeichen lang sein."

            status_value = None
            if not form_error:
                try:
                    status_value = int(status or 1)
                except ValueError:
                    form_error = "Ungültiger Status."

            if not form_error:
                cur.execute("""
                    UPDATE app_user
                    SET email = %s,
                        name = %s,
                        status = %s
                    WHERE id = %s
                      AND mandant_id = %s
                """, (email, name, status_value, user_id, current_mandant_id))

                if password:
                    cur.execute("""
                        UPDATE app_user
                        SET password_hash = %s
                        WHERE id = %s
                          AND mandant_id = %s
                    """, (generate_password_hash(password), user_id, current_mandant_id))

                # Rewrite the group assignment from scratch.
                cur.execute("""
                    DELETE FROM user_group
                    WHERE user_id = %s
                      AND mandant_id = %s
                """, (user_id, current_mandant_id))

                selected_group_ids = []
                for gid in selected_groups:
                    try:
                        selected_group_ids.append(int(gid))
                    except ValueError:
                        # Ignore ids that are not numbers.
                        continue

                if selected_group_ids:
                    # Keep only groups that really belong to this mandant.
                    cur.execute("""
                        SELECT id
                        FROM app_group
                        WHERE mandant_id = %s
                          AND id = ANY(%s)
                    """, (current_mandant_id, selected_group_ids))
                    for (group_id,) in cur.fetchall():
                        cur.execute("""
                            INSERT INTO user_group (user_id, group_id, mandant_id)
                            VALUES (%s, %s, %s)
                        """, (user_id, group_id, current_mandant_id))

                conn.commit()
                success_message = "Benutzer wurde erfolgreich aktualisiert."

                # Reflect the saved state in what the template receives.
                user["email"] = email
                user["name"] = name
                user["status"] = status_value
                form_values["selected_groups"] = _useradmin_assigned_group_ids(
                    cur, user_id, current_mandant_id
                )

    return render_template(
        "useradmin_user_edit.html",
        page_title="User bearbeiten",
        active_page="useradmin",
        edit_user=user,
        gruppen=gruppen,
        form_values=form_values,
        form_error=form_error,
        success_message=success_message,
        **get_current_user()
    )
@app.route("/courses")
@login_required
def course_list():
    """Render the list of courses available to the current user."""
    context = {
        "page_title": "Kurse",
        "active_page": "courses",
        "courses": get_available_courses_for_user(),
    }
    return render_template("course_list.html", **context, **get_current_user())
@app.route("/course/<int:course_id>")
@login_required
def course_start(course_id):
    """Render the video page of a course.

    Responds with 404 when the course does not exist and with 403 when the
    user's mandant level does not permit the course. The course list applies
    the same ``is_course_allowed_for_level`` filter; without the check here a
    user could open any course by guessing its id.
    """
    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        cur.execute("""
            SELECT *
            FROM course
            WHERE id = %s
        """, (course_id,))
        course = fetchone_dict(cur)

    if not course:
        abort(404)
    if not is_course_allowed_for_level(course["code"], get_current_user_mandant_level()):
        abort(403)

    return render_template(
        "course_video.html",
        course=course,
        **get_current_user()
    )
@app.route("/course/<int:course_id>/page/<int:page_number>")
@login_required
def course_page(course_id, page_number):
    """Render one page of a course and store it as the user's progress.

    Responds with 404 when the course or the page does not exist and with 403
    when the user's mandant level does not permit the course. The progress
    row is upserted per (user, course). The cursor and connection are closed
    on every path, including the aborts.
    """
    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        cur.execute("SELECT code FROM course WHERE id = %s", (course_id,))
        course_row = cur.fetchone()
        if not course_row:
            abort(404)
        # Same level rule the course list applies.
        if not is_course_allowed_for_level(course_row[0], get_current_user_mandant_level()):
            abort(403)

        cur.execute("""
            SELECT *
            FROM course_page
            WHERE course_id = %s
              AND page_number = %s
        """, (course_id, page_number))
        page = fetchone_dict(cur)
        if not page:
            abort(404)

        # Save the progress.
        cur.execute("""
            INSERT INTO user_course_progress (user_id, course_id, last_page)
            VALUES (%s, %s, %s)
            ON CONFLICT (user_id, course_id)
            DO UPDATE SET last_page = EXCLUDED.last_page
        """, (session["user_id"], course_id, page_number))
        conn.commit()

    return render_template(
        page["template_name"],
        course_id=course_id,
        page_number=page_number,
        **get_current_user()
    )
@app.route("/dokumente")
@contentmanager_required
def dokumente():
    """List the checklist items of the mandant's allowed levels with uploads.

    Each item is joined with the mandant's upload (if any) and the name of
    the uploading user.

    ``session["mandant_level"]`` is not written by the login view, so it can
    be missing here; in that case the level is taken from
    ``get_current_user_mandant_level()`` (presumably the authoritative lookup,
    as the video route uses it - verify against the security module).
    """
    mandant_id = session.get("mandant_id")
    mandant_level = session.get("mandant_level")
    if mandant_level is None:
        mandant_level = get_current_user_mandant_level()
    allowed_levels = get_allowed_checklist_levels_for_mandant_level(mandant_level)

    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        cur.execute("""
            SELECT
                c.id,
                c.level,
                c.title,
                c.short_description,
                c.default_filename,
                u.uploaded_at,
                u.uploaded_by_user_id,
                u.filesize,
                u.stored_filename,
                u.original_filename,
                au.name AS uploaded_by_name
            FROM certification_checklist c
            LEFT JOIN mandant_checklist_upload u
                ON u.checklist_item_id = c.id
                AND u.mandant_id = %s
            LEFT JOIN app_user au
                ON au.id = u.uploaded_by_user_id
            WHERE c.level = ANY(%s)
            ORDER BY c.level, c.id
        """, (mandant_id, allowed_levels))
        items = fetchall_dict(cur)

    return render_template(
        "dokumente.html",
        page_title="Dokumente",
        active_page="dokumente",
        items=items,
        **get_current_user()
    )
@app.route("/dokumente/upload/<int:item_id>", methods=["POST"])
@contentmanager_required
def dokument_upload(item_id):
    """Store an uploaded file for a checklist item of the session's mandant.

    Requests without a file redirect back to the document list; an unknown
    item yields 404. The file is saved as
    ``/app/files/<mandant_id>/<item_id>-<name>`` where ``<name>`` is the
    item's default filename or, if none is set, the sanitised upload name.
    An existing upload row is updated (and its old file removed when the name
    changed), otherwise a new row is inserted.
    """
    mandant_id = session.get("mandant_id")
    user_id = session.get("user_id")
    uploaded_file = request.files.get("file")
    if not uploaded_file or uploaded_file.filename == "":
        return redirect(url_for("dokumente"))

    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        cur.execute("""
            SELECT id, default_filename
            FROM certification_checklist
            WHERE id = %s
        """, (item_id,))
        item = fetchone_dict(cur)
        if not item:
            abort(404)

        original_filename = secure_filename(uploaded_file.filename)
        # secure_filename() can return "" (e.g. for names made only of
        # separators or dots); fall back to a fixed name instead of "<id>-".
        name_part = secure_filename(item["default_filename"] or original_filename) or "upload"
        stored_filename = f"{item_id}-{name_part}"

        mandant_dir = os.path.join("/app/files", str(mandant_id))
        os.makedirs(mandant_dir, exist_ok=True)
        full_path = os.path.join(mandant_dir, stored_filename)
        uploaded_file.save(full_path)
        filesize = os.path.getsize(full_path)

        cur.execute("""
            SELECT stored_filename
            FROM mandant_checklist_upload
            WHERE mandant_id = %s
              AND checklist_item_id = %s
        """, (mandant_id, item_id))
        existing = cur.fetchone()

        if existing:
            old_filename = existing[0]
            old_path = os.path.join(mandant_dir, old_filename)
            # Same name means the new upload already overwrote the old file.
            if os.path.exists(old_path) and old_filename != stored_filename:
                os.remove(old_path)
            cur.execute("""
                UPDATE mandant_checklist_upload
                SET uploaded_at = CURRENT_TIMESTAMP,
                    uploaded_by_user_id = %s,
                    filesize = %s,
                    stored_filename = %s,
                    original_filename = %s
                WHERE mandant_id = %s
                  AND checklist_item_id = %s
            """, (user_id, filesize, stored_filename, original_filename, mandant_id, item_id))
        else:
            cur.execute("""
                INSERT INTO mandant_checklist_upload (
                    mandant_id,
                    checklist_item_id,
                    uploaded_by_user_id,
                    filesize,
                    stored_filename,
                    original_filename
                )
                VALUES (%s, %s, %s, %s, %s, %s)
            """, (mandant_id, item_id, user_id, filesize, stored_filename, original_filename))
        conn.commit()

    return redirect(url_for("dokumente"))
@app.route("/dokumente/delete/<int:item_id>", methods=["POST"])
@contentmanager_required
def dokument_delete(item_id):
    """Delete the mandant's upload for a checklist item (row and file).

    The database row is deleted and committed first and the file is removed
    afterwards, so a failing database statement can no longer leave a row
    that points to an already deleted file. A file that is already gone is
    ignored. Always redirects to the document list.
    """
    mandant_id = session.get("mandant_id")
    mandant_dir = os.path.join("/app/files", str(mandant_id))

    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        cur.execute("""
            SELECT stored_filename
            FROM mandant_checklist_upload
            WHERE mandant_id = %s
              AND checklist_item_id = %s
        """, (mandant_id, item_id))
        row = cur.fetchone()
        if row:
            cur.execute("""
                DELETE FROM mandant_checklist_upload
                WHERE mandant_id = %s
                  AND checklist_item_id = %s
            """, (mandant_id, item_id))
            conn.commit()
            try:
                os.remove(os.path.join(mandant_dir, row[0]))
            except FileNotFoundError:
                # Nothing left to delete on disk.
                pass

    return redirect(url_for("dokumente"))
@app.route("/dokumente/file/<int:item_id>")
@contentmanager_required
def dokument_file(item_id):
    """Send the mandant's uploaded file for a checklist item.

    Responds with 404 when the session's mandant has no upload for the item.
    The cursor and connection are closed even when the query fails.
    """
    mandant_id = session.get("mandant_id")
    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        cur.execute("""
            SELECT stored_filename
            FROM mandant_checklist_upload
            WHERE mandant_id = %s
              AND checklist_item_id = %s
        """, (mandant_id, item_id))
        row = cur.fetchone()

    if not row:
        abort(404)

    mandant_dir = os.path.join("/app/files", str(mandant_id))
    return send_from_directory(mandant_dir, row[0])
@app.template_filter("datetime")
def format_datetime(value):
    """Template filter: format *value* as ``DD.MM.YYYY HH:MM``, or "-" if empty."""
    return value.strftime("%d.%m.%Y %H:%M") if value else "-"