Files
owain 6a1726e0c3
Build and push images / validate (push) Successful in 3s
Build and push images / build-backend (push) Successful in 6s
Build and push images / build-worker (push) Successful in 4s
Build and push images / build-frontend (push) Successful in 9s
Fix sleep score parsing, dashboard body battery, segment direction
- Garmin sync: read sleepScores from dailySleepDTO (Garmin nests it there),
  so sleep score is actually stored instead of always null
- Dashboard: pass YYYY-MM-DD to the intraday endpoint (was a full ISO
  timestamp), so the body-battery tile populates
- Segment matching: follow the segment in its created direction with a
  path-length sanity check, so out-and-back routes no longer match an early
  start pass to a late finish (the >1h bogus segment times)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 20:58:53 +01:00

234 lines
7.8 KiB
Python

"""
Route matching: identifies when multiple activities were on the same route.
Uses a bounding-box pre-filter + dynamic time warping (DTW) for GPS track similarity.
"""
import math
from typing import Optional
import polyline as polyline_lib
import numpy as np
def decode_polyline_to_coords(encoded: str) -> list[tuple[float, float]]:
    """Decode an encoded polyline string into a list of coordinate pairs.

    Thin wrapper that delegates to ``polyline_lib.decode`` with its default
    settings and returns whatever that call produces.
    """
    coords = polyline_lib.decode(encoded)
    return coords
def bounding_boxes_overlap(bb1: dict, bb2: dict, tolerance_deg: float = 0.005) -> bool:
    """Cheap pre-filter: report whether two bounding boxes overlap.

    Both boxes are dicts with ``min_lat``/``max_lat``/``min_lon``/``max_lon``
    keys. Each box is grown by ``tolerance_deg`` on every side before the
    comparison, so boxes separated by up to twice the tolerance still count
    as overlapping.
    """
    pad = tolerance_deg

    # Latitude axis first; bail out early if the padded ranges are disjoint.
    lat_overlaps = (
        bb1["min_lat"] - pad <= bb2["max_lat"] + pad
        and bb1["max_lat"] + pad >= bb2["min_lat"] - pad
    )
    if not lat_overlaps:
        return False

    # Latitude ranges intersect, so the answer is decided by longitude.
    lon_overlaps = (
        bb1["min_lon"] - pad <= bb2["max_lon"] + pad
        and bb1["max_lon"] + pad >= bb2["min_lon"] - pad
    )
    return lon_overlaps
def sample_coords(coords: list[tuple], n: int = 100) -> list[tuple]:
    """Downsample a track to at most ``n`` evenly spaced points for DTW efficiency.

    Args:
        coords: Track points in order.
        n: Maximum number of points to return.

    Returns:
        ``coords`` itself (not a copy) when it already has ``n`` or fewer
        points. Otherwise a new list of ``n`` points picked at evenly spaced
        indices, always including the first and last point when ``n >= 2``.
        ``n == 1`` yields just the first point and ``n <= 0`` an empty list.
    """
    total = len(coords)
    if total <= n:
        return coords
    if n <= 0:
        return []
    if n == 1:
        # A single sample has no spacing to compute; the general formula
        # below would divide by ``n - 1 == 0``.
        return [coords[0]]

    last_index = total - 1
    gaps = n - 1
    # Integer arithmetic gives the same truncated indices without going
    # through floating point.
    return [coords[i * last_index // gaps] for i in range(n)]
def dtw_distance(track1: list[tuple], track2: list[tuple]) -> float:
    """Compute a normalised dynamic-time-warping distance between two GPS tracks.

    Each point is indexed as ``(lat, lon)`` and point-to-point cost is the
    haversine distance in metres.

    Args:
        track1: First track, in order.
        track2: Second track, in order.

    Returns:
        The total cost of the cheapest warping path divided by
        ``max(len(track1), len(track2))``, i.e. roughly the average distance
        in metres per matched pair. Returns ``inf`` when either track is
        empty, since there is nothing to align.
    """
    n, m = len(track1), len(track2)
    if n == 0 or m == 0:
        # Without this guard two empty tracks would evaluate 0.0 / 0.
        return float("inf")

    # acc[i, j] = cheapest cost of aligning the first i points of track1 with
    # the first j points of track2; row/column 0 are the "nothing consumed"
    # border and stay at infinity except for the origin.
    acc = np.full((n + 1, m + 1), np.inf)
    acc[0, 0] = 0.0
    for i in range(1, n + 1):
        point = track1[i - 1]
        for j in range(1, m + 1):
            cost = haversine_m(point, track2[j - 1])
            acc[i, j] = cost + min(acc[i - 1, j], acc[i, j - 1], acc[i - 1, j - 1])
    return float(acc[n, m] / max(n, m))
def haversine_m(p1: tuple, p2: tuple) -> float:
    """Return the great-circle distance in metres between two points.

    Uses the haversine formula on a sphere of radius 6,371,000 m.

    Args:
        p1: First point; index 0 is latitude and index 1 is longitude, in degrees.
        p2: Second point, same layout as ``p1``.

    Returns:
        Distance in metres along the sphere's surface.
    """
    earth_radius_m = 6371000
    lat1, lon1 = math.radians(p1[0]), math.radians(p1[1])
    lat2, lon2 = math.radians(p2[0]), math.radians(p2[1])
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = (
        math.sin(dlat / 2) ** 2
        + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
    )
    # Floating-point rounding can leave ``a`` a hair outside [0, 1] (e.g. for
    # near-antipodal points), which would make sqrt/asin raise ValueError.
    # Plain comparisons are used so a NaN input still propagates as NaN.
    if a > 1.0:
        a = 1.0
    elif a < 0.0:
        a = 0.0
    return 2 * earth_radius_m * math.asin(math.sqrt(a))
def routes_are_similar(
    poly1: str,
    poly2: str,
    bb1: Optional[dict],
    bb2: Optional[dict],
    dtw_threshold_m: float = 80.0,
    dist1: Optional[float] = None,
    dist2: Optional[float] = None,
) -> bool:
    """
    Decide whether two activities follow sufficiently similar routes.

    Checks run cheapest-first:
    1. If both dist1 and dist2 are given and positive, reject when they differ
       by more than 2.5% of the longer one, and replace dtw_threshold_m with
       3% of the longer distance (capped at 300m).
    2. If both bounding boxes are given, reject when they do not overlap.
    3. Decode both polylines, downsample each to at most 60 points and compare
       their DTW distance against the threshold.

    Returns False if either polyline fails to decode or decodes to no points.
    """
    have_distances = bool(dist1 and dist2 and dist1 > 0 and dist2 > 0)
    if have_distances:
        longer = max(dist1, dist2)
        if abs(dist1 - dist2) / longer > 0.025:
            return False
        # Scale the DTW tolerance with route length instead of using the argument.
        dtw_threshold_m = min(longer * 0.03, 300.0)

    if bb1 and bb2 and not bounding_boxes_overlap(bb1, bb2):
        return False

    try:
        track_a = sample_coords(decode_polyline_to_coords(poly1), 60)
        track_b = sample_coords(decode_polyline_to_coords(poly2), 60)
    except Exception:
        # Best effort: an undecodable polyline simply means "no match".
        return False

    if not (track_a and track_b):
        return False

    return dtw_distance(track_a, track_b) < dtw_threshold_m
def match_segment_in_activity(
    seg_coords: list[tuple],
    act_coords: list[tuple],
    act_times: list,
    tol_m: float = 30.0,
) -> Optional[float]:
    """
    Determine whether an activity track traverses a segment's GPS geometry in the
    segment's own direction, and if so how long the fastest such traversal took.
    Works even when the activity's overall route differs — only the overlapping
    stretch matters.

    seg_coords: [(lat, lon), ...] segment geometry (start → end).
    act_coords: [(lat, lon), ...] activity track, in time order.
    act_times:  parallel list of datetimes for act_coords.
    tol_m:      how close (metres) an activity point must be to a segment point
                to count as passing it.

    Strategy: for every pass of the activity near the segment START, walk forward
    accumulating path length; accept the traversal only if the activity reaches the
    segment END after covering between 0.6x and 1.5x the segment's own length (so an
    out-and-back route can't match an early start to a late finish), and the
    segment's 25% / 50% / 75% points are passed in that order between the two
    anchors. Returns the shortest valid traversal time in seconds, or None when
    there is no valid traversal (or either input has fewer than two points).
    """
    n = len(act_coords)
    m = len(seg_coords)
    if n < 2 or m < 2:
        return None

    start_pt, end_pt = seg_coords[0], seg_coords[-1]
    seg_len = sum(haversine_m(seg_coords[k], seg_coords[k + 1]) for k in range(m - 1))
    if seg_len <= 0:
        return None

    # Intermediate shape checkpoints, in segment order.
    checkpoints = [seg_coords[int(frac * (m - 1))] for frac in (0.25, 0.5, 0.75)]

    # Evaluate the start-proximity test once per activity point.
    near_start = [haversine_m(pt, start_pt) <= tol_m for pt in act_coords]
    # One candidate entry per pass through the start region (first point of each run).
    entries = [
        i for i in range(n)
        if near_start[i] and (i == 0 or not near_start[i - 1])
    ]

    best = None
    for si in entries:
        path = 0.0
        ei = None
        for i in range(si + 1, n):
            path += haversine_m(act_coords[i - 1], act_coords[i])
            if path > seg_len * 1.5:  # wandered too far without finishing → wrong pass/direction
                break
            if path >= seg_len * 0.6 and haversine_m(act_coords[i], end_pt) <= tol_m:
                ei = i
                break
        if ei is None:
            continue

        # Confirm the activity follows the segment shape *in order* between the
        # anchors: the cursor only moves forward, so each checkpoint must be
        # reached at or after the point that satisfied the previous one.
        # (Taking the earliest hit for each checkpoint never rules out a
        # valid ordered traversal.)
        cursor = si
        for sp in checkpoints:
            while cursor <= ei and not haversine_m(act_coords[cursor], sp) <= tol_m:
                cursor += 1
            if cursor > ei:
                break
        if cursor > ei:
            # Some checkpoint was never reached in order on this pass.
            continue

        dur = (act_times[ei] - act_times[si]).total_seconds()
        if dur > 0 and (best is None or dur < best):
            best = dur
    return best
def find_best_split_time(
    data_points: list[dict],
    target_distance_m: float,
) -> Optional[float]:
    """
    Find the best (fastest) time over any target_distance_m window within an activity.
    E.g. fastest 1km split in a 10km run.

    data_points: dicts read via their "distance_m" and "timestamp" keys; entries
        where either is missing/None are ignored. Timestamps may be datetimes or
        ISO-8601 strings. Assumes cumulative distance is non-decreasing.
    target_distance_m: window length in metres.

    Each window runs from a start sample to the first later sample at or beyond
    the target distance (no interpolation, so it may cover slightly more than the
    target). Windows with a zero or negative duration — duplicate or out-of-order
    timestamps — are ignored so a glitch cannot win as the "fastest" split.

    Returns duration in seconds, or None if no valid window exists (including
    when target_distance_m is not positive).
    """
    # Function-scoped import, hoisted out of the inner loop.
    from datetime import datetime

    def _as_datetime(ts):
        """Parse ISO strings; pass datetime objects through unchanged."""
        return datetime.fromisoformat(ts) if isinstance(ts, str) else ts

    if target_distance_m <= 0:
        return None

    points = [
        p for p in data_points
        if p.get("distance_m") is not None and p.get("timestamp") is not None
    ]
    if not points:
        return None

    total = len(points)
    best = None
    j = 0  # end-of-window pointer; only ever moves forward (two-pointer sweep)
    for start_p in points:
        start_dist = start_p["distance_m"]
        # Advance j until the distance covered from this start reaches the target.
        while j < total and not (points[j]["distance_m"] - start_dist >= target_distance_m):
            j += 1
        if j >= total:
            # Not enough distance left from here, nor from any later start.
            break
        duration = (
            _as_datetime(points[j]["timestamp"]) - _as_datetime(start_p["timestamp"])
        ).total_seconds()
        if duration > 0 and (best is None or duration < best):
            best = duration
    return best
# Distances for which best splits are computed, as (distance in metres, label)
# pairs. compute_best_splits passes the first element to find_best_split_time
# as the target distance and uses the second as the key in its result dict.
STANDARD_DISTANCES = [
    (400, "400m"),
    (800, "800m"),
    (1000, "1k"),
    (1609.34, "1 mile"),
    (3000, "3k"),
    (5000, "5k"),
    (10000, "10k"),
    (21097.5, "Half marathon"),
    (42195, "Marathon"),
    (50000, "50k"),
    (100000, "100k"),
]
def compute_best_splits(data_points: list[dict], total_distance_m: float) -> dict[str, float]:
    """Return the fastest split for each standard distance the activity can cover.

    A standard distance is attempted when ``total_distance_m`` is at least 95%
    of it. The result maps the distance's label to the split time returned by
    ``find_best_split_time``; distances with no (or a falsy) result are omitted.
    """
    # Lazily filter to the distances worth attempting (5% tolerance on length).
    candidates = (
        (label, dist_m)
        for dist_m, label in STANDARD_DISTANCES
        if total_distance_m >= dist_m * 0.95
    )

    splits = {}
    for label, dist_m in candidates:
        fastest = find_best_split_time(data_points, dist_m)
        if fastest:
            splits[label] = fastest
    return splits