Files
MileVault/backend/app/api/auth.py
T
owain ec5a01d12a
Build and push images / build-backend (push) Successful in 33s
Build and push images / build-worker (push) Successful in 32s
Build and push images / build-frontend (push) Failing after 6s
All tweaks added
2026-06-06 18:10:35 +01:00

123 lines
4.4 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from pydantic import BaseModel
from typing import Optional
import httpx
from app.core.database import get_db
from app.core.security import verify_password, create_access_token, get_current_user
from app.core.config import settings
from app.models.user import User
router = APIRouter()
async def _get_pocketid_config(db: AsyncSession):
"""Get PocketID config from DB (admin user) falling back to env vars."""
result = await db.execute(select(User).where(User.is_admin == True).limit(1))
admin = result.scalar_one_or_none()
issuer = (admin and admin.pocketid_issuer) or settings.pocketid_issuer
client_id = (admin and admin.pocketid_client_id) or settings.pocketid_client_id
client_secret = (admin and admin.pocketid_client_secret) or settings.pocketid_client_secret
return issuer, client_id, client_secret
class Token(BaseModel):
access_token: str
token_type: str
user_id: int
username: str
is_admin: bool
class UserOut(BaseModel):
id: int
username: str
email: Optional[str]
is_admin: bool
class Config:
from_attributes = True
@router.post("/token", response_model=Token)
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: AsyncSession = Depends(get_db),
):
result = await db.execute(select(User).where(User.username == form_data.username))
user = result.scalar_one_or_none()
if not user or not user.hashed_password:
raise HTTPException(status_code=400, detail="Invalid credentials")
if not verify_password(form_data.password, user.hashed_password):
raise HTTPException(status_code=400, detail="Invalid credentials")
token = create_access_token({"sub": str(user.id)})
return Token(access_token=token, token_type="bearer",
user_id=user.id, username=user.username, is_admin=user.is_admin)
@router.get("/me", response_model=UserOut)
async def get_me(current_user: User = Depends(get_current_user)):
return current_user
@router.get("/pocketid/available")
async def pocketid_available(db: AsyncSession = Depends(get_db)):
issuer, client_id, _ = await _get_pocketid_config(db)
return {"available": bool(issuer and client_id)}
@router.get("/pocketid/login-url")
async def pocketid_login_url(db: AsyncSession = Depends(get_db)):
issuer, client_id, _ = await _get_pocketid_config(db)
if not issuer or not client_id:
raise HTTPException(status_code=404, detail="PocketID not configured")
from urllib.parse import urlencode
params = {
"client_id": client_id,
"redirect_uri": "/api/auth/pocketid/callback",
"response_type": "code",
"scope": "openid profile email",
}
return {"url": f"{issuer}/authorize?{urlencode(params)}"}
@router.get("/pocketid/callback")
async def pocketid_callback(code: str, db: AsyncSession = Depends(get_db)):
issuer, client_id, client_secret = await _get_pocketid_config(db)
if not issuer:
raise HTTPException(status_code=404, detail="PocketID not configured")
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{issuer}/token",
data={"grant_type": "authorization_code", "code": code,
"redirect_uri": "/api/auth/pocketid/callback",
"client_id": client_id, "client_secret": client_secret},
)
if resp.status_code != 200:
raise HTTPException(status_code=400, detail="Token exchange failed")
tokens = resp.json()
userinfo_resp = await client.get(
f"{issuer}/userinfo",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
)
userinfo = userinfo_resp.json()
sub = userinfo.get("sub")
email = userinfo.get("email")
preferred_username = userinfo.get("preferred_username") or email
result = await db.execute(select(User).where(User.pocketid_sub == sub))
user = result.scalar_one_or_none()
if not user:
user = User(username=preferred_username, email=email, pocketid_sub=sub)
db.add(user)
await db.flush()
token = create_access_token({"sub": str(user.id)})
from fastapi.responses import RedirectResponse
return RedirectResponse(url=f"/?token={token}")