""" Webhook dispatcher - sends events to configured URLs """ import httpx import json from datetime import datetime from app.services.database import get_db async def trigger_webhook(project_id: int, event: str, payload: dict): """Send webhook to all configured endpoints for this project""" db = await get_db() cursor = await db.execute( "SELECT * FROM webhooks WHERE project_id = ? AND active = 1", (project_id,) ) webhooks = await cursor.fetchall() # Also check project's default webhook_url cursor = await db.execute("SELECT webhook_url FROM projects WHERE id = ?", (project_id,)) project = await cursor.fetchone() await db.close() urls = [] for wh in webhooks: wh = dict(wh) events = json.loads(wh.get("events", "[]")) if event in events or not events: urls.append(wh["url"]) if project and project["webhook_url"]: urls.append(project["webhook_url"]) # Deduplicate urls = list(set(urls)) webhook_payload = { "event": event, "timestamp": datetime.utcnow().isoformat(), "data": payload } async with httpx.AsyncClient(timeout=10.0) as client: for url in urls: try: await client.post(url, json=webhook_payload) except Exception as e: print(f"Webhook failed for {url}: {e}")