Files
service-finder/backend/app/services/auth_service.py
Kincses 425f598fa3 feat: SuperAdmin bootstrap, i18n sync fix and AssetAssignment ORM fix
- Fixed AttributeError in User model (added region_code, preferred_language)
- Fixed InvalidRequestError in AssetAssignment (added organization relationship)
- Configured STATIC_DIR for translation sync
- Applied Alembic migrations for user schema updates
2026-02-10 21:01:58 +00:00

278 lines
11 KiB
Python

import os
import logging
import uuid
import json
from datetime import datetime, timedelta, timezone
from typing import Optional
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, and_
from sqlalchemy.orm import joinedload
from fastapi.encoders import jsonable_encoder
from app.models.identity import User, Person, UserRole, VerificationToken, Wallet
from app.models.gamification import UserStats
from app.models.organization import Organization, OrganizationMember, OrgType
from app.schemas.auth import UserLiteRegister, UserKYCComplete
from app.core.security import get_password_hash, verify_password
from app.services.email_manager import email_manager
from app.core.config import settings
from app.services.config_service import config
from app.services.geo_service import GeoService
from app.services.security_service import security_service # Sentinel integráció
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class AuthService:
    """Account lifecycle operations: registration, KYC, login, deletion, password reset.

    Every method is a ``@staticmethod`` that receives the ``AsyncSession`` to
    work on; the class itself holds no state.
    """

    @staticmethod
    def _parse_token(token_str: str) -> Optional[uuid.UUID]:
        """Return ``token_str`` as a UUID, or ``None`` when it is not a valid UUID."""
        try:
            return uuid.UUID(token_str)
        except (ValueError, TypeError, AttributeError):
            # ValueError: malformed string; TypeError / AttributeError: None or
            # a non-string value was passed in.
            return None

    @staticmethod
    def _issue_token(db: AsyncSession, user_id, token_type: str, valid_hours) -> uuid.UUID:
        """Add a new VerificationToken to the session and return its UUID value.

        The token is only added to the session here; the caller commits.
        ``valid_hours`` is passed through ``int()`` before being used as the
        lifetime in hours, counted from the current UTC time.
        """
        token_val = uuid.uuid4()
        db.add(VerificationToken(
            token=token_val,
            user_id=user_id,
            token_type=token_type,
            expires_at=datetime.now(timezone.utc) + timedelta(hours=int(valid_hours)),
        ))
        return token_val

    @staticmethod
    async def register_lite(db: AsyncSession, user_in: UserLiteRegister):
        """Step 1: lite registration.

        Creates an inactive Person and an inactive User, issues a
        "registration" verification token, sends the verification e-mail and
        commits.

        Returns:
            The newly created, refreshed User.

        Raises:
            Any exception raised along the way, after the session has been
            rolled back and the error logged.
        """
        try:
            new_person = Person(
                first_name=user_in.first_name,
                last_name=user_in.last_name,
                is_active=False,
            )
            db.add(new_person)
            # Flush before building the User, which reads new_person.id.
            await db.flush()

            new_user = User(
                email=user_in.email,
                hashed_password=get_password_hash(user_in.password),
                person_id=new_person.id,
                role=UserRole.user,
                is_active=False,
                is_deleted=False,
                region_code=user_in.region_code,
                preferred_language=user_in.lang,
                timezone=user_in.timezone,
            )
            db.add(new_user)
            # Flush before issuing the token, which reads new_user.id.
            await db.flush()

            reg_hours = await config.get_setting(
                "auth_registration_hours",
                region_code=user_in.region_code,
                default=48,
            )
            token_val = AuthService._issue_token(db, new_user.id, "registration", reg_hours)

            # The e-mail goes out before the commit: if send_email raises, the
            # except branch below rolls the whole registration back.
            verification_link = f"{settings.FRONTEND_BASE_URL}/verify?token={token_val}"
            await email_manager.send_email(
                recipient=user_in.email,
                template_key="reg",
                variables={"first_name": user_in.first_name, "link": verification_link},
                lang=user_in.lang,
            )

            await db.commit()
            await db.refresh(new_user)
            return new_user
        except Exception as e:
            await db.rollback()
            logger.exception("Registration Error: %s", e)
            raise

    @staticmethod
    async def complete_kyc(db: AsyncSession, user_id: int, kyc_in: UserKYCComplete):
        """Phase 1.3: complete KYC in a single transaction, with shadow-identity linking.

        Loads the user, looks for an existing Person with the same mother's
        name, birth place and birth date, and re-links the user to that Person
        when one is found ("shadow identity"). The KYC data is then written to
        the active Person, and an individual Organization, its owner
        membership, a Wallet and a UserStats row are created before the user
        is activated and everything is committed.

        NOTE(review): nothing in this method guards against being run twice
        for the same user; a second run would add another Organization,
        Wallet and UserStats. Confirm the caller prevents that.

        Returns:
            The refreshed User, or ``None`` when no user has ``user_id``.

        Raises:
            Any exception raised along the way, after the session has been
            rolled back and the error logged.
        """
        try:
            stmt = select(User).options(joinedload(User.person)).where(User.id == user_id)
            user = (await db.execute(stmt)).scalar_one_or_none()
            if not user:
                return None

            # The schema may not define preferred_currency at all, hence getattr.
            preferred_currency = getattr(kyc_in, "preferred_currency", None)
            if preferred_currency:
                user.preferred_currency = preferred_currency

            identity_stmt = select(Person).where(and_(
                Person.mothers_last_name == kyc_in.mothers_last_name,
                Person.mothers_first_name == kyc_in.mothers_first_name,
                Person.birth_place == kyc_in.birth_place,
                Person.birth_date == kyc_in.birth_date,
            ))
            existing_person = (await db.execute(identity_stmt)).scalar_one_or_none()

            if existing_person:
                user.person_id = existing_person.id
                active_person = existing_person
                logger.info(f"Shadow Identity linked: User {user_id} -> Person {existing_person.id}")
            else:
                active_person = user.person

            addr_id = await GeoService.get_or_create_full_address(
                db,
                zip_code=kyc_in.address_zip,
                city=kyc_in.address_city,
                street_name=kyc_in.address_street_name,
                street_type=kyc_in.address_street_type,
                house_number=kyc_in.address_house_number,
                parcel_id=kyc_in.address_hrsz,
            )

            active_person.mothers_last_name = kyc_in.mothers_last_name
            active_person.mothers_first_name = kyc_in.mothers_first_name
            active_person.birth_place = kyc_in.birth_place
            active_person.birth_date = kyc_in.birth_date
            active_person.phone = kyc_in.phone_number
            active_person.address_id = addr_id
            active_person.identity_docs = jsonable_encoder(kyc_in.identity_docs)
            active_person.ice_contact = jsonable_encoder(kyc_in.ice_contact)
            active_person.is_active = True

            new_org = Organization(
                full_name=f"{active_person.last_name} {active_person.first_name} Egyéni Flotta",
                name=f"{active_person.last_name} Flotta",
                org_type=OrgType.individual,
                owner_id=user.id,
                is_transferable=False,
                is_active=True,
                status="verified",
                language=user.preferred_language,
                default_currency=user.preferred_currency,
                country_code=user.region_code,
            )
            db.add(new_org)
            # Flush before creating the membership, which reads new_org.id.
            await db.flush()

            db.add(OrganizationMember(
                organization_id=new_org.id,
                user_id=user.id,
                role="owner",
                permissions={"can_add_asset": True, "can_view_costs": True, "is_admin": True},
            ))
            db.add(Wallet(
                user_id=user.id,
                coin_balance=0,
                credit_balance=0,
                currency=user.preferred_currency,
            ))
            db.add(UserStats(user_id=user.id, total_xp=0, current_level=1))

            user.is_active = True
            await db.commit()
            await db.refresh(user)
            return user
        except Exception as e:
            await db.rollback()
            logger.exception("KYC Atomi Tranzakció Hiba: %s", e)
            raise

    @staticmethod
    async def soft_delete_user(db: AsyncSession, user_id: int, reason: str, actor_id: int):
        """Soft-delete a user and free up the e-mail address.

        The stored e-mail is renamed to
        ``deleted_<id>_<YYYYMMDD>_<original email>`` so the original address
        can be registered again from scratch. The user is flagged as deleted
        and inactive, and a ``USER_SOFT_DELETE`` event is written through
        ``security_service.log_event`` on behalf of ``actor_id``.

        Returns:
            ``True`` when the user was soft-deleted, ``False`` when the user
            does not exist or is already deleted.

        Raises:
            Any exception raised along the way, after the session has been
            rolled back and the error logged.
        """
        try:
            stmt = select(User).where(User.id == user_id)
            user = (await db.execute(stmt)).scalar_one_or_none()
            if not user or user.is_deleted:
                return False

            old_email = user.email
            # UTC date stamp, consistent with the timezone-aware timestamps
            # used by the other methods of this class.
            date_stamp = datetime.now(timezone.utc).strftime('%Y%m%d')
            user.email = f"deleted_{user.id}_{date_stamp}_{old_email}"
            user.is_deleted = True
            user.is_active = False

            # Sentinel audit log entry.
            await security_service.log_event(
                db,
                user_id=actor_id,
                action="USER_SOFT_DELETE",
                severity="warning",
                target_type="User",
                target_id=str(user_id),
                old_data={"email": old_email},
                new_data={"is_deleted": True, "reason": reason},
            )
            await db.commit()
            return True
        except Exception as e:
            await db.rollback()
            logger.exception("User soft-delete error: %s", e)
            raise

    @staticmethod
    async def verify_email(db: AsyncSession, token_str: str):
        """Mark an unused, unexpired verification token as used.

        NOTE(review): the lookup does not filter on ``token_type``, so any
        unused, unexpired token (including a "password_reset" one) is
        accepted here. Confirm whether that is intended.

        Returns:
            ``True`` when a matching token was found and marked as used;
            ``False`` when ``token_str`` is not a valid UUID, no matching
            token exists, or the database operation failed.
        """
        token_uuid = AuthService._parse_token(token_str)
        if token_uuid is None:
            return False

        try:
            stmt = select(VerificationToken).where(
                and_(
                    VerificationToken.token == token_uuid,
                    # "== False" builds an SQL expression; it is not a Python
                    # truthiness test.
                    VerificationToken.is_used == False,  # noqa: E712
                    VerificationToken.expires_at > datetime.now(timezone.utc),
                )
            )
            token = (await db.execute(stmt)).scalar_one_or_none()
            if not token:
                return False
            token.is_used = True
            await db.commit()
            return True
        except Exception as e:
            # Keep the "return False on failure" contract, but roll back and
            # log. Catching Exception (not a bare except) lets task
            # cancellation and KeyboardInterrupt propagate.
            await db.rollback()
            logger.exception("Email verification error: %s", e)
            return False

    @staticmethod
    async def authenticate(db: AsyncSession, email: str, password: str):
        """Return the non-deleted user matching ``email`` and ``password``, else ``None``.

        NOTE(review): ``is_active`` is not checked here; presumably the caller
        decides what an inactive account may do. Confirm.
        """
        stmt = select(User).where(
            and_(User.email == email, User.is_deleted == False)  # noqa: E712
        )
        user = (await db.execute(stmt)).scalar_one_or_none()
        if user and verify_password(password, user.hashed_password):
            return user
        return None

    @staticmethod
    async def initiate_password_reset(db: AsyncSession, email: str):
        """Issue a "password_reset" token and e-mail the reset link.

        Returns:
            ``"success"`` when a non-deleted user with ``email`` exists and
            the token was committed, ``"not_found"`` otherwise.

        Raises:
            Any exception raised along the way, after the session has been
            rolled back and the error logged.
        """
        try:
            stmt = select(User).where(
                and_(User.email == email, User.is_deleted == False)  # noqa: E712
            )
            user = (await db.execute(stmt)).scalar_one_or_none()
            if not user:
                return "not_found"

            reset_hours = await config.get_setting(
                "auth_password_reset_hours",
                region_code=user.region_code,
                default=2,
            )
            token_val = AuthService._issue_token(db, user.id, "password_reset", reset_hours)

            reset_link = f"{settings.FRONTEND_BASE_URL}/reset-password?token={token_val}"
            await email_manager.send_email(
                recipient=email,
                template_key="pwd_reset",
                variables={"link": reset_link},
                lang=user.preferred_language,
            )
            await db.commit()
            return "success"
        except Exception as e:
            await db.rollback()
            logger.exception("Password reset initiation error: %s", e)
            raise

    @staticmethod
    async def reset_password(db: AsyncSession, email: str, token_str: str, new_password: str):
        """Set a new password using an unused, unexpired "password_reset" token.

        The token must belong to the user whose e-mail is ``email``. On
        success the new password hash is stored and the token is marked as
        used.

        Returns:
            ``True`` when the password was changed; ``False`` when
            ``token_str`` is not a valid UUID, no matching token exists, or
            the database operation failed.
        """
        token_uuid = AuthService._parse_token(token_str)
        if token_uuid is None:
            return False

        try:
            stmt = select(VerificationToken).join(User).where(
                and_(
                    User.email == email,
                    VerificationToken.token == token_uuid,
                    VerificationToken.token_type == "password_reset",
                    VerificationToken.is_used == False,  # noqa: E712
                    VerificationToken.expires_at > datetime.now(timezone.utc),
                )
            )
            token_rec = (await db.execute(stmt)).scalar_one_or_none()
            if not token_rec:
                return False

            user_stmt = select(User).where(User.id == token_rec.user_id)
            user = (await db.execute(user_stmt)).scalar_one()
            user.hashed_password = get_password_hash(new_password)
            token_rec.is_used = True
            await db.commit()
            return True
        except Exception as e:
            # Keep the "return False on failure" contract, but roll back and
            # log. Catching Exception (not a bare except) lets task
            # cancellation and KeyboardInterrupt propagate.
            await db.rollback()
            logger.exception("Password reset error: %s", e)
            return False