0e4bc7b444
PocketID OIDC already auto-provisioned users keyed by pocketid_sub, and the data layer was already fully user-scoped. This adds the missing pieces for running real multi-user: - auth.py callback: link by email to an existing un-linked account (so the admin keeps their data when first signing in by passkey), collision-safe username generation, and request the `groups` scope. - Group gating: optional pocketid_allowed_group (admin-config or POCKETID_ALLOWED_GROUP env); users lacking the group are rejected at the callback and redirected to /login?auth_error=not_authorized. - New admin users API (app/api/users.py): list users, promote/demote admin (guards against demoting/locking out the last admin or yourself), and delete a user with ordered bulk deletes of all their data + on-disk files. - ProfilePage: allowed-group field; LoginPage: rejected-login message; Layout: admin-only Users nav; new UsersPage. Resync milevault_export to current source (it had drifted many features behind — missing garmin_sync, npm-ci Dockerfile and @polyline-codec that broke its own CI) and add POCKETID_ALLOWED_GROUP to .env.example. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
349 lines
11 KiB
Python
349 lines
11 KiB
Python
"""
|
|
Route matching: identifies when multiple activities were on the same route.
|
|
Uses a bounding-box pre-filter + dynamic time warping (DTW) for GPS track similarity.
|
|
"""
|
|
import math
|
|
from typing import Optional
|
|
import polyline as polyline_lib
|
|
import numpy as np
|
|
|
|
|
|
def decode_polyline_to_coords(encoded: str) -> list[tuple[float, float]]:
|
|
return polyline_lib.decode(encoded)
|
|
|
|
|
|
def bounding_boxes_overlap(bb1: dict, bb2: dict, tolerance_deg: float = 0.005) -> bool:
|
|
"""Quick check: do two bounding boxes overlap (with a tolerance margin)?"""
|
|
return (
|
|
bb1["min_lat"] - tolerance_deg <= bb2["max_lat"] + tolerance_deg and
|
|
bb1["max_lat"] + tolerance_deg >= bb2["min_lat"] - tolerance_deg and
|
|
bb1["min_lon"] - tolerance_deg <= bb2["max_lon"] + tolerance_deg and
|
|
bb1["max_lon"] + tolerance_deg >= bb2["min_lon"] - tolerance_deg
|
|
)
|
|
|
|
|
|
def sample_coords(coords: list[tuple], n: int = 100) -> list[tuple]:
|
|
"""Downsample a track to n evenly-spaced points for DTW efficiency."""
|
|
if len(coords) <= n:
|
|
return coords
|
|
indices = [int(i * (len(coords) - 1) / (n - 1)) for i in range(n)]
|
|
return [coords[i] for i in indices]
|
|
|
|
|
|
def dtw_distance(track1: list[tuple], track2: list[tuple]) -> float:
|
|
"""
|
|
Compute DTW distance between two GPS tracks.
|
|
Each point is (lat, lon). Returns average distance in metres per matched pair.
|
|
"""
|
|
n, m = len(track1), len(track2)
|
|
dtw = np.full((n + 1, m + 1), np.inf)
|
|
dtw[0][0] = 0.0
|
|
|
|
for i in range(1, n + 1):
|
|
for j in range(1, m + 1):
|
|
cost = haversine_m(track1[i-1], track2[j-1])
|
|
dtw[i][j] = cost + min(dtw[i-1][j], dtw[i][j-1], dtw[i-1][j-1])
|
|
|
|
return dtw[n][m] / max(n, m)
|
|
|
|
|
|
def haversine_m(p1: tuple, p2: tuple) -> float:
|
|
R = 6371000
|
|
lat1, lon1 = math.radians(p1[0]), math.radians(p1[1])
|
|
lat2, lon2 = math.radians(p2[0]), math.radians(p2[1])
|
|
dlat = lat2 - lat1
|
|
dlon = lon2 - lon1
|
|
a = math.sin(dlat/2)**2 + math.cos(lat1)*math.cos(lat2)*math.sin(dlon/2)**2
|
|
return 2 * R * math.asin(math.sqrt(a))
|
|
|
|
|
|
def routes_are_similar(
|
|
poly1: str,
|
|
poly2: str,
|
|
bb1: Optional[dict],
|
|
bb2: Optional[dict],
|
|
dtw_threshold_m: float = 80.0,
|
|
dist1: Optional[float] = None,
|
|
dist2: Optional[float] = None,
|
|
) -> bool:
|
|
"""
|
|
Returns True if two activities are on sufficiently similar routes.
|
|
First does a cheap bounding box check, then DTW on downsampled tracks.
|
|
When dist1/dist2 are provided:
|
|
- Rejects if distance differs by more than 2.5%
|
|
- Uses 3% of route distance as the DTW threshold (capped at 300m)
|
|
"""
|
|
if dist1 and dist2 and dist1 > 0 and dist2 > 0:
|
|
if abs(dist1 - dist2) / max(dist1, dist2) > 0.025:
|
|
return False
|
|
dtw_threshold_m = min(max(dist1, dist2) * 0.03, 300.0)
|
|
|
|
if bb1 and bb2:
|
|
if not bounding_boxes_overlap(bb1, bb2):
|
|
return False
|
|
|
|
try:
|
|
coords1 = sample_coords(decode_polyline_to_coords(poly1), 60)
|
|
coords2 = sample_coords(decode_polyline_to_coords(poly2), 60)
|
|
except Exception:
|
|
return False
|
|
|
|
if not coords1 or not coords2:
|
|
return False
|
|
|
|
dist = dtw_distance(coords1, coords2)
|
|
return dist < dtw_threshold_m
|
|
|
|
|
|
def find_segment_times(
|
|
data_points: list[dict],
|
|
start_dist_m: float,
|
|
end_dist_m: float,
|
|
) -> Optional[float]:
|
|
"""
|
|
Given activity data points (with cumulative distance_m),
|
|
find the time to traverse from start_dist_m to end_dist_m.
|
|
Returns duration in seconds, or None if not found.
|
|
"""
|
|
start_time = None
|
|
end_time = None
|
|
|
|
for p in data_points:
|
|
dist = p.get("distance_m")
|
|
ts = p.get("timestamp")
|
|
if dist is None or ts is None:
|
|
continue
|
|
|
|
if start_time is None and dist >= start_dist_m:
|
|
start_time = ts
|
|
|
|
if start_time is not None and dist >= end_dist_m:
|
|
end_time = ts
|
|
break
|
|
|
|
if start_time and end_time:
|
|
from datetime import datetime
|
|
t1 = datetime.fromisoformat(start_time) if isinstance(start_time, str) else start_time
|
|
t2 = datetime.fromisoformat(end_time) if isinstance(end_time, str) else end_time
|
|
return (t2 - t1).total_seconds()
|
|
|
|
return None
|
|
|
|
|
|
def find_best_split_time(
|
|
data_points: list[dict],
|
|
target_distance_m: float,
|
|
) -> Optional[float]:
|
|
"""
|
|
Find the best (fastest) time over any target_distance_m window within an activity.
|
|
E.g. fastest 1km split in a 10km run.
|
|
Returns duration in seconds.
|
|
"""
|
|
points_with_dist = [
|
|
p for p in data_points
|
|
if p.get("distance_m") is not None and p.get("timestamp") is not None
|
|
]
|
|
|
|
if not points_with_dist:
|
|
return None
|
|
|
|
best = None
|
|
j = 0
|
|
|
|
for i, start_p in enumerate(points_with_dist):
|
|
start_dist = start_p["distance_m"]
|
|
start_ts = start_p["timestamp"]
|
|
|
|
# Advance j until distance covered >= target
|
|
while j < len(points_with_dist):
|
|
end_p = points_with_dist[j]
|
|
covered = end_p["distance_m"] - start_dist
|
|
if covered >= target_distance_m:
|
|
from datetime import datetime
|
|
t1 = datetime.fromisoformat(start_ts) if isinstance(start_ts, str) else start_ts
|
|
t2 = datetime.fromisoformat(end_p["timestamp"]) if isinstance(end_p["timestamp"], str) else end_p["timestamp"]
|
|
duration = (t2 - t1).total_seconds()
|
|
if best is None or duration < best:
|
|
best = duration
|
|
break
|
|
j += 1
|
|
|
|
if j >= len(points_with_dist):
|
|
break
|
|
|
|
return best
|
|
|
|
|
|
def _bearing(p1: tuple, p2: tuple) -> float:
|
|
"""Compass bearing in degrees (0-360) from p1 to p2."""
|
|
lat1, lon1 = math.radians(p1[0]), math.radians(p1[1])
|
|
lat2, lon2 = math.radians(p2[0]), math.radians(p2[1])
|
|
dlon = lon2 - lon1
|
|
x = math.sin(dlon) * math.cos(lat2)
|
|
y = math.cos(lat1) * math.sin(lat2) - math.sin(lat1) * math.cos(lat2) * math.cos(dlon)
|
|
return math.degrees(math.atan2(x, y)) % 360
|
|
|
|
|
|
def generate_1km_segments(encoded_polyline: str, total_dist_m: float) -> list[tuple[str, float, float]]:
|
|
"""Generate 1-km splits along a route. Returns list of (name, start_m, end_m)."""
|
|
if not encoded_polyline:
|
|
return []
|
|
km_count = int(total_dist_m / 1000)
|
|
segments = []
|
|
for i in range(km_count):
|
|
segments.append((f"km {i + 1}", float(i * 1000), float((i + 1) * 1000)))
|
|
remainder = total_dist_m - km_count * 1000
|
|
if remainder >= 200:
|
|
segments.append((f"km {km_count + 1}", float(km_count * 1000), total_dist_m))
|
|
return segments
|
|
|
|
|
|
def generate_turn_segments(
|
|
encoded_polyline: str,
|
|
turn_angle_deg: float = 45.0,
|
|
) -> list[tuple[str, float, float]]:
|
|
"""Detect sharp turns in a route polyline. Returns list of (name, start_m, end_m)."""
|
|
coords = decode_polyline_to_coords(encoded_polyline)
|
|
if len(coords) < 3:
|
|
return []
|
|
|
|
cum_dists = [0.0]
|
|
for i in range(1, len(coords)):
|
|
cum_dists.append(cum_dists[-1] + haversine_m(coords[i - 1], coords[i]))
|
|
total = cum_dists[-1]
|
|
|
|
HALF_WINDOW = 100.0 # metres either side of candidate turn point
|
|
|
|
turn_centers: list[float] = []
|
|
for i in range(1, len(coords) - 1):
|
|
# Find index ~HALF_WINDOW before and after
|
|
start_i = i
|
|
while start_i > 0 and cum_dists[i] - cum_dists[start_i] < HALF_WINDOW:
|
|
start_i -= 1
|
|
end_i = i
|
|
while end_i < len(coords) - 1 and cum_dists[end_i] - cum_dists[i] < HALF_WINDOW:
|
|
end_i += 1
|
|
if start_i == i or end_i == i:
|
|
continue
|
|
|
|
b1 = _bearing(coords[start_i], coords[i])
|
|
b2 = _bearing(coords[i], coords[end_i])
|
|
diff = abs(b2 - b1) % 360
|
|
if diff > 180:
|
|
diff = 360 - diff
|
|
if diff >= turn_angle_deg:
|
|
turn_centers.append(cum_dists[i])
|
|
|
|
if not turn_centers:
|
|
return []
|
|
|
|
# Cluster turns within 150 m of each other → one segment per cluster
|
|
clusters: list[list[float]] = [[turn_centers[0]]]
|
|
for d in turn_centers[1:]:
|
|
if d - clusters[-1][-1] < 150:
|
|
clusters[-1].append(d)
|
|
else:
|
|
clusters.append([d])
|
|
|
|
segments = []
|
|
for cluster in clusters:
|
|
center = sum(cluster) / len(cluster)
|
|
start = max(0.0, center - HALF_WINDOW)
|
|
end = min(total, center + HALF_WINDOW)
|
|
segments.append((f"Turn at {center / 1000:.1f} km", start, end))
|
|
return segments
|
|
|
|
|
|
def generate_hill_segments(
|
|
data_points: list[dict],
|
|
gradient_pct: float = 5.0,
|
|
) -> list[tuple[str, float, float]]:
|
|
"""
|
|
Detect uphill sections using activity data points (with altitude_m + distance_m).
|
|
Returns list of (name, start_m, end_m).
|
|
"""
|
|
pts = [
|
|
(p["distance_m"], p["altitude_m"])
|
|
for p in data_points
|
|
if p.get("distance_m") is not None and p.get("altitude_m") is not None
|
|
]
|
|
if len(pts) < 10:
|
|
return []
|
|
pts.sort(key=lambda x: x[0])
|
|
dists = [p[0] for p in pts]
|
|
alts = [p[1] for p in pts]
|
|
|
|
# Smooth altitude with a sliding window to reduce GPS noise
|
|
SMOOTH = 10
|
|
smooth_alts = []
|
|
for i in range(len(alts)):
|
|
lo, hi = max(0, i - SMOOTH), min(len(alts), i + SMOOTH + 1)
|
|
smooth_alts.append(sum(alts[lo:hi]) / (hi - lo))
|
|
|
|
grad_threshold = gradient_pct / 100.0
|
|
MIN_HILL_M = 200.0
|
|
|
|
in_hill = False
|
|
hill_start_idx = 0
|
|
segments = []
|
|
|
|
for i in range(1, len(dists)):
|
|
d_dist = dists[i] - dists[i - 1]
|
|
if d_dist <= 0:
|
|
continue
|
|
grad = (smooth_alts[i] - smooth_alts[i - 1]) / d_dist
|
|
|
|
if grad >= grad_threshold and not in_hill:
|
|
in_hill = True
|
|
hill_start_idx = i - 1
|
|
elif grad < grad_threshold and in_hill:
|
|
length = dists[i - 1] - dists[hill_start_idx]
|
|
if length >= MIN_HILL_M:
|
|
gain = round(smooth_alts[i - 1] - smooth_alts[hill_start_idx])
|
|
start_km = dists[hill_start_idx] / 1000
|
|
segments.append((
|
|
f"Hill at {start_km:.1f} km (+{gain} m)",
|
|
dists[hill_start_idx],
|
|
dists[i - 1],
|
|
))
|
|
in_hill = False
|
|
|
|
if in_hill:
|
|
length = dists[-1] - dists[hill_start_idx]
|
|
if length >= MIN_HILL_M:
|
|
gain = round(smooth_alts[-1] - smooth_alts[hill_start_idx])
|
|
start_km = dists[hill_start_idx] / 1000
|
|
segments.append((
|
|
f"Hill at {start_km:.1f} km (+{gain} m)",
|
|
dists[hill_start_idx],
|
|
dists[-1],
|
|
))
|
|
|
|
return segments
|
|
|
|
|
|
STANDARD_DISTANCES = [
|
|
(400, "400m"),
|
|
(800, "800m"),
|
|
(1000, "1k"),
|
|
(1609.34, "1 mile"),
|
|
(3000, "3k"),
|
|
(5000, "5k"),
|
|
(10000, "10k"),
|
|
(21097.5, "Half marathon"),
|
|
(42195, "Marathon"),
|
|
(50000, "50k"),
|
|
(100000, "100k"),
|
|
]
|
|
|
|
|
|
def compute_best_splits(data_points: list[dict], total_distance_m: float) -> dict[str, float]:
|
|
"""Compute best split times for all standard distances that fit within the activity."""
|
|
results = {}
|
|
for dist_m, label in STANDARD_DISTANCES:
|
|
if total_distance_m >= dist_m * 0.95: # allow 5% tolerance
|
|
best = find_best_split_time(data_points, dist_m)
|
|
if best:
|
|
results[label] = best
|
|
return results
|