import asyncio
import csv
import logging
import os
import sys
import uuid
from datetime import datetime, timezone

import httpx
from geoalchemy2.elements import WKTElement
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, text
from sqlalchemy.orm import selectinload

from app.db.session import SessionLocal
# Model imports
from app.models.service import ServiceProfile, ExpertiseTag
from app.models.organization import (
    Organization,
    OrganizationFinancials,
    OrgType,
    OrgUserRole,
    OrganizationMember,
)
from app.models.identity import Person
from app.models.address import Address, GeoPostalCode

# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("Robot2-Dunakeszi-Detective")


class ServiceHunter:
    """Robot 2.7.2: Dunakeszi Detective - deep model integration.

    Data sources, in processing order, with the trust score assigned to the
    records they produce:

    1. Local CSV (own submissions, geocoded from the address) - trust 50
    2. OSM (community data) - trust 10
    3. Google (enrichment / fallback for missing or generic names) - trust 30
    """

    OVERPASS_URL = "http://overpass-api.de/api/interpreter"
    PLACES_NEW_URL = "https://places.googleapis.com/v1/places:searchNearby"
    GEOCODE_URL = "https://maps.googleapis.com/maps/api/geocode/json"
    GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
    LOCAL_CSV_PATH = "/app/app/workers/local_services.csv"

    # Names too generic to identify a business; they trigger a Google lookup.
    _GENERIC_NAMES = frozenset(
        {"aprilia", "bosch", "shell", "mol", "omv", "ismeretlen"}
    )

    @classmethod
    async def geocode_address(cls, address_text):
        """Turn a free-text address into coordinates and address components.

        Args:
            address_text: The address string sent to the geocoding endpoint.

        Returns:
            A dict with the keys ``lat``, ``lng``, ``zip``, ``city``,
            ``street``, ``type`` and ``number``, or ``None`` when the API key
            is missing, the request fails, or no result is returned.
        """
        if not cls.GOOGLE_API_KEY:
            logger.warning("⚠️ Google API kulcs hiányzik!")
            return None

        params = {"address": address_text, "key": cls.GOOGLE_API_KEY}
        try:
            async with httpx.AsyncClient() as client:
                resp = await client.get(cls.GEOCODE_URL, params=params, timeout=10)

            if resp.status_code != 200:
                logger.error(f"❌ Geocoding hiba: {resp.status_code}")
                return None

            results = resp.json().get("results")
            if not results:
                return None

            result = results[0]
            loc = result["geometry"]["location"]
            # Defaults cover the mandatory address fields when the response
            # lacks the corresponding component.
            parsed = {
                "lat": loc["lat"],
                "lng": loc["lng"],
                "zip": "",
                "city": "",
                "street": "Ismeretlen",
                "type": "utca",
                "number": "1",
            }
            for component in result.get("address_components", []):
                types = component.get("types", [])
                if "postal_code" in types:
                    parsed["zip"] = component["long_name"]
                if "locality" in types:
                    parsed["city"] = component["long_name"]
                if "route" in types:
                    parsed["street"] = component["long_name"]
                if "street_number" in types:
                    parsed["number"] = component["long_name"]
            logger.info(f"📍 Geocoding sikeres: {address_text}")
            return parsed
        except Exception as e:
            # Best effort: a failed lookup must not stop the import.
            logger.error(f"❌ Geocoding hiba: {e}")
        return None

    @classmethod
    async def get_google_place_details_new(cls, lat, lon):
        """Look up the nearest matching place via Google Places API (New).

        Searches a 40 m circle around the given point and requests only the
        fields named in the field mask.

        Args:
            lat: Latitude of the search centre.
            lon: Longitude of the search centre.

        Returns:
            A dict with ``name``, ``google_id``, ``types``, ``phone`` and
            ``website`` (individual values may be ``None``), or ``None`` when
            the API key is missing, the call fails, or nothing is found.
        """
        if not cls.GOOGLE_API_KEY:
            return None

        headers = {
            "Content-Type": "application/json",
            "X-Goog-Api-Key": cls.GOOGLE_API_KEY,
            "X-Goog-FieldMask": "places.displayName,places.id,places.types,places.internationalPhoneNumber,places.websiteUri",
        }
        payload = {
            "includedTypes": [
                "car_repair",
                "gas_station",
                "ev_charging_station",
                "car_wash",
                "motorcycle_repair",
            ],
            "maxResultCount": 1,
            "locationRestriction": {
                "circle": {
                    "center": {"latitude": lat, "longitude": lon},
                    "radius": 40.0,
                }
            },
        }
        try:
            async with httpx.AsyncClient() as client:
                resp = await client.post(
                    cls.PLACES_NEW_URL, json=payload, headers=headers, timeout=10
                )

            if resp.status_code != 200:
                # Previously ignored silently; surface quota/auth problems.
                logger.warning(f"⚠️ Google Places hiba: {resp.status_code}")
                return None

            places = resp.json().get("places", [])
            if places:
                place = places[0]
                return {
                    "name": (place.get("displayName") or {}).get("text"),
                    "google_id": place.get("id"),
                    "types": place.get("types", []),
                    "phone": place.get("internationalPhoneNumber"),
                    "website": place.get("websiteUri"),
                }
        except Exception as e:
            logger.error(f"❌ Google kiegészítő hívás hiba: {e}")
        return None

    @classmethod
    async def import_local_csv(cls, db: AsyncSession):
        """Load manually submitted services from the local CSV file.

        Rows are read with the columns ``nev``, ``telefon``, ``web``,
        ``tipus`` and ``cim``. Each row with an address is geocoded and, on
        success, saved with source ``local_manual``. Rows without an address
        or without a geocoding result are skipped. Does nothing when the file
        does not exist.

        Args:
            db: The async session used for saving.
        """
        if not os.path.exists(cls.LOCAL_CSV_PATH):
            return

        try:
            # utf-8-sig strips an optional BOM so the first header is "nev";
            # rows are read up front so the file is not held open across
            # awaited network calls.
            with open(
                cls.LOCAL_CSV_PATH, mode="r", encoding="utf-8-sig", newline=""
            ) as f:
                rows = list(csv.DictReader(f))

            for row in rows:
                address = row.get("cim")
                if not address:
                    continue
                geo_data = await cls.geocode_address(address)
                if not geo_data:
                    continue

                element = {
                    "tags": {
                        # .get(): a missing column must not abort the import.
                        "name": row.get("nev"),
                        "phone": row.get("telefon"),
                        "website": row.get("web"),
                        # DictReader yields "" for empty cells, so use `or`.
                        "amenity": row.get("tipus") or "car_repair",
                        "addr:full": address,
                        "addr:city": geo_data["city"],
                        "addr:zip": geo_data["zip"],
                        "addr:street": geo_data["street"],
                        "addr:type": geo_data["type"],
                        "addr:number": geo_data["number"],
                    },
                    "lat": geo_data["lat"],
                    "lon": geo_data["lng"],
                }
                await cls.save_service_deep(db, element, source="local_manual")
            logger.info("✅ Helyi CSV adatok feldolgozva.")
        except Exception as e:
            logger.error(f"❌ CSV feldolgozási hiba: {e}")

    @classmethod
    async def get_or_create_person(cls, db: AsyncSession, name: str) -> Person:
        """Return the person matching ``name``, creating a ghost if needed.

        The first whitespace-separated token is taken as the last name and the
        remainder as the first name ("Ismeretlen" when there is none).

        Args:
            db: The async session used for the lookup and insert.
            name: Full name of the person.

        Returns:
            The existing or newly created (flushed) ``Person``.
        """
        # split(None, 1) tolerates leading and repeated whitespace.
        parts = name.split(None, 1) or ["Ismeretlen"]
        last_name = parts[0]
        first_name = parts[1] if len(parts) > 1 else "Ismeretlen"

        stmt = select(Person).where(
            Person.last_name == last_name, Person.first_name == first_name
        )
        result = await db.execute(stmt)
        # first() instead of scalar_one_or_none(): several people may share a
        # name, which would otherwise raise MultipleResultsFound.
        person = result.scalars().first()
        if not person:
            person = Person(
                last_name=last_name,
                first_name=first_name,
                is_ghost=True,
                is_active=False,
            )
            db.add(person)
            await db.flush()
        return person

    @classmethod
    async def enrich_financials(cls, db: AsyncSession, org_id: int):
        """Add an initial financial record for the previous calendar year.

        Args:
            db: The async session the record is added to (not flushed here).
            org_id: Identifier of the organization the record belongs to.
        """
        financial = OrganizationFinancials(
            organization_id=org_id,
            year=datetime.now(timezone.utc).year - 1,
            source="bot_discovery",
        )
        db.add(financial)

    @staticmethod
    def _extract_coordinates(element: dict):
        """Return ``(lat, lon)`` of an element, or ``(None, None)`` parts.

        With ``out center`` Overpass puts the coordinates of ways and
        relations under ``center``; only nodes carry top-level ``lat``/``lon``.
        """
        center = element.get("center") or {}
        lat = element.get("lat")
        lon = element.get("lon")
        if lat is None:
            lat = center.get("lat")
        if lon is None:
            lon = center.get("lon")
        return lat, lon

    @classmethod
    async def save_service_deep(cls, db: AsyncSession, element: dict, source="osm"):
        """Persist one discovered service with all related records.

        When no organization with the resolved name exists yet, creates the
        address, organization, service profile, optional owner membership and
        an initial financial record. Existing organizations are left untouched.
        Elements without coordinates are ignored.

        Args:
            db: The async session used for all lookups and inserts.
            element: An OSM-style element: ``tags`` plus ``lat``/``lon`` at
                the top level or under ``center``.
            source: Origin of the record; ``"local_manual"`` yields trust 50.
        """
        tags = element.get("tags") or {}
        lat, lon = cls._extract_coordinates(element)
        # Explicit None checks: 0.0 is a valid coordinate.
        if lat is None or lon is None:
            return

        osm_name = tags.get("name") or tags.get("brand") or tags.get("operator")
        google_data = None
        if not osm_name or osm_name.lower() in cls._GENERIC_NAMES:
            google_data = await cls.get_google_place_details_new(lat, lon)
        google = google_data or {}

        # Prefer Google's name, but keep the OSM one if Google returned none.
        final_name = google.get("name") or osm_name or "Ismeretlen Szolgáltató"

        stmt = select(Organization).where(Organization.full_name == final_name)
        result = await db.execute(stmt)
        # first(): tolerate pre-existing duplicates instead of raising.
        org = result.scalars().first()
        if org:
            return

        # 1. Address (mandatory fields filled from tags or defaults).
        # OSM uses addr:postcode / addr:housenumber; the CSV import uses
        # addr:zip / addr:number.
        city = tags.get("addr:city") or "Dunakeszi"
        zip_code = tags.get("addr:zip") or tags.get("addr:postcode") or "2120"
        street_name = tags.get("addr:street") or "Ismeretlen"
        street_type = tags.get("addr:type") or "utca"
        house_number = (
            tags.get("addr:number") or tags.get("addr:housenumber") or "1"
        )
        new_addr = Address(
            latitude=lat,
            longitude=lon,
            full_address_text=tags.get("addr:full")
            or f"{zip_code} {city}, {street_name} {house_number}",
            street_name=street_name,
            street_type=street_type,
            house_number=house_number,
        )
        db.add(new_addr)
        await db.flush()

        # 2. Organization
        org = Organization(
            full_name=final_name,
            name=final_name[:50],
            org_type=OrgType.service,
            address_id=new_addr.id,
            address_city=city,
            address_zip=zip_code,
            address_street_name=new_addr.street_name,
            address_street_type=new_addr.street_type,
            address_house_number=new_addr.house_number,
        )
        db.add(org)
        await db.flush()

        # 3. Service profile
        if source == "local_manual":
            trust = 50
        else:
            trust = 30 if google_data else 10
        spec = {"brands": [], "types": google.get("types") or [], "osm_tags": tags}
        if tags.get("brand"):
            spec["brands"].append(tags.get("brand"))
        profile = ServiceProfile(
            organization_id=org.id,
            location=WKTElement(f'POINT({lon} {lat})', srid=4326),
            status="ghost",
            trust_score=trust,
            google_place_id=google.get("google_id"),
            specialization_tags=spec,
            # Fall back to the tag value when Google has no such field.
            website=google.get("website") or tags.get("website"),
            contact_phone=google.get("phone") or tags.get("phone"),
        )
        db.add(profile)

        # 4. Owner
        owner_name = tags.get("operator") or tags.get("contact:person")
        if owner_name and len(owner_name) > 3:
            person = await cls.get_or_create_person(db, owner_name)
            db.add(
                OrganizationMember(
                    organization_id=org.id,
                    person_id=person.id,
                    role=OrgUserRole.OWNER,
                    is_verified=False,
                )
            )

        await cls.enrich_financials(db, org.id)
        await db.flush()
        logger.info(f"✨ [{source.upper()}] Mentve: {final_name} (Bizalom: {trust})")

    @classmethod
    async def run(cls):
        """Run the discovery loop forever.

        Waits until the database answers, then repeats: import the local CSV,
        scan OSM through Overpass, commit, and sleep 24 hours. Errors inside a
        cycle are logged and the loop continues with the next cycle.
        """
        logger.info("🤖 Robot 2.7.2: Dunakeszi Detective indítása...")

        # Connection guard: retry until the database is reachable.
        connected = False
        while not connected:
            try:
                async with SessionLocal() as db:
                    await db.execute(text("SELECT 1"))
                connected = True
            except Exception as e:
                logger.warning(f"⏳ Várakozás a hálózatra (shared-postgres host?): {e}")
                await asyncio.sleep(5)

        query = """[out:json][timeout:120];area["name"="Dunakeszi"]->.city;(nwr["shop"~"car_repair|motorcycle_repair|tyres|car_parts|motorcycle"](area.city);nwr["amenity"~"car_repair|vehicle_inspection|motorcycle_repair|fuel|charging_station|car_wash"](area.city);nwr["amenity"~"car_repair|fuel|charging_station"](around:5000, 47.63, 19.13););out center;"""

        while True:
            async with SessionLocal() as db:
                try:
                    await db.execute(text("SET search_path TO data, public"))

                    # 1. Submitted CSV (with geocoding)
                    await cls.import_local_csv(db)
                    await db.commit()

                    # 2. OSM scan
                    async with httpx.AsyncClient() as client:
                        resp = await client.post(
                            cls.OVERPASS_URL, data={"data": query}, timeout=120
                        )

                    if resp.status_code == 200:
                        for el in resp.json().get("elements", []):
                            await cls.save_service_deep(db, el, source="osm")
                        await db.commit()
                    else:
                        # Previously ignored silently.
                        logger.error(f"❌ Overpass hiba: {resp.status_code}")
                except Exception as e:
                    logger.error(f"❌ Futáshiba: {e}")

            # Sleep outside the session so no connection is held for 24 hours.
            logger.info("😴 Scan kész, 24 óra pihenő...")
            await asyncio.sleep(86400)


if __name__ == "__main__":
    asyncio.run(ServiceHunter.run())