Fix duplicate detection, add wellness suffixes, add reprocess endpoint
Build and push images / validate (push) Successful in 2s
Build and push images / build-backend (push) Successful in 5s
Build and push images / build-worker (push) Successful in 5s
Build and push images / build-frontend (push) Successful in 5s

This commit is contained in:
2026-06-06 19:02:42 +01:00
parent 93b8f00f94
commit e9811d8d83
2 changed files with 80 additions and 55 deletions
+36 -2
View File
@@ -1,6 +1,6 @@
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, desc
from sqlalchemy import select, func, desc, delete
from pydantic import BaseModel
from typing import Optional, List
from datetime import datetime
@@ -126,7 +126,6 @@ async def get_data_points(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
# Verify ownership
act = await db.execute(
select(Activity).where(
Activity.id == activity_id,
@@ -211,3 +210,38 @@ async def delete_activity(
raise HTTPException(status_code=404, detail="Activity not found")
await db.delete(activity)
await db.commit()
@router.post("/{activity_id}/reprocess")
async def reprocess_activity(
activity_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Re-parse the source FIT file and update polyline, data points etc."""
import os
result = await db.execute(
select(Activity).where(
Activity.id == activity_id,
Activity.user_id == current_user.id,
)
)
activity = result.scalar_one_or_none()
if not activity:
raise HTTPException(status_code=404, detail="Activity not found")
if not activity.source_file:
raise HTTPException(status_code=400, detail="No source file stored for this activity")
if not os.path.exists(activity.source_file):
raise HTTPException(status_code=404, detail="Source file no longer exists on disk")
source_file = activity.source_file
source_type = activity.source_type or "fit"
await db.execute(delete(ActivityDataPoint).where(ActivityDataPoint.activity_id == activity_id))
await db.execute(delete(ActivityLap).where(ActivityLap.activity_id == activity_id))
await db.delete(activity)
await db.commit()
from app.workers.tasks import process_activity_file
task = process_activity_file.delay(source_file, current_user.id, source_type)
return {"task_id": task.id, "status": "queued"}
+43 -52
View File
@@ -24,16 +24,19 @@ celery_app.conf.update(
worker_prefetch_multiplier=1,
)
# Garmin FIT file suffixes that are health/wellness data, not activities
WELLNESS_SUFFIXES = (
"_METRICS.fit",
"_WELLNESS.fit",
"_SLEEP.fit",
"_SLEEP_DATA.fit",
"_STRESS.fit",
"_SPO2.fit",
"_HRV.fit",
"_HRV_STATUS.fit",
"_MONITORING.fit",
"_MONITORING_B.fit",
"_RESPIRATION.fit",
"_PULSE_OX.fit",
)
@@ -46,7 +49,6 @@ def is_wellness_file(file_path: str) -> bool:
def process_activity_file(self, file_path: str, user_id: int, source_type: str):
"""Parse a FIT/GPX file. Routes wellness files to health parser."""
# Route wellness/metrics files to health parser instead
if is_wellness_file(file_path):
parse_wellness_fit.delay(file_path, user_id)
return {"status": "routed_to_wellness", "file": file_path}
@@ -54,7 +56,7 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str):
from app.services.fit_parser import parse_fit_file, parse_gpx_file, calculate_hr_zones
from app.core.database import SyncSessionLocal
from app.models.user import Activity, ActivityDataPoint, ActivityLap
from sqlalchemy import select
from sqlalchemy import select, func
from datetime import datetime
self.update_state(state="PROGRESS", meta={"step": "parsing"})
@@ -67,25 +69,31 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str):
except Exception as e:
raise self.retry(exc=e, countdown=10, max_retries=3)
# Skip files with no usable activity data
if not parsed.get("start_time"):
return {"status": "skipped", "reason": "no start_time", "file": file_path}
with SyncSessionLocal() as db:
# Check for duplicate by garmin activity ID
if parsed.get("garmin_activity_id"):
existing = db.execute(
select(Activity).where(
Activity.garmin_activity_id == parsed["garmin_activity_id"]
)
).scalar_one_or_none()
if existing:
return {"activity_id": existing.id, "status": "duplicate"}
start_time = datetime.fromisoformat(parsed["start_time"])
# Get user's configured max HR for accurate zone calculation
# Falls back to: user-set value → 220-age → activity max → 190
# Deduplicate by (user_id, sport_type, start_time date + within 60s)
existing = db.execute(
select(Activity).where(
Activity.user_id == user_id,
Activity.sport_type == parsed["sport_type"],
func.date(Activity.start_time) == start_time.date(),
Activity.start_time >= start_time.replace(second=0, microsecond=0),
)
).scalar_one_or_none()
if existing:
return {"activity_id": existing.id, "status": "duplicate"}
# Get user max HR for zone calculation
from app.models.user import User as UserModel
user_obj = db.execute(select(UserModel).where(UserModel.id == user_id)).scalar_one_or_none()
user_obj = db.execute(
select(UserModel).where(UserModel.id == user_id)
).scalar_one_or_none()
user_max_hr = None
if user_obj:
user_max_hr = user_obj.max_heart_rate
@@ -94,15 +102,9 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str):
age = _date.today().year - user_obj.birth_year
user_max_hr = 220 - age
if not user_max_hr:
# Last resort: use activity max but warn this may shift zones
user_max_hr = parsed.get("max_heart_rate") or 190
hr_zones = calculate_hr_zones(
parsed.get("data_points", []),
user_max_hr
)
start_time = datetime.fromisoformat(parsed["start_time"])
hr_zones = calculate_hr_zones(parsed.get("data_points", []), user_max_hr)
activity = Activity(
user_id=user_id,
@@ -132,11 +134,9 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str):
db.add(activity)
db.flush()
# Insert data points, deduping on (activity_id, timestamp)
seen = set()
points = parsed.get("data_points", [])
batch = []
for p in points:
for p in parsed.get("data_points", []):
if not p.get("timestamp"):
continue
ts = datetime.fromisoformat(p["timestamp"]) if isinstance(p["timestamp"], str) else p["timestamp"]
@@ -165,7 +165,6 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str):
db.add_all(batch)
db.flush()
# Laps
for lap in parsed.get("laps", []):
ls = datetime.fromisoformat(lap["start_time"]) if lap.get("start_time") else None
db.add(ActivityLap(
@@ -184,7 +183,6 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str):
activity_id = activity.id
compute_personal_records.delay(activity_id, user_id, parsed)
# Auto route detection for running and cycling
if parsed.get("sport_type") in ("running", "cycling", "hiking", "walking"):
detect_route.delay(activity_id, user_id)
return {"activity_id": activity_id, "status": "ok"}
@@ -192,16 +190,17 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str):
@celery_app.task(name="parse_wellness_fit")
def parse_wellness_fit(file_path: str, user_id: int):
"""
Parse a Garmin wellness/metrics FIT file and upsert into health_metrics.
Uses wellness_parser which handles standard FIT + Garmin proprietary messages.
"""
"""Parse a Garmin wellness FIT file and upsert into health_metrics."""
from app.services.wellness_parser import parse_wellness_fit as _parse
from app.core.database import SyncSessionLocal
from datetime import datetime, timezone
from sqlalchemy import text
result = _parse(file_path)
try:
result = _parse(file_path)
except Exception as e:
return {"status": "error", "error": str(e), "file": file_path}
if result.get("error"):
return {"status": "error", "error": result["error"], "file": file_path}
@@ -263,30 +262,26 @@ def parse_wellness_fit(file_path: str, user_id: int):
return {"status": "ok", "days_processed": len(days), "file": file_path}
@celery_app.task(name="detect_route")
def detect_route(activity_id: int, user_id: int):
"""
After importing an activity, check if it matches any existing named routes.
If two+ unassigned activities match each other, auto-create a named route.
"""
"""Auto-detect and link activities to named routes."""
from app.services.route_matcher import routes_are_similar
from app.core.database import SyncSessionLocal
from app.models.user import Activity, NamedRoute
from sqlalchemy import select
with SyncSessionLocal() as db:
# Get the new activity
new_act = db.execute(
select(Activity).where(Activity.id == activity_id)
).scalar_one_or_none()
if not new_act or not new_act.polyline:
return {"status": "no_polyline"}
# Already assigned to a route?
if new_act.named_route_id:
return {"status": "already_assigned"}
# Check against existing named routes first
routes = db.execute(
select(NamedRoute).where(
NamedRoute.user_id == user_id,
@@ -303,7 +298,6 @@ def detect_route(activity_id: int, user_id: int):
db.commit()
return {"status": "matched_existing", "route_id": route.id}
# No existing route matched - check unassigned activities for a match
candidates = db.execute(
select(Activity).where(
Activity.user_id == user_id,
@@ -311,7 +305,6 @@ def detect_route(activity_id: int, user_id: int):
Activity.named_route_id == None,
Activity.id != activity_id,
Activity.polyline != None,
# Within 20% distance
Activity.distance_m >= (new_act.distance_m or 0) * 0.8,
Activity.distance_m <= (new_act.distance_m or 0) * 1.2,
)
@@ -322,7 +315,6 @@ def detect_route(activity_id: int, user_id: int):
new_act.polyline, candidate.polyline,
new_act.bounding_box, candidate.bounding_box,
):
# Auto-create a route from the older activity
older = candidate if candidate.start_time < new_act.start_time else new_act
newer = new_act if candidate.start_time < new_act.start_time else candidate
@@ -400,8 +392,8 @@ def process_garmin_health_zip(zip_path: str, user_id: int):
import zipfile
import json
from app.core.database import SyncSessionLocal
from app.models.user import HealthMetric
from datetime import datetime, timezone
from sqlalchemy import text
with SyncSessionLocal() as db:
with zipfile.ZipFile(zip_path) as zf:
@@ -423,20 +415,19 @@ def process_garmin_health_zip(zip_path: str, user_id: int):
except ValueError:
continue
from sqlalchemy import text as _text
db.execute(_text("""
db.execute(text("""
INSERT INTO health_metrics (user_id, date, resting_hr, steps,
floors_climbed, active_calories, total_calories, avg_stress, spo2_avg)
VALUES (:user_id, :date, :resting_hr, :steps,
:floors, :active_cal, :total_cal, :stress, :spo2)
ON CONFLICT (user_id, date) DO UPDATE SET
resting_hr = COALESCE(EXCLUDED.resting_hr, health_metrics.resting_hr),
steps = COALESCE(EXCLUDED.steps, health_metrics.steps),
floors_climbed = COALESCE(EXCLUDED.floors_climbed, health_metrics.floors_climbed),
resting_hr = COALESCE(EXCLUDED.resting_hr, health_metrics.resting_hr),
steps = COALESCE(EXCLUDED.steps, health_metrics.steps),
floors_climbed = COALESCE(EXCLUDED.floors_climbed, health_metrics.floors_climbed),
active_calories = COALESCE(EXCLUDED.active_calories, health_metrics.active_calories),
total_calories = COALESCE(EXCLUDED.total_calories, health_metrics.total_calories),
avg_stress = COALESCE(EXCLUDED.avg_stress, health_metrics.avg_stress),
spo2_avg = COALESCE(EXCLUDED.spo2_avg, health_metrics.spo2_avg)
total_calories = COALESCE(EXCLUDED.total_calories, health_metrics.total_calories),
avg_stress = COALESCE(EXCLUDED.avg_stress, health_metrics.avg_stress),
spo2_avg = COALESCE(EXCLUDED.spo2_avg, health_metrics.spo2_avg)
"""), {
"user_id": user_id, "date": date_dt,
"resting_hr": data.get("restingHeartRate"),