01a8fe135c
- /token: reject password auth with a clear message if pocketid_sub is set on the account — passkey-linked users must sign in via PocketID - Link callback + auto-link-by-email: null out hashed_password when the passkey is attached so the old hash can't be used even if the check above were bypassed Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
320 lines
13 KiB
Python
320 lines
13 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from fastapi.security import OAuth2PasswordRequestForm
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select
|
|
from pydantic import BaseModel
|
|
from typing import Optional
|
|
from datetime import timedelta
|
|
from jose import jwt, JWTError
|
|
import httpx
|
|
|
|
from app.core.database import get_db
|
|
from app.core.security import verify_password, create_access_token, get_current_user
|
|
from app.core.config import settings
|
|
from app.models.user import User
|
|
|
|
router = APIRouter()
|
|
|
|
# Marks a short-lived OIDC `state` token as an account-link request (as opposed
|
|
# to a normal sign-in), so the callback attaches the passkey to a known user
|
|
# instead of creating/looking-up by identity.
|
|
LINK_STATE_PURPOSE = "pocketid-link"
|
|
LOGIN_STATE_PURPOSE = "pocketid-login"
|
|
|
|
|
|
def _make_link_state(user_id: int) -> str:
|
|
"""Signed, short-lived token carrying 'link this passkey to user_id' intent."""
|
|
return create_access_token(
|
|
{"sub": str(user_id), "purpose": LINK_STATE_PURPOSE},
|
|
expires_delta=timedelta(minutes=10),
|
|
)
|
|
|
|
|
|
def _make_login_state() -> str:
|
|
"""Signed, short-lived CSRF token proving the login flow started from this app."""
|
|
return create_access_token(
|
|
{"sub": "login", "purpose": LOGIN_STATE_PURPOSE},
|
|
expires_delta=timedelta(minutes=10),
|
|
)
|
|
|
|
|
|
def _valid_login_state(state: Optional[str]) -> bool:
|
|
"""True if `state` is a valid, unexpired login-state token we issued."""
|
|
if not state:
|
|
return False
|
|
try:
|
|
payload = jwt.decode(state, settings.secret_key, algorithms=[settings.algorithm])
|
|
return payload.get("purpose") == LOGIN_STATE_PURPOSE
|
|
except JWTError:
|
|
return False
|
|
|
|
|
|
def _decode_link_state(state: Optional[str]) -> Optional[int]:
|
|
"""Return the user id from a valid link-state token, else None."""
|
|
if not state:
|
|
return None
|
|
try:
|
|
payload = jwt.decode(state, settings.secret_key, algorithms=[settings.algorithm])
|
|
if payload.get("purpose") != LINK_STATE_PURPOSE:
|
|
return None
|
|
return int(payload["sub"])
|
|
except (JWTError, KeyError, TypeError, ValueError):
|
|
return None
|
|
|
|
|
|
async def _config_admin(db: AsyncSession):
|
|
"""The admin row that holds instance-wide PocketID settings.
|
|
|
|
Settings live on an admin user row, but there can be more than one admin.
|
|
Prefer the admin that actually has an issuer configured; otherwise fall back
|
|
to the lowest-id admin. Without this, an unordered LIMIT 1 could return an
|
|
admin with no config and make PocketID look disabled / gating inconsistent.
|
|
"""
|
|
result = await db.execute(
|
|
select(User)
|
|
.where(User.is_admin == True, User.pocketid_issuer.isnot(None))
|
|
.order_by(User.id)
|
|
.limit(1)
|
|
)
|
|
admin = result.scalar_one_or_none()
|
|
if admin is None:
|
|
result = await db.execute(
|
|
select(User).where(User.is_admin == True).order_by(User.id).limit(1)
|
|
)
|
|
admin = result.scalar_one_or_none()
|
|
return admin
|
|
|
|
|
|
async def _get_pocketid_config(db: AsyncSession):
|
|
"""Get PocketID config from DB (admin user) falling back to env vars."""
|
|
admin = await _config_admin(db)
|
|
issuer = (admin and admin.pocketid_issuer) or settings.pocketid_issuer
|
|
client_id = (admin and admin.pocketid_client_id) or settings.pocketid_client_id
|
|
client_secret = (admin and admin.pocketid_client_secret) or settings.pocketid_client_secret
|
|
return issuer, client_id, client_secret
|
|
|
|
|
|
async def _get_allowed_group(db: AsyncSession):
|
|
"""Group a PocketID user must belong to in order to sign in (None = allow all)."""
|
|
admin = await _config_admin(db)
|
|
group = (admin and admin.pocketid_allowed_group) or settings.pocketid_allowed_group
|
|
return (group or "").strip() or None
|
|
|
|
|
|
async def _unique_username(db: AsyncSession, base: str) -> str:
|
|
"""Return `base`, or `base-2`, `base-3`, … until it is not already taken."""
|
|
base = (base or "user").strip() or "user"
|
|
candidate = base
|
|
n = 1
|
|
while True:
|
|
existing = await db.execute(select(User).where(User.username == candidate))
|
|
if existing.scalar_one_or_none() is None:
|
|
return candidate
|
|
n += 1
|
|
candidate = f"{base}-{n}"
|
|
|
|
|
|
class Token(BaseModel):
|
|
access_token: str
|
|
token_type: str
|
|
user_id: int
|
|
username: str
|
|
is_admin: bool
|
|
|
|
|
|
class UserOut(BaseModel):
|
|
id: int
|
|
username: str
|
|
email: Optional[str]
|
|
is_admin: bool
|
|
has_passkey: bool = False
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
@router.post("/token", response_model=Token)
|
|
async def login(
|
|
form_data: OAuth2PasswordRequestForm = Depends(),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
result = await db.execute(select(User).where(User.username == form_data.username))
|
|
user = result.scalar_one_or_none()
|
|
if not user or not user.hashed_password:
|
|
raise HTTPException(status_code=400, detail="Invalid credentials")
|
|
if user.pocketid_sub is not None:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Password login is disabled for this account — use your passkey to sign in.",
|
|
)
|
|
if not verify_password(form_data.password, user.hashed_password):
|
|
raise HTTPException(status_code=400, detail="Invalid credentials")
|
|
token = create_access_token({"sub": str(user.id)})
|
|
return Token(access_token=token, token_type="bearer",
|
|
user_id=user.id, username=user.username, is_admin=user.is_admin)
|
|
|
|
|
|
@router.get("/me", response_model=UserOut)
|
|
async def get_me(current_user: User = Depends(get_current_user)):
|
|
return UserOut(
|
|
id=current_user.id,
|
|
username=current_user.username,
|
|
email=current_user.email,
|
|
is_admin=current_user.is_admin,
|
|
has_passkey=current_user.pocketid_sub is not None,
|
|
)
|
|
|
|
|
|
@router.get("/pocketid/available")
|
|
async def pocketid_available(db: AsyncSession = Depends(get_db)):
|
|
issuer, client_id, _ = await _get_pocketid_config(db)
|
|
return {"available": bool(issuer and client_id)}
|
|
|
|
|
|
@router.get("/pocketid/login-url")
|
|
async def pocketid_login_url(db: AsyncSession = Depends(get_db)):
|
|
issuer, client_id, _ = await _get_pocketid_config(db)
|
|
if not issuer or not client_id:
|
|
raise HTTPException(status_code=404, detail="PocketID not configured")
|
|
from urllib.parse import urlencode
|
|
params = {
|
|
"client_id": client_id,
|
|
"redirect_uri": f"{settings.base_url}/api/auth/pocketid/callback",
|
|
"response_type": "code",
|
|
"scope": "openid profile email groups",
|
|
"state": _make_login_state(),
|
|
}
|
|
return {"url": f"{issuer}/authorize?{urlencode(params)}"}
|
|
|
|
|
|
@router.get("/pocketid/link-url")
|
|
async def pocketid_link_url(
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Authenticated user starts an OIDC flow to attach a passkey to THEIR account.
|
|
|
|
The `state` carries a signed 'link to this user' token so the callback links
|
|
the returned identity instead of creating/matching a new account.
|
|
"""
|
|
issuer, client_id, _ = await _get_pocketid_config(db)
|
|
if not issuer or not client_id:
|
|
raise HTTPException(status_code=404, detail="PocketID not configured")
|
|
from urllib.parse import urlencode
|
|
params = {
|
|
"client_id": client_id,
|
|
"redirect_uri": f"{settings.base_url}/api/auth/pocketid/callback",
|
|
"response_type": "code",
|
|
"scope": "openid profile email groups",
|
|
"state": _make_link_state(current_user.id),
|
|
}
|
|
return {"url": f"{issuer}/authorize?{urlencode(params)}"}
|
|
|
|
|
|
@router.get("/pocketid/callback")
|
|
async def pocketid_callback(code: str, state: Optional[str] = None, db: AsyncSession = Depends(get_db)):
|
|
issuer, client_id, client_secret = await _get_pocketid_config(db)
|
|
if not issuer:
|
|
raise HTTPException(status_code=404, detail="PocketID not configured")
|
|
|
|
async with httpx.AsyncClient() as client:
|
|
resp = await client.post(
|
|
f"{issuer}/api/oidc/token",
|
|
data={"grant_type": "authorization_code", "code": code,
|
|
"redirect_uri": f"{settings.base_url}/api/auth/pocketid/callback",
|
|
"client_id": client_id, "client_secret": client_secret},
|
|
)
|
|
if resp.status_code != 200:
|
|
print(f"PocketID token exchange failed ({resp.status_code}): {resp.text}")
|
|
raise HTTPException(status_code=400, detail="Token exchange failed")
|
|
tokens = resp.json()
|
|
access_token = tokens.get("access_token")
|
|
if not access_token:
|
|
raise HTTPException(status_code=400, detail="Token exchange failed")
|
|
userinfo_resp = await client.get(
|
|
f"{issuer}/api/oidc/userinfo",
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
)
|
|
if userinfo_resp.status_code != 200:
|
|
raise HTTPException(status_code=400, detail="Failed to fetch user info")
|
|
userinfo = userinfo_resp.json()
|
|
|
|
from fastapi.responses import RedirectResponse
|
|
|
|
sub = userinfo.get("sub")
|
|
email = userinfo.get("email")
|
|
preferred_username = userinfo.get("preferred_username") or email
|
|
|
|
# A missing subject means we cannot identify the user. Never continue, or the
|
|
# `pocketid_sub == sub` (== None → IS NULL) lookups below would match any
|
|
# password-only account and log the caller in as someone else.
|
|
if not sub:
|
|
return RedirectResponse(url="/login?auth_error=no_identity")
|
|
|
|
# ── Explicit account-link flow ──────────────────────────────────────────
|
|
# Initiated by an already-authenticated user from their profile. Attach the
|
|
# passkey to that account. No group gating here: this is identity linking,
|
|
# not access control, and the initiator is already an authorised user.
|
|
link_user_id = _decode_link_state(state)
|
|
if link_user_id is not None:
|
|
result = await db.execute(select(User).where(User.pocketid_sub == sub))
|
|
holder = result.scalar_one_or_none()
|
|
if holder and holder.id != link_user_id:
|
|
# This passkey is already attached to a different account.
|
|
return RedirectResponse(url="/login?auth_error=passkey_in_use")
|
|
result = await db.execute(select(User).where(User.id == link_user_id))
|
|
target = result.scalar_one_or_none()
|
|
if target is None:
|
|
return RedirectResponse(url="/login?auth_error=link_failed")
|
|
target.pocketid_sub = sub
|
|
target.hashed_password = None # disable password login once passkey is linked
|
|
if not target.email and email:
|
|
dup = await db.execute(
|
|
select(User).where(User.email == email, User.id != target.id)
|
|
)
|
|
if dup.scalar_one_or_none() is None:
|
|
target.email = email
|
|
return RedirectResponse(url="/profile?linked=1")
|
|
|
|
# Normal sign-in: require the signed, short-lived state we issued in
|
|
# /pocketid/login-url, so the callback can't be driven by an injected code.
|
|
if not _valid_login_state(state):
|
|
return RedirectResponse(url="/login?auth_error=invalid_state")
|
|
|
|
# Group gating: if an allowed group is configured, the user must be in it.
|
|
allowed_group = await _get_allowed_group(db)
|
|
if allowed_group:
|
|
groups = userinfo.get("groups") or []
|
|
if allowed_group not in groups:
|
|
return RedirectResponse(url="/login?auth_error=not_authorized")
|
|
|
|
# 1) Existing passkey identity → use it.
|
|
result = await db.execute(select(User).where(User.pocketid_sub == sub))
|
|
user = result.scalar_one_or_none()
|
|
|
|
# 2) No passkey identity yet, but an account with this email exists and is
|
|
# not already linked to a different passkey → link them (preserves data).
|
|
if not user and email:
|
|
result = await db.execute(select(User).where(User.email == email))
|
|
existing = result.scalar_one_or_none()
|
|
if existing and existing.pocketid_sub is None:
|
|
existing.pocketid_sub = sub
|
|
existing.hashed_password = None # disable password login once passkey is linked
|
|
user = existing
|
|
|
|
# 3) Otherwise provision a new account with a collision-safe username.
|
|
if not user:
|
|
base = preferred_username or (email.split("@")[0] if email else "user")
|
|
username = await _unique_username(db, base)
|
|
# Only set email if no other account already claims it (unique column).
|
|
email_taken = False
|
|
if email:
|
|
dup = await db.execute(select(User).where(User.email == email))
|
|
email_taken = dup.scalar_one_or_none() is not None
|
|
user = User(username=username, email=None if email_taken else email, pocketid_sub=sub)
|
|
db.add(user)
|
|
await db.flush()
|
|
|
|
token = create_access_token({"sub": str(user.id)})
|
|
return RedirectResponse(url=f"/?token={token}")
|