# File: service-finder/backend/app/workers/vehicle/vehicle_robot_2_researcher.py
# Repository viewer metadata: 194 lines, 8.4 KiB, Python, executable file.

import asyncio
import logging
import warnings
import os
import json
from datetime import datetime
from sqlalchemy import text, update, func
from app.database import AsyncSessionLocal
from app.models.vehicle_definitions import VehicleModelDefinition
warnings.filterwarnings("ignore", category=RuntimeWarning, module='duckduckgo_search')
from duckduckgo_search import DDGS
# MB 2.0 standard logging: timestamped records, tagged with the robot name.
_LOG_FORMAT = '%(asctime)s [%(levelname)s] Robot-2-Researcher: %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger("Vehicle-Robot-2-Researcher")
class QuotaManager:
    """Strict daily request limiter for paid / government APIs.

    The counter is persisted as a small JSON file of the form
    ``{"date": "YYYY-MM-DD", "count": N}`` so that the quota survives worker
    restarts. The day boundary is taken from the local clock
    (``datetime.now()``).

    NOTE: the read-modify-write cycle is not protected against several
    processes sharing the same state file; each write is atomic, but two
    concurrent callers may still both consume the same slot.

    Args:
        service_name: Short identifier of the API; part of the state file name.
        daily_limit: Maximum number of requests allowed per calendar day.
        state_dir: Directory that holds the state file. Defaults to the
            historical ``/app/temp`` location.
    """

    # Same named logger the rest of this module uses.
    _log = logging.getLogger("Vehicle-Robot-2-Researcher")

    def __init__(self, service_name: str, daily_limit: int, state_dir: str = "/app/temp"):
        self.service_name = service_name
        self.daily_limit = daily_limit
        self.state_file = os.path.join(state_dir, f".quota_{service_name}.json")
        self._ensure_file()

    @staticmethod
    def _today() -> str:
        """Return today's local date as ``YYYY-MM-DD``."""
        return datetime.now().strftime("%Y-%m-%d")

    def _fresh_state(self) -> dict:
        """Return a zeroed counter for today."""
        return {"date": self._today(), "count": 0}

    def _ensure_file(self) -> None:
        """Create the state file (and its directory) if it does not exist yet."""
        if not os.path.exists(self.state_file):
            self._save_state(self._fresh_state())

    def _load_state(self) -> dict:
        """Read the persisted counter, falling back to a fresh one.

        A missing file (e.g. the temp directory was cleaned while the worker
        was running) or an unreadable / malformed file must not make every
        later quota check raise, so both cases restart the counter for today.
        """
        try:
            with open(self.state_file, "r", encoding="utf-8") as f:
                state = json.load(f)
        except FileNotFoundError:
            return self._fresh_state()
        except ValueError as exc:  # JSONDecodeError / UnicodeDecodeError
            self._log.warning("Quota state for %s is unreadable, resetting: %s", self.service_name, exc)
            return self._fresh_state()

        well_formed = (
            isinstance(state, dict)
            and isinstance(state.get("date"), str)
            and isinstance(state.get("count"), int)
        )
        if not well_formed:
            self._log.warning("Quota state for %s is malformed, resetting.", self.service_name)
            return self._fresh_state()
        return state

    def _save_state(self, state: dict) -> None:
        """Persist *state* atomically (temp file + ``os.replace``).

        Writing to a sibling file first means a crash mid-write can never
        leave a truncated state file behind.
        """
        parent = os.path.dirname(self.state_file)
        if parent:
            os.makedirs(parent, exist_ok=True)
        tmp_path = f"{self.state_file}.tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f)
        os.replace(tmp_path, self.state_file)

    def can_make_request(self) -> bool:
        """Consume one request from today's quota if any is left.

        Returns:
            ``True`` when the request is allowed (the counter has been
            incremented and persisted), ``False`` when today's limit is
            already reached.
        """
        state = self._load_state()
        today = self._today()
        if state["date"] != today:
            # New day: the quota starts from zero again.
            state = {"date": today, "count": 0}

        if state["count"] >= self.daily_limit:
            return False

        state["count"] += 1
        self._save_state(state)
        return True
class VehicleResearcher:
    """Vehicle Robot 2.5: "Sniper" researcher.

    Runs a handful of targeted web searches per vehicle and stores the
    combined, length-capped text (the "dossier") on the vehicle row, so the
    downstream AI synthesis step receives a compact context.

    NOTE(review): a row whose worker dies in the middle of a research stays
    in ``research_in_progress``; no reaper for such rows is visible in this
    file.
    """

    # Hard cap on the stored dossier so the downstream AI/GPU step is not flooded.
    MAX_CONTEXT_CHARS = 2500
    # A dossier must be longer than this to count as useful. The bar is low
    # because targeted searches return compact text. NOTE: the placeholder
    # texts of the three current queries add up to less than this value, which
    # is what keeps an all-failed search from being stored as a dossier.
    MIN_CONTEXT_CHARS = 150
    TRUNCATION_MARKER = "\n...[TRUNCATED TO SAVE GPU TOKENS]"

    # Atomic claim: pick one pending row, skipping rows locked by other
    # workers, and mark it as in progress in the same statement.
    _CLAIM_SQL = """
        UPDATE data.vehicle_model_definitions
        SET status = 'research_in_progress'
        WHERE id = (
            SELECT id FROM data.vehicle_model_definitions
            WHERE status IN ('unverified', 'awaiting_research')
            AND attempts < :max_attempts
            ORDER BY
                CASE WHEN make = 'TOYOTA' THEN 1 ELSE 2 END,
                attempts ASC
            FOR UPDATE SKIP LOCKED
            LIMIT 1
        )
        RETURNING id, make, marketing_name, engine_code, year_from, attempts;
    """

    def __init__(self):
        self.max_attempts = 5
        self.search_timeout = 15.0
        # Quota managers, configured from the environment (.env).
        dvla_limit = int(os.getenv("DVLA_DAILY_LIMIT", "1000"))
        self.dvla_quota = QuotaManager("dvla", dvla_limit)
        self.dvla_token = os.getenv("DVLA_API_KEY")

    @staticmethod
    def _build_queries(make, model, engine, year) -> list[tuple[str, str]]:
        """Return the ``(label, query)`` pairs of the free, targeted search tier.

        Missing (``None`` / empty) fields are left out instead of being
        rendered as the literal text ``None`` inside the search query.
        """
        def phrase(*parts) -> str:
            words = (str(part).strip() for part in parts if part)
            return " ".join(word for word in words if word)

        return [
            ("ULTIMATE_SPECS", phrase(make, model, engine, year, "site:ultimatespecs.com")),
            ("AUTO_DATA", phrase(make, model, engine, year, "site:auto-data.net")),
            ("COMMON_ISSUES", phrase(make, model, engine, "reliability common problems")),
        ]

    @classmethod
    def _build_context(cls, search_results) -> str:
        """Join the per-source texts and cap the result at ``MAX_CONTEXT_CHARS``."""
        context = "\n".join(search_results)
        if len(context) > cls.MAX_CONTEXT_CHARS:
            context = context[:cls.MAX_CONTEXT_CHARS] + cls.TRUNCATION_MARKER
        return context

    async def fetch_ddg_targeted(self, label: str, query: str) -> str:
        """Run one targeted DuckDuckGo search in a worker thread.

        Args:
            label: Source tag written into the ``[SOURCE: ...]`` header.
            query: Full search query string.

        Returns:
            A text section for the dossier. Never raises for search problems:
            a failure or an empty result yields a short placeholder section.
        """
        def search() -> list[str]:
            with DDGS() as ddgs:
                # max_results=2: only the two most relevant hits, to keep noise low.
                hits = ddgs.text(query, max_results=2)
                return [f"- {hit.get('body', '')}" for hit in hits] if hits else []

        try:
            # NOTE: on timeout the worker thread is abandoned, not stopped;
            # threads cannot be cancelled.
            snippets = await asyncio.wait_for(asyncio.to_thread(search), timeout=self.search_timeout)
        except Exception as exc:
            # WARNING rather than DEBUG: with the INFO log level configured in
            # this module, DEBUG records would never be shown.
            logger.warning("Keresési hiba (%s): %s", label, exc)
            return f"[SOURCE: {label}]\nKERESÉSI HIBA.\n"

        if not snippets:
            return f"[SOURCE: {label}]\nNincs érdemi találat.\n"
        return f"[SOURCE: {label} | KERESÉS: {query}]\n" + "\n".join(snippets) + "\n"

    async def research_vehicle(self, db, vehicle_id: int, make: str, model: str, engine: str, year: str, current_attempts: int) -> None:
        """Research one vehicle and store the structured dossier for the AI step.

        Args:
            db: Async database session used to store the outcome.
            vehicle_id: Primary key of the vehicle definition row.
            make, model, engine, year: Vehicle fields used to build the queries;
                ``engine`` and ``year`` may be empty.
            current_attempts: Attempt counter of the row before this run.

        Database errors are logged, not raised. After a failed save the row is
        handed back to the queue instead of staying in ``research_in_progress``.
        """
        engine_safe = engine or ""
        logger.info("🔎 Mesterlövész Kutatás: %s %s (Motor: %s)", make, model, engine_safe)

        # TIER 1: free, targeted searches against the most reliable sources.
        queries = self._build_queries(make, model, engine, year)
        search_results = await asyncio.gather(
            *(self.fetch_ddg_targeted(label, query) for label, query in queries)
        )

        # TIER 2: paid / quota-limited APIs (placeholder for DVLA).
        # If UK plates show up in the future, DVLA would be called here:
        # if has_uk_plate and self.dvla_quota.can_make_request():
        #     uk_data = await self.fetch_dvla_data(plate)
        #     search_results.append(uk_data)

        # TIER 3: assemble the dossier, capped so the AI step is not overloaded.
        full_context = self._build_context(search_results)
        next_attempts = current_attempts + 1
        has_enough_data = len(full_context.strip()) > self.MIN_CONTEXT_CHARS

        try:
            if has_enough_data:
                # Dossier ready: hand the row over to the AI synthesis step.
                await db.execute(
                    update(VehicleModelDefinition)
                    .where(VehicleModelDefinition.id == vehicle_id)
                    .values(
                        raw_search_context=full_context,
                        status='awaiting_ai_synthesis',
                        last_research_at=func.now(),
                        attempts=next_attempts,
                    )
                )
            else:
                new_status = self._retry_status(next_attempts)
                await db.execute(
                    update(VehicleModelDefinition)
                    .where(VehicleModelDefinition.id == vehicle_id)
                    .values(
                        status=new_status,
                        attempts=next_attempts,
                        last_research_at=func.now(),
                    )
                )
            await db.commit()
        except Exception as exc:
            await db.rollback()
            logger.exception("🚨 Adatbázis hiba az eredmény mentésénél (%s): %s", vehicle_id, exc)
            # Without this the row would keep 'research_in_progress' forever,
            # because the claim query never selects that status.
            await self._release_claim(db, vehicle_id, current_attempts)
            return

        # Log only after the commit succeeded.
        if has_enough_data:
            logger.info("✅ Akta rögzítve (%s karakter): %s %s", len(full_context), make, model)
        elif new_status == 'suspended_research':
            logger.warning("🛑 Felfüggesztve (Nincs nyom a weben): %s %s", make, model)
        else:
            logger.warning("⚠️ Kevés adat: %s %s, visszatéve a sorba.", make, model)

    def _retry_status(self, next_attempts: int) -> str:
        """Return the status of a row that did not yield a dossier."""
        return 'suspended_research' if next_attempts >= self.max_attempts else 'unverified'

    async def _release_claim(self, db, vehicle_id: int, current_attempts: int) -> None:
        """Best-effort: put a claimed row back after its result could not be saved.

        The failed run still counts as an attempt, so a row that fails on
        every save is eventually suspended instead of being retried forever.
        """
        next_attempts = current_attempts + 1
        new_status = self._retry_status(next_attempts)
        try:
            await db.execute(
                update(VehicleModelDefinition)
                .where(VehicleModelDefinition.id == vehicle_id)
                # Only touch the row while it is still marked as claimed.
                .where(VehicleModelDefinition.status == 'research_in_progress')
                .values(status=new_status, attempts=next_attempts)
            )
            await db.commit()
            logger.warning("♻️ Kutatási zárolás feloldva (%s), új státusz: %s", vehicle_id, new_status)
        except Exception:
            await db.rollback()
            logger.exception("🚨 A kutatási zárolás feloldása sikertelen (%s)", vehicle_id)

    @classmethod
    async def run(cls):
        """Worker main loop: claim one pending vehicle at a time and research it.

        Runs until cancelled. Sleeps 2 s after each processed vehicle (rate
        limiting towards DuckDuckGo), 30 s when the queue is empty and 10 s
        after an unexpected error.
        """
        worker = cls()
        logger.info("🚀 Vehicle Researcher 2.5 ONLINE (Sniper & Quota Manager)")
        claim_stmt = text(cls._CLAIM_SQL)

        while True:
            try:
                # The claim session is closed before the slow web research
                # starts, so no connection is held during the searches.
                async with AsyncSessionLocal() as db:
                    result = await db.execute(claim_stmt, {"max_attempts": worker.max_attempts})
                    task = result.fetchone()
                    await db.commit()

                if task is None:
                    await asyncio.sleep(30)
                    continue

                v_id, v_make, v_model, v_engine, v_year, v_attempts = task
                async with AsyncSessionLocal() as process_db:
                    await worker.research_vehicle(process_db, v_id, v_make, v_model, v_engine, v_year, v_attempts)
                await asyncio.sleep(2)  # Rate-limit protection towards DDG.
            except Exception as exc:
                logger.exception("💀 Kritikus hiba a főciklusban: %s", exc)
                await asyncio.sleep(10)
def _main() -> None:
    """Script entry point: run the researcher loop until interrupted with Ctrl+C."""
    try:
        asyncio.run(VehicleResearcher.run())
    except KeyboardInterrupt:
        logger.info("🛑 Kutató robot leállítva.")


if __name__ == "__main__":
    _main()