"""Gitea integration service.""" import httpx from typing import Optional, Dict, Any, List class GiteaService: def __init__(self, base_url: str, token: str): self.base_url = base_url.rstrip('/') self.token = token self.headers = { "Authorization": f"token {token}", "Content-Type": "application/json" } async def get_repo(self, owner: str, repo: str) -> Optional[Dict[str, Any]]: """Get repository details.""" async with httpx.AsyncClient() as client: try: response = await client.get( f"{self.base_url}/api/v1/repos/{owner}/{repo}", headers=self.headers, timeout=10.0 ) response.raise_for_status() return response.json() except Exception: return None async def get_file(self, owner: str, repo: str, path: str, ref: str = "main") -> Optional[str]: """Get file content from repository.""" async with httpx.AsyncClient() as client: try: response = await client.get( f"{self.base_url}/api/v1/repos/{owner}/{repo}/contents/{path}?ref={ref}", headers=self.headers, timeout=10.0 ) response.raise_for_status() data = response.json() # Gitea returns base64 encoded content import base64 return base64.b64decode(data.get("content", "")).decode("utf-8") except Exception: return None async def create_branch(self, owner: str, repo: str, branch: str, from_branch: str = "main") -> bool: """Create a new branch.""" async with httpx.AsyncClient() as client: try: response = await client.post( f"{self.base_url}/api/v1/repos/{owner}/{repo}/branches", headers=self.headers, json={"new_branch_name": branch, "old_branch_name": from_branch}, timeout=10.0 ) response.raise_for_status() return True except Exception: return False async def update_file(self, owner: str, repo: str, path: str, content: str, message: str, branch: str, sha: Optional[str] = None) -> bool: """Update file in repository.""" import base64 async with httpx.AsyncClient() as client: try: payload = { "content": base64.b64encode(content.encode()).decode(), "message": message, "branch": branch } if sha: payload["sha"] = sha response = await client.put( f"{self.base_url}/api/v1/repos/{owner}/{repo}/contents/{path}", headers=self.headers, json=payload, timeout=10.0 ) response.raise_for_status() return True except Exception: return False async def create_pull_request(self, owner: str, repo: str, title: str, body: str, head: str, base: str = "main") -> Optional[str]: """Create a pull request.""" async with httpx.AsyncClient() as client: try: response = await client.post( f"{self.base_url}/api/v1/repos/{owner}/{repo}/pulls", headers=self.headers, json={ "title": title, "body": body, "head": head, "base": base }, timeout=10.0 ) response.raise_for_status() pr_data = response.json() return pr_data.get("html_url") except Exception: return None async def list_repositories(self, owner: str) -> List[Dict[str, Any]]: """List repositories for owner.""" async with httpx.AsyncClient() as client: try: response = await client.get( f"{self.base_url}/api/v1/users/{owner}/repos", headers=self.headers, timeout=10.0 ) response.raise_for_status() return response.json() except Exception: return []