"""
FIT and GPX file parser using:
  - Official Garmin FIT Python SDK (garmin-fit-sdk) for .fit files
  - gpxpy for .gpx files

The official SDK handles scale/offset, component expansion and HR message
merging.  Positions that the SDK reports in semicircles are converted to
decimal degrees in this module.
"""

import math
from bisect import bisect_right
from pathlib import Path
from datetime import datetime, timezone, timedelta
from typing import Optional

import gpxpy
import polyline as polyline_lib

FIT_EPOCH_S = 631065600

# FIT global message numbers handled by the decoder listener.
_FIT_MESG_SESSION = 18
_FIT_MESG_LAP = 19
_FIT_MESG_RECORD = 20

# One semicircle is 180 / 2**31 degrees (FIT position encoding).
_SEMICIRCLES_TO_DEGREES = 180.0 / 2 ** 31

# FIT sport string -> sport type used in the parsed activity dict.
_FIT_SPORT_MAP = {
    "running": "running",
    "cycling": "cycling",
    "swimming": "swimming",
    "hiking": "hiking",
    "walking": "walking",
    "generic": "other",
    "open_water_swimming": "swimming",
    "trail_running": "running",
    "e_biking": "cycling",
}

# Upper bounds (as a fraction of max HR) of zones z1..z4; z5 is open-ended.
_HR_ZONE_UPPER_BOUNDS = [0.60, 0.70, 0.80, 0.90]
_HR_ZONE_KEYS = ["z1", "z2", "z3", "z4", "z5"]


def haversine_distance(lat1, lon1, lat2, lon2) -> float:
    """Distance in metres between two GPS points given in decimal degrees."""
    R = 6371000
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lon2 - lon1)
    a = (
        math.sin(dphi / 2) ** 2
        + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    )
    # Rounding can push sqrt(a) marginally above 1 for near-antipodal
    # points, which would make asin raise a domain error.
    return 2 * R * math.asin(min(1.0, math.sqrt(a)))


def _safe_float(val) -> Optional[float]:
    """Return ``float(val)``, or None when val is None or not convertible."""
    try:
        return float(val) if val is not None else None
    except (TypeError, ValueError):
        return None


def _first_present(*values):
    """Return the first argument that is not None (0 counts as present)."""
    for value in values:
        if value is not None:
            return value
    return None


def _as_utc(value) -> Optional[datetime]:
    """Return value as a tz-aware datetime, or None if it is not a datetime.

    Naive datetimes are assumed to be UTC.
    """
    if not isinstance(value, datetime):
        return None
    if value.tzinfo is None:
        return value.replace(tzinfo=timezone.utc)
    return value


def _fit_coord_to_degrees(value) -> Optional[float]:
    """Convert a FIT position value to decimal degrees.

    Integer values, and values outside the valid degree range, are treated
    as semicircles; floats already within +/-180 are returned unchanged.
    NOTE(review): assumes the SDK yields raw semicircle integers for
    position fields -- confirm against the installed garmin-fit-sdk version.
    """
    number = _safe_float(value)
    if number is None:
        return None
    if isinstance(value, int) or abs(number) > 180.0:
        return number * _SEMICIRCLES_TO_DEGREES
    return number


def _bounding_box(coords: list) -> Optional[dict]:
    """Return min/max latitude and longitude of (lat, lon) pairs, or None."""
    if not coords:
        return None
    lats = [c[0] for c in coords]
    lons = [c[1] for c in coords]
    return {
        "min_lat": min(lats),
        "max_lat": max(lats),
        "min_lon": min(lons),
        "max_lon": max(lons),
    }


def parse_fit_file(filepath: str) -> dict:
    """Parse a Garmin .fit activity file using the official Garmin SDK.

    Returns a dict with activity summary fields taken from the session
    message, an encoded polyline and bounding box built from the record
    positions, and normalized ``data_points`` and ``laps`` lists.  If the
    file contains several session messages, the last one is used.
    """
    # Imported lazily so the module can be used for GPX without the SDK.
    from garmin_fit_sdk import Decoder, Stream

    session = {}
    records = []
    laps = []

    def listener(mesg_num: int, msg: dict):
        nonlocal session
        if mesg_num == _FIT_MESG_SESSION:
            session = msg
        elif mesg_num == _FIT_MESG_RECORD:
            records.append(msg)
        elif mesg_num == _FIT_MESG_LAP:
            laps.append(msg)

    stream = Stream.from_file(filepath)
    decoder = Decoder(stream)
    decoder.read(
        apply_scale_and_offset=True,
        convert_datetimes_to_dates=True,
        convert_types_to_strings=True,
        enable_crc_check=False,
        expand_sub_fields=True,
        expand_components=True,
        merge_heart_rates=True,
        mesg_listener=listener,
    )

    # Map sport type; a missing or None sport falls back to "generic".
    raw_sport = session.get("sport")
    sport = str(raw_sport if raw_sport is not None else "generic").lower()
    sport_type = _FIT_SPORT_MAP.get(sport, sport)

    start_time = _as_utc(session.get("start_time"))

    # Build GPS track and normalized data points in a single pass.
    coords = []
    normalized_points = []
    for r in records:
        lat = _fit_coord_to_degrees(r.get("position_lat"))
        lon = _fit_coord_to_degrees(r.get("position_long"))
        if lat is not None and lon is not None:
            coords.append((lat, lon))
        ts = _as_utc(r.get("timestamp"))
        normalized_points.append({
            "timestamp": ts.isoformat() if ts else None,
            "latitude": lat,
            "longitude": lon,
            # None-aware fallbacks: a reading of 0 is valid and must not
            # fall through to the alternative field.
            "altitude_m": _first_present(
                r.get("altitude"), r.get("enhanced_altitude")),
            "heart_rate": r.get("heart_rate"),
            "cadence": _first_present(
                r.get("cadence"), r.get("fractional_cadence")),
            "speed_ms": _first_present(
                r.get("speed"), r.get("enhanced_speed")),
            "power": r.get("power"),
            "temperature_c": r.get("temperature"),
            "distance_m": r.get("distance"),
        })

    encoded_polyline = polyline_lib.encode(coords) if coords else None
    bounding_box = _bounding_box(coords)

    # Normalize laps
    normalized_laps = []
    for lap_number, lap in enumerate(laps, start=1):
        ls = _as_utc(lap.get("start_time"))
        normalized_laps.append({
            "lap_number": lap_number,
            "start_time": ls.isoformat() if ls else None,
            "duration_s": _safe_float(lap.get("total_elapsed_time")),
            "distance_m": _safe_float(lap.get("total_distance")),
            "avg_heart_rate": _safe_float(lap.get("avg_heart_rate")),
            "avg_cadence": _safe_float(lap.get("avg_cadence")),
            "avg_speed_ms": _safe_float(_first_present(
                lap.get("avg_speed"), lap.get("enhanced_avg_speed"))),
            "avg_power": _safe_float(lap.get("avg_power")),
        })

    # Build activity name, e.g. "Running 2024-05-01".
    name = str(raw_sport if raw_sport is not None else "Activity").title()
    if start_time:
        name += " " + start_time.strftime("%Y-%m-%d")

    return {
        "name": name,
        "sport_type": sport_type,
        "start_time": start_time.isoformat() if start_time else None,
        "distance_m": _safe_float(session.get("total_distance")),
        "duration_s": _safe_float(session.get("total_elapsed_time")),
        "elevation_gain_m": _safe_float(session.get("total_ascent")),
        "elevation_loss_m": _safe_float(session.get("total_descent")),
        "avg_heart_rate": _safe_float(session.get("avg_heart_rate")),
        "max_heart_rate": _safe_float(session.get("max_heart_rate")),
        "avg_cadence": _safe_float(session.get("avg_cadence")),
        "avg_power": _safe_float(session.get("avg_power")),
        "normalized_power": _safe_float(session.get("normalized_power")),
        "avg_speed_ms": _safe_float(_first_present(
            session.get("avg_speed"), session.get("enhanced_avg_speed"))),
        "max_speed_ms": _safe_float(_first_present(
            session.get("max_speed"), session.get("enhanced_max_speed"))),
        "avg_temperature_c": _safe_float(session.get("avg_temperature")),
        "calories": _safe_float(session.get("total_calories")),
        "training_stress_score": _safe_float(
            session.get("training_stress_score")),
        # NOTE(review): total_training_effect is a training-effect score,
        # not a VO2max value -- confirm this mapping is intended.
        "vo2max_estimate": _safe_float(session.get("total_training_effect")),
        "polyline": encoded_polyline,
        "bounding_box": bounding_box,
        "source_type": "fit",
        "data_points": normalized_points,
        "laps": normalized_laps,
    }


def _gpx_extension_values(extensions) -> dict:
    """Collect numeric extension values of a GPX point keyed by local tag.

    Each extension element and all of its descendants are inspected, so
    values are found both inside wrapper elements and directly under
    ``<extensions>``.  Elements whose text is not a number are skipped.
    """
    values = {}
    for ext in extensions or ():
        for node in ext.iter():
            tag = node.tag
            if not isinstance(tag, str):
                continue  # comments / processing instructions
            tag = tag.split("}")[-1]  # strip the "{namespace}" prefix
            try:
                values[tag] = float(node.text)
            except (TypeError, ValueError):
                continue
    return values


def parse_gpx_file(filepath: str) -> dict:
    """Parse a GPX file (first track only).

    Returns a dict with the same keys as ``parse_fit_file``.  Distance,
    elevation gain/loss, duration and heart-rate statistics are derived
    from the track points; fields GPX cannot provide are None.

    Raises:
        ValueError: if the file contains no tracks.
    """
    # utf-8-sig decodes UTF-8 and transparently skips a leading BOM.
    with open(filepath, encoding="utf-8-sig") as f:
        gpx = gpxpy.parse(f)

    track = gpx.tracks[0] if gpx.tracks else None
    if not track:
        raise ValueError("No tracks found in GPX file")

    data_points = []
    for segment in track.segments:
        for pt in segment.points:
            ts = _as_utc(pt.time)
            extensions = _gpx_extension_values(pt.extensions)
            data_points.append({
                "timestamp": ts.isoformat() if ts else None,
                "latitude": pt.latitude,
                "longitude": pt.longitude,
                "altitude_m": pt.elevation,
                "heart_rate": extensions.get("hr"),
                "cadence": extensions.get("cad"),
                "speed_ms": extensions.get("speed"),
                "power": extensions.get("power"),
                # None-aware so that 0 degrees C is kept.
                "temperature_c": _first_present(
                    extensions.get("temp"), extensions.get("atemp")),
                "distance_m": None,
            })

    # 0.0 is a valid latitude/longitude (equator / prime meridian), so
    # test for None rather than truthiness.
    coords = [
        (p["latitude"], p["longitude"])
        for p in data_points
        if p["latitude"] is not None and p["longitude"] is not None
    ]
    encoded_polyline = polyline_lib.encode(coords) if coords else None
    bounding_box = _bounding_box(coords)

    # Add cumulative distance
    total_dist = 0.0
    prev = None
    for p in data_points:
        lat, lon = p["latitude"], p["longitude"]
        if lat is not None and lon is not None:
            if prev is not None:
                total_dist += haversine_distance(prev[0], prev[1], lat, lon)
            prev = (lat, lon)
        p["distance_m"] = total_dist

    # Elevation gain/loss (an altitude of 0 m is a valid sample).
    uphill, downhill = 0.0, 0.0
    alts = [p["altitude_m"] for p in data_points
            if p["altitude_m"] is not None]
    for previous_alt, current_alt in zip(alts, alts[1:]):
        diff = current_alt - previous_alt
        if diff > 0:
            uphill += diff
        else:
            downhill += abs(diff)

    hrs = [p["heart_rate"] for p in data_points if p["heart_rate"]]

    # Use the first and last timestamps that exist; GPX files without
    # <time> elements yield no start time and no duration.
    stamps = [p["timestamp"] for p in data_points if p["timestamp"]]
    start_time_str = stamps[0] if stamps else None
    start_dt = datetime.fromisoformat(start_time_str) if stamps else None
    end_dt = datetime.fromisoformat(stamps[-1]) if stamps else None
    duration = (
        (end_dt - start_dt).total_seconds()
        if (start_dt and end_dt) else None
    )

    sport = "running"
    if track.type:
        sport = track.type.lower()

    default_name = f"Activity {start_dt.date()}" if start_dt else "Activity"

    return {
        "name": track.name or gpx.name or default_name,
        "sport_type": sport,
        "start_time": start_time_str,
        "distance_m": total_dist,
        "duration_s": duration,
        "elevation_gain_m": uphill,
        "elevation_loss_m": downhill,
        "avg_heart_rate": (sum(hrs) / len(hrs)) if hrs else None,
        "max_heart_rate": max(hrs) if hrs else None,
        "avg_cadence": None,
        "avg_power": None,
        "normalized_power": None,
        "avg_speed_ms": (
            (total_dist / duration) if (total_dist and duration) else None
        ),
        "max_speed_ms": None,
        "avg_temperature_c": None,
        "calories": None,
        "training_stress_score": None,
        "vo2max_estimate": None,
        "polyline": encoded_polyline,
        "bounding_box": bounding_box,
        "source_type": "gpx",
        "data_points": data_points,
        "laps": [],
    }


def calculate_hr_zones(data_points: list, user_max_hr: float) -> dict:
    """
    Calculate % time in each HR zone using the user's configured max HR.

    Zones follow the standard 5-zone model as % of max HR:
      Z1 Recovery:   < 60%
      Z2 Base:       60 - 70%
      Z3 Tempo:      70 - 80%
      Z4 Threshold:  80 - 90%
      Z5 Max:        >= 90%

    user_max_hr should be the user's actual physiological max HR,
    NOT the highest HR recorded in this activity. Using activity max
    shifts all zones upward and makes easy runs look harder than they are.

    Each data point with a heart rate of at least 20 counts as one sample.
    Returns a dict mapping "z1".."z5" to a percentage rounded to one
    decimal, or {} when user_max_hr is missing or below 100, or when no
    data point has a usable heart rate.
    """
    if not user_max_hr or user_max_hr < 100:
        return {}

    zones = {k: 0 for k in _HR_ZONE_KEYS}
    total = 0
    for p in data_points:
        hr = p.get("heart_rate")
        if not hr or hr < 20:
            continue
        total += 1
        # bisect_right gives lower-inclusive zones: exactly 60% lands in
        # z2, and anything at or above 90% lands in z5.
        zone_index = bisect_right(_HR_ZONE_UPPER_BOUNDS, hr / user_max_hr)
        zones[_HR_ZONE_KEYS[zone_index]] += 1

    if total:
        return {k: round(v / total * 100, 1) for k, v in zones.items()}
    return {}