""" Issue Analyzer Worker - Background processing of JIRA issues. """ import asyncio import json import logging import os from datetime import datetime from typing import Optional import redis.asyncio as redis logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class IssueAnalyzer: """ Background worker that processes JIRA issues from the queue. Flow: 1. Poll Redis queue for new issues 2. Fetch issue details from JIRA 3. Search for relevant code in vector DB 4. Send to LLM for analysis 5. Generate fix proposal 6. Create PR if confidence is high enough 7. Post analysis back to JIRA """ def __init__( self, redis_url: str = "redis://localhost:6379", queue_name: str = "issues:pending", ): self.redis_url = redis_url self.queue_name = queue_name self.redis: Optional[redis.Redis] = None self.running = False async def connect(self): """Connect to Redis.""" self.redis = redis.from_url(self.redis_url) logger.info(f"🔌 Connected to Redis: {self.redis_url}") async def disconnect(self): """Disconnect from Redis.""" if self.redis: await self.redis.close() async def run(self): """Main worker loop.""" self.running = True await self.connect() logger.info("🚀 Issue Analyzer worker started") while self.running: try: # Block waiting for new items (timeout 5s) result = await self.redis.blpop(self.queue_name, timeout=5) if result: _, data = result issue_data = json.loads(data) await self.process_issue(issue_data) except Exception as e: logger.error(f"❌ Error in worker loop: {e}") await asyncio.sleep(5) # Back off on error await self.disconnect() logger.info("👋 Issue Analyzer worker stopped") async def process_issue(self, issue_data: dict): """ Process a single issue. Args: issue_data: Dict with issue key and metadata """ issue_key = issue_data.get("key") logger.info(f"📋 Processing issue: {issue_key}") try: # Update status await self.update_status(issue_key, "analyzing") # 1. Fetch full issue details from JIRA issue_details = await self.fetch_issue_details(issue_key) # 2. Extract relevant context description = issue_details.get("fields", {}).get("description", "") summary = issue_details.get("fields", {}).get("summary", "") # 3. Search for relevant code code_context = await self.search_relevant_code(summary, description) # 4. Get business rules for the affected module module = await self.identify_module(summary, description) business_rules = await self.get_module_rules(module) # 5. Send to LLM for analysis analysis = await self.analyze_with_llm( issue_description=f"{summary}\n\n{description}", code_context=code_context, business_rules=business_rules, ) # 6. Store analysis result await self.store_analysis(issue_key, analysis) # 7. If confidence is high, create PR if analysis.get("confidence", 0) >= 0.75: pr_url = await self.create_pull_request(issue_key, analysis) analysis["pr_url"] = pr_url # 8. Post comment to JIRA await self.post_jira_comment(issue_key, analysis) # Update status status = "pr_created" if analysis.get("pr_url") else "analyzed" await self.update_status(issue_key, status) logger.info(f"✅ Completed analysis: {issue_key} (confidence: {analysis.get('confidence', 0):.0%})") except Exception as e: logger.error(f"❌ Failed to process {issue_key}: {e}") await self.update_status(issue_key, "failed", str(e)) async def fetch_issue_details(self, issue_key: str) -> dict: """Fetch issue details from JIRA.""" # TODO: Implement JIRA client call logger.info(f"🔍 Fetching issue details: {issue_key}") return {} async def search_relevant_code(self, summary: str, description: str) -> str: """Search vector DB for relevant code.""" # TODO: Implement vector search logger.info("🔍 Searching for relevant code...") return "" async def identify_module(self, summary: str, description: str) -> Optional[str]: """Identify which business module the issue relates to.""" # TODO: Implement module identification return None async def get_module_rules(self, module: Optional[str]) -> str: """Get business rules for a module.""" # TODO: Load from database return "" async def analyze_with_llm( self, issue_description: str, code_context: str, business_rules: str, ) -> dict: """Send to LLM for analysis.""" # TODO: Implement LLM service call logger.info("🤖 Analyzing with LLM...") return { "root_cause": "Analysis pending", "affected_files": [], "proposed_fix": "", "confidence": 0.0, "explanation": "", } async def store_analysis(self, issue_key: str, analysis: dict): """Store analysis result in database.""" # TODO: Implement database storage logger.info(f"💾 Storing analysis for {issue_key}") async def create_pull_request(self, issue_key: str, analysis: dict) -> Optional[str]: """Create a pull request with the proposed fix.""" # TODO: Implement Bitbucket PR creation logger.info(f"📝 Creating PR for {issue_key}") return None async def post_jira_comment(self, issue_key: str, analysis: dict): """Post analysis as a comment on the JIRA issue.""" # TODO: Implement JIRA comment logger.info(f"💬 Posting comment to {issue_key}") async def update_status(self, issue_key: str, status: str, error: str = None): """Update issue status in database and Redis.""" # TODO: Implement status update logger.info(f"📊 Status update: {issue_key} -> {status}") def stop(self): """Signal worker to stop.""" self.running = False async def main(): """Entry point for the worker.""" worker = IssueAnalyzer( redis_url=os.getenv("REDIS_URL", "redis://localhost:6379"), ) try: await worker.run() except KeyboardInterrupt: worker.stop() if __name__ == "__main__": asyncio.run(main())