import secrets from datetime import datetime, timezone from pathlib import Path from urllib.parse import urlparse from flask import ( Blueprint, abort, current_app, flash, redirect, render_template, request, send_from_directory, session as flask_session, url_for, ) from flask_babel import _ from flask_login import current_user, login_required, login_user, logout_user from sqlalchemy.exc import IntegrityError from extensions import db, oauth from models import User auth_bp = Blueprint("auth", __name__, url_prefix="/auth") # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _login(user: "User") -> None: """Log in user and restore their language preference into the session.""" login_user(user) if user.language: flask_session["lang"] = user.language def _safe_next() -> str: target = request.args.get("next") or "" if target and urlparse(target).netloc == "": return target return url_for("dashboard.index") def _upsert_oauth_user(*, provider: str, provider_id: str, email: str, display_name: str | None, avatar_url: str | None) -> User | None: """Find-or-create a user for an OAuth login. Returns None on email conflict.""" user = db.session.scalar( db.select(User).filter_by(provider=provider, provider_id=provider_id) ) now = datetime.now(timezone.utc) if user is None: user = User( email=email, provider=provider, provider_id=provider_id, display_name=display_name, avatar_url=avatar_url, email_confirmed=True, created_at=now, ) db.session.add(user) try: db.session.commit() except IntegrityError: db.session.rollback() return None # email already taken by a different provider/local account else: user.display_name = display_name or user.display_name user.avatar_url = avatar_url or user.avatar_url user.last_login_at = now db.session.commit() return user def _dispatch_confirmation(user: User) -> None: """ Log the confirmation URL to container logs. Replace the body of this function with a real mail call when ready. """ confirm_url = url_for("auth.confirm_email", token=user.email_confirm_token, _external=True) current_app.logger.warning( "EMAIL CONFIRMATION — %s — open this URL to confirm: %s", user.email, confirm_url, ) # --------------------------------------------------------------------------- # Local login / register # --------------------------------------------------------------------------- @auth_bp.route("/login", methods=["GET", "POST"]) def login(): if current_user.is_authenticated: return redirect(url_for("dashboard.index")) if request.method == "POST": email = request.form.get("email", "").strip().lower() password = request.form.get("password", "") user = db.session.scalar( db.select(User).filter_by(email=email, provider="local") ) if user is None or not user.check_password(password): flash(_("Invalid email or password."), "error") return render_template("auth/login.html", prefill_email=email) if current_app.config["EMAIL_CONFIRMATION_REQUIRED"] and not user.email_confirmed: flash(_("Please confirm your email address before logging in."), "error") return render_template("auth/login.html", prefill_email=email, show_resend=True, resend_email=email) user.last_login_at = datetime.now(timezone.utc) db.session.commit() _login(user) return redirect(_safe_next()) return render_template("auth/login.html") @auth_bp.route("/register", methods=["GET", "POST"]) def register(): if current_user.is_authenticated: return redirect(url_for("dashboard.index")) if request.method == "POST": email = request.form.get("email", "").strip().lower() password = request.form.get("password", "") confirm = request.form.get("confirm_password", "") # Validate error = None if not email or "@" not in email or "." not in email.split("@")[-1]: error = _("Please enter a valid email address.") elif len(password) < 8: error = _("Password must be at least 8 characters.") elif password != confirm: error = _("Passwords do not match.") else: existing = db.session.scalar(db.select(User).filter_by(email=email)) if existing: if existing.provider == "local": error = _("An account with this email already exists.") else: error = _( "This email is linked to a %(provider)s account. " "Please log in with %(provider2)s.", provider=existing.provider.title(), provider2=existing.provider.title(), ) if error: flash(error, "error") return render_template("auth/register.html", prefill_email=email) needs_confirmation = current_app.config["EMAIL_CONFIRMATION_REQUIRED"] now = datetime.now(timezone.utc) user = User( email=email, provider="local", provider_id=email, display_name=email.split("@")[0], email_confirmed=not needs_confirmation, email_confirm_token=secrets.token_urlsafe(32) if needs_confirmation else None, created_at=now, ) user.set_password(password) db.session.add(user) db.session.commit() if needs_confirmation: _dispatch_confirmation(user) return render_template("auth/confirm_pending.html", email=email) _login(user) flash(_("Account created! Welcome."), "success") return redirect(url_for("dashboard.index")) return render_template("auth/register.html") @auth_bp.route("/confirm/") def confirm_email(token: str): user = db.session.scalar( db.select(User).filter_by(email_confirm_token=token) ) if user is None: flash(_("Invalid or expired confirmation link."), "error") return redirect(url_for("auth.login")) user.email_confirmed = True user.email_confirm_token = None db.session.commit() _login(user) flash(_("Email confirmed! Welcome."), "success") return redirect(url_for("dashboard.index")) @auth_bp.route("/resend-confirmation", methods=["POST"]) def resend_confirmation(): email = request.form.get("email", "").strip().lower() user = db.session.scalar( db.select(User).filter_by(email=email, provider="local") ) if user and not user.email_confirmed: if not user.email_confirm_token: user.email_confirm_token = secrets.token_urlsafe(32) db.session.commit() _dispatch_confirmation(user) # Vague message to prevent email enumeration flash(_("If that account exists and is unconfirmed, a new link has been sent."), "message") return redirect(url_for("auth.login")) # --------------------------------------------------------------------------- # OAuth — Google # --------------------------------------------------------------------------- @auth_bp.route("/login/google") def login_google(): return oauth.google.authorize_redirect( url_for("auth.callback_google", _external=True) ) @auth_bp.route("/callback/google") def callback_google(): try: token = oauth.google.authorize_access_token() except Exception: flash(_("Google login failed. Please try again."), "error") return redirect(url_for("auth.login")) info = token.get("userinfo") or {} email = info.get("email") if not email: flash(_("Could not retrieve your email from Google."), "error") return redirect(url_for("auth.login")) user = _upsert_oauth_user( provider="google", provider_id=info["sub"], email=email, display_name=info.get("name"), avatar_url=info.get("picture"), ) if user is None: flash(_("This email is already registered with a different login method."), "error") return redirect(url_for("auth.login")) _login(user) return redirect(_safe_next()) # --------------------------------------------------------------------------- # OAuth — GitHub # --------------------------------------------------------------------------- @auth_bp.route("/login/github") def login_github(): return oauth.github.authorize_redirect( url_for("auth.callback_github", _external=True) ) @auth_bp.route("/callback/github") def callback_github(): try: token = oauth.github.authorize_access_token() except Exception: flash(_("GitHub login failed. Please try again."), "error") return redirect(url_for("auth.login")) resp = oauth.github.get("user", token=token) info = resp.json() email = info.get("email") if not email: emails_resp = oauth.github.get("user/emails", token=token) emails = emails_resp.json() if emails_resp.status_code == 200 else [] email = next( (e["email"] for e in emails if e.get("primary") and e.get("verified")), None, ) if not email: flash(_("Could not retrieve a verified email from GitHub."), "error") return redirect(url_for("auth.login")) user = _upsert_oauth_user( provider="github", provider_id=str(info["id"]), email=email, display_name=info.get("name") or info.get("login"), avatar_url=info.get("avatar_url"), ) if user is None: flash(_("This email is already registered with a different login method."), "error") return redirect(url_for("auth.login")) _login(user) return redirect(_safe_next()) # --------------------------------------------------------------------------- # Profile # --------------------------------------------------------------------------- @auth_bp.route("/profile", methods=["GET", "POST"]) @login_required def profile(): if request.method == "POST": action = request.form.get("action") if action == "update_profile": display_name = request.form.get("display_name", "").strip() if not display_name: flash(_("Display name cannot be empty."), "error") else: current_user.display_name = display_name current_user.bio = request.form.get("bio", "").strip() or None current_user.show_equipment_public = bool(request.form.get("show_equipment_public")) avatar_file = request.files.get("avatar") if avatar_file and avatar_file.filename: from storage import save_avatar try: old_path = current_user.avatar_path current_user.avatar_path = save_avatar(current_user.id, avatar_file) if old_path: _remove_avatar_file(old_path) except ValueError as e: flash(str(e), "error") db.session.rollback() return render_template("auth/profile.html") db.session.commit() flash(_("Profile updated."), "success") elif action == "change_password": if current_user.provider != "local": flash(_("Password change is only available for local accounts."), "error") else: current_pw = request.form.get("current_password", "") new_pw = request.form.get("new_password", "") confirm_pw = request.form.get("confirm_password", "") if not current_user.check_password(current_pw): flash(_("Current password is incorrect."), "error") elif len(new_pw) < 8: flash(_("New password must be at least 8 characters."), "error") elif new_pw != confirm_pw: flash(_("Passwords do not match."), "error") else: current_user.set_password(new_pw) db.session.commit() flash(_("Password changed."), "success") return redirect(url_for("auth.profile")) return render_template("auth/profile.html") @auth_bp.route("/avatar/") def avatar(user_id: int): user = db.session.get(User, user_id) if not user or not user.avatar_path: abort(404) storage_root = current_app.config["STORAGE_ROOT"] rel = user.avatar_path.removeprefix("avatars/") return send_from_directory(Path(storage_root) / "avatars", rel) def _remove_avatar_file(avatar_path: str) -> None: try: storage_root = current_app.config["STORAGE_ROOT"] (Path(storage_root) / avatar_path).unlink(missing_ok=True) except Exception: pass # --------------------------------------------------------------------------- # Public profile # --------------------------------------------------------------------------- # --------------------------------------------------------------------------- # Logout # --------------------------------------------------------------------------- @auth_bp.route("/logout", methods=["POST"]) def logout(): logout_user() return redirect(url_for("index"))