# Source file: ShooterHub/blueprints/auth.py
# Snapshot: 2026-03-19 16:42:37 +01:00
# Size: 386 lines, 13 KiB
# Language: Python

import secrets
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlparse
from flask import (
Blueprint,
abort,
current_app,
flash,
redirect,
render_template,
request,
send_from_directory,
url_for,
)
from flask_babel import _
from flask_login import current_user, login_required, login_user, logout_user
from sqlalchemy.exc import IntegrityError
from extensions import db, oauth
from models import User
# Blueprint that groups the authentication routes; every route registered on
# it is served under the "/auth" URL prefix.
auth_bp = Blueprint("auth", __name__, url_prefix="/auth")
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _safe_next() -> str:
    """Return a same-site redirect target taken from the ``next`` query arg.

    Only server-relative paths (exactly one leading ``/``) are accepted.
    Anything a browser could resolve to another origin falls back to the
    dashboard: absolute URLs, scheme-only URLs such as ``javascript:``,
    protocol-relative forms (``//host``, ``///host``) and values containing
    backslashes or control characters, which browsers normalise or strip
    before resolving the URL.

    Returns:
        The validated ``next`` path, or the URL of ``dashboard.index``.
    """
    fallback = url_for("dashboard.index")
    target = request.args.get("next") or ""

    # Must be an absolute path on this site; a leading "//" is
    # protocol-relative and would leave the site.
    if not target.startswith("/") or target.startswith("//"):
        return fallback

    # Browsers treat "\" like "/" and drop tabs/newlines, so "/\evil.com" or
    # "/<TAB>/evil.com" would otherwise end up on another host.
    if "\\" in target or any(ord(ch) < 0x20 or ord(ch) == 0x7F for ch in target):
        return fallback

    try:
        parts = urlparse(target)
    except ValueError:
        # Malformed input (e.g. a broken IPv6 literal) is never a valid path.
        return fallback

    if parts.scheme or parts.netloc:
        return fallback
    return target
def _upsert_oauth_user(*, provider: str, provider_id: str, email: str,
                       display_name: str | None, avatar_url: str | None) -> User | None:
    """Find or create the user behind an OAuth login.

    The account is looked up by ``(provider, provider_id)``. An existing
    account has its display name and avatar refreshed (a missing value from
    the provider keeps the stored one); otherwise a new, already
    email-confirmed account is inserted. In both cases ``last_login_at`` is
    stamped with the current UTC time and the change is committed.

    Args:
        provider: OAuth provider key, e.g. ``"google"`` or ``"github"``.
        provider_id: The provider's identifier for the account.
        email: Email address reported by the provider.
        display_name: Name reported by the provider, if any.
        avatar_url: Avatar URL reported by the provider, if any.

    Returns:
        The persisted user, or ``None`` when inserting the new account hit an
        ``IntegrityError`` (the session is rolled back) — i.e. the email is
        already taken by a different provider/local account.
    """
    now = datetime.now(timezone.utc)
    user = db.session.scalar(
        db.select(User).filter_by(provider=provider, provider_id=provider_id)
    )

    if user is not None:
        # Returning user: refresh profile data without discarding stored values.
        user.display_name = display_name or user.display_name
        user.avatar_url = avatar_url or user.avatar_url
        user.last_login_at = now
        db.session.commit()
        return user

    user = User(
        email=email,
        provider=provider,
        provider_id=provider_id,
        display_name=display_name,
        avatar_url=avatar_url,
        email_confirmed=True,
        created_at=now,
    )
    # The first OAuth login is a login too, so record it with the insert
    # instead of leaving last_login_at empty until the second visit.
    user.last_login_at = now
    db.session.add(user)
    try:
        db.session.commit()
    except IntegrityError:
        db.session.rollback()
        return None  # email already taken by a different provider/local account
    return user
def _dispatch_confirmation(user: User) -> None:
    """Write the user's email-confirmation link to the application log.

    Placeholder for real mail delivery: the absolute ``auth.confirm_email``
    URL for the user's current token is logged at WARNING level. Swap the
    body for an actual mail call once one is available.
    """
    link = url_for(
        "auth.confirm_email",
        token=user.email_confirm_token,
        _external=True,
    )
    log = current_app.logger
    log.warning("EMAIL CONFIRMATION — %s — open this URL to confirm: %s", user.email, link)
# ---------------------------------------------------------------------------
# Local login / register
# ---------------------------------------------------------------------------
@auth_bp.route("/login", methods=["GET", "POST"])
def login():
    """Render the login form and handle local email/password sign-in.

    Already-authenticated visitors are sent to the dashboard. On POST the
    credentials are checked against the local account for the submitted
    email; failures re-render the form with the email pre-filled. When email
    confirmation is required and still pending, the form is re-rendered with
    the resend option enabled. A successful login stamps ``last_login_at``
    and redirects to the validated ``next`` target.
    """
    if current_user.is_authenticated:
        return redirect(url_for("dashboard.index"))

    if request.method != "POST":
        return render_template("auth/login.html")

    submitted_email = request.form.get("email", "").strip().lower()
    submitted_password = request.form.get("password", "")
    account = db.session.scalar(
        db.select(User).filter_by(email=submitted_email, provider="local")
    )

    credentials_ok = account is not None and account.check_password(submitted_password)
    if not credentials_ok:
        flash(_("Invalid email or password."), "error")
        return render_template("auth/login.html", prefill_email=submitted_email)

    confirmation_required = current_app.config["EMAIL_CONFIRMATION_REQUIRED"]
    if confirmation_required and not account.email_confirmed:
        flash(_("Please confirm your email address before logging in."), "error")
        return render_template(
            "auth/login.html",
            prefill_email=submitted_email,
            show_resend=True,
            resend_email=submitted_email,
        )

    account.last_login_at = datetime.now(timezone.utc)
    db.session.commit()
    login_user(account)
    return redirect(_safe_next())
@auth_bp.route("/register", methods=["GET", "POST"])
def register():
    """Render the registration form and create local accounts.

    Already-authenticated visitors are sent to the dashboard. On POST the
    email, password length (minimum 8) and password confirmation are
    validated, and the email is checked against existing accounts of any
    provider. Validation failures re-render the form with the email
    pre-filled.

    When ``EMAIL_CONFIRMATION_REQUIRED`` is set, the new account gets a
    random confirmation token, the confirmation link is dispatched and the
    "confirmation pending" page is shown; otherwise the user is logged in
    immediately and redirected to the dashboard.
    """
    if current_user.is_authenticated:
        return redirect(url_for("dashboard.index"))

    if request.method == "POST":
        email = request.form.get("email", "").strip().lower()
        password = request.form.get("password", "")
        confirm = request.form.get("confirm_password", "")

        # Validate; the first failing rule wins.
        error = None
        if not email or "@" not in email or "." not in email.split("@")[-1]:
            error = _("Please enter a valid email address.")
        elif len(password) < 8:
            error = _("Password must be at least 8 characters.")
        elif password != confirm:
            error = _("Passwords do not match.")
        else:
            existing = db.session.scalar(db.select(User).filter_by(email=email))
            if existing:
                if existing.provider == "local":
                    error = _("An account with this email already exists.")
                else:
                    error = _(
                        "This email is linked to a %(provider)s account. "
                        "Please log in with %(provider2)s.",
                        provider=existing.provider.title(),
                        provider2=existing.provider.title(),
                    )

        if error:
            flash(error, "error")
            return render_template("auth/register.html", prefill_email=email)

        needs_confirmation = current_app.config["EMAIL_CONFIRMATION_REQUIRED"]
        user = User(
            email=email,
            provider="local",
            provider_id=email,
            display_name=email.split("@")[0],
            email_confirmed=not needs_confirmation,
            email_confirm_token=secrets.token_urlsafe(32) if needs_confirmation else None,
            created_at=datetime.now(timezone.utc),
        )
        user.set_password(password)
        db.session.add(user)
        try:
            db.session.commit()
        except IntegrityError:
            # The existence check above and this insert are not atomic: a
            # concurrent registration for the same email can still win the
            # race. Report it like any other duplicate instead of a 500,
            # mirroring the IntegrityError handling in _upsert_oauth_user.
            db.session.rollback()
            flash(_("An account with this email already exists."), "error")
            return render_template("auth/register.html", prefill_email=email)

        if needs_confirmation:
            _dispatch_confirmation(user)
            return render_template("auth/confirm_pending.html", email=email)

        login_user(user)
        flash(_("Account created! Welcome."), "success")
        return redirect(url_for("dashboard.index"))

    return render_template("auth/register.html")
@auth_bp.route("/confirm/<token>")
def confirm_email(token: str):
    """Confirm the account that owns ``token`` and log its user in.

    An unknown token flashes an error and redirects to the login page.
    A matching token marks the email as confirmed, clears the token so the
    link cannot be reused, logs the user in and redirects to the dashboard.
    """
    lookup = db.select(User).filter_by(email_confirm_token=token)
    account = db.session.scalar(lookup)

    if account is not None:
        account.email_confirmed = True
        account.email_confirm_token = None
        db.session.commit()
        login_user(account)
        flash(_("Email confirmed! Welcome."), "success")
        return redirect(url_for("dashboard.index"))

    flash(_("Invalid or expired confirmation link."), "error")
    return redirect(url_for("auth.login"))
@auth_bp.route("/resend-confirmation", methods=["POST"])
def resend_confirmation():
    """Re-dispatch the confirmation link for an unconfirmed local account.

    The submitted email is normalised (trimmed, lower-cased) and matched
    against local accounts. If the account exists and is still unconfirmed,
    a token is generated when none is stored and the link is dispatched.
    The response is the same in every case: a neutral flash message and a
    redirect to the login page.
    """
    address = request.form.get("email", "").strip().lower()
    query = db.select(User).filter_by(email=address, provider="local")
    account = db.session.scalar(query)

    if account and not account.email_confirmed:
        token_missing = not account.email_confirm_token
        if token_missing:
            account.email_confirm_token = secrets.token_urlsafe(32)
            db.session.commit()
        _dispatch_confirmation(account)

    # Same wording whether or not the account exists, to prevent email enumeration.
    flash(_("If that account exists and is unconfirmed, a new link has been sent."), "message")
    return redirect(url_for("auth.login"))
# ---------------------------------------------------------------------------
# OAuth — Google
# ---------------------------------------------------------------------------
@auth_bp.route("/login/google")
def login_google():
    """Start the Google OAuth flow.

    Returns the provider's authorization redirect, with the absolute URL of
    ``auth.callback_google`` as the redirect URI.
    """
    callback_url = url_for("auth.callback_google", _external=True)
    return oauth.google.authorize_redirect(callback_url)
@auth_bp.route("/callback/google")
def callback_google():
    """Finish the Google OAuth flow and log the user in.

    Exchanges the authorization response for a token, reads the ``userinfo``
    claims from it and finds or creates the matching account. Any failure
    flashes an error and redirects to the login page:

    * the token exchange raised (the exception is logged),
    * the claims lack an email or a subject id, or Google explicitly reports
      the email as unverified,
    * the email is already registered with a different login method.

    On success the user is logged in and redirected to the validated
    ``next`` target.
    """
    try:
        token = oauth.google.authorize_access_token()
    except Exception:
        # Boundary handler: keep the friendly message, but leave a trace.
        current_app.logger.warning("Google OAuth token exchange failed", exc_info=True)
        flash(_("Google login failed. Please try again."), "error")
        return redirect(url_for("auth.login"))

    info = token.get("userinfo") or {}
    # Normalise like local registration does, so the same address is not
    # stored twice in different letter case.
    email = (info.get("email") or "").strip().lower()
    subject = info.get("sub")
    # Accounts created here are marked email-confirmed, so refuse addresses
    # Google flags as unverified. A missing claim is tolerated; the string
    # form is accepted in case the claim arrives as text rather than bool.
    unverified = str(info.get("email_verified", True)).lower() == "false"
    if not email or not subject or unverified:
        flash(_("Could not retrieve your email from Google."), "error")
        return redirect(url_for("auth.login"))

    user = _upsert_oauth_user(
        provider="google",
        provider_id=subject,
        email=email,
        display_name=info.get("name"),
        avatar_url=info.get("picture"),
    )
    if user is None:
        flash(_("This email is already registered with a different login method."), "error")
        return redirect(url_for("auth.login"))

    login_user(user)
    return redirect(_safe_next())
# ---------------------------------------------------------------------------
# OAuth — GitHub
# ---------------------------------------------------------------------------
@auth_bp.route("/login/github")
def login_github():
    """Start the GitHub OAuth flow.

    Returns the provider's authorization redirect, with the absolute URL of
    ``auth.callback_github`` as the redirect URI.
    """
    callback_url = url_for("auth.callback_github", _external=True)
    return oauth.github.authorize_redirect(callback_url)
@auth_bp.route("/callback/github")
def callback_github():
    """Finish the GitHub OAuth flow and log the user in.

    Exchanges the authorization response for a token, fetches the ``user``
    resource and finds or creates the matching account. The email comes from
    the profile when present; otherwise the ``user/emails`` resource is
    queried and the primary, verified address is used. Any failure flashes
    an error and redirects to the login page:

    * the token exchange raised (the exception is logged),
    * the profile request failed or returned no account id,
    * no usable email could be determined,
    * the email is already registered with a different login method.

    On success the user is logged in and redirected to the validated
    ``next`` target.
    """
    try:
        token = oauth.github.authorize_access_token()
    except Exception:
        # Boundary handler: keep the friendly message, but leave a trace.
        current_app.logger.warning("GitHub OAuth token exchange failed", exc_info=True)
        flash(_("GitHub login failed. Please try again."), "error")
        return redirect(url_for("auth.login"))

    resp = oauth.github.get("user", token=token)
    info = resp.json() if resp.status_code == 200 else {}
    if not isinstance(info, dict) or info.get("id") is None:
        # An error response has no "id"; indexing it would raise KeyError
        # and surface as a 500 instead of a login failure.
        flash(_("GitHub login failed. Please try again."), "error")
        return redirect(url_for("auth.login"))

    email = info.get("email")
    if not email:
        emails_resp = oauth.github.get("user/emails", token=token)
        emails = emails_resp.json() if emails_resp.status_code == 200 else []
        if not isinstance(emails, list):
            emails = []
        email = next(
            (
                e.get("email")
                for e in emails
                if isinstance(e, dict) and e.get("primary") and e.get("verified")
            ),
            None,
        )
    # Normalise like local registration does, so the same address is not
    # stored twice in different letter case.
    email = (email or "").strip().lower()
    if not email:
        flash(_("Could not retrieve a verified email from GitHub."), "error")
        return redirect(url_for("auth.login"))

    user = _upsert_oauth_user(
        provider="github",
        provider_id=str(info["id"]),
        email=email,
        display_name=info.get("name") or info.get("login"),
        avatar_url=info.get("avatar_url"),
    )
    if user is None:
        flash(_("This email is already registered with a different login method."), "error")
        return redirect(url_for("auth.login"))

    login_user(user)
    return redirect(_safe_next())
# ---------------------------------------------------------------------------
# Profile
# ---------------------------------------------------------------------------
@auth_bp.route("/profile", methods=["GET", "POST"])
@login_required
def profile():
    """Show the profile page and handle its two POST actions.

    The ``action`` form field selects the operation:

    * ``update_profile`` — updates display name (required), bio and the
      public-equipment flag, and optionally stores a new avatar. An avatar
      rejected by ``save_avatar`` (``ValueError``) rolls the session back and
      re-renders the page with the error.
    * ``change_password`` — local accounts only; checks the current password,
      the minimum length (8) and the confirmation before storing the new one.

    Every other POST outcome redirects back to the profile page; GET renders
    the template.
    """
    if request.method == "POST":
        action = request.form.get("action")

        if action == "update_profile":
            display_name = request.form.get("display_name", "").strip()
            if not display_name:
                flash(_("Display name cannot be empty."), "error")
            else:
                current_user.display_name = display_name
                current_user.bio = request.form.get("bio", "").strip() or None
                current_user.show_equipment_public = bool(request.form.get("show_equipment_public"))

                stale_avatar = None
                avatar_file = request.files.get("avatar")
                if avatar_file and avatar_file.filename:
                    from storage import save_avatar

                    previous_path = current_user.avatar_path
                    try:
                        current_user.avatar_path = save_avatar(current_user.id, avatar_file)
                    except ValueError as e:
                        flash(str(e), "error")
                        db.session.rollback()
                        return render_template("auth/profile.html")
                    # Only a *different* old file is stale; if the new avatar
                    # was stored under the same path, deleting the old path
                    # would delete the upload itself.
                    if previous_path and previous_path != current_user.avatar_path:
                        stale_avatar = previous_path

                db.session.commit()
                # Remove the old file only once the new path is persisted, so
                # a failed commit never leaves the DB pointing at a deleted file.
                if stale_avatar:
                    _remove_avatar_file(stale_avatar)
                flash(_("Profile updated."), "success")

        elif action == "change_password":
            if current_user.provider != "local":
                flash(_("Password change is only available for local accounts."), "error")
            else:
                current_pw = request.form.get("current_password", "")
                new_pw = request.form.get("new_password", "")
                confirm_pw = request.form.get("confirm_password", "")
                if not current_user.check_password(current_pw):
                    flash(_("Current password is incorrect."), "error")
                elif len(new_pw) < 8:
                    flash(_("New password must be at least 8 characters."), "error")
                elif new_pw != confirm_pw:
                    flash(_("Passwords do not match."), "error")
                else:
                    current_user.set_password(new_pw)
                    db.session.commit()
                    flash(_("Password changed."), "success")

        # Post/Redirect/Get: a reload must not resubmit the form.
        return redirect(url_for("auth.profile"))

    return render_template("auth/profile.html")
@auth_bp.route("/avatar/<int:user_id>")
def avatar(user_id: int):
    """Serve the stored avatar image of the given user.

    Aborts with 404 when the user does not exist or has no avatar path.
    The file is served from the ``avatars`` directory under the configured
    ``STORAGE_ROOT``, using the stored path without its ``avatars/`` prefix.
    """
    owner = db.session.get(User, user_id)
    if not owner or not owner.avatar_path:
        abort(404)

    avatars_dir = Path(current_app.config["STORAGE_ROOT"]) / "avatars"
    filename = owner.avatar_path.removeprefix("avatars/")
    return send_from_directory(avatars_dir, filename)
def _remove_avatar_file(avatar_path: str) -> None:
    """Best-effort removal of a stored avatar file.

    Args:
        avatar_path: Path of the file relative to the configured
            ``STORAGE_ROOT``.

    A file that is already gone is not an error. Any other failure is logged
    and swallowed, so cleanup problems never break the calling request.
    """
    try:
        storage_root = current_app.config["STORAGE_ROOT"]
        (Path(storage_root) / avatar_path).unlink(missing_ok=True)
    except Exception:
        # Deliberately non-fatal, but no longer silent: an orphaned file is
        # harmless to the user yet worth a trace for whoever cleans up storage.
        current_app.logger.warning(
            "Could not remove avatar file %s", avatar_path, exc_info=True
        )
# ---------------------------------------------------------------------------
# Public profile
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# Logout
# ---------------------------------------------------------------------------
@auth_bp.route("/logout", methods=["POST"])
def logout():
    """End the current session and redirect to the ``index`` endpoint.

    Accepts POST only.
    """
    logout_user()
    landing_url = url_for("index")
    return redirect(landing_url)