#!/usr/bin/env python3 import asyncio import logging import warnings import json import httpx import re from bs4 import BeautifulSoup from duckduckgo_search import DDGS from playwright.async_api import async_playwright from sqlalchemy import text from app.database import AsyncSessionLocal # Figyelmeztetések némítása (a csomag átnevezése miatti zaj elkerülésére) warnings.filterwarnings("ignore", category=RuntimeWarning, module='duckduckgo_search') logging.basicConfig( level=logging.INFO, format='%(asctime)s [R2-MASTER-EDITION] %(message)s' ) logger = logging.getLogger("R2-Researcher") class VehicleResearcher: def __init__(self, concurrency=5): # Egyszerre 5 böngésző fület kezelünk a sebesség érdekében self.semaphore = asyncio.Semaphore(concurrency) self.ollama_url = "http://sf_ollama:11434/api/generate" # FORDÍTÓ SZÓTÁR: Holland RDW -> Nemzetközi keresési nevek self.translation_map = { "ER REIHE": "Series", "T-MODELL": "Estate", "KLASSE": "Class", "PERSONENAUTO": "Car", "STATIONWAGEN": "Estate", "MERCEDES-BENZ": "Mercedes", "Vrachtwagen": "Truck", "Oplegger": "Trailer" } def clean_name(self, make, model): """Lefordítja a holland modellneveket, hogy a Google/Bing megtalálja őket.""" name = f"{make} {model}".upper() for dutch, eng in self.translation_map.items(): name = name.replace(dutch, eng) return name.title() async def get_url(self, make, model, year, kw): """Keresés a DuckDuckGo-val. JAVÍTVA: 0kW fix és több találat.""" clean_n = self.clean_name(make, model) # Ha a kW 0, None vagy érvénytelen, kihagyjuk a keresésből a találati arány javítására kw_val = 0 try: if kw and str(kw).replace('.','').isdigit(): kw_val = int(float(kw)) except: pass kw_part = f"{kw_val}kW" if kw_val > 0 else "" query = f"site:auto-data.net {clean_n} {year} {kw_part} specifications" try: def _search(): with DDGS() as ddgs: # Megnézzük az első 3 találatot, hátha az első nem direkt link res = ddgs.search(query, max_results=3) return [r.get('link', r.get('href', '')) for r in res if 'auto-data.net' in r.get('link', r.get('href', ''))] links = await asyncio.to_thread(_search) return links[0] if links else None except Exception as e: logger.warning(f"Keresési hiba ({query}): {e}") return None async def scrape_auto_data(self, url, browser): """Letölti az oldalt és kinyeri az összes technikai adatot.""" specs = {} full_text = "" try: page = await browser.new_page() # Gyorsítás: képek, videók és stíluslapok tiltása await page.route("**/*.{png,jpg,jpeg,gif,css,woff2}", lambda r: r.abort()) await page.goto(url, wait_until="domcontentloaded", timeout=20000) html = await page.content() # Kimentjük a tiszta szöveget is, ha az AI-nak kellene később full_text = await page.evaluate("() => document.body.innerText") await page.close() soup = BeautifulSoup(html, 'html.parser') # Végigfutunk minden táblázat soron for row in soup.find_all('tr'): th = row.find('th') td = row.find('td') if th and td: k, v = th.get_text(strip=True).lower(), td.get_text(strip=True) # Minden fontos mező kinyerése if "engine model/code" in k: specs["engine_code"] = v elif "engine oil capacity" in k: specs["oil_l"] = v elif "acceleration 0 - 100" in k: specs["acc_0_100"] = v elif "maximum speed" in k: specs["max_speed"] = v elif "fuel consumption" in k and "combined" in k: specs["cons_avg"] = v elif "co2 emissions" in k: specs["co2"] = v elif "generation" in k: specs["generation"] = v elif "tires size" in k: specs["tires"] = v elif "trunk (boot) space" in k: specs["trunk_l"] = v elif "kerb weight" in k: specs["weight_kg"] = v elif "drivetrain" in k: specs["drivetrain"] = v elif "number of gears" in k: specs["transmission"] = v return specs, full_text except Exception as e: logger.error(f"Scraping hiba az oldalon ({url}): {e}") return {}, "" async def ask_ai_fallback(self, raw_text): """Ha a BeautifulSoup nem talál táblázatot, megkérjük az Ollamát.""" if not raw_text or len(raw_text) < 200: return {} prompt = f"Extract vehicle specs (engine_code, oil_capacity, tires, generation) as JSON from this text: {raw_text[:2500]}" try: async with httpx.AsyncClient(timeout=30.0) as client: r = await client.post(self.ollama_url, json={ "model": "qwen2.5-coder:14b", "prompt": prompt, "stream": False, "format": "json" }) return json.loads(r.json().get("response", "{}")) except: return {} async def process_vehicle(self, v_id, make, model, year, kw, browser): """Egy jármű dúsításának teljes folyamata.""" async with self.semaphore: logger.info(f"🔍 Kutatás: {make} {model} ({year}) | kW: {kw}") url = await self.get_url(make, model, year, kw) specs = {} if url: logger.info(f"🔗 Találat: {url}") specs, raw_text = await self.scrape_auto_data(url, browser) # Ha a táblázatból nem jött ki elég adat, jöhet az AI fallback if len(specs) < 3: ai_specs = await self.ask_ai_fallback(raw_text) specs.update(ai_specs) # MENTÉS: Minden szál saját adatbázis kapcsolatot használ a biztonság érdekében async with AsyncSessionLocal() as db: # Csak akkor validation_ready, ha találtunk adatot. Ha nem, külön státuszba tesszük. new_status = 'validation_ready' if len(specs) > 0 else 'research_failed_empty' update_query = text(""" UPDATE vehicle.vehicle_model_definitions SET specifications = specifications || CAST(:specs AS JSONB), status = :status, last_research_at = now() WHERE id = :id """) await db.execute(update_query, { "specs": json.dumps(specs), "status": new_status, "id": v_id }) await db.commit() if len(specs) > 0: logger.info(f"✅ SIKER: {make} {model} ({len(specs)} adat kinyerve)") else: logger.warning(f"❌ SIKERTELEN: {make} {model} (nem találtunk adatot a neten)") async def run(self): logger.info("🚀 R2-Kutató MASTER-EDITION (0kW fix + AI Fallback) ONLINE") async with async_playwright() as p: browser = await p.chromium.launch(headless=True) while True: try: async with AsyncSessionLocal() as db: # 10 autó bekérése párhuzamos feldolgozásra res = await db.execute(text(""" UPDATE vehicle.vehicle_model_definitions SET status = 'research_in_progress' WHERE id IN ( SELECT id FROM vehicle.vehicle_model_definitions WHERE status = 'enrich_ready' LIMIT 10 ) RETURNING id, make, marketing_name, year_from, power_kw """)) rows = res.fetchall() await db.commit() if not rows: await asyncio.sleep(15) continue tasks = [self.process_vehicle(r[0], r[1], r[2], r[3], r[4], browser) for r in rows] await asyncio.gather(*tasks) except Exception as e: logger.error(f"💀 Kritikus hiba a főciklusban: {e}") await asyncio.sleep(10) if __name__ == "__main__": asyncio.run(VehicleResearcher().run())