Files
MileVault/backend/app/workers/tasks.py
T
owain ec87f68729
Build and push images / validate (push) Successful in 9s
Build and push images / build-backend (push) Successful in 1m57s
Build and push images / build-worker (push) Successful in 50s
Build and push images / build-frontend (push) Successful in 24s
Add trend-range gating, vehicle filter, sync cancel, moving time, and UI fixes
- Grey out trend ranges beyond available health history
- Reject implausibly fast (vehicle) activities on upload with feedback
- Add cancel button + cooperative cancellation for Garmin sync
- Show daily steps prominently on the dashboard
- Clear errors for malformed/empty upload ZIPs
- Snap-target dot when drawing a segment on the map
- Time-axis fallback for stationary/HIIT HR timelines; hide map when no GPS
- Parse and display moving time (timer) vs elapsed; backfill task
- Restyle SegmentsPanel like RouteLeaderboard; Laps/Routes/Segments on one row

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-11 19:41:56 +01:00

868 lines
35 KiB
Python

"""
Background tasks: activity ingestion, route matching, PR calculation.
Uses synchronous SQLAlchemy because Celery's prefork model doesn't play
well with asyncio - each worker process needs its own connection pool,
and async pools don't survive process forks.
"""
from celery import Celery
from app.core.config import settings
# Celery application used by every task in this module. The same
# ``settings.redis_url`` is passed as both the broker and the result backend.
celery_app = Celery(
    "milevault",
    backend=settings.redis_url,
    broker=settings.redis_url,
)

celery_app.conf.update(
    # JSON is the only payload format, for task messages and for results.
    accept_content=["json"],
    task_serializer="json",
    result_serializer="json",
    # Timestamps are handled in UTC.
    enable_utc=True,
    timezone="UTC",
    task_track_started=True,
    worker_prefetch_multiplier=1,
    beat_schedule={
        "sync-garmin-connect": {
            "task": "sync_all_garmin_connect",
            # Interval is configurable via GARMIN_SYNC_INTERVAL_MINUTES (default 30 min)
            "schedule": float(settings.garmin_sync_interval_minutes * 60),
        },
    },
)
# File-name endings that mark a FIT file as wellness data rather than an
# activity. is_wellness_file() compares them case-insensitively.
WELLNESS_SUFFIXES = (
    "_METRICS.fit",
    "_WELLNESS.fit",
    "_SLEEP.fit",
    "_SLEEP_DATA.fit",
    "_STRESS.fit",
    "_SPO2.fit",
    "_HRV.fit",
    "_HRV_STATUS.fit",
    "_MONITORING.fit",
    "_MONITORING_B.fit",
    "_RESPIRATION.fit",
    "_PULSE_OX.fit",
)


def is_wellness_file(file_path: str) -> bool:
    """Return True when ``file_path`` ends with one of ``WELLNESS_SUFFIXES``.

    Both the path and the suffixes are upper-cased before comparing, so the
    match ignores case.
    """
    upper_suffixes = tuple(suffix.upper() for suffix in WELLNESS_SUFFIXES)
    # str.endswith accepts a tuple and is True if any entry matches.
    return file_path.upper().endswith(upper_suffixes)
@celery_app.task(bind=True, name="process_activity_file")
def process_activity_file(self, file_path: str, user_id: int, source_type: str,
                          garmin_activity_id: str = None):
    """Parse a FIT/GPX file and store it as an activity.

    Wellness FIT files are handed to ``parse_wellness_fit`` instead. Any other
    file is parsed, de-duplicated against the user's existing activities (same
    sport type, start time within ±60 s), stored together with its data points
    and laps, and follow-up tasks are queued.

    Args:
        file_path: Path of the file to ingest.
        user_id: Owner of the activity.
        source_type: ``"fit"`` selects the FIT parser; for any other value a
            ``.fit`` extension still selects it, otherwise the GPX parser runs.
        garmin_activity_id: Optional Garmin Connect id, set when the file came
            from a Garmin Connect sync.

    Returns:
        A status dict whose ``status`` is ``routed_to_wellness``, ``skipped``
        (with a ``reason``), ``duplicate`` or ``ok``; the last two also carry
        ``activity_id``.

    Parser exceptions are handed to ``self.retry`` (countdown 10 s,
    ``max_retries=3``).
    """
    if is_wellness_file(file_path):
        parse_wellness_fit.delay(file_path, user_id)
        return {"status": "routed_to_wellness", "file": file_path}

    from app.services.fit_parser import parse_fit_file, parse_gpx_file, calculate_hr_zones
    from app.core.database import SyncSessionLocal
    from app.models.user import Activity, ActivityDataPoint, ActivityLap
    from app.models.user import User as UserModel
    from sqlalchemy import select
    from datetime import date as _date, datetime, timedelta

    self.update_state(state="PROGRESS", meta={"step": "parsing"})
    try:
        # Case-insensitive extension check, consistent with is_wellness_file():
        # a file named "RUN.FIT" must not be fed to the GPX parser.
        if source_type == "fit" or file_path.lower().endswith(".fit"):
            parsed = parse_fit_file(file_path)
        else:
            parsed = parse_gpx_file(file_path)
    except Exception as e:
        raise self.retry(exc=e, countdown=10, max_retries=3)

    if not parsed.get("start_time"):
        return {"status": "skipped", "reason": "no start_time", "file": file_path}

    # Reject activities whose average speed is implausible for the sport (e.g. a
    # car journey accidentally recorded). Surfaced to the upload UI as the reason.
    if parsed.get("rejected_reason"):
        return {"status": "skipped", "reason": parsed["rejected_reason"], "file": file_path}

    with SyncSessionLocal() as db:
        start_time = datetime.fromisoformat(parsed["start_time"])

        # Deduplicate: same user + sport_type + start_time within ±60s
        dedup_window = timedelta(seconds=60)
        existing = db.execute(
            select(Activity).where(
                Activity.user_id == user_id,
                Activity.sport_type == parsed["sport_type"],
                Activity.start_time >= start_time - dedup_window,
                Activity.start_time <= start_time + dedup_window,
            )
        ).scalars().first()
        if existing:
            # Stamp garmin_activity_id if this came from a Garmin Connect sync
            # so future syncs skip the fast-path dedup and don't re-download.
            if garmin_activity_id and not existing.garmin_activity_id:
                existing.garmin_activity_id = garmin_activity_id
                db.commit()
            return {"activity_id": existing.id, "status": "duplicate"}

        # Max HR for zone calculation, in order of preference: the user's stored
        # value, 220 minus age (from birth_year), the activity's own max, 190.
        user_obj = db.execute(
            select(UserModel).where(UserModel.id == user_id)
        ).scalar_one_or_none()
        user_max_hr = None
        if user_obj:
            user_max_hr = user_obj.max_heart_rate
            if not user_max_hr and user_obj.birth_year:
                age = _date.today().year - user_obj.birth_year
                user_max_hr = 220 - age
        if not user_max_hr:
            user_max_hr = parsed.get("max_heart_rate") or 190

        hr_zones = calculate_hr_zones(parsed.get("data_points", []), user_max_hr)

        activity = Activity(
            user_id=user_id,
            name=parsed["name"],
            sport_type=parsed["sport_type"],
            garmin_activity_id=garmin_activity_id,
            start_time=start_time,
            distance_m=parsed.get("distance_m"),
            duration_s=parsed.get("duration_s"),
            moving_time_s=parsed.get("moving_time_s"),
            elevation_gain_m=parsed.get("elevation_gain_m"),
            elevation_loss_m=parsed.get("elevation_loss_m"),
            avg_heart_rate=parsed.get("avg_heart_rate"),
            max_heart_rate=parsed.get("max_heart_rate"),
            avg_cadence=parsed.get("avg_cadence"),
            avg_power=parsed.get("avg_power"),
            normalized_power=parsed.get("normalized_power"),
            avg_speed_ms=parsed.get("avg_speed_ms"),
            max_speed_ms=parsed.get("max_speed_ms"),
            avg_temperature_c=parsed.get("avg_temperature_c"),
            calories=parsed.get("calories"),
            training_stress_score=parsed.get("training_stress_score"),
            polyline=parsed.get("polyline"),
            bounding_box=parsed.get("bounding_box"),
            source_file=file_path,
            source_type=parsed.get("source_type"),
            hr_zones=hr_zones,
        )
        db.add(activity)
        db.flush()  # assigns activity.id, needed by the child rows below

        # Insert data points in batches of 500, keeping only the first point
        # seen for any given timestamp.
        seen_timestamps = set()
        batch = []
        for p in parsed.get("data_points", []):
            if not p.get("timestamp"):
                continue
            raw_ts = p["timestamp"]
            ts = datetime.fromisoformat(raw_ts) if isinstance(raw_ts, str) else raw_ts
            if ts in seen_timestamps:
                continue
            seen_timestamps.add(ts)
            batch.append(ActivityDataPoint(
                activity_id=activity.id,
                timestamp=ts,
                latitude=p.get("latitude"),
                longitude=p.get("longitude"),
                altitude_m=p.get("altitude_m"),
                heart_rate=p.get("heart_rate"),
                cadence=p.get("cadence"),
                speed_ms=p.get("speed_ms"),
                power=p.get("power"),
                temperature_c=p.get("temperature_c"),
                distance_m=p.get("distance_m"),
            ))
            if len(batch) >= 500:
                db.add_all(batch)
                db.flush()
                batch = []
        if batch:
            db.add_all(batch)
            db.flush()

        for lap in parsed.get("laps", []):
            lap_start = datetime.fromisoformat(lap["start_time"]) if lap.get("start_time") else None
            db.add(ActivityLap(
                activity_id=activity.id,
                lap_number=lap["lap_number"],
                start_time=lap_start,
                duration_s=lap.get("duration_s"),
                distance_m=lap.get("distance_m"),
                avg_heart_rate=lap.get("avg_heart_rate"),
                avg_cadence=lap.get("avg_cadence"),
                avg_speed_ms=lap.get("avg_speed_ms"),
                avg_power=lap.get("avg_power"),
            ))

        db.commit()
        # Read the id while the session is still open.
        activity_id = activity.id

    compute_personal_records.delay(activity_id, user_id, parsed)
    if parsed.get("sport_type") in ("running", "cycling", "hiking", "walking"):
        detect_route.delay(activity_id, user_id)
    # The task returns early by itself when the activity has no polyline.
    # NOTE(review): confirm whether this dispatch should also be limited to the
    # sport types listed above.
    match_activity_segments.delay(activity_id, user_id)
    return {"activity_id": activity_id, "status": "ok"}
@celery_app.task(name="parse_wellness_fit")
def parse_wellness_fit(file_path: str, user_id: int):
    """Parse a Garmin wellness FIT file and upsert into health_metrics.

    One row per (user, day) is inserted or updated. On conflict a column keeps
    its stored value when the new value is NULL (``COALESCE``), except
    ``total_calories`` which takes the larger of the two (``GREATEST``).

    Args:
        file_path: Path of the wellness FIT file.
        user_id: Owner of the metrics.

    Returns:
        A dict with ``status`` (``ok``, ``no_data`` or ``error``) and ``file``;
        ``ok`` adds ``days_processed`` and ``error`` adds the message.
    """
    import json
    from app.services.wellness_parser import parse_wellness_fit as _parse
    from app.core.database import SyncSessionLocal
    from datetime import datetime, timezone
    from sqlalchemy import text

    try:
        result = _parse(file_path)
    except Exception as e:
        # Parser failures are reported in the task result rather than raised.
        return {"status": "error", "error": str(e), "file": file_path}
    if result.get("error"):
        return {"status": "error", "error": result["error"], "file": file_path}

    days = result.get("days", {})
    if not days:
        return {"status": "no_data", "file": file_path}

    # Built once and reused for every day. CAST(... AS json) is used instead of
    # the "::json" shorthand so the bind parameter name is never directly
    # followed by another colon inside text().
    upsert_sql = text("""
        INSERT INTO health_metrics (user_id, date, resting_hr, avg_hr_day, max_hr_day,
            avg_stress, spo2_avg, hrv_nightly_avg, hrv_5min_high, hrv_status,
            steps, floors_climbed, active_calories, total_calories,
            sleep_duration_s, sleep_deep_s, sleep_light_s, sleep_rem_s, sleep_awake_s,
            sleep_score, sleep_start, sleep_end, sleep_stages)
        VALUES (:user_id, :date, :resting_hr, :avg_hr, :max_hr,
            :avg_stress, :spo2_avg, :hrv_avg, :hrv_high, :hrv_status,
            :steps, :floors, :active_cal, :total_cal,
            :sleep_dur, :sleep_deep, :sleep_light, :sleep_rem, :sleep_awake,
            :sleep_score, :sleep_start, :sleep_end, CAST(:sleep_stages AS json))
        ON CONFLICT (user_id, date) DO UPDATE SET
            resting_hr = COALESCE(EXCLUDED.resting_hr, health_metrics.resting_hr),
            avg_hr_day = COALESCE(EXCLUDED.avg_hr_day, health_metrics.avg_hr_day),
            max_hr_day = COALESCE(EXCLUDED.max_hr_day, health_metrics.max_hr_day),
            avg_stress = COALESCE(EXCLUDED.avg_stress, health_metrics.avg_stress),
            spo2_avg = COALESCE(EXCLUDED.spo2_avg, health_metrics.spo2_avg),
            hrv_nightly_avg = COALESCE(EXCLUDED.hrv_nightly_avg, health_metrics.hrv_nightly_avg),
            hrv_5min_high = COALESCE(EXCLUDED.hrv_5min_high, health_metrics.hrv_5min_high),
            hrv_status = COALESCE(EXCLUDED.hrv_status, health_metrics.hrv_status),
            steps = COALESCE(EXCLUDED.steps, health_metrics.steps),
            floors_climbed = COALESCE(EXCLUDED.floors_climbed, health_metrics.floors_climbed),
            active_calories = COALESCE(EXCLUDED.active_calories, health_metrics.active_calories),
            total_calories = GREATEST(EXCLUDED.total_calories, health_metrics.total_calories),
            sleep_duration_s = COALESCE(EXCLUDED.sleep_duration_s, health_metrics.sleep_duration_s),
            sleep_deep_s = COALESCE(EXCLUDED.sleep_deep_s, health_metrics.sleep_deep_s),
            sleep_light_s = COALESCE(EXCLUDED.sleep_light_s, health_metrics.sleep_light_s),
            sleep_rem_s = COALESCE(EXCLUDED.sleep_rem_s, health_metrics.sleep_rem_s),
            sleep_awake_s = COALESCE(EXCLUDED.sleep_awake_s, health_metrics.sleep_awake_s),
            sleep_score = COALESCE(EXCLUDED.sleep_score, health_metrics.sleep_score),
            sleep_start = COALESCE(EXCLUDED.sleep_start, health_metrics.sleep_start),
            sleep_end = COALESCE(EXCLUDED.sleep_end, health_metrics.sleep_end),
            sleep_stages = COALESCE(EXCLUDED.sleep_stages, health_metrics.sleep_stages)
    """)

    with SyncSessionLocal() as db:
        for day_date, data in days.items():
            # The date column receives midnight UTC of the calendar day.
            date_dt = datetime(day_date.year, day_date.month, day_date.day, tzinfo=timezone.utc)
            stages = data.get("sleep_stages")
            db.execute(upsert_sql, {
                "user_id": user_id,
                "date": date_dt,
                "resting_hr": data.get("resting_hr"),
                "avg_hr": data.get("avg_hr_day"),
                "max_hr": data.get("max_hr_day"),
                "avg_stress": data.get("avg_stress"),
                "spo2_avg": data.get("spo2_avg"),
                "hrv_avg": data.get("hrv_nightly_avg"),
                "hrv_high": data.get("hrv_5min_high"),
                "hrv_status": data.get("hrv_status"),
                "steps": data.get("steps"),
                "floors": data.get("floors_climbed"),
                "active_cal": data.get("active_calories"),
                "total_cal": data.get("total_calories"),
                "sleep_dur": data.get("sleep_duration_s"),
                "sleep_deep": data.get("sleep_deep_s"),
                "sleep_light": data.get("sleep_light_s"),
                "sleep_rem": data.get("sleep_rem_s"),
                "sleep_awake": data.get("sleep_awake_s"),
                "sleep_score": data.get("sleep_score"),
                "sleep_start": data.get("sleep_start"),
                "sleep_end": data.get("sleep_end"),
                # Serialised to a JSON string; empty/missing stages become NULL.
                "sleep_stages": json.dumps(stages) if stages else None,
            })
        db.commit()

    return {"status": "ok", "days_processed": len(days), "file": file_path}
@celery_app.task(name="detect_route")
def detect_route(activity_id: int, user_id: int):
    """Link an activity to a named route, creating the route if necessary.

    The activity is first compared with the user's existing named routes of the
    same sport. If none is similar, it is compared with the user's other
    unassigned activities of the same sport whose distance is within ±5 %; on
    the first similar one a new auto-detected route is created from the older
    of the two and both activities are linked to it.

    Returns:
        A dict whose ``status`` is ``no_polyline``, ``already_assigned``,
        ``matched_existing``, ``auto_created`` or ``no_match``; the two match
        outcomes also carry ``route_id``.
    """
    from app.services.route_matcher import routes_are_similar
    from app.core.database import SyncSessionLocal
    from app.models.user import Activity, NamedRoute
    from sqlalchemy import select

    with SyncSessionLocal() as db:
        new_act = db.execute(
            select(Activity).where(Activity.id == activity_id)
        ).scalar_one_or_none()
        if not new_act or not new_act.polyline:
            return {"status": "no_polyline"}
        if new_act.named_route_id:
            return {"status": "already_assigned"}

        # Pass 1: existing named routes of the same sport.
        known_routes = db.execute(
            select(NamedRoute).where(
                NamedRoute.user_id == user_id,
                NamedRoute.sport_type == new_act.sport_type,
            )
        ).scalars().all()
        for route in known_routes:
            if not route.reference_polyline:
                continue
            if not routes_are_similar(
                new_act.polyline, route.reference_polyline,
                new_act.bounding_box, route.bounding_box,
                dist1=new_act.distance_m, dist2=route.distance_m,
            ):
                continue
            new_act.named_route_id = route.id
            db.commit()
            return {"status": "matched_existing", "route_id": route.id}

        # Pass 2: other unassigned activities with a polyline and a distance
        # within ±5 % of this one.
        base_distance = new_act.distance_m or 0
        candidates = db.execute(
            select(Activity).where(
                Activity.user_id == user_id,
                Activity.sport_type == new_act.sport_type,
                Activity.named_route_id.is_(None),
                Activity.id != activity_id,
                Activity.polyline.isnot(None),
                Activity.distance_m >= base_distance * 0.95,
                Activity.distance_m <= base_distance * 1.05,
            )
        ).scalars().all()
        for candidate in candidates:
            if not routes_are_similar(
                new_act.polyline, candidate.polyline,
                new_act.bounding_box, candidate.bounding_box,
                dist1=new_act.distance_m, dist2=candidate.distance_m,
            ):
                continue

            # The older activity supplies the route's name and reference track.
            if candidate.start_time < new_act.start_time:
                older, newer = candidate, new_act
            else:
                older, newer = new_act, candidate

            route_name = f"{older.sport_type.title()} route {older.start_time.strftime('%d %b %Y')}"
            new_route = NamedRoute(
                user_id=user_id,
                name=route_name,
                sport_type=older.sport_type,
                reference_polyline=older.polyline,
                bounding_box=older.bounding_box,
                distance_m=older.distance_m,
                auto_detected=True,
            )
            db.add(new_route)
            db.flush()  # assigns new_route.id
            older.named_route_id = new_route.id
            newer.named_route_id = new_route.id
            db.commit()
            return {"status": "auto_created", "route_id": new_route.id}

        return {"status": "no_match"}
@celery_app.task(name="compute_personal_records")
def compute_personal_records(activity_id: int, user_id: int, parsed: dict):
    """Update the user's personal records from one activity's best splits.

    Reads ``data_points``, ``distance_m``, ``sport_type`` and ``start_time``
    from ``parsed``. For each split label returned by ``compute_best_splits``
    that appears in ``STANDARD_DISTANCES``, a new current record is stored when
    none exists yet or when this split is faster; the previous record is then
    flagged as no longer current.
    """
    from app.services.route_matcher import compute_best_splits, STANDARD_DISTANCES
    from app.core.database import SyncSessionLocal
    from app.models.user import PersonalRecord
    from sqlalchemy import select
    from datetime import datetime, timezone

    points = parsed.get("data_points", [])
    recorded_total = parsed.get("distance_m", 0) or 0
    sport = parsed.get("sport_type", "running")

    raw_start = parsed.get("start_time")
    if raw_start:
        achieved_at = datetime.fromisoformat(raw_start)
    else:
        achieved_at = datetime.now(timezone.utc)

    # GPS can over/under-measure relative to the activity's official distance
    # (e.g. a 5 km run whose GPS track sums to 5.8 km), which would otherwise
    # produce a bogus "best 5 km" split. When the two differ by more than 2 %,
    # scale the distance stream so its max matches the recorded total.
    if recorded_total > 0 and points:
        stream_max = max((pt.get("distance_m") or 0) for pt in points)
        if stream_max > 0 and abs(stream_max - recorded_total) / recorded_total > 0.02:
            scale = recorded_total / stream_max
            rescaled = []
            for pt in points:
                if pt.get("distance_m") is None:
                    rescaled.append(pt)
                else:
                    rescaled.append({**pt, "distance_m": pt["distance_m"] * scale})
            points = rescaled

    best_splits = compute_best_splits(points, recorded_total)

    # label -> distance; the first entry wins if a label occurs more than once.
    distance_for_label = {}
    for distance, split_label in STANDARD_DISTANCES:
        distance_for_label.setdefault(split_label, distance)

    with SyncSessionLocal() as db:
        for label, duration_s in best_splits.items():
            dist_m = distance_for_label.get(label)
            if dist_m is None:
                continue

            holder = db.execute(
                select(PersonalRecord).where(
                    PersonalRecord.user_id == user_id,
                    PersonalRecord.sport_type == sport,
                    PersonalRecord.distance_m == dist_m,
                    PersonalRecord.is_current_record == True,
                )
            ).scalar_one_or_none()

            if holder is not None and not duration_s < holder.duration_s:
                continue  # existing record stands

            if holder:
                holder.is_current_record = False
            db.add(PersonalRecord(
                user_id=user_id,
                activity_id=activity_id,
                sport_type=sport,
                distance_m=dist_m,
                distance_label=label,
                duration_s=duration_s,
                achieved_at=achieved_at,
                is_current_record=True,
            ))
        db.commit()
def _recompute_segment_ranks(db, segment_id: int):
    """Re-rank a segment's efforts by duration.

    The three shortest efforts receive rank 1, 2 and 3; every other effort has
    its rank set to None. Nothing is committed here.
    """
    from app.models.user import SegmentEffort
    from sqlalchemy import select

    fastest_first = select(SegmentEffort).where(
        SegmentEffort.segment_id == segment_id
    ).order_by(SegmentEffort.duration_s)
    ordered = db.execute(fastest_first).scalars().all()

    for position, effort in enumerate(ordered, start=1):
        effort.rank = position if position <= 3 else None
def _activity_track(db, activity_id):
    """Return (coords, times) for an activity's GPS track in time order.

    Args:
        db: Open SQLAlchemy session.
        activity_id: Id of the activity whose data points are read.

    Returns:
        Two parallel lists: ``coords`` holds ``(latitude, longitude)`` tuples
        and ``times`` the matching timestamps. Data points missing latitude,
        longitude or timestamp are left out.
    """
    from app.models.user import ActivityDataPoint
    from sqlalchemy import select

    # Only these three columns are needed, so select them directly instead of
    # loading a full ORM object per data point; incomplete rows are filtered in
    # the query rather than in Python.
    rows = db.execute(
        select(
            ActivityDataPoint.latitude,
            ActivityDataPoint.longitude,
            ActivityDataPoint.timestamp,
        )
        .where(
            ActivityDataPoint.activity_id == activity_id,
            ActivityDataPoint.latitude.isnot(None),
            ActivityDataPoint.longitude.isnot(None),
            ActivityDataPoint.timestamp.isnot(None),
        )
        .order_by(ActivityDataPoint.timestamp)
    ).all()

    coords = [(lat, lon) for lat, lon, _ts in rows]
    times = [ts for _lat, _lon, ts in rows]
    return coords, times
def _upsert_effort(db, segment_id, activity, duration_s):
    """Create or refresh the effort row for one (segment, activity) pair.

    An existing effort has its duration and ``achieved_at`` overwritten;
    otherwise a new effort is added to the session. ``achieved_at`` is the
    activity's start time. Nothing is committed here.
    """
    from app.models.user import SegmentEffort
    from sqlalchemy import select

    lookup = select(SegmentEffort).where(
        SegmentEffort.segment_id == segment_id,
        SegmentEffort.activity_id == activity.id,
    )
    effort = db.execute(lookup).scalar_one_or_none()

    if not effort:
        db.add(SegmentEffort(
            segment_id=segment_id,
            activity_id=activity.id,
            duration_s=duration_s,
            achieved_at=activity.start_time,
        ))
        return

    effort.duration_s = duration_s
    effort.achieved_at = activity.start_time
@celery_app.task(name="match_segment")
def match_segment(segment_id: int):
    """Match one segment against the owner's activities and re-rank its efforts.

    Every activity of the segment's user and sport type that has a polyline is
    considered. Activities whose bounding box does not overlap the segment's
    (when both are set) or whose track has fewer than two points are skipped.
    Each match is upserted as an effort, then ranks are recomputed.

    Returns:
        ``{"status": "no_segment"}`` when the segment is missing or has no
        polyline, otherwise ``{"status": "ok", "matched": <count>}``.
    """
    from app.services.route_matcher import (
        match_segment_in_activity, bounding_boxes_overlap, decode_polyline_to_coords,
    )
    from app.core.database import SyncSessionLocal
    from app.models.user import Segment, Activity
    from sqlalchemy import select

    with SyncSessionLocal() as db:
        seg = db.execute(
            select(Segment).where(Segment.id == segment_id)
        ).scalar_one_or_none()
        if not seg or not seg.polyline:
            return {"status": "no_segment"}

        seg_coords = decode_polyline_to_coords(seg.polyline)
        eligible = db.execute(
            select(Activity).where(
                Activity.user_id == seg.user_id,
                Activity.sport_type == seg.sport_type,
                Activity.polyline != None,
            )
        ).scalars().all()

        matched = 0
        for act in eligible:
            # Cheap pre-filter, only applicable when both boxes are present.
            both_boxed = seg.bounding_box and act.bounding_box
            if both_boxed and not bounding_boxes_overlap(seg.bounding_box, act.bounding_box):
                continue

            coords, times = _activity_track(db, act.id)
            if len(coords) < 2:
                continue

            duration = match_segment_in_activity(seg_coords, coords, times)
            if duration is not None:
                _upsert_effort(db, seg.id, act, duration)
                matched += 1

        # Persist the efforts first, then the ranks derived from them.
        db.commit()
        _recompute_segment_ranks(db, seg.id)
        db.commit()
        return {"status": "ok", "matched": matched}
@celery_app.task(name="match_activity_segments")
def match_activity_segments(activity_id: int, user_id: int):
    """Match one activity against the user's segments of the same sport type.

    Segments without a polyline, or whose bounding box does not overlap the
    activity's (when both are set), are skipped. Each match is upserted as an
    effort and the ranks of every touched segment are recomputed.

    Returns:
        ``{"status": "no_polyline"}`` when the activity is missing or has no
        polyline, ``{"status": "no_track"}`` when its track has fewer than two
        points, otherwise ``{"status": "ok", "matched_segments": <count>}``.
    """
    from app.services.route_matcher import (
        match_segment_in_activity, bounding_boxes_overlap, decode_polyline_to_coords,
    )
    from app.core.database import SyncSessionLocal
    from app.models.user import Segment, Activity
    from sqlalchemy import select

    with SyncSessionLocal() as db:
        act = db.execute(
            select(Activity).where(Activity.id == activity_id)
        ).scalar_one_or_none()
        if not act or not act.polyline:
            return {"status": "no_polyline"}

        coords, times = _activity_track(db, act.id)
        if len(coords) < 2:
            return {"status": "no_track"}

        user_segments = db.execute(
            select(Segment).where(
                Segment.user_id == user_id,
                Segment.sport_type == act.sport_type,
            )
        ).scalars().all()

        touched = []
        for seg in user_segments:
            if not seg.polyline:
                continue
            both_boxed = seg.bounding_box and act.bounding_box
            if both_boxed and not bounding_boxes_overlap(seg.bounding_box, act.bounding_box):
                continue

            seg_coords = decode_polyline_to_coords(seg.polyline)
            duration = match_segment_in_activity(seg_coords, coords, times)
            if duration is not None:
                _upsert_effort(db, seg.id, act, duration)
                touched.append(seg.id)

        # Persist the efforts first, then the ranks derived from them.
        db.commit()
        for touched_id in touched:
            _recompute_segment_ranks(db, touched_id)
        db.commit()
        return {"status": "ok", "matched_segments": len(touched)}
@celery_app.task(name="process_garmin_health_zip")
def process_garmin_health_zip(zip_path: str, user_id: int):
    """Extract wellness data from a Garmin Connect export ZIP.

    Every ``.json`` member whose name contains ``UDSFile`` or ``DailyMetrics``
    is read and each daily record is upserted into ``health_metrics``. On
    conflict a column keeps its stored value when the new value is NULL.
    Members that fail to load as JSON and records without a usable date are
    skipped. Everything is committed once at the end.

    Args:
        zip_path: Path of the export ZIP.
        user_id: Owner of the metrics.
    """
    import zipfile
    import json
    from app.core.database import SyncSessionLocal
    from datetime import datetime, timezone
    from sqlalchemy import text

    INSERT_SQL = text("""
        INSERT INTO health_metrics (user_id, date, resting_hr, max_hr_day, steps,
            floors_climbed, active_calories, total_calories, avg_stress, spo2_avg)
        VALUES (:user_id, :date, :resting_hr, :max_hr_day, :steps,
            :floors, :active_cal, :total_cal, :stress, :spo2)
        ON CONFLICT (user_id, date) DO UPDATE SET
            resting_hr = COALESCE(EXCLUDED.resting_hr, health_metrics.resting_hr),
            max_hr_day = COALESCE(EXCLUDED.max_hr_day, health_metrics.max_hr_day),
            steps = COALESCE(EXCLUDED.steps, health_metrics.steps),
            floors_climbed = COALESCE(EXCLUDED.floors_climbed, health_metrics.floors_climbed),
            active_calories = COALESCE(EXCLUDED.active_calories, health_metrics.active_calories),
            total_calories = COALESCE(EXCLUDED.total_calories, health_metrics.total_calories),
            avg_stress = COALESCE(EXCLUDED.avg_stress, health_metrics.avg_stress),
            spo2_avg = COALESCE(EXCLUDED.spo2_avg, health_metrics.spo2_avg)
    """)

    def _extract_stress(item):
        """Return the day's average stress level, or None if not present."""
        stress_data = item.get("allDayStress")
        if not stress_data or not isinstance(stress_data, dict):
            # No nested stress object: fall back to the top-level field.
            return item.get("averageStressLevel")
        for agg in stress_data.get("aggregatorList", []):
            if agg.get("type") == "TOTAL":
                return agg.get("averageStressLevel")
        return None

    def _floors_from_item(item):
        """Return floors climbed, converting from meters when given that way."""
        # UDS format reports meters; 1 floor = 3.048 m
        meters = item.get("floorsAscendedInMeters")
        if meters is not None:
            return round(meters / 3.048)
        return item.get("floorsAscended")

    def _process_record(db, item):
        """Upsert one daily record; records without a usable date are skipped."""
        date_str = item.get("calendarDate") or item.get("date")
        if not date_str:
            return
        try:
            date_dt = datetime.fromisoformat(date_str).replace(tzinfo=timezone.utc)
        except (TypeError, ValueError):
            # ValueError: not an ISO date string. TypeError: the value is not a
            # string at all. Either way skip this record instead of aborting
            # the whole import.
            return
        db.execute(INSERT_SQL, {
            "user_id": user_id,
            "date": date_dt,
            "resting_hr": item.get("restingHeartRate"),
            "max_hr_day": item.get("maxHeartRate"),
            "steps": item.get("totalSteps"),
            "floors": _floors_from_item(item),
            "active_cal": item.get("activeKilocalories"),
            "total_cal": item.get("totalKilocalories"),
            "stress": _extract_stress(item),
            "spo2": item.get("avgSpo2"),
        })

    with SyncSessionLocal() as db:
        with zipfile.ZipFile(zip_path) as zf:
            for name in zf.namelist():
                if not name.endswith(".json"):
                    continue
                # Garmin Connect export stores daily summaries in UDSFile_*.json
                # (DI-Connect-Aggregator). Older/alternative exports may use DailyMetrics.
                is_uds = "UDSFile" in name
                is_legacy = "DailyMetrics" in name
                if not (is_uds or is_legacy):
                    continue
                with zf.open(name) as f:
                    try:
                        data = json.load(f)
                    except Exception:
                        # Best effort: an unreadable member is skipped.
                        continue
                # UDS files are lists of daily records; legacy format is a single object
                records = data if isinstance(data, list) else [data]
                for item in records:
                    if isinstance(item, dict):
                        _process_record(db, item)
        db.commit()
class SyncCancelled(Exception):
    """Signal that the user asked for the running Garmin sync to be cancelled.

    Raised from inside the sync so the task can stop early.
    """
@celery_app.task(name="sync_garmin_connect_user")
def sync_garmin_connect_user(user_id: int):
    """Sync Garmin Connect data (activities + wellness) for one user.

    Progress is written to the config row's ``last_sync_status`` as the sync
    advances. Before each status update the Redis key
    ``garmin_sync_cancel:<user_id>`` is checked; if it exists the sync is
    aborted. A failure in the activity or wellness step is recorded and the
    other step still runs.

    Returns:
        A dict whose ``status`` is ``skipped`` (no config or sync disabled),
        ``auth_error`` (with ``error``), ``cancelled`` or ``ok``; the last two
        include ``activities_queued`` and ``wellness_days``.
    """
    from app.services.garmin_connect_sync import authenticate_garmin, sync_activities, sync_wellness
    from app.core.database import SyncSessionLocal
    from app.models.user import GarminConnectConfig
    from app.core.config import settings
    from sqlalchemy import select
    from datetime import datetime, timezone

    # Cooperative-cancellation flag (set by POST /garmin-sync/cancel).
    cancel_key = f"garmin_sync_cancel:{user_id}"
    try:
        import redis as redis_lib
        _redis = redis_lib.Redis.from_url(settings.redis_url)
    except Exception:
        # Without Redis the sync still runs; it just cannot be cancelled.
        _redis = None

    def _cancelled():
        """Return True if the cancel flag exists; Redis errors count as False."""
        try:
            return bool(_redis and _redis.exists(cancel_key))
        except Exception:
            return False

    with SyncSessionLocal() as db:
        cfg = db.execute(
            select(GarminConnectConfig).where(GarminConnectConfig.user_id == user_id)
        ).scalar_one_or_none()
        if not cfg or not cfg.sync_enabled:
            return {"status": "skipped"}

        # Snapshot config values before any intermediate commits (commits expire ORM attrs)
        email = cfg.email
        password_enc = cfg.password_enc
        token_store = cfg.token_store
        last_sync_at = cfg.last_sync_at
        sync_acts = cfg.sync_activities
        sync_well = cfg.sync_wellness
        lookback = cfg.sync_lookback_days if cfg.sync_lookback_days is not None else 30

        cfg.last_sync_status = "Connecting to Garmin..."
        db.commit()

        try:
            garmin, new_token = authenticate_garmin(email, password_enc, token_store)
        except Exception as exc:
            cfg.last_sync_at = datetime.now(timezone.utc)
            cfg.last_sync_status = f"Auth error: {exc}"
            db.commit()
            return {"status": "auth_error", "error": str(exc)}

        if new_token:
            cfg.token_store = new_token
            db.commit()

        activities_queued = 0
        wellness_days = 0
        errors = []

        def _set_status(text):
            # Checked between items: abort the sync if cancellation was requested.
            if _cancelled():
                raise SyncCancelled()
            cfg.last_sync_status = text
            db.commit()

        try:
            if sync_acts:
                _set_status("Syncing activities...")
                try:
                    activities_queued = sync_activities(
                        garmin, user_id, last_sync_at, db, settings.file_store_path,
                        lookback_days=lookback,
                        status_callback=_set_status,
                    )
                except SyncCancelled:
                    raise
                except Exception as exc:
                    errors.append(f"activities: {exc}")
                    # Recover the session, as the wellness branch does, so the
                    # status commits that follow can succeed.
                    db.rollback()

            if sync_well:
                _set_status("Syncing wellness...")
                try:
                    wellness_days = sync_wellness(
                        garmin, user_id, last_sync_at, db,
                        lookback_days=lookback,
                        status_callback=_set_status,
                    )
                except SyncCancelled:
                    raise
                except Exception as exc:
                    errors.append(f"wellness: {exc}")
                    db.rollback()  # recover session so the final status commit can succeed
        except SyncCancelled:
            db.rollback()
            cfg.last_sync_at = datetime.now(timezone.utc)
            cfg.last_sync_status = "Cancelled"
            db.commit()
            # Clear the flag so it does not cancel the next sync; best effort.
            try:
                if _redis:
                    _redis.delete(cancel_key)
            except Exception:
                pass
            return {"status": "cancelled",
                    "activities_queued": activities_queued, "wellness_days": wellness_days}

        cfg.last_sync_at = datetime.now(timezone.utc)
        cfg.last_sync_status = (
            f"OK — {activities_queued} activities queued, {wellness_days} wellness days synced"
            if not errors else
            f"Partial — {'; '.join(errors)}"
        )
        db.commit()
        return {"status": "ok", "activities_queued": activities_queued, "wellness_days": wellness_days}
@celery_app.task(name="sync_all_garmin_connect")
def sync_all_garmin_connect():
    """Periodic beat task: dispatch a per-user sync for every enabled config.

    The interval is set by the ``sync-garmin-connect`` beat schedule entry.

    Returns:
        ``{"dispatched": <number of per-user sync tasks queued>}``.
    """
    from app.core.database import SyncSessionLocal
    from app.models.user import GarminConnectConfig
    from sqlalchemy import select

    with SyncSessionLocal() as db:
        # Only the user ids are needed, so fetch that column alone.
        user_ids = db.execute(
            select(GarminConnectConfig.user_id)
            .where(GarminConnectConfig.sync_enabled == True)
        ).scalars().all()

    for uid in user_ids:
        sync_garmin_connect_user.delay(uid)
    return {"dispatched": len(user_ids)}
@celery_app.task(name="recalculate_hr_zones_for_user")
def recalculate_hr_zones_for_user(user_id: int, new_max_hr: float):
    """Recalculate hr_zones for all of a user's activities using a new max HR.

    An activity's ``hr_zones`` is only overwritten when the recalculation
    yields a truthy result. All changes are committed once at the end.

    Args:
        user_id: Owner of the activities.
        new_max_hr: Max heart rate passed to ``calculate_hr_zones``.
    """
    from app.services.fit_parser import calculate_hr_zones
    from app.core.database import SyncSessionLocal
    from app.models.user import Activity, ActivityDataPoint
    from sqlalchemy import select

    with SyncSessionLocal() as db:
        activities = db.execute(
            select(Activity).where(Activity.user_id == user_id)
        ).scalars().all()

        for activity in activities:
            # Only heart_rate is used, so select that column instead of
            # loading a full ORM object per data point.
            heart_rates = db.execute(
                select(ActivityDataPoint.heart_rate)
                .where(ActivityDataPoint.activity_id == activity.id)
            ).scalars().all()
            points_dicts = [{"heart_rate": hr} for hr in heart_rates]
            new_zones = calculate_hr_zones(points_dicts, new_max_hr)
            if new_zones:
                activity.hr_zones = new_zones

        db.commit()
@celery_app.task(name="backfill_moving_time")
def backfill_moving_time(user_id: int = None):
    """Fill in ``moving_time_s`` for FIT-sourced activities that lack it.

    Each candidate's stored source file is re-parsed and its moving time copied
    onto the activity. Activities whose file is missing, fails to parse, or
    yields no moving time are counted as skipped. Activities that already have
    a value are not selected, so the task can be run repeatedly.

    Args:
        user_id: Restrict the backfill to one user; None processes all users.

    Returns:
        ``{"status": "ok", "updated": <count>, "skipped": <count>}``.
    """
    import os
    from app.services.fit_parser import parse_fit_file
    from app.core.database import SyncSessionLocal
    from app.models.user import Activity
    from sqlalchemy import select

    updated = 0
    with SyncSessionLocal() as db:
        query = select(Activity).where(
            Activity.moving_time_s.is_(None),
            Activity.source_type == "fit",
            Activity.source_file.isnot(None),
        )
        if user_id is not None:
            query = query.where(Activity.user_id == user_id)
        pending = db.execute(query).scalars().all()

        for act in pending:
            source_path = act.source_file
            if not (source_path and os.path.exists(source_path)):
                continue
            try:
                parsed = parse_fit_file(source_path)
            except Exception:
                # Unreadable file: leave the activity untouched.
                continue
            moving_time = parsed.get("moving_time_s")
            if moving_time:
                act.moving_time_s = moving_time
                updated += 1

        db.commit()

    # Every candidate that was not updated counts as skipped.
    skipped = len(pending) - updated
    return {"status": "ok", "updated": updated, "skipped": skipped}