"""
Route matching: identifies when multiple activities were on the same route.

Uses a bounding-box pre-filter + dynamic time warping (DTW) for GPS track
similarity. Also contains helpers that derive segments (1-km splits, sharp
turns, hills) and best split times from activity data.
"""
import math
from datetime import datetime
from typing import Optional

import polyline as polyline_lib
import numpy as np

# Sphere radius used by haversine_m, in metres.
_EARTH_RADIUS_M = 6371000

# Turn detection: metres looked at on either side of a candidate turn point,
# and the maximum gap between detected turns that are merged into one segment.
_TURN_HALF_WINDOW_M = 100.0
_TURN_CLUSTER_GAP_M = 150.0

# Hill detection: number of neighbours on each side used for altitude
# smoothing, and the minimum length of a reported hill in metres.
_HILL_SMOOTH_RADIUS = 10
_HILL_MIN_LENGTH_M = 200.0


def decode_polyline_to_coords(encoded: str) -> list[tuple[float, float]]:
    """Decode an encoded polyline string by delegating to ``polyline.decode``."""
    return polyline_lib.decode(encoded)


def bounding_boxes_overlap(bb1: dict, bb2: dict, tolerance_deg: float = 0.005) -> bool:
    """Quick check: do two bounding boxes overlap (with a tolerance margin)?

    Both dicts must provide ``min_lat``, ``max_lat``, ``min_lon`` and
    ``max_lon``. The tolerance is applied to *both* boxes, so two boxes up to
    ``2 * tolerance_deg`` apart still count as overlapping.
    """
    return (
        bb1["min_lat"] - tolerance_deg <= bb2["max_lat"] + tolerance_deg
        and bb1["max_lat"] + tolerance_deg >= bb2["min_lat"] - tolerance_deg
        and bb1["min_lon"] - tolerance_deg <= bb2["max_lon"] + tolerance_deg
        and bb1["max_lon"] + tolerance_deg >= bb2["min_lon"] - tolerance_deg
    )


def sample_coords(coords: list[tuple], n: int = 100) -> list[tuple]:
    """Downsample a track to n evenly-spaced points for DTW efficiency.

    Tracks that already have ``n`` points or fewer are returned unchanged
    (the same list object, not a copy). The first and last points are always
    kept when ``n >= 2``.
    """
    if len(coords) <= n:
        return coords
    if n == 1:
        # The spacing formula below divides by (n - 1); a single sample is
        # simply the first point.
        return [coords[0]]
    last = len(coords) - 1
    return [coords[i * last // (n - 1)] for i in range(n)]


def dtw_distance(track1: list[tuple], track2: list[tuple]) -> float:
    """
    Compute DTW distance between two GPS tracks.

    Each point is (lat, lon). Returns the accumulated DTW cost divided by the
    length of the longer track, i.e. an approximate average distance in metres
    per matched pair. Returns ``math.inf`` when either track is empty, since
    nothing can be aligned.
    """
    n, m = len(track1), len(track2)
    if n == 0 or m == 0:
        return math.inf

    # cost[i, j] = cheapest alignment of track1[:i] with track2[:j].
    cost = np.full((n + 1, m + 1), np.inf)
    cost[0, 0] = 0.0
    for i in range(1, n + 1):
        for j in range(1, m + 1):
            step = haversine_m(track1[i - 1], track2[j - 1])
            cost[i, j] = step + min(cost[i - 1, j], cost[i, j - 1], cost[i - 1, j - 1])
    return float(cost[n, m] / max(n, m))


def haversine_m(p1: tuple, p2: tuple) -> float:
    """Return the great-circle distance in metres between two (lat, lon) points in degrees."""
    lat1, lon1 = math.radians(p1[0]), math.radians(p1[1])
    lat2, lon2 = math.radians(p2[0]), math.radians(p2[1])
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
    # Rounding can push `a` marginally outside [0, 1] (e.g. near-antipodal
    # points), which would make asin/sqrt raise ValueError.
    a = min(1.0, max(0.0, a))
    return 2 * _EARTH_RADIUS_M * math.asin(math.sqrt(a))


def routes_are_similar(
    poly1: str,
    poly2: str,
    bb1: Optional[dict],
    bb2: Optional[dict],
    dtw_threshold_m: float = 80.0,
    dist1: Optional[float] = None,
    dist2: Optional[float] = None,
) -> bool:
    """
    Returns True if two activities are on sufficiently similar routes.

    First does a cheap bounding box check, then DTW on downsampled tracks
    (60 points each).

    When dist1/dist2 are both provided and positive:
    - Rejects if distance differs by more than 2.5%
    - Uses 3% of route distance as the DTW threshold (capped at 300m),
      overriding ``dtw_threshold_m``

    Polylines that cannot be decoded, or that decode to no points, are treated
    as "not similar" rather than raising.
    """
    if dist1 and dist2 and dist1 > 0 and dist2 > 0:
        longer = max(dist1, dist2)
        if abs(dist1 - dist2) / longer > 0.025:
            return False
        dtw_threshold_m = min(longer * 0.03, 300.0)

    # The bounding-box pre-filter only applies when both boxes are available.
    if bb1 and bb2 and not bounding_boxes_overlap(bb1, bb2):
        return False

    try:
        coords1 = sample_coords(decode_polyline_to_coords(poly1), 60)
        coords2 = sample_coords(decode_polyline_to_coords(poly2), 60)
    except Exception:
        # Deliberate best-effort: a malformed or missing polyline means the
        # routes cannot be compared, so report "not similar".
        return False

    if not coords1 or not coords2:
        return False

    return bool(dtw_distance(coords1, coords2) < dtw_threshold_m)


def _to_datetime(ts):
    """Parse ISO-8601 strings with datetime.fromisoformat; pass other values through."""
    return datetime.fromisoformat(ts) if isinstance(ts, str) else ts


def find_segment_times(
    data_points: list[dict],
    start_dist_m: float,
    end_dist_m: float,
) -> Optional[float]:
    """
    Given activity data points (with cumulative distance_m), find the time
    to traverse from start_dist_m to end_dist_m.

    The start is the first point at or beyond ``start_dist_m``; the end is the
    first point from there on at or beyond ``end_dist_m``. Points missing
    ``distance_m`` or ``timestamp`` are skipped. Timestamps may be ISO-8601
    strings or objects whose difference has ``total_seconds()``.

    Returns duration in seconds, or None if not found.
    """
    start_time = None
    end_time = None
    for point in data_points:
        dist = point.get("distance_m")
        ts = point.get("timestamp")
        if dist is None or ts is None:
            continue
        if start_time is None and dist >= start_dist_m:
            start_time = ts
        if start_time is not None and dist >= end_dist_m:
            end_time = ts
            break

    # Truthiness (not `is None`) is kept on purpose so falsy timestamps keep
    # yielding None exactly as before.
    if not (start_time and end_time):
        return None
    return (_to_datetime(end_time) - _to_datetime(start_time)).total_seconds()


def find_best_split_time(
    data_points: list[dict],
    target_distance_m: float,
) -> Optional[float]:
    """
    Find the best (fastest) time over any target_distance_m window within an
    activity. E.g. fastest 1km split in a 10km run.

    Uses a two-pointer sweep: for each start point, the end pointer advances
    to the first point that is at least ``target_distance_m`` further along.
    Windows are not interpolated, so a window may cover slightly more than the
    target distance. Points missing ``distance_m`` or ``timestamp`` are
    ignored.

    Returns duration in seconds, or None when no window is long enough.
    """
    points = [
        p for p in data_points
        if p.get("distance_m") is not None and p.get("timestamp") is not None
    ]
    if not points:
        return None

    best = None
    count = len(points)
    j = 0  # end pointer; never moves backwards across start points
    for start_p in points:
        start_dist = start_p["distance_m"]
        while j < count:
            end_p = points[j]
            if end_p["distance_m"] - start_dist >= target_distance_m:
                duration = (
                    _to_datetime(end_p["timestamp"]) - _to_datetime(start_p["timestamp"])
                ).total_seconds()
                if best is None or duration < best:
                    best = duration
                break
            j += 1
        if j >= count:
            # No later start point can reach the target distance either.
            break

    return best


def _bearing(p1: tuple, p2: tuple) -> float:
    """Compass bearing in degrees (0-360) from p1 to p2."""
    lat1, lon1 = math.radians(p1[0]), math.radians(p1[1])
    lat2, lon2 = math.radians(p2[0]), math.radians(p2[1])
    dlon = lon2 - lon1
    x = math.sin(dlon) * math.cos(lat2)
    y = math.cos(lat1) * math.sin(lat2) - math.sin(lat1) * math.cos(lat2) * math.cos(dlon)
    return math.degrees(math.atan2(x, y)) % 360


def generate_1km_segments(encoded_polyline: str, total_dist_m: float) -> list[tuple[str, float, float]]:
    """Generate 1-km splits along a route. Returns list of (name, start_m, end_m).

    A trailing partial kilometre is included only when it is at least 200 m
    long. The polyline itself is not decoded; an empty one yields no segments.
    """
    if not encoded_polyline:
        return []

    km_count = int(total_dist_m / 1000)
    segments = [
        (f"km {i + 1}", float(i * 1000), float((i + 1) * 1000))
        for i in range(km_count)
    ]

    remainder = total_dist_m - km_count * 1000
    if remainder >= 200:
        segments.append((f"km {km_count + 1}", float(km_count * 1000), total_dist_m))

    return segments


def generate_turn_segments(
    encoded_polyline: str,
    turn_angle_deg: float = 45.0,
) -> list[tuple[str, float, float]]:
    """Detect sharp turns in a route polyline. Returns list of (name, start_m, end_m).

    For every interior point, the bearing from roughly 100 m before it is
    compared with the bearing to roughly 100 m after it; a change of at least
    ``turn_angle_deg`` marks a turn. Turns less than 150 m apart are merged
    into one segment centred on their mean position and extending 100 m either
    side (clamped to the route).
    """
    if not encoded_polyline:
        return []

    coords = decode_polyline_to_coords(encoded_polyline)
    if len(coords) < 3:
        return []

    # Cumulative distance along the route at each vertex.
    cum_dists = [0.0]
    for i in range(1, len(coords)):
        cum_dists.append(cum_dists[-1] + haversine_m(coords[i - 1], coords[i]))
    total = cum_dists[-1]
    last = len(coords) - 1

    turn_centers: list[float] = []
    for i in range(1, last):
        # Walk outwards until ~half a window away on each side. Because i is
        # an interior index, both walks always move at least one step.
        start_i = i
        while start_i > 0 and cum_dists[i] - cum_dists[start_i] < _TURN_HALF_WINDOW_M:
            start_i -= 1
        end_i = i
        while end_i < last and cum_dists[end_i] - cum_dists[i] < _TURN_HALF_WINDOW_M:
            end_i += 1

        b1 = _bearing(coords[start_i], coords[i])
        b2 = _bearing(coords[i], coords[end_i])
        # Smallest angle between the two bearings, in [0, 180].
        diff = abs(b2 - b1) % 360
        if diff > 180:
            diff = 360 - diff

        if diff >= turn_angle_deg:
            turn_centers.append(cum_dists[i])

    if not turn_centers:
        return []

    # Cluster turns within 150 m of each other -> one segment per cluster
    clusters: list[list[float]] = [[turn_centers[0]]]
    for d in turn_centers[1:]:
        if d - clusters[-1][-1] < _TURN_CLUSTER_GAP_M:
            clusters[-1].append(d)
        else:
            clusters.append([d])

    segments = []
    for cluster in clusters:
        center = sum(cluster) / len(cluster)
        start = max(0.0, center - _TURN_HALF_WINDOW_M)
        end = min(total, center + _TURN_HALF_WINDOW_M)
        segments.append((f"Turn at {center / 1000:.1f} km", start, end))

    return segments


def _hill_segment(dists, smooth_alts, start_idx, end_idx):
    """Build the (name, start_m, end_m) tuple for a hill, or None if it is too short."""
    if dists[end_idx] - dists[start_idx] < _HILL_MIN_LENGTH_M:
        return None
    gain = round(smooth_alts[end_idx] - smooth_alts[start_idx])
    start_km = dists[start_idx] / 1000
    return (
        f"Hill at {start_km:.1f} km (+{gain} m)",
        dists[start_idx],
        dists[end_idx],
    )


def generate_hill_segments(
    data_points: list[dict],
    gradient_pct: float = 5.0,
) -> list[tuple[str, float, float]]:
    """
    Detect uphill sections using activity data points (with altitude_m + distance_m).

    Points are sorted by distance and altitude is smoothed with a moving
    average (10 points either side). A hill is a run of consecutive steps
    whose gradient is at least ``gradient_pct``; only hills of 200 m or more
    are reported. Fewer than 10 usable points yields no segments.

    Returns list of (name, start_m, end_m).
    """
    pts = [
        (p["distance_m"], p["altitude_m"])
        for p in data_points
        if p.get("distance_m") is not None and p.get("altitude_m") is not None
    ]
    if len(pts) < 10:
        return []

    pts.sort(key=lambda x: x[0])
    dists = [p[0] for p in pts]
    alts = [p[1] for p in pts]

    # Smooth altitude with a sliding window to reduce GPS noise
    smooth_alts = []
    for i in range(len(alts)):
        lo = max(0, i - _HILL_SMOOTH_RADIUS)
        hi = min(len(alts), i + _HILL_SMOOTH_RADIUS + 1)
        smooth_alts.append(sum(alts[lo:hi]) / (hi - lo))

    grad_threshold = gradient_pct / 100.0

    in_hill = False
    hill_start_idx = 0
    segments = []

    for i in range(1, len(dists)):
        d_dist = dists[i] - dists[i - 1]
        if d_dist <= 0:
            # Repeated distance readings carry no gradient information.
            continue
        grad = (smooth_alts[i] - smooth_alts[i - 1]) / d_dist

        if grad >= grad_threshold and not in_hill:
            in_hill = True
            hill_start_idx = i - 1
        elif grad < grad_threshold and in_hill:
            # The hill ended at the previous point.
            segment = _hill_segment(dists, smooth_alts, hill_start_idx, i - 1)
            if segment is not None:
                segments.append(segment)
            in_hill = False

    if in_hill:
        # The activity finished while still climbing.
        segment = _hill_segment(dists, smooth_alts, hill_start_idx, len(dists) - 1)
        if segment is not None:
            segments.append(segment)

    return segments


# (distance in metres, display label) pairs for which best splits are computed.
STANDARD_DISTANCES = [
    (400, "400m"),
    (800, "800m"),
    (1000, "1k"),
    (1609.34, "1 mile"),
    (3000, "3k"),
    (5000, "5k"),
    (10000, "10k"),
    (21097.5, "Half marathon"),
    (42195, "Marathon"),
    (50000, "50k"),
    (100000, "100k"),
]


def compute_best_splits(data_points: list[dict], total_distance_m: float) -> dict[str, float]:
    """Compute best split times for all standard distances that fit within the activity.

    A distance is attempted when ``total_distance_m`` is at least 95% of it.
    Results map the distance label to a duration in seconds; distances for
    which no split was found (or whose best time is zero) are omitted.
    """
    results = {}
    for dist_m, label in STANDARD_DISTANCES:
        if total_distance_m >= dist_m * 0.95:  # allow 5% tolerance
            best = find_best_split_time(data_points, dist_m)
            if best:
                results[label] = best
    return results