Files
MileVault/backend/app/workers/tasks.py
T
owain 8104ca5ed0
Build and push images / build-backend (push) Successful in 6s
Build and push images / build-worker (push) Successful in 6s
Build and push images / build-frontend (push) Successful in 5s
Route wellness FIT files to health parser, parse HR/HRV/sleep/stress/SpO2
2026-06-06 15:50:25 +01:00

463 lines
18 KiB
Python

"""
Background tasks: activity ingestion, wellness/health metric import,
route matching, PR calculation.
Uses synchronous SQLAlchemy because Celery's prefork model doesn't play
well with asyncio: each worker process needs its own connection pool,
and async pools don't survive process forks.
"""
from celery import Celery
from app.core.config import settings
# Celery application used by every task in this module. The same Redis URL
# serves as both the message broker and the result backend.
celery_app = Celery("milevault", broker=settings.redis_url, backend=settings.redis_url)

celery_app.conf.update(
    # JSON is the only accepted payload format, for task arguments and results.
    accept_content=["json"],
    task_serializer="json",
    result_serializer="json",
    # Keep all scheduling and timestamps in UTC.
    enable_utc=True,
    timezone="UTC",
    # Report a "started" state once a worker picks a task up.
    task_track_started=True,
    # Prefetch multiplier of 1: workers reserve as few extra messages as possible.
    worker_prefetch_multiplier=1,
)
# Garmin FIT file suffixes that are health/wellness data, not activities
WELLNESS_SUFFIXES = (
    "_METRICS.fit",
    "_WELLNESS.fit",
    "_SLEEP.fit",
    "_STRESS.fit",
    "_SPO2.fit",
    "_HRV.fit",
    "_MONITORING.fit",
    "_MONITORING_B.fit",
)


def is_wellness_file(file_path: str) -> bool:
    """Return True when *file_path* ends with one of WELLNESS_SUFFIXES.

    The comparison is case-insensitive: both the path and the suffixes are
    upper-cased before matching.
    """
    upper_suffixes = tuple(suffix.upper() for suffix in WELLNESS_SUFFIXES)
    # str.endswith accepts a tuple and matches if any entry is a suffix.
    return file_path.upper().endswith(upper_suffixes)
@celery_app.task(bind=True, name="process_activity_file")
def process_activity_file(self, file_path: str, user_id: int, source_type: str):
    """Parse a FIT/GPX file. Routes wellness files to health parser.

    Args:
        file_path: Path of the uploaded file to ingest.
        user_id: Owner of the resulting activity.
        source_type: "fit" selects the FIT parser; any other value falls back
            to the file extension, and finally to the GPX parser.

    Returns:
        A status dict. ``status`` is one of ``"routed_to_wellness"``,
        ``"skipped"``, ``"duplicate"`` or ``"ok"``; the last two also carry
        ``activity_id``.

    Raises:
        Whatever ``self.retry`` raises when parsing fails (the task is
        re-queued up to 3 times, 10 seconds apart).
    """
    # Route wellness/metrics files to health parser instead
    if is_wellness_file(file_path):
        parse_wellness_fit.delay(file_path, user_id)
        return {"status": "routed_to_wellness", "file": file_path}

    from app.services.fit_parser import parse_fit_file, parse_gpx_file, calculate_hr_zones
    from app.core.database import SyncSessionLocal
    from app.models.user import Activity, ActivityDataPoint, ActivityLap
    from sqlalchemy import select
    from datetime import datetime

    self.update_state(state="PROGRESS", meta={"step": "parsing"})

    try:
        # Extension check is case-insensitive, matching is_wellness_file, so
        # an upper-case ".FIT" upload is not handed to the GPX parser.
        if source_type == "fit" or file_path.lower().endswith(".fit"):
            parsed = parse_fit_file(file_path)
        else:
            parsed = parse_gpx_file(file_path)
    except Exception as e:
        raise self.retry(exc=e, countdown=10, max_retries=3)

    # Skip files with no usable activity data
    if not parsed.get("start_time"):
        return {"status": "skipped", "reason": "no start_time", "file": file_path}

    points = parsed.get("data_points", [])

    with SyncSessionLocal() as db:
        # Check for duplicate by garmin activity ID
        garmin_id = parsed.get("garmin_activity_id")
        if garmin_id:
            existing = db.execute(
                select(Activity).where(Activity.garmin_activity_id == garmin_id)
            ).scalar_one_or_none()
            if existing:
                return {"activity_id": existing.id, "status": "duplicate"}

        # 190 bpm is the fallback when the file reports no max heart rate.
        hr_zones = calculate_hr_zones(points, parsed.get("max_heart_rate") or 190)

        start_time = datetime.fromisoformat(parsed["start_time"])
        activity = Activity(
            user_id=user_id,
            name=parsed["name"],
            sport_type=parsed["sport_type"],
            start_time=start_time,
            distance_m=parsed.get("distance_m"),
            duration_s=parsed.get("duration_s"),
            elevation_gain_m=parsed.get("elevation_gain_m"),
            elevation_loss_m=parsed.get("elevation_loss_m"),
            avg_heart_rate=parsed.get("avg_heart_rate"),
            max_heart_rate=parsed.get("max_heart_rate"),
            avg_cadence=parsed.get("avg_cadence"),
            avg_power=parsed.get("avg_power"),
            normalized_power=parsed.get("normalized_power"),
            avg_speed_ms=parsed.get("avg_speed_ms"),
            max_speed_ms=parsed.get("max_speed_ms"),
            avg_temperature_c=parsed.get("avg_temperature_c"),
            calories=parsed.get("calories"),
            training_stress_score=parsed.get("training_stress_score"),
            polyline=parsed.get("polyline"),
            bounding_box=parsed.get("bounding_box"),
            source_file=file_path,
            source_type=parsed.get("source_type"),
            hr_zones=hr_zones,
        )
        db.add(activity)
        # Flush so activity.id is assigned before child rows reference it.
        db.flush()

        # Insert data points, deduping on timestamp (all rows share this
        # activity, so the timestamp alone identifies a duplicate).
        seen_timestamps = set()
        batch = []
        for p in points:
            raw_ts = p.get("timestamp")
            if not raw_ts:
                continue
            ts = datetime.fromisoformat(raw_ts) if isinstance(raw_ts, str) else raw_ts
            if ts in seen_timestamps:
                continue
            seen_timestamps.add(ts)
            batch.append(ActivityDataPoint(
                activity_id=activity.id,
                timestamp=ts,
                latitude=p.get("latitude"),
                longitude=p.get("longitude"),
                altitude_m=p.get("altitude_m"),
                heart_rate=p.get("heart_rate"),
                cadence=p.get("cadence"),
                speed_ms=p.get("speed_ms"),
                power=p.get("power"),
                temperature_c=p.get("temperature_c"),
                distance_m=p.get("distance_m"),
            ))
            # Flush in batches of 500 to bound the number of pending objects.
            if len(batch) >= 500:
                db.add_all(batch)
                db.flush()
                batch = []
        if batch:
            db.add_all(batch)
            db.flush()

        # Laps
        for lap in parsed.get("laps", []):
            lap_start = datetime.fromisoformat(lap["start_time"]) if lap.get("start_time") else None
            db.add(ActivityLap(
                activity_id=activity.id,
                lap_number=lap["lap_number"],
                start_time=lap_start,
                duration_s=lap.get("duration_s"),
                distance_m=lap.get("distance_m"),
                avg_heart_rate=lap.get("avg_heart_rate"),
                avg_cadence=lap.get("avg_cadence"),
                avg_speed_ms=lap.get("avg_speed_ms"),
                avg_power=lap.get("avg_power"),
            ))

        db.commit()
        activity_id = activity.id

    # Personal records are computed in a follow-up task from the parsed payload.
    compute_personal_records.delay(activity_id, user_id, parsed)
    return {"activity_id": activity_id, "status": "ok"}
# Message types whose date may fall back to "local_timestamp".
_WELLNESS_LOCAL_TS_MESSAGES = ("monitoring_info", "monitoring")
# Every message type parse_wellness_fit extracts data from.
_WELLNESS_MESSAGES = _WELLNESS_LOCAL_TS_MESSAGES + (
    "hrv_status_summary",
    "sleep_level",
    "stress",
    "spo2_data",
)


def _wellness_record_day(fields, allow_local=False):
    """Return the calendar date of a message's timestamp, or None if unusable."""
    ts = fields.get("timestamp")
    if not ts and allow_local:
        ts = fields.get("local_timestamp")
    if not ts or not hasattr(ts, "date"):
        return None
    return ts.date()


def _set_if_missing(day, key, value):
    """Store *value* under *key* unless it is None or the key already has a value."""
    if value is not None and day.get(key) is None:
        day[key] = value


def _collect_wellness_days(messages):
    """Group the wellness readings of FIT *messages* into a dict keyed by date."""
    daily = {}  # date -> dict of fields
    for record in messages:
        name = record.name
        if name not in _WELLNESS_MESSAGES:
            continue
        fields = {f.name: f.value for f in record if f.value is not None}
        d = _wellness_record_day(fields, allow_local=name in _WELLNESS_LOCAL_TS_MESSAGES)
        if d is None:
            continue
        day = daily.setdefault(d, {})

        if name == "monitoring_info":
            # First non-empty value wins; a record without the field no
            # longer blocks a later record that carries it.
            _set_if_missing(day, "resting_hr", fields.get("resting_heart_rate"))
        elif name == "monitoring":
            # Accumulate steps (they're stored as increments)
            if "steps" in fields:
                day["steps"] = day.get("steps", 0) + int(fields["steps"])
            if "heart_rate" in fields:
                day.setdefault("heart_rates", []).append(int(fields["heart_rate"]))
            if "stress_level_value" in fields:
                day.setdefault("stress_values", []).append(int(fields["stress_level_value"]))
        elif name == "hrv_status_summary":
            # NOTE(review): hrv_nightly_avg is filled from the message's
            # "weekly_average" field - confirm this is the intended source.
            _set_if_missing(day, "hrv_nightly_avg", fields.get("weekly_average"))
            _set_if_missing(day, "hrv_5min_high", fields.get("last_night_5_min_high"))
            if "hrv_status" in fields:
                _set_if_missing(day, "hrv_status", str(fields["hrv_status"]))
        elif name == "sleep_level":
            day.setdefault("sleep_levels", []).append(fields.get("sleep_level"))
        elif name == "stress":
            if "stress_level_value" in fields:
                day.setdefault("stress_values", []).append(int(fields["stress_level_value"]))
        elif name == "spo2_data":
            readings = day.setdefault("spo2_readings", [])
            if "spo2_percent" in fields:
                readings.append(fields["spo2_percent"])
    return daily


def _summarise_wellness_day(data):
    """Reduce one day's raw readings to (metric values, sleep values) dicts."""
    def mean(values):
        return (sum(values) / len(values)) if values else None

    metrics = {
        "resting_hr": data.get("resting_hr"),
        "avg_hr_day": mean(data.get("heart_rates", [])),
        "avg_stress": mean(data.get("stress_values", [])),
        "spo2_avg": mean(data.get("spo2_readings", [])),
        "hrv_nightly_avg": data.get("hrv_nightly_avg"),
        "hrv_5min_high": data.get("hrv_5min_high"),
        "hrv_status": data.get("hrv_status"),
        "steps": data.get("steps"),
    }

    # Rough sleep stage breakdown from level codes
    # Garmin sleep levels: 0=unmeasurable, 1=awake, 2=light, 3=deep, 4=rem
    # Each record is counted as 30 seconds - TODO confirm the sampling interval.
    sleep_levels = data.get("sleep_levels", [])

    def stage_seconds(code):
        if not sleep_levels:
            return None
        return sum(30 for level in sleep_levels if level == code)

    deep_s = stage_seconds(3)
    light_s = stage_seconds(2)
    rem_s = stage_seconds(4)
    sleep = {
        # Awake time is not part of the total sleep duration.
        "sleep_duration_s": ((deep_s or 0) + (light_s or 0) + (rem_s or 0)) or None,
        "sleep_deep_s": deep_s,
        "sleep_light_s": light_s,
        "sleep_rem_s": rem_s,
        "sleep_awake_s": stage_seconds(1),
    }
    return metrics, sleep


@celery_app.task(name="parse_wellness_fit")
def parse_wellness_fit(file_path: str, user_id: int):
    """
    Parse a Garmin wellness/metrics FIT file and upsert into health_metrics.
    These files contain resting HR, HRV, sleep, stress, SpO2 etc.

    Args:
        file_path: Path of the wellness FIT file.
        user_id: Owner of the resulting health metric rows.

    Returns:
        A status dict: ``{"status": "error", "error": ...}`` when the file
        cannot be read, ``{"status": "no_data", ...}`` when no recognised
        message carried a date, otherwise ``{"status": "ok",
        "days_processed": ..., "file": ...}``.
    """
    import fitparse
    from app.core.database import SyncSessionLocal
    from app.models.user import HealthMetric
    from sqlalchemy import select, func
    from datetime import datetime, timezone

    try:
        fit = fitparse.FitFile(file_path)
        # Messages appear to be decoded while iterating (TODO confirm against
        # fitparse), so the loop sits inside the same error handling as the
        # constructor.
        daily = _collect_wellness_days(fit.get_messages())
    except Exception as e:
        return {"status": "error", "error": str(e)}

    if not daily:
        return {"status": "no_data", "file": file_path}

    # Upsert into health_metrics
    with SyncSessionLocal() as db:
        for day_date, data in daily.items():
            metrics, sleep = _summarise_wellness_day(data)
            date_dt = datetime(day_date.year, day_date.month, day_date.day, tzinfo=timezone.utc)

            # Check for existing record
            existing = db.execute(
                select(HealthMetric).where(
                    HealthMetric.user_id == user_id,
                    func.date(HealthMetric.date) == day_date,
                )
            ).scalar_one_or_none()

            if existing:
                # Update only fields we have data for
                for column, value in metrics.items():
                    if value:
                        setattr(existing, column, value)
                # Sleep columns are replaced as a group, only when sleep was measured.
                if sleep["sleep_duration_s"]:
                    for column, value in sleep.items():
                        setattr(existing, column, value)
            else:
                db.add(HealthMetric(user_id=user_id, date=date_dt, **metrics, **sleep))
        db.commit()

    return {"status": "ok", "days_processed": len(daily), "file": file_path}
@celery_app.task(name="compute_personal_records")
def compute_personal_records(activity_id: int, user_id: int, parsed: dict):
    """Update the user's personal records from one parsed activity.

    For every best split returned by ``compute_best_splits`` whose label is
    listed in ``STANDARD_DISTANCES``, a new current record is stored when no
    current record exists for that sport and distance, or when the split is
    faster than it. A beaten record is kept but is no longer flagged current.
    """
    from app.services.route_matcher import compute_best_splits, STANDARD_DISTANCES
    from app.core.database import SyncSessionLocal
    from app.models.user import PersonalRecord
    from sqlalchemy import select
    from datetime import datetime, timezone

    track = parsed.get("data_points", [])
    distance_total = parsed.get("distance_m", 0) or 0
    sport_type = parsed.get("sport_type", "running")

    # Records are dated at the activity start; fall back to "now" without one.
    raw_start = parsed.get("start_time")
    if raw_start:
        achieved_at = datetime.fromisoformat(raw_start)
    else:
        achieved_at = datetime.now(timezone.utc)

    splits = compute_best_splits(track, distance_total)

    with SyncSessionLocal() as session:
        for split_label, split_seconds in splits.items():
            # Resolve the label to its distance; the first match wins.
            split_distance = None
            for candidate_m, candidate_label in STANDARD_DISTANCES:
                if candidate_label == split_label:
                    split_distance = candidate_m
                    break
            if split_distance is None:
                continue

            holder = session.execute(
                select(PersonalRecord).where(
                    PersonalRecord.user_id == user_id,
                    PersonalRecord.sport_type == sport_type,
                    PersonalRecord.distance_m == split_distance,
                    PersonalRecord.is_current_record == True,  # noqa: E712 - SQL expression
                )
            ).scalar_one_or_none()

            if holder is not None:
                if not split_seconds < holder.duration_s:
                    # Existing record stands.
                    continue
                holder.is_current_record = False

            session.add(PersonalRecord(
                user_id=user_id,
                activity_id=activity_id,
                sport_type=sport_type,
                distance_m=split_distance,
                distance_label=split_label,
                duration_s=split_seconds,
                achieved_at=achieved_at,
                is_current_record=True,
            ))
        session.commit()
@celery_app.task(name="process_garmin_health_zip")
def process_garmin_health_zip(zip_path: str, user_id: int):
    """Extract wellness data from a Garmin Connect export ZIP.

    Every archive member whose name contains "DailyMetrics" and ends in
    ".json" is read. A member may hold a single record or a list of records;
    each record with a parseable "calendarDate"/"date" is upserted into
    health_metrics for that day. Unreadable members and malformed records
    are skipped so one bad entry does not abort the import.

    Args:
        zip_path: Path of the export archive.
        user_id: Owner of the resulting health metric rows.
    """
    import zipfile
    import json
    from app.core.database import SyncSessionLocal
    from app.models.user import HealthMetric
    from sqlalchemy import select, func
    from datetime import datetime, timezone

    # Export JSON key -> HealthMetric column. Used for both insert and update
    # so the two paths cannot drift apart.
    metric_columns = {
        "restingHeartRate": "resting_hr",
        "totalSteps": "steps",
        "floorsAscended": "floors_climbed",
        "activeKilocalories": "active_calories",
        "totalKilocalories": "total_calories",
        "averageStressLevel": "avg_stress",
        "avgSpo2": "spo2_avg",
    }

    with SyncSessionLocal() as db:
        with zipfile.ZipFile(zip_path) as zf:
            for name in zf.namelist():
                if "DailyMetrics" not in name or not name.endswith(".json"):
                    continue
                with zf.open(name) as f:
                    try:
                        payload = json.load(f)
                    except Exception:
                        # Deliberate best-effort: skip members that cannot be
                        # read or decoded.
                        continue

                records = payload if isinstance(payload, list) else [payload]
                for data in records:
                    if not isinstance(data, dict):
                        continue
                    date_str = data.get("calendarDate") or data.get("date")
                    if not date_str:
                        continue
                    try:
                        metric_date = datetime.fromisoformat(date_str).replace(tzinfo=timezone.utc)
                    except (TypeError, ValueError):
                        # Non-string or non-ISO date value.
                        continue

                    existing = db.execute(
                        select(HealthMetric).where(
                            HealthMetric.user_id == user_id,
                            func.date(HealthMetric.date) == metric_date.date(),
                        )
                    ).scalar_one_or_none()

                    if existing:
                        # Only overwrite columns the export has a value for.
                        for key, column in metric_columns.items():
                            value = data.get(key)
                            if value:
                                setattr(existing, column, value)
                    else:
                        db.add(HealthMetric(
                            user_id=user_id,
                            date=metric_date,
                            **{column: data.get(key) for key, column in metric_columns.items()},
                        ))
        db.commit()