jira-ai-fixer/workers/analyzer.py

209 lines
7.0 KiB
Python

"""
Issue Analyzer Worker - Background processing of JIRA issues.
"""
import asyncio
import json
import logging
import os
from datetime import datetime
from typing import Optional
import redis.asyncio as redis
# Configure the root logger at INFO level. This is a module-level statement,
# so it takes effect as soon as this module is imported.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module; used throughout the file.
logger = logging.getLogger(__name__)
class IssueAnalyzer:
    """
    Background worker that processes JIRA issues from a Redis queue.

    Flow:
    1. Poll Redis queue for new issues
    2. Fetch issue details from JIRA
    3. Search for relevant code in vector DB
    4. Send to LLM for analysis
    5. Generate fix proposal
    6. Create PR if confidence is high enough
    7. Post analysis back to JIRA

    Most pipeline steps are still placeholder stubs (marked TODO) that only
    log and return empty defaults.
    """

    def __init__(
        self,
        redis_url: str = "redis://localhost:6379",
        queue_name: str = "issues:pending",
        pr_confidence_threshold: float = 0.75,
    ):
        """
        Args:
            redis_url: URL handed to ``redis.from_url`` when connecting.
            queue_name: Redis list that pending issues are popped from.
            pr_confidence_threshold: Minimum analysis confidence (inclusive)
                at which a pull request is created.
        """
        self.redis_url = redis_url
        self.queue_name = queue_name
        self.pr_confidence_threshold = pr_confidence_threshold
        self.redis: Optional[redis.Redis] = None
        self.running = False

    async def connect(self):
        """Create the Redis client for ``self.redis_url``."""
        # NOTE(review): from_url presumably builds the client lazily, so no
        # network round-trip has been verified at this point - confirm.
        self.redis = redis.from_url(self.redis_url)
        # NOTE(review): the URL is logged verbatim; if it can carry a
        # password it should be redacted before logging.
        logger.info(f"🔌 Connected to Redis: {self.redis_url}")

    async def disconnect(self):
        """Close the Redis client, if any, and forget it."""
        client, self.redis = self.redis, None
        if client is None:
            return
        # Newer redis-py releases deprecate close() in favour of aclose();
        # fall back to close() for older versions.
        closer = getattr(client, "aclose", None) or client.close
        await closer()

    async def run(self):
        """
        Main worker loop.

        Pops JSON payloads from ``self.queue_name`` until ``stop()`` is
        called, handing each one to ``process_issue``. The Redis client is
        always released on exit, including on cancellation.
        """
        self.running = True
        try:
            await self.connect()
            logger.info("🚀 Issue Analyzer worker started")
            while self.running:
                try:
                    # Block waiting for new items (timeout 5s) so that a
                    # stop() request is noticed within a few seconds.
                    result = await self.redis.blpop(self.queue_name, timeout=5)
                    if not result:
                        continue
                    _, data = result
                    try:
                        issue_data = json.loads(data)
                    except (TypeError, ValueError) as e:
                        # A bad payload is not an infrastructure problem:
                        # drop it and keep polling without backing off.
                        logger.error(f"❌ Discarding malformed queue item: {e}")
                        continue
                    await self.process_issue(issue_data)
                except Exception as e:
                    # Top-level boundary of the worker: log with traceback
                    # and keep the loop alive.
                    logger.exception(f"❌ Error in worker loop: {e}")
                    await asyncio.sleep(5)  # Back off on error
        finally:
            self.running = False
            await self.disconnect()
            logger.info("👋 Issue Analyzer worker stopped")

    async def process_issue(self, issue_data: dict):
        """
        Process a single issue.

        Items without a usable ``"key"`` are logged and skipped. Any failure
        inside the pipeline is logged and recorded as a ``"failed"`` status.

        Args:
            issue_data: Dict with issue key and metadata
        """
        issue_key = issue_data.get("key") if isinstance(issue_data, dict) else None
        if not issue_key:
            logger.error(f"❌ Skipping queue item without an issue key: {issue_data!r}")
            return

        logger.info(f"📋 Processing issue: {issue_key}")
        try:
            # Update status
            await self.update_status(issue_key, "analyzing")

            # 1. Fetch full issue details from JIRA
            issue_details = await self.fetch_issue_details(issue_key)

            # 2. Extract relevant context. Use `or` rather than .get defaults
            #    so that keys present with a null value also become empty.
            fields = (issue_details or {}).get("fields") or {}
            description = fields.get("description") or ""
            summary = fields.get("summary") or ""

            # 3. Search for relevant code
            code_context = await self.search_relevant_code(summary, description)

            # 4. Get business rules for the affected module
            module = await self.identify_module(summary, description)
            business_rules = await self.get_module_rules(module)

            # 5. Send to LLM for analysis
            analysis = await self.analyze_with_llm(
                issue_description=f"{summary}\n\n{description}",
                code_context=code_context,
                business_rules=business_rules,
            )

            # 6. Store analysis result
            await self.store_analysis(issue_key, analysis)

            # 7. If confidence is high, create PR (missing/None counts as 0)
            confidence = analysis.get("confidence") or 0
            if confidence >= self.pr_confidence_threshold:
                pr_url = await self.create_pull_request(issue_key, analysis)
                analysis["pr_url"] = pr_url

            # 8. Post comment to JIRA
            await self.post_jira_comment(issue_key, analysis)

            # Update status
            status = "pr_created" if analysis.get("pr_url") else "analyzed"
            await self.update_status(issue_key, status)
            logger.info(f"✅ Completed analysis: {issue_key} (confidence: {confidence:.0%})")
        except Exception as e:
            logger.exception(f"❌ Failed to process {issue_key}: {e}")
            await self.update_status(issue_key, "failed", str(e))

    async def fetch_issue_details(self, issue_key: str) -> dict:
        """Fetch issue details from JIRA (placeholder: returns an empty dict)."""
        # TODO: Implement JIRA client call
        logger.info(f"🔍 Fetching issue details: {issue_key}")
        return {}

    async def search_relevant_code(self, summary: str, description: str) -> str:
        """Search vector DB for relevant code (placeholder: returns "")."""
        # TODO: Implement vector search
        logger.info("🔍 Searching for relevant code...")
        return ""

    async def identify_module(self, summary: str, description: str) -> Optional[str]:
        """Identify which business module the issue relates to (placeholder: None)."""
        # TODO: Implement module identification
        return None

    async def get_module_rules(self, module: Optional[str]) -> str:
        """Get business rules for a module (placeholder: returns "")."""
        # TODO: Load from database
        return ""

    async def analyze_with_llm(
        self,
        issue_description: str,
        code_context: str,
        business_rules: str,
    ) -> dict:
        """
        Send to LLM for analysis.

        Placeholder: returns a fixed result dict with zero confidence.
        """
        # TODO: Implement LLM service call
        logger.info("🤖 Analyzing with LLM...")
        return {
            "root_cause": "Analysis pending",
            "affected_files": [],
            "proposed_fix": "",
            "confidence": 0.0,
            "explanation": "",
        }

    async def store_analysis(self, issue_key: str, analysis: dict):
        """Store analysis result in database (placeholder: logs only)."""
        # TODO: Implement database storage
        logger.info(f"💾 Storing analysis for {issue_key}")

    async def create_pull_request(self, issue_key: str, analysis: dict) -> Optional[str]:
        """Create a pull request with the proposed fix (placeholder: returns None)."""
        # TODO: Implement Bitbucket PR creation
        logger.info(f"📝 Creating PR for {issue_key}")
        return None

    async def post_jira_comment(self, issue_key: str, analysis: dict):
        """Post analysis as a comment on the JIRA issue (placeholder: logs only)."""
        # TODO: Implement JIRA comment
        logger.info(f"💬 Posting comment to {issue_key}")

    async def update_status(self, issue_key: str, status: str, error: Optional[str] = None):
        """
        Update issue status in database and Redis (placeholder: logs only).

        Args:
            issue_key: Issue the status applies to.
            status: New status label.
            error: Optional failure detail; included in the log when given.
        """
        # TODO: Implement status update
        if error:
            logger.info(f"📊 Status update: {issue_key} -> {status} (error: {error})")
        else:
            logger.info(f"📊 Status update: {issue_key} -> {status}")

    def stop(self):
        """Signal worker to stop; the loop exits after its current iteration."""
        self.running = False
async def main():
    """
    Entry point for the worker.

    Builds an ``IssueAnalyzer`` from the ``REDIS_URL`` environment variable
    (default ``redis://localhost:6379``) and runs it until it is stopped.
    SIGINT/SIGTERM are wired to ``worker.stop()`` so the loop can finish its
    current iteration and release Redis instead of being torn down mid-await.
    """
    # Local import: only this function needs it.
    import signal

    worker = IssueAnalyzer(
        redis_url=os.getenv("REDIS_URL", "redis://localhost:6379"),
    )

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, worker.stop)
        except (NotImplementedError, RuntimeError):
            # Signal handlers are unavailable on some event loops (e.g. on
            # Windows) or outside the main thread; the KeyboardInterrupt
            # fallback below still applies there.
            pass

    try:
        await worker.run()
    except KeyboardInterrupt:
        worker.stop()


if __name__ == "__main__":
    # Run the worker when this file is executed as a script.
    asyncio.run(main())