209 lines
7.0 KiB
Python
209 lines
7.0 KiB
Python
"""
Issue Analyzer Worker - Background processing of JIRA issues.
"""
|
|
import asyncio
import json
import logging
import os
import signal
from datetime import datetime
from typing import Optional
from urllib.parse import urlsplit, urlunsplit

import redis.asyncio as redis
|
|
|
|
# Configure the root logger so INFO-and-above records are emitted.
logging.basicConfig(level=logging.INFO)

# Module-scoped logger shared by the worker class and the entry point.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class IssueAnalyzer:
    """
    Background worker that processes JIRA issues from the queue.

    Flow:
    1. Poll Redis queue for new issues
    2. Fetch issue details from JIRA
    3. Search for relevant code in vector DB
    4. Send to LLM for analysis
    5. Generate fix proposal
    6. Create PR if confidence is high enough
    7. Post analysis back to JIRA

    The JIRA, vector-search, LLM, persistence and PR integrations are
    still placeholder stubs (see the TODO markers on each method).
    """

    # Seconds BLPOP may block before returning, so the loop can re-check
    # ``self.running`` and honour stop().
    POLL_TIMEOUT_SECONDS = 5
    # Seconds to pause after an unexpected error in the worker loop.
    ERROR_BACKOFF_SECONDS = 5

    def __init__(
        self,
        redis_url: str = "redis://localhost:6379",
        queue_name: str = "issues:pending",
        pr_confidence_threshold: float = 0.75,
    ):
        """
        Store the worker configuration; no connection is opened here.

        Args:
            redis_url: URL handed to ``redis.from_url`` by connect().
            queue_name: Redis list the worker pops issue payloads from.
            pr_confidence_threshold: Minimum analysis confidence at which
                a pull request is created.
        """
        self.redis_url = redis_url
        self.queue_name = queue_name
        self.pr_confidence_threshold = pr_confidence_threshold
        self.redis: Optional[redis.Redis] = None
        self.running = False

    @staticmethod
    def _redact_url(url: str) -> str:
        """Return *url* with any password in the authority part masked."""
        try:
            parts = urlsplit(url)
        except ValueError:
            # Unparseable URL: never risk echoing credentials into the log.
            return "<unparseable url>"
        credentials, separator, host = parts.netloc.rpartition("@")
        if not separator or ":" not in credentials:
            return url
        user = credentials.split(":", 1)[0]
        return urlunsplit(parts._replace(netloc=f"{user}:***@{host}"))

    @staticmethod
    def _parse_payload(raw) -> dict:
        """
        Decode a raw queue item into an issue dict.

        Raises:
            ValueError: If *raw* is not valid JSON (``json.JSONDecodeError``
                is a ``ValueError``) or does not decode to a JSON object.
        """
        payload = json.loads(raw)
        if not isinstance(payload, dict):
            raise ValueError(
                f"expected a JSON object, got {type(payload).__name__}"
            )
        return payload

    @staticmethod
    def _extract_text(issue_details: dict) -> tuple:
        """
        Return ``(summary, description)`` from a JIRA issue payload.

        Missing or null ``fields``, ``summary`` or ``description`` values
        are normalised to empty strings so they never reach the LLM prompt
        as the literal text "None".
        """
        fields = (issue_details or {}).get("fields") or {}
        summary = fields.get("summary") or ""
        description = fields.get("description") or ""
        return summary, description

    async def connect(self):
        """Create the Redis client for ``self.redis_url``."""
        # NOTE(review): redis-py clients appear to connect lazily on the
        # first command, so this log line does not prove the server is
        # reachable - confirm against the pinned redis-py version.
        self.redis = redis.from_url(self.redis_url)
        # Mask the password: Redis URLs may embed credentials.
        logger.info(f"🔌 Connected to Redis: {self._redact_url(self.redis_url)}")

    async def disconnect(self):
        """Close the Redis client if one is open; safe to call repeatedly."""
        if self.redis is None:
            return
        client, self.redis = self.redis, None
        # redis-py >= 5.0.1 deprecates close() on asyncio clients in favour
        # of aclose(); fall back to close() on older versions.
        closer = getattr(client, "aclose", None) or client.close
        await closer()

    async def run(self):
        """
        Main worker loop.

        Pops payloads from ``self.queue_name`` until stop() is called.
        Malformed payloads are logged and dropped; any other error is
        logged and followed by a short back-off. The Redis client is
        closed on every exit path, including task cancellation.
        """
        self.running = True
        try:
            await self.connect()
            logger.info("🚀 Issue Analyzer worker started")

            while self.running:
                try:
                    # Block waiting for new items; the timeout bounds how
                    # long a stop() request can go unnoticed.
                    result = await self.redis.blpop(
                        self.queue_name, timeout=self.POLL_TIMEOUT_SECONDS
                    )
                    if not result:
                        continue

                    _, data = result
                    try:
                        issue_data = self._parse_payload(data)
                    except ValueError as e:
                        # A poison message is not an outage: drop it and
                        # keep consuming without backing off.
                        logger.error(f"❌ Discarding malformed queue item: {e}")
                        continue

                    await self.process_issue(issue_data)
                except Exception as e:
                    logger.exception(f"❌ Error in worker loop: {e}")
                    await asyncio.sleep(self.ERROR_BACKOFF_SECONDS)  # Back off on error
        finally:
            self.running = False
            await self.disconnect()
            logger.info("👋 Issue Analyzer worker stopped")

    async def process_issue(self, issue_data: dict):
        """
        Process a single issue.

        Args:
            issue_data: Dict with issue key and metadata

        Any failure after the key has been read is logged with its
        traceback and recorded through update_status(..., "failed", ...).
        """
        issue_key = issue_data.get("key")
        if not issue_key:
            logger.error(f"❌ Skipping queue item without an issue key: {issue_data!r}")
            return

        logger.info(f"📋 Processing issue: {issue_key}")

        try:
            # Update status
            await self.update_status(issue_key, "analyzing")

            # 1. Fetch full issue details from JIRA
            issue_details = await self.fetch_issue_details(issue_key)

            # 2. Extract relevant context
            summary, description = self._extract_text(issue_details)

            # 3. Search for relevant code
            code_context = await self.search_relevant_code(summary, description)

            # 4. Get business rules for the affected module
            module = await self.identify_module(summary, description)
            business_rules = await self.get_module_rules(module)

            # 5. Send to LLM for analysis
            analysis = await self.analyze_with_llm(
                issue_description=f"{summary}\n\n{description}",
                code_context=code_context,
                business_rules=business_rules,
            )

            # 6. Store analysis result
            await self.store_analysis(issue_key, analysis)

            # 7. If confidence is high, create PR
            # A missing or null confidence counts as zero.
            confidence = analysis.get("confidence") or 0.0
            if confidence >= self.pr_confidence_threshold:
                pr_url = await self.create_pull_request(issue_key, analysis)
                analysis["pr_url"] = pr_url

            # 8. Post comment to JIRA
            await self.post_jira_comment(issue_key, analysis)

            # Update status
            status = "pr_created" if analysis.get("pr_url") else "analyzed"
            await self.update_status(issue_key, status)

            logger.info(f"✅ Completed analysis: {issue_key} (confidence: {confidence:.0%})")

        except Exception as e:
            logger.exception(f"❌ Failed to process {issue_key}: {e}")
            await self.update_status(issue_key, "failed", str(e))

    async def fetch_issue_details(self, issue_key: str) -> dict:
        """Fetch issue details from JIRA (placeholder: returns an empty dict)."""
        # TODO: Implement JIRA client call
        logger.info(f"🔍 Fetching issue details: {issue_key}")
        return {}

    async def search_relevant_code(self, summary: str, description: str) -> str:
        """Search vector DB for relevant code (placeholder: returns "")."""
        # TODO: Implement vector search
        logger.info("🔍 Searching for relevant code...")
        return ""

    async def identify_module(self, summary: str, description: str) -> Optional[str]:
        """Identify which business module the issue relates to (placeholder: returns None)."""
        # TODO: Implement module identification
        return None

    async def get_module_rules(self, module: Optional[str]) -> str:
        """Get business rules for a module (placeholder: returns "")."""
        # TODO: Load from database
        return ""

    async def analyze_with_llm(
        self,
        issue_description: str,
        code_context: str,
        business_rules: str,
    ) -> dict:
        """
        Send to LLM for analysis.

        Placeholder: returns a fixed result with zero confidence, so no
        pull request is created until the LLM call is implemented.
        """
        # TODO: Implement LLM service call
        logger.info("🤖 Analyzing with LLM...")
        return {
            "root_cause": "Analysis pending",
            "affected_files": [],
            "proposed_fix": "",
            "confidence": 0.0,
            "explanation": "",
        }

    async def store_analysis(self, issue_key: str, analysis: dict):
        """Store analysis result in database (placeholder: only logs)."""
        # TODO: Implement database storage
        logger.info(f"💾 Storing analysis for {issue_key}")

    async def create_pull_request(self, issue_key: str, analysis: dict) -> Optional[str]:
        """Create a pull request with the proposed fix (placeholder: returns None)."""
        # TODO: Implement Bitbucket PR creation
        logger.info(f"📝 Creating PR for {issue_key}")
        return None

    async def post_jira_comment(self, issue_key: str, analysis: dict):
        """Post analysis as a comment on the JIRA issue (placeholder: only logs)."""
        # TODO: Implement JIRA comment
        logger.info(f"💬 Posting comment to {issue_key}")

    async def update_status(self, issue_key: str, status: str, error: Optional[str] = None):
        """
        Update issue status in database and Redis (placeholder: only logs).

        Args:
            issue_key: Key of the issue whose status changes.
            status: New status label.
            error: Optional failure detail, appended to the log line.
        """
        # TODO: Implement status update
        if error:
            logger.info(f"📊 Status update: {issue_key} -> {status} ({error})")
        else:
            logger.info(f"📊 Status update: {issue_key} -> {status}")

    def stop(self):
        """Signal worker to stop; run() exits after its current iteration."""
        self.running = False
|
|
|
|
|
|
async def main():
    """
    Entry point for the worker.

    Builds an IssueAnalyzer from the ``REDIS_URL`` environment variable
    (default ``redis://localhost:6379``) and runs it until it is stopped.
    SIGINT and SIGTERM are routed to ``worker.stop`` so the loop finishes
    its current iteration and closes Redis instead of being torn down
    mid-task.
    """
    worker = IssueAnalyzer(
        redis_url=os.getenv("REDIS_URL", "redis://localhost:6379"),
    )

    # Under asyncio.run a Ctrl+C normally surfaces as task cancellation or
    # outside the coroutine, so catching KeyboardInterrupt here is not a
    # reliable shutdown path; explicit signal handlers are.
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, worker.stop)
        except (NotImplementedError, RuntimeError):
            # Not available on this platform/event loop (e.g. Windows) or
            # outside the main thread; rely on the fallback below.
            logger.debug(f"Signal handler for {sig!r} not installed")

    try:
        await worker.run()
    except KeyboardInterrupt:
        # Fallback for platforms where no signal handler could be installed.
        worker.stop()
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: drive the async main() to completion.
    asyncio.run(main())
|