From 38632cfe4fd37c5d0d829ab9f9484c8a9a1ec177 Mon Sep 17 00:00:00 2001 From: owain Date: Sat, 6 Jun 2026 15:53:56 +0100 Subject: [PATCH] Use ON CONFLICT upsert for health metrics - fixes concurrent worker race condition --- backend/app/workers/tasks.py | 147 +++++++++++++++-------------------- 1 file changed, 61 insertions(+), 86 deletions(-) diff --git a/backend/app/workers/tasks.py b/backend/app/workers/tasks.py index 3fc9f96..f8e044d 100644 --- a/backend/app/workers/tasks.py +++ b/backend/app/workers/tasks.py @@ -181,7 +181,7 @@ def parse_wellness_fit(file_path: str, user_id: int): import fitparse from app.core.database import SyncSessionLocal from app.models.user import HealthMetric - from sqlalchemy import select, func + from sqlalchemy import text from datetime import datetime, timezone, date try: @@ -269,10 +269,9 @@ def parse_wellness_fit(file_path: str, user_id: int): if not daily: return {"status": "no_data", "file": file_path} - # Upsert into health_metrics + # Upsert into health_metrics using ON CONFLICT to handle concurrent workers with SyncSessionLocal() as db: for day_date, data in daily.items(): - # Compute averages from raw readings hrs = data.pop("heart_rates", []) stresses = data.pop("stress_values", []) spo2s = data.pop("spo2_readings", []) @@ -283,7 +282,6 @@ def parse_wellness_fit(file_path: str, user_id: int): avg_stress = (sum(stresses) / len(stresses)) if stresses else None spo2_avg = (sum(spo2s) / len(spo2s)) if spo2s else None - # Rough sleep stage breakdown from level codes # Garmin sleep levels: 0=unmeasurable, 1=awake, 2=light, 3=deep, 4=rem sleep_deep_s = sum(30 for l in sleep_levels if l == 3) if sleep_levels else None sleep_light_s = sum(30 for l in sleep_levels if l == 2) if sleep_levels else None @@ -295,56 +293,40 @@ def parse_wellness_fit(file_path: str, user_id: int): date_dt = datetime(day_date.year, day_date.month, day_date.day, tzinfo=timezone.utc) - # Check for existing record - existing = db.execute( - select(HealthMetric).where( - HealthMetric.user_id == user_id, - func.date(HealthMetric.date) == day_date, - ) - ).scalar_one_or_none() - - if existing: - # Update only fields we have data for - if resting_hr: - existing.resting_hr = resting_hr - if avg_hr: - existing.avg_hr_day = avg_hr - if avg_stress: - existing.avg_stress = avg_stress - if spo2_avg: - existing.spo2_avg = spo2_avg - if data.get("hrv_nightly_avg"): - existing.hrv_nightly_avg = data["hrv_nightly_avg"] - if data.get("hrv_5min_high"): - existing.hrv_5min_high = data["hrv_5min_high"] - if data.get("hrv_status"): - existing.hrv_status = data["hrv_status"] - if data.get("steps"): - existing.steps = data["steps"] - if sleep_duration_s: - existing.sleep_duration_s = sleep_duration_s - existing.sleep_deep_s = sleep_deep_s - existing.sleep_light_s = sleep_light_s - existing.sleep_rem_s = sleep_rem_s - existing.sleep_awake_s = sleep_awake_s - else: - db.add(HealthMetric( - user_id=user_id, - date=date_dt, - resting_hr=resting_hr, - avg_hr_day=avg_hr, - avg_stress=avg_stress, - spo2_avg=spo2_avg, - hrv_nightly_avg=data.get("hrv_nightly_avg"), - hrv_5min_high=data.get("hrv_5min_high"), - hrv_status=data.get("hrv_status"), - steps=data.get("steps"), - sleep_duration_s=sleep_duration_s, - sleep_deep_s=sleep_deep_s, - sleep_light_s=sleep_light_s, - sleep_rem_s=sleep_rem_s, - sleep_awake_s=sleep_awake_s, - )) + # ON CONFLICT upsert - race-condition safe, COALESCE preserves existing data + db.execute(text(""" + INSERT INTO health_metrics (user_id, date, resting_hr, avg_hr_day, avg_stress, + spo2_avg, hrv_nightly_avg, hrv_5min_high, hrv_status, steps, + sleep_duration_s, sleep_deep_s, sleep_light_s, sleep_rem_s, sleep_awake_s) + VALUES (:user_id, :date, :resting_hr, :avg_hr, :avg_stress, + :spo2_avg, :hrv_avg, :hrv_high, :hrv_status, :steps, + :sleep_dur, :sleep_deep, :sleep_light, :sleep_rem, :sleep_awake) + ON CONFLICT (user_id, date) DO UPDATE SET + resting_hr = COALESCE(EXCLUDED.resting_hr, health_metrics.resting_hr), + avg_hr_day = COALESCE(EXCLUDED.avg_hr_day, health_metrics.avg_hr_day), + avg_stress = COALESCE(EXCLUDED.avg_stress, health_metrics.avg_stress), + spo2_avg = COALESCE(EXCLUDED.spo2_avg, health_metrics.spo2_avg), + hrv_nightly_avg = COALESCE(EXCLUDED.hrv_nightly_avg, health_metrics.hrv_nightly_avg), + hrv_5min_high = COALESCE(EXCLUDED.hrv_5min_high, health_metrics.hrv_5min_high), + hrv_status = COALESCE(EXCLUDED.hrv_status, health_metrics.hrv_status), + steps = COALESCE(EXCLUDED.steps, health_metrics.steps), + sleep_duration_s = COALESCE(EXCLUDED.sleep_duration_s, health_metrics.sleep_duration_s), + sleep_deep_s = COALESCE(EXCLUDED.sleep_deep_s, health_metrics.sleep_deep_s), + sleep_light_s = COALESCE(EXCLUDED.sleep_light_s, health_metrics.sleep_light_s), + sleep_rem_s = COALESCE(EXCLUDED.sleep_rem_s, health_metrics.sleep_rem_s), + sleep_awake_s = COALESCE(EXCLUDED.sleep_awake_s, health_metrics.sleep_awake_s) + """), { + "user_id": user_id, "date": date_dt, + "resting_hr": resting_hr, "avg_hr": avg_hr, + "avg_stress": avg_stress, "spo2_avg": spo2_avg, + "hrv_avg": data.get("hrv_nightly_avg"), + "hrv_high": data.get("hrv_5min_high"), + "hrv_status": data.get("hrv_status"), + "steps": data.get("steps"), + "sleep_dur": sleep_duration_s, "sleep_deep": sleep_deep_s, + "sleep_light": sleep_light_s, "sleep_rem": sleep_rem_s, + "sleep_awake": sleep_awake_s, + }) db.commit() @@ -406,7 +388,6 @@ def process_garmin_health_zip(zip_path: str, user_id: int): import json from app.core.database import SyncSessionLocal from app.models.user import HealthMetric - from sqlalchemy import select, func from datetime import datetime, timezone with SyncSessionLocal() as db: @@ -425,39 +406,33 @@ def process_garmin_health_zip(zip_path: str, user_id: int): continue try: - date = datetime.fromisoformat(date_str).replace(tzinfo=timezone.utc) + date_dt = datetime.fromisoformat(date_str).replace(tzinfo=timezone.utc) except ValueError: continue - existing = db.execute( - select(HealthMetric).where( - HealthMetric.user_id == user_id, - func.date(HealthMetric.date) == date.date(), - ) - ).scalar_one_or_none() - - if existing: - if data.get("restingHeartRate"): - existing.resting_hr = data["restingHeartRate"] - if data.get("totalSteps"): - existing.steps = data["totalSteps"] - if data.get("activeKilocalories"): - existing.active_calories = data["activeKilocalories"] - if data.get("averageStressLevel"): - existing.avg_stress = data["averageStressLevel"] - if data.get("avgSpo2"): - existing.spo2_avg = data["avgSpo2"] - else: - db.add(HealthMetric( - user_id=user_id, - date=date, - resting_hr=data.get("restingHeartRate"), - steps=data.get("totalSteps"), - floors_climbed=data.get("floorsAscended"), - active_calories=data.get("activeKilocalories"), - total_calories=data.get("totalKilocalories"), - avg_stress=data.get("averageStressLevel"), - spo2_avg=data.get("avgSpo2"), - )) + from sqlalchemy import text as _text + db.execute(_text(""" + INSERT INTO health_metrics (user_id, date, resting_hr, steps, + floors_climbed, active_calories, total_calories, avg_stress, spo2_avg) + VALUES (:user_id, :date, :resting_hr, :steps, + :floors, :active_cal, :total_cal, :stress, :spo2) + ON CONFLICT (user_id, date) DO UPDATE SET + resting_hr = COALESCE(EXCLUDED.resting_hr, health_metrics.resting_hr), + steps = COALESCE(EXCLUDED.steps, health_metrics.steps), + floors_climbed = COALESCE(EXCLUDED.floors_climbed, health_metrics.floors_climbed), + active_calories = COALESCE(EXCLUDED.active_calories, health_metrics.active_calories), + total_calories = COALESCE(EXCLUDED.total_calories, health_metrics.total_calories), + avg_stress = COALESCE(EXCLUDED.avg_stress, health_metrics.avg_stress), + spo2_avg = COALESCE(EXCLUDED.spo2_avg, health_metrics.spo2_avg) + """), { + "user_id": user_id, "date": date_dt, + "resting_hr": data.get("restingHeartRate"), + "steps": data.get("totalSteps"), + "floors": data.get("floorsAscended"), + "active_cal": data.get("activeKilocalories"), + "total_cal": data.get("totalKilocalories"), + "stress": data.get("averageStressLevel"), + "spo2": data.get("avgSpo2"), + }) db.commit() \ No newline at end of file