""" FIT and GPX file parser using the official Garmin FIT Python SDK. Field names from the SDK are camelCase as per the SDK documentation. """ import math from datetime import datetime, timezone from typing import Optional import gpxpy import polyline as polyline_lib from garmin_fit_sdk import Decoder, Stream def haversine_distance(lat1, lon1, lat2, lon2) -> float: R = 6371000 phi1, phi2 = math.radians(lat1), math.radians(lat2) dphi = math.radians(lat2 - lat1) dlam = math.radians(lon2 - lon1) a = math.sin(dphi/2)**2 + math.cos(phi1)*math.cos(phi2)*math.sin(dlam/2)**2 return 2 * R * math.asin(math.sqrt(a)) def _safe_float(val) -> Optional[float]: try: return float(val) if val is not None else None except (TypeError, ValueError): return None def _bounding_box(coords: list) -> Optional[dict]: if not coords: return None lats = [c[0] for c in coords] lons = [c[1] for c in coords] return {"min_lat": min(lats), "max_lat": max(lats), "min_lon": min(lons), "max_lon": max(lons)} def _ensure_utc(dt) -> Optional[datetime]: if dt is None: return None if isinstance(dt, datetime): if dt.tzinfo is None: return dt.replace(tzinfo=timezone.utc) return dt return None def parse_fit_file(filepath: str) -> dict: """Parse a Garmin .fit activity file using the official Garmin SDK.""" stream = Stream.from_file(filepath) decoder = Decoder(stream) messages, errors = decoder.read( apply_scale_and_offset=True, convert_datetimes_to_dates=True, convert_types_to_strings=True, enable_crc_check=False, expand_sub_fields=True, expand_components=True, merge_heart_rates=True, ) # SDK returns camelCase keys sessions = messages.get("session", [{}]) session = sessions[0] if sessions else {} records = messages.get("record", []) laps = messages.get("lap", []) sport = str(session.get("sport", "generic")).lower() sport_map = { "running": "running", "cycling": "cycling", "hiking": "hiking", "walking": "walking", "generic": "other", "trail_running": "running", "e_biking": "cycling", "open_water_swimming": "other", } sport_type = sport_map.get(sport, sport) start_time = _ensure_utc(session.get("startTime")) coords = [] for r in records: lat = r.get("positionLat") lon = r.get("positionLong") if lat is not None and lon is not None: if -90 <= lat <= 90 and -180 <= lon <= 180: coords.append((lat, lon)) encoded_polyline = polyline_lib.encode(coords) if coords else None bounding_box = _bounding_box(coords) normalized_points = [] for r in records: ts = _ensure_utc(r.get("timestamp")) lat = r.get("positionLat") lon = r.get("positionLong") if lat is not None and not (-90 <= lat <= 90): lat = None if lon is not None and not (-180 <= lon <= 180): lon = None normalized_points.append({ "timestamp": ts.isoformat() if ts else None, "latitude": _safe_float(lat), "longitude": _safe_float(lon), "altitude_m": _safe_float(r.get("altitude") or r.get("enhancedAltitude")), "heart_rate": _safe_float(r.get("heartRate")), "cadence": _safe_float(r.get("cadence")), "speed_ms": _safe_float(r.get("speed") or r.get("enhancedSpeed")), "power": _safe_float(r.get("power")), "temperature_c": _safe_float(r.get("temperature")), "distance_m": _safe_float(r.get("distance")), }) normalized_laps = [] for i, lap in enumerate(laps): ls = _ensure_utc(lap.get("startTime")) normalized_laps.append({ "lap_number": i + 1, "start_time": ls.isoformat() if ls else None, "duration_s": _safe_float(lap.get("totalElapsedTime")), "distance_m": _safe_float(lap.get("totalDistance")), "avg_heart_rate": _safe_float(lap.get("avgHeartRate")), "avg_cadence": _safe_float(lap.get("avgCadence")), "avg_speed_ms": _safe_float(lap.get("avgSpeed") or lap.get("enhancedAvgSpeed")), "avg_power": _safe_float(lap.get("avgPower")), }) name = sport_type.title() if start_time: name += " " + start_time.strftime("%Y-%m-%d") return { "name": name, "sport_type": sport_type, "start_time": start_time.isoformat() if start_time else None, "distance_m": _safe_float(session.get("totalDistance")), "duration_s": _safe_float(session.get("totalElapsedTime")), "elevation_gain_m": _safe_float(session.get("totalAscent")), "elevation_loss_m": _safe_float(session.get("totalDescent")), "avg_heart_rate": _safe_float(session.get("avgHeartRate")), "max_heart_rate": _safe_float(session.get("maxHeartRate")), "avg_cadence": _safe_float(session.get("avgCadence")), "avg_power": _safe_float(session.get("avgPower")), "normalized_power": _safe_float(session.get("normalizedPower")), "avg_speed_ms": _safe_float(session.get("avgSpeed") or session.get("enhancedAvgSpeed")), "max_speed_ms": _safe_float(session.get("maxSpeed") or session.get("enhancedMaxSpeed")), "avg_temperature_c": _safe_float(session.get("avgTemperature")), "calories": _safe_float(session.get("totalCalories")), "training_stress_score": _safe_float(session.get("trainingStressScore")), "vo2max_estimate": _safe_float(session.get("totalTrainingEffect")), "polyline": encoded_polyline, "bounding_box": bounding_box, "source_type": "fit", "data_points": normalized_points, "laps": normalized_laps, } def parse_gpx_file(filepath: str) -> dict: """Parse a GPX file.""" with open(filepath) as f: gpx = gpxpy.parse(f) data_points = [] track = gpx.tracks[0] if gpx.tracks else None if not track: raise ValueError("No tracks found in GPX file") for segment in track.segments: for pt in segment.points: ts = pt.time if ts and ts.tzinfo is None: ts = ts.replace(tzinfo=timezone.utc) extensions = {} if pt.extensions: for ext in pt.extensions: for child in ext: tag = child.tag.split("}")[-1] if "}" in child.tag else child.tag try: extensions[tag] = float(child.text) except (ValueError, TypeError): pass data_points.append({ "timestamp": ts.isoformat() if ts else None, "latitude": pt.latitude, "longitude": pt.longitude, "altitude_m": pt.elevation, "heart_rate": extensions.get("hr"), "cadence": extensions.get("cad"), "speed_ms": extensions.get("speed"), "power": extensions.get("power"), "temperature_c": extensions.get("temp") or extensions.get("atemp"), "distance_m": None, }) coords = [(p["latitude"], p["longitude"]) for p in data_points if p["latitude"] and p["longitude"]] encoded_polyline = polyline_lib.encode(coords) if coords else None bounding_box = _bounding_box(coords) total_dist = 0.0 prev = None for p in data_points: if p["latitude"] and p["longitude"]: if prev: total_dist += haversine_distance(prev[0], prev[1], p["latitude"], p["longitude"]) prev = (p["latitude"], p["longitude"]) p["distance_m"] = total_dist uphill, downhill = 0.0, 0.0 alts = [p["altitude_m"] for p in data_points if p["altitude_m"]] for i in range(1, len(alts)): diff = alts[i] - alts[i-1] if diff > 0: uphill += diff else: downhill += abs(diff) hrs = [p["heart_rate"] for p in data_points if p["heart_rate"]] start_time_str = data_points[0]["timestamp"] if data_points else None start_dt = datetime.fromisoformat(start_time_str) if start_time_str else None end_dt = datetime.fromisoformat(data_points[-1]["timestamp"]) if data_points else None duration = (end_dt - start_dt).total_seconds() if (start_dt and end_dt) else None sport = "running" if track.type: sport = track.type.lower() return { "name": track.name or gpx.name or f"Activity {start_dt.date() if start_dt else ''}", "sport_type": sport, "start_time": start_time_str, "distance_m": total_dist, "duration_s": duration, "elevation_gain_m": uphill, "elevation_loss_m": downhill, "avg_heart_rate": (sum(hrs) / len(hrs)) if hrs else None, "max_heart_rate": max(hrs) if hrs else None, "avg_cadence": None, "avg_power": None, "normalized_power": None, "avg_speed_ms": (total_dist / duration) if (total_dist and duration) else None, "max_speed_ms": None, "avg_temperature_c": None, "calories": None, "training_stress_score": None, "vo2max_estimate": None, "polyline": encoded_polyline, "bounding_box": bounding_box, "source_type": "gpx", "data_points": data_points, "laps": [], } def calculate_hr_zones(data_points: list, user_max_hr: float) -> dict: """Calculate % time in each HR zone using user's configured max HR.""" if not user_max_hr or user_max_hr < 100: return {} zone_bounds = [0.0, 0.60, 0.70, 0.80, 0.90, 1.01] zone_keys = ["z1", "z2", "z3", "z4", "z5"] zones = {k: 0 for k in zone_keys} total = 0 for p in data_points: hr = p.get("heart_rate") if not hr or hr < 20: continue pct = hr / user_max_hr total += 1 for i, key in enumerate(zone_keys): if zone_bounds[i] <= pct < zone_bounds[i+1]: zones[key] += 1 break else: zones["z5"] += 1 if total: return {k: round(v / total * 100, 1) for k, v in zones.items()} return {}