DasUnternehmen/app.py
2026-04-08 21:23:55 +02:00

271 lines
9.1 KiB
Python

from pathlib import Path
from flask import Flask, flash, redirect, render_template, request, session, url_for, send_from_directory
from werkzeug.security import check_password_hash, generate_password_hash
from config import Config
from db import execute, execute_returning, fetch_all, fetch_one
from permissions import admin_required, login_required
from tools import create_assessment_chart, generate_activation_token, send_mail, verify_activation_token
app = Flask(__name__)
app.config.from_object(Config)
chart_dir = Path('generated_charts')
chart_dir.mkdir(exist_ok=True)
@app.context_processor
def inject_user():
user = None
is_admin = False
if session.get('user_id'):
user = fetch_one('SELECT id, name, email FROM benutzer WHERE id = %s', (session['user_id'],))
admin_row = fetch_one(
'''
SELECT 1
FROM benutzer_gruppen bg
JOIN gruppen g ON g.id = bg.gruppen_id
WHERE bg.benutzer_id = %s AND g.gruppenname = 'Admins'
''',
(session['user_id'],),
)
is_admin = bool(admin_row)
return {'current_user': user, 'is_admin': is_admin}
@app.route('/')
def index():
if session.get('user_id'):
return redirect(url_for('dashboard'))
return render_template('index.html')
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
name = request.form['name'].strip()
email = request.form['email'].strip().lower()
password = request.form['password']
existing = fetch_one('SELECT id FROM benutzer WHERE email = %s', (email,))
if existing:
flash('E-Mail ist bereits registriert.', 'danger')
return render_template('register.html')
password_hash = generate_password_hash(password)
execute(
'''
INSERT INTO benutzer (name, email, passwort_hash, is_active)
VALUES (%s, %s, %s, FALSE)
''',
(name, email, password_hash),
)
token = generate_activation_token(email)
activation_link = f"{Config.APP_BASE_URL}{url_for('activate_account', token=token)}"
send_mail(
email,
'Account aktivieren',
f'Hallo {name},\n\nbitte aktiviere deinen Account:\n{activation_link}\n',
)
flash('Registrierung gespeichert. Bitte E-Mail zur Aktivierung prüfen.', 'success')
return redirect(url_for('login'))
return render_template('register.html')
@app.route('/activate/<token>')
def activate_account(token):
try:
email = verify_activation_token(token)
except Exception:
flash('Aktivierungslink ist ungültig oder abgelaufen.', 'danger')
return redirect(url_for('login'))
execute('UPDATE benutzer SET is_active = TRUE WHERE email = %s', (email,))
flash('Account wurde aktiviert. Bitte anmelden.', 'success')
return redirect(url_for('login'))
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
email = request.form['email'].strip().lower()
password = request.form['password']
user = fetch_one('SELECT * FROM benutzer WHERE email = %s', (email,))
if not user:
flash('Ungültige Zugangsdaten.', 'danger')
return render_template('login.html')
if not user['is_active']:
flash('Account ist noch nicht aktiviert.', 'warning')
return render_template('login.html')
stored_password = user['passwort_hash']
password_ok = False
if stored_password == 'topsecret' and password == 'topsecret':
new_hash = generate_password_hash(password)
execute('UPDATE benutzer SET passwort_hash = %s WHERE id = %s', (new_hash, user['id']))
password_ok = True
else:
password_ok = check_password_hash(stored_password, password)
if not password_ok:
flash('Ungültige Zugangsdaten.', 'danger')
return render_template('login.html')
session['user_id'] = user['id']
execute('UPDATE benutzer SET last_login = NOW() WHERE id = %s', (user['id'],))
execute('INSERT INTO accesslog (userid) VALUES (%s)', (user['id'],))
return redirect(url_for('dashboard'))
return render_template('login.html')
@app.route('/logout')
def logout():
session.clear()
flash('Erfolgreich abgemeldet.', 'success')
return redirect(url_for('login'))
@app.route('/dashboard')
@login_required
def dashboard():
themen = fetch_all('SELECT * FROM thema ORDER BY id')
return render_template('dashboard.html', themen=themen)
@app.route('/profil', methods=['GET', 'POST'])
@login_required
def profile():
user = fetch_one('SELECT id, name, email FROM benutzer WHERE id = %s', (session['user_id'],))
if request.method == 'POST':
new_password = request.form['password']
execute(
'UPDATE benutzer SET passwort_hash = %s WHERE id = %s',
(generate_password_hash(new_password), session['user_id'])
)
flash('Passwort wurde geändert.', 'success')
return redirect(url_for('profile'))
return render_template('profile.html', user=user)
@app.route('/thema/<int:thema_id>', methods=['GET', 'POST'])
@login_required
def topic(thema_id):
thema = fetch_one('SELECT * FROM thema WHERE id = %s', (thema_id,))
fragen = fetch_all('SELECT * FROM fragen WHERE themaid = %s ORDER BY id', (thema_id,))
ansprechpartner = fetch_all(
'''
SELECT a.*
FROM ansprechpartner a
JOIN themaansprechpartner ta ON ta.ansprechpartnerid = a.id
WHERE ta.themaid = %s
ORDER BY a.name
''',
(thema_id,),
)
if request.method == 'POST':
assessment_id = request.form.get('assessment_id')
if not assessment_id:
assessment = execute_returning(
'INSERT INTO assessment (userid) VALUES (%s) RETURNING id',
(session['user_id'],),
)
assessment_id = assessment['id']
for frage in fragen:
value = request.form.get(f'frage_{frage["id"]}')
if value not in ('ja', 'nein'):
flash('Bitte alle Fragen beantworten.', 'warning')
return render_template(
'topic.html',
thema=thema,
fragen=fragen,
ansprechpartner=ansprechpartner,
assessment_id=assessment_id,
)
execute(
'''
INSERT INTO assessmentanswer (assessmentid, themaid, frageid, antwort)
VALUES (%s, %s, %s, %s)
ON CONFLICT (assessmentid, frageid)
DO UPDATE SET antwort = EXCLUDED.antwort
''',
(assessment_id, thema_id, frage['id'], value == 'ja'),
)
next_topic = fetch_one('SELECT id FROM thema WHERE id > %s ORDER BY id LIMIT 1', (thema_id,))
if next_topic:
return redirect(url_for('topic', thema_id=next_topic['id'], assessment_id=assessment_id))
return redirect(url_for('assessment_result', assessment_id=assessment_id))
assessment_id = request.args.get('assessment_id', '')
return render_template(
'topic.html',
thema=thema,
fragen=fragen,
ansprechpartner=ansprechpartner,
assessment_id=assessment_id,
)
@app.route('/assessment/<int:assessment_id>/result')
@login_required
def assessment_result(assessment_id):
rows = fetch_all(
'''
SELECT t.kurztitel, COUNT(*) FILTER (WHERE aa.antwort = TRUE) AS ja_anzahl
FROM thema t
LEFT JOIN assessmentanswer aa ON aa.themaid = t.id AND aa.assessmentid = %s
GROUP BY t.id, t.kurztitel
ORDER BY t.id
''',
(assessment_id,),
)
labels = [row['kurztitel'] for row in rows]
values = [int(row['ja_anzahl']) for row in rows]
filename = f'assessment_{assessment_id}.png'
output_path = chart_dir / filename
create_assessment_chart(labels, values, output_path)
return render_template('result.html', assessment_id=assessment_id, chart_file=filename, rows=rows)
@app.route('/generated_charts/<path:filename>')
@login_required
def generated_chart(filename):
return send_from_directory(chart_dir, filename)
@app.route('/admin')
@admin_required
def admin_index():
return render_template('admin/index.html')
@app.route('/admin/themen')
@admin_required
def admin_topics():
themen = fetch_all('SELECT * FROM thema ORDER BY id')
return render_template('admin/topics.html', themen=themen)
@app.route('/admin/fragen')
@admin_required
def admin_questions():
fragen = fetch_all(
'''SELECT f.id, f.text, t.kurztitel FROM fragen f JOIN thema t ON t.id = f.themaid ORDER BY t.id, f.id'''
)
return render_template('admin/questions.html', fragen=fragen)
@app.route('/admin/ansprechpartner')
@admin_required
def admin_contacts():
contacts = fetch_all('SELECT * FROM ansprechpartner ORDER BY name')
return render_template('admin/contacts.html', contacts=contacts)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)