# /opt/docker/dev/service_finder/backend/app/services/analytics_service.py
"""
TCO (Total Cost of Ownership) Analytics Service.

Calculations based on the vehicle.costs table, with currency conversion
driven by the exchange rates obtained through system_service.
"""
|
|
|
|
import logging
from datetime import date, datetime
from typing import Optional, Dict, Any, List

from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from app.models.vehicle import VehicleCost, CostCategory
from app.models import VehicleModelDefinition
from app.models.marketplace.organization import Organization
from app.services.system_service import SystemService
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TCOAnalytics:
    """
    Total Cost of Ownership (TCO) analytics over vehicle cost records.

    Public entry points:
      1. get_user_tco: totals the costs booked under one organization_id.
      2. get_vehicle_lifetime_tco: totals the costs of one vehicle across all
         owning organizations (anonymized by default).
      3. get_global_benchmark: average costs for one vehicle id or for a
         make/model/fuel_type group.

    Every amount is converted to the requested target currency with the
    "exchange_rates" parameter read through SystemService.
    """

    # Fallback table, used only when no usable rates are configured.
    # _DEFAULT_EXCHANGE_RATES[source][target] is the value of one unit of
    # `source` expressed in `target` (e.g. 1 EUR = 400 HUF).
    _DEFAULT_EXCHANGE_RATES = {
        "EUR": {"HUF": 400.0, "EUR": 1.0, "USD": 1.1},
        "USD": {"HUF": 350.0, "EUR": 0.9, "USD": 1.0},
        "HUF": {"HUF": 1.0, "EUR": 0.0025, "USD": 0.0029},
        "GBP": {"HUF": 460.0, "EUR": 1.15, "USD": 1.26},
    }

    def __init__(self):
        """Create the SystemService instance used to read system parameters."""
        self.system_service = SystemService()

    async def get_user_tco(
        self,
        db: AsyncSession,
        organization_id: int,
        currency_target: str = "HUF",
        include_categories: Optional[List[str]] = None,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Total all costs of one organization, converted to ``currency_target``.

        :param db: Database session
        :param organization_id: Identifier of the organization
        :param currency_target: Target currency (e.g. "HUF", "EUR")
        :param include_categories: Optional filter on cost category codes
        :param start_date: Optional inclusive lower bound (ISO format)
        :param end_date: Optional inclusive upper bound (ISO format)
        :return: Dictionary with:
            - organization_id: The requested organization
            - total_amount: Grand total in the target currency
            - total_transactions: Number of cost rows
            - currency: The target currency
            - by_category: Per-category ``name`` / ``total`` / ``count``
            - date_range: The ``start`` / ``end`` values as passed in
        :raises ValueError: If a date bound is not a valid ISO date string
        """
        stmt = (
            select(
                VehicleCost.amount,
                VehicleCost.currency,
                VehicleCost.category_id,
                CostCategory.code,
                CostCategory.name,
            )
            .join(CostCategory, VehicleCost.category_id == CostCategory.id)
            .where(VehicleCost.organization_id == organization_id)
        )

        # Bind real date objects rather than raw strings: strict drivers
        # (asyncpg, SQLite date types) reject str for date/timestamp columns.
        if start_date:
            stmt = stmt.where(VehicleCost.date >= self._parse_iso_date(start_date))
        if end_date:
            stmt = stmt.where(VehicleCost.date <= self._parse_iso_date(end_date))

        if include_categories:
            stmt = stmt.where(CostCategory.code.in_(include_categories))

        rows = (await db.execute(stmt)).all()
        exchange_rates = await self._get_exchange_rates(db, currency_target)

        total_amount = 0.0
        category_totals: Dict[str, Dict[str, Any]] = {}

        for row in rows:
            converted_amount = await self._convert_currency(
                db, float(row.amount), row.currency, currency_target, exchange_rates
            )
            total_amount += converted_amount

            bucket = category_totals.setdefault(
                row.code, {"name": row.name, "total": 0.0, "count": 0}
            )
            bucket["total"] += converted_amount
            bucket["count"] += 1

        # Round per-category totals like the grand total, so float
        # accumulation noise does not leak into the response.
        for bucket in category_totals.values():
            bucket["total"] = round(bucket["total"], 2)

        return {
            "organization_id": organization_id,
            "total_amount": round(total_amount, 2),
            "total_transactions": len(rows),
            "currency": currency_target,
            "by_category": category_totals,
            "date_range": {
                "start": start_date,
                "end": end_date,
            },
        }

    async def get_vehicle_lifetime_tco(
        self,
        db: AsyncSession,
        vehicle_model_id: int,
        currency_target: str = "HUF",
        anonymize: bool = True,
    ) -> Dict[str, Any]:
        """
        Total the costs recorded for one vehicle across all of its owners.

        NOTE(review): despite its name, ``vehicle_model_id`` is matched
        against ``VehicleCost.vehicle_id`` - confirm which id callers pass.

        :param db: Database session
        :param vehicle_model_id: Id compared with ``VehicleCost.vehicle_id``
        :param currency_target: Target currency (e.g. "HUF", "EUR")
        :param anonymize: When True the per-owner breakdown is omitted
        :return: Dictionary with:
            - vehicle_model_id: The requested id
            - total_lifetime_cost: Total cost in the target currency
            - total_owners: Number of distinct organizations
            - average_cost_per_owner: Total divided by owners (at least 1)
            - currency: The target currency
            - anonymized: The ``anonymize`` flag
            - by_owner: Per-organization breakdown (only if anonymize=False)
        """
        stmt = (
            select(
                VehicleCost.amount,
                VehicleCost.currency,
                VehicleCost.organization_id,
                Organization.name.label("org_name"),
            )
            .outerjoin(Organization, VehicleCost.organization_id == Organization.id)
            .where(VehicleCost.vehicle_id == vehicle_model_id)
        )

        rows = (await db.execute(stmt)).all()
        exchange_rates = await self._get_exchange_rates(db, currency_target)

        total_lifetime_cost = 0.0
        owners = set()
        owner_totals: Dict[Any, Dict[str, Any]] = {}

        for row in rows:
            converted_amount = await self._convert_currency(
                db, float(row.amount), row.currency, currency_target, exchange_rates
            )
            total_lifetime_cost += converted_amount

            org_id = row.organization_id
            # Rows without an organization count toward the total but not
            # toward the owner statistics. `is not None` keeps id 0 valid.
            if org_id is None:
                continue
            owners.add(org_id)

            if not anonymize:
                bucket = owner_totals.setdefault(
                    org_id, {"name": row.org_name, "total": 0.0, "count": 0}
                )
                bucket["total"] += converted_amount
                bucket["count"] += 1

        for bucket in owner_totals.values():
            bucket["total"] = round(bucket["total"], 2)

        total_owners = len(owners)
        # max(..., 1) avoids a division by zero when no row has an owner.
        average_cost_per_owner = round(total_lifetime_cost / max(total_owners, 1), 2)

        result_data = {
            "vehicle_model_id": vehicle_model_id,
            "total_lifetime_cost": round(total_lifetime_cost, 2),
            "total_owners": total_owners,
            "average_cost_per_owner": average_cost_per_owner,
            "currency": currency_target,
            "anonymized": anonymize,
        }

        if not anonymize:
            result_data["by_owner"] = owner_totals

        return result_data

    async def get_global_benchmark(
        self,
        db: AsyncSession,
        vehicle_model_id: Optional[int] = None,
        make: Optional[str] = None,
        model: Optional[str] = None,
        fuel_type: Optional[str] = None,
        currency_target: str = "HUF",
    ) -> Dict[str, Any]:
        """
        Compute average costs for one vehicle id or for a group of models.

        When ``vehicle_model_id`` is not given, the rows are filtered by
        whichever of make / model / fuel_type are provided (all rows if none).

        :param db: Database session
        :param vehicle_model_id: Specific id to benchmark (optional)
        :param make: Manufacturer (optional)
        :param model: Model (optional)
        :param fuel_type: Fuel type (optional)
        :param currency_target: Target currency (e.g. "HUF", "EUR")
        :return: Dictionary with:
            - benchmark_type: "specific_model" or "grouped"
            - vehicle_count: Number of distinct vehicles in the sample
            - total_cost_sum: Total cost in the target currency
            - average_cost_per_vehicle: Average per vehicle
            - average_cost_per_km: Cost per km, or None without odometer data
            - by_category: Per-category ``total`` / ``count`` / ``average``
            - currency: The target currency
            - criteria: Echo of the filters (absent when no rows matched)
            - message: Present only when no rows matched
        """
        stmt = (
            select(
                VehicleCost.amount,
                VehicleCost.currency,
                VehicleCost.vehicle_id,
                VehicleCost.odometer,
                CostCategory.code,
                VehicleModelDefinition.make,
                VehicleModelDefinition.model,
                VehicleModelDefinition.fuel_type,
            )
            .join(VehicleModelDefinition, VehicleCost.vehicle_id == VehicleModelDefinition.id)
            .join(CostCategory, VehicleCost.category_id == CostCategory.id)
        )

        # `is not None` so that an id of 0 is still treated as a specific id.
        if vehicle_model_id is not None:
            stmt = stmt.where(VehicleCost.vehicle_id == vehicle_model_id)
            benchmark_type = "specific_model"
        else:
            conditions = []
            if make:
                conditions.append(VehicleModelDefinition.make == make)
            if model:
                conditions.append(VehicleModelDefinition.model == model)
            if fuel_type:
                conditions.append(VehicleModelDefinition.fuel_type == fuel_type)
            if conditions:
                stmt = stmt.where(and_(*conditions))
            benchmark_type = "grouped"

        rows = (await db.execute(stmt)).all()

        if not rows:
            return {
                "benchmark_type": benchmark_type,
                "vehicle_count": 0,
                "total_cost_sum": 0.0,
                "average_cost_per_vehicle": 0.0,
                "average_cost_per_km": None,
                "by_category": {},
                "currency": currency_target,
                "message": "No data found for the specified criteria",
            }

        exchange_rates = await self._get_exchange_rates(db, currency_target)

        total_cost_sum = 0.0
        vehicle_ids = set()
        max_odometer_by_vehicle: Dict[Any, Any] = {}
        category_totals: Dict[str, float] = {}
        category_counts: Dict[str, int] = {}

        for row in rows:
            converted_amount = await self._convert_currency(
                db, float(row.amount), row.currency, currency_target, exchange_rates
            )
            total_cost_sum += converted_amount
            vehicle_ids.add(row.vehicle_id)

            # Keep only the highest reading per vehicle. Adding up the
            # reading of every cost row would count the same kilometres once
            # per record and deflate the cost/km figure.
            # NOTE(review): assumes `odometer` is a cumulative reading - confirm.
            if row.odometer:
                previous = max_odometer_by_vehicle.get(row.vehicle_id)
                if previous is None or row.odometer > previous:
                    max_odometer_by_vehicle[row.vehicle_id] = row.odometer

            category_code = row.code
            category_totals[category_code] = (
                category_totals.get(category_code, 0.0) + converted_amount
            )
            category_counts[category_code] = category_counts.get(category_code, 0) + 1

        # rows is non-empty here, so vehicle_count is at least 1.
        vehicle_count = len(vehicle_ids)
        average_cost_per_vehicle = round(total_cost_sum / vehicle_count, 2)

        total_km = sum(max_odometer_by_vehicle.values())
        average_cost_per_km = None
        if total_km > 0:
            average_cost_per_km = round(total_cost_sum / float(total_km), 4)

        category_averages = {
            code: {
                "total": round(total, 2),
                "count": category_counts[code],
                "average": round(total / category_counts[code], 2),
            }
            for code, total in category_totals.items()
        }

        return {
            "benchmark_type": benchmark_type,
            "vehicle_count": vehicle_count,
            "total_cost_sum": round(total_cost_sum, 2),
            "average_cost_per_vehicle": average_cost_per_vehicle,
            "average_cost_per_km": average_cost_per_km,
            "by_category": category_averages,
            "currency": currency_target,
            "criteria": {
                "vehicle_model_id": vehicle_model_id,
                "make": make,
                "model": model,
                "fuel_type": fuel_type,
            },
        }

    async def _get_exchange_rates(
        self,
        db: AsyncSession,
        target_currency: str
    ) -> Dict[str, float]:
        """
        Load the exchange-rate table stored under the "exchange_rates" key.

        Falls back to the built-in default table when the parameter is
        missing, empty or not a dictionary.

        :param db: Database session
        :param target_currency: Target currency (used only for a sanity warning)
        :return: Nested dictionary ``{source: {target: rate}}``
        """
        exchange_rates = await self.system_service.get_scoped_parameter(
            db,
            key="exchange_rates",
            default={}
        )

        if not exchange_rates or not isinstance(exchange_rates, dict):
            if exchange_rates:
                logger.warning(
                    "Exchange rates parameter has unexpected type %s, using defaults",
                    type(exchange_rates).__name__,
                )
            else:
                logger.warning("No exchange rates found in system parameters, using defaults")
            # Copy so that callers can never mutate the shared class constant.
            exchange_rates = {
                source: dict(rates)
                for source, rates in self._DEFAULT_EXCHANGE_RATES.items()
            }

        # Warn only when the target is reachable neither as a table row nor
        # through the EUR row; conversions would then fall back to 1:1.
        eur_rates = exchange_rates.get("EUR")
        if not isinstance(eur_rates, dict):
            eur_rates = {}
        if target_currency not in exchange_rates and target_currency not in eur_rates:
            logger.warning(
                "Target currency %s not found in exchange rates, using 1:1 conversion",
                target_currency,
            )

        return exchange_rates

    @staticmethod
    def _parse_iso_date(value):
        """
        Turn an ISO-8601 string into a ``date`` / ``datetime`` bind value.

        Non-string values (including None) are returned unchanged so callers
        that already pass date objects keep working.

        :param value: ISO string such as "2024-01-31" or "2024-01-31T10:00:00"
        :return: ``date`` for a bare date, ``datetime`` otherwise
        :raises ValueError: If the string is not a valid ISO date/datetime
        """
        if not isinstance(value, str):
            return value
        text = value.strip()
        # datetime.fromisoformat() only accepts a trailing "Z" from 3.11 on.
        if text[-1:] in ("Z", "z"):
            text = text[:-1] + "+00:00"
        try:
            if len(text) == 10:
                return date.fromisoformat(text)
            return datetime.fromisoformat(text)
        except ValueError as exc:
            raise ValueError(f"Invalid ISO date value: {value!r}") from exc

    @staticmethod
    def _lookup_rate(exchange_rates, base, quote):
        """
        Return the usable rate "1 base = X quote", or None.

        A rate is usable only if it is present, numeric, finite and strictly
        positive; anything else is treated as missing.
        """
        if not isinstance(exchange_rates, dict):
            return None
        row = exchange_rates.get(base)
        if not isinstance(row, dict):
            return None
        try:
            rate = float(row[quote])
        except (KeyError, TypeError, ValueError):
            return None
        # NaN fails both comparisons, so it is rejected as well.
        if 0.0 < rate < float("inf"):
            return rate
        return None

    async def _convert_currency(
        self,
        db: AsyncSession,
        amount: float,
        source_currency: str,
        target_currency: str,
        exchange_rates: Dict[str, Any]
    ) -> float:
        """
        Convert ``amount`` from ``source_currency`` to ``target_currency``.

        Lookup order: direct rate, inverted reverse rate, then EUR as pivot.
        Each strategy is tried independently, so one malformed entry does
        not prevent the remaining strategies from being used. When no route
        exists the amount is returned unchanged (1:1) and a warning is logged.

        :param db: Database session (unused, kept for interface compatibility)
        :param amount: Amount in the source currency
        :param source_currency: Source currency (e.g. "EUR")
        :param target_currency: Target currency (e.g. "HUF")
        :param exchange_rates: Nested ``{source: {target: rate}}`` dictionary
        :return: Amount expressed in the target currency
        """
        if source_currency == target_currency:
            return amount

        # 1) source -> target
        direct = self._lookup_rate(exchange_rates, source_currency, target_currency)
        if direct is not None:
            return amount * direct

        # 2) target -> source, inverted
        reverse = self._lookup_rate(exchange_rates, target_currency, source_currency)
        if reverse is not None:
            return amount * (1.0 / reverse)

        # 3) source -> EUR -> target, using the EUR row for both legs
        eur_to_source = self._lookup_rate(exchange_rates, "EUR", source_currency)
        eur_to_target = self._lookup_rate(exchange_rates, "EUR", target_currency)
        if eur_to_source is not None and eur_to_target is not None:
            return (amount / eur_to_source) * eur_to_target

        # Deliberate best-effort fallback: keep the figure rather than fail.
        logger.warning(
            "Could not convert %s to %s, using 1:1 conversion",
            source_currency,
            target_currency,
        )
        return amount