"""
Background tasks: activity ingestion, route matching, PR calculation.

Uses synchronous SQLAlchemy because Celery's prefork model doesn't play well
with asyncio - each worker process needs its own connection pool, and async
pools don't survive process forks.
"""
from typing import Optional

from celery import Celery

from app.core.config import settings

celery_app = Celery(
    "milevault",
    broker=settings.redis_url,
    backend=settings.redis_url,
)

celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    worker_prefetch_multiplier=1,
    beat_schedule={
        "sync-garmin-connect-hourly": {
            "task": "sync_all_garmin_connect",
            "schedule": 3600.0,  # every hour
        },
    },
)

# File-name suffixes that mark a FIT file as wellness/health data rather than
# a recorded activity. Matching is case-insensitive (see is_wellness_file).
WELLNESS_SUFFIXES = (
    "_METRICS.fit",
    "_WELLNESS.fit",
    "_SLEEP.fit",
    "_SLEEP_DATA.fit",
    "_STRESS.fit",
    "_SPO2.fit",
    "_HRV.fit",
    "_HRV_STATUS.fit",
    "_MONITORING.fit",
    "_MONITORING_B.fit",
    "_RESPIRATION.fit",
    "_PULSE_OX.fit",
)


def is_wellness_file(file_path: str) -> bool:
    """Return True if *file_path* ends with one of WELLNESS_SUFFIXES (case-insensitive)."""
    # str.endswith accepts a tuple, so one call covers every suffix.
    return file_path.upper().endswith(tuple(s.upper() for s in WELLNESS_SUFFIXES))


@celery_app.task(bind=True, name="process_activity_file")
def process_activity_file(self, file_path: str, user_id: int, source_type: str,
                          garmin_activity_id: Optional[str] = None):
    """Parse a FIT/GPX file. Routes wellness files to health parser.

    Args:
        file_path: Path of the uploaded/downloaded file to parse.
        user_id: Owner of the resulting activity.
        source_type: "fit" selects the FIT parser; anything else falls back to
            the GPX parser unless the file name ends in ".fit".
        garmin_activity_id: Optional Garmin Connect identifier to stamp on the
            activity (new or already existing).

    Returns:
        A status dict: "routed_to_wellness", "skipped", "duplicate" or "ok".

    Raises:
        celery.exceptions.Retry: when parsing fails (up to 3 retries, 10s apart).
    """
    if is_wellness_file(file_path):
        parse_wellness_fit.delay(file_path, user_id)
        return {"status": "routed_to_wellness", "file": file_path}

    from app.services.fit_parser import parse_fit_file, parse_gpx_file, calculate_hr_zones
    from app.core.database import SyncSessionLocal
    from app.models.user import Activity, ActivityDataPoint, ActivityLap
    from app.models.user import User as UserModel
    from sqlalchemy import select
    from datetime import date as _date
    from datetime import datetime, timedelta

    self.update_state(state="PROGRESS", meta={"step": "parsing"})

    try:
        # Case-insensitive extension check, consistent with is_wellness_file.
        if source_type == "fit" or file_path.lower().endswith(".fit"):
            parsed = parse_fit_file(file_path)
        else:
            parsed = parse_gpx_file(file_path)
    except Exception as e:
        raise self.retry(exc=e, countdown=10, max_retries=3)

    if not parsed.get("start_time"):
        return {"status": "skipped", "reason": "no start_time", "file": file_path}

    with SyncSessionLocal() as db:
        start_time = datetime.fromisoformat(parsed["start_time"])

        # Deduplicate by (user_id, sport_type, start_time within the same
        # minute). Both bounds are required: with only the lower bound every
        # later activity of the same sport on that day looked like a duplicate.
        window_start = start_time.replace(second=0, microsecond=0)
        window_end = window_start + timedelta(minutes=1)
        existing = db.execute(
            select(Activity)
            .where(
                Activity.user_id == user_id,
                Activity.sport_type == parsed["sport_type"],
                Activity.start_time >= window_start,
                Activity.start_time < window_end,
            )
            .order_by(Activity.start_time)
            .limit(1)
        ).scalars().first()  # first(): never raises on multiple matches

        if existing:
            # Stamp garmin_activity_id if this came from a Garmin Connect sync
            # so future syncs skip the fast-path dedup and don't re-download.
            if garmin_activity_id and not existing.garmin_activity_id:
                existing.garmin_activity_id = garmin_activity_id
                db.commit()
            return {"activity_id": existing.id, "status": "duplicate"}

        # Get user max HR for zone calculation: explicit setting first, then
        # the 220-minus-age estimate, then the activity's own max, then 190.
        user_obj = db.execute(
            select(UserModel).where(UserModel.id == user_id)
        ).scalar_one_or_none()
        user_max_hr = None
        if user_obj:
            user_max_hr = user_obj.max_heart_rate
            if not user_max_hr and user_obj.birth_year:
                age = _date.today().year - user_obj.birth_year
                user_max_hr = 220 - age
        if not user_max_hr:
            user_max_hr = parsed.get("max_heart_rate") or 190

        data_points = parsed.get("data_points", [])
        hr_zones = calculate_hr_zones(data_points, user_max_hr)

        activity = Activity(
            user_id=user_id,
            name=parsed["name"],
            sport_type=parsed["sport_type"],
            garmin_activity_id=garmin_activity_id,
            start_time=start_time,
            distance_m=parsed.get("distance_m"),
            duration_s=parsed.get("duration_s"),
            elevation_gain_m=parsed.get("elevation_gain_m"),
            elevation_loss_m=parsed.get("elevation_loss_m"),
            avg_heart_rate=parsed.get("avg_heart_rate"),
            max_heart_rate=parsed.get("max_heart_rate"),
            avg_cadence=parsed.get("avg_cadence"),
            avg_power=parsed.get("avg_power"),
            normalized_power=parsed.get("normalized_power"),
            avg_speed_ms=parsed.get("avg_speed_ms"),
            max_speed_ms=parsed.get("max_speed_ms"),
            avg_temperature_c=parsed.get("avg_temperature_c"),
            calories=parsed.get("calories"),
            training_stress_score=parsed.get("training_stress_score"),
            polyline=parsed.get("polyline"),
            bounding_box=parsed.get("bounding_box"),
            source_file=file_path,
            source_type=parsed.get("source_type"),
            hr_zones=hr_zones,
        )
        db.add(activity)
        db.flush()  # assigns activity.id
        activity_id = activity.id

        # Insert data points in batches of 500, dropping repeated timestamps.
        seen = set()
        batch = []
        for p in data_points:
            if not p.get("timestamp"):
                continue
            raw_ts = p["timestamp"]
            ts = datetime.fromisoformat(raw_ts) if isinstance(raw_ts, str) else raw_ts
            key = (activity_id, ts)
            if key in seen:
                continue
            seen.add(key)
            batch.append(ActivityDataPoint(
                activity_id=activity_id,
                timestamp=ts,
                latitude=p.get("latitude"),
                longitude=p.get("longitude"),
                altitude_m=p.get("altitude_m"),
                heart_rate=p.get("heart_rate"),
                cadence=p.get("cadence"),
                speed_ms=p.get("speed_ms"),
                power=p.get("power"),
                temperature_c=p.get("temperature_c"),
                distance_m=p.get("distance_m"),
            ))
            if len(batch) >= 500:
                db.add_all(batch)
                db.flush()
                batch = []
        if batch:
            db.add_all(batch)
            db.flush()

        for lap in parsed.get("laps", []):
            ls = datetime.fromisoformat(lap["start_time"]) if lap.get("start_time") else None
            db.add(ActivityLap(
                activity_id=activity_id,
                lap_number=lap["lap_number"],
                start_time=ls,
                duration_s=lap.get("duration_s"),
                distance_m=lap.get("distance_m"),
                avg_heart_rate=lap.get("avg_heart_rate"),
                avg_cadence=lap.get("avg_cadence"),
                avg_speed_ms=lap.get("avg_speed_ms"),
                avg_power=lap.get("avg_power"),
            ))

        db.commit()

    # Follow-up work is queued only after the activity is committed.
    compute_personal_records.delay(activity_id, user_id, parsed)
    if parsed.get("sport_type") in ("running", "cycling", "hiking", "walking"):
        detect_route.delay(activity_id, user_id)

    return {"activity_id": activity_id, "status": "ok"}


@celery_app.task(name="parse_wellness_fit")
def parse_wellness_fit(file_path: str, user_id: int):
    """Parse a Garmin wellness FIT file and upsert into health_metrics.

    One row per (user_id, date) is inserted or updated. On conflict, new
    non-NULL values win (COALESCE) and total_calories keeps the larger value
    (GREATEST).

    Returns:
        A status dict: "error" (parser raised or reported an error),
        "no_data" (no days in the file) or "ok" with "days_processed".
    """
    from app.services.wellness_parser import parse_wellness_fit as _parse
    from app.core.database import SyncSessionLocal
    from datetime import datetime, timezone
    from sqlalchemy import text

    try:
        result = _parse(file_path)
    except Exception as e:
        # Best-effort: a bad wellness file is reported, not retried.
        return {"status": "error", "error": str(e), "file": file_path}

    if result.get("error"):
        return {"status": "error", "error": result["error"], "file": file_path}

    days = result.get("days", {})
    if not days:
        return {"status": "no_data", "file": file_path}

    upsert_sql = text("""
        INSERT INTO health_metrics
            (user_id, date, resting_hr, avg_hr_day, max_hr_day, avg_stress, spo2_avg,
             hrv_nightly_avg, hrv_5min_high, hrv_status, steps, floors_climbed,
             active_calories, total_calories, sleep_duration_s, sleep_deep_s,
             sleep_light_s, sleep_rem_s, sleep_awake_s, sleep_score,
             sleep_start, sleep_end)
        VALUES
            (:user_id, :date, :resting_hr, :avg_hr, :max_hr, :avg_stress, :spo2_avg,
             :hrv_avg, :hrv_high, :hrv_status, :steps, :floors,
             :active_cal, :total_cal, :sleep_dur, :sleep_deep,
             :sleep_light, :sleep_rem, :sleep_awake, :sleep_score,
             :sleep_start, :sleep_end)
        ON CONFLICT (user_id, date) DO UPDATE SET
            resting_hr = COALESCE(EXCLUDED.resting_hr, health_metrics.resting_hr),
            avg_hr_day = COALESCE(EXCLUDED.avg_hr_day, health_metrics.avg_hr_day),
            max_hr_day = COALESCE(EXCLUDED.max_hr_day, health_metrics.max_hr_day),
            avg_stress = COALESCE(EXCLUDED.avg_stress, health_metrics.avg_stress),
            spo2_avg = COALESCE(EXCLUDED.spo2_avg, health_metrics.spo2_avg),
            hrv_nightly_avg = COALESCE(EXCLUDED.hrv_nightly_avg, health_metrics.hrv_nightly_avg),
            hrv_5min_high = COALESCE(EXCLUDED.hrv_5min_high, health_metrics.hrv_5min_high),
            hrv_status = COALESCE(EXCLUDED.hrv_status, health_metrics.hrv_status),
            steps = COALESCE(EXCLUDED.steps, health_metrics.steps),
            floors_climbed = COALESCE(EXCLUDED.floors_climbed, health_metrics.floors_climbed),
            active_calories = COALESCE(EXCLUDED.active_calories, health_metrics.active_calories),
            total_calories = GREATEST(EXCLUDED.total_calories, health_metrics.total_calories),
            sleep_duration_s = COALESCE(EXCLUDED.sleep_duration_s, health_metrics.sleep_duration_s),
            sleep_deep_s = COALESCE(EXCLUDED.sleep_deep_s, health_metrics.sleep_deep_s),
            sleep_light_s = COALESCE(EXCLUDED.sleep_light_s, health_metrics.sleep_light_s),
            sleep_rem_s = COALESCE(EXCLUDED.sleep_rem_s, health_metrics.sleep_rem_s),
            sleep_awake_s = COALESCE(EXCLUDED.sleep_awake_s, health_metrics.sleep_awake_s),
            sleep_score = COALESCE(EXCLUDED.sleep_score, health_metrics.sleep_score),
            sleep_start = COALESCE(EXCLUDED.sleep_start, health_metrics.sleep_start),
            sleep_end = COALESCE(EXCLUDED.sleep_end, health_metrics.sleep_end)
    """)

    with SyncSessionLocal() as db:
        for day_date, data in days.items():
            # Store each day as midnight UTC.
            date_dt = datetime(day_date.year, day_date.month, day_date.day, tzinfo=timezone.utc)
            db.execute(upsert_sql, {
                "user_id": user_id,
                "date": date_dt,
                "resting_hr": data.get("resting_hr"),
                "avg_hr": data.get("avg_hr_day"),
                "max_hr": data.get("max_hr_day"),
                "avg_stress": data.get("avg_stress"),
                "spo2_avg": data.get("spo2_avg"),
                "hrv_avg": data.get("hrv_nightly_avg"),
                "hrv_high": data.get("hrv_5min_high"),
                "hrv_status": data.get("hrv_status"),
                "steps": data.get("steps"),
                "floors": data.get("floors_climbed"),
                "active_cal": data.get("active_calories"),
                "total_cal": data.get("total_calories"),
                "sleep_dur": data.get("sleep_duration_s"),
                "sleep_deep": data.get("sleep_deep_s"),
                "sleep_light": data.get("sleep_light_s"),
                "sleep_rem": data.get("sleep_rem_s"),
                "sleep_awake": data.get("sleep_awake_s"),
                "sleep_score": data.get("sleep_score"),
                "sleep_start": data.get("sleep_start"),
                "sleep_end": data.get("sleep_end"),
            })
        db.commit()

    return {"status": "ok", "days_processed": len(days), "file": file_path}


@celery_app.task(name="detect_route")
def detect_route(activity_id: int, user_id: int):
    """Auto-detect and link activities to named routes.

    First tries the user's existing NamedRoute rows for the same sport; if
    none is similar, looks for another unassigned activity of similar distance
    (within +/-20%) and, on a match, creates a new auto-detected route from
    the older of the two.

    Returns:
        A status dict: "no_polyline", "already_assigned", "matched_existing",
        "auto_created" or "no_match".
    """
    from app.services.route_matcher import routes_are_similar
    from app.core.database import SyncSessionLocal
    from app.models.user import Activity, NamedRoute
    from sqlalchemy import select

    with SyncSessionLocal() as db:
        new_act = db.execute(
            select(Activity).where(Activity.id == activity_id)
        ).scalar_one_or_none()

        if not new_act or not new_act.polyline:
            return {"status": "no_polyline"}
        if new_act.named_route_id:
            return {"status": "already_assigned"}

        routes = db.execute(
            select(NamedRoute).where(
                NamedRoute.user_id == user_id,
                NamedRoute.sport_type == new_act.sport_type,
            )
        ).scalars().all()

        for route in routes:
            if route.reference_polyline and routes_are_similar(
                new_act.polyline, route.reference_polyline,
                new_act.bounding_box, route.bounding_box,
            ):
                new_act.named_route_id = route.id
                db.commit()
                return {"status": "matched_existing", "route_id": route.id}

        ref_distance = new_act.distance_m or 0
        candidates = db.execute(
            select(Activity).where(
                Activity.user_id == user_id,
                Activity.sport_type == new_act.sport_type,
                Activity.named_route_id.is_(None),
                Activity.id != activity_id,
                Activity.polyline.is_not(None),
                Activity.distance_m >= ref_distance * 0.8,
                Activity.distance_m <= ref_distance * 1.2,
            )
        ).scalars().all()

        for candidate in candidates:
            if not routes_are_similar(
                new_act.polyline, candidate.polyline,
                new_act.bounding_box, candidate.bounding_box,
            ):
                continue

            # The older activity provides the route's name and reference data.
            if candidate.start_time < new_act.start_time:
                older, newer = candidate, new_act
            else:
                older, newer = new_act, candidate
            route_name = f"{older.sport_type.title()} route {older.start_time.strftime('%d %b %Y')}"

            new_route = NamedRoute(
                user_id=user_id,
                name=route_name,
                sport_type=older.sport_type,
                reference_polyline=older.polyline,
                bounding_box=older.bounding_box,
                distance_m=older.distance_m,
                auto_detected=True,
            )
            db.add(new_route)
            db.flush()  # assigns new_route.id
            older.named_route_id = new_route.id
            newer.named_route_id = new_route.id
            db.commit()
            return {"status": "auto_created", "route_id": new_route.id}

    return {"status": "no_match"}


@celery_app.task(name="compute_personal_records")
def compute_personal_records(activity_id: int, user_id: int, parsed: dict):
    """Calculate personal records for standard distances from this activity.

    For every best split returned by compute_best_splits, the current record
    for (user, sport, distance) is compared; a faster time demotes the old
    record and inserts a new current one.

    Args:
        activity_id: Activity the splits belong to.
        user_id: Owner of the records.
        parsed: Parsed activity dict; reads "data_points", "distance_m",
            "sport_type" and "start_time".
    """
    from app.services.route_matcher import compute_best_splits, STANDARD_DISTANCES
    from app.core.database import SyncSessionLocal
    from app.models.user import PersonalRecord
    from sqlalchemy import select
    from datetime import datetime, timezone

    data_points = parsed.get("data_points", [])
    total_dist = parsed.get("distance_m", 0) or 0
    sport = parsed.get("sport_type", "running")
    start_time_str = parsed.get("start_time")
    start_time = datetime.fromisoformat(start_time_str) if start_time_str else datetime.now(timezone.utc)

    best_splits = compute_best_splits(data_points, total_dist)

    # Build the label -> distance lookup once; setdefault keeps the first
    # entry for a label, matching a first-match linear scan.
    distance_by_label = {}
    for dist, dist_label in STANDARD_DISTANCES:
        distance_by_label.setdefault(dist_label, dist)

    with SyncSessionLocal() as db:
        for label, duration_s in best_splits.items():
            dist_m = distance_by_label.get(label)
            if dist_m is None:
                continue

            current = db.execute(
                select(PersonalRecord).where(
                    PersonalRecord.user_id == user_id,
                    PersonalRecord.sport_type == sport,
                    PersonalRecord.distance_m == dist_m,
                    PersonalRecord.is_current_record.is_(True),
                )
            ).scalar_one_or_none()

            if current is None or duration_s < current.duration_s:
                if current:
                    current.is_current_record = False
                db.add(PersonalRecord(
                    user_id=user_id,
                    activity_id=activity_id,
                    sport_type=sport,
                    distance_m=dist_m,
                    distance_label=label,
                    duration_s=duration_s,
                    achieved_at=start_time,
                    is_current_record=True,
                ))
        db.commit()


@celery_app.task(name="process_garmin_health_zip")
def process_garmin_health_zip(zip_path: str, user_id: int):
    """Extract wellness data from a Garmin Connect export ZIP.

    Every ``*.json`` member whose name contains "DailyMetrics" is read and
    upserted into health_metrics. Members that cannot be parsed, are not a
    JSON object, or lack a usable date are skipped.
    """
    import zipfile
    import json
    from app.core.database import SyncSessionLocal
    from datetime import datetime, timezone
    from sqlalchemy import text

    upsert_sql = text("""
        INSERT INTO health_metrics
            (user_id, date, resting_hr, steps, floors_climbed,
             active_calories, total_calories, avg_stress, spo2_avg)
        VALUES
            (:user_id, :date, :resting_hr, :steps, :floors,
             :active_cal, :total_cal, :stress, :spo2)
        ON CONFLICT (user_id, date) DO UPDATE SET
            resting_hr = COALESCE(EXCLUDED.resting_hr, health_metrics.resting_hr),
            steps = COALESCE(EXCLUDED.steps, health_metrics.steps),
            floors_climbed = COALESCE(EXCLUDED.floors_climbed, health_metrics.floors_climbed),
            active_calories = COALESCE(EXCLUDED.active_calories, health_metrics.active_calories),
            total_calories = COALESCE(EXCLUDED.total_calories, health_metrics.total_calories),
            avg_stress = COALESCE(EXCLUDED.avg_stress, health_metrics.avg_stress),
            spo2_avg = COALESCE(EXCLUDED.spo2_avg, health_metrics.spo2_avg)
    """)

    with SyncSessionLocal() as db:
        with zipfile.ZipFile(zip_path) as zf:
            for name in zf.namelist():
                if "DailyMetrics" not in name or not name.endswith(".json"):
                    continue

                with zf.open(name) as f:
                    try:
                        data = json.load(f)
                    except Exception:
                        # Best-effort import: one unreadable member must not
                        # abort the rest of the archive.
                        continue

                # A top-level list/scalar has no .get(); skip instead of crashing.
                if not isinstance(data, dict):
                    continue

                date_str = data.get("calendarDate") or data.get("date")
                if not date_str:
                    continue
                try:
                    date_dt = datetime.fromisoformat(date_str).replace(tzinfo=timezone.utc)
                except (ValueError, TypeError):
                    # TypeError: the date field was not a string.
                    continue

                db.execute(upsert_sql, {
                    "user_id": user_id,
                    "date": date_dt,
                    "resting_hr": data.get("restingHeartRate"),
                    "steps": data.get("totalSteps"),
                    "floors": data.get("floorsAscended"),
                    "active_cal": data.get("activeKilocalories"),
                    "total_cal": data.get("totalKilocalories"),
                    "stress": data.get("averageStressLevel"),
                    "spo2": data.get("avgSpo2"),
                })
        db.commit()


@celery_app.task(name="sync_garmin_connect_user")
def sync_garmin_connect_user(user_id: int):
    """Sync Garmin Connect data (activities + wellness) for one user.

    Returns:
        {"status": "skipped"} when no enabled config exists,
        {"status": "auth_error", "error": ...} when authentication fails,
        otherwise a dict with "activities_queued", "wellness_days", "errors"
        and "status" set to "ok" or, if any sub-sync failed, "partial".
    """
    from app.services.garmin_connect_sync import authenticate_garmin, sync_activities, sync_wellness
    from app.core.database import SyncSessionLocal
    from app.models.user import GarminConnectConfig
    from app.core.config import settings
    from sqlalchemy import select
    from datetime import datetime, timezone

    with SyncSessionLocal() as db:
        cfg = db.execute(
            select(GarminConnectConfig).where(GarminConnectConfig.user_id == user_id)
        ).scalar_one_or_none()

        if not cfg or not cfg.sync_enabled:
            return {"status": "skipped"}

        try:
            garmin, new_token = authenticate_garmin(cfg.email, cfg.password_enc, cfg.token_store)
        except Exception as exc:
            cfg.last_sync_at = datetime.now(timezone.utc)
            cfg.last_sync_status = f"Auth error: {exc}"
            db.commit()
            return {"status": "auth_error", "error": str(exc)}

        if new_token:
            cfg.token_store = new_token

        activities_queued = 0
        wellness_days = 0
        errors = []

        # Each sub-sync is isolated so a failure in one does not stop the other.
        if cfg.sync_activities:
            try:
                activities_queued = sync_activities(
                    garmin, user_id, cfg.last_sync_at, db, settings.file_store_path,
                    lookback_days=cfg.sync_lookback_days if cfg.sync_lookback_days is not None else 30,
                )
            except Exception as exc:
                errors.append(f"activities: {exc}")

        if cfg.sync_wellness:
            try:
                wellness_days = sync_wellness(
                    garmin, user_id, cfg.last_sync_at, db,
                    lookback_days=cfg.sync_lookback_days if cfg.sync_lookback_days is not None else 90,
                )
            except Exception as exc:
                errors.append(f"wellness: {exc}")

        cfg.last_sync_at = datetime.now(timezone.utc)
        cfg.last_sync_status = (
            f"OK — {activities_queued} activities queued, {wellness_days} wellness days synced"
            if not errors
            else f"Partial — {'; '.join(errors)}"
        )
        db.commit()

    # Mirror last_sync_status: do not report "ok" when a sub-sync failed.
    return {
        "status": "partial" if errors else "ok",
        "activities_queued": activities_queued,
        "wellness_days": wellness_days,
        "errors": errors,
    }


@celery_app.task(name="sync_all_garmin_connect")
def sync_all_garmin_connect():
    """Hourly beat task: dispatch per-user sync for all enabled configs."""
    from app.core.database import SyncSessionLocal
    from app.models.user import GarminConnectConfig
    from sqlalchemy import select

    with SyncSessionLocal() as db:
        # Only the user ids are needed, so select just that column.
        user_ids = list(db.execute(
            select(GarminConnectConfig.user_id).where(
                GarminConnectConfig.sync_enabled.is_(True)
            )
        ).scalars().all())

    for uid in user_ids:
        sync_garmin_connect_user.delay(uid)

    return {"dispatched": len(user_ids)}


@celery_app.task(name="recalculate_hr_zones_for_user")
def recalculate_hr_zones_for_user(user_id: int, new_max_hr: float):
    """Recalculate hr_zones for all of a user's activities using a new max HR.

    An activity's hr_zones is only overwritten when calculate_hr_zones returns
    a truthy result.
    """
    from app.services.fit_parser import calculate_hr_zones
    from app.core.database import SyncSessionLocal
    from app.models.user import Activity, ActivityDataPoint
    from sqlalchemy import select

    with SyncSessionLocal() as db:
        activities = db.execute(
            select(Activity).where(Activity.user_id == user_id)
        ).scalars().all()

        for activity in activities:
            # Fetch only the heart_rate column; hydrating full
            # ActivityDataPoint objects is unnecessary here.
            heart_rates = db.execute(
                select(ActivityDataPoint.heart_rate).where(
                    ActivityDataPoint.activity_id == activity.id
                )
            ).scalars().all()
            points_dicts = [{"heart_rate": hr} for hr in heart_rates]
            new_zones = calculate_hr_zones(points_dicts, new_max_hr)
            if new_zones:
                activity.hr_zones = new_zones

        db.commit()