"""
Route matching: identifies when multiple activities were on the same route.

Uses a bounding-box pre-filter + dynamic time warping (DTW) for GPS track
similarity. Also provides segment matching inside an activity track and
best-split (fastest window) computation over standard distances.
"""
import math
from datetime import datetime
from typing import Optional

import polyline as polyline_lib
import numpy as np

# Mean Earth radius used by the haversine formula, in metres.
_EARTH_RADIUS_M = 6371000

# Tuning knobs for routes_are_similar().
_DTW_SAMPLE_POINTS = 60            # points kept per track before running DTW
_MAX_DISTANCE_DIFF_RATIO = 0.025   # reject when route lengths differ by > 2.5%
_DTW_THRESHOLD_RATIO = 0.03        # DTW threshold as a fraction of route length
_DTW_THRESHOLD_CAP_M = 300.0       # upper bound for the derived DTW threshold

# An activity qualifies for a standard split when its total distance reaches
# at least this fraction of the split distance.
_SPLIT_DISTANCE_TOLERANCE = 0.95


def decode_polyline_to_coords(encoded: str) -> list[tuple[float, float]]:
    """
    Decode an encoded polyline string into a list of coordinate tuples.

    Thin wrapper around ``polyline.decode`` using that library's defaults.
    The rest of this module treats each returned point as (lat, lon).
    """
    return polyline_lib.decode(encoded)


def bounding_boxes_overlap(bb1: dict, bb2: dict, tolerance_deg: float = 0.005) -> bool:
    """
    Quick check: do two bounding boxes overlap (with a tolerance margin)?

    Both boxes are dicts with "min_lat", "max_lat", "min_lon" and "max_lon"
    keys (degrees). Each box is grown by ``tolerance_deg`` on every side, so
    boxes whose edges are up to ``2 * tolerance_deg`` apart still count as
    overlapping.
    """
    return (
        bb1["min_lat"] - tolerance_deg <= bb2["max_lat"] + tolerance_deg
        and bb1["max_lat"] + tolerance_deg >= bb2["min_lat"] - tolerance_deg
        and bb1["min_lon"] - tolerance_deg <= bb2["max_lon"] + tolerance_deg
        and bb1["max_lon"] + tolerance_deg >= bb2["min_lon"] - tolerance_deg
    )


def sample_coords(coords: list[tuple], n: int = 100) -> list[tuple]:
    """
    Downsample a track to n evenly-spaced points for DTW efficiency.

    Tracks that already have n points or fewer are returned unchanged (the
    same list object). Otherwise the first and last points are always kept.
    ``n <= 0`` yields an empty list and ``n == 1`` yields just the first point.
    """
    if len(coords) <= n:
        return coords
    if n <= 0:
        return []
    if n == 1:
        # The spacing formula below divides by (n - 1); handle this explicitly.
        return [coords[0]]
    last = len(coords) - 1
    return [coords[i * last // (n - 1)] for i in range(n)]


def dtw_distance(track1: list[tuple], track2: list[tuple]) -> float:
    """
    Compute DTW distance between two GPS tracks.

    Each point is (lat, lon). The accumulated haversine cost of the best
    warping path is divided by ``max(len(track1), len(track2))``, giving an
    approximate average distance in metres per matched pair.

    Returns ``math.inf`` when either track is empty, since nothing can be
    aligned.
    """
    n, m = len(track1), len(track2)
    if n == 0 or m == 0:
        return math.inf

    # dtw[i, j] = minimal accumulated cost aligning track1[:i] with track2[:j].
    dtw = np.full((n + 1, m + 1), np.inf)
    dtw[0, 0] = 0.0
    for i in range(1, n + 1):
        point = track1[i - 1]
        for j in range(1, m + 1):
            cost = haversine_m(point, track2[j - 1])
            dtw[i, j] = cost + min(dtw[i - 1, j], dtw[i, j - 1], dtw[i - 1, j - 1])
    # float() so callers get a plain Python float rather than a NumPy scalar.
    return float(dtw[n, m]) / max(n, m)


def haversine_m(p1: tuple, p2: tuple) -> float:
    """
    Great-circle distance in metres between two (lat, lon) points in degrees.
    """
    lat1, lon1 = math.radians(p1[0]), math.radians(p1[1])
    lat2, lon2 = math.radians(p2[0]), math.radians(p2[1])
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
    # Rounding (e.g. near-antipodal points) or out-of-range latitudes can push
    # `a` marginally outside [0, 1], which would make sqrt/asin raise.
    a = min(1.0, max(0.0, a))
    return 2 * _EARTH_RADIUS_M * math.asin(math.sqrt(a))


def routes_are_similar(
    poly1: str,
    poly2: str,
    bb1: Optional[dict],
    bb2: Optional[dict],
    dtw_threshold_m: float = 80.0,
    dist1: Optional[float] = None,
    dist2: Optional[float] = None,
) -> bool:
    """
    Returns True if two activities are on sufficiently similar routes.

    Checks run cheapest first:

    1. When dist1/dist2 are both provided and positive:
       - rejects if the distances differ by more than 2.5%
       - replaces ``dtw_threshold_m`` with 3% of the longer route distance
         (capped at 300m)
    2. When both bounding boxes are provided, rejects if they do not overlap.
    3. Decodes both polylines, downsamples them and compares the DTW distance
       against the threshold.

    Polylines that cannot be decoded, or that decode to no points, are
    treated as "not similar".
    """
    if dist1 and dist2 and dist1 > 0 and dist2 > 0:
        longer = max(dist1, dist2)
        if abs(dist1 - dist2) / longer > _MAX_DISTANCE_DIFF_RATIO:
            return False
        dtw_threshold_m = min(longer * _DTW_THRESHOLD_RATIO, _DTW_THRESHOLD_CAP_M)

    if bb1 and bb2 and not bounding_boxes_overlap(bb1, bb2):
        return False

    try:
        coords1 = sample_coords(decode_polyline_to_coords(poly1), _DTW_SAMPLE_POINTS)
        coords2 = sample_coords(decode_polyline_to_coords(poly2), _DTW_SAMPLE_POINTS)
    except Exception:
        # Deliberate best-effort: a missing or malformed polyline simply means
        # the two activities cannot be matched.
        return False

    if not coords1 or not coords2:
        return False

    # bool() so the result is a plain Python bool, not a NumPy scalar.
    return bool(dtw_distance(coords1, coords2) < dtw_threshold_m)


def _nearest_index_within(
    points: list[tuple],
    target: tuple,
    start: int,
    stop: int,
    tol_m: float,
) -> Optional[int]:
    """Index in [start, stop) of the point closest to target, if strictly within tol_m."""
    best_idx, best_d = None, tol_m
    for i in range(start, stop):
        d = haversine_m(points[i], target)
        if d < best_d:
            best_d, best_idx = d, i
    return best_idx


def match_segment_in_activity(
    seg_coords: list[tuple],
    act_coords: list[tuple],
    act_times: list,
    tol_m: float = 30.0,
) -> Optional[float]:
    """
    Determine whether an activity track traverses a segment's GPS geometry,
    and if so how long it took. Works even when the activity's overall route
    differs — only the overlapping stretch matters.

    seg_coords: [(lat, lon), ...] segment geometry (start → end).
    act_coords: [(lat, lon), ...] activity track, in time order.
    act_times: parallel list of datetimes for act_coords.
    tol_m: maximum distance in metres for a track point to count as "on" a
        segment point.

    Strategy: anchor on the activity point nearest the segment start, then the
    nearest point (after it) to the segment end, then verify a few
    intermediate segment points are each passed within tolerance between those
    anchors.

    Returns the time in seconds between the start and end anchors, or None if
    the activity doesn't follow the segment (or the elapsed time is not
    positive).
    """
    n = len(act_coords)
    if n < 2 or len(seg_coords) < 2:
        return None

    si = _nearest_index_within(act_coords, seg_coords[0], 0, n, tol_m)
    if si is None:
        return None

    # The end anchor is searched strictly after the start anchor.
    ei = _nearest_index_within(act_coords, seg_coords[-1], si + 1, n, tol_m)
    if ei is None:
        return None

    # Verify the activity actually follows the segment shape between the anchors.
    between = act_coords[si:ei + 1]
    last_seg_idx = len(seg_coords) - 1
    for frac in (0.25, 0.5, 0.75):
        sp = seg_coords[int(frac * last_seg_idx)]
        if not any(haversine_m(pt, sp) <= tol_m for pt in between):
            return None

    dur = (act_times[ei] - act_times[si]).total_seconds()
    return dur if dur > 0 else None


def _parse_timestamp(ts):
    """Return ts as-is unless it is a string, which is parsed as ISO format."""
    return datetime.fromisoformat(ts) if isinstance(ts, str) else ts


def _timed_distance_points(data_points: list[dict]) -> list[tuple]:
    """Extract (distance_m, timestamp) pairs, skipping points missing either field."""
    return [
        (p["distance_m"], _parse_timestamp(p["timestamp"]))
        for p in data_points
        if p.get("distance_m") is not None and p.get("timestamp") is not None
    ]


def _fastest_window(points: list[tuple], target_distance_m: float) -> Optional[float]:
    """Shortest positive duration (seconds) over any window covering target_distance_m."""
    if target_distance_m <= 0:
        return None

    best = None
    count = len(points)
    j = 0
    for i, (start_dist, start_time) in enumerate(points):
        # Keep the window end at or after its start, even if recorded
        # distances are not monotonic.
        j = max(j, i)
        # Advance j until distance covered >= target.
        while j < count and points[j][0] - start_dist < target_distance_m:
            j += 1
        if j == count:
            # No window fits from this start; the scan stops here.
            break
        duration = (points[j][1] - start_time).total_seconds()
        # Non-positive durations come from duplicate or out-of-order
        # timestamps and must not be reported as a "best" time.
        if duration > 0 and (best is None or duration < best):
            best = duration
    return best


def find_best_split_time(
    data_points: list[dict],
    target_distance_m: float,
) -> Optional[float]:
    """
    Find the best (fastest) time over any target_distance_m window within an
    activity. E.g. fastest 1km split in a 10km run.

    data_points: dicts with a "distance_m" value and a "timestamp" (datetime
        or ISO-format string); points missing either are ignored.
    target_distance_m: window length in metres; must be positive.

    Windows start and end on recorded points (no interpolation), so a window
    may cover slightly more than the target distance.

    Returns duration in seconds, or None when no window covers the target
    distance. Raises ValueError if a string timestamp is not valid ISO format.
    """
    return _fastest_window(_timed_distance_points(data_points), target_distance_m)


STANDARD_DISTANCES = [
    (400, "400m"),
    (800, "800m"),
    (1000, "1k"),
    (1609.34, "1 mile"),
    (3000, "3k"),
    (5000, "5k"),
    (10000, "10k"),
    (21097.5, "Half marathon"),
    (42195, "Marathon"),
    (50000, "50k"),
    (100000, "100k"),
]


def compute_best_splits(data_points: list[dict], total_distance_m: float) -> dict[str, float]:
    """
    Compute best split times for all standard distances that fit within the
    activity.

    A standard distance is attempted when ``total_distance_m`` is at least 95%
    of it (5% tolerance). Returns a mapping of distance label to best duration
    in seconds; distances with no qualifying window are omitted.
    """
    targets = [
        (dist_m, label)
        for dist_m, label in STANDARD_DISTANCES
        if total_distance_m >= dist_m * _SPLIT_DISTANCE_TOLERANCE  # allow 5% tolerance
    ]
    if not targets:
        return {}

    # Filter and parse the points once instead of once per standard distance.
    points = _timed_distance_points(data_points)
    results = {}
    for dist_m, label in targets:
        best = _fastest_window(points, dist_m)
        if best is not None:
            results[label] = best
    return results