Fix wellness_parser - had fit_parser content instead of wellness parser
Build and push images / validate (push) Successful in 3s
Build and push images / build-backend (push) Successful in 5s
Build and push images / build-worker (push) Successful in 5s
Build and push images / build-frontend (push) Successful in 4s

This commit is contained in:
2026-06-06 19:38:53 +01:00
parent ed4ab0eff8
commit 16cf4a9313
+225 -252
View File
@@ -1,55 +1,205 @@
""" """
FIT and GPX file parser using the official Garmin FIT Python SDK. Garmin wellness FIT file parser using the official Garmin FIT Python SDK.
Field names from the SDK are camelCase as per the SDK documentation. SDK field names are camelCase as per the SDK documentation.
""" """
import math from datetime import datetime, timezone, date
from datetime import datetime, timezone
from typing import Optional from typing import Optional
import gpxpy
import polyline as polyline_lib
from garmin_fit_sdk import Decoder, Stream from garmin_fit_sdk import Decoder, Stream
def haversine_distance(lat1, lon1, lat2, lon2) -> float: FIT_EPOCH_S = 631065600
R = 6371000
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlam = math.radians(lon2 - lon1)
a = math.sin(dphi/2)**2 + math.cos(phi1)*math.cos(phi2)*math.sin(dlam/2)**2
return 2 * R * math.asin(math.sqrt(a))
def _safe_float(val) -> Optional[float]: def _fit_ts(raw) -> Optional[datetime]:
if raw is None:
return None
try: try:
return float(val) if val is not None else None s = int(raw)
except (TypeError, ValueError): if s <= 0 or s == 0xFFFFFFFF:
return None
return datetime.fromtimestamp(s + FIT_EPOCH_S, tz=timezone.utc)
except (TypeError, ValueError, OverflowError, OSError):
return None return None
def _bounding_box(coords: list) -> Optional[dict]: def _to_date(val) -> Optional[date]:
if not coords: if val is None:
return None return None
lats = [c[0] for c in coords] if isinstance(val, datetime):
lons = [c[1] for c in coords] if val.tzinfo is None:
return {"min_lat": min(lats), "max_lat": max(lats), val = val.replace(tzinfo=timezone.utc)
"min_lon": min(lons), "max_lon": max(lons)} return val.date()
if isinstance(val, (int, float)):
dt = _fit_ts(val)
def _ensure_utc(dt) -> Optional[datetime]: return dt.date() if dt else None
if dt is None:
return None
if isinstance(dt, datetime):
if dt.tzinfo is None:
return dt.replace(tzinfo=timezone.utc)
return dt
return None return None
def parse_fit_file(filepath: str) -> dict: def parse_wellness_fit(file_path: str) -> dict:
"""Parse a Garmin .fit activity file using the official Garmin SDK.""" """
stream = Stream.from_file(filepath) Parse a Garmin wellness/monitoring FIT file.
Returns {"days": {date: metrics_dict}, "error": str|None}
"""
daily = {}
def ensure_day(d: date) -> dict:
if d not in daily:
daily[d] = {
"heart_rates": [],
"stress_values": [],
"spo2_readings": [],
"sleep_levels": [],
"steps": None,
"floors_climbed": None,
"active_calories": None,
"total_calories": None,
"resting_hr": None,
"hrv_nightly_avg": None,
"hrv_5min_high": None,
"hrv_status": None,
}
return daily[d]
def listener(mesg_num: int, msg: dict):
# monitoring_info (147)
if mesg_num == 147:
d = _to_date(msg.get("timestamp") or msg.get("localTimestamp"))
rhr = msg.get("restingHeartRate")
if d and rhr and 20 < rhr < 120:
ensure_day(d)["resting_hr"] = int(rhr)
# monitoring (148)
elif mesg_num == 148:
d = _to_date(msg.get("timestamp") or msg.get("localTimestamp"))
if not d:
return
entry = ensure_day(d)
hr = msg.get("heartRate")
if hr and 20 < hr < 250:
entry["heart_rates"].append(int(hr))
steps = msg.get("steps") or msg.get("cycles")
if steps and steps > 0:
entry["steps"] = max(entry["steps"] or 0, int(steps))
stress = msg.get("stressLevelValue")
if stress is not None and stress >= 0:
entry["stress_values"].append(int(stress))
# hrv_status_summary (275)
elif mesg_num == 275:
d = _to_date(msg.get("timestamp"))
if not d:
return
entry = ensure_day(d)
for key in ("weeklyAverage", "lastNightAvg", "hrvNightlyAvg"):
v = msg.get(key)
if v and v > 0:
entry["hrv_nightly_avg"] = float(v)
break
high = msg.get("lastNight5MinHigh")
if high:
entry["hrv_5min_high"] = float(high)
status = msg.get("hrvStatus")
if status:
entry["hrv_status"] = str(status)
# stress_level (132)
elif mesg_num == 132:
d = _to_date(msg.get("stressLevelTime") or msg.get("timestamp"))
if not d:
return
stress = msg.get("stressLevelValue")
if stress is not None and stress >= 0:
ensure_day(d)["stress_values"].append(int(stress))
# spo2_data (258)
elif mesg_num == 258:
d = _to_date(msg.get("timestamp"))
if not d:
return
spo2 = msg.get("spo2Percent") or msg.get("readingSpo2")
if spo2 and 50 < spo2 <= 100:
ensure_day(d)["spo2_readings"].append(float(spo2))
# sleep_level (269)
elif mesg_num == 269:
d = _to_date(msg.get("timestamp"))
if not d:
return
level = msg.get("sleepLevel")
if level is not None:
if isinstance(level, str):
level_map = {"unmeasurable": 0, "awake": 1, "light": 2, "deep": 3, "rem": 4}
level = level_map.get(level.lower())
if level is not None:
ensure_day(d)["sleep_levels"].append(int(level))
# Proprietary 227: per-minute stress + HR
elif mesg_num == 227:
ts_raw = msg.get(1) or msg.get("1")
hr_raw = msg.get(2) or msg.get("2")
stress_raw = msg.get(0) or msg.get("0")
d = _to_date(ts_raw)
if not d:
return
entry = ensure_day(d)
if hr_raw and isinstance(hr_raw, (int, float)) and 20 < hr_raw < 250:
entry["heart_rates"].append(int(hr_raw))
if stress_raw is not None and isinstance(stress_raw, (int, float)) and stress_raw >= 0:
entry["stress_values"].append(int(stress_raw))
# Proprietary 103: daily totals
elif mesg_num == 103:
ts_raw = msg.get(253) or msg.get("253") or msg.get("timestamp")
d = _to_date(ts_raw)
if not d:
return
entry = ensure_day(d)
steps = msg.get(3) or msg.get("3")
if steps and isinstance(steps, (int, float)) and steps > 0:
entry["steps"] = int(steps)
floors = msg.get(4) or msg.get("4")
if floors and isinstance(floors, (int, float)) and floors > 0:
f = float(floors)
entry["floors_climbed"] = round(f / 100 if f > 1000 else f, 1)
active_cal = msg.get(5) or msg.get("5")
if active_cal and isinstance(active_cal, (int, float)) and active_cal > 0:
entry["active_calories"] = float(active_cal)
total_cal = msg.get(7) or msg.get("7")
if total_cal and isinstance(total_cal, (int, float)) and total_cal > 0:
entry["total_calories"] = float(total_cal)
# Proprietary 211: resting HR + HRV
elif mesg_num == 211:
ts_raw = msg.get(253) or msg.get("253") or msg.get("timestamp")
d = _to_date(ts_raw)
if not d:
return
entry = ensure_day(d)
rhr = msg.get(0) or msg.get("0")
if rhr and isinstance(rhr, (int, float)) and 20 < rhr < 120:
entry["resting_hr"] = int(rhr)
hrv = msg.get(1) or msg.get("1")
if hrv and isinstance(hrv, (int, float)) and 5 < hrv < 300:
entry["hrv_nightly_avg"] = float(hrv)
# Proprietary 55: activity accumulations
elif mesg_num == 55:
ts_raw = msg.get(253) or msg.get("253") or msg.get("timestamp")
d = _to_date(ts_raw)
if not d:
return
entry = ensure_day(d)
steps = msg.get(2) or msg.get("2")
if steps and isinstance(steps, (int, float)) and steps > 0:
entry["steps"] = max(entry["steps"] or 0, int(steps))
hr = msg.get(19) or msg.get("19")
if hr and isinstance(hr, (int, float)) and 20 < hr < 250:
entry["heart_rates"].append(int(hr))
try:
stream = Stream.from_file(file_path)
decoder = Decoder(stream) decoder = Decoder(stream)
messages, errors = decoder.read( messages, errors = decoder.read(
apply_scale_and_offset=True, apply_scale_and_offset=True,
convert_datetimes_to_dates=True, convert_datetimes_to_dates=True,
@@ -57,228 +207,51 @@ def parse_fit_file(filepath: str) -> dict:
enable_crc_check=False, enable_crc_check=False,
expand_sub_fields=True, expand_sub_fields=True,
expand_components=True, expand_components=True,
merge_heart_rates=True, merge_heart_rates=False,
mesg_listener=listener,
) )
except Exception as e:
return {"error": str(e), "days": {}}
# SDK returns camelCase keys result = {}
sessions = messages.get("session", [{}]) for day_date, data in daily.items():
session = sessions[0] if sessions else {} hrs = data.pop("heart_rates", [])
records = messages.get("record", []) stresses = data.pop("stress_values", [])
laps = messages.get("lap", []) spo2s = data.pop("spo2_readings", [])
sleep_levels = data.pop("sleep_levels", [])
sport = str(session.get("sport", "generic")).lower() avg_hr = round(sum(hrs) / len(hrs), 1) if hrs else None
sport_map = { max_hr = max(hrs) if hrs else None
"running": "running", "cycling": "cycling", avg_stress = round(sum(s for s in stresses if s >= 0) / len(stresses), 1) if stresses else None
"hiking": "hiking", "walking": "walking", spo2_avg = round(sum(spo2s) / len(spo2s), 1) if spo2s else None
"generic": "other", "trail_running": "running",
"e_biking": "cycling", "open_water_swimming": "other",
}
sport_type = sport_map.get(sport, sport)
start_time = _ensure_utc(session.get("startTime")) if sleep_levels:
sleep_deep_s = sum(30 for l in sleep_levels if l == 3) or None
coords = [] sleep_light_s = sum(30 for l in sleep_levels if l == 2) or None
for r in records: sleep_rem_s = sum(30 for l in sleep_levels if l == 4) or None
lat = r.get("positionLat") sleep_awake_s = sum(30 for l in sleep_levels if l == 1) or None
lon = r.get("positionLong") sleep_duration_s = (sleep_deep_s or 0) + (sleep_light_s or 0) + (sleep_rem_s or 0) or None
if lat is not None and lon is not None:
if -90 <= lat <= 90 and -180 <= lon <= 180:
coords.append((lat, lon))
encoded_polyline = polyline_lib.encode(coords) if coords else None
bounding_box = _bounding_box(coords)
normalized_points = []
for r in records:
ts = _ensure_utc(r.get("timestamp"))
lat = r.get("positionLat")
lon = r.get("positionLong")
if lat is not None and not (-90 <= lat <= 90):
lat = None
if lon is not None and not (-180 <= lon <= 180):
lon = None
normalized_points.append({
"timestamp": ts.isoformat() if ts else None,
"latitude": _safe_float(lat),
"longitude": _safe_float(lon),
"altitude_m": _safe_float(r.get("altitude") or r.get("enhancedAltitude")),
"heart_rate": _safe_float(r.get("heartRate")),
"cadence": _safe_float(r.get("cadence")),
"speed_ms": _safe_float(r.get("speed") or r.get("enhancedSpeed")),
"power": _safe_float(r.get("power")),
"temperature_c": _safe_float(r.get("temperature")),
"distance_m": _safe_float(r.get("distance")),
})
normalized_laps = []
for i, lap in enumerate(laps):
ls = _ensure_utc(lap.get("startTime"))
normalized_laps.append({
"lap_number": i + 1,
"start_time": ls.isoformat() if ls else None,
"duration_s": _safe_float(lap.get("totalElapsedTime")),
"distance_m": _safe_float(lap.get("totalDistance")),
"avg_heart_rate": _safe_float(lap.get("avgHeartRate")),
"avg_cadence": _safe_float(lap.get("avgCadence")),
"avg_speed_ms": _safe_float(lap.get("avgSpeed") or lap.get("enhancedAvgSpeed")),
"avg_power": _safe_float(lap.get("avgPower")),
})
name = sport_type.title()
if start_time:
name += " " + start_time.strftime("%Y-%m-%d")
return {
"name": name,
"sport_type": sport_type,
"start_time": start_time.isoformat() if start_time else None,
"distance_m": _safe_float(session.get("totalDistance")),
"duration_s": _safe_float(session.get("totalElapsedTime")),
"elevation_gain_m": _safe_float(session.get("totalAscent")),
"elevation_loss_m": _safe_float(session.get("totalDescent")),
"avg_heart_rate": _safe_float(session.get("avgHeartRate")),
"max_heart_rate": _safe_float(session.get("maxHeartRate")),
"avg_cadence": _safe_float(session.get("avgCadence")),
"avg_power": _safe_float(session.get("avgPower")),
"normalized_power": _safe_float(session.get("normalizedPower")),
"avg_speed_ms": _safe_float(session.get("avgSpeed") or session.get("enhancedAvgSpeed")),
"max_speed_ms": _safe_float(session.get("maxSpeed") or session.get("enhancedMaxSpeed")),
"avg_temperature_c": _safe_float(session.get("avgTemperature")),
"calories": _safe_float(session.get("totalCalories")),
"training_stress_score": _safe_float(session.get("trainingStressScore")),
"vo2max_estimate": _safe_float(session.get("totalTrainingEffect")),
"polyline": encoded_polyline,
"bounding_box": bounding_box,
"source_type": "fit",
"data_points": normalized_points,
"laps": normalized_laps,
}
def parse_gpx_file(filepath: str) -> dict:
"""Parse a GPX file."""
with open(filepath) as f:
gpx = gpxpy.parse(f)
data_points = []
track = gpx.tracks[0] if gpx.tracks else None
if not track:
raise ValueError("No tracks found in GPX file")
for segment in track.segments:
for pt in segment.points:
ts = pt.time
if ts and ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
extensions = {}
if pt.extensions:
for ext in pt.extensions:
for child in ext:
tag = child.tag.split("}")[-1] if "}" in child.tag else child.tag
try:
extensions[tag] = float(child.text)
except (ValueError, TypeError):
pass
data_points.append({
"timestamp": ts.isoformat() if ts else None,
"latitude": pt.latitude,
"longitude": pt.longitude,
"altitude_m": pt.elevation,
"heart_rate": extensions.get("hr"),
"cadence": extensions.get("cad"),
"speed_ms": extensions.get("speed"),
"power": extensions.get("power"),
"temperature_c": extensions.get("temp") or extensions.get("atemp"),
"distance_m": None,
})
coords = [(p["latitude"], p["longitude"]) for p in data_points
if p["latitude"] and p["longitude"]]
encoded_polyline = polyline_lib.encode(coords) if coords else None
bounding_box = _bounding_box(coords)
total_dist = 0.0
prev = None
for p in data_points:
if p["latitude"] and p["longitude"]:
if prev:
total_dist += haversine_distance(prev[0], prev[1], p["latitude"], p["longitude"])
prev = (p["latitude"], p["longitude"])
p["distance_m"] = total_dist
uphill, downhill = 0.0, 0.0
alts = [p["altitude_m"] for p in data_points if p["altitude_m"]]
for i in range(1, len(alts)):
diff = alts[i] - alts[i-1]
if diff > 0:
uphill += diff
else: else:
downhill += abs(diff) sleep_deep_s = sleep_light_s = sleep_rem_s = sleep_awake_s = sleep_duration_s = None
hrs = [p["heart_rate"] for p in data_points if p["heart_rate"]] result[day_date] = {
start_time_str = data_points[0]["timestamp"] if data_points else None "resting_hr": data.get("resting_hr"),
start_dt = datetime.fromisoformat(start_time_str) if start_time_str else None "avg_hr_day": avg_hr,
end_dt = datetime.fromisoformat(data_points[-1]["timestamp"]) if data_points else None "max_hr_day": max_hr,
duration = (end_dt - start_dt).total_seconds() if (start_dt and end_dt) else None "avg_stress": avg_stress,
"spo2_avg": spo2_avg,
sport = "running" "hrv_nightly_avg": data.get("hrv_nightly_avg"),
if track.type: "hrv_5min_high": data.get("hrv_5min_high"),
sport = track.type.lower() "hrv_status": data.get("hrv_status"),
"steps": data.get("steps"),
return { "floors_climbed": data.get("floors_climbed"),
"name": track.name or gpx.name or f"Activity {start_dt.date() if start_dt else ''}", "active_calories": data.get("active_calories"),
"sport_type": sport, "total_calories": data.get("total_calories"),
"start_time": start_time_str, "sleep_duration_s": sleep_duration_s,
"distance_m": total_dist, "sleep_deep_s": sleep_deep_s,
"duration_s": duration, "sleep_light_s": sleep_light_s,
"elevation_gain_m": uphill, "sleep_rem_s": sleep_rem_s,
"elevation_loss_m": downhill, "sleep_awake_s": sleep_awake_s,
"avg_heart_rate": (sum(hrs) / len(hrs)) if hrs else None,
"max_heart_rate": max(hrs) if hrs else None,
"avg_cadence": None,
"avg_power": None,
"normalized_power": None,
"avg_speed_ms": (total_dist / duration) if (total_dist and duration) else None,
"max_speed_ms": None,
"avg_temperature_c": None,
"calories": None,
"training_stress_score": None,
"vo2max_estimate": None,
"polyline": encoded_polyline,
"bounding_box": bounding_box,
"source_type": "gpx",
"data_points": data_points,
"laps": [],
} }
return {"days": result, "error": None}
def calculate_hr_zones(data_points: list, user_max_hr: float) -> dict:
"""Calculate % time in each HR zone using user's configured max HR."""
if not user_max_hr or user_max_hr < 100:
return {}
zone_bounds = [0.0, 0.60, 0.70, 0.80, 0.90, 1.01]
zone_keys = ["z1", "z2", "z3", "z4", "z5"]
zones = {k: 0 for k in zone_keys}
total = 0
for p in data_points:
hr = p.get("heart_rate")
if not hr or hr < 20:
continue
pct = hr / user_max_hr
total += 1
for i, key in enumerate(zone_keys):
if zone_bounds[i] <= pct < zone_bounds[i+1]:
zones[key] += 1
break
else:
zones["z5"] += 1
if total:
return {k: round(v / total * 100, 1) for k, v in zones.items()}
return {}