Files
owain 4a4cbdcc92
Build and push images / validate (push) Successful in 2s
Build and push images / build-backend (push) Successful in 5s
Build and push images / build-worker (push) Successful in 5s
Build and push images / build-frontend (push) Successful in 8s
Fix pace sentinel, route map thumbnails, tiled segments, health/dashboard layout
- Pace: FIT 0xFFFF sentinel (65.535 m/s) was stored as avg_speed_ms on every
  activity and lap; add _sanitize_speed() to parser falling back to dist/dur,
  plus a startup SQL migration that fixed 120 activities and 688 laps in-place
- Records: remove swimming from Distance PRs; Route Records rows are clickable
  (navigate to activity), View button removed, small SVG route map per row;
  Segment Records uses same tiled route-card layout as Segments page
- Segments: replace route dropdown with responsive tile grid showing SVG map
  thumbnails; selecting a tile reveals the segment management panel below
- RouteMiniMap: new pure-SVG component (no Leaflet) for route thumbnails,
  decodes polyline and normalises coords into a fixed viewBox
- Health: rename "Avg Heart Rate (day)" → "Heart Rate"; weight chart now
  filters to non-null rows and enables connectNulls + dots for sparse data
- Dashboard: 4-col layout at lg breakpoint so Body Battery sits between weekly
  chart and Health Today; Body Battery card gains a 24-hr sparkline from the
  values[] already present in the health summary response

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-07 16:36:54 +01:00

351 lines
13 KiB
Python

"""
FIT and GPX file parser.
Parses FIT files directly using the Garmin SDK but applies manual
scale conversion for fields where the SDK doesn't auto-convert.
"""
import math
import struct
from datetime import datetime, timezone
from typing import Optional
import gpxpy
import polyline as polyline_lib
from garmin_fit_sdk import Decoder, Stream
# Offset in seconds added to raw integer FIT timestamps before they are
# interpreted as Unix time (631065600 s after 1970-01-01 is 1989-12-31T00:00:00Z).
FIT_EPOCH_S = 631065600
# Degrees per semicircle: 2**31 semicircles correspond to 180 degrees.
SEMICIRCLES_TO_DEG = 180.0 / (2 ** 31)
def _semicircles_to_deg(val):
    """Convert a semicircle angle to degrees.

    Returns ``None`` when *val* is ``None``, cannot be converted to a float,
    or converts to an angle outside [-180, 180] degrees.
    """
    if val is None:
        return None
    try:
        degrees = float(val) * SEMICIRCLES_TO_DEG
    except (TypeError, ValueError):
        return None
    # The latitude range [-90, 90] is a subset of the longitude range, so a
    # single check against [-180, 180] accepts exactly the same values.
    return degrees if -180 <= degrees <= 180 else None
def _safe_float(val) -> Optional[float]:
    """Coerce *val* to ``float``; ``None`` or unconvertible input yields ``None``."""
    if val is None:
        return None
    try:
        return float(val)
    except (TypeError, ValueError):
        return None
def _sanitize_speed(val, dist_m=None, dur_s=None) -> Optional[float]:
    """Return a usable speed in m/s, or ``None``.

    A reading of 65.0 m/s or more is treated as the FIT invalid sentinel
    (0xFFFF / 1000 = 65.535 m/s). For such a reading, or one that is missing
    or non-numeric, the speed is recomputed as ``dist_m / dur_s`` when both
    are truthy and the duration is positive; otherwise ``None`` is returned.
    """
    speed = _safe_float(val)
    # Written as "not >=" so that every value the comparison does not reject
    # is passed through unchanged.
    if speed is not None and not speed >= 65.0:
        return speed

    if not dist_m or not dur_s:
        return None
    if float(dur_s) > 0:
        return float(dist_m) / float(dur_s)
    return None
def _bounding_box(coords):
    """Return the extent of *coords* as a dict, or ``None`` when it is empty.

    Each item of *coords* is indexed as ``(lat, lon)``. The result has the
    keys ``min_lat``, ``max_lat``, ``min_lon`` and ``max_lon``.
    """
    if not coords:
        return None
    lat_values = [pair[0] for pair in coords]
    lon_values = [pair[1] for pair in coords]
    south, north = min(lat_values), max(lat_values)
    west, east = min(lon_values), max(lon_values)
    return dict(min_lat=south, max_lat=north, min_lon=west, max_lon=east)
def _to_dt(val) -> Optional[datetime]:
    """Normalise a timestamp value to a timezone-aware ``datetime``.

    * ``datetime`` input: naive values are tagged as UTC, aware values are
      returned unchanged.
    * ``int``/``float`` input: ``FIT_EPOCH_S`` is added to the integer part and
      the sum is read as a Unix timestamp in UTC; values that cannot be
      converted give ``None``.
    * Anything else (including ``None``) gives ``None``.
    """
    if isinstance(val, datetime):
        if val.tzinfo is None:
            return val.replace(tzinfo=timezone.utc)
        return val
    if not isinstance(val, (int, float)):
        return None
    try:
        unix_seconds = int(val) + FIT_EPOCH_S
        return datetime.fromtimestamp(unix_seconds, tz=timezone.utc)
    except (OSError, OverflowError, ValueError):
        return None
def _is_valid_lat(v):
    """Return True when *v* is not ``None`` and lies within [-90, 90]."""
    if v is None:
        return False
    return -90 <= v <= 90
def _is_valid_lon(v):
    """Return True when *v* is not ``None`` and lies within [-180, 180]."""
    if v is None:
        return False
    return -180 <= v <= 180
def parse_fit_file(filepath: str) -> dict:
    """Parse a FIT activity file into a normalised activity dict.

    Session, lap and record messages are collected through a decoder
    listener, then flattened into summary fields, a GPS track, per-record
    data points and per-lap summaries.

    Every speed value (session/lap average, session maximum and per-record
    speed) goes through ``_sanitize_speed`` so the FIT invalid sentinel
    (65.535 m/s) is never reported. When one candidate field holds the
    sentinel, the remaining candidate fields are tried before falling back.

    Args:
        filepath: Path of the FIT file to decode.

    Returns:
        A dict with the summary fields (``name``, ``sport_type``,
        ``start_time``, ``distance_m``, ``duration_s``, ...), the encoded
        ``polyline`` and ``bounding_box`` of the track (both ``None`` when no
        record has a valid position), ``source_type`` set to ``"fit"``, the
        per-record ``data_points`` and the per-lap ``laps``.
    """
    session_data = {}
    records = []
    laps = []

    def listener(mesg_num: int, msg: dict):
        if mesg_num == 18:  # session
            # Later session messages overwrite fields of earlier ones.
            session_data.update(msg)
        elif mesg_num == 20:  # record
            records.append(msg)
        elif mesg_num == 19:  # lap
            laps.append(msg)

    stream = Stream.from_file(filepath)
    decoder = Decoder(stream)
    # The return value of read() is not used; messages arrive via the listener.
    decoder.read(
        apply_scale_and_offset=True,
        convert_datetimes_to_dates=True,
        convert_types_to_strings=True,
        enable_crc_check=False,
        expand_sub_fields=True,
        expand_components=True,
        merge_heart_rates=True,
        mesg_listener=listener,
    )

    # The SDK may return field names in camelCase or snake_case depending on
    # version, so every lookup tries both spellings.
    def get(d, *keys):
        for k in keys:
            v = d.get(k)
            if v is not None:
                return v
        return None

    def get_speed(d, *keys):
        """Return the first value under *keys* that passes the sentinel check."""
        for k in keys:
            candidate = _sanitize_speed(d.get(k))
            if candidate is not None:
                return candidate
        return None

    # Position fields: the SDK may or may not convert semicircles. A magnitude
    # beyond the valid degree range (> 90 for latitude, > 180 for longitude)
    # is taken to be semicircles and converted.
    def get_lat(d):
        fv = _safe_float(get(d, "positionLat", "position_lat"))
        if fv is None:
            return None
        if abs(fv) > 90:
            fv = fv * SEMICIRCLES_TO_DEG
        return fv if _is_valid_lat(fv) else None

    def get_lon(d):
        fv = _safe_float(get(d, "positionLong", "position_long"))
        if fv is None:
            return None
        if abs(fv) > 180:
            fv = fv * SEMICIRCLES_TO_DEG
        return fv if _is_valid_lon(fv) else None

    sport_raw = str(get(session_data, "sport", "Sport") or "generic").lower()
    sport_map = {
        "running": "running", "cycling": "cycling",
        "hiking": "hiking", "walking": "walking",
        "generic": "other", "trail_running": "running",
        "e_biking": "cycling", "open_water_swimming": "other",
    }
    # Sports missing from the map are passed through unchanged.
    sport_type = sport_map.get(sport_raw, sport_raw)

    # start_time: the SDK may return a datetime or a raw integer.
    start_time = _to_dt(get(session_data, "startTime", "start_time"))

    # One pass over the records builds both the GPS track and the data points.
    coords = []
    normalized_points = []
    for r in records:
        ts = _to_dt(get(r, "timestamp"))
        lat = get_lat(r)
        lon = get_lon(r)
        if lat is not None and lon is not None:
            coords.append((lat, lon))
        normalized_points.append({
            "timestamp": ts.isoformat() if ts else None,
            "latitude": lat,
            "longitude": lon,
            "altitude_m": _safe_float(
                get(r, "altitude", "enhancedAltitude", "enhanced_altitude")),
            "heart_rate": _safe_float(get(r, "heartRate", "heart_rate")),
            "cadence": _safe_float(get(r, "cadence")),
            "speed_ms": get_speed(r, "speed", "enhancedSpeed", "enhanced_speed"),
            "power": _safe_float(get(r, "power")),
            "temperature_c": _safe_float(get(r, "temperature")),
            "distance_m": _safe_float(get(r, "distance")),
        })

    encoded_polyline = polyline_lib.encode(coords) if coords else None
    bounding_box = _bounding_box(coords)

    normalized_laps = []
    for lap_number, lap in enumerate(laps, start=1):
        ls = _to_dt(get(lap, "startTime", "start_time"))
        lap_dist = _safe_float(get(lap, "totalDistance", "total_distance"))
        lap_dur = _safe_float(get(lap, "totalElapsedTime", "total_elapsed_time"))
        normalized_laps.append({
            "lap_number": lap_number,
            "start_time": ls.isoformat() if ls else None,
            "duration_s": lap_dur,
            "distance_m": lap_dist,
            "avg_heart_rate": _safe_float(get(lap, "avgHeartRate", "avg_heart_rate")),
            "avg_cadence": _safe_float(get(lap, "avgCadence", "avg_cadence")),
            # No valid recorded average -> fall back to distance / duration.
            "avg_speed_ms": _sanitize_speed(
                get_speed(lap, "avgSpeed", "avg_speed",
                          "enhancedAvgSpeed", "enhanced_avg_speed"),
                dist_m=lap_dist, dur_s=lap_dur,
            ),
            "avg_power": _safe_float(get(lap, "avgPower", "avg_power")),
        })

    name = sport_type.title()
    if start_time:
        name += " " + start_time.strftime("%Y-%m-%d")

    total_distance = _safe_float(get(session_data, "totalDistance", "total_distance"))
    total_duration = _safe_float(
        get(session_data, "totalElapsedTime", "total_elapsed_time"))

    return {
        "name": name,
        "sport_type": sport_type,
        "start_time": start_time.isoformat() if start_time else None,
        "distance_m": total_distance,
        "duration_s": total_duration,
        "elevation_gain_m": _safe_float(get(session_data, "totalAscent", "total_ascent")),
        "elevation_loss_m": _safe_float(get(session_data, "totalDescent", "total_descent")),
        "avg_heart_rate": _safe_float(get(session_data, "avgHeartRate", "avg_heart_rate")),
        "max_heart_rate": _safe_float(get(session_data, "maxHeartRate", "max_heart_rate")),
        "avg_cadence": _safe_float(get(session_data, "avgCadence", "avg_cadence")),
        "avg_power": _safe_float(get(session_data, "avgPower", "avg_power")),
        "normalized_power": _safe_float(
            get(session_data, "normalizedPower", "normalized_power")),
        # No valid recorded average -> fall back to distance / duration.
        "avg_speed_ms": _sanitize_speed(
            get_speed(session_data, "avgSpeed", "avg_speed",
                      "enhancedAvgSpeed", "enhanced_avg_speed"),
            dist_m=total_distance, dur_s=total_duration,
        ),
        # A maximum cannot be derived from distance / duration, so an invalid
        # reading simply becomes None.
        "max_speed_ms": get_speed(session_data, "maxSpeed", "max_speed",
                                  "enhancedMaxSpeed", "enhanced_max_speed"),
        "avg_temperature_c": _safe_float(
            get(session_data, "avgTemperature", "avg_temperature")),
        "calories": _safe_float(get(session_data, "totalCalories", "total_calories")),
        "training_stress_score": _safe_float(
            get(session_data, "trainingStressScore", "training_stress_score")),
        # NOTE(review): this is populated from total_training_effect, which by
        # name is not a VO2max figure; confirm the mapping is intended.
        "vo2max_estimate": _safe_float(
            get(session_data, "totalTrainingEffect", "total_training_effect")),
        "polyline": encoded_polyline,
        "bounding_box": bounding_box,
        "source_type": "fit",
        "data_points": normalized_points,
        "laps": normalized_laps,
    }
def _haversine_m(lat1, lon1, lat2, lon2):
    """Return the great-circle distance in metres between two points in degrees."""
    earth_radius_m = 6371000
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    # Rounding can push sqrt(a) a hair above 1, which asin() rejects.
    return 2 * earth_radius_m * math.asin(min(1.0, math.sqrt(a)))


def _gpx_extension_values(pt) -> dict:
    """Collect the numeric extension values of a GPX point by local tag name."""
    values = {}
    for ext in pt.extensions or ():
        # iter() yields the extension element itself and all its descendants,
        # so both <extensions><power>..</power> and values nested inside a
        # wrapper element such as TrackPointExtension are picked up.
        for node in ext.iter():
            tag = node.tag
            if not isinstance(tag, str):
                continue  # e.g. comment nodes, whose tag is not a string
            tag = tag.split("}")[-1]  # drop a "{namespace}" prefix if present
            try:
                values[tag] = float(node.text)
            except (ValueError, TypeError):
                continue  # wrapper elements and non-numeric text
    return values


def parse_gpx_file(filepath: str) -> dict:
    """Parse the first track of a GPX file into a normalised activity dict.

    Distance is accumulated with the haversine formula over consecutive
    points that have a position; elevation gain/loss is summed over
    consecutive points that have an altitude. Zero is a valid latitude,
    longitude, altitude and temperature, so those fields are tested against
    ``None`` rather than for truthiness.

    Args:
        filepath: Path of the GPX file, read as UTF-8.

    Returns:
        A dict with the same keys as ``parse_fit_file`` returns, with
        ``source_type`` set to ``"gpx"`` and an empty ``laps`` list.
        ``start_time`` is the first timestamp found in the track and
        ``duration_s`` spans the first to the last timestamp; both are
        ``None`` for files without timestamps (e.g. planned routes).

    Raises:
        ValueError: If the file contains no tracks.
    """
    # GPX is XML; do not depend on the platform default encoding.
    with open(filepath, encoding="utf-8") as f:
        gpx = gpxpy.parse(f)

    track = gpx.tracks[0] if gpx.tracks else None
    if not track:
        raise ValueError("No tracks found in GPX file")

    data_points = []
    times = []
    for segment in track.segments:
        for pt in segment.points:
            ts = pt.time
            if ts is not None:
                if ts.tzinfo is None:
                    ts = ts.replace(tzinfo=timezone.utc)
                times.append(ts)
            extensions = _gpx_extension_values(pt)
            temp = extensions.get("temp")
            data_points.append({
                "timestamp": ts.isoformat() if ts is not None else None,
                "latitude": pt.latitude, "longitude": pt.longitude,
                "altitude_m": pt.elevation,
                "heart_rate": extensions.get("hr"),
                "cadence": extensions.get("cad"),
                "speed_ms": extensions.get("speed"),
                "power": extensions.get("power"),
                "temperature_c": temp if temp is not None else extensions.get("atemp"),
                "distance_m": None,
            })

    # Build the track and the cumulative distance in one pass; points without
    # a position keep distance_m = None.
    coords = []
    total_dist = 0.0
    prev = None
    for p in data_points:
        lat, lon = p["latitude"], p["longitude"]
        if lat is None or lon is None:
            continue
        if prev is not None:
            total_dist += _haversine_m(prev[0], prev[1], lat, lon)
        prev = (lat, lon)
        coords.append(prev)
        p["distance_m"] = total_dist

    encoded_polyline = polyline_lib.encode(coords) if coords else None
    bounding_box = _bounding_box(coords)

    uphill, downhill = 0.0, 0.0
    alts = [p["altitude_m"] for p in data_points if p["altitude_m"] is not None]
    for before, after in zip(alts, alts[1:]):
        diff = after - before
        if diff > 0:
            uphill += diff
        else:
            downhill -= diff

    # A heart rate of 0 is treated as "no reading", hence the truthiness test.
    hrs = [p["heart_rate"] for p in data_points if p["heart_rate"]]

    start_dt = times[0] if times else None
    end_dt = times[-1] if times else None
    start_time_str = start_dt.isoformat() if start_dt is not None else None
    duration = (end_dt - start_dt).total_seconds() if start_dt is not None else None

    sport = track.type.lower() if track.type else "running"
    return {
        "name": track.name or gpx.name or f"Activity {start_dt.date() if start_dt else ''}",
        "sport_type": sport, "start_time": start_time_str,
        "distance_m": total_dist, "duration_s": duration,
        "elevation_gain_m": uphill, "elevation_loss_m": downhill,
        "avg_heart_rate": (sum(hrs) / len(hrs)) if hrs else None,
        "max_heart_rate": max(hrs) if hrs else None,
        "avg_cadence": None, "avg_power": None, "normalized_power": None,
        "avg_speed_ms": (total_dist / duration) if (total_dist and duration) else None,
        "max_speed_ms": None, "avg_temperature_c": None, "calories": None,
        "training_stress_score": None, "vo2max_estimate": None,
        "polyline": encoded_polyline, "bounding_box": bounding_box,
        "source_type": "gpx", "data_points": data_points, "laps": [],
    }
def calculate_hr_zones(data_points: list, user_max_hr: float) -> dict:
    """Return the share of heart-rate samples in each of five zones.

    Zones are cut at 60 %, 70 %, 80 % and 90 % of *user_max_hr*; everything at
    or above 90 % counts as ``z5``. Samples with a missing heart rate or one
    below 20 are skipped.

    Returns:
        ``{"z1": pct, ..., "z5": pct}`` with percentages rounded to one
        decimal, or ``{}`` when *user_max_hr* is falsy or below 100, or when
        no sample qualifies.
    """
    if not user_max_hr or user_max_hr < 100:
        return {}

    # Exclusive upper bound of zones 1-4 as a fraction of the maximum.
    ceilings = (("z1", 0.60), ("z2", 0.70), ("z3", 0.80), ("z4", 0.90))
    tally = {"z1": 0, "z2": 0, "z3": 0, "z4": 0, "z5": 0}
    counted = 0

    for point in data_points:
        bpm = point.get("heart_rate")
        if not bpm or bpm < 20:
            continue
        fraction = bpm / user_max_hr
        counted += 1
        zone = "z5"  # default when no lower zone's ceiling is above the sample
        for label, ceiling in ceilings:
            if fraction < ceiling:
                zone = label
                break
        tally[zone] += 1

    if not counted:
        return {}
    return {label: round(hits / counted * 100, 1) for label, hits in tally.items()}