# /opt/docker/dev/service_finder/backend/app/services/trust_engine.py """ Gondos Gazda Index (Trust Score) számítási motor. Dinamikusan betölti a súlyozási paramétereket a SystemParameter rendszerből. """ import logging from typing import Optional, Dict, Any from datetime import datetime, timedelta from sqlalchemy import select, func from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from app.models.identity import User, UserTrustProfile from app.models import Vehicle, VehicleOwnership from app.models.marketplace.service import Cost from app.models.system import SystemParameter, ParameterScope from app.services.system_service import SystemService logger = logging.getLogger(__name__) class TrustEngine: """ A Gondos Gazda Index számításáért felelős motor. A számítás három komponensből áll: 1. Maintenance Score - Karbantartási időzítés pontossága 2. Quality Score - Szerviz minősége (ár/érték arány) 3. Preventive Score - Megelőző intézkedések (pl. idő előtti cserék) Minden komponens súlyozása a SystemParameter rendszerből származik. """ def __init__(self): self.system_service = SystemService() async def calculate_user_trust( self, db: AsyncSession, user_id: int, force_recalculate: bool = False ) -> Dict[str, Any]: """ Kiszámolja a felhasználó trust score-ját és elmenti a UserTrustProfile táblába. :param db: Adatbázis munkamenet :param user_id: A felhasználó azonosítója :param force_recalculate: Ha True, akkor újraszámolja még akkor is, ha friss :return: A számított trust adatok szótárban """ logger.info(f"Trust számítás indítása user_id={user_id}") # 1. Ellenőrizzük, hogy szükséges-e újraszámolni trust_profile = await self._get_or_create_trust_profile(db, user_id) if not force_recalculate: # Ha a számítás kevesebb mint 24 órája történt, visszaadjuk a meglévőt time_threshold = datetime.utcnow() - timedelta(hours=24) if trust_profile.last_calculated and trust_profile.last_calculated > time_threshold: logger.debug(f"Trust score már friss (last_calculated={trust_profile.last_calculated}), visszaadjuk") return self._format_trust_response(trust_profile) # 2. Lekérjük a súlyozási paramétereket weights = await self._get_trust_weights(db, user_id) tolerance_km = await self._get_tolerance_km(db, user_id) # 3. Számoljuk ki a részpontszámokat maintenance_score = await self._calculate_maintenance_score(db, user_id, tolerance_km) quality_score = await self._calculate_quality_score(db, user_id) preventive_score = await self._calculate_preventive_score(db, user_id) # 4. Összesített trust score számítása súlyozással trust_score = int( (maintenance_score * weights["maintenance"] + quality_score * weights["quality"] + preventive_score * weights["preventive"]) * 100 ) # Korlátozzuk 0-100 közé trust_score = max(0, min(100, trust_score)) # 5. Frissítjük a trust profile-t trust_profile.trust_score = trust_score trust_profile.maintenance_score = float(maintenance_score) trust_profile.quality_score = float(quality_score) trust_profile.preventive_score = float(preventive_score) trust_profile.last_calculated = datetime.utcnow() db.add(trust_profile) await db.commit() logger.info(f"Trust számítás kész user_id={user_id}: score={trust_score}") return { "trust_score": trust_score, "maintenance_score": float(maintenance_score), "quality_score": float(quality_score), "preventive_score": float(preventive_score), "weights": weights, "tolerance_km": tolerance_km, "last_calculated": trust_profile.last_calculated.isoformat() if trust_profile.last_calculated else None, } async def _get_or_create_trust_profile( self, db: AsyncSession, user_id: int ) -> UserTrustProfile: """Lekéri vagy létrehozza a felhasználó trust profile-ját.""" stmt = select(UserTrustProfile).where(UserTrustProfile.user_id == user_id) result = await db.execute(stmt) profile = result.scalar_one_or_none() if profile is None: profile = UserTrustProfile( user_id=user_id, trust_score=0, maintenance_score=0.0, quality_score=0.0, preventive_score=0.0, last_calculated=datetime.utcnow() ) db.add(profile) await db.flush() return profile async def _get_trust_weights( self, db: AsyncSession, user_id: int ) -> Dict[str, float]: """Lekéri a súlyozási paramétereket hierarchikusan.""" # A user region_code-ját és country_code-ját lekérjük a User táblából stmt = select(User).where(User.id == user_id) result = await db.execute(stmt) user = result.scalar_one_or_none() region_id = user.region_code if user else None country_code = user.region_code[:2] if user and user.region_code else None # pl. "HU" az első 2 karakter # Súlyok lekérése weight_m = await self.system_service.get_scoped_parameter( db, "TRUST_WEIGHT_MAINTENANCE", user_id=str(user_id), region_id=region_id, country_code=country_code, default=0.4 ) weight_q = await self.system_service.get_scoped_parameter( db, "TRUST_WEIGHT_QUALITY", user_id=str(user_id), region_id=region_id, country_code=country_code, default=0.3 ) weight_p = await self.system_service.get_scoped_parameter( db, "TRUST_WEIGHT_PREVENTIVE", user_id=str(user_id), region_id=region_id, country_code=country_code, default=0.3 ) # A JSON értékből kinyerjük a számot (ha dict formátumban van) if isinstance(weight_m, dict): weight_m = weight_m.get("value", 0.4) if isinstance(weight_q, dict): weight_q = weight_q.get("value", 0.3) if isinstance(weight_p, dict): weight_p = weight_p.get("value", 0.3) # Normalizáljuk, hogy összegük 1 legyen total = weight_m + weight_q + weight_p if total > 0: weight_m /= total weight_q /= total weight_p /= total return { "maintenance": float(weight_m), "quality": float(weight_q), "preventive": float(weight_p) } async def _get_tolerance_km( self, db: AsyncSession, user_id: int ) -> int: """Lekéri a tolerancia km-t a karbantartási időzítéshez.""" stmt = select(User).where(User.id == user_id) result = await db.execute(stmt) user = result.scalar_one_or_none() region_id = user.region_code if user else None country_code = user.region_code[:2] if user and user.region_code else None tolerance = await self.system_service.get_scoped_parameter( db, "TRUST_MAINTENANCE_TOLERANCE_KM", user_id=str(user_id), region_id=region_id, country_code=country_code, default=1000 ) if isinstance(tolerance, dict): tolerance = tolerance.get("value", 1000) return int(tolerance) async def _calculate_maintenance_score( self, db: AsyncSession, user_id: int, tolerance_km: int ) -> float: """ Karbantartási időzítés pontosságának számítása. Összehasonlítja a tényleges karbantartási költségeket az odometer állásokkal. """ # 1. Lekérjük a felhasználó járműveit stmt = ( select(Vehicle) .join(VehicleOwnership, VehicleOwnership.vehicle_id == Vehicle.id) .where(VehicleOwnership.user_id == user_id) .where(VehicleOwnership.is_active == True) ) result = await db.execute(stmt) vehicles = result.scalars().all() if not vehicles: logger.debug(f"Nincs aktív jármű a user_id={user_id} számára, maintenance_score=0.5") return 0.5 # Alapértelmezett közepes érték total_score = 0.0 vehicle_count = 0 for vehicle in vehicles: # 2. Lekérjük a MAINTENANCE kategóriájú költségeket stmt_costs = ( select(Cost) .where(Cost.vehicle_id == vehicle.id) .where(Cost.category == "MAINTENANCE") .where(Cost.is_deleted == False) .order_by(Cost.occurrence_date) ) result_costs = await db.execute(stmt_costs) maintenance_costs = result_costs.scalars().all() if not maintenance_costs: continue # Nincs karbantartási költség, nem számítunk bele # 3. Összehasonlítjuk az odometer állásokkal vehicle_score = await self._calculate_vehicle_maintenance_score( db, vehicle, maintenance_costs, tolerance_km ) total_score += vehicle_score vehicle_count += 1 if vehicle_count == 0: return 0.5 return total_score / vehicle_count async def _calculate_vehicle_maintenance_score( self, db: AsyncSession, vehicle: Vehicle, maintenance_costs: list, tolerance_km: int ) -> float: """Egy jármű karbantartási pontszámának számítása.""" # Egyszerűsített implementáció: csak ellenőrizzük, hogy vannak-e karbantartási költségek # és hogy az odometer növekedése nem túl nagy a költségek között # (Valós implementációban összehasonlítanánk a gyártói ajánlásokkal) if len(maintenance_costs) < 2: # Kevesebb mint 2 karbantartás, nem tudunk trendet elemezni return 0.7 # Átlagos időköz a karbantartások között (km-ben) total_km_gap = 0 gap_count = 0 for i in range(1, len(maintenance_costs)): prev_cost = maintenance_costs[i-1] curr_cost = maintenance_costs[i] if prev_cost.odometer_km and curr_cost.odometer_km: gap = curr_cost.odometer_km - prev_cost.odometer_km total_km_gap += gap gap_count += 1 if gap_count == 0: return 0.7 avg_gap = total_km_gap / gap_count # Ideális karbantartási intervallum (pl. 15,000 km) ideal_interval = 15000 # Pontszám: minél közelebb van az ideálishoz, annál magasabb deviation = abs(avg_gap - ideal_interval) if deviation <= tolerance_km: score = 1.0 elif deviation <= ideal_interval * 0.5: # 50%-nál kisebb eltérés score = 0.8 elif deviation <= ideal_interval: # 100%-nál kisebb eltérés score = 0.5 else: score = 0.2 return score async def _calculate_quality_score( self, db: AsyncSession, user_id: int ) -> float: """ Szerviz minőségének számítása (ár/érték arány). Egyszerűsített implementáció: átlagos értékelések alapján. """ # Jelenlegi implementáció: minden felhasználó kap egy alap pontszámot # Valós implementációban a szervizek értékeléseit és árait elemeznénk return 0.75 # Alapértelmezett közepes érték async def _calculate_preventive_score( self, db: AsyncSession, user_id: int ) -> float: """ Megelőző intézkedések pontszáma. Egyszerűsített implementáció: idő előtti alkatrész cserék száma. """ # Jelenlegi implementáció: minden felhasználó kap egy alap pontszámot # Valós implementációban a PREVENTIVE kategóriájú költségeket elemeznénk return 0.6 # Alapértelmezett közepes érték def _format_trust_response(self, profile: UserTrustProfile) -> Dict[str, Any]: """Formázza a trust profile-t válaszként.""" return { "trust_score": profile.trust_score, "maintenance_score": float(profile.maintenance_score), "quality_score": float(profile.quality_score), "preventive_score": float(profile.preventive_score), "weights": {}, # Üres, mert nem számoltuk újra "tolerance_km": None, "last_calculated": profile.last_calculated.isoformat() if profile.last_calculated else None, }