#!/usr/bin/env python3
"""
Simple script to create a test user with ADMIN role.

It also ensures the user has a test vehicle (when a catalog entry exists),
generates an access token, and saves the resulting session data as JSON.

Run with: docker compose exec sf_api python /app/backend/create_test_user_simple.py
"""
|
|
|
|
import asyncio
|
|
import sys
|
|
import os
|
|
import json
|
|
|
|
# Add backend to path
|
|
# Put /app/backend first on the import search path before main() runs, since
# main() imports `app.*` modules lazily. Presumably the `app` package lives
# in that directory -- confirm against the container image layout.
sys.path.insert(0, '/app/backend')
|
|
|
|
async def main():
    """Provision the integration-test admin identity and save its session data.

    Everything runs inside a single ``AsyncSessionLocal`` session:

    1. Look up the user by ``TEST_EMAIL``. If it exists with a non-admin role,
       promote it to ``UserRole.admin``; if it does not exist, create a
       ``Person`` and a ``User`` with the admin role.
    2. Read the organization id of the user's first membership row, if any.
    3. Reuse the id of a vehicle owned by the user, or create a ``DRAFT``
       vehicle from the first catalog entry found (skipped when the catalog
       is empty).
    4. Authenticate with the test credentials and build an access token.
    5. Write the collected data to ``integration_session.json`` and print a
       summary.

    Returns:
        dict: The session data that was written to disk (email, password,
        token, user id, role, organization id, test vehicle id).
    """
    import uuid
    from datetime import datetime

    from sqlalchemy import select

    # Project imports are deferred to call time and kept in their original
    # relative order.
    from app.db.session import AsyncSessionLocal
    from app.models.identity import OrganizationMember, Person, User, UserRole
    from app.core.security import create_tokens, get_password_hash
    from app.models.data import Vehicle, VehicleModelDefinition
    from app.services.auth_service import AuthService
    from app.core.config import settings

    TEST_EMAIL = "integration_test_admin@servicefinder.local"
    TEST_PASSWORD = "TestPassword123!"
    TEST_FIRST_NAME = "Integration"
    TEST_LAST_NAME = "TestAdmin"

    async with AsyncSessionLocal() as db:
        # --- 1. Find or create the admin user -----------------------------
        result = await db.execute(select(User).where(User.email == TEST_EMAIL))
        user = result.scalar_one_or_none()

        if user is not None:
            print(f"User {TEST_EMAIL} already exists with ID {user.id}")
            # Update role to admin if not already
            if user.role != UserRole.admin:
                user.role = UserRole.admin
                await db.commit()
                # commit() expires loaded attributes unless the session
                # factory sets expire_on_commit=False (not visible here), and
                # an AsyncSession cannot lazy-load on attribute access, so
                # reload explicitly before the attributes are read again.
                await db.refresh(user)
                print(f"Updated user role to {UserRole.admin}")
        else:
            # The Person row must exist first: User references it by id.
            person = Person(
                first_name=TEST_FIRST_NAME,
                last_name=TEST_LAST_NAME,
                email=TEST_EMAIL,
                is_active=True,
                created_at=datetime.utcnow(),
            )
            db.add(person)
            await db.flush()  # Get person.id

            user = User(
                email=TEST_EMAIL,
                hashed_password=get_password_hash(TEST_PASSWORD),
                role=UserRole.admin,
                person_id=person.id,
                is_active=True,
                subscription_plan="PREMIUM",
                scope_level="individual",
                preferred_language="en",
                region_code="HU",
                ui_mode="personal",
            )
            db.add(user)
            await db.commit()
            await db.refresh(user)
            print(f"Created new user {TEST_EMAIL} with ID {user.id}, role {user.role}")

        # Snapshot the values needed later while they are known to be loaded;
        # the vehicle commit below may expire the ORM instance again.
        user_id = user.id
        user_role = user.role.value

        # --- 2. Organization id, if the user belongs to one ---------------
        result = await db.execute(
            select(OrganizationMember.organization_id)
            .where(OrganizationMember.user_id == user_id)
            .limit(1)
        )
        # A single-column select yields the column value itself, not a
        # membership object, so the scalar already is the organization id.
        org_id = result.scalar_one_or_none()

        # --- 3. Get or create a test vehicle ------------------------------
        result = await db.execute(
            select(Vehicle.id)
            .where(Vehicle.owner_user_id == user_id)
            .limit(1)
        )
        # Same as above: the scalar is the vehicle id.
        vehicle_id = result.scalar_one_or_none()

        if vehicle_id is None:
            # Try to find a catalog entry
            result = await db.execute(select(VehicleModelDefinition.id).limit(1))
            catalog_id = result.scalar_one_or_none()
            if catalog_id is not None:
                vehicle = Vehicle(
                    catalog_id=catalog_id,
                    license_plate=f"TEST-{uuid.uuid4().hex[:4]}".upper(),
                    vin=f"VIN{uuid.uuid4().hex[:10]}".upper(),
                    nickname="Integration Test Vehicle",
                    owner_user_id=user_id,
                    status="DRAFT",  # Follow 2-step vehicle flow
                    created_at=datetime.utcnow(),
                )
                db.add(vehicle)
                await db.commit()
                await db.refresh(vehicle)
                vehicle_id = vehicle.id
                print(f"Created test vehicle with ID {vehicle_id}")
            else:
                print("No catalog entries found, skipping vehicle creation")

        # --- 4. Generate a token by simulating login ----------------------
        test_token = None
        auth_user = await AuthService.authenticate(db, TEST_EMAIL, TEST_PASSWORD)
        if auth_user:
            ranks = await settings.get_db_setting(db, "rbac_rank_matrix", default={})
            role_key = auth_user.role.value.upper()
            token_payload = {
                "sub": str(auth_user.id),
                "role": auth_user.role.value,
                "rank": ranks.get(role_key, 10),
                "scope_level": auth_user.scope_level or "individual",
                # Fall back to the user's own id when no scope id is set.
                "scope_id": str(auth_user.scope_id) if auth_user.scope_id else str(auth_user.id),
            }
            # Only the access token is persisted; the refresh token is unused.
            test_token, _ = create_tokens(data=token_payload)
            print("Generated access token")
        else:
            print("Warning: Could not generate token")

        # --- 5. Persist and report ----------------------------------------
        session_data = {
            "email": TEST_EMAIL,
            "password": TEST_PASSWORD,
            "test_token": test_token,
            "user_id": user_id,
            "role": user_role,
            "organization_id": org_id,
            "test_vehicle_id": vehicle_id,
        }

        output_path = "/opt/docker/dev/service_finder/tests/integration_session.json"
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        with open(output_path, 'w', encoding="utf-8") as f:
            # default=str keeps the dump from failing if an identifier is not
            # a JSON-native type (e.g. a UUID); JSON-native values are
            # written exactly as before.
            json.dump(session_data, f, indent=2, default=str)

        print("\n" + "="*60)
        print("TEST IDENTITY SETUP COMPLETE")
        print("="*60)
        print(f"Email: {TEST_EMAIL}")
        print(f"Password: {TEST_PASSWORD}")
        print(f"Token: {test_token[:50] if test_token else 'None'}...")
        print(f"User ID: {user_id}")
        print(f"Role: {user_role}")
        print(f"Organization ID: {org_id}")
        print(f"Test Vehicle ID: {vehicle_id}")
        print(f"Session saved to: {output_path}")
        print("="*60)

        return session_data
|
|
|
|
if __name__ == "__main__":
    # Script entry point: drive the async setup to completion.
    asyncio.run(main())