"""Async scheduler (APScheduler) for the daily maintenance jobs.

The scheduler is wired into the FastAPI lifespan, so it starts together with
the application and is shut down when the application stops.

Safety jitter: the daily run is registered for 00:15 (scheduler timezone UTC)
with ``jitter=900`` (15 minutes), which is intended to spread the actual start
over roughly 00:15-00:30.
NOTE(review): the exact jitter window depends on the installed APScheduler
version - confirm against its cron trigger documentation.
"""

import asyncio
import logging
import uuid
from contextlib import asynccontextmanager
from datetime import datetime, time, timedelta
from typing import Any, Awaitable, Callable, Dict, Optional

from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy import select, update, and_
from sqlalchemy.orm import selectinload

from app.database import AsyncSessionLocal
from app.models import ProcessLog, WalletType, FinancialLedger
from app.models.identity import User
from app.models.marketplace.payment import WithdrawalRequest, WithdrawalRequestStatus
from app.services.billing_engine import SmartDeduction

logger = logging.getLogger(__name__)

# Global scheduler instance, created lazily by get_scheduler().
_scheduler: Optional[AsyncIOScheduler] = None

# Value written to ProcessLog.process_name for every run of the daily job.
_PROCESS_NAME = "Daily-Financial-Maintenance"

# PENDING withdrawal requests older than this many days are auto-rejected.
_WITHDRAWAL_EXPIRY_DAYS = 14


def get_scheduler() -> AsyncIOScheduler:
    """Return the global scheduler instance, creating it on first use.

    The scheduler uses an in-memory job store, the UTC timezone, and job
    defaults of ``coalesce=True``, ``max_instances=1`` and a one hour
    ``misfire_grace_time``.
    """
    global _scheduler
    if _scheduler is None:
        _scheduler = AsyncIOScheduler(
            jobstores={'default': MemoryJobStore()},
            timezone="UTC",
            job_defaults={
                'coalesce': True,
                'max_instances': 1,
                'misfire_grace_time': 3600,  # 1 hour
            },
        )
    return _scheduler


async def _expire_vouchers(db) -> int:
    """Run voucher expiration and return the count reported by SmartDeduction."""
    voucher_count = await SmartDeduction.process_voucher_expiration(db)
    logger.info("Expired %s vouchers", voucher_count)
    return voucher_count


async def _reject_expired_withdrawals(db) -> int:
    """Reject stale PENDING withdrawal requests and book a refund for each.

    Commits on success and returns the number of rejected requests. Any
    exception propagates to the caller, which is responsible for the rollback.
    """
    # NOTE(review): datetime.utcnow() is naive UTC (deprecated since Python
    # 3.12); kept because the column types are not visible from this module.
    cutoff = datetime.utcnow() - timedelta(days=_WITHDRAWAL_EXPIRY_DAYS)
    stmt = (
        select(WithdrawalRequest)
        .where(
            and_(
                WithdrawalRequest.status == WithdrawalRequestStatus.PENDING,
                WithdrawalRequest.created_at < cutoff,
                WithdrawalRequest.is_deleted == False,  # noqa: E712 - SQL expression
            )
        )
        .options(selectinload(WithdrawalRequest.user))
    )
    result = await db.execute(stmt)
    expired_requests = result.scalars().all()

    for req in expired_requests:
        req.status = WithdrawalRequestStatus.REJECTED
        req.reason = "Automatikus elutasítás: 14 napig hiányzó bizonylat"

        # Refund: return the money to the user's EARNED wallet through a
        # FinancialLedger entry linked back to the request.
        # NOTE(review): `metadata` is a reserved attribute name on SQLAlchemy
        # declarative classes - confirm FinancialLedger really accepts and
        # persists this keyword.
        refund_transaction = FinancialLedger(
            transaction_id=uuid.uuid4(),
            user_id=req.user_id,
            wallet_type=WalletType.EARNED,
            amount=req.amount,
            currency=req.currency,
            transaction_type="REFUND",
            description=f"Refund for expired withdrawal request #{req.id}",
            metadata={"withdrawal_request_id": req.id},
        )
        db.add(refund_transaction)
        req.refund_transaction_id = refund_transaction.transaction_id

    await db.commit()
    logger.info("Rejected %s expired withdrawal requests", len(expired_requests))
    return len(expired_requests)


async def _downgrade_expired_subscriptions(db) -> int:
    """Soft-downgrade users whose paid subscription has expired.

    Sets ``subscription_plan`` to ``'FREE'`` and ``role`` to ``'user'``,
    commits, and returns the number of affected users. Any exception
    propagates to the caller, which is responsible for the rollback.
    """
    stmt = select(User).where(
        and_(
            User.subscription_expires_at < datetime.utcnow(),
            User.subscription_plan != 'FREE',
            User.is_deleted == False,  # noqa: E712 - SQL expression
        )
    )
    result = await db.execute(stmt)
    expired_users = result.scalars().all()

    for user in expired_users:
        user.subscription_plan = 'FREE'
        user.role = 'user'
        # Suspending related entities (e.g. Organization.is_active = False)
        # would be separate logic and is intentionally not done here.

    await db.commit()
    logger.info("Downgraded %s users to FREE plan", len(expired_users))
    return len(expired_users)


async def _run_step(
    db,
    stats: Dict[str, Any],
    counter_key: str,
    error_label: str,
    step: Callable[[Any], Awaitable[int]],
) -> None:
    """Run one maintenance step in isolation from the other steps.

    On success the step's return value is stored in ``stats[counter_key]``.
    On failure the error is recorded in ``stats["errors"]`` and the session is
    rolled back, so the counter is never credited for work that was not
    committed and the next step starts from a clean session.
    """
    try:
        stats[counter_key] = await step(db)
    except Exception as exc:
        stats["errors"].append(f"{error_label}: {exc}")
        logger.exception("%s: %s", error_label, exc)
        await db.rollback()


async def daily_financial_maintenance() -> None:
    """Run the daily financial maintenance job.

    Steps:
      A. Voucher expiration.
      B. Auto-rejection (with refund) of PENDING withdrawal requests older
         than 14 days.
      C. Soft downgrade of users with expired subscriptions.
      D. Write a ProcessLog entry summarising the run.

    A failure in A, B or C is recorded and does not stop the later steps. A
    failure outside those steps is logged and a FAILED ProcessLog entry is
    written; an error while writing that entry propagates to the scheduler.
    """
    logger.info("Daily financial maintenance started")
    stats: Dict[str, Any] = {
        "vouchers_expired": 0,
        "withdrawals_rejected": 0,
        "users_downgraded": 0,
        "errors": [],
    }

    async with AsyncSessionLocal() as db:
        try:
            await _run_step(
                db, stats, "vouchers_expired",
                "Voucher expiration error", _expire_vouchers,
            )
            await _run_step(
                db, stats, "withdrawals_rejected",
                "Withdrawal expiration error", _reject_expired_withdrawals,
            )
            await _run_step(
                db, stats, "users_downgraded",
                "Soft downgrade error", _downgrade_expired_subscriptions,
            )

            # D. Summary entry in ProcessLog.
            db.add(
                ProcessLog(
                    process_name=_PROCESS_NAME,
                    items_processed=(
                        stats["vouchers_expired"]
                        + stats["withdrawals_rejected"]
                        + stats["users_downgraded"]
                    ),
                    items_failed=len(stats["errors"]),
                    end_time=datetime.utcnow(),
                    details={
                        "status": "COMPLETED" if not stats["errors"] else "PARTIAL",
                        **stats,
                    },
                )
            )
            await db.commit()
            logger.info("Daily financial maintenance completed: %s", stats)
        except Exception as exc:
            logger.exception("Daily financial maintenance failed: %s", exc)
            # The session may be unusable after a failed flush/commit and may
            # still hold the pending summary row; reset it before writing the
            # failure entry.
            await db.rollback()
            db.add(
                ProcessLog(
                    process_name=_PROCESS_NAME,
                    items_processed=0,
                    items_failed=1,
                    end_time=datetime.utcnow(),
                    details={
                        "status": "FAILED",
                        "error": str(exc),
                        **stats,
                    },
                )
            )
            await db.commit()


def setup_scheduler() -> None:
    """Register the daily jobs on the global scheduler.

    The job is registered under a fixed id with ``replace_existing=True``, so
    calling this function again replaces the job instead of duplicating it.
    """
    scheduler = get_scheduler()

    # Daily run at 00:15 with jitter=900 (up to 15 minutes of random offset).
    scheduler.add_job(
        daily_financial_maintenance,
        trigger=CronTrigger(hour=0, minute=15, jitter=900),
        id="daily_financial_maintenance",
        name="Daily Financial Maintenance",
        replace_existing=True,
    )
    logger.info("Scheduler jobs registered")


@asynccontextmanager
async def scheduler_lifespan(app):
    """FastAPI lifespan manager that starts and stops the scheduler.

    The scheduler is started before the application begins serving and is
    shut down (without waiting for running jobs) when the context exits, also
    when an exception is thrown into the context. The global instance is then
    cleared so the next ``get_scheduler()`` call builds a fresh one.
    """
    global _scheduler

    scheduler = get_scheduler()
    setup_scheduler()

    logger.info("Starting scheduler...")
    scheduler.start()

    # For an immediate one-off run during development, a 'date' trigger job
    # for daily_financial_maintenance can be added here.

    try:
        yield
    finally:
        logger.info("Shutting down scheduler...")
        scheduler.shutdown(wait=False)
        _scheduler = None