347 lines
9.5 KiB
Python
347 lines
9.5 KiB
Python
import hmac
import os
import sqlite3
import uuid
from datetime import datetime
from functools import wraps
from pathlib import Path
from urllib.parse import urlsplit

from flask import (
    Flask,
    flash,
    g,
    redirect,
    render_template,
    request,
    send_from_directory,
    session,
    url_for,
)
from werkzeug.utils import secure_filename
|
|
|
|
# Application object and the directory this module lives in; the directory is
# used as the base for the default database and upload locations.
app = Flask(__name__)
base_dir = Path(__file__).resolve().parent

# Every setting can be overridden through an environment variable of the same
# name (MAX_CONTENT_LENGTH is read from MAX_UPLOAD_BYTES).
# NOTE(review): the SECRET_KEY and EVENT_PASSWORD fallbacks are placeholders;
# set the real values through the environment for any deployment.
app.config.update(
    SECRET_KEY=os.environ.get("SECRET_KEY", "change-me-in-production"),
    DB_PATH=os.environ.get("DB_PATH", str(base_dir / "app.sqlite3")),
    UPLOAD_FOLDER=os.environ.get("UPLOAD_FOLDER", str(base_dir / "uploads")),
    EVENT_PASSWORD=os.environ.get("EVENT_PASSWORD", "wedding2026"),
    # Request bodies larger than this many bytes are rejected (default 8 MiB).
    MAX_CONTENT_LENGTH=int(
        os.environ.get("MAX_UPLOAD_BYTES", str(8 * 1024 * 1024))
    ),
    LOCATION_NAME=os.environ.get("LOCATION_NAME", "Schlossgarten Venue"),
    LOCATION_ADDRESS=os.environ.get(
        "LOCATION_ADDRESS", "Musterstraße 12, 12345 Musterstadt"
    ),
    LOCATION_WEBSITE_URL=os.environ.get(
        "LOCATION_WEBSITE_URL", "https://example.com/location"
    ),
    GOOGLE_MAPS_EMBED_URL=os.environ.get(
        "GOOGLE_MAPS_EMBED_URL",
        "https://www.google.com/maps/embed?pb=!1m18!1m12!1m3!1d0",
    ),
)
|
|
|
|
# Lower-case file extensions (without the dot) accepted for photo uploads.
ALLOWED_EXTENSIONS = {
    "jpg",
    "jpeg",
    "png",
}
|
|
|
|
# UI strings keyed by language code, then by message key. Both languages
# define the same set of keys in the same order.
TEXTS = {
    # German
    "de": {
        "brand": "Svenja & Dominic",
        "subtitle": "Willkommen zu unserer Hochzeits-App",
        # Login form
        "login": "Login",
        "name": "Dein Name",
        "event_password": "Event-Passwort",
        "login_submit": "Weiter zum Dashboard",
        # Navigation
        "dashboard": "Dashboard",
        "logout": "Abmelden",
        "rsvp": "RSVP",
        "upload": "Upload",
        "gallery": "Galerie",
        "info": "Infos",
        # RSVP form
        "save": "Speichern",
        "attending": "Ich komme",
        "not_attending": "Ich komme nicht",
        "plus_one": "Ich bringe eine Begleitperson mit",
        # Upload form
        "file": "Bild auswählen",
        "upload_submit": "Foto hochladen",
        # Info pages
        "schedule": "Ablauf",
        "hotels": "Hotels",
        "taxi": "Taxi",
        "location": "Location",
        "visit_location": "Zur Location-Webseite",
    },
    # English
    "en": {
        "brand": "Svenja & Dominic",
        "subtitle": "Welcome to our wedding app",
        # Login form
        "login": "Login",
        "name": "Your name",
        "event_password": "Event password",
        "login_submit": "Open dashboard",
        # Navigation
        "dashboard": "Dashboard",
        "logout": "Logout",
        "rsvp": "RSVP",
        "upload": "Upload",
        "gallery": "Gallery",
        "info": "Info",
        # RSVP form
        "save": "Save",
        "attending": "I will attend",
        "not_attending": "I cannot attend",
        "plus_one": "I will bring a plus-one",
        # Upload form
        "file": "Select image",
        "upload_submit": "Upload photo",
        # Info pages
        "schedule": "Schedule",
        "hotels": "Hotels",
        "taxi": "Taxi",
        "location": "Location",
        "visit_location": "Visit location website",
    },
}
|
|
|
|
|
|
def get_lang() -> str:
    """Return the language code stored in the session.

    Falls back to ``"de"`` when the session has no ``lang`` entry or the
    stored value is not a key of ``TEXTS``.
    """
    selected = session.get("lang", "de")
    if selected not in TEXTS:
        return "de"
    return selected
|
|
|
|
|
|
def t(key: str) -> str:
    """Translate ``key`` into the current session language.

    Unknown keys are returned unchanged, so a missing translation shows up as
    the raw key instead of raising.
    """
    translations = TEXTS[get_lang()]
    return translations.get(key, key)
|
|
|
|
|
|
@app.context_processor
def inject_common() -> dict:
    """Provide values that every template can use.

    Returns:
        A dict with the translation helper ``t``, the active language code,
        the guest name stored in the session (``None`` when absent) and the
        location settings from the app configuration.
    """
    cfg = app.config
    context = {
        "t": t,
        "lang": get_lang(),
        "guest_name": session.get("guest_name"),
    }
    context.update(
        location_name=cfg["LOCATION_NAME"],
        location_address=cfg["LOCATION_ADDRESS"],
        location_website_url=cfg["LOCATION_WEBSITE_URL"],
        google_maps_embed_url=cfg["GOOGLE_MAPS_EMBED_URL"],
    )
    return context
|
|
|
|
|
|
def get_db() -> sqlite3.Connection:
    """Return the SQLite connection cached on ``g`` for the current context.

    The connection is opened on first use, stored as ``g.db`` and configured
    to return ``sqlite3.Row`` rows so columns can be accessed by name.

    Returns:
        The open connection to the database at ``app.config["DB_PATH"]``.
    """
    if "db" not in g:
        db_path = app.config["DB_PATH"]
        # A bare file name has an empty dirname, and os.makedirs("") raises;
        # only create the directory when there is one (same guard as init_db).
        db_dir = os.path.dirname(db_path)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)

        g.db = sqlite3.connect(db_path)
        g.db.row_factory = sqlite3.Row
    return g.db
|
|
|
|
|
|
@app.teardown_appcontext
def close_db(_error) -> None:
    """Close the connection stored on ``g`` (if any) at app-context teardown.

    Args:
        _error: Value passed by Flask's teardown hook; unused.
    """
    connection = g.pop("db", None)
    if connection is None:
        # No connection was opened in this context.
        return
    connection.close()
|
|
|
|
|
|
def init_db() -> None:
    """Create the ``guests`` and ``uploads`` tables if they do not exist.

    Also creates the directory containing ``app.config["DB_PATH"]`` when the
    path has a directory component.
    """
    db_path = app.config["DB_PATH"]
    db_dir = os.path.dirname(db_path)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)

    conn = sqlite3.connect(db_path)
    try:
        # ``with conn`` only commits (or rolls back on error); it does not
        # close the connection, hence the explicit close() in ``finally``.
        with conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS guests (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL UNIQUE,
                    attending INTEGER,
                    plus_one INTEGER NOT NULL DEFAULT 0,
                    created_at TEXT NOT NULL
                )
                """
            )
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS uploads (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    filename TEXT NOT NULL,
                    uploaded_by INTEGER NOT NULL,
                    uploaded_at TEXT NOT NULL,
                    FOREIGN KEY(uploaded_by) REFERENCES guests(id)
                )
                """
            )
    finally:
        conn.close()
|
|
|
|
|
|
def login_required(view):
    """Decorate a view so it is only reachable with a logged-in session.

    Requests whose session has no ``guest_id`` are redirected to the
    ``landing`` endpoint; otherwise the wrapped view is called unchanged.
    """

    @wraps(view)
    def guarded(*args, **kwargs):
        if "guest_id" in session:
            return view(*args, **kwargs)
        return redirect(url_for("landing"))

    return guarded
|
|
|
|
|
|
def is_allowed_file(filename: str) -> bool:
    """Return True when ``filename`` ends in an allowed extension.

    The text after the last dot is compared case-insensitively against
    ``ALLOWED_EXTENSIONS``; names without a dot are rejected.
    """
    _, dot, extension = filename.rpartition(".")
    return bool(dot) and extension.lower() in ALLOWED_EXTENSIONS
|
|
|
|
|
|
def upsert_guest(name: str) -> int:
    """Return the id of the guest called ``name``, creating the row if needed.

    Args:
        name: Guest name; matched exactly against ``guests.name``.

    Returns:
        The ``guests.id`` of the existing or newly inserted row.

    Raises:
        sqlite3.IntegrityError: If the insert fails for a reason other than
            the name already being present.
    """
    db = get_db()
    lookup = "SELECT id FROM guests WHERE name = ?"

    row = db.execute(lookup, (name,)).fetchone()
    if row is not None:
        return int(row["id"])

    try:
        cursor = db.execute(
            "INSERT INTO guests (name, created_at) VALUES (?, ?)",
            (name, datetime.utcnow().isoformat()),
        )
        db.commit()
    except sqlite3.IntegrityError:
        # Another request may have inserted the same name between the SELECT
        # and the INSERT (``name`` is UNIQUE); reuse that row instead of
        # failing the login.
        db.rollback()
        row = db.execute(lookup, (name,)).fetchone()
        if row is None:
            raise
        return int(row["id"])
    return int(cursor.lastrowid)
|
|
|
|
|
|
@app.get("/health")
def health():
    """Health-check endpoint; always answers with ``{"status": "ok"}``."""
    payload = {"status": "ok"}
    return payload
|
|
|
|
|
|
@app.get("/")
def landing():
    """Show the login page, or send logged-in guests to the dashboard."""
    if "guest_id" not in session:
        return render_template("login.html")
    return redirect(url_for("dashboard"))
|
|
|
|
|
|
@app.post("/login")
def login():
    """Log a guest in with their name and the shared event password.

    Reads ``name`` and ``event_password`` from the submitted form. On success
    the guest row is looked up or created and ``guest_id`` / ``guest_name``
    are stored in the session. On any validation failure a message is flashed
    and the guest is redirected back to the landing page.
    """
    name = (request.form.get("name") or "").strip()
    event_password = request.form.get("event_password") or ""

    if not name:
        flash("Bitte Namen eingeben.")
        return redirect(url_for("landing"))

    # Compare in constant time so response timing does not reveal how much of
    # the password matched. Encoding to bytes is required because
    # compare_digest rejects non-ASCII str arguments.
    expected_password = str(app.config["EVENT_PASSWORD"])
    password_ok = hmac.compare_digest(
        event_password.encode("utf-8"),
        expected_password.encode("utf-8"),
    )
    if not password_ok:
        flash("Ungültiges Event-Passwort.")
        return redirect(url_for("landing"))

    guest_id = upsert_guest(name)
    session["guest_id"] = guest_id
    session["guest_name"] = name
    return redirect(url_for("dashboard"))
|
|
|
|
|
|
@app.post("/logout")
def logout():
    """Drop everything stored in the session and return to the landing page."""
    session.clear()
    target = url_for("landing")
    return redirect(target)
|
|
|
|
|
|
@app.post("/lang/<code>")
def set_lang(code: str):
    """Store the chosen UI language in the session and go back.

    Args:
        code: Language code from the URL; ignored unless it is a key of
            ``TEXTS``.

    Returns:
        A redirect to the referring page when it is on this host, otherwise
        to the landing page.
    """
    if code in TEXTS:
        session["lang"] = code

    # The Referer header is client-controlled; only follow it when it points
    # back at this host, so the endpoint cannot be used as an open redirect.
    referrer = request.referrer
    if referrer and urlsplit(referrer).netloc == request.host:
        return redirect(referrer)
    return redirect(url_for("landing"))
|
|
|
|
|
|
@app.get("/dashboard")
@login_required
def dashboard():
    """Render the dashboard page for a logged-in guest."""
    page = render_template("dashboard.html")
    return page
|
|
|
|
|
|
@app.route("/rsvp", methods=["GET", "POST"])
@login_required
def rsvp():
    """Show the RSVP form (GET) or store the guest's answer (POST).

    A POST must carry ``attending`` with the value ``"yes"`` or ``"no"``;
    anything else flashes an error. ``plus_one`` is only kept when the guest
    attends. Every POST ends in a redirect back to this page.
    """
    db = get_db()
    guest_id = session["guest_id"]

    if request.method != "POST":
        guest = db.execute(
            "SELECT attending, plus_one FROM guests WHERE id = ?",
            (guest_id,),
        ).fetchone()
        return render_template("rsvp.html", guest=guest)

    choice = request.form.get("attending")
    if choice not in {"yes", "no"}:
        flash("Bitte eine RSVP-Auswahl treffen.")
        return redirect(url_for("rsvp"))

    attending = int(choice == "yes")
    # A plus-one only counts for guests who are actually coming.
    wants_plus_one = request.form.get("plus_one") == "on"
    plus_one = int(bool(attending) and wants_plus_one)

    db.execute(
        "UPDATE guests SET attending = ?, plus_one = ? WHERE id = ?",
        (attending, plus_one, guest_id),
    )
    db.commit()
    flash("RSVP gespeichert.")
    return redirect(url_for("rsvp"))
|
|
|
|
|
|
@app.route("/upload", methods=["GET", "POST"])
@login_required
def upload():
    """Show the upload form (GET) or store an uploaded photo (POST).

    The file from the ``photo`` form field is saved in
    ``app.config["UPLOAD_FOLDER"]`` under a generated name and recorded in
    the ``uploads`` table for the logged-in guest. Missing files and
    disallowed extensions flash an error and redirect back to the form.
    """
    if request.method != "POST":
        return render_template("upload.html")

    file = request.files.get("photo")
    if file is None or file.filename == "":
        flash("Bitte eine Bilddatei auswählen.")
        return redirect(url_for("upload"))

    if not is_allowed_file(file.filename):
        flash("Nur JPG/JPEG/PNG sind erlaubt.")
        return redirect(url_for("upload"))

    # Take the extension from the name that was just validated. Running the
    # name through secure_filename() first can strip the dot (e.g. ".jpg" or
    # a stem without ASCII characters becomes "jpg"), which made the split
    # raise IndexError. The client-supplied stem is never used on disk: the
    # stored name is a random hex id plus the whitelisted extension.
    ext = file.filename.rsplit(".", 1)[1].lower()
    stored_name = f"{uuid.uuid4().hex}.{ext}"

    upload_dir = app.config["UPLOAD_FOLDER"]
    os.makedirs(upload_dir, exist_ok=True)
    file.save(os.path.join(upload_dir, stored_name))

    db = get_db()
    db.execute(
        "INSERT INTO uploads (filename, uploaded_by, uploaded_at) VALUES (?, ?, ?)",
        (stored_name, session["guest_id"], datetime.utcnow().isoformat()),
    )
    db.commit()

    flash("Upload erfolgreich.")
    return redirect(url_for("gallery"))
|
|
|
|
|
|
@app.get("/gallery")
@login_required
def gallery():
    """Render the gallery with all uploads, newest (highest id) first.

    Each row passed to the template has ``filename``, ``uploaded_at`` and
    ``uploaded_by`` (the uploading guest's name).
    """
    query = """
        SELECT uploads.filename, uploads.uploaded_at, guests.name AS uploaded_by
        FROM uploads
        JOIN guests ON guests.id = uploads.uploaded_by
        ORDER BY uploads.id DESC
        """
    images = get_db().execute(query).fetchall()
    return render_template("gallery.html", images=images)
|
|
|
|
|
|
@app.get("/uploads/<path:filename>")
@login_required
def serve_upload(filename: str):
    """Send an uploaded file from the upload folder to a logged-in guest.

    Args:
        filename: Path of the file relative to ``app.config["UPLOAD_FOLDER"]``.
    """
    upload_dir = app.config["UPLOAD_FOLDER"]
    return send_from_directory(upload_dir, filename)
|
|
|
|
|
|
@app.get("/info/<page>")
@login_required
def info(page: str):
    """Render one of the info pages.

    Args:
        page: One of ``schedule``, ``hotels``, ``taxi`` or ``location``; any
            other value redirects to the dashboard.
    """
    if page in {"schedule", "hotels", "taxi", "location"}:
        return render_template("info.html", page=page)
    return redirect(url_for("dashboard"))
|
|
|
|
|
|
# Runs at import time, so the tables exist whether the module is executed
# directly or imported by another process.
init_db()


if __name__ == "__main__":
    # Direct execution: listen on all interfaces, port 8000.
    app.run(port=8000, host="0.0.0.0")
|