219 lines
8.3 KiB
Python
219 lines
8.3 KiB
Python
"""
|
|
Aszinkron ütemező (APScheduler) a napi karbantartási feladatokhoz.
|
|
|
|
Integrálva a FastAPI lifespan eseményébe, így az alkalmazás indításakor elindul,
|
|
és leálláskor megáll.
|
|
|
|
Biztonsági Jitter: A napi futás 00:15-kor indul, de jitter=900 (15 perc) paraméterrel
|
|
véletlenszerűen 0:15 és 0:30 között fog lefutni.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import uuid
|
|
from contextlib import asynccontextmanager
|
|
from datetime import datetime, time, timedelta
|
|
from typing import Optional
|
|
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
from apscheduler.jobstores.memory import MemoryJobStore
|
|
|
|
from app.database import AsyncSessionLocal
|
|
from app.services.billing_engine import SmartDeduction
|
|
from app.models.payment import WithdrawalRequest, WithdrawalRequestStatus
|
|
from app.models.identity import User
|
|
from app.models.audit import ProcessLog, WalletType, FinancialLedger
|
|
from sqlalchemy import select, update, and_
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Globális scheduler példány
|
|
_scheduler: Optional[AsyncIOScheduler] = None
|
|
|
|
|
|
def get_scheduler() -> AsyncIOScheduler:
|
|
"""Visszaadja a globális scheduler példányt (lazy initialization)."""
|
|
global _scheduler
|
|
if _scheduler is None:
|
|
jobstores = {
|
|
'default': MemoryJobStore()
|
|
}
|
|
_scheduler = AsyncIOScheduler(
|
|
jobstores=jobstores,
|
|
timezone="UTC",
|
|
job_defaults={
|
|
'coalesce': True,
|
|
'max_instances': 1,
|
|
'misfire_grace_time': 3600 # 1 óra
|
|
}
|
|
)
|
|
return _scheduler
|
|
|
|
|
|
async def daily_financial_maintenance() -> None:
|
|
"""
|
|
Napi pénzügyi karbantartási feladatok.
|
|
|
|
A. Voucher lejárat kezelése
|
|
B. Withdrawal Request lejárat (14 nap) és automatikus elutasítás
|
|
C. Soft Downgrade (lejárt előfizetések)
|
|
D. Naplózás ProcessLog-ba
|
|
"""
|
|
logger.info("Daily financial maintenance started")
|
|
stats = {
|
|
"vouchers_expired": 0,
|
|
"withdrawals_rejected": 0,
|
|
"users_downgraded": 0,
|
|
"errors": []
|
|
}
|
|
|
|
async with AsyncSessionLocal() as db:
|
|
try:
|
|
# A. Voucher lejárat kezelése
|
|
try:
|
|
voucher_count = await SmartDeduction.process_voucher_expiration(db)
|
|
stats["vouchers_expired"] = voucher_count
|
|
logger.info(f"Expired {voucher_count} vouchers")
|
|
except Exception as e:
|
|
stats["errors"].append(f"Voucher expiration error: {str(e)}")
|
|
logger.error(f"Voucher expiration error: {e}", exc_info=True)
|
|
|
|
# B. Withdrawal Request lejárat (14 nap)
|
|
try:
|
|
# Keresd meg a PENDING státuszú, 14 napnál régebbi kéréseket
|
|
fourteen_days_ago = datetime.utcnow() - timedelta(days=14)
|
|
stmt = select(WithdrawalRequest).where(
|
|
and_(
|
|
WithdrawalRequest.status == WithdrawalRequestStatus.PENDING,
|
|
WithdrawalRequest.created_at < fourteen_days_ago,
|
|
WithdrawalRequest.is_deleted == False
|
|
)
|
|
).options(selectinload(WithdrawalRequest.user))
|
|
|
|
result = await db.execute(stmt)
|
|
expired_requests = result.scalars().all()
|
|
|
|
for req in expired_requests:
|
|
# Állítsd REJECTED-re
|
|
req.status = WithdrawalRequestStatus.REJECTED
|
|
req.reason = "Automatikus elutasítás: 14 napig hiányzó bizonylat"
|
|
|
|
# Refund: pénz vissza a user Earned zsebébe
|
|
# Ehhez létrehozunk egy FinancialLedger bejegyzést (refund)
|
|
refund_transaction = FinancialLedger(
|
|
transaction_id=uuid.uuid4(),
|
|
user_id=req.user_id,
|
|
wallet_type=WalletType.EARNED,
|
|
amount=req.amount,
|
|
currency=req.currency,
|
|
transaction_type="REFUND",
|
|
description=f"Refund for expired withdrawal request #{req.id}",
|
|
metadata={"withdrawal_request_id": req.id}
|
|
)
|
|
db.add(refund_transaction)
|
|
req.refund_transaction_id = refund_transaction.transaction_id
|
|
|
|
stats["withdrawals_rejected"] += 1
|
|
|
|
await db.commit()
|
|
logger.info(f"Rejected {len(expired_requests)} expired withdrawal requests")
|
|
except Exception as e:
|
|
await db.rollback()
|
|
stats["errors"].append(f"Withdrawal expiration error: {str(e)}")
|
|
logger.error(f"Withdrawal expiration error: {e}", exc_info=True)
|
|
|
|
# C. Soft Downgrade (lejárt előfizetések)
|
|
try:
|
|
# Keresd meg a lejárt subscription_expires_at idejű usereket
|
|
stmt = select(User).where(
|
|
and_(
|
|
User.subscription_expires_at < datetime.utcnow(),
|
|
User.subscription_plan != 'FREE',
|
|
User.is_deleted == False
|
|
)
|
|
)
|
|
result = await db.execute(stmt)
|
|
expired_users = result.scalars().all()
|
|
|
|
for user in expired_users:
|
|
# Állítsd a subscription_plan-t 'FREE'-re, role-t 'user'-re
|
|
user.subscription_plan = 'FREE'
|
|
user.role = 'user'
|
|
# Opcionálisan: állítsd be a felfüggesztett státuszt a kapcsolódó entitásokon
|
|
# (pl. Organization.is_active = False) - ez egy külön logika lehet
|
|
stats["users_downgraded"] += 1
|
|
|
|
await db.commit()
|
|
logger.info(f"Downgraded {len(expired_users)} users to FREE plan")
|
|
except Exception as e:
|
|
await db.rollback()
|
|
stats["errors"].append(f"Soft downgrade error: {str(e)}")
|
|
logger.error(f"Soft downgrade error: {e}", exc_info=True)
|
|
|
|
# D. Naplózás ProcessLog-ba
|
|
process_log = ProcessLog(
|
|
process_name="Daily-Financial-Maintenance",
|
|
status="COMPLETED" if not stats["errors"] else "PARTIAL",
|
|
details=stats,
|
|
executed_at=datetime.utcnow()
|
|
)
|
|
db.add(process_log)
|
|
await db.commit()
|
|
|
|
logger.info(f"Daily financial maintenance completed: {stats}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Daily financial maintenance failed: {e}", exc_info=True)
|
|
# Hiba esetén is naplózzuk
|
|
process_log = ProcessLog(
|
|
process_name="Daily-Financial-Maintenance",
|
|
status="FAILED",
|
|
details={"error": str(e), **stats},
|
|
executed_at=datetime.utcnow()
|
|
)
|
|
db.add(process_log)
|
|
await db.commit()
|
|
|
|
|
|
def setup_scheduler() -> None:
|
|
"""Beállítja a scheduler-t a napi feladatokkal."""
|
|
scheduler = get_scheduler()
|
|
|
|
# Napi futás 00:15-kor, jitter=900 (15 perc véletlenszerű eltolás)
|
|
scheduler.add_job(
|
|
daily_financial_maintenance,
|
|
trigger=CronTrigger(hour=0, minute=15, jitter=900),
|
|
id="daily_financial_maintenance",
|
|
name="Daily Financial Maintenance",
|
|
replace_existing=True
|
|
)
|
|
|
|
logger.info("Scheduler jobs registered")
|
|
|
|
|
|
@asynccontextmanager
|
|
async def scheduler_lifespan(app):
|
|
"""
|
|
FastAPI lifespan manager, amely elindítja és leállítja a schedulert.
|
|
"""
|
|
# Importáljuk a szükséges modulokat
|
|
import uuid
|
|
from datetime import timedelta
|
|
|
|
global _scheduler
|
|
scheduler = get_scheduler()
|
|
setup_scheduler()
|
|
|
|
logger.info("Starting scheduler...")
|
|
scheduler.start()
|
|
|
|
# Azonnali tesztfutás (opcionális, csak fejlesztéshez)
|
|
# scheduler.add_job(daily_financial_maintenance, 'date', run_date=datetime.utcnow())
|
|
|
|
yield
|
|
|
|
logger.info("Shutting down scheduler...")
|
|
scheduler.shutdown(wait=False)
|
|
_scheduler = None |