Fix VO2 max sync: robust fallback when maxmet range returns non-list or valueless entries
The previous code used `if not mm_history:` to decide whether to fall back to get_training_status(). If the maxmet API returned a non-empty list with no valid vo2max values (or a non-list type), the fallback was skipped and nothing stored. Changes: - Normalise mm_raw: only use it if it's a list (handles dict/None responses) - Check valid_from_range: fall back to training_status whenever no usable value was found in the range query, regardless of whether it returned entries - Upgrade all related log lines to INFO so the result is visible without debug mode - Guard the entry loop against non-dict items Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -332,25 +332,40 @@ def sync_wellness(garmin, user_id: int, since: Optional[datetime], db,
|
||||
if fa_data:
|
||||
fa_age = fa_data.get("fitnessAge") or fa_data.get("achievableFitnessAge")
|
||||
|
||||
mm_entries = []
|
||||
try:
|
||||
mm_history = garmin.connectapi(
|
||||
mm_raw = garmin.connectapi(
|
||||
f"/metrics-service/metrics/maxmet/daily/{start_date.isoformat()}/{today_str}"
|
||||
)
|
||||
logger.info("maxmet range query returned type=%s len=%s",
|
||||
type(mm_raw).__name__,
|
||||
len(mm_raw) if isinstance(mm_raw, (list, dict)) else "n/a")
|
||||
if isinstance(mm_raw, list):
|
||||
mm_entries = mm_raw
|
||||
except Exception as exc:
|
||||
logger.debug("maxmet history fetch failed: %s", exc)
|
||||
mm_history = []
|
||||
logger.info("maxmet history fetch failed: %s", exc)
|
||||
|
||||
# Fall back to most-recent from training status if history is empty
|
||||
if not mm_history:
|
||||
# Check whether the range query yielded any usable vo2max values
|
||||
valid_from_range = any(
|
||||
(entry.get("vo2MaxPreciseValue") or entry.get("vo2MaxValue") or 0)
|
||||
for entry in mm_entries
|
||||
if isinstance(entry, dict)
|
||||
)
|
||||
|
||||
# Always fall back to training_status when the range query had no valid data
|
||||
if not valid_from_range:
|
||||
ts_data = _safe(garmin.get_training_status, today_str)
|
||||
generic = (ts_data or {}).get("mostRecentVO2Max", {}).get("generic") or {}
|
||||
generic = ((ts_data or {}).get("mostRecentVO2Max") or {}).get("generic") or {}
|
||||
v = generic.get("vo2MaxPreciseValue") or generic.get("vo2MaxValue")
|
||||
logger.info("training_status vo2max=%s at %s", v, generic.get("calendarDate"))
|
||||
if v and float(v) > 0:
|
||||
mm_history = [{"calendarDate": generic.get("calendarDate") or today_str,
|
||||
mm_entries = [{"calendarDate": generic.get("calendarDate") or today_str,
|
||||
"vo2MaxPreciseValue": float(v)}]
|
||||
|
||||
stored = 0
|
||||
for entry in (mm_history or []):
|
||||
for entry in mm_entries:
|
||||
if not isinstance(entry, dict):
|
||||
continue
|
||||
v = entry.get("vo2MaxPreciseValue") or entry.get("vo2MaxValue")
|
||||
if not v or float(v) <= 0:
|
||||
continue
|
||||
@@ -372,8 +387,7 @@ def sync_wellness(garmin, user_id: int, since: Optional[datetime], db,
|
||||
logger.warning("Failed to upsert VO2 max for %s: %s", entry_date, exc)
|
||||
db.rollback()
|
||||
|
||||
if stored:
|
||||
logger.info("Stored %d VO2 max data points", stored)
|
||||
logger.info("VO2 max: stored=%d from range_valid=%s", stored, valid_from_range)
|
||||
|
||||
return processed
|
||||
|
||||
|
||||
@@ -0,0 +1,581 @@
|
||||
"""
|
||||
Garmin Connect sync helpers.
|
||||
|
||||
authenticate_garmin() returns an authenticated client, refreshing the stored
|
||||
OAuth token when possible and falling back to email/password re-login.
|
||||
|
||||
sync_activities() downloads new FIT files and queues them for processing.
|
||||
sync_wellness() pulls daily stats/sleep/HRV summaries from the JSON API
|
||||
and upserts them into health_metrics.
|
||||
"""
|
||||
import io
|
||||
import zipfile
|
||||
import logging
|
||||
from datetime import date, datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from typing import Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ── Password encryption ─────────────────────────────────────────────────────
|
||||
|
||||
def _fernet():
|
||||
import base64, hashlib
|
||||
from cryptography.fernet import Fernet
|
||||
from app.core.config import settings
|
||||
key = base64.urlsafe_b64encode(hashlib.sha256(settings.secret_key.encode()).digest())
|
||||
return Fernet(key)
|
||||
|
||||
|
||||
def encrypt_password(password: str) -> str:
|
||||
return _fernet().encrypt(password.encode()).decode()
|
||||
|
||||
|
||||
def decrypt_password(enc: str) -> str:
|
||||
return _fernet().decrypt(enc.encode()).decode()
|
||||
|
||||
|
||||
# ── Auth ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
def authenticate_garmin(email: str, password_enc: str, token_store: Optional[str]) -> Tuple:
|
||||
"""
|
||||
Returns (garmin_client, new_token_store_or_None).
|
||||
new_token_store is set only when tokens were refreshed/re-created so the
|
||||
caller can persist them.
|
||||
"""
|
||||
import garminconnect
|
||||
|
||||
# Try stored OAuth token first.
|
||||
# Use garth.loads() directly (always treats the argument as an inline string).
|
||||
# garmin.login(tokenstore=...) dispatches on len>512, treating short tokens as
|
||||
# filesystem paths and raising FileNotFoundError on every token-based auth attempt.
|
||||
# After loads(), set display_name from the embedded profile — required by
|
||||
# get_stats(), get_sleep_data(), and other endpoints that build URLs from it.
|
||||
if token_store:
|
||||
try:
|
||||
garmin = garminconnect.Garmin(
|
||||
email=email, password=decrypt_password(password_enc)
|
||||
)
|
||||
garmin.garth.loads(token_store)
|
||||
garmin.display_name = (garmin.garth.profile or {}).get("displayName", "")
|
||||
return garmin, None
|
||||
except Exception as exc:
|
||||
logger.info("Garmin token invalid (%s), re-authenticating", exc)
|
||||
|
||||
# Full login with email + password
|
||||
garmin = garminconnect.Garmin(email=email, password=decrypt_password(password_enc))
|
||||
garmin.login()
|
||||
return garmin, garmin.garth.dumps()
|
||||
|
||||
|
||||
# ── Activity sync ─────────────────────────────────────────────────────────────
|
||||
|
||||
def sync_activities(garmin, user_id: int, since: Optional[datetime],
|
||||
db, file_store_path: str, lookback_days: int = 30,
|
||||
status_callback=None) -> int:
|
||||
"""
|
||||
List activities from Garmin Connect, skip any already in the DB, download
|
||||
FIT ZIPs for new ones, and queue them for processing.
|
||||
|
||||
lookback_days controls the start date on every sync:
|
||||
-1 → full history back to 2010 on first sync, then incremental (since-1d)
|
||||
N → incremental (since-1d) when since is set; else last N days on first sync
|
||||
Returns the number of new activities queued.
|
||||
"""
|
||||
import time
|
||||
from app.workers.tasks import process_activity_file
|
||||
from app.models.user import Activity
|
||||
from sqlalchemy import select, func
|
||||
|
||||
if lookback_days == -1:
|
||||
# All-time: full pull on first sync, incremental thereafter
|
||||
start_date = (since - timedelta(days=1)).date() if since else date(2010, 1, 1)
|
||||
elif since:
|
||||
# Use whichever is earlier: one day before last sync OR the configured lookback
|
||||
# window. This ensures increasing lookback_days actually fetches older data.
|
||||
incremental = (since - timedelta(days=1)).date()
|
||||
lookback = date.today() - timedelta(days=max(lookback_days, 1))
|
||||
start_date = min(incremental, lookback)
|
||||
else:
|
||||
start_date = date.today() - timedelta(days=max(lookback_days, 1))
|
||||
end_date = date.today()
|
||||
|
||||
try:
|
||||
activities = garmin.get_activities_by_date(
|
||||
start_date.isoformat(), end_date.isoformat()
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.error("Failed to list Garmin activities: %s", exc)
|
||||
return 0
|
||||
|
||||
total = len(activities)
|
||||
if status_callback and total:
|
||||
status_callback(f"Syncing activities: 0/{total} queued")
|
||||
|
||||
queued = 0
|
||||
for act in activities:
|
||||
garmin_id = str(act.get("activityId", "")).strip()
|
||||
if not garmin_id:
|
||||
continue
|
||||
|
||||
# Fast path: already imported via Garmin Connect sync
|
||||
existing = db.execute(
|
||||
select(Activity).where(Activity.garmin_activity_id == garmin_id)
|
||||
).scalar_one_or_none()
|
||||
if existing:
|
||||
continue
|
||||
|
||||
# Slow-path dedup: activity imported via bulk export (no garmin_activity_id).
|
||||
# Check by start_time; stamp the ID so future syncs skip it in the fast path.
|
||||
act_start_str = act.get("startTimeLocal") or act.get("startTimeGMT") or ""
|
||||
if act_start_str:
|
||||
try:
|
||||
from datetime import datetime as _dt
|
||||
act_start = _dt.fromisoformat(act_start_str.replace("Z", "+00:00"))
|
||||
time_match = db.execute(
|
||||
select(Activity).where(
|
||||
Activity.user_id == user_id,
|
||||
func.date(Activity.start_time) == act_start.date(),
|
||||
)
|
||||
).scalar_one_or_none()
|
||||
if time_match:
|
||||
if not time_match.garmin_activity_id:
|
||||
time_match.garmin_activity_id = garmin_id
|
||||
db.commit()
|
||||
continue
|
||||
except Exception:
|
||||
pass # couldn't parse time — fall through to download
|
||||
|
||||
# Download original FIT (Garmin wraps it in a ZIP)
|
||||
try:
|
||||
zip_bytes = garmin.download_activity(
|
||||
int(garmin_id),
|
||||
dl_fmt=garmin.ActivityDownloadFormat.ORIGINAL,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to download activity %s: %s", garmin_id, exc)
|
||||
continue
|
||||
|
||||
# Extract the FIT from the ZIP
|
||||
try:
|
||||
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
|
||||
fit_names = [n for n in zf.namelist() if n.lower().endswith(".fit")]
|
||||
if not fit_names:
|
||||
logger.debug("No FIT in ZIP for activity %s", garmin_id)
|
||||
continue
|
||||
fit_data = zf.read(fit_names[0])
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to unzip activity %s: %s", garmin_id, exc)
|
||||
continue
|
||||
|
||||
# Save to disk and queue
|
||||
dest_dir = Path(file_store_path) / str(user_id) / "garmin_connect"
|
||||
dest_dir.mkdir(parents=True, exist_ok=True)
|
||||
dest = dest_dir / f"{garmin_id}.fit"
|
||||
dest.write_bytes(fit_data)
|
||||
|
||||
process_activity_file.delay(str(dest), user_id, "fit", garmin_id)
|
||||
queued += 1
|
||||
|
||||
if status_callback and (queued % 5 == 0 or queued == total):
|
||||
status_callback(f"Syncing activities: {queued}/{total} queued")
|
||||
|
||||
# Brief pause to avoid hammering the Garmin API
|
||||
time.sleep(0.5)
|
||||
|
||||
return queued
|
||||
|
||||
|
||||
# ── Wellness sync ─────────────────────────────────────────────────────────────
|
||||
|
||||
def sync_wellness(garmin, user_id: int, since: Optional[datetime], db,
|
||||
lookback_days: int = 90, status_callback=None) -> int:
|
||||
"""
|
||||
Fetch daily stats / sleep / HRV from the Garmin Connect JSON API for each
|
||||
day in the window and upsert into health_metrics.
|
||||
|
||||
lookback_days controls the window on every sync:
|
||||
-1 → full history back to 2010 on first sync, then incremental (since-1d)
|
||||
N → incremental (since-1d) when since is set; else last N days on first sync
|
||||
Returns the number of days upserted.
|
||||
"""
|
||||
from sqlalchemy import text
|
||||
|
||||
if lookback_days == -1:
|
||||
start_date = (since - timedelta(days=1)).date() if since else date(2010, 1, 1)
|
||||
elif since:
|
||||
# Use whichever is earlier: one day before last sync OR the configured lookback
|
||||
# window. This ensures increasing lookback_days actually fetches older data.
|
||||
incremental = (since - timedelta(days=1)).date()
|
||||
lookback = date.today() - timedelta(days=max(lookback_days, 1))
|
||||
start_date = min(incremental, lookback)
|
||||
else:
|
||||
start_date = date.today() - timedelta(days=max(lookback_days, 1))
|
||||
days = (date.today() - start_date).days + 1
|
||||
processed = 0
|
||||
|
||||
import time as _time
|
||||
import json as _json
|
||||
total_days = max(days, 1)
|
||||
if status_callback:
|
||||
status_callback(f"Syncing wellness: 0/{total_days} days")
|
||||
for i in range(total_days):
|
||||
day = start_date + timedelta(days=i)
|
||||
if status_callback and (i % 5 == 0 or i == total_days - 1):
|
||||
status_callback(f"Syncing wellness: {i + 1}/{total_days} days")
|
||||
day_str = day.isoformat()
|
||||
|
||||
stats = _safe(garmin.get_stats, day_str)
|
||||
sleep_data = _safe(garmin.get_sleep_data, day_str)
|
||||
hrv_data = _safe(garmin.get_hrv_data, day_str)
|
||||
# Intraday HR (requires display_name; skip gracefully if absent)
|
||||
hr_raw = _safe(garmin.get_heart_rates, day_str) if garmin.display_name else None
|
||||
bc_data = _safe(garmin.get_body_composition, day_str, day_str)
|
||||
bb_raw = _safe(garmin.get_body_battery, day_str, day_str)
|
||||
_time.sleep(0.25) # avoid hammering Garmin's wellness API
|
||||
|
||||
row = _parse_day(stats, sleep_data, hrv_data)
|
||||
|
||||
# Weight + body composition from weight service (more reliable than stats)
|
||||
if bc_data:
|
||||
entries = (bc_data.get("dateWeightList")
|
||||
or bc_data.get("allWeightMetrics")
|
||||
or bc_data.get("weightList") or [])
|
||||
if entries:
|
||||
e = entries[0]
|
||||
bw = e.get("weight")
|
||||
if bw and float(bw) > 0:
|
||||
bwf = float(bw)
|
||||
_set(row, "weight_kg", round(bwf / 1000 if bwf > 300 else bwf, 2))
|
||||
if e.get("bmi"):
|
||||
_set(row, "bmi", float(e["bmi"]))
|
||||
if e.get("bodyFat"):
|
||||
_set(row, "body_fat_pct", float(e["bodyFat"]))
|
||||
mm = e.get("muscleMass")
|
||||
if mm and float(mm) > 0:
|
||||
mmf = float(mm)
|
||||
_set(row, "muscle_mass_kg", round(mmf / 1000 if mmf > 300 else mmf, 2))
|
||||
|
||||
# Weight from daily stats as fallback (present when Garmin scale is used)
|
||||
if stats and "weight_kg" not in row:
|
||||
bw = stats.get("bodyWeight")
|
||||
if bw and float(bw) > 0:
|
||||
bwf = float(bw)
|
||||
_set(row, "weight_kg", round(bwf / 1000 if bwf > 300 else bwf, 2))
|
||||
|
||||
# Body battery — store summary + fine-grained timeline
|
||||
bb = None
|
||||
if bb_raw:
|
||||
bb = _parse_body_battery(bb_raw, day_str)
|
||||
if bb:
|
||||
row["body_battery"] = _json.dumps(bb)
|
||||
|
||||
# Intraday heart rate — store non-null [epoch_ms, bpm] pairs + compute daily averages
|
||||
intraday = None
|
||||
if hr_raw:
|
||||
raw_vals = hr_raw.get("heartRateValues") or []
|
||||
intraday = [[int(ts), int(v)] for ts, v in raw_vals if v is not None]
|
||||
if intraday:
|
||||
row["intraday_hr"] = intraday
|
||||
hr_vals = [v for _, v in intraday if v > 0]
|
||||
if hr_vals:
|
||||
row["avg_hr_day"] = round(sum(hr_vals) / len(hr_vals), 1)
|
||||
row["max_hr_day"] = float(max(hr_vals))
|
||||
|
||||
# High-resolution body battery derived from BB checkpoints + intraday HR
|
||||
if bb and intraday:
|
||||
hires = _compute_body_battery_hires(bb.get("values") or [], intraday)
|
||||
if hires:
|
||||
row["body_battery_hires"] = _json.dumps(hires)
|
||||
|
||||
if not row:
|
||||
continue
|
||||
|
||||
# psycopg2 treats Python lists/dicts as PG arrays/hstore; serialize JSON
|
||||
# columns as strings so psycopg2 passes them correctly to json/jsonb columns.
|
||||
if "intraday_hr" in row and not isinstance(row["intraday_hr"], str):
|
||||
row["intraday_hr"] = _json.dumps(row["intraday_hr"])
|
||||
if "body_battery" in row and not isinstance(row["body_battery"], str):
|
||||
row["body_battery"] = _json.dumps(row["body_battery"])
|
||||
|
||||
cols = list(row.keys())
|
||||
col_sql = ", ".join(cols)
|
||||
val_sql = ", ".join(f":{c}" for c in cols)
|
||||
upd_sql = ", ".join(
|
||||
# total_calories uses GREATEST so multiple sources don't downgrade
|
||||
f"{c} = GREATEST(EXCLUDED.{c}, health_metrics.{c})"
|
||||
if c == "total_calories" else
|
||||
f"{c} = COALESCE(EXCLUDED.{c}, health_metrics.{c})"
|
||||
for c in cols
|
||||
)
|
||||
|
||||
params = {"user_id": user_id, "day": day.isoformat()}
|
||||
params.update(row)
|
||||
|
||||
try:
|
||||
db.execute(text(f"""
|
||||
INSERT INTO health_metrics (user_id, date, {col_sql})
|
||||
VALUES (:user_id, :day, {val_sql})
|
||||
ON CONFLICT (user_id, date) DO UPDATE SET {upd_sql}
|
||||
"""), params)
|
||||
db.commit()
|
||||
processed += 1
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to upsert health_metrics for %s: %s", day_str, exc)
|
||||
db.rollback()
|
||||
|
||||
# Fetch historical VO2 max across the full sync window via maxmet/daily range query
|
||||
today_str = date.today().isoformat()
|
||||
fa_data = _safe(garmin.get_fitnessage_data, today_str)
|
||||
fa_age = None
|
||||
if fa_data:
|
||||
fa_age = fa_data.get("fitnessAge") or fa_data.get("achievableFitnessAge")
|
||||
|
||||
mm_entries = []
|
||||
try:
|
||||
mm_raw = garmin.connectapi(
|
||||
f"/metrics-service/metrics/maxmet/daily/{start_date.isoformat()}/{today_str}"
|
||||
)
|
||||
logger.info("maxmet range query returned type=%s len=%s",
|
||||
type(mm_raw).__name__,
|
||||
len(mm_raw) if isinstance(mm_raw, (list, dict)) else "n/a")
|
||||
if isinstance(mm_raw, list):
|
||||
mm_entries = mm_raw
|
||||
except Exception as exc:
|
||||
logger.info("maxmet history fetch failed: %s", exc)
|
||||
|
||||
# Check whether the range query yielded any usable vo2max values
|
||||
valid_from_range = any(
|
||||
(entry.get("vo2MaxPreciseValue") or entry.get("vo2MaxValue") or 0)
|
||||
for entry in mm_entries
|
||||
if isinstance(entry, dict)
|
||||
)
|
||||
|
||||
# Always fall back to training_status when the range query had no valid data
|
||||
if not valid_from_range:
|
||||
ts_data = _safe(garmin.get_training_status, today_str)
|
||||
generic = ((ts_data or {}).get("mostRecentVO2Max") or {}).get("generic") or {}
|
||||
v = generic.get("vo2MaxPreciseValue") or generic.get("vo2MaxValue")
|
||||
logger.info("training_status vo2max=%s at %s", v, generic.get("calendarDate"))
|
||||
if v and float(v) > 0:
|
||||
mm_entries = [{"calendarDate": generic.get("calendarDate") or today_str,
|
||||
"vo2MaxPreciseValue": float(v)}]
|
||||
|
||||
stored = 0
|
||||
for entry in mm_entries:
|
||||
if not isinstance(entry, dict):
|
||||
continue
|
||||
v = entry.get("vo2MaxPreciseValue") or entry.get("vo2MaxValue")
|
||||
if not v or float(v) <= 0:
|
||||
continue
|
||||
entry_date = entry.get("calendarDate") or today_str
|
||||
try:
|
||||
fa_row = {"vo2max": float(v)}
|
||||
if fa_age and entry_date == today_str:
|
||||
fa_row["fitness_age"] = int(fa_age)
|
||||
fa_cols = list(fa_row.keys())
|
||||
db.execute(text(f"""
|
||||
INSERT INTO health_metrics (user_id, date, {", ".join(fa_cols)})
|
||||
VALUES (:user_id, :day, {", ".join(f":{c}" for c in fa_cols)})
|
||||
ON CONFLICT (user_id, date) DO UPDATE SET
|
||||
{", ".join(f"{c} = EXCLUDED.{c}" for c in fa_cols)}
|
||||
"""), {"user_id": user_id, "day": entry_date, **fa_row})
|
||||
db.commit()
|
||||
stored += 1
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to upsert VO2 max for %s: %s", entry_date, exc)
|
||||
db.rollback()
|
||||
|
||||
logger.info("VO2 max: stored=%d from range_valid=%s", stored, valid_from_range)
|
||||
|
||||
return processed
|
||||
|
||||
|
||||
def _parse_body_battery(bb_response, day_str: str):
|
||||
"""Parse get_body_battery() response for a single day into a compact dict."""
|
||||
if not bb_response:
|
||||
return None
|
||||
entry = next((e for e in bb_response if e.get("date") == day_str), None)
|
||||
if not entry and bb_response:
|
||||
entry = bb_response[0]
|
||||
if not entry:
|
||||
return None
|
||||
|
||||
charged = entry.get("charged")
|
||||
drained = entry.get("drained")
|
||||
start_lvl = entry.get("startValue")
|
||||
end_lvl = entry.get("endValue")
|
||||
|
||||
# Fine-grained timeline: [[ts_ms, level, type_code, stress], ...]
|
||||
# type_code: 0=REST, 1=ACTIVE, 2=SLEEP, 3=STRESS, 4=UNMEASURABLE
|
||||
values = entry.get("bodyBatteryValuesArray") or []
|
||||
|
||||
if not values:
|
||||
# Fall back to bodyBatteryStatList (segment-level data)
|
||||
type_map = {"REST": 0, "ACTIVE": 1, "SLEEP": 2, "STRESS": 3, "UNMEASURABLE": 4}
|
||||
for seg in (entry.get("bodyBatteryStatList") or []):
|
||||
ts_str = seg.get("startTimestampGMT") or seg.get("startTimestampLocal")
|
||||
if ts_str:
|
||||
try:
|
||||
from datetime import datetime as _dt, timezone as _tz
|
||||
ts = _dt.fromisoformat(ts_str.rstrip("Z")).replace(tzinfo=_tz.utc)
|
||||
type_code = type_map.get(seg.get("activityType", "UNMEASURABLE"), 4)
|
||||
values.append([int(ts.timestamp() * 1000),
|
||||
int(seg.get("bodyBatteryLevel") or 0),
|
||||
type_code,
|
||||
int(seg.get("stressLevel") or -1)])
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if charged is None and end_lvl is None and not values:
|
||||
return None
|
||||
|
||||
return {
|
||||
"charged": charged,
|
||||
"drained": drained,
|
||||
"start_level": start_lvl,
|
||||
"end_level": end_lvl,
|
||||
"values": values, # stripped from list-API, returned in intraday endpoint
|
||||
}
|
||||
|
||||
|
||||
def _compute_body_battery_hires(bb_values, intraday_hr):
|
||||
"""
|
||||
Produce a higher-resolution body battery series by interpolating between
|
||||
sparse BB checkpoints using intraday HR as a proxy for effort.
|
||||
|
||||
During drain segments (BB falling) the drain is distributed proportionally
|
||||
to how much each HR reading exceeds the day's median — peaks spend battery
|
||||
faster than valleys. During recovery segments (BB rising) recovery is
|
||||
spread uniformly over time.
|
||||
|
||||
Returns [[ts_ms, level], ...] at the granularity of intraday HR, or None
|
||||
if inputs are insufficient.
|
||||
"""
|
||||
if not bb_values or not intraday_hr or len(bb_values) < 2:
|
||||
return None
|
||||
|
||||
# Drop entries with None timestamp or level — raw API data can have gaps
|
||||
bb = sorted([v for v in bb_values if v[0] is not None and v[1] is not None],
|
||||
key=lambda x: x[0])
|
||||
if len(bb) < 2:
|
||||
return None
|
||||
hr = sorted(intraday_hr, key=lambda x: x[0])
|
||||
|
||||
hr_vals = [bpm for _, bpm in hr if bpm is not None and bpm > 0]
|
||||
if not hr_vals:
|
||||
return None
|
||||
|
||||
hr_median = sorted(hr_vals)[len(hr_vals) // 2]
|
||||
|
||||
result = []
|
||||
for i in range(len(bb) - 1):
|
||||
t1, L1 = bb[i][0], bb[i][1]
|
||||
t2, L2 = bb[i + 1][0], bb[i + 1][1]
|
||||
delta = L2 - L1
|
||||
|
||||
seg_hr = [(ts, bpm) for ts, bpm in hr if t1 <= ts <= t2 and bpm is not None]
|
||||
result.append([t1, round(float(L1), 1)])
|
||||
|
||||
if not seg_hr or abs(delta) < 1:
|
||||
continue
|
||||
|
||||
if delta < 0:
|
||||
# Drain: weight each reading by HR above median
|
||||
efforts = [max(0.0, bpm - hr_median) for _, bpm in seg_hr]
|
||||
total = sum(efforts) or 1.0
|
||||
cumul = 0.0
|
||||
for j, (ts, bpm) in enumerate(seg_hr):
|
||||
cumul += efforts[j] * delta / total
|
||||
level = max(0.0, min(100.0, L1 + cumul))
|
||||
result.append([ts, round(level, 1)])
|
||||
else:
|
||||
# Recovery: linear over time
|
||||
span = max(1, t2 - t1)
|
||||
for ts, _ in seg_hr:
|
||||
frac = (ts - t1) / span
|
||||
level = max(0.0, min(100.0, L1 + delta * frac))
|
||||
result.append([ts, round(level, 1)])
|
||||
|
||||
result.append([bb[-1][0], round(float(bb[-1][1]), 1)])
|
||||
|
||||
# Deduplicate and sort
|
||||
seen, out = set(), []
|
||||
for item in sorted(result, key=lambda x: x[0]):
|
||||
if item[0] not in seen:
|
||||
seen.add(item[0])
|
||||
out.append(item)
|
||||
|
||||
return out if len(out) > 4 else None
|
||||
|
||||
|
||||
def _safe(fn, *args):
|
||||
try:
|
||||
return fn(*args)
|
||||
except Exception as exc:
|
||||
logger.debug("%s(%s) skipped: %s", fn.__name__, args, exc)
|
||||
return None
|
||||
|
||||
|
||||
def _parse_day(stats, sleep_data, hrv_data) -> dict:
|
||||
row = {}
|
||||
|
||||
if stats:
|
||||
_set(row, "resting_hr", stats.get("restingHeartRate"))
|
||||
_set(row, "avg_hr_day", stats.get("averageHeartRate"))
|
||||
_set(row, "max_hr_day", stats.get("maxHeartRate"))
|
||||
_set(row, "steps", stats.get("totalSteps"))
|
||||
_set(row, "floors_climbed", stats.get("floorsAscended"))
|
||||
_set(row, "avg_stress", stats.get("averageStressLevel"))
|
||||
active = stats.get("activeKilocalories")
|
||||
bmr = stats.get("bmrKilocalories")
|
||||
_set(row, "active_calories", active)
|
||||
if active and bmr:
|
||||
_set(row, "total_calories", float(active) + float(bmr))
|
||||
|
||||
if sleep_data:
|
||||
dto = sleep_data.get("dailySleepDTO") or sleep_data
|
||||
_set(row, "sleep_duration_s", dto.get("sleepTimeSeconds"))
|
||||
_set(row, "sleep_deep_s", dto.get("deepSleepSeconds"))
|
||||
_set(row, "sleep_light_s", dto.get("lightSleepSeconds"))
|
||||
_set(row, "sleep_rem_s", dto.get("remSleepSeconds"))
|
||||
_set(row, "sleep_awake_s", dto.get("awakeSleepSeconds"))
|
||||
|
||||
# Timestamps are milliseconds since epoch in local time
|
||||
for key, col in (("sleepStartTimestampLocal", "sleep_start"),
|
||||
("sleepEndTimestampLocal", "sleep_end")):
|
||||
ms = dto.get(key)
|
||||
if ms:
|
||||
_set(row, col, datetime.fromtimestamp(ms / 1000, tz=timezone.utc).isoformat())
|
||||
|
||||
# SpO2
|
||||
spo2 = dto.get("averageSpO2Value")
|
||||
if spo2 and 50 < float(spo2) <= 100:
|
||||
row["spo2_avg"] = float(spo2)
|
||||
|
||||
# Sleep score — structure varies across firmware
|
||||
scores = sleep_data.get("sleepScores") or sleep_data.get("sleepScore")
|
||||
if isinstance(scores, dict):
|
||||
overall = scores.get("overall") or scores.get("qualityScore")
|
||||
if isinstance(overall, dict):
|
||||
_set(row, "sleep_score", overall.get("value"))
|
||||
else:
|
||||
_set(row, "sleep_score", overall)
|
||||
elif isinstance(scores, (int, float)):
|
||||
row["sleep_score"] = scores
|
||||
|
||||
if hrv_data:
|
||||
summary = hrv_data.get("hrvSummary") or hrv_data
|
||||
_set(row, "hrv_nightly_avg", summary.get("lastNight") or summary.get("lastNightAvg"))
|
||||
_set(row, "hrv_5min_high", summary.get("lastNight5MinHigh"))
|
||||
status = summary.get("status")
|
||||
if status:
|
||||
row["hrv_status"] = str(status).lower()
|
||||
|
||||
return row
|
||||
|
||||
|
||||
def _set(d: dict, key: str, val):
|
||||
if val is not None:
|
||||
d[key] = val
|
||||
Reference in New Issue
Block a user