From 27b72e3ccd94bbd3420af3c0c191dcdad629f1f3 Mon Sep 17 00:00:00 2001
From: Ricel Leite
Date: Wed, 18 Feb 2026 14:03:34 -0300
Subject: [PATCH] feat: Background workers

- analyzer.py: Issue analysis pipeline (JIRA -> LLM -> PR)
- indexer.py: Code indexing pipeline (Bitbucket -> Embeddings -> Qdrant)
- Redis queue-based processing
- Progress tracking and status updates
---
 workers/__init__.py      |   1 +
 workers/analyzer.py      | 208 ++++++++++++++++++++++++++++++++++
 workers/indexer.py       | 261 ++++++++++++++++++++++++++++++++++++++++
 workers/requirements.txt |   5 +
 4 files changed, 475 insertions(+)
 create mode 100644 workers/__init__.py
 create mode 100644 workers/analyzer.py
 create mode 100644 workers/indexer.py
 create mode 100644 workers/requirements.txt

diff --git a/workers/__init__.py b/workers/__init__.py
new file mode 100644
index 0000000..b96b624
--- /dev/null
+++ b/workers/__init__.py
@@ -0,0 +1 @@
+"""Workers package."""
diff --git a/workers/analyzer.py b/workers/analyzer.py
new file mode 100644
index 0000000..c58b59e
--- /dev/null
+++ b/workers/analyzer.py
@@ -0,0 +1,208 @@
+"""
+Issue Analyzer Worker - Background processing of JIRA issues.
+"""
+import asyncio
+import json
+import logging
+import os
+from datetime import datetime
+from typing import Optional
+
+import redis.asyncio as redis
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+class IssueAnalyzer:
+    """
+    Background worker that processes JIRA issues from the queue.
+
+    Flow:
+    1. Poll Redis queue for new issues
+    2. Fetch issue details from JIRA
+    3. Search for relevant code in vector DB
+    4. Send to LLM for analysis
+    5. Generate fix proposal
+    6. Create PR if confidence is high enough
+    7. Post analysis back to JIRA
+    """
+
+    def __init__(
+        self,
+        redis_url: str = "redis://localhost:6379",
+        queue_name: str = "issues:pending",
+    ):
+        self.redis_url = redis_url
+        self.queue_name = queue_name
+        self.redis: Optional[redis.Redis] = None
+        self.running = False
+
+    async def connect(self):
+        """Connect to Redis."""
+        self.redis = redis.from_url(self.redis_url)
+        logger.info(f"🔌 Connected to Redis: {self.redis_url}")
+
+    async def disconnect(self):
+        """Disconnect from Redis."""
+        if self.redis:
+            await self.redis.close()
+
+    async def run(self):
+        """Main worker loop."""
+        self.running = True
+        await self.connect()
+
+        logger.info("🚀 Issue Analyzer worker started")
+
+        while self.running:
+            try:
+                # Block waiting for new items (timeout 5s)
+                result = await self.redis.blpop(self.queue_name, timeout=5)
+
+                if result:
+                    _, data = result
+                    issue_data = json.loads(data)
+                    await self.process_issue(issue_data)
+            except Exception as e:
+                logger.error(f"❌ Error in worker loop: {e}")
+                await asyncio.sleep(5)  # Back off on error
+
+        await self.disconnect()
+        logger.info("👋 Issue Analyzer worker stopped")
+
+    async def process_issue(self, issue_data: dict):
+        """
+        Process a single issue.
+
+        Args:
+            issue_data: Dict with issue key and metadata
+        """
+        issue_key = issue_data.get("key")
+        logger.info(f"📋 Processing issue: {issue_key}")
+
+        try:
+            # Update status
+            await self.update_status(issue_key, "analyzing")
+
+            # 1. Fetch full issue details from JIRA
+            issue_details = await self.fetch_issue_details(issue_key)
+
+            # 2. Extract relevant context
+            description = issue_details.get("fields", {}).get("description", "")
+            summary = issue_details.get("fields", {}).get("summary", "")
+
+            # 3. Search for relevant code
+            code_context = await self.search_relevant_code(summary, description)
+
+            # 4. Get business rules for the affected module
+            module = await self.identify_module(summary, description)
+            business_rules = await self.get_module_rules(module)
+
+            # 5. Send to LLM for analysis
+            analysis = await self.analyze_with_llm(
+                issue_description=f"{summary}\n\n{description}",
+                code_context=code_context,
+                business_rules=business_rules,
+            )
+
+            # 6. Store analysis result
+            await self.store_analysis(issue_key, analysis)
+
+            # 7. If confidence is high, create PR
+            if analysis.get("confidence", 0) >= 0.75:
+                pr_url = await self.create_pull_request(issue_key, analysis)
+                analysis["pr_url"] = pr_url
+
+            # 8. Post comment to JIRA
+            await self.post_jira_comment(issue_key, analysis)
+
+            # Update status
+            status = "pr_created" if analysis.get("pr_url") else "analyzed"
+            await self.update_status(issue_key, status)
+
+            logger.info(f"✅ Completed analysis: {issue_key} (confidence: {analysis.get('confidence', 0):.0%})")
+
+        except Exception as e:
+            logger.error(f"❌ Failed to process {issue_key}: {e}")
+            await self.update_status(issue_key, "failed", str(e))
+
+    async def fetch_issue_details(self, issue_key: str) -> dict:
+        """Fetch issue details from JIRA."""
+        # TODO: Implement JIRA client call
+        logger.info(f"🔍 Fetching issue details: {issue_key}")
+        return {}
+
+    async def search_relevant_code(self, summary: str, description: str) -> str:
+        """Search vector DB for relevant code."""
+        # TODO: Implement vector search
+        logger.info("🔍 Searching for relevant code...")
+        return ""
+
+    async def identify_module(self, summary: str, description: str) -> Optional[str]:
+        """Identify which business module the issue relates to."""
+        # TODO: Implement module identification
+        return None
+
+    async def get_module_rules(self, module: Optional[str]) -> str:
+        """Get business rules for a module."""
+        # TODO: Load from database
+        return ""
+
+    async def analyze_with_llm(
+        self,
+        issue_description: str,
+        code_context: str,
+        business_rules: str,
+    ) -> dict:
+        """Send to LLM for analysis."""
+        # TODO: Implement LLM service call
+        logger.info("🤖 Analyzing with LLM...")
+        return {
+            "root_cause": "Analysis pending",
+            "affected_files": [],
+            "proposed_fix": "",
+            "confidence": 0.0,
+            "explanation": "",
+        }
+
+    async def store_analysis(self, issue_key: str, analysis: dict):
+        """Store analysis result in database."""
+        # TODO: Implement database storage
+        logger.info(f"💾 Storing analysis for {issue_key}")
+
+    async def create_pull_request(self, issue_key: str, analysis: dict) -> Optional[str]:
+        """Create a pull request with the proposed fix."""
+        # TODO: Implement Bitbucket PR creation
+        logger.info(f"📝 Creating PR for {issue_key}")
+        return None
+
+    async def post_jira_comment(self, issue_key: str, analysis: dict):
+        """Post analysis as a comment on the JIRA issue."""
+        # TODO: Implement JIRA comment
+        logger.info(f"💬 Posting comment to {issue_key}")
+
+    async def update_status(self, issue_key: str, status: str, error: Optional[str] = None):
+        """Update issue status in database and Redis."""
+        # TODO: Implement status update
+        logger.info(f"📊 Status update: {issue_key} -> {status}")
+
+    def stop(self):
+        """Signal worker to stop."""
+        self.running = False
+
+
+async def main():
+    """Entry point for the worker."""
+    worker = IssueAnalyzer(
+        redis_url=os.getenv("REDIS_URL", "redis://localhost:6379"),
+    )
+
+    try:
+        await worker.run()
+    except KeyboardInterrupt:
+        worker.stop()
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
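
Work reaches IssueAnalyzer by pushing a JSON payload onto the issues:pending list that run() pops with blpop. A minimal producer sketch, assuming only what the worker above reads (a "key" field and the REDIS_URL variable); PROJ-123 is a placeholder issue key and this script is not part of the patch:

    # enqueue_issue.py - illustrative producer, not part of this patch
    import asyncio
    import json
    import os

    import redis.asyncio as redis


    async def enqueue_issue(issue_key: str) -> None:
        # rpush pairs with the worker's blpop, giving FIFO processing
        client = redis.from_url(os.getenv("REDIS_URL", "redis://localhost:6379"))
        await client.rpush("issues:pending", json.dumps({"key": issue_key}))
        await client.close()


    if __name__ == "__main__":
        asyncio.run(enqueue_issue("PROJ-123"))  # placeholder issue key
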
diff --git a/workers/indexer.py b/workers/indexer.py
new file mode 100644
index 0000000..6b72345
--- /dev/null
+++ b/workers/indexer.py
@@ -0,0 +1,261 @@
+"""
+Code Indexer Worker - Background indexing of Bitbucket repositories.
+"""
+import asyncio
+import json
+import logging
+import os
+from pathlib import Path
+from typing import Optional, List, Dict, Any
+
+import redis.asyncio as redis
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+class CodeIndexer:
+    """
+    Background worker that indexes code from Bitbucket repositories.
+
+    Flow:
+    1. Poll Redis queue for indexing jobs
+    2. Clone/pull repository
+    3. Parse COBOL/SQL/JCL files
+    4. Generate embeddings
+    5. Store in Qdrant vector database
+    """
+
+    # File extensions to index
+    INDEXABLE_EXTENSIONS = {
+        ".cbl": "cobol",
+        ".cob": "cobol",
+        ".cpy": "copybook",
+        ".sql": "sql",
+        ".jcl": "jcl",
+        ".proc": "jcl_proc",
+    }
+
+    def __init__(
+        self,
+        redis_url: str = "redis://localhost:6379",
+        qdrant_url: str = "http://localhost:6333",
+        work_dir: str = "/tmp/aci-indexer",
+        queue_name: str = "indexer:jobs",
+    ):
+        self.redis_url = redis_url
+        self.qdrant_url = qdrant_url
+        self.work_dir = Path(work_dir)
+        self.queue_name = queue_name
+        self.redis: Optional[redis.Redis] = None
+        self.running = False
+
+        # Create work directory
+        self.work_dir.mkdir(parents=True, exist_ok=True)
+
+    async def connect(self):
+        """Connect to Redis."""
+        self.redis = redis.from_url(self.redis_url)
+        logger.info(f"🔌 Connected to Redis: {self.redis_url}")
+
+    async def disconnect(self):
+        """Disconnect from Redis."""
+        if self.redis:
+            await self.redis.close()
+
+    async def run(self):
+        """Main worker loop."""
+        self.running = True
+        await self.connect()
+
+        logger.info("🚀 Code Indexer worker started")
+
+        while self.running:
+            try:
+                # Block waiting for new jobs (timeout 10s)
+                result = await self.redis.blpop(self.queue_name, timeout=10)
+
+                if result:
+                    _, data = result
+                    job = json.loads(data)  # Jobs are queued as JSON strings
+                    await self.process_job(job)
+            except Exception as e:
+                logger.error(f"❌ Error in worker loop: {e}")
+                await asyncio.sleep(5)
+
+        await self.disconnect()
+        logger.info("👋 Code Indexer worker stopped")
+
+    async def process_job(self, job: dict):
+        """
+        Process an indexing job.
+
+        Args:
+            job: Dict with repository info and options
+        """
+        repo_name = job.get("name")
+        repo_url = job.get("url")
+
+        logger.info(f"📦 Indexing repository: {repo_name}")
+
+        try:
+            # 1. Clone or update repository
+            repo_path = await self.clone_or_update(repo_name, repo_url)
+
+            # 2. Find indexable files
+            files = self.find_indexable_files(repo_path)
+            logger.info(f"📁 Found {len(files)} indexable files")
+
+            # 3. Process files in batches
+            total_chunks = 0
+            batch_size = 10
+
+            for i in range(0, len(files), batch_size):
+                batch = files[i:i + batch_size]
+                chunks = await self.process_file_batch(batch, repo_name)
+                total_chunks += chunks
+
+                # Report progress
+                progress = min(100, ((i + batch_size) / len(files)) * 100)
+                await self.report_progress(repo_name, progress)
+
+            # 4. Update repository status
+            await self.update_repo_status(repo_name, total_chunks)
+
+            logger.info(f"✅ Indexed {repo_name}: {total_chunks} chunks from {len(files)} files")
+
+        except Exception as e:
+            logger.error(f"❌ Failed to index {repo_name}: {e}")
+            await self.update_repo_status(repo_name, 0, error=str(e))
+
+    async def clone_or_update(self, name: str, url: str) -> Path:
+        """Clone repository or pull latest changes."""
+        repo_path = self.work_dir / name
+
+        if repo_path.exists():
+            logger.info(f"📥 Pulling latest changes: {name}")
+            # TODO: Implement git pull
+        else:
+            logger.info(f"📥 Cloning repository: {name}")
+            # TODO: Implement git clone
+            repo_path.mkdir(parents=True, exist_ok=True)
+
+        return repo_path
+
+    def find_indexable_files(self, repo_path: Path) -> List[Path]:
+        """Find all files that should be indexed."""
+        files = []
+
+        for ext in self.INDEXABLE_EXTENSIONS:
+            files.extend(repo_path.rglob(f"*{ext}"))
+
+        return files
+
+    async def process_file_batch(self, files: List[Path], repo_name: str) -> int:
+        """Process a batch of files and return chunk count."""
+        total_chunks = 0
+
+        for file_path in files:
+            try:
+                content = file_path.read_text(encoding="latin-1")
+                file_type = self.INDEXABLE_EXTENSIONS.get(file_path.suffix.lower())
+
+                # Parse into chunks
+                chunks = self.parse_file(content, file_path, file_type)
+
+                # Generate embeddings and store
+                for chunk in chunks:
+                    await self.index_chunk(chunk, repo_name)
+                    total_chunks += 1
+
+            except Exception as e:
+                logger.warning(f"⚠️ Failed to process {file_path}: {e}")
+
+        return total_chunks
+
+    def parse_file(self, content: str, path: Path, file_type: Optional[str]) -> List[Dict[str, Any]]:
+        """Parse a file into indexable chunks."""
+        chunks = []
+
+        if file_type in ("cobol", "copybook"):
+            chunks = self.parse_cobol(content, path)
+        elif file_type == "sql":
+            chunks = self.parse_sql(content, path)
+        elif file_type in ("jcl", "jcl_proc"):
+            chunks = self.parse_jcl(content, path)
+
+        return chunks
+
+    def parse_cobol(self, content: str, path: Path) -> List[Dict[str, Any]]:
+        """Parse COBOL program into chunks."""
+        # TODO: Implement full COBOL parsing
+        # For now, create one chunk per file
+        return [{
+            "file_path": str(path),
+            "content": content[:4000],  # Limit size
+            "type": "cobol",
+            "start_line": 1,
+            "end_line": content.count("\n"),
+        }]
+
+    def parse_sql(self, content: str, path: Path) -> List[Dict[str, Any]]:
+        """Parse SQL file into chunks."""
+        return [{
+            "file_path": str(path),
+            "content": content[:4000],
+            "type": "sql",
+            "start_line": 1,
+            "end_line": content.count("\n"),
+        }]
+
+    def parse_jcl(self, content: str, path: Path) -> List[Dict[str, Any]]:
+        """Parse JCL file into chunks."""
+        return [{
+            "file_path": str(path),
+            "content": content[:4000],
+            "type": "jcl",
+            "start_line": 1,
+            "end_line": content.count("\n"),
+        }]
+
+    async def index_chunk(self, chunk: Dict[str, Any], repo_name: str):
+        """Generate embedding and store chunk in Qdrant."""
+        # TODO: Implement embedding generation and Qdrant storage
+        pass
+
+    async def report_progress(self, repo_name: str, progress: float):
+        """Report indexing progress."""
+        if self.redis:
+            await self.redis.set(f"indexer:progress:{repo_name}", str(progress))
+
+    async def update_repo_status(
+        self,
+        repo_name: str,
+        chunk_count: int,
+        error: Optional[str] = None,
+    ):
+        """Update repository status in database."""
+        # TODO: Implement database update
+        status = "indexed" if not error else "failed"
+        logger.info(f"📊 Repo status: {repo_name} -> {status} ({chunk_count} chunks)")
+
+    def stop(self):
+        """Signal worker to stop."""
+        self.running = False
+
+
+async def main():
+    """Entry point for the worker."""
+    worker = CodeIndexer(
+        redis_url=os.getenv("REDIS_URL", "redis://localhost:6379"),
+        qdrant_url=os.getenv("QDRANT_URL", "http://localhost:6333"),
+    )
+
+    try:
+        await worker.run()
+    except KeyboardInterrupt:
+        worker.stop()
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
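
The index_chunk() stub above is where steps 4-5 of the CodeIndexer docstring (embeddings and Qdrant storage) would land. One possible shape, using the sentence-transformers and qdrant-client packages pinned in requirements.txt below; the all-MiniLM-L6-v2 model and the code_chunks collection name are assumptions, and the blocking calls are moved off the event loop so the queue keeps draining:

    # Sketch only - assumes an existing "code_chunks" collection (384-dim, cosine)
    import asyncio
    import uuid
    from typing import Any, Dict

    from qdrant_client import QdrantClient
    from qdrant_client.models import PointStruct
    from sentence_transformers import SentenceTransformer

    _model = SentenceTransformer("all-MiniLM-L6-v2")   # 384-dimensional embeddings
    _qdrant = QdrantClient(url="http://localhost:6333")


    async def index_chunk(chunk: Dict[str, Any], repo_name: str) -> None:
        # encode() and upsert() block, so run them in a worker thread
        vector = await asyncio.to_thread(_model.encode, chunk["content"])
        point = PointStruct(
            id=str(uuid.uuid4()),
            vector=vector.tolist(),
            payload={"repository": repo_name, **chunk},
        )
        await asyncio.to_thread(
            _qdrant.upsert, collection_name="code_chunks", points=[point]
        )
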
diff --git a/workers/requirements.txt b/workers/requirements.txt
new file mode 100644
index 0000000..5a023b9
--- /dev/null
+++ b/workers/requirements.txt
@@ -0,0 +1,5 @@
+redis>=5.0.0
+httpx>=0.26.0
+sentence-transformers>=2.2.0
+qdrant-client>=1.7.0
+python-dotenv>=1.0.0
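
With the package layout above, each worker runs as its own process: pip install -r workers/requirements.txt, then python -m workers.analyzer and python -m workers.indexer (REDIS_URL and QDRANT_URL fall back to the localhost defaults in the code). An indexing job is a JSON object carrying the name and url fields read by process_job(), and progress is written back to indexer:progress:<name>. A rough client-side sketch, with a placeholder repository name and clone URL:

    # Illustrative only - "core-batch" and the clone URL are placeholders
    import asyncio
    import json
    import os

    import redis.asyncio as redis


    async def request_indexing() -> None:
        client = redis.from_url(os.getenv("REDIS_URL", "redis://localhost:6379"))
        job = {"name": "core-batch", "url": "https://bitbucket.example.com/scm/app/core-batch.git"}
        await client.rpush("indexer:jobs", json.dumps(job))

        # Poll the progress key written by report_progress()
        for _ in range(12):
            await asyncio.sleep(5)
            progress = await client.get("indexer:progress:core-batch")
            print(f"progress: {progress.decode() if progress else '0'}%")

        await client.close()


    if __name__ == "__main__":
        asyncio.run(request_indexing())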
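
On the analyzer side, analyze_with_llm() is stubbed to return a dict with root_cause, affected_files, proposed_fix, confidence and explanation, and process_issue() only calls create_pull_request() when confidence is at least 0.75. An illustrative payload (all values invented; only the key names and the threshold come from the code above):

    analysis = {
        "root_cause": "Accumulator field overflows on month-end totals",
        "affected_files": ["src/cobol/PAYCALC.cbl"],
        "proposed_fix": "Widen the accumulator picture clause and re-run the batch test",
        "confidence": 0.82,  # >= 0.75, so a PR would be opened
        "explanation": "The reported abend is consistent with a size error in the accumulator.",
    }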