From f5d91cf8aec658024a976517f5df3894f88d2652 Mon Sep 17 00:00:00 2001 From: owain Date: Mon, 8 Jun 2026 10:17:51 +0100 Subject: [PATCH] Fix Garmin full export import: UDSFile health data and nested zip FIT files Garmin Connect exports use UDSFile_*.json (not DailyMetrics) for daily wellness summaries, and pack activity FIT files inside nested sub-zips under DI-Connect-Uploaded-Files/ rather than at the top level. - process_garmin_health_zip: match UDSFile_*.json instead of DailyMetrics, handle list-of-records format, extract stress from allDayStress.aggregatorList, convert floorsAscendedInMeters to floor count - upload_garmin_export: recurse into nested .zip files to find and queue individual activity FIT files Co-Authored-By: Claude Sonnet 4.6 --- backend/app/api/upload.py | 16 ++++++ backend/app/workers/tasks.py | 97 +++++++++++++++++++++++------------- 2 files changed, 78 insertions(+), 35 deletions(-) diff --git a/backend/app/api/upload.py b/backend/app/api/upload.py index 40d6ac4..a390e88 100644 --- a/backend/app/api/upload.py +++ b/backend/app/api/upload.py @@ -75,6 +75,22 @@ async def upload_garmin_export( fit_path = extract_dir / name task = process_activity_file.delay(str(fit_path), current_user.id, "fit") task_ids.append(task.id) + elif lower.endswith(".zip"): + # Garmin exports nest activity FIT files inside sub-zips + # (e.g. DI-Connect-Uploaded-Files/UploadedFiles_*_Part*.zip) + nested_zip_path = extract_dir / name + nested_extract = nested_zip_path.parent / nested_zip_path.stem + nested_extract.mkdir(exist_ok=True) + try: + with zipfile.ZipFile(nested_zip_path) as nzf: + nzf.extractall(nested_extract) + for nested_name in nzf.namelist(): + if nested_name.lower().endswith(".fit"): + fit_path = nested_extract / nested_name + task = process_activity_file.delay(str(fit_path), current_user.id, "fit") + task_ids.append(task.id) + except zipfile.BadZipFile: + pass # Queue health/wellness data extraction health_task = process_garmin_health_zip.delay(str(dest), current_user.id) diff --git a/backend/app/workers/tasks.py b/backend/app/workers/tasks.py index caf32b5..80ac5cd 100644 --- a/backend/app/workers/tasks.py +++ b/backend/app/workers/tasks.py @@ -420,50 +420,77 @@ def process_garmin_health_zip(zip_path: str, user_id: int): from datetime import datetime, timezone from sqlalchemy import text + INSERT_SQL = text(""" + INSERT INTO health_metrics (user_id, date, resting_hr, steps, + floors_climbed, active_calories, total_calories, avg_stress, spo2_avg) + VALUES (:user_id, :date, :resting_hr, :steps, + :floors, :active_cal, :total_cal, :stress, :spo2) + ON CONFLICT (user_id, date) DO UPDATE SET + resting_hr = COALESCE(EXCLUDED.resting_hr, health_metrics.resting_hr), + steps = COALESCE(EXCLUDED.steps, health_metrics.steps), + floors_climbed = COALESCE(EXCLUDED.floors_climbed, health_metrics.floors_climbed), + active_calories = COALESCE(EXCLUDED.active_calories, health_metrics.active_calories), + total_calories = COALESCE(EXCLUDED.total_calories, health_metrics.total_calories), + avg_stress = COALESCE(EXCLUDED.avg_stress, health_metrics.avg_stress), + spo2_avg = COALESCE(EXCLUDED.spo2_avg, health_metrics.spo2_avg) + """) + + def _extract_stress(item): + stress_data = item.get("allDayStress") + if not stress_data or not isinstance(stress_data, dict): + return item.get("averageStressLevel") + for agg in stress_data.get("aggregatorList", []): + if agg.get("type") == "TOTAL": + return agg.get("averageStressLevel") + return None + + def _floors_from_item(item): + # UDS format reports meters; 1 floor = 3.048 m + meters = item.get("floorsAscendedInMeters") + if meters is not None: + return round(meters / 3.048) + return item.get("floorsAscended") + + def _process_record(db, item): + date_str = item.get("calendarDate") or item.get("date") + if not date_str: + return + try: + date_dt = datetime.fromisoformat(date_str).replace(tzinfo=timezone.utc) + except ValueError: + return + db.execute(INSERT_SQL, { + "user_id": user_id, "date": date_dt, + "resting_hr": item.get("restingHeartRate"), + "steps": item.get("totalSteps"), + "floors": _floors_from_item(item), + "active_cal": item.get("activeKilocalories"), + "total_cal": item.get("totalKilocalories"), + "stress": _extract_stress(item), + "spo2": item.get("avgSpo2"), + }) + with SyncSessionLocal() as db: with zipfile.ZipFile(zip_path) as zf: for name in zf.namelist(): - if "DailyMetrics" not in name or not name.endswith(".json"): + if not name.endswith(".json"): + continue + # Garmin Connect export stores daily summaries in UDSFile_*.json + # (DI-Connect-Aggregator). Older/alternative exports may use DailyMetrics. + is_uds = "UDSFile" in name + is_legacy = "DailyMetrics" in name + if not (is_uds or is_legacy): continue with zf.open(name) as f: try: data = json.load(f) except Exception: continue - - date_str = data.get("calendarDate") or data.get("date") - if not date_str: - continue - - try: - date_dt = datetime.fromisoformat(date_str).replace(tzinfo=timezone.utc) - except ValueError: - continue - - db.execute(text(""" - INSERT INTO health_metrics (user_id, date, resting_hr, steps, - floors_climbed, active_calories, total_calories, avg_stress, spo2_avg) - VALUES (:user_id, :date, :resting_hr, :steps, - :floors, :active_cal, :total_cal, :stress, :spo2) - ON CONFLICT (user_id, date) DO UPDATE SET - resting_hr = COALESCE(EXCLUDED.resting_hr, health_metrics.resting_hr), - steps = COALESCE(EXCLUDED.steps, health_metrics.steps), - floors_climbed = COALESCE(EXCLUDED.floors_climbed, health_metrics.floors_climbed), - active_calories = COALESCE(EXCLUDED.active_calories, health_metrics.active_calories), - total_calories = COALESCE(EXCLUDED.total_calories, health_metrics.total_calories), - avg_stress = COALESCE(EXCLUDED.avg_stress, health_metrics.avg_stress), - spo2_avg = COALESCE(EXCLUDED.spo2_avg, health_metrics.spo2_avg) - """), { - "user_id": user_id, "date": date_dt, - "resting_hr": data.get("restingHeartRate"), - "steps": data.get("totalSteps"), - "floors": data.get("floorsAscended"), - "active_cal": data.get("activeKilocalories"), - "total_cal": data.get("totalKilocalories"), - "stress": data.get("averageStressLevel"), - "spo2": data.get("avgSpo2"), - }) - + # UDS files are lists of daily records; legacy format is a single object + records = data if isinstance(data, list) else [data] + for item in records: + if isinstance(item, dict): + _process_record(db, item) db.commit()