Files
service-finder/backend/app/services/security_auditor.py
2026-03-22 11:02:05 +00:00

97 lines
3.4 KiB
Python

"""
Security Auditor Service - Anti-Cheat rendszer része.
Felelős a gyanús tevékenységek (pl. Rapid Fire validációk) észleléséért.
"""
import logging
from datetime import datetime, timedelta
from typing import Optional
from sqlalchemy import select, and_, func
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi import HTTPException
from app.services.config_service import ConfigService
from app.models.gamification.gamification import UserContribution
from app.models.system.system import ParameterScope
logger = logging.getLogger(__name__)
class SecurityAuditorService:
"""
Biztonsági audit szolgáltatás a Rapid Fire (gyorstüzelés) anomáliák
detektálására.
"""
@staticmethod
async def check_rapid_fire_validation(db: AsyncSession, user_id: int) -> None:
"""
Ellenőrzi, hogy a felhasználó túl sok validációt végzett-e rövid idő alatt.
Args:
db: Adatbázis munkamenet
user_id: Ellenőrizendő felhasználó azonosítója
Raises:
HTTPException(429): Ha a felhasználó túllépte a megengedett limitet.
"""
# 1. Dinamikus limit lekérése a konfigurációból
max_validations = await ConfigService.get_int(
db,
"ANTI_CHEAT_MAX_VALIDATIONS_PER_HOUR",
default=10,
scope_level=ParameterScope.GLOBAL,
scope_id=None
)
# 2. Az elmúlt 1 órában végzett validációk számának lekérdezése
one_hour_ago = datetime.utcnow() - timedelta(hours=1)
stmt = select(func.count(UserContribution.id)).where(
and_(
UserContribution.user_id == user_id,
UserContribution.contribution_type == 'service_validation',
UserContribution.created_at >= one_hour_ago,
UserContribution.status == 'approved' # csak jóváhagyott validációk
)
)
result = await db.execute(stmt)
recent_count = result.scalar() or 0
logger.debug(
f"Rapid fire check for user {user_id}: {recent_count} validations "
f"in last hour (limit: {max_validations})"
)
# 3. Limit ellenőrzése
if recent_count >= max_validations:
# Opcionális: büntetőpont hozzáadása a GamificationService-en keresztül
# await GamificationService.add_penalty(db, user_id, reason="Rapid fire validation")
raise HTTPException(
status_code=429,
detail=(
f"Anti-Cheat: Túl sok művelet rövid idő alatt. "
f"Maximum {max_validations} validáció engedélyezett óránként. "
f"Kérjük, lassíts!"
)
)
@staticmethod
async def log_suspicious_activity(
db: AsyncSession,
user_id: int,
activity_type: str,
details: Optional[dict] = None
) -> None:
"""
Gyanús tevékenység naplózása a későbbi elemzéshez.
Jelenleg csak logol, de később beilleszthető egy audit táblába.
"""
logger.warning(
f"Suspicious activity detected: user={user_id}, "
f"type={activity_type}, details={details}"
)
# TODO: Beszúrás egy security_audit_log táblába, ha lesz ilyen