# jira-ai-fixer/workers/indexer.py
"""
Code Indexer Worker - Background indexing of Bitbucket repositories.
"""
import asyncio
import json
import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import redis.asyncio as redis
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class CodeIndexer:
    """
    Background worker that indexes code from Bitbucket repositories.

    Flow:
        1. Poll Redis queue for indexing jobs
        2. Clone/pull repository
        3. Parse COBOL/SQL/JCL files
        4. Generate embeddings
        5. Store in Qdrant vector database
    """

    # Maps lower-case file suffix -> logical file type consumed by parse_file().
    INDEXABLE_EXTENSIONS = {
        ".cbl": "cobol",
        ".cob": "cobol",
        ".cpy": "copybook",
        ".sql": "sql",
        ".jcl": "jcl",
        ".proc": "jcl_proc",
    }

    def __init__(
        self,
        redis_url: str = "redis://localhost:6379",
        qdrant_url: str = "http://localhost:6333",
        work_dir: str = "/tmp/jira-indexer",
        queue_name: str = "indexer:jobs",
    ):
        """
        Args:
            redis_url: Connection URL of the Redis instance holding the job queue.
            qdrant_url: Base URL of the Qdrant vector database.
            work_dir: Scratch directory where repositories are cloned.
            queue_name: Redis list key polled for indexing jobs.
        """
        self.redis_url = redis_url
        self.qdrant_url = qdrant_url
        self.work_dir = Path(work_dir)
        self.queue_name = queue_name
        self.redis: Optional[redis.Redis] = None
        self.running = False
        # Ensure the clone scratch area exists before any job arrives.
        self.work_dir.mkdir(parents=True, exist_ok=True)

    async def connect(self):
        """Open the Redis connection used for job polling and progress reporting."""
        self.redis = redis.from_url(self.redis_url)
        logger.info(f"🔌 Connected to Redis: {self.redis_url}")

    async def disconnect(self):
        """Close the Redis connection if one was opened."""
        if self.redis:
            await self.redis.close()

    async def run(self):
        """
        Main worker loop.

        Blocks on the Redis queue, deserializes each job as JSON, and processes
        it. Runs until stop() is called; the blpop timeout ensures the stop
        flag is re-checked at least every 10 seconds.
        """
        self.running = True
        await self.connect()
        logger.info("🚀 Code Indexer worker started")
        while self.running:
            try:
                # Block waiting for new jobs (timeout 10s so stop() is honored).
                result = await self.redis.blpop(self.queue_name, timeout=10)
                if result:
                    _, data = result
                    # SECURITY: queue payloads are untrusted; parse them as JSON.
                    # (Previously eval() — that allows arbitrary code execution.)
                    job = json.loads(data)
                    await self.process_job(job)
            except Exception as e:
                logger.error(f"❌ Error in worker loop: {e}")
                # Back off so a persistent failure (e.g. Redis down) doesn't spin.
                await asyncio.sleep(5)
        await self.disconnect()
        logger.info("👋 Code Indexer worker stopped")

    async def process_job(self, job: dict):
        """
        Process a single indexing job end-to-end.

        Args:
            job: Dict with repository info; expected keys "name" and "url".
        """
        repo_name = job.get("name")
        repo_url = job.get("url")
        logger.info(f"📦 Indexing repository: {repo_name}")
        try:
            # 1. Clone or update repository
            repo_path = await self.clone_or_update(repo_name, repo_url)
            # 2. Find indexable files
            files = self.find_indexable_files(repo_path)
            logger.info(f"📁 Found {len(files)} indexable files")
            # 3. Process files in batches
            total_chunks = 0
            batch_size = 10
            for i in range(0, len(files), batch_size):
                batch = files[i:i + batch_size]
                chunks = await self.process_file_batch(batch, repo_name)
                total_chunks += chunks
                # Report progress, capped at 100 for the final partial batch.
                progress = min(100, ((i + batch_size) / len(files)) * 100)
                await self.report_progress(repo_name, progress)
            # 4. Update repository status
            await self.update_repo_status(repo_name, total_chunks)
            logger.info(f"✅ Indexed {repo_name}: {total_chunks} chunks from {len(files)} files")
        except Exception as e:
            # A failed job must not kill the worker; record the error instead.
            logger.error(f"❌ Failed to index {repo_name}: {e}")
            await self.update_repo_status(repo_name, 0, error=str(e))

    async def clone_or_update(self, name: str, url: str) -> Path:
        """
        Clone the repository or pull latest changes.

        Returns:
            Local path of the repository working copy under work_dir.
        """
        repo_path = self.work_dir / name
        if repo_path.exists():
            logger.info(f"📥 Pulling latest changes: {name}")
            # TODO: Implement git pull
        else:
            logger.info(f"📥 Cloning repository: {name}")
            # TODO: Implement git clone
            repo_path.mkdir(parents=True, exist_ok=True)
        return repo_path

    def find_indexable_files(self, repo_path: Path) -> List[Path]:
        """
        Find all files under repo_path that should be indexed.

        Suffix matching is case-insensitive (mainframe sources are often
        upper-case, e.g. PROG.CBL), consistent with the lookup in
        process_file_batch(). Results are sorted for deterministic order.
        """
        return sorted(
            p
            for p in repo_path.rglob("*")
            if p.is_file() and p.suffix.lower() in self.INDEXABLE_EXTENSIONS
        )

    async def process_file_batch(self, files: List[Path], repo_name: str) -> int:
        """Process a batch of files and return the number of chunks indexed."""
        total_chunks = 0
        for file_path in files:
            try:
                # latin-1 never raises on decode, so odd bytes in legacy
                # (EBCDIC-converted) sources won't abort the batch.
                content = file_path.read_text(encoding="latin-1")
                file_type = self.INDEXABLE_EXTENSIONS.get(file_path.suffix.lower())
                # Parse into chunks
                chunks = self.parse_file(content, file_path, file_type)
                # Generate embeddings and store
                for chunk in chunks:
                    await self.index_chunk(chunk, repo_name)
                    total_chunks += 1
            except Exception as e:
                # One bad file should not fail the whole repository.
                logger.warning(f"⚠️ Failed to process {file_path}: {e}")
        return total_chunks

    def parse_file(self, content: str, path: Path, file_type: Optional[str]) -> List[Dict[str, Any]]:
        """
        Dispatch to the type-specific parser.

        Returns an empty list for unknown file types.
        """
        chunks: List[Dict[str, Any]] = []
        if file_type in ("cobol", "copybook"):
            chunks = self.parse_cobol(content, path)
        elif file_type == "sql":
            chunks = self.parse_sql(content, path)
        elif file_type in ("jcl", "jcl_proc"):
            chunks = self.parse_jcl(content, path)
        return chunks

    def parse_cobol(self, content: str, path: Path) -> List[Dict[str, Any]]:
        """Parse COBOL program into chunks."""
        # TODO: Implement full COBOL parsing (divisions/sections/paragraphs).
        # For now, create one chunk per file.
        return [{
            "file_path": str(path),
            "content": content[:4000],  # Limit size for embedding input
            "type": "cobol",
            "start_line": 1,
            # NOTE(review): newline count, not line count — a file without a
            # trailing newline reports one less line than it has. Confirm intent.
            "end_line": content.count("\n"),
        }]

    def parse_sql(self, content: str, path: Path) -> List[Dict[str, Any]]:
        """Parse SQL file into chunks (currently one chunk per file)."""
        return [{
            "file_path": str(path),
            "content": content[:4000],
            "type": "sql",
            "start_line": 1,
            "end_line": content.count("\n"),
        }]

    def parse_jcl(self, content: str, path: Path) -> List[Dict[str, Any]]:
        """Parse JCL file into chunks (currently one chunk per file)."""
        return [{
            "file_path": str(path),
            "content": content[:4000],
            "type": "jcl",
            "start_line": 1,
            "end_line": content.count("\n"),
        }]

    async def index_chunk(self, chunk: Dict[str, Any], repo_name: str):
        """Generate embedding and store chunk in Qdrant."""
        # TODO: Implement embedding generation and Qdrant storage
        pass

    async def report_progress(self, repo_name: str, progress: float):
        """Publish indexing progress (0-100) to Redis for UI polling."""
        if self.redis:
            await self.redis.set(f"indexer:progress:{repo_name}", str(progress))

    async def update_repo_status(
        self,
        repo_name: str,
        chunk_count: int,
        error: Optional[str] = None,
    ):
        """
        Update repository status in the database.

        Args:
            repo_name: Repository being reported on.
            chunk_count: Number of chunks indexed (0 on failure).
            error: Error message when indexing failed; None on success.
        """
        # TODO: Implement database update
        status = "indexed" if not error else "failed"
        logger.info(f"📊 Repo status: {repo_name} -> {status} ({chunk_count} chunks)")

    def stop(self):
        """Signal the worker loop to exit after its current blpop cycle."""
        self.running = False
async def main():
    """Entry point: build a worker from environment config and run it until interrupted."""
    config = {
        "redis_url": os.getenv("REDIS_URL", "redis://localhost:6379"),
        "qdrant_url": os.getenv("QDRANT_URL", "http://localhost:6333"),
    }
    indexer = CodeIndexer(**config)
    try:
        await indexer.run()
    except KeyboardInterrupt:
        # Ctrl-C: flip the stop flag so the loop can shut down cleanly.
        indexer.stop()
if __name__ == "__main__":
asyncio.run(main())