# /opt/docker/dev/service_finder/backend/app/api/v1/endpoints/gamification.py
"""Gamification API endpoints.

Covers user statistics, leaderboards, seasons, service submissions,
the daily quiz, and badges/achievements.
"""
import hashlib
from datetime import datetime, timedelta
from typing import List, Optional

from fastapi import APIRouter, Depends, HTTPException, Body, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, desc, func, and_

from app.db.session import get_db
from app.api.deps import get_current_user
from app.models.identity import User
from app.models import UserStats, PointsLedger, LevelConfig, UserContribution, Badge, UserBadge, Season
from app.models.system import SystemParameter, ParameterScope
from app.models.marketplace.service import ServiceStaging
from app.schemas.gamification import SeasonResponse, UserStatResponse, LeaderboardEntry

router = APIRouter()

# Ledger reason written by /quiz/complete; it is the "quiz finished" marker.
_QUIZ_COMPLETED_REASON = "Daily quiz completed"
# Points (and XP) granted for each correctly answered quiz question.
_QUIZ_POINTS_PER_QUESTION = 10
# Points (and XP) granted whenever a badge is awarded.
_BADGE_AWARD_POINTS = 50
_QUIZ_ALREADY_PLAYED_DETAIL = (
    "You have already played the daily quiz today. Try again tomorrow."
)

# Mock quiz questions - in production these would come from a database.
# Single source of truth for both /quiz/daily and /quiz/answer.
_DAILY_QUIZ_QUESTIONS = [
    {
        "id": 1,
        "question": "Melyik alkatrész felelős a motor levegő‑üzemanyag keverékének szabályozásáért?",
        "options": ["Generátor", "Lambda‑szonda", "Féktárcsa", "Olajszűrő"],
        "correctAnswer": 1,
        "explanation": "A lambda‑szonda méri a kipufogógáz oxigéntartalmát, és ezen alapul a befecskendezés.",
    },
    {
        "id": 2,
        "question": "Mennyi ideig érvényes egy gépjármű műszaki vizsgája Magyarországon?",
        "options": ["1 év", "2 év", "4 év", "6 év"],
        "correctAnswer": 1,
        "explanation": "A személygépkocsik műszaki vizsgája 2 évre érvényes, kivéve az újonnan forgalomba helyezett autókat.",
    },
    {
        "id": 3,
        "question": "Melyik anyag NEM része a hibrid autók akkumulátorának?",
        "options": ["Lítium", "Nikkel", "Ólom", "Kobalt"],
        "correctAnswer": 2,
        "explanation": "A hibrid és elektromos autók akkumulátoraiban általában lítium, nikkel és kobalt található, ólom az ólom‑savas akkukban van.",
    },
]
_QUIZ_BY_ID = {question["id"]: question for question in _DAILY_QUIZ_QUESTIONS}


# -- HELPERS --
def _mask_email(email: Optional[str]) -> str:
    """Return an anonymised form of *email* for public listings.

    Keeps the first two characters of the local part and the domain,
    e.g. ``"john@example.com"`` -> ``"jo***@example.com"``. Values without
    an ``@`` (or ``None``) are masked without raising.
    """
    local, separator, domain = (email or "").partition("@")
    if not separator:
        return f"{local[:2]}***"
    return f"{local[:2]}***@{domain}"


def _quiz_answer_reason(question_id: int) -> str:
    """Return the ledger reason stored for a correctly answered question."""
    return f"Daily quiz correct answer - Question {question_id}"


async def _quiz_entry_exists_today(db: AsyncSession, user_id: int, reason_clause) -> bool:
    """Return True if the user has a ledger row from today matching *reason_clause*.

    Uses LIMIT 1 so that several matching rows (for example a correct answer
    plus the completion marker) cannot raise ``MultipleResultsFound``.
    """
    today = datetime.now().date()
    stmt = (
        select(PointsLedger)
        .where(
            PointsLedger.user_id == user_id,
            func.date(PointsLedger.created_at) == today,
            reason_clause,
        )
        .limit(1)
    )
    return (await db.execute(stmt)).scalars().first() is not None


async def get_system_param(db: AsyncSession, key: str, default_value):
    """Fetch a system parameter value by key.

    Returns *default_value* when no row exists for *key* or when the stored
    value is ``None``.
    """
    # NOTE(review): scalar_one_or_none() raises if several rows share the key
    # (e.g. one per scope) - confirm that `key` is unique.
    stmt = select(SystemParameter).where(SystemParameter.key == key)
    param = (await db.execute(stmt)).scalar_one_or_none()
    if param is None or param.value is None:
        return default_value
    return param.value


@router.get("/my-stats")
async def get_my_stats(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the current user's stats row, or zeroed defaults if none exists."""
    stmt = select(UserStats).where(UserStats.user_id == current_user.id)
    stats = (await db.execute(stmt)).scalar_one_or_none()
    if not stats:
        return {"total_xp": 0, "current_level": 1, "penalty_points": 0, "services_submitted": 0}
    return stats


@router.get("/leaderboard")
async def get_leaderboard(
    limit: int = 10,
    season_id: Optional[int] = None,
    db: AsyncSession = Depends(get_db),
):
    """Leaderboard - global (by total XP) or seasonal (by contribution points).

    Emails are masked before being returned.
    """
    if season_id:
        # Seasonal leaderboard: aggregate contributions of the given season.
        stmt = (
            select(
                User.email,
                func.sum(UserContribution.points_awarded).label("total_points"),
                func.sum(UserContribution.xp_awarded).label("total_xp"),
            )
            .join(UserContribution, User.id == UserContribution.user_id)
            .where(UserContribution.season_id == season_id)
            .group_by(User.id)
            .order_by(desc("total_points"))
            .limit(limit)
        )
        rows = (await db.execute(stmt)).all()
        return [
            {"user": _mask_email(email), "points": points, "xp": xp}
            for email, points, xp in rows
        ]

    # Global leaderboard: ordered by lifetime XP.
    stmt = (
        select(User.email, UserStats.total_xp, UserStats.current_level)
        .join(UserStats, User.id == UserStats.user_id)
        .order_by(desc(UserStats.total_xp))
        .limit(limit)
    )
    rows = (await db.execute(stmt)).all()
    return [
        {"user": _mask_email(email), "xp": xp, "level": level}
        for email, xp, level in rows
    ]


@router.get("/seasons")
async def get_seasons(
    active_only: bool = True,
    db: AsyncSession = Depends(get_db),
):
    """List seasons, by default only the active ones."""
    stmt = select(Season)
    if active_only:
        stmt = stmt.where(Season.is_active.is_(True))
    seasons = (await db.execute(stmt)).scalars().all()
    return [
        {
            "id": season.id,
            "name": season.name,
            "start_date": season.start_date,
            "end_date": season.end_date,
            "is_active": season.is_active,
        }
        for season in seasons
    ]


@router.get("/my-contributions")
async def get_my_contributions(
    season_id: Optional[int] = None,
    limit: int = 50,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List the current user's contributions, newest first.

    Optionally restricted to a single season.
    """
    stmt = select(UserContribution).where(UserContribution.user_id == current_user.id)
    if season_id:
        stmt = stmt.where(UserContribution.season_id == season_id)
    stmt = stmt.order_by(desc(UserContribution.created_at)).limit(limit)

    contributions = (await db.execute(stmt)).scalars().all()
    return [
        {
            "id": c.id,
            "contribution_type": c.contribution_type,
            "entity_type": c.entity_type,
            "entity_id": c.entity_id,
            "points_awarded": c.points_awarded,
            "xp_awarded": c.xp_awarded,
            "status": c.status,
            "created_at": c.created_at,
        }
        for c in contributions
    ]


@router.get("/season-standings/{season_id}")
async def get_season_standings(
    season_id: int,
    limit: int = 20,
    db: AsyncSession = Depends(get_db),
):
    """Season standings - top contributors with their prospective rewards.

    Only contributions with status ``"approved"`` are counted.

    Raises:
        HTTPException: 404 if the season does not exist.
    """
    # Verify that the season exists.
    season_stmt = select(Season).where(Season.id == season_id)
    season = (await db.execute(season_stmt)).scalar_one_or_none()
    if not season:
        raise HTTPException(status_code=404, detail="Season not found")

    # Query the top contributors.
    stmt = (
        select(
            User.email,
            func.sum(UserContribution.points_awarded).label("total_points"),
            func.sum(UserContribution.xp_awarded).label("total_xp"),
            func.count(UserContribution.id).label("contribution_count"),
        )
        .join(UserContribution, User.id == UserContribution.user_id)
        .where(
            and_(
                UserContribution.season_id == season_id,
                UserContribution.status == "approved",
            )
        )
        .group_by(User.id)
        .order_by(desc("total_points"))
        .limit(limit)
    )
    standings = (await db.execute(stmt)).all()

    # Seasonal reward configuration (admin-controlled, with built-in defaults).
    season_config = await get_system_param(
        db,
        "seasonal_competition_config",
        {
            "top_contributors_count": 10,
            "rewards": {
                "first_place": {"credits": 1000, "badge": "season_champion"},
                "second_place": {"credits": 500, "badge": "season_runner_up"},
                "third_place": {"credits": 250, "badge": "season_bronze"},
                "top_10": {"credits": 100, "badge": "season_elite"},
            },
        },
    )

    return {
        "season": {
            "id": season.id,
            "name": season.name,
            "start_date": season.start_date,
            "end_date": season.end_date,
        },
        "standings": [
            {
                "rank": rank,
                "user": _mask_email(email),
                "points": points,
                "xp": xp,
                "contributions": contribution_count,
                "reward": get_season_reward(rank, season_config),
            }
            for rank, (email, points, xp, contribution_count) in enumerate(standings, start=1)
        ],
        "config": season_config,
    }


def get_season_reward(rank: int, config: dict) -> dict:
    """Determine the seasonal reward for a 1-based *rank*.

    Ranks 1-3 map to dedicated reward entries; ranks up to
    ``top_contributors_count`` (default 10) get the ``top_10`` reward;
    everything else gets an empty dict.
    """
    rewards = config.get("rewards", {})
    podium_keys = {1: "first_place", 2: "second_place", 3: "third_place"}
    if rank in podium_keys:
        return rewards.get(podium_keys[rank], {})
    if rank <= config.get("top_contributors_count", 10):
        return rewards.get("top_10", {})
    return {}


@router.get("/self-defense-status")
async def get_self_defense_status(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the user's self-defense (penalty) status and active restrictions.

    Also called directly by ``submit_new_service``, which reads the
    ``can_submit_services`` key of the result.
    """
    stmt = select(UserStats).where(UserStats.user_id == current_user.id)
    stats = (await db.execute(stmt)).scalar_one_or_none()

    if not stats:
        return {
            "penalty_level": 0,
            "penalty_points": 0,
            "restrictions": [],
            "recovery_progress": 0,
            "can_submit_services": True,
        }

    # Self-defense penalty configuration.
    penalty_config = await get_system_param(
        db,
        "self_defense_penalties",
        {
            "level_minus_1": {"restrictions": ["no_service_submissions"], "duration_days": 7},
            "level_minus_2": {"restrictions": ["no_service_submissions", "no_reviews"], "duration_days": 30},
            "level_minus_3": {"restrictions": ["no_service_submissions", "no_reviews", "no_messaging"], "duration_days": 365},
        },
    )

    # NULL counters are treated as zero so the comparisons cannot raise.
    penalty_points = stats.penalty_points or 0
    total_xp = stats.total_xp or 0

    # Determine the penalty level (simplified logic).
    if penalty_points >= 1000:
        penalty_level = -3
    elif penalty_points >= 500:
        penalty_level = -2
    elif penalty_points >= 100:
        penalty_level = -1
    else:
        penalty_level = 0

    restrictions = []
    if penalty_level < 0:
        level_key = f"level_minus_{abs(penalty_level)}"
        restrictions = penalty_config.get(level_key, {}).get("restrictions", [])

    return {
        "penalty_level": penalty_level,
        "penalty_points": stats.penalty_points,
        "restrictions": restrictions,
        "recovery_progress": min(total_xp / 10000 * 100, 100) if penalty_level < 0 else 100,
        "can_submit_services": "no_service_submissions" not in restrictions,
    }


# --- NEW, DYNAMIC SUBMISSION ENDPOINT (Gamification 2.0 compatible) ---
@router.post("/submit-service")
async def submit_new_service(
    name: str = Body(...),
    city: str = Body(...),
    address: str = Body(...),
    contact_phone: Optional[str] = Body(None),
    website: Optional[str] = Body(None),
    description: Optional[str] = Body(None),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Submit a new service for analysis and reward the submitting user.

    Creates a ``ServiceStaging`` row, a pending ``UserContribution``, a
    ``PointsLedger`` entry and updates (or creates) the user's ``UserStats``.

    Raises:
        HTTPException: 403 if a self-defense restriction forbids submissions,
            400 if the final commit fails.
    """
    # 1. Check self-defense status.
    defense_status = await get_self_defense_status(db, current_user)
    if not defense_status["can_submit_services"]:
        raise HTTPException(
            status_code=403,
            detail="Önvédelmi korlátozás miatt nem küldhetsz be új szerviz adatokat.",
        )

    # 2. Fetch reward settings from the admin-controlled table.
    submission_rewards = await get_system_param(
        db,
        "service_submission_rewards",
        {"points": 50, "xp": 100, "social_credits": 10},
    )
    points_reward = submission_rewards.get("points", 50)
    xp_reward = submission_rewards.get("xp", 100)

    # 3. Look up the current season (active and within its date range).
    today_utc = datetime.utcnow().date()
    season_stmt = select(Season).where(
        and_(
            Season.is_active.is_(True),
            Season.start_date <= today_utc,
            Season.end_date >= today_utc,
        )
    ).limit(1)
    current_season = (await db.execute(season_stmt)).scalar_one_or_none()
    # Captured before commit so no expired attribute is read afterwards.
    season_id = current_season.id if current_season else None

    # 4. User statistics.
    stmt = select(UserStats).where(UserStats.user_id == current_user.id)
    stats = (await db.execute(stmt)).scalar_one_or_none()
    user_lvl = stats.current_level if stats else 1

    # 5. Trust score derived from the user's level, capped at 90.
    trust_weight = min(20 + (user_lvl * 6), 90)

    # 6. Send the raw data to the robots (staging).
    # MD5 is used only as a de-duplication fingerprint, not for security.
    f_print = hashlib.md5(f"{name.lower()}{city.lower()}{address.lower()}".encode()).hexdigest()

    new_staging = ServiceStaging(
        name=name,
        city=city,
        address_line1=address,
        contact_phone=contact_phone,
        website=website,
        description=description,
        fingerprint=f_print,
        status="pending",
        trust_score=trust_weight,
        submitted_by=current_user.id,
        raw_data={
            "submitted_by_user": current_user.id,
            "user_level": user_lvl,
            "submitted_at": datetime.utcnow().isoformat(),
        },
    )
    db.add(new_staging)
    await db.flush()  # Populate new_staging.id
    staging_id = new_staging.id

    # 7. Create the UserContribution.
    # NOTE(review): `metadata` is a reserved attribute on SQLAlchemy
    # declarative models - confirm UserContribution accepts this keyword.
    contribution = UserContribution(
        user_id=current_user.id,
        season_id=season_id,
        contribution_type="service_submission",
        entity_type="service_staging",
        entity_id=staging_id,
        points_awarded=points_reward,
        xp_awarded=xp_reward,
        status="pending",  # Awaiting approval by Robot 5
        metadata={
            "service_name": name,
            "city": city,
            "staging_id": staging_id,
        },
        created_at=datetime.utcnow(),
    )
    db.add(contribution)

    # 8. PointsLedger entry.
    # NOTE(review): other endpoints in this file build PointsLedger with
    # `reason=`; here `description=`/`source_type=` are used - verify the model.
    ledger = PointsLedger(
        user_id=current_user.id,
        points=points_reward,
        xp=xp_reward,
        source_type="service_submission",
        source_id=staging_id,
        description=f"Szerviz beküldés: {name}",
        created_at=datetime.utcnow(),
    )
    db.add(ledger)

    # 9. Update UserStats (create it if it does not exist yet).
    if stats:
        stats.total_points = (stats.total_points or 0) + points_reward
        stats.total_xp = (stats.total_xp or 0) + xp_reward
        stats.services_submitted = (stats.services_submitted or 0) + 1
        stats.updated_at = datetime.utcnow()
    else:
        stats = UserStats(
            user_id=current_user.id,
            total_points=points_reward,
            total_xp=xp_reward,
            services_submitted=1,
            created_at=datetime.utcnow(),
            updated_at=datetime.utcnow(),
        )
        db.add(stats)

    try:
        await db.commit()
    except Exception as exc:
        await db.rollback()
        raise HTTPException(status_code=400, detail=f"Hiba a beküldés során: {str(exc)}") from exc

    return {
        "status": "success",
        "message": "Szerviz beküldve a rendszerbe elemzésre!",
        "xp_earned": xp_reward,
        "points_earned": points_reward,
        "staging_id": staging_id,
        "season_id": season_id,
    }


# --- Gamification 2.0 API endpoints (frontend/mobile) ---
@router.get("/me", response_model=UserStatResponse)
async def get_my_gamification_stats(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the logged-in user's current statistics.

    Falls back to default values when the user has no stats record.
    """
    stmt = select(UserStats).where(UserStats.user_id == current_user.id)
    stats = (await db.execute(stmt)).scalar_one_or_none()
    if not stats:
        # Default statistics.
        return UserStatResponse(
            user_id=current_user.id,
            total_xp=0,
            current_level=1,
            restriction_level=0,
            penalty_quota_remaining=0,
            banned_until=None,
        )
    return UserStatResponse.from_orm(stats)


@router.get("/seasons/active", response_model=SeasonResponse)
async def get_active_season(
    db: AsyncSession = Depends(get_db),
):
    """Return the currently active season.

    If several seasons are flagged active, the one with the latest start
    date is returned instead of failing.

    Raises:
        HTTPException: 404 if no season is active.
    """
    stmt = (
        select(Season)
        .where(Season.is_active.is_(True))
        .order_by(desc(Season.start_date))
        .limit(1)
    )
    season = (await db.execute(stmt)).scalar_one_or_none()
    if not season:
        raise HTTPException(status_code=404, detail="No active season found")
    return SeasonResponse.from_orm(season)


# NOTE(review): GET /leaderboard is already registered by get_leaderboard
# above; routes are matched in registration order, so this handler is
# shadowed and never served. Give one of the two a distinct path.
@router.get("/leaderboard", response_model=List[LeaderboardEntry])
async def get_leaderboard_top10(
    limit: int = Query(10, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
):
    """Return the top users ordered by total_xp, descending."""
    stmt = (
        select(UserStats, User.email)
        .join(User, UserStats.user_id == User.id)
        .order_by(desc(UserStats.total_xp))
        .limit(limit)
    )
    rows = (await db.execute(stmt)).all()
    return [
        LeaderboardEntry(
            user_id=stats.user_id,
            # The email stands in for a username; it is masked so the
            # listing does not expose full addresses.
            username=_mask_email(email),
            total_xp=stats.total_xp,
            current_level=stats.current_level,
        )
        for stats, email in rows
    ]


# --- QUIZ ENDPOINTS FOR DAILY QUIZ GAMIFICATION ---
@router.get("/quiz/daily")
async def get_daily_quiz(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the daily quiz questions for the user.

    Raises:
        HTTPException: 400 if the user already has a quiz ledger entry today.
    """
    if await has_played_today(db, current_user.id):
        raise HTTPException(status_code=400, detail=_QUIZ_ALREADY_PLAYED_DETAIL)

    # NOTE(review): the payload includes `correctAnswer` and `explanation`,
    # so the client sees the answers before submitting - confirm intended.
    questions = [dict(question) for question in _DAILY_QUIZ_QUESTIONS]
    return {
        "questions": questions,
        "total_questions": len(questions),
        "date": datetime.now().date().isoformat(),
    }


@router.post("/quiz/answer")
async def submit_quiz_answer(
    question_id: int = Body(...),
    selected_option: int = Body(...),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Submit an answer to a quiz question and award points if correct.

    A submission is rejected when today's quiz was already marked completed
    or when this particular question was already credited today. Answering
    one question correctly does not block the remaining questions.

    Raises:
        HTTPException: 400 if already played/credited, 404 for an unknown
            question id.
    """
    if await _quiz_entry_exists_today(
        db, current_user.id, PointsLedger.reason == _QUIZ_COMPLETED_REASON
    ):
        raise HTTPException(status_code=400, detail=_QUIZ_ALREADY_PLAYED_DETAIL)

    question = _QUIZ_BY_ID.get(question_id)
    if question is None:
        raise HTTPException(status_code=404, detail="Question not found")

    answer_reason = _quiz_answer_reason(question_id)
    # Prevent the same question from being credited twice on one day.
    if await _quiz_entry_exists_today(
        db, current_user.id, PointsLedger.reason == answer_reason
    ):
        raise HTTPException(status_code=400, detail=_QUIZ_ALREADY_PLAYED_DETAIL)

    is_correct = selected_option == question["correctAnswer"]

    if is_correct:
        # Update (or create) the user's stats.
        stats_stmt = select(UserStats).where(UserStats.user_id == current_user.id)
        user_stats = (await db.execute(stats_stmt)).scalar_one_or_none()
        if not user_stats:
            user_stats = UserStats(
                user_id=current_user.id,
                total_xp=_QUIZ_POINTS_PER_QUESTION,
                current_level=1,
            )
            db.add(user_stats)
        else:
            user_stats.total_xp = (user_stats.total_xp or 0) + _QUIZ_POINTS_PER_QUESTION

        # Record the award in the points ledger.
        db.add(
            PointsLedger(
                user_id=current_user.id,
                points=_QUIZ_POINTS_PER_QUESTION,
                reason=answer_reason,
                created_at=datetime.now(),
            )
        )
        await db.commit()

    return {
        "is_correct": is_correct,
        "correct_answer": question["correctAnswer"],
        "points_awarded": _QUIZ_POINTS_PER_QUESTION if is_correct else 0,
        "explanation": question["explanation"],
    }


@router.post("/quiz/complete")
async def complete_daily_quiz(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Mark the daily quiz as completed for today.

    Writes a zero-point ledger entry that prevents further play today.

    Raises:
        HTTPException: 400 if the quiz was already marked completed today.
    """
    if await _quiz_entry_exists_today(
        db, current_user.id, PointsLedger.reason == _QUIZ_COMPLETED_REASON
    ):
        raise HTTPException(
            status_code=400,
            detail="Daily quiz already marked as completed today.",
        )

    db.add(
        PointsLedger(
            user_id=current_user.id,
            points=0,
            reason=_QUIZ_COMPLETED_REASON,
            created_at=datetime.now(),
        )
    )
    await db.commit()

    return {"message": "Daily quiz marked as completed for today."}


@router.get("/quiz/stats")
async def get_quiz_stats(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the user's quiz statistics: points, streak and last played date."""
    # User stats.
    stats_stmt = select(UserStats).where(UserStats.user_id == current_user.id)
    user_stats = (await db.execute(stats_stmt)).scalar_one_or_none()

    # Total quiz points from the ledger.
    points_stmt = select(func.sum(PointsLedger.points)).where(
        PointsLedger.user_id == current_user.id,
        PointsLedger.reason.ilike("%quiz%"),
    )
    quiz_points = (await db.execute(points_stmt)).scalar() or 0

    # Timestamp of the most recent quiz ledger entry.
    last_played_stmt = (
        select(PointsLedger.created_at)
        .where(
            PointsLedger.user_id == current_user.id,
            PointsLedger.reason.ilike("%quiz%"),
        )
        .order_by(desc(PointsLedger.created_at))
        .limit(1)
    )
    last_played = (await db.execute(last_played_stmt)).scalar()

    # Streak is a placeholder: 1 if the user has ever played, else 0.
    streak = 1 if last_played else 0

    return {
        "total_quiz_points": quiz_points,
        "total_xp": user_stats.total_xp if user_stats else 0,
        "current_level": user_stats.current_level if user_stats else 1,
        "last_played": last_played.isoformat() if last_played else None,
        "current_streak": streak,
        "can_play_today": not await has_played_today(db, current_user.id),
    }


async def has_played_today(db: AsyncSession, user_id: int) -> bool:
    """Return True if the user has any quiz-related ledger entry from today."""
    return await _quiz_entry_exists_today(
        db, user_id, PointsLedger.reason.ilike("%quiz%")
    )


# --- BADGE/TROPHY ENDPOINTS ---
@router.get("/badges")
async def get_all_badges(
    db: AsyncSession = Depends(get_db),
):
    """Return all badges available in the system."""
    badges = (await db.execute(select(Badge))).scalars().all()
    return [
        {
            "id": badge.id,
            "name": badge.name,
            "description": badge.description,
            "icon_url": badge.icon_url,
        }
        for badge in badges
    ]


@router.get("/my-badges")
async def get_my_badges(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the badges earned by the current user, most recent first."""
    stmt = (
        select(UserBadge, Badge)
        .join(Badge, UserBadge.badge_id == Badge.id)
        .where(UserBadge.user_id == current_user.id)
        .order_by(desc(UserBadge.earned_at))
    )
    user_badges = (await db.execute(stmt)).all()
    return [
        {
            "badge_id": badge.id,
            "badge_name": badge.name,
            "badge_description": badge.description,
            "badge_icon_url": badge.icon_url,
            "earned_at": user_badge.earned_at.isoformat() if user_badge.earned_at else None,
        }
        for user_badge, badge in user_badges
    ]


@router.post("/badges/award/{badge_id}")
async def award_badge_to_user(
    badge_id: int,
    user_id: int = Body(None),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Award a badge to a user (intended for admins or automated systems).

    The target defaults to the current user when *user_id* is omitted.
    Awarding also grants 50 points/XP.

    Raises:
        HTTPException: 404 if the badge does not exist, 400 if the target
            user already holds it.
    """
    # SECURITY NOTE(review): no authorization check is performed here - any
    # authenticated user can award any badge (and its XP) to any user,
    # including themselves. Add an admin/service guard once the role
    # attribute on User is confirmed.
    badge = (await db.execute(select(Badge).where(Badge.id == badge_id))).scalar_one_or_none()
    if not badge:
        raise HTTPException(status_code=404, detail="Badge not found")

    # Captured before commit so no expired attribute is read afterwards.
    awarded_badge_id = badge.id
    awarded_badge_name = badge.name

    # Determine the target user (defaults to the current user).
    target_user_id = user_id if user_id else current_user.id

    # Reject duplicates.
    existing_stmt = select(UserBadge).where(
        UserBadge.user_id == target_user_id,
        UserBadge.badge_id == badge_id,
    )
    if (await db.execute(existing_stmt)).scalar_one_or_none():
        raise HTTPException(status_code=400, detail="User already has this badge")

    # Award the badge.
    db.add(
        UserBadge(
            user_id=target_user_id,
            badge_id=badge_id,
            earned_at=datetime.now(),
        )
    )

    # Also add points for earning a badge.
    db.add(
        PointsLedger(
            user_id=target_user_id,
            points=_BADGE_AWARD_POINTS,
            reason=f"Badge earned: {awarded_badge_name}",
            created_at=datetime.now(),
        )
    )

    # Update (or create) the target user's stats.
    stats_stmt = select(UserStats).where(UserStats.user_id == target_user_id)
    user_stats = (await db.execute(stats_stmt)).scalar_one_or_none()
    if user_stats:
        user_stats.total_xp = (user_stats.total_xp or 0) + _BADGE_AWARD_POINTS
    else:
        db.add(
            UserStats(
                user_id=target_user_id,
                total_xp=_BADGE_AWARD_POINTS,
                current_level=1,
            )
        )

    await db.commit()

    return {
        "message": f"Badge '{awarded_badge_name}' awarded to user",
        "badge_id": awarded_badge_id,
        "badge_name": awarded_badge_name,
        "points_awarded": _BADGE_AWARD_POINTS,
    }


@router.get("/achievements")
async def get_achievements_progress(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the user's progress on achievements.

    Combines badge-based, XP-based and quiz-point-based achievements into a
    single list together with summary counters.
    """
    # Badge ids the user already holds (set for O(1) membership checks).
    badges_stmt = select(UserBadge.badge_id).where(UserBadge.user_id == current_user.id)
    earned_badge_ids = {row[0] for row in (await db.execute(badges_stmt)).all()}

    # All badges.
    all_badges = (await db.execute(select(Badge))).scalars().all()

    # User stats.
    stats_stmt = select(UserStats).where(UserStats.user_id == current_user.id)
    user_stats = (await db.execute(stats_stmt)).scalar_one_or_none()
    current_xp = (user_stats.total_xp or 0) if user_stats else 0

    # Quiz points from the ledger.
    quiz_points_stmt = select(func.sum(PointsLedger.points)).where(
        PointsLedger.user_id == current_user.id,
        PointsLedger.reason.ilike("%quiz%"),
    )
    quiz_points = (await db.execute(quiz_points_stmt)).scalar() or 0

    achievements = []

    # Badge-based achievements.
    for badge in all_badges:
        is_earned = badge.id in earned_badge_ids
        achievements.append({
            "id": f"badge_{badge.id}",
            "title": badge.name,
            "description": badge.description,
            "icon_url": badge.icon_url,
            "is_earned": is_earned,
            "category": "badge",
            "progress": 100 if is_earned else 0,
        })

    # XP-based achievements.
    xp_levels = [
        {"title": "Novice", "xp_required": 100, "description": "Earn 100 XP"},
        {"title": "Apprentice", "xp_required": 500, "description": "Earn 500 XP"},
        {"title": "Expert", "xp_required": 2000, "description": "Earn 2000 XP"},
        {"title": "Master", "xp_required": 5000, "description": "Earn 5000 XP"},
    ]
    for level in xp_levels:
        required = level["xp_required"]
        achievements.append({
            "id": f"xp_{required}",
            "title": level["title"],
            "description": level["description"],
            "icon_url": None,
            "is_earned": current_xp >= required,
            "category": "xp",
            "progress": min((current_xp / required) * 100, 100),
        })

    # Quiz-based achievements.
    quiz_achievements = [
        {"title": "Quiz Beginner", "points_required": 50, "description": "Earn 50 quiz points"},
        {"title": "Quiz Enthusiast", "points_required": 200, "description": "Earn 200 quiz points"},
        {"title": "Quiz Master", "points_required": 500, "description": "Earn 500 quiz points"},
    ]
    for achievement in quiz_achievements:
        required = achievement["points_required"]
        achievements.append({
            "id": f"quiz_{required}",
            "title": achievement["title"],
            "description": achievement["description"],
            "icon_url": None,
            "is_earned": quiz_points >= required,
            "category": "quiz",
            "progress": min((quiz_points / required) * 100, 100),
        })

    earned_count = sum(1 for item in achievements if item["is_earned"])
    return {
        "achievements": achievements,
        "total_achievements": len(achievements),
        "earned_count": earned_count,
        "progress_percentage": round((earned_count / len(achievements)) * 100, 1) if achievements else 0,
    }