1605 lines
47 KiB
Python
1605 lines
47 KiB
Python
import logging
|
|
import os
|
|
import re
|
|
import shutil
|
|
from datetime import datetime
|
|
from logging.handlers import RotatingFileHandler
|
|
|
|
from flask import (
|
|
Flask,
|
|
redirect,
|
|
render_template,
|
|
request,
|
|
send_from_directory,
|
|
session,
|
|
url_for,
|
|
abort,
|
|
)
|
|
from werkzeug.security import check_password_hash, generate_password_hash
|
|
from werkzeug.utils import secure_filename
|
|
|
|
from config import Config, COUNTRY_VAT_LABELS
|
|
from db import get_connection, fetchone_dict, fetchall_dict
|
|
from auth import login_required
|
|
from permissions import is_video_allowed_for_level, is_course_allowed_for_level, get_allowed_checklist_levels_for_mandant_level
|
|
from security import (
|
|
admin_required,
|
|
get_current_user,
|
|
get_current_user_mandant_level,
|
|
user_admin_required,
|
|
contentmanager_required
|
|
)
|
|
from logging_config import setup_logging
|
|
|
|
|
|
# Create the Flask application and load its settings from the project Config object.
app = Flask(__name__)
app.config.from_object(Config)
# The secret key is taken from the configuration that was just loaded.
app.secret_key = app.config["SECRET_KEY"]

# Directory for log files; it is only created when the app is not in TESTING mode.
LOG_DIR = app.config["LOG_DIR"]
if not app.config.get("TESTING"):
    os.makedirs(LOG_DIR, exist_ok=True)
|
|
|
|
# file_handler = RotatingFileHandler(
|
|
# os.path.join(LOG_DIR, "flask-app.log"),
|
|
# maxBytes=5 * 1024 * 1024,
|
|
# backupCount=5
|
|
# )
|
|
# file_handler.setLevel(logging.INFO)
|
|
# file_handler.setFormatter(
|
|
# logging.Formatter("%(asctime)s %(levelname)s %(message)s")
|
|
# )
|
|
|
|
# app.logger.setLevel(logging.INFO)
|
|
# app.logger.addHandler(file_handler)
|
|
|
|
|
|
def format_level(level):
    """Return the display label for a mandant level.

    Levels 0-3 have fixed labels; every other value is rendered as
    ``"<level> - Unbekannt"``.
    """
    known_labels = {
        0: "0 - Admin",
        1: "1 - Gold",
        2: "2 - Silber",
        3: "3 - Bronze",
    }
    if level in known_labels:
        return known_labels[level]
    return f"{level} - Unbekannt"
|
|
|
|
def ensure_base_tables():
    """Create the base tables if they do not exist yet.

    Creates ``visits``, ``mandant``, ``app_user``, ``app_group`` and
    ``user_group`` (in that order, because of the foreign keys) and commits.
    The cursor and connection are always closed, even when a statement fails;
    any exception from the database layer propagates to the caller.
    """
    ddl_statements = (
        """
        CREATE TABLE IF NOT EXISTS visits (
            id SERIAL PRIMARY KEY,
            route_name VARCHAR(100),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS mandant (
            id SERIAL PRIMARY KEY,
            kuerzel VARCHAR(50) NOT NULL,
            name VARCHAR(255) NOT NULL,
            kontakt_email VARCHAR(255),
            level INTEGER NOT NULL DEFAULT 0
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS app_user (
            id SERIAL PRIMARY KEY,
            email VARCHAR(255) NOT NULL UNIQUE,
            name VARCHAR(255) NOT NULL,
            mandant_id INTEGER NOT NULL,
            password_hash VARCHAR(255) NOT NULL,
            last_login TIMESTAMP NULL,
            status INTEGER NOT NULL DEFAULT 0,
            CONSTRAINT fk_app_user_mandant
                FOREIGN KEY (mandant_id)
                REFERENCES mandant(id)
                ON DELETE RESTRICT
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS app_group (
            id SERIAL PRIMARY KEY,
            mandant_id INTEGER NOT NULL,
            group_name VARCHAR(255) NOT NULL,
            CONSTRAINT fk_app_group_mandant
                FOREIGN KEY (mandant_id)
                REFERENCES mandant(id)
                ON DELETE CASCADE
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS user_group (
            user_id INTEGER NOT NULL,
            group_id INTEGER NOT NULL,
            mandant_id INTEGER NOT NULL,
            PRIMARY KEY (user_id, group_id),
            CONSTRAINT fk_user_group_user
                FOREIGN KEY (user_id)
                REFERENCES app_user(id)
                ON DELETE CASCADE,
            CONSTRAINT fk_user_group_group
                FOREIGN KEY (group_id)
                REFERENCES app_group(id)
                ON DELETE CASCADE,
            CONSTRAINT fk_user_group_mandant
                FOREIGN KEY (mandant_id)
                REFERENCES mandant(id)
                ON DELETE CASCADE
        )
        """,
    )

    conn = get_connection()
    cur = conn.cursor()
    try:
        for statement in ddl_statements:
            cur.execute(statement)
        conn.commit()
    finally:
        # Release the database resources even when a statement raised.
        cur.close()
        conn.close()
|
|
|
|
|
|
def ensure_default_admin():
    """Make sure the default mandant, admin group and admin user exist.

    Looks up (or creates) the mandant with kuerzel ``KOLB``, the group
    ``Administratoren`` of that mandant and the user ``admin@kolb.cc``, then
    links the user to the group. All rows reference the id of the mandant
    that was actually found or inserted, so the function also works when that
    mandant does not have id 1.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        cur.execute("SELECT id FROM mandant WHERE kuerzel = %s", ("KOLB",))
        row = cur.fetchone()

        if row:
            mandant_id = row[0]
        else:
            cur.execute("""
                INSERT INTO mandant (kuerzel, name, kontakt_email, level)
                VALUES (%s, %s, %s, %s)
                RETURNING id
            """, ("KOLB", "Kolb Compliance", "info@kolb.cc", 0))
            mandant_id = cur.fetchone()[0]

        cur.execute("""
            SELECT id FROM app_group
            WHERE mandant_id = %s AND group_name = %s
        """, (mandant_id, "Administratoren"))
        group_row = cur.fetchone()

        if group_row:
            admin_group_id = group_row[0]
        else:
            cur.execute("""
                INSERT INTO app_group (mandant_id, group_name)
                VALUES (%s, %s)
                RETURNING id
            """, (mandant_id, "Administratoren"))
            admin_group_id = cur.fetchone()[0]

        cur.execute("SELECT id FROM app_user WHERE email = %s", ("admin@kolb.cc",))
        user_row = cur.fetchone()

        if user_row:
            admin_user_id = user_row[0]
        else:
            # NOTE(review): hard-coded default password; it should be changed
            # right after the first login or be supplied via configuration.
            password_hash = generate_password_hash("topsecret")
            cur.execute("""
                INSERT INTO app_user (email, name, mandant_id, password_hash, status)
                VALUES (%s, %s, %s, %s, %s)
                RETURNING id
            """, ("admin@kolb.cc", "Admin", mandant_id, password_hash, 1))
            admin_user_id = cur.fetchone()[0]

        # (user_id, group_id) is the primary key of user_group, so the
        # membership check uses exactly these two columns.
        cur.execute("""
            SELECT 1 FROM user_group
            WHERE user_id = %s AND group_id = %s
        """, (admin_user_id, admin_group_id))

        if cur.fetchone() is None:
            cur.execute("""
                INSERT INTO user_group (user_id, group_id, mandant_id)
                VALUES (%s, %s, %s)
            """, (admin_user_id, admin_group_id, mandant_id))

        conn.commit()
    finally:
        # Release the database resources even when a statement raised.
        cur.close()
        conn.close()
|
|
|
|
|
|
def register_visit(route_name: str) -> int:
    """Record a visit for *route_name* and return the total number of visits.

    Inserts one row into ``visits``, commits, and then counts all rows of the
    table. The cursor and connection are closed even if a statement fails.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        cur.execute(
            "INSERT INTO visits (route_name) VALUES (%s)",
            (route_name,)
        )
        conn.commit()

        cur.execute("SELECT COUNT(*) FROM visits")
        return cur.fetchone()[0]
    finally:
        cur.close()
        conn.close()
|
|
|
|
def get_available_courses_for_user():
    """Return the active courses the current session's mandant level may see.

    The mandant level is read from ``session["mandant_level"]``. When it is
    missing it is loaded from the ``mandant`` table via
    ``session["mandant_id"]`` and cached in the session. All active courses
    are then loaded (ordered by ``sort_order, code``) and filtered with
    ``is_course_allowed_for_level``.

    Both queries share one connection, which is closed even on errors.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        mandant_level = session.get("mandant_level")

        if mandant_level is None:
            cur.execute("""
                SELECT level
                FROM mandant
                WHERE id = %s
            """, (session.get("mandant_id"),))
            row = cur.fetchone()

            # NOTE(review): an unknown mandant falls back to level 0, which
            # format_level labels "Admin" - confirm this default is intended.
            mandant_level = row[0] if row else 0
            session["mandant_level"] = mandant_level

        cur.execute("""
            SELECT id, code, title, description, video_file, sort_order
            FROM course
            WHERE is_active = TRUE
            ORDER BY sort_order, code
        """)
        all_courses = fetchall_dict(cur)
    finally:
        cur.close()
        conn.close()

    return [
        course for course in all_courses
        if is_course_allowed_for_level(course["code"], mandant_level)
    ]
|
|
|
|
|
|
|
|
def render_page(active_page: str, title: str):
    """Register a visit for *active_page* and render ``index.html`` for it.

    The template receives the page key, the page title, the total visit
    count and whatever ``get_current_user()`` returns.
    """
    total_visits = register_visit(active_page)
    page_context = {
        "active_page": active_page,
        "page_title": title,
        "visit_count": total_visits,
    }
    return render_template("index.html", **page_context, **get_current_user())
|
|
|
|
|
|
@app.before_request
def startup_checks():
    """Create base tables and the default admin once per process.

    The hook runs before every request, but the schema and seed checks only
    need to succeed once. After the first successful run a flag on the
    function object short-circuits later requests. If a check raises, the
    flag stays unset and the checks are retried on the next request.
    """
    if getattr(startup_checks, "_completed", False):
        return

    ensure_base_tables()
    ensure_default_admin()
    # Both checks are idempotent, so a rare concurrent double run is harmless.
    startup_checks._completed = True
|
|
|
|
|
|
@app.route("/")
@app.route("/home")
def home():
    """Render the home page (served under both ``/`` and ``/home``)."""
    return render_page(active_page="home", title="Home")
|
|
|
|
|
|
@app.route("/preise")
@login_required
def preise():
    """Render the price page with the VAT label of the session's country.

    The country comes from ``session["country"]`` and defaults to ``DE``.
    """
    country_code = session.get("country", "DE")
    vat_label = COUNTRY_VAT_LABELS.get(country_code)
    return render_template(
        "preise2.html",
        page_title="Preise",
        active_page="preise",
        vat_label=vat_label,
        **get_current_user()
    )
|
|
|
|
@app.route("/allgemein")
def allgemein():
    """Render the general information page (no login required)."""
    template_context = {
        "page_title": "Info",
        "active_page": "allgemein",
    }
    return render_template("allgemein.html", **template_context, **get_current_user())
|
|
|
|
# @app.route("/allgemein")
|
|
# @login_required
|
|
# def allgemein():
|
|
# return render_page("allgemein", "Allgemein")
|
|
|
|
|
|
def _is_safe_redirect_target(target, host_url):
    """Return True if *target* is a redirect target on the host of *host_url*.

    Relative paths and absolute URLs pointing at the same host are accepted.
    Empty values, foreign hosts, non-HTTP schemes and values containing
    backslashes, control characters or surrounding whitespace are rejected.
    """
    from urllib.parse import urljoin, urlparse

    if not target or target != target.strip():
        return False
    # Browsers treat "\" like "/" and drop control characters, which could
    # turn a seemingly relative value into "//other-host".
    if "\\" in target or any(ord(ch) < 0x20 for ch in target):
        return False

    own_host = urlparse(host_url)
    resolved = urlparse(urljoin(host_url, target))
    return resolved.scheme in ("http", "https") and resolved.netloc == own_host.netloc


@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form and authenticate a user on POST.

    On success the user's id, e-mail, name and mandant id are stored in the
    session, ``last_login`` is updated and the client is redirected to the
    ``next`` target. The ``next`` value comes from the request, so it is only
    used when it points to this host; otherwise the price page is used.

    On failure the form is rendered again with an error message.
    """
    error_message = ""
    requested_next = request.args.get("next") or request.form.get("next")
    if _is_safe_redirect_target(requested_next, request.host_url):
        next_url = requested_next
    else:
        next_url = url_for("preise")

    if request.method == "POST":
        email = request.form.get("email", "").strip().lower()
        password = request.form.get("password", "")

        conn = get_connection()
        cur = conn.cursor()
        try:
            cur.execute("""
                SELECT id, email, name, mandant_id, password_hash, status
                FROM app_user
                WHERE lower(email) = %s
            """, (email,))
            row = cur.fetchone()

            if not row:
                error_message = "Benutzer nicht gefunden."
            else:
                user_id, user_email, user_name, mandant_id, password_hash, status = row

                if status == 0:
                    error_message = "Benutzer ist noch nicht aktiviert."
                elif status == 2:
                    error_message = "Benutzer ist gesperrt."
                elif status == 3:
                    error_message = "Benutzer ist deaktiviert."
                elif not check_password_hash(password_hash, password):
                    error_message = "Passwort ist falsch."
                else:
                    session["user_id"] = user_id
                    session["user_email"] = user_email
                    session["user_name"] = user_name
                    session["mandant_id"] = mandant_id

                    cur.execute("""
                        UPDATE app_user
                        SET last_login = %s
                        WHERE id = %s
                    """, (datetime.utcnow(), user_id))
                    conn.commit()

                    app.logger.info("Login erfolgreich: %s", user_email)
                    return redirect(next_url)
        finally:
            # Runs on success, on validation errors and on exceptions.
            cur.close()
            conn.close()

    return render_template(
        "login.html",
        page_title="Login",
        active_page="login",
        error_message=error_message,
        next_url=next_url,
        **get_current_user()
    )
|
|
|
|
|
|
@app.route("/logout")
def logout():
    """Clear the session, log the logout and redirect to the home page."""
    # Read the e-mail first; it is gone once the session is cleared.
    email_for_log = session.get("user_email", "unknown")
    session.clear()
    app.logger.info("Logout: %s", email_for_log)
    home_url = url_for("home")
    return redirect(home_url)
|
|
|
|
|
|
@app.route("/health")
def health():
    """Health check: verify that the base tables can be ensured.

    Returns ``("OK\\n", 200)`` on success. On any failure the exception is
    logged with its traceback and a generic ``("DB Fehler\\n", 500)`` is
    returned. The exception text is deliberately not sent to the client,
    because this endpoint requires no login and driver errors can contain
    connection details.
    """
    try:
        ensure_base_tables()
    except Exception as exc:  # top-level boundary: report every failure as unhealthy
        app.logger.exception("Healthcheck fehlgeschlagen: %s", exc)
        return "DB Fehler\n", 500
    return "OK\n", 200
|
|
|
|
|
|
@app.route("/videos/<path:filename>")
@login_required
def protected_videos(filename):
    """Serve a video from ``/app/images/videos`` if the user's level allows it.

    Responds with 403 when ``is_video_allowed_for_level`` rejects the file
    for the current user's mandant level.
    """
    current_level = get_current_user_mandant_level()
    access_granted = is_video_allowed_for_level(filename, current_level)

    if not access_granted:
        abort(403)

    video_root = "/app/images/videos"
    return send_from_directory(video_root, filename)
|
|
|
|
@app.route("/images/<path:filename>")
def serve_image(filename):
    """Serve a file from ``/app/images``, except anything below ``videos/``.

    Videos are only reachable through the protected ``/videos/`` route. The
    check runs on the normalized path, so requests such as
    ``./videos/x.mp4`` or ``a/../videos/x.mp4`` cannot slip past it before
    ``send_from_directory`` resolves the path.
    """
    import posixpath

    normalized = posixpath.normpath(filename)
    if normalized == "videos" or normalized.startswith("videos/"):
        abort(403)

    return send_from_directory("/app/images", filename)
|
|
|
|
@app.route("/styles/<path:filename>")
def serve_style(filename):
    """Serve a static style asset from ``/app/styles``."""
    styles_root = "/app/styles"
    return send_from_directory(styles_root, filename)
|
|
|
|
|
|
@app.route("/files/<path:filename>")
def serve_file(filename):
    """Serve a file from ``/app/files`` with a tenant check for upload folders.

    Uploaded documents are stored below ``/app/files/<mandant_id>/``. A path
    whose first segment (after normalization) is numeric is treated as such a
    mandant folder. It is only served to a logged-in user whose session
    ``mandant_id`` matches that segment; anything else gets a 403. Paths
    outside numeric folders are served as before.

    NOTE(review): if administrators must be able to read other mandants'
    files through this route, add an explicit exception for them here.
    """
    import posixpath

    # Normalize first so "x/../1/file" cannot bypass the folder check.
    normalized = posixpath.normpath(filename)
    first_segment = normalized.split("/", 1)[0]

    if first_segment.isdigit():
        is_logged_in = session.get("user_id") is not None
        owns_folder = str(session.get("mandant_id")) == first_segment
        if not (is_logged_in and owns_folder):
            abort(403)

    return send_from_directory("/app/files", filename)
|
|
|
|
# temporary
# NOTE(review): unauthenticated helper guarded only by a hard-coded key; the
# plain-text password travels in the URL. Remove before production use.
@app.route("/pwd/<password>/<key>")
def generate_pwd_hash(password, key):
    """Return the password hash for *password* if *key* is the expected key.

    Responds with ``("Forbidden", 403)`` for any other key.
    """
    if key == "geheim":
        hashed = generate_password_hash(password)
        return f"<pre>{hashed}</pre>"

    return "Forbidden", 403
|
|
|
|
@app.route("/profil")
@login_required
def profil():
    """Render the profile page of the logged-in user.

    Loads the user joined with their mandant, plus the names of the user's
    groups within the session's mandant. Responds with 404 when the session
    refers to a user that no longer exists. The database resources are
    released even when a query fails.
    """
    user_id = session["user_id"]

    conn = get_connection()
    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT
                u.id,
                u.email,
                u.name,
                u.mandant_id,
                u.last_login,
                u.status,
                m.name AS mandant_name,
                m.kuerzel AS mandant_kuerzel,
                m.kontakt_email AS mandant_email,
                m.level AS mandant_level
            FROM app_user u
            JOIN mandant m ON m.id = u.mandant_id
            WHERE u.id = %s
        """, (user_id,))
        profile = fetchone_dict(cur)

        if not profile:
            # The user was deleted while the session was still alive.
            abort(404)

        cur.execute("""
            SELECT g.group_name
            FROM user_group ug
            JOIN app_group g ON g.id = ug.group_id
            WHERE ug.user_id = %s
              AND ug.mandant_id = %s
            ORDER BY g.group_name
        """, (user_id, session["mandant_id"]))
        gruppen = [row["group_name"] for row in fetchall_dict(cur)]
    finally:
        cur.close()
        conn.close()

    profile["mandant_level_label"] = format_level(profile["mandant_level"])

    return render_template(
        "profil.html",
        page_title="Profil",
        active_page="profil",
        profile=profile,
        gruppen=gruppen,
        **get_current_user()
    )
|
|
|
|
def _fetch_mandanten_with_labels(cur):
    """Return all mandant rows ordered by id, each with a ``level_label`` entry."""
    cur.execute("""
        SELECT id, kuerzel, name, kontakt_email, level
        FROM mandant
        ORDER BY id
    """)
    mandanten = fetchall_dict(cur)
    for mandant in mandanten:
        mandant["level_label"] = format_level(mandant["level"])
    return mandanten


def _int_or_abort(raw_value):
    """Convert a form value to int; respond with 400 when that is impossible."""
    try:
        return int(raw_value)
    except (TypeError, ValueError):
        abort(400)


def _validate_new_mandant_form(form_values, admin_password, admin_password2):
    """Return the first validation error for the create form, or None."""
    email_pattern = r"^[^@\s]+@[^@\s]+\.[^@\s]+$"

    if not form_values["kuerzel"]:
        return "Kürzel ist ein Pflichtfeld."
    if not form_values["name"]:
        return "Name ist ein Pflichtfeld."
    if not form_values["kontakt_email"]:
        return "Kontakt E-Mail ist ein Pflichtfeld."
    if not re.match(email_pattern, form_values["kontakt_email"]):
        return "Bitte eine gültige Kontakt-E-Mail eingeben."
    if not form_values["admin_name"]:
        return "Admin User Name ist ein Pflichtfeld."
    if not form_values["admin_email"]:
        return "Admin User E-Mail ist ein Pflichtfeld."
    if not re.match(email_pattern, form_values["admin_email"]):
        return "Bitte eine gültige Admin E-Mail-Adresse eingeben."
    if not admin_password:
        return "Admin Passwort ist ein Pflichtfeld."
    if not admin_password2:
        return "Bitte Admin Passwort bestätigen."
    if admin_password != admin_password2:
        return "Die beiden Admin-Passwörter stimmen nicht überein."
    if len(admin_password) < 8:
        return "Das Admin Passwort muss mindestens 8 Zeichen lang sein."
    try:
        int(form_values["level"] or 0)
    except ValueError:
        return "Level muss eine ganze Zahl sein."
    return None


def _insert_mandant_with_admin(cur, form_values, admin_password):
    """Insert a mandant, its default groups and first admin; return the mandant id.

    Nothing is committed here; the caller owns the transaction.
    """
    cur.execute("""
        INSERT INTO mandant (kuerzel, name, kontakt_email, level)
        VALUES (%s, %s, %s, %s)
        RETURNING id
    """, (
        form_values["kuerzel"],
        form_values["name"],
        form_values["kontakt_email"],
        int(form_values["level"] or 0),
    ))
    new_mandant_id = cur.fetchone()[0]

    # Default groups for the new mandant.
    group_ids = []
    for group_name in ("Useradministration", "Contentmanager"):
        cur.execute("""
            INSERT INTO app_group (mandant_id, group_name)
            VALUES (%s, %s)
            RETURNING id
        """, (new_mandant_id, group_name))
        group_ids.append(cur.fetchone()[0])

    # First admin user of the mandant.
    cur.execute("""
        INSERT INTO app_user (email, name, mandant_id, password_hash, status)
        VALUES (%s, %s, %s, %s, %s)
        RETURNING id
    """, (
        form_values["admin_email"],
        form_values["admin_name"],
        new_mandant_id,
        generate_password_hash(admin_password),
        1,
    ))
    new_admin_user_id = cur.fetchone()[0]

    # Assign the user to both default groups.
    for group_id in group_ids:
        cur.execute("""
            INSERT INTO user_group (user_id, group_id, mandant_id)
            VALUES (%s, %s, %s)
        """, (new_admin_user_id, group_id, new_mandant_id))

    return new_mandant_id


@app.route("/admin/mandanten", methods=["GET", "POST"])
@admin_required
def admin_mandanten():
    """List mandants and handle the create / update / delete form actions.

    GET renders the mandant list. POST dispatches on the ``action`` field:

    * ``create`` validates the form, creates the mandant with its default
      groups and first admin user in one transaction, and creates the
      mandant's files directory. Validation errors (including an admin
      e-mail that is already taken) re-render the page with the entered
      values and no database changes.
    * ``update`` changes the mandant's master data.
    * ``delete`` removes the mandant and its files directory.

    A missing or non-numeric ``id`` / ``level`` on update or delete yields a
    400 response. Successful POSTs redirect back to this page. The cursor
    and connection are always closed.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        if request.method == "POST":
            action = request.form.get("action")

            if action == "create":
                form_values = {
                    "kuerzel": request.form.get("kuerzel", "").strip(),
                    "name": request.form.get("name", "").strip(),
                    "kontakt_email": request.form.get("kontakt_email", "").strip(),
                    "level": request.form.get("level", "0").strip(),
                    "admin_name": request.form.get("admin_name", "").strip(),
                    "admin_email": request.form.get("admin_email", "").strip().lower(),
                }
                admin_password = request.form.get("admin_password", "")
                admin_password2 = request.form.get("admin_password2", "")

                error_message = _validate_new_mandant_form(
                    form_values, admin_password, admin_password2
                )

                if not error_message:
                    cur.execute("""
                        SELECT id
                        FROM app_user
                        WHERE lower(email) = %s
                    """, (form_values["admin_email"],))
                    if cur.fetchone():
                        error_message = "Ein Benutzer mit der Admin E-Mail existiert bereits."

                if error_message:
                    return render_template(
                        "admin_mandanten.html",
                        page_title="Admin - Mandanten",
                        active_page="admin",
                        mandanten=_fetch_mandanten_with_labels(cur),
                        form_error=error_message,
                        form_values=form_values,
                        **get_current_user()
                    )

                new_mandant_id = _insert_mandant_with_admin(cur, form_values, admin_password)
                conn.commit()

                # Create the mandant's files directory.
                mandant_dir = os.path.join(Config.FILES_DIR, str(new_mandant_id))
                os.makedirs(mandant_dir, exist_ok=True)

            elif action == "update":
                mandant_id = _int_or_abort(request.form.get("id"))
                kuerzel = request.form.get("kuerzel", "").strip()
                name = request.form.get("name", "").strip()
                kontakt_email = request.form.get("kontakt_email", "").strip()
                level = _int_or_abort(request.form.get("level", "0").strip() or 0)

                cur.execute("""
                    UPDATE mandant
                    SET kuerzel = %s,
                        name = %s,
                        kontakt_email = %s,
                        level = %s
                    WHERE id = %s
                """, (kuerzel, name, kontakt_email or None, level, mandant_id))
                conn.commit()

            elif action == "delete":
                mandant_id = _int_or_abort(request.form.get("id"))
                mandant_dir = os.path.join(Config.FILES_DIR, str(mandant_id))

                # The directory is only removed after the row was deleted and
                # committed, so a failed delete keeps the files.
                cur.execute("DELETE FROM mandant WHERE id = %s", (mandant_id,))
                conn.commit()

                if os.path.isdir(mandant_dir):
                    shutil.rmtree(mandant_dir, ignore_errors=True)

            return redirect(url_for("admin_mandanten"))

        mandanten = _fetch_mandanten_with_labels(cur)
    finally:
        # Uncommitted work is discarded when the connection is closed.
        cur.close()
        conn.close()

    return render_template(
        "admin_mandanten.html",
        page_title="Admin - Mandanten",
        active_page="admin",
        mandanten=mandanten,
        **get_current_user()
    )
|
|
|
|
@app.route("/useradmin/mandant")
@user_admin_required
def useradmin_mandant():
    """List all users of the session's mandant for the user administration.

    Each user row is extended with ``mandant_level_label``. The database
    resources are released even when the query fails.
    """
    current_mandant_id = session.get("mandant_id")

    conn = get_connection()
    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT
                u.id,
                u.email,
                u.name,
                u.mandant_id,
                u.last_login,
                u.status,
                m.name AS mandant_name,
                m.kontakt_email AS mandant_email,
                m.level AS mandant_level
            FROM app_user u
            JOIN mandant m ON m.id = u.mandant_id
            WHERE u.mandant_id = %s
            ORDER BY u.name, u.email
        """, (current_mandant_id,))
        users = fetchall_dict(cur)
    finally:
        cur.close()
        conn.close()

    for user in users:
        user["mandant_level_label"] = format_level(user["mandant_level"])

    return render_template(
        "useradmin_mandant.html",
        page_title="Useradministration",
        active_page="useradmin",
        users=users,
        **get_current_user()
    )
|
|
|
|
@app.errorhandler(403)
def forbidden(_error):
    """Render the "no access" page for 403 errors."""
    page = render_template(
        "403.html",
        page_title="Kein Zugriff",
        active_page="",
        **get_current_user()
    )
    return page, 403
|
|
|
|
@app.route("/pwdchange", methods=["GET", "POST"])
@login_required
def pwdchange():
    """Let the logged-in user change their own password.

    On POST the form is validated (all fields filled, new passwords equal,
    at least 8 characters) and the current password is checked against the
    stored hash before the new hash is written. The page is rendered with an
    error or success message. The database resources are released even when
    a statement fails.
    """
    error_message = ""
    success_message = ""

    if request.method == "POST":
        current_password = request.form.get("current_password", "")
        new_password = request.form.get("new_password", "")
        confirm_password = request.form.get("confirm_password", "")

        if not current_password or not new_password or not confirm_password:
            error_message = "Bitte alle Felder ausfüllen."
        elif new_password != confirm_password:
            error_message = "Die neuen Passwörter stimmen nicht überein."
        elif len(new_password) < 8:
            error_message = "Das neue Passwort muss mindestens 8 Zeichen lang sein."
        else:
            conn = get_connection()
            cur = conn.cursor()
            try:
                cur.execute("""
                    SELECT password_hash
                    FROM app_user
                    WHERE id = %s
                """, (session["user_id"],))
                row = cur.fetchone()

                if row is None:
                    error_message = "Benutzer nicht gefunden."
                elif not check_password_hash(row[0], current_password):
                    error_message = "Das aktuelle Passwort ist falsch."
                else:
                    cur.execute("""
                        UPDATE app_user
                        SET password_hash = %s
                        WHERE id = %s
                    """, (generate_password_hash(new_password), session["user_id"]))
                    conn.commit()

                    success_message = "Passwort wurde erfolgreich geändert."
            finally:
                cur.close()
                conn.close()

    return render_template(
        "pwdchange.html",
        page_title="Passwort ändern",
        active_page="profil",
        error_message=error_message,
        success_message=success_message,
        **get_current_user()
    )
|
|
|
|
@app.route("/useradmin/mandant/new", methods=["GET", "POST"])
@user_admin_required
def useradmin_user_new():
    """Create a new user inside the session's mandant.

    GET renders the empty form with the mandant's groups. POST validates the
    input, creates the user and assigns the selected groups. Only groups
    that belong to the mandant are assigned, and group ids that are not
    integers are ignored. On success the client is redirected to the user
    list; on validation errors the form is rendered again with the entered
    values.

    A non-numeric ``status`` is reported as a form error. The database
    resources are released even when a statement fails.
    """
    current_mandant_id = session.get("mandant_id")

    conn = get_connection()
    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT id, group_name
            FROM app_group
            WHERE mandant_id = %s
            ORDER BY group_name
        """, (current_mandant_id,))
        gruppen = fetchall_dict(cur)

        form_error = None
        form_values = {
            "email": "",
            "name": "",
            "status": "1",
            "selected_groups": []
        }

        if request.method == "POST":
            email = request.form.get("email", "").strip().lower()
            name = request.form.get("name", "").strip()
            password = request.form.get("password", "")
            password2 = request.form.get("password2", "")
            status = request.form.get("status", "1").strip()
            selected_groups = request.form.getlist("group_ids")

            form_values = {
                "email": email,
                "name": name,
                "status": status,
                "selected_groups": selected_groups
            }

            email_pattern = r"^[^@\s]+@[^@\s]+\.[^@\s]+$"

            try:
                status_value = int(status or 1)
            except ValueError:
                status_value = None

            if not email:
                form_error = "E-Mail ist ein Pflichtfeld."
            elif not re.match(email_pattern, email):
                form_error = "Bitte eine gültige E-Mail-Adresse eingeben."
            elif not name:
                form_error = "Name ist ein Pflichtfeld."
            elif not password:
                form_error = "Passwort ist ein Pflichtfeld."
            elif not password2:
                form_error = "Bitte Passwort bestätigen."
            elif password != password2:
                form_error = "Die beiden Passwörter stimmen nicht überein."
            elif len(password) < 8:
                form_error = "Das Passwort muss mindestens 8 Zeichen lang sein."
            elif status_value is None:
                form_error = "Ungültiger Status."
            else:
                cur.execute("""
                    SELECT id
                    FROM app_user
                    WHERE lower(email) = %s
                """, (email,))

                if cur.fetchone():
                    form_error = "Ein Benutzer mit dieser E-Mail existiert bereits."
                else:
                    cur.execute("""
                        INSERT INTO app_user (
                            email,
                            name,
                            mandant_id,
                            password_hash,
                            status
                        )
                        VALUES (%s, %s, %s, %s, %s)
                        RETURNING id
                    """, (
                        email,
                        name,
                        current_mandant_id,
                        generate_password_hash(password),
                        status_value
                    ))
                    new_user_id = cur.fetchone()[0]

                    # Ignore ids that are not integers, as the edit view does.
                    selected_group_ids = []
                    for gid in selected_groups:
                        try:
                            selected_group_ids.append(int(gid))
                        except ValueError:
                            continue

                    if selected_group_ids:
                        # Keep only groups that belong to this mandant.
                        cur.execute("""
                            SELECT id
                            FROM app_group
                            WHERE mandant_id = %s
                              AND id = ANY(%s)
                        """, (current_mandant_id, selected_group_ids))
                        valid_groups = cur.fetchall()

                        for row in valid_groups:
                            cur.execute("""
                                INSERT INTO user_group (user_id, group_id, mandant_id)
                                VALUES (%s, %s, %s)
                            """, (new_user_id, row[0], current_mandant_id))

                    conn.commit()
                    return redirect(url_for("useradmin_mandant"))
    finally:
        cur.close()
        conn.close()

    return render_template(
        "useradmin_user_new.html",
        page_title="Neuer User",
        active_page="useradmin",
        gruppen=gruppen,
        form_error=form_error,
        form_values=form_values,
        **get_current_user()
    )
|
|
|
|
@app.route("/set-country")
def set_country():
    """Store the selected country in the session and redirect back.

    Only ``DE``, ``AT`` and ``CH`` are accepted; other values leave the
    session unchanged. The redirect target is the ``next`` query parameter
    or, failing that, the referrer. Both come from the client, so a
    candidate is only used when it points to this host; otherwise the home
    page is used.
    """
    from urllib.parse import urljoin, urlparse

    country = request.args.get("country", "DE").upper()
    if country in {"DE", "AT", "CH"}:
        session["country"] = country

    own_host = urlparse(request.host_url).netloc

    def is_local(target):
        """Return True if *target* resolves to an http(s) URL on this host."""
        if not target or target != target.strip():
            return False
        # Browsers treat "\" like "/" and drop control characters.
        if "\\" in target or any(ord(ch) < 0x20 for ch in target):
            return False
        resolved = urlparse(urljoin(request.host_url, target))
        return resolved.scheme in ("http", "https") and resolved.netloc == own_host

    for candidate in (request.args.get("next"), request.referrer):
        if is_local(candidate):
            return redirect(candidate)

    return redirect(url_for("home"))
|
|
|
|
@app.route("/useradmin/mandant/user/<int:user_id>", methods=["GET", "POST"])
@user_admin_required
def useradmin_user_edit(user_id):
    """Edit a user of the session's mandant.

    Responds with 404 when the user does not exist in that mandant. GET
    renders the form with the stored values. POST validates the input,
    updates e-mail, name and status, optionally sets a new password, and
    replaces the user's group assignments with the selected groups of this
    mandant. Group ids that are not integers are ignored.

    A non-numeric ``status`` is reported as a form error. After a successful
    save ``edit_user`` reflects the new values. The database resources are
    released even when a statement fails or the request is aborted.
    """
    current_mandant_id = session.get("mandant_id")

    conn = get_connection()
    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT id, email, name, mandant_id, status
            FROM app_user
            WHERE id = %s
              AND mandant_id = %s
        """, (user_id, current_mandant_id))
        user = fetchone_dict(cur)

        if not user:
            abort(404)

        cur.execute("""
            SELECT id, group_name
            FROM app_group
            WHERE mandant_id = %s
            ORDER BY group_name
        """, (current_mandant_id,))
        gruppen = fetchall_dict(cur)

        cur.execute("""
            SELECT group_id
            FROM user_group
            WHERE user_id = %s
              AND mandant_id = %s
        """, (user_id, current_mandant_id))
        assigned_group_ids = [str(row[0]) for row in cur.fetchall()]

        form_error = None
        success_message = None

        form_values = {
            "email": user["email"],
            "name": user["name"],
            "status": str(user["status"]),
            "selected_groups": assigned_group_ids,
        }

        if request.method == "POST":
            email = request.form.get("email", "").strip().lower()
            name = request.form.get("name", "").strip()
            status = request.form.get("status", "1").strip()
            password = request.form.get("password", "")
            password2 = request.form.get("password2", "")
            selected_groups = request.form.getlist("group_ids")

            form_values = {
                "email": email,
                "name": name,
                "status": status,
                "selected_groups": selected_groups,
            }

            email_pattern = r"^[^@\s]+@[^@\s]+\.[^@\s]+$"

            try:
                status_value = int(status or 1)
            except ValueError:
                status_value = None

            if not email:
                form_error = "E-Mail ist ein Pflichtfeld."
            elif not re.match(email_pattern, email):
                form_error = "Bitte eine gültige E-Mail-Adresse eingeben."
            elif not name:
                form_error = "Name ist ein Pflichtfeld."
            elif status_value is None:
                form_error = "Ungültiger Status."
            else:
                cur.execute("""
                    SELECT id
                    FROM app_user
                    WHERE lower(email) = %s
                      AND id <> %s
                """, (email, user_id))

                if cur.fetchone():
                    form_error = "Ein anderer Benutzer mit dieser E-Mail existiert bereits."

            # The password is optional on edit; validate it only when given.
            if not form_error and (password or password2):
                if not password:
                    form_error = "Bitte neues Passwort eingeben."
                elif not password2:
                    form_error = "Bitte neues Passwort bestätigen."
                elif password != password2:
                    form_error = "Die beiden Passwörter stimmen nicht überein."
                elif len(password) < 8:
                    form_error = "Das Passwort muss mindestens 8 Zeichen lang sein."

            if not form_error:
                cur.execute("""
                    UPDATE app_user
                    SET email = %s,
                        name = %s,
                        status = %s
                    WHERE id = %s
                      AND mandant_id = %s
                """, (email, name, status_value, user_id, current_mandant_id))

                if password:
                    cur.execute("""
                        UPDATE app_user
                        SET password_hash = %s
                        WHERE id = %s
                          AND mandant_id = %s
                    """, (generate_password_hash(password), user_id, current_mandant_id))

                # Replace all group assignments of the user in this mandant.
                cur.execute("""
                    DELETE FROM user_group
                    WHERE user_id = %s
                      AND mandant_id = %s
                """, (user_id, current_mandant_id))

                selected_group_ids = []
                for gid in selected_groups:
                    try:
                        selected_group_ids.append(int(gid))
                    except ValueError:
                        # Ignore ids that are not integers.
                        continue

                if selected_group_ids:
                    # Keep only groups that belong to this mandant.
                    cur.execute("""
                        SELECT id
                        FROM app_group
                        WHERE mandant_id = %s
                          AND id = ANY(%s)
                    """, (current_mandant_id, selected_group_ids))
                    valid_groups = cur.fetchall()

                    for row in valid_groups:
                        cur.execute("""
                            INSERT INTO user_group (user_id, group_id, mandant_id)
                            VALUES (%s, %s, %s)
                        """, (user_id, row[0], current_mandant_id))

                conn.commit()
                success_message = "Benutzer wurde erfolgreich aktualisiert."

                # Keep the record passed to the template in sync with the DB.
                user["email"] = email
                user["name"] = name
                user["status"] = status_value

                cur.execute("""
                    SELECT group_id
                    FROM user_group
                    WHERE user_id = %s
                      AND mandant_id = %s
                """, (user_id, current_mandant_id))
                form_values["selected_groups"] = [str(row[0]) for row in cur.fetchall()]
    finally:
        cur.close()
        conn.close()

    return render_template(
        "useradmin_user_edit.html",
        page_title="User bearbeiten",
        active_page="useradmin",
        edit_user=user,
        gruppen=gruppen,
        form_values=form_values,
        form_error=form_error,
        success_message=success_message,
        **get_current_user()
    )
|
|
|
|
@app.route("/courses")
@login_required
def course_list():
    """Render the list of courses available to the current user."""
    available_courses = get_available_courses_for_user()
    page_context = {
        "page_title": "Kurse",
        "active_page": "courses",
        "courses": available_courses,
    }
    return render_template("course_list.html", **page_context, **get_current_user())
|
|
|
|
@app.route("/course/<int:course_id>")
@login_required
def course_start(course_id):
    """Render the start (video) page of a course.

    Responds with 404 when the course does not exist. Responds with 403 when
    the course is not allowed for the current user's mandant level, using
    the same ``is_course_allowed_for_level`` rule that filters the course
    list. The database resources are released even when the query fails.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT *
            FROM course
            WHERE id = %s
        """, (course_id,))
        course = fetchone_dict(cur)
    finally:
        cur.close()
        conn.close()

    if not course:
        abort(404)

    if not is_course_allowed_for_level(course["code"], get_current_user_mandant_level()):
        abort(403)

    return render_template(
        "course_video.html",
        course=course,
        **get_current_user()
    )
|
|
|
|
@app.route("/course/<int:course_id>/page/<int:page_number>")
@login_required
def course_page(course_id, page_number):
    """Render one page of a course and store it as the user's last page.

    Responds with 404 when the course or the page does not exist. Responds
    with 403 when the course is not allowed for the current user's mandant
    level. The progress row is upserted per (user, course). The database
    resources are released on every path, including the aborts.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT code
            FROM course
            WHERE id = %s
        """, (course_id,))
        course_row = cur.fetchone()

        if course_row is None:
            abort(404)

        if not is_course_allowed_for_level(course_row[0], get_current_user_mandant_level()):
            abort(403)

        cur.execute("""
            SELECT *
            FROM course_page
            WHERE course_id = %s
              AND page_number = %s
        """, (course_id, page_number))
        page = fetchone_dict(cur)

        if not page:
            abort(404)

        # Save the progress.
        cur.execute("""
            INSERT INTO user_course_progress (user_id, course_id, last_page)
            VALUES (%s, %s, %s)
            ON CONFLICT (user_id, course_id)
            DO UPDATE SET last_page = EXCLUDED.last_page
        """, (session["user_id"], course_id, page_number))

        conn.commit()
    finally:
        cur.close()
        conn.close()

    return render_template(
        page["template_name"],
        course_id=course_id,
        page_number=page_number,
        **get_current_user()
    )
|
|
|
|
@app.route("/dokumente")
@contentmanager_required
def dokumente():
    """Render the certification checklist with the mandant's upload status.

    Loads all checklist items whose level is allowed for the mandant level,
    left-joined with this mandant's uploads, and computes how many items
    already have a stored file.

    The mandant level is read from the session. The login view does not
    store it there, so a missing value is resolved through
    ``get_current_user_mandant_level()``. The database resources are
    released even when the query fails.
    """
    mandant_id = session.get("mandant_id")
    mandant_level = session.get("mandant_level")
    if mandant_level is None:
        mandant_level = get_current_user_mandant_level()

    allowed_levels = get_allowed_checklist_levels_for_mandant_level(mandant_level)

    conn = get_connection()
    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT
                c.id,
                c.level,
                c.title,
                c.short_description,
                c.default_filename,
                u.uploaded_at,
                u.uploaded_by_user_id,
                u.filesize,
                u.stored_filename,
                u.original_filename,
                au.name AS uploaded_by_name
            FROM certification_checklist c
            LEFT JOIN mandant_checklist_upload u
                ON u.checklist_item_id = c.id
               AND u.mandant_id = %s
            LEFT JOIN app_user au
                ON au.id = u.uploaded_by_user_id
            WHERE c.level = ANY(%s)
            ORDER BY c.level, c.id
        """, (mandant_id, allowed_levels))
        items = fetchall_dict(cur)
    finally:
        cur.close()
        conn.close()

    total_items = len(items)
    # An item counts as completed once a file has been stored for it.
    completed_items = sum(1 for item in items if item.get("stored_filename"))
    progress_percent = int((completed_items / total_items) * 100) if total_items > 0 else 0

    return render_template(
        "dokumente.html",
        page_title="Dokumente",
        active_page="dokumente",
        items=items,
        progress_percent=progress_percent,
        completed_items=completed_items,
        total_items=total_items,
        **get_current_user()
    )
|
|
|
|
@app.route("/dokumente/upload/<int:item_id>", methods=["POST"])
@contentmanager_required
def dokument_upload(item_id):
    """Store an uploaded file for a checklist item of the current mandant.

    The file is written to ``/app/files/<mandant_id>/<item_id>-<name>``, where
    ``<name>`` is the sanitised ``default_filename`` of the checklist item or,
    if that is empty, the sanitised name of the uploaded file. The matching
    ``mandant_checklist_upload`` row is updated if one exists, otherwise
    inserted.

    Args:
        item_id: Primary key of the ``certification_checklist`` row.

    Returns:
        A redirect to the ``dokumente`` view. This is also the result when no
        file was submitted or its name sanitises to an empty string.

    Raises:
        werkzeug.exceptions.NotFound: If no checklist item has ``item_id``.
    """
    mandant_id = session.get("mandant_id")
    user_id = session.get("user_id")

    uploaded_file = request.files.get("file")
    if not uploaded_file or uploaded_file.filename == "":
        return redirect(url_for("dokumente"))

    # Path of a previously stored file that the new upload supersedes. It is
    # only removed after the database change has been committed.
    stale_path = None

    conn = get_connection()
    try:
        cur = conn.cursor()
        try:
            cur.execute("""
                SELECT id, default_filename
                FROM certification_checklist
                WHERE id = %s
            """, (item_id,))
            item = fetchone_dict(cur)

            if not item:
                abort(404)

            original_filename = secure_filename(uploaded_file.filename)
            safe_name_part = secure_filename(
                item["default_filename"] or original_filename
            )
            if not safe_name_part:
                # Nothing usable is left after sanitising; treat this like a
                # missing file instead of storing a name such as "12-".
                return redirect(url_for("dokumente"))
            stored_filename = f"{item_id}-{safe_name_part}"

            mandant_dir = os.path.join("/app/files", str(mandant_id))
            os.makedirs(mandant_dir, exist_ok=True)

            full_path = os.path.join(mandant_dir, stored_filename)
            uploaded_file.save(full_path)

            filesize = os.path.getsize(full_path)

            cur.execute("""
                SELECT stored_filename
                FROM mandant_checklist_upload
                WHERE mandant_id = %s
                  AND checklist_item_id = %s
            """, (mandant_id, item_id))
            existing = cur.fetchone()

            if existing:
                old_filename = existing[0]
                if old_filename and old_filename != stored_filename:
                    stale_path = os.path.join(mandant_dir, old_filename)

                cur.execute("""
                    UPDATE mandant_checklist_upload
                    SET uploaded_at = CURRENT_TIMESTAMP,
                        uploaded_by_user_id = %s,
                        filesize = %s,
                        stored_filename = %s,
                        original_filename = %s
                    WHERE mandant_id = %s
                      AND checklist_item_id = %s
                """, (user_id, filesize, stored_filename, original_filename, mandant_id, item_id))
            else:
                cur.execute("""
                    INSERT INTO mandant_checklist_upload (
                        mandant_id,
                        checklist_item_id,
                        uploaded_by_user_id,
                        filesize,
                        stored_filename,
                        original_filename
                    )
                    VALUES (%s, %s, %s, %s, %s, %s)
                """, (mandant_id, item_id, user_id, filesize, stored_filename, original_filename))

            conn.commit()
        finally:
            # Runs on abort(404), on the early redirect and on any I/O or SQL
            # error, so the connection is never left open.
            cur.close()
    finally:
        conn.close()

    if stale_path is not None:
        try:
            os.remove(stale_path)
        except FileNotFoundError:
            # The old file is already gone; there is nothing left to clean up.
            pass

    return redirect(url_for("dokumente"))
|
|
|
|
@app.route("/dokumente/delete/<int:item_id>", methods=["POST"])
@contentmanager_required
def dokument_delete(item_id):
    """Delete the current mandant's upload for a checklist item.

    Removes the ``mandant_checklist_upload`` row for the session's
    ``mandant_id`` and ``item_id`` and then the stored file under
    ``/app/files/<mandant_id>/``. Does nothing if no such row exists.

    Args:
        item_id: Primary key of the checklist item whose upload is removed.

    Returns:
        A redirect to the ``dokumente`` view.
    """
    mandant_id = session.get("mandant_id")
    mandant_dir = os.path.join("/app/files", str(mandant_id))

    file_to_remove = None

    conn = get_connection()
    try:
        cur = conn.cursor()
        try:
            cur.execute("""
                SELECT stored_filename
                FROM mandant_checklist_upload
                WHERE mandant_id = %s
                  AND checklist_item_id = %s
            """, (mandant_id, item_id))
            row = cur.fetchone()

            if row:
                if row[0]:
                    file_to_remove = os.path.join(mandant_dir, row[0])

                cur.execute("""
                    DELETE FROM mandant_checklist_upload
                    WHERE mandant_id = %s
                      AND checklist_item_id = %s
                """, (mandant_id, item_id))
                conn.commit()
        finally:
            cur.close()
    finally:
        conn.close()

    # The file is removed only after the row deletion is committed, so a
    # failed DELETE cannot leave a row that points at a missing file.
    if file_to_remove is not None:
        try:
            os.remove(file_to_remove)
        except FileNotFoundError:
            # Already gone on disk; the row has been removed, which is enough.
            pass

    return redirect(url_for("dokumente"))
|
|
|
|
@app.route("/dokumente/file/<int:item_id>")
@contentmanager_required
def dokument_file(item_id):
    """Serve the file the current mandant uploaded for a checklist item.

    Looks up ``stored_filename`` in ``mandant_checklist_upload`` for the
    session's ``mandant_id`` and ``item_id`` and sends that file from
    ``/app/files/<mandant_id>/``.

    Args:
        item_id: Primary key of the checklist item.

    Raises:
        werkzeug.exceptions.NotFound: If the mandant has no upload row for
            this item.
    """
    mandant_id = session.get("mandant_id")

    conn = get_connection()
    try:
        cur = conn.cursor()
        try:
            cur.execute("""
                SELECT stored_filename
                FROM mandant_checklist_upload
                WHERE mandant_id = %s
                  AND checklist_item_id = %s
            """, (mandant_id, item_id))
            row = cur.fetchone()
        finally:
            # Close the cursor and connection even when the query raises.
            cur.close()
    finally:
        conn.close()

    if not row:
        abort(404)

    stored_filename = row[0]
    mandant_dir = os.path.join("/app/files", str(mandant_id))
    return send_from_directory(mandant_dir, stored_filename)
|
|
|
|
@app.template_filter("datetime")
def format_datetime(value):
    """Template filter ``datetime``: format *value* as ``DD.MM.YYYY HH:MM``.

    Falsy values (for example ``None``) are rendered as ``"-"``.
    """
    return value.strftime("%d.%m.%Y %H:%M") if value else "-"
|
|
|
|
@app.route("/admin/checklist", methods=["GET", "POST"])
@admin_required
def admin_checklist():
    """Admin view to list, create, update and delete checklist items.

    On POST the form field ``action`` selects ``create``, ``update`` or
    ``delete``; any other value only re-renders the list. Create and update
    require ``level`` to be "1", "2" or "3" and a non-empty ``title``.
    Validation failures are passed to the template as ``form_error``, and for
    ``create`` the submitted values are passed as ``form_values``.

    Raises:
        werkzeug.exceptions.BadRequest: If ``update`` or ``delete`` is
            submitted with a missing or non-numeric ``id``.
    """

    def _validation_error(level, title):
        """Return the first validation message for *level*/*title*, or None."""
        if not level:
            return "Level ist ein Pflichtfeld."
        if level not in ("1", "2", "3"):
            return "Level muss 1, 2 oder 3 sein."
        if not title:
            return "Titel ist ein Pflichtfeld."
        return None

    def _id_or_400(raw):
        """Return *raw* as int; abort with HTTP 400 if missing or malformed."""
        try:
            return int(raw)
        except (TypeError, ValueError):
            abort(400)

    form_error = None
    form_values = None

    conn = get_connection()
    try:
        cur = conn.cursor()
        try:
            if request.method == "POST":
                action = request.form.get("action")

                if action in ("create", "update"):
                    level = request.form.get("level", "").strip()
                    title = request.form.get("title", "").strip()
                    short_description = request.form.get("short_description", "").strip()
                    default_filename = request.form.get("default_filename", "").strip()

                    form_error = _validation_error(level, title)

                    if action == "create":
                        # Only the create form is re-populated after a submit.
                        form_values = {
                            "level": level,
                            "title": title,
                            "short_description": short_description,
                            "default_filename": default_filename,
                        }

                        if form_error is None:
                            cur.execute("""
                                INSERT INTO certification_checklist (
                                    level,
                                    title,
                                    short_description,
                                    default_filename
                                )
                                VALUES (%s, %s, %s, %s)
                            """, (
                                int(level),
                                title,
                                short_description or None,
                                default_filename or None
                            ))
                            conn.commit()

                    elif form_error is None:
                        item_id = _id_or_400(request.form.get("id"))
                        cur.execute("""
                            UPDATE certification_checklist
                            SET level = %s,
                                title = %s,
                                short_description = %s,
                                default_filename = %s
                            WHERE id = %s
                        """, (
                            int(level),
                            title,
                            short_description or None,
                            default_filename or None,
                            item_id
                        ))
                        conn.commit()

                elif action == "delete":
                    item_id = _id_or_400(request.form.get("id"))
                    cur.execute("""
                        DELETE FROM certification_checklist
                        WHERE id = %s
                    """, (item_id,))
                    conn.commit()

            # The list is rendered the same way with or without a form error.
            cur.execute("""
                SELECT id, level, title, short_description, default_filename
                FROM certification_checklist
                ORDER BY level, id
            """)
            items = fetchall_dict(cur)
        finally:
            # Also runs when abort(400) or a database error interrupts the view.
            cur.close()
    finally:
        conn.close()

    return render_template(
        "admin_checklist.html",
        page_title="Checklist-Verwaltung",
        active_page="admin_checklist",
        items=items,
        form_error=form_error,
        form_values=form_values,
        **get_current_user()
    )
|
|
|
|
@app.route("/admin/courses", methods=["GET", "POST"])
@admin_required
def admin_courses():
    """Admin view to list, create, update and delete courses.

    Offers every ``.mp4`` file found in ``/app/images/videos`` as a selectable
    video. On POST the form field ``action`` selects ``create``, ``update`` or
    ``delete``; the submitted values are always passed back to the template as
    ``form_values``. Create and update require ``code``, ``title``,
    ``min_level`` and ``video_file``; ``min_level`` and ``sort_order`` must be
    integers. Validation failures are passed to the template as
    ``form_error``.

    Raises:
        werkzeug.exceptions.BadRequest: If ``update`` or ``delete`` is
            submitted with a missing or non-numeric ``id``.
    """

    def _validation_error(code, title, min_level, video_file, sort_order):
        """Return the first validation message for the course form, or None."""
        if not code:
            return "Code ist Pflicht."
        if not title:
            return "Titel ist Pflicht."
        if not min_level:
            return "Level ist Pflicht."
        if not video_file:
            return "Video-Datei ist Pflicht."
        # Reject non-numeric input here so the int() conversions further down
        # cannot raise and turn a typo into an HTTP 500.
        try:
            int(min_level)
        except ValueError:
            return "Level muss eine Zahl sein."
        try:
            int(sort_order or 0)
        except ValueError:
            return "Sortierung muss eine Zahl sein."
        return None

    def _id_or_400(raw):
        """Return *raw* as int; abort with HTTP 400 if missing or malformed."""
        try:
            return int(raw)
        except (TypeError, ValueError):
            abort(400)

    form_error = None
    form_values = None

    video_dir = "/app/images/videos"
    video_files = []
    if os.path.isdir(video_dir):
        video_files = sorted(
            f for f in os.listdir(video_dir)
            if f.lower().endswith(".mp4")
        )

    conn = get_connection()
    try:
        cur = conn.cursor()
        try:
            if request.method == "POST":
                action = request.form.get("action")

                code = request.form.get("code", "").strip().upper()
                title = request.form.get("title", "").strip()
                description = request.form.get("description", "").strip()
                min_level = request.form.get("min_level", "").strip()
                video_file = request.form.get("video_file", "").strip()
                sort_order = request.form.get("sort_order", "0").strip()

                form_values = {
                    "code": code,
                    "title": title,
                    "description": description,
                    "min_level": min_level,
                    "video_file": video_file,
                    "sort_order": sort_order
                }

                if action in ("create", "update"):
                    form_error = _validation_error(
                        code, title, min_level, video_file, sort_order
                    )

                    if form_error is None and action == "create":
                        cur.execute("""
                            INSERT INTO course (code, title, description, min_level, video_file, sort_order)
                            VALUES (%s, %s, %s, %s, %s, %s)
                        """, (
                            code,
                            title,
                            description or None,
                            int(min_level),
                            video_file,
                            int(sort_order or 0)
                        ))
                        conn.commit()

                    elif form_error is None:
                        course_id = _id_or_400(request.form.get("id"))
                        cur.execute("""
                            UPDATE course
                            SET code=%s,
                                title=%s,
                                description=%s,
                                min_level=%s,
                                video_file=%s,
                                sort_order=%s
                            WHERE id=%s
                        """, (
                            code,
                            title,
                            description or None,
                            int(min_level),
                            video_file,
                            int(sort_order or 0),
                            course_id
                        ))
                        conn.commit()

                elif action == "delete":
                    course_id = _id_or_400(request.form.get("id"))
                    cur.execute("DELETE FROM course WHERE id=%s", (course_id,))
                    conn.commit()

            cur.execute("""
                SELECT id, code, title, description, min_level, video_file, sort_order
                FROM course
                ORDER BY min_level, sort_order, id
            """)
            courses = fetchall_dict(cur)
        finally:
            # Also runs when abort(400) or a database error interrupts the view.
            cur.close()
    finally:
        conn.close()

    return render_template(
        "admin_courses.html",
        courses=courses,
        video_files=video_files,
        form_error=form_error,
        form_values=form_values,
        **get_current_user()
    )