438 lines
18 KiB
Python
438 lines
18 KiB
Python
"""
Background tasks: activity ingestion, route matching, PR calculation.

Uses synchronous SQLAlchemy because Celery's prefork model doesn't play
well with asyncio - each worker process needs its own connection pool,
and async pools don't survive process forks.
"""
|
|
from celery import Celery
|
|
from app.core.config import settings
|
|
|
|
# Celery application used by every task in this module. The same Redis URL
# is used for both the message broker and the result backend.
celery_app = Celery(
    "milevault",
    broker=settings.redis_url,
    backend=settings.redis_url,
)

# Worker/serialization settings: JSON-only payloads, UTC timestamps,
# STARTED-state tracking, and a prefetch multiplier of 1.
celery_app.conf.update(
    {
        "task_serializer": "json",
        "result_serializer": "json",
        "accept_content": ["json"],
        "timezone": "UTC",
        "enable_utc": True,
        "task_track_started": True,
        "worker_prefetch_multiplier": 1,
    }
)
|
|
|
|
# Garmin FIT file suffixes that are health/wellness data, not activities
WELLNESS_SUFFIXES = (
    "_METRICS.fit",
    "_WELLNESS.fit",
    "_SLEEP.fit",
    "_STRESS.fit",
    "_SPO2.fit",
    "_HRV.fit",
    "_MONITORING.fit",
    "_MONITORING_B.fit",
)


def is_wellness_file(file_path: str) -> bool:
    """Return True when ``file_path`` ends with one of ``WELLNESS_SUFFIXES``.

    Both the path and the suffixes are upper-cased before comparison, so the
    match is case-insensitive.
    """
    upper_suffixes = tuple(suffix.upper() for suffix in WELLNESS_SUFFIXES)
    # str.endswith accepts a tuple and reports whether any entry matches.
    return file_path.upper().endswith(upper_suffixes)
|
|
|
|
|
|
def _parse_iso_datetime(value):
    """Return ``value`` as a datetime.

    ISO-8601 strings are parsed with ``datetime.fromisoformat``; any other
    value (an already-parsed ``datetime``, or ``None``) is returned unchanged.
    """
    from datetime import datetime

    return datetime.fromisoformat(value) if isinstance(value, str) else value


def _unique_data_points(points):
    """Yield ``(timestamp, point)`` pairs with one entry per timestamp.

    Points without a timestamp are skipped, as is every repeat of a timestamp
    that has already been yielded.
    """
    seen = set()
    for point in points:
        raw_ts = point.get("timestamp")
        if not raw_ts:
            continue
        ts = _parse_iso_datetime(raw_ts)
        if ts in seen:
            continue
        seen.add(ts)
        yield ts, point


@celery_app.task(bind=True, name="process_activity_file")
def process_activity_file(self, file_path: str, user_id: int, source_type: str):
    """Parse a FIT/GPX file. Routes wellness files to health parser.

    Args:
        file_path: Path of the uploaded file.
        user_id: Owner of the resulting activity.
        source_type: ``"fit"`` forces the FIT parser; otherwise the parser is
            chosen from the file extension (``.fit`` -> FIT, anything else ->
            GPX).

    Returns:
        A status dict: ``routed_to_wellness`` (handed to
        ``parse_wellness_fit``), ``skipped`` (no ``start_time`` in the parsed
        data), ``duplicate`` (an activity with the same Garmin activity ID
        already exists) or ``ok`` with the new ``activity_id``.

    Raises:
        Whatever ``self.retry`` raises when parsing fails (up to 3 retries,
        10 seconds apart).
    """
    # Route wellness/metrics files to health parser instead
    if is_wellness_file(file_path):
        parse_wellness_fit.delay(file_path, user_id)
        return {"status": "routed_to_wellness", "file": file_path}

    from app.services.fit_parser import parse_fit_file, parse_gpx_file, calculate_hr_zones
    from app.core.database import SyncSessionLocal
    from app.models.user import Activity, ActivityDataPoint, ActivityLap
    from sqlalchemy import select

    self.update_state(state="PROGRESS", meta={"step": "parsing"})

    try:
        # Extension match is case-insensitive, consistent with
        # is_wellness_file, so "RUN.FIT" is not sent to the GPX parser.
        if source_type == "fit" or file_path.lower().endswith(".fit"):
            parsed = parse_fit_file(file_path)
        else:
            parsed = parse_gpx_file(file_path)
    except Exception as e:
        raise self.retry(exc=e, countdown=10, max_retries=3)

    # Skip files with no usable activity data
    if not parsed.get("start_time"):
        return {"status": "skipped", "reason": "no start_time", "file": file_path}

    flush_every = 500  # flush data points in batches of this size

    with SyncSessionLocal() as db:
        # Check for duplicate by garmin activity ID
        # NOTE(review): the lookup is not scoped to user_id - confirm that
        # garmin_activity_id is meant to be globally unique.
        if parsed.get("garmin_activity_id"):
            existing = db.execute(
                select(Activity).where(
                    Activity.garmin_activity_id == parsed["garmin_activity_id"]
                )
            ).scalar_one_or_none()
            if existing:
                return {"activity_id": existing.id, "status": "duplicate"}

        hr_zones = calculate_hr_zones(
            parsed.get("data_points", []),
            parsed.get("max_heart_rate") or 190,
        )

        activity = Activity(
            user_id=user_id,
            name=parsed["name"],
            sport_type=parsed["sport_type"],
            start_time=_parse_iso_datetime(parsed["start_time"]),
            distance_m=parsed.get("distance_m"),
            duration_s=parsed.get("duration_s"),
            elevation_gain_m=parsed.get("elevation_gain_m"),
            elevation_loss_m=parsed.get("elevation_loss_m"),
            avg_heart_rate=parsed.get("avg_heart_rate"),
            max_heart_rate=parsed.get("max_heart_rate"),
            avg_cadence=parsed.get("avg_cadence"),
            avg_power=parsed.get("avg_power"),
            normalized_power=parsed.get("normalized_power"),
            avg_speed_ms=parsed.get("avg_speed_ms"),
            max_speed_ms=parsed.get("max_speed_ms"),
            avg_temperature_c=parsed.get("avg_temperature_c"),
            calories=parsed.get("calories"),
            training_stress_score=parsed.get("training_stress_score"),
            polyline=parsed.get("polyline"),
            bounding_box=parsed.get("bounding_box"),
            source_file=file_path,
            source_type=parsed.get("source_type"),
            hr_zones=hr_zones,
        )
        db.add(activity)
        # Flush so activity.id is assigned before child rows reference it.
        db.flush()

        # Insert data points, deduping on (activity_id, timestamp)
        batch = []
        for ts, p in _unique_data_points(parsed.get("data_points", [])):
            batch.append(ActivityDataPoint(
                activity_id=activity.id,
                timestamp=ts,
                latitude=p.get("latitude"),
                longitude=p.get("longitude"),
                altitude_m=p.get("altitude_m"),
                heart_rate=p.get("heart_rate"),
                cadence=p.get("cadence"),
                speed_ms=p.get("speed_ms"),
                power=p.get("power"),
                temperature_c=p.get("temperature_c"),
                distance_m=p.get("distance_m"),
            ))
            if len(batch) >= flush_every:
                db.add_all(batch)
                db.flush()
                batch = []
        if batch:
            db.add_all(batch)
            db.flush()

        # Laps
        for lap in parsed.get("laps", []):
            db.add(ActivityLap(
                activity_id=activity.id,
                lap_number=lap["lap_number"],
                start_time=_parse_iso_datetime(lap.get("start_time") or None),
                duration_s=lap.get("duration_s"),
                distance_m=lap.get("distance_m"),
                avg_heart_rate=lap.get("avg_heart_rate"),
                avg_cadence=lap.get("avg_cadence"),
                avg_speed_ms=lap.get("avg_speed_ms"),
                avg_power=lap.get("avg_power"),
            ))

        db.commit()
        # Read the id while the session is still open.
        activity_id = activity.id

    compute_personal_records.delay(activity_id, user_id, parsed)
    return {"activity_id": activity_id, "status": "ok"}
|
|
|
|
|
|
def _wellness_record_day(fields, *timestamp_keys):
    """Return the calendar date of a record, or None without a usable timestamp.

    The first truthy value among ``timestamp_keys`` is used; it must expose a
    ``date()`` method.
    """
    ts = None
    for key in timestamp_keys:
        ts = fields.get(key)
        if ts:
            break
    if not ts or not hasattr(ts, "date"):
        return None
    return ts.date()


def _store_first_value(day, key, value):
    """Store ``value`` under ``key`` unless a non-None value is already there.

    Unlike ``dict.setdefault`` this never records ``None``, so a record that
    lacks the field cannot block a later record that carries it.
    """
    if value is not None and day.get(key) is None:
        day[key] = value


def _collect_daily_wellness(messages):
    """Group wellness FIT messages into a ``{date: readings}`` dict.

    Each message must expose ``name`` and iterate over fields that have
    ``name`` and ``value`` attributes. Unrecognised message types and
    messages without a usable timestamp are ignored.
    """
    daily = {}
    for record in messages:
        name = record.name
        fields = {f.name: f.value for f in record if f.value is not None}

        if name in ("monitoring_info", "monitoring"):
            d = _wellness_record_day(fields, "timestamp", "local_timestamp")
        elif name in ("hrv_status_summary", "sleep_level", "stress", "spo2_data"):
            d = _wellness_record_day(fields, "timestamp")
        else:
            continue
        if d is None:
            continue
        day = daily.setdefault(d, {})

        if name == "monitoring_info":
            _store_first_value(day, "resting_hr", fields.get("resting_heart_rate"))

        elif name == "monitoring":
            # Accumulate steps (they're stored as increments)
            if "steps" in fields:
                day["steps"] = day.get("steps", 0) + int(fields["steps"])
            if "heart_rate" in fields:
                day.setdefault("heart_rates", []).append(int(fields["heart_rate"]))
            if "stress_level_value" in fields:
                day.setdefault("stress_values", []).append(int(fields["stress_level_value"]))

        elif name == "hrv_status_summary":
            # NOTE(review): the nightly average is read from "weekly_average"
            # and the status from "hrv_status" - confirm these field names
            # against the FIT profile in use.
            _store_first_value(day, "hrv_nightly_avg", fields.get("weekly_average"))
            _store_first_value(day, "hrv_5min_high", fields.get("last_night_5_min_high"))
            status = fields.get("hrv_status")
            # A missing status stays None so the upsert's COALESCE keeps any
            # existing DB value instead of overwriting it with "".
            _store_first_value(day, "hrv_status", None if status is None else str(status))

        elif name == "sleep_level":
            levels = day.setdefault("sleep_levels", [])
            # Only real readings count; a record without a level must not
            # turn "no sleep data" into zero-second sleep stages.
            if "sleep_level" in fields:
                levels.append(fields["sleep_level"])

        elif name == "stress":
            if "stress_level_value" in fields:
                day.setdefault("stress_values", []).append(int(fields["stress_level_value"]))

        elif name == "spo2_data":
            readings = day.setdefault("spo2_readings", [])
            if "spo2_percent" in fields:
                readings.append(fields["spo2_percent"])

    return daily


@celery_app.task(name="parse_wellness_fit")
def parse_wellness_fit(file_path: str, user_id: int):
    """
    Parse a Garmin wellness/metrics FIT file and upsert into health_metrics.
    These files contain resting HR, HRV, sleep, stress, SpO2 etc.

    Args:
        file_path: Path of the FIT file to read.
        user_id: Owner of the resulting health_metrics rows.

    Returns:
        ``{"status": "error", "error": ...}`` when the file cannot be opened
        or decoded, ``{"status": "no_data", ...}`` when no recognised record
        was found, otherwise ``{"status": "ok", "days_processed": ..., ...}``.
    """
    import fitparse
    from app.core.database import SyncSessionLocal
    from sqlalchemy import text
    from datetime import datetime, timezone

    try:
        fit = fitparse.FitFile(file_path)
    except Exception as e:
        return {"status": "error", "error": str(e)}

    # Messages are decoded lazily while iterating, so corrupt records surface
    # here rather than in the constructor above.
    try:
        daily = _collect_daily_wellness(fit.get_messages())
    except fitparse.FitParseError as e:
        return {"status": "error", "error": str(e)}

    if not daily:
        return {"status": "no_data", "file": file_path}

    # ON CONFLICT upsert - race-condition safe, COALESCE preserves existing data
    upsert = text("""
        INSERT INTO health_metrics (user_id, date, resting_hr, avg_hr_day, avg_stress,
            spo2_avg, hrv_nightly_avg, hrv_5min_high, hrv_status, steps,
            sleep_duration_s, sleep_deep_s, sleep_light_s, sleep_rem_s, sleep_awake_s)
        VALUES (:user_id, :date, :resting_hr, :avg_hr, :avg_stress,
            :spo2_avg, :hrv_avg, :hrv_high, :hrv_status, :steps,
            :sleep_dur, :sleep_deep, :sleep_light, :sleep_rem, :sleep_awake)
        ON CONFLICT (user_id, date) DO UPDATE SET
            resting_hr = COALESCE(EXCLUDED.resting_hr, health_metrics.resting_hr),
            avg_hr_day = COALESCE(EXCLUDED.avg_hr_day, health_metrics.avg_hr_day),
            avg_stress = COALESCE(EXCLUDED.avg_stress, health_metrics.avg_stress),
            spo2_avg = COALESCE(EXCLUDED.spo2_avg, health_metrics.spo2_avg),
            hrv_nightly_avg = COALESCE(EXCLUDED.hrv_nightly_avg, health_metrics.hrv_nightly_avg),
            hrv_5min_high = COALESCE(EXCLUDED.hrv_5min_high, health_metrics.hrv_5min_high),
            hrv_status = COALESCE(EXCLUDED.hrv_status, health_metrics.hrv_status),
            steps = COALESCE(EXCLUDED.steps, health_metrics.steps),
            sleep_duration_s = COALESCE(EXCLUDED.sleep_duration_s, health_metrics.sleep_duration_s),
            sleep_deep_s = COALESCE(EXCLUDED.sleep_deep_s, health_metrics.sleep_deep_s),
            sleep_light_s = COALESCE(EXCLUDED.sleep_light_s, health_metrics.sleep_light_s),
            sleep_rem_s = COALESCE(EXCLUDED.sleep_rem_s, health_metrics.sleep_rem_s),
            sleep_awake_s = COALESCE(EXCLUDED.sleep_awake_s, health_metrics.sleep_awake_s)
    """)

    # Upsert into health_metrics using ON CONFLICT to handle concurrent workers
    with SyncSessionLocal() as db:
        for day_date, data in daily.items():
            hrs = data.pop("heart_rates", [])
            stresses = data.pop("stress_values", [])
            spo2s = data.pop("spo2_readings", [])
            sleep_levels = data.pop("sleep_levels", [])

            avg_hr = (sum(hrs) / len(hrs)) if hrs else None
            avg_stress = (sum(stresses) / len(stresses)) if stresses else None
            spo2_avg = (sum(spo2s) / len(spo2s)) if spo2s else None

            # Garmin sleep levels: 0=unmeasurable, 1=awake, 2=light, 3=deep, 4=rem
            # Each sample is counted as 30 seconds.
            if sleep_levels:
                sleep_awake_s = sum(30 for level in sleep_levels if level == 1)
                sleep_light_s = sum(30 for level in sleep_levels if level == 2)
                sleep_deep_s = sum(30 for level in sleep_levels if level == 3)
                sleep_rem_s = sum(30 for level in sleep_levels if level == 4)
            else:
                sleep_awake_s = sleep_light_s = sleep_deep_s = sleep_rem_s = None
            # Total sleep excludes awake time; 0 is stored as NULL.
            sleep_duration_s = (
                (sleep_deep_s or 0) + (sleep_light_s or 0) + (sleep_rem_s or 0)
            ) or None

            date_dt = datetime(day_date.year, day_date.month, day_date.day, tzinfo=timezone.utc)

            db.execute(upsert, {
                "user_id": user_id, "date": date_dt,
                "resting_hr": data.get("resting_hr"), "avg_hr": avg_hr,
                "avg_stress": avg_stress, "spo2_avg": spo2_avg,
                "hrv_avg": data.get("hrv_nightly_avg"),
                "hrv_high": data.get("hrv_5min_high"),
                "hrv_status": data.get("hrv_status"),
                "steps": data.get("steps"),
                "sleep_dur": sleep_duration_s, "sleep_deep": sleep_deep_s,
                "sleep_light": sleep_light_s, "sleep_rem": sleep_rem_s,
                "sleep_awake": sleep_awake_s,
            })

        db.commit()

    return {"status": "ok", "days_processed": len(daily), "file": file_path}
|
|
|
|
|
|
@celery_app.task(name="compute_personal_records")
def compute_personal_records(activity_id: int, user_id: int, parsed: dict):
    """Store new personal records set by this activity.

    Best splits are computed from the parsed data points. For every split
    whose label appears in ``STANDARD_DISTANCES``, the split becomes the
    current record when the user has no current record for that sport and
    distance, or when it is faster than the existing one (which is then
    flagged as no longer current).

    Args:
        activity_id: Activity the splits were measured on.
        user_id: Owner of the records.
        parsed: Parsed activity dict; reads ``data_points``, ``distance_m``,
            ``sport_type`` and ``start_time``.
    """
    from app.services.route_matcher import compute_best_splits, STANDARD_DISTANCES
    from app.core.database import SyncSessionLocal
    from app.models.user import PersonalRecord
    from sqlalchemy import select
    from datetime import datetime, timezone

    sport = parsed.get("sport_type", "running")

    # Without a recorded start time the record is dated "now" (UTC).
    raw_start = parsed.get("start_time")
    if raw_start:
        achieved_at = datetime.fromisoformat(raw_start)
    else:
        achieved_at = datetime.now(timezone.utc)

    best_splits = compute_best_splits(
        parsed.get("data_points", []),
        parsed.get("distance_m", 0) or 0,
    )

    with SyncSessionLocal() as db:
        for label, duration_s in best_splits.items():
            # First STANDARD_DISTANCES entry carrying this label, if any.
            dist_m = None
            for candidate_m, candidate_label in STANDARD_DISTANCES:
                if candidate_label == label:
                    dist_m = candidate_m
                    break
            if dist_m is None:
                continue

            current_query = select(PersonalRecord).where(
                PersonalRecord.user_id == user_id,
                PersonalRecord.sport_type == sport,
                PersonalRecord.distance_m == dist_m,
                PersonalRecord.is_current_record == True,
            )
            current = db.execute(current_query).scalar_one_or_none()

            if current is not None:
                if not duration_s < current.duration_s:
                    # Existing record stands.
                    continue
                current.is_current_record = False

            new_record = PersonalRecord(
                user_id=user_id,
                activity_id=activity_id,
                sport_type=sport,
                distance_m=dist_m,
                distance_label=label,
                duration_s=duration_s,
                achieved_at=achieved_at,
                is_current_record=True,
            )
            db.add(new_record)
        db.commit()
|
|
|
|
|
|
def _iter_daily_metric_entries(zf):
    """Yield ``(date_dt, entry)`` for each usable DailyMetrics record in ``zf``.

    Only members whose name contains ``DailyMetrics`` and ends with ``.json``
    are read. A member may hold a single JSON object or a list of objects.
    Members that cannot be decoded, entries that are not objects, and entries
    without a parseable ``calendarDate``/``date`` are skipped.
    """
    import json
    import logging
    from datetime import datetime, timezone

    log = logging.getLogger(__name__)

    for name in zf.namelist():
        if "DailyMetrics" not in name or not name.endswith(".json"):
            continue

        with zf.open(name) as f:
            try:
                payload = json.load(f)
            except Exception:
                # Best-effort import: one unreadable member must not abort
                # the rest, but leave a trace of what was skipped.
                log.warning("Skipping unreadable DailyMetrics file %s", name, exc_info=True)
                continue

        entries = payload if isinstance(payload, list) else [payload]
        for entry in entries:
            if not isinstance(entry, dict):
                continue

            date_str = entry.get("calendarDate") or entry.get("date")
            if not date_str:
                continue

            try:
                date_dt = datetime.fromisoformat(date_str).replace(tzinfo=timezone.utc)
            except (TypeError, ValueError):
                # TypeError covers non-string date values.
                continue

            yield date_dt, entry


@celery_app.task(name="process_garmin_health_zip")
def process_garmin_health_zip(zip_path: str, user_id: int):
    """Extract wellness data from a Garmin Connect export ZIP.

    Every usable DailyMetrics record is upserted into ``health_metrics``,
    keyed on ``(user_id, date)``; all rows are committed together once the
    archive has been read.

    Args:
        zip_path: Path of the export archive.
        user_id: Owner of the resulting health_metrics rows.
    """
    import zipfile
    from app.core.database import SyncSessionLocal
    from sqlalchemy import text

    # COALESCE keeps the existing column value when the new record has none.
    upsert = text("""
        INSERT INTO health_metrics (user_id, date, resting_hr, steps,
            floors_climbed, active_calories, total_calories, avg_stress, spo2_avg)
        VALUES (:user_id, :date, :resting_hr, :steps,
            :floors, :active_cal, :total_cal, :stress, :spo2)
        ON CONFLICT (user_id, date) DO UPDATE SET
            resting_hr = COALESCE(EXCLUDED.resting_hr, health_metrics.resting_hr),
            steps = COALESCE(EXCLUDED.steps, health_metrics.steps),
            floors_climbed = COALESCE(EXCLUDED.floors_climbed, health_metrics.floors_climbed),
            active_calories = COALESCE(EXCLUDED.active_calories, health_metrics.active_calories),
            total_calories = COALESCE(EXCLUDED.total_calories, health_metrics.total_calories),
            avg_stress = COALESCE(EXCLUDED.avg_stress, health_metrics.avg_stress),
            spo2_avg = COALESCE(EXCLUDED.spo2_avg, health_metrics.spo2_avg)
    """)

    with SyncSessionLocal() as db:
        with zipfile.ZipFile(zip_path) as zf:
            for date_dt, data in _iter_daily_metric_entries(zf):
                db.execute(upsert, {
                    "user_id": user_id, "date": date_dt,
                    "resting_hr": data.get("restingHeartRate"),
                    "steps": data.get("totalSteps"),
                    "floors": data.get("floorsAscended"),
                    "active_cal": data.get("activeKilocalories"),
                    "total_cal": data.get("totalKilocalories"),
                    "stress": data.get("averageStressLevel"),
                    "spo2": data.get("avgSpo2"),
                })

        db.commit()