Files
MileVault/backend/app/api/auth.py
T
owain 0e18ef2291
Build and push images / validate (push) Successful in 19s
Build and push images / build-backend (push) Successful in 29s
Build and push images / build-worker (push) Successful in 29s
Build and push images / build-frontend (push) Successful in 29s
Fix PocketID secret wiped on re-save; log token-exchange failures
save_pocketid_config cleared the stored client secret whenever the form was
submitted with a blank secret field — but the UI hint says blank means "keep
existing". Re-saving config (e.g. to set the allowed group) therefore wiped the
secret and broke token exchange ("Token exchange failed"). Now a blank field
keeps the existing secret; only a non-empty value overwrites it.

Also log PocketID's actual token-endpoint response body on failure so the cause
(invalid_client, redirect_uri mismatch, etc.) is visible in backend logs.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 13:26:35 +01:00

172 lines
6.6 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from pydantic import BaseModel
from typing import Optional
import httpx
from app.core.database import get_db
from app.core.security import verify_password, create_access_token, get_current_user
from app.core.config import settings
from app.models.user import User
# Router that collects the authentication endpoints declared in this module.
router = APIRouter()
async def _get_pocketid_config(db: AsyncSession):
    """Return ``(issuer, client_id, client_secret)`` for PocketID.

    Each value is read from the first admin user row found in the database;
    when there is no admin, or the stored value is empty, the corresponding
    attribute of the application ``settings`` object is used instead.
    """
    # SQLAlchemy needs the explicit ``== True`` to build a SQL comparison.
    admin_query = select(User).where(User.is_admin == True).limit(1)  # noqa: E712
    admin = (await db.execute(admin_query)).scalar_one_or_none()

    stored_issuer = admin.pocketid_issuer if admin else None
    stored_client_id = admin.pocketid_client_id if admin else None
    stored_client_secret = admin.pocketid_client_secret if admin else None

    issuer = stored_issuer or settings.pocketid_issuer
    client_id = stored_client_id or settings.pocketid_client_id
    client_secret = stored_client_secret or settings.pocketid_client_secret
    return issuer, client_id, client_secret
async def _get_allowed_group(db: AsyncSession):
    """Return the group a PocketID user must belong to, or ``None`` to allow all.

    The value stored on the first admin user row wins; otherwise the
    ``settings.pocketid_allowed_group`` value is used. Surrounding whitespace
    is stripped and a blank result is reported as ``None``.
    """
    # SQLAlchemy needs the explicit ``== True`` to build a SQL comparison.
    admin_query = select(User).where(User.is_admin == True).limit(1)  # noqa: E712
    admin = (await db.execute(admin_query)).scalar_one_or_none()

    stored_group = admin.pocketid_allowed_group if admin else None
    configured = stored_group or settings.pocketid_allowed_group
    cleaned = (configured or "").strip()
    return cleaned or None
async def _unique_username(db: AsyncSession, base: str) -> str:
    """Return ``base`` if it is free, else the first free ``base-2``, ``base-3``, ….

    A missing or blank ``base`` is replaced by ``"user"``. Availability is
    checked with one query per candidate against ``User.username``.
    """
    stem = (base or "user").strip() or "user"

    async def _is_taken(name: str) -> bool:
        # True when a user row with this exact username already exists.
        rows = await db.execute(select(User).where(User.username == name))
        return rows.scalar_one_or_none() is not None

    name = stem
    suffix = 1
    while await _is_taken(name):
        suffix += 1
        name = f"{stem}-{suffix}"
    return name
class Token(BaseModel):
    """Response body of the password login endpoint."""

    # Token string produced by ``create_access_token``.
    access_token: str
    # Scheme the client should use when sending the token (login sends "bearer").
    token_type: str
    # Identity details of the authenticated account, echoed for the client.
    user_id: int
    username: str
    is_admin: bool
class UserOut(BaseModel):
    """Public view of a user, used as the response model of the ``/me`` route."""

    # Pydantic v2 configuration. This replaces the nested ``class Config``,
    # which v2 still accepts but reports as deprecated. ``from_attributes``
    # lets the model be built from an object's attributes, which is needed
    # because the ``/me`` handler returns a ``User`` instance directly.
    model_config = {"from_attributes": True}

    id: int
    username: str
    # May be None: accounts can exist without an email address.
    email: Optional[str]
    is_admin: bool
@router.post("/token", response_model=Token)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db),
):
    """Authenticate with username and password and return an access token.

    Raises:
        HTTPException: 400 ``Invalid credentials`` when the username is
            unknown, the account has no password hash, or the password does
            not verify. The same response is used for all three cases.
    """
    lookup = await db.execute(select(User).where(User.username == form_data.username))
    account = lookup.scalar_one_or_none()

    # Short-circuits left to right, so the password is only verified when an
    # account with a stored hash was found.
    credentials_ok = bool(
        account
        and account.hashed_password
        and verify_password(form_data.password, account.hashed_password)
    )
    if not credentials_ok:
        raise HTTPException(status_code=400, detail="Invalid credentials")

    return Token(
        access_token=create_access_token({"sub": str(account.id)}),
        token_type="bearer",
        user_id=account.id,
        username=account.username,
        is_admin=account.is_admin,
    )
@router.get("/me", response_model=UserOut)
async def get_me(current_user: User = Depends(get_current_user)):
    """Return the user resolved by the ``get_current_user`` dependency."""
    # Serialisation to ``UserOut`` is handled by the route's response model.
    return current_user
@router.get("/pocketid/available")
async def pocketid_available(db: AsyncSession = Depends(get_db)):
    """Report whether PocketID sign-in can be offered.

    It counts as available when both an issuer and a client id are
    configured; the client secret is not considered here.
    """
    issuer, client_id, _secret = await _get_pocketid_config(db)
    is_configured = bool(issuer and client_id)
    return {"available": is_configured}
@router.get("/pocketid/login-url")
async def pocketid_login_url(db: AsyncSession = Depends(get_db)):
    """Build the PocketID authorization URL the browser should be sent to.

    Returns:
        ``{"url": ...}`` pointing at the issuer's ``/authorize`` endpoint with
        this application's client id, callback URL and requested scopes.

    Raises:
        HTTPException: 404 when no issuer or client id is configured.
    """
    from urllib.parse import urlencode

    issuer, client_id, _ = await _get_pocketid_config(db)
    if not issuer or not client_id:
        raise HTTPException(status_code=404, detail="PocketID not configured")

    # An issuer saved with a trailing slash would otherwise produce
    # "https://host//authorize".
    authorize_base = issuer.rstrip("/")

    # NOTE(review): no `state` parameter is sent, so the callback cannot tie a
    # response to the browser session that started the flow. Consider adding one.
    params = {
        "client_id": client_id,
        "redirect_uri": f"{settings.base_url}/api/auth/pocketid/callback",
        "response_type": "code",
        "scope": "openid profile email groups",
    }
    return {"url": f"{authorize_base}/authorize?{urlencode(params)}"}
def _json_object(response) -> dict:
    """Return the JSON body of ``response`` as a dict, or ``{}`` if it is not a JSON object."""
    try:
        payload = response.json()
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return {}
    return payload if isinstance(payload, dict) else {}


async def _fetch_pocketid_userinfo(issuer: str, client_id, client_secret, code: str) -> dict:
    """Exchange an authorization code for tokens and return the userinfo claims.

    Raises:
        HTTPException: 400 when the token endpoint rejects the code or returns
            no access token, or when the userinfo endpoint does not answer 200;
            502 when PocketID cannot be reached at all.
    """
    import logging

    log = logging.getLogger(__name__)
    redirect_uri = f"{settings.base_url}/api/auth/pocketid/callback"

    try:
        async with httpx.AsyncClient() as client:
            token_resp = await client.post(
                f"{issuer}/api/oidc/token",
                data={
                    "grant_type": "authorization_code",
                    "code": code,
                    "redirect_uri": redirect_uri,
                    "client_id": client_id,
                    "client_secret": client_secret,
                },
            )
            if token_resp.status_code != 200:
                # Log the body so the real cause (invalid_client, redirect_uri
                # mismatch, ...) is visible in the backend logs.
                log.warning(
                    "PocketID token exchange failed (%s): %s",
                    token_resp.status_code,
                    token_resp.text,
                )
                raise HTTPException(status_code=400, detail="Token exchange failed")

            access_token = _json_object(token_resp).get("access_token")
            if not access_token:
                log.warning("PocketID token response contained no access_token")
                raise HTTPException(status_code=400, detail="Token exchange failed")

            userinfo_resp = await client.get(
                f"{issuer}/api/oidc/userinfo",
                headers={"Authorization": f"Bearer {access_token}"},
            )
    except httpx.RequestError as exc:
        # Connection errors and timeouts: report a gateway error rather than
        # letting an unhandled exception surface as a 500.
        log.warning("PocketID request failed: %s", exc)
        raise HTTPException(status_code=502, detail="PocketID unreachable") from exc

    if userinfo_resp.status_code != 200:
        log.warning(
            "PocketID userinfo request failed (%s): %s",
            userinfo_resp.status_code,
            userinfo_resp.text,
        )
        raise HTTPException(status_code=400, detail="Failed to fetch user info")
    return _json_object(userinfo_resp)


@router.get("/pocketid/callback")
async def pocketid_callback(code: str, db: AsyncSession = Depends(get_db)):
    """Complete the PocketID sign-in started by the login URL.

    The authorization ``code`` is exchanged for the user's claims, the optional
    group restriction is enforced, and the claims are mapped to a local user:
    an account already linked to the PocketID subject, else an unlinked account
    with the same email (which is then linked), else a newly created account.
    The browser is redirected to ``/?token=<access token>`` on success, or to
    ``/login?auth_error=not_authorized`` when the group check fails.

    Raises:
        HTTPException: 404 when PocketID is not configured; 400 or 502 when the
            exchange with PocketID fails or yields no subject.
    """
    from fastapi.responses import RedirectResponse

    issuer, client_id, client_secret = await _get_pocketid_config(db)
    if not issuer or not client_id:
        raise HTTPException(status_code=404, detail="PocketID not configured")

    # Tolerate an issuer configured with a trailing slash.
    userinfo = await _fetch_pocketid_userinfo(
        issuer.rstrip("/"), client_id, client_secret, code
    )

    sub = userinfo.get("sub")
    if not sub:
        # Without a subject the lookup below would compare against NULL
        # (SQLAlchemy renders `== None` as IS NULL) and could match a local
        # account that has never been linked to PocketID.
        raise HTTPException(status_code=400, detail="PocketID returned no subject")
    email = userinfo.get("email")
    preferred_username = userinfo.get("preferred_username") or email

    # Group gating: if an allowed group is configured, the user must be in it.
    allowed_group = await _get_allowed_group(db)
    if allowed_group:
        groups = userinfo.get("groups") or []
        if isinstance(groups, str):
            # A bare string would turn the membership test into a substring match.
            groups = [groups]
        if allowed_group not in groups:
            return RedirectResponse(url="/login?auth_error=not_authorized")

    # 1) Existing passkey identity -> use it.
    result = await db.execute(select(User).where(User.pocketid_sub == sub))
    user = result.scalar_one_or_none()

    # 2) No passkey identity yet, but an account with this email exists and is
    #    not already linked to a different passkey -> link them (preserves data).
    # NOTE(review): the link trusts the `email` claim as-is; confirm PocketID
    # only reports verified addresses, or also check `email_verified`.
    email_owner = None
    if user is None and email:
        result = await db.execute(select(User).where(User.email == email))
        email_owner = result.scalar_one_or_none()
        if email_owner is not None and email_owner.pocketid_sub is None:
            email_owner.pocketid_sub = sub
            user = email_owner

    # 3) Otherwise provision a new account with a collision-safe username.
    if user is None:
        # `preferred_username` already falls back to the email, so only the
        # generic default remains when both claims are missing.
        username = await _unique_username(db, preferred_username or "user")
        # Only set email if no other account already claims it (unique column);
        # at this point an owner can only be an account linked to another subject.
        new_email = None if email_owner is not None else email
        user = User(username=username, email=new_email, pocketid_sub=sub)
        db.add(user)

    # Send pending changes now so a new user has its primary key before the
    # token is created. Presumably the session dependency commits afterwards;
    # verify against `get_db`.
    await db.flush()

    token = create_access_token({"sub": str(user.id)})
    return RedirectResponse(url=f"/?token={token}")