70 lines
2.3 KiB
Python
70 lines
2.3 KiB
Python
"""API dependencies."""
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select
|
|
from app.core.database import get_db
|
|
from app.core.security import decode_token, has_permission
|
|
from app.models.user import User
|
|
from app.models.organization import OrganizationMember
|
|
|
|
# Shared HTTP Bearer security scheme. get_current_user injects it with
# Depends(security) to obtain the request's HTTPAuthorizationCredentials.
security = HTTPBearer()
|
|
|
|
def _unauthorized(detail: str) -> HTTPException:
    """Build a 401 error that carries the Bearer challenge header."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        # RFC 7235 requires a WWW-Authenticate challenge on every 401.
        headers={"WWW-Authenticate": "Bearer"},
    )


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: AsyncSession = Depends(get_db)
) -> User:
    """Resolve the bearer token on the request to an active user.

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.
        db: Async database session used to load the user row.

    Returns:
        The ``User`` whose id matches the token's ``user_id`` claim.

    Raises:
        HTTPException: 401 when the decoded payload is falsy, its ``type``
            claim is not ``"access"``, it has no ``user_id`` claim, no user
            matches that id, or the matching user is not active.
    """
    payload = decode_token(credentials.credentials)

    # A falsy payload from decode_token is treated as a rejected token; only
    # tokens explicitly typed as "access" are accepted here.
    if not payload or payload.get("type") != "access":
        raise _unauthorized("Invalid or expired token")

    user_id = payload.get("user_id")
    if user_id is None:
        # Without a user id the lookup below could never match a user, so
        # reject the token before spending a database round trip on it.
        raise _unauthorized("Invalid or expired token")

    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()

    if not user or not user.is_active:
        raise _unauthorized("User not found or inactive")

    return user
|
|
|
|
async def get_org_member(
    org_id: int,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
) -> OrganizationMember:
    """Look up the current user's membership row for an organization.

    Args:
        org_id: Id of the organization being accessed.
        user: The authenticated user, resolved by ``get_current_user``.
        db: Async database session used for the membership lookup.

    Returns:
        The matching ``OrganizationMember``. For a superuser without a
        membership row the lookup result (``None``) is returned as-is.

    Raises:
        HTTPException: 403 when no membership exists and the user is not a
            superuser.
    """
    membership_query = (
        select(OrganizationMember)
        .where(OrganizationMember.organization_id == org_id)
        .where(OrganizationMember.user_id == user.id)
    )
    lookup = await db.execute(membership_query)
    membership = lookup.scalar_one_or_none()

    # Members pass; superusers pass even without a membership row.
    if membership or user.is_superuser:
        return membership

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Not a member of this organization"
    )
|
|
|
|
def require_role(required_role: str):
    """Build a dependency that enforces a minimum organization role.

    Args:
        required_role: Role name handed to ``has_permission`` as the minimum
            the member's role must satisfy.

    Returns:
        An async dependency. It resolves ``get_org_member`` and returns that
        result when the role check passes. It raises a 403 ``HTTPException``
        when ``has_permission`` rejects the member's role.
    """
    async def check_role(member: OrganizationMember = Depends(get_org_member)):
        # get_org_member yields None only for a superuser who has no
        # membership row (everyone else without one gets a 403 there).
        # Let that case through instead of crashing on ``None.role``.
        if member is None:
            return member

        if not has_permission(member.role.value, required_role):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Requires {required_role} role or higher"
            )
        return member

    return check_role
|