"""
|
|
Code Indexer Worker - Background indexing of Bitbucket repositories.
|
|
"""
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional, List, Dict, Any
|
|
|
|
import redis.asyncio as redis
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)


class CodeIndexer:
    """
    Background worker that indexes code from Bitbucket repositories.

    Flow:
    1. Poll Redis queue for indexing jobs
    2. Clone/pull repository
    3. Parse COBOL/SQL/JCL files
    4. Generate embeddings
    5. Store in Qdrant vector database
    """

    # File extensions to index
    INDEXABLE_EXTENSIONS = {
        ".cbl": "cobol",
        ".cob": "cobol",
        ".cpy": "copybook",
        ".sql": "sql",
        ".jcl": "jcl",
        ".proc": "jcl_proc",
    }

    def __init__(
        self,
        redis_url: str = "redis://localhost:6379",
        qdrant_url: str = "http://localhost:6333",
        work_dir: str = "/tmp/jira-indexer",
        queue_name: str = "indexer:jobs",
    ):
        self.redis_url = redis_url
        self.qdrant_url = qdrant_url
        self.work_dir = Path(work_dir)
        self.queue_name = queue_name
        self.redis: Optional[redis.Redis] = None
        self.running = False

        # Create work directory
        self.work_dir.mkdir(parents=True, exist_ok=True)

    async def connect(self):
        """Connect to Redis."""
        self.redis = redis.from_url(self.redis_url)
        logger.info(f"🔌 Connected to Redis: {self.redis_url}")

    async def disconnect(self):
        """Disconnect from Redis."""
        if self.redis:
            await self.redis.close()

    async def run(self):
        """Main worker loop."""
        self.running = True
        await self.connect()

        logger.info("🚀 Code Indexer worker started")

        while self.running:
            try:
                # Block waiting for new jobs (timeout 10s)
                result = await self.redis.blpop(self.queue_name, timeout=10)

                if result:
                    _, data = result
                    # Jobs are expected to be enqueued as JSON strings;
                    # eval() on queue data would be unsafe.
                    job = json.loads(data)
                    await self.process_job(job)
            except Exception as e:
                logger.error(f"❌ Error in worker loop: {e}")
                await asyncio.sleep(5)

        await self.disconnect()
        logger.info("👋 Code Indexer worker stopped")

    async def process_job(self, job: dict):
        """
        Process an indexing job.

        Args:
            job: Dict with repository info and options
        """
        repo_name = job.get("name")
        repo_url = job.get("url")

        logger.info(f"📦 Indexing repository: {repo_name}")

        try:
            # 1. Clone or update repository
            repo_path = await self.clone_or_update(repo_name, repo_url)

            # 2. Find indexable files
            files = self.find_indexable_files(repo_path)
            logger.info(f"📁 Found {len(files)} indexable files")

            # 3. Process files in batches
            total_chunks = 0
            batch_size = 10

            for i in range(0, len(files), batch_size):
                batch = files[i:i + batch_size]
                chunks = await self.process_file_batch(batch, repo_name)
                total_chunks += chunks

                # Report progress
                progress = min(100, ((i + batch_size) / len(files)) * 100)
                await self.report_progress(repo_name, progress)

            # 4. Update repository status
            await self.update_repo_status(repo_name, total_chunks)

            logger.info(f"✅ Indexed {repo_name}: {total_chunks} chunks from {len(files)} files")

        except Exception as e:
            logger.error(f"❌ Failed to index {repo_name}: {e}")
            await self.update_repo_status(repo_name, 0, error=str(e))

    async def clone_or_update(self, name: str, url: str) -> Path:
        """Clone repository or pull latest changes."""
        repo_path = self.work_dir / name

        if repo_path.exists():
            logger.info(f"📥 Pulling latest changes: {name}")
            # TODO: Implement git pull
        else:
            logger.info(f"📥 Cloning repository: {name}")
            # TODO: Implement git clone
            repo_path.mkdir(parents=True, exist_ok=True)

        return repo_path
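
    # A minimal sketch for the two TODOs above, assuming the `git` CLI is
    # available on the worker host (helper name and flags are illustrative,
    # not part of the original design). clone_or_update could call e.g.
    #   await self._run_git("clone", "--depth", "1", url, str(repo_path))
    # the first time a repository is seen, and
    #   await self._run_git("pull", "--ff-only", cwd=repo_path)
    # on later runs.
    async def _run_git(self, *args: str, cwd: Optional[Path] = None) -> None:
        """Run a git command in a subprocess and raise if it exits non-zero."""
        proc = await asyncio.create_subprocess_exec(
            "git",
            *args,
            cwd=str(cwd) if cwd else None,
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.PIPE,
        )
        _, stderr = await proc.communicate()
        if proc.returncode != 0:
            raise RuntimeError(f"git {' '.join(args)} failed: {stderr.decode(errors='replace')}")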

    def find_indexable_files(self, repo_path: Path) -> List[Path]:
        """Find all files that should be indexed."""
        files = []

        for ext in self.INDEXABLE_EXTENSIONS:
            files.extend(repo_path.rglob(f"*{ext}"))

        return files

    async def process_file_batch(self, files: List[Path], repo_name: str) -> int:
        """Process a batch of files and return chunk count."""
        total_chunks = 0

        for file_path in files:
            try:
                # latin-1 decodes any byte value, so mainframe exports won't
                # raise UnicodeDecodeError here
                content = file_path.read_text(encoding="latin-1")
                file_type = self.INDEXABLE_EXTENSIONS.get(file_path.suffix.lower())

                # Parse into chunks
                chunks = self.parse_file(content, file_path, file_type)

                # Generate embeddings and store
                for chunk in chunks:
                    await self.index_chunk(chunk, repo_name)
                    total_chunks += 1

            except Exception as e:
                logger.warning(f"⚠️ Failed to process {file_path}: {e}")

        return total_chunks

    def parse_file(self, content: str, path: Path, file_type: str) -> List[Dict[str, Any]]:
        """Parse a file into indexable chunks."""
        chunks = []

        if file_type in ("cobol", "copybook"):
            chunks = self.parse_cobol(content, path)
        elif file_type == "sql":
            chunks = self.parse_sql(content, path)
        elif file_type in ("jcl", "jcl_proc"):
            chunks = self.parse_jcl(content, path)

        return chunks

    def parse_cobol(self, content: str, path: Path) -> List[Dict[str, Any]]:
        """Parse COBOL program into chunks."""
        # TODO: Implement full COBOL parsing
        # For now, create one chunk per file
        return [{
            "file_path": str(path),
            "content": content[:4000],  # Limit size
            "type": "cobol",
            "start_line": 1,
            "end_line": content.count("\n"),
        }]
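
    # A possible first step for the parsing TODO above: COBOL sources have
    # natural chunk boundaries at the four divisions (IDENTIFICATION,
    # ENVIRONMENT, DATA, PROCEDURE), so one chunk per division is a reasonable
    # refinement before real paragraph-level parsing. Sketch only; the regex
    # assumes headers start near the beginning of a line and would need
    # hardening for fixed-format column rules:
    #
    #   import re
    #   parts = re.split(
    #       r"(?im)^(?=.{0,7}(?:IDENTIFICATION|ENVIRONMENT|DATA|PROCEDURE)\s+DIVISION)",
    #       content,
    #   )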

    def parse_sql(self, content: str, path: Path) -> List[Dict[str, Any]]:
        """Parse SQL file into chunks."""
        return [{
            "file_path": str(path),
            "content": content[:4000],
            "type": "sql",
            "start_line": 1,
            "end_line": content.count("\n"),
        }]

    def parse_jcl(self, content: str, path: Path) -> List[Dict[str, Any]]:
        """Parse JCL file into chunks."""
        return [{
            "file_path": str(path),
            "content": content[:4000],
            "type": "jcl",
            "start_line": 1,
            "end_line": content.count("\n"),
        }]

    async def index_chunk(self, chunk: Dict[str, Any], repo_name: str):
        """Generate embedding and store chunk in Qdrant."""
        # TODO: Implement embedding generation and Qdrant storage
        pass
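        # Sketch of what this step might look like (illustrative; assumes an
        # embedding model exposed as `self.embedder`, an AsyncQdrantClient as
        # `self.qdrant`, plus `import uuid` and `from qdrant_client import
        # models` - none of which is wired up yet):
        #
        #   vector = self.embedder.encode(chunk["content"]).tolist()
        #   await self.qdrant.upsert(
        #       collection_name=f"code_{repo_name}",
        #       points=[models.PointStruct(
        #           id=str(uuid.uuid4()),
        #           vector=vector,
        #           payload={**chunk, "repository": repo_name},
        #       )],
        #   )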

    async def report_progress(self, repo_name: str, progress: float):
        """Report indexing progress."""
        if self.redis:
            await self.redis.set(f"indexer:progress:{repo_name}", str(progress))
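        # Read side (illustrative): anything polling for status can fetch the
        # same key, e.g.
        #   raw = await self.redis.get(f"indexer:progress:{repo_name}")
        #   progress = float(raw) if raw else 0.0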

    async def update_repo_status(
        self,
        repo_name: str,
        chunk_count: int,
        error: Optional[str] = None,
    ):
        """Update repository status in database."""
        # TODO: Implement database update
        status = "indexed" if not error else "failed"
        logger.info(f"📊 Repo status: {repo_name} -> {status} ({chunk_count} chunks)")

    def stop(self):
        """Signal worker to stop."""
        self.running = False


async def main():
    """Entry point for the worker."""
    worker = CodeIndexer(
        redis_url=os.getenv("REDIS_URL", "redis://localhost:6379"),
        qdrant_url=os.getenv("QDRANT_URL", "http://localhost:6333"),
    )
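
    # Optional refinement (sketch, not in the original): in a container the
    # orchestrator usually sends SIGTERM rather than raising KeyboardInterrupt,
    # so a graceful shutdown could hook the signal into stop():
    #
    #   import signal
    #   loop = asyncio.get_running_loop()
    #   loop.add_signal_handler(signal.SIGTERM, worker.stop)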

    try:
        await worker.run()
    except KeyboardInterrupt:
        worker.stop()


if __name__ == "__main__":
    asyncio.run(main())