Files
MileVault/backend/app/services/garmin_connect_sync.py
T
owain 22b41109f5
Build and push images / validate (push) Successful in 3s
Build and push images / build-backend (push) Successful in 7s
Build and push images / build-worker (push) Successful in 7s
Build and push images / build-frontend (push) Successful in 10s
Fix Garmin stats sync, add route merge/map/links, fix PR constraint
Garmin health: fix display_name=None when using stored OAuth tokens.
authenticate_garmin() now calls login(tokenstore=...) instead of
garth.loads() directly, so display_name is populated and get_user_summary
works. Also add avg_hr_day / max_hr_day from stats response.

Routes: add merge endpoint (POST /{id}/merge/{source}), delete endpoint.
Routes page: polyline SVG mini-map on each route card, merge UI with
confirmation, activity rows are now Links to the activity detail page.

Personal records: replace all-columns unique constraint with a partial
index (unique on current records only) to stop UniqueViolation crashes
when parallel workers deactivate the same PR.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-07 01:25:01 +01:00

295 lines
11 KiB
Python

"""
Garmin Connect sync helpers.
authenticate_garmin() returns an authenticated client, refreshing the stored
OAuth token when possible and falling back to email/password re-login.
sync_activities() downloads new FIT files and queues them for processing.
sync_wellness() pulls daily stats/sleep/HRV summaries from the JSON API
and upserts them into health_metrics.
"""
import io
import zipfile
import logging
from datetime import date, datetime, timedelta, timezone
from pathlib import Path
from typing import Optional, Tuple
logger = logging.getLogger(__name__)
# ── Password encryption ─────────────────────────────────────────────────────
def _fernet():
import base64, hashlib
from cryptography.fernet import Fernet
from app.core.config import settings
key = base64.urlsafe_b64encode(hashlib.sha256(settings.secret_key.encode()).digest())
return Fernet(key)
def encrypt_password(password: str) -> str:
return _fernet().encrypt(password.encode()).decode()
def decrypt_password(enc: str) -> str:
return _fernet().decrypt(enc.encode()).decode()
# ── Auth ─────────────────────────────────────────────────────────────────────
def authenticate_garmin(email: str, password_enc: str, token_store: Optional[str]) -> Tuple:
"""
Returns (garmin_client, new_token_store_or_None).
new_token_store is set only when tokens were refreshed/re-created so the
caller can persist them.
"""
import garminconnect
# Try stored OAuth token first.
# Must call login(tokenstore=...) rather than garth.loads() directly so that
# garmin.display_name is populated — it's required by get_user_summary() and
# several other endpoints. Without it every stats call silently returns None.
if token_store:
try:
garmin = garminconnect.Garmin(
email=email, password=decrypt_password(password_enc)
)
garmin.login(tokenstore=token_store)
return garmin, None
except Exception as exc:
logger.info("Garmin token invalid (%s), re-authenticating", exc)
# Full login with email + password
garmin = garminconnect.Garmin(email=email, password=decrypt_password(password_enc))
garmin.login()
return garmin, garmin.garth.dumps()
# ── Activity sync ─────────────────────────────────────────────────────────────
def sync_activities(garmin, user_id: int, since: Optional[datetime],
db, file_store_path: str, lookback_days: int = 30) -> int:
"""
List activities from Garmin Connect, skip any already in the DB, download
FIT ZIPs for new ones, and queue them for processing.
On first sync (since=None) the start date is determined by lookback_days:
-1 → full history back to 2010; N → today minus N days.
On incremental syncs fetches from one day before last_sync_at.
Returns the number of new activities queued.
"""
import time
from app.workers.tasks import process_activity_file
from app.models.user import Activity
from sqlalchemy import select, func
if since:
start_date = (since - timedelta(days=1)).date()
elif lookback_days == -1:
start_date = date(2010, 1, 1)
else:
start_date = date.today() - timedelta(days=max(lookback_days, 1))
end_date = date.today()
try:
activities = garmin.get_activities_by_date(
start_date.isoformat(), end_date.isoformat()
)
except Exception as exc:
logger.error("Failed to list Garmin activities: %s", exc)
return 0
queued = 0
for act in activities:
garmin_id = str(act.get("activityId", "")).strip()
if not garmin_id:
continue
# Fast path: already imported via Garmin Connect sync
existing = db.execute(
select(Activity).where(Activity.garmin_activity_id == garmin_id)
).scalar_one_or_none()
if existing:
continue
# Slow-path dedup: activity imported via bulk export (no garmin_activity_id).
# Check by start_time; stamp the ID so future syncs skip it in the fast path.
act_start_str = act.get("startTimeLocal") or act.get("startTimeGMT") or ""
if act_start_str:
try:
from datetime import datetime as _dt
act_start = _dt.fromisoformat(act_start_str.replace("Z", "+00:00"))
time_match = db.execute(
select(Activity).where(
Activity.user_id == user_id,
func.date(Activity.start_time) == act_start.date(),
)
).scalar_one_or_none()
if time_match:
if not time_match.garmin_activity_id:
time_match.garmin_activity_id = garmin_id
db.commit()
continue
except Exception:
pass # couldn't parse time — fall through to download
# Download original FIT (Garmin wraps it in a ZIP)
try:
zip_bytes = garmin.download_activity(
int(garmin_id),
dl_fmt=garmin.ActivityDownloadFormat.ORIGINAL,
)
except Exception as exc:
logger.warning("Failed to download activity %s: %s", garmin_id, exc)
continue
# Extract the FIT from the ZIP
try:
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
fit_names = [n for n in zf.namelist() if n.lower().endswith(".fit")]
if not fit_names:
logger.debug("No FIT in ZIP for activity %s", garmin_id)
continue
fit_data = zf.read(fit_names[0])
except Exception as exc:
logger.warning("Failed to unzip activity %s: %s", garmin_id, exc)
continue
# Save to disk and queue
dest_dir = Path(file_store_path) / str(user_id) / "garmin_connect"
dest_dir.mkdir(parents=True, exist_ok=True)
dest = dest_dir / f"{garmin_id}.fit"
dest.write_bytes(fit_data)
process_activity_file.delay(str(dest), user_id, "fit", garmin_id)
queued += 1
# Brief pause to avoid hammering the Garmin API
time.sleep(0.5)
return queued
# ── Wellness sync ─────────────────────────────────────────────────────────────
def sync_wellness(garmin, user_id: int, since: Optional[datetime], db) -> int:
"""
Fetch daily stats / sleep / HRV from the Garmin Connect JSON API for each
day since `since` and upsert into health_metrics.
Returns the number of days upserted.
"""
from sqlalchemy import text
# First sync: 90 days of wellness history; incremental: from last sync
start_date = since.date() if since else (date.today() - timedelta(days=90))
days = (date.today() - start_date).days + 1
processed = 0
for i in range(max(days, 1)):
day = start_date + timedelta(days=i)
day_str = day.isoformat()
stats = _safe(garmin.get_stats, day_str)
sleep_data = _safe(garmin.get_sleep_data, day_str)
hrv_data = _safe(garmin.get_hrv_data, day_str)
row = _parse_day(stats, sleep_data, hrv_data)
if not row:
continue
cols = list(row.keys())
col_sql = ", ".join(cols)
val_sql = ", ".join(f":{c}" for c in cols)
upd_sql = ", ".join(
# total_calories uses GREATEST so multiple sources don't downgrade
f"{c} = GREATEST(EXCLUDED.{c}, health_metrics.{c})"
if c == "total_calories" else
f"{c} = COALESCE(EXCLUDED.{c}, health_metrics.{c})"
for c in cols
)
params = {"user_id": user_id, "day": day.isoformat()}
params.update(row)
db.execute(text(f"""
INSERT INTO health_metrics (user_id, date, {col_sql})
VALUES (:user_id, :day, {val_sql})
ON CONFLICT (user_id, date) DO UPDATE SET {upd_sql}
"""), params)
db.commit()
processed += 1
return processed
def _safe(fn, *args):
try:
return fn(*args)
except Exception as exc:
logger.debug("%s(%s) skipped: %s", fn.__name__, args, exc)
return None
def _parse_day(stats, sleep_data, hrv_data) -> dict:
row = {}
if stats:
_set(row, "resting_hr", stats.get("restingHeartRate"))
_set(row, "avg_hr_day", stats.get("averageHeartRate"))
_set(row, "max_hr_day", stats.get("maxHeartRate"))
_set(row, "steps", stats.get("totalSteps"))
_set(row, "floors_climbed", stats.get("floorsAscended"))
_set(row, "avg_stress", stats.get("averageStressLevel"))
active = stats.get("activeKilocalories")
bmr = stats.get("bmrKilocalories")
_set(row, "active_calories", active)
if active and bmr:
_set(row, "total_calories", float(active) + float(bmr))
if sleep_data:
dto = sleep_data.get("dailySleepDTO") or sleep_data
_set(row, "sleep_duration_s", dto.get("sleepTimeSeconds"))
_set(row, "sleep_deep_s", dto.get("deepSleepSeconds"))
_set(row, "sleep_light_s", dto.get("lightSleepSeconds"))
_set(row, "sleep_rem_s", dto.get("remSleepSeconds"))
_set(row, "sleep_awake_s", dto.get("awakeSleepSeconds"))
# Timestamps are milliseconds since epoch in local time
for key, col in (("sleepStartTimestampLocal", "sleep_start"),
("sleepEndTimestampLocal", "sleep_end")):
ms = dto.get(key)
if ms:
_set(row, col, datetime.fromtimestamp(ms / 1000, tz=timezone.utc).isoformat())
# SpO2
spo2 = dto.get("averageSpO2Value")
if spo2 and 50 < float(spo2) <= 100:
row["spo2_avg"] = float(spo2)
# Sleep score — structure varies across firmware
scores = sleep_data.get("sleepScores") or sleep_data.get("sleepScore")
if isinstance(scores, dict):
overall = scores.get("overall") or scores.get("qualityScore")
if isinstance(overall, dict):
_set(row, "sleep_score", overall.get("value"))
else:
_set(row, "sleep_score", overall)
elif isinstance(scores, (int, float)):
row["sleep_score"] = scores
if hrv_data:
summary = hrv_data.get("hrvSummary") or hrv_data
_set(row, "hrv_nightly_avg", summary.get("lastNight") or summary.get("lastNightAvg"))
_set(row, "hrv_5min_high", summary.get("lastNight5MinHigh"))
status = summary.get("status")
if status:
row["hrv_status"] = str(status).lower()
return row
def _set(d: dict, key: str, val):
if val is not None:
d[key] = val