"""
FIT and GPX file parser.

Parses FIT files directly using the Garmin SDK but applies manual scale
conversion for fields where the SDK doesn't auto-convert.
"""
import math
import struct
from datetime import datetime, timezone
from typing import Optional

import gpxpy
import polyline as polyline_lib
from garmin_fit_sdk import Decoder, Stream

# Offset (seconds) added to raw integer FIT timestamps to obtain Unix time;
# 631065600 corresponds to 1989-12-31T00:00:00Z.
FIT_EPOCH_S = 631065600
# Multiplier converting FIT "semicircle" angles to degrees (2**31 == 180 deg).
SEMICIRCLES_TO_DEG = 180.0 / (2 ** 31)

# Mean Earth radius (metres) used by the haversine distance.
_EARTH_RADIUS_M = 6371000


def _semicircles_to_deg(val):
    """Convert a semicircle angle to degrees.

    Returns None when ``val`` is None, cannot be converted to float, or the
    converted value falls outside [-180, 180].
    """
    if val is None:
        return None
    try:
        result = float(val) * SEMICIRCLES_TO_DEG
    except (TypeError, ValueError):
        return None
    return result if -180 <= result <= 180 else None


def _safe_float(val) -> Optional[float]:
    """Return ``float(val)``, or None if ``val`` is None or not convertible."""
    try:
        return float(val) if val is not None else None
    except (TypeError, ValueError):
        return None


def _sanitize_speed(val, dist_m=None, dur_s=None) -> Optional[float]:
    """Reject the FIT invalid sentinel (0xFFFF/1000 = 65.535 m/s) and fall back to dist/dur.

    Returns ``val`` as a float when it is present and below 65.0 m/s.
    Otherwise returns ``dist_m / dur_s`` when both are truthy and the duration
    is positive, else None.
    """
    fv = _safe_float(val)
    if fv is None or fv >= 65.0:
        if dist_m and dur_s and float(dur_s) > 0:
            return float(dist_m) / float(dur_s)
        return None
    return fv


# Conservative average-speed ceilings (m/s) above which an activity was almost
# certainly recorded in a vehicle rather than under human power. Sports not
# listed fall back to the generous default.
_VEHICLE_SPEED_CEILINGS = {
    "running": 8.0,   # ~28.8 km/h — well above elite sprint pace sustained
    "walking": 8.0,
    "hiking": 8.0,
    "cycling": 22.0,  # ~79 km/h — beyond sustained amateur cycling
}
_VEHICLE_SPEED_DEFAULT = 25.0  # ~90 km/h


def _vehicle_reason(sport_type, avg_speed_ms, dist_m=None, dur_s=None) -> Optional[str]:
    """Return a human-readable reason if the average speed is implausibly fast
    for the sport (i.e. the 'activity' looks like car/vehicle travel), else None.

    When ``avg_speed_ms`` is missing, the speed is derived from
    ``dist_m / dur_s`` if both are available and the duration is positive.
    """
    speed = _safe_float(avg_speed_ms)
    if speed is None and dist_m and dur_s and float(dur_s) > 0:
        speed = float(dist_m) / float(dur_s)
    if speed is None or speed <= 0:
        return None
    ceiling = _VEHICLE_SPEED_CEILINGS.get(sport_type, _VEHICLE_SPEED_DEFAULT)
    if speed > ceiling:
        return (f"Looks like vehicle travel — average speed {speed * 3.6:.0f} km/h "
                f"exceeds the plausible limit for {sport_type}")
    return None


def _bounding_box(coords):
    """Return the min/max latitude and longitude of ``coords``.

    ``coords`` is a sequence of ``(lat, lon)`` pairs; returns None when empty.
    """
    if not coords:
        return None
    lats = [c[0] for c in coords]
    lons = [c[1] for c in coords]
    return {
        "min_lat": min(lats),
        "max_lat": max(lats),
        "min_lon": min(lons),
        "max_lon": max(lons),
    }


def _to_dt(val) -> Optional[datetime]:
    """Coerce a datetime or a raw FIT timestamp to a timezone-aware datetime.

    Naive datetimes are tagged as UTC; aware datetimes are returned unchanged.
    Numbers are treated as seconds since the FIT epoch. Anything else, or a
    number that cannot be represented, yields None.
    """
    if val is None:
        return None
    if isinstance(val, datetime):
        return val.replace(tzinfo=timezone.utc) if val.tzinfo is None else val
    if isinstance(val, (int, float)):
        try:
            return datetime.fromtimestamp(int(val) + FIT_EPOCH_S, tz=timezone.utc)
        except (OSError, OverflowError, ValueError):
            return None
    return None


def _is_valid_lat(v):
    """Return True when ``v`` is not None and lies within [-90, 90]."""
    return v is not None and -90 <= v <= 90


def _is_valid_lon(v):
    """Return True when ``v`` is not None and lies within [-180, 180]."""
    return v is not None and -180 <= v <= 180


def _haversine_m(lat1, lon1, lat2, lon2):
    """Return the great-circle distance in metres between two lat/lon points (degrees)."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    # Clamp so float rounding can never push the asin argument outside [0, 1].
    a = min(1.0, max(0.0, a))
    return 2 * _EARTH_RADIUS_M * math.asin(math.sqrt(a))


def parse_fit_file(filepath: str) -> dict:
    """Parse a FIT file into a normalized activity dict.

    Session (18), record (20) and lap (19) messages are collected through the
    decoder's message listener. When several session messages are present
    their fields are merged, with later messages overwriting earlier ones.

    Returns a dict with summary fields (name, sport_type, start_time,
    distance_m, duration_s, ...), an encoded ``polyline``, a ``bounding_box``,
    a ``rejected_reason`` (non-None when the average speed looks like vehicle
    travel), per-record ``data_points`` and per-lap ``laps``.
    """
    session_data = {}
    records = []
    laps = []

    def listener(mesg_num: int, msg: dict):
        if mesg_num == 18:  # session
            session_data.update(msg)
        elif mesg_num == 20:  # record
            records.append(msg)
        elif mesg_num == 19:  # lap
            laps.append(msg)

    stream = Stream.from_file(filepath)
    decoder = Decoder(stream)
    decoder.read(
        apply_scale_and_offset=True,
        convert_datetimes_to_dates=True,
        convert_types_to_strings=True,
        enable_crc_check=False,
        expand_sub_fields=True,
        expand_components=True,
        merge_heart_rates=True,
        mesg_listener=listener,
    )

    # The SDK may return field names in camelCase or snake_case depending on version.
    # Try both. Also handle raw timestamp integers for start_time.
    def get(d, *keys):
        """Return the first non-None value found under any of ``keys``."""
        for k in keys:
            v = d.get(k)
            if v is not None:
                return v
        return None

    sport_raw = str(get(session_data, "sport", "Sport") or "generic").lower()
    sport_map = {
        "running": "running",
        "cycling": "cycling",
        "hiking": "hiking",
        "walking": "walking",
        "generic": "other",
        "trail_running": "running",
        "e_biking": "cycling",
        "open_water_swimming": "other",
    }
    # Unmapped sports pass through under their raw lower-cased name.
    sport_type = sport_map.get(sport_raw, sport_raw)

    # start_time — SDK may return datetime or raw int
    start_time = _to_dt(get(session_data, "startTime", "start_time"))

    # Position fields — the SDK may or may not convert semicircles.
    # Values whose magnitude exceeds the valid degree range are assumed to be
    # semicircles and converted.
    # NOTE(review): a raw semicircle value within +/-90 (lat) or +/-180 (lon)
    # is indistinguishable from degrees and is taken as degrees.
    def get_lat(d):
        fv = _safe_float(get(d, "positionLat", "position_lat"))
        if fv is None:
            return None
        if abs(fv) > 90:
            fv = fv * SEMICIRCLES_TO_DEG
        return fv if _is_valid_lat(fv) else None

    def get_lon(d):
        fv = _safe_float(get(d, "positionLong", "position_long"))
        if fv is None:
            return None
        if abs(fv) > 180:
            fv = fv * SEMICIRCLES_TO_DEG
        return fv if _is_valid_lon(fv) else None

    # Build the GPS track and the normalized data points in a single pass so
    # each record's position is decoded only once.
    coords = []
    normalized_points = []
    for r in records:
        lat = get_lat(r)
        lon = get_lon(r)
        if lat is not None and lon is not None:
            coords.append((lat, lon))

        ts = _to_dt(get(r, "timestamp"))
        normalized_points.append({
            "timestamp": ts.isoformat() if ts else None,
            "latitude": lat,
            "longitude": lon,
            "altitude_m": _safe_float(
                get(r, "altitude", "enhancedAltitude", "enhanced_altitude")
            ),
            "heart_rate": _safe_float(get(r, "heartRate", "heart_rate")),
            "cadence": _safe_float(get(r, "cadence")),
            # Same sentinel filter as the lap/session averages; there is no
            # per-record distance/duration to fall back on.
            "speed_ms": _sanitize_speed(
                get(r, "speed", "enhancedSpeed", "enhanced_speed")
            ),
            "power": _safe_float(get(r, "power")),
            "temperature_c": _safe_float(get(r, "temperature")),
            "distance_m": _safe_float(get(r, "distance")),
        })

    encoded_polyline = polyline_lib.encode(coords) if coords else None
    bounding_box = _bounding_box(coords)

    # Normalize laps
    normalized_laps = []
    for lap_number, lap in enumerate(laps, start=1):
        ls = _to_dt(get(lap, "startTime", "start_time"))
        lap_dist = _safe_float(get(lap, "totalDistance", "total_distance"))
        lap_dur = _safe_float(get(lap, "totalElapsedTime", "total_elapsed_time"))
        normalized_laps.append({
            "lap_number": lap_number,
            "start_time": ls.isoformat() if ls else None,
            "duration_s": lap_dur,
            "distance_m": lap_dist,
            "avg_heart_rate": _safe_float(get(lap, "avgHeartRate", "avg_heart_rate")),
            "avg_cadence": _safe_float(get(lap, "avgCadence", "avg_cadence")),
            "avg_speed_ms": _sanitize_speed(
                get(lap, "avgSpeed", "avg_speed", "enhancedAvgSpeed", "enhanced_avg_speed"),
                dist_m=lap_dist,
                dur_s=lap_dur,
            ),
            "avg_power": _safe_float(get(lap, "avgPower", "avg_power")),
        })

    name = sport_type.title()
    if start_time:
        name += " " + start_time.strftime("%Y-%m-%d")

    total_dist = _safe_float(get(session_data, "totalDistance", "total_distance"))
    elapsed_s = _safe_float(get(session_data, "totalElapsedTime", "total_elapsed_time"))
    # Timer time = time the device was actively recording (excludes auto/manual pauses).
    moving_s = _safe_float(get(session_data, "totalTimerTime", "total_timer_time"))
    avg_speed = _sanitize_speed(
        get(session_data, "avgSpeed", "avg_speed", "enhancedAvgSpeed", "enhanced_avg_speed"),
        dist_m=total_dist,
        dur_s=elapsed_s,
    )

    return {
        "name": name,
        "sport_type": sport_type,
        "start_time": start_time.isoformat() if start_time else None,
        "distance_m": total_dist,
        "duration_s": elapsed_s,
        "moving_time_s": moving_s,
        "elevation_gain_m": _safe_float(get(session_data, "totalAscent", "total_ascent")),
        "elevation_loss_m": _safe_float(get(session_data, "totalDescent", "total_descent")),
        "avg_heart_rate": _safe_float(get(session_data, "avgHeartRate", "avg_heart_rate")),
        "max_heart_rate": _safe_float(get(session_data, "maxHeartRate", "max_heart_rate")),
        "avg_cadence": _safe_float(get(session_data, "avgCadence", "avg_cadence")),
        "avg_power": _safe_float(get(session_data, "avgPower", "avg_power")),
        "normalized_power": _safe_float(get(session_data, "normalizedPower", "normalized_power")),
        "avg_speed_ms": avg_speed,
        # Filter the invalid-speed sentinel here as well (no fallback available).
        "max_speed_ms": _sanitize_speed(
            get(session_data, "maxSpeed", "max_speed", "enhancedMaxSpeed", "enhanced_max_speed")
        ),
        "avg_temperature_c": _safe_float(get(session_data, "avgTemperature", "avg_temperature")),
        "calories": _safe_float(get(session_data, "totalCalories", "total_calories")),
        "training_stress_score": _safe_float(
            get(session_data, "trainingStressScore", "training_stress_score")
        ),
        # NOTE(review): this key is populated from the total-training-effect
        # field, not a VO2max field — confirm this is intended.
        "vo2max_estimate": _safe_float(
            get(session_data, "totalTrainingEffect", "total_training_effect")
        ),
        "polyline": encoded_polyline,
        "bounding_box": bounding_box,
        "source_type": "fit",
        # Prefer moving (timer) time for the fallback speed; use elapsed otherwise.
        "rejected_reason": _vehicle_reason(
            sport_type, avg_speed, total_dist, moving_s or elapsed_s
        ),
        "data_points": normalized_points,
        "laps": normalized_laps,
    }


def parse_gpx_file(filepath: str) -> dict:
    """Parse the first track of a GPX file into a normalized activity dict.

    Distance is accumulated with the haversine formula, elevation gain/loss
    from consecutive altitude differences, and duration from the first and
    last points that carry a timestamp. The returned dict has the same keys
    as ``parse_fit_file``; fields GPX cannot supply are None and ``laps`` is
    empty.

    Raises:
        ValueError: if the file contains no tracks.
    """
    with open(filepath, encoding="utf-8") as f:
        gpx = gpxpy.parse(f)

    track = gpx.tracks[0] if gpx.tracks else None
    if not track:
        raise ValueError("No tracks found in GPX file")

    data_points = []
    for segment in track.segments:
        for pt in segment.points:
            ts = pt.time
            if ts and ts.tzinfo is None:
                ts = ts.replace(tzinfo=timezone.utc)

            # Collect numeric extension values keyed by namespace-stripped tag.
            # Walk each extension element and all of its descendants so both
            # wrapped values and bare top-level values are found.
            extensions = {}
            for ext in pt.extensions or ():
                for node in ext.iter():
                    tag = node.tag
                    if not isinstance(tag, str):
                        continue  # comments / processing instructions
                    tag = tag.split("}")[-1] if "}" in tag else tag
                    try:
                        extensions[tag] = float(node.text)
                    except (ValueError, TypeError):
                        pass  # container elements and non-numeric text

            temperature = extensions.get("temp")
            if temperature is None:  # 0.0 is a valid temperature
                temperature = extensions.get("atemp")

            data_points.append({
                "timestamp": ts.isoformat() if ts else None,
                "latitude": pt.latitude,
                "longitude": pt.longitude,
                "altitude_m": pt.elevation,
                "heart_rate": extensions.get("hr"),
                "cadence": extensions.get("cad"),
                "speed_ms": extensions.get("speed"),
                "power": extensions.get("power"),
                "temperature_c": temperature,
                "distance_m": None,
            })

    def has_position(p):
        # Compare against None: 0.0 is a valid latitude/longitude.
        return p["latitude"] is not None and p["longitude"] is not None

    coords = [(p["latitude"], p["longitude"]) for p in data_points if has_position(p)]
    encoded_polyline = polyline_lib.encode(coords) if coords else None
    bounding_box = _bounding_box(coords)

    # Cumulative distance; points without a position inherit the running total.
    total_dist = 0.0
    prev = None
    for p in data_points:
        if has_position(p):
            if prev is not None:
                total_dist += _haversine_m(prev[0], prev[1], p["latitude"], p["longitude"])
            prev = (p["latitude"], p["longitude"])
        p["distance_m"] = total_dist

    uphill, downhill = 0.0, 0.0
    alts = [p["altitude_m"] for p in data_points if p["altitude_m"] is not None]
    for lower, upper in zip(alts, alts[1:]):
        diff = upper - lower
        if diff > 0:
            uphill += diff
        else:
            downhill += abs(diff)

    # Zero/missing heart-rate samples are excluded from the statistics.
    hrs = [p["heart_rate"] for p in data_points if p["heart_rate"]]

    # Use the first/last points that actually have a timestamp so a missing
    # <time> on either end cannot crash the parse.
    timestamps = [p["timestamp"] for p in data_points if p["timestamp"]]
    start_time_str = timestamps[0] if timestamps else None
    start_dt = datetime.fromisoformat(start_time_str) if start_time_str else None
    end_dt = datetime.fromisoformat(timestamps[-1]) if timestamps else None
    duration = (end_dt - start_dt).total_seconds() if (start_dt and end_dt) else None

    sport = track.type.lower() if track.type else "running"
    gpx_avg_speed = (total_dist / duration) if (total_dist and duration) else None

    return {
        "name": track.name or gpx.name or f"Activity {start_dt.date() if start_dt else ''}",
        "sport_type": sport,
        "start_time": start_time_str,
        "distance_m": total_dist,
        "duration_s": duration,
        "moving_time_s": None,
        "elevation_gain_m": uphill,
        "elevation_loss_m": downhill,
        "avg_heart_rate": (sum(hrs) / len(hrs)) if hrs else None,
        "max_heart_rate": max(hrs) if hrs else None,
        "avg_cadence": None,
        "avg_power": None,
        "normalized_power": None,
        "avg_speed_ms": gpx_avg_speed,
        "max_speed_ms": None,
        "avg_temperature_c": None,
        "calories": None,
        "training_stress_score": None,
        "vo2max_estimate": None,
        "polyline": encoded_polyline,
        "bounding_box": bounding_box,
        "source_type": "gpx",
        "rejected_reason": _vehicle_reason(sport, gpx_avg_speed, total_dist, duration),
        "data_points": data_points,
        "laps": [],
    }


def calculate_hr_zones(data_points: list, user_max_hr: float) -> dict:
    """Return the percentage of heart-rate samples falling in each of five zones.

    Zones are fractions of ``user_max_hr``: z1 < 60%, z2 60-70%, z3 70-80%,
    z4 80-90%, z5 >= 90%. Samples that are missing or below 20 bpm are
    ignored. Returns ``{}`` when ``user_max_hr`` is falsy or below 100, or
    when no usable samples exist; otherwise a dict ``{"z1": pct, ..., "z5": pct}``
    with values rounded to one decimal place.
    """
    if not user_max_hr or user_max_hr < 100:
        return {}

    zone_bounds = [0.0, 0.60, 0.70, 0.80, 0.90, 1.01]
    zone_keys = ["z1", "z2", "z3", "z4", "z5"]
    zones = {k: 0 for k in zone_keys}
    total = 0

    for p in data_points:
        hr = p.get("heart_rate")
        if not hr or hr < 20:
            continue
        pct = hr / user_max_hr
        total += 1
        for i, key in enumerate(zone_keys):
            if zone_bounds[i] <= pct < zone_bounds[i + 1]:
                zones[key] += 1
                break
        else:
            # No band matched: the sample is at or above 101% of max HR.
            zones["z5"] += 1

    if total:
        return {k: round(v / total * 100, 1) for k, v in zones.items()}
    return {}