feat: Background workers

- analyzer.py: Issue analysis pipeline (JIRA -> LLM -> PR)
- indexer.py: Code indexing pipeline (Bitbucket -> Embeddings -> Qdrant)
- Redis queue-based processing
- Progress tracking and status updates
Ricel Leite 2026-02-18 14:03:34 -03:00
parent 011a93c5b9
commit 27b72e3ccd
4 changed files with 475 additions and 0 deletions
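
Both workers consume JSON payloads via BLPOP, so a producer elsewhere would enqueue work with RPUSH. A minimal sketch (queue names match the workers below; the issue key and repository fields are placeholders):

import asyncio
import json
import redis.asyncio as redis

async def enqueue():
    r = redis.from_url("redis://localhost:6379")
    # The analyzer reads the "key" field from each queued item.
    await r.rpush("issues:pending", json.dumps({"key": "PROJ-123"}))
    # The indexer expects "name" and "url" for the repository to index.
    await r.rpush("indexer:jobs", json.dumps({"name": "demo-repo", "url": "https://bitbucket.example.com/scm/demo-repo.git"}))
    await r.close()

asyncio.run(enqueue())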

workers/__init__.py Normal file

@@ -0,0 +1 @@
"""Workers package."""

workers/analyzer.py Normal file

@@ -0,0 +1,208 @@
"""
Issue Analyzer Worker - Background processing of JIRA issues.
"""
import asyncio
import json
import logging
import os
from datetime import datetime
from typing import Optional
import redis.asyncio as redis
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class IssueAnalyzer:
"""
Background worker that processes JIRA issues from the queue.
Flow:
1. Poll Redis queue for new issues
2. Fetch issue details from JIRA
3. Search for relevant code in vector DB
4. Send to LLM for analysis
5. Generate fix proposal
6. Create PR if confidence is high enough
7. Post analysis back to JIRA
"""
def __init__(
self,
redis_url: str = "redis://localhost:6379",
queue_name: str = "issues:pending",
):
self.redis_url = redis_url
self.queue_name = queue_name
self.redis: Optional[redis.Redis] = None
self.running = False
async def connect(self):
"""Connect to Redis."""
self.redis = redis.from_url(self.redis_url)
logger.info(f"🔌 Connected to Redis: {self.redis_url}")
async def disconnect(self):
"""Disconnect from Redis."""
if self.redis:
await self.redis.close()
async def run(self):
"""Main worker loop."""
self.running = True
await self.connect()
logger.info("🚀 Issue Analyzer worker started")
while self.running:
try:
# Block waiting for new items (timeout 5s)
result = await self.redis.blpop(self.queue_name, timeout=5)
if result:
_, data = result
issue_data = json.loads(data)
await self.process_issue(issue_data)
except Exception as e:
logger.error(f"❌ Error in worker loop: {e}")
await asyncio.sleep(5) # Back off on error
await self.disconnect()
logger.info("👋 Issue Analyzer worker stopped")
async def process_issue(self, issue_data: dict):
"""
Process a single issue.
Args:
issue_data: Dict with issue key and metadata
"""
issue_key = issue_data.get("key")
logger.info(f"📋 Processing issue: {issue_key}")
try:
# Update status
await self.update_status(issue_key, "analyzing")
# 1. Fetch full issue details from JIRA
issue_details = await self.fetch_issue_details(issue_key)
# 2. Extract relevant context
description = issue_details.get("fields", {}).get("description", "")
summary = issue_details.get("fields", {}).get("summary", "")
# 3. Search for relevant code
code_context = await self.search_relevant_code(summary, description)
# 4. Get business rules for the affected module
module = await self.identify_module(summary, description)
business_rules = await self.get_module_rules(module)
# 5. Send to LLM for analysis
analysis = await self.analyze_with_llm(
issue_description=f"{summary}\n\n{description}",
code_context=code_context,
business_rules=business_rules,
)
# 6. Store analysis result
await self.store_analysis(issue_key, analysis)
# 7. If confidence is high, create PR
if analysis.get("confidence", 0) >= 0.75:
pr_url = await self.create_pull_request(issue_key, analysis)
analysis["pr_url"] = pr_url
# 8. Post comment to JIRA
await self.post_jira_comment(issue_key, analysis)
# Update status
status = "pr_created" if analysis.get("pr_url") else "analyzed"
await self.update_status(issue_key, status)
logger.info(f"✅ Completed analysis: {issue_key} (confidence: {analysis.get('confidence', 0):.0%})")
except Exception as e:
logger.error(f"❌ Failed to process {issue_key}: {e}")
await self.update_status(issue_key, "failed", str(e))
async def fetch_issue_details(self, issue_key: str) -> dict:
"""Fetch issue details from JIRA."""
# TODO: Implement JIRA client call
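        # A plausible shape for this call (sketch only; httpx is already in
        # requirements.txt, but JIRA_URL/JIRA_TOKEN are hypothetical env vars):
        #   async with httpx.AsyncClient() as client:
        #       resp = await client.get(
        #           f"{os.environ['JIRA_URL']}/rest/api/2/issue/{issue_key}",
        #           headers={"Authorization": f"Bearer {os.environ['JIRA_TOKEN']}"},
        #       )
        #       resp.raise_for_status()
        #       return resp.json()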
logger.info(f"🔍 Fetching issue details: {issue_key}")
return {}
async def search_relevant_code(self, summary: str, description: str) -> str:
"""Search vector DB for relevant code."""
# TODO: Implement vector search
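        # One possible wiring (sketch; the model and collection names are
        # assumptions, using sentence-transformers and qdrant-client from
        # requirements.txt):
        #   query = SentenceTransformer("all-MiniLM-L6-v2").encode(f"{summary}\n{description}")
        #   hits = QdrantClient(url=qdrant_url).search(
        #       collection_name="code", query_vector=query.tolist(), limit=5)
        #   return "\n\n".join(hit.payload["content"] for hit in hits)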
logger.info("🔍 Searching for relevant code...")
return ""
async def identify_module(self, summary: str, description: str) -> Optional[str]:
"""Identify which business module the issue relates to."""
# TODO: Implement module identification
return None
async def get_module_rules(self, module: Optional[str]) -> str:
"""Get business rules for a module."""
# TODO: Load from database
return ""
async def analyze_with_llm(
self,
issue_description: str,
code_context: str,
business_rules: str,
) -> dict:
"""Send to LLM for analysis."""
# TODO: Implement LLM service call
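        # Sketch of a call to an internal LLM service (endpoint and payload
        # shape are assumptions, not part of this commit):
        #   async with httpx.AsyncClient(timeout=120) as client:
        #       resp = await client.post(
        #           os.getenv("LLM_SERVICE_URL", "http://localhost:8000/analyze"),
        #           json={"issue": issue_description, "code": code_context,
        #                 "rules": business_rules},
        #       )
        #       return resp.json()  # expected to match the placeholder dict below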
logger.info("🤖 Analyzing with LLM...")
return {
"root_cause": "Analysis pending",
"affected_files": [],
"proposed_fix": "",
"confidence": 0.0,
"explanation": "",
}
async def store_analysis(self, issue_key: str, analysis: dict):
"""Store analysis result in database."""
# TODO: Implement database storage
logger.info(f"💾 Storing analysis for {issue_key}")
async def create_pull_request(self, issue_key: str, analysis: dict) -> Optional[str]:
"""Create a pull request with the proposed fix."""
# TODO: Implement Bitbucket PR creation
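        # A Bitbucket Server-style request might look like this (sketch;
        # base_url, project, slug, and the refs are hypothetical):
        #   resp = await client.post(
        #       f"{base_url}/rest/api/1.0/projects/{project}/repos/{slug}/pull-requests",
        #       json={"title": f"[{issue_key}] Proposed fix", "fromRef": ..., "toRef": ...},
        #   )
        #   return resp.json()["links"]["self"][0]["href"]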
logger.info(f"📝 Creating PR for {issue_key}")
return None
async def post_jira_comment(self, issue_key: str, analysis: dict):
"""Post analysis as a comment on the JIRA issue."""
# TODO: Implement JIRA comment
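        # Sketch (JIRA REST v2 comment endpoint; the comment body format is an assumption):
        #   await client.post(
        #       f"{jira_url}/rest/api/2/issue/{issue_key}/comment",
        #       json={"body": analysis.get("explanation", "")},
        #   )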
logger.info(f"💬 Posting comment to {issue_key}")
    async def update_status(self, issue_key: str, status: str, error: Optional[str] = None):
"""Update issue status in database and Redis."""
# TODO: Implement status update
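        # A minimal Redis-backed version could be (sketch; key layout is an assumption):
        #   await self.redis.hset(f"issue:{issue_key}", mapping={
        #       "status": status,
        #       "error": error or "",
        #       "updated_at": datetime.utcnow().isoformat(),
        #   })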
logger.info(f"📊 Status update: {issue_key} -> {status}")
def stop(self):
"""Signal worker to stop."""
self.running = False
async def main():
"""Entry point for the worker."""
worker = IssueAnalyzer(
redis_url=os.getenv("REDIS_URL", "redis://localhost:6379"),
)
try:
await worker.run()
except KeyboardInterrupt:
worker.stop()
if __name__ == "__main__":
asyncio.run(main())

workers/indexer.py Normal file

@@ -0,0 +1,261 @@
"""
Code Indexer Worker - Background indexing of Bitbucket repositories.
"""
import asyncio
import json
import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Dict, Any
import redis.asyncio as redis
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class CodeIndexer:
"""
Background worker that indexes code from Bitbucket repositories.
Flow:
1. Poll Redis queue for indexing jobs
2. Clone/pull repository
3. Parse COBOL/SQL/JCL files
4. Generate embeddings
5. Store in Qdrant vector database
"""
# File extensions to index
INDEXABLE_EXTENSIONS = {
".cbl": "cobol",
".cob": "cobol",
".cpy": "copybook",
".sql": "sql",
".jcl": "jcl",
".proc": "jcl_proc",
}
def __init__(
self,
redis_url: str = "redis://localhost:6379",
qdrant_url: str = "http://localhost:6333",
work_dir: str = "/tmp/aci-indexer",
queue_name: str = "indexer:jobs",
):
self.redis_url = redis_url
self.qdrant_url = qdrant_url
self.work_dir = Path(work_dir)
self.queue_name = queue_name
self.redis: Optional[redis.Redis] = None
self.running = False
# Create work directory
self.work_dir.mkdir(parents=True, exist_ok=True)
async def connect(self):
"""Connect to Redis."""
self.redis = redis.from_url(self.redis_url)
logger.info(f"🔌 Connected to Redis: {self.redis_url}")
async def disconnect(self):
"""Disconnect from Redis."""
if self.redis:
await self.redis.close()
async def run(self):
"""Main worker loop."""
self.running = True
await self.connect()
logger.info("🚀 Code Indexer worker started")
while self.running:
try:
# Block waiting for new jobs (timeout 10s)
result = await self.redis.blpop(self.queue_name, timeout=10)
if result:
_, data = result
                    job = json.loads(data)  # never eval() untrusted queue data
await self.process_job(job)
except Exception as e:
logger.error(f"❌ Error in worker loop: {e}")
await asyncio.sleep(5)
await self.disconnect()
logger.info("👋 Code Indexer worker stopped")
async def process_job(self, job: dict):
"""
Process an indexing job.
Args:
job: Dict with repository info and options
"""
repo_name = job.get("name")
repo_url = job.get("url")
logger.info(f"📦 Indexing repository: {repo_name}")
try:
# 1. Clone or update repository
repo_path = await self.clone_or_update(repo_name, repo_url)
# 2. Find indexable files
files = self.find_indexable_files(repo_path)
logger.info(f"📁 Found {len(files)} indexable files")
# 3. Process files in batches
total_chunks = 0
batch_size = 10
for i in range(0, len(files), batch_size):
batch = files[i:i + batch_size]
chunks = await self.process_file_batch(batch, repo_name)
total_chunks += chunks
# Report progress
progress = min(100, ((i + batch_size) / len(files)) * 100)
await self.report_progress(repo_name, progress)
# 4. Update repository status
await self.update_repo_status(repo_name, total_chunks)
logger.info(f"✅ Indexed {repo_name}: {total_chunks} chunks from {len(files)} files")
except Exception as e:
logger.error(f"❌ Failed to index {repo_name}: {e}")
await self.update_repo_status(repo_name, 0, error=str(e))
async def clone_or_update(self, name: str, url: str) -> Path:
"""Clone repository or pull latest changes."""
repo_path = self.work_dir / name
if repo_path.exists():
logger.info(f"📥 Pulling latest changes: {name}")
# TODO: Implement git pull
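            # Sketch: shell out to git, e.g.
            #   proc = await asyncio.create_subprocess_exec(
            #       "git", "-C", str(repo_path), "pull", "--ff-only")
            #   await proc.wait()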
else:
logger.info(f"📥 Cloning repository: {name}")
# TODO: Implement git clone
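            # Sketch: the clone itself would be along the lines of
            #   proc = await asyncio.create_subprocess_exec("git", "clone", url, str(repo_path))
            #   await proc.wait()
            # Until then, the mkdir below keeps the stub runnable.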
repo_path.mkdir(parents=True, exist_ok=True)
return repo_path
def find_indexable_files(self, repo_path: Path) -> List[Path]:
"""Find all files that should be indexed."""
files = []
for ext in self.INDEXABLE_EXTENSIONS:
files.extend(repo_path.rglob(f"*{ext}"))
return files
async def process_file_batch(self, files: List[Path], repo_name: str) -> int:
"""Process a batch of files and return chunk count."""
total_chunks = 0
for file_path in files:
try:
content = file_path.read_text(encoding="latin-1")
file_type = self.INDEXABLE_EXTENSIONS.get(file_path.suffix.lower())
# Parse into chunks
chunks = self.parse_file(content, file_path, file_type)
# Generate embeddings and store
for chunk in chunks:
await self.index_chunk(chunk, repo_name)
total_chunks += 1
except Exception as e:
logger.warning(f"⚠️ Failed to process {file_path}: {e}")
return total_chunks
def parse_file(self, content: str, path: Path, file_type: str) -> List[Dict[str, Any]]:
"""Parse a file into indexable chunks."""
chunks = []
if file_type in ("cobol", "copybook"):
chunks = self.parse_cobol(content, path)
elif file_type == "sql":
chunks = self.parse_sql(content, path)
elif file_type in ("jcl", "jcl_proc"):
chunks = self.parse_jcl(content, path)
return chunks
def parse_cobol(self, content: str, path: Path) -> List[Dict[str, Any]]:
"""Parse COBOL program into chunks."""
# TODO: Implement full COBOL parsing
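        # A fuller parser could split on DIVISION headers (sketch only):
        #   re.split(r"^\s*(IDENTIFICATION|ENVIRONMENT|DATA|PROCEDURE)\s+DIVISION",
        #            content, flags=re.MULTILINE)
        # and emit one chunk per division or per PROCEDURE paragraph.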
# For now, create one chunk per file
return [{
"file_path": str(path),
"content": content[:4000], # Limit size
"type": "cobol",
"start_line": 1,
"end_line": content.count("\n"),
}]
def parse_sql(self, content: str, path: Path) -> List[Dict[str, Any]]:
"""Parse SQL file into chunks."""
return [{
"file_path": str(path),
"content": content[:4000],
"type": "sql",
"start_line": 1,
"end_line": content.count("\n"),
}]
def parse_jcl(self, content: str, path: Path) -> List[Dict[str, Any]]:
"""Parse JCL file into chunks."""
return [{
"file_path": str(path),
"content": content[:4000],
"type": "jcl",
"start_line": 1,
"end_line": content.count("\n"),
}]
async def index_chunk(self, chunk: Dict[str, Any], repo_name: str):
"""Generate embedding and store chunk in Qdrant."""
# TODO: Implement embedding generation and Qdrant storage
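        # Sketch of the intended path (model and collection names are assumptions;
        # sentence-transformers and qdrant-client are already in requirements.txt):
        #   vector = self.model.encode(chunk["content"])
        #   self.qdrant.upsert(
        #       collection_name="code",
        #       points=[models.PointStruct(
        #           id=str(uuid4()),
        #           vector=vector.tolist(),
        #           payload={**chunk, "repo": repo_name},
        #       )],
        #   )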
pass
async def report_progress(self, repo_name: str, progress: float):
"""Report indexing progress."""
if self.redis:
await self.redis.set(f"indexer:progress:{repo_name}", str(progress))
async def update_repo_status(
self,
repo_name: str,
chunk_count: int,
        error: Optional[str] = None,
):
"""Update repository status in database."""
# TODO: Implement database update
status = "indexed" if not error else "failed"
logger.info(f"📊 Repo status: {repo_name} -> {status} ({chunk_count} chunks)")
def stop(self):
"""Signal worker to stop."""
self.running = False
async def main():
"""Entry point for the worker."""
worker = CodeIndexer(
redis_url=os.getenv("REDIS_URL", "redis://localhost:6379"),
qdrant_url=os.getenv("QDRANT_URL", "http://localhost:6333"),
)
try:
await worker.run()
except KeyboardInterrupt:
worker.stop()
if __name__ == "__main__":
asyncio.run(main())

workers/requirements.txt Normal file

@@ -0,0 +1,5 @@
redis>=5.0.0
httpx>=0.26.0
sentence-transformers>=2.2.0
qdrant-client>=1.7.0
python-dotenv>=1.0.0