""" Garmin Connect sync helpers. authenticate_garmin() returns an authenticated client, refreshing the stored OAuth token when possible and falling back to email/password re-login. sync_activities() downloads new FIT files and queues them for processing. sync_wellness() pulls daily stats/sleep/HRV summaries from the JSON API and upserts them into health_metrics. """ import io import zipfile import logging from datetime import date, datetime, timedelta, timezone from pathlib import Path from typing import Optional, Tuple logger = logging.getLogger(__name__) # ── Password encryption ───────────────────────────────────────────────────── def _fernet(): import base64, hashlib from cryptography.fernet import Fernet from app.core.config import settings key = base64.urlsafe_b64encode(hashlib.sha256(settings.secret_key.encode()).digest()) return Fernet(key) def encrypt_password(password: str) -> str: return _fernet().encrypt(password.encode()).decode() def decrypt_password(enc: str) -> str: return _fernet().decrypt(enc.encode()).decode() # ── Auth ───────────────────────────────────────────────────────────────────── def authenticate_garmin(email: str, password_enc: str, token_store: Optional[str]) -> Tuple: """ Returns (garmin_client, new_token_store_or_None). new_token_store is set only when tokens were refreshed/re-created so the caller can persist them. """ import garminconnect # Try stored OAuth token first (garth auto-refreshes access token on use) if token_store: try: garmin = garminconnect.Garmin( email=email, password=decrypt_password(password_enc) ) garmin.garth.loads(token_store) garmin.get_full_name() # lightweight request; triggers refresh if needed return garmin, None # tokens still valid except Exception as exc: logger.info("Garmin token invalid (%s), re-authenticating", exc) # Full login with email + password garmin = garminconnect.Garmin(email=email, password=decrypt_password(password_enc)) garmin.login() return garmin, garmin.garth.dumps() # ── Activity sync ───────────────────────────────────────────────────────────── def sync_activities(garmin, user_id: int, since: Optional[datetime], db, file_store_path: str) -> int: """ List activities since `since` from Garmin Connect, skip any already in the DB, download FIT ZIPs for new ones, and queue them for processing. Returns the number of new activities queued. """ from app.workers.tasks import process_activity_file from app.models.user import Activity from sqlalchemy import select start_date = (since - timedelta(days=1)).date() if since else (date.today() - timedelta(days=30)) end_date = date.today() try: activities = garmin.get_activities_by_date( start_date.isoformat(), end_date.isoformat() ) except Exception as exc: logger.error("Failed to list Garmin activities: %s", exc) return 0 queued = 0 for act in activities: garmin_id = str(act.get("activityId", "")).strip() if not garmin_id: continue # Skip if already imported (garmin_activity_id unique index) existing = db.execute( select(Activity).where(Activity.garmin_activity_id == garmin_id) ).scalar_one_or_none() if existing: continue # Download original FIT (wrapped in a ZIP by Garmin) try: zip_bytes = garmin.download_activity( int(garmin_id), dl_fmt=garmin.ActivityDownloadFormat.ORIGINAL, ) except Exception as exc: logger.warning("Failed to download activity %s: %s", garmin_id, exc) continue # Extract the FIT file from the ZIP try: with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf: fit_names = [n for n in zf.namelist() if n.lower().endswith(".fit")] if not fit_names: logger.debug("No FIT in ZIP for activity %s", garmin_id) continue fit_data = zf.read(fit_names[0]) except Exception as exc: logger.warning("Failed to unzip activity %s: %s", garmin_id, exc) continue # Save to disk and queue dest_dir = Path(file_store_path) / str(user_id) / "garmin_connect" dest_dir.mkdir(parents=True, exist_ok=True) dest = dest_dir / f"{garmin_id}.fit" dest.write_bytes(fit_data) process_activity_file.delay(str(dest), user_id, "fit", garmin_id) queued += 1 return queued # ── Wellness sync ───────────────────────────────────────────────────────────── def sync_wellness(garmin, user_id: int, since: Optional[datetime], db) -> int: """ Fetch daily stats / sleep / HRV from the Garmin Connect JSON API for each day since `since` and upsert into health_metrics. Returns the number of days upserted. """ from sqlalchemy import text start_date = since.date() if since else (date.today() - timedelta(days=7)) days = (date.today() - start_date).days + 1 processed = 0 for i in range(max(days, 1)): day = start_date + timedelta(days=i) day_str = day.isoformat() stats = _safe(garmin.get_stats, day_str) sleep_data = _safe(garmin.get_sleep_data, day_str) hrv_data = _safe(garmin.get_hrv_data, day_str) row = _parse_day(stats, sleep_data, hrv_data) if not row: continue cols = list(row.keys()) col_sql = ", ".join(cols) val_sql = ", ".join(f":{c}" for c in cols) upd_sql = ", ".join( # total_calories uses GREATEST so multiple sources don't downgrade f"{c} = GREATEST(EXCLUDED.{c}, health_metrics.{c})" if c == "total_calories" else f"{c} = COALESCE(EXCLUDED.{c}, health_metrics.{c})" for c in cols ) params = {"user_id": user_id, "day": day.isoformat()} params.update(row) db.execute(text(f""" INSERT INTO health_metrics (user_id, date, {col_sql}) VALUES (:user_id, :day, {val_sql}) ON CONFLICT (user_id, date) DO UPDATE SET {upd_sql} """), params) db.commit() processed += 1 return processed def _safe(fn, *args): try: return fn(*args) except Exception as exc: logger.debug("%s(%s) skipped: %s", fn.__name__, args, exc) return None def _parse_day(stats, sleep_data, hrv_data) -> dict: row = {} if stats: _set(row, "resting_hr", stats.get("restingHeartRate")) _set(row, "steps", stats.get("totalSteps")) _set(row, "floors_climbed", stats.get("floorsAscended")) _set(row, "avg_stress", stats.get("averageStressLevel")) active = stats.get("activeKilocalories") bmr = stats.get("bmrKilocalories") _set(row, "active_calories", active) if active and bmr: _set(row, "total_calories", float(active) + float(bmr)) if sleep_data: dto = sleep_data.get("dailySleepDTO") or sleep_data _set(row, "sleep_duration_s", dto.get("sleepTimeSeconds")) _set(row, "sleep_deep_s", dto.get("deepSleepSeconds")) _set(row, "sleep_light_s", dto.get("lightSleepSeconds")) _set(row, "sleep_rem_s", dto.get("remSleepSeconds")) _set(row, "sleep_awake_s", dto.get("awakeSleepSeconds")) # Timestamps are milliseconds since epoch in local time for key, col in (("sleepStartTimestampLocal", "sleep_start"), ("sleepEndTimestampLocal", "sleep_end")): ms = dto.get(key) if ms: _set(row, col, datetime.fromtimestamp(ms / 1000, tz=timezone.utc).isoformat()) # SpO2 spo2 = dto.get("averageSpO2Value") if spo2 and 50 < float(spo2) <= 100: row["spo2_avg"] = float(spo2) # Sleep score — structure varies across firmware scores = sleep_data.get("sleepScores") or sleep_data.get("sleepScore") if isinstance(scores, dict): overall = scores.get("overall") or scores.get("qualityScore") if isinstance(overall, dict): _set(row, "sleep_score", overall.get("value")) else: _set(row, "sleep_score", overall) elif isinstance(scores, (int, float)): row["sleep_score"] = scores if hrv_data: summary = hrv_data.get("hrvSummary") or hrv_data _set(row, "hrv_nightly_avg", summary.get("lastNight") or summary.get("lastNightAvg")) _set(row, "hrv_5min_high", summary.get("lastNight5MinHigh")) status = summary.get("status") if status: row["hrv_status"] = str(status).lower() return row def _set(d: dict, key: str, val): if val is not None: d[key] = val