""" Financial Orchestrator - Unit of Work mintával a pénzügyi tranzakciók atomi kezeléséhez. Ez a szolgáltatás koordinálja a fizetési folyamatokat, a számlázást és a pénztárca műveleteket egyetlen atomi tranzakcióban (Unit of Work minta). Kulcsfontosságú funkciók: 1. Vetésforgó (select_issuer) - kiválasztja a megfelelő számlakiállítót 2. Unit of Work - minden adatbázis művelet egy tranzakcióban 3. Hibatűrés - rollback hiba esetén """ import logging from decimal import Decimal from typing import Optional, Dict, Any from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, update, and_ from app.models import FinancialLedger, WalletType, LedgerStatus, LedgerEntryType from app.models.identity import Wallet from app.models.marketplace.finance import Issuer, IssuerType from app.services.financial_interfaces import ( BasePaymentGateway, BaseInvoicingService, PaymentGatewayError, InvoicingError, InsufficientFundsError ) logger = logging.getLogger(__name__) class FinancialOrchestrator: """ Pénzügyi tranzakciók koordinálója Unit of Work mintával. Ez az osztály felelős a következőkért: - Számlakiállító kiválasztása (vetésforgó logika) - FinancialLedger bejegyzés létrehozása - Pénztárca egyenleg frissítése - Tranzakció atomi végrehajtása (commit/rollback) """ def __init__( self, payment_gateway: Optional[BasePaymentGateway] = None, invoicing_service: Optional[BaseInvoicingService] = None ): """ Inicializálás opcionális külső szolgáltatásokkal. Args: payment_gateway: Fizetési átjáró implementáció (pl. Stripe) invoicing_service: Számlázási szolgáltatás implementáció """ self.payment_gateway = payment_gateway self.invoicing_service = invoicing_service async def select_issuer( self, db: AsyncSession, amount: Decimal, is_company: bool = False ) -> Issuer: """ Vetésforgó logika: kiválasztja a megfelelő számlakiállítót. Logika: 1. Keressen egy aktív 'EV' típusú Issuert 2. Ha az `current_revenue + amount < revenue_limit` ÉS a vevő nem cég (`is_company == False`), térjen vissza az EV-vel 3. Minden más esetben térjen vissza az aktív 'KFT' típusú Issuerrel Args: db: Adatbázis munkamenet amount: A tranzakció összege is_company: A vevő cég-e (True esetén nem választható EV) Returns: A kiválasztott Issuer objektum Raises: ValueError: Ha nincs aktív számlakiállító """ # 1. EV típusú aktív számlakiállító keresése ev_query = select(Issuer).where( and_( Issuer.type == IssuerType.EV, Issuer.is_active == True ) ).order_by(Issuer.id) ev_result = await db.execute(ev_query) ev_issuer_obj = ev_result.scalars().first() logger.debug(f"EV számlakiállító keresés: talált={ev_issuer_obj is not None}, is_company={is_company}") # 2. Ellenőrizzük, hogy az EV használható-e if ev_issuer_obj and not is_company: # Számoljuk ki az új bevételt new_revenue = ev_issuer_obj.current_revenue + amount logger.debug(f"EV ellenőrzés: current_revenue={ev_issuer_obj.current_revenue}, amount={amount}, new_revenue={new_revenue}, limit={ev_issuer_obj.revenue_limit}") if new_revenue < ev_issuer_obj.revenue_limit: logger.info(f"EV számlakiállító kiválasztva: {ev_issuer_obj.id} " f"(új bevétel: {new_revenue}, limit: {ev_issuer_obj.revenue_limit})") return ev_issuer_obj else: logger.debug(f"EV limit túllépve: {new_revenue} >= {ev_issuer_obj.revenue_limit}") # 3. KFT típusú aktív számlakiállító keresése kft_query = select(Issuer).where( and_( Issuer.type == IssuerType.KFT, Issuer.is_active == True ) ).order_by(Issuer.id) kft_result = await db.execute(kft_query) kft_issuer_obj = kft_result.scalars().first() logger.debug(f"KFT számlakiállító keresés: talált={kft_issuer_obj is not None}") if kft_issuer_obj: logger.info(f"KFT számlakiállító kiválasztva: {kft_issuer_obj.id}") return kft_issuer_obj # 4. Ha egyik sem található, hiba raise ValueError("Nincs aktív számlakiállító (sem EV, sem KFT)") async def process_payment( self, db: AsyncSession, user_id: int, amount: Decimal, wallet_type: WalletType, description: str = "", metadata: Optional[Dict[str, Any]] = None, is_company: bool = False ) -> Dict[str, Any]: """ Fő fizetési folyamat Unit of Work mintával. A folyamat egyetlen nagy try...except...finally blokkban fut: 1. Kiválasztja a számlakiállítót (vetésforgó) 2. Létrehoz egy bejegyzést a FinancialLedger-ben (PENDING státusszal) 3. Frissíti a megfelelő Wallet egyenlegét 4. Csak a legvégén hív egyetlen db.commit()-ot 5. Hiba esetén KÖTELEZŐ a db.rollback() Args: db: Adatbázis munkamenet user_id: A felhasználó azonosítója amount: A fizetendő összeg (pozitív) wallet_type: A cél pénztárca típusa description: Tranzakció leírása metadata: Egyéni metaadatok is_company: A felhasználó cég-e Returns: Szótár a tranzakció részleteivel Raises: InsufficientFundsError: Ha nincs elég egyenleg PaymentGatewayError: Ha a fizetési átjáró hibát jelez ValueError: Ha érvénytelen paraméterek """ if amount <= 0: raise ValueError("Az összegnek pozitívnak kell lennie") # Unit of Work: egyetlen tranzakció try: logger.info(f"Payment process indítása: user={user_id}, amount={amount}, " f"wallet_type={wallet_type}, is_company={is_company}") # 1. Számlakiállító kiválasztása issuer = await self.select_issuer(db, amount, is_company) logger.info(f"Személyi számlakiállító kiválasztva: {issuer.id} ({issuer.type})") # 2. FinancialLedger bejegyzés létrehozása (PENDING státusszal) ledger_entry = FinancialLedger( user_id=user_id, amount=float(amount), # Convert Decimal to float for Numeric field wallet_type=wallet_type, status=LedgerStatus.PENDING, issuer_id=issuer.id, entry_type=LedgerEntryType.DEBIT, # Payment is a DEBIT currency="HUF", # Default currency transaction_type=description or "Payment via FinancialOrchestrator", details=metadata or {} # Store metadata in details JSON field ) db.add(ledger_entry) await db.flush() # Megkapjuk az ID-t, de még nincs commit logger.info(f"FinancialLedger bejegyzés létrehozva: {ledger_entry.id}") # 3. Pénztárca egyenleg frissítése # Először lekérjük a pénztárcát zárolással (minden usernek csak egy walletje van) wallet_query = select(Wallet).where( Wallet.user_id == user_id ).with_for_update() # Sorzárolás a konkurrens hozzáférés megelőzésére wallet_result = await db.execute(wallet_query) wallet = wallet_result.scalar_one_or_none() if not wallet: raise ValueError(f"Nincs pénztárca a user {user_id} számára") # Ellenőrizzük az egyenleget (ha kivételről van szó) # Megjegyzés: A valós implementációban itt ellenőriznénk, hogy van-e elég egyenleg # de a specifikáció szerint csak frissítjük az egyenleget # A Wallet modellben nincs 'balance' mező, hanem külön mezők vannak a különböző credit típusokhoz # Frissítjük a megfelelő credit mezőt a wallet_type alapján # MEGJEGYZÉS: Payment (DEBIT) csökkenti a pénztárca egyenlegét! update_values = {} current_balance = Decimal('0') if wallet_type == WalletType.EARNED: current_balance = Decimal(str(wallet.earned_credits)) new_balance = current_balance - amount # DEBIT csökkenti az egyenleget update_values['earned_credits'] = float(new_balance) elif wallet_type == WalletType.PURCHASED: current_balance = Decimal(str(wallet.purchased_credits)) new_balance = current_balance - amount # DEBIT csökkenti az egyenleget update_values['purchased_credits'] = float(new_balance) elif wallet_type == WalletType.SERVICE_COINS: current_balance = Decimal(str(wallet.service_coins)) new_balance = current_balance - amount # DEBIT csökkenti az egyenleget update_values['service_coins'] = float(new_balance) elif wallet_type == WalletType.VOUCHER: # VOUCHER típusnál nincs dedikált mező a Wallet modellben # Kezeljük mint SERVICE_COINS vagy dobjunk hibát current_balance = Decimal(str(wallet.service_coins)) new_balance = current_balance - amount # DEBIT csökkenti az egyenleget update_values['service_coins'] = float(new_balance) logger.warning(f"VOUCHER wallet_type használva, SERVICE_COINS frissítve") else: raise ValueError(f"Ismeretlen wallet_type: {wallet_type}") # Frissítjük a pénztárcát await db.execute( update(Wallet) .where(Wallet.id == wallet.id) .values(**update_values) ) logger.info(f"Pénztárca frissítve: {wallet.id}, wallet_type={wallet_type}, új egyenleg: {new_balance} (korábbi: {current_balance})") # 4. FinancialLedger státusz frissítése SUCCESS-re ledger_entry.status = LedgerStatus.SUCCESS # 5. Számlakiállító bevételének frissítése issuer.current_revenue += amount db.add(issuer) # 6. Külső szolgáltatások meghívása (ha vannak) external_results = {} if self.payment_gateway: try: payment_result = await self.payment_gateway.create_intent( amount=amount, currency="HUF", metadata={ "ledger_id": ledger_entry.id, "user_id": user_id, "issuer_id": issuer.id, **(metadata or {}) } ) external_results["payment"] = payment_result logger.info(f"Fizetési szándék létrehozva: {payment_result.get('id')}") except PaymentGatewayError as e: logger.error(f"Fizetési átjáró hiba: {e}") # Döntés: tovább dobjuk a hibát, ami rollback-et okoz raise if self.invoicing_service: try: # Ügyfél adatok gyűjtése (egyszerűsített) customer_data = { "user_id": user_id, "amount": float(amount), "description": description } invoice_result = await self.invoicing_service.issue_invoice( issuer_id=issuer.id, customer_data=customer_data, items=[{ "description": description or "Szolgáltatás díja", "quantity": 1, "unit_price": float(amount), "vat_rate": 27.0 # ÁFA kulcs }] ) external_results["invoice"] = invoice_result logger.info(f"Szála kiállítva: {invoice_result.get('invoice_number')}") except InvoicingError as e: logger.error(f"Számlázási hiba: {e}") # Döntés: tovább dobjuk a hibát, ami rollback-et okoz raise # 7. COMMIT - minden művelet sikeres, atomi mentés await db.commit() logger.info(f"Tranzakció sikeresen commitálva: ledger_id={ledger_entry.id}") # Visszatérési érték return { "success": True, "ledger_id": ledger_entry.id, "issuer_id": issuer.id, "issuer_type": issuer.type, "wallet_id": wallet.id, "new_balance": new_balance, "external_results": external_results, "message": "Payment processed successfully" } except Exception as e: # 8. ROLLBACK - bármilyen hiba esetén logger.error(f"Hiba a tranzakcióban: {e}", exc_info=True) await db.rollback() # Speciális hibák újradobása if isinstance(e, (InsufficientFundsError, PaymentGatewayError, InvoicingError)): raise # Általános hiba raise FinancialOrchestratorError(f"Payment processing failed: {e}") from e finally: # 9. További takarítás (ha szükséges) # Jelenleg nincs extra takarítási logika pass async def refund_payment( self, db: AsyncSession, ledger_id: int, reason: str = "" ) -> Dict[str, Any]: """ Visszatérítés folyamata Unit of Work mintával. Ez a metódus visszafordítja egy korábbi tranzakciót: 1. Megkeresi az eredeti FinancialLedger bejegyzést 2. Létrehoz egy negatív összegű bejegyzést (REFUND státusszal) 3. Visszaállítja a pénztárca egyenlegét 4. Visszaállítja a számlakiállító bevételét Args: db: Adatbázis munkamenet ledger_id: Az eredeti FinancialLedger bejegyzés azonosítója reason: Visszatérítés oka Returns: Szótár a visszatérítés részleteivel """ try: logger.info(f"Visszatérítés indítása: ledger_id={ledger_id}") # 1. Eredeti bejegyzés lekérdezése original_query = select(FinancialLedger).where( FinancialLedger.id == ledger_id ).with_for_update() original_result = await db.execute(original_query) original_entry = original_result.scalar_one_or_none() if not original_entry: raise ValueError(f"Nincs FinancialLedger bejegyzés a következő ID-val: {ledger_id}") if original_entry.status != LedgerStatus.SUCCESS: raise ValueError(f"Csak SUCCESS státuszú bejegyzések téríthetők vissza. " f"Jelenlegi státusz: {original_entry.status}") # 2. Visszatérítési bejegyzés létrehozása refund_entry = FinancialLedger( user_id=original_entry.user_id, amount=-original_entry.amount, # Negatív összeg wallet_type=original_entry.wallet_type, status=LedgerStatus.REFUND, issuer_id=original_entry.issuer_id, description=f"Visszatérítés: {reason}" if reason else "Visszatérítés", metadata={ "original_ledger_id": ledger_id, "reason": reason, "refund_type": "full" } ) db.add(refund_entry) await db.flush() # 3. Pénztárca egyenleg visszaállítása wallet_query = select(Wallet).where( and_( Wallet.user_id == original_entry.user_id, Wallet.wallet_type == original_entry.wallet_type ) ).with_for_update() wallet_result = await db.execute(wallet_query) wallet = wallet_result.scalar_one_or_none() if wallet: new_balance = wallet.balance - original_entry.amount await db.execute( update(Wallet) .where(Wallet.id == wallet.id) .values(balance=new_balance) ) # 4. Számlakiállító bevételének csökkentése issuer_query = select(Issuer).where(Issuer.id == original_entry.issuer_id) issuer_result = await db.execute(issuer_query) issuer = issuer_result.scalar_one() issuer.current_revenue -= original_entry.amount db.add(issuer) # 5. Eredeti bejegyzés státuszának frissítése original_entry.status = LedgerStatus.REFUNDED original_entry.metadata = { **(original_entry.metadata or {}), "refund_ledger_id": refund_entry.id, "refund_reason": reason } # 6. COMMIT await db.commit() logger.info(f"Visszatérítés sikeres: refund_ledger_id={refund_entry.id}") return { "success": True, "refund_ledger_id": refund_entry.id, "original_ledger_id": ledger_id, "amount_refunded": original_entry.amount, "message": "Refund processed successfully" } except Exception as e: logger.error(f"Hiba a visszatérítésben: {e}", exc_info=True) await db.rollback() raise FinancialOrchestratorError(f"Refund processing failed: {e}") from e class FinancialOrchestratorError(Exception): """Kivétel a FinancialOrchestrator hibáinak kezelés"""