"""Flask application: registration/login, branch-specific assessments, PDF reports and admin CRUD.

Review notes on this revision:

* URL variables (``<token>``, ``<int:thema_id>``, ...) are restored on every
  route whose view function takes an argument. Id routes use the ``int``
  converter; this assumes integer primary keys (the views already compare
  ``user_id`` against the session id and call ``int(thema_id)``) -- confirm
  against the schema in ``db``.
* Generated charts/PDFs live under ``app.root_path`` so that the directory
  written to is the same one ``send_from_directory`` serves from.
"""

from copy import deepcopy
from datetime import datetime
from pathlib import Path

from flask import (
    Flask,
    abort,
    flash,
    redirect,
    render_template,
    request,
    send_from_directory,
    session,
    url_for,
)
from werkzeug.security import check_password_hash, generate_password_hash

from config import Config
from db import (
    get_user_by_id,
    get_user_by_email,
    get_all_users,
    create_user,
    activate_user_by_email,
    update_user_password,
    update_user_last_login,
    log_access,
    get_user_groups,
    create_assessment,
    get_next_thema_id,
    get_assessment_result_rows,
    get_thema_questions,
    get_thema_ansprechpartner,
    save_assessment_answer,
    get_all_themen,
    get_thema_by_id,
    get_all_ansprechpartner,
    get_ansprechpartner_by_id,
    get_ansprechpartner_ids_for_thema,
    create_thema,
    update_thema,
    delete_thema,
    create_ansprechpartner,
    update_ansprechpartner,
    delete_ansprechpartner,
    get_all_questions_with_thema,
    get_all_contacts,
    get_question_by_id,
    create_question,
    update_question,
    delete_question,
    activate_user,
    delete_user,
    get_all_branchen,
    get_branche_by_id,
    get_thema_ids_for_branche,
    create_branche,
    update_branche,
    delete_branche,
    get_branche_ids_for_thema,
    get_themen_for_branche,
    get_next_thema_id_for_branche,
    get_thema_for_branche,
    get_question_count_for_thema,
    get_all_themen_with_question_count,
    get_pdf_recommendation_topics,
    get_random_ansprechpartner_for_thema,
    create_empfehlung,
)
from permissions import admin_required, login_required
from tools import (
    create_assessment_chart,
    generate_activation_token,
    send_mail,
    verify_activation_token,
    generate_pdf_from_html,
)

app = Flask(__name__)
app.config.from_object(Config)

# Anchor the output directories at the application root. send_from_directory()
# resolves a relative directory against app.root_path, so cwd-relative paths
# would be written in one place and served from another whenever the process
# is started from a different working directory.
chart_dir = Path(app.root_path) / "generated_charts"
chart_dir.mkdir(exist_ok=True)
pdf_dir = Path(app.root_path) / "generated_pdfs"
pdf_dir.mkdir(exist_ok=True)

# Value handed to the per-topic question forms as ``min_questions``.
MIN_QUESTIONS_PER_THEMA = 8


def _form_int_list(field):
    """Return the values submitted for multi-value form field *field* as ints.

    Aborts the request with HTTP 400 when any submitted value is not an
    integer, instead of letting the ValueError surface as a 500.
    """
    try:
        return [int(value) for value in request.form.getlist(field)]
    except ValueError:
        abort(400)


@app.context_processor
def inject_user():
    """Expose ``current_user`` and ``is_admin`` to every template.

    For a logged-in session the user's groups are re-read and written back to
    ``session["groups"]`` on each call.
    """
    user = None
    user_is_admin = False
    if session.get("user_id"):
        user = get_user_by_id(session["user_id"])
        groups = get_user_groups(session["user_id"])
        session["groups"] = groups
        user_is_admin = "Admins" in groups
    return {
        "current_user": user,
        "is_admin": user_is_admin,
    }


@app.route("/")
def index():
    """Landing page; logged-in users are redirected to the dashboard."""
    if session.get("user_id"):
        return redirect(url_for("dashboard"))
    return render_template("index.html")


@app.route("/register", methods=["GET", "POST"])
def register():
    """Create an inactive account and mail an activation link."""
    if request.method == "POST":
        name = request.form["name"].strip()
        email = request.form["email"].strip().lower()
        password = request.form["password"]

        # Reject blank fields; in particular never store a hash of an empty
        # password (profile() already refuses empty passwords).
        if not name or not email or not password.strip():
            flash("Bitte alle Felder ausfüllen.", "warning")
            return render_template("register.html")

        existing = get_user_by_email(email)
        if existing:
            flash("E-Mail ist bereits registriert.", "danger")
            return render_template("register.html")

        password_hash = generate_password_hash(password)
        create_user(name, email, password_hash, is_active=False)

        token = generate_activation_token(email)
        activation_link = f"{Config.APP_BASE_URL}{url_for('activate_account', token=token)}"
        send_mail(
            email,
            "Account aktivieren",
            f"Hallo {name},\n\nbitte aktiviere deinen Account:\n{activation_link}\n",
        )
        flash("Registrierung gespeichert. Bitte E-Mail zur Aktivierung prüfen.", "success")
        return redirect(url_for("login"))
    return render_template("register.html")


@app.route("/activate/<token>")
def activate_account(token):
    """Activate the account referenced by the activation *token*."""
    try:
        email = verify_activation_token(token)
    except Exception:
        # Deliberately broad: any verification failure is reported to the
        # visitor as an invalid/expired link rather than a server error.
        flash("Aktivierungslink ist ungültig oder abgelaufen.", "danger")
        return redirect(url_for("login"))
    activate_user_by_email(email)
    flash("Account wurde aktiviert. Bitte anmelden.", "success")
    return redirect(url_for("login"))


@app.route("/login", methods=["GET", "POST"])
def login():
    """Authenticate by e-mail and password and start the session."""
    if request.method == "POST":
        email = request.form["email"].strip().lower()
        password = request.form["password"]

        user = get_user_by_email(email)
        if not user:
            flash("Ungültige Zugangsdaten.", "danger")
            return render_template("login.html")
        if not user["is_active"]:
            flash("Account ist noch nicht aktiviert.", "warning")
            return render_template("login.html")

        stored_password = user["passwort_hash"]
        # NOTE(review): accounts whose stored value is the literal plaintext
        # "topsecret" are upgraded to a real hash on first login. This looks
        # like a seed-data migration path -- confirm it is still needed and
        # remove it once no such rows remain.
        if stored_password == "topsecret" and password == "topsecret":
            update_user_password(user["id"], generate_password_hash(password))
            password_ok = True
        else:
            password_ok = check_password_hash(stored_password, password)

        if not password_ok:
            flash("Ungültige Zugangsdaten.", "danger")
            return render_template("login.html")

        session["user_id"] = user["id"]
        session["groups"] = get_user_groups(user["id"])
        update_user_last_login(user["id"])
        log_access(user["id"])
        return redirect(url_for("dashboard"))
    return render_template("login.html")


@app.route("/logout")
def logout():
    """Clear the session and return to the login page."""
    session.clear()
    flash("Erfolgreich abgemeldet.", "success")
    return redirect(url_for("login"))


@app.route("/dashboard", methods=["GET", "POST"])
@login_required
def dashboard():
    """Let the user pick a branch and start the assessment at its first topic."""
    branchen = get_all_branchen()
    if request.method == "POST":
        branche_id = request.form.get("branche_id")
        if not branche_id:
            flash("Bitte wählen Sie eine Branche aus.", "warning")
            return render_template("dashboard.html", branchen=branchen)

        themen = get_themen_for_branche(branche_id)
        if not themen:
            flash("Für diese Branche sind aktuell keine Themen hinterlegt.", "warning")
            return render_template("dashboard.html", branchen=branchen)

        return redirect(url_for("topic", thema_id=themen[0]["id"], branche_id=branche_id))
    return render_template("dashboard.html", branchen=branchen)


@app.route("/profil", methods=["GET", "POST"])
@login_required
def profile():
    """Show the profile page and handle password changes."""
    user = get_user_by_id(session["user_id"])
    if request.method == "POST":
        new_password = request.form["password"]
        if not new_password.strip():
            flash("Bitte ein Passwort eingeben.", "warning")
            return render_template("profile.html", user=user)

        # Hash the password exactly as typed: register() and login() use the
        # raw value, so stripping here would make a password with surrounding
        # whitespace impossible to log in with.
        update_user_password(session["user_id"], generate_password_hash(new_password))
        flash("Passwort wurde geändert.", "success")
        return redirect(url_for("profile"))
    return render_template("profile.html", user=user)


@app.route("/thema/<int:thema_id>", methods=["GET", "POST"])
@login_required
def topic(thema_id):
    """Show one topic's questions and store the submitted yes/no answers.

    ``branche_id`` is required (query string or form). On POST the answers
    are saved and the user is sent to the next topic of the branch, or to
    the result page after the last one.
    """
    branche_id = request.values.get("branche_id")
    if not branche_id:
        flash("Bitte starten Sie das Assessment über die Startseite.", "warning")
        return redirect(url_for("dashboard"))
    try:
        branche_id = int(branche_id)
    except ValueError:
        flash("Ungültige Branche.", "danger")
        return redirect(url_for("dashboard"))

    thema = get_thema_for_branche(thema_id, branche_id)
    if not thema:
        flash("Thema nicht gefunden oder nicht für diese Branche freigegeben.", "danger")
        return redirect(url_for("dashboard"))

    fragen = get_thema_questions(thema_id)
    ansprechpartner = get_thema_ansprechpartner(thema_id)

    if request.method == "POST":
        # NOTE(review): assessment_id comes from the client and is not checked
        # against the logged-in user here -- verify ownership in db or add a
        # check once a suitable helper exists.
        assessment_id = request.form.get("assessment_id")

        # Validate every answer before touching the database, so an
        # incomplete form neither creates an assessment nor stores a partial
        # set of answers.
        answers = {}
        for frage in fragen:
            value = request.form.get(f'frage_{frage["id"]}')
            if value not in ("ja", "nein"):
                flash("Bitte alle Fragen beantworten.", "warning")
                return render_template(
                    "topic.html",
                    thema=thema,
                    fragen=fragen,
                    ansprechpartner=ansprechpartner,
                    assessment_id=assessment_id or "",
                    branche_id=branche_id,
                )
            answers[frage["id"]] = value == "ja"

        if not assessment_id:
            assessment_id = create_assessment(session["user_id"])

        for frage_id, antwort in answers.items():
            save_assessment_answer(
                assessment_id=assessment_id,
                thema_id=thema_id,
                frage_id=frage_id,
                antwort=antwort,
            )

        next_thema_id = get_next_thema_id_for_branche(thema_id, branche_id)
        if next_thema_id:
            return redirect(
                url_for(
                    "topic",
                    thema_id=next_thema_id,
                    assessment_id=assessment_id,
                    branche_id=branche_id,
                )
            )
        return redirect(
            url_for(
                "assessment_result",
                assessment_id=assessment_id,
                branche_id=branche_id,
            )
        )

    assessment_id = request.args.get("assessment_id", "")
    return render_template(
        "topic.html",
        thema=thema,
        fragen=fragen,
        ansprechpartner=ansprechpartner,
        assessment_id=assessment_id,
        branche_id=branche_id,
    )


@app.route("/assessment/<int:assessment_id>/result")
@login_required
def assessment_result(assessment_id):
    """Render the result page; also writes the chart PNG and the PDF report.

    NOTE(review): every GET regenerates both files and calls
    ``create_empfehlung`` again, so a page refresh may record duplicate
    recommendations -- confirm whether the db layer de-duplicates. The
    assessment is also not checked for ownership by the current user.
    """
    branche_id = request.args.get("branche_id")
    if not branche_id:
        flash("Branche fehlt für die Auswertung.", "warning")
        return redirect(url_for("dashboard"))
    try:
        branche_id = int(branche_id)
    except ValueError:
        flash("Ungültige Branche.", "danger")
        return redirect(url_for("dashboard"))

    rows = get_assessment_result_rows(assessment_id, branche_id)
    if not rows:
        flash("Für diese Branche konnten keine Auswertungsdaten geladen werden.", "warning")
        return redirect(url_for("dashboard"))

    branche = get_branche_by_id(branche_id)
    user = get_user_by_id(session["user_id"])

    labels = [row["kurztitel"] for row in rows]
    values = [int(row["ja_anzahl"]) for row in rows]

    chart_filename = f"assessment_{assessment_id}_branche_{branche_id}.png"
    chart_path = chart_dir / chart_filename
    create_assessment_chart(labels, values, chart_path)

    today_str = datetime.now().strftime("%d.%m.%Y")

    # The PDF template receives file:// URIs for its images.
    pdf_static_dir = Path(app.root_path) / "static" / "pdf"
    logo_up_path = (pdf_static_dir / "logo_up.png").resolve().as_uri()
    unternehmer_path = (pdf_static_dir / "unternehmer.png").resolve().as_uri()
    chart_pdf_path = chart_path.resolve().as_uri()

    pdf_filename = f"assessment_{assessment_id}_branche_{branche_id}.pdf"
    pdf_path = pdf_dir / pdf_filename

    pdf_recommendations = []
    for rec_topic in get_pdf_recommendation_topics(assessment_id, branche_id):
        # Work on a copy so the row returned by the db layer is not mutated.
        topic_copy = deepcopy(rec_topic)
        ansprechpartner = get_random_ansprechpartner_for_thema(rec_topic["id"])
        topic_copy["ansprechpartner"] = ansprechpartner
        pdf_recommendations.append(topic_copy)
        if ansprechpartner:
            create_empfehlung(
                user_id=session["user_id"],
                thema_id=rec_topic["id"],
                ansprechpartner_id=ansprechpartner["id"],
            )

    pdf_html = render_template(
        "pdf/assessment_report.html",
        assessment_id=assessment_id,
        branche=branche,
        user=user,
        today_str=today_str,
        rows=rows,
        pdf_recommendations=pdf_recommendations,
        chart_pdf_path=chart_pdf_path,
        logo_up_path=logo_up_path,
        unternehmer_path=unternehmer_path,
    )
    generate_pdf_from_html(
        html_string=pdf_html,
        output_path=pdf_path,
        base_url=request.url_root.rstrip("/") + "/",
    )

    return render_template(
        "result.html",
        assessment_id=assessment_id,
        branche_id=branche_id,
        chart_file=chart_filename,
        pdf_file=pdf_filename,
        rows=rows,
    )


@app.route("/generated_pdfs/<path:filename>")
@login_required
def generated_pdf(filename):
    """Send a generated PDF report as a download.

    NOTE(review): any logged-in user can request any file name; there is no
    per-user ownership check.
    """
    return send_from_directory(pdf_dir, filename, as_attachment=True)


@app.route("/generated_charts/<path:filename>")
@login_required
def generated_chart(filename):
    """Serve a generated chart image."""
    return send_from_directory(chart_dir, filename)


@app.route("/admin")
@admin_required
def admin_index():
    """Admin landing page."""
    return render_template("admin/index.html")


@app.route("/admin/user")
@admin_required
def admin_users():
    """List all users."""
    users = get_all_users()
    return render_template("admin/users.html", users=users)


@app.route("/admin/themen")
@admin_required
def admin_themen():
    """List all topics together with their question counts."""
    themen = get_all_themen_with_question_count()
    return render_template("admin/themen_list.html", themen=themen)


@app.route("/admin/themen/new", methods=["GET", "POST"])
@admin_required
def admin_thema_new():
    """Create a topic with its contact and branch assignments."""
    ansprechpartner = get_all_ansprechpartner()
    branchen = get_all_branchen()

    if request.method == "POST":
        kurztitel = request.form.get("kurztitel", "").strip()
        titel = request.form.get("titel", "").strip()
        infotext = request.form.get("infotext", "").strip()
        zusatztext = request.form.get("zusatztext", "").strip()
        ansprechpartner_ids = _form_int_list("ansprechpartner_ids")
        branche_ids = _form_int_list("branche_ids")

        if not kurztitel or not titel:
            flash("Kurztitel und Titel sind Pflichtfelder.", "error")
            # Re-render with the submitted values so the admin does not have
            # to retype them.
            return render_template(
                "admin/thema_form.html",
                mode="new",
                thema=request.form,
                ansprechpartner=ansprechpartner,
                branchen=branchen,
                selected_ansprechpartner_ids=ansprechpartner_ids,
                selected_branche_ids=branche_ids,
            )

        create_thema(
            kurztitel,
            titel,
            infotext,
            zusatztext,
            ansprechpartner_ids,
            branche_ids,
        )
        flash("Thema wurde erstellt.", "success")
        return redirect(url_for("admin_themen"))

    return render_template(
        "admin/thema_form.html",
        mode="new",
        thema={},
        ansprechpartner=ansprechpartner,
        branchen=branchen,
        selected_ansprechpartner_ids=[],
        selected_branche_ids=[],
    )


@app.route("/admin/themen/<int:thema_id>/edit", methods=["GET", "POST"])
@admin_required
def admin_thema_edit(thema_id):
    """Edit a topic and its contact and branch assignments."""
    thema = get_thema_by_id(thema_id)
    if not thema:
        flash("Thema nicht gefunden.", "error")
        return redirect(url_for("admin_themen"))

    ansprechpartner = get_all_ansprechpartner()
    branchen = get_all_branchen()

    if request.method == "POST":
        kurztitel = request.form.get("kurztitel", "").strip()
        titel = request.form.get("titel", "").strip()
        infotext = request.form.get("infotext", "").strip()
        zusatztext = request.form.get("zusatztext", "").strip()
        ansprechpartner_ids = _form_int_list("ansprechpartner_ids")
        branche_ids = _form_int_list("branche_ids")

        if not kurztitel or not titel:
            flash("Kurztitel und Titel sind Pflichtfelder.", "error")
            thema_form = {
                "id": thema_id,
                "kurztitel": kurztitel,
                "titel": titel,
                "infotext": infotext,
                "zusatztext": zusatztext,
            }
            return render_template(
                "admin/thema_form.html",
                mode="edit",
                thema=thema_form,
                ansprechpartner=ansprechpartner,
                branchen=branchen,
                selected_ansprechpartner_ids=ansprechpartner_ids,
                selected_branche_ids=branche_ids,
            )

        update_thema(
            thema_id,
            kurztitel,
            titel,
            infotext,
            zusatztext,
            ansprechpartner_ids,
            branche_ids,
        )
        flash("Thema wurde gespeichert.", "success")
        return redirect(url_for("admin_themen"))

    selected_ansprechpartner_ids = get_ansprechpartner_ids_for_thema(thema_id)
    selected_branche_ids = get_branche_ids_for_thema(thema_id)
    return render_template(
        "admin/thema_form.html",
        mode="edit",
        thema=thema,
        ansprechpartner=ansprechpartner,
        branchen=branchen,
        selected_ansprechpartner_ids=selected_ansprechpartner_ids,
        selected_branche_ids=selected_branche_ids,
    )


@app.route("/admin/themen/<int:thema_id>/delete", methods=["POST"])
@admin_required
def admin_thema_delete(thema_id):
    """Delete a topic."""
    delete_thema(thema_id)
    flash("Thema wurde gelöscht.", "success")
    return redirect(url_for("admin_themen"))


@app.route("/admin/ansprechpartner")
@admin_required
def admin_contacts():
    """List all contacts."""
    contacts = get_all_contacts()
    return render_template("admin/contacts.html", contacts=contacts)


@app.route("/admin/ansprechpartner/new", methods=["GET", "POST"])
@admin_required
def admin_contact_new():
    """Create a contact."""
    if request.method == "POST":
        name = request.form.get("name", "").strip()
        email = request.form.get("email", "").strip()
        infotext = request.form.get("infotext", "").strip()

        if not name or not email:
            flash("Name und E-Mail sind Pflichtfelder.", "error")
            return render_template(
                "admin/contact_form.html",
                mode="new",
                contact=request.form,
            )

        create_ansprechpartner(name, email, infotext)
        flash("Ansprechpartner wurde erstellt.", "success")
        return redirect(url_for("admin_contacts"))

    return render_template(
        "admin/contact_form.html",
        mode="new",
        contact={},
    )


@app.route("/admin/ansprechpartner/<int:ansprechpartner_id>/edit", methods=["GET", "POST"])
@admin_required
def admin_contact_edit(ansprechpartner_id):
    """Edit a contact."""
    contact = get_ansprechpartner_by_id(ansprechpartner_id)
    if not contact:
        flash("Ansprechpartner nicht gefunden.", "error")
        return redirect(url_for("admin_contacts"))

    if request.method == "POST":
        name = request.form.get("name", "").strip()
        email = request.form.get("email", "").strip()
        infotext = request.form.get("infotext", "").strip()

        if not name or not email:
            flash("Name und E-Mail sind Pflichtfelder.", "error")
            contact_form = {
                "id": ansprechpartner_id,
                "name": name,
                "email": email,
                "infotext": infotext,
            }
            return render_template(
                "admin/contact_form.html",
                mode="edit",
                contact=contact_form,
            )

        update_ansprechpartner(ansprechpartner_id, name, email, infotext)
        flash("Ansprechpartner wurde gespeichert.", "success")
        return redirect(url_for("admin_contacts"))

    return render_template(
        "admin/contact_form.html",
        mode="edit",
        contact=contact,
    )


@app.route("/admin/ansprechpartner/<int:ansprechpartner_id>/delete", methods=["POST"])
@admin_required
def admin_contact_delete(ansprechpartner_id):
    """Delete a contact."""
    delete_ansprechpartner(ansprechpartner_id)
    flash("Ansprechpartner wurde gelöscht.", "success")
    return redirect(url_for("admin_contacts"))


@app.route("/admin/fragen")
@admin_required
def admin_questions():
    """List all questions with their topics."""
    fragen = get_all_questions_with_thema()
    themen = get_all_themen()
    branchen = get_all_branchen()
    return render_template("admin/questions.html", fragen=fragen, themen=themen, branchen=branchen)


@app.route("/admin/fragen/new", methods=["GET", "POST"])
@admin_required
def admin_question_new():
    """Create a question for a topic chosen in the form."""
    themen = get_all_themen()

    if request.method == "POST":
        thema_id = request.form.get("thema_id")
        text = request.form.get("text", "").strip()

        if not thema_id or not text:
            flash("Thema und Text sind Pflichtfelder.", "error")
            return render_template(
                "admin/question_form.html",
                mode="new",
                frage=request.form,
                themen=themen,
            )

        create_question(thema_id, text)
        flash("Frage wurde erstellt.", "success")
        return redirect(url_for("admin_questions"))

    return render_template(
        "admin/question_form.html",
        mode="new",
        frage={},
        themen=themen,
    )


@app.route("/admin/fragen/<int:frage_id>/edit", methods=["GET", "POST"])
@admin_required
def admin_question_edit(frage_id):
    """Edit a question's text and topic."""
    frage = get_question_by_id(frage_id)
    themen = get_all_themen()
    if not frage:
        flash("Frage nicht gefunden.", "error")
        return redirect(url_for("admin_questions"))

    if request.method == "POST":
        thema_id = request.form.get("thema_id")
        text = request.form.get("text", "").strip()

        if not thema_id or not text:
            frage_form = {
                "id": frage_id,
                "thema_id": thema_id,
                "text": text,
            }
            flash("Thema und Text sind Pflichtfelder.", "error")
            return render_template(
                "admin/question_form.html",
                mode="edit",
                frage=frage_form,
                themen=themen,
            )

        update_question(frage_id, thema_id, text)
        flash("Frage wurde gespeichert.", "success")
        return redirect(url_for("admin_questions"))

    return render_template(
        "admin/question_form.html",
        mode="edit",
        frage=frage,
        themen=themen,
    )


@app.route("/admin/fragen/<int:frage_id>/delete", methods=["POST"])
@admin_required
def admin_question_delete(frage_id):
    """Delete a question."""
    delete_question(frage_id)
    flash("Frage wurde gelöscht.", "success")
    return redirect(url_for("admin_questions"))


@app.route("/admin/user/<int:user_id>/activate", methods=["POST"])
@admin_required
def admin_user_activate(user_id):
    """Activate a user account (no-op with a notice for the current admin)."""
    if user_id == session.get("user_id"):
        flash("Der aktuell angemeldete Benutzer ist bereits aktiv.", "warning")
        return redirect(url_for("admin_users"))

    activate_user(user_id)
    flash("Benutzer wurde aktiviert.", "success")
    return redirect(url_for("admin_users"))


@app.route("/admin/user/<int:user_id>/delete", methods=["POST"])
@admin_required
def admin_user_delete(user_id):
    """Delete a user; admins cannot delete their own account."""
    if user_id == session.get("user_id"):
        flash("Du kannst deinen eigenen Benutzer nicht löschen.", "danger")
        return redirect(url_for("admin_users"))

    delete_user(user_id)
    flash("Benutzer wurde gelöscht.", "success")
    return redirect(url_for("admin_users"))


@app.route("/impressum")
def impressum():
    """Static imprint page."""
    return render_template("impressum.html")


@app.route("/admin/branchen")
@admin_required
def admin_branchen():
    """List all branches."""
    branchen = get_all_branchen()
    return render_template("admin/branchen_list.html", branchen=branchen)


@app.route("/admin/branchen/new", methods=["GET", "POST"])
@admin_required
def admin_branche_new():
    """Create a branch with its topic assignments."""
    themen = get_all_themen()

    if request.method == "POST":
        branchenname = request.form.get("branchenname", "").strip()
        thema_ids = _form_int_list("thema_ids")

        if not branchenname:
            flash("Branchenname ist ein Pflichtfeld.", "error")
            return render_template(
                "admin/branche_form.html",
                mode="new",
                branche=request.form,
                themen=themen,
                selected_thema_ids=thema_ids,
            )

        create_branche(branchenname, thema_ids)
        flash("Branche wurde erstellt.", "success")
        return redirect(url_for("admin_branchen"))

    return render_template(
        "admin/branche_form.html",
        mode="new",
        branche={},
        themen=themen,
        selected_thema_ids=[],
    )


@app.route("/admin/branchen/<int:branche_id>/edit", methods=["GET", "POST"])
@admin_required
def admin_branche_edit(branche_id):
    """Edit a branch and its topic assignments."""
    branche = get_branche_by_id(branche_id)
    if not branche:
        flash("Branche nicht gefunden.", "error")
        return redirect(url_for("admin_branchen"))

    themen = get_all_themen()

    if request.method == "POST":
        branchenname = request.form.get("branchenname", "").strip()
        thema_ids = _form_int_list("thema_ids")

        if not branchenname:
            flash("Branchenname ist ein Pflichtfeld.", "error")
            branche_form = {
                "id": branche_id,
                "branchenname": branchenname,
            }
            return render_template(
                "admin/branche_form.html",
                mode="edit",
                branche=branche_form,
                themen=themen,
                selected_thema_ids=thema_ids,
            )

        update_branche(branche_id, branchenname, thema_ids)
        flash("Branche wurde gespeichert.", "success")
        return redirect(url_for("admin_branchen"))

    selected_thema_ids = get_thema_ids_for_branche(branche_id)
    return render_template(
        "admin/branche_form.html",
        mode="edit",
        branche=branche,
        themen=themen,
        selected_thema_ids=selected_thema_ids,
    )


@app.route("/admin/branchen/<int:branche_id>/delete", methods=["POST"])
@admin_required
def admin_branche_delete(branche_id):
    """Delete a branch."""
    delete_branche(branche_id)
    flash("Branche wurde gelöscht.", "success")
    return redirect(url_for("admin_branchen"))


@app.route("/admin/themen/<int:thema_id>/fragen/new", methods=["GET", "POST"])
@admin_required
def admin_question_new_for_thema(thema_id):
    """Create a question directly under the given topic.

    After a successful save the same form is shown again so several
    questions can be entered in a row.
    """
    thema = get_thema_by_id(thema_id)
    if not thema:
        flash("Thema nicht gefunden.", "error")
        return redirect(url_for("admin_themen"))

    question_count = get_question_count_for_thema(thema_id)

    if request.method == "POST":
        text = request.form.get("text", "").strip()

        if not text:
            flash("Der Fragetext ist ein Pflichtfeld.", "error")
            return render_template(
                "admin/question_form_thema.html",
                mode="new",
                thema=thema,
                frage=request.form,
                question_count=question_count,
                min_questions=MIN_QUESTIONS_PER_THEMA,
            )

        create_question(thema_id, text)
        flash("Frage wurde erstellt.", "success")
        return redirect(url_for("admin_question_new_for_thema", thema_id=thema_id))

    return render_template(
        "admin/question_form_thema.html",
        mode="new",
        thema=thema,
        frage={},
        question_count=question_count,
        min_questions=MIN_QUESTIONS_PER_THEMA,
    )


@app.route("/admin/themen/<int:thema_id>/fragen/<int:frage_id>/edit", methods=["GET", "POST"])
@admin_required
def admin_question_edit_for_thema(thema_id, frage_id):
    """Edit a question in the context of its topic.

    The question must belong to *thema_id*; otherwise the request is treated
    as "not found".
    """
    thema = get_thema_by_id(thema_id)
    frage = get_question_by_id(frage_id)
    if not thema or not frage or int(frage["thema_id"]) != int(thema_id):
        flash("Frage oder Thema nicht gefunden.", "error")
        return redirect(url_for("admin_themen"))

    question_count = get_question_count_for_thema(thema_id)

    if request.method == "POST":
        text = request.form.get("text", "").strip()

        if not text:
            frage_form = {
                "id": frage_id,
                "thema_id": thema_id,
                "text": text,
            }
            flash("Der Fragetext ist ein Pflichtfeld.", "error")
            return render_template(
                "admin/question_form_thema.html",
                mode="edit",
                thema=thema,
                frage=frage_form,
                question_count=question_count,
                min_questions=MIN_QUESTIONS_PER_THEMA,
            )

        update_question(frage_id, thema_id, text)
        flash("Frage wurde gespeichert.", "success")
        return redirect(url_for("admin_question_new_for_thema", thema_id=thema_id))

    return render_template(
        "admin/question_form_thema.html",
        mode="edit",
        thema=thema,
        frage=frage,
        question_count=question_count,
        min_questions=MIN_QUESTIONS_PER_THEMA,
    )


@app.errorhandler(401)
def unauthorized_error(error):
    """Render the custom 401 page."""
    return render_template("401.html"), 401


@app.errorhandler(404)
def not_found_error(error):
    """Render the custom 404 page."""
    return render_template("404.html"), 404


@app.errorhandler(500)
def internal_error(error):
    """Render the custom 500 page."""
    return render_template("500.html"), 500


if __name__ == "__main__":
    # The server listens on all interfaces, so the interactive debugger must
    # not be enabled unconditionally (it allows arbitrary code execution).
    # Set DEBUG = True in Config for local development.
    app.run(host="0.0.0.0", port=5000, debug=bool(app.config.get("DEBUG")))