from typing import Optional, Dict, Any, Union import logging from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from app.db.session import get_db from app.core.security import decode_token, DEFAULT_RANK_MAP from app.models.identity import User, UserRole from app.core.config import settings logger = logging.getLogger(__name__) # Az OAuth2 folyamat a bejelentkezési végponton keresztül reusable_oauth2 = OAuth2PasswordBearer( tokenUrl=f"{settings.API_V1_STR}/auth/login" ) async def get_current_token_payload( token: str = Depends(reusable_oauth2) ) -> Dict[str, Any]: """ JWT token visszafejtése és a típus (access) ellenőrzése. """ # Dev bypass (ha esetleg fejlesztéshez használtad korábban, itt a helye, # de élesben a token validáció fut le) if settings.DEBUG and token == "dev_bypass_active": return { "sub": "1", "role": "superadmin", "rank": 100, "scope_level": "global", "scope_id": "all", "type": "access" } payload = decode_token(token) if not payload or payload.get("type") != "access": raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Érvénytelen vagy lejárt munkamenet." ) return payload async def get_current_user( db: AsyncSession = Depends(get_db), payload: Dict = Depends(get_current_token_payload) ) -> User: """ Lekéri a felhasználót a token 'sub' mezője alapján. """ user_id = payload.get("sub") if not user_id: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token azonosítási hiba." ) result = await db.execute(select(User).where(User.id == int(user_id))) user = result.scalar_one_or_none() if not user or user.is_deleted: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="A felhasználó nem található." ) return user async def get_current_active_user( current_user: User = Depends(get_current_user), ) -> User: """ Ellenőrzi, hogy a felhasználó aktív-e. Ez elengedhetetlen az Admin felület és a védett végpontok számára. """ if not current_user.is_active: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="A művelethez aktív profil és KYC azonosítás (Step 2) szükséges." ) return current_user async def check_resource_access( resource_scope_id: Union[str, int], current_user: User = Depends(get_current_user) ): """ Scoped RBAC: Megakadályozza, hogy egy felhasználó más valaki erőforrásaihoz nyúljon. Kezeli az ID-t (int) és a Scope ID-t / Slug-ot (str) is. """ if current_user.role == UserRole.superadmin: return True # Ha a usernek van beállított scope_id-ja (pl. egy flottához tartozik), # akkor ellenőrizzük, hogy a kért erőforrás abba a scope-ba tartozik-e. user_scope = current_user.scope_id requested_scope = str(resource_scope_id) # 1. Saját erőforrás (saját ID) if str(current_user.id) == requested_scope: return True # 2. Scope alapú hozzáférés (pl. flotta tagja) if user_scope and user_scope == requested_scope: return True raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Nincs jogosultsága ehhez az erőforráshoz." ) def check_min_rank(role_key: str): """ Dinamikus Rank ellenőrzés. Az adatbázisból (system_parameters) kéri le az elvárt szintet. """ async def rank_checker( db: AsyncSession = Depends(get_db), payload: Dict = Depends(get_current_token_payload) ): # A settings.get_db_setting-et használjuk a dinamikus lekéréshez ranks = await settings.get_db_setting( db, "rbac_rank_matrix", default=DEFAULT_RANK_MAP ) required_rank = ranks.get(role_key, 0) user_rank = payload.get("rank", 0) if user_rank < required_rank: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Alacsony jogosultsági szint. (Szükséges: {required_rank})" ) return True return rank_checker