From e9811d8d835237111808ce24d56b7f8921783b81 Mon Sep 17 00:00:00 2001 From: owain Date: Sat, 6 Jun 2026 19:02:42 +0100 Subject: [PATCH] Fix duplicate detection, add wellness suffixes, add reprocess endpoint --- backend/app/api/activities.py | 38 +++++++++++++- backend/app/workers/tasks.py | 97 ++++++++++++++++------------------- 2 files changed, 80 insertions(+), 55 deletions(-) diff --git a/backend/app/api/activities.py b/backend/app/api/activities.py index 0412448..2b08ef9 100644 --- a/backend/app/api/activities.py +++ b/backend/app/api/activities.py @@ -1,6 +1,6 @@ from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy import select, func, desc +from sqlalchemy import select, func, desc, delete from pydantic import BaseModel from typing import Optional, List from datetime import datetime @@ -126,7 +126,6 @@ async def get_data_points( db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): - # Verify ownership act = await db.execute( select(Activity).where( Activity.id == activity_id, @@ -211,3 +210,38 @@ async def delete_activity( raise HTTPException(status_code=404, detail="Activity not found") await db.delete(activity) await db.commit() + + +@router.post("/{activity_id}/reprocess") +async def reprocess_activity( + activity_id: int, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """Re-parse the source FIT file and update polyline, data points etc.""" + import os + result = await db.execute( + select(Activity).where( + Activity.id == activity_id, + Activity.user_id == current_user.id, + ) + ) + activity = result.scalar_one_or_none() + if not activity: + raise HTTPException(status_code=404, detail="Activity not found") + if not activity.source_file: + raise HTTPException(status_code=400, detail="No source file stored for this activity") + if not os.path.exists(activity.source_file): + raise HTTPException(status_code=404, detail="Source file no longer exists on disk") + + source_file = activity.source_file + source_type = activity.source_type or "fit" + + await db.execute(delete(ActivityDataPoint).where(ActivityDataPoint.activity_id == activity_id)) + await db.execute(delete(ActivityLap).where(ActivityLap.activity_id == activity_id)) + await db.delete(activity) + await db.commit() + + from app.workers.tasks import process_activity_file + task = process_activity_file.delay(source_file, current_user.id, source_type) + return {"task_id": task.id, "status": "queued"} \ No newline at end of file diff --git a/backend/app/workers/tasks.py b/backend/app/workers/tasks.py index 5648dd3..10e1fb9 100644 --- a/backend/app/workers/tasks.py +++ b/backend/app/workers/tasks.py @@ -24,16 +24,19 @@ celery_app.conf.update( worker_prefetch_multiplier=1, ) -# Garmin FIT file suffixes that are health/wellness data, not activities WELLNESS_SUFFIXES = ( "_METRICS.fit", "_WELLNESS.fit", "_SLEEP.fit", + "_SLEEP_DATA.fit", "_STRESS.fit", "_SPO2.fit", "_HRV.fit", + "_HRV_STATUS.fit", "_MONITORING.fit", "_MONITORING_B.fit", + "_RESPIRATION.fit", + "_PULSE_OX.fit", ) @@ -46,7 +49,6 @@ def is_wellness_file(file_path: str) -> bool: def process_activity_file(self, file_path: str, user_id: int, source_type: str): """Parse a FIT/GPX file. Routes wellness files to health parser.""" - # Route wellness/metrics files to health parser instead if is_wellness_file(file_path): parse_wellness_fit.delay(file_path, user_id) return {"status": "routed_to_wellness", "file": file_path} @@ -54,7 +56,7 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str): from app.services.fit_parser import parse_fit_file, parse_gpx_file, calculate_hr_zones from app.core.database import SyncSessionLocal from app.models.user import Activity, ActivityDataPoint, ActivityLap - from sqlalchemy import select + from sqlalchemy import select, func from datetime import datetime self.update_state(state="PROGRESS", meta={"step": "parsing"}) @@ -67,25 +69,31 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str): except Exception as e: raise self.retry(exc=e, countdown=10, max_retries=3) - # Skip files with no usable activity data if not parsed.get("start_time"): return {"status": "skipped", "reason": "no start_time", "file": file_path} with SyncSessionLocal() as db: - # Check for duplicate by garmin activity ID - if parsed.get("garmin_activity_id"): - existing = db.execute( - select(Activity).where( - Activity.garmin_activity_id == parsed["garmin_activity_id"] - ) - ).scalar_one_or_none() - if existing: - return {"activity_id": existing.id, "status": "duplicate"} + start_time = datetime.fromisoformat(parsed["start_time"]) - # Get user's configured max HR for accurate zone calculation - # Falls back to: user-set value → 220-age → activity max → 190 + # Deduplicate by (user_id, sport_type, start_time date + within 60s) + existing = db.execute( + select(Activity).where( + Activity.user_id == user_id, + Activity.sport_type == parsed["sport_type"], + func.date(Activity.start_time) == start_time.date(), + Activity.start_time >= start_time.replace(second=0, microsecond=0), + ) + ).scalar_one_or_none() + + if existing: + return {"activity_id": existing.id, "status": "duplicate"} + + # Get user max HR for zone calculation from app.models.user import User as UserModel - user_obj = db.execute(select(UserModel).where(UserModel.id == user_id)).scalar_one_or_none() + user_obj = db.execute( + select(UserModel).where(UserModel.id == user_id) + ).scalar_one_or_none() + user_max_hr = None if user_obj: user_max_hr = user_obj.max_heart_rate @@ -94,15 +102,9 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str): age = _date.today().year - user_obj.birth_year user_max_hr = 220 - age if not user_max_hr: - # Last resort: use activity max but warn this may shift zones user_max_hr = parsed.get("max_heart_rate") or 190 - hr_zones = calculate_hr_zones( - parsed.get("data_points", []), - user_max_hr - ) - - start_time = datetime.fromisoformat(parsed["start_time"]) + hr_zones = calculate_hr_zones(parsed.get("data_points", []), user_max_hr) activity = Activity( user_id=user_id, @@ -132,11 +134,9 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str): db.add(activity) db.flush() - # Insert data points, deduping on (activity_id, timestamp) seen = set() - points = parsed.get("data_points", []) batch = [] - for p in points: + for p in parsed.get("data_points", []): if not p.get("timestamp"): continue ts = datetime.fromisoformat(p["timestamp"]) if isinstance(p["timestamp"], str) else p["timestamp"] @@ -165,7 +165,6 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str): db.add_all(batch) db.flush() - # Laps for lap in parsed.get("laps", []): ls = datetime.fromisoformat(lap["start_time"]) if lap.get("start_time") else None db.add(ActivityLap( @@ -184,7 +183,6 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str): activity_id = activity.id compute_personal_records.delay(activity_id, user_id, parsed) - # Auto route detection for running and cycling if parsed.get("sport_type") in ("running", "cycling", "hiking", "walking"): detect_route.delay(activity_id, user_id) return {"activity_id": activity_id, "status": "ok"} @@ -192,16 +190,17 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str): @celery_app.task(name="parse_wellness_fit") def parse_wellness_fit(file_path: str, user_id: int): - """ - Parse a Garmin wellness/metrics FIT file and upsert into health_metrics. - Uses wellness_parser which handles standard FIT + Garmin proprietary messages. - """ + """Parse a Garmin wellness FIT file and upsert into health_metrics.""" from app.services.wellness_parser import parse_wellness_fit as _parse from app.core.database import SyncSessionLocal from datetime import datetime, timezone from sqlalchemy import text - result = _parse(file_path) + try: + result = _parse(file_path) + except Exception as e: + return {"status": "error", "error": str(e), "file": file_path} + if result.get("error"): return {"status": "error", "error": result["error"], "file": file_path} @@ -263,30 +262,26 @@ def parse_wellness_fit(file_path: str, user_id: int): return {"status": "ok", "days_processed": len(days), "file": file_path} + @celery_app.task(name="detect_route") def detect_route(activity_id: int, user_id: int): - """ - After importing an activity, check if it matches any existing named routes. - If two+ unassigned activities match each other, auto-create a named route. - """ + """Auto-detect and link activities to named routes.""" from app.services.route_matcher import routes_are_similar from app.core.database import SyncSessionLocal from app.models.user import Activity, NamedRoute from sqlalchemy import select with SyncSessionLocal() as db: - # Get the new activity new_act = db.execute( select(Activity).where(Activity.id == activity_id) ).scalar_one_or_none() + if not new_act or not new_act.polyline: return {"status": "no_polyline"} - # Already assigned to a route? if new_act.named_route_id: return {"status": "already_assigned"} - # Check against existing named routes first routes = db.execute( select(NamedRoute).where( NamedRoute.user_id == user_id, @@ -303,7 +298,6 @@ def detect_route(activity_id: int, user_id: int): db.commit() return {"status": "matched_existing", "route_id": route.id} - # No existing route matched - check unassigned activities for a match candidates = db.execute( select(Activity).where( Activity.user_id == user_id, @@ -311,7 +305,6 @@ def detect_route(activity_id: int, user_id: int): Activity.named_route_id == None, Activity.id != activity_id, Activity.polyline != None, - # Within 20% distance Activity.distance_m >= (new_act.distance_m or 0) * 0.8, Activity.distance_m <= (new_act.distance_m or 0) * 1.2, ) @@ -322,7 +315,6 @@ def detect_route(activity_id: int, user_id: int): new_act.polyline, candidate.polyline, new_act.bounding_box, candidate.bounding_box, ): - # Auto-create a route from the older activity older = candidate if candidate.start_time < new_act.start_time else new_act newer = new_act if candidate.start_time < new_act.start_time else candidate @@ -400,8 +392,8 @@ def process_garmin_health_zip(zip_path: str, user_id: int): import zipfile import json from app.core.database import SyncSessionLocal - from app.models.user import HealthMetric from datetime import datetime, timezone + from sqlalchemy import text with SyncSessionLocal() as db: with zipfile.ZipFile(zip_path) as zf: @@ -423,20 +415,19 @@ def process_garmin_health_zip(zip_path: str, user_id: int): except ValueError: continue - from sqlalchemy import text as _text - db.execute(_text(""" + db.execute(text(""" INSERT INTO health_metrics (user_id, date, resting_hr, steps, floors_climbed, active_calories, total_calories, avg_stress, spo2_avg) VALUES (:user_id, :date, :resting_hr, :steps, :floors, :active_cal, :total_cal, :stress, :spo2) ON CONFLICT (user_id, date) DO UPDATE SET - resting_hr = COALESCE(EXCLUDED.resting_hr, health_metrics.resting_hr), - steps = COALESCE(EXCLUDED.steps, health_metrics.steps), - floors_climbed = COALESCE(EXCLUDED.floors_climbed, health_metrics.floors_climbed), + resting_hr = COALESCE(EXCLUDED.resting_hr, health_metrics.resting_hr), + steps = COALESCE(EXCLUDED.steps, health_metrics.steps), + floors_climbed = COALESCE(EXCLUDED.floors_climbed, health_metrics.floors_climbed), active_calories = COALESCE(EXCLUDED.active_calories, health_metrics.active_calories), - total_calories = COALESCE(EXCLUDED.total_calories, health_metrics.total_calories), - avg_stress = COALESCE(EXCLUDED.avg_stress, health_metrics.avg_stress), - spo2_avg = COALESCE(EXCLUDED.spo2_avg, health_metrics.spo2_avg) + total_calories = COALESCE(EXCLUDED.total_calories, health_metrics.total_calories), + avg_stress = COALESCE(EXCLUDED.avg_stress, health_metrics.avg_stress), + spo2_avg = COALESCE(EXCLUDED.spo2_avg, health_metrics.spo2_avg) """), { "user_id": user_id, "date": date_dt, "resting_hr": data.get("restingHeartRate"), @@ -448,4 +439,4 @@ def process_garmin_health_zip(zip_path: str, user_id: int): "spo2": data.get("avgSpo2"), }) - db.commit() + db.commit() \ No newline at end of file