"""Robot v1.3.0 "Continental Scout": discovery worker for service providers.

Reads the active discovery parameters from the database, looks up matching
places through Google Places and OpenStreetMap (Overpass) and stores the new
ones in the staging table.
"""

import asyncio
import logging
import os
from datetime import datetime, timezone

import httpx
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession

from app.db.session import SessionLocal
# Models - the new v1.3 structure
from app.models.service import ServiceStaging, DiscoveryParameter

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger("Robot-v1.3-ContinentalScout")


class ServiceHunter:
    """Robot v1.3.0: Continental Scout.

    EU-wide discovery engine controlled by the rows of the discovery
    parameter table. All methods are classmethods; the class is never
    instantiated.
    """

    OVERPASS_URL = "http://overpass-api.de/api/interpreter"
    PLACES_NEW_URL = "https://places.googleapis.com/v1/places:searchNearby"
    GEOCODE_URL = "https://maps.googleapis.com/maps/api/geocode/json"
    GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")

    # The Overpass query below declares [timeout:60]; the HTTP client has to
    # wait longer than that, otherwise httpx's default 5 s timeout aborts the
    # request before the server is allowed to answer.
    OVERPASS_TIMEOUT = 75.0

    # Statement applied before any ORM work so unqualified table names
    # resolve against the "data" schema first.
    SEARCH_PATH_SQL = "SET search_path TO data, public"

    @classmethod
    async def get_coordinates(cls, city, country_code):
        """Fetch the centre of a city to seed the nearby searches.

        Args:
            city: City name sent to the geocoder.
            country_code: Country code appended to the geocoder address.

        Returns:
            ``(lat, lng)`` of the first geocoding result, or ``(None, None)``
            when no API key is configured, the response status is not 200,
            or the response holds no results.
        """
        # Consistent with get_google_places: without a key the call cannot
        # succeed, so skip the network round trip.
        if not cls.GOOGLE_API_KEY:
            return None, None

        params = {"address": f"{city}, {country_code}", "key": cls.GOOGLE_API_KEY}
        async with httpx.AsyncClient() as client:
            resp = await client.get(cls.GEOCODE_URL, params=params)

        if resp.status_code != 200:
            return None, None
        results = resp.json().get("results")
        if not results:
            return None, None
        loc = results[0]["geometry"]["location"]
        return loc["lat"], loc["lng"]

    @classmethod
    async def get_google_places(cls, lat, lon, keyword):
        """Query the Google Places (New) ``searchNearby`` endpoint.

        Args:
            lat: Latitude of the search centre.
            lon: Longitude of the search centre.
            keyword: Currently unused. A keyword would belong to a text
                query; ``searchNearby`` filters by ``includedTypes`` instead.

        Returns:
            The list under the ``places`` key of the response, or an empty
            list when no API key is configured or the status is not 200.
        """
        if not cls.GOOGLE_API_KEY:
            return []

        headers = {
            "Content-Type": "application/json",
            "X-Goog-Api-Key": cls.GOOGLE_API_KEY,
            "X-Goog-FieldMask": "places.displayName,places.id,places.types,places.internationalPhoneNumber,places.websiteUri,places.formattedAddress",
        }
        # The place types are fixed here; the per-task keyword does not
        # influence the request.
        payload = {
            "includedTypes": ["car_repair", "gas_station", "car_wash", "motorcycle_repair"],
            "maxResultCount": 20,
            "locationRestriction": {
                "circle": {
                    "center": {"latitude": lat, "longitude": lon},
                    "radius": 5000.0,  # 5 km radius
                }
            },
        }

        async with httpx.AsyncClient() as client:
            resp = await client.post(cls.PLACES_NEW_URL, json=payload, headers=headers)

        if resp.status_code == 200:
            return resp.json().get("places", [])
        logger.error(f"❌ Google API hiba ({resp.status_code}): {resp.text}")
        return []

    @classmethod
    async def save_to_staging(cls, db: AsyncSession, data: dict):
        """Add one discovered place to the staging table unless already known.

        The record is only added to the session; committing is left to the
        caller.

        Args:
            db: Session used for the duplicate lookup and the insert.
            data: Place description. ``external_id``, ``name`` and ``source``
                are required keys; the address parts (``zip``, ``city``,
                ``street``, ``street_type``, ``number``, ``full_address``),
                ``phone``, ``website``, ``raw`` and ``trust`` are optional.

        Raises:
            KeyError: If a required key is missing from ``data``.
        """
        if data['external_id'] is None:
            # str(None) would store the literal "None" and collapse every
            # id-less record into a single staging row.
            logger.warning(f"⚠️ Hiányzó external_id, rekord kihagyva: {data.get('name')}")
            return
        external_id = str(data['external_id'])

        stmt = select(ServiceStaging).where(ServiceStaging.external_id == external_id)
        if (await db.execute(stmt)).scalar_one_or_none():
            return

        new_entry = ServiceStaging(
            name=data['name'],
            source=data['source'],
            external_id=external_id,
            # The address is split into separate fields here (when provided)
            postal_code=data.get('zip'),
            city=data.get('city'),
            street_name=data.get('street'),
            street_type=data.get('street_type', 'utca'),
            house_number=data.get('number'),
            full_address=data.get('full_address'),
            contact_phone=data.get('phone'),
            website=data.get('website'),
            raw_data=data.get('raw', {}),
            status="pending",
            trust_score=data.get('trust', 10),
        )
        db.add(new_entry)

    @classmethod
    async def _get_osm_elements(cls, lat, lon):
        """Return the Overpass elements around a point, or ``[]`` on a non-200 reply."""
        # NOTE(review): OSM commonly tags repair shops as shop=car_repair
        # rather than amenity=car_repair - confirm this filter matches the
        # intended objects. The query text is left unchanged.
        osm_query = (
            "[out:json][timeout:60]; "
            '(nwr["amenity"~"car_repair|fuel"]'
            f"(around:5000, {lat}, {lon});); "
            "out center;"
        )
        async with httpx.AsyncClient(timeout=cls.OVERPASS_TIMEOUT) as client:
            resp = await client.post(cls.OVERPASS_URL, data={"data": osm_query})

        if resp.status_code != 200:
            logger.error(f"❌ Overpass API hiba ({resp.status_code}): {resp.text}")
            return []
        return resp.json().get("elements", [])

    @classmethod
    async def _scout_location(cls, db, city, country_code, keyword):
        """Run the Google and OSM phases for one location.

        Returns:
            ``False`` when the city could not be geocoded (nothing was
            searched), ``True`` otherwise.
        """
        # Obtain the coordinates for the searches
        lat, lon = await cls.get_coordinates(city, country_code)
        # Compare against None explicitly: 0.0 is a valid coordinate.
        if lat is None or lon is None:
            return False

        # --- GOOGLE PHASE ---
        for place in await cls.get_google_places(lat, lon, keyword):
            await cls.save_to_staging(db, {
                "external_id": place.get('id'),
                "name": place.get('displayName', {}).get('text'),
                "full_address": place.get('formattedAddress'),
                "phone": place.get('internationalPhoneNumber'),
                "website": place.get('websiteUri'),
                "source": "google",
                "raw": place,
                "trust": 30,
            })

        # --- OSM PHASE (EU-compatible query) ---
        for element in await cls._get_osm_elements(lat, lon):
            tags = element.get("tags", {})
            await cls.save_to_staging(db, {
                "external_id": f"osm_{element['id']}",
                "name": tags.get('name', 'Ismeretlen szerviz'),
                "city": tags.get('addr:city', city),
                "zip": tags.get('addr:postcode'),
                "street": tags.get('addr:street'),
                "number": tags.get('addr:housenumber'),
                "source": "osm",
                "raw": element,
                "trust": 15,
            })
        return True

    @classmethod
    async def _run_cycle(cls):
        """Process every active discovery parameter once, committing per task."""
        async with SessionLocal() as db:
            await db.execute(text(cls.SEARCH_PATH_SQL))

            # 1. Fetch the parameters from the table
            stmt = select(DiscoveryParameter).where(
                DiscoveryParameter.is_active == True  # noqa: E712 - SQL expression
            )
            tasks = (await db.execute(stmt)).scalars().all()

            # Snapshot the plain values now. A commit (unless the session
            # factory disables expire_on_commit) or a rollback expires the
            # loaded objects, and AsyncSession cannot refresh them implicitly
            # on attribute access.
            jobs = [(task, task.city, task.country_code, task.keyword) for task in tasks]

            for task, city, country_code, keyword in jobs:
                logger.info(f"🔎 Felderítés: {city} ({country_code}) -> {keyword}")
                try:
                    # Re-applied per task: the setting is undone by a
                    # rollback and is not guaranteed to carry over into the
                    # transaction started after a commit.
                    await db.execute(text(cls.SEARCH_PATH_SQL))
                    if not await cls._scout_location(db, city, country_code, keyword):
                        continue
                    task.last_run_at = datetime.now(timezone.utc)
                    await db.commit()
                except Exception as e:
                    # One failing task must not abort the remaining ones.
                    logger.exception(f"💥 Kritikus hiba a ciklusban: {e}")
                    await db.rollback()
                    continue
                logger.info(f"✅ {city} felderítve.")

    @classmethod
    async def run(cls):
        """Run discovery cycles forever, sleeping one hour between cycles."""
        logger.info("🤖 Robot v1.3.0: Continental Scout elindult...")
        while True:
            try:
                await cls._run_cycle()
            except Exception as e:
                # Top-level boundary: keep the worker alive, keep the traceback.
                logger.exception(f"💥 Kritikus hiba a ciklusban: {e}")

            logger.info("😴 Minden aktív feladat kész. Alvás 1 órán át...")
            await asyncio.sleep(3600)


if __name__ == "__main__":
    asyncio.run(ServiceHunter.run())