101 lines
3.3 KiB
Python
101 lines
3.3 KiB
Python
from flask import Blueprint, abort, flash, redirect, render_template, request, url_for
|
|
from flask_babel import _
|
|
from flask_login import current_user, login_required
|
|
|
|
from extensions import db
|
|
from models import User
|
|
|
|
admin_bp = Blueprint("admin", __name__, url_prefix="/admin")
|
|
|
|
ROLES = ["user", "admin"]
|
|
|
|
|
|
def _require_admin():
|
|
if not current_user.is_authenticated or current_user.role != "admin":
|
|
abort(403)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# User list
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@admin_bp.route("/")
|
|
@login_required
|
|
def index():
|
|
_require_admin()
|
|
users = db.session.scalars(
|
|
db.select(User).order_by(User.created_at.desc())
|
|
).all()
|
|
return render_template("admin/users.html", users=users, roles=ROLES)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Change role
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@admin_bp.route("/users/<int:user_id>/role", methods=["POST"])
|
|
@login_required
|
|
def change_role(user_id: int):
|
|
_require_admin()
|
|
user = db.session.get(User, user_id)
|
|
if user is None:
|
|
abort(404)
|
|
new_role = request.form.get("role", "user")
|
|
if new_role not in ROLES:
|
|
flash(_("Invalid role."), "error")
|
|
return redirect(url_for("admin.index"))
|
|
# Prevent removing the last admin
|
|
if user.role == "admin" and new_role != "admin":
|
|
admin_count = db.session.scalar(
|
|
db.select(db.func.count()).select_from(User).where(User.role == "admin")
|
|
)
|
|
if admin_count <= 1:
|
|
flash(_("Cannot remove the last admin."), "error")
|
|
return redirect(url_for("admin.index"))
|
|
user.role = new_role
|
|
db.session.commit()
|
|
flash(_("Role updated for %(email)s.", email=user.email), "success")
|
|
return redirect(url_for("admin.index"))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Reset password
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@admin_bp.route("/users/<int:user_id>/password", methods=["POST"])
|
|
@login_required
|
|
def reset_password(user_id: int):
|
|
_require_admin()
|
|
user = db.session.get(User, user_id)
|
|
if user is None:
|
|
abort(404)
|
|
new_pw = request.form.get("new_password", "").strip()
|
|
if len(new_pw) < 8:
|
|
flash(_("Password must be at least 8 characters."), "error")
|
|
return redirect(url_for("admin.index"))
|
|
user.set_password(new_pw)
|
|
db.session.commit()
|
|
flash(_("Password reset for %(email)s.", email=user.email), "success")
|
|
return redirect(url_for("admin.index"))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Delete user
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@admin_bp.route("/users/<int:user_id>/delete", methods=["POST"])
|
|
@login_required
|
|
def delete_user(user_id: int):
|
|
_require_admin()
|
|
if user_id == current_user.id:
|
|
flash(_("You cannot delete your own account."), "error")
|
|
return redirect(url_for("admin.index"))
|
|
user = db.session.get(User, user_id)
|
|
if user is None:
|
|
abort(404)
|
|
email = user.email
|
|
db.session.delete(user)
|
|
db.session.commit()
|
|
flash(_("User %(email)s deleted.", email=email), "success")
|
|
return redirect(url_for("admin.index"))
|