from contextlib import contextmanager import psycopg2 import psycopg2.extras from config import Config def get_connection(): return psycopg2.connect( host=Config.DB_HOST, dbname=Config.DB_NAME, user=Config.DB_USER, password=Config.DB_PASSWORD, port=Config.DB_PORT, cursor_factory=psycopg2.extras.RealDictCursor, ) @contextmanager def get_cursor(commit=False): conn = get_connection() cur = conn.cursor() try: yield cur if commit: conn.commit() except Exception: conn.rollback() raise finally: cur.close() conn.close() def fetch_one(query, params=None): with get_cursor() as cur: cur.execute(query, params or ()) return cur.fetchone() def fetch_all(query, params=None): with get_cursor() as cur: cur.execute(query, params or ()) return cur.fetchall() def execute(query, params=None): with get_cursor(commit=True) as cur: cur.execute(query, params or ()) def execute_returning(query, params=None): with get_cursor(commit=True) as cur: cur.execute(query, params or ()) return cur.fetchone() ############### # Benutzer ############### def get_user_by_id(user_id): return fetch_one( """ SELECT id, name, email, passwort_hash, is_active, last_login FROM benutzer WHERE id = %s """, (user_id,), ) def get_user_by_email(email): return fetch_one( """ SELECT id, name, email, passwort_hash, is_active, last_login FROM benutzer WHERE email = %s """, (email,), ) def get_all_users(): return fetch_all( """ SELECT id, name, email, last_login, is_active FROM benutzer ORDER BY name, email """ ) def create_user(name, email, password_hash, is_active=False): return execute_returning( """ INSERT INTO benutzer (name, email, passwort_hash, is_active) VALUES (%s, %s, %s, %s) RETURNING id """, (name, email, password_hash, is_active), ) def activate_user_by_email(email): execute( """ UPDATE benutzer SET is_active = TRUE WHERE email = %s """, (email,), ) def update_user_password(user_id, password_hash): execute( """ UPDATE benutzer SET passwort_hash = %s WHERE id = %s """, (password_hash, user_id), ) def update_user_last_login(user_id): execute( """ UPDATE benutzer SET last_login = NOW() WHERE id = %s """, (user_id,), ) def log_access(user_id): execute( """ INSERT INTO accesslog (user_id) VALUES (%s) """, (user_id,), ) def get_user_groups(user_id): rows = fetch_all( """ SELECT g.gruppenname FROM benutzer_gruppen bg JOIN gruppen g ON g.id = bg.gruppen_id WHERE bg.benutzer_id = %s ORDER BY g.gruppenname """, (user_id,), ) return [row["gruppenname"] for row in rows] ############### # Themen ############### def get_all_themen(): return fetch_all( """ SELECT id, kurztitel, titel, infotext, zusatztext FROM thema ORDER BY id """ ) def get_thema_by_id(thema_id): return fetch_one( """ SELECT id, kurztitel, titel, infotext, zusatztext FROM thema WHERE id = %s """, (thema_id,), ) def get_next_thema_id(current_thema_id): row = fetch_one( """ SELECT id FROM thema WHERE id > %s ORDER BY id LIMIT 1 """, (current_thema_id,), ) return row["id"] if row else None def get_all_ansprechpartner(): return fetch_all( """ SELECT id, name, email, infotext FROM ansprechpartner ORDER BY name ASC """ ) def get_all_contacts(): return fetch_all( """ SELECT id, name, email, infotext FROM ansprechpartner ORDER BY name ASC """ ) def get_thema_ansprechpartner(thema_id): return fetch_all( """ SELECT a.id, a.name, a.email, a.infotext FROM ansprechpartner a JOIN themaansprechpartner ta ON ta.ansprechpartner_id = a.id WHERE ta.thema_id = %s ORDER BY a.name ASC """, (thema_id,), ) def get_ansprechpartner_ids_for_thema(thema_id): rows = fetch_all( """ SELECT ansprechpartner_id FROM themaansprechpartner WHERE thema_id = %s """, (thema_id,), ) return [row["ansprechpartner_id"] for row in rows] def create_thema(kurztitel, titel, infotext, zusatztext, ansprechpartner_ids): row = execute_returning( """ INSERT INTO thema (kurztitel, titel, infotext, zusatztext) VALUES (%s, %s, %s, %s) RETURNING id """, (kurztitel, titel, infotext, zusatztext), ) thema_id = row["id"] for ap_id in ansprechpartner_ids: execute( """ INSERT INTO themaansprechpartner (thema_id, ansprechpartner_id) VALUES (%s, %s) """, (thema_id, ap_id), ) return thema_id def update_thema(thema_id, kurztitel, titel, infotext, zusatztext, ansprechpartner_ids): execute( """ UPDATE thema SET kurztitel = %s, titel = %s, infotext = %s, zusatztext = %s WHERE id = %s """, (kurztitel, titel, infotext, zusatztext, thema_id), ) execute( """ DELETE FROM themaansprechpartner WHERE thema_id = %s """, (thema_id,), ) for ap_id in ansprechpartner_ids: execute( """ INSERT INTO themaansprechpartner (thema_id, ansprechpartner_id) VALUES (%s, %s) """, (thema_id, ap_id), ) def delete_thema(thema_id): execute("DELETE FROM themaansprechpartner WHERE thema_id = %s", (thema_id,)) execute("DELETE FROM fragen WHERE thema_id = %s", (thema_id,)) execute("DELETE FROM thema WHERE id = %s", (thema_id,)) ############### # Fragen ############### def get_thema_questions(thema_id): return fetch_all( """ SELECT id, thema_id, text FROM fragen WHERE thema_id = %s ORDER BY id """, (thema_id,), ) def get_all_questions_with_thema(): return fetch_all( """ SELECT f.id, f.text, f.thema_id, t.kurztitel, t.titel FROM fragen f JOIN thema t ON t.id = f.thema_id ORDER BY t.id, f.id """ ) ############### # Assessment ############### def create_assessment(user_id): row = execute_returning( """ INSERT INTO assessment (user_id) VALUES (%s) RETURNING id """, (user_id,), ) return row["id"] def save_assessment_answer(assessment_id, thema_id, frage_id, antwort): execute( """ INSERT INTO assessmentanswer (assessmentid, thema_id, frage_id, antwort) VALUES (%s, %s, %s, %s) ON CONFLICT (assessmentid, frage_id) DO UPDATE SET antwort = EXCLUDED.antwort """, (assessment_id, thema_id, frage_id, antwort), ) def get_assessment_result_rows(assessment_id): return fetch_all( """ SELECT t.kurztitel, COUNT(*) FILTER (WHERE aa.antwort = TRUE) AS ja_anzahl FROM thema t LEFT JOIN assessmentanswer aa ON aa.thema_id = t.id AND aa.assessmentid = %s GROUP BY t.id, t.kurztitel ORDER BY t.id """, (assessment_id,), ) ############ # Ansprechpartner ############ def get_ansprechpartner_by_id(ansprechpartner_id): return fetch_one( """ SELECT id, name, email, infotext FROM ansprechpartner WHERE id = %s """, (ansprechpartner_id,), ) def create_ansprechpartner(name, email, infotext): row = execute_returning( """ INSERT INTO ansprechpartner (name, email, infotext) VALUES (%s, %s, %s) RETURNING id """, (name, email, infotext), ) return row["id"] def update_ansprechpartner(ansprechpartner_id, name, email, infotext): execute( """ UPDATE ansprechpartner SET name = %s, email = %s, infotext = %s WHERE id = %s """, (name, email, infotext, ansprechpartner_id), ) def delete_ansprechpartner(ansprechpartner_id): execute( """ DELETE FROM themaansprechpartner WHERE ansprechpartner_id = %s """, (ansprechpartner_id,), ) execute( """ DELETE FROM ansprechpartner WHERE id = %s """, (ansprechpartner_id,), ) ############### # Fragen ############### def get_question_by_id(frage_id): return fetch_one( """ SELECT id, thema_id, text FROM fragen WHERE id = %s """, (frage_id,), ) def create_question(thema_id, text): return execute_returning( """ INSERT INTO fragen (thema_id, text) VALUES (%s, %s) RETURNING id """, (thema_id, text), ) def update_question(frage_id, thema_id, text): execute( """ UPDATE fragen SET thema_id = %s, text = %s WHERE id = %s """, (thema_id, text, frage_id), ) def delete_question(frage_id): execute( "DELETE FROM fragen WHERE id = %s", (frage_id,), ) ########## # usermanagement ########## def activate_user(user_id): execute( """ UPDATE benutzer SET is_active = TRUE WHERE id = %s """, (user_id,), ) def delete_user(user_id, current_user_id=None): if current_user_id is not None and int(user_id) == int(current_user_id): raise ValueError("Der aktuell angemeldete Benutzer kann nicht gelöscht werden.") execute( """ DELETE FROM benutzer WHERE id = %s """, (user_id,), )