import os
import logging
import uuid
import json
from datetime import datetime, timedelta, timezone
from typing import Optional

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, and_
from sqlalchemy.orm import joinedload
from fastapi.encoders import jsonable_encoder
from fastapi import HTTPException, status

from app.models.identity import User, Person, UserRole, VerificationToken, Wallet
from app.models.gamification import UserStats
from app.models.organization import Organization, OrganizationMember, OrgType
from app.schemas.auth import UserLiteRegister, UserKYCComplete
from app.core.security import get_password_hash, verify_password, generate_secure_slug
from app.services.email_manager import email_manager
from app.core.config import settings
from app.services.config_service import config
from app.services.geo_service import GeoService
from app.services.security_service import security_service  # Sentinel integration

logger = logging.getLogger(__name__)


class AuthService:
    """Registration, KYC completion, authentication and password-reset flows.

    Every method is a ``@staticmethod`` that receives the ``AsyncSession`` to
    work on; methods that write commit on success and roll back on failure.
    """

    @staticmethod
    def _parse_token(token_str: str) -> Optional[uuid.UUID]:
        """Return *token_str* parsed as a UUID, or ``None`` if it is malformed."""
        try:
            return uuid.UUID(token_str)
        except (ValueError, AttributeError, TypeError):
            # ValueError: badly formed string; AttributeError/TypeError: the
            # value is not a string at all (e.g. None).
            return None

    @staticmethod
    async def register_lite(db: AsyncSession, user_in: UserLiteRegister) -> User:
        """Step 1: lite (manual) registration.

        Creates the ``Person`` and ``User`` records with ``is_active=False``,
        stores a ``"registration"`` verification token, sends the verification
        email and writes an audit event. ``folder_slug`` is NOT generated here;
        it stays unset until step 2 (``complete_kyc``).

        Args:
            db: Session used for all reads and writes; committed on success.
            user_in: Registration payload.

        Returns:
            The newly created, refreshed ``User``.

        Raises:
            HTTPException: 400 when the password is shorter than the
                configured minimum (never below 8 characters).
            Exception: Any other failure is re-raised after a rollback.
        """
        try:
            # --- Dynamic password length check ---
            # Read the admin setting; 8 characters is the hard lower limit.
            min_pass = await config.get_setting(
                db, "auth_min_password_length", default=8
            )
            min_len = max(int(min_pass), 8)

            if len(user_in.password) < min_len:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"A jelszónak legalább {min_len} karakter hosszúnak kell lennie.",
                )

            new_person = Person(
                first_name=user_in.first_name,
                last_name=user_in.last_name,
                is_active=False,
            )
            db.add(new_person)
            await db.flush()  # flushed before new_person.id is read below

            new_user = User(
                email=user_in.email,
                hashed_password=get_password_hash(user_in.password),
                person_id=new_person.id,
                role=UserRole.user,
                is_active=False,
                is_deleted=False,
                region_code=user_in.region_code,
                preferred_language=user_in.lang,
                timezone=user_in.timezone,
                # folder_slug stays NULL until step 2
            )
            db.add(new_user)
            await db.flush()

            # Generate the verification token
            reg_hours = await config.get_setting(
                db,
                "auth_registration_hours",
                region_code=user_in.region_code,
                default=48,
            )
            token_val = uuid.uuid4()
            db.add(
                VerificationToken(
                    token=token_val,
                    user_id=new_user.id,
                    token_type="registration",
                    expires_at=datetime.now(timezone.utc)
                    + timedelta(hours=int(reg_hours)),
                )
            )

            # Send the email.
            # NOTE(review): the email goes out before the commit below; if the
            # commit fails the recipient holds a link to a token that was
            # never persisted.
            verification_link = f"{settings.FRONTEND_BASE_URL}/verify?token={token_val}"
            await email_manager.send_email(
                recipient=user_in.email,
                template_key="reg",
                variables={"first_name": user_in.first_name, "link": verification_link},
                lang=user_in.lang,
            )

            # Audit log entry for the registration
            await security_service.log_event(
                db,
                user_id=new_user.id,
                action="USER_REGISTER_LITE",
                severity="info",
                target_type="User",
                target_id=str(new_user.id),
                new_data={"email": user_in.email, "method": "manual"},
            )

            await db.commit()
            await db.refresh(new_user)
            return new_user

        except HTTPException:
            # Validation failure: roll back and let the HTTP error through
            # without logging it as an internal error.
            await db.rollback()
            raise
        except Exception as e:
            await db.rollback()
            logger.exception("Registration Error: %s", e)
            raise

    @staticmethod
    async def complete_kyc(
        db: AsyncSession, user_id: int, kyc_in: UserKYCComplete
    ) -> Optional[User]:
        """Step 2: complete KYC in a single transaction.

        Loads the user, generates ``folder_slug`` if missing, re-links the user
        to an existing ``Person`` with matching identity data (if one exists),
        records the address, updates the personal data, creates the individual
        fleet ``Organization`` with its main ``Branch``, the owner membership,
        a ``Wallet`` and ``UserStats``, activates the user and writes an audit
        event.

        Args:
            db: Session used for all reads and writes; committed on success.
            user_id: Primary key of the user finishing KYC.
            kyc_in: KYC payload.

        Returns:
            The refreshed ``User``, or ``None`` when no user has ``user_id``.

        Raises:
            Exception: Any failure is re-raised after a rollback.
        """
        # NOTE(review): nothing here checks whether KYC was already completed;
        # a second call would add another Organization/Wallet/UserStats row.
        # Confirm the caller guards against that.
        try:
            # 1. Load User and Person
            stmt = (
                select(User)
                .options(joinedload(User.person))
                .where(User.id == user_id)
            )
            res = await db.execute(stmt)
            user = res.scalar_one_or_none()
            if not user:
                return None

            # --- SECURITY: slug generation ---
            if not user.folder_slug:
                user.folder_slug = generate_secure_slug(length=12)

            preferred_currency = getattr(kyc_in, "preferred_currency", None)
            if preferred_currency:
                user.preferred_currency = preferred_currency

            # --- SHADOW IDENTITY CHECK ---
            # Look for an already stored Person with the same mother's name,
            # birth place and birth date.
            identity_stmt = select(Person).where(
                and_(
                    Person.mothers_last_name == kyc_in.mothers_last_name,
                    Person.mothers_first_name == kyc_in.mothers_first_name,
                    Person.birth_place == kyc_in.birth_place,
                    Person.birth_date == kyc_in.birth_date,
                )
            )
            existing_person = (await db.execute(identity_stmt)).scalar_one_or_none()

            if existing_person:
                user.person_id = existing_person.id
                active_person = existing_person
            else:
                active_person = user.person

            # --- RECORD THE ADDRESS ---
            addr_id = await GeoService.get_or_create_full_address(
                db,
                zip_code=kyc_in.address_zip,
                city=kyc_in.address_city,
                street_name=kyc_in.address_street_name,
                street_type=kyc_in.address_street_type,
                house_number=kyc_in.address_house_number,
                parcel_id=kyc_in.address_hrsz,
            )

            # --- UPDATE PERSONAL DATA ---
            active_person.mothers_last_name = kyc_in.mothers_last_name
            active_person.mothers_first_name = kyc_in.mothers_first_name
            active_person.birth_place = kyc_in.birth_place
            active_person.birth_date = kyc_in.birth_date
            active_person.phone = kyc_in.phone_number
            active_person.address_id = addr_id
            active_person.identity_docs = jsonable_encoder(kyc_in.identity_docs)
            active_person.ice_contact = jsonable_encoder(kyc_in.ice_contact)
            active_person.is_active = True

            # --- CREATE THE INDIVIDUAL FLEET ---
            new_org = Organization(
                full_name=f"{active_person.last_name} {active_person.first_name} Egyéni Flotta",
                name=f"{active_person.last_name} Flotta",
                folder_slug=generate_secure_slug(length=12),
                org_type=OrgType.individual,
                owner_id=user.id,
                is_transferable=False,  # Step 2: an individual fleet is not transferable
                is_ownership_transferable=False,  # newly added field
                is_active=True,
                status="verified",
                language=user.preferred_language,
                default_currency=user.preferred_currency or "HUF",
                country_code=user.region_code,
            )
            db.add(new_org)
            await db.flush()  # flushed before new_org.id is read below

            # --- NEW: CREATE THE MAIN BRANCH (CENTRAL SITE) ---
            # For a private individual the given address is also the first branch.
            from app.models.address import Branch

            new_branch = Branch(
                organization_id=new_org.id,
                address_id=addr_id,
                name="Központ / Otthon",
                is_main=True,
                postal_code=kyc_in.address_zip,
                city=kyc_in.address_city,
                street_name=kyc_in.address_street_name,
                street_type=kyc_in.address_street_type,
                house_number=kyc_in.address_house_number,
                hrsz=kyc_in.address_hrsz,
                status="active",
            )
            db.add(new_branch)
            await db.flush()

            # --- MEMBERSHIP, WALLET, STATS ---
            db.add(
                OrganizationMember(
                    organization_id=new_org.id,
                    user_id=user.id,
                    role="owner",
                    permissions={
                        "can_add_asset": True,
                        "can_view_costs": True,
                        "is_admin": True,
                    },
                )
            )
            db.add(Wallet(user_id=user.id, currency=user.preferred_currency or "HUF"))
            db.add(UserStats(user_id=user.id, total_xp=0, current_level=1))

            # --- 7. ACTIVATION AND AUDIT ---
            user.is_active = True

            await security_service.log_event(
                db,
                user_id=user.id,
                action="USER_KYC_COMPLETED",
                severity="info",
                target_type="User",
                target_id=str(user.id),
                new_data={
                    "status": "active",
                    "user_folder": user.folder_slug,
                    "organization_id": new_org.id,
                    "branch_id": str(new_branch.id),  # new branch included in the audit
                    "wallet_created": True,
                },
            )

            await db.commit()
            await db.refresh(user)
            return user

        except Exception as e:
            await db.rollback()
            logger.exception("KYC Atomi Tranzakció Hiba: %s", e)
            raise

    @staticmethod
    async def soft_delete_user(
        db: AsyncSession, user_id: int, reason: str, actor_id: int
    ) -> bool:
        """Soft-delete a user: free up the email address and isolate the account.

        The stored email is rewritten to
        ``deleted_<user id>_<YYYYMMDD>_<old email>`` so the original address
        can be registered again, the user is flagged ``is_deleted`` and
        deactivated, and an audit event attributed to ``actor_id`` is written.

        Args:
            db: Session used for the update; committed on success.
            user_id: Primary key of the user to delete.
            reason: Reason stored in the audit event.
            actor_id: Id recorded as the acting user in the audit event.

        Returns:
            ``True`` when the user was soft-deleted, ``False`` when the user
            does not exist or is already deleted.

        Raises:
            Exception: Any failure is re-raised after a rollback.
        """
        try:
            stmt = select(User).where(User.id == user_id)
            user = (await db.execute(stmt)).scalar_one_or_none()
            if not user or user.is_deleted:
                return False

            old_email = user.email
            # Rename the email to keep uniqueness (allows re-registration).
            # UTC is used for the date stamp, like every other timestamp here.
            stamp = datetime.now(timezone.utc).strftime("%Y%m%d")
            user.email = f"deleted_{user.id}_{stamp}_{old_email}"
            user.is_deleted = True
            user.is_active = False

            await security_service.log_event(
                db,
                user_id=actor_id,
                action="USER_SOFT_DELETE",
                severity="warning",
                target_type="User",
                target_id=str(user_id),
                old_data={"email": old_email},
                new_data={"is_deleted": True, "reason": reason},
            )

            await db.commit()
            return True
        except Exception as e:
            await db.rollback()
            logger.exception("Soft-delete Error: %s", e)
            raise

    @staticmethod
    async def verify_email(db: AsyncSession, token_str: str) -> bool:
        """Consume an unused, unexpired verification token.

        Args:
            db: Session used for the lookup; committed on success.
            token_str: Token as received from the client (UUID text).

        Returns:
            ``True`` when a matching token was found and marked as used,
            ``False`` when the token is malformed, unknown, used, expired, or
            a database error occurred (the error is logged).
        """
        token_uuid = AuthService._parse_token(token_str)
        if token_uuid is None:
            return False

        try:
            # NOTE(review): token_type is not filtered here, so a token of any
            # type is accepted — confirm that this is intended.
            stmt = select(VerificationToken).where(
                and_(
                    VerificationToken.token == token_uuid,
                    VerificationToken.is_used == False,  # noqa: E712 (SQL expression)
                    VerificationToken.expires_at > datetime.now(timezone.utc),
                )
            )
            res = await db.execute(stmt)
            token = res.scalar_one_or_none()
            if not token:
                return False

            token.is_used = True
            await db.commit()
            return True
        except Exception as e:
            # Only Exception is caught: task cancellation and interpreter
            # shutdown signals must keep propagating.
            await db.rollback()
            logger.exception("Email verification Error: %s", e)
            return False

    @staticmethod
    async def authenticate(
        db: AsyncSession, email: str, password: str
    ) -> Optional[User]:
        """Return the non-deleted user matching *email* and *password*.

        Args:
            db: Session used for the lookup.
            email: Email address to look up.
            password: Plain-text password checked against the stored hash.

        Returns:
            The ``User`` when the credentials match, otherwise ``None``.
        """
        stmt = select(User).where(
            and_(User.email == email, User.is_deleted == False)  # noqa: E712
        )
        res = await db.execute(stmt)
        user = res.scalar_one_or_none()

        if user and verify_password(password, user.hashed_password):
            return user
        return None

    @staticmethod
    async def initiate_password_reset(db: AsyncSession, email: str) -> str:
        """Create a password-reset token and email the reset link.

        Args:
            db: Session used for the lookup and insert; committed on success.
            email: Address of the account whose password should be reset.

        Returns:
            ``"success"`` when a non-deleted user was found and the email was
            sent, ``"not_found"`` otherwise.

        Raises:
            Exception: Any failure is re-raised after a rollback.
        """
        try:
            stmt = select(User).where(
                and_(User.email == email, User.is_deleted == False)  # noqa: E712
            )
            user = (await db.execute(stmt)).scalar_one_or_none()
            if not user:
                return "not_found"

            reset_hours = await config.get_setting(
                db,
                "auth_password_reset_hours",
                region_code=user.region_code,
                default=2,
            )
            token_val = uuid.uuid4()
            db.add(
                VerificationToken(
                    token=token_val,
                    user_id=user.id,
                    token_type="password_reset",
                    expires_at=datetime.now(timezone.utc)
                    + timedelta(hours=int(reset_hours)),
                )
            )

            reset_link = f"{settings.FRONTEND_BASE_URL}/reset-password?token={token_val}"
            await email_manager.send_email(
                recipient=email,
                template_key="pwd_reset",
                variables={"link": reset_link},
                lang=user.preferred_language,
            )
            await db.commit()
            return "success"
        except Exception as e:
            await db.rollback()
            logger.exception("Password reset initiation Error: %s", e)
            raise

    @staticmethod
    async def reset_password(
        db: AsyncSession, email: str, token_str: str, new_password: str
    ) -> bool:
        """Set a new password using an unused, unexpired reset token.

        The token must be of type ``"password_reset"`` and belong to the user
        with the given email.

        Args:
            db: Session used for the lookup and update; committed on success.
            email: Email of the account the token was issued for.
            token_str: Token as received from the client (UUID text).
            new_password: Plain-text password to hash and store.

        Returns:
            ``True`` when the password was changed, ``False`` when the token
            is malformed or does not match, or a database error occurred (the
            error is logged).
        """
        token_uuid = AuthService._parse_token(token_str)
        if token_uuid is None:
            return False

        try:
            stmt = (
                select(VerificationToken)
                .join(User)
                .where(
                    and_(
                        User.email == email,
                        VerificationToken.token == token_uuid,
                        VerificationToken.token_type == "password_reset",
                        VerificationToken.is_used == False,  # noqa: E712
                        VerificationToken.expires_at > datetime.now(timezone.utc),
                    )
                )
            )
            token_rec = (await db.execute(stmt)).scalar_one_or_none()
            if not token_rec:
                return False

            user_stmt = select(User).where(User.id == token_rec.user_id)
            user = (await db.execute(user_stmt)).scalar_one()
            user.hashed_password = get_password_hash(new_password)
            token_rec.is_used = True
            await db.commit()
            return True
        except Exception as e:
            # Only Exception is caught: task cancellation and interpreter
            # shutdown signals must keep propagating.
            await db.rollback()
            logger.exception("Password reset Error: %s", e)
            return False