257 lines
9.7 KiB
Python
257 lines
9.7 KiB
Python
"""
Garmin wellness FIT file parser using the official Garmin FIT Python SDK.

SDK field names are camelCase as per the SDK documentation.
"""
|
|
from datetime import datetime, timezone, date
|
|
from typing import Optional
|
|
from garmin_fit_sdk import Decoder, Stream
|
|
|
|
|
|
# Seconds added to a raw FIT timestamp to obtain Unix time
# (631065600 is 1989-12-31T00:00:00Z expressed as Unix seconds).
FIT_EPOCH_S = 631_065_600


def _fit_ts(raw) -> Optional[datetime]:
    """Convert a raw FIT timestamp into a timezone-aware UTC datetime.

    ``raw`` is coerced with ``int()``. ``None``, non-positive values, the
    value ``0xFFFFFFFF`` and anything that cannot be converted all produce
    ``None`` instead of raising.
    """
    if raw is None:
        return None

    conversion_errors = (TypeError, ValueError, OverflowError, OSError)

    try:
        seconds = int(raw)
    except conversion_errors:
        return None

    # Zero/negative values and the all-ones value are treated as "no timestamp".
    if seconds < 1 or seconds == 0xFFFFFFFF:
        return None

    try:
        return datetime.fromtimestamp(seconds + FIT_EPOCH_S, tz=timezone.utc)
    except conversion_errors:
        return None
|
|
|
|
|
|
def _to_date(val) -> Optional[date]:
    """Return the UTC calendar date for a datetime or a raw FIT timestamp.

    Args:
        val: A ``datetime`` (naive values are taken to already be in UTC,
            aware values are converted to UTC first), or an ``int``/``float``
            raw FIT timestamp that is converted with ``_fit_ts``.

    Returns:
        The corresponding ``date``, or ``None`` when ``val`` is ``None``, a
        ``bool``, an unsupported type, or a timestamp ``_fit_ts`` rejects.
    """
    if val is None:
        return None

    if isinstance(val, datetime):
        # Integer timestamps are always bucketed by UTC day (see _fit_ts), so
        # aware datetimes are normalised to UTC to land in the same bucket.
        if val.tzinfo is not None and val.utcoffset() is not None:
            val = val.astimezone(timezone.utc)
        return val.date()

    # bool is a subclass of int; True/False are never real timestamps.
    if isinstance(val, bool):
        return None

    if isinstance(val, (int, float)):
        dt = _fit_ts(val)
        return dt.date() if dt else None

    return None
|
|
|
|
|
|
# Numeric codes used for sleep stages when the SDK reports them as strings.
_SLEEP_LEVEL_CODES = {"unmeasurable": 0, "awake": 1, "light": 2, "deep": 3, "rem": 4}

# Seconds credited to every sleep-level sample.
# NOTE(review): assumes one sample per 30 s — confirm against real files.
_SLEEP_SAMPLE_S = 30

# Keys under which the numeric-key branches look for their timestamp.
_RAW_TS_KEYS = (253, "253", "timestamp")


def _pick(msg: dict, *keys):
    """Return the first value in *msg* that is not ``None`` among *keys*.

    Unlike an ``a or b`` chain this keeps legitimate falsy readings such as a
    stress value of ``0``.
    """
    for key in keys:
        value = msg.get(key)
        if value is not None:
            return value
    return None


def _number(value):
    """Return *value* if it is a finite ``int``/``float`` (not ``bool``), else ``None``."""
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return None
    # NaN and +/-inf would make the later int() conversions raise.
    if isinstance(value, float) and (value != value or abs(value) == float("inf")):
        return None
    return value


def _between(value, low, high):
    """Return *value* as a number when ``low < value < high``, else ``None``."""
    num = _number(value)
    if num is not None and low < num < high:
        return num
    return None


def _positive(value):
    """Return *value* as a number when it is strictly positive, else ``None``."""
    num = _number(value)
    if num is not None and num > 0:
        return num
    return None


def _non_negative(value):
    """Return *value* as a number when it is ``>= 0``, else ``None``."""
    num = _number(value)
    if num is not None and num >= 0:
        return num
    return None


def _message_date(msg: dict, *keys) -> Optional[date]:
    """Return the date of the first of *keys* that ``_to_date`` can convert."""
    for key in keys:
        day = _to_date(msg.get(key))
        if day is not None:
            return day
    return None


def _new_day() -> dict:
    """Return an empty per-day accumulator."""
    return {
        "heart_rates": [],
        "stress_values": [],
        "spo2_readings": [],
        "sleep_levels": [],
        "steps": None,
        "floors_climbed": None,
        "active_calories": None,
        "total_calories": None,
        "resting_hr": None,
        "hrv_nightly_avg": None,
        "hrv_5min_high": None,
        "hrv_status": None,
    }


def _summarise_day(data: dict) -> dict:
    """Collapse one day's accumulated samples into the reported metrics."""
    hrs = data["heart_rates"]
    levels = data["sleep_levels"]

    def mean(values):
        return round(sum(values) / len(values), 1) if values else None

    def stage_seconds(code):
        # A stage with no samples is reported as None rather than 0.
        return sum(_SLEEP_SAMPLE_S for lvl in levels if lvl == code) or None

    deep_s = stage_seconds(3)
    light_s = stage_seconds(2)
    rem_s = stage_seconds(4)
    awake_s = stage_seconds(1)
    # Total sleep excludes awake time.
    asleep_s = (deep_s or 0) + (light_s or 0) + (rem_s or 0) or None

    return {
        "resting_hr": data.get("resting_hr"),
        "avg_hr_day": mean(hrs),
        "max_hr_day": max(hrs) if hrs else None,
        # Only values >= 0 are ever stored, so a plain mean is correct here.
        "avg_stress": mean(data["stress_values"]),
        "spo2_avg": mean(data["spo2_readings"]),
        "hrv_nightly_avg": data.get("hrv_nightly_avg"),
        "hrv_5min_high": data.get("hrv_5min_high"),
        "hrv_status": data.get("hrv_status"),
        "steps": data.get("steps"),
        "floors_climbed": data.get("floors_climbed"),
        "active_calories": data.get("active_calories"),
        "total_calories": data.get("total_calories"),
        "sleep_duration_s": asleep_s,
        "sleep_deep_s": deep_s,
        "sleep_light_s": light_s,
        "sleep_rem_s": rem_s,
        "sleep_awake_s": awake_s,
    }


def parse_wellness_fit(file_path: str) -> dict:
    """Parse a Garmin wellness/monitoring FIT file into per-day metrics.

    Messages are consumed through the decoder's ``mesg_listener`` callback and
    grouped by the calendar date of their timestamp.

    Named fields are looked up under the original camelCase spelling and its
    snake_case equivalent. NOTE(review): the Python SDK profile appears to
    name fields in snake_case — confirm against the SDK documentation.

    Args:
        file_path: Path of the FIT file to decode.

    Returns:
        ``{"days": {date: metrics_dict}, "error": str | None}``. When opening
        or decoding raises, ``days`` is empty and ``error`` holds the message.
        Each ``metrics_dict`` has the keys ``resting_hr``, ``avg_hr_day``,
        ``max_hr_day``, ``avg_stress``, ``spo2_avg``, ``hrv_nightly_avg``,
        ``hrv_5min_high``, ``hrv_status``, ``steps``, ``floors_climbed``,
        ``active_calories``, ``total_calories``, ``sleep_duration_s``,
        ``sleep_deep_s``, ``sleep_light_s``, ``sleep_rem_s`` and
        ``sleep_awake_s``; a metric with no data is ``None``.
    """
    daily = {}

    def ensure_day(d: date) -> dict:
        if d not in daily:
            daily[d] = _new_day()
        return daily[d]

    def listener(mesg_num: int, msg: dict):
        # NOTE(review): the message labels below are the original author's.
        # The public FIT profile appears to number these messages differently
        # (e.g. monitoring = 55, monitoring_info = 103, stress_level = 227,
        # spo2_data = 269, sleep_level = 275, hrv_status_summary = 370) —
        # confirm against the SDK Profile before relying on this dispatch.

        # monitoring_info (147)
        if mesg_num == 147:
            day = _message_date(msg, "timestamp", "localTimestamp", "local_timestamp")
            rhr = _between(_pick(msg, "restingHeartRate", "resting_heart_rate"), 20, 120)
            if day is not None and rhr is not None:
                ensure_day(day)["resting_hr"] = int(rhr)

        # monitoring (148)
        elif mesg_num == 148:
            day = _message_date(msg, "timestamp", "localTimestamp", "local_timestamp")
            if day is None:
                return
            entry = ensure_day(day)
            hr = _between(_pick(msg, "heartRate", "heart_rate"), 20, 250)
            if hr is not None:
                entry["heart_rates"].append(int(hr))
            steps = _positive(_pick(msg, "steps", "cycles"))
            if steps is not None:
                # Keep the largest value seen for the day.
                entry["steps"] = max(entry["steps"] or 0, int(steps))
            stress = _non_negative(_pick(msg, "stressLevelValue", "stress_level_value"))
            if stress is not None:
                entry["stress_values"].append(int(stress))

        # hrv_status_summary (275)
        elif mesg_num == 275:
            day = _message_date(msg, "timestamp")
            if day is None:
                return
            entry = ensure_day(day)
            # NOTE(review): the weekly average takes precedence over the
            # last-night values for "hrv_nightly_avg" — confirm this order.
            for names in (
                ("weeklyAverage", "weekly_average"),
                ("lastNightAvg", "last_night_avg"),
                ("hrvNightlyAvg", "hrv_nightly_avg"),
            ):
                avg = _positive(_pick(msg, *names))
                if avg is not None:
                    entry["hrv_nightly_avg"] = float(avg)
                    break
            high = _number(_pick(msg, "lastNight5MinHigh", "last_night_5_min_high"))
            if high:
                entry["hrv_5min_high"] = float(high)
            status = _pick(msg, "hrvStatus", "hrv_status")
            if status:
                entry["hrv_status"] = str(status)

        # stress_level (132)
        elif mesg_num == 132:
            day = _message_date(msg, "stressLevelTime", "stress_level_time", "timestamp")
            if day is None:
                return
            stress = _non_negative(_pick(msg, "stressLevelValue", "stress_level_value"))
            if stress is not None:
                ensure_day(day)["stress_values"].append(int(stress))

        # spo2_data (258)
        elif mesg_num == 258:
            day = _message_date(msg, "timestamp")
            if day is None:
                return
            spo2 = _number(
                _pick(msg, "spo2Percent", "spo2_percent", "readingSpo2", "reading_spo2")
            )
            if spo2 is not None and 50 < spo2 <= 100:
                ensure_day(day)["spo2_readings"].append(float(spo2))

        # sleep_level (269)
        elif mesg_num == 269:
            day = _message_date(msg, "timestamp")
            if day is None:
                return
            level = _pick(msg, "sleepLevel", "sleep_level")
            if isinstance(level, str):
                # Unknown stage names map to None and are skipped.
                level = _SLEEP_LEVEL_CODES.get(level.lower())
            level = _number(level)
            if level is not None:
                ensure_day(day)["sleep_levels"].append(int(level))

        # Proprietary 227: per-minute stress + HR
        elif mesg_num == 227:
            day = _message_date(msg, 1, "1")
            if day is None:
                return
            entry = ensure_day(day)
            hr = _between(_pick(msg, 2, "2"), 20, 250)
            if hr is not None:
                entry["heart_rates"].append(int(hr))
            # _pick (not `or`) so that a stress reading of 0 is kept.
            stress = _non_negative(_pick(msg, 0, "0"))
            if stress is not None:
                entry["stress_values"].append(int(stress))

        # Proprietary 103: daily totals
        elif mesg_num == 103:
            day = _message_date(msg, *_RAW_TS_KEYS)
            if day is None:
                return
            entry = ensure_day(day)
            steps = _positive(_pick(msg, 3, "3"))
            if steps is not None:
                # Overwrites (rather than max-merges) any earlier step count.
                entry["steps"] = int(steps)
            floors = _positive(_pick(msg, 4, "4"))
            if floors is not None:
                f = float(floors)
                # Values above 1000 are divided by 100 — presumably an
                # unscaled raw value; TODO confirm.
                entry["floors_climbed"] = round(f / 100 if f > 1000 else f, 1)
            active_cal = _positive(_pick(msg, 5, "5"))
            if active_cal is not None:
                entry["active_calories"] = float(active_cal)
            total_cal = _positive(_pick(msg, 7, "7"))
            if total_cal is not None:
                entry["total_calories"] = float(total_cal)

        # Proprietary 211: resting HR + HRV
        elif mesg_num == 211:
            day = _message_date(msg, *_RAW_TS_KEYS)
            if day is None:
                return
            entry = ensure_day(day)
            rhr = _between(_pick(msg, 0, "0"), 20, 120)
            if rhr is not None:
                entry["resting_hr"] = int(rhr)
            hrv = _between(_pick(msg, 1, "1"), 5, 300)
            if hrv is not None:
                entry["hrv_nightly_avg"] = float(hrv)

        # Proprietary 55: activity accumulations
        elif mesg_num == 55:
            day = _message_date(msg, *_RAW_TS_KEYS)
            if day is None:
                return
            entry = ensure_day(day)
            steps = _positive(_pick(msg, 2, "2"))
            if steps is not None:
                entry["steps"] = max(entry["steps"] or 0, int(steps))
            hr = _between(_pick(msg, 19, "19"), 20, 250)
            if hr is not None:
                entry["heart_rates"].append(int(hr))

    try:
        stream = Stream.from_file(file_path)
        decoder = Decoder(stream)
        # The (messages, errors) return value is intentionally unused: data
        # is gathered by the listener, and errors returned by the decoder are
        # not surfaced (unchanged best-effort behaviour).
        decoder.read(
            apply_scale_and_offset=True,
            convert_datetimes_to_dates=True,
            convert_types_to_strings=True,
            enable_crc_check=False,
            expand_sub_fields=True,
            expand_components=True,
            merge_heart_rates=False,
            mesg_listener=listener,
        )
    except Exception as e:
        # Top-level boundary: report the failure to the caller, don't raise.
        return {"error": str(e), "days": {}}

    result = {day_date: _summarise_day(data) for day_date, data in daily.items()}
    return {"days": result, "error": None}