jira-ai-fixer/app/api/webhooks.py

314 lines
11 KiB
Python

"""Webhook endpoints for external integrations."""
from typing import Optional
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, Request, Header
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.core.database import get_db
from app.models.organization import Organization
from app.models.integration import Integration, IntegrationType, IntegrationStatus
from app.models.issue import Issue, IssueStatus, IssuePriority
import hmac
import hashlib
# Router on which the webhook endpoints defined in this module are registered.
router = APIRouter()
def verify_signature(payload: bytes, signature: str, secret: str) -> bool:
    """Verify an HMAC-SHA256 webhook signature.

    Args:
        payload: Raw request body exactly as received.
        signature: Signature supplied by the sender, expected in the form
            ``sha256=<hex digest>``. May be empty or ``None``.
        secret: Shared secret configured for the integration. May be empty
            or ``None`` when no secret has been configured.

    Returns:
        ``True`` when no secret is configured (verification is skipped) or
        when the signature matches the payload; ``False`` otherwise,
        including when a secret is configured but no signature was sent.
    """
    if not secret:
        return True  # Skip verification if no secret configured
    if not signature:
        # A secret is configured, so an unsigned request must not be trusted.
        return False
    expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
    # Compare as bytes: compare_digest rejects non-ASCII str arguments with a
    # TypeError, which would turn a forged signature into an unhandled error.
    return hmac.compare_digest(f"sha256={expected}".encode(), signature.encode())
async def process_webhook(
    org_id: int,
    integration_type: IntegrationType,
    payload: dict,
    background_tasks: BackgroundTasks,
    db: AsyncSession
) -> dict:
    """Process an incoming webhook and create an issue from it.

    Looks up the organization's active integration of ``integration_type``,
    normalizes ``payload`` with ``normalize_payload`` and, when the event is
    relevant, adds an ``Issue`` to the session and flushes it. No commit is
    performed here.

    Args:
        org_id: Organization id taken from the webhook URL.
        integration_type: Integration the webhook endpoint belongs to.
        payload: Decoded JSON body of the webhook request.
        background_tasks: Task queue used to schedule analysis.
        db: Database session used for the lookup and the insert.

    Returns:
        ``{"status": "ignored", "message": ...}`` when there is no active
        integration or the event is not one that produces an issue;
        ``{"status": "accepted", "issue_id": ...}`` once the issue is created.

    Note:
        ``scalar_one_or_none`` raises if more than one active integration of
        the same type exists for the organization.
    """
    # Find integration
    result = await db.execute(
        select(Integration)
        .where(Integration.organization_id == org_id)
        .where(Integration.type == integration_type)
        .where(Integration.status == IntegrationStatus.ACTIVE)
    )
    integration = result.scalar_one_or_none()
    if not integration:
        return {"status": "ignored", "message": "No active integration found"}

    # Every delivery for an active integration refreshes the sync timestamp.
    integration.last_sync_at = datetime.utcnow()

    # Normalize payload based on type
    issue_data = normalize_payload(integration_type, payload)
    if not issue_data:
        return {"status": "ignored", "message": "Event not processed"}

    # Count only events that actually produce an issue, so ignored events
    # (updates, comments, pings, ...) do not inflate the counter.
    integration.issues_processed = (integration.issues_processed or 0) + 1

    # Create issue
    issue = Issue(
        organization_id=org_id,
        integration_id=integration.id,
        external_id=issue_data.get("external_id"),
        external_key=issue_data.get("external_key"),
        external_url=issue_data.get("external_url"),
        source=integration_type.value,
        title=issue_data.get("title"),
        description=issue_data.get("description"),
        # "or" also covers a priority key that is present but None.
        priority=IssuePriority(issue_data.get("priority") or "medium"),
        labels=issue_data.get("labels"),
        callback_url=issue_data.get("callback_url") or integration.callback_url,
        raw_payload=payload
    )
    db.add(issue)
    # Flush so that issue.id is available for the response and the task below.
    await db.flush()

    # Queue analysis if auto_analyze enabled
    if integration.auto_analyze:
        # Imported here rather than at module level, presumably to avoid a
        # circular import with app.api.issues -- TODO confirm.
        from app.api.issues import run_analysis
        from app.core.config import settings
        background_tasks.add_task(
            run_analysis,
            issue.id,
            # The task receives the database URL rewritten to the asyncpg scheme.
            settings.DATABASE_URL.replace("postgresql://", "postgresql+asyncpg://")
        )
    return {"status": "accepted", "issue_id": issue.id}
def normalize_payload(integration_type: IntegrationType, payload: dict) -> Optional[dict]:
    """Normalize a webhook payload to the common issue format.

    Args:
        integration_type: Integration the payload came from.
        payload: Decoded JSON body of the webhook request.

    Returns:
        A dict with ``external_id``, ``external_key``, ``external_url``,
        ``title``, ``description`` and ``priority``, plus ``labels``,
        ``callback_url`` or ``repo`` where the branch for that integration
        provides them. ``None`` when the payload is not a JSON object, the
        event is not an issue-creation event for that integration, or the
        integration type has no branch here.

    Nested objects are read with ``value or {}`` / ``value or []`` because a
    key that is present with a JSON ``null`` bypasses ``dict.get`` defaults.
    """
    if not isinstance(payload, dict):
        return None

    if integration_type == IntegrationType.JIRA_CLOUD:
        event = payload.get("webhookEvent") or ""
        if "issue_created" not in event:
            return None
        issue = payload.get("issue") or {}
        fields = issue.get("fields") or {}
        # The browse URL is built from the part of the issue's "self" link
        # that precedes "/rest/".
        base_url = (issue.get("self") or "").split("/rest/")[0]
        return {
            "external_id": str(issue.get("id")),
            "external_key": issue.get("key"),
            "external_url": f"{base_url}/browse/{issue.get('key')}",
            "title": fields.get("summary"),
            "description": fields.get("description"),
            "priority": normalize_priority((fields.get("priority") or {}).get("name")),
            "labels": fields.get("labels", [])
        }

    elif integration_type == IntegrationType.SERVICENOW:
        return {
            "external_id": payload.get("sys_id"),
            "external_key": payload.get("number"),
            "external_url": payload.get("url"),
            "title": payload.get("short_description"),
            "description": payload.get("description"),
            "priority": normalize_priority(payload.get("priority")),
            "callback_url": payload.get("callback_url")
        }

    elif integration_type == IntegrationType.ZENDESK:
        # The ticket may be nested under "ticket" or be the payload itself.
        ticket = payload.get("ticket")
        if ticket is None:
            ticket = payload
        return {
            "external_id": str(ticket.get("id")),
            "external_key": f"ZD-{ticket.get('id')}",
            "external_url": ticket.get("url"),
            "title": ticket.get("subject"),
            "description": ticket.get("description"),
            "priority": normalize_priority(ticket.get("priority")),
            "labels": ticket.get("tags", [])
        }

    elif integration_type == IntegrationType.GITHUB:
        action = payload.get("action")
        if action != "opened":
            return None
        issue = payload.get("issue") or {}
        return {
            "external_id": str(issue.get("id")),
            "external_key": f"GH-{issue.get('number')}",
            "external_url": issue.get("html_url"),
            "title": issue.get("title"),
            "description": issue.get("body"),
            "priority": "medium",
            "labels": [label.get("name") for label in issue.get("labels") or []]
        }

    elif integration_type == IntegrationType.GITLAB:
        event = payload.get("object_kind")
        if event != "issue":
            return None
        attrs = payload.get("object_attributes") or {}
        if attrs.get("action") != "open":
            return None
        return {
            "external_id": str(attrs.get("id")),
            "external_key": f"GL-{attrs.get('iid')}",
            "external_url": attrs.get("url"),
            "title": attrs.get("title"),
            "description": attrs.get("description"),
            "priority": "medium",
            # NOTE(review): labels are passed through as received, unlike the
            # GitHub/Gitea branches which reduce them to names -- confirm the
            # element shape GitLab sends and what Issue.labels expects.
            "labels": payload.get("labels", [])
        }

    elif integration_type == IntegrationType.TICKETHUB:
        event = payload.get("event") or ""
        if "created" not in event:
            return None
        # The ticket may be nested under "data" or be the payload itself.
        data = payload.get("data")
        if data is None:
            data = payload
        return {
            "external_id": str(data.get("id")),
            "external_key": data.get("key"),
            "external_url": f"https://tickethub.startdata.com.br/tickets/{data.get('id')}",
            "title": data.get("title"),
            "description": data.get("description"),
            "priority": normalize_priority(data.get("priority")),
            "labels": data.get("labels", [])
        }

    elif integration_type == IntegrationType.GITEA:
        action = payload.get("action")
        if action != "opened":
            return None
        issue = payload.get("issue") or {}
        repo = payload.get("repository") or {}
        return {
            "external_id": str(issue.get("id")),
            "external_key": f"GITEA-{issue.get('number')}",
            "external_url": issue.get("html_url"),
            "title": issue.get("title"),
            "description": issue.get("body"),
            "priority": "medium",
            "labels": [label.get("name") for label in issue.get("labels") or []],
            "repo": repo.get("full_name")
        }

    return None
# Lower-cased priority spellings that map to something other than "low".
_PRIORITY_ALIASES = {
    "1": "critical",
    "critical": "critical",
    "highest": "critical",
    "urgent": "critical",
    "2": "high",
    "high": "high",
    "3": "medium",
    "medium": "medium",
    "normal": "medium",
    "moderate": "medium",
}


def normalize_priority(priority: Optional[str]) -> str:
    """Normalize a priority value to ``critical``/``high``/``medium``/``low``.

    Args:
        priority: Priority as sent by the external system. Non-string values
            (for example integers) are converted with ``str()``.

    Returns:
        ``"medium"`` for a missing, falsy or blank value; the mapped level for
        a recognized spelling (matched case-insensitively, ignoring
        surrounding whitespace); ``"low"`` for anything else.

    Display strings of the form ``"<number> - <label>"`` (for example
    ``"1 - Critical"``) are recognized by either part.
    """
    if not priority:
        return "medium"
    text = str(priority).strip().lower()
    if not text:
        return "medium"
    if text in _PRIORITY_ALIASES:
        return _PRIORITY_ALIASES[text]
    # Fall back to the individual parts of a "<number> - <label>" string.
    head, separator, tail = text.partition(" - ")
    if separator:
        for token in (head.strip(), tail.strip()):
            if token in _PRIORITY_ALIASES:
                return _PRIORITY_ALIASES[token]
    return "low"
# Webhook endpoints for each integration type
@router.post("/{org_id}/jira")
async def webhook_jira(
    org_id: int,
    request: Request,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    """Receive a Jira webhook for ``org_id`` and hand its JSON body to ``process_webhook``."""
    event_body = await request.json()
    outcome = await process_webhook(
        org_id=org_id,
        integration_type=IntegrationType.JIRA_CLOUD,
        payload=event_body,
        background_tasks=background_tasks,
        db=db,
    )
    return outcome
@router.post("/{org_id}/servicenow")
async def webhook_servicenow(
    org_id: int,
    request: Request,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    """Receive a ServiceNow webhook for ``org_id`` and hand its JSON body to ``process_webhook``."""
    event_body = await request.json()
    outcome = await process_webhook(
        org_id=org_id,
        integration_type=IntegrationType.SERVICENOW,
        payload=event_body,
        background_tasks=background_tasks,
        db=db,
    )
    return outcome
@router.post("/{org_id}/zendesk")
async def webhook_zendesk(
    org_id: int,
    request: Request,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    """Receive a Zendesk webhook for ``org_id`` and hand its JSON body to ``process_webhook``."""
    event_body = await request.json()
    outcome = await process_webhook(
        org_id=org_id,
        integration_type=IntegrationType.ZENDESK,
        payload=event_body,
        background_tasks=background_tasks,
        db=db,
    )
    return outcome
@router.post("/{org_id}/github")
async def webhook_github(
    org_id: int,
    request: Request,
    background_tasks: BackgroundTasks,
    x_github_event: Optional[str] = Header(None),
    db: AsyncSession = Depends(get_db)
):
    """Receive a GitHub webhook for ``org_id``.

    The JSON body is always parsed; it is handed to ``process_webhook`` only
    when the ``x_github_event`` header value is ``"issues"``. Any other event
    is answered with an "ignored" status.
    """
    event_body = await request.json()
    if x_github_event == "issues":
        return await process_webhook(
            org_id=org_id,
            integration_type=IntegrationType.GITHUB,
            payload=event_body,
            background_tasks=background_tasks,
            db=db,
        )
    return {"status": "ignored", "message": "Not an issues event"}
@router.post("/{org_id}/gitlab")
async def webhook_gitlab(
    org_id: int,
    request: Request,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    """Receive a GitLab webhook for ``org_id`` and hand its JSON body to ``process_webhook``."""
    event_body = await request.json()
    outcome = await process_webhook(
        org_id=org_id,
        integration_type=IntegrationType.GITLAB,
        payload=event_body,
        background_tasks=background_tasks,
        db=db,
    )
    return outcome
@router.post("/{org_id}/tickethub")
async def webhook_tickethub(
    org_id: int,
    request: Request,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    """Receive a TicketHub webhook for ``org_id`` and hand its JSON body to ``process_webhook``."""
    event_body = await request.json()
    outcome = await process_webhook(
        org_id=org_id,
        integration_type=IntegrationType.TICKETHUB,
        payload=event_body,
        background_tasks=background_tasks,
        db=db,
    )
    return outcome
@router.post("/{org_id}/gitea")
async def webhook_gitea(
    org_id: int,
    request: Request,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    """Receive a Gitea webhook for ``org_id`` and hand its JSON body to ``process_webhook``."""
    event_body = await request.json()
    outcome = await process_webhook(
        org_id=org_id,
        integration_type=IntegrationType.GITEA,
        payload=event_body,
        background_tasks=background_tasks,
        db=db,
    )
    return outcome
@router.post("/{org_id}/generic")
async def webhook_generic(
    org_id: int,
    request: Request,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    """Generic webhook for custom integrations.

    Maps the top-level keys of the JSON body (``id``, ``key``, ``url``,
    ``source``, ``title``, ``description``, ``priority``, ``labels``,
    ``callback_url``) directly onto a new ``Issue`` and always schedules
    analysis for it. No integration lookup is performed.

    Returns:
        ``{"status": "accepted", "issue_id": ...}`` once the issue is
        flushed, or ``{"status": "ignored", "message": ...}`` when the body
        is not valid JSON or is not a JSON object.
    """
    try:
        payload = await request.json()
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both derive from ValueError.
        return {"status": "ignored", "message": "Request body is not valid JSON"}
    if not isinstance(payload, dict):
        # The mapping below needs key lookups; a list or scalar body cannot be mapped.
        return {"status": "ignored", "message": "Payload must be a JSON object"}

    # Direct mapping
    issue = Issue(
        organization_id=org_id,
        external_id=str(payload.get("id")),
        external_key=payload.get("key"),
        external_url=payload.get("url"),
        # "or" also covers a source key that is present but null/empty.
        source=payload.get("source") or "generic",
        title=payload.get("title"),
        description=payload.get("description"),
        priority=IssuePriority(normalize_priority(payload.get("priority"))),
        labels=payload.get("labels"),
        callback_url=payload.get("callback_url"),
        raw_payload=payload
    )
    db.add(issue)
    # Flush so that issue.id is available for the task and the response.
    await db.flush()

    # Imported here rather than at module level, presumably to avoid a
    # circular import with app.api.issues -- TODO confirm.
    from app.api.issues import run_analysis
    from app.core.config import settings
    background_tasks.add_task(
        run_analysis,
        issue.id,
        # The task receives the database URL rewritten to the asyncpg scheme.
        settings.DATABASE_URL.replace("postgresql://", "postgresql+asyncpg://")
    )
    return {"status": "accepted", "issue_id": issue.id}