308 lines
11 KiB
Python
308 lines
11 KiB
Python
"""
|
|
FIT and GPX file parser using:
|
|
- Official Garmin FIT Python SDK (garmin-fit-sdk) for .fit files
|
|
- gpxpy for .gpx files
|
|
|
|
The official SDK correctly handles scale/offset, component expansion,
|
|
semicircle-to-degree conversion, and HR message merging.
|
|
"""
|
|
import math
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone, timedelta
|
|
from typing import Optional
|
|
import gpxpy
|
|
import polyline as polyline_lib
|
|
|
|
|
|
# 631065600 is the Unix timestamp (seconds) of 1989-12-31T00:00:00Z.
# NOTE(review): presumably the offset between FIT timestamps and Unix time;
# nothing in the visible part of this module references it — confirm it is
# still needed.
FIT_EPOCH_S = 631065600
|
|
|
|
|
|
def haversine_distance(lat1, lon1, lat2, lon2) -> float:
    """Return the great-circle distance in metres between two GPS points.

    Uses the haversine formula on a sphere of radius 6 371 000 m.

    Args:
        lat1, lon1: Latitude and longitude of the first point, in decimal degrees.
        lat2, lon2: Latitude and longitude of the second point, in decimal degrees.

    Returns:
        The distance along the sphere's surface, in metres.
    """
    earth_radius_m = 6371000
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    half_dphi = math.radians(lat2 - lat1) / 2
    half_dlam = math.radians(lon2 - lon1) / 2
    a = (
        math.sin(half_dphi) ** 2
        + math.cos(phi1) * math.cos(phi2) * math.sin(half_dlam) ** 2
    )
    # Floating-point rounding (near-antipodal points) or out-of-range
    # latitudes can push `a` marginally outside [0, 1]; sqrt/asin would then
    # raise ValueError, so clamp before taking them.
    a = min(1.0, max(0.0, a))
    return 2 * earth_radius_m * math.asin(math.sqrt(a))
|
|
|
|
|
|
def _safe_float(val) -> Optional[float]:
    """Coerce *val* to ``float``, yielding ``None`` when that is not possible.

    ``None`` is passed through unchanged; values that ``float()`` rejects with
    ``TypeError`` or ``ValueError`` also produce ``None``.
    """
    if val is None:
        return None
    try:
        converted = float(val)
    except (TypeError, ValueError):
        return None
    return converted
|
|
|
|
|
|
def _bounding_box(coords: list) -> Optional[dict]:
    """Return the lat/lon extent of *coords*, or ``None`` when it is empty.

    Each item is indexed as ``(latitude, longitude)``. The result has the keys
    ``min_lat``, ``max_lat``, ``min_lon`` and ``max_lon``.
    """
    if not coords:
        return None

    latitudes, longitudes = [], []
    for point in coords:
        latitudes.append(point[0])
        longitudes.append(point[1])

    return {
        "min_lat": min(latitudes),
        "max_lat": max(latitudes),
        "min_lon": min(longitudes),
        "max_lon": max(longitudes),
    }
|
|
|
|
|
|
# FIT global message numbers collected by the decoder listener.
_FIT_MESG_SESSION = 18
_FIT_MESG_LAP = 19
_FIT_MESG_RECORD = 20

# 2**31 semicircles correspond to 180 degrees.
_FIT_SEMICIRCLES_TO_DEGREES = 180.0 / 2 ** 31

# FIT sport name -> sport_type emitted by this module. Sports that are not
# listed are passed through unchanged (lower-cased).
_FIT_SPORT_MAP = {
    "running": "running", "cycling": "cycling", "swimming": "swimming",
    "hiking": "hiking", "walking": "walking", "generic": "other",
    "open_water_swimming": "swimming", "trail_running": "running",
    "e_biking": "cycling",
}


def _fit_first_present(msg: dict, *keys):
    """Return the first ``msg[key]`` that is not ``None``, else ``None``.

    Unlike ``a or b`` this keeps legitimate zero readings (0 m altitude,
    0 m/s speed, 0 rpm cadence).
    """
    for key in keys:
        value = msg.get(key)
        if value is not None:
            return value
    return None


def _fit_utc(value) -> Optional[datetime]:
    """Return *value* as an aware datetime (naive means UTC); ``None`` if it is not a datetime."""
    if not isinstance(value, datetime):
        return None
    if value.tzinfo is None:
        return value.replace(tzinfo=timezone.utc)
    return value


def _fit_degrees(value):
    """Return a FIT position value in decimal degrees, or ``None`` if unusable.

    NOTE(review): the FIT profile declares position_lat/position_long in
    semicircles, and the decoder options used below do not include a unit
    conversion — confirm against the SDK version in use. Values that cannot
    be degrees (magnitude above 180) are therefore treated as semicircles and
    converted; values already within degree range are returned untouched, so
    this is a no-op if the decoder does deliver degrees. A semicircle value
    with magnitude <= 180 (within ~1.5e-5 degrees of 0) is indistinguishable
    from degrees and is left as-is.
    """
    if value is None:
        return None
    try:
        magnitude = abs(float(value))
    except (TypeError, ValueError):
        return None
    if magnitude > 180.0:
        return float(value) * _FIT_SEMICIRCLES_TO_DEGREES
    return value


def parse_fit_file(filepath: str) -> dict:
    """Parse a Garmin .fit activity file using the official Garmin SDK.

    Session, lap and record messages are collected through a decoder
    listener and flattened into the same dictionary layout that
    ``parse_gpx_file`` produces.

    Args:
        filepath: Path of the .fit file to decode.

    Returns:
        A dict with activity-level summary fields (``name``, ``sport_type``,
        ``start_time``, ``distance_m``, ``duration_s``, ...), the encoded
        ``polyline`` and ``bounding_box`` of the GPS track (``None`` when the
        file has no positions), ``data_points`` (one dict per record message)
        and ``laps`` (one dict per lap message). ``source_type`` is ``"fit"``.
        When a file contains several session messages, the last one wins.
    """
    # Imported lazily so the module can be loaded without the FIT SDK installed.
    from garmin_fit_sdk import Decoder, Stream

    session: dict = {}
    records: list = []
    laps: list = []

    def listener(mesg_num: int, msg: dict) -> None:
        nonlocal session
        if mesg_num == _FIT_MESG_SESSION:
            session = msg
        elif mesg_num == _FIT_MESG_RECORD:
            records.append(msg)
        elif mesg_num == _FIT_MESG_LAP:
            laps.append(msg)

    stream = Stream.from_file(filepath)
    decoder = Decoder(stream)
    # The return value of read() is intentionally unused: everything needed
    # is gathered by the listener, and decoding stays best-effort.
    decoder.read(
        apply_scale_and_offset=True,
        convert_datetimes_to_dates=True,
        convert_types_to_strings=True,
        enable_crc_check=False,
        expand_sub_fields=True,
        expand_components=True,
        merge_heart_rates=True,
        mesg_listener=listener,
    )

    # Map sport type. The raw value is coerced with str() because it is not
    # guaranteed to be a string (e.g. an enum value the decoder left numeric).
    raw_sport = session.get("sport")
    sport = str(raw_sport).lower() if raw_sport is not None else "generic"
    sport_type = _FIT_SPORT_MAP.get(sport, sport)

    start_time = _fit_utc(session.get("start_time"))

    # Build the GPS track and the normalized data points in a single pass.
    coords = []
    normalized_points = []
    for record in records:
        latitude = _fit_degrees(record.get("position_lat"))
        longitude = _fit_degrees(record.get("position_long"))
        if latitude is not None and longitude is not None:
            coords.append((latitude, longitude))

        ts = _fit_utc(record.get("timestamp"))
        normalized_points.append({
            "timestamp": ts.isoformat() if ts else None,
            "latitude": latitude,
            "longitude": longitude,
            "altitude_m": _fit_first_present(record, "altitude", "enhanced_altitude"),
            "heart_rate": record.get("heart_rate"),
            "cadence": _fit_first_present(record, "cadence", "fractional_cadence"),
            "speed_ms": _fit_first_present(record, "speed", "enhanced_speed"),
            "power": record.get("power"),
            "temperature_c": record.get("temperature"),
            "distance_m": record.get("distance"),
        })

    encoded_polyline = polyline_lib.encode(coords) if coords else None
    bounding_box = _bounding_box(coords)

    # Normalize laps
    normalized_laps = []
    for lap_number, lap in enumerate(laps, start=1):
        lap_start = _fit_utc(lap.get("start_time"))
        normalized_laps.append({
            "lap_number": lap_number,
            "start_time": lap_start.isoformat() if lap_start else None,
            "duration_s": _safe_float(lap.get("total_elapsed_time")),
            "distance_m": _safe_float(lap.get("total_distance")),
            "avg_heart_rate": _safe_float(lap.get("avg_heart_rate")),
            "avg_cadence": _safe_float(lap.get("avg_cadence")),
            "avg_speed_ms": _safe_float(
                _fit_first_present(lap, "avg_speed", "enhanced_avg_speed")),
            "avg_power": _safe_float(lap.get("avg_power")),
        })

    # Build activity name, e.g. "Running 2024-05-01".
    name = str(raw_sport).title() if raw_sport is not None else "Activity"
    if start_time:
        name += " " + start_time.strftime("%Y-%m-%d")

    return {
        "name": name,
        "sport_type": sport_type,
        "start_time": start_time.isoformat() if start_time else None,
        "distance_m": _safe_float(session.get("total_distance")),
        "duration_s": _safe_float(session.get("total_elapsed_time")),
        "elevation_gain_m": _safe_float(session.get("total_ascent")),
        "elevation_loss_m": _safe_float(session.get("total_descent")),
        "avg_heart_rate": _safe_float(session.get("avg_heart_rate")),
        "max_heart_rate": _safe_float(session.get("max_heart_rate")),
        "avg_cadence": _safe_float(session.get("avg_cadence")),
        "avg_power": _safe_float(session.get("avg_power")),
        "normalized_power": _safe_float(session.get("normalized_power")),
        "avg_speed_ms": _safe_float(
            _fit_first_present(session, "avg_speed", "enhanced_avg_speed")),
        "max_speed_ms": _safe_float(
            _fit_first_present(session, "max_speed", "enhanced_max_speed")),
        "avg_temperature_c": _safe_float(session.get("avg_temperature")),
        "calories": _safe_float(session.get("total_calories")),
        "training_stress_score": _safe_float(session.get("training_stress_score")),
        # NOTE(review): this key is filled from total_training_effect, which
        # by its name is not a VO2max figure. Kept as-is so existing consumers
        # see the same value — confirm the intended source field.
        "vo2max_estimate": _safe_float(session.get("total_training_effect")),
        "polyline": encoded_polyline,
        "bounding_box": bounding_box,
        "source_type": "fit",
        "data_points": normalized_points,
        "laps": normalized_laps,
    }
|
|
|
|
|
|
def _gpx_extension_values(point) -> dict:
    """Collect the numeric extension values of a GPX track point.

    Returns a dict keyed by tag name with any ``{namespace}`` prefix removed.
    Elements whose text is not a number (including wrapper elements that only
    hold children) are skipped on purpose.
    """
    values = {}
    for ext in point.extensions or ():
        # iter() yields the element itself and every descendant, so a value
        # placed directly under <extensions> (e.g. <power>210</power>) is read
        # as well as one nested inside a wrapper element.
        for node in ext.iter():
            tag = node.tag
            if not isinstance(tag, str):
                continue  # comments / processing instructions have no string tag
            try:
                values[tag.rsplit("}", 1)[-1]] = float(node.text)
            except (ValueError, TypeError):
                continue
    return values


def parse_gpx_file(filepath: str) -> dict:
    """Parse a GPX file into the same dictionary layout as ``parse_fit_file``.

    Only the first track of the file is read; all of its segments are
    concatenated. Distance, elevation gain/loss, duration and heart-rate
    statistics are derived from the track points.

    Args:
        filepath: Path of the .gpx file to read.

    Returns:
        A dict with activity-level summary fields, the encoded ``polyline``,
        the ``bounding_box``, per-point ``data_points`` (each carrying the
        cumulative ``distance_m``) and an empty ``laps`` list.
        ``source_type`` is ``"gpx"``. Summary fields that are not derived
        here are ``None``.

    Raises:
        ValueError: If the file contains no tracks.
    """
    # utf-8-sig reads UTF-8 with or without a byte-order mark instead of
    # depending on the platform's default encoding.
    with open(filepath, encoding="utf-8-sig") as f:
        gpx = gpxpy.parse(f)

    track = gpx.tracks[0] if gpx.tracks else None
    if not track:
        raise ValueError("No tracks found in GPX file")

    data_points = []
    coords = []
    timestamps = []
    total_dist = 0.0
    prev = None

    for segment in track.segments:
        for pt in segment.points:
            ts = pt.time
            if ts and ts.tzinfo is None:
                ts = ts.replace(tzinfo=timezone.utc)
            if ts:
                timestamps.append(ts)

            extensions = _gpx_extension_values(pt)
            temperature = extensions.get("temp")
            if temperature is None:  # 0 degrees is a valid reading
                temperature = extensions.get("atemp")

            # Compare against None: 0.0 is a valid latitude/longitude
            # (equator / prime meridian) and must not be dropped.
            if pt.latitude is not None and pt.longitude is not None:
                here = (pt.latitude, pt.longitude)
                if prev is not None:
                    total_dist += haversine_distance(prev[0], prev[1], here[0], here[1])
                prev = here
                coords.append(here)

            data_points.append({
                "timestamp": ts.isoformat() if ts else None,
                "latitude": pt.latitude,
                "longitude": pt.longitude,
                "altitude_m": pt.elevation,
                "heart_rate": extensions.get("hr"),
                "cadence": extensions.get("cad"),
                "speed_ms": extensions.get("speed"),
                "power": extensions.get("power"),
                "temperature_c": temperature,
                # Cumulative distance up to and including this point.
                "distance_m": total_dist,
            })

    encoded_polyline = polyline_lib.encode(coords) if coords else None
    bounding_box = _bounding_box(coords)

    # Elevation gain/loss over consecutive points that carry an elevation
    # (0 m is a valid elevation, so test for None rather than truthiness).
    uphill, downhill = 0.0, 0.0
    alts = [p["altitude_m"] for p in data_points if p["altitude_m"] is not None]
    for before, after in zip(alts, alts[1:]):
        diff = after - before
        if diff > 0:
            uphill += diff
        else:
            downhill += abs(diff)

    # A heart rate of 0 is treated as "no reading" and excluded.
    hrs = [p["heart_rate"] for p in data_points if p["heart_rate"]]

    # Start/end come from the first/last point that actually has a time, so
    # tracks with missing <time> elements no longer raise.
    start_dt = timestamps[0] if timestamps else None
    end_dt = timestamps[-1] if timestamps else None
    start_time_str = start_dt.isoformat() if start_dt else None
    duration = (end_dt - start_dt).total_seconds() if (start_dt and end_dt) else None

    sport = track.type.lower() if track.type else "running"

    return {
        "name": track.name or gpx.name or f"Activity {start_dt.date() if start_dt else ''}",
        "sport_type": sport,
        "start_time": start_time_str,
        "distance_m": total_dist,
        "duration_s": duration,
        "elevation_gain_m": uphill,
        "elevation_loss_m": downhill,
        "avg_heart_rate": (sum(hrs) / len(hrs)) if hrs else None,
        "max_heart_rate": max(hrs) if hrs else None,
        "avg_cadence": None,
        "avg_power": None,
        "normalized_power": None,
        "avg_speed_ms": (total_dist / duration) if (total_dist and duration) else None,
        "max_speed_ms": None,
        "avg_temperature_c": None,
        "calories": None,
        "training_stress_score": None,
        "vo2max_estimate": None,
        "polyline": encoded_polyline,
        "bounding_box": bounding_box,
        "source_type": "gpx",
        "data_points": data_points,
        "laps": [],
    }
|
|
|
|
|
|
def calculate_hr_zones(data_points: list, user_max_hr: float) -> dict:
    """
    Return the percentage of heart-rate samples that fall in each HR zone.

    Zones are the standard 5-zone model, expressed as a fraction of the
    user's configured max HR:
      z1 Recovery:   below 60%
      z2 Base:       60% up to 70%
      z3 Tempo:      70% up to 80%
      z4 Threshold:  80% up to 90%
      z5 Max:        90% and above

    Pass the user's real physiological max HR as ``user_max_hr``, not the
    peak HR seen in this activity: using the activity peak moves every zone
    boundary and makes easy efforts look harder than they were.

    Samples with a missing heart rate, or one below 20, are ignored. An empty
    dict is returned when ``user_max_hr`` is missing or below 100, or when no
    usable sample exists; otherwise the dict maps ``z1``..``z5`` to
    percentages rounded to one decimal.
    """
    if not user_max_hr or user_max_hr < 100:
        return {}

    # Exclusive upper limit of every zone except the open-ended top zone.
    upper_limits = (("z1", 0.60), ("z2", 0.70), ("z3", 0.80), ("z4", 0.90))
    counts = {"z1": 0, "z2": 0, "z3": 0, "z4": 0, "z5": 0}
    counted = 0

    for sample in data_points:
        rate = sample.get("heart_rate")
        if not rate or rate < 20:
            continue
        fraction = rate / user_max_hr
        counted += 1
        # First zone whose upper limit exceeds the fraction; z5 otherwise.
        zone = next(
            (label for label, limit in upper_limits if fraction < limit),
            "z5",
        )
        counts[zone] += 1

    if not counted:
        return {}
    return {label: round(hits / counted * 100, 1) for label, hits in counts.items()}
|