# /opt/docker/dev/service_finder/backend/app/workers/service_hunter.py import asyncio import httpx import logging import os import hashlib from datetime import datetime, timezone from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, text, update from app.db.session import AsyncSessionLocal from app.models.staged_data import ServiceStaging, DiscoveryParameter # Naplózás beállítása a Sentinel monitorozáshoz logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(name)s: %(message)s') logger = logging.getLogger("Robot-Continental-Scout-v1.3") class ServiceHunter: """ Robot v1.3.1: Continental Scout (Grid Search Edition) Felelőssége: Új szervizpontok felfedezése külső API-k alapján. """ PLACES_NEW_URL = "https://places.googleapis.com/v1/places:searchNearby" GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY") @classmethod def _generate_fingerprint(cls, name: str, city: str, address: str) -> str: """ MD5 Ujjlenyomat generálása. Ez biztosítja, hogy ha ugyanazt a helyet több rács-cellából is megtaláljuk, ne jöjjön létre duplikált rekord. """ raw = f"{str(name).lower()}|{str(city).lower()}|{str(address).lower()[:10]}" return hashlib.md5(raw.encode()).hexdigest() @classmethod async def _get_city_bounds(cls, city: str, country_code: str): """ Nominatim API hívás a város befoglaló téglalapjának lekéréséhez. """ url = "https://nominatim.openstreetmap.org/search" params = {"city": city, "country": country_code, "format": "json"} headers = {"User-Agent": "ServiceFinder-Scout-v1.3/2.0 (contact@servicefinder.com)"} async with httpx.AsyncClient(headers=headers, timeout=10) as client: try: resp = await client.get(url, params=params) if resp.status_code == 200 and resp.json(): bbox = resp.json()[0].get("boundingbox") # [min_lat, max_lat, min_lon, max_lon] return [float(x) for x in bbox] except Exception as e: logger.error(f"⚠️ Városhatár lekérdezési hiba ({city}): {e}") return None @classmethod async def get_google_places(cls, lat: float, lon: float): """ Google Places V1 (New) API hívás. """ if not cls.GOOGLE_API_KEY: logger.error("❌ Google API Key hiányzik!") return [] headers = { "Content-Type": "application/json", "X-Goog-Api-Key": cls.GOOGLE_API_KEY, "X-Goog-FieldMask": "places.displayName,places.id,places.internationalPhoneNumber,places.websiteUri,places.formattedAddress,places.location" } # MB 2.0 szűrők: Csak releváns típusok payload = { "includedTypes": ["car_repair", "motorcycle_repair", "car_wash", "tire_shop"], "maxResultCount": 20, "locationRestriction": { "circle": { "center": {"latitude": lat, "longitude": lon}, "radius": 1200.0 # 1.2km sugarú körök a jó átfedéshez } } } async with httpx.AsyncClient(timeout=15) as client: try: resp = await client.post(cls.PLACES_NEW_URL, json=payload, headers=headers) if resp.status_code == 200: return resp.json().get("places", []) logger.warning(f"Google API hiba: {resp.status_code} - {resp.text}") except Exception as e: logger.error(f"Google API hívás hiba: {e}") return [] @classmethod async def _save_to_staging(cls, db: AsyncSession, task, p_data: dict): """ Adatmentés a staging táblába deduplikációval. """ name = p_data.get('displayName', {}).get('text') addr = p_data.get('formattedAddress', '') f_print = cls._generate_fingerprint(name, task.city, addr) # Ellenőrzés, hogy létezik-e már (Ujjlenyomat alapján) stmt = select(ServiceStaging).where(ServiceStaging.fingerprint == f_print) existing = (await db.execute(stmt)).scalar_one_or_none() if existing: # Csak a bizalmi pontot és az utolsó észlelést frissítjük existing.trust_score += 2 existing.updated_at = datetime.now(timezone.utc) return # Új rekord létrehozása new_entry = ServiceStaging( name=name, source="google_scout_v1.3", external_id=p_data.get('id'), fingerprint=f_print, city=task.city, full_address=addr, contact_phone=p_data.get('internationalPhoneNumber'), website=p_data.get('websiteUri'), raw_data=p_data, status="pending", trust_score=30 # Alapértelmezett bizalmi szint ) db.add(new_entry) @classmethod async def run_grid_search(cls, db: AsyncSession, task: DiscoveryParameter): """ A város koordináta-alapú bejárása. """ bbox = await cls._get_city_bounds(task.city, task.country_code or 'HU') if not bbox: return # Lépésközök meghatározása (kb. 1km = 0.01 fok) lat_step = 0.012 lon_step = 0.018 curr_lat = bbox[0] while curr_lat < bbox[1]: curr_lon = bbox[2] while curr_lon < bbox[3]: logger.info(f"🛰️ Cella pásztázása: {curr_lat:.4f}, {curr_lon:.4f} ({task.city})") places = await cls.get_google_places(curr_lat, curr_lon) for p in places: await cls._save_to_staging(db, task, p) await db.commit() # Cellánként mentünk, hogy ne vesszen el a munka curr_lon += lon_step await asyncio.sleep(0.3) # Rate limit védelem curr_lat += lat_step @classmethod async def run(cls): """ A robot fő hurokfolyamata. """ logger.info("🤖 Continental Scout ONLINE - Grid Engine Indul...") while True: async with AsyncSessionLocal() as db: try: # Aktív keresési feladatok lekérése stmt = select(DiscoveryParameter).where(DiscoveryParameter.is_active == True) tasks = (await db.execute(stmt)).scalars().all() for task in tasks: # Csak akkor futtatjuk, ha már régen volt (pl. 30 naponta) if not task.last_run_at or (datetime.now(timezone.utc) - task.last_run_at).days >= 30: logger.info(f"🔎 Felderítés indítása: {task.city}") await cls.run_grid_search(db, task) task.last_run_at = datetime.now(timezone.utc) await db.commit() except Exception as e: logger.error(f"💥 Kritikus hiba a Scout robotban: {e}") await db.rollback() # 6 óránként ellenőrizzük, van-e új feladat await asyncio.sleep(21600) if __name__ == "__main__": asyncio.run(ServiceHunter.run())