2026.03.30 front és garázs logika
This commit is contained in:
169
archive/2026-03-29/root/create_test_identity.py
Normal file
169
archive/2026-03-29/root/create_test_identity.py
Normal file
@@ -0,0 +1,169 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Create a persistent test identity for integration testing.
|
||||
This script runs inside the sf_api container via docker compose exec.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
import os
|
||||
sys.path.insert(0, '/app/backend')
|
||||
|
||||
from app.db.session import AsyncSessionLocal
|
||||
from app.models.identity import User, Person, UserRole
|
||||
from app.core.security import get_password_hash
|
||||
from sqlalchemy import select
|
||||
from datetime import datetime
|
||||
|
||||
TEST_EMAIL = "integration_test_admin@servicefinder.local"
|
||||
TEST_PASSWORD = "TestPassword123!"
|
||||
TEST_FIRST_NAME = "Integration"
|
||||
TEST_LAST_NAME = "TestAdmin"
|
||||
|
||||
async def create_test_identity():
|
||||
async with AsyncSessionLocal() as db:
|
||||
# Check if user already exists
|
||||
result = await db.execute(
|
||||
select(User).where(User.email == TEST_EMAIL)
|
||||
)
|
||||
existing_user = result.scalar_one_or_none()
|
||||
|
||||
if existing_user:
|
||||
print(f"User {TEST_EMAIL} already exists with ID {existing_user.id}")
|
||||
# Update role to admin if not already
|
||||
if existing_user.role != UserRole.admin:
|
||||
existing_user.role = UserRole.admin
|
||||
await db.commit()
|
||||
print(f"Updated user role to {UserRole.admin}")
|
||||
user = existing_user
|
||||
else:
|
||||
# Create Person first
|
||||
person = Person(
|
||||
first_name=TEST_FIRST_NAME,
|
||||
last_name=TEST_LAST_NAME,
|
||||
email=TEST_EMAIL,
|
||||
is_active=True,
|
||||
created_at=datetime.utcnow()
|
||||
)
|
||||
db.add(person)
|
||||
await db.flush() # Get person.id
|
||||
|
||||
# Create User with ADMIN role
|
||||
user = User(
|
||||
email=TEST_EMAIL,
|
||||
hashed_password=get_password_hash(TEST_PASSWORD),
|
||||
role=UserRole.admin,
|
||||
person_id=person.id,
|
||||
is_active=True,
|
||||
subscription_plan="PREMIUM",
|
||||
scope_level="individual",
|
||||
preferred_language="en",
|
||||
region_code="HU",
|
||||
ui_mode="personal"
|
||||
)
|
||||
db.add(user)
|
||||
await db.commit()
|
||||
await db.refresh(user)
|
||||
print(f"Created new user {TEST_EMAIL} with ID {user.id}, role {user.role}")
|
||||
|
||||
# Get organization ID if any (optional)
|
||||
from app.models.identity import OrganizationMember
|
||||
result = await db.execute(
|
||||
select(OrganizationMember.organization_id)
|
||||
.where(OrganizationMember.user_id == user.id)
|
||||
.limit(1)
|
||||
)
|
||||
org_member = result.scalar_one_or_none()
|
||||
org_id = org_member.organization_id if org_member else None
|
||||
|
||||
# Get a test vehicle ID if any (optional)
|
||||
from app.models.data import Vehicle
|
||||
result = await db.execute(
|
||||
select(Vehicle.id)
|
||||
.where(Vehicle.owner_user_id == user.id)
|
||||
.limit(1)
|
||||
)
|
||||
vehicle = result.scalar_one_or_none()
|
||||
vehicle_id = vehicle.id if vehicle else None
|
||||
|
||||
# If no vehicle, create a dummy one (optional)
|
||||
if not vehicle_id:
|
||||
# Check if there's a catalog entry
|
||||
from app.models.data import VehicleModelDefinition
|
||||
result = await db.execute(
|
||||
select(VehicleModelDefinition.id).limit(1)
|
||||
)
|
||||
catalog_id = result.scalar_one_or_none()
|
||||
if catalog_id:
|
||||
import uuid
|
||||
vehicle = Vehicle(
|
||||
catalog_id=catalog_id,
|
||||
license_plate=f"TEST-{uuid.uuid4().hex[:4]}".upper(),
|
||||
vin=f"VIN{uuid.uuid4().hex[:10]}".upper(),
|
||||
nickname="Integration Test Vehicle",
|
||||
owner_user_id=user.id,
|
||||
status="DRAFT", # Follow 2-step vehicle flow
|
||||
created_at=datetime.utcnow()
|
||||
)
|
||||
db.add(vehicle)
|
||||
await db.commit()
|
||||
await db.refresh(vehicle)
|
||||
vehicle_id = vehicle.id
|
||||
print(f"Created test vehicle with ID {vehicle_id}")
|
||||
|
||||
# Generate a token for testing (we'll need to login properly)
|
||||
# For now, we'll just output credentials
|
||||
print("\n" + "="*60)
|
||||
print("TEST IDENTITY CREATED/VERIFIED")
|
||||
print("="*60)
|
||||
print(f"Email: {TEST_EMAIL}")
|
||||
print(f"Password: {TEST_PASSWORD}")
|
||||
print(f"User ID: {user.id}")
|
||||
print(f"Role: {user.role}")
|
||||
print(f"Organization ID: {org_id}")
|
||||
print(f"Test Vehicle ID: {vehicle_id}")
|
||||
print("="*60)
|
||||
|
||||
# Save to integration_session.json
|
||||
import json
|
||||
session_data = {
|
||||
"email": TEST_EMAIL,
|
||||
"password": TEST_PASSWORD,
|
||||
"user_id": user.id,
|
||||
"role": user.role.value,
|
||||
"organization_id": org_id,
|
||||
"test_vehicle_id": vehicle_id
|
||||
}
|
||||
|
||||
# We'll need to get a token by actually logging in
|
||||
# Let's call the auth service
|
||||
from app.services.auth_service import AuthService
|
||||
token_data = await AuthService.authenticate(db, TEST_EMAIL, TEST_PASSWORD)
|
||||
if token_data:
|
||||
# Actually we need to create tokens
|
||||
from app.core.security import create_tokens
|
||||
from app.core.config import settings
|
||||
ranks = await settings.get_db_setting(db, "rbac_rank_matrix", default={})
|
||||
role_key = user.role.value.upper()
|
||||
token_payload = {
|
||||
"sub": str(user.id),
|
||||
"role": user.role.value,
|
||||
"rank": ranks.get(role_key, 10),
|
||||
"scope_level": user.scope_level or "individual",
|
||||
"scope_id": str(user.scope_id) if user.scope_id else str(user.id)
|
||||
}
|
||||
access_token, refresh_token = create_tokens(data=token_payload)
|
||||
session_data["test_token"] = access_token
|
||||
print(f"Access Token: {access_token[:50]}...")
|
||||
|
||||
# Write to file
|
||||
output_path = "/opt/docker/dev/service_finder/tests/integration_session.json"
|
||||
os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
||||
with open(output_path, 'w') as f:
|
||||
json.dump(session_data, f, indent=2)
|
||||
print(f"\nSession data saved to {output_path}")
|
||||
|
||||
return session_data
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(create_test_identity())
|
||||
Reference in New Issue
Block a user