""" FIT and GPX file parser using the official Garmin FIT Python SDK. """ import math from datetime import datetime, timezone from typing import Optional import gpxpy import polyline as polyline_lib from garmin_fit_sdk import Decoder, Stream def haversine_distance(lat1, lon1, lat2, lon2) -> float: R = 6371000 phi1, phi2 = math.radians(lat1), math.radians(lat2) dphi = math.radians(lat2 - lat1) dlam = math.radians(lon2 - lon1) a = math.sin(dphi/2)**2 + math.cos(phi1)*math.cos(phi2)*math.sin(dlam/2)**2 return 2 * R * math.asin(math.sqrt(a)) def _safe_float(val) -> Optional[float]: try: return float(val) if val is not None else None except (TypeError, ValueError): return None def _bounding_box(coords: list) -> Optional[dict]: if not coords: return None lats = [c[0] for c in coords] lons = [c[1] for c in coords] return {"min_lat": min(lats), "max_lat": max(lats), "min_lon": min(lons), "max_lon": max(lons)} def _ensure_utc(dt) -> Optional[datetime]: if dt is None: return None if isinstance(dt, datetime): if dt.tzinfo is None: return dt.replace(tzinfo=timezone.utc) return dt return None def parse_fit_file(filepath: str) -> dict: """Parse a Garmin .fit activity file using the official Garmin SDK.""" stream = Stream.from_file(filepath) decoder = Decoder(stream) messages, errors = decoder.read( apply_scale_and_offset=True, convert_datetimes_to_dates=True, convert_types_to_strings=True, enable_crc_check=False, expand_sub_fields=True, expand_components=True, merge_heart_rates=True, ) sessions = messages.get("session", [{}]) session = sessions[0] if sessions else {} records = messages.get("record", []) laps = messages.get("lap", []) sport = str(session.get("sport", "generic")).lower() sport_map = { "running": "running", "cycling": "cycling", "hiking": "hiking", "walking": "walking", "generic": "other", "trail_running": "running", "e_biking": "cycling", "open_water_swimming": "other", } sport_type = sport_map.get(sport, sport) start_time = _ensure_utc(session.get("start_time")) coords = [] for r in records: lat = r.get("position_lat") lon = r.get("position_long") if lat is not None and lon is not None: if -90 <= lat <= 90 and -180 <= lon <= 180: coords.append((lat, lon)) encoded_polyline = polyline_lib.encode(coords) if coords else None bounding_box = _bounding_box(coords) normalized_points = [] for r in records: ts = _ensure_utc(r.get("timestamp")) lat = r.get("position_lat") lon = r.get("position_long") if lat is not None and not (-90 <= lat <= 90): lat = None if lon is not None and not (-180 <= lon <= 180): lon = None normalized_points.append({ "timestamp": ts.isoformat() if ts else None, "latitude": _safe_float(lat), "longitude": _safe_float(lon), "altitude_m": _safe_float(r.get("altitude") or r.get("enhanced_altitude")), "heart_rate": _safe_float(r.get("heart_rate")), "cadence": _safe_float(r.get("cadence")), "speed_ms": _safe_float(r.get("speed") or r.get("enhanced_speed")), "power": _safe_float(r.get("power")), "temperature_c": _safe_float(r.get("temperature")), "distance_m": _safe_float(r.get("distance")), }) normalized_laps = [] for i, lap in enumerate(laps): ls = _ensure_utc(lap.get("start_time")) normalized_laps.append({ "lap_number": i + 1, "start_time": ls.isoformat() if ls else None, "duration_s": _safe_float(lap.get("total_elapsed_time")), "distance_m": _safe_float(lap.get("total_distance")), "avg_heart_rate": _safe_float(lap.get("avg_heart_rate")), "avg_cadence": _safe_float(lap.get("avg_cadence")), "avg_speed_ms": _safe_float(lap.get("avg_speed") or lap.get("enhanced_avg_speed")), "avg_power": _safe_float(lap.get("avg_power")), }) name = sport_type.title() if start_time: name += " " + start_time.strftime("%Y-%m-%d") return { "name": name, "sport_type": sport_type, "start_time": start_time.isoformat() if start_time else None, "distance_m": _safe_float(session.get("total_distance")), "duration_s": _safe_float(session.get("total_elapsed_time")), "elevation_gain_m": _safe_float(session.get("total_ascent")), "elevation_loss_m": _safe_float(session.get("total_descent")), "avg_heart_rate": _safe_float(session.get("avg_heart_rate")), "max_heart_rate": _safe_float(session.get("max_heart_rate")), "avg_cadence": _safe_float(session.get("avg_cadence")), "avg_power": _safe_float(session.get("avg_power")), "normalized_power": _safe_float(session.get("normalized_power")), "avg_speed_ms": _safe_float(session.get("avg_speed") or session.get("enhanced_avg_speed")), "max_speed_ms": _safe_float(session.get("max_speed") or session.get("enhanced_max_speed")), "avg_temperature_c": _safe_float(session.get("avg_temperature")), "calories": _safe_float(session.get("total_calories")), "training_stress_score": _safe_float(session.get("training_stress_score")), "vo2max_estimate": _safe_float(session.get("total_training_effect")), "polyline": encoded_polyline, "bounding_box": bounding_box, "source_type": "fit", "data_points": normalized_points, "laps": normalized_laps, } def parse_gpx_file(filepath: str) -> dict: """Parse a GPX file.""" with open(filepath) as f: gpx = gpxpy.parse(f) data_points = [] track = gpx.tracks[0] if gpx.tracks else None if not track: raise ValueError("No tracks found in GPX file") for segment in track.segments: for pt in segment.points: ts = pt.time if ts and ts.tzinfo is None: ts = ts.replace(tzinfo=timezone.utc) extensions = {} if pt.extensions: for ext in pt.extensions: for child in ext: tag = child.tag.split("}")[-1] if "}" in child.tag else child.tag try: extensions[tag] = float(child.text) except (ValueError, TypeError): pass data_points.append({ "timestamp": ts.isoformat() if ts else None, "latitude": pt.latitude, "longitude": pt.longitude, "altitude_m": pt.elevation, "heart_rate": extensions.get("hr"), "cadence": extensions.get("cad"), "speed_ms": extensions.get("speed"), "power": extensions.get("power"), "temperature_c": extensions.get("temp") or extensions.get("atemp"), "distance_m": None, }) coords = [(p["latitude"], p["longitude"]) for p in data_points if p["latitude"] and p["longitude"]] encoded_polyline = polyline_lib.encode(coords) if coords else None bounding_box = _bounding_box(coords) total_dist = 0.0 prev = None for p in data_points: if p["latitude"] and p["longitude"]: if prev: total_dist += haversine_distance(prev[0], prev[1], p["latitude"], p["longitude"]) prev = (p["latitude"], p["longitude"]) p["distance_m"] = total_dist uphill, downhill = 0.0, 0.0 alts = [p["altitude_m"] for p in data_points if p["altitude_m"]] for i in range(1, len(alts)): diff = alts[i] - alts[i-1] if diff > 0: uphill += diff else: downhill += abs(diff) hrs = [p["heart_rate"] for p in data_points if p["heart_rate"]] start_time_str = data_points[0]["timestamp"] if data_points else None start_dt = datetime.fromisoformat(start_time_str) if start_time_str else None end_dt = datetime.fromisoformat(data_points[-1]["timestamp"]) if data_points else None duration = (end_dt - start_dt).total_seconds() if (start_dt and end_dt) else None sport = "running" if track.type: sport = track.type.lower() return { "name": track.name or gpx.name or f"Activity {start_dt.date() if start_dt else ''}", "sport_type": sport, "start_time": start_time_str, "distance_m": total_dist, "duration_s": duration, "elevation_gain_m": uphill, "elevation_loss_m": downhill, "avg_heart_rate": (sum(hrs) / len(hrs)) if hrs else None, "max_heart_rate": max(hrs) if hrs else None, "avg_cadence": None, "avg_power": None, "normalized_power": None, "avg_speed_ms": (total_dist / duration) if (total_dist and duration) else None, "max_speed_ms": None, "avg_temperature_c": None, "calories": None, "training_stress_score": None, "vo2max_estimate": None, "polyline": encoded_polyline, "bounding_box": bounding_box, "source_type": "gpx", "data_points": data_points, "laps": [], } def calculate_hr_zones(data_points: list, user_max_hr: float) -> dict: """Calculate % time in each HR zone using user's configured max HR.""" if not user_max_hr or user_max_hr < 100: return {} zone_bounds = [0.0, 0.60, 0.70, 0.80, 0.90, 1.01] zone_keys = ["z1", "z2", "z3", "z4", "z5"] zones = {k: 0 for k in zone_keys} total = 0 for p in data_points: hr = p.get("heart_rate") if not hr or hr < 20: continue pct = hr / user_max_hr total += 1 for i, key in enumerate(zone_keys): if zone_bounds[i] <= pct < zone_bounds[i+1]: zones[key] += 1 break else: zones["z5"] += 1 if total: return {k: round(v / total * 100, 1) for k, v in zones.items()} return {}