Files
service-finder/backend/app/services/auth_service.py

183 lines
6.7 KiB
Python

from datetime import datetime, timedelta, timezone
import uuid
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, text
from app.models.identity import User, Person, UserRole, VerificationToken, Wallet
from app.models.organization import Organization
from app.schemas.auth import UserLiteRegister, UserKYCComplete
from app.core.security import get_password_hash, verify_password
from app.services.email_manager import email_manager
from app.core.config import settings
class AuthService:
@staticmethod
async def register_lite(db: AsyncSession, user_in: UserLiteRegister):
"""Step 1: Lite regisztráció + Token generálás + Email."""
try:
# 1. Person alap létrehozása
new_person = Person(
first_name=user_in.first_name,
last_name=user_in.last_name,
is_active=False
)
db.add(new_person)
await db.flush()
# 2. User technikai fiók
new_user = User(
email=user_in.email,
hashed_password=get_password_hash(user_in.password),
person_id=new_person.id,
role=UserRole.user,
is_active=False,
region_code=user_in.region_code
)
db.add(new_user)
await db.flush()
# 3. Kormányozható Token generálása
expire_hours = getattr(settings, "REGISTRATION_TOKEN_EXPIRE_HOURS", 48)
token_val = uuid.uuid4()
new_token = VerificationToken(
token=token_val,
user_id=new_user.id,
token_type="registration",
expires_at=datetime.now(timezone.utc) + timedelta(hours=expire_hours)
)
db.add(new_token)
await db.flush()
# 4. Email küldés gombbal
verification_link = f"{settings.FRONTEND_BASE_URL}/verify?token={token_val}"
try:
await email_manager.send_email(
recipient=user_in.email,
template_key="registration",
variables={
"first_name": user_in.first_name,
"link": verification_link
}
)
except Exception as email_err:
print(f"CRITICAL: Email failed: {str(email_err)}")
await db.commit()
await db.refresh(new_user)
return new_user
except Exception as e:
await db.rollback()
raise e
@staticmethod
async def verify_email(db: AsyncSession, token_str: str):
"""Token ellenőrzése (Email megerősítés)."""
try:
token_uuid = uuid.UUID(token_str)
stmt = select(VerificationToken).where(
VerificationToken.token == token_uuid,
VerificationToken.is_used == False,
VerificationToken.expires_at > datetime.now(timezone.utc)
)
result = await db.execute(stmt)
token_obj = result.scalar_one_or_none()
if not token_obj:
return False
token_obj.is_used = True
await db.commit()
return True
except Exception as e:
print(f"Verify error: {e}")
await db.rollback()
return False
@staticmethod
async def complete_kyc(db: AsyncSession, user_id: int, kyc_in: UserKYCComplete):
"""Step 2: KYC adatok, Telefon, Privát Flotta és Wallet aktiválása."""
try:
# 1. User és Person lekérése
stmt = select(User).where(User.id == user_id).join(User.person)
result = await db.execute(stmt)
user = result.scalar_one_or_none()
if not user:
return None
# 2. Személyes adatok rögzítése (tábla szinten)
p = user.person
p.phone = kyc_in.phone_number
p.birth_place = kyc_in.birth_place
p.birth_date = datetime.combine(kyc_in.birth_date, datetime.min.time())
p.mothers_name = kyc_in.mothers_name
# JSONB mezők mentése Pydantic modellekből
p.identity_docs = {k: v.dict() for k, v in kyc_in.identity_docs.items()}
p.ice_contact = kyc_in.ice_contact.dict()
p.is_active = True
# 3. PRIVÁT FLOTTA (Organization) automata generálása
new_org = Organization(
name=f"{p.last_name} {p.first_name} - Privát Flotta",
owner_id=user.id,
is_active=True,
org_type="individual"
)
db.add(new_org)
await db.flush()
# 4. WALLET automata generálása
new_wallet = Wallet(
user_id=user.id,
coin_balance=0.00,
xp_balance=0
)
db.add(new_wallet)
# 5. USER TELJES AKTIVÁLÁSA
user.is_active = True
await db.commit()
await db.refresh(user)
return user
except Exception as e:
await db.rollback()
raise e
@staticmethod
async def authenticate(db: AsyncSession, email: str, password: str):
stmt = select(User).where(User.email == email, User.is_deleted == False)
res = await db.execute(stmt)
user = res.scalar_one_or_none()
if not user or not user.hashed_password or not verify_password(password, user.hashed_password):
return None
return user
@staticmethod
async def initiate_password_reset(db: AsyncSession, email: str):
"""Jelszó-visszaállítás indítása."""
stmt = select(User).where(User.email == email, User.is_deleted == False)
res = await db.execute(stmt)
user = res.scalar_one_or_none()
if user:
expire_hours = getattr(settings, "PASSWORD_RESET_TOKEN_EXPIRE_HOURS", 1)
token_val = uuid.uuid4()
new_token = VerificationToken(
token=token_val,
user_id=user.id,
token_type="password_reset",
expires_at=datetime.now(timezone.utc) + timedelta(hours=expire_hours)
)
db.add(new_token)
reset_link = f"{settings.FRONTEND_BASE_URL}/reset-password?token={token_val}"
await email_manager.send_email(
recipient=email,
template_key="password_reset",
variables={"link": reset_link},
user_id=user.id
)
await db.commit()
return True
return False