Files
service-finder/backend/app/api/deps.py

71 lines
2.4 KiB
Python
Executable File

from typing import Optional, Dict, Any
import logging
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.db.session import get_db
from app.core.security import decode_token, RANK_MAP
from app.models.identity import User
logger = logging.getLogger(__name__)
reusable_oauth2 = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
async def get_current_token_payload(
token: str = Depends(reusable_oauth2)
) -> Dict[str, Any]:
"""
Kinyeri a token payload-ot DB hívás nélkül.
Ez teszi lehetővé a gyors jogosultság-ellenőrzést.
"""
if token == "dev_bypass_active":
return {
"sub": "1",
"role": "superadmin",
"rank": 100,
"scope_level": "global",
"scope_id": "all"
}
payload = decode_token(token)
if not payload:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Érvénytelen vagy lejárt munkamenet."
)
return payload
async def get_current_user(
db: AsyncSession = Depends(get_db),
payload: Dict[str, Any] = Depends(get_current_token_payload),
) -> User:
"""
Visszaadja a teljes User modellt. Akkor használjuk, ha módosítani kell az usert.
"""
user_id = payload.get("sub")
if not user_id:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token azonosítási hiba.")
result = await db.execute(select(User).where(User.id == int(user_id)))
user = result.scalar_one_or_none()
if not user or user.is_deleted:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="A felhasználó nem található.")
return user
def check_min_rank(required_rank: int):
"""
Függőség-gyár: Ellenőrzi, hogy a felhasználó rangja eléri-e a minimumot.
Használat: Depends(check_min_rank(60)) -> RegionAdmin+
"""
def rank_checker(payload: Dict[str, Any] = Depends(get_current_token_payload)):
user_rank = payload.get("rank", 0)
if user_rank < required_rank:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Nincs elegendő jogosultsága a művelethez. (Szükséges szint: {required_rank})"
)
return True
return rank_checker