Files
MileVault/backend/app/services/fit_parser.py
T
2026-06-06 13:23:33 +01:00

342 lines
12 KiB
Python

"""
Parses Garmin .fit files and GPX files into normalized activity data.
Handles full Strava and Garmin data export archives.
"""
import gzip
import json
import logging
import math
import os
import shutil
import tempfile
import zipfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

import fitparse
import gpxpy
import polyline as polyline_lib
def haversine_distance(lat1, lon1, lat2, lon2) -> float:
    """Great-circle distance between two latitude/longitude pairs.

    Inputs are decimal degrees. The result is in metres, computed with the
    haversine formula on a sphere of radius 6,371,000 m.
    """
    earth_radius_m = 6371000

    lat1_rad = math.radians(lat1)
    lat2_rad = math.radians(lat2)
    delta_lat = math.radians(lat2 - lat1)
    delta_lon = math.radians(lon2 - lon1)

    sin_half_dlat = math.sin(delta_lat / 2)
    sin_half_dlon = math.sin(delta_lon / 2)
    hav = (
        sin_half_dlat**2
        + math.cos(lat1_rad) * math.cos(lat2_rad) * sin_half_dlon**2
    )
    return 2 * earth_radius_m * math.asin(math.sqrt(hav))
def semicircles_to_degrees(sc: int) -> float:
    """Convert an angle in semicircle units to decimal degrees.

    The scale is fixed so that 2**31 semicircles equal 180 degrees.
    """
    degrees_per_semicircle = 180 / 2**31
    return sc * degrees_per_semicircle
def _fit_as_utc(value):
    """Return *value* as a timezone-aware datetime, or None if it is not one.

    Naive datetimes are tagged as UTC without shifting the clock time.
    Anything that is not a datetime (missing field, or presumably a raw
    integer for timestamps the FIT decoder could not convert) is treated
    as absent instead of raising AttributeError on ``.tzinfo``.
    """
    if not isinstance(value, datetime):
        return None
    if value.tzinfo is None:
        return value.replace(tzinfo=timezone.utc)
    return value


def _normalize_fit_records(data_points: list) -> list:
    """Map raw FIT ``record`` dicts onto the normalized per-point schema.

    When a record carries no ``distance`` field, the cumulative distance is
    extended with the haversine distance from the previous GPS fix.
    """
    cumulative_dist = 0.0
    prev_lat = prev_lon = None
    normalized = []
    for p in data_points:
        ts = _fit_as_utc(p.get("timestamp"))
        lat = p.get("position_lat")
        lon = p.get("position_long")
        # Compare against None explicitly: 0.0 is a valid latitude/longitude
        # (equator / prime meridian) and must not be treated as "no fix".
        has_fix = lat is not None and lon is not None

        dist = p.get("distance")
        if dist is not None:
            # Device-reported distance wins and re-anchors the running total.
            cumulative_dist = float(dist)
        elif has_fix and prev_lat is not None:
            cumulative_dist += haversine_distance(prev_lat, prev_lon, lat, lon)
            dist = cumulative_dist

        if has_fix:
            prev_lat, prev_lon = lat, lon

        normalized.append({
            "timestamp": ts.isoformat() if ts else None,
            "latitude": lat,
            "longitude": lon,
            "altitude_m": p.get("altitude"),
            "heart_rate": p.get("heart_rate"),
            "cadence": p.get("cadence"),
            "speed_ms": p.get("speed"),
            "power": p.get("power"),
            "temperature_c": p.get("temperature"),
            "distance_m": dist,
        })
    return normalized


def _normalize_fit_laps(laps: list) -> list:
    """Map raw FIT ``lap`` dicts onto the normalized lap schema (1-based numbering)."""
    normalized = []
    for number, lap in enumerate(laps, start=1):
        lap_start = _fit_as_utc(lap.get("start_time"))
        normalized.append({
            "lap_number": number,
            "start_time": lap_start.isoformat() if lap_start else None,
            "duration_s": _safe_float(lap.get("total_elapsed_time")),
            "distance_m": _safe_float(lap.get("total_distance")),
            "avg_heart_rate": _safe_float(lap.get("avg_heart_rate")),
            "avg_cadence": _safe_float(lap.get("avg_cadence")),
            "avg_speed_ms": _safe_float(lap.get("avg_speed")),
            "avg_power": _safe_float(lap.get("avg_power")),
        })
    return normalized


def parse_fit_file(filepath: str) -> dict:
    """Parse a Garmin .fit file and return a normalized activity dict.

    Args:
        filepath: Path of the FIT file, passed straight to ``fitparse.FitFile``.

    Returns:
        A dict with summary metrics taken from the ``session`` message
        (``distance_m``, ``duration_s``, heart rate, power, ...), the encoded
        ``polyline`` and ``bounding_box`` of the GPS track, the per-sample
        ``data_points`` and the ``laps``. ``source_type`` is ``"fit"``.
        Metrics that the file does not provide are ``None``.
    """
    fit = fitparse.FitFile(filepath)
    data_points = []
    laps = []
    session = {}
    for record in fit.get_messages():
        name = record.name
        if name == "session":
            # Later session messages overwrite earlier fields of the same name.
            for f in record:
                session[f.name] = f.value
        elif name == "lap":
            laps.append({f.name: f.value for f in record})
        elif name == "record":
            point = {f.name: f.value for f in record}
            if point:
                # Positions are stored as semicircles; convert to degrees.
                if point.get("position_lat") is not None:
                    point["position_lat"] = semicircles_to_degrees(point["position_lat"])
                if point.get("position_long") is not None:
                    point["position_long"] = semicircles_to_degrees(point["position_long"])
                data_points.append(point)

    # Sport: tolerate a missing, None or non-string value.
    raw_sport = session.get("sport")
    sport = str(raw_sport).lower() if raw_sport is not None else "generic"
    sport_map = {
        "running": "running", "cycling": "cycling", "swimming": "swimming",
        "hiking": "hiking", "walking": "walking", "generic": "other",
        "open_water_swimming": "swimming", "trail_running": "running",
    }
    sport_type = sport_map.get(sport, sport)
    sport_label = str(raw_sport).title() if raw_sport is not None else "Activity"

    start_time = _fit_as_utc(session.get("start_time"))
    date_label = start_time.strftime("%Y-%m-%d") if start_time else ""

    # GPS track for the polyline / bounding box.
    coords = [
        (p["position_lat"], p["position_long"])
        for p in data_points
        if p.get("position_lat") is not None and p.get("position_long") is not None
    ]
    encoded_polyline = polyline_lib.encode(coords) if coords else None
    bounding_box = _bounding_box(coords)

    return {
        "name": sport_label + " " + date_label,
        "sport_type": sport_type,
        "start_time": start_time.isoformat() if start_time else None,
        "distance_m": _safe_float(session.get("total_distance")),
        "duration_s": _safe_float(session.get("total_elapsed_time")),
        "elevation_gain_m": _safe_float(session.get("total_ascent")),
        "elevation_loss_m": _safe_float(session.get("total_descent")),
        "avg_heart_rate": _safe_float(session.get("avg_heart_rate")),
        "max_heart_rate": _safe_float(session.get("max_heart_rate")),
        "avg_cadence": _safe_float(session.get("avg_cadence")),
        "avg_power": _safe_float(session.get("avg_power")),
        "normalized_power": _safe_float(session.get("normalized_power")),
        "avg_speed_ms": _safe_float(session.get("avg_speed")),
        "max_speed_ms": _safe_float(session.get("max_speed")),
        "avg_temperature_c": _safe_float(session.get("avg_temperature")),
        "calories": _safe_float(session.get("total_calories")),
        "training_stress_score": _safe_float(session.get("training_stress_score")),
        # "estimated_sweat_loss" is a fluid-loss figure, not VO2max, so it must
        # not be reported under this key. No VO2max field is read from the
        # session here (NOTE(review): confirm where the target devices store it).
        "vo2max_estimate": None,
        "polyline": encoded_polyline,
        "bounding_box": bounding_box,
        "source_type": "fit",
        "data_points": _normalize_fit_records(data_points),
        "laps": _normalize_fit_laps(laps),
    }
def _gpx_point_extensions(pt) -> dict:
    """Collect the numeric extension values of a GPX track point.

    Keys are element tag names with any ``{namespace}`` prefix removed.
    Elements whose text is not a number are ignored.
    """
    values = {}
    for ext in pt.extensions or ():
        # iter() yields the element itself and all its descendants, so both
        # bare values (<extensions><power>) and wrapped ones
        # (<extensions><TrackPointExtension><hr>) are picked up.
        for node in ext.iter():
            if not isinstance(node.tag, str):
                continue  # comments / processing instructions have no tag name
            tag = node.tag.rsplit("}", 1)[-1]
            try:
                values[tag] = float(node.text)
            except (ValueError, TypeError):
                continue
    return values


def parse_gpx_file(filepath: str) -> dict:
    """Parse a GPX file into a normalized activity dict.

    Only the first track of the file is read. Distance is accumulated with
    the haversine formula, elevation gain/loss is the sum of point-to-point
    altitude differences, and duration is the span between the first and the
    last point that carry a timestamp.

    Args:
        filepath: Path of the GPX file.

    Returns:
        A dict with the same keys as the FIT parser's output;
        ``source_type`` is ``"gpx"`` and ``laps`` is always empty.
        Metrics GPX cannot provide are ``None``.

    Raises:
        ValueError: If the file contains no tracks.
    """
    # GPX is XML; read it as UTF-8 (tolerating a BOM) instead of relying on
    # the platform default encoding.
    with open(filepath, encoding="utf-8-sig") as f:
        gpx = gpxpy.parse(f)

    track = gpx.tracks[0] if gpx.tracks else None
    if not track:
        raise ValueError("No tracks found in GPX file")

    data_points = []
    first_ts = last_ts = None
    for segment in track.segments:
        for pt in segment.points:
            ts = pt.time
            if ts is not None:
                if ts.tzinfo is None:
                    ts = ts.replace(tzinfo=timezone.utc)
                if first_ts is None:
                    first_ts = ts
                last_ts = ts

            extensions = _gpx_point_extensions(pt)
            # Fall back to "atemp" only when "temp" is absent; a plain `or`
            # would throw away a genuine 0.0 degree reading.
            temperature = extensions.get("temp")
            if temperature is None:
                temperature = extensions.get("atemp")

            data_points.append({
                "timestamp": ts.isoformat() if ts else None,
                "latitude": pt.latitude,
                "longitude": pt.longitude,
                "altitude_m": pt.elevation,
                "heart_rate": extensions.get("hr"),
                "cadence": extensions.get("cad"),
                "speed_ms": extensions.get("speed"),
                "power": extensions.get("power"),
                "temperature_c": temperature,
                "distance_m": None,
            })

    # Track geometry and cumulative distance. Coordinates are compared with
    # None explicitly because 0.0 is a valid latitude/longitude.
    coords = []
    total_dist = 0.0
    prev = None
    for p in data_points:
        lat, lon = p["latitude"], p["longitude"]
        if lat is not None and lon is not None:
            if prev is not None:
                total_dist += haversine_distance(prev[0], prev[1], lat, lon)
            prev = (lat, lon)
            coords.append(prev)
        p["distance_m"] = total_dist
    encoded_polyline = polyline_lib.encode(coords) if coords else None
    bounding_box = _bounding_box(coords)

    # Elevation gain/loss; altitude 0.0 (sea level) is a real sample.
    uphill, downhill = 0.0, 0.0
    alts = [p["altitude_m"] for p in data_points if p["altitude_m"] is not None]
    for lower, upper in zip(alts, alts[1:]):
        diff = upper - lower
        if diff > 0:
            uphill += diff
        else:
            downhill += abs(diff)

    hrs = [p["heart_rate"] for p in data_points if p["heart_rate"]]

    # Files without <time> elements (e.g. planned routes) simply have no
    # start time or duration instead of failing.
    start_time_str = first_ts.isoformat() if first_ts else None
    duration = (last_ts - first_ts).total_seconds() if first_ts else None

    sport = "running"  # GPX doesn't always include sport; default to running
    if track.type:
        sport = track.type.lower()

    return {
        "name": track.name or gpx.name or f"Activity {first_ts.date() if first_ts else ''}",
        "sport_type": sport,
        "start_time": start_time_str,
        "distance_m": total_dist,
        "duration_s": duration,
        "elevation_gain_m": uphill,
        "elevation_loss_m": downhill,
        "avg_heart_rate": (sum(hrs) / len(hrs)) if hrs else None,
        "max_heart_rate": max(hrs) if hrs else None,
        "avg_cadence": None,
        "avg_power": None,
        "normalized_power": None,
        "avg_speed_ms": (total_dist / duration) if (total_dist and duration) else None,
        "max_speed_ms": None,
        "avg_temperature_c": None,
        "calories": None,
        "training_stress_score": None,
        "vo2max_estimate": None,
        "polyline": encoded_polyline,
        "bounding_box": bounding_box,
        "source_type": "gpx",
        "data_points": data_points,
        "laps": [],
    }
def parse_strava_export(export_dir: str) -> list[dict]:
    """Parse every activity file of an unpacked Strava data export.

    Looks in ``<export_dir>/activities`` for ``.fit`` and ``.gpx`` files and
    their gzip-compressed forms (``.fit.gz``, ``.gpx.gz``). Suffix matching is
    case-insensitive. Other files (including ``activities.csv``, which this
    function does not read) are ignored.

    Parsing is best-effort: a file that fails to parse is logged and skipped
    so that one corrupt activity does not abort the whole import.

    Args:
        export_dir: Root directory of the unpacked export.

    Returns:
        The parsed activities in file-name order, each tagged with
        ``source_type`` ``"strava_fit"`` or ``"strava_gpx"``. Empty when the
        ``activities`` directory does not exist.
    """
    log = logging.getLogger(__name__)
    activities = []
    activities_dir = Path(export_dir) / "activities"
    if not activities_dir.is_dir():
        return activities

    # Compressed activities are inflated into a scratch directory because the
    # file parsers take a path; the directory is removed on exit.
    with tempfile.TemporaryDirectory() as scratch:
        for entry in sorted(activities_dir.iterdir()):
            if not entry.is_file():
                continue
            suffixes = [s.lower() for s in entry.suffixes]
            compressed = bool(suffixes) and suffixes[-1] == ".gz"
            if compressed:
                suffixes = suffixes[:-1]
            kind = suffixes[-1] if suffixes else ""
            if kind not in (".fit", ".gpx"):
                continue

            path = entry
            try:
                if compressed:
                    path = Path(scratch) / entry.name[: -len(".gz")]
                    with gzip.open(entry, "rb") as src, open(path, "wb") as dst:
                        shutil.copyfileobj(src, dst)
                if kind == ".fit":
                    act = parse_fit_file(str(path))
                else:
                    act = parse_gpx_file(str(path))
                act["source_type"] = "strava_" + kind[1:]
                activities.append(act)
            except Exception as e:  # batch boundary: log and keep going
                log.warning("Error parsing %s: %s", entry, e)
            finally:
                if compressed:
                    # Free the inflated copy right away to bound disk usage.
                    path.unlink(missing_ok=True)
    return activities
def calculate_hr_zones(data_points: list[dict], max_hr: float) -> dict:
    """Return the percentage of heart-rate samples that fall in each zone.

    Zones are defined as fractions of ``max_hr``: z1 below 60 %, z2 from 60 %
    to below 70 %, z3 from 70 % to below 80 %, z4 from 80 % to below 90 %,
    and z5 for 90 % and above. Points with a missing or zero ``heart_rate``
    are skipped, and every remaining sample counts equally.

    Returns:
        ``{"z1": pct, ..., "z5": pct}`` rounded to one decimal, or ``{}``
        when ``max_hr`` is falsy or no point has a heart rate.
    """
    if not max_hr:
        return {}

    # Exclusive upper limit of each zone; anything at or above the last
    # limit lands in z5.
    upper_limits = (("z1", 0.6), ("z2", 0.7), ("z3", 0.8), ("z4", 0.9))
    counts = {"z1": 0, "z2": 0, "z3": 0, "z4": 0, "z5": 0}
    samples = 0

    for point in data_points:
        bpm = point.get("heart_rate")
        if not bpm:
            continue
        fraction = bpm / max_hr
        samples += 1
        for zone, limit in upper_limits:
            if fraction < limit:
                counts[zone] += 1
                break
        else:
            counts["z5"] += 1

    if not samples:
        return {}
    return {zone: round(hits / samples * 100, 1) for zone, hits in counts.items()}
def _safe_float(val) -> Optional[float]:
try:
return float(val) if val is not None else None
except (TypeError, ValueError):
return None
def _bounding_box(coords: list[tuple]) -> Optional[dict]:
if not coords:
return None
lats = [c[0] for c in coords]
lons = [c[1] for c in coords]
return {
"min_lat": min(lats), "max_lat": max(lats),
"min_lon": min(lons), "max_lon": max(lons),
}