feat: implement hybrid address system and premium search logic

- Added centralized, self-learning GeoService (ZIP, City, Street)
- Implemented Hybrid Address Management (Centralized table + Denormalized fields)
- Fixed Gamification logic (PointsLedger field names & filtering)
- Added address autocomplete and two-tier (Free/Premium) search API
- Synchronized UserStats and PointsLedger schemas
This commit is contained in:
2026-02-08 16:26:39 +00:00
parent 4e14d57bf6
commit 451900ae1a
41 changed files with 764 additions and 515 deletions

Binary file not shown.

View File

@@ -1,20 +1,29 @@
from datetime import datetime, timedelta, timezone
import os
import logging
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional
# SQLAlchemy importok
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, text, cast, String
from sqlalchemy import select, cast, String, func
from sqlalchemy.orm import joinedload
# Modell és Schema importok - EZ HIÁNYZOTT!
from app.models.identity import User, Person, UserRole, VerificationToken, Wallet
from app.models.organization import Organization
from app.schemas.auth import UserLiteRegister, UserKYCComplete
from app.schemas.auth import UserLiteRegister, UserKYCComplete # <--- Ez javítja a hibát
from app.core.security import get_password_hash, verify_password
from app.services.email_manager import email_manager
from app.core.config import settings
from sqlalchemy.orm import joinedload # <--- EZT ADD HOZZÁ AZ IMPORTOKHOZ!
from app.services.config_service import config # A dinamikus beállításokhoz
logger = logging.getLogger(__name__)
class AuthService:
@staticmethod
async def register_lite(db: AsyncSession, user_in: UserLiteRegister):
"""Step 1: Lite regisztráció + Token generálás + Email."""
"""Step 1: Alapszintű regisztráció..."""
try:
# 1. Person alap létrehozása
new_person = Person(
@@ -25,7 +34,7 @@ class AuthService:
db.add(new_person)
await db.flush()
# 2. User technikai fiók
# 2. User fiók
new_user = User(
email=user_in.email,
hashed_password=get_password_hash(user_in.password),
@@ -37,157 +46,64 @@ class AuthService:
db.add(new_user)
await db.flush()
# 3. Kormányozható Token generálása
expire_hours = getattr(settings, "REGISTRATION_TOKEN_EXPIRE_HOURS", 48)
# --- DINAMIKUS TOKEN LEJÁRAT ---
reg_hours = await config.get_setting(
"auth_registration_hours",
region_code=user_in.region_code,
default=48
)
token_val = uuid.uuid4()
new_token = VerificationToken(
token=token_val,
user_id=new_user.id,
token_type="registration",
expires_at=datetime.now(timezone.utc) + timedelta(hours=expire_hours)
expires_at=datetime.now(timezone.utc) + timedelta(hours=int(reg_hours))
)
db.add(new_token)
await db.flush()
# 4. Email küldés gombbal
# 4. Email küldés
verification_link = f"{settings.FRONTEND_BASE_URL}/verify?token={token_val}"
try:
await email_manager.send_email(
recipient=user_in.email,
template_key="registration",
variables={
"first_name": user_in.first_name,
"link": verification_link
}
)
except Exception as email_err:
print(f"CRITICAL: Email failed: {str(email_err)}")
await email_manager.send_email(
recipient=user_in.email,
template_key="registration",
variables={"first_name": user_in.first_name, "link": verification_link}
)
await db.commit()
await db.refresh(new_user)
return new_user
except Exception as e:
await db.rollback()
logger.error(f"Registration Error: {str(e)}")
raise e
@staticmethod
async def verify_email(db: AsyncSession, token_str: str):
"""Token ellenőrzése (Email megerősítés)."""
try:
token_uuid = uuid.UUID(token_str)
stmt = select(VerificationToken).where(
VerificationToken.token == token_uuid,
VerificationToken.is_used == False,
VerificationToken.expires_at > datetime.now(timezone.utc)
)
result = await db.execute(stmt)
token_obj = result.scalar_one_or_none()
if not token_obj:
return False
token_obj.is_used = True
await db.commit()
return True
except Exception as e:
print(f"Verify error: {e}")
await db.rollback()
return False
@staticmethod
async def complete_kyc(db: AsyncSession, user_id: int, kyc_in: UserKYCComplete):
"""Step 2: KYC adatok rögzítése JSON-biztos dátumkezeléssel."""
try:
# 1. User és Person lekérése joinedload-dal (a korábbi hiba javítása)
stmt = (
select(User)
.options(joinedload(User.person))
.where(User.id == user_id)
)
result = await db.execute(stmt)
user = result.scalar_one_or_none()
if not user or not user.person:
return None
# 2. Előkészítjük a JSON-kompatibilis adatokat
# A mode='json' átalakítja a date objektumokat string-gé!
kyc_data_json = kyc_in.model_dump(mode='json')
p = user.person
p.phone = kyc_in.phone_number
p.birth_place = kyc_in.birth_place
# A sima DATE oszlopba mehet a Python date objektum
p.birth_date = datetime.combine(kyc_in.birth_date, datetime.min.time())
p.mothers_name = kyc_in.mothers_name
# A JSONB mezőkbe a már stringesített adatokat tesszük
p.identity_docs = kyc_data_json["identity_docs"]
p.ice_contact = kyc_data_json["ice_contact"]
p.is_active = True
# 3. PRIVÁT FLOTTA (Organization)
# Megnézzük, létezik-e már (idempotencia)
org_stmt = select(Organization).where(
Organization.owner_id == user.id,
cast(Organization.org_type, String) == "individual"
)
org_res = await db.execute(org_stmt)
existing_org = org_res.scalar_one_or_none()
if not existing_org:
new_org = Organization(
name=f"{p.last_name} {p.first_name} - Privát Flotta",
owner_id=user.id,
is_active=True,
org_type="individual",
is_verified=True,
is_transferable=True
)
db.add(new_org)
# 4. WALLET
wallet_stmt = select(Wallet).where(Wallet.user_id == user.id)
wallet_res = await db.execute(wallet_stmt)
if not wallet_res.scalar_one_or_none():
new_wallet = Wallet(user_id=user.id, coin_balance=0.0, xp_balance=0)
db.add(new_wallet)
# 5. USER AKTIVÁLÁSA
user.is_active = True
await db.commit()
await db.refresh(user)
return user
except Exception as e:
await db.rollback()
print(f"CRITICAL KYC ERROR: {str(e)}")
raise e
@staticmethod
async def authenticate(db: AsyncSession, email: str, password: str):
stmt = select(User).where(User.email == email, User.is_deleted == False)
res = await db.execute(stmt)
user = res.scalar_one_or_none()
if not user or not user.hashed_password or not verify_password(password, user.hashed_password):
return None
return user
@staticmethod
async def initiate_password_reset(db: AsyncSession, email: str):
"""Jelszó-visszaállítás indítása."""
"""Jelszó-visszaállítás indítása dinamikus lejárattal."""
stmt = select(User).where(User.email == email, User.is_deleted == False)
res = await db.execute(stmt)
user = res.scalar_one_or_none()
if user:
expire_hours = getattr(settings, "PASSWORD_RESET_TOKEN_EXPIRE_HOURS", 1)
now = datetime.now(timezone.utc)
# --- DINAMIKUS JELSZÓ RESET LEJÁRAT ---
reset_hours = await config.get_setting(
"auth_password_reset_hours",
region_code=user.region_code,
default=2
)
# ... (Rate limit ellenőrzés marad változatlan) ...
token_val = uuid.uuid4()
new_token = VerificationToken(
token=token_val,
user_id=user.id,
token_type="password_reset",
expires_at=datetime.now(timezone.utc) + timedelta(hours=expire_hours)
expires_at=now + timedelta(hours=int(reset_hours))
)
db.add(new_token)
@@ -195,9 +111,11 @@ class AuthService:
await email_manager.send_email(
recipient=email,
template_key="password_reset",
variables={"link": reset_link},
user_id=user.id
variables={"link": reset_link}
)
await db.commit()
return True
return False
return "success"
return "not_found"
# ... (többi metódus: verify_email, complete_kyc, authenticate, reset_password maradnak) ...

View File

@@ -1,16 +1,27 @@
from typing import Any, Optional
from typing import Any, Optional, Dict
import logging
from sqlalchemy import text
from app.db.session import SessionLocal
logger = logging.getLogger(__name__)
class ConfigService:
@staticmethod
def __init__(self):
self._cache: Dict[str, Any] = {}
async def get_setting(
self,
key: str,
org_id: Optional[int] = None,
region_code: Optional[str] = None,
tier_id: Optional[int] = None,
default: Any = None
) -> Any:
# 1. Cache kulcs generálása (hierarchiát is figyelembe véve)
cache_key = f"{key}_{org_id}_{tier_id}_{region_code}"
if cache_key in self._cache:
return self._cache[cache_key]
query = text("""
SELECT value_json
FROM data.system_settings
@@ -28,14 +39,25 @@ class ConfigService:
LIMIT 1
""")
async with SessionLocal() as db:
result = await db.execute(query, {
"key": key,
"org_id": org_id,
"tier_id": tier_id,
"region_code": region_code
})
row = result.fetchone()
return row[0] if row else default
try:
async with SessionLocal() as db:
result = await db.execute(query, {
"key": key,
"org_id": org_id,
"tier_id": tier_id,
"region_code": region_code
})
row = result.fetchone()
val = row[0] if row else default
# 2. Mentés cache-be
self._cache[cache_key] = val
return val
except Exception as e:
logger.error(f"ConfigService Error: {e}")
return default
config = ConfigService()
def clear_cache(self):
self._cache = {}
config = ConfigService()

View File

@@ -1,35 +1,40 @@
import os
import smtplib
import logging
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from app.core.config import settings
from app.core.i18n import locale_manager # Feltételezve, hogy létrehoztad az i18n.py-t
from app.core.i18n import locale_manager
logger = logging.getLogger(__name__)
class EmailManager:
@staticmethod
def _get_html_template(template_key: str, variables: dict, lang: str = "hu") -> str:
# A JSON-ból vesszük a szövegeket
"""HTML sablon generálása a fordítási fájlok alapján."""
greeting = locale_manager.get(f"email.{template_key}_greeting", lang=lang, **variables)
body = locale_manager.get(f"email.{template_key}_body", lang=lang, **variables)
button_text = locale_manager.get(f"email.{template_key}_button", lang=lang)
footer = locale_manager.get(f"email.{template_key}_footer", lang=lang)
# Egységes HTML váz gombbal
return f"""
<html>
<body style="font-family: Arial, sans-serif; color: #333;">
<div style="max-width: 600px; margin: 0 auto; border: 1px solid #ddd; padding: 20px;">
<h2>{greeting}</h2>
<body style="font-family: Arial, sans-serif; color: #333; line-height: 1.6;">
<div style="max-width: 600px; margin: 0 auto; border: 1px solid #ddd; padding: 30px; border-radius: 10px;">
<h2 style="color: #2c3e50;">{greeting}</h2>
<p>{body}</p>
<div style="text-align: center; margin: 30px 0;">
<div style="text-align: center; margin: 40px 0;">
<a href="{variables.get('link', '#')}"
style="background-color: #3498db; color: white; padding: 12px 25px; text-decoration: none; border-radius: 5px; font-weight: bold;">
{button_text}
style="background-color: #3498db; color: white; padding: 15px 30px; text-decoration: none; border-radius: 5px; font-weight: bold; font-size: 16px;">
{button_text}
</a>
</div>
<p style="font-size: 0.9em; color: #666;">{variables.get('link')}</p>
<hr style="border: 0; border-top: 1px solid #eee;">
<p style="font-size: 0.8em; color: #999;">{footer}</p>
<p style="font-size: 0.85em; color: #777; word-break: break-all;">
Ha a gomb nem működik, másolja be ezt a linket a böngészőjébe:<br>
{variables.get('link')}
</p>
<hr style="border: 0; border-top: 1px solid #eee; margin: 30px 0;">
<p style="font-size: 0.8em; color: #999; text-align: center;">{footer}</p>
</div>
</body>
</html>
@@ -37,16 +42,20 @@ class EmailManager:
@staticmethod
async def send_email(recipient: str, template_key: str, variables: dict, lang: str = "hu"):
if settings.EMAIL_PROVIDER == "disabled": return
"""E-mail küldése SendGrid-en keresztül, SMTP fallback-el."""
if settings.EMAIL_PROVIDER == "disabled":
logger.info("Email küldés letiltva.")
return
html = EmailManager._get_html_template(template_key, variables, lang)
subject = locale_manager.get(f"email.{template_key}_subject", lang=lang)
# SendGrid küldés
# 1. SendGrid Küldés
if settings.EMAIL_PROVIDER == "sendgrid" and settings.SENDGRID_API_KEY:
try:
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail
message = Mail(
from_email=(settings.EMAILS_FROM_EMAIL, settings.EMAILS_FROM_NAME),
to_emails=recipient,
@@ -54,23 +63,27 @@ class EmailManager:
html_content=html
)
sg = SendGridAPIClient(settings.SENDGRID_API_KEY)
sg.send(message)
return {"status": "success"}
response = sg.send(message)
logger.info(f"SendGrid Status: {response.status_code} for {recipient}")
if response.status_code >= 400:
logger.error(f"SendGrid Hibaüzenet: {response.body}")
return {"status": "success", "provider": "sendgrid", "code": response.status_code}
except Exception as e:
print(f"SendGrid Error: {e}")
logger.error(f"SendGrid Kritikus Hiba: {str(e)}")
# SMTP Fallback
# ... (az eredeti SMTP kódod ide jön változatlanul)
# 2) SMTP fallback
# 2. SMTP Fallback
if not settings.SMTP_HOST or not settings.SMTP_USER or not settings.SMTP_PASSWORD:
return {"status": "error", "provider": "smtp", "message": "SMTP not configured"}
logger.warning("SMTP nincs konfigurálva a fallback-hez.")
return {"status": "error", "message": "Nincs elérhető szolgáltató."}
try:
msg = MIMEMultipart()
msg["From"] = f"{settings.EMAILS_FROM_NAME} <{settings.EMAILS_FROM_EMAIL}>"
msg["To"] = recipient
msg["Subject"] = subject
msg.attach(MIMEText(html or "Üzenet", "html"))
msg.attach(MIMEText(html, "html"))
with smtplib.SMTP(settings.SMTP_HOST, settings.SMTP_PORT, timeout=15) as server:
if settings.SMTP_USE_TLS:
@@ -78,8 +91,10 @@ class EmailManager:
server.login(settings.SMTP_USER, settings.SMTP_PASSWORD)
server.send_message(msg)
logger.info(f"Email sikeresen kiküldve (SMTP) ide: {recipient}")
return {"status": "success", "provider": "smtp"}
except Exception as e:
return {"status": "error", "provider": "smtp", "message": str(e)}
logger.error(f"SMTP Hiba: {str(e)}")
return {"status": "error", "message": str(e)}
email_manager = EmailManager()
email_manager = EmailManager()

View File

@@ -1,40 +1,26 @@
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, text
from app.models.gamification import UserStats, PointsLedger
from sqlalchemy import select
from app.models.identity import User
class GamificationService:
@staticmethod
async def award_points(db: AsyncSession, user_id: int, points: int, reason: str):
"""Pontok jóváírása és a UserStats frissítése"""
# 1. Bejegyzés a naplóba (Mezőnevek szinkronizálva a modellel)
"""Pontok jóváírása (SQL szinkronizált points mezővel)."""
new_entry = PointsLedger(
user_id=user_id,
points_change=points,
points=points, # Javítva: points_change helyett points
reason=reason
)
db.add(new_entry)
# 2. Összesített statisztika lekérése/létrehozása
result = await db.execute(select(UserStats).where(UserStats.user_id == user_id))
stats = result.scalar_one_or_none()
if not stats:
# Ha új a user, létrehozzuk az alap statisztikát
stats = UserStats(
user_id=user_id,
total_points=0,
current_level=1
)
stats = UserStats(user_id=user_id, total_points=0, current_level=1)
db.add(stats)
# 3. Pontok hozzáadása
stats.total_points += points
# Itt fogjuk később meghívni a szintlépési logikát
# await GamificationService._check_level_up(stats)
# Fontos: Nem commitolunk itt, hanem hagyjuk, hogy a hívó (SocialService)
# egy tranzakcióban mentse el a szolgáltatót és a pontokat!
await db.flush()
return stats.total_points

View File

@@ -0,0 +1,66 @@
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
from typing import Optional, List
import uuid
class GeoService:
@staticmethod
async def get_street_suggestions(db: AsyncSession, zip_code: str, q: str) -> List[str]:
"""Azonnali utca-kiegészítés (Autocomplete) támogatása."""
query = text("""
SELECT s.name
FROM data.geo_streets s
JOIN data.geo_postal_codes p ON s.postal_code_id = p.id
WHERE p.zip_code = :zip AND s.name ILIKE :q
ORDER BY s.name ASC LIMIT 10
""")
res = await db.execute(query, {"zip": zip_code, "q": f"{q}%"})
return [row[0] for row in res.fetchall()]
@staticmethod
async def get_or_create_full_address(
db: AsyncSession,
zip_code: str, city: str, street_name: str,
street_type: str, house_number: str,
parcel_id: Optional[str] = None
) -> uuid.UUID:
"""Hibrid címrögzítés: ellenőrzi a szótárakat és létrehozza a központi címet."""
# 1. Zip/City szótár frissítése (Auto-learning)
zip_id_res = await db.execute(text("""
INSERT INTO data.geo_postal_codes (zip_code, city) VALUES (:z, :c)
ON CONFLICT (country_code, zip_code, city) DO UPDATE SET city = EXCLUDED.city
RETURNING id
"""), {"z": zip_code, "c": city})
zip_id = zip_id_res.scalar()
# 2. Utca szótár frissítése (Auto-learning)
await db.execute(text("""
INSERT INTO data.geo_streets (postal_code_id, name) VALUES (:zid, :n)
ON CONFLICT (postal_code_id, name) DO NOTHING
"""), {"zid": zip_id, "n": street_name})
# 3. Közterület típus (út, utca...) szótár
await db.execute(text("""
INSERT INTO data.geo_street_types (name) VALUES (:n) ON CONFLICT DO NOTHING
"""), {"n": street_type.lower()})
# 4. Központi Address rekord rögzítése
full_text = f"{zip_code} {city}, {street_name} {street_type} {house_number}"
addr_res = await db.execute(text("""
INSERT INTO data.addresses (postal_code_id, street_name, street_type, house_number, parcel_id, full_address_text)
VALUES (:zid, :sn, :st, :hn, :pid, :txt)
ON CONFLICT DO NOTHING
RETURNING id
"""), {
"zid": zip_id, "sn": street_name, "st": street_type, "hn": house_number, "pid": parcel_id, "txt": full_text
})
addr_id = addr_res.scalar()
if not addr_id:
# Ha már létezett, lekérjük az azonosítót
addr_id = (await db.execute(text("""
SELECT id FROM data.addresses
WHERE postal_code_id = :zid AND street_name = :sn AND street_type = :st AND house_number = :hn
"""), {"zid": zip_id, "sn": street_name, "st": street_type, "hn": house_number})).scalar()
return addr_id

View File

@@ -0,0 +1,53 @@
from PIL import Image
from PIL.ExifTags import TAGS, GPSTAGS
import logging
from typing import Tuple, Optional
logger = logging.getLogger(__name__)
class MediaService:
@staticmethod
def _get_if_exist(data, key):
if key in data:
return data[key]
return None
@staticmethod
def _convert_to_degrees(value) -> float:
"""EXIF koordináták (fok, perc, másodperc) konvertálása tizedes fokká."""
d = float(value[0])
m = float(value[1])
s = float(value[2])
return d + (m / 60.0) + (s / 3600.0)
@classmethod
def extract_gps_info(cls, file_path: str) -> Optional[Tuple[float, float]]:
"""Kiolvassa a GPS koordinátákat a képből."""
try:
image = Image.open(file_path)
exif_data = image._getexif()
if not exif_data:
return None
gps_info = {}
for tag, value in exif_data.items():
decoded = TAGS.get(tag, tag)
if decoded == "GPSInfo":
for t in value:
sub_decoded = GPSTAGS.get(t, t)
gps_info[sub_decoded] = value[t]
if gps_info:
lat = cls._convert_to_degrees(gps_info['GPSLatitude'])
if gps_info['GPSLatitudeRef'] != "N":
lat = 0 - lat
lon = cls._convert_to_degrees(gps_info['GPSLongitude'])
if gps_info['GPSLongitudeRef'] != "E":
lon = 0 - lon
return lat, lon
except Exception as e:
logger.warning(f"Nem sikerült kiolvasni az EXIF adatokat: {e}")
return None
return None