# Source: service-finder/backend/app/tests/e2e/conftest.py
# Snapshot: 2026-03-22 11:02:05 +00:00 (379 lines, 14 KiB, Python)
"""
Pytest fixtures for E2E testing of Core modules.
Provides an authenticated client that goes through the full user journey.
"""
import asyncio
import httpx
import pytest
import pytest_asyncio
import uuid
import re
import logging
import time
from typing import AsyncGenerator, Optional, List, Dict
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import DBAPIError
from app.db.session import AsyncSessionLocal
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Configuration
# Default root URL for APIClient.
# NOTE(review): "sf_api" looks like a container/service hostname - confirm.
BASE_URL = "http://sf_api:8000"
# Default root URL for MailpitClient.
# NOTE(review): "sf_mailpit" looks like a container/service hostname - confirm.
MAILPIT_URL = "http://sf_mailpit:8025"
# Domain part of the e-mail addresses generated for test users.
TEST_EMAIL_DOMAIN = "example.com"
class MailpitClient:
    """Async client for the Mailpit HTTP API, used to read test e-mails.

    The request helpers are best-effort: any error raised while talking to
    Mailpit is logged and reported through the return value (``False`` or
    ``None``) instead of being propagated to the caller.
    """

    def __init__(self, base_url: str = MAILPIT_URL):
        """Create the client.

        Args:
            base_url: Root URL of the Mailpit instance; endpoint paths such
                as ``/api/v1/messages`` are appended to it directly.
        """
        self.base_url = base_url
        self.client = httpx.AsyncClient(timeout=30.0)

    async def delete_all_messages(self) -> bool:
        """Delete all messages in Mailpit to ensure a clean state.

        Returns:
            ``True`` when the delete request succeeded, ``False`` otherwise.
        """
        try:
            response = await self.client.delete(f"{self.base_url}/api/v1/messages")
            response.raise_for_status()
        except Exception as e:
            # Deliberately best-effort: a failed clean-up must not abort the test.
            logger.warning(f"Mailpit clean failed: {e}, continuing anyway.")
            return False
        logger.debug("Mailpit cleaned (all messages deleted).")
        return True

    async def get_messages(self, limit: int = 50) -> Optional[Dict]:
        """Fetch the message listing from Mailpit.

        Args:
            limit: Value sent as the ``limit`` query parameter.

        Returns:
            The decoded JSON response, or ``None`` if the request failed.
        """
        try:
            response = await self.client.get(f"{self.base_url}/api/v1/messages?limit={limit}")
            response.raise_for_status()
            return response.json()
        except Exception as e:
            logger.error(f"Failed to fetch messages: {e}")
            return None

    async def get_latest_message(self) -> Optional[dict]:
        """Return the first entry of a one-item listing, or ``None`` if empty."""
        data = await self.get_messages(limit=1)
        if data and data.get("messages"):
            return data["messages"][0]
        return None

    async def get_message_content(self, message_id: str) -> Optional[str]:
        """Get the body of a specific message.

        Args:
            message_id: Mailpit message ID.

        Returns:
            The ``Text`` field if non-empty, else the ``HTML`` field, else an
            empty string; ``None`` if the request failed.
        """
        try:
            response = await self.client.get(f"{self.base_url}/api/v1/message/{message_id}")
            response.raise_for_status()
            data = response.json()
            # Prefer text over HTML
            return data.get("Text") or data.get("HTML") or ""
        except Exception as e:
            logger.error(f"Failed to fetch message content: {e}")
            return None

    @staticmethod
    def _is_addressed_to(message: Dict, email: str) -> bool:
        """Return True if ``email`` appears in the message's ``To`` list.

        Recipients may be dicts carrying an ``Address`` key or plain strings.
        A missing or null ``To`` field is treated as "no recipients".
        """
        recipients = message.get("To") or []
        return any(
            (recipient.get("Address") if isinstance(recipient, dict) else recipient) == email
            for recipient in recipients
        )

    async def poll_for_verification_email(
        self, email: str, max_attempts: int = 5, wait_seconds: int = 3
    ) -> Optional[str]:
        """Poll Mailpit for a verification email sent to ``email``.

        Args:
            email: Recipient address to look for.
            max_attempts: Maximum number of polling rounds.
            wait_seconds: Pause between two rounds. No pause follows the
                final round, so a failed poll returns without extra delay.

        Returns:
            The verification token if found, otherwise ``None``.
        """
        for attempt in range(1, max_attempts + 1):
            logger.debug(f"Polling for verification email (attempt {attempt}/{max_attempts})...")
            data = await self.get_messages(limit=20)
            messages = (data or {}).get("messages") or []

            # Search for email sent to the target address
            for msg in messages:
                if not self._is_addressed_to(msg, email):
                    continue
                msg_id = msg.get("ID")
                if not msg_id:
                    continue
                content = await self.get_message_content(msg_id)
                if not content:
                    logger.debug("Could not fetch email content.")
                    continue
                token = extract_verification_token(content)
                if token:
                    logger.debug(f"Token found on attempt {attempt}: {token}")
                    return token
                logger.debug("Email found but no token pattern matched.")

            # Only wait when another attempt will actually follow.
            if attempt < max_attempts:
                reason = "No verification email found yet" if messages else "No emails in Mailpit"
                logger.debug(f"{reason}, waiting {wait_seconds}s...")
                await asyncio.sleep(wait_seconds)

        logger.error(f"Could not retrieve verification token after {max_attempts} attempts.")
        return None

    async def cleanup(self):
        """Close the HTTP client."""
        await self.client.aclose()
class APIClient:
    """Async helper that drives the Service Finder auth endpoints.

    Holds one shared ``httpx.AsyncClient`` and remembers the most recent
    access token obtained through :meth:`login` in ``self.token``.
    """

    def __init__(self, base_url: str = BASE_URL):
        self.base_url = base_url
        self.client = httpx.AsyncClient(timeout=30.0)
        self.token = None

    async def register(self, email: str, password: str = "TestPassword123!") -> httpx.Response:
        """POST a registration request for a new user and return the raw response."""
        body = dict(
            email=email,
            password=password,
            first_name="Test",
            last_name="User",
            region_code="HU",
            lang="hu",
        )
        return await self.client.post(f"{self.base_url}/api/v1/auth/register", json=body)

    async def login(self, email: str, password: str = "TestPassword123!") -> Optional[str]:
        """Log in with form-encoded credentials.

        Returns the ``access_token`` field of a 200 response (also stored on
        ``self.token``); any other status code yields ``None``.
        """
        credentials = dict(username=email, password=password)
        reply = await self.client.post(f"{self.base_url}/api/v1/auth/login", data=credentials)
        if reply.status_code != 200:
            return None
        self.token = reply.json().get("access_token")
        return self.token

    async def verify_email(self, token: str) -> httpx.Response:
        """POST the verification token and return the raw response."""
        return await self.client.post(
            f"{self.base_url}/api/v1/auth/verify-email",
            json={"token": token},
        )

    async def complete_kyc(self, token: str) -> httpx.Response:
        """Submit a fixed dummy KYC form, authorised with ``token`` as a Bearer token."""
        id_documents = {
            "ID_CARD": {
                "number": "123456AB",
                "expiry_date": "2030-12-31",
            },
        }
        emergency_contact = {
            "name": "John Doe",
            "phone": "+36198765432",
            "relationship": "friend",
        }
        form = {
            "phone_number": "+36123456789",
            "birth_place": "Budapest",
            "birth_date": "1990-01-01",
            "mothers_last_name": "Kovács",
            "mothers_first_name": "Éva",
            "address_zip": "1051",
            "address_city": "Budapest",
            "address_street_name": "Váci",
            "address_street_type": "utca",
            "address_house_number": "1",
            "address_stairwell": None,
            "address_floor": None,
            "address_door": None,
            "address_hrsz": None,
            "identity_docs": id_documents,
            "ice_contact": emergency_contact,
            "preferred_language": "hu",
            "preferred_currency": "HUF",
        }
        return await self.client.post(
            f"{self.base_url}/api/v1/auth/complete-kyc",
            json=form,
            headers={"Authorization": f"Bearer {token}"},
        )

    async def get_authenticated_client(self, token: str) -> httpx.AsyncClient:
        """Build a fresh ``httpx.AsyncClient`` that sends ``token`` as a Bearer token.

        The caller owns the returned client and is responsible for closing it.
        """
        return httpx.AsyncClient(
            base_url=self.base_url,
            headers={"Authorization": f"Bearer {token}"},
            timeout=30.0,
        )

    async def cleanup(self):
        """Close the shared HTTP client."""
        await self.client.aclose()
def extract_verification_token(email_content: Optional[str]) -> Optional[str]:
    """Extract a verification token from email content.

    The patterns are tried in order and the first one that matches wins:
    a ``token=<value>`` query parameter, a ``/verify/<value>`` path segment,
    then a literal ``verification code: <value>`` phrase. A token consists of
    ASCII letters, digits, ``-`` and ``_``.

    Args:
        email_content: Text or HTML body of the email. ``None`` and the empty
            string are accepted and yield ``None``.

    Returns:
        The matched token, or ``None`` when no pattern matches.
    """
    # Guard first: re.search() raises TypeError on None.
    if not email_content:
        return None

    # Look for token in URL patterns
    patterns = (
        r"token=([a-zA-Z0-9\-_]+)",
        r"/verify/([a-zA-Z0-9\-_]+)",
        r"verification code: ([a-zA-Z0-9\-_]+)",
    )
    for pattern in patterns:
        match = re.search(pattern, email_content)
        if match:
            return match.group(1)
    return None
@pytest.fixture(scope="session")
def event_loop():
    """Provide one event loop, created from the default policy, for the whole session.

    The loop is closed once the fixture is torn down.
    """
    policy = asyncio.get_event_loop_policy()
    session_loop = policy.new_event_loop()
    yield session_loop
    session_loop.close()
@pytest_asyncio.fixture(scope="function")
async def db() -> AsyncGenerator[AsyncSession, None]:
    """
    Per-test database session that is rolled back once the test finishes.

    Rolling back after every test is meant to prevent
    InFailedSQLTransactionError from leaking between tests.
    """
    async with AsyncSessionLocal() as session:
        yield session
        # This is the key step: clean up after every test.
        try:
            await session.rollback()
        except DBAPIError as exc:
            logger.warning(f"Rollback failed (likely already aborted): {exc}. Closing session.")
            await session.close()
        # Optionally: await session.close()
@pytest_asyncio.fixture(scope="function")
async def authenticated_client() -> AsyncGenerator[httpx.AsyncClient, None]:
    """
    Fixture that performs the full user journey and yields an authenticated
    httpx.AsyncClient.

    Steps:
    1. Clean Mailpit to ensure only new emails
    2. Register a new user with a unique email (timestamp plus random suffix)
    3. Poll Mailpit for the verification email and extract the token
    4. Verify email
    5. Login to get a JWT token
    6. Complete KYC (a non-200 response is logged, not fatal)
    7. Yield an authenticated client with the Authorization header set

    Every HTTP client opened here is closed on teardown, including when a
    setup step fails.
    """
    # A millisecond timestamp alone can collide when tests start in the same
    # millisecond (e.g. parallel workers), so a random suffix is appended.
    unique_id = f"{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}"
    email = f"test_{unique_id}@{TEST_EMAIL_DOMAIN}"
    password = "TestPassword123!"
    api_client = APIClient()
    mailpit = MailpitClient()
    try:
        # 0. Clean Mailpit before registration
        logger.debug("Cleaning Mailpit before registration...")
        await mailpit.delete_all_messages()

        # 1. Register
        logger.debug(f"Registering user with email: {email}")
        reg_response = await api_client.register(email, password)
        assert reg_response.status_code in (200, 201), f"Registration failed: {reg_response.text}"

        # 2. Poll for verification email and extract token
        logger.debug("Polling Mailpit for verification email...")
        token = await mailpit.poll_for_verification_email(email, max_attempts=5, wait_seconds=3)
        assert token is not None, "Could not retrieve verification token after polling"

        # 3. Verify email
        verify_response = await api_client.verify_email(token)
        assert verify_response.status_code == 200, f"Email verification failed: {verify_response.text}"

        # 4. Login
        access_token = await api_client.login(email, password)
        assert access_token is not None, "Login failed"

        # 5. Complete KYC (optional, log failure but continue)
        kyc_response = await api_client.complete_kyc(access_token)
        if kyc_response.status_code != 200:
            logger.warning(f"KYC completion returned {kyc_response.status_code}: {kyc_response.text}. Continuing anyway.")

        # 6. Create authenticated client
        auth_client = await api_client.get_authenticated_client(access_token)
        try:
            yield auth_client
        finally:
            # Close the yielded client even if teardown is entered via an exception.
            await auth_client.aclose()
    finally:
        # Nested so that a failure closing one client cannot skip the other.
        try:
            await api_client.cleanup()
        finally:
            await mailpit.cleanup()
def _extract_organization_id(data):
    """Return the organization ID from an onboarding response body.

    Checks ``id``, then ``organization_id``, then ``organization.id``.

    Raises:
        ValueError: If none of the known keys is present.
    """
    if "id" in data:
        return data["id"]
    if "organization_id" in data:
        return data["organization_id"]
    nested = data.get("organization")
    # A null/non-dict "organization" value is treated as "no ID" instead of crashing.
    if isinstance(nested, dict) and "id" in nested:
        return nested["id"]
    raise ValueError(f"Could not find organization ID in response: {data}")


@pytest_asyncio.fixture(scope="function")
async def setup_organization(authenticated_client):
    """Create a company for the vehicle/cost tests and return its ID.

    Args:
        authenticated_client: Client used to POST the onboarding request.

    Returns:
        The organization ID taken from the onboarding response.

    Raises:
        ValueError: If the API answers 409, or the response carries no
            recognisable organization ID.
        AssertionError: If the API answers with any other non-200/201 status.
    """
    import time
    import random

    unique_id = int(time.time() * 1000) + random.randint(1, 9999)
    # Generate a valid Hungarian tax number format: 8 digits + "-1-42"
    tax_prefix = random.randint(10000000, 99999999)
    payload = {
        "name": f"Test Fleet {unique_id}",
        "display_name": f"Test Fleet {unique_id}",
        "full_name": f"Test Fleet Kft. {unique_id}",
        "tax_number": f"{tax_prefix}-1-42",
        # The previous f"01-09-{unique_id}"[:6] always evaluated to "01-09-",
        # cutting off the unique part. Keep the six least significant digits.
        "registration_number": f"01-09-{unique_id % 1_000_000:06d}",
        "org_type": "business",
        "country_code": "HU",
        "address_zip": "1051",
        "address_city": "Budapest",
        "address_street_name": "Váci",
        "address_street_type": "utca",
        "address_house_number": "1",
        "address_stairwell": None,
        "address_floor": None,
        "address_door": None,
        "address_hrsz": None,
    }
    response = await authenticated_client.post("/api/v1/organizations/onboard", json=payload)

    # A 409 conflict is reported with a dedicated error; no attempt is made
    # to look up and reuse the existing organization.
    if response.status_code == 409:
        raise ValueError(f"Organization with tax number already exists: {payload['tax_number']}")
    assert response.status_code in (200, 201), f"Organization creation failed: {response.text}"

    # Try multiple possible keys for organization ID
    return _extract_organization_id(response.json())
@pytest_asyncio.fixture
async def setup_vehicle(authenticated_client, setup_organization):
    """Create a vehicle under the test organization and return its ID.

    Args:
        authenticated_client: Client used to POST the vehicle.
        setup_organization: ID of the organization that will own the vehicle.

    Returns:
        The ``id`` field of the creation response.

    Raises:
        AssertionError: If the API does not answer 200 or 201.
    """
    import time

    # 3-character prefix + 14 digits = 17 characters. The previous
    # f"WBA0000000{int(time.time())}"[:17] cut off the three fastest-changing
    # digits of the timestamp, so the VIN repeated for roughly 1000 seconds.
    # Keep the least significant digits of a microsecond clock instead.
    microseconds = time.time_ns() // 1000
    unique_vin = f"WBA{microseconds % 10**14:014d}"
    payload = {
        "vin": unique_vin,
        "license_plate": "TEST-123",
        "organization_id": setup_organization,
        "purchase_price_net": 10000000,
        "purchase_date": "2023-01-01",
        "initial_mileage": 10000,
        "fuel_type": "petrol",
        "transmission": "manual",
    }
    response = await authenticated_client.post("/api/v1/assets/vehicles", json=payload)
    assert response.status_code in (200, 201), f"Vehicle creation failed: {response.text}"
    return response.json()["id"]