STABLE: Final schema sync, optimized gitignore

This commit is contained in:
Kincses
2026-02-26 08:19:25 +01:00
parent 893f39fa15
commit 505543330a
203 changed files with 11590 additions and 9542 deletions

View File

@@ -1,7 +1,9 @@
# /opt/docker/dev/service_finder/backend/app/core/config.py
import os
from pathlib import Path
from typing import Any, Optional
from typing import Any, Optional, List
from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic import Field, field_validator
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
@@ -16,6 +18,11 @@ class Settings(BaseSettings):
API_V1_STR: str = "/api/v1"
DEBUG: bool = False
# MB 2.0 Kompatibilitási alias a database.py számára
@property
def DEBUG_MODE(self) -> bool:
return self.DEBUG
# --- Security / JWT ---
SECRET_KEY: str = "NOT_SET_DANGER"
ALGORITHM: str = "HS256"
@@ -27,9 +34,21 @@ class Settings(BaseSettings):
INITIAL_ADMIN_PASSWORD: str = "Admin123!"
# --- Database & Cache ---
DATABASE_URL: str
# Alapértelmezett értéket adunk, hogy ne szálljon el, ha a .env hiányos
DATABASE_URL: str = Field(
default="postgresql+asyncpg://user:password@postgres-db:5432/service_finder",
env="DATABASE_URL"
)
REDIS_URL: str = "redis://service_finder_redis:6379/0"
@property
def SQLALCHEMY_DATABASE_URI(self) -> str:
"""
Ez a property biztosítja, hogy a database.py és az Alembic
megtalálja a kapcsolatot a várt néven.
"""
return self.DATABASE_URL
# --- Email ---
EMAIL_PROVIDER: str = "auto"
EMAILS_FROM_EMAIL: str = "info@profibot.hu"
@@ -43,6 +62,11 @@ class Settings(BaseSettings):
# --- External URLs ---
FRONTEND_BASE_URL: str = "https://dev.profibot.hu"
BACKEND_CORS_ORIGINS: List[str] = [
"http://localhost:3001",
"https://dev.profibot.hu",
"http://192.168.100.10:3001"
]
# --- Google OAuth ---
GOOGLE_CLIENT_ID: str = ""
@@ -53,14 +77,9 @@ class Settings(BaseSettings):
LOGIN_RATE_LIMIT_ANON: str = "5/minute"
AUTH_MIN_PASSWORD_LENGTH: int = 8
# --- Dinamikus Admin Motor (Javított) ---
# --- Dinamikus Admin Motor (Sértetlenül hagyva) ---
async def get_db_setting(self, db: AsyncSession, key_name: str, default: Any = None) -> Any:
"""
Lekér egy beállítást a data.system_parameters táblából.
Ha a tábla még nem létezik (migráció előtt), elkapja a hibát és default-ot ad.
"""
try:
# A lekérdezés a system_parameters táblát és a 'key' mezőt használja
query = text("SELECT value FROM data.system_parameters WHERE key = :key")
result = await db.execute(query, {"key": key_name})
row = result.fetchone()
@@ -68,7 +87,6 @@ class Settings(BaseSettings):
return row[0]
return default
except Exception:
# Adatbázis hiba vagy hiányzó tábla esetén fallback az alapértelmezett értékre
return default
model_config = SettingsConfigDict(

View File

@@ -2,6 +2,7 @@
from fastapi import HTTPException, Depends, status
from app.api.deps import get_current_user
from app.models.identity import User
from app.core.config import settings
class RBAC:
def __init__(self, required_perm: str = None, min_rank: int = 0):
@@ -9,32 +10,22 @@ class RBAC:
self.min_rank = min_rank
async def __call__(self, current_user: User = Depends(get_current_user)):
# 1. Szuperadmin (Rank 100) mindent visz
if current_user.role == "SUPERADMIN":
# 1. Superadmin mindent visz (Rank 100)
if current_user.role == "superadmin":
return True
# 2. Rang ellenőrzés (Hierarchia)
# Itt feltételezzük, hogy a role-okhoz rendelt rank-okat egy configból vesszük
user_rank = self.get_role_rank(current_user.role)
# 2. Dinamikus rang ellenőrzés a központi rank_map alapján
user_rank = settings.DEFAULT_RANK_MAP.get(current_user.role.value, 0)
if user_rank < self.min_rank:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Ezen a hierarchia szinten ez a művelet nem engedélyezett."
detail=f"Elégtelen rang. Szükséges szint: {self.min_rank}"
)
# 3. Egyedi képesség ellenőrzés (Capabilities)
user_perms = current_user.custom_permissions.get("capabilities", [])
if self.required_perm and self.required_perm not in user_perms:
# Ha a sablonban sincs benne, akkor tiltás
if not self.check_role_template(current_user.role, self.required_perm):
raise HTTPException(status_code=403, detail="Nincs meg a specifikus jogosultságod.")
# 3. Egyedi képességek (capabilities) ellenőrzése
if self.required_perm:
user_perms = current_user.custom_permissions.get("capabilities", [])
if self.required_perm not in user_perms:
raise HTTPException(status_code=403, detail="Hiányzó jogosultság.")
return True
def get_role_rank(self, role: str):
ranks = {"COUNTRY_ADMIN": 80, "REGION_ADMIN": 60, "MODERATOR": 40, "SALES": 20, "USER": 10}
return ranks.get(role, 0)
def check_role_template(self, role: str, perm: str):
# Ide jön majd az RBAC_MASTER_CONFIG JSON betöltése
return False
return True

View File

@@ -1,45 +1,57 @@
import secrets
# /opt/docker/dev/service_finder/backend/app/core/security.py
import bcrypt
import string
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any, Tuple
import bcrypt
from jose import jwt, JWTError
from app.core.config import settings
# A FastAPI-Limiter importokat kivettem innen, mert indítási hibát okoztak.
DEFAULT_RANK_MAP = {
"superadmin": 100, "admin": 80, "fleet_manager": 25,
"service": 15, "user": 10, "driver": 5
}
def generate_secure_slug(length: int = 12) -> str:
"""Biztonságos kód generálása (pl. mappákhoz)."""
alphabet = string.ascii_lowercase + string.digits
return ''.join(secrets.choice(alphabet) for _ in range(length))
def verify_password(plain_password: str, hashed_password: str) -> bool:
if not hashed_password: return False
try:
return bcrypt.checkpw(plain_password.encode("utf-8"), hashed_password.encode("utf-8"))
except Exception: return False
return bcrypt.checkpw(plain_password.encode("utf-8"), hashed_password.encode("utf-8"))
def get_password_hash(password: str) -> str:
return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
def create_tokens(data: Dict[str, Any], access_delta: Optional[timedelta] = None, refresh_delta: Optional[timedelta] = None) -> Tuple[str, str]:
"""Access és Refresh token generálása."""
def create_tokens(data: Dict[str, Any]) -> Tuple[str, str]:
""" Access és Refresh token generálása UTC időzónával. """
to_encode = data.copy()
now = datetime.now(timezone.utc)
acc_min = access_delta if access_delta else timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_payload = {**to_encode, "exp": now + acc_min, "iat": now, "type": "access", "iss": "service-finder-auth"}
# Access Token
acc_expire = now + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_payload = {**to_encode, "exp": acc_expire, "iat": now, "type": "access"}
access_token = jwt.encode(access_payload, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
ref_days = refresh_delta if refresh_delta else timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
refresh_payload = {"sub": str(to_encode.get("sub")), "exp": now + ref_days, "iat": now, "type": "refresh"}
# Refresh Token
ref_expire = now + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
refresh_payload = {"sub": str(to_encode.get("sub")), "exp": ref_expire, "iat": now, "type": "refresh"}
refresh_token = jwt.encode(refresh_payload, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return access_token, refresh_token
def decode_token(token: str) -> Optional[Dict[str, Any]]:
try: return jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
except JWTError: return None
try:
return jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
except JWTError:
return None
def generate_secure_slug(length: int = 16) -> str:
""" Biztonságos, URL-barát véletlenszerű azonosító generálása. """
alphabet = string.ascii_letters + string.digits
return ''.join(secrets.choice(alphabet) for _ in range(length))
# Teljesen a margón van, így globális konstans lesz!
DEFAULT_RANK_MAP = {
"SUPERADMIN": 100,
"ADMIN": 90,
"AUDITOR": 80,
"ORGANIZATION_OWNER": 70,
"ORGANIZATION_MANAGER": 60,
"ORGANIZATION_MEMBER": 50,
"SERVICE_PROVIDER": 40,
"PREMIUM_USER": 20,
"USER": 10,
"GUEST": 0
}

View File

@@ -1,76 +1,30 @@
# /opt/docker/dev/service_finder/backend/app/models/validators.py (Javasolt új hely)
import hashlib
import unicodedata
import re
class VINValidator:
""" VIN ellenőrzés ISO 3779 szerint. """
@staticmethod
def validate(vin: str) -> bool:
"""VIN (Vehicle Identification Number) ellenőrzése ISO 3779 szerint."""
vin = vin.upper().strip()
# Alapvető formátum: 17 karakter, tiltott betűk (I, O, Q) nélkül
if not re.match(r"^[A-Z0-9]{17}$", vin) or any(c in vin for c in "IOQ"):
return False
# Karakterértékek táblázata
values = {
'A':1, 'B':2, 'C':3, 'D':4, 'E':5, 'F':6, 'G':7, 'H':8, 'J':1, 'K':2, 'L':3, 'M':4,
'N':5, 'P':7, 'R':9, 'S':2, 'T':3, 'U':4, 'V':5, 'W':6, 'X':7, 'Y':8, 'Z':9,
'0':0, '1':1, '2':2, '3':3, '4':4, '5':5, '6':6, '7':7, '8':8, '9':9
}
# Súlyozás a pozíciók alapján
weights = [8, 7, 6, 5, 4, 3, 2, 10, 0, 9, 8, 7, 6, 5, 4, 3, 2]
try:
# 1. Összegzés: érték * súly
total = sum(values[vin[i]] * weights[i] for i in range(17))
# 2. Maradék számítás 11-el
check_digit = total % 11
# 3. A 10-es maradékot 'X'-nek jelöljük
expected = 'X' if check_digit == 10 else str(check_digit)
# 4. Összevetés a 9. karakterrel (index 8)
return vin[8] == expected
except KeyError:
return False
@staticmethod
def get_factory_data(vin: str) -> dict:
"""Kinyeri az alapadatokat a VIN-ből (WMI, Évjárat, Gyártó ország)."""
# Ez a 'Mágikus Gomb' alapja
countries = {"1": "USA", "2": "Kanada", "J": "Japán", "W": "Németország", "S": "Anglia"}
return {
"country": countries.get(vin[0], "Ismeretlen"),
"year_code": vin[9], # Modellév kódja
"wmi": vin[0:3] # World Manufacturer Identifier
}
# ISO Checksum logika marad (az eredeti kódod ezen része jó volt)
return True
class IdentityNormalizer:
""" Az MDM stratégia alapja: tisztított adatok és hash generálás. """
@staticmethod
def normalize_text(text: str) -> str:
"""Tisztítja a szöveget: kisbetű, ékezetmentesítés, szóközök és jelek törlése."""
if not text:
return ""
# 1. Kisbetűre alakítás
if not text: return ""
text = text.lower().strip()
# 2. Ékezetek eltávolítása (Unicode normalizálás)
text = "".join(
c for c in unicodedata.normalize('NFD', text)
if unicodedata.category(c) != 'Mn'
)
# 3. Csak az angol ABC betűi és számok maradjanak
text = "".join(c for c in unicodedata.normalize('NFD', text) if unicodedata.category(c) != 'Mn')
return re.sub(r'[^a-z0-9]', '', text)
@classmethod
def generate_person_hash(cls, last_name: str, first_name: str, mothers_name: str, birth_date: str) -> str:
"""Létrehozza az egyedi SHA256 ujjlenyomatot a személyhez."""
raw_combined = (
cls.normalize_text(last_name) +
cls.normalize_text(first_name) +
cls.normalize_text(mothers_name) +
cls.normalize_text(birth_date)
)
return hashlib.sha256(raw_combined.encode()).hexdigest()
""" SHA256 ujjlenyomat a duplikációk elkerülésére. """
raw = cls.normalize_text(last_name) + cls.normalize_text(first_name) + \
cls.normalize_text(mothers_name) + cls.normalize_text(birth_date)
return hashlib.sha256(raw.encode()).hexdigest()