Files
MileVault/backend/app/services/wellness_parser.py
T
owain 16cf4a9313
Build and push images / validate (push) Successful in 3s
Build and push images / build-backend (push) Successful in 5s
Build and push images / build-worker (push) Successful in 5s
Build and push images / build-frontend (push) Successful in 4s
Fix wellness_parser - had fit_parser content instead of wellness parser
2026-06-06 19:38:53 +01:00

257 lines
9.7 KiB
Python

"""
Garmin wellness FIT file parser using the official Garmin FIT Python SDK.
SDK field names are camelCase as per the SDK documentation.
"""
from datetime import datetime, timezone, date
from typing import Optional
from garmin_fit_sdk import Decoder, Stream
FIT_EPOCH_S = 631065600
def _fit_ts(raw) -> Optional[datetime]:
if raw is None:
return None
try:
s = int(raw)
if s <= 0 or s == 0xFFFFFFFF:
return None
return datetime.fromtimestamp(s + FIT_EPOCH_S, tz=timezone.utc)
except (TypeError, ValueError, OverflowError, OSError):
return None
def _to_date(val) -> Optional[date]:
if val is None:
return None
if isinstance(val, datetime):
if val.tzinfo is None:
val = val.replace(tzinfo=timezone.utc)
return val.date()
if isinstance(val, (int, float)):
dt = _fit_ts(val)
return dt.date() if dt else None
return None
# Numeric codes stored for sleep levels that arrive as strings.
_SLEEP_LEVEL_CODES = {"unmeasurable": 0, "awake": 1, "light": 2, "deep": 3, "rem": 4}

# Seconds credited to every sleep_level sample when totalling sleep stages.
# NOTE(review): the 30 s sample length is inherited unchanged — TODO confirm.
_SLEEP_SAMPLE_SECONDS = 30


def _first_number(msg: dict, *keys):
    """
    Return the first finite int/float stored under any of *keys*, else None.

    Non-numeric values (None, strings, lists, bools) are skipped so that an
    unexpected field type cannot raise inside the decoder callback and abort
    the whole parse. A reading of 0 is returned as-is instead of being treated
    as missing, which `a or b` chaining would do.
    """
    for key in keys:
        value = msg.get(key)
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            continue
        if value != value or value in (float("inf"), float("-inf")):
            # NaN / infinity cannot be converted with int().
            continue
        return value
    return None


def _summarise_day(data: dict) -> dict:
    """Collapse one day's accumulated samples into the reported metrics dict."""
    heart_rates = data["heart_rates"]
    # Filter once so numerator and denominator of the average always agree.
    stresses = [s for s in data["stress_values"] if s >= 0]
    spo2_readings = data["spo2_readings"]
    levels = data["sleep_levels"]

    def mean(values):
        return round(sum(values) / len(values), 1) if values else None

    def stage_seconds(code):
        # A total of zero is reported as None, like any other missing metric.
        return levels.count(code) * _SLEEP_SAMPLE_SECONDS or None

    deep = stage_seconds(3)
    light = stage_seconds(2)
    rem = stage_seconds(4)
    awake = stage_seconds(1)
    # Time awake is deliberately not part of the sleep duration.
    asleep = (deep or 0) + (light or 0) + (rem or 0) or None

    return {
        "resting_hr": data.get("resting_hr"),
        "avg_hr_day": mean(heart_rates),
        "max_hr_day": max(heart_rates) if heart_rates else None,
        "avg_stress": mean(stresses),
        "spo2_avg": mean(spo2_readings),
        "hrv_nightly_avg": data.get("hrv_nightly_avg"),
        "hrv_5min_high": data.get("hrv_5min_high"),
        "hrv_status": data.get("hrv_status"),
        "steps": data.get("steps"),
        "floors_climbed": data.get("floors_climbed"),
        "active_calories": data.get("active_calories"),
        "total_calories": data.get("total_calories"),
        "sleep_duration_s": asleep,
        "sleep_deep_s": deep,
        "sleep_light_s": light,
        "sleep_rem_s": rem,
        "sleep_awake_s": awake,
    }


def parse_wellness_fit(file_path: str) -> dict:
    """
    Parse a Garmin wellness/monitoring FIT file into per-day metrics.

    Args:
        file_path: Path of the FIT file to decode.

    Returns:
        {"days": {date: metrics_dict}, "error": str|None}

        Each metrics_dict carries resting_hr, avg_hr_day, max_hr_day,
        avg_stress, spo2_avg, hrv_nightly_avg, hrv_5min_high, hrv_status,
        steps, floors_climbed, active_calories, total_calories and the
        sleep_*_s totals; a metric with no data is None.

        "error" is set (and "days" is empty) when decoding raises, or when the
        decoder reports errors and no day could be extracted at all. Decoder
        errors alongside usable data are tolerated so partial files still
        yield their metrics.
    """
    daily = {}

    def ensure_day(d: date) -> dict:
        if d not in daily:
            daily[d] = {
                "heart_rates": [],
                "stress_values": [],
                "spo2_readings": [],
                "sleep_levels": [],
                "steps": None,
                "floors_climbed": None,
                "active_calories": None,
                "total_calories": None,
                "resting_hr": None,
                "hrv_nightly_avg": None,
                "hrv_5min_high": None,
                "hrv_status": None,
            }
        return daily[d]

    def day_of(msg: dict, *keys) -> Optional[date]:
        # The first truthy timestamp field wins; it is not retried with a
        # later key if it turns out to be unusable.
        raw = None
        for key in keys:
            raw = msg.get(key)
            if raw:
                break
        return _to_date(raw)

    def add_heart_rate(entry: dict, hr) -> None:
        if hr is not None and 20 < hr < 250:
            entry["heart_rates"].append(int(hr))

    def add_stress(entry: dict, stress) -> None:
        if stress is not None and stress >= 0:
            entry["stress_values"].append(int(stress))

    # NOTE(review): the message numbers and camelCase field names below are
    # carried over unchanged and have not been checked here against the FIT
    # profile used by the Python SDK — TODO confirm, including whether
    # 55/103/211/227 really are undocumented messages with numeric field keys.

    def on_monitoring_info(msg: dict) -> None:  # monitoring_info (147)
        d = day_of(msg, "timestamp", "localTimestamp")
        rhr = _first_number(msg, "restingHeartRate")
        # The day is only created once a plausible value is available.
        if d and rhr is not None and 20 < rhr < 120:
            ensure_day(d)["resting_hr"] = int(rhr)

    def on_monitoring(msg: dict) -> None:  # monitoring (148)
        d = day_of(msg, "timestamp", "localTimestamp")
        if not d:
            return
        entry = ensure_day(d)
        add_heart_rate(entry, _first_number(msg, "heartRate"))
        steps = _first_number(msg, "steps", "cycles")
        if steps is not None and steps > 0:
            # Keep the largest value seen for the day.
            entry["steps"] = max(entry["steps"] or 0, int(steps))
        add_stress(entry, _first_number(msg, "stressLevelValue"))

    def on_hrv_status_summary(msg: dict) -> None:  # hrv_status_summary (275)
        d = day_of(msg, "timestamp")
        if not d:
            return
        entry = ensure_day(d)
        # NOTE(review): weeklyAverage takes precedence over the nightly keys
        # when filling hrv_nightly_avg — order kept as found, TODO confirm.
        for key in ("weeklyAverage", "lastNightAvg", "hrvNightlyAvg"):
            value = _first_number(msg, key)
            if value is not None and value > 0:
                entry["hrv_nightly_avg"] = float(value)
                break
        high = _first_number(msg, "lastNight5MinHigh")
        if high:
            entry["hrv_5min_high"] = float(high)
        status = msg.get("hrvStatus")
        if status:
            entry["hrv_status"] = str(status)

    def on_stress_level(msg: dict) -> None:  # stress_level (132)
        d = day_of(msg, "stressLevelTime", "timestamp")
        if not d:
            return
        stress = _first_number(msg, "stressLevelValue")
        if stress is not None and stress >= 0:
            ensure_day(d)["stress_values"].append(int(stress))

    def on_spo2_data(msg: dict) -> None:  # spo2_data (258)
        d = day_of(msg, "timestamp")
        if not d:
            return
        spo2 = _first_number(msg, "spo2Percent", "readingSpo2")
        if spo2 is not None and 50 < spo2 <= 100:
            ensure_day(d)["spo2_readings"].append(float(spo2))

    def on_sleep_level(msg: dict) -> None:  # sleep_level (269)
        d = day_of(msg, "timestamp")
        if not d:
            return
        level = msg.get("sleepLevel")
        if isinstance(level, str):
            # Unknown level names map to None and are dropped.
            level = _SLEEP_LEVEL_CODES.get(level.lower())
        elif isinstance(level, bool) or not isinstance(level, (int, float)):
            level = None
        if level is not None:
            ensure_day(d)["sleep_levels"].append(int(level))

    def on_minute_samples(msg: dict) -> None:  # Proprietary 227: per-minute stress + HR
        d = day_of(msg, 1, "1")
        if not d:
            return
        entry = ensure_day(d)
        add_heart_rate(entry, _first_number(msg, 2, "2"))
        # A stress reading of 0 is kept, matching the other stress handlers.
        add_stress(entry, _first_number(msg, 0, "0"))

    def on_daily_totals(msg: dict) -> None:  # Proprietary 103: daily totals
        d = day_of(msg, 253, "253", "timestamp")
        if not d:
            return
        entry = ensure_day(d)
        steps = _first_number(msg, 3, "3")
        if steps is not None and steps > 0:
            # Overwrites rather than taking the max of earlier values.
            entry["steps"] = int(steps)
        floors = _first_number(msg, 4, "4")
        if floors is not None and floors > 0:
            value = float(floors)
            # NOTE(review): values above 1000 are divided by 100, presumably
            # to undo a scaled encoding — heuristic kept as found.
            entry["floors_climbed"] = round(value / 100 if value > 1000 else value, 1)
        active_cal = _first_number(msg, 5, "5")
        if active_cal is not None and active_cal > 0:
            entry["active_calories"] = float(active_cal)
        total_cal = _first_number(msg, 7, "7")
        if total_cal is not None and total_cal > 0:
            entry["total_calories"] = float(total_cal)

    def on_resting_hr_hrv(msg: dict) -> None:  # Proprietary 211: resting HR + HRV
        d = day_of(msg, 253, "253", "timestamp")
        if not d:
            return
        entry = ensure_day(d)
        rhr = _first_number(msg, 0, "0")
        if rhr is not None and 20 < rhr < 120:
            entry["resting_hr"] = int(rhr)
        hrv = _first_number(msg, 1, "1")
        if hrv is not None and 5 < hrv < 300:
            entry["hrv_nightly_avg"] = float(hrv)

    def on_accumulations(msg: dict) -> None:  # Proprietary 55: activity accumulations
        d = day_of(msg, 253, "253", "timestamp")
        if not d:
            return
        entry = ensure_day(d)
        steps = _first_number(msg, 2, "2")
        if steps is not None and steps > 0:
            entry["steps"] = max(entry["steps"] or 0, int(steps))
        add_heart_rate(entry, _first_number(msg, 19, "19"))

    handlers = {
        147: on_monitoring_info,
        148: on_monitoring,
        275: on_hrv_status_summary,
        132: on_stress_level,
        258: on_spo2_data,
        269: on_sleep_level,
        227: on_minute_samples,
        103: on_daily_totals,
        211: on_resting_hr_hrv,
        55: on_accumulations,
    }

    def listener(mesg_num: int, msg: dict):
        handler = handlers.get(mesg_num)
        if handler is not None:
            handler(msg)

    try:
        stream = Stream.from_file(file_path)
        decoder = Decoder(stream)
        _messages, errors = decoder.read(
            apply_scale_and_offset=True,
            convert_datetimes_to_dates=True,
            convert_types_to_strings=True,
            enable_crc_check=False,
            expand_sub_fields=True,
            expand_components=True,
            merge_heart_rates=False,
            mesg_listener=listener,
        )
    except Exception as e:
        # Boundary with the third-party decoder: any failure becomes an error
        # result instead of propagating to the caller.
        return {"error": str(e), "days": {}}

    if errors and not daily:
        # The decoder reports problems through its return value as well as by
        # raising; without this check an undecodable file looked like a valid
        # file that simply held no data.
        return {"error": "; ".join(str(err) for err in errors), "days": {}}

    result = {day_date: _summarise_day(data) for day_date, data in daily.items()}
    return {"days": result, "error": None}