"""
Background tasks: activity ingestion, route matching, PR calculation.

Uses synchronous SQLAlchemy because Celery's prefork model doesn't play well
with asyncio - each worker process needs its own connection pool, and async
pools don't survive process forks.
"""
import logging
from collections import Counter
from datetime import date, datetime, timezone
from typing import Optional

from celery import Celery

from app.core.config import settings

logger = logging.getLogger(__name__)

celery_app = Celery(
    "milevault",
    broker=settings.redis_url,
    backend=settings.redis_url,
)

celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    worker_prefetch_multiplier=1,
)

# Garmin FIT file suffixes that are health/wellness data, not activities
WELLNESS_SUFFIXES = (
    "_METRICS.fit",
    "_WELLNESS.fit",
    "_SLEEP.fit",
    "_STRESS.fit",
    "_SPO2.fit",
    "_HRV.fit",
    "_MONITORING.fit",
    "_MONITORING_B.fit",
)

# Upper-cased once so the suffix match is case-insensitive without per-call work.
_WELLNESS_SUFFIXES_UPPER = tuple(s.upper() for s in WELLNESS_SUFFIXES)

# Number of data points flushed to the database per batch.
_DATA_POINT_BATCH_SIZE = 500

# Seconds credited to each sleep_level sample when summing sleep stages.
_SLEEP_SAMPLE_SECONDS = 30

# ON CONFLICT upsert - race-condition safe, COALESCE preserves existing data
_WELLNESS_UPSERT_SQL = """
    INSERT INTO health_metrics
        (user_id, date, resting_hr, avg_hr_day, avg_stress, spo2_avg,
         hrv_nightly_avg, hrv_5min_high, hrv_status, steps,
         sleep_duration_s, sleep_deep_s, sleep_light_s, sleep_rem_s, sleep_awake_s)
    VALUES
        (:user_id, :date, :resting_hr, :avg_hr, :avg_stress, :spo2_avg,
         :hrv_avg, :hrv_high, :hrv_status, :steps,
         :sleep_dur, :sleep_deep, :sleep_light, :sleep_rem, :sleep_awake)
    ON CONFLICT (user_id, date) DO UPDATE SET
        resting_hr = COALESCE(EXCLUDED.resting_hr, health_metrics.resting_hr),
        avg_hr_day = COALESCE(EXCLUDED.avg_hr_day, health_metrics.avg_hr_day),
        avg_stress = COALESCE(EXCLUDED.avg_stress, health_metrics.avg_stress),
        spo2_avg = COALESCE(EXCLUDED.spo2_avg, health_metrics.spo2_avg),
        hrv_nightly_avg = COALESCE(EXCLUDED.hrv_nightly_avg, health_metrics.hrv_nightly_avg),
        hrv_5min_high = COALESCE(EXCLUDED.hrv_5min_high, health_metrics.hrv_5min_high),
        hrv_status = COALESCE(EXCLUDED.hrv_status, health_metrics.hrv_status),
        steps = COALESCE(EXCLUDED.steps, health_metrics.steps),
        sleep_duration_s = COALESCE(EXCLUDED.sleep_duration_s, health_metrics.sleep_duration_s),
        sleep_deep_s = COALESCE(EXCLUDED.sleep_deep_s, health_metrics.sleep_deep_s),
        sleep_light_s = COALESCE(EXCLUDED.sleep_light_s, health_metrics.sleep_light_s),
        sleep_rem_s = COALESCE(EXCLUDED.sleep_rem_s, health_metrics.sleep_rem_s),
        sleep_awake_s = COALESCE(EXCLUDED.sleep_awake_s, health_metrics.sleep_awake_s)
"""

_DAILY_METRICS_UPSERT_SQL = """
    INSERT INTO health_metrics
        (user_id, date, resting_hr, steps, floors_climbed,
         active_calories, total_calories, avg_stress, spo2_avg)
    VALUES
        (:user_id, :date, :resting_hr, :steps, :floors,
         :active_cal, :total_cal, :stress, :spo2)
    ON CONFLICT (user_id, date) DO UPDATE SET
        resting_hr = COALESCE(EXCLUDED.resting_hr, health_metrics.resting_hr),
        steps = COALESCE(EXCLUDED.steps, health_metrics.steps),
        floors_climbed = COALESCE(EXCLUDED.floors_climbed, health_metrics.floors_climbed),
        active_calories = COALESCE(EXCLUDED.active_calories, health_metrics.active_calories),
        total_calories = COALESCE(EXCLUDED.total_calories, health_metrics.total_calories),
        avg_stress = COALESCE(EXCLUDED.avg_stress, health_metrics.avg_stress),
        spo2_avg = COALESCE(EXCLUDED.spo2_avg, health_metrics.spo2_avg)
"""


def _to_datetime(value):
    """Return *value* as a datetime: ISO strings are parsed, anything else passes through."""
    if isinstance(value, str):
        return datetime.fromisoformat(value)
    return value


def _record_date(fields: dict, *keys: str) -> Optional[date]:
    """Return the calendar date of the first truthy timestamp found under *keys*.

    Returns None when no key holds a truthy value, or when the value found
    has no ``date()`` method.
    """
    for key in keys:
        ts = fields.get(key)
        if ts:
            return ts.date() if hasattr(ts, "date") else None
    return None


def _set_if_missing(day: dict, key: str, value) -> None:
    """Store *value* under *key* unless a non-None value is already present.

    Unlike ``dict.setdefault`` this never stores None, so a record that lacks
    the field cannot block a later record that carries it.
    """
    if value is not None and day.get(key) is None:
        day[key] = value


def _mean(values):
    """Return the arithmetic mean of *values*, or None when it is empty."""
    return (sum(values) / len(values)) if values else None


def is_wellness_file(file_path: str) -> bool:
    """Return True when *file_path* ends with a known wellness suffix (case-insensitive)."""
    return file_path.upper().endswith(_WELLNESS_SUFFIXES_UPPER)


@celery_app.task(bind=True, name="process_activity_file")
def process_activity_file(self, file_path: str, user_id: int, source_type: str):
    """Parse a FIT/GPX file and store the activity, its data points and laps.

    Wellness files are handed to ``parse_wellness_fit`` instead. A parse
    failure is retried (up to 3 retries, 10 s apart). On success a
    ``compute_personal_records`` task is queued for the new activity.

    Returns a dict whose ``status`` is one of ``routed_to_wellness``,
    ``skipped``, ``duplicate`` or ``ok``.
    """
    # Route wellness/metrics files to health parser instead
    if is_wellness_file(file_path):
        parse_wellness_fit.delay(file_path, user_id)
        return {"status": "routed_to_wellness", "file": file_path}

    from sqlalchemy import select

    from app.core.database import SyncSessionLocal
    from app.models.user import Activity, ActivityDataPoint, ActivityLap
    from app.services.fit_parser import calculate_hr_zones, parse_fit_file, parse_gpx_file

    self.update_state(state="PROGRESS", meta={"step": "parsing"})

    try:
        # Case-insensitive extension check, consistent with is_wellness_file.
        if source_type == "fit" or file_path.lower().endswith(".fit"):
            parsed = parse_fit_file(file_path)
        else:
            parsed = parse_gpx_file(file_path)
    except Exception as e:
        raise self.retry(exc=e, countdown=10, max_retries=3)

    # Skip files with no usable activity data
    if not parsed.get("start_time"):
        return {"status": "skipped", "reason": "no start_time", "file": file_path}

    points = parsed.get("data_points", [])

    with SyncSessionLocal() as db:
        # Check for duplicate by garmin activity ID
        # NOTE(review): the lookup is not scoped to user_id - confirm that is intended.
        garmin_id = parsed.get("garmin_activity_id")
        if garmin_id:
            existing = db.execute(
                select(Activity).where(Activity.garmin_activity_id == garmin_id)
            ).scalar_one_or_none()
            if existing:
                return {"activity_id": existing.id, "status": "duplicate"}

        hr_zones = calculate_hr_zones(points, parsed.get("max_heart_rate") or 190)

        activity = Activity(
            user_id=user_id,
            name=parsed["name"],
            sport_type=parsed["sport_type"],
            start_time=_to_datetime(parsed["start_time"]),
            distance_m=parsed.get("distance_m"),
            duration_s=parsed.get("duration_s"),
            elevation_gain_m=parsed.get("elevation_gain_m"),
            elevation_loss_m=parsed.get("elevation_loss_m"),
            avg_heart_rate=parsed.get("avg_heart_rate"),
            max_heart_rate=parsed.get("max_heart_rate"),
            avg_cadence=parsed.get("avg_cadence"),
            avg_power=parsed.get("avg_power"),
            normalized_power=parsed.get("normalized_power"),
            avg_speed_ms=parsed.get("avg_speed_ms"),
            max_speed_ms=parsed.get("max_speed_ms"),
            avg_temperature_c=parsed.get("avg_temperature_c"),
            calories=parsed.get("calories"),
            training_stress_score=parsed.get("training_stress_score"),
            polyline=parsed.get("polyline"),
            bounding_box=parsed.get("bounding_box"),
            source_file=file_path,
            source_type=parsed.get("source_type"),
            hr_zones=hr_zones,
        )
        db.add(activity)
        db.flush()  # assigns activity.id for the child rows below

        # Insert data points, deduping on (activity_id, timestamp).
        # activity_id is constant here, so the timestamp alone is the key.
        seen = set()
        batch = []
        for p in points:
            if not p.get("timestamp"):
                continue
            ts = _to_datetime(p["timestamp"])
            if ts in seen:
                continue
            seen.add(ts)
            batch.append(ActivityDataPoint(
                activity_id=activity.id,
                timestamp=ts,
                latitude=p.get("latitude"),
                longitude=p.get("longitude"),
                altitude_m=p.get("altitude_m"),
                heart_rate=p.get("heart_rate"),
                cadence=p.get("cadence"),
                speed_ms=p.get("speed_ms"),
                power=p.get("power"),
                temperature_c=p.get("temperature_c"),
                distance_m=p.get("distance_m"),
            ))
            if len(batch) >= _DATA_POINT_BATCH_SIZE:
                db.add_all(batch)
                db.flush()
                batch = []
        if batch:
            db.add_all(batch)
            db.flush()

        # Laps
        for lap in parsed.get("laps", []):
            lap_start_raw = lap.get("start_time")
            db.add(ActivityLap(
                activity_id=activity.id,
                lap_number=lap["lap_number"],
                start_time=_to_datetime(lap_start_raw) if lap_start_raw else None,
                duration_s=lap.get("duration_s"),
                distance_m=lap.get("distance_m"),
                avg_heart_rate=lap.get("avg_heart_rate"),
                avg_cadence=lap.get("avg_cadence"),
                avg_speed_ms=lap.get("avg_speed_ms"),
                avg_power=lap.get("avg_power"),
            ))

        db.commit()
        activity_id = activity.id

    compute_personal_records.delay(activity_id, user_id, parsed)
    return {"activity_id": activity_id, "status": "ok"}


@celery_app.task(name="parse_wellness_fit")
def parse_wellness_fit(file_path: str, user_id: int):
    """
    Parse a Garmin wellness/metrics FIT file and upsert into health_metrics.
    These files contain resting HR, HRV, sleep, stress, SpO2 etc.

    Records are grouped by the calendar date of their timestamp and one row
    per (user_id, date) is upserted. For scalar fields the first non-null
    value seen for a day wins.

    Returns a dict whose ``status`` is ``error`` (file could not be opened),
    ``no_data`` (no dated records found) or ``ok``.
    """
    import fitparse
    from sqlalchemy import text

    from app.core.database import SyncSessionLocal

    try:
        fit = fitparse.FitFile(file_path)
    except Exception as e:
        return {"status": "error", "error": str(e)}

    # Collect all monitoring/daily summary records keyed by date
    daily = {}  # date -> dict of fields

    for record in fit.get_messages():
        name = record.name
        fields = {f.name: f.value for f in record if f.value is not None}

        if name in ("monitoring_info", "monitoring"):
            d = _record_date(fields, "timestamp", "local_timestamp")
        elif name in ("hrv_status_summary", "sleep_level", "stress", "spo2_data"):
            d = _record_date(fields, "timestamp")
        else:
            continue
        if d is None:
            continue

        day = daily.setdefault(d, {})

        if name == "monitoring_info":
            _set_if_missing(day, "resting_hr", fields.get("resting_heart_rate"))

        elif name == "monitoring":
            # Accumulate steps (they're stored as increments)
            if "steps" in fields:
                day["steps"] = day.get("steps", 0) + int(fields["steps"])
            if "heart_rate" in fields:
                day.setdefault("heart_rates", []).append(int(fields["heart_rate"]))
            if "stress_level_value" in fields:
                day.setdefault("stress_values", []).append(int(fields["stress_level_value"]))

        elif name == "hrv_status_summary":
            # NOTE(review): hrv_nightly_avg is read from "weekly_average" - confirm
            # this is the intended source field.
            _set_if_missing(day, "hrv_nightly_avg", fields.get("weekly_average"))
            _set_if_missing(day, "hrv_5min_high", fields.get("last_night_5_min_high"))
            status = fields.get("hrv_status")
            # A missing status stays NULL so COALESCE keeps any stored value;
            # an empty string would overwrite it.
            _set_if_missing(day, "hrv_status", str(status) if status is not None else None)

        elif name == "sleep_level":
            day.setdefault("sleep_levels", []).append(fields.get("sleep_level"))

        elif name == "stress":
            if "stress_level_value" in fields:
                day.setdefault("stress_values", []).append(int(fields["stress_level_value"]))

        elif name == "spo2_data":
            readings = day.setdefault("spo2_readings", [])
            if "spo2_percent" in fields:
                readings.append(fields["spo2_percent"])

    if not daily:
        return {"status": "no_data", "file": file_path}

    upsert = text(_WELLNESS_UPSERT_SQL)

    # Upsert into health_metrics using ON CONFLICT to handle concurrent workers
    with SyncSessionLocal() as db:
        for day_date, data in daily.items():
            sleep_levels = data.get("sleep_levels", [])

            # Garmin sleep levels: 0=unmeasurable, 1=awake, 2=light, 3=deep, 4=rem
            if sleep_levels:
                level_counts = Counter(sleep_levels)
                sleep_awake_s = _SLEEP_SAMPLE_SECONDS * level_counts[1]
                sleep_light_s = _SLEEP_SAMPLE_SECONDS * level_counts[2]
                sleep_deep_s = _SLEEP_SAMPLE_SECONDS * level_counts[3]
                sleep_rem_s = _SLEEP_SAMPLE_SECONDS * level_counts[4]
            else:
                sleep_awake_s = sleep_light_s = sleep_deep_s = sleep_rem_s = None

            # Awake time is excluded from the total; a zero total is stored as NULL.
            sleep_duration_s = (
                (sleep_deep_s or 0) + (sleep_light_s or 0) + (sleep_rem_s or 0)
            ) or None

            date_dt = datetime(day_date.year, day_date.month, day_date.day, tzinfo=timezone.utc)

            db.execute(upsert, {
                "user_id": user_id,
                "date": date_dt,
                "resting_hr": data.get("resting_hr"),
                "avg_hr": _mean(data.get("heart_rates", [])),
                "avg_stress": _mean(data.get("stress_values", [])),
                "spo2_avg": _mean(data.get("spo2_readings", [])),
                "hrv_avg": data.get("hrv_nightly_avg"),
                "hrv_high": data.get("hrv_5min_high"),
                "hrv_status": data.get("hrv_status"),
                "steps": data.get("steps"),
                "sleep_dur": sleep_duration_s,
                "sleep_deep": sleep_deep_s,
                "sleep_light": sleep_light_s,
                "sleep_rem": sleep_rem_s,
                "sleep_awake": sleep_awake_s,
            })

        db.commit()

    return {"status": "ok", "days_processed": len(daily), "file": file_path}


@celery_app.task(name="compute_personal_records")
def compute_personal_records(activity_id: int, user_id: int, parsed: dict):
    """Calculate personal records for standard distances from this activity.

    For every best split returned by ``compute_best_splits`` whose label is in
    ``STANDARD_DISTANCES``, a new current PersonalRecord is inserted when the
    user has no current record for that sport/distance or the new duration is
    shorter; the previous record is then marked as no longer current.
    """
    from sqlalchemy import select

    from app.core.database import SyncSessionLocal
    from app.models.user import PersonalRecord
    from app.services.route_matcher import STANDARD_DISTANCES, compute_best_splits

    data_points = parsed.get("data_points", [])
    total_dist = parsed.get("distance_m", 0) or 0
    sport = parsed.get("sport_type", "running")
    start_time_raw = parsed.get("start_time")
    # Fall back to "now" when the activity carries no start time.
    start_time = _to_datetime(start_time_raw) if start_time_raw else datetime.now(timezone.utc)

    best_splits = compute_best_splits(data_points, total_dist)

    # label -> distance lookup; the first entry wins if a label is repeated.
    distance_by_label = {}
    for dist, dist_label in STANDARD_DISTANCES:
        distance_by_label.setdefault(dist_label, dist)

    with SyncSessionLocal() as db:
        for label, duration_s in best_splits.items():
            dist_m = distance_by_label.get(label)
            if dist_m is None:
                continue

            current = db.execute(
                select(PersonalRecord).where(
                    PersonalRecord.user_id == user_id,
                    PersonalRecord.sport_type == sport,
                    PersonalRecord.distance_m == dist_m,
                    PersonalRecord.is_current_record == True,  # noqa: E712 - SQL expression
                )
            ).scalar_one_or_none()

            if current is None or duration_s < current.duration_s:
                if current:
                    current.is_current_record = False
                db.add(PersonalRecord(
                    user_id=user_id,
                    activity_id=activity_id,
                    sport_type=sport,
                    distance_m=dist_m,
                    distance_label=label,
                    duration_s=duration_s,
                    achieved_at=start_time,
                    is_current_record=True,
                ))

        db.commit()


@celery_app.task(name="process_garmin_health_zip")
def process_garmin_health_zip(zip_path: str, user_id: int):
    """Extract wellness data from a Garmin Connect export ZIP.

    Every ``.json`` member whose name contains ``DailyMetrics`` is read and
    upserted into health_metrics keyed on (user_id, date). Members that cannot
    be read, are not a JSON object, or have no parseable date are skipped so
    one bad entry does not abort the whole import.
    """
    import json
    import zipfile

    from sqlalchemy import text

    from app.core.database import SyncSessionLocal

    upsert = text(_DAILY_METRICS_UPSERT_SQL)

    with SyncSessionLocal() as db:
        with zipfile.ZipFile(zip_path) as zf:
            for name in zf.namelist():
                if "DailyMetrics" not in name or not name.endswith(".json"):
                    continue

                with zf.open(name) as f:
                    try:
                        data = json.load(f)
                    except Exception:
                        # Best-effort import: skip unreadable members, but leave a trace.
                        logger.warning("Skipping unreadable JSON member %s in %s", name, zip_path)
                        continue

                # Only a JSON object can carry the keys read below.
                if not isinstance(data, dict):
                    continue

                date_str = data.get("calendarDate") or data.get("date")
                if not date_str:
                    continue
                try:
                    date_dt = datetime.fromisoformat(date_str).replace(tzinfo=timezone.utc)
                except (TypeError, ValueError):
                    # TypeError covers a non-string date value.
                    continue

                db.execute(upsert, {
                    "user_id": user_id,
                    "date": date_dt,
                    "resting_hr": data.get("restingHeartRate"),
                    "steps": data.get("totalSteps"),
                    "floors": data.get("floorsAscended"),
                    "active_cal": data.get("activeKilocalories"),
                    "total_cal": data.get("totalKilocalories"),
                    "stress": data.get("averageStressLevel"),
                    "spo2": data.get("avgSpo2"),
                })

        db.commit()