From 8104ca5ed050da671b10e187437878a4ef9e6bc4 Mon Sep 17 00:00:00 2001 From: owain Date: Sat, 6 Jun 2026 15:50:25 +0100 Subject: [PATCH] Route wellness FIT files to health parser, parse HR/HRV/sleep/stress/SpO2 --- backend/app/workers/tasks.py | 261 ++++++++++++++++++++++++++++++++--- 1 file changed, 242 insertions(+), 19 deletions(-) diff --git a/backend/app/workers/tasks.py b/backend/app/workers/tasks.py index 106e5d2..3fc9f96 100644 --- a/backend/app/workers/tasks.py +++ b/backend/app/workers/tasks.py @@ -24,10 +24,33 @@ celery_app.conf.update( worker_prefetch_multiplier=1, ) +# Garmin FIT file suffixes that are health/wellness data, not activities +WELLNESS_SUFFIXES = ( + "_METRICS.fit", + "_WELLNESS.fit", + "_SLEEP.fit", + "_STRESS.fit", + "_SPO2.fit", + "_HRV.fit", + "_MONITORING.fit", + "_MONITORING_B.fit", +) + + +def is_wellness_file(file_path: str) -> bool: + name = file_path.upper() + return any(name.endswith(s.upper()) for s in WELLNESS_SUFFIXES) + @celery_app.task(bind=True, name="process_activity_file") def process_activity_file(self, file_path: str, user_id: int, source_type: str): - """Parse a FIT/GPX file and insert activity + data points into DB.""" + """Parse a FIT/GPX file. Routes wellness files to health parser.""" + + # Route wellness/metrics files to health parser instead + if is_wellness_file(file_path): + parse_wellness_fit.delay(file_path, user_id) + return {"status": "routed_to_wellness", "file": file_path} + from app.services.fit_parser import parse_fit_file, parse_gpx_file, calculate_hr_zones from app.core.database import SyncSessionLocal from app.models.user import Activity, ActivityDataPoint, ActivityLap @@ -44,8 +67,12 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str): except Exception as e: raise self.retry(exc=e, countdown=10, max_retries=3) + # Skip files with no usable activity data + if not parsed.get("start_time"): + return {"status": "skipped", "reason": "no start_time", "file": file_path} + with SyncSessionLocal() as db: - # Check for duplicate + # Check for duplicate by garmin activity ID if parsed.get("garmin_activity_id"): existing = db.execute( select(Activity).where( @@ -60,7 +87,7 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str): parsed.get("max_heart_rate") or 190 ) - start_time = datetime.fromisoformat(parsed["start_time"]) if parsed.get("start_time") else None + start_time = datetime.fromisoformat(parsed["start_time"]) activity = Activity( user_id=user_id, @@ -90,8 +117,7 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str): db.add(activity) db.flush() - # Insert data points in batches - dedupe (activity_id, timestamp) pairs - # since composite PK rejects duplicates and Garmin sometimes has same-second readings + # Insert data points, deduping on (activity_id, timestamp) seen = set() points = parsed.get("data_points", []) batch = [] @@ -142,11 +168,189 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str): db.commit() activity_id = activity.id - # Queue PR calculation compute_personal_records.delay(activity_id, user_id, parsed) return {"activity_id": activity_id, "status": "ok"} +@celery_app.task(name="parse_wellness_fit") +def parse_wellness_fit(file_path: str, user_id: int): + """ + Parse a Garmin wellness/metrics FIT file and upsert into health_metrics. + These files contain resting HR, HRV, sleep, stress, SpO2 etc. + """ + import fitparse + from app.core.database import SyncSessionLocal + from app.models.user import HealthMetric + from sqlalchemy import select, func + from datetime import datetime, timezone, date + + try: + fit = fitparse.FitFile(file_path) + except Exception as e: + return {"status": "error", "error": str(e)} + + # Collect all monitoring/daily summary records keyed by date + daily = {} # date -> dict of fields + + def get_or_create_day(d: date) -> dict: + if d not in daily: + daily[d] = {} + return daily[d] + + for record in fit.get_messages(): + name = record.name + fields = {f.name: f.value for f in record if f.value is not None} + + if name == "monitoring_info": + ts = fields.get("timestamp") or fields.get("local_timestamp") + if ts: + d = ts.date() if hasattr(ts, "date") else None + if d: + day = get_or_create_day(d) + day.setdefault("resting_hr", fields.get("resting_heart_rate")) + + elif name == "monitoring": + ts = fields.get("timestamp") or fields.get("local_timestamp") + if not ts: + continue + d = ts.date() if hasattr(ts, "date") else None + if not d: + continue + day = get_or_create_day(d) + # Accumulate steps (they're stored as increments) + if "steps" in fields: + day["steps"] = day.get("steps", 0) + int(fields["steps"]) + if "heart_rate" in fields: + hrs = day.setdefault("heart_rates", []) + hrs.append(int(fields["heart_rate"])) + if "stress_level_value" in fields: + stresses = day.setdefault("stress_values", []) + stresses.append(int(fields["stress_level_value"])) + + elif name == "hrv_status_summary": + ts = fields.get("timestamp") + if ts: + d = ts.date() if hasattr(ts, "date") else None + if d: + day = get_or_create_day(d) + day.setdefault("hrv_nightly_avg", fields.get("weekly_average")) + day.setdefault("hrv_5min_high", fields.get("last_night_5_min_high")) + day.setdefault("hrv_status", str(fields.get("hrv_status", ""))) + + elif name == "sleep_level": + ts = fields.get("timestamp") + if ts: + d = ts.date() if hasattr(ts, "date") else None + if d: + day = get_or_create_day(d) + levels = day.setdefault("sleep_levels", []) + levels.append(fields.get("sleep_level")) + + elif name == "stress": + ts = fields.get("timestamp") + if ts: + d = ts.date() if hasattr(ts, "date") else None + if d: + day = get_or_create_day(d) + if "stress_level_value" in fields: + stresses = day.setdefault("stress_values", []) + stresses.append(int(fields["stress_level_value"])) + + elif name == "spo2_data": + ts = fields.get("timestamp") + if ts: + d = ts.date() if hasattr(ts, "date") else None + if d: + day = get_or_create_day(d) + readings = day.setdefault("spo2_readings", []) + if "spo2_percent" in fields: + readings.append(fields["spo2_percent"]) + + if not daily: + return {"status": "no_data", "file": file_path} + + # Upsert into health_metrics + with SyncSessionLocal() as db: + for day_date, data in daily.items(): + # Compute averages from raw readings + hrs = data.pop("heart_rates", []) + stresses = data.pop("stress_values", []) + spo2s = data.pop("spo2_readings", []) + sleep_levels = data.pop("sleep_levels", []) + + resting_hr = data.get("resting_hr") + avg_hr = (sum(hrs) / len(hrs)) if hrs else None + avg_stress = (sum(stresses) / len(stresses)) if stresses else None + spo2_avg = (sum(spo2s) / len(spo2s)) if spo2s else None + + # Rough sleep stage breakdown from level codes + # Garmin sleep levels: 0=unmeasurable, 1=awake, 2=light, 3=deep, 4=rem + sleep_deep_s = sum(30 for l in sleep_levels if l == 3) if sleep_levels else None + sleep_light_s = sum(30 for l in sleep_levels if l == 2) if sleep_levels else None + sleep_rem_s = sum(30 for l in sleep_levels if l == 4) if sleep_levels else None + sleep_awake_s = sum(30 for l in sleep_levels if l == 1) if sleep_levels else None + sleep_duration_s = ( + (sleep_deep_s or 0) + (sleep_light_s or 0) + (sleep_rem_s or 0) + ) or None + + date_dt = datetime(day_date.year, day_date.month, day_date.day, tzinfo=timezone.utc) + + # Check for existing record + existing = db.execute( + select(HealthMetric).where( + HealthMetric.user_id == user_id, + func.date(HealthMetric.date) == day_date, + ) + ).scalar_one_or_none() + + if existing: + # Update only fields we have data for + if resting_hr: + existing.resting_hr = resting_hr + if avg_hr: + existing.avg_hr_day = avg_hr + if avg_stress: + existing.avg_stress = avg_stress + if spo2_avg: + existing.spo2_avg = spo2_avg + if data.get("hrv_nightly_avg"): + existing.hrv_nightly_avg = data["hrv_nightly_avg"] + if data.get("hrv_5min_high"): + existing.hrv_5min_high = data["hrv_5min_high"] + if data.get("hrv_status"): + existing.hrv_status = data["hrv_status"] + if data.get("steps"): + existing.steps = data["steps"] + if sleep_duration_s: + existing.sleep_duration_s = sleep_duration_s + existing.sleep_deep_s = sleep_deep_s + existing.sleep_light_s = sleep_light_s + existing.sleep_rem_s = sleep_rem_s + existing.sleep_awake_s = sleep_awake_s + else: + db.add(HealthMetric( + user_id=user_id, + date=date_dt, + resting_hr=resting_hr, + avg_hr_day=avg_hr, + avg_stress=avg_stress, + spo2_avg=spo2_avg, + hrv_nightly_avg=data.get("hrv_nightly_avg"), + hrv_5min_high=data.get("hrv_5min_high"), + hrv_status=data.get("hrv_status"), + steps=data.get("steps"), + sleep_duration_s=sleep_duration_s, + sleep_deep_s=sleep_deep_s, + sleep_light_s=sleep_light_s, + sleep_rem_s=sleep_rem_s, + sleep_awake_s=sleep_awake_s, + )) + + db.commit() + + return {"status": "ok", "days_processed": len(daily), "file": file_path} + + @celery_app.task(name="compute_personal_records") def compute_personal_records(activity_id: int, user_id: int, parsed: dict): """Calculate personal records for standard distances from this activity.""" @@ -197,11 +401,12 @@ def compute_personal_records(activity_id: int, user_id: int, parsed: dict): @celery_app.task(name="process_garmin_health_zip") def process_garmin_health_zip(zip_path: str, user_id: int): - """Extract wellness/sleep/HRV data from a Garmin Connect export ZIP.""" + """Extract wellness data from a Garmin Connect export ZIP.""" import zipfile import json from app.core.database import SyncSessionLocal from app.models.user import HealthMetric + from sqlalchemy import select, func from datetime import datetime, timezone with SyncSessionLocal() as db: @@ -224,17 +429,35 @@ def process_garmin_health_zip(zip_path: str, user_id: int): except ValueError: continue - metric = HealthMetric( - user_id=user_id, - date=date, - resting_hr=data.get("restingHeartRate"), - steps=data.get("totalSteps"), - floors_climbed=data.get("floorsAscended"), - active_calories=data.get("activeKilocalories"), - total_calories=data.get("totalKilocalories"), - avg_stress=data.get("averageStressLevel"), - spo2_avg=data.get("avgSpo2"), - ) - db.add(metric) + existing = db.execute( + select(HealthMetric).where( + HealthMetric.user_id == user_id, + func.date(HealthMetric.date) == date.date(), + ) + ).scalar_one_or_none() + + if existing: + if data.get("restingHeartRate"): + existing.resting_hr = data["restingHeartRate"] + if data.get("totalSteps"): + existing.steps = data["totalSteps"] + if data.get("activeKilocalories"): + existing.active_calories = data["activeKilocalories"] + if data.get("averageStressLevel"): + existing.avg_stress = data["averageStressLevel"] + if data.get("avgSpo2"): + existing.spo2_avg = data["avgSpo2"] + else: + db.add(HealthMetric( + user_id=user_id, + date=date, + resting_hr=data.get("restingHeartRate"), + steps=data.get("totalSteps"), + floors_climbed=data.get("floorsAscended"), + active_calories=data.get("activeKilocalories"), + total_calories=data.get("totalKilocalories"), + avg_stress=data.get("averageStressLevel"), + spo2_avg=data.get("avgSpo2"), + )) db.commit() \ No newline at end of file