c3637fa3fa
The Garmin FIT SDK returns snake_case field names but the parser was
looking for camelCase. Sleep epoch durations were wrong (fixed 30s each
instead of computing from timestamp gaps). HRV is in message 370 not 275
(275 now carries sleep levels in modern firmware). Multiple fixes:
- msg 55: use 'steps', 'heart_rate', 'active_calories' (not numeric keys)
- msg 211: use 'resting_heart_rate' (not msg.get(0))
- msg 227: use 'stress_level_time'/'stress_level_value' for named fields
- msg 132: use snake_case 'stress_level_time'/'stress_level_value'
- msg 275: detect sleep_level field → handle as sleep epoch (modern),
fall back to HRV handling for older firmware
- msg 370: new handler for modern hrv_status_summary (last_night_average,
last_night_5_min_high, status)
- msg 346: new handler for sleep_assessment → overall_sleep_score
- msg 21: new handler for sleep session start/stop events to close the
last sleep epoch and record sleep_start/sleep_end timestamps
- Sleep duration: computed from epoch timestamp gaps instead of 30s/epoch
- Celery task SQL: add sleep_score, sleep_start, sleep_end to INSERT/UPDATE;
use GREATEST for total_calories so most-complete value wins across files
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
476 lines
20 KiB
Python
476 lines
20 KiB
Python
"""
|
|
Background tasks: activity ingestion, route matching, PR calculation.
|
|
|
|
Uses synchronous SQLAlchemy because Celery's prefork model doesn't play
|
|
well with asyncio - each worker process needs its own connection pool,
|
|
and async pools don't survive process forks.
|
|
"""
|
|
from celery import Celery
|
|
from app.core.config import settings
|
|
|
|
celery_app = Celery(
|
|
"milevault",
|
|
broker=settings.redis_url,
|
|
backend=settings.redis_url,
|
|
)
|
|
|
|
celery_app.conf.update(
|
|
task_serializer="json",
|
|
result_serializer="json",
|
|
accept_content=["json"],
|
|
timezone="UTC",
|
|
enable_utc=True,
|
|
task_track_started=True,
|
|
worker_prefetch_multiplier=1,
|
|
)
|
|
|
|
WELLNESS_SUFFIXES = (
|
|
"_METRICS.fit",
|
|
"_WELLNESS.fit",
|
|
"_SLEEP.fit",
|
|
"_SLEEP_DATA.fit",
|
|
"_STRESS.fit",
|
|
"_SPO2.fit",
|
|
"_HRV.fit",
|
|
"_HRV_STATUS.fit",
|
|
"_MONITORING.fit",
|
|
"_MONITORING_B.fit",
|
|
"_RESPIRATION.fit",
|
|
"_PULSE_OX.fit",
|
|
)
|
|
|
|
|
|
def is_wellness_file(file_path: str) -> bool:
|
|
name = file_path.upper()
|
|
return any(name.endswith(s.upper()) for s in WELLNESS_SUFFIXES)
|
|
|
|
|
|
@celery_app.task(bind=True, name="process_activity_file")
|
|
def process_activity_file(self, file_path: str, user_id: int, source_type: str):
|
|
"""Parse a FIT/GPX file. Routes wellness files to health parser."""
|
|
|
|
if is_wellness_file(file_path):
|
|
parse_wellness_fit.delay(file_path, user_id)
|
|
return {"status": "routed_to_wellness", "file": file_path}
|
|
|
|
from app.services.fit_parser import parse_fit_file, parse_gpx_file, calculate_hr_zones
|
|
from app.core.database import SyncSessionLocal
|
|
from app.models.user import Activity, ActivityDataPoint, ActivityLap
|
|
from sqlalchemy import select, func
|
|
from datetime import datetime
|
|
|
|
self.update_state(state="PROGRESS", meta={"step": "parsing"})
|
|
|
|
try:
|
|
if source_type == "fit" or file_path.endswith(".fit"):
|
|
parsed = parse_fit_file(file_path)
|
|
else:
|
|
parsed = parse_gpx_file(file_path)
|
|
except Exception as e:
|
|
raise self.retry(exc=e, countdown=10, max_retries=3)
|
|
|
|
if not parsed.get("start_time"):
|
|
return {"status": "skipped", "reason": "no start_time", "file": file_path}
|
|
|
|
with SyncSessionLocal() as db:
|
|
start_time = datetime.fromisoformat(parsed["start_time"])
|
|
|
|
# Deduplicate by (user_id, sport_type, start_time date + within 60s)
|
|
existing = db.execute(
|
|
select(Activity).where(
|
|
Activity.user_id == user_id,
|
|
Activity.sport_type == parsed["sport_type"],
|
|
func.date(Activity.start_time) == start_time.date(),
|
|
Activity.start_time >= start_time.replace(second=0, microsecond=0),
|
|
)
|
|
).scalar_one_or_none()
|
|
|
|
if existing:
|
|
return {"activity_id": existing.id, "status": "duplicate"}
|
|
|
|
# Get user max HR for zone calculation
|
|
from app.models.user import User as UserModel
|
|
user_obj = db.execute(
|
|
select(UserModel).where(UserModel.id == user_id)
|
|
).scalar_one_or_none()
|
|
|
|
user_max_hr = None
|
|
if user_obj:
|
|
user_max_hr = user_obj.max_heart_rate
|
|
if not user_max_hr and user_obj.birth_year:
|
|
from datetime import date as _date
|
|
age = _date.today().year - user_obj.birth_year
|
|
user_max_hr = 220 - age
|
|
if not user_max_hr:
|
|
user_max_hr = parsed.get("max_heart_rate") or 190
|
|
|
|
hr_zones = calculate_hr_zones(parsed.get("data_points", []), user_max_hr)
|
|
|
|
activity = Activity(
|
|
user_id=user_id,
|
|
name=parsed["name"],
|
|
sport_type=parsed["sport_type"],
|
|
start_time=start_time,
|
|
distance_m=parsed.get("distance_m"),
|
|
duration_s=parsed.get("duration_s"),
|
|
elevation_gain_m=parsed.get("elevation_gain_m"),
|
|
elevation_loss_m=parsed.get("elevation_loss_m"),
|
|
avg_heart_rate=parsed.get("avg_heart_rate"),
|
|
max_heart_rate=parsed.get("max_heart_rate"),
|
|
avg_cadence=parsed.get("avg_cadence"),
|
|
avg_power=parsed.get("avg_power"),
|
|
normalized_power=parsed.get("normalized_power"),
|
|
avg_speed_ms=parsed.get("avg_speed_ms"),
|
|
max_speed_ms=parsed.get("max_speed_ms"),
|
|
avg_temperature_c=parsed.get("avg_temperature_c"),
|
|
calories=parsed.get("calories"),
|
|
training_stress_score=parsed.get("training_stress_score"),
|
|
polyline=parsed.get("polyline"),
|
|
bounding_box=parsed.get("bounding_box"),
|
|
source_file=file_path,
|
|
source_type=parsed.get("source_type"),
|
|
hr_zones=hr_zones,
|
|
)
|
|
db.add(activity)
|
|
db.flush()
|
|
|
|
seen = set()
|
|
batch = []
|
|
for p in parsed.get("data_points", []):
|
|
if not p.get("timestamp"):
|
|
continue
|
|
ts = datetime.fromisoformat(p["timestamp"]) if isinstance(p["timestamp"], str) else p["timestamp"]
|
|
key = (activity.id, ts)
|
|
if key in seen:
|
|
continue
|
|
seen.add(key)
|
|
batch.append(ActivityDataPoint(
|
|
activity_id=activity.id,
|
|
timestamp=ts,
|
|
latitude=p.get("latitude"),
|
|
longitude=p.get("longitude"),
|
|
altitude_m=p.get("altitude_m"),
|
|
heart_rate=p.get("heart_rate"),
|
|
cadence=p.get("cadence"),
|
|
speed_ms=p.get("speed_ms"),
|
|
power=p.get("power"),
|
|
temperature_c=p.get("temperature_c"),
|
|
distance_m=p.get("distance_m"),
|
|
))
|
|
if len(batch) >= 500:
|
|
db.add_all(batch)
|
|
db.flush()
|
|
batch = []
|
|
if batch:
|
|
db.add_all(batch)
|
|
db.flush()
|
|
|
|
for lap in parsed.get("laps", []):
|
|
ls = datetime.fromisoformat(lap["start_time"]) if lap.get("start_time") else None
|
|
db.add(ActivityLap(
|
|
activity_id=activity.id,
|
|
lap_number=lap["lap_number"],
|
|
start_time=ls,
|
|
duration_s=lap.get("duration_s"),
|
|
distance_m=lap.get("distance_m"),
|
|
avg_heart_rate=lap.get("avg_heart_rate"),
|
|
avg_cadence=lap.get("avg_cadence"),
|
|
avg_speed_ms=lap.get("avg_speed_ms"),
|
|
avg_power=lap.get("avg_power"),
|
|
))
|
|
|
|
db.commit()
|
|
activity_id = activity.id
|
|
|
|
compute_personal_records.delay(activity_id, user_id, parsed)
|
|
if parsed.get("sport_type") in ("running", "cycling", "hiking", "walking"):
|
|
detect_route.delay(activity_id, user_id)
|
|
return {"activity_id": activity_id, "status": "ok"}
|
|
|
|
|
|
@celery_app.task(name="parse_wellness_fit")
|
|
def parse_wellness_fit(file_path: str, user_id: int):
|
|
"""Parse a Garmin wellness FIT file and upsert into health_metrics."""
|
|
from app.services.wellness_parser import parse_wellness_fit as _parse
|
|
from app.core.database import SyncSessionLocal
|
|
from datetime import datetime, timezone
|
|
from sqlalchemy import text
|
|
|
|
try:
|
|
result = _parse(file_path)
|
|
except Exception as e:
|
|
return {"status": "error", "error": str(e), "file": file_path}
|
|
|
|
if result.get("error"):
|
|
return {"status": "error", "error": result["error"], "file": file_path}
|
|
|
|
days = result.get("days", {})
|
|
if not days:
|
|
return {"status": "no_data", "file": file_path}
|
|
|
|
with SyncSessionLocal() as db:
|
|
for day_date, data in days.items():
|
|
date_dt = datetime(day_date.year, day_date.month, day_date.day, tzinfo=timezone.utc)
|
|
db.execute(text("""
|
|
INSERT INTO health_metrics (user_id, date, resting_hr, avg_hr_day, max_hr_day,
|
|
avg_stress, spo2_avg, hrv_nightly_avg, hrv_5min_high, hrv_status,
|
|
steps, floors_climbed, active_calories, total_calories,
|
|
sleep_duration_s, sleep_deep_s, sleep_light_s, sleep_rem_s, sleep_awake_s,
|
|
sleep_score, sleep_start, sleep_end)
|
|
VALUES (:user_id, :date, :resting_hr, :avg_hr, :max_hr,
|
|
:avg_stress, :spo2_avg, :hrv_avg, :hrv_high, :hrv_status,
|
|
:steps, :floors, :active_cal, :total_cal,
|
|
:sleep_dur, :sleep_deep, :sleep_light, :sleep_rem, :sleep_awake,
|
|
:sleep_score, :sleep_start, :sleep_end)
|
|
ON CONFLICT (user_id, date) DO UPDATE SET
|
|
resting_hr = COALESCE(EXCLUDED.resting_hr, health_metrics.resting_hr),
|
|
avg_hr_day = COALESCE(EXCLUDED.avg_hr_day, health_metrics.avg_hr_day),
|
|
max_hr_day = COALESCE(EXCLUDED.max_hr_day, health_metrics.max_hr_day),
|
|
avg_stress = COALESCE(EXCLUDED.avg_stress, health_metrics.avg_stress),
|
|
spo2_avg = COALESCE(EXCLUDED.spo2_avg, health_metrics.spo2_avg),
|
|
hrv_nightly_avg = COALESCE(EXCLUDED.hrv_nightly_avg, health_metrics.hrv_nightly_avg),
|
|
hrv_5min_high = COALESCE(EXCLUDED.hrv_5min_high, health_metrics.hrv_5min_high),
|
|
hrv_status = COALESCE(EXCLUDED.hrv_status, health_metrics.hrv_status),
|
|
steps = COALESCE(EXCLUDED.steps, health_metrics.steps),
|
|
floors_climbed = COALESCE(EXCLUDED.floors_climbed, health_metrics.floors_climbed),
|
|
active_calories = COALESCE(EXCLUDED.active_calories, health_metrics.active_calories),
|
|
total_calories = GREATEST(EXCLUDED.total_calories, health_metrics.total_calories),
|
|
sleep_duration_s = COALESCE(EXCLUDED.sleep_duration_s, health_metrics.sleep_duration_s),
|
|
sleep_deep_s = COALESCE(EXCLUDED.sleep_deep_s, health_metrics.sleep_deep_s),
|
|
sleep_light_s = COALESCE(EXCLUDED.sleep_light_s, health_metrics.sleep_light_s),
|
|
sleep_rem_s = COALESCE(EXCLUDED.sleep_rem_s, health_metrics.sleep_rem_s),
|
|
sleep_awake_s = COALESCE(EXCLUDED.sleep_awake_s, health_metrics.sleep_awake_s),
|
|
sleep_score = COALESCE(EXCLUDED.sleep_score, health_metrics.sleep_score),
|
|
sleep_start = COALESCE(EXCLUDED.sleep_start, health_metrics.sleep_start),
|
|
sleep_end = COALESCE(EXCLUDED.sleep_end, health_metrics.sleep_end)
|
|
"""), {
|
|
"user_id": user_id, "date": date_dt,
|
|
"resting_hr": data.get("resting_hr"),
|
|
"avg_hr": data.get("avg_hr_day"),
|
|
"max_hr": data.get("max_hr_day"),
|
|
"avg_stress": data.get("avg_stress"),
|
|
"spo2_avg": data.get("spo2_avg"),
|
|
"hrv_avg": data.get("hrv_nightly_avg"),
|
|
"hrv_high": data.get("hrv_5min_high"),
|
|
"hrv_status": data.get("hrv_status"),
|
|
"steps": data.get("steps"),
|
|
"floors": data.get("floors_climbed"),
|
|
"active_cal": data.get("active_calories"),
|
|
"total_cal": data.get("total_calories"),
|
|
"sleep_dur": data.get("sleep_duration_s"),
|
|
"sleep_deep": data.get("sleep_deep_s"),
|
|
"sleep_light": data.get("sleep_light_s"),
|
|
"sleep_rem": data.get("sleep_rem_s"),
|
|
"sleep_awake": data.get("sleep_awake_s"),
|
|
"sleep_score": data.get("sleep_score"),
|
|
"sleep_start": data.get("sleep_start"),
|
|
"sleep_end": data.get("sleep_end"),
|
|
})
|
|
db.commit()
|
|
|
|
return {"status": "ok", "days_processed": len(days), "file": file_path}
|
|
|
|
|
|
@celery_app.task(name="detect_route")
|
|
def detect_route(activity_id: int, user_id: int):
|
|
"""Auto-detect and link activities to named routes."""
|
|
from app.services.route_matcher import routes_are_similar
|
|
from app.core.database import SyncSessionLocal
|
|
from app.models.user import Activity, NamedRoute
|
|
from sqlalchemy import select
|
|
|
|
with SyncSessionLocal() as db:
|
|
new_act = db.execute(
|
|
select(Activity).where(Activity.id == activity_id)
|
|
).scalar_one_or_none()
|
|
|
|
if not new_act or not new_act.polyline:
|
|
return {"status": "no_polyline"}
|
|
|
|
if new_act.named_route_id:
|
|
return {"status": "already_assigned"}
|
|
|
|
routes = db.execute(
|
|
select(NamedRoute).where(
|
|
NamedRoute.user_id == user_id,
|
|
NamedRoute.sport_type == new_act.sport_type,
|
|
)
|
|
).scalars().all()
|
|
|
|
for route in routes:
|
|
if route.reference_polyline and routes_are_similar(
|
|
new_act.polyline, route.reference_polyline,
|
|
new_act.bounding_box, route.bounding_box,
|
|
):
|
|
new_act.named_route_id = route.id
|
|
db.commit()
|
|
return {"status": "matched_existing", "route_id": route.id}
|
|
|
|
candidates = db.execute(
|
|
select(Activity).where(
|
|
Activity.user_id == user_id,
|
|
Activity.sport_type == new_act.sport_type,
|
|
Activity.named_route_id == None,
|
|
Activity.id != activity_id,
|
|
Activity.polyline != None,
|
|
Activity.distance_m >= (new_act.distance_m or 0) * 0.8,
|
|
Activity.distance_m <= (new_act.distance_m or 0) * 1.2,
|
|
)
|
|
).scalars().all()
|
|
|
|
for candidate in candidates:
|
|
if routes_are_similar(
|
|
new_act.polyline, candidate.polyline,
|
|
new_act.bounding_box, candidate.bounding_box,
|
|
):
|
|
older = candidate if candidate.start_time < new_act.start_time else new_act
|
|
newer = new_act if candidate.start_time < new_act.start_time else candidate
|
|
|
|
route_name = f"{older.sport_type.title()} route {older.start_time.strftime('%d %b %Y')}"
|
|
new_route = NamedRoute(
|
|
user_id=user_id,
|
|
name=route_name,
|
|
sport_type=older.sport_type,
|
|
reference_polyline=older.polyline,
|
|
bounding_box=older.bounding_box,
|
|
distance_m=older.distance_m,
|
|
auto_detected=True,
|
|
)
|
|
db.add(new_route)
|
|
db.flush()
|
|
older.named_route_id = new_route.id
|
|
newer.named_route_id = new_route.id
|
|
db.commit()
|
|
return {"status": "auto_created", "route_id": new_route.id}
|
|
|
|
return {"status": "no_match"}
|
|
|
|
|
|
@celery_app.task(name="compute_personal_records")
|
|
def compute_personal_records(activity_id: int, user_id: int, parsed: dict):
|
|
"""Calculate personal records for standard distances from this activity."""
|
|
from app.services.route_matcher import compute_best_splits, STANDARD_DISTANCES
|
|
from app.core.database import SyncSessionLocal
|
|
from app.models.user import PersonalRecord
|
|
from sqlalchemy import select
|
|
from datetime import datetime, timezone
|
|
|
|
data_points = parsed.get("data_points", [])
|
|
total_dist = parsed.get("distance_m", 0) or 0
|
|
sport = parsed.get("sport_type", "running")
|
|
start_time_str = parsed.get("start_time")
|
|
start_time = datetime.fromisoformat(start_time_str) if start_time_str else datetime.now(timezone.utc)
|
|
|
|
best_splits = compute_best_splits(data_points, total_dist)
|
|
|
|
with SyncSessionLocal() as db:
|
|
for label, duration_s in best_splits.items():
|
|
dist_m = next((d for d, l in STANDARD_DISTANCES if l == label), None)
|
|
if dist_m is None:
|
|
continue
|
|
|
|
current = db.execute(
|
|
select(PersonalRecord).where(
|
|
PersonalRecord.user_id == user_id,
|
|
PersonalRecord.sport_type == sport,
|
|
PersonalRecord.distance_m == dist_m,
|
|
PersonalRecord.is_current_record == True,
|
|
)
|
|
).scalar_one_or_none()
|
|
|
|
if current is None or duration_s < current.duration_s:
|
|
if current:
|
|
current.is_current_record = False
|
|
db.add(PersonalRecord(
|
|
user_id=user_id,
|
|
activity_id=activity_id,
|
|
sport_type=sport,
|
|
distance_m=dist_m,
|
|
distance_label=label,
|
|
duration_s=duration_s,
|
|
achieved_at=start_time,
|
|
is_current_record=True,
|
|
))
|
|
db.commit()
|
|
|
|
|
|
@celery_app.task(name="process_garmin_health_zip")
|
|
def process_garmin_health_zip(zip_path: str, user_id: int):
|
|
"""Extract wellness data from a Garmin Connect export ZIP."""
|
|
import zipfile
|
|
import json
|
|
from app.core.database import SyncSessionLocal
|
|
from datetime import datetime, timezone
|
|
from sqlalchemy import text
|
|
|
|
with SyncSessionLocal() as db:
|
|
with zipfile.ZipFile(zip_path) as zf:
|
|
for name in zf.namelist():
|
|
if "DailyMetrics" not in name or not name.endswith(".json"):
|
|
continue
|
|
with zf.open(name) as f:
|
|
try:
|
|
data = json.load(f)
|
|
except Exception:
|
|
continue
|
|
|
|
date_str = data.get("calendarDate") or data.get("date")
|
|
if not date_str:
|
|
continue
|
|
|
|
try:
|
|
date_dt = datetime.fromisoformat(date_str).replace(tzinfo=timezone.utc)
|
|
except ValueError:
|
|
continue
|
|
|
|
db.execute(text("""
|
|
INSERT INTO health_metrics (user_id, date, resting_hr, steps,
|
|
floors_climbed, active_calories, total_calories, avg_stress, spo2_avg)
|
|
VALUES (:user_id, :date, :resting_hr, :steps,
|
|
:floors, :active_cal, :total_cal, :stress, :spo2)
|
|
ON CONFLICT (user_id, date) DO UPDATE SET
|
|
resting_hr = COALESCE(EXCLUDED.resting_hr, health_metrics.resting_hr),
|
|
steps = COALESCE(EXCLUDED.steps, health_metrics.steps),
|
|
floors_climbed = COALESCE(EXCLUDED.floors_climbed, health_metrics.floors_climbed),
|
|
active_calories = COALESCE(EXCLUDED.active_calories, health_metrics.active_calories),
|
|
total_calories = COALESCE(EXCLUDED.total_calories, health_metrics.total_calories),
|
|
avg_stress = COALESCE(EXCLUDED.avg_stress, health_metrics.avg_stress),
|
|
spo2_avg = COALESCE(EXCLUDED.spo2_avg, health_metrics.spo2_avg)
|
|
"""), {
|
|
"user_id": user_id, "date": date_dt,
|
|
"resting_hr": data.get("restingHeartRate"),
|
|
"steps": data.get("totalSteps"),
|
|
"floors": data.get("floorsAscended"),
|
|
"active_cal": data.get("activeKilocalories"),
|
|
"total_cal": data.get("totalKilocalories"),
|
|
"stress": data.get("averageStressLevel"),
|
|
"spo2": data.get("avgSpo2"),
|
|
})
|
|
|
|
db.commit()
|
|
|
|
|
|
@celery_app.task(name="recalculate_hr_zones_for_user")
|
|
def recalculate_hr_zones_for_user(user_id: int, new_max_hr: float):
|
|
"""Recalculate hr_zones for all of a user's activities using a new max HR."""
|
|
from app.services.fit_parser import calculate_hr_zones
|
|
from app.core.database import SyncSessionLocal
|
|
from app.models.user import Activity, ActivityDataPoint
|
|
from sqlalchemy import select
|
|
|
|
with SyncSessionLocal() as db:
|
|
activities = db.execute(
|
|
select(Activity).where(Activity.user_id == user_id)
|
|
).scalars().all()
|
|
|
|
for activity in activities:
|
|
data_points = db.execute(
|
|
select(ActivityDataPoint).where(ActivityDataPoint.activity_id == activity.id)
|
|
).scalars().all()
|
|
points_dicts = [{"heart_rate": dp.heart_rate} for dp in data_points]
|
|
new_zones = calculate_hr_zones(points_dicts, new_max_hr)
|
|
if new_zones:
|
|
activity.hr_zones = new_zones
|
|
|
|
db.commit()
|