318 lines
14 KiB
Python
318 lines
14 KiB
Python
# /opt/docker/dev/service_finder/backend/app/tests/test_admin_audit_gitea_final.py
|
|
#!/usr/bin/env python3
|
|
"""
|
|
Enterprise Admin & Gamification Audit - Kombinált Verzió
|
|
|
|
Ez a szkript a két legjobb megközelítést egyesíti:
|
|
1. Részletes hardcode szkennelés általános és specifikus mintákkal
|
|
2. Gitea integráció importálással (fallback subprocess-szel)
|
|
3. Teljes projekt terv létrehozása a Gitea-ban a szigorú sablonnal
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import List, Dict, Tuple, Optional
|
|
|
|
# ==================== KONFIGURÁCIÓ ====================
|
|
PROJECT_ROOT = Path(__file__).parent.parent.parent.parent # /opt/docker/dev/service_finder
|
|
GITEA_SCRIPT = PROJECT_ROOT / ".roo" / "scripts" / "gitea_manager.py"
|
|
|
|
SCAN_DIRS = [
|
|
PROJECT_ROOT / "backend" / "app" / "services",
|
|
PROJECT_ROOT / "backend" / "app" / "api",
|
|
]
|
|
|
|
# Hardcode minta regexek - kombinált lista
|
|
HARDCODE_PATTERNS = [
|
|
# Általános minták (1. kódból)
|
|
(r'\b\d{1,3}\b', "Mágikus szám (1-3 jegyű)"),
|
|
(r'\b(50|10|100|1000|5000|10000)\b', "Gyakori mágikus szám (pl. 50, 10)"),
|
|
(r'"(active|inactive|pending|approved|rejected|blocked)"', "Fix státusz string"),
|
|
(r"'active'|'inactive'|'pending'|'approved'|'rejected'|'blocked'", "Fix státusz string (aposztróf)"),
|
|
(r'\b(True|False)\b', "Hardcode boolean"),
|
|
(r'\b(max|min|limit|threshold|default)\s*=\s*\d+', "Limit/Threshold érték"),
|
|
|
|
# Specifikus gamification minták (2. kódból)
|
|
(r'award_points\([^)]*,\s*(\d+)\s*,', "Fix pontszám osztás (Gamification)"),
|
|
(r'validation_level\s*[+=]\s*(\d+)', "Fix validációs szint növelés/csökkentés"),
|
|
(r'validation_level\s*[<>]=?\s*(\d+)', "Fix validációs küszöb ellenőrzés"),
|
|
(r'trust_score\s*[+=]\s*(\d+)', "Fix trust score módosítás"),
|
|
(r'level_threshold\s*=\s*(\d+)', "Fix szint küszöbérték"),
|
|
(r'bonus_multiplier\s*=\s*(\d+(?:\.\d+)?)', "Fix bónusz szorzó"),
|
|
]
|
|
|
|
# ==================== GITEA INTEGRÁCIÓ ====================
|
|
|
|
class GiteaManager:
|
|
"""Gitea integráció kezelése - importálás vagy subprocess fallback"""
|
|
|
|
def __init__(self):
|
|
self.use_import = False
|
|
self.gitea_module = None
|
|
|
|
# Próbáljuk meg importálni a gitea_manager-t
|
|
try:
|
|
gitea_manager_path = PROJECT_ROOT / ".roo" / "scripts"
|
|
sys.path.append(str(gitea_manager_path))
|
|
import gitea_manager as gm
|
|
self.gitea_module = gm
|
|
self.use_import = True
|
|
print("✅ Gitea manager importálva")
|
|
except ImportError as e:
|
|
print(f"⚠️ Gitea manager importálási hiba: {e}")
|
|
print(" Subprocess fallback használata")
|
|
|
|
def run_command(self, args: List[str]) -> Tuple[bool, str]:
|
|
"""Futtat egy Gitea parancsot importálással vagy subprocess-szel"""
|
|
if self.use_import and self.gitea_module:
|
|
return self._run_via_import(args)
|
|
else:
|
|
return self._run_via_subprocess(args)
|
|
|
|
def _run_via_import(self, args: List[str]) -> Tuple[bool, str]:
|
|
"""Importált modul használata"""
|
|
try:
|
|
action = args[0].lower() if args else ""
|
|
|
|
if action == "ms" and len(args) > 1 and args[1].lower() == "create":
|
|
# Mérföldkő létrehozása
|
|
title = args[2]
|
|
description = args[3] if len(args) > 3 else ""
|
|
ms_id = self.gitea_module.create_milestone(title, description)
|
|
return True, f"Mérföldkő létrehozva: {ms_id}"
|
|
|
|
elif action == "create" and len(args) > 2:
|
|
# Issue létrehozása
|
|
title = args[1].strip('"')
|
|
body = args[2].strip('"')
|
|
milestone_ref = "v2.0 - Enterprise Admin Rendszer & Dinamikus Konfig"
|
|
categories = []
|
|
|
|
if len(args) > 3:
|
|
# Címkék kinyerése
|
|
for arg in args[3:]:
|
|
if any(arg.startswith(prefix) for prefix in ["Status:", "Scope:", "Type:", "Role:"]):
|
|
categories.append(arg)
|
|
|
|
success = self.gitea_module.create_issue(
|
|
title=title,
|
|
body=body,
|
|
categories=categories,
|
|
milestone_ref=milestone_ref
|
|
)
|
|
return success, "Issue létrehozva" if success else "Hiba az issue létrehozásakor"
|
|
|
|
else:
|
|
return False, f"Ismeretlen parancs: {action}"
|
|
|
|
except Exception as e:
|
|
return False, f"Import hiba: {e}"
|
|
|
|
def _run_via_subprocess(self, args: List[str]) -> Tuple[bool, str]:
|
|
"""Subprocess használata"""
|
|
cmd = ["docker", "exec", "roo-helper", "python3", "/scripts/gitea_manager.py"] + args
|
|
try:
|
|
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
|
return result.returncode == 0, result.stdout + "\n" + result.stderr
|
|
except subprocess.TimeoutExpired:
|
|
return False, "Időtúllépés a parancs futtatásakor"
|
|
except Exception as e:
|
|
return False, f"Hiba: {e}"
|
|
|
|
# ==================== SEGÉDFÜGGVÉNYEK ====================
|
|
|
|
def find_python_files(directory: Path) -> List[Path]:
|
|
"""Rekurzívan gyűjti össze az összes .py fájlt a megadott könyvtárban."""
|
|
python_files = []
|
|
for root, dirs, files in os.walk(directory):
|
|
for file in files:
|
|
if file.endswith('.py'):
|
|
python_files.append(Path(root) / file)
|
|
return python_files
|
|
|
|
def scan_file(file_path: Path) -> List[Dict]:
|
|
"""Egy fájlban keres hardcode értékeket a regex minták alapján."""
|
|
findings = []
|
|
try:
|
|
content = file_path.read_text(encoding='utf-8')
|
|
lines = content.splitlines()
|
|
|
|
for line_num, line in enumerate(lines, 1):
|
|
for pattern, description in HARDCODE_PATTERNS:
|
|
matches = re.finditer(pattern, line)
|
|
for match in matches:
|
|
# Csak számértékeket gyűjtsünk a specifikus mintáknál
|
|
if 'gamification' in description.lower() or 'trust' in description.lower():
|
|
value = match.group(1) if match.groups() else match.group()
|
|
else:
|
|
value = match.group()
|
|
|
|
findings.append({
|
|
'file': str(file_path.relative_to(PROJECT_ROOT)),
|
|
'line': line_num,
|
|
'column': match.start() + 1,
|
|
'match': value,
|
|
'description': description,
|
|
'context': line.strip()[:100]
|
|
})
|
|
except Exception as e:
|
|
print(f"⚠️ Hiba a fájl olvasásakor {file_path}: {e}")
|
|
|
|
return findings
|
|
|
|
def generate_markdown_report(findings: List[Dict]) -> str:
|
|
"""Generál egy Markdown formátumú riportot a találatokról."""
|
|
if not findings:
|
|
return "## ✅ Nincs hardcode találat\n\nA szkennelés nem talált gyanús hardcode értékeket."
|
|
|
|
# Csoportosítás fájl szerint
|
|
by_file = {}
|
|
for finding in findings:
|
|
file = finding['file']
|
|
if file not in by_file:
|
|
by_file[file] = []
|
|
by_file[file].append(finding)
|
|
|
|
report_lines = [
|
|
"# 🔍 Hardcode Audit Részletes Részletek",
|
|
"",
|
|
f"**Összes találat:** {len(findings)}",
|
|
"",
|
|
"---",
|
|
]
|
|
|
|
for file, file_findings in sorted(by_file.items()):
|
|
report_lines.append(f"## 📄 {file}")
|
|
report_lines.append("")
|
|
|
|
for finding in file_findings:
|
|
report_lines.append(f"### L{ finding['line'] }: `{ finding['match'] }`")
|
|
report_lines.append(f"- **Leírás:** {finding['description']}")
|
|
report_lines.append(f"- **Kontextus:** `{finding['context']}`")
|
|
report_lines.append(f"- **Hely:** {finding['file']}:{finding['line']}:{finding['column']}")
|
|
report_lines.append("")
|
|
|
|
return "\n".join(report_lines)
|
|
|
|
# ==================== GITEA PROJEKT LÉTREHOZÁS ====================
|
|
|
|
def create_gitea_project(gitea: GiteaManager, markdown_report: str):
|
|
"""Létrehozza a teljes projekt tervet a Gitea-ban."""
|
|
|
|
print("\n🚀 Gitea Projekt Terv Létrehozása...")
|
|
|
|
# 1. Mérföldkő létrehozása
|
|
print("📌 Mérföldkő létrehozása...")
|
|
success, msg = gitea.run_command([
|
|
"ms", "create", "v2.0 - Enterprise Admin Rendszer & Dinamikus Konfig",
|
|
"Admin rendszer fejlesztése: RBAC, dinamikus konfiguráció, anomália detektálás"
|
|
])
|
|
if success:
|
|
print("✅ Mérföldkő sikeresen létrehozva")
|
|
else:
|
|
print(f"⚠️ Figyelmeztetés: {msg}")
|
|
|
|
# 2. Issue 1: Hardcode Értékek Dinamikussá Tétele
|
|
issue1_body = f"""**Mérföldkő:** v2.0 - Enterprise Admin Rendszer & Dinamikus Konfig
|
|
**Cél:** Hardcode értékek kiszervezése SystemParameter táblába és ConfigService-be
|
|
|
|
### 🔗 Függőségek (Dependencies)
|
|
- **Bemenet (Mikre támaszkodik):** Database (system.parameters tábla), ConfigService
|
|
- **Kimenet (Mik támaszkodnak rá):** GamificationService, NotificationService, SecurityService
|
|
|
|
### 📝 Elemzés
|
|
A hardcode audit {len(markdown_report.splitlines())} sor találatot jelentett. Ezeket az értékeket át kell helyezni a dinamikus konfigurációs rendszerbe.
|
|
|
|
### ✅ To-Do Lista
|
|
- [ ] `SystemParameter` vagy `AdminConfig` adatbázis tábla létrehozása (Key-Value alapú)
|
|
- [ ] `ConfigService` megírása (Redis/Memória gyorsítótárral)
|
|
- [ ] A kód átírása, hogy az értékeket a DB-ből olvassa
|
|
|
|
### 🔍 Hardcode Találatok (Összefoglaló)
|
|
{markdown_report[:1500]}...
|
|
"""
|
|
|
|
print("📝 Issue 1 létrehozása...")
|
|
gitea.run_command([
|
|
"create", "Phase 1: Hardcode Értékek Dinamikussá Tétele",
|
|
issue1_body,
|
|
"v2.0 - Enterprise Admin Rendszer & Dinamikus Konfig",
|
|
"Scope: Backend", "Type: Refactor", "Status: To Do"
|
|
])
|
|
|
|
# 3. Issue 2: RBAC és Admin API Router
|
|
issue2_body = """**Mérföldkő:** v2.0 - Enterprise Admin Rendszer & Dinamikus Konfig
|
|
**Cél:** Superadmin, Moderator szerepkörök és `/api/v1/admin` végpontok implementálása
|
|
|
|
### 🔗 Függőségek (Dependencies)
|
|
- **Bemenet (Mikre támaszkodik):** Identity modell (User, Role), Permission tábla
|
|
- **Kimenet (Mik támaszkodnak rá):** Admin UI, Moderátori felület
|
|
|
|
### 📝 Elemzés
|
|
Létre kell hozni a Role-Based Access Control (RBAC) rendszert, amely támogatja a Superadmin, Moderator, és Auditor szerepköröket.
|
|
|
|
### ✅ To-Do Lista
|
|
- [ ] `User` modell bővítése: `role` oszlop bevezetése (user, moderator, superadmin)
|
|
- [ ] FastAPI Dependency (`get_current_admin`) megírása, ami blokkolja a normál usereket
|
|
- [ ] Az `/api/v1/admin` router regisztrálása a main.py-ban
|
|
"""
|
|
|
|
print("📝 Issue 2 létrehozása...")
|
|
gitea.run_command([
|
|
"create", "Phase 2: RBAC és Admin API Router",
|
|
issue2_body,
|
|
"v2.0 - Enterprise Admin Rendszer & Dinamikus Konfig",
|
|
"Scope: Backend", "Type: Feature", "Role: Admin", "Status: To Do"
|
|
])
|
|
|
|
# 4. Issue 3: Core Felügyeleti Végpontok
|
|
issue3_body = """**Mérföldkő:** v2.0 - Enterprise Admin Rendszer & Dinamikus Konfig
|
|
**Cél:** User (KYC), Jármű, Szerviz felügyelet (Tiltás/Jóváhagyás) végpontok
|
|
|
|
### 🔗 Függőségek (Dependencies)
|
|
- **Bemenet (Mikre támaszkodik):** UserService, VehicleService, ServiceRegistry
|
|
- **Kimenet (Mik támaszkodnak rá):** Admin dashboard, Moderátori munkafolyamatok
|
|
|
|
### 📝 Elemzés
|
|
Külön végpontok kellenek a felhasználók KYC (Know Your Customer) jóváhagyásához, járművek tiltásához/engedélyezéséhez, és szervizek moderálásához.
|
|
|
|
### ✅ To-Do Lista
|
|
- [ ] `GET /admin/users` - Felhasználók listázása (szűréssel, pl. pending KYC)
|
|
- [ ] `POST /admin/users/{id}/ban` - Fiók tiltása/felfüggesztése
|
|
- [ ] `POST /admin/marketplace/services/{id}/approve` - Szerviz manuális 100%-ra validálása (Kék pipa)
|
|
- [ ] `GET /admin/assets/flagged` - Gyanús járművek listája
|
|
"""
|
|
|
|
print("📝 Issue 3 létrehozása...")
|
|
gitea.run_command([
|
|
"create", "Phase 3: Core Felügyeleti Végpontok",
|
|
issue3_body,
|
|
"v2.0 - Enterprise Admin Rendszer & Dinamikus Konfig",
|
|
"Scope: API", "Type: Feature", "Role: Admin", "Status: To Do"
|
|
])
|
|
|
|
# 5. Issue 4: Anomália Detektálás (Anti-Cheat)
|
|
issue4_body = """**Mérföldkő:** v2.0 - Enterprise Admin Rendszer & Dinamikus Konfig
|
|
**Cél:** Robot felügyelő a gyanús aktivitásokhoz (pl. túl gyors pontgyűjtés, sok sikertelen bejelentkezés)
|
|
|
|
### 🔗 Függőségek (Dependencies)
|
|
- **Bemenet (Mikre támaszkodik):** Audit log, Gamification events, Security events
|
|
- **Kimenet (Mik támaszkodnak rá):** Admin értesítések, Automatikus tiltások
|
|
|
|
### 📝 Elemzés
|
|
Anomália detektáló algoritmus készítése, amely gyanús mintákat keres a felhasználói aktivitásban.
|
|
|
|
### ✅ To-Do Lista
|
|
- [ ] Automatikus detektálás gyanúsan sok validációra rövid időn belül
|
|
- [ ] Sebesség/Távolság ellenőrzés (Nem lehet 1 perc alatt 50km-re lévő szervizeket rögzíteni)
|
|
- [ ] Riasztások küldése a `/admin/alerts` végpontra a Moderátoroknak
|
|
"""
|
|
|
|
print("📝 Issue 4 létrehozása...")
|
|
gitea.run_command([
|
|
"create", "Phase 4: Anomália Detektálás (Anti-Cheat)",
|
|
issue4_body,
|
|
"v2.0 - Enterprise Admin Rendszer |