FIX: Import error and enhanced atomized address structure for organizations

This commit is contained in:
2026-02-07 14:37:39 +00:00
parent 7249aa5809
commit e370ca3021
8 changed files with 150 additions and 30 deletions

View File

@@ -1,9 +1,12 @@
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import List
from app.db.session import get_db
from app.schemas.organization import CorpOnboardIn, CorpOnboardResponse
from app.models.organization import Organization, OrgType
from app.models.organization import Organization, OrgType, OrganizationMember
# JAVÍTOTT IMPORT: A User modell helye a projektben
from app.models.user import User
from app.core.config import settings
import os
import re
@@ -18,18 +21,15 @@ async def onboard_organization(
db: AsyncSession = Depends(get_db)
):
"""
Új szervezet (cég/szerviz) rögzítése.
- Magyar adószám validáció (XXXXXXXX-Y-ZZ).
- Duplikáció ellenőrzés adószám alapján.
- NAS mappa és DB rekord létrehozása.
Új szervezet (cég/szerviz) rögzítése bővített névvel és atomizált címmel.
"""
# 1. Magyar adószám validáció
# 1. Magyar adószám validáció (XXXXXXXX-Y-ZZ)
if org_in.country_code == "HU":
if not re.match(r"^\d{8}-\d-\d{2}$", org_in.tax_number):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Érvénytelen magyar adószám formátum! (Példa: 12345678-1-12)"
detail="Érvénytelen magyar adószám formátum!"
)
# 2. Duplikáció ellenőrzés
@@ -41,30 +41,76 @@ async def onboard_organization(
detail="Ezzel az adószámmal már regisztráltak céget!"
)
# 3. Mentés (Dinamikus státusszal és kisbetűs Enummal)
# 3. Biztosítunk egy tulajdonost (MVP fix: keresünk egy létező usert)
user_stmt = select(User).limit(1)
user_res = await db.execute(user_stmt)
test_user = user_res.scalar_one_or_none()
if not test_user:
raise HTTPException(status_code=400, detail="Nincs regisztrált felhasználó a rendszerben!")
# 4. Mentés (Szervezet létrehozása atomizált adatokkal és név-hierarchiával)
new_org = Organization(
full_name=org_in.full_name,
name=org_in.name,
display_name=org_in.display_name,
tax_number=org_in.tax_number,
reg_number=org_in.reg_number,
headquarters_address=org_in.headquarters_address,
address_zip=org_in.address_zip,
address_city=org_in.address_city,
address_street_name=org_in.address_street_name,
address_street_type=org_in.address_street_type,
address_house_number=org_in.address_house_number,
address_hrsz=org_in.address_hrsz,
address_stairwell=org_in.address_stairwell,
address_floor=org_in.address_floor,
address_door=org_in.address_door,
country_code=org_in.country_code,
org_type=OrgType.business, # Most már kisbetűs 'business' kerül beküldésre
org_type=OrgType.business,
status="pending_verification"
)
db.add(new_org)
await db.flush() # ID generálás a NAS-hoz
await db.flush()
# 4. NAS Mappa létrehozása
# 5. TULAJDONOS RÖGZÍTÉSE (Membership lánc)
owner_member = OrganizationMember(
organization_id=new_org.id,
user_id=test_user.id,
role="owner"
)
db.add(owner_member)
# 6. NAS Mappa létrehozása (Org izoláció)
try:
base_path = getattr(settings, "NAS_STORAGE_PATH", "/mnt/nas/app_data")
org_path = os.path.join(base_path, "organizations", str(new_org.id))
os.makedirs(org_path, exist_ok=True)
logger.info(f"NAS mappa létrehozva szervezetnek: {org_path}")
os.makedirs(os.path.join(org_path, "documents"), exist_ok=True)
logger.info(f"NAS mappa struktúra kész: {org_path}")
except Exception as e:
logger.error(f"NAS hiba az onboardingnál: {e}")
logger.error(f"NAS hiba: {e}")
await db.commit()
await db.refresh(new_org)
return {"organization_id": new_org.id, "status": new_org.status}
return {"organization_id": new_org.id, "status": new_org.status}
@router.get("/my", response_model=List[CorpOnboardResponse])
async def get_my_organizations(
db: AsyncSession = Depends(get_db)
):
"""
A bejelentkezett felhasználóhoz tartozó összes cég/szervezet listázása.
"""
# MVP Teszt: Kézzel keresünk egy létező usert (később: current_user.id)
user_stmt = select(User).limit(1)
user_res = await db.execute(user_stmt)
test_user = user_res.scalar_one_or_none()
if not test_user:
return []
stmt = select(Organization).join(OrganizationMember).where(OrganizationMember.user_id == test_user.id)
result = await db.execute(stmt)
orgs = result.scalars().all()
return [{"organization_id": o.id, "status": o.status} for o in orgs]