"""Upload endpoints for single activity files and bulk Garmin / Strava exports.

Uploads are streamed to ``settings.file_store_path/<user id>/...`` and handed to
Celery tasks for processing. Archives are unpacked defensively: members that
would land outside the extraction directory are skipped, and the total number
of uncompressed bytes written for one upload — including nested archives and
gzip-compressed members — is capped by a single shared budget.
"""

import gzip
import os
import shutil
import uuid
import zipfile
import zlib
from pathlib import Path
from typing import Optional

from fastapi import APIRouter, Depends, UploadFile, File, HTTPException, BackgroundTasks
from fastapi.concurrency import run_in_threadpool
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import get_db
from app.core.security import get_current_user
from app.core.config import settings
from app.models.user import User
from app.workers.tasks import process_activity_file, process_garmin_health_zip

router = APIRouter()

MAX_FILE_SIZE = 500 * 1024 * 1024  # 500 MB upload cap
MAX_EXTRACT_SIZE = 4 * 1024 * 1024 * 1024  # 4 GB total uncompressed cap (zip-bomb guard)
_CHUNK = 1024 * 1024

# Exceptions zipfile raises for a damaged or unreadable archive: corrupt
# structure / bad CRC (BadZipFile), bad deflate data (zlib.error), truncated
# data (EOFError), unsupported compression method (NotImplementedError) and
# encrypted members (RuntimeError). These are the client's fault -> HTTP 400.
_ZIP_ERRORS = (zipfile.BadZipFile, zlib.error, EOFError, NotImplementedError, RuntimeError)


def _safe_name(filename: Optional[str]) -> str:
    """Reduce an uploaded filename to a safe basename — no path traversal.

    Backslashes are treated as separators, so Windows-style client paths are
    stripped as well.

    Raises:
        HTTPException(400): nothing usable remains (missing, empty, ".", "..")
            or the name contains a NUL byte, which ``open()`` would reject.
    """
    name = os.path.basename((filename or "").replace("\\", "/"))
    if not name or name in (".", "..") or "\x00" in name:
        raise HTTPException(status_code=400, detail="Invalid filename")
    return name


def save_upload(upload: UploadFile, dest_dir: Path) -> Path:
    """Stream an upload to disk under dest_dir, enforcing the size cap.

    Blocking (plain file I/O): call it from a worker thread, not directly on
    the event loop. An existing file with the same name is overwritten.

    Returns:
        The path the upload was written to.

    Raises:
        HTTPException(400): invalid filename (see ``_safe_name``).
        HTTPException(413): the upload exceeds ``MAX_FILE_SIZE``.
    """
    dest = dest_dir / _safe_name(upload.filename)
    dest_dir.mkdir(parents=True, exist_ok=True)
    size = 0
    out = open(dest, "wb")
    try:
        with out:
            while chunk := upload.file.read(_CHUNK):
                size += len(chunk)
                if size > MAX_FILE_SIZE:
                    raise HTTPException(status_code=413, detail="File exceeds the 500 MB limit")
                out.write(chunk)
    except BaseException:
        # Never leave a partial or oversized file behind, whatever went wrong.
        dest.unlink(missing_ok=True)
        raise
    return dest


class _ExtractBudget:
    """Running total of uncompressed bytes written while unpacking one upload.

    One instance is shared by every archive (and nested archive or .gz member)
    unpacked for the same request, so nesting cannot multiply the cap.
    """

    def __init__(self, limit: int = MAX_EXTRACT_SIZE) -> None:
        self.limit = limit
        self.used = 0

    def consume(self, nbytes: int) -> None:
        """Charge ``nbytes``; raise 413 once the running total exceeds the limit."""
        self.used += nbytes
        if self.used > self.limit:
            raise HTTPException(status_code=413, detail="Archive expands beyond the size limit")


def _discard(*paths: Path) -> None:
    """Best-effort removal of the files/directories created for a rejected upload."""
    for p in paths:
        if p.is_dir():
            shutil.rmtree(p, ignore_errors=True)
        else:
            p.unlink(missing_ok=True)


def _safe_extract(
    zf: zipfile.ZipFile,
    dest_dir: Path,
    budget: Optional[_ExtractBudget] = None,
) -> list[Path]:
    """Extract a zip safely and return the extracted regular-file paths.

    Members that would resolve outside ``dest_dir`` are skipped. That covers
    absolute paths, ``../`` traversal and names resolving to ``dest_dir``
    itself. Directory entries are skipped too; parent directories are created
    on demand. Every byte written is charged to ``budget``. Pass the same
    budget for nested archives so the cap applies to the whole upload. A
    fresh ``MAX_EXTRACT_SIZE`` budget is used when none is given.

    Blocking (file I/O); call from a worker thread.

    Raises:
        HTTPException(413): the budget is exhausted. The member being written
            is removed; files extracted earlier are left for the caller to
            clean up.
        Any of ``_ZIP_ERRORS`` for damaged or unreadable members.
    """
    if budget is None:
        budget = _ExtractBudget()
    dest_dir.mkdir(parents=True, exist_ok=True)
    dest_root = dest_dir.resolve()
    extracted: list[Path] = []
    for info in zf.infolist():
        if info.is_dir():
            continue
        target = (dest_root / info.filename).resolve()
        # The target must be strictly below dest_root. Equality is rejected
        # too: a member resolving to the root itself cannot be written as a file.
        if dest_root not in target.parents:
            continue
        target.parent.mkdir(parents=True, exist_ok=True)
        with zf.open(info) as src:
            out = open(target, "wb")
            try:
                with out:
                    while chunk := src.read(_CHUNK):
                        budget.consume(len(chunk))
                        out.write(chunk)
            except BaseException:
                target.unlink(missing_ok=True)
                raise
        extracted.append(target)
    return extracted


def _gunzip(path: Path, budget: _ExtractBudget) -> Optional[Path]:
    """Decompress ``name.ext.gz`` to ``name.ext`` in the same directory.

    Output bytes are charged to ``budget``, so a gzip bomb hits the same cap
    as the surrounding archive.

    Returns:
        The decompressed path, or None if the file is not valid gzip data.
        This is best-effort: one damaged member is skipped rather than
        failing the whole upload.

    Raises:
        HTTPException(413): the shared budget is exhausted.
    """
    plain = path.with_suffix("")
    out = open(plain, "wb")
    try:
        with out, gzip.open(path, "rb") as src:
            while chunk := src.read(_CHUNK):
                budget.consume(len(chunk))
                out.write(chunk)
    except (gzip.BadGzipFile, EOFError, zlib.error):
        plain.unlink(missing_ok=True)
        return None
    except BaseException:
        plain.unlink(missing_ok=True)
        raise
    return plain


@router.post("/activity")
async def upload_activity(
    file: UploadFile = File(...),
    background_tasks: BackgroundTasks = None,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Upload a single .fit or .gpx activity file."""
    # NOTE: `background_tasks` and `db` are not used by this handler; they are
    # kept so the route signature and its dependencies are unchanged.
    suffix = Path(file.filename or "").suffix.lower()
    if suffix not in {".fit", ".gpx"}:
        raise HTTPException(status_code=400, detail="Only .fit and .gpx files are supported")

    user_id = current_user.id
    dest_dir = Path(settings.file_store_path) / str(user_id) / "activities"
    # The disk write and the Celery broker call both block: keep them off the event loop.
    dest = await run_in_threadpool(save_upload, file, dest_dir)
    task = await run_in_threadpool(process_activity_file.delay, str(dest), user_id, suffix[1:])
    return {"task_id": task.id, "status": "queued", "filename": file.filename}


def _collect_garmin_files(
    archive: Path, extract_dir: Path, budget: _ExtractBudget
) -> tuple[list[Path], bool]:
    """Unpack a Garmin export into ``extract_dir``.

    Returns:
        A tuple of the FIT files found and whether any wellness JSON is
        present. FIT files inside sub-zips are included.
    """
    with zipfile.ZipFile(archive) as zf:
        extracted = _safe_extract(zf, extract_dir, budget)

    fit_files: list[Path] = []
    has_health = False
    for path in extracted:
        suffix = path.suffix.lower()
        if suffix == ".fit":
            fit_files.append(path)
        elif suffix == ".json":
            has_health = True  # Garmin wellness data is exported as JSON files
        elif suffix == ".zip":
            # Garmin exports nest activity FIT files inside sub-zips
            # (e.g. DI-Connect-Uploaded-Files/UploadedFiles_*_Part*.zip).
            try:
                with zipfile.ZipFile(path) as nzf:
                    nested = _safe_extract(nzf, path.parent / path.stem, budget)
            except _ZIP_ERRORS:
                nested = []  # best-effort: one damaged sub-archive does not sink the export
            fit_files.extend(p for p in nested if p.suffix.lower() == ".fit")
    return fit_files, has_health


def _process_garmin_export(upload: UploadFile, user_id) -> dict:
    """Store, unpack and queue a Garmin export (blocking; run in a worker thread).

    Nothing is queued until the whole archive has been unpacked. A rejected
    upload therefore leaves no half-queued tasks, and its files are removed.
    """
    dest_dir = Path(settings.file_store_path) / str(user_id) / "exports"
    dest = save_upload(upload, dest_dir)
    # A fresh directory per upload, so cleaning up a rejected upload can never
    # delete files that an earlier upload's queued tasks still need to read.
    extract_dir = dest_dir / f"garmin_{dest.stem}_{uuid.uuid4().hex}"
    budget = _ExtractBudget()
    try:
        fit_files, has_health = _collect_garmin_files(dest, extract_dir, budget)
    except _ZIP_ERRORS:
        _discard(dest, extract_dir)
        raise HTTPException(
            status_code=400, detail="Uploaded file is not a valid ZIP archive"
        ) from None
    except BaseException:
        # Size cap (413), disk errors, ...: drop everything written so far.
        _discard(dest, extract_dir)
        raise

    if not fit_files and not has_health:
        _discard(dest, extract_dir)
        raise HTTPException(
            status_code=400,
            detail="No fitness data found in this archive — make sure you uploaded your full Garmin Connect export ZIP",
        )

    task_ids = [process_activity_file.delay(str(p), user_id, "fit").id for p in fit_files]
    # Queue health/wellness data extraction (the task is given the original zip).
    health_task = process_garmin_health_zip.delay(str(dest), user_id)
    return {
        "status": "queued",
        "activity_tasks": len(task_ids),
        "task_id": health_task.id,
    }


@router.post("/garmin-export")
async def upload_garmin_export(
    file: UploadFile = File(...),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Upload a full Garmin Connect data export ZIP.
    Processes all FIT files for activities + wellness data.
    """
    if not (file.filename or "").lower().endswith(".zip"):
        raise HTTPException(status_code=400, detail="Please upload a .zip Garmin export")
    user_id = current_user.id
    # Saving and unpacking (up to MAX_EXTRACT_SIZE bytes) must not block the event loop.
    return await run_in_threadpool(_process_garmin_export, file, user_id)


def _collect_strava_files(
    archive: Path, extract_dir: Path, budget: _ExtractBudget
) -> list[tuple[Path, str]]:
    """Unpack a Strava export and return (path, "fit" | "gpx") for each activity file.

    Strava stores many original uploads gzip-compressed (``*.fit.gz``,
    ``*.gpx.gz``). Those are decompressed in place under the shared budget.
    """
    with zipfile.ZipFile(archive) as zf:
        extracted = _safe_extract(zf, extract_dir, budget)

    # dict keeps first-seen order and drops duplicates, e.g. when both
    # "x.fit" and "x.fit.gz" are present and decompress to the same path.
    found: dict[Path, str] = {}
    for path in extracted:
        candidate: Optional[Path] = path
        if path.name.lower().endswith((".fit.gz", ".gpx.gz")):
            candidate = _gunzip(path, budget)
        if candidate is None:
            continue
        suffix = candidate.suffix.lower()
        if suffix in (".fit", ".gpx"):
            found.setdefault(candidate, suffix[1:])
    return list(found.items())


def _process_strava_export(upload: UploadFile, user_id) -> dict:
    """Store, unpack and queue a Strava export (blocking; run in a worker thread).

    Nothing is queued until the whole archive has been unpacked. A rejected
    upload therefore leaves no half-queued tasks, and its files are removed.
    """
    dest_dir = Path(settings.file_store_path) / str(user_id) / "exports"
    dest = save_upload(upload, dest_dir)
    # Unique per upload so a rejected upload's cleanup cannot touch earlier ones.
    extract_dir = dest_dir / f"strava_{dest.stem}_{uuid.uuid4().hex}"
    budget = _ExtractBudget()
    try:
        activity_files = _collect_strava_files(dest, extract_dir, budget)
    except _ZIP_ERRORS:
        _discard(dest, extract_dir)
        raise HTTPException(
            status_code=400, detail="Uploaded file is not a valid ZIP archive"
        ) from None
    except BaseException:
        _discard(dest, extract_dir)
        raise

    if not activity_files:
        _discard(dest, extract_dir)
        raise HTTPException(
            status_code=400,
            detail="No activity files (.fit or .gpx) found in this Strava archive",
        )

    task_ids = [
        process_activity_file.delay(str(path), user_id, kind).id
        for path, kind in activity_files
    ]
    return {
        "status": "queued",
        "activity_tasks": len(task_ids),
        "task_id": task_ids[-1],  # non-empty: checked above
    }


@router.post("/strava-export")
async def upload_strava_export(
    file: UploadFile = File(...),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Upload a Strava bulk export ZIP (contains activities/ folder with GPX/FIT files)."""
    if not (file.filename or "").lower().endswith(".zip"):
        raise HTTPException(status_code=400, detail="Please upload a .zip Strava export")
    user_id = current_user.id
    return await run_in_threadpool(_process_strava_export, file, user_id)


def _task_status(task_id: str) -> dict:
    """Look up a Celery task's state; may block on the result backend."""
    # Imported lazily — presumably to avoid an import cycle with the worker
    # package; confirm before hoisting to module level.
    from app.workers.celery_app import celery_app

    result = celery_app.AsyncResult(task_id)
    return {
        "task_id": task_id,
        "status": result.status,
        # A failed task's `.result` is the exception instance, which does not
        # serialize to anything meaningful; report its message under "error".
        "result": result.result if result.successful() else None,
        "error": str(result.result) if result.failed() else None,
    }


@router.get("/task/{task_id}")
async def check_task_status(
    task_id: str,
    current_user: User = Depends(get_current_user),
):
    """Check the status of an upload processing task."""
    # NOTE(review): any authenticated user can query any task id — nothing ties
    # the task to `current_user`. Consider recording task ownership at enqueue time.
    return await run_in_threadpool(_task_status, task_id)