""" Garmin Connect sync helpers. authenticate_garmin() returns an authenticated client, refreshing the stored OAuth token when possible and falling back to email/password re-login. sync_activities() downloads new FIT files and queues them for processing. sync_wellness() pulls daily stats/sleep/HRV summaries from the JSON API and upserts them into health_metrics. """ import io import zipfile import logging from datetime import date, datetime, timedelta, timezone from pathlib import Path from typing import Optional, Tuple logger = logging.getLogger(__name__) # ── Password encryption ───────────────────────────────────────────────────── def _fernet(): import base64, hashlib from cryptography.fernet import Fernet from app.core.config import settings key = base64.urlsafe_b64encode(hashlib.sha256(settings.secret_key.encode()).digest()) return Fernet(key) def encrypt_password(password: str) -> str: return _fernet().encrypt(password.encode()).decode() def decrypt_password(enc: str) -> str: return _fernet().decrypt(enc.encode()).decode() # ── Auth ───────────────────────────────────────────────────────────────────── def authenticate_garmin(email: str, password_enc: str, token_store: Optional[str]) -> Tuple: """ Returns (garmin_client, new_token_store_or_None). new_token_store is set only when tokens were refreshed/re-created so the caller can persist them. """ import garminconnect # Try stored OAuth token first. # Use garth.loads() directly (always treats the argument as an inline string). # garmin.login(tokenstore=...) dispatches on len>512, treating short tokens as # filesystem paths and raising FileNotFoundError on every token-based auth attempt. # After loads(), set display_name from the embedded profile — required by # get_stats(), get_sleep_data(), and other endpoints that build URLs from it. if token_store: try: garmin = garminconnect.Garmin( email=email, password=decrypt_password(password_enc) ) garmin.garth.loads(token_store) garmin.display_name = (garmin.garth.profile or {}).get("displayName", "") return garmin, None except Exception as exc: logger.info("Garmin token invalid (%s), re-authenticating", exc) # Full login with email + password garmin = garminconnect.Garmin(email=email, password=decrypt_password(password_enc)) garmin.login() return garmin, garmin.garth.dumps() # ── Activity sync ───────────────────────────────────────────────────────────── def sync_activities(garmin, user_id: int, since: Optional[datetime], db, file_store_path: str, lookback_days: int = 30) -> int: """ List activities from Garmin Connect, skip any already in the DB, download FIT ZIPs for new ones, and queue them for processing. lookback_days controls the start date on every sync: -1 → full history back to 2010 on first sync, then incremental (since-1d) N → incremental (since-1d) when since is set; else last N days on first sync Returns the number of new activities queued. """ import time from app.workers.tasks import process_activity_file from app.models.user import Activity from sqlalchemy import select, func if lookback_days == -1: # All-time: full pull on first sync, incremental thereafter start_date = (since - timedelta(days=1)).date() if since else date(2010, 1, 1) elif since: # Incremental: one day overlap to catch any late-arriving activities start_date = (since - timedelta(days=1)).date() else: start_date = date.today() - timedelta(days=max(lookback_days, 1)) end_date = date.today() try: activities = garmin.get_activities_by_date( start_date.isoformat(), end_date.isoformat() ) except Exception as exc: logger.error("Failed to list Garmin activities: %s", exc) return 0 queued = 0 for act in activities: garmin_id = str(act.get("activityId", "")).strip() if not garmin_id: continue # Fast path: already imported via Garmin Connect sync existing = db.execute( select(Activity).where(Activity.garmin_activity_id == garmin_id) ).scalar_one_or_none() if existing: continue # Slow-path dedup: activity imported via bulk export (no garmin_activity_id). # Check by start_time; stamp the ID so future syncs skip it in the fast path. act_start_str = act.get("startTimeLocal") or act.get("startTimeGMT") or "" if act_start_str: try: from datetime import datetime as _dt act_start = _dt.fromisoformat(act_start_str.replace("Z", "+00:00")) time_match = db.execute( select(Activity).where( Activity.user_id == user_id, func.date(Activity.start_time) == act_start.date(), ) ).scalar_one_or_none() if time_match: if not time_match.garmin_activity_id: time_match.garmin_activity_id = garmin_id db.commit() continue except Exception: pass # couldn't parse time — fall through to download # Download original FIT (Garmin wraps it in a ZIP) try: zip_bytes = garmin.download_activity( int(garmin_id), dl_fmt=garmin.ActivityDownloadFormat.ORIGINAL, ) except Exception as exc: logger.warning("Failed to download activity %s: %s", garmin_id, exc) continue # Extract the FIT from the ZIP try: with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf: fit_names = [n for n in zf.namelist() if n.lower().endswith(".fit")] if not fit_names: logger.debug("No FIT in ZIP for activity %s", garmin_id) continue fit_data = zf.read(fit_names[0]) except Exception as exc: logger.warning("Failed to unzip activity %s: %s", garmin_id, exc) continue # Save to disk and queue dest_dir = Path(file_store_path) / str(user_id) / "garmin_connect" dest_dir.mkdir(parents=True, exist_ok=True) dest = dest_dir / f"{garmin_id}.fit" dest.write_bytes(fit_data) process_activity_file.delay(str(dest), user_id, "fit", garmin_id) queued += 1 # Brief pause to avoid hammering the Garmin API time.sleep(0.5) return queued # ── Wellness sync ───────────────────────────────────────────────────────────── def sync_wellness(garmin, user_id: int, since: Optional[datetime], db, lookback_days: int = 90) -> int: """ Fetch daily stats / sleep / HRV from the Garmin Connect JSON API for each day in the window and upsert into health_metrics. lookback_days controls the window on every sync: -1 → full history back to 2010 on first sync, then incremental (since-1d) N → incremental (since-1d) when since is set; else last N days on first sync Returns the number of days upserted. """ from sqlalchemy import text if lookback_days == -1: start_date = (since - timedelta(days=1)).date() if since else date(2010, 1, 1) elif since: # Incremental: one day overlap to catch any late-arriving wellness data start_date = (since - timedelta(days=1)).date() else: start_date = date.today() - timedelta(days=max(lookback_days, 1)) days = (date.today() - start_date).days + 1 processed = 0 import time as _time for i in range(max(days, 1)): day = start_date + timedelta(days=i) day_str = day.isoformat() stats = _safe(garmin.get_stats, day_str) sleep_data = _safe(garmin.get_sleep_data, day_str) hrv_data = _safe(garmin.get_hrv_data, day_str) _time.sleep(0.25) # avoid hammering Garmin's wellness API row = _parse_day(stats, sleep_data, hrv_data) if not row: continue cols = list(row.keys()) col_sql = ", ".join(cols) val_sql = ", ".join(f":{c}" for c in cols) upd_sql = ", ".join( # total_calories uses GREATEST so multiple sources don't downgrade f"{c} = GREATEST(EXCLUDED.{c}, health_metrics.{c})" if c == "total_calories" else f"{c} = COALESCE(EXCLUDED.{c}, health_metrics.{c})" for c in cols ) params = {"user_id": user_id, "day": day.isoformat()} params.update(row) db.execute(text(f""" INSERT INTO health_metrics (user_id, date, {col_sql}) VALUES (:user_id, :day, {val_sql}) ON CONFLICT (user_id, date) DO UPDATE SET {upd_sql} """), params) db.commit() processed += 1 return processed def _safe(fn, *args): try: return fn(*args) except Exception as exc: logger.debug("%s(%s) skipped: %s", fn.__name__, args, exc) return None def _parse_day(stats, sleep_data, hrv_data) -> dict: row = {} if stats: _set(row, "resting_hr", stats.get("restingHeartRate")) _set(row, "avg_hr_day", stats.get("averageHeartRate")) _set(row, "max_hr_day", stats.get("maxHeartRate")) _set(row, "steps", stats.get("totalSteps")) _set(row, "floors_climbed", stats.get("floorsAscended")) _set(row, "avg_stress", stats.get("averageStressLevel")) active = stats.get("activeKilocalories") bmr = stats.get("bmrKilocalories") _set(row, "active_calories", active) if active and bmr: _set(row, "total_calories", float(active) + float(bmr)) if sleep_data: dto = sleep_data.get("dailySleepDTO") or sleep_data _set(row, "sleep_duration_s", dto.get("sleepTimeSeconds")) _set(row, "sleep_deep_s", dto.get("deepSleepSeconds")) _set(row, "sleep_light_s", dto.get("lightSleepSeconds")) _set(row, "sleep_rem_s", dto.get("remSleepSeconds")) _set(row, "sleep_awake_s", dto.get("awakeSleepSeconds")) # Timestamps are milliseconds since epoch in local time for key, col in (("sleepStartTimestampLocal", "sleep_start"), ("sleepEndTimestampLocal", "sleep_end")): ms = dto.get(key) if ms: _set(row, col, datetime.fromtimestamp(ms / 1000, tz=timezone.utc).isoformat()) # SpO2 spo2 = dto.get("averageSpO2Value") if spo2 and 50 < float(spo2) <= 100: row["spo2_avg"] = float(spo2) # Sleep score — structure varies across firmware scores = sleep_data.get("sleepScores") or sleep_data.get("sleepScore") if isinstance(scores, dict): overall = scores.get("overall") or scores.get("qualityScore") if isinstance(overall, dict): _set(row, "sleep_score", overall.get("value")) else: _set(row, "sleep_score", overall) elif isinstance(scores, (int, float)): row["sleep_score"] = scores if hrv_data: summary = hrv_data.get("hrvSummary") or hrv_data _set(row, "hrv_nightly_avg", summary.get("lastNight") or summary.get("lastNightAvg")) _set(row, "hrv_5min_high", summary.get("lastNight5MinHigh")) status = summary.get("status") if status: row["hrv_status"] = str(status).lower() return row def _set(d: dict, key: str, val): if val is not None: d[key] = val