"""
Garmin Connect sync helpers.

authenticate_garmin() returns an authenticated client, refreshing the stored
OAuth token when possible and falling back to email/password re-login.

sync_activities() downloads new FIT files and queues them for processing.

sync_wellness() pulls daily stats/sleep/HRV summaries from the JSON API and
upserts them into health_metrics.
"""
import io
import json
import time
import zipfile
import logging
from datetime import date, datetime, timedelta, timezone
from pathlib import Path
from typing import Optional, Tuple

logger = logging.getLogger(__name__)

# How far apart a stored start time and a Garmin start time may be while still
# being considered the same activity during slow-path de-duplication.
_START_MATCH_TOLERANCE = timedelta(minutes=2)


# ── Password encryption ─────────────────────────────────────────────────────

def _fernet():
    """Build a Fernet cipher whose key is the SHA-256 digest of settings.secret_key."""
    import base64
    import hashlib
    from cryptography.fernet import Fernet
    from app.core.config import settings

    key = base64.urlsafe_b64encode(hashlib.sha256(settings.secret_key.encode()).digest())
    return Fernet(key)


def encrypt_password(password: str) -> str:
    """Encrypt *password* and return the token as text."""
    return _fernet().encrypt(password.encode()).decode()


def decrypt_password(enc: str) -> str:
    """Decrypt a token produced by encrypt_password() back to the plain password."""
    return _fernet().decrypt(enc.encode()).decode()


# ── Auth ─────────────────────────────────────────────────────────────────────

def authenticate_garmin(email: str, password_enc: str, token_store: Optional[str]) -> Tuple:
    """
    Returns (garmin_client, new_token_store_or_None).

    new_token_store is set only when tokens were refreshed/re-created so the
    caller can persist them.
    """
    import garminconnect

    password = decrypt_password(password_enc)

    # Try stored OAuth token first.
    # Use garth.loads() directly (always treats the argument as an inline string).
    # garmin.login(tokenstore=...) dispatches on len>512, treating short tokens as
    # filesystem paths and raising FileNotFoundError on every token-based auth attempt.
    # After loads(), set display_name from the embedded profile — required by
    # get_stats(), get_sleep_data(), and other endpoints that build URLs from it.
    if token_store:
        try:
            garmin = garminconnect.Garmin(email=email, password=password)
            garmin.garth.loads(token_store)
            garmin.display_name = (garmin.garth.profile or {}).get("displayName", "")
            return garmin, None
        except Exception as exc:
            logger.info("Garmin token invalid (%s), re-authenticating", exc)

    # Full login with email + password
    garmin = garminconnect.Garmin(email=email, password=password)
    garmin.login()
    return garmin, garmin.garth.dumps()


# ── Shared helpers ───────────────────────────────────────────────────────────

def _sync_start_date(since: Optional[datetime], lookback_days: int) -> date:
    """
    Compute the first day a sync should cover.

    -1 → 2010-01-01 on first sync (since is None), otherwise since - 1 day.
     N → today - max(N, 1) days on first sync; otherwise the earlier of that
         date and since - 1 day, so raising lookback_days fetches older data.
    """
    if lookback_days == -1:
        return (since - timedelta(days=1)).date() if since else date(2010, 1, 1)
    lookback = date.today() - timedelta(days=max(lookback_days, 1))
    if since:
        return min((since - timedelta(days=1)).date(), lookback)
    return lookback


def _naive_utc(dt: datetime) -> datetime:
    """Return *dt* without tzinfo (aware values are converted to UTC first)."""
    if dt.tzinfo is not None:
        return dt.astimezone(timezone.utc).replace(tzinfo=None)
    return dt


def _activity_start_times(act: dict) -> list:
    """Parse startTimeLocal / startTimeGMT of a Garmin activity into naive datetimes."""
    parsed = []
    for key in ("startTimeLocal", "startTimeGMT"):
        raw = act.get(key)
        if not raw:
            continue
        try:
            parsed.append(_naive_utc(datetime.fromisoformat(str(raw).replace("Z", "+00:00"))))
        except ValueError:
            logger.debug("Unparseable %s on Garmin activity: %r", key, raw)
    return parsed


def _match_unstamped_activity(candidates, start_times, tolerance: timedelta = _START_MATCH_TOLERANCE):
    """
    Pick the stored activity that corresponds to a Garmin activity, or None.

    Only rows without a garmin_activity_id are eligible: a row that already
    carries a (different) Garmin ID is a different activity.  Among eligible
    rows the one whose start_time is closest to any of *start_times* wins,
    provided the gap is within *tolerance*.
    """
    best, best_gap = None, None
    for cand in candidates:
        if getattr(cand, "garmin_activity_id", None):
            continue
        stored = getattr(cand, "start_time", None)
        if not isinstance(stored, datetime):
            continue
        stored = _naive_utc(stored)
        for moment in start_times:
            gap = abs(stored - moment)
            if gap <= tolerance and (best_gap is None or gap < best_gap):
                best, best_gap = cand, gap
    return best


# ── Activity sync ─────────────────────────────────────────────────────────────

def sync_activities(garmin, user_id: int, since: Optional[datetime],
                    db, file_store_path: str, lookback_days: int = 30,
                    status_callback=None) -> int:
    """
    List activities from Garmin Connect, skip any already in the DB,
    download FIT ZIPs for new ones, and queue them for processing.

    lookback_days controls the start date on every sync:
      -1 → full history back to 2010 on first sync, then incremental (since-1d)
       N → last N days on first sync; afterwards the earlier of since-1d and
           today-N days

    status_callback, when given, receives human-readable progress strings.

    Returns the number of new activities queued (0 if listing fails).
    """
    from app.workers.tasks import process_activity_file
    from app.models.user import Activity
    from sqlalchemy import select, func

    start_date = _sync_start_date(since, lookback_days)
    end_date = date.today()

    try:
        activities = garmin.get_activities_by_date(
            start_date.isoformat(), end_date.isoformat()
        )
    except Exception as exc:
        logger.error("Failed to list Garmin activities: %s", exc)
        return 0

    total = len(activities)
    if status_callback and total:
        status_callback(f"Syncing activities: 0/{total} queued")

    queued = 0
    for act in activities:
        raw_id = act.get("activityId")
        garmin_id = "" if raw_id is None else str(raw_id).strip()
        if not garmin_id:
            continue

        # Fast path: already imported via Garmin Connect sync
        existing = db.execute(
            select(Activity).where(Activity.garmin_activity_id == garmin_id)
        ).scalar_one_or_none()
        if existing:
            continue

        # Slow-path dedup: activity imported via bulk export (no garmin_activity_id).
        # Match on start time (not merely the calendar day, which would swallow
        # every other activity recorded that day) and stamp the ID so future
        # syncs skip it in the fast path.
        start_times = _activity_start_times(act)
        if start_times:
            try:
                same_day = db.execute(
                    select(Activity).where(
                        Activity.user_id == user_id,
                        func.date(Activity.start_time).in_(
                            sorted({moment.date() for moment in start_times})
                        ),
                    )
                ).scalars().all()
                match = _match_unstamped_activity(same_day, start_times)
                if match is not None:
                    match.garmin_activity_id = garmin_id
                    db.commit()
                    continue
            except Exception as exc:
                # Best-effort: fall through to download, but leave the session usable.
                logger.warning("Dedup lookup failed for activity %s: %s", garmin_id, exc)
                db.rollback()

        # Download original FIT (Garmin wraps it in a ZIP)
        try:
            zip_bytes = garmin.download_activity(
                int(garmin_id),
                dl_fmt=garmin.ActivityDownloadFormat.ORIGINAL,
            )
        except Exception as exc:
            logger.warning("Failed to download activity %s: %s", garmin_id, exc)
            continue

        # Extract the FIT from the ZIP
        try:
            with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
                fit_names = [n for n in zf.namelist() if n.lower().endswith(".fit")]
                if not fit_names:
                    logger.debug("No FIT in ZIP for activity %s", garmin_id)
                    continue
                fit_data = zf.read(fit_names[0])
        except Exception as exc:
            logger.warning("Failed to unzip activity %s: %s", garmin_id, exc)
            continue

        # Save to disk and queue
        dest_dir = Path(file_store_path) / str(user_id) / "garmin_connect"
        dest_dir.mkdir(parents=True, exist_ok=True)
        dest = dest_dir / f"{garmin_id}.fit"
        dest.write_bytes(fit_data)

        process_activity_file.delay(str(dest), user_id, "fit", garmin_id)
        queued += 1
        if status_callback and (queued % 5 == 0 or queued == total):
            status_callback(f"Syncing activities: {queued}/{total} queued")

        # Brief pause to avoid hammering the Garmin API
        time.sleep(0.5)

    return queued


# ── Wellness sync ─────────────────────────────────────────────────────────────

def _mass_kg(raw):
    """Normalise a mass reading to kilograms; values above 300 are taken to be grams."""
    if not raw:
        return None
    value = float(raw)
    if value <= 0:
        return None
    return round(value / 1000 if value > 300 else value, 2)


def _apply_body_composition(row: dict, bc_data, stats) -> None:
    """Add weight / BMI / body-fat / muscle-mass columns to *row* in place."""
    # Weight + body composition from weight service (more reliable than stats)
    if bc_data:
        entries = (bc_data.get("dateWeightList")
                   or bc_data.get("allWeightMetrics")
                   or bc_data.get("weightList")
                   or [])
        if entries:
            entry = entries[0]
            _set(row, "weight_kg", _mass_kg(entry.get("weight")))
            if entry.get("bmi"):
                _set(row, "bmi", float(entry["bmi"]))
            if entry.get("bodyFat"):
                _set(row, "body_fat_pct", float(entry["bodyFat"]))
            _set(row, "muscle_mass_kg", _mass_kg(entry.get("muscleMass")))

    # Weight from daily stats as fallback (present when Garmin scale is used)
    if stats and "weight_kg" not in row:
        _set(row, "weight_kg", _mass_kg(stats.get("bodyWeight")))


def _intraday_hr_pairs(hr_raw) -> list:
    """Return the non-null [epoch_ms, bpm] pairs from a get_heart_rates() response."""
    if not hr_raw:
        return []
    samples = hr_raw.get("heartRateValues") or []
    return [[int(ts), int(bpm)] for ts, bpm in samples
            if ts is not None and bpm is not None]


def _build_wellness_row(day_str: str, stats, sleep_data, hrv_data,
                        hr_raw, bc_data, bb_raw) -> dict:
    """Combine all per-day API responses into one health_metrics column dict."""
    row = _parse_day(stats, sleep_data, hrv_data)
    _apply_body_composition(row, bc_data, stats)

    # Body battery — store summary + fine-grained timeline
    bb = _parse_body_battery(bb_raw, day_str) if bb_raw else None
    if bb:
        row["body_battery"] = json.dumps(bb)

    # Intraday heart rate — store pairs + compute daily averages (these
    # override the averages reported by the daily stats endpoint).
    intraday = _intraday_hr_pairs(hr_raw)
    if intraday:
        hr_vals = [bpm for _, bpm in intraday if bpm > 0]
        if hr_vals:
            row["avg_hr_day"] = round(sum(hr_vals) / len(hr_vals), 1)
            row["max_hr_day"] = float(max(hr_vals))
        # psycopg2 treats Python lists/dicts as PG arrays/hstore; serialize JSON
        # columns as strings so psycopg2 passes them correctly to json/jsonb columns.
        row["intraday_hr"] = json.dumps(intraday)

    # High-resolution body battery derived from BB checkpoints + intraday HR
    if bb and intraday:
        hires = _compute_body_battery_hires(bb.get("values") or [], intraday)
        if hires:
            row["body_battery_hires"] = json.dumps(hires)

    return row


def _upsert_health_metrics(db, user_id: int, day_str: str, row: dict,
                           overwrite: bool = False) -> None:
    """
    Insert or update one health_metrics row and commit.

    overwrite=False: existing values are kept where the new value is NULL
    (COALESCE); total_calories uses GREATEST so multiple sources don't
    downgrade it.  overwrite=True: new values replace existing ones.
    Column names come from internal dict keys, never from user input.
    """
    from sqlalchemy import text

    cols = list(row)
    if overwrite:
        assignments = [f"{c} = EXCLUDED.{c}" for c in cols]
    else:
        assignments = [
            f"{c} = GREATEST(EXCLUDED.{c}, health_metrics.{c})" if c == "total_calories"
            else f"{c} = COALESCE(EXCLUDED.{c}, health_metrics.{c})"
            for c in cols
        ]
    col_sql = ", ".join(cols)
    val_sql = ", ".join(f":{c}" for c in cols)
    upd_sql = ", ".join(assignments)
    params = {"user_id": user_id, "day": day_str}
    params.update(row)
    db.execute(text(f"""
        INSERT INTO health_metrics (user_id, date, {col_sql})
        VALUES (:user_id, :day, {val_sql})
        ON CONFLICT (user_id, date) DO UPDATE SET {upd_sql}
    """), params)
    db.commit()


def _positive_float(value):
    """Return *value* as a float when it is a number (or numeric string) > 0, else None."""
    if value is None or isinstance(value, (bool, dict, list)):
        return None
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    return number if number > 0 else None


def _extract_vo2max(ts_data, sb_data):
    """
    Find a VO2 max value in the training-status response, then stats-and-body.

    Returns a positive float or None.  Tolerates mostRecentVO2Max entries that
    are either plain numbers or nested dicts.
    """
    if isinstance(ts_data, dict):
        for key in ("vo2MaxPreciseValue", "vo2Max", "latestVO2Max"):
            vo2 = _positive_float(ts_data.get(key))
            if vo2:
                return vo2
        recent = ts_data.get("mostRecentVO2Max")
        if isinstance(recent, dict):
            for key in ("generic", "value"):
                node = recent.get(key)
                if isinstance(node, dict):
                    # NOTE(review): nested key names assumed from observed
                    # Garmin payloads — confirm against a live response.
                    for inner in ("vo2MaxPreciseValue", "vo2MaxValue"):
                        vo2 = _positive_float(node.get(inner))
                        if vo2:
                            return vo2
                else:
                    vo2 = _positive_float(node)
                    if vo2:
                        return vo2

    if isinstance(sb_data, dict):
        for key in ("vo2MaxPreciseValue", "vo2Max"):
            vo2 = _positive_float(sb_data.get(key))
            if vo2:
                return vo2
    return None


def sync_wellness(garmin, user_id: int, since: Optional[datetime],
                  db, lookback_days: int = 90, status_callback=None) -> int:
    """
    Fetch daily stats / sleep / HRV from the Garmin Connect JSON API for each
    day in the window and upsert into health_metrics.

    lookback_days controls the window on every sync:
      -1 → full history back to 2010 on first sync, then incremental (since-1d)
       N → last N days on first sync; afterwards the earlier of since-1d and
           today-N days

    After the per-day loop, today's row is additionally updated with the
    current VO2 max (and fitness age when available).

    Returns the number of days upserted.
    """
    start_date = _sync_start_date(since, lookback_days)
    total_days = max((date.today() - start_date).days + 1, 1)
    processed = 0

    if status_callback:
        status_callback(f"Syncing wellness: 0/{total_days} days")

    for i in range(total_days):
        day = start_date + timedelta(days=i)
        if status_callback and (i % 5 == 0 or i == total_days - 1):
            status_callback(f"Syncing wellness: {i + 1}/{total_days} days")
        day_str = day.isoformat()

        stats = _safe(garmin.get_stats, day_str)
        sleep_data = _safe(garmin.get_sleep_data, day_str)
        hrv_data = _safe(garmin.get_hrv_data, day_str)
        # Intraday HR (requires display_name; skip gracefully if absent)
        hr_raw = _safe(garmin.get_heart_rates, day_str) if garmin.display_name else None
        bc_data = _safe(garmin.get_body_composition, day_str, day_str)
        bb_raw = _safe(garmin.get_body_battery, day_str, day_str)
        time.sleep(0.25)  # avoid hammering Garmin's wellness API

        # One malformed payload must not abort a multi-day sync.
        try:
            row = _build_wellness_row(day_str, stats, sleep_data, hrv_data,
                                      hr_raw, bc_data, bb_raw)
        except Exception as exc:
            logger.warning("Failed to parse wellness data for %s: %s", day_str, exc)
            continue

        if not row:
            continue

        try:
            _upsert_health_metrics(db, user_id, day_str, row)
            processed += 1
        except Exception as exc:
            logger.warning("Failed to upsert health_metrics for %s: %s", day_str, exc)
            db.rollback()

    # Fetch current VO2 max once (slow-changing — only update today's row)
    today_str = date.today().isoformat()
    ts_data = _safe(garmin.get_training_status, today_str)
    logger.info("training_status raw response: %s", ts_data)
    sb_data = _safe(garmin.get_stats_and_body, today_str)
    logger.info("stats_and_body raw response: %s", sb_data)
    fa_data = _safe(garmin.get_fitnessage_data, today_str)

    vo2 = _extract_vo2max(ts_data, sb_data)

    fa_age = None
    if isinstance(fa_data, dict):
        fa_age = fa_data.get("fitnessAge") or fa_data.get("achievableFitnessAge")

    logger.info("parsed vo2=%s fitness_age=%s", vo2, fa_age)

    if vo2:
        fa_row = {"vo2max": vo2}
        if fa_age:
            try:
                fa_row["fitness_age"] = int(fa_age)
            except (TypeError, ValueError):
                # Keep the VO2 max even when the fitness age is unusable.
                logger.debug("Ignoring unparseable fitness age: %r", fa_age)
        try:
            _upsert_health_metrics(db, user_id, today_str, fa_row, overwrite=True)
        except Exception as exc:
            logger.warning("Failed to upsert VO2 max: %s", exc)
            db.rollback()

    return processed


def _parse_body_battery(bb_response, day_str: str):
    """
    Parse get_body_battery() response for a single day into a compact dict.

    Picks the entry whose "date" equals *day_str*, else the first entry.
    Returns None when there is no entry or it carries no usable data.
    """
    if not bb_response:
        return None
    if isinstance(bb_response, dict):
        # Tolerate a single entry that is not wrapped in a list.
        bb_response = [bb_response]

    entry = next((e for e in bb_response if e.get("date") == day_str), None)
    if not entry:
        entry = bb_response[0]
    if not entry:
        return None

    charged = entry.get("charged")
    drained = entry.get("drained")
    start_lvl = entry.get("startValue")
    end_lvl = entry.get("endValue")

    # Fine-grained timeline: [[ts_ms, level, type_code, stress], ...]
    # type_code: 0=REST, 1=ACTIVE, 2=SLEEP, 3=STRESS, 4=UNMEASURABLE
    values = entry.get("bodyBatteryValuesArray") or []
    if not values:
        # Fall back to bodyBatteryStatList (segment-level data)
        type_map = {"REST": 0, "ACTIVE": 1, "SLEEP": 2, "STRESS": 3, "UNMEASURABLE": 4}
        for seg in (entry.get("bodyBatteryStatList") or []):
            ts_str = seg.get("startTimestampGMT") or seg.get("startTimestampLocal")
            if not ts_str:
                continue
            try:
                ts = datetime.fromisoformat(ts_str.rstrip("Z")).replace(tzinfo=timezone.utc)
                type_code = type_map.get(seg.get("activityType", "UNMEASURABLE"), 4)
                values.append([int(ts.timestamp() * 1000),
                               int(seg.get("bodyBatteryLevel") or 0),
                               type_code,
                               int(seg.get("stressLevel") or -1)])
            except (AttributeError, TypeError, ValueError) as exc:
                logger.debug("Skipping body battery segment %r: %s", seg, exc)

    if charged is None and end_lvl is None and not values:
        return None

    return {
        "charged": charged,
        "drained": drained,
        "start_level": start_lvl,
        "end_level": end_lvl,
        "values": values,  # stripped from list-API, returned in intraday endpoint
    }


def _compute_body_battery_hires(bb_values, intraday_hr):
    """
    Produce a higher-resolution body battery series by interpolating between
    sparse BB checkpoints using intraday HR as a proxy for effort.

    During drain segments (BB falling) the drain is distributed proportionally
    to how much each HR reading exceeds the day's median — peaks spend battery
    faster than valleys.  When no reading in a drain segment exceeds the median
    there is no effort signal, so the drain is spread linearly over time.
    During recovery segments (BB rising) recovery is spread uniformly over time.

    Returns [[ts_ms, level], ...] at the granularity of intraday HR, or None if
    inputs are insufficient (fewer than 2 checkpoints, no positive HR, or a
    result of 4 points or fewer).
    """
    if not bb_values or not intraday_hr or len(bb_values) < 2:
        return None

    # Drop entries with None timestamp or level — raw API data can have gaps
    bb = sorted([v for v in bb_values if v[0] is not None and v[1] is not None],
                key=lambda x: x[0])
    if len(bb) < 2:
        return None
    hr = sorted(intraday_hr, key=lambda x: x[0])

    hr_vals = [bpm for _, bpm in hr if bpm is not None and bpm > 0]
    if not hr_vals:
        return None
    hr_median = sorted(hr_vals)[len(hr_vals) // 2]

    result = []
    for (t1, lvl1, *_), (t2, lvl2, *_) in zip(bb, bb[1:]):
        delta = lvl2 - lvl1
        seg_hr = [(ts, bpm) for ts, bpm in hr if t1 <= ts <= t2 and bpm is not None]

        result.append([t1, round(float(lvl1), 1)])
        if not seg_hr or abs(delta) < 1:
            continue

        if delta < 0:
            # Drain: weight each reading by HR above median
            efforts = [max(0.0, bpm - hr_median) for _, bpm in seg_hr]
            total = sum(efforts)
            if total > 0:
                cumul = 0.0
                for (ts, _), effort in zip(seg_hr, efforts):
                    cumul += effort * delta / total
                    level = max(0.0, min(100.0, lvl1 + cumul))
                    result.append([ts, round(level, 1)])
                continue

        # Recovery (or drain without any effort signal): linear over time
        span = max(1, t2 - t1)
        for ts, _ in seg_hr:
            frac = (ts - t1) / span
            level = max(0.0, min(100.0, lvl1 + delta * frac))
            result.append([ts, round(level, 1)])

    result.append([bb[-1][0], round(float(bb[-1][1]), 1)])

    # Deduplicate and sort (the sort is stable, so the first point recorded for
    # a timestamp wins)
    seen, out = set(), []
    for item in sorted(result, key=lambda x: x[0]):
        if item[0] not in seen:
            seen.add(item[0])
            out.append(item)

    return out if len(out) > 4 else None


def _safe(fn, *args):
    """Call fn(*args); on any exception log at debug level and return None."""
    try:
        return fn(*args)
    except Exception as exc:
        # Not every callable has __name__ (e.g. functools.partial).
        logger.debug("%s(%s) skipped: %s", getattr(fn, "__name__", repr(fn)), args, exc)
        return None


def _parse_day(stats, sleep_data, hrv_data) -> dict:
    """
    Map one day's stats / sleep / HRV responses onto health_metrics columns.

    Any of the three inputs may be None or empty; only non-None values end up
    in the returned dict.
    """
    row = {}

    if stats:
        _set(row, "resting_hr", stats.get("restingHeartRate"))
        _set(row, "avg_hr_day", stats.get("averageHeartRate"))
        _set(row, "max_hr_day", stats.get("maxHeartRate"))
        _set(row, "steps", stats.get("totalSteps"))
        _set(row, "floors_climbed", stats.get("floorsAscended"))
        _set(row, "avg_stress", stats.get("averageStressLevel"))
        active = stats.get("activeKilocalories")
        bmr = stats.get("bmrKilocalories")
        _set(row, "active_calories", active)
        if active and bmr:
            _set(row, "total_calories", float(active) + float(bmr))

    if sleep_data:
        dto = sleep_data.get("dailySleepDTO") or sleep_data
        _set(row, "sleep_duration_s", dto.get("sleepTimeSeconds"))
        _set(row, "sleep_deep_s", dto.get("deepSleepSeconds"))
        _set(row, "sleep_light_s", dto.get("lightSleepSeconds"))
        _set(row, "sleep_rem_s", dto.get("remSleepSeconds"))
        _set(row, "sleep_awake_s", dto.get("awakeSleepSeconds"))

        # Timestamps are milliseconds since epoch in local time
        for key, col in (("sleepStartTimestampLocal", "sleep_start"),
                         ("sleepEndTimestampLocal", "sleep_end")):
            ms = dto.get(key)
            if ms:
                _set(row, col,
                     datetime.fromtimestamp(ms / 1000, tz=timezone.utc).isoformat())

        # SpO2
        spo2 = dto.get("averageSpO2Value")
        if spo2 and 50 < float(spo2) <= 100:
            row["spo2_avg"] = float(spo2)

        # Sleep score — structure varies across firmware; look at the top level
        # first, then inside the daily sleep DTO.
        scores = (sleep_data.get("sleepScores") or sleep_data.get("sleepScore")
                  or dto.get("sleepScores") or dto.get("sleepScore"))
        if isinstance(scores, dict):
            overall = scores.get("overall") or scores.get("qualityScore")
            if isinstance(overall, dict):
                _set(row, "sleep_score", overall.get("value"))
            else:
                _set(row, "sleep_score", overall)
        elif isinstance(scores, (int, float)):
            row["sleep_score"] = scores

    if hrv_data:
        summary = hrv_data.get("hrvSummary") or hrv_data
        _set(row, "hrv_nightly_avg", summary.get("lastNight") or summary.get("lastNightAvg"))
        _set(row, "hrv_5min_high", summary.get("lastNight5MinHigh"))
        status = summary.get("status")
        if status:
            row["hrv_status"] = str(status).lower()

    return row


def _set(d: dict, key: str, val):
    """Store val under key in d unless val is None."""
    if val is not None:
        d[key] = val