"""Bulk enrichment robot for unverified vehicle model definitions.

For every ``VehicleModelDefinition`` row whose status is ``"unverified"`` the
robot looks up technical data in the RDW open-data API, asks ``AIService`` for
cleaned-up vehicle data, and then either merges the row into an already
enriched master record or enriches the row itself.
"""
import asyncio
import datetime
import logging
import os
from typing import Any, Optional

import httpx
from sqlalchemy import select, and_
from sqlalchemy.exc import IntegrityError

from app.db.session import SessionLocal
from app.models.vehicle_definitions import VehicleModelDefinition
from app.services.ai_service import AIService

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("Robot-Bulk-Master")


class TechEnricher:
    """Enrich unverified vehicle definitions from RDW data and the AI service."""

    API_URL = "https://opendata.rdw.nl/resource/kyri-nuah.json"
    RDW_TOKEN = os.getenv("RDW_APP_TOKEN")
    # The app token header is only sent when RDW_APP_TOKEN is set.
    HEADERS = {"X-App-Token": RDW_TOKEN} if RDW_TOKEN else {}
    # Number of record ids fetched per batch.
    BATCH_SIZE = 50

    @staticmethod
    def _to_int(value: Any) -> Optional[int]:
        """Convert a numeric-looking value to ``int``; return ``None`` on failure."""
        try:
            return int(float(value))
        except (TypeError, ValueError, OverflowError):
            return None

    @staticmethod
    def _synonym_list(value: Any) -> list:
        """Normalise a synonyms payload (``None``, a string or an iterable) to a list."""
        if not value:
            return []
        if isinstance(value, str):
            return [value]
        return list(value)

    @classmethod
    async def fetch_rdw_tech_data(cls, make: Any, model: Any) -> Optional[dict]:
        """Fetch the first RDW row matching ``make`` / ``model``.

        Args:
            make: Brand name; sent upper-cased as the ``merk`` filter.
            model: Trade name; sent stripped and upper-cased as the
                ``handelsbenaming`` filter.

        Returns:
            The first matching row as a dict, or ``None`` when either argument
            is missing, the request fails, the status is not 200, or the
            response contains no usable row.
        """
        brand = str(make).upper() if make else ""
        trade_name = str(model).strip().upper() if model is not None else ""
        if not brand.strip() or not trade_name:
            # Nothing meaningful to query for; skip the HTTP round trip.
            return None

        params = {"merk": brand, "handelsbenaming": trade_name, "$limit": 1}
        try:
            async with httpx.AsyncClient(headers=cls.HEADERS) as client:
                resp = await client.get(cls.API_URL, params=params, timeout=15)
            if resp.status_code != 200:
                return None
            rows = resp.json()
        except (httpx.HTTPError, ValueError) as exc:
            # Best-effort lookup: network errors and malformed JSON mean
            # "no data". Cancellation is deliberately NOT swallowed here.
            logger.warning(
                "⚠️ RDW lekérdezés sikertelen (%s %s): %s", brand, trade_name, exc
            )
            return None

        if isinstance(rows, list) and rows and isinstance(rows[0], dict):
            return rows[0]
        return None

    @classmethod
    def _apply_rdw_data(cls, current: Any, rdw_data: dict) -> None:
        """Copy engine capacity and power from an RDW row onto ``current``.

        Missing, zero or unparsable values leave the stored value untouched.
        """
        current.engine_capacity = (
            cls._to_int(rdw_data.get("cilinderinhoud")) or current.engine_capacity
        )
        current.power_kw = (
            cls._to_int(rdw_data.get("netto_maximum_vermogen_kw")) or current.power_kw
        )

    @classmethod
    async def _apply_ai_data(cls, db: Any, current: Any, ai_data: dict, m_id: Any) -> None:
        """Merge ``current`` into a matching master record or enrich it in place."""
        tech_code = ai_data.get("technical_code") or "N/A"
        new_ccm = ai_data.get("ccm") or current.engine_capacity
        ai_synonyms = cls._synonym_list(ai_data.get("synonyms"))

        master_record = None
        if tech_code != "N/A":
            stmt_master = (
                select(VehicleModelDefinition)
                .where(and_(
                    VehicleModelDefinition.make == current.make,
                    VehicleModelDefinition.technical_code == tech_code,
                    VehicleModelDefinition.engine_capacity == new_ccm,
                    VehicleModelDefinition.status == 'ai_enriched',
                    VehicleModelDefinition.id != m_id,
                ))
                # Several masters may match; pick one deterministically
                # instead of raising MultipleResultsFound.
                .order_by(VehicleModelDefinition.id)
                .limit(1)
            )
            master_record = (await db.execute(stmt_master)).scalars().first()

        if master_record:
            logger.info(f"🔗 Merge: ID:{m_id} -> Master ID:{master_record.id}")
            # dict.fromkeys de-duplicates while keeping a stable order.
            master_record.synonyms = list(dict.fromkeys([
                *cls._synonym_list(master_record.synonyms),
                *ai_synonyms,
                current.marketing_name,
            ]))
            current.status = "duplicate"
            current.parent_id = master_record.id
            return

        current.technical_code = tech_code if tech_code != "N/A" else f"N/A-{m_id}"
        current.marketing_name = ai_data.get("marketing_name") or current.marketing_name
        current.engine_capacity = new_ccm
        current.power_kw = ai_data.get("kw") or current.power_kw
        current.year_from = ai_data.get("year_from")
        current.year_to = ai_data.get("year_to")
        current.synonyms = ai_synonyms
        maintenance = ai_data.get("maintenance")
        if maintenance:
            # Assign a NEW dict: mutating the loaded one in place and
            # re-assigning the same object may not be detected as a change.
            merged_spec = dict(current.specifications or {})
            merged_spec.update(maintenance)
            current.specifications = merged_spec
        current.status = "ai_enriched"

    @classmethod
    async def _process_record(cls, m_id: Any) -> None:
        """Enrich a single record in its own session; log and roll back on error."""
        async with SessionLocal() as db:
            try:
                current = await db.get(VehicleModelDefinition, m_id)
                if not current:
                    return

                logger.info(
                    f"🧪 Feldolgozás: {current.make} {current.marketing_name} (ID: {m_id})"
                )

                rdw_data = await cls.fetch_rdw_tech_data(
                    current.make, current.marketing_name
                )
                if rdw_data:
                    cls._apply_rdw_data(current, rdw_data)

                ai_data = await AIService.get_clean_vehicle_data(
                    current.make, current.marketing_name, current.vehicle_type
                )
                if ai_data:
                    await cls._apply_ai_data(db, current, ai_data, m_id)
                elif not current.technical_code:
                    current.technical_code = f"UNKNOWN-{m_id}"

                current.updated_at = datetime.datetime.now()
                await db.commit()
                logger.info(f"✅ Mentve (ID: {m_id})")
            except Exception as e:
                await db.rollback()
                logger.exception(f"❌ Hiba ID:{m_id}: {e}")

    @classmethod
    async def run(cls) -> None:
        """Process all unverified records in batches, one pass per run.

        Batches are paged by ascending id, so a record that fails or cannot be
        enriched (and therefore stays ``"unverified"``) is attempted once per
        run instead of being re-selected forever.
        """
        logger.info("🚀 Master-Merge Robot FOLYAMATOS ÜZEMMÓD INDUL...")

        # NOTE(review): keyset paging assumes the primary key is orderable
        # (e.g. an auto-increment integer) — confirm against the model.
        last_seen_id = None
        while True:  # Keep looping until no data is left
            async with SessionLocal() as main_db:
                stmt = select(VehicleModelDefinition.id).where(
                    VehicleModelDefinition.status == "unverified"
                )
                if last_seen_id is not None:
                    stmt = stmt.where(VehicleModelDefinition.id > last_seen_id)
                # Take BATCH_SIZE ids at a time
                stmt = stmt.order_by(VehicleModelDefinition.id).limit(cls.BATCH_SIZE)
                ids = (await main_db.execute(stmt)).scalars().all()

            if not ids:
                logger.info("🏁 Minden rekord feldolgozva. A robot megáll.")
                break

            logger.info(f"📩 Új csomag indítása: {len(ids)} rekord.")
            # The listing session is already released here, so no connection
            # is held idle during the slow HTTP / AI calls.
            for m_id in ids:
                await cls._process_record(m_id)
            last_seen_id = ids[-1]


if __name__ == "__main__":
    asyncio.run(TechEnricher.run())