""" Code Indexer Worker - Background indexing of Bitbucket repositories. """ import asyncio import logging import os from datetime import datetime from pathlib import Path from typing import Optional, List, Dict, Any import redis.asyncio as redis logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class CodeIndexer: """ Background worker that indexes code from Bitbucket repositories. Flow: 1. Poll Redis queue for indexing jobs 2. Clone/pull repository 3. Parse COBOL/SQL/JCL files 4. Generate embeddings 5. Store in Qdrant vector database """ # File extensions to index INDEXABLE_EXTENSIONS = { ".cbl": "cobol", ".cob": "cobol", ".cpy": "copybook", ".sql": "sql", ".jcl": "jcl", ".proc": "jcl_proc", } def __init__( self, redis_url: str = "redis://localhost:6379", qdrant_url: str = "http://localhost:6333", work_dir: str = "/tmp/jira-indexer", queue_name: str = "indexer:jobs", ): self.redis_url = redis_url self.qdrant_url = qdrant_url self.work_dir = Path(work_dir) self.queue_name = queue_name self.redis: Optional[redis.Redis] = None self.running = False # Create work directory self.work_dir.mkdir(parents=True, exist_ok=True) async def connect(self): """Connect to Redis.""" self.redis = redis.from_url(self.redis_url) logger.info(f"🔌 Connected to Redis: {self.redis_url}") async def disconnect(self): """Disconnect from Redis.""" if self.redis: await self.redis.close() async def run(self): """Main worker loop.""" self.running = True await self.connect() logger.info("🚀 Code Indexer worker started") while self.running: try: # Block waiting for new jobs (timeout 10s) result = await self.redis.blpop(self.queue_name, timeout=10) if result: _, data = result job = eval(data) # Simple deserialization await self.process_job(job) except Exception as e: logger.error(f"❌ Error in worker loop: {e}") await asyncio.sleep(5) await self.disconnect() logger.info("👋 Code Indexer worker stopped") async def process_job(self, job: dict): """ Process an indexing job. Args: job: Dict with repository info and options """ repo_name = job.get("name") repo_url = job.get("url") logger.info(f"📦 Indexing repository: {repo_name}") try: # 1. Clone or update repository repo_path = await self.clone_or_update(repo_name, repo_url) # 2. Find indexable files files = self.find_indexable_files(repo_path) logger.info(f"📁 Found {len(files)} indexable files") # 3. Process files in batches total_chunks = 0 batch_size = 10 for i in range(0, len(files), batch_size): batch = files[i:i + batch_size] chunks = await self.process_file_batch(batch, repo_name) total_chunks += chunks # Report progress progress = min(100, ((i + batch_size) / len(files)) * 100) await self.report_progress(repo_name, progress) # 4. Update repository status await self.update_repo_status(repo_name, total_chunks) logger.info(f"✅ Indexed {repo_name}: {total_chunks} chunks from {len(files)} files") except Exception as e: logger.error(f"❌ Failed to index {repo_name}: {e}") await self.update_repo_status(repo_name, 0, error=str(e)) async def clone_or_update(self, name: str, url: str) -> Path: """Clone repository or pull latest changes.""" repo_path = self.work_dir / name if repo_path.exists(): logger.info(f"📥 Pulling latest changes: {name}") # TODO: Implement git pull else: logger.info(f"📥 Cloning repository: {name}") # TODO: Implement git clone repo_path.mkdir(parents=True, exist_ok=True) return repo_path def find_indexable_files(self, repo_path: Path) -> List[Path]: """Find all files that should be indexed.""" files = [] for ext in self.INDEXABLE_EXTENSIONS: files.extend(repo_path.rglob(f"*{ext}")) return files async def process_file_batch(self, files: List[Path], repo_name: str) -> int: """Process a batch of files and return chunk count.""" total_chunks = 0 for file_path in files: try: content = file_path.read_text(encoding="latin-1") file_type = self.INDEXABLE_EXTENSIONS.get(file_path.suffix.lower()) # Parse into chunks chunks = self.parse_file(content, file_path, file_type) # Generate embeddings and store for chunk in chunks: await self.index_chunk(chunk, repo_name) total_chunks += 1 except Exception as e: logger.warning(f"⚠️ Failed to process {file_path}: {e}") return total_chunks def parse_file(self, content: str, path: Path, file_type: str) -> List[Dict[str, Any]]: """Parse a file into indexable chunks.""" chunks = [] if file_type in ("cobol", "copybook"): chunks = self.parse_cobol(content, path) elif file_type == "sql": chunks = self.parse_sql(content, path) elif file_type in ("jcl", "jcl_proc"): chunks = self.parse_jcl(content, path) return chunks def parse_cobol(self, content: str, path: Path) -> List[Dict[str, Any]]: """Parse COBOL program into chunks.""" # TODO: Implement full COBOL parsing # For now, create one chunk per file return [{ "file_path": str(path), "content": content[:4000], # Limit size "type": "cobol", "start_line": 1, "end_line": content.count("\n"), }] def parse_sql(self, content: str, path: Path) -> List[Dict[str, Any]]: """Parse SQL file into chunks.""" return [{ "file_path": str(path), "content": content[:4000], "type": "sql", "start_line": 1, "end_line": content.count("\n"), }] def parse_jcl(self, content: str, path: Path) -> List[Dict[str, Any]]: """Parse JCL file into chunks.""" return [{ "file_path": str(path), "content": content[:4000], "type": "jcl", "start_line": 1, "end_line": content.count("\n"), }] async def index_chunk(self, chunk: Dict[str, Any], repo_name: str): """Generate embedding and store chunk in Qdrant.""" # TODO: Implement embedding generation and Qdrant storage pass async def report_progress(self, repo_name: str, progress: float): """Report indexing progress.""" if self.redis: await self.redis.set(f"indexer:progress:{repo_name}", str(progress)) async def update_repo_status( self, repo_name: str, chunk_count: int, error: str = None, ): """Update repository status in database.""" # TODO: Implement database update status = "indexed" if not error else "failed" logger.info(f"📊 Repo status: {repo_name} -> {status} ({chunk_count} chunks)") def stop(self): """Signal worker to stop.""" self.running = False async def main(): """Entry point for the worker.""" worker = CodeIndexer( redis_url=os.getenv("REDIS_URL", "redis://localhost:6379"), qdrant_url=os.getenv("QDRANT_URL", "http://localhost:6333"), ) try: await worker.run() except KeyboardInterrupt: worker.stop() if __name__ == "__main__": asyncio.run(main())