"""Webhook endpoints for external integrations.""" from typing import Optional from datetime import datetime from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, Request, Header from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from app.core.database import get_db from app.models.organization import Organization from app.models.integration import Integration, IntegrationType, IntegrationStatus from app.models.issue import Issue, IssueStatus, IssuePriority import hmac import hashlib router = APIRouter() def verify_signature(payload: bytes, signature: str, secret: str) -> bool: """Verify webhook signature.""" if not secret or not signature: return True # Skip verification if no secret configured expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest() return hmac.compare_digest(f"sha256={expected}", signature) async def process_webhook( org_id: int, integration_type: IntegrationType, payload: dict, background_tasks: BackgroundTasks, db: AsyncSession ) -> dict: """Process incoming webhook and create issue.""" # Find integration result = await db.execute( select(Integration) .where(Integration.organization_id == org_id) .where(Integration.type == integration_type) .where(Integration.status == IntegrationStatus.ACTIVE) ) integration = result.scalar_one_or_none() if not integration: return {"status": "ignored", "message": "No active integration found"} # Update integration stats integration.issues_processed = (integration.issues_processed or 0) + 1 integration.last_sync_at = datetime.utcnow() # Normalize payload based on type issue_data = normalize_payload(integration_type, payload) if not issue_data: return {"status": "ignored", "message": "Event not processed"} # Create issue issue = Issue( organization_id=org_id, integration_id=integration.id, external_id=issue_data.get("external_id"), external_key=issue_data.get("external_key"), external_url=issue_data.get("external_url"), source=integration_type.value, title=issue_data.get("title"), description=issue_data.get("description"), priority=IssuePriority(issue_data.get("priority", "medium")), labels=issue_data.get("labels"), callback_url=issue_data.get("callback_url") or integration.callback_url, raw_payload=payload ) db.add(issue) await db.flush() # Queue analysis if auto_analyze enabled if integration.auto_analyze: from app.api.issues import run_analysis from app.core.config import settings background_tasks.add_task( run_analysis, issue.id, settings.DATABASE_URL.replace("postgresql://", "postgresql+asyncpg://") ) return {"status": "accepted", "issue_id": issue.id} def normalize_payload(integration_type: IntegrationType, payload: dict) -> Optional[dict]: """Normalize webhook payload to common format.""" if integration_type == IntegrationType.JIRA_CLOUD: event = payload.get("webhookEvent", "") if "issue_created" not in event: return None issue = payload.get("issue", {}) fields = issue.get("fields", {}) return { "external_id": str(issue.get("id")), "external_key": issue.get("key"), "external_url": f"{payload.get('issue', {}).get('self', '').split('/rest/')[0]}/browse/{issue.get('key')}", "title": fields.get("summary"), "description": fields.get("description"), "priority": normalize_priority(fields.get("priority", {}).get("name")), "labels": fields.get("labels", []) } elif integration_type == IntegrationType.SERVICENOW: return { "external_id": payload.get("sys_id"), "external_key": payload.get("number"), "external_url": payload.get("url"), "title": payload.get("short_description"), "description": payload.get("description"), "priority": normalize_priority(payload.get("priority")), "callback_url": payload.get("callback_url") } elif integration_type == IntegrationType.ZENDESK: ticket = payload.get("ticket", payload) return { "external_id": str(ticket.get("id")), "external_key": f"ZD-{ticket.get('id')}", "external_url": ticket.get("url"), "title": ticket.get("subject"), "description": ticket.get("description"), "priority": normalize_priority(ticket.get("priority")), "labels": ticket.get("tags", []) } elif integration_type == IntegrationType.GITHUB: action = payload.get("action") if action != "opened": return None issue = payload.get("issue", {}) return { "external_id": str(issue.get("id")), "external_key": f"GH-{issue.get('number')}", "external_url": issue.get("html_url"), "title": issue.get("title"), "description": issue.get("body"), "priority": "medium", "labels": [l.get("name") for l in issue.get("labels", [])] } elif integration_type == IntegrationType.GITLAB: event = payload.get("object_kind") if event != "issue": return None attrs = payload.get("object_attributes", {}) if attrs.get("action") != "open": return None return { "external_id": str(attrs.get("id")), "external_key": f"GL-{attrs.get('iid')}", "external_url": attrs.get("url"), "title": attrs.get("title"), "description": attrs.get("description"), "priority": "medium", "labels": payload.get("labels", []) } elif integration_type == IntegrationType.TICKETHUB: event = payload.get("event", "") if "created" not in event: return None data = payload.get("data", payload) return { "external_id": str(data.get("id")), "external_key": data.get("key"), "external_url": f"https://tickethub.startdata.com.br/tickets/{data.get('id')}", "title": data.get("title"), "description": data.get("description"), "priority": normalize_priority(data.get("priority")), "labels": data.get("labels", []) } return None def normalize_priority(priority: Optional[str]) -> str: """Normalize priority to standard values.""" if not priority: return "medium" priority = str(priority).lower() if priority in ("1", "critical", "highest", "urgent"): return "critical" elif priority in ("2", "high"): return "high" elif priority in ("3", "medium", "normal"): return "medium" else: return "low" # Webhook endpoints for each integration type @router.post("/{org_id}/jira") async def webhook_jira( org_id: int, request: Request, background_tasks: BackgroundTasks, db: AsyncSession = Depends(get_db) ): payload = await request.json() return await process_webhook(org_id, IntegrationType.JIRA_CLOUD, payload, background_tasks, db) @router.post("/{org_id}/servicenow") async def webhook_servicenow( org_id: int, request: Request, background_tasks: BackgroundTasks, db: AsyncSession = Depends(get_db) ): payload = await request.json() return await process_webhook(org_id, IntegrationType.SERVICENOW, payload, background_tasks, db) @router.post("/{org_id}/zendesk") async def webhook_zendesk( org_id: int, request: Request, background_tasks: BackgroundTasks, db: AsyncSession = Depends(get_db) ): payload = await request.json() return await process_webhook(org_id, IntegrationType.ZENDESK, payload, background_tasks, db) @router.post("/{org_id}/github") async def webhook_github( org_id: int, request: Request, background_tasks: BackgroundTasks, x_github_event: Optional[str] = Header(None), db: AsyncSession = Depends(get_db) ): payload = await request.json() if x_github_event != "issues": return {"status": "ignored", "message": "Not an issues event"} return await process_webhook(org_id, IntegrationType.GITHUB, payload, background_tasks, db) @router.post("/{org_id}/gitlab") async def webhook_gitlab( org_id: int, request: Request, background_tasks: BackgroundTasks, db: AsyncSession = Depends(get_db) ): payload = await request.json() return await process_webhook(org_id, IntegrationType.GITLAB, payload, background_tasks, db) @router.post("/{org_id}/tickethub") async def webhook_tickethub( org_id: int, request: Request, background_tasks: BackgroundTasks, db: AsyncSession = Depends(get_db) ): payload = await request.json() return await process_webhook(org_id, IntegrationType.TICKETHUB, payload, background_tasks, db) @router.post("/{org_id}/generic") async def webhook_generic( org_id: int, request: Request, background_tasks: BackgroundTasks, db: AsyncSession = Depends(get_db) ): """Generic webhook for custom integrations.""" payload = await request.json() # Direct mapping issue = Issue( organization_id=org_id, external_id=str(payload.get("id")), external_key=payload.get("key"), external_url=payload.get("url"), source=payload.get("source", "generic"), title=payload.get("title"), description=payload.get("description"), priority=IssuePriority(normalize_priority(payload.get("priority"))), labels=payload.get("labels"), callback_url=payload.get("callback_url"), raw_payload=payload ) db.add(issue) await db.flush() from app.api.issues import run_analysis from app.core.config import settings background_tasks.add_task( run_analysis, issue.id, settings.DATABASE_URL.replace("postgresql://", "postgresql+asyncpg://") ) return {"status": "accepted", "issue_id": issue.id}