573 lines
16 KiB
Python
573 lines
16 KiB
Python
import logging
|
|
import os
|
|
import re
|
|
from datetime import datetime
|
|
from logging.handlers import RotatingFileHandler
|
|
|
|
from flask import (
|
|
Flask,
|
|
redirect,
|
|
render_template,
|
|
request,
|
|
send_from_directory,
|
|
session,
|
|
url_for,
|
|
abort,
|
|
)
|
|
from werkzeug.security import check_password_hash, generate_password_hash
|
|
|
|
from config import Config
|
|
from db import get_connection, fetchone_dict, fetchall_dict
|
|
from auth import login_required
|
|
from permissions import is_video_allowed_for_level
|
|
from security import (
|
|
admin_required,
|
|
get_current_user,
|
|
get_current_user_mandant_level,
|
|
)
|
|
from logging_config import setup_logging
|
|
|
|
app = Flask(__name__)
|
|
app.config.from_object(Config)
|
|
app.secret_key = app.config["SECRET_KEY"]
|
|
|
|
|
|
LOG_DIR = app.config["LOG_DIR"]
|
|
if not app.config.get("TESTING"):
|
|
os.makedirs(LOG_DIR, exist_ok=True)
|
|
|
|
# file_handler = RotatingFileHandler(
|
|
# os.path.join(LOG_DIR, "flask-app.log"),
|
|
# maxBytes=5 * 1024 * 1024,
|
|
# backupCount=5
|
|
# )
|
|
# file_handler.setLevel(logging.INFO)
|
|
# file_handler.setFormatter(
|
|
# logging.Formatter("%(asctime)s %(levelname)s %(message)s")
|
|
# )
|
|
|
|
# app.logger.setLevel(logging.INFO)
|
|
# app.logger.addHandler(file_handler)
|
|
|
|
|
|
|
|
|
|
def ensure_base_tables():
|
|
conn = get_connection()
|
|
cur = conn.cursor()
|
|
|
|
cur.execute("""
|
|
CREATE TABLE IF NOT EXISTS visits (
|
|
id SERIAL PRIMARY KEY,
|
|
route_name VARCHAR(100),
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
""")
|
|
|
|
cur.execute("""
|
|
CREATE TABLE IF NOT EXISTS mandant (
|
|
id SERIAL PRIMARY KEY,
|
|
kuerzel VARCHAR(50) NOT NULL,
|
|
name VARCHAR(255) NOT NULL,
|
|
kontakt_email VARCHAR(255),
|
|
level INTEGER NOT NULL DEFAULT 0
|
|
)
|
|
""")
|
|
|
|
cur.execute("""
|
|
CREATE TABLE IF NOT EXISTS app_user (
|
|
id SERIAL PRIMARY KEY,
|
|
email VARCHAR(255) NOT NULL UNIQUE,
|
|
name VARCHAR(255) NOT NULL,
|
|
mandant_id INTEGER NOT NULL,
|
|
password_hash VARCHAR(255) NOT NULL,
|
|
last_login TIMESTAMP NULL,
|
|
status INTEGER NOT NULL DEFAULT 0,
|
|
CONSTRAINT fk_app_user_mandant
|
|
FOREIGN KEY (mandant_id)
|
|
REFERENCES mandant(id)
|
|
ON DELETE RESTRICT
|
|
)
|
|
""")
|
|
|
|
cur.execute("""
|
|
CREATE TABLE IF NOT EXISTS app_group (
|
|
id SERIAL PRIMARY KEY,
|
|
mandant_id INTEGER NOT NULL,
|
|
group_name VARCHAR(255) NOT NULL,
|
|
CONSTRAINT fk_app_group_mandant
|
|
FOREIGN KEY (mandant_id)
|
|
REFERENCES mandant(id)
|
|
ON DELETE CASCADE
|
|
)
|
|
""")
|
|
|
|
cur.execute("""
|
|
CREATE TABLE IF NOT EXISTS user_group (
|
|
user_id INTEGER NOT NULL,
|
|
group_id INTEGER NOT NULL,
|
|
mandant_id INTEGER NOT NULL,
|
|
PRIMARY KEY (user_id, group_id),
|
|
CONSTRAINT fk_user_group_user
|
|
FOREIGN KEY (user_id)
|
|
REFERENCES app_user(id)
|
|
ON DELETE CASCADE,
|
|
CONSTRAINT fk_user_group_group
|
|
FOREIGN KEY (group_id)
|
|
REFERENCES app_group(id)
|
|
ON DELETE CASCADE,
|
|
CONSTRAINT fk_user_group_mandant
|
|
FOREIGN KEY (mandant_id)
|
|
REFERENCES mandant(id)
|
|
ON DELETE CASCADE
|
|
)
|
|
""")
|
|
|
|
conn.commit()
|
|
cur.close()
|
|
conn.close()
|
|
|
|
|
|
def ensure_default_admin():
|
|
conn = get_connection()
|
|
cur = conn.cursor()
|
|
|
|
cur.execute("SELECT id FROM mandant WHERE kuerzel = %s", ("KOLB",))
|
|
row = cur.fetchone()
|
|
|
|
if row:
|
|
mandant_id = row[0]
|
|
else:
|
|
cur.execute("""
|
|
INSERT INTO mandant (kuerzel, name, kontakt_email, level)
|
|
VALUES (%s, %s, %s, %s)
|
|
RETURNING id
|
|
""", ("KOLB", "Kolb Compliance", "info@kolb.cc", 0))
|
|
mandant_id = cur.fetchone()[0]
|
|
|
|
cur.execute("""
|
|
SELECT id FROM app_group
|
|
WHERE mandant_id = %s AND group_name = %s
|
|
""", (1, "Administratoren"))
|
|
group_row = cur.fetchone()
|
|
|
|
if group_row:
|
|
admin_group_id = group_row[0]
|
|
else:
|
|
cur.execute("""
|
|
INSERT INTO app_group (mandant_id, group_name)
|
|
VALUES (%s, %s)
|
|
RETURNING id
|
|
""", (1, "Administratoren"))
|
|
admin_group_id = cur.fetchone()[0]
|
|
|
|
cur.execute("SELECT id FROM app_user WHERE email = %s", ("admin@kolb.cc",))
|
|
user_row = cur.fetchone()
|
|
|
|
if user_row:
|
|
admin_user_id = user_row[0]
|
|
else:
|
|
password_hash = generate_password_hash("topsecret")
|
|
cur.execute("""
|
|
INSERT INTO app_user (email, name, mandant_id, password_hash, status)
|
|
VALUES (%s, %s, %s, %s, %s)
|
|
RETURNING id
|
|
""", ("admin@kolb.cc", "Admin", 1, password_hash, 1))
|
|
admin_user_id = cur.fetchone()[0]
|
|
|
|
cur.execute("""
|
|
SELECT 1 FROM user_group
|
|
WHERE user_id = %s AND group_id = %s AND mandant_id = %s
|
|
""", (admin_user_id, admin_group_id, 1))
|
|
|
|
if cur.fetchone() is None:
|
|
cur.execute("""
|
|
INSERT INTO user_group (user_id, group_id, mandant_id)
|
|
VALUES (%s, %s, %s)
|
|
""", (admin_user_id, admin_group_id, 1))
|
|
|
|
conn.commit()
|
|
cur.close()
|
|
conn.close()
|
|
|
|
|
|
def register_visit(route_name: str) -> int:
|
|
conn = get_connection()
|
|
cur = conn.cursor()
|
|
|
|
cur.execute(
|
|
"INSERT INTO visits (route_name) VALUES (%s)",
|
|
(route_name,)
|
|
)
|
|
conn.commit()
|
|
|
|
cur.execute("SELECT COUNT(*) FROM visits")
|
|
count = cur.fetchone()[0]
|
|
|
|
cur.close()
|
|
conn.close()
|
|
return count
|
|
|
|
|
|
|
|
|
|
|
|
def render_page(active_page: str, title: str):
|
|
count = register_visit(active_page)
|
|
return render_template(
|
|
"index.html",
|
|
active_page=active_page,
|
|
page_title=title,
|
|
visit_count=count,
|
|
**get_current_user()
|
|
)
|
|
|
|
|
|
@app.before_request
|
|
def startup_checks():
|
|
ensure_base_tables()
|
|
ensure_default_admin()
|
|
|
|
|
|
@app.route("/")
|
|
@app.route("/home")
|
|
def home():
|
|
return render_page("home", "Home")
|
|
|
|
|
|
@app.route("/preise")
|
|
@login_required
|
|
def preise():
|
|
return render_page("preise", "Preise")
|
|
|
|
|
|
@app.route("/allgemein")
|
|
@login_required
|
|
def allgemein():
|
|
return render_page("allgemein", "Allgemein")
|
|
|
|
|
|
@app.route("/login", methods=["GET", "POST"])
|
|
def login():
|
|
error_message = ""
|
|
next_url = request.args.get("next") or request.form.get("next") or url_for("preise")
|
|
|
|
if request.method == "POST":
|
|
email = request.form.get("email", "").strip().lower()
|
|
password = request.form.get("password", "")
|
|
|
|
conn = get_connection()
|
|
cur = conn.cursor()
|
|
|
|
cur.execute("""
|
|
SELECT id, email, name, password_hash, status
|
|
FROM app_user
|
|
WHERE lower(email) = %s
|
|
""", (email,))
|
|
row = cur.fetchone()
|
|
|
|
if not row:
|
|
error_message = "Benutzer nicht gefunden."
|
|
else:
|
|
user_id, user_email, user_name, password_hash, status = row
|
|
|
|
if status == 0:
|
|
error_message = "Benutzer ist noch nicht aktiviert."
|
|
elif status == 2:
|
|
error_message = "Benutzer ist gesperrt."
|
|
elif status == 3:
|
|
error_message = "Benutzer ist deaktiviert."
|
|
elif not check_password_hash(password_hash, password):
|
|
error_message = "Passwort ist falsch."
|
|
else:
|
|
session["user_id"] = user_id
|
|
session["user_email"] = user_email
|
|
session["user_name"] = user_name
|
|
|
|
cur.execute("""
|
|
UPDATE app_user
|
|
SET last_login = %s
|
|
WHERE id = %s
|
|
""", (datetime.utcnow(), user_id))
|
|
conn.commit()
|
|
|
|
cur.close()
|
|
conn.close()
|
|
|
|
app.logger.info("Login erfolgreich: %s", user_email)
|
|
return redirect(next_url)
|
|
|
|
cur.close()
|
|
conn.close()
|
|
|
|
return render_template(
|
|
"login.html",
|
|
page_title="Login",
|
|
active_page="login",
|
|
error_message=error_message,
|
|
next_url=next_url,
|
|
**get_current_user()
|
|
)
|
|
|
|
|
|
@app.route("/logout")
|
|
def logout():
|
|
user_email = session.get("user_email", "unknown")
|
|
session.clear()
|
|
app.logger.info("Logout: %s", user_email)
|
|
return redirect(url_for("home"))
|
|
|
|
|
|
@app.route("/health")
|
|
def health():
|
|
try:
|
|
ensure_base_tables()
|
|
return "OK\n", 200
|
|
except Exception as exc:
|
|
app.logger.exception("Healthcheck fehlgeschlagen: %s", exc)
|
|
return f"DB Fehler: {exc}\n", 500
|
|
|
|
|
|
@app.route("/videos/<path:filename>")
|
|
@login_required
|
|
def protected_videos(filename):
|
|
mandant_level = get_current_user_mandant_level()
|
|
|
|
if not is_video_allowed_for_level(filename, mandant_level):
|
|
abort(403)
|
|
|
|
return send_from_directory("/app/images/videos", filename)
|
|
|
|
@app.route("/images/<path:filename>")
|
|
def serve_image(filename):
|
|
if filename.startswith("videos/"):
|
|
abort(403)
|
|
|
|
return send_from_directory("/app/images", filename)
|
|
|
|
@app.route("/styles/<path:filename>")
|
|
def serve_style(filename):
|
|
return send_from_directory("/app/styles", filename)
|
|
|
|
|
|
@app.route("/files/<path:filename>")
|
|
def serve_file(filename):
|
|
return send_from_directory("/app/files", filename)
|
|
|
|
#temporär
|
|
@app.route("/pwd/<password>/<key>")
|
|
def generate_pwd_hash(password, key):
|
|
if key != "geheim":
|
|
return "Forbidden", 403
|
|
|
|
return f"<pre>{generate_password_hash(password)}</pre>"
|
|
|
|
@app.route("/profil")
|
|
@login_required
|
|
def profil():
|
|
conn = get_connection()
|
|
cur = conn.cursor()
|
|
|
|
cur.execute("""
|
|
SELECT
|
|
u.id,
|
|
u.email,
|
|
u.name,
|
|
u.mandant_id,
|
|
u.last_login,
|
|
u.status,
|
|
|
|
m.name AS mandant_name,
|
|
m.kuerzel AS mandant_kuerzel,
|
|
m.kontakt_email AS mandant_email,
|
|
m.level AS mandant_level
|
|
|
|
FROM app_user u
|
|
JOIN mandant m ON m.id = u.mandant_id
|
|
WHERE u.id = %s
|
|
""", (session["user_id"],))
|
|
|
|
user_data = fetchone_dict(cur)
|
|
|
|
cur.close()
|
|
conn.close()
|
|
|
|
return render_template(
|
|
"profil.html",
|
|
page_title="Profil",
|
|
active_page="profil",
|
|
profile=user_data,
|
|
**get_current_user()
|
|
)
|
|
|
|
@app.route("/admin/mandanten", methods=["GET", "POST"])
|
|
@admin_required
|
|
def admin_mandanten():
|
|
conn = get_connection()
|
|
cur = conn.cursor()
|
|
|
|
if request.method == "POST":
|
|
action = request.form.get("action")
|
|
|
|
if action == "create":
|
|
kuerzel = request.form.get("kuerzel", "").strip()
|
|
name = request.form.get("name", "").strip()
|
|
kontakt_email = request.form.get("kontakt_email", "").strip()
|
|
level = request.form.get("level", "0").strip()
|
|
|
|
email_pattern = r"^[^@\s]+@[^@\s]+\.[^@\s]+$"
|
|
|
|
error_message = None
|
|
|
|
if not kuerzel:
|
|
error_message = "Kürzel ist ein Pflichtfeld."
|
|
elif not name:
|
|
error_message = "Name ist ein Pflichtfeld."
|
|
elif not kontakt_email:
|
|
error_message = "Kontakt E-Mail ist ein Pflichtfeld."
|
|
elif not re.match(email_pattern, kontakt_email):
|
|
error_message = "Bitte eine gültige Kontakt-E-Mail eingeben."
|
|
|
|
if error_message:
|
|
cur.execute("""
|
|
SELECT id, kuerzel, name, kontakt_email, level
|
|
FROM mandant
|
|
ORDER BY id
|
|
""")
|
|
mandanten = fetchall_dict(cur)
|
|
|
|
cur.close()
|
|
conn.close()
|
|
|
|
return render_template(
|
|
"admin_mandanten.html",
|
|
page_title="Admin - Mandanten",
|
|
active_page="admin",
|
|
mandanten=mandanten,
|
|
form_error=error_message,
|
|
form_values={
|
|
"kuerzel": kuerzel,
|
|
"name": name,
|
|
"kontakt_email": kontakt_email,
|
|
"level": level,
|
|
},
|
|
**get_current_user()
|
|
)
|
|
|
|
cur.execute("""
|
|
INSERT INTO mandant (kuerzel, name, kontakt_email, level)
|
|
VALUES (%s, %s, %s, %s)
|
|
""", (kuerzel, name, kontakt_email, int(level or 0)))
|
|
conn.commit()
|
|
|
|
elif action == "update":
|
|
mandant_id = request.form.get("id")
|
|
kuerzel = request.form.get("kuerzel", "").strip()
|
|
name = request.form.get("name", "").strip()
|
|
kontakt_email = request.form.get("kontakt_email", "").strip()
|
|
level = request.form.get("level", "0").strip()
|
|
|
|
cur.execute("""
|
|
UPDATE mandant
|
|
SET kuerzel = %s,
|
|
name = %s,
|
|
kontakt_email = %s,
|
|
level = %s
|
|
WHERE id = %s
|
|
""", (kuerzel, name, kontakt_email or None, int(level or 0), int(mandant_id)))
|
|
conn.commit()
|
|
|
|
elif action == "delete":
|
|
mandant_id = request.form.get("id")
|
|
cur.execute("DELETE FROM mandant WHERE id = %s", (int(mandant_id),))
|
|
conn.commit()
|
|
|
|
cur.close()
|
|
conn.close()
|
|
return redirect(url_for("admin_mandanten"))
|
|
|
|
cur.execute("""
|
|
SELECT id, kuerzel, name, kontakt_email, level
|
|
FROM mandant
|
|
ORDER BY id
|
|
""")
|
|
mandanten = fetchall_dict(cur)
|
|
|
|
cur.close()
|
|
conn.close()
|
|
|
|
return render_template(
|
|
"admin_mandanten.html",
|
|
page_title="Admin - Mandanten",
|
|
active_page="admin",
|
|
mandanten=mandanten,
|
|
**get_current_user()
|
|
)
|
|
|
|
@app.errorhandler(403)
|
|
def forbidden(_error):
|
|
return render_template(
|
|
"403.html",
|
|
page_title="Kein Zugriff",
|
|
active_page="",
|
|
**get_current_user()
|
|
), 403
|
|
|
|
@app.route("/pwdchange", methods=["GET", "POST"])
|
|
@login_required
|
|
def pwdchange():
|
|
error_message = ""
|
|
success_message = ""
|
|
|
|
if request.method == "POST":
|
|
current_password = request.form.get("current_password", "")
|
|
new_password = request.form.get("new_password", "")
|
|
confirm_password = request.form.get("confirm_password", "")
|
|
|
|
if not current_password or not new_password or not confirm_password:
|
|
error_message = "Bitte alle Felder ausfüllen."
|
|
elif new_password != confirm_password:
|
|
error_message = "Die neuen Passwörter stimmen nicht überein."
|
|
elif len(new_password) < 8:
|
|
error_message = "Das neue Passwort muss mindestens 8 Zeichen lang sein."
|
|
else:
|
|
conn = get_connection()
|
|
cur = conn.cursor()
|
|
|
|
cur.execute("""
|
|
SELECT password_hash
|
|
FROM app_user
|
|
WHERE id = %s
|
|
""", (session["user_id"],))
|
|
row = cur.fetchone()
|
|
|
|
if row is None:
|
|
error_message = "Benutzer nicht gefunden."
|
|
else:
|
|
stored_hash = row[0]
|
|
|
|
if not check_password_hash(stored_hash, current_password):
|
|
error_message = "Das aktuelle Passwort ist falsch."
|
|
else:
|
|
new_hash = generate_password_hash(new_password)
|
|
|
|
cur.execute("""
|
|
UPDATE app_user
|
|
SET password_hash = %s
|
|
WHERE id = %s
|
|
""", (new_hash, session["user_id"]))
|
|
conn.commit()
|
|
|
|
success_message = "Passwort wurde erfolgreich geändert."
|
|
|
|
cur.close()
|
|
conn.close()
|
|
|
|
return render_template(
|
|
"pwdchange.html",
|
|
page_title="Passwort ändern",
|
|
active_page="profil",
|
|
error_message=error_message,
|
|
success_message=success_message,
|
|
**get_current_user()
|
|
) |