Files
service-finder/backend/app/api/deps.py

136 lines
4.4 KiB
Python
Executable File

from typing import Optional, Dict, Any, Union
import logging
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.db.session import get_db
from app.core.security import decode_token, DEFAULT_RANK_MAP
from app.models.identity import User, UserRole
from app.core.config import settings
logger = logging.getLogger(__name__)
# Az OAuth2 folyamat a bejelentkezési végponton keresztül
reusable_oauth2 = OAuth2PasswordBearer(
tokenUrl=f"{settings.API_V1_STR}/auth/login"
)
async def get_current_token_payload(
token: str = Depends(reusable_oauth2)
) -> Dict[str, Any]:
"""
JWT token visszafejtése és a típus (access) ellenőrzése.
"""
# Dev bypass (ha esetleg fejlesztéshez használtad korábban, itt a helye,
# de élesben a token validáció fut le)
if settings.DEBUG and token == "dev_bypass_active":
return {
"sub": "1",
"role": "superadmin",
"rank": 100,
"scope_level": "global",
"scope_id": "all",
"type": "access"
}
payload = decode_token(token)
if not payload or payload.get("type") != "access":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Érvénytelen vagy lejárt munkamenet."
)
return payload
async def get_current_user(
db: AsyncSession = Depends(get_db),
payload: Dict = Depends(get_current_token_payload)
) -> User:
"""
Lekéri a felhasználót a token 'sub' mezője alapján.
"""
user_id = payload.get("sub")
if not user_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token azonosítási hiba."
)
result = await db.execute(select(User).where(User.id == int(user_id)))
user = result.scalar_one_or_none()
if not user or user.is_deleted:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A felhasználó nem található."
)
return user
async def get_current_active_user(
current_user: User = Depends(get_current_user),
) -> User:
"""
Ellenőrzi, hogy a felhasználó aktív-e.
Ez elengedhetetlen az Admin felület és a védett végpontok számára.
"""
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="A művelethez aktív profil és KYC azonosítás (Step 2) szükséges."
)
return current_user
async def check_resource_access(
resource_scope_id: Union[str, int],
current_user: User = Depends(get_current_user)
):
"""
Scoped RBAC: Megakadályozza, hogy egy felhasználó más valaki erőforrásaihoz nyúljon.
Kezeli az ID-t (int) és a Scope ID-t / Slug-ot (str) is.
"""
if current_user.role == UserRole.superadmin:
return True
# Ha a usernek van beállított scope_id-ja (pl. egy flottához tartozik),
# akkor ellenőrizzük, hogy a kért erőforrás abba a scope-ba tartozik-e.
user_scope = current_user.scope_id
requested_scope = str(resource_scope_id)
# 1. Saját erőforrás (saját ID)
if str(current_user.id) == requested_scope:
return True
# 2. Scope alapú hozzáférés (pl. flotta tagja)
if user_scope and user_scope == requested_scope:
return True
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Nincs jogosultsága ehhez az erőforráshoz."
)
def check_min_rank(role_key: str):
"""
Dinamikus Rank ellenőrzés.
Az adatbázisból (system_parameters) kéri le az elvárt szintet.
"""
async def rank_checker(
db: AsyncSession = Depends(get_db),
payload: Dict = Depends(get_current_token_payload)
):
# A settings.get_db_setting-et használjuk a dinamikus lekéréshez
ranks = await settings.get_db_setting(
db, "rbac_rank_matrix", default=DEFAULT_RANK_MAP
)
required_rank = ranks.get(role_key, 0)
user_rank = payload.get("rank", 0)
if user_rank < required_rank:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Alacsony jogosultsági szint. (Szükséges: {required_rank})"
)
return True
return rank_checker