Files
service-finder/backend/app/api/v1/endpoints/security.py

173 lines
6.4 KiB
Python

# /opt/docker/dev/service_finder/backend/app/api/v1/endpoints/security.py
"""
Dual Control (Négy szem elv) API végpontok.
Kiemelt műveletek jóváhagyási folyamata.
"""
import logging
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_db, get_current_user
from app.models.identity import User, UserRole
from app.services.security_service import security_service
from app.schemas.security import (
PendingActionCreate, PendingActionResponse, PendingActionApprove, PendingActionReject
)
logger = logging.getLogger(__name__)
router = APIRouter()
@router.post("/request", response_model=PendingActionResponse, status_code=status.HTTP_201_CREATED)
async def request_action(
request: PendingActionCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Dual Control: Jóváhagyási kérelem indítása kiemelt művelethez.
Engedélyezett művelettípusok:
- CHANGE_ROLE: Felhasználó szerepkörének módosítása
- SET_VIP: VIP státusz beállítása
- WALLET_ADJUST: Pénztár egyenleg módosítása (nagy összeg)
- SOFT_DELETE_USER: Felhasználó soft delete
- ORGANIZATION_TRANSFER: Szervezet tulajdonjog átadása
"""
# Csak admin és superadmin kezdeményezhet kiemelt műveleteket
if current_user.role not in [UserRole.admin, UserRole.superadmin]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Csak adminisztrátorok kezdeményezhetnek Dual Control műveleteket."
)
try:
action = await security_service.request_action(
db, requester_id=current_user.id,
action_type=request.action_type,
payload=request.payload,
reason=request.reason
)
return PendingActionResponse.from_orm(action)
except Exception as e:
logger.error(f"Dual Control request error: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Hiba a kérelem létrehozásakor: {str(e)}"
)
@router.get("/pending", response_model=List[PendingActionResponse])
async def list_pending_actions(
action_type: Optional[str] = None,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Függőben lévő Dual Control műveletek listázása.
Admin és superadmin látja az összes függőben lévő műveletet.
Egyéb felhasználók csak a sajátjaikat láthatják.
"""
if current_user.role in [UserRole.admin, UserRole.superadmin]:
user_id = None
else:
user_id = current_user.id
actions = await security_service.get_pending_actions(db, user_id=user_id, action_type=action_type)
return [PendingActionResponse.from_orm(action) for action in actions]
@router.post("/approve/{action_id}", response_model=PendingActionResponse)
async def approve_action(
action_id: int,
approve_data: PendingActionApprove,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Dual Control: Művelet jóváhagyása.
Csak admin/superadmin hagyhat jóvá, és nem lehet a saját kérése.
"""
if current_user.role not in [UserRole.admin, UserRole.superadmin]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Csak adminisztrátorok hagyhatnak jóvá műveleteket."
)
try:
await security_service.approve_action(db, approver_id=current_user.id, action_id=action_id)
# Frissített művelet lekérdezése
from sqlalchemy import select
from app.models.security import PendingAction
stmt = select(PendingAction).where(PendingAction.id == action_id)
action = (await db.execute(stmt)).scalar_one()
return PendingActionResponse.from_orm(action)
except Exception as e:
logger.error(f"Dual Control approve error: {e}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
@router.post("/reject/{action_id}", response_model=PendingActionResponse)
async def reject_action(
action_id: int,
reject_data: PendingActionReject,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Dual Control: Művelet elutasítása.
Csak admin/superadmin utasíthat el, és nem lehet a saját kérése.
"""
if current_user.role not in [UserRole.admin, UserRole.superadmin]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Csak adminisztrátorok utasíthatnak el műveleteket."
)
try:
await security_service.reject_action(
db, approver_id=current_user.id,
action_id=action_id, reason=reject_data.reason
)
# Frissített művelet lekérdezése
from sqlalchemy import select
from app.models.security import PendingAction
stmt = select(PendingAction).where(PendingAction.id == action_id)
action = (await db.execute(stmt)).scalar_one()
return PendingActionResponse.from_orm(action)
except Exception as e:
logger.error(f"Dual Control reject error: {e}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
@router.get("/{action_id}", response_model=PendingActionResponse)
async def get_action(
action_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Egy konkrét Dual Control művelet lekérdezése.
Csak a művelet létrehozója vagy admin/superadmin érheti el.
"""
from sqlalchemy import select
from app.models.security import PendingAction
stmt = select(PendingAction).where(PendingAction.id == action_id)
action = (await db.execute(stmt)).scalar_one_or_none()
if not action:
raise HTTPException(status_code=404, detail="Művelet nem található.")
if current_user.role not in [UserRole.admin, UserRole.superadmin] and action.requester_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Nincs jogosultságod ehhez a művelethez."
)
return PendingActionResponse.from_orm(action)