314 lines
11 KiB
Python
314 lines
11 KiB
Python
"""Webhook endpoints for external integrations."""
|
|
from typing import Optional
|
|
from datetime import datetime
|
|
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, Request, Header
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select
|
|
from app.core.database import get_db
|
|
from app.models.organization import Organization
|
|
from app.models.integration import Integration, IntegrationType, IntegrationStatus
|
|
from app.models.issue import Issue, IssueStatus, IssuePriority
|
|
import hmac
|
|
import hashlib
|
|
|
|
# Router on which the webhook endpoints defined in this module are registered.
router = APIRouter()
|
|
|
|
def verify_signature(payload: bytes, signature: str, secret: str) -> bool:
    """Verify a webhook's HMAC-SHA256 signature.

    Args:
        payload: Raw request body bytes that were signed.
        signature: Signature supplied by the sender, expected in the form
            ``sha256=<hexdigest>``.
        secret: Shared secret configured for the integration.

    Returns:
        ``True`` when no secret is configured (verification is skipped) or
        when the signature matches the payload. ``False`` otherwise,
        including when a secret is configured but no signature was sent.
    """
    if not secret:
        return True  # Skip verification if no secret configured
    if not signature:
        # A secret is configured, so an unsigned request must be rejected;
        # accepting it would let any caller bypass verification.
        return False
    expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
    # Compare as bytes: compare_digest raises TypeError for non-ASCII str input.
    return hmac.compare_digest(f"sha256={expected}".encode(), signature.encode())
|
|
|
|
async def process_webhook(
    org_id: int,
    integration_type: IntegrationType,
    payload: dict,
    background_tasks: BackgroundTasks,
    db: AsyncSession
) -> dict:
    """Process an incoming webhook and create an issue from it.

    Args:
        org_id: Organization the webhook was addressed to.
        integration_type: Which provider sent the webhook.
        payload: Decoded JSON body of the webhook.
        background_tasks: Task queue used to schedule the analysis run.
        db: Database session used to look up the integration and add the issue.

    Returns:
        ``{"status": "ignored", "message": ...}`` when there is no active
        integration or the event is not one that creates an issue, otherwise
        ``{"status": "accepted", "issue_id": <id>}``.
    """
    # Find the active integration of this type for the organization.
    result = await db.execute(
        select(Integration)
        .where(Integration.organization_id == org_id)
        .where(Integration.type == integration_type)
        .where(Integration.status == IntegrationStatus.ACTIVE)
    )
    integration = result.scalar_one_or_none()

    if not integration:
        return {"status": "ignored", "message": "No active integration found"}

    # Any delivery counts as contact with the integration, even if ignored below.
    integration.last_sync_at = datetime.utcnow()

    # Normalize payload based on type
    issue_data = normalize_payload(integration_type, payload)
    if not issue_data:
        return {"status": "ignored", "message": "Event not processed"}

    # Count only events that actually become an issue; ignored events used to
    # inflate this counter as well.
    integration.issues_processed = (integration.issues_processed or 0) + 1

    # Create issue
    issue = Issue(
        organization_id=org_id,
        integration_id=integration.id,
        external_id=issue_data.get("external_id"),
        external_key=issue_data.get("external_key"),
        external_url=issue_data.get("external_url"),
        source=integration_type.value,
        title=issue_data.get("title"),
        description=issue_data.get("description"),
        # "or" also covers a priority key that is present but None.
        priority=IssuePriority(issue_data.get("priority") or "medium"),
        labels=issue_data.get("labels"),
        # A callback URL in the payload wins over the integration's own.
        callback_url=issue_data.get("callback_url") or integration.callback_url,
        raw_payload=payload
    )
    db.add(issue)
    # Flush before reading issue.id below (the id is presumably assigned by
    # the database on insert).
    await db.flush()

    # Queue analysis if auto_analyze enabled
    if integration.auto_analyze:
        # Imported here rather than at module level, presumably to avoid a
        # circular import -- TODO confirm.
        from app.api.issues import run_analysis
        from app.core.config import settings
        background_tasks.add_task(
            run_analysis,
            issue.id,
            settings.DATABASE_URL.replace("postgresql://", "postgresql+asyncpg://")
        )

    return {"status": "accepted", "issue_id": issue.id}
|
|
|
|
def _label_names(labels: Optional[list], key: str = "name") -> list:
    """Return label names from a list of label objects (or plain strings)."""
    return [
        label.get(key) if isinstance(label, dict) else label
        for label in labels or []
    ]


def _normalize_jira(payload: dict) -> Optional[dict]:
    """Map a Jira ``issue_created`` event; any other event yields ``None``."""
    if "issue_created" not in (payload.get("webhookEvent") or ""):
        return None
    issue = payload.get("issue") or {}
    fields = issue.get("fields") or {}
    # Everything before "/rest/" in the issue's "self" link is used as the
    # base of the browse URL.
    base_url = (issue.get("self") or "").split("/rest/")[0]
    # Guard against a null "priority" value, which would otherwise break the
    # nested "name" lookup.
    priority = fields.get("priority") or {}
    return {
        "external_id": str(issue.get("id")),
        "external_key": issue.get("key"),
        "external_url": f"{base_url}/browse/{issue.get('key')}",
        "title": fields.get("summary"),
        "description": fields.get("description"),
        "priority": normalize_priority(priority.get("name")),
        "labels": fields.get("labels", [])
    }


def _normalize_servicenow(payload: dict) -> dict:
    """Map a ServiceNow payload; every delivery is treated as a new issue."""
    return {
        "external_id": payload.get("sys_id"),
        "external_key": payload.get("number"),
        "external_url": payload.get("url"),
        "title": payload.get("short_description"),
        "description": payload.get("description"),
        "priority": normalize_priority(payload.get("priority")),
        "callback_url": payload.get("callback_url")
    }


def _normalize_zendesk(payload: dict) -> dict:
    """Map a Zendesk payload, reading the nested ``ticket`` object if present."""
    ticket = payload.get("ticket", payload)
    if not isinstance(ticket, dict):
        # A null/non-object "ticket" falls back to the top-level payload.
        ticket = payload
    return {
        "external_id": str(ticket.get("id")),
        "external_key": f"ZD-{ticket.get('id')}",
        "external_url": ticket.get("url"),
        "title": ticket.get("subject"),
        "description": ticket.get("description"),
        "priority": normalize_priority(ticket.get("priority")),
        "labels": ticket.get("tags", [])
    }


def _normalize_github(payload: dict) -> Optional[dict]:
    """Map a GitHub issue event; only the ``opened`` action is accepted."""
    if payload.get("action") != "opened":
        return None
    issue = payload.get("issue") or {}
    return {
        "external_id": str(issue.get("id")),
        "external_key": f"GH-{issue.get('number')}",
        "external_url": issue.get("html_url"),
        "title": issue.get("title"),
        "description": issue.get("body"),
        "priority": "medium",
        "labels": _label_names(issue.get("labels"))
    }


def _normalize_gitlab(payload: dict) -> Optional[dict]:
    """Map a GitLab ``issue`` event; only the ``open`` action is accepted."""
    if payload.get("object_kind") != "issue":
        return None
    attrs = payload.get("object_attributes") or {}
    if attrs.get("action") != "open":
        return None
    return {
        "external_id": str(attrs.get("id")),
        "external_key": f"GL-{attrs.get('iid')}",
        "external_url": attrs.get("url"),
        "title": attrs.get("title"),
        "description": attrs.get("description"),
        "priority": "medium",
        # GitLab label objects carry their name under "title"; store names so
        # labels have the same shape as for the other providers.
        "labels": _label_names(payload.get("labels"), key="title")
    }


def _normalize_tickethub(payload: dict) -> Optional[dict]:
    """Map a TicketHub event whose name contains ``created``."""
    if "created" not in (payload.get("event") or ""):
        return None
    data = payload.get("data", payload)
    if not isinstance(data, dict):
        # A null/non-object "data" falls back to the top-level payload.
        data = payload
    return {
        "external_id": str(data.get("id")),
        "external_key": data.get("key"),
        "external_url": f"https://tickethub.startdata.com.br/tickets/{data.get('id')}",
        "title": data.get("title"),
        "description": data.get("description"),
        "priority": normalize_priority(data.get("priority")),
        "labels": data.get("labels", [])
    }


def _normalize_gitea(payload: dict) -> Optional[dict]:
    """Map a Gitea issue event; only the ``opened`` action is accepted."""
    if payload.get("action") != "opened":
        return None
    issue = payload.get("issue") or {}
    repo = payload.get("repository") or {}
    return {
        "external_id": str(issue.get("id")),
        "external_key": f"GITEA-{issue.get('number')}",
        "external_url": issue.get("html_url"),
        "title": issue.get("title"),
        "description": issue.get("body"),
        "priority": "medium",
        "labels": _label_names(issue.get("labels")),
        "repo": repo.get("full_name")
    }


def normalize_payload(integration_type: IntegrationType, payload: dict) -> Optional[dict]:
    """Normalize a webhook payload to the common issue format.

    Args:
        integration_type: Provider that sent the payload.
        payload: Decoded JSON body of the webhook.

    Returns:
        A dict with keys such as ``external_id``, ``external_key``,
        ``external_url``, ``title``, ``description``, ``priority`` and,
        depending on the provider, ``labels``, ``callback_url`` or ``repo``.
        ``None`` when the event should be ignored, the provider is unknown,
        or the payload is not a JSON object.
    """
    if not isinstance(payload, dict):
        # Arrays/scalars cannot be mapped; ignore instead of raising.
        return None

    if integration_type == IntegrationType.JIRA_CLOUD:
        return _normalize_jira(payload)
    if integration_type == IntegrationType.SERVICENOW:
        return _normalize_servicenow(payload)
    if integration_type == IntegrationType.ZENDESK:
        return _normalize_zendesk(payload)
    if integration_type == IntegrationType.GITHUB:
        return _normalize_github(payload)
    if integration_type == IntegrationType.GITLAB:
        return _normalize_gitlab(payload)
    if integration_type == IntegrationType.TICKETHUB:
        return _normalize_tickethub(payload)
    if integration_type == IntegrationType.GITEA:
        return _normalize_gitea(payload)

    return None
|
|
|
|
def normalize_priority(priority: Optional[str]) -> str:
    """Map a provider-specific priority onto ``critical``/``high``/``medium``/``low``.

    Falsy input (``None``, ``""``, ``0``) yields ``"medium"``. Any other value
    is stringified and lower-cased before lookup; values matching no known
    alias yield ``"low"``.
    """
    if not priority:
        return "medium"

    key = str(priority).lower()
    # Each level paired with the aliases that select it.
    buckets = (
        ("critical", {"1", "critical", "highest", "urgent"}),
        ("high", {"2", "high"}),
        ("medium", {"3", "medium", "normal"}),
    )
    return next((level for level, aliases in buckets if key in aliases), "low")
|
|
|
|
# Webhook endpoints for each integration type
|
|
@router.post("/{org_id}/jira")
async def webhook_jira(
    org_id: int,
    request: Request,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    """Receive a Jira webhook for ``org_id`` and pass it to ``process_webhook``.

    Returns:
        The status dict produced by ``process_webhook``.

    Raises:
        HTTPException: 400 when the body is not valid JSON or not a JSON object.
    """
    try:
        payload = await request.json()
    except ValueError as exc:
        # A malformed body is a client error; unhandled it surfaces as a 500.
        raise HTTPException(status_code=400, detail="Invalid JSON payload") from exc
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="Webhook payload must be a JSON object")
    return await process_webhook(org_id, IntegrationType.JIRA_CLOUD, payload, background_tasks, db)
|
|
|
|
@router.post("/{org_id}/servicenow")
async def webhook_servicenow(
    org_id: int,
    request: Request,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    """Receive a ServiceNow webhook for ``org_id`` and pass it to ``process_webhook``.

    Returns:
        The status dict produced by ``process_webhook``.

    Raises:
        HTTPException: 400 when the body is not valid JSON or not a JSON object.
    """
    try:
        payload = await request.json()
    except ValueError as exc:
        # A malformed body is a client error; unhandled it surfaces as a 500.
        raise HTTPException(status_code=400, detail="Invalid JSON payload") from exc
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="Webhook payload must be a JSON object")
    return await process_webhook(org_id, IntegrationType.SERVICENOW, payload, background_tasks, db)
|
|
|
|
@router.post("/{org_id}/zendesk")
async def webhook_zendesk(
    org_id: int,
    request: Request,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    """Receive a Zendesk webhook for ``org_id`` and pass it to ``process_webhook``.

    Returns:
        The status dict produced by ``process_webhook``.

    Raises:
        HTTPException: 400 when the body is not valid JSON or not a JSON object.
    """
    try:
        payload = await request.json()
    except ValueError as exc:
        # A malformed body is a client error; unhandled it surfaces as a 500.
        raise HTTPException(status_code=400, detail="Invalid JSON payload") from exc
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="Webhook payload must be a JSON object")
    return await process_webhook(org_id, IntegrationType.ZENDESK, payload, background_tasks, db)
|
|
|
|
@router.post("/{org_id}/github")
async def webhook_github(
    org_id: int,
    request: Request,
    background_tasks: BackgroundTasks,
    x_github_event: Optional[str] = Header(None),
    db: AsyncSession = Depends(get_db)
):
    """Receive a GitHub webhook for ``org_id`` and pass issue events on.

    Deliveries whose ``X-GitHub-Event`` header is not ``issues`` are
    acknowledged and ignored without reading the body.

    Returns:
        An ``ignored`` status dict for non-issue events, otherwise the status
        dict produced by ``process_webhook``.

    Raises:
        HTTPException: 400 when the body is not valid JSON or not a JSON object.
    """
    # Check the header first so ignored events never pay for (or fail on)
    # body parsing.
    if x_github_event != "issues":
        return {"status": "ignored", "message": "Not an issues event"}
    try:
        payload = await request.json()
    except ValueError as exc:
        # A malformed body is a client error; unhandled it surfaces as a 500.
        raise HTTPException(status_code=400, detail="Invalid JSON payload") from exc
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="Webhook payload must be a JSON object")
    return await process_webhook(org_id, IntegrationType.GITHUB, payload, background_tasks, db)
|
|
|
|
@router.post("/{org_id}/gitlab")
async def webhook_gitlab(
    org_id: int,
    request: Request,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    """Receive a GitLab webhook for ``org_id`` and pass it to ``process_webhook``.

    Returns:
        The status dict produced by ``process_webhook``.

    Raises:
        HTTPException: 400 when the body is not valid JSON or not a JSON object.
    """
    try:
        payload = await request.json()
    except ValueError as exc:
        # A malformed body is a client error; unhandled it surfaces as a 500.
        raise HTTPException(status_code=400, detail="Invalid JSON payload") from exc
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="Webhook payload must be a JSON object")
    return await process_webhook(org_id, IntegrationType.GITLAB, payload, background_tasks, db)
|
|
|
|
@router.post("/{org_id}/tickethub")
async def webhook_tickethub(
    org_id: int,
    request: Request,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    """Receive a TicketHub webhook for ``org_id`` and pass it to ``process_webhook``.

    Returns:
        The status dict produced by ``process_webhook``.

    Raises:
        HTTPException: 400 when the body is not valid JSON or not a JSON object.
    """
    try:
        payload = await request.json()
    except ValueError as exc:
        # A malformed body is a client error; unhandled it surfaces as a 500.
        raise HTTPException(status_code=400, detail="Invalid JSON payload") from exc
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="Webhook payload must be a JSON object")
    return await process_webhook(org_id, IntegrationType.TICKETHUB, payload, background_tasks, db)
|
|
|
|
@router.post("/{org_id}/gitea")
async def webhook_gitea(
    org_id: int,
    request: Request,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    """Receive a Gitea webhook for ``org_id`` and pass it to ``process_webhook``.

    Returns:
        The status dict produced by ``process_webhook``.

    Raises:
        HTTPException: 400 when the body is not valid JSON or not a JSON object.
    """
    try:
        payload = await request.json()
    except ValueError as exc:
        # A malformed body is a client error; unhandled it surfaces as a 500.
        raise HTTPException(status_code=400, detail="Invalid JSON payload") from exc
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="Webhook payload must be a JSON object")
    return await process_webhook(org_id, IntegrationType.GITEA, payload, background_tasks, db)
|
|
|
|
@router.post("/{org_id}/generic")
async def webhook_generic(
    org_id: int,
    request: Request,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    """Generic webhook for custom integrations.

    The payload's top-level keys (``id``, ``key``, ``url``, ``source``,
    ``title``, ``description``, ``priority``, ``labels``, ``callback_url``)
    are mapped directly onto a new issue, and an analysis run is always
    scheduled for it.

    Returns:
        ``{"status": "accepted", "issue_id": <id>}``.

    Raises:
        HTTPException: 400 when the body is not valid JSON or not a JSON object.
    """
    # NOTE(review): this handler performs no signature check and does not
    # confirm that org_id refers to an existing organization -- confirm
    # whether that is enforced elsewhere.
    try:
        payload = await request.json()
    except ValueError as exc:
        # A malformed body is a client error; unhandled it surfaces as a 500.
        raise HTTPException(status_code=400, detail="Invalid JSON payload") from exc
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="Webhook payload must be a JSON object")

    # Stringify the id only when one was sent; str(None) would otherwise
    # store the literal text "None" as the external id.
    raw_id = payload.get("id")
    external_id = str(raw_id) if raw_id is not None else None

    # Direct mapping
    issue = Issue(
        organization_id=org_id,
        external_id=external_id,
        external_key=payload.get("key"),
        external_url=payload.get("url"),
        source=payload.get("source", "generic"),
        title=payload.get("title"),
        description=payload.get("description"),
        priority=IssuePriority(normalize_priority(payload.get("priority"))),
        labels=payload.get("labels"),
        callback_url=payload.get("callback_url"),
        raw_payload=payload
    )
    db.add(issue)
    # Flush before reading issue.id below (the id is presumably assigned by
    # the database on insert).
    await db.flush()

    # Imported here rather than at module level, presumably to avoid a
    # circular import -- TODO confirm.
    from app.api.issues import run_analysis
    from app.core.config import settings
    background_tasks.add_task(
        run_analysis,
        issue.id,
        settings.DATABASE_URL.replace("postgresql://", "postgresql+asyncpg://")
    )

    return {"status": "accepted", "issue_id": issue.id}
|