"""Flask web application: registration/login, the topic assessment flow and admin views."""

from pathlib import Path

from flask import (
    Flask,
    flash,
    redirect,
    render_template,
    request,
    session,
    url_for,
    send_from_directory,
)
from werkzeug.security import check_password_hash, generate_password_hash

from config import Config
from db import (
    get_user_by_id,
    get_user_by_email,
    get_all_users,
    create_user,
    activate_user_by_email,
    update_user_password,
    update_user_last_login,
    log_access,
    get_user_groups,
    create_assessment,
    get_next_thema_id,
    get_assessment_result_rows,
    get_thema_questions,
    get_thema_ansprechpartner,
    save_assessment_answer,
    get_all_themen,
    get_thema_by_id,
    get_all_ansprechpartner,
    get_ansprechpartner_by_id,
    get_ansprechpartner_ids_for_thema,
    create_thema,
    update_thema,
    delete_thema,
    create_ansprechpartner,
    update_ansprechpartner,
    delete_ansprechpartner,
    get_all_questions_with_thema,
    get_all_contacts,
    get_question_by_id,
    create_question,
    update_question,
    delete_question,
    activate_user,
    delete_user,
)
from permissions import admin_required, login_required
from tools import (
    create_assessment_chart,
    generate_activation_token,
    send_mail,
    verify_activation_token,
)

app = Flask(__name__)
app.config.from_object(Config)

# Directory the assessment charts are written to. The path is relative, so it
# is created relative to the process's current working directory.
chart_dir = Path("generated_charts")
chart_dir.mkdir(exist_ok=True)


@app.context_processor
def inject_user():
    """Expose ``current_user`` and ``is_admin`` to every rendered template.

    For a logged-in session the user row and group list are re-read on each
    render, and the group list is also written back to ``session["groups"]``.
    """
    user = None
    user_is_admin = False

    if session.get("user_id"):
        user = get_user_by_id(session["user_id"])
        groups = get_user_groups(session["user_id"])
        session["groups"] = groups
        user_is_admin = "Admins" in groups

    return {
        "current_user": user,
        "is_admin": user_is_admin,
    }


@app.route("/")
def index():
    """Show the landing page, or send logged-in users to the dashboard."""
    if session.get("user_id"):
        return redirect(url_for("dashboard"))
    return render_template("index.html")


@app.route("/register", methods=["GET", "POST"])
def register():
    """Register a new, inactive user and mail them an activation link."""
    if request.method == "POST":
        # ``.get`` instead of indexing: a missing field becomes a validation
        # message rather than a bare 400 response.
        name = request.form.get("name", "").strip()
        email = request.form.get("email", "").strip().lower()
        # The password is deliberately not stripped; login compares it as typed.
        password = request.form.get("password", "")

        if not name or not email or not password:
            flash("Bitte alle Felder ausfüllen.", "warning")
            return render_template("register.html")

        if get_user_by_email(email):
            flash("E-Mail ist bereits registriert.", "danger")
            return render_template("register.html")

        create_user(name, email, generate_password_hash(password), is_active=False)

        token = generate_activation_token(email)
        activation_link = f"{Config.APP_BASE_URL}{url_for('activate_account', token=token)}"
        send_mail(
            email,
            "Account aktivieren",
            f"Hallo {name},\n\nbitte aktiviere deinen Account:\n{activation_link}\n",
        )

        flash("Registrierung gespeichert. Bitte E-Mail zur Aktivierung prüfen.", "success")
        return redirect(url_for("login"))

    return render_template("register.html")


# The ``<token>`` URL variable is required: the view takes ``token`` as an
# argument and ``register`` builds the link with ``url_for(..., token=token)``.
@app.route("/activate/<token>")
def activate_account(token):
    """Activate the account that the given activation token was issued for."""
    try:
        email = verify_activation_token(token)
    except Exception:
        # NOTE(review): the exception types raised by verify_activation_token
        # are not visible here; narrow this once they are known.
        flash("Aktivierungslink ist ungültig oder abgelaufen.", "danger")
        return redirect(url_for("login"))

    activate_user_by_email(email)
    flash("Account wurde aktiviert. Bitte anmelden.", "success")
    return redirect(url_for("login"))


@app.route("/login", methods=["GET", "POST"])
def login():
    """Authenticate a user and start a session.

    On success the session receives ``user_id`` and ``groups``, the last-login
    timestamp is updated and the access is logged.
    """
    if request.method == "POST":
        email = request.form.get("email", "").strip().lower()
        password = request.form.get("password", "")

        user = get_user_by_email(email)
        if not user:
            flash("Ungültige Zugangsdaten.", "danger")
            return render_template("login.html")

        if not user["is_active"]:
            flash("Account ist noch nicht aktiviert.", "warning")
            return render_template("login.html")

        stored_password = user["passwort_hash"]
        if stored_password == "topsecret" and password == "topsecret":
            # The stored value is the literal string "topsecret" rather than a
            # hash (presumably a seeded/legacy account - confirm). Replace it
            # with a real hash on this first successful login.
            update_user_password(user["id"], generate_password_hash(password))
            password_ok = True
        else:
            password_ok = check_password_hash(stored_password, password)

        if not password_ok:
            flash("Ungültige Zugangsdaten.", "danger")
            return render_template("login.html")

        session["user_id"] = user["id"]
        session["groups"] = get_user_groups(user["id"])
        update_user_last_login(user["id"])
        log_access(user["id"])
        return redirect(url_for("dashboard"))

    return render_template("login.html")


@app.route("/logout")
def logout():
    """Clear the session and return to the login page."""
    session.clear()
    flash("Erfolgreich abgemeldet.", "success")
    return redirect(url_for("login"))


@app.route("/dashboard")
@login_required
def dashboard():
    """List all topics for the logged-in user."""
    themen = get_all_themen()
    return render_template("dashboard.html", themen=themen)


@app.route("/profil", methods=["GET", "POST"])
@login_required
def profile():
    """Show the user's profile and let them change their password."""
    user = get_user_by_id(session["user_id"])

    if request.method == "POST":
        new_password = request.form.get("password", "")

        # Whitespace-only input counts as empty, but the password itself is
        # hashed exactly as typed: login does not strip, so hashing a stripped
        # value would lock out users whose password has surrounding spaces.
        if not new_password.strip():
            flash("Bitte ein Passwort eingeben.", "warning")
            return render_template("profile.html", user=user)

        update_user_password(session["user_id"], generate_password_hash(new_password))
        flash("Passwort wurde geändert.", "success")
        return redirect(url_for("profile"))

    return render_template("profile.html", user=user)


# ``<int:thema_id>`` supplies the view argument; the integer type is assumed
# from the use of get_next_thema_id - confirm against the schema.
@app.route("/thema/<int:thema_id>", methods=["GET", "POST"])
@login_required
def topic(thema_id):
    """Show one topic's questions and store the submitted yes/no answers.

    On a valid POST the answers are saved under the assessment id from the
    form (a new assessment is created when none was sent), then the user is
    redirected to the next topic or, after the last one, to the result page.
    """
    thema = get_thema_by_id(thema_id)
    if not thema:
        flash("Thema nicht gefunden.", "danger")
        return redirect(url_for("dashboard"))

    fragen = get_thema_questions(thema_id)
    ansprechpartner = get_thema_ansprechpartner(thema_id)

    if request.method == "POST":
        # NOTE(review): the assessment id comes from the client and is not
        # checked against the current user here.
        assessment_id = request.form.get("assessment_id")

        # Validate every answer before writing anything, so an incomplete
        # submission neither creates an assessment nor leaves partial answers
        # that would be saved a second time on resubmission.
        answers = []
        for frage in fragen:
            value = request.form.get(f'frage_{frage["id"]}')
            if value not in ("ja", "nein"):
                flash("Bitte alle Fragen beantworten.", "warning")
                return render_template(
                    "topic.html",
                    thema=thema,
                    fragen=fragen,
                    ansprechpartner=ansprechpartner,
                    assessment_id=assessment_id or "",
                )
            answers.append((frage["id"], value == "ja"))

        if not assessment_id:
            assessment_id = create_assessment(session["user_id"])

        for frage_id, antwort in answers:
            save_assessment_answer(
                assessment_id=assessment_id,
                thema_id=thema_id,
                frage_id=frage_id,
                antwort=antwort,
            )

        next_thema_id = get_next_thema_id(thema_id)
        if next_thema_id:
            return redirect(url_for("topic", thema_id=next_thema_id, assessment_id=assessment_id))

        return redirect(url_for("assessment_result", assessment_id=assessment_id))

    assessment_id = request.args.get("assessment_id", "")
    return render_template(
        "topic.html",
        thema=thema,
        fragen=fragen,
        ansprechpartner=ansprechpartner,
        assessment_id=assessment_id,
    )
# ``<int:assessment_id>`` supplies the view argument and keeps the generated
# chart file name purely numeric.
@app.route("/assessment/<int:assessment_id>/result")
@login_required
def assessment_result(assessment_id):
    """Render the result page for an assessment and (re)generate its chart.

    The chart is written to ``chart_dir`` as ``assessment_<id>.png`` on every
    request.
    """
    # NOTE(review): no check here that the assessment belongs to the current
    # user; confirm whether get_assessment_result_rows enforces that.
    rows = get_assessment_result_rows(assessment_id)

    labels = [row["kurztitel"] for row in rows]
    values = [int(row["ja_anzahl"]) for row in rows]

    filename = f"assessment_{assessment_id}.png"
    output_path = chart_dir / filename
    create_assessment_chart(labels, values, output_path)

    return render_template(
        "result.html",
        assessment_id=assessment_id,
        chart_file=filename,
        rows=rows,
    )


@app.route("/generated_charts/<filename>")
@login_required
def generated_chart(filename):
    """Serve a previously generated chart image."""
    # Flask resolves a relative directory against app.root_path, while the
    # charts are written relative to the working directory. Passing an
    # absolute path makes reading and writing use the same directory.
    return send_from_directory(chart_dir.resolve(), filename)


@app.route("/admin")
@admin_required
def admin_index():
    """Show the admin landing page."""
    return render_template("admin/index.html")


@app.route("/admin/user")
@admin_required
def admin_users():
    """List all users."""
    users = get_all_users()
    return render_template("admin/users.html", users=users)


@app.route("/admin/themen")
@admin_required
def admin_themen():
    """List all topics."""
    themen = get_all_themen()
    return render_template("admin/themen_list.html", themen=themen)


@app.route("/admin/themen/new", methods=["GET", "POST"])
@admin_required
def admin_thema_new():
    """Create a topic; ``kurztitel`` and ``titel`` are required."""
    ansprechpartner = get_all_ansprechpartner()

    if request.method == "POST":
        kurztitel = request.form.get("kurztitel", "").strip()
        titel = request.form.get("titel", "").strip()
        infotext = request.form.get("infotext", "").strip()
        zusatztext = request.form.get("zusatztext", "").strip()
        # type=int drops values that are not integers instead of raising
        # ValueError (which would surface as an HTTP 500).
        ansprechpartner_ids = request.form.getlist("ansprechpartner_ids", type=int)

        if not kurztitel or not titel:
            flash("Kurztitel und Titel sind Pflichtfelder.", "error")
            # Re-render with the submitted values so the form keeps its input.
            return render_template(
                "admin/thema_form.html",
                mode="new",
                thema=request.form,
                ansprechpartner=ansprechpartner,
                selected_ansprechpartner_ids=ansprechpartner_ids,
            )

        create_thema(kurztitel, titel, infotext, zusatztext, ansprechpartner_ids)
        flash("Thema wurde erstellt.", "success")
        return redirect(url_for("admin_themen"))

    return render_template(
        "admin/thema_form.html",
        mode="new",
        thema={},
        ansprechpartner=ansprechpartner,
        selected_ansprechpartner_ids=[],
    )
# All routes below declare their id as an ``<int:...>`` URL variable so the
# view functions actually receive the argument they require.
@app.route("/admin/themen/<int:thema_id>/edit", methods=["GET", "POST"])
@admin_required
def admin_thema_edit(thema_id):
    """Edit an existing topic; ``kurztitel`` and ``titel`` are required."""
    thema = get_thema_by_id(thema_id)
    if not thema:
        flash("Thema nicht gefunden.", "error")
        return redirect(url_for("admin_themen"))

    ansprechpartner = get_all_ansprechpartner()

    if request.method == "POST":
        kurztitel = request.form.get("kurztitel", "").strip()
        titel = request.form.get("titel", "").strip()
        infotext = request.form.get("infotext", "").strip()
        zusatztext = request.form.get("zusatztext", "").strip()
        # type=int drops values that are not integers instead of raising
        # ValueError (which would surface as an HTTP 500).
        ansprechpartner_ids = request.form.getlist("ansprechpartner_ids", type=int)

        if not kurztitel or not titel:
            flash("Kurztitel und Titel sind Pflichtfelder.", "error")
            # Re-render with the submitted values so the form keeps its input.
            thema_form = {
                "id": thema_id,
                "kurztitel": kurztitel,
                "titel": titel,
                "infotext": infotext,
                "zusatztext": zusatztext,
            }
            return render_template(
                "admin/thema_form.html",
                mode="edit",
                thema=thema_form,
                ansprechpartner=ansprechpartner,
                selected_ansprechpartner_ids=ansprechpartner_ids,
            )

        update_thema(thema_id, kurztitel, titel, infotext, zusatztext, ansprechpartner_ids)
        flash("Thema wurde gespeichert.", "success")
        return redirect(url_for("admin_themen"))

    selected_ansprechpartner_ids = get_ansprechpartner_ids_for_thema(thema_id)
    return render_template(
        "admin/thema_form.html",
        mode="edit",
        thema=thema,
        ansprechpartner=ansprechpartner,
        selected_ansprechpartner_ids=selected_ansprechpartner_ids,
    )


@app.route("/admin/themen/<int:thema_id>/delete", methods=["POST"])
@admin_required
def admin_thema_delete(thema_id):
    """Delete a topic."""
    delete_thema(thema_id)
    flash("Thema wurde gelöscht.", "success")
    return redirect(url_for("admin_themen"))


@app.route("/admin/ansprechpartner")
@admin_required
def admin_contacts():
    """List all contact persons."""
    contacts = get_all_contacts()
    return render_template("admin/contacts.html", contacts=contacts)


@app.route("/admin/ansprechpartner/new", methods=["GET", "POST"])
@admin_required
def admin_contact_new():
    """Create a contact person; name and e-mail are required."""
    if request.method == "POST":
        name = request.form.get("name", "").strip()
        email = request.form.get("email", "").strip()
        infotext = request.form.get("infotext", "").strip()

        if not name or not email:
            flash("Name und E-Mail sind Pflichtfelder.", "error")
            return render_template(
                "admin/contact_form.html",
                mode="new",
                contact=request.form,
            )

        create_ansprechpartner(name, email, infotext)
        flash("Ansprechpartner wurde erstellt.", "success")
        return redirect(url_for("admin_contacts"))

    return render_template(
        "admin/contact_form.html",
        mode="new",
        contact={},
    )


@app.route("/admin/ansprechpartner/<int:ansprechpartner_id>/edit", methods=["GET", "POST"])
@admin_required
def admin_contact_edit(ansprechpartner_id):
    """Edit an existing contact person; name and e-mail are required."""
    contact = get_ansprechpartner_by_id(ansprechpartner_id)
    if not contact:
        flash("Ansprechpartner nicht gefunden.", "error")
        return redirect(url_for("admin_contacts"))

    if request.method == "POST":
        name = request.form.get("name", "").strip()
        email = request.form.get("email", "").strip()
        infotext = request.form.get("infotext", "").strip()

        if not name or not email:
            flash("Name und E-Mail sind Pflichtfelder.", "error")
            # Re-render with the submitted values so the form keeps its input.
            contact_form = {
                "id": ansprechpartner_id,
                "name": name,
                "email": email,
                "infotext": infotext,
            }
            return render_template(
                "admin/contact_form.html",
                mode="edit",
                contact=contact_form,
            )

        update_ansprechpartner(ansprechpartner_id, name, email, infotext)
        flash("Ansprechpartner wurde gespeichert.", "success")
        return redirect(url_for("admin_contacts"))

    return render_template(
        "admin/contact_form.html",
        mode="edit",
        contact=contact,
    )


@app.route("/admin/ansprechpartner/<int:ansprechpartner_id>/delete", methods=["POST"])
@admin_required
def admin_contact_delete(ansprechpartner_id):
    """Delete a contact person."""
    delete_ansprechpartner(ansprechpartner_id)
    flash("Ansprechpartner wurde gelöscht.", "success")
    return redirect(url_for("admin_contacts"))


@app.route("/admin/fragen")
@admin_required
def admin_questions():
    """List all questions together with the available topics."""
    fragen = get_all_questions_with_thema()
    themen = get_all_themen()
    return render_template("admin/questions.html", fragen=fragen, themen=themen)


@app.route("/admin/fragen/new", methods=["GET", "POST"])
@admin_required
def admin_question_new():
    """Create a question; topic and text are required."""
    themen = get_all_themen()

    if request.method == "POST":
        thema_id = request.form.get("thema_id")
        text = request.form.get("text", "").strip()

        if not thema_id or not text:
            flash("Thema und Text sind Pflichtfelder.", "error")
            return render_template(
                "admin/question_form.html",
                mode="new",
                frage=request.form,
                themen=themen,
            )

        create_question(thema_id, text)
        flash("Frage wurde erstellt.", "success")
        return redirect(url_for("admin_questions"))

    return render_template(
        "admin/question_form.html",
        mode="new",
        frage={},
        themen=themen,
    )


@app.route("/admin/fragen/<int:frage_id>/edit", methods=["GET", "POST"])
@admin_required
def admin_question_edit(frage_id):
    """Edit an existing question; topic and text are required."""
    frage = get_question_by_id(frage_id)
    if not frage:
        flash("Frage nicht gefunden.", "error")
        return redirect(url_for("admin_questions"))

    # Loaded only after the not-found check; the redirect does not need it.
    themen = get_all_themen()

    if request.method == "POST":
        thema_id = request.form.get("thema_id")
        text = request.form.get("text", "").strip()

        if not thema_id or not text:
            # Re-render with the submitted values so the form keeps its input.
            frage_form = {
                "id": frage_id,
                "thema_id": thema_id,
                "text": text,
            }
            flash("Thema und Text sind Pflichtfelder.", "error")
            return render_template(
                "admin/question_form.html",
                mode="edit",
                frage=frage_form,
                themen=themen,
            )

        update_question(frage_id, thema_id, text)
        flash("Frage wurde gespeichert.", "success")
        return redirect(url_for("admin_questions"))

    return render_template(
        "admin/question_form.html",
        mode="edit",
        frage=frage,
        themen=themen,
    )


@app.route("/admin/fragen/<int:frage_id>/delete", methods=["POST"])
@admin_required
def admin_question_delete(frage_id):
    """Delete a question."""
    delete_question(frage_id)
    flash("Frage wurde gelöscht.", "success")
    return redirect(url_for("admin_questions"))


# ``user_id`` is compared against the id stored in the session, so it is
# declared as an int; a string URL value would not match (assumes the session
# holds an integer id - confirm).
@app.route("/admin/user/<int:user_id>/activate", methods=["POST"])
@admin_required
def admin_user_activate(user_id):
    """Activate a user account; the current user is skipped as already active."""
    if user_id == session.get("user_id"):
        flash("Der aktuell angemeldete Benutzer ist bereits aktiv.", "warning")
        return redirect(url_for("admin_users"))

    activate_user(user_id)
    flash("Benutzer wurde aktiviert.", "success")
    return redirect(url_for("admin_users"))


@app.route("/admin/user/<int:user_id>/delete", methods=["POST"])
@admin_required
def admin_user_delete(user_id):
    """Delete a user account; admins cannot delete their own account."""
    if user_id == session.get("user_id"):
        flash("Du kannst deinen eigenen Benutzer nicht löschen.", "danger")
        return redirect(url_for("admin_users"))

    delete_user(user_id)
    flash("Benutzer wurde gelöscht.", "success")
    return redirect(url_for("admin_users"))


@app.route("/impressum")
def impressum():
    """Show the legal notice page."""
    return render_template("impressum.html")


if __name__ == "__main__":
    # The server listens on all interfaces, so the interactive debugger (which
    # allows arbitrary code execution) must not be enabled unconditionally.
    # Debug mode now follows the application config; set DEBUG = True there
    # for local development.
    app.run(host="0.0.0.0", port=5000, debug=bool(app.config.get("DEBUG", False)))