"""Authentication routes: password login plus PocketID (OIDC) sign-in and linking."""

import logging
from datetime import timedelta
from typing import Optional
from urllib.parse import urlencode

import httpx
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.responses import RedirectResponse
from fastapi.security import OAuth2PasswordRequestForm
from jose import jwt, JWTError
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import get_db
from app.core.security import verify_password, create_access_token, get_current_user
from app.core.config import settings
from app.models.user import User

logger = logging.getLogger(__name__)

router = APIRouter()

# Marks a short-lived OIDC `state` token as an account-link request (as opposed
# to a normal sign-in), so the callback attaches the passkey to a known user
# instead of creating/looking-up by identity.
LINK_STATE_PURPOSE = "pocketid-link"
LOGIN_STATE_PURPOSE = "pocketid-login"

# Lifetime of both kinds of `state` token.
_STATE_TTL = timedelta(minutes=10)


# ── State tokens ────────────────────────────────────────────────────────────


def _make_link_state(user_id: int) -> str:
    """Signed, short-lived token carrying 'link this passkey to user_id' intent."""
    return create_access_token(
        {"sub": str(user_id), "purpose": LINK_STATE_PURPOSE},
        expires_delta=_STATE_TTL,
    )


def _make_login_state() -> str:
    """Signed, short-lived CSRF token proving the login flow started from this app."""
    return create_access_token(
        {"sub": "login", "purpose": LOGIN_STATE_PURPOSE},
        expires_delta=_STATE_TTL,
    )


def _decode_state_payload(state: Optional[str], purpose: str) -> Optional[dict]:
    """Return the claims of `state` if it verifies and carries `purpose`, else None."""
    if not state:
        return None
    try:
        payload = jwt.decode(state, settings.secret_key, algorithms=[settings.algorithm])
    except JWTError:
        return None
    if payload.get("purpose") != purpose:
        return None
    return payload


def _valid_login_state(state: Optional[str]) -> bool:
    """True if `state` is a valid, unexpired login-state token we issued."""
    return _decode_state_payload(state, LOGIN_STATE_PURPOSE) is not None


def _decode_link_state(state: Optional[str]) -> Optional[int]:
    """Return the user id from a valid link-state token, else None."""
    try:
        payload = _decode_state_payload(state, LINK_STATE_PURPOSE)
        if payload is None:
            return None
        return int(payload["sub"])
    except (KeyError, TypeError, ValueError):
        # Missing or non-numeric `sub`: treat as "not a link request".
        return None


# ── PocketID configuration ──────────────────────────────────────────────────


async def _config_admin(db: AsyncSession):
    """The admin row that holds instance-wide PocketID settings.

    Settings live on an admin user row, but there can be more than one admin.
    Prefer the admin that actually has an issuer configured; otherwise fall back
    to the lowest-id admin. Without this, an unordered LIMIT 1 could return an
    admin with no config and make PocketID look disabled / gating inconsistent.
    """
    result = await db.execute(
        select(User)
        .where(User.is_admin == True, User.pocketid_issuer.isnot(None))  # noqa: E712
        .order_by(User.id)
        .limit(1)
    )
    admin = result.scalar_one_or_none()
    if admin is None:
        result = await db.execute(
            select(User).where(User.is_admin == True).order_by(User.id).limit(1)  # noqa: E712
        )
        admin = result.scalar_one_or_none()
    return admin


def _admin_or_env(admin, name: str):
    """Value of setting `name` from the admin row, falling back to env settings."""
    return getattr(admin, name, None) or getattr(settings, name)


async def _get_pocketid_config(db: AsyncSession):
    """Get PocketID config from DB (admin user) falling back to env vars.

    Returns an ``(issuer, client_id, client_secret)`` tuple. The issuer is
    stripped of surrounding whitespace and any trailing slash so endpoint URLs
    built as ``f"{issuer}/..."`` never contain a double slash; an empty issuer
    is returned as None.
    """
    admin = await _config_admin(db)
    issuer = (_admin_or_env(admin, "pocketid_issuer") or "").strip().rstrip("/") or None
    client_id = _admin_or_env(admin, "pocketid_client_id")
    client_secret = _admin_or_env(admin, "pocketid_client_secret")
    return issuer, client_id, client_secret


async def _require_pocketid_config(db: AsyncSession):
    """Return the PocketID config, or raise 404 unless issuer and client id are set."""
    issuer, client_id, client_secret = await _get_pocketid_config(db)
    if not issuer or not client_id:
        raise HTTPException(status_code=404, detail="PocketID not configured")
    return issuer, client_id, client_secret


async def _get_allowed_group(db: AsyncSession):
    """Group a PocketID user must belong to in order to sign in (None = allow all)."""
    admin = await _config_admin(db)
    group = _admin_or_env(admin, "pocketid_allowed_group")
    return (group or "").strip() or None


def _callback_redirect_uri() -> str:
    """The redirect URI sent to the IdP; must be identical in authorize and token calls."""
    return f"{settings.base_url}/api/auth/pocketid/callback"


def _authorize_url(issuer: str, client_id: str, state: str) -> str:
    """Build the IdP authorize URL for an authorization-code flow carrying `state`."""
    params = {
        "client_id": client_id,
        "redirect_uri": _callback_redirect_uri(),
        "response_type": "code",
        "scope": "openid profile email groups",
        "state": state,
    }
    return f"{issuer}/authorize?{urlencode(params)}"


async def _unique_username(db: AsyncSession, base: str) -> str:
    """Return `base`, or `base-2`, `base-3`, … until it is not already taken."""
    base = (base or "user").strip() or "user"
    candidate = base
    n = 1
    while True:
        existing = await db.execute(select(User).where(User.username == candidate))
        if existing.scalar_one_or_none() is None:
            return candidate
        n += 1
        candidate = f"{base}-{n}"


# ── Schemas ─────────────────────────────────────────────────────────────────


class Token(BaseModel):
    """Response body of the password login endpoint."""

    access_token: str
    token_type: str
    user_id: int
    username: str
    is_admin: bool


class UserOut(BaseModel):
    """Public view of the current user; `has_passkey` is set by the `/me` handler."""

    id: int
    username: str
    email: Optional[str]
    is_admin: bool
    has_passkey: bool = False

    class Config:
        from_attributes = True


# ── Password login / profile ────────────────────────────────────────────────


@router.post("/token", response_model=Token)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db),
):
    """Exchange a username/password for a bearer token.

    Raises 400 "Invalid credentials" for an unknown user, an account without a
    password hash, or a wrong password.
    """
    result = await db.execute(select(User).where(User.username == form_data.username))
    user = result.scalar_one_or_none()
    if not user or not user.hashed_password:
        raise HTTPException(status_code=400, detail="Invalid credentials")
    if not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(status_code=400, detail="Invalid credentials")
    token = create_access_token({"sub": str(user.id)})
    return Token(
        access_token=token,
        token_type="bearer",
        user_id=user.id,
        username=user.username,
        is_admin=user.is_admin,
    )


@router.get("/me", response_model=UserOut)
async def get_me(current_user: User = Depends(get_current_user)):
    """Return the authenticated user, including whether a passkey is linked."""
    return UserOut(
        id=current_user.id,
        username=current_user.username,
        email=current_user.email,
        is_admin=current_user.is_admin,
        has_passkey=current_user.pocketid_sub is not None,
    )


# ── PocketID (OIDC) ─────────────────────────────────────────────────────────


@router.get("/pocketid/available")
async def pocketid_available(db: AsyncSession = Depends(get_db)):
    """Report whether PocketID sign-in is configured (issuer and client id present)."""
    issuer, client_id, _ = await _get_pocketid_config(db)
    return {"available": bool(issuer and client_id)}


@router.get("/pocketid/login-url")
async def pocketid_login_url(db: AsyncSession = Depends(get_db)):
    """Return the IdP authorize URL for a normal sign-in; 404 if not configured."""
    issuer, client_id, _ = await _require_pocketid_config(db)
    return {"url": _authorize_url(issuer, client_id, _make_login_state())}


@router.get("/pocketid/link-url")
async def pocketid_link_url(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Authenticated user starts an OIDC flow to attach a passkey to THEIR account.

    The `state` carries a signed 'link to this user' token so the callback links
    the returned identity instead of creating/matching a new account.
    """
    issuer, client_id, _ = await _require_pocketid_config(db)
    return {"url": _authorize_url(issuer, client_id, _make_link_state(current_user.id))}


def _json_object(response) -> dict:
    """Parse an HTTP response body as a JSON object; anything else yields {}."""
    try:
        body = response.json()
    except ValueError:
        return {}
    return body if isinstance(body, dict) else {}


async def _fetch_userinfo(issuer: str, client_id: str, client_secret, code: str) -> dict:
    """Exchange the authorization `code` for tokens and fetch the userinfo claims.

    Raises HTTPException(400) when the IdP is unreachable, rejects the code,
    returns no access token, or refuses the userinfo request.
    """
    async with httpx.AsyncClient() as client:
        try:
            resp = await client.post(
                f"{issuer}/api/oidc/token",
                data={
                    "grant_type": "authorization_code",
                    "code": code,
                    "redirect_uri": _callback_redirect_uri(),
                    "client_id": client_id,
                    "client_secret": client_secret,
                },
            )
        except httpx.HTTPError as exc:
            logger.warning("PocketID token request failed: %s", exc)
            raise HTTPException(status_code=400, detail="Token exchange failed") from exc
        if resp.status_code != 200:
            logger.warning(
                "PocketID token exchange failed (%s): %s", resp.status_code, resp.text
            )
            raise HTTPException(status_code=400, detail="Token exchange failed")
        access_token = _json_object(resp).get("access_token")
        if not access_token:
            raise HTTPException(status_code=400, detail="Token exchange failed")

        try:
            userinfo_resp = await client.get(
                f"{issuer}/api/oidc/userinfo",
                headers={"Authorization": f"Bearer {access_token}"},
            )
        except httpx.HTTPError as exc:
            logger.warning("PocketID userinfo request failed: %s", exc)
            raise HTTPException(status_code=400, detail="Failed to fetch user info") from exc
        if userinfo_resp.status_code != 200:
            raise HTTPException(status_code=400, detail="Failed to fetch user info")
        return _json_object(userinfo_resp)


async def _link_passkey(
    db: AsyncSession, link_user_id: int, sub: str, email: Optional[str]
) -> RedirectResponse:
    """Attach identity `sub` to user `link_user_id` and redirect to the profile.

    Refuses when the identity already belongs to a different account or the
    target user no longer exists. Fills in the target's email only if it has
    none and no other account uses that address.
    """
    # NOTE(review): nothing here commits explicitly — presumably the `get_db`
    # dependency commits when the request finishes; confirm.
    result = await db.execute(select(User).where(User.pocketid_sub == sub))
    holder = result.scalar_one_or_none()
    if holder and holder.id != link_user_id:
        # This passkey is already attached to a different account.
        return RedirectResponse(url="/login?auth_error=passkey_in_use")

    result = await db.execute(select(User).where(User.id == link_user_id))
    target = result.scalar_one_or_none()
    if target is None:
        return RedirectResponse(url="/login?auth_error=link_failed")

    target.pocketid_sub = sub
    if not target.email and email:
        dup = await db.execute(
            select(User).where(User.email == email, User.id != target.id)
        )
        if dup.scalar_one_or_none() is None:
            target.email = email
    return RedirectResponse(url="/profile?linked=1")


def _in_allowed_group(userinfo: dict, allowed_group: str) -> bool:
    """True if the `groups` claim lists `allowed_group` as an exact entry."""
    groups = userinfo.get("groups") or []
    if isinstance(groups, str):
        # A bare string claim must not be substring-matched
        # ("admin" in "sysadmin" would otherwise pass the gate).
        groups = [groups]
    return allowed_group in groups


async def _resolve_login_user(
    db: AsyncSession, sub: str, email: Optional[str], preferred_username: Optional[str]
) -> User:
    """Find, link-by-email, or provision the local user for identity `sub`."""
    # 1) Existing passkey identity → use it.
    result = await db.execute(select(User).where(User.pocketid_sub == sub))
    user = result.scalar_one_or_none()
    if user:
        return user

    # 2) No passkey identity yet, but an account with this email exists and is
    #    not already linked to a different passkey → link them (preserves data).
    # NOTE(review): this trusts the IdP's `email` claim without checking
    # `email_verified`; if the IdP lets users set arbitrary addresses this is an
    # account-takeover path. Confirm the IdP policy or gate on `email_verified`.
    if email:
        result = await db.execute(select(User).where(User.email == email))
        existing = result.scalar_one_or_none()
        if existing and existing.pocketid_sub is None:
            existing.pocketid_sub = sub
            return existing

    # 3) Otherwise provision a new account with a collision-safe username.
    #    `preferred_username` already falls back to the email, so the only
    #    remaining fallback is the literal "user".
    username = await _unique_username(db, preferred_username or "user")
    # Only set email if no other account already claims it (unique column).
    email_taken = False
    if email:
        dup = await db.execute(select(User).where(User.email == email))
        email_taken = dup.scalar_one_or_none() is not None
    user = User(
        username=username,
        email=None if email_taken else email,
        pocketid_sub=sub,
    )
    db.add(user)
    # Flush so the new row gets its primary key before we mint a token for it.
    await db.flush()
    return user


@router.get("/pocketid/callback")
async def pocketid_callback(
    code: str,
    state: Optional[str] = None,
    db: AsyncSession = Depends(get_db),
):
    """OIDC redirect target: finish either an account-link or a sign-in flow.

    Raises 404 when PocketID is not configured and 400 when the code exchange
    or userinfo fetch fails; every other outcome is a redirect, with failures
    reported through the `auth_error` query parameter.
    """
    issuer, client_id, client_secret = await _require_pocketid_config(db)
    userinfo = await _fetch_userinfo(issuer, client_id, client_secret, code)

    sub = userinfo.get("sub")
    email = userinfo.get("email")
    preferred_username = userinfo.get("preferred_username") or email

    # A missing subject means we cannot identify the user. Never continue, or the
    # `pocketid_sub == sub` (== None → IS NULL) lookups below would match any
    # password-only account and log the caller in as someone else.
    if not sub:
        return RedirectResponse(url="/login?auth_error=no_identity")

    # ── Explicit account-link flow ──────────────────────────────────────────
    # Initiated by an already-authenticated user from their profile. Attach the
    # passkey to that account. No group gating here: this is identity linking,
    # not access control, and the initiator is already an authorised user.
    link_user_id = _decode_link_state(state)
    if link_user_id is not None:
        return await _link_passkey(db, link_user_id, sub, email)

    # Normal sign-in: require the signed, short-lived state we issued in
    # /pocketid/login-url, so the callback can't be driven by an injected code.
    if not _valid_login_state(state):
        return RedirectResponse(url="/login?auth_error=invalid_state")

    # Group gating: if an allowed group is configured, the user must be in it.
    allowed_group = await _get_allowed_group(db)
    if allowed_group and not _in_allowed_group(userinfo, allowed_group):
        return RedirectResponse(url="/login?auth_error=not_authorized")

    user = await _resolve_login_user(db, sub, email, preferred_username)
    token = create_access_token({"sub": str(user.id)})
    # NOTE(review): the bearer token travels in the redirect query string, so it
    # can end up in browser history and access logs; the frontend presumably
    # reads it from there — confirm before changing the hand-off.
    return RedirectResponse(url=f"/?token={token}")