Add explicit "link passkey to my account" flow
Build and push images / validate (push) Successful in 18s
Build and push images / build-backend (push) Successful in 30s
Build and push images / build-worker (push) Successful in 30s
Build and push images / build-frontend (push) Successful in 34s

Signing in by passkey on a fresh install created a new empty account because
the seeded admin has no email to match on. Add canonical SSO-style linking: an
authenticated user starts an OIDC flow whose `state` is a signed, short-lived
"link to user N" token (purpose=pocketid-link). The callback detects that state
and attaches the returned identity to that account instead of creating/matching
one — no reliance on emails lining up, and no group gating (the initiator is
already authorised; this is identity linking, not access control).

- auth.py: _make_link_state/_decode_link_state, GET /pocketid/link-url, callback
  handles state (rejects if the passkey is already on another account →
  auth_error=passkey_in_use). Expose has_passkey on /auth/me.
- Profile: "Passkey sign-in" section for all users — shows linked state or a
  "Link a passkey to this account" button; success banner on return.
- Login: messages for passkey_in_use / link_failed.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-08 17:11:30 +01:00
parent e0ddc4cbf4
commit e5feeb1178
6 changed files with 270 additions and 16 deletions
+85 -2
View File
@@ -4,6 +4,8 @@ from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from pydantic import BaseModel
from typing import Optional
from datetime import timedelta
from jose import jwt, JWTError
import httpx
from app.core.database import get_db
@@ -13,6 +15,32 @@ from app.models.user import User
router = APIRouter()
# Marks a short-lived OIDC `state` token as an account-link request (as opposed
# to a normal sign-in), so the callback attaches the passkey to a known user
# instead of creating/looking-up by identity.
LINK_STATE_PURPOSE = "pocketid-link"
def _make_link_state(user_id: int) -> str:
"""Signed, short-lived token carrying 'link this passkey to user_id' intent."""
return create_access_token(
{"sub": str(user_id), "purpose": LINK_STATE_PURPOSE},
expires_delta=timedelta(minutes=10),
)
def _decode_link_state(state: Optional[str]) -> Optional[int]:
"""Return the user id from a valid link-state token, else None."""
if not state:
return None
try:
payload = jwt.decode(state, settings.secret_key, algorithms=[settings.algorithm])
if payload.get("purpose") != LINK_STATE_PURPOSE:
return None
return int(payload["sub"])
except (JWTError, KeyError, TypeError, ValueError):
return None
async def _config_admin(db: AsyncSession):
"""The admin row that holds instance-wide PocketID settings.
@@ -79,6 +107,7 @@ class UserOut(BaseModel):
username: str
email: Optional[str]
is_admin: bool
has_passkey: bool = False
class Config:
from_attributes = True
@@ -102,7 +131,13 @@ async def login(
@router.get("/me", response_model=UserOut)
async def get_me(current_user: User = Depends(get_current_user)):
return current_user
return UserOut(
id=current_user.id,
username=current_user.username,
email=current_user.email,
is_admin=current_user.is_admin,
has_passkey=current_user.pocketid_sub is not None,
)
@router.get("/pocketid/available")
@@ -126,8 +161,32 @@ async def pocketid_login_url(db: AsyncSession = Depends(get_db)):
return {"url": f"{issuer}/authorize?{urlencode(params)}"}
@router.get("/pocketid/link-url")
async def pocketid_link_url(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Authenticated user starts an OIDC flow to attach a passkey to THEIR account.
The `state` carries a signed 'link to this user' token so the callback links
the returned identity instead of creating/matching a new account.
"""
issuer, client_id, _ = await _get_pocketid_config(db)
if not issuer or not client_id:
raise HTTPException(status_code=404, detail="PocketID not configured")
from urllib.parse import urlencode
params = {
"client_id": client_id,
"redirect_uri": f"{settings.base_url}/api/auth/pocketid/callback",
"response_type": "code",
"scope": "openid profile email groups",
"state": _make_link_state(current_user.id),
}
return {"url": f"{issuer}/authorize?{urlencode(params)}"}
@router.get("/pocketid/callback")
async def pocketid_callback(code: str, db: AsyncSession = Depends(get_db)):
async def pocketid_callback(code: str, state: Optional[str] = None, db: AsyncSession = Depends(get_db)):
issuer, client_id, client_secret = await _get_pocketid_config(db)
if not issuer:
raise HTTPException(status_code=404, detail="PocketID not configured")
@@ -155,6 +214,30 @@ async def pocketid_callback(code: str, db: AsyncSession = Depends(get_db)):
email = userinfo.get("email")
preferred_username = userinfo.get("preferred_username") or email
# ── Explicit account-link flow ──────────────────────────────────────────
# Initiated by an already-authenticated user from their profile. Attach the
# passkey to that account. No group gating here: this is identity linking,
# not access control, and the initiator is already an authorised user.
link_user_id = _decode_link_state(state)
if link_user_id is not None:
result = await db.execute(select(User).where(User.pocketid_sub == sub))
holder = result.scalar_one_or_none()
if holder and holder.id != link_user_id:
# This passkey is already attached to a different account.
return RedirectResponse(url="/login?auth_error=passkey_in_use")
result = await db.execute(select(User).where(User.id == link_user_id))
target = result.scalar_one_or_none()
if target is None:
return RedirectResponse(url="/login?auth_error=link_failed")
target.pocketid_sub = sub
if not target.email and email:
dup = await db.execute(
select(User).where(User.email == email, User.id != target.id)
)
if dup.scalar_one_or_none() is None:
target.email = email
return RedirectResponse(url="/profile?linked=1")
# Group gating: if an allowed group is configured, the user must be in it.
allowed_group = await _get_allowed_group(db)
if allowed_group: