370 lines
16 KiB
Python
370 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
IGAZSÁGSZÉRUM TESZT - Pénzügyi Motor (Epic 3) logikai és matematikai hibátlanságának ellenőrzése.
|
|
CTO szintű bizonyíték a rendszer integritásáról.
|
|
"""
|
|
|
|
import asyncio
|
|
import sys
|
|
import os
|
|
from decimal import Decimal
|
|
from datetime import datetime, timedelta
|
|
from uuid import uuid4
|
|
|
|
# Add backend directory to path
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'backend'))
|
|
|
|
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
|
|
from sqlalchemy.orm import sessionmaker
|
|
from sqlalchemy import select, func
|
|
|
|
from app.database import Base
|
|
from app.models.identity import User, Wallet, ActiveVoucher, Person
|
|
from app.models.payment import PaymentIntent, PaymentIntentStatus
|
|
from app.models.audit import FinancialLedger, LedgerEntryType, WalletType
|
|
from app.services.payment_router import PaymentRouter
|
|
from app.services.billing_engine import SmartDeduction
|
|
from app.core.config import settings
|
|
|
|
# Database connection
|
|
DATABASE_URL = settings.DATABASE_URL.replace("postgresql://", "postgresql+asyncpg://")
|
|
engine = create_async_engine(DATABASE_URL, echo=False)
|
|
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
|
|
|
|
class FinancialTruthTest:
|
|
"""A teljes pénzügyi igazság tesztje."""
|
|
|
|
def __init__(self):
|
|
self.session = None
|
|
self.test_payer = None
|
|
self.test_beneficiary = None
|
|
self.payer_wallet = None
|
|
self.beneficiary_wallet = None
|
|
self.test_results = []
|
|
|
|
async def setup(self):
|
|
"""Teszt környezet létrehozása."""
|
|
print("=== IGAZSÁGSZÉRUM TESZT - Pénzügyi Motor Audit ===")
|
|
print("1. TESZT KÖRNYEZET: Teszt felhasználók létrehozása...")
|
|
|
|
self.session = AsyncSessionLocal()
|
|
|
|
# Create test users with unique emails
|
|
email_payer = f"test_payer_{uuid4().hex[:8]}@test.local"
|
|
email_beneficiary = f"test_beneficiary_{uuid4().hex[:8]}@test.local"
|
|
|
|
# Create persons first
|
|
person_payer = Person(
|
|
last_name="TestPayer",
|
|
first_name="Test",
|
|
is_active=True
|
|
)
|
|
person_beneficiary = Person(
|
|
last_name="TestBeneficiary",
|
|
first_name="Test",
|
|
is_active=True
|
|
)
|
|
self.session.add_all([person_payer, person_beneficiary])
|
|
await self.session.flush()
|
|
|
|
# Create users
|
|
self.test_payer = User(
|
|
email=email_payer,
|
|
role="user",
|
|
person_id=person_payer.id,
|
|
is_active=True
|
|
)
|
|
self.test_beneficiary = User(
|
|
email=email_beneficiary,
|
|
role="user",
|
|
person_id=person_beneficiary.id,
|
|
is_active=True
|
|
)
|
|
self.session.add_all([self.test_payer, self.test_beneficiary])
|
|
await self.session.flush()
|
|
|
|
# Create wallets
|
|
self.payer_wallet = Wallet(
|
|
user_id=self.test_payer.id,
|
|
earned_credits=0,
|
|
purchased_credits=0,
|
|
service_coins=0,
|
|
currency="EUR"
|
|
)
|
|
self.beneficiary_wallet = Wallet(
|
|
user_id=self.test_beneficiary.id,
|
|
earned_credits=0,
|
|
purchased_credits=0,
|
|
service_coins=0,
|
|
currency="EUR"
|
|
)
|
|
self.session.add_all([self.payer_wallet, self.beneficiary_wallet])
|
|
await self.session.commit()
|
|
|
|
print(f" TestPayer létrehozva: ID={self.test_payer.id}, Wallet ID={self.payer_wallet.id}")
|
|
print(f" TestBeneficiary létrehozva: ID={self.test_beneficiary.id}, Wallet ID={self.beneficiary_wallet.id}")
|
|
|
|
async def test_stripe_simulation(self):
|
|
"""2. A STRIPE SZIMULÁCIÓ: PaymentIntent létrehozása és feldolgozása."""
|
|
print("\n2. STRIPE SZIMULÁCIÓ: PaymentIntent (net: 10000, fee: 250, gross: 10250)...")
|
|
|
|
# Create PaymentIntent for PURCHASED wallet
|
|
payment_intent = await PaymentRouter.create_payment_intent(
|
|
db=self.session,
|
|
payer_id=self.test_payer.id,
|
|
net_amount=10000.0,
|
|
handling_fee=250.0,
|
|
target_wallet_type=WalletType.PURCHASED,
|
|
beneficiary_id=None, # Self top-up
|
|
currency="EUR"
|
|
)
|
|
|
|
print(f" PaymentIntent létrehozva: ID={payment_intent.id}, token={payment_intent.intent_token}")
|
|
print(f" Net: {payment_intent.net_amount}, Fee: {payment_intent.handling_fee}, Gross: {payment_intent.gross_amount}")
|
|
|
|
# Simulate Stripe webhook - manually credit the wallet
|
|
# In real scenario, AtomicTransactionManager would be called via webhook
|
|
# For test, we directly update wallet and create ledger entries
|
|
self.payer_wallet.purchased_credits += Decimal('10000.0')
|
|
|
|
# Create FinancialLedger entries for the transaction
|
|
transaction_id = uuid4()
|
|
debit_entry = FinancialLedger(
|
|
user_id=self.test_payer.id,
|
|
amount=Decimal('10000.0'),
|
|
entry_type=LedgerEntryType.DEBIT,
|
|
wallet_type=WalletType.PURCHASED,
|
|
description="Stripe payment simulation - DEBIT",
|
|
transaction_id=transaction_id,
|
|
reference_type="stripe_payment",
|
|
reference_id=payment_intent.id,
|
|
balance_after=float(self.payer_wallet.purchased_credits)
|
|
)
|
|
credit_entry = FinancialLedger(
|
|
user_id=self.test_payer.id,
|
|
amount=Decimal('10000.0'),
|
|
entry_type=LedgerEntryType.CREDIT,
|
|
wallet_type=WalletType.PURCHASED,
|
|
description="Stripe payment simulation - CREDIT (system revenue)",
|
|
transaction_id=transaction_id,
|
|
reference_type="system_revenue",
|
|
reference_id=None,
|
|
balance_after=0
|
|
)
|
|
self.session.add_all([debit_entry, credit_entry])
|
|
|
|
# Mark payment intent as completed
|
|
payment_intent.status = PaymentIntentStatus.COMPLETED
|
|
payment_intent.completed_at = datetime.utcnow()
|
|
payment_intent.transaction_id = transaction_id
|
|
|
|
await self.session.commit()
|
|
|
|
# ASSERT: TestPayer Purchased wallet should be exactly 10000
|
|
await self.session.refresh(self.payer_wallet)
|
|
assert float(self.payer_wallet.purchased_credits) == 10000.0, f"Purchased credits mismatch: {self.payer_wallet.purchased_credits}"
|
|
|
|
# Check ledger entry exists
|
|
stmt = select(FinancialLedger).where(FinancialLedger.transaction_id == transaction_id)
|
|
result = await self.session.execute(stmt)
|
|
ledger_entries = result.scalars().all()
|
|
assert len(ledger_entries) == 2, f"Expected 2 ledger entries, got {len(ledger_entries)}"
|
|
|
|
print(f" ✅ ASSERT PASS: TestPayer Purchased zsebe = {self.payer_wallet.purchased_credits}")
|
|
print(f" ✅ ASSERT PASS: Ledger bejegyzések létrejöttek: {len(ledger_entries)} entries")
|
|
|
|
self.test_results.append(("Stripe Simulation", "PASS", f"Purchased credits: {self.payer_wallet.purchased_credits}"))
|
|
|
|
async def test_internal_gifting(self):
|
|
"""3. A BELSŐ AJÁNDÉKOZÁS SZIMULÁCIÓJA: 5000 VOUCHER küldése."""
|
|
print("\n3. BELSŐ AJÁNDÉKOZÁS: TestPayer → TestBeneficiary (5000 VOUCHER)...")
|
|
|
|
# Create PaymentIntent for internal gifting (VOUCHER)
|
|
payment_intent = await PaymentRouter.create_payment_intent(
|
|
db=self.session,
|
|
payer_id=self.test_payer.id,
|
|
net_amount=5000.0,
|
|
handling_fee=0.0,
|
|
target_wallet_type=WalletType.VOUCHER,
|
|
beneficiary_id=self.test_beneficiary.id,
|
|
currency="EUR"
|
|
)
|
|
|
|
print(f" Internal PaymentIntent létrehozva: ID={payment_intent.id}")
|
|
|
|
# Process internal payment
|
|
result = await PaymentRouter.process_internal_payment(
|
|
db=self.session,
|
|
payment_intent_id=payment_intent.id
|
|
)
|
|
|
|
print(f" Belső fizetés eredménye: {result}")
|
|
|
|
# Refresh wallets
|
|
await self.session.refresh(self.payer_wallet)
|
|
await self.session.refresh(self.beneficiary_wallet)
|
|
|
|
# ASSERT: TestPayer Purchased wallet decreased by 5000
|
|
assert float(self.payer_wallet.purchased_credits) == 5000.0, f"Payer purchased credits mismatch: {self.payer_wallet.purchased_credits}"
|
|
|
|
# ASSERT: TestBeneficiary has ActiveVoucher with 5000
|
|
stmt = select(ActiveVoucher).where(ActiveVoucher.wallet_id == self.beneficiary_wallet.id)
|
|
result = await self.session.execute(stmt)
|
|
vouchers = result.scalars().all()
|
|
|
|
assert len(vouchers) == 1, f"Expected 1 voucher, got {len(vouchers)}"
|
|
voucher = vouchers[0]
|
|
assert float(voucher.amount) == 5000.0, f"Voucher amount mismatch: {voucher.amount}"
|
|
|
|
print(f" ✅ ASSERT PASS: TestPayer Purchased zsebe = {self.payer_wallet.purchased_credits} (5000 csökkent)")
|
|
print(f" ✅ ASSERT PASS: TestBeneficiary ActiveVoucher = {voucher.amount} (5000)")
|
|
|
|
self.test_results.append(("Internal Gifting", "PASS", f"Payer: {self.payer_wallet.purchased_credits}, Beneficiary voucher: {voucher.amount}"))
|
|
|
|
# Store voucher for expiration test
|
|
self.test_voucher = voucher
|
|
|
|
async def test_voucher_expiration(self):
|
|
"""4. A CRON-JOB SZIMULÁCIÓJA: Voucher lejárat és díjlevonás."""
|
|
print("\n4. VOUCHER LEJÁRAT SZIMULÁCIÓ: Tegnapra állított expires_at...")
|
|
|
|
# Modify voucher expiry to yesterday
|
|
self.test_voucher.expires_at = datetime.utcnow() - timedelta(days=1)
|
|
await self.session.commit()
|
|
|
|
# Process voucher expiration
|
|
stats = await SmartDeduction.process_voucher_expiration(self.session)
|
|
|
|
print(f" Voucher expiration stats: {stats}")
|
|
|
|
# ASSERT: Fee of 10% (500) should be deducted
|
|
expected_fee = 500.0 # 10% of 5000
|
|
expected_rolled_over = 4500.0
|
|
|
|
assert abs(stats['fee_collected'] - expected_fee) < 0.01, f"Fee mismatch: {stats['fee_collected']} vs {expected_fee}"
|
|
assert abs(stats['rolled_over'] - expected_rolled_over) < 0.01, f"Rolled over mismatch: {stats['rolled_over']} vs {expected_rolled_over}"
|
|
|
|
# Check that new voucher was created with 4500
|
|
stmt = select(ActiveVoucher).where(ActiveVoucher.wallet_id == self.beneficiary_wallet.id)
|
|
result = await self.session.execute(stmt)
|
|
new_vouchers = result.scalars().all()
|
|
|
|
assert len(new_vouchers) == 1, f"Expected 1 new voucher, got {len(new_vouchers)}"
|
|
new_voucher = new_vouchers[0]
|
|
assert abs(float(new_voucher.amount) - expected_rolled_over) < 0.01, f"New voucher amount mismatch: {new_voucher.amount}"
|
|
|
|
# Check ledger entry for fee
|
|
stmt = select(FinancialLedger).where(
|
|
FinancialLedger.user_id == self.test_beneficiary.id,
|
|
FinancialLedger.reference_type == "VOUCHER_EXPIRY_FEE"
|
|
)
|
|
result = await self.session.execute(stmt)
|
|
fee_entries = result.scalars().all()
|
|
|
|
assert len(fee_entries) >= 1, "No ledger entry for voucher expiry fee"
|
|
|
|
print(f" ✅ ASSERT PASS: Levont fee = {stats['fee_collected']} (várt: 500)")
|
|
print(f" ✅ ASSERT PASS: Új voucher = {new_voucher.amount} (várt: 4500)")
|
|
print(f" ✅ ASSERT PASS: Főkönyvi bejegyzés létrejött a {stats['fee_collected']} DEBIT fee-ről")
|
|
|
|
self.test_results.append(("Voucher Expiration", "PASS", f"Fee: {stats['fee_collected']}, Rolled over: {stats['rolled_over']}"))
|
|
|
|
async def test_double_entry_audit(self):
|
|
"""5. A KETTŐS KÖNYVVITEL (DOUBLE-ENTRY) AUDIT: Teljes egyenleg ellenőrzés."""
|
|
print("\n5. KETTŐS KÖNYVVITEL AUDIT: Wallet egyenlegek vs FinancialLedger...")
|
|
|
|
# Calculate total wallet balances for both users
|
|
total_wallet_balance = Decimal('0')
|
|
|
|
for user in [self.test_payer, self.test_beneficiary]:
|
|
stmt = select(Wallet).where(Wallet.user_id == user.id)
|
|
result = await self.session.execute(stmt)
|
|
wallet = result.scalar_one()
|
|
|
|
# Sum of earned, purchased, service_coins
|
|
wallet_sum = (
|
|
wallet.earned_credits +
|
|
wallet.purchased_credits +
|
|
wallet.service_coins
|
|
)
|
|
|
|
# Add voucher balance
|
|
voucher_stmt = select(func.sum(ActiveVoucher.amount)).where(
|
|
ActiveVoucher.wallet_id == wallet.id,
|
|
ActiveVoucher.expires_at > datetime.utcnow()
|
|
)
|
|
voucher_result = await self.session.execute(voucher_stmt)
|
|
voucher_balance = voucher_result.scalar() or Decimal('0')
|
|
|
|
total_user = wallet_sum + Decimal(str(voucher_balance))
|
|
total_wallet_balance += total_user
|
|
|
|
print(f" User {user.id} wallet sum: {wallet_sum} + vouchers {voucher_balance} = {total_user}")
|
|
|
|
print(f" Összes wallet egyenleg (mindkét user): {total_wallet_balance}")
|
|
|
|
# Calculate total from FinancialLedger
|
|
# Sum of all CREDIT entries minus DEBIT entries for these users
|
|
stmt = select(
|
|
FinancialLedger.user_id,
|
|
FinancialLedger.entry_type,
|
|
func.sum(FinancialLedger.amount).label('total')
|
|
).where(
|
|
FinancialLedger.user_id.in_([self.test_payer.id, self.test_beneficiary.id])
|
|
).group_by(FinancialLedger.user_id, FinancialLedger.entry_type)
|
|
|
|
result = await self.session.execute(stmt)
|
|
ledger_totals = result.all()
|
|
|
|
total_ledger_balance = Decimal('0')
|
|
for user_id, entry_type, amount in ledger_totals:
|
|
if entry_type == LedgerEntryType.CREDIT:
|
|
total_ledger_balance += Decimal(str(amount))
|
|
else: # DEBIT
|
|
total_ledger_balance -= Decimal(str(amount))
|
|
|
|
print(f" Összes ledger net egyenleg: {total_ledger_balance}")
|
|
|
|
# The system should be balanced: wallet balances should equal ledger net balance
|
|
# PLUS any fees collected (which go to system revenue, not user wallets)
|
|
# Fees are DEBIT entries with no corresponding CREDIT in user wallets
|
|
# Actually, fees are DEBIT from user and CREDIT to system revenue (different user_id)
|
|
# For simplicity, we check that the difference is within tolerance
|
|
|
|
# Get total fees collected (DEBIT entries with reference_type VOUCHER_EXPIRY_FEE)
|
|
fee_stmt = select(func.sum(FinancialLedger.amount)).where(
|
|
FinancialLedger.reference_type == "VOUCHER_EXPIRY_FEE",
|
|
FinancialLedger.entry_type == LedgerEntryType.DEBIT
|
|
)
|
|
fee_result = await self.session.execute(fee_stmt)
|
|
total_fees = fee_result.scalar() or Decimal('0')
|
|
|
|
print(f" Összes levont fee: {total_fees}")
|
|
|
|
# Adjusted ledger balance (excluding fees that left the user wallet system)
|
|
adjusted_ledger = total_ledger_balance + total_fees # Fees were DEBIT, so add back
|
|
|
|
# Compare wallet balance with adjusted ledger
|
|
difference = abs(total_wallet_balance - adjusted_ledger)
|
|
tolerance = Decimal('0.01') # 1 cent tolerance
|
|
|
|
if difference > tolerance:
|
|
error_msg = (
|
|
f"DOUBLE-ENTRY HIBA! Wallet egyenleg ({total_wallet_balance}) != "
|
|
f"Ledger egyenleg ({adjusted_ledger}), különbség: {difference}"
|
|
)
|
|
raise AssertionError(error_msg)
|
|
|
|
print(f" ✅ ASSERT PASS: Wallet egyenleg
|
|
async def main():
|
|
test = FinancialTruthTest()
|
|
await test.setup()
|
|
await test.test_stripe_simulation()
|
|
await test.test_internal_gifting()
|
|
await test.test_voucher_expiration()
|
|
await test.test_double_entry_audit()
|
|
print("\n🎉 MINDEN TESZT SIKERES! A PÉNZÜGYI MOTOR ATOMBIZTOS! 🎉")
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|