Multi-user via PocketID: account linking, group gating, admin user management
PocketID OIDC already auto-provisioned users keyed by pocketid_sub, and the data layer was already fully user-scoped. This adds the missing pieces for running real multi-user: - auth.py callback: link by email to an existing un-linked account (so the admin keeps their data when first signing in by passkey), collision-safe username generation, and request the `groups` scope. - Group gating: optional pocketid_allowed_group (admin-config or POCKETID_ALLOWED_GROUP env); users lacking the group are rejected at the callback and redirected to /login?auth_error=not_authorized. - New admin users API (app/api/users.py): list users, promote/demote admin (guards against demoting/locking out the last admin or yourself), and delete a user with ordered bulk deletes of all their data + on-disk files. - ProfilePage: allowed-group field; LoginPage: rejected-login message; Layout: admin-only Users nav; new UsersPage. Resync milevault_export to current source (it had drifted many features behind — missing garmin_sync, npm-ci Dockerfile and @polyline-codec that broke its own CI) and add POCKETID_ALLOWED_GROUP to .env.example. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -22,18 +22,27 @@ celery_app.conf.update(
|
||||
enable_utc=True,
|
||||
task_track_started=True,
|
||||
worker_prefetch_multiplier=1,
|
||||
beat_schedule={
|
||||
"sync-garmin-connect": {
|
||||
"task": "sync_all_garmin_connect",
|
||||
"schedule": 1800.0, # every 30 minutes
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
# Garmin FIT file suffixes that are health/wellness data, not activities
|
||||
WELLNESS_SUFFIXES = (
|
||||
"_METRICS.fit",
|
||||
"_WELLNESS.fit",
|
||||
"_SLEEP.fit",
|
||||
"_SLEEP_DATA.fit",
|
||||
"_STRESS.fit",
|
||||
"_SPO2.fit",
|
||||
"_HRV.fit",
|
||||
"_HRV_STATUS.fit",
|
||||
"_MONITORING.fit",
|
||||
"_MONITORING_B.fit",
|
||||
"_RESPIRATION.fit",
|
||||
"_PULSE_OX.fit",
|
||||
)
|
||||
|
||||
|
||||
@@ -43,10 +52,10 @@ def is_wellness_file(file_path: str) -> bool:
|
||||
|
||||
|
||||
@celery_app.task(bind=True, name="process_activity_file")
|
||||
def process_activity_file(self, file_path: str, user_id: int, source_type: str):
|
||||
def process_activity_file(self, file_path: str, user_id: int, source_type: str,
|
||||
garmin_activity_id: str = None):
|
||||
"""Parse a FIT/GPX file. Routes wellness files to health parser."""
|
||||
|
||||
# Route wellness/metrics files to health parser instead
|
||||
if is_wellness_file(file_path):
|
||||
parse_wellness_fit.delay(file_path, user_id)
|
||||
return {"status": "routed_to_wellness", "file": file_path}
|
||||
@@ -54,7 +63,7 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str):
|
||||
from app.services.fit_parser import parse_fit_file, parse_gpx_file, calculate_hr_zones
|
||||
from app.core.database import SyncSessionLocal
|
||||
from app.models.user import Activity, ActivityDataPoint, ActivityLap
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy import select, func
|
||||
from datetime import datetime
|
||||
|
||||
self.update_state(state="PROGRESS", meta={"step": "parsing"})
|
||||
@@ -67,25 +76,37 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str):
|
||||
except Exception as e:
|
||||
raise self.retry(exc=e, countdown=10, max_retries=3)
|
||||
|
||||
# Skip files with no usable activity data
|
||||
if not parsed.get("start_time"):
|
||||
return {"status": "skipped", "reason": "no start_time", "file": file_path}
|
||||
|
||||
with SyncSessionLocal() as db:
|
||||
# Check for duplicate by garmin activity ID
|
||||
if parsed.get("garmin_activity_id"):
|
||||
existing = db.execute(
|
||||
select(Activity).where(
|
||||
Activity.garmin_activity_id == parsed["garmin_activity_id"]
|
||||
)
|
||||
).scalar_one_or_none()
|
||||
if existing:
|
||||
return {"activity_id": existing.id, "status": "duplicate"}
|
||||
start_time = datetime.fromisoformat(parsed["start_time"])
|
||||
|
||||
# Get user's configured max HR for accurate zone calculation
|
||||
# Falls back to: user-set value → 220-age → activity max → 190
|
||||
# Deduplicate: same user + sport_type + start_time within ±60s
|
||||
from datetime import timedelta
|
||||
existing = db.execute(
|
||||
select(Activity).where(
|
||||
Activity.user_id == user_id,
|
||||
Activity.sport_type == parsed["sport_type"],
|
||||
Activity.start_time >= start_time - timedelta(seconds=60),
|
||||
Activity.start_time <= start_time + timedelta(seconds=60),
|
||||
)
|
||||
).scalars().first()
|
||||
|
||||
if existing:
|
||||
# Stamp garmin_activity_id if this came from a Garmin Connect sync
|
||||
# so future syncs skip the fast-path dedup and don't re-download.
|
||||
if garmin_activity_id and not existing.garmin_activity_id:
|
||||
existing.garmin_activity_id = garmin_activity_id
|
||||
db.commit()
|
||||
return {"activity_id": existing.id, "status": "duplicate"}
|
||||
|
||||
# Get user max HR for zone calculation
|
||||
from app.models.user import User as UserModel
|
||||
user_obj = db.execute(select(UserModel).where(UserModel.id == user_id)).scalar_one_or_none()
|
||||
user_obj = db.execute(
|
||||
select(UserModel).where(UserModel.id == user_id)
|
||||
).scalar_one_or_none()
|
||||
|
||||
user_max_hr = None
|
||||
if user_obj:
|
||||
user_max_hr = user_obj.max_heart_rate
|
||||
@@ -94,20 +115,15 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str):
|
||||
age = _date.today().year - user_obj.birth_year
|
||||
user_max_hr = 220 - age
|
||||
if not user_max_hr:
|
||||
# Last resort: use activity max but warn this may shift zones
|
||||
user_max_hr = parsed.get("max_heart_rate") or 190
|
||||
|
||||
hr_zones = calculate_hr_zones(
|
||||
parsed.get("data_points", []),
|
||||
user_max_hr
|
||||
)
|
||||
|
||||
start_time = datetime.fromisoformat(parsed["start_time"])
|
||||
hr_zones = calculate_hr_zones(parsed.get("data_points", []), user_max_hr)
|
||||
|
||||
activity = Activity(
|
||||
user_id=user_id,
|
||||
name=parsed["name"],
|
||||
sport_type=parsed["sport_type"],
|
||||
garmin_activity_id=garmin_activity_id,
|
||||
start_time=start_time,
|
||||
distance_m=parsed.get("distance_m"),
|
||||
duration_s=parsed.get("duration_s"),
|
||||
@@ -132,11 +148,9 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str):
|
||||
db.add(activity)
|
||||
db.flush()
|
||||
|
||||
# Insert data points, deduping on (activity_id, timestamp)
|
||||
seen = set()
|
||||
points = parsed.get("data_points", [])
|
||||
batch = []
|
||||
for p in points:
|
||||
for p in parsed.get("data_points", []):
|
||||
if not p.get("timestamp"):
|
||||
continue
|
||||
ts = datetime.fromisoformat(p["timestamp"]) if isinstance(p["timestamp"], str) else p["timestamp"]
|
||||
@@ -165,7 +179,6 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str):
|
||||
db.add_all(batch)
|
||||
db.flush()
|
||||
|
||||
# Laps
|
||||
for lap in parsed.get("laps", []):
|
||||
ls = datetime.fromisoformat(lap["start_time"]) if lap.get("start_time") else None
|
||||
db.add(ActivityLap(
|
||||
@@ -184,7 +197,6 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str):
|
||||
activity_id = activity.id
|
||||
|
||||
compute_personal_records.delay(activity_id, user_id, parsed)
|
||||
# Auto route detection for running and cycling
|
||||
if parsed.get("sport_type") in ("running", "cycling", "hiking", "walking"):
|
||||
detect_route.delay(activity_id, user_id)
|
||||
return {"activity_id": activity_id, "status": "ok"}
|
||||
@@ -192,16 +204,17 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str):
|
||||
|
||||
@celery_app.task(name="parse_wellness_fit")
|
||||
def parse_wellness_fit(file_path: str, user_id: int):
|
||||
"""
|
||||
Parse a Garmin wellness/metrics FIT file and upsert into health_metrics.
|
||||
Uses wellness_parser which handles standard FIT + Garmin proprietary messages.
|
||||
"""
|
||||
"""Parse a Garmin wellness FIT file and upsert into health_metrics."""
|
||||
from app.services.wellness_parser import parse_wellness_fit as _parse
|
||||
from app.core.database import SyncSessionLocal
|
||||
from datetime import datetime, timezone
|
||||
from sqlalchemy import text
|
||||
|
||||
result = _parse(file_path)
|
||||
try:
|
||||
result = _parse(file_path)
|
||||
except Exception as e:
|
||||
return {"status": "error", "error": str(e), "file": file_path}
|
||||
|
||||
if result.get("error"):
|
||||
return {"status": "error", "error": result["error"], "file": file_path}
|
||||
|
||||
@@ -216,11 +229,13 @@ def parse_wellness_fit(file_path: str, user_id: int):
|
||||
INSERT INTO health_metrics (user_id, date, resting_hr, avg_hr_day, max_hr_day,
|
||||
avg_stress, spo2_avg, hrv_nightly_avg, hrv_5min_high, hrv_status,
|
||||
steps, floors_climbed, active_calories, total_calories,
|
||||
sleep_duration_s, sleep_deep_s, sleep_light_s, sleep_rem_s, sleep_awake_s)
|
||||
sleep_duration_s, sleep_deep_s, sleep_light_s, sleep_rem_s, sleep_awake_s,
|
||||
sleep_score, sleep_start, sleep_end, sleep_stages)
|
||||
VALUES (:user_id, :date, :resting_hr, :avg_hr, :max_hr,
|
||||
:avg_stress, :spo2_avg, :hrv_avg, :hrv_high, :hrv_status,
|
||||
:steps, :floors, :active_cal, :total_cal,
|
||||
:sleep_dur, :sleep_deep, :sleep_light, :sleep_rem, :sleep_awake)
|
||||
:sleep_dur, :sleep_deep, :sleep_light, :sleep_rem, :sleep_awake,
|
||||
:sleep_score, :sleep_start, :sleep_end, :sleep_stages::json)
|
||||
ON CONFLICT (user_id, date) DO UPDATE SET
|
||||
resting_hr = COALESCE(EXCLUDED.resting_hr, health_metrics.resting_hr),
|
||||
avg_hr_day = COALESCE(EXCLUDED.avg_hr_day, health_metrics.avg_hr_day),
|
||||
@@ -233,12 +248,16 @@ def parse_wellness_fit(file_path: str, user_id: int):
|
||||
steps = COALESCE(EXCLUDED.steps, health_metrics.steps),
|
||||
floors_climbed = COALESCE(EXCLUDED.floors_climbed, health_metrics.floors_climbed),
|
||||
active_calories = COALESCE(EXCLUDED.active_calories, health_metrics.active_calories),
|
||||
total_calories = COALESCE(EXCLUDED.total_calories, health_metrics.total_calories),
|
||||
total_calories = GREATEST(EXCLUDED.total_calories, health_metrics.total_calories),
|
||||
sleep_duration_s = COALESCE(EXCLUDED.sleep_duration_s, health_metrics.sleep_duration_s),
|
||||
sleep_deep_s = COALESCE(EXCLUDED.sleep_deep_s, health_metrics.sleep_deep_s),
|
||||
sleep_light_s = COALESCE(EXCLUDED.sleep_light_s, health_metrics.sleep_light_s),
|
||||
sleep_rem_s = COALESCE(EXCLUDED.sleep_rem_s, health_metrics.sleep_rem_s),
|
||||
sleep_awake_s = COALESCE(EXCLUDED.sleep_awake_s, health_metrics.sleep_awake_s)
|
||||
sleep_awake_s = COALESCE(EXCLUDED.sleep_awake_s, health_metrics.sleep_awake_s),
|
||||
sleep_score = COALESCE(EXCLUDED.sleep_score, health_metrics.sleep_score),
|
||||
sleep_start = COALESCE(EXCLUDED.sleep_start, health_metrics.sleep_start),
|
||||
sleep_end = COALESCE(EXCLUDED.sleep_end, health_metrics.sleep_end),
|
||||
sleep_stages = COALESCE(EXCLUDED.sleep_stages, health_metrics.sleep_stages)
|
||||
"""), {
|
||||
"user_id": user_id, "date": date_dt,
|
||||
"resting_hr": data.get("resting_hr"),
|
||||
@@ -258,35 +277,35 @@ def parse_wellness_fit(file_path: str, user_id: int):
|
||||
"sleep_light": data.get("sleep_light_s"),
|
||||
"sleep_rem": data.get("sleep_rem_s"),
|
||||
"sleep_awake": data.get("sleep_awake_s"),
|
||||
"sleep_score": data.get("sleep_score"),
|
||||
"sleep_start": data.get("sleep_start"),
|
||||
"sleep_end": data.get("sleep_end"),
|
||||
"sleep_stages": __import__('json').dumps(data.get("sleep_stages")) if data.get("sleep_stages") else None,
|
||||
})
|
||||
db.commit()
|
||||
|
||||
return {"status": "ok", "days_processed": len(days), "file": file_path}
|
||||
|
||||
|
||||
@celery_app.task(name="detect_route")
|
||||
def detect_route(activity_id: int, user_id: int):
|
||||
"""
|
||||
After importing an activity, check if it matches any existing named routes.
|
||||
If two+ unassigned activities match each other, auto-create a named route.
|
||||
"""
|
||||
"""Auto-detect and link activities to named routes."""
|
||||
from app.services.route_matcher import routes_are_similar
|
||||
from app.core.database import SyncSessionLocal
|
||||
from app.models.user import Activity, NamedRoute
|
||||
from sqlalchemy import select
|
||||
|
||||
with SyncSessionLocal() as db:
|
||||
# Get the new activity
|
||||
new_act = db.execute(
|
||||
select(Activity).where(Activity.id == activity_id)
|
||||
).scalar_one_or_none()
|
||||
|
||||
if not new_act or not new_act.polyline:
|
||||
return {"status": "no_polyline"}
|
||||
|
||||
# Already assigned to a route?
|
||||
if new_act.named_route_id:
|
||||
return {"status": "already_assigned"}
|
||||
|
||||
# Check against existing named routes first
|
||||
routes = db.execute(
|
||||
select(NamedRoute).where(
|
||||
NamedRoute.user_id == user_id,
|
||||
@@ -298,12 +317,12 @@ def detect_route(activity_id: int, user_id: int):
|
||||
if route.reference_polyline and routes_are_similar(
|
||||
new_act.polyline, route.reference_polyline,
|
||||
new_act.bounding_box, route.bounding_box,
|
||||
dist1=new_act.distance_m, dist2=route.distance_m,
|
||||
):
|
||||
new_act.named_route_id = route.id
|
||||
db.commit()
|
||||
return {"status": "matched_existing", "route_id": route.id}
|
||||
|
||||
# No existing route matched - check unassigned activities for a match
|
||||
candidates = db.execute(
|
||||
select(Activity).where(
|
||||
Activity.user_id == user_id,
|
||||
@@ -311,9 +330,8 @@ def detect_route(activity_id: int, user_id: int):
|
||||
Activity.named_route_id == None,
|
||||
Activity.id != activity_id,
|
||||
Activity.polyline != None,
|
||||
# Within 20% distance
|
||||
Activity.distance_m >= (new_act.distance_m or 0) * 0.8,
|
||||
Activity.distance_m <= (new_act.distance_m or 0) * 1.2,
|
||||
Activity.distance_m >= (new_act.distance_m or 0) * 0.95,
|
||||
Activity.distance_m <= (new_act.distance_m or 0) * 1.05,
|
||||
)
|
||||
).scalars().all()
|
||||
|
||||
@@ -321,8 +339,8 @@ def detect_route(activity_id: int, user_id: int):
|
||||
if routes_are_similar(
|
||||
new_act.polyline, candidate.polyline,
|
||||
new_act.bounding_box, candidate.bounding_box,
|
||||
dist1=new_act.distance_m, dist2=candidate.distance_m,
|
||||
):
|
||||
# Auto-create a route from the older activity
|
||||
older = candidate if candidate.start_time < new_act.start_time else new_act
|
||||
newer = new_act if candidate.start_time < new_act.start_time else candidate
|
||||
|
||||
@@ -400,52 +418,208 @@ def process_garmin_health_zip(zip_path: str, user_id: int):
|
||||
import zipfile
|
||||
import json
|
||||
from app.core.database import SyncSessionLocal
|
||||
from app.models.user import HealthMetric
|
||||
from datetime import datetime, timezone
|
||||
from sqlalchemy import text
|
||||
|
||||
INSERT_SQL = text("""
|
||||
INSERT INTO health_metrics (user_id, date, resting_hr, max_hr_day, steps,
|
||||
floors_climbed, active_calories, total_calories, avg_stress, spo2_avg)
|
||||
VALUES (:user_id, :date, :resting_hr, :max_hr_day, :steps,
|
||||
:floors, :active_cal, :total_cal, :stress, :spo2)
|
||||
ON CONFLICT (user_id, date) DO UPDATE SET
|
||||
resting_hr = COALESCE(EXCLUDED.resting_hr, health_metrics.resting_hr),
|
||||
max_hr_day = COALESCE(EXCLUDED.max_hr_day, health_metrics.max_hr_day),
|
||||
steps = COALESCE(EXCLUDED.steps, health_metrics.steps),
|
||||
floors_climbed = COALESCE(EXCLUDED.floors_climbed, health_metrics.floors_climbed),
|
||||
active_calories = COALESCE(EXCLUDED.active_calories, health_metrics.active_calories),
|
||||
total_calories = COALESCE(EXCLUDED.total_calories, health_metrics.total_calories),
|
||||
avg_stress = COALESCE(EXCLUDED.avg_stress, health_metrics.avg_stress),
|
||||
spo2_avg = COALESCE(EXCLUDED.spo2_avg, health_metrics.spo2_avg)
|
||||
""")
|
||||
|
||||
def _extract_stress(item):
|
||||
stress_data = item.get("allDayStress")
|
||||
if not stress_data or not isinstance(stress_data, dict):
|
||||
return item.get("averageStressLevel")
|
||||
for agg in stress_data.get("aggregatorList", []):
|
||||
if agg.get("type") == "TOTAL":
|
||||
return agg.get("averageStressLevel")
|
||||
return None
|
||||
|
||||
def _floors_from_item(item):
|
||||
# UDS format reports meters; 1 floor = 3.048 m
|
||||
meters = item.get("floorsAscendedInMeters")
|
||||
if meters is not None:
|
||||
return round(meters / 3.048)
|
||||
return item.get("floorsAscended")
|
||||
|
||||
def _process_record(db, item):
|
||||
date_str = item.get("calendarDate") or item.get("date")
|
||||
if not date_str:
|
||||
return
|
||||
try:
|
||||
date_dt = datetime.fromisoformat(date_str).replace(tzinfo=timezone.utc)
|
||||
except ValueError:
|
||||
return
|
||||
db.execute(INSERT_SQL, {
|
||||
"user_id": user_id, "date": date_dt,
|
||||
"resting_hr": item.get("restingHeartRate"),
|
||||
"max_hr_day": item.get("maxHeartRate"),
|
||||
"steps": item.get("totalSteps"),
|
||||
"floors": _floors_from_item(item),
|
||||
"active_cal": item.get("activeKilocalories"),
|
||||
"total_cal": item.get("totalKilocalories"),
|
||||
"stress": _extract_stress(item),
|
||||
"spo2": item.get("avgSpo2"),
|
||||
})
|
||||
|
||||
with SyncSessionLocal() as db:
|
||||
with zipfile.ZipFile(zip_path) as zf:
|
||||
for name in zf.namelist():
|
||||
if "DailyMetrics" not in name or not name.endswith(".json"):
|
||||
if not name.endswith(".json"):
|
||||
continue
|
||||
# Garmin Connect export stores daily summaries in UDSFile_*.json
|
||||
# (DI-Connect-Aggregator). Older/alternative exports may use DailyMetrics.
|
||||
is_uds = "UDSFile" in name
|
||||
is_legacy = "DailyMetrics" in name
|
||||
if not (is_uds or is_legacy):
|
||||
continue
|
||||
with zf.open(name) as f:
|
||||
try:
|
||||
data = json.load(f)
|
||||
except Exception:
|
||||
continue
|
||||
# UDS files are lists of daily records; legacy format is a single object
|
||||
records = data if isinstance(data, list) else [data]
|
||||
for item in records:
|
||||
if isinstance(item, dict):
|
||||
_process_record(db, item)
|
||||
db.commit()
|
||||
|
||||
date_str = data.get("calendarDate") or data.get("date")
|
||||
if not date_str:
|
||||
continue
|
||||
|
||||
try:
|
||||
date_dt = datetime.fromisoformat(date_str).replace(tzinfo=timezone.utc)
|
||||
except ValueError:
|
||||
continue
|
||||
@celery_app.task(name="sync_garmin_connect_user")
|
||||
def sync_garmin_connect_user(user_id: int):
|
||||
"""Sync Garmin Connect data (activities + wellness) for one user."""
|
||||
from app.services.garmin_connect_sync import authenticate_garmin, sync_activities, sync_wellness
|
||||
from app.core.database import SyncSessionLocal
|
||||
from app.models.user import GarminConnectConfig
|
||||
from app.core.config import settings
|
||||
from sqlalchemy import select
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from sqlalchemy import text as _text
|
||||
db.execute(_text("""
|
||||
INSERT INTO health_metrics (user_id, date, resting_hr, steps,
|
||||
floors_climbed, active_calories, total_calories, avg_stress, spo2_avg)
|
||||
VALUES (:user_id, :date, :resting_hr, :steps,
|
||||
:floors, :active_cal, :total_cal, :stress, :spo2)
|
||||
ON CONFLICT (user_id, date) DO UPDATE SET
|
||||
resting_hr = COALESCE(EXCLUDED.resting_hr, health_metrics.resting_hr),
|
||||
steps = COALESCE(EXCLUDED.steps, health_metrics.steps),
|
||||
floors_climbed = COALESCE(EXCLUDED.floors_climbed, health_metrics.floors_climbed),
|
||||
active_calories = COALESCE(EXCLUDED.active_calories, health_metrics.active_calories),
|
||||
total_calories = COALESCE(EXCLUDED.total_calories, health_metrics.total_calories),
|
||||
avg_stress = COALESCE(EXCLUDED.avg_stress, health_metrics.avg_stress),
|
||||
spo2_avg = COALESCE(EXCLUDED.spo2_avg, health_metrics.spo2_avg)
|
||||
"""), {
|
||||
"user_id": user_id, "date": date_dt,
|
||||
"resting_hr": data.get("restingHeartRate"),
|
||||
"steps": data.get("totalSteps"),
|
||||
"floors": data.get("floorsAscended"),
|
||||
"active_cal": data.get("activeKilocalories"),
|
||||
"total_cal": data.get("totalKilocalories"),
|
||||
"stress": data.get("averageStressLevel"),
|
||||
"spo2": data.get("avgSpo2"),
|
||||
})
|
||||
with SyncSessionLocal() as db:
|
||||
cfg = db.execute(
|
||||
select(GarminConnectConfig).where(GarminConnectConfig.user_id == user_id)
|
||||
).scalar_one_or_none()
|
||||
|
||||
if not cfg or not cfg.sync_enabled:
|
||||
return {"status": "skipped"}
|
||||
|
||||
# Snapshot config values before any intermediate commits (commits expire ORM attrs)
|
||||
email = cfg.email
|
||||
password_enc = cfg.password_enc
|
||||
token_store = cfg.token_store
|
||||
last_sync_at = cfg.last_sync_at
|
||||
sync_acts = cfg.sync_activities
|
||||
sync_well = cfg.sync_wellness
|
||||
lookback = cfg.sync_lookback_days if cfg.sync_lookback_days is not None else 30
|
||||
|
||||
cfg.last_sync_status = "Connecting to Garmin..."
|
||||
db.commit()
|
||||
|
||||
try:
|
||||
garmin, new_token = authenticate_garmin(email, password_enc, token_store)
|
||||
except Exception as exc:
|
||||
cfg.last_sync_at = datetime.now(timezone.utc)
|
||||
cfg.last_sync_status = f"Auth error: {exc}"
|
||||
db.commit()
|
||||
return {"status": "auth_error", "error": str(exc)}
|
||||
|
||||
if new_token:
|
||||
cfg.token_store = new_token
|
||||
db.commit()
|
||||
|
||||
activities_queued = 0
|
||||
wellness_days = 0
|
||||
errors = []
|
||||
|
||||
def _set_status(text):
|
||||
cfg.last_sync_status = text
|
||||
db.commit()
|
||||
|
||||
if sync_acts:
|
||||
_set_status("Syncing activities...")
|
||||
try:
|
||||
activities_queued = sync_activities(
|
||||
garmin, user_id, last_sync_at, db, settings.file_store_path,
|
||||
lookback_days=lookback,
|
||||
status_callback=_set_status,
|
||||
)
|
||||
except Exception as exc:
|
||||
errors.append(f"activities: {exc}")
|
||||
|
||||
if sync_well:
|
||||
_set_status("Syncing wellness...")
|
||||
try:
|
||||
wellness_days = sync_wellness(
|
||||
garmin, user_id, last_sync_at, db,
|
||||
lookback_days=lookback,
|
||||
status_callback=_set_status,
|
||||
)
|
||||
except Exception as exc:
|
||||
errors.append(f"wellness: {exc}")
|
||||
db.rollback() # recover session so the final status commit can succeed
|
||||
|
||||
cfg.last_sync_at = datetime.now(timezone.utc)
|
||||
cfg.last_sync_status = (
|
||||
f"OK — {activities_queued} activities queued, {wellness_days} wellness days synced"
|
||||
if not errors else
|
||||
f"Partial — {'; '.join(errors)}"
|
||||
)
|
||||
db.commit()
|
||||
|
||||
return {"status": "ok", "activities_queued": activities_queued, "wellness_days": wellness_days}
|
||||
|
||||
|
||||
@celery_app.task(name="sync_all_garmin_connect")
|
||||
def sync_all_garmin_connect():
|
||||
"""Hourly beat task: dispatch per-user sync for all enabled configs."""
|
||||
from app.core.database import SyncSessionLocal
|
||||
from app.models.user import GarminConnectConfig
|
||||
from sqlalchemy import select
|
||||
|
||||
with SyncSessionLocal() as db:
|
||||
configs = db.execute(
|
||||
select(GarminConnectConfig).where(GarminConnectConfig.sync_enabled == True)
|
||||
).scalars().all()
|
||||
user_ids = [c.user_id for c in configs]
|
||||
|
||||
for uid in user_ids:
|
||||
sync_garmin_connect_user.delay(uid)
|
||||
|
||||
return {"dispatched": len(user_ids)}
|
||||
|
||||
|
||||
@celery_app.task(name="recalculate_hr_zones_for_user")
|
||||
def recalculate_hr_zones_for_user(user_id: int, new_max_hr: float):
|
||||
"""Recalculate hr_zones for all of a user's activities using a new max HR."""
|
||||
from app.services.fit_parser import calculate_hr_zones
|
||||
from app.core.database import SyncSessionLocal
|
||||
from app.models.user import Activity, ActivityDataPoint
|
||||
from sqlalchemy import select
|
||||
|
||||
with SyncSessionLocal() as db:
|
||||
activities = db.execute(
|
||||
select(Activity).where(Activity.user_id == user_id)
|
||||
).scalars().all()
|
||||
|
||||
for activity in activities:
|
||||
data_points = db.execute(
|
||||
select(ActivityDataPoint).where(ActivityDataPoint.activity_id == activity.id)
|
||||
).scalars().all()
|
||||
points_dicts = [{"heart_rate": dp.heart_rate} for dp in data_points]
|
||||
new_zones = calculate_hr_zones(points_dicts, new_max_hr)
|
||||
if new_zones:
|
||||
activity.hr_zones = new_zones
|
||||
|
||||
db.commit()
|
||||
|
||||
Reference in New Issue
Block a user