AICertification/app/flask-postgres/app/app.py
2026-04-03 09:51:58 +02:00

772 lines
22 KiB
Python

import hmac
import logging
import os
import posixpath
import re
from datetime import datetime
from logging.handlers import RotatingFileHandler
from urllib.parse import urlparse

from flask import (
    Flask,
    redirect,
    render_template,
    request,
    send_from_directory,
    session,
    url_for,
    abort,
)
from werkzeug.security import check_password_hash, generate_password_hash

from config import Config
from db import get_connection, fetchone_dict, fetchall_dict
from auth import login_required
from permissions import is_video_allowed_for_level
from security import (
    admin_required,
    get_current_user,
    get_current_user_mandant_level,
    user_admin_required,
)
from logging_config import setup_logging
# Application object; every setting is loaded from the project's Config class.
app = Flask(__name__)
app.config.from_object(Config)
app.secret_key = app.config["SECRET_KEY"]
# Directory for log files, taken from configuration.
LOG_DIR = app.config["LOG_DIR"]
# Outside of test runs, make sure the log directory exists.
if not app.config.get("TESTING"):
    os.makedirs(LOG_DIR, exist_ok=True)
# NOTE(review): the rotating file handler below is disabled. `setup_logging`
# is imported at the top of the file but is not called in the part of the
# file visible here -- confirm where logging is actually configured.
# file_handler = RotatingFileHandler(
# os.path.join(LOG_DIR, "flask-app.log"),
# maxBytes=5 * 1024 * 1024,
# backupCount=5
# )
# file_handler.setLevel(logging.INFO)
# file_handler.setFormatter(
# logging.Formatter("%(asctime)s %(levelname)s %(message)s")
# )
# app.logger.setLevel(logging.INFO)
# app.logger.addHandler(file_handler)
def ensure_base_tables():
    """Create the application's base tables when they are missing.

    Issues ``CREATE TABLE IF NOT EXISTS`` for ``visits``, ``mandant``,
    ``app_user``, ``app_group`` and ``user_group`` -- in that order, so that
    foreign-key targets exist before the tables referencing them -- and
    commits once at the end.

    The cursor and the connection are always closed, even when a statement
    fails; the database error itself propagates to the caller.
    """
    ddl_statements = (
        """
        CREATE TABLE IF NOT EXISTS visits (
            id SERIAL PRIMARY KEY,
            route_name VARCHAR(100),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS mandant (
            id SERIAL PRIMARY KEY,
            kuerzel VARCHAR(50) NOT NULL,
            name VARCHAR(255) NOT NULL,
            kontakt_email VARCHAR(255),
            level INTEGER NOT NULL DEFAULT 0
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS app_user (
            id SERIAL PRIMARY KEY,
            email VARCHAR(255) NOT NULL UNIQUE,
            name VARCHAR(255) NOT NULL,
            mandant_id INTEGER NOT NULL,
            password_hash VARCHAR(255) NOT NULL,
            last_login TIMESTAMP NULL,
            status INTEGER NOT NULL DEFAULT 0,
            CONSTRAINT fk_app_user_mandant
                FOREIGN KEY (mandant_id)
                REFERENCES mandant(id)
                ON DELETE RESTRICT
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS app_group (
            id SERIAL PRIMARY KEY,
            mandant_id INTEGER NOT NULL,
            group_name VARCHAR(255) NOT NULL,
            CONSTRAINT fk_app_group_mandant
                FOREIGN KEY (mandant_id)
                REFERENCES mandant(id)
                ON DELETE CASCADE
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS user_group (
            user_id INTEGER NOT NULL,
            group_id INTEGER NOT NULL,
            mandant_id INTEGER NOT NULL,
            PRIMARY KEY (user_id, group_id),
            CONSTRAINT fk_user_group_user
                FOREIGN KEY (user_id)
                REFERENCES app_user(id)
                ON DELETE CASCADE,
            CONSTRAINT fk_user_group_group
                FOREIGN KEY (group_id)
                REFERENCES app_group(id)
                ON DELETE CASCADE,
            CONSTRAINT fk_user_group_mandant
                FOREIGN KEY (mandant_id)
                REFERENCES mandant(id)
                ON DELETE CASCADE
        )
        """,
    )
    conn = get_connection()
    try:
        cur = conn.cursor()
        try:
            for statement in ddl_statements:
                cur.execute(statement)
            conn.commit()
        finally:
            cur.close()
    finally:
        # Release the connection even if one of the statements raised.
        conn.close()
def ensure_default_admin():
    """Make sure the default tenant, admin group and admin user exist.

    Idempotently creates (or looks up):

    * the ``mandant`` row with kuerzel ``KOLB``,
    * the ``Administratoren`` group of that mandant,
    * the ``admin@kolb.cc`` user (status 1) in that mandant,
    * the membership of that user in that group.

    All rows are tied to the id of the ``KOLB`` mandant that was actually
    found or inserted; nothing assumes that this id is ``1``.

    The cursor and the connection are always closed; database errors
    propagate to the caller.
    """
    conn = get_connection()
    try:
        cur = conn.cursor()
        try:
            cur.execute("SELECT id FROM mandant WHERE kuerzel = %s", ("KOLB",))
            row = cur.fetchone()
            if row:
                mandant_id = row[0]
            else:
                cur.execute("""
                    INSERT INTO mandant (kuerzel, name, kontakt_email, level)
                    VALUES (%s, %s, %s, %s)
                    RETURNING id
                """, ("KOLB", "Kolb Compliance", "info@kolb.cc", 0))
                mandant_id = cur.fetchone()[0]

            cur.execute("""
                SELECT id FROM app_group
                WHERE mandant_id = %s AND group_name = %s
            """, (mandant_id, "Administratoren"))
            group_row = cur.fetchone()
            if group_row:
                admin_group_id = group_row[0]
            else:
                cur.execute("""
                    INSERT INTO app_group (mandant_id, group_name)
                    VALUES (%s, %s)
                    RETURNING id
                """, (mandant_id, "Administratoren"))
                admin_group_id = cur.fetchone()[0]

            cur.execute(
                "SELECT id FROM app_user WHERE email = %s",
                ("admin@kolb.cc",),
            )
            user_row = cur.fetchone()
            if user_row:
                admin_user_id = user_row[0]
            else:
                # SECURITY NOTE(review): the bootstrap admin is created with a
                # well-known password that lives in source control. It should
                # come from configuration and be changed after first login.
                password_hash = generate_password_hash("topsecret")
                cur.execute("""
                    INSERT INTO app_user (email, name, mandant_id, password_hash, status)
                    VALUES (%s, %s, %s, %s, %s)
                    RETURNING id
                """, ("admin@kolb.cc", "Admin", mandant_id, password_hash, 1))
                admin_user_id = cur.fetchone()[0]

            cur.execute("""
                SELECT 1 FROM user_group
                WHERE user_id = %s AND group_id = %s AND mandant_id = %s
            """, (admin_user_id, admin_group_id, mandant_id))
            if cur.fetchone() is None:
                cur.execute("""
                    INSERT INTO user_group (user_id, group_id, mandant_id)
                    VALUES (%s, %s, %s)
                """, (admin_user_id, admin_group_id, mandant_id))

            conn.commit()
        finally:
            cur.close()
    finally:
        conn.close()
def register_visit(route_name: str) -> int:
    """Record one visit for ``route_name`` and return the total visit count.

    Inserts a row into ``visits``, commits, and then counts all rows of the
    table (across every route, not just ``route_name``).

    The cursor and the connection are always closed, even when a statement
    fails; the database error propagates to the caller.
    """
    conn = get_connection()
    try:
        cur = conn.cursor()
        try:
            cur.execute(
                "INSERT INTO visits (route_name) VALUES (%s)",
                (route_name,),
            )
            conn.commit()
            cur.execute("SELECT COUNT(*) FROM visits")
            return cur.fetchone()[0]
        finally:
            cur.close()
    finally:
        conn.close()
def render_page(active_page: str, title: str):
    """Register a visit for ``active_page`` and render ``index.html``.

    The template receives the page key, the page title, the total visit
    count returned by ``register_visit`` and the current-user context.
    """
    visit_total = register_visit(active_page)
    user_context = get_current_user()
    return render_template(
        "index.html",
        active_page=active_page,
        page_title=title,
        visit_count=visit_total,
        **user_context,
    )
@app.before_request
def startup_checks():
    """Bootstrap the schema and the default admin once per process.

    The original hook ran all ``CREATE TABLE IF NOT EXISTS`` statements and
    the admin seeding queries before *every* request. The work is now done
    on the first request only and remembered in
    ``app.config["STARTUP_CHECKS_DONE"]``.

    When ``TESTING`` is set the checks still run on every request, so test
    suites that reset the database between requests keep working.
    """
    if app.config.get("STARTUP_CHECKS_DONE") and not app.config.get("TESTING"):
        return
    ensure_base_tables()
    ensure_default_admin()
    # Mark as done only after both steps succeeded, so a failed bootstrap
    # (e.g. database not reachable yet) is retried on the next request.
    app.config["STARTUP_CHECKS_DONE"] = True
@app.route("/")
@app.route("/home")
def home():
    """Render the landing page (also counts the visit via ``render_page``)."""
    return render_page(active_page="home", title="Home")
@app.route("/preise")
@login_required
def preise():
    """Render the pricing page; wrapped by ``login_required``."""
    user_context = get_current_user()
    return render_template(
        "preise2.html",
        page_title="Preise",
        active_page="preise",
        **user_context,
    )
@app.route("/allgemein")
def allgemein():
    """Render the general information page (no login decorator applied)."""
    user_context = get_current_user()
    return render_template(
        "allgemein.html",
        page_title="Info",
        active_page="allgemein",
        **user_context,
    )
# @app.route("/allgemein")
# @login_required
# def allgemein():
# return render_page("allgemein", "Allgemein")
def _is_safe_redirect_target(target, host_url):
    """Return True when ``target`` is a redirect that stays on this site.

    Accepted are host-relative paths (``/preise``) and absolute http(s) URLs
    whose network location equals that of ``host_url``. Everything else --
    other hosts, scheme-relative ``//host`` forms, scheme-only forms such as
    ``http:evil.example``, and values containing backslashes or control
    characters (which browsers may reinterpret) -- is rejected.
    """
    if not target:
        return False
    candidate = target.strip()
    if (
        not candidate
        or candidate.startswith("//")
        or "\\" in candidate
        or any(ord(ch) < 32 for ch in candidate)
    ):
        return False
    try:
        parsed = urlparse(candidate)
        own_netloc = urlparse(host_url).netloc
    except ValueError:
        # urlparse rejects some malformed authorities (e.g. broken IPv6).
        return False
    if not parsed.scheme and not parsed.netloc:
        return candidate.startswith("/")
    return parsed.scheme in ("http", "https") and parsed.netloc == own_netloc


@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form and authenticate a user on POST.

    On POST the user is looked up by lower-cased e-mail. Status ``0``
    (not yet activated), ``2`` (locked) and ``3`` (deactivated) are rejected
    with a message, then the password hash is checked. On success the user
    id, e-mail, name and mandant id are stored in the session, ``last_login``
    is updated and the client is redirected to ``next``.

    ``next`` is taken from the query string or the form, but is only honoured
    when it points back to this site; otherwise the pricing page is used.
    This closes the open redirect the unvalidated parameter allowed.
    """
    error_message = ""
    default_target = url_for("preise")
    next_url = (
        request.args.get("next")
        or request.form.get("next")
        or default_target
    )
    if not _is_safe_redirect_target(next_url, request.host_url):
        next_url = default_target

    if request.method == "POST":
        email = request.form.get("email", "").strip().lower()
        password = request.form.get("password", "")
        conn = get_connection()
        try:
            cur = conn.cursor()
            try:
                cur.execute("""
                    SELECT id, email, name, mandant_id, password_hash, status
                    FROM app_user
                    WHERE lower(email) = %s
                """, (email,))
                row = cur.fetchone()
                # NOTE(review): the distinct messages below reveal whether an
                # account exists and its status before the password is
                # checked. Consider a single generic failure message.
                if not row:
                    error_message = "Benutzer nicht gefunden."
                else:
                    (user_id, user_email, user_name,
                     mandant_id, password_hash, status) = row
                    if status == 0:
                        error_message = "Benutzer ist noch nicht aktiviert."
                    elif status == 2:
                        error_message = "Benutzer ist gesperrt."
                    elif status == 3:
                        error_message = "Benutzer ist deaktiviert."
                    elif not check_password_hash(password_hash, password):
                        error_message = "Passwort ist falsch."
                    else:
                        session["user_id"] = user_id
                        session["user_email"] = user_email
                        session["user_name"] = user_name
                        session["mandant_id"] = mandant_id
                        cur.execute("""
                            UPDATE app_user
                            SET last_login = %s
                            WHERE id = %s
                        """, (datetime.utcnow(), user_id))
                        conn.commit()
                        app.logger.info("Login erfolgreich: %s", user_email)
                        return redirect(next_url)
            finally:
                cur.close()
        finally:
            # Always release the connection, including on database errors.
            conn.close()

    return render_template(
        "login.html",
        page_title="Login",
        active_page="login",
        error_message=error_message,
        next_url=next_url,
        **get_current_user()
    )
@app.route("/logout")
def logout():
    """Clear the session, log the logout and redirect to the home page."""
    # Read the e-mail before the session is wiped so it can still be logged.
    departing_user = session.get("user_email", "unknown")
    session.clear()
    app.logger.info("Logout: %s", departing_user)
    home_url = url_for("home")
    return redirect(home_url)
@app.route("/health")
def health():
    """Health check: report whether the base tables can be ensured.

    Returns ``("OK\\n", 200)`` when ``ensure_base_tables`` succeeds and
    ``("DB Fehler\\n", 500)`` otherwise. The exception is logged with its
    traceback but no longer echoed in the response, because this endpoint
    has no login decorator and driver error messages can expose connection
    details.
    """
    try:
        ensure_base_tables()
    except Exception as exc:  # health-check boundary: report, never raise
        app.logger.exception("Healthcheck fehlgeschlagen: %s", exc)
        return "DB Fehler\n", 500
    return "OK\n", 200
@app.route("/videos/<path:filename>")
@login_required
def protected_videos(filename):
    """Serve a video if the current user's mandant level permits it.

    Aborts with 403 when ``is_video_allowed_for_level`` rejects the file
    for the level returned by ``get_current_user_mandant_level``.
    """
    level = get_current_user_mandant_level()
    permitted = is_video_allowed_for_level(filename, level)
    if permitted:
        return send_from_directory("/app/images/videos", filename)
    abort(403)
@app.route("/images/<path:filename>")
def serve_image(filename):
    """Serve a public image, refusing anything inside the ``videos`` folder.

    Videos live below ``/app/images/videos`` and must only be reachable
    through the access-controlled ``/videos/`` route. The path is normalised
    before the check, so spellings such as ``./videos/x.mp4`` or
    ``a/../videos/x.mp4`` -- which resolve into the videos folder when the
    file is looked up -- are rejected with 403 as well.
    """
    normalized = posixpath.normpath(filename).lstrip("/")
    if normalized == "videos" or normalized.startswith("videos/"):
        abort(403)
    return send_from_directory("/app/images", filename)
@app.route("/styles/<path:filename>")
def serve_style(filename):
    """Serve a file from the ``/app/styles`` directory."""
    styles_root = "/app/styles"
    return send_from_directory(styles_root, filename)
@app.route("/files/<path:filename>")
def serve_file(filename):
    """Serve a file from the ``/app/files`` directory."""
    files_root = "/app/files"
    return send_from_directory(files_root, filename)
# Temporary helper endpoint.
# SECURITY NOTE(review): the access key is hard-coded and both the key and
# the plaintext password travel in the URL path, where they can end up in
# access logs and browser history. Remove this route once it is not needed.
@app.route("/pwd/<password>/<key>")
def generate_pwd_hash(password, key):
    """Return the password hash for ``password`` wrapped in ``<pre>``.

    Responds with ``("Forbidden", 403)`` unless ``key`` matches the expected
    value. The comparison is done on bytes with ``hmac.compare_digest`` so
    it runs in constant time and also accepts non-ASCII input without
    raising.
    """
    if not hmac.compare_digest(key.encode("utf-8"), b"geheim"):
        return "Forbidden", 403
    return f"<pre>{generate_password_hash(password)}</pre>"
@app.route("/profil")
@login_required
def profil():
    """Render the profile page of the logged-in user.

    Loads the user row joined with its mandant, plus the names of the
    groups the user belongs to within the session's mandant, and passes
    both to ``profil.html``.

    The cursor and the connection are always closed, even when a query
    fails.
    """
    conn = get_connection()
    try:
        cur = conn.cursor()
        try:
            cur.execute("""
                SELECT
                    u.id,
                    u.email,
                    u.name,
                    u.mandant_id,
                    u.last_login,
                    u.status,
                    m.name AS mandant_name,
                    m.kuerzel AS mandant_kuerzel,
                    m.kontakt_email AS mandant_email,
                    m.level AS mandant_level
                FROM app_user u
                JOIN mandant m ON m.id = u.mandant_id
                WHERE u.id = %s
            """, (session["user_id"],))
            profile = fetchone_dict(cur)

            cur.execute("""
                SELECT g.group_name
                FROM user_group ug
                JOIN app_group g ON g.id = ug.group_id
                WHERE ug.user_id = %s
                AND ug.mandant_id = %s
                ORDER BY g.group_name
            """, (session["user_id"], session["mandant_id"]))
            gruppen = [row["group_name"] for row in fetchall_dict(cur)]
        finally:
            cur.close()
    finally:
        conn.close()

    return render_template(
        "profil.html",
        page_title="Profil",
        active_page="profil",
        profile=profile,
        gruppen=gruppen,
        **get_current_user()
    )
def _parse_form_int(raw_value):
    """Return ``raw_value`` converted to ``int``, or ``None`` if it is not one."""
    try:
        return int(raw_value)
    except (TypeError, ValueError):
        return None


def _load_mandanten(cur):
    """Return all ``mandant`` rows, ordered by id, as dictionaries."""
    cur.execute("""
        SELECT id, kuerzel, name, kontakt_email, level
        FROM mandant
        ORDER BY id
    """)
    return fetchall_dict(cur)


@app.route("/admin/mandanten", methods=["GET", "POST"])
@admin_required
def admin_mandanten():
    """List mandants and handle create / update / delete form posts.

    POST requests carry an ``action`` field:

    * ``create`` -- validates the fields; on a validation error the list is
      re-rendered with ``form_error`` and ``form_values``, otherwise the row
      is inserted.
    * ``update`` -- updates the row identified by ``id``.
    * ``delete`` -- deletes the row identified by ``id``.

    Successful (and unknown) actions redirect back to this page. A
    non-numeric ``level`` is reported as a form error on ``create``; a
    non-numeric ``id`` or ``level`` on ``update``/``delete`` aborts with
    400 instead of failing with an unhandled conversion error.

    The cursor and the connection are always closed.
    """
    # Extra template variables; only filled when create-validation fails so
    # the plain listing gets exactly the same context as before.
    extra_context = {}
    conn = get_connection()
    try:
        cur = conn.cursor()
        try:
            if request.method == "POST":
                action = request.form.get("action")
                if action == "create":
                    kuerzel = request.form.get("kuerzel", "").strip()
                    name = request.form.get("name", "").strip()
                    kontakt_email = request.form.get("kontakt_email", "").strip()
                    level = request.form.get("level", "0").strip()
                    level_value = _parse_form_int(level or 0)
                    email_pattern = r"^[^@\s]+@[^@\s]+\.[^@\s]+$"
                    error_message = None
                    if not kuerzel:
                        error_message = "Kürzel ist ein Pflichtfeld."
                    elif not name:
                        error_message = "Name ist ein Pflichtfeld."
                    elif not kontakt_email:
                        error_message = "Kontakt E-Mail ist ein Pflichtfeld."
                    elif not re.match(email_pattern, kontakt_email):
                        error_message = "Bitte eine gültige Kontakt-E-Mail eingeben."
                    elif level_value is None:
                        error_message = "Level muss eine ganze Zahl sein."
                    if error_message:
                        extra_context = {
                            "form_error": error_message,
                            "form_values": {
                                "kuerzel": kuerzel,
                                "name": name,
                                "kontakt_email": kontakt_email,
                                "level": level,
                            },
                        }
                    else:
                        cur.execute("""
                            INSERT INTO mandant (kuerzel, name, kontakt_email, level)
                            VALUES (%s, %s, %s, %s)
                        """, (kuerzel, name, kontakt_email, level_value))
                        conn.commit()
                elif action == "update":
                    mandant_id = _parse_form_int(request.form.get("id"))
                    kuerzel = request.form.get("kuerzel", "").strip()
                    name = request.form.get("name", "").strip()
                    kontakt_email = request.form.get("kontakt_email", "").strip()
                    level = request.form.get("level", "0").strip()
                    level_value = _parse_form_int(level or 0)
                    if mandant_id is None or level_value is None:
                        abort(400)
                    cur.execute("""
                        UPDATE mandant
                        SET kuerzel = %s,
                            name = %s,
                            kontakt_email = %s,
                            level = %s
                        WHERE id = %s
                    """, (kuerzel, name, kontakt_email or None, level_value, mandant_id))
                    conn.commit()
                elif action == "delete":
                    mandant_id = _parse_form_int(request.form.get("id"))
                    if mandant_id is None:
                        abort(400)
                    cur.execute("DELETE FROM mandant WHERE id = %s", (mandant_id,))
                    conn.commit()

                if not extra_context:
                    return redirect(url_for("admin_mandanten"))

            mandanten = _load_mandanten(cur)
        finally:
            cur.close()
    finally:
        conn.close()

    return render_template(
        "admin_mandanten.html",
        page_title="Admin - Mandanten",
        active_page="admin",
        mandanten=mandanten,
        **extra_context,
        **get_current_user()
    )
@app.route("/useradmin/mandant")
@user_admin_required
def useradmin_mandant():
    """List all users of the mandant stored in the current session.

    Users are joined with their mandant and ordered by name, then e-mail.
    The cursor and the connection are always closed, even when the query
    fails.
    """
    current_mandant_id = session.get("mandant_id")
    conn = get_connection()
    try:
        cur = conn.cursor()
        try:
            cur.execute("""
                SELECT
                    u.id,
                    u.email,
                    u.name,
                    u.mandant_id,
                    u.last_login,
                    u.status,
                    m.name AS mandant_name,
                    m.kontakt_email AS mandant_email,
                    m.level AS mandant_level
                FROM app_user u
                JOIN mandant m ON m.id = u.mandant_id
                WHERE u.mandant_id = %s
                ORDER BY u.name, u.email
            """, (current_mandant_id,))
            users = fetchall_dict(cur)
        finally:
            cur.close()
    finally:
        conn.close()

    return render_template(
        "useradmin_mandant.html",
        page_title="Useradministration",
        active_page="useradmin",
        users=users,
        **get_current_user()
    )
@app.errorhandler(403)
def forbidden(_error):
    """Render the ``403.html`` page for 403 errors, keeping the 403 status."""
    user_context = get_current_user()
    page = render_template(
        "403.html",
        page_title="Kein Zugriff",
        active_page="",
        **user_context,
    )
    return page, 403
@app.route("/pwdchange", methods=["GET", "POST"])
@login_required
def pwdchange():
    """Let the logged-in user change their own password.

    On POST all three fields must be filled, the new password must match
    its confirmation and be at least 8 characters long, and the current
    password must verify against the stored hash. The outcome is reported
    through ``error_message`` / ``success_message`` in ``pwdchange.html``.

    The cursor and the connection are always closed, even when a query
    fails.
    """
    error_message = ""
    success_message = ""

    if request.method == "POST":
        current_password = request.form.get("current_password", "")
        new_password = request.form.get("new_password", "")
        confirm_password = request.form.get("confirm_password", "")

        if not current_password or not new_password or not confirm_password:
            error_message = "Bitte alle Felder ausfüllen."
        elif new_password != confirm_password:
            error_message = "Die neuen Passwörter stimmen nicht überein."
        elif len(new_password) < 8:
            error_message = "Das neue Passwort muss mindestens 8 Zeichen lang sein."
        else:
            conn = get_connection()
            try:
                cur = conn.cursor()
                try:
                    cur.execute("""
                        SELECT password_hash
                        FROM app_user
                        WHERE id = %s
                    """, (session["user_id"],))
                    row = cur.fetchone()
                    if row is None:
                        error_message = "Benutzer nicht gefunden."
                    elif not check_password_hash(row[0], current_password):
                        error_message = "Das aktuelle Passwort ist falsch."
                    else:
                        new_hash = generate_password_hash(new_password)
                        cur.execute("""
                            UPDATE app_user
                            SET password_hash = %s
                            WHERE id = %s
                        """, (new_hash, session["user_id"]))
                        conn.commit()
                        success_message = "Passwort wurde erfolgreich geändert."
                finally:
                    cur.close()
            finally:
                conn.close()

    return render_template(
        "pwdchange.html",
        page_title="Passwort ändern",
        active_page="profil",
        error_message=error_message,
        success_message=success_message,
        **get_current_user()
    )
@app.route("/useradmin/mandant/new", methods=["GET", "POST"])
@user_admin_required
def useradmin_user_new():
    """Show the "new user" form and create a user in the session's mandant.

    The groups of the current mandant are always loaded for the form. On
    POST the input is validated (required fields, e-mail format, matching
    passwords of at least 8 characters, numeric status, numeric group ids,
    unused e-mail). On success the user is inserted together with its
    memberships in those selected groups that belong to the current
    mandant, and the client is redirected to the user list. On failure the
    form is re-rendered with ``form_error`` and the submitted values.

    Non-numeric ``status`` or ``group_ids`` values are reported as form
    errors instead of failing with an unhandled conversion error. The
    cursor and the connection are always closed.
    """
    current_mandant_id = session.get("mandant_id")
    form_error = None
    form_values = {
        "email": "",
        "name": "",
        "status": "1",
        "selected_groups": []
    }

    conn = get_connection()
    try:
        cur = conn.cursor()
        try:
            cur.execute("""
                SELECT id, group_name
                FROM app_group
                WHERE mandant_id = %s
                ORDER BY group_name
            """, (current_mandant_id,))
            gruppen = fetchall_dict(cur)

            if request.method == "POST":
                email = request.form.get("email", "").strip().lower()
                name = request.form.get("name", "").strip()
                password = request.form.get("password", "")
                password2 = request.form.get("password2", "")
                status = request.form.get("status", "1").strip()
                selected_groups = request.form.getlist("group_ids")
                form_values = {
                    "email": email,
                    "name": name,
                    "status": status,
                    "selected_groups": selected_groups
                }

                # Convert the numeric inputs up front; None marks bad input.
                try:
                    status_value = int(status or 1)
                except ValueError:
                    status_value = None
                try:
                    selected_group_ids = [int(gid) for gid in selected_groups]
                except ValueError:
                    selected_group_ids = None

                email_pattern = r"^[^@\s]+@[^@\s]+\.[^@\s]+$"
                if not email:
                    form_error = "E-Mail ist ein Pflichtfeld."
                elif not re.match(email_pattern, email):
                    form_error = "Bitte eine gültige E-Mail-Adresse eingeben."
                elif not name:
                    form_error = "Name ist ein Pflichtfeld."
                elif not password:
                    form_error = "Passwort ist ein Pflichtfeld."
                elif not password2:
                    form_error = "Bitte Passwort bestätigen."
                elif password != password2:
                    form_error = "Die beiden Passwörter stimmen nicht überein."
                elif len(password) < 8:
                    form_error = "Das Passwort muss mindestens 8 Zeichen lang sein."
                elif status_value is None:
                    form_error = "Ungültiger Status."
                elif selected_group_ids is None:
                    form_error = "Ungültige Gruppenauswahl."
                else:
                    cur.execute("""
                        SELECT id
                        FROM app_user
                        WHERE lower(email) = %s
                    """, (email,))
                    if cur.fetchone():
                        form_error = "Ein Benutzer mit dieser E-Mail existiert bereits."
                    else:
                        password_hash = generate_password_hash(password)
                        cur.execute("""
                            INSERT INTO app_user (
                                email,
                                name,
                                mandant_id,
                                password_hash,
                                status
                            )
                            VALUES (%s, %s, %s, %s, %s)
                            RETURNING id
                        """, (
                            email,
                            name,
                            current_mandant_id,
                            password_hash,
                            status_value
                        ))
                        new_user_id = cur.fetchone()[0]

                        if selected_group_ids:
                            # Only accept groups that really belong to the
                            # current mandant, whatever ids were submitted.
                            cur.execute("""
                                SELECT id
                                FROM app_group
                                WHERE mandant_id = %s
                                AND id = ANY(%s)
                            """, (current_mandant_id, selected_group_ids))
                            for (group_id, *_rest) in cur.fetchall():
                                cur.execute("""
                                    INSERT INTO user_group (user_id, group_id, mandant_id)
                                    VALUES (%s, %s, %s)
                                """, (new_user_id, group_id, current_mandant_id))

                        conn.commit()
                        return redirect(url_for("useradmin_mandant"))
        finally:
            cur.close()
    finally:
        conn.close()

    return render_template(
        "useradmin_user_new.html",
        page_title="Neuer User",
        active_page="useradmin",
        gruppen=gruppen,
        form_error=form_error,
        form_values=form_values,
        **get_current_user()
    )
def _is_local_redirect_target(target, host_url):
    """Return True when ``target`` is a redirect that stays on this site.

    Accepted are host-relative paths (``/home``) and absolute http(s) URLs
    whose network location equals that of ``host_url``. Scheme-relative
    ``//host`` forms, scheme-only forms such as ``http:evil.example``, other
    hosts, and values with backslashes or control characters are rejected.
    """
    if not target:
        return False
    candidate = target.strip()
    if (
        not candidate
        or candidate.startswith("//")
        or "\\" in candidate
        or any(ord(ch) < 32 for ch in candidate)
    ):
        return False
    try:
        parsed = urlparse(candidate)
        own_netloc = urlparse(host_url).netloc
    except ValueError:
        # urlparse rejects some malformed authorities (e.g. broken IPv6).
        return False
    if not parsed.scheme and not parsed.netloc:
        return candidate.startswith("/")
    return parsed.scheme in ("http", "https") and parsed.netloc == own_netloc


@app.route("/set-country")
def set_country():
    """Store the selected country in the session and redirect back.

    ``country`` (default ``DE``) is upper-cased and stored only when it is
    one of ``DE``, ``AT`` or ``CH``. The client is then redirected to the
    ``next`` query parameter, else to the referrer, else to the home page.

    ``next`` is only honoured when it points back to this site, which
    closes the open redirect the unvalidated parameter allowed. The
    referrer fallback is left unchanged.
    """
    country = request.args.get("country", "DE").upper()
    allowed = {"DE", "AT", "CH"}
    if country in allowed:
        session["country"] = country

    requested_target = request.args.get("next")
    if not _is_local_redirect_target(requested_target, request.host_url):
        requested_target = None

    next_url = requested_target or request.referrer or url_for("home")
    return redirect(next_url)