diff --git a/backend/app/core/database.py b/backend/app/core/database.py index 723f8e9..726fb25 100644 --- a/backend/app/core/database.py +++ b/backend/app/core/database.py @@ -1,7 +1,9 @@ from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker -from sqlalchemy.orm import DeclarativeBase +from sqlalchemy import create_engine +from sqlalchemy.orm import DeclarativeBase, sessionmaker from app.core.config import settings +# Async engine for FastAPI engine = create_async_engine( settings.database_url, echo=settings.environment == "development", @@ -15,6 +17,19 @@ AsyncSessionLocal = async_sessionmaker( expire_on_commit=False, ) +# Sync engine for Celery workers (Celery + asyncio don't mix well) +# Convert async URL to sync: postgresql+asyncpg:// → postgresql+psycopg2:// +sync_url = settings.database_url.replace("postgresql+asyncpg://", "postgresql+psycopg2://") +sync_engine = create_engine( + sync_url, + echo=False, + pool_size=5, + max_overflow=10, + pool_pre_ping=True, +) + +SyncSessionLocal = sessionmaker(sync_engine, expire_on_commit=False) + class Base(DeclarativeBase): pass @@ -29,4 +44,4 @@ async def get_db(): await session.rollback() raise finally: - await session.close() + await session.close() \ No newline at end of file diff --git a/backend/app/workers/tasks.py b/backend/app/workers/tasks.py index f8f463a..106e5d2 100644 --- a/backend/app/workers/tasks.py +++ b/backend/app/workers/tasks.py @@ -1,7 +1,10 @@ """ Background tasks: activity ingestion, route matching, PR calculation. + +Uses synchronous SQLAlchemy because Celery's prefork model doesn't play +well with asyncio - each worker process needs its own connection pool, +and async pools don't survive process forks. """ -import asyncio from celery import Celery from app.core.config import settings @@ -22,23 +25,14 @@ celery_app.conf.update( ) -def run_async(coro): - loop = asyncio.new_event_loop() - try: - return loop.run_until_complete(coro) - finally: - loop.close() - - @celery_app.task(bind=True, name="process_activity_file") def process_activity_file(self, file_path: str, user_id: int, source_type: str): """Parse a FIT/GPX file and insert activity + data points into DB.""" from app.services.fit_parser import parse_fit_file, parse_gpx_file, calculate_hr_zones - from app.services.route_matcher import compute_best_splits, routes_are_similar - from app.core.database import AsyncSessionLocal - from app.models.user import Activity, ActivityDataPoint, ActivityLap, PersonalRecord, HealthMetric + from app.core.database import SyncSessionLocal + from app.models.user import Activity, ActivityDataPoint, ActivityLap from sqlalchemy import select - from datetime import datetime, timezone + from datetime import datetime self.update_state(state="PROGRESS", meta={"step": "parsing"}) @@ -50,101 +44,106 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str): except Exception as e: raise self.retry(exc=e, countdown=10, max_retries=3) - async def _insert(): - async with AsyncSessionLocal() as db: - # Check for duplicate - if parsed.get("garmin_activity_id"): - existing = await db.execute( - select(Activity).where( - Activity.garmin_activity_id == parsed["garmin_activity_id"] - ) + with SyncSessionLocal() as db: + # Check for duplicate + if parsed.get("garmin_activity_id"): + existing = db.execute( + select(Activity).where( + Activity.garmin_activity_id == parsed["garmin_activity_id"] ) - if existing.scalar_one_or_none(): - return None + ).scalar_one_or_none() + if existing: + return {"activity_id": existing.id, "status": "duplicate"} - # HR zones - hr_zones = calculate_hr_zones( - parsed.get("data_points", []), - parsed.get("max_heart_rate") or 190 - ) + hr_zones = calculate_hr_zones( + parsed.get("data_points", []), + parsed.get("max_heart_rate") or 190 + ) - # Create activity - start_time = datetime.fromisoformat(parsed["start_time"]) if parsed.get("start_time") else None + start_time = datetime.fromisoformat(parsed["start_time"]) if parsed.get("start_time") else None - activity = Activity( - user_id=user_id, - name=parsed["name"], - sport_type=parsed["sport_type"], - start_time=start_time, - distance_m=parsed.get("distance_m"), - duration_s=parsed.get("duration_s"), - elevation_gain_m=parsed.get("elevation_gain_m"), - elevation_loss_m=parsed.get("elevation_loss_m"), - avg_heart_rate=parsed.get("avg_heart_rate"), - max_heart_rate=parsed.get("max_heart_rate"), - avg_cadence=parsed.get("avg_cadence"), - avg_power=parsed.get("avg_power"), - normalized_power=parsed.get("normalized_power"), - avg_speed_ms=parsed.get("avg_speed_ms"), - max_speed_ms=parsed.get("max_speed_ms"), - avg_temperature_c=parsed.get("avg_temperature_c"), - calories=parsed.get("calories"), - training_stress_score=parsed.get("training_stress_score"), - polyline=parsed.get("polyline"), - bounding_box=parsed.get("bounding_box"), - source_file=file_path, - source_type=parsed.get("source_type"), - hr_zones=hr_zones, - ) - db.add(activity) - await db.flush() + activity = Activity( + user_id=user_id, + name=parsed["name"], + sport_type=parsed["sport_type"], + start_time=start_time, + distance_m=parsed.get("distance_m"), + duration_s=parsed.get("duration_s"), + elevation_gain_m=parsed.get("elevation_gain_m"), + elevation_loss_m=parsed.get("elevation_loss_m"), + avg_heart_rate=parsed.get("avg_heart_rate"), + max_heart_rate=parsed.get("max_heart_rate"), + avg_cadence=parsed.get("avg_cadence"), + avg_power=parsed.get("avg_power"), + normalized_power=parsed.get("normalized_power"), + avg_speed_ms=parsed.get("avg_speed_ms"), + max_speed_ms=parsed.get("max_speed_ms"), + avg_temperature_c=parsed.get("avg_temperature_c"), + calories=parsed.get("calories"), + training_stress_score=parsed.get("training_stress_score"), + polyline=parsed.get("polyline"), + bounding_box=parsed.get("bounding_box"), + source_file=file_path, + source_type=parsed.get("source_type"), + hr_zones=hr_zones, + ) + db.add(activity) + db.flush() - # Insert data points in batches - points = parsed.get("data_points", []) - batch_size = 500 - for i in range(0, len(points), batch_size): - batch = points[i:i+batch_size] - db.add_all([ - ActivityDataPoint( - activity_id=activity.id, - timestamp=datetime.fromisoformat(p["timestamp"]) if p.get("timestamp") else None, - latitude=p.get("latitude"), - longitude=p.get("longitude"), - altitude_m=p.get("altitude_m"), - heart_rate=p.get("heart_rate"), - cadence=p.get("cadence"), - speed_ms=p.get("speed_ms"), - power=p.get("power"), - temperature_c=p.get("temperature_c"), - distance_m=p.get("distance_m"), - ) - for p in batch - ]) + # Insert data points in batches - dedupe (activity_id, timestamp) pairs + # since composite PK rejects duplicates and Garmin sometimes has same-second readings + seen = set() + points = parsed.get("data_points", []) + batch = [] + for p in points: + if not p.get("timestamp"): + continue + ts = datetime.fromisoformat(p["timestamp"]) if isinstance(p["timestamp"], str) else p["timestamp"] + key = (activity.id, ts) + if key in seen: + continue + seen.add(key) + batch.append(ActivityDataPoint( + activity_id=activity.id, + timestamp=ts, + latitude=p.get("latitude"), + longitude=p.get("longitude"), + altitude_m=p.get("altitude_m"), + heart_rate=p.get("heart_rate"), + cadence=p.get("cadence"), + speed_ms=p.get("speed_ms"), + power=p.get("power"), + temperature_c=p.get("temperature_c"), + distance_m=p.get("distance_m"), + )) + if len(batch) >= 500: + db.add_all(batch) + db.flush() + batch = [] + if batch: + db.add_all(batch) + db.flush() - # Insert laps - for lap in parsed.get("laps", []): - ls = datetime.fromisoformat(lap["start_time"]) if lap.get("start_time") else None - db.add(ActivityLap( - activity_id=activity.id, - lap_number=lap["lap_number"], - start_time=ls, - duration_s=lap.get("duration_s"), - distance_m=lap.get("distance_m"), - avg_heart_rate=lap.get("avg_heart_rate"), - avg_cadence=lap.get("avg_cadence"), - avg_speed_ms=lap.get("avg_speed_ms"), - avg_power=lap.get("avg_power"), - )) + # Laps + for lap in parsed.get("laps", []): + ls = datetime.fromisoformat(lap["start_time"]) if lap.get("start_time") else None + db.add(ActivityLap( + activity_id=activity.id, + lap_number=lap["lap_number"], + start_time=ls, + duration_s=lap.get("duration_s"), + distance_m=lap.get("distance_m"), + avg_heart_rate=lap.get("avg_heart_rate"), + avg_cadence=lap.get("avg_cadence"), + avg_speed_ms=lap.get("avg_speed_ms"), + avg_power=lap.get("avg_power"), + )) - await db.commit() - return activity.id - - activity_id = run_async(_insert()) - - if activity_id: - # Queue PR calculation - compute_personal_records.delay(activity_id, user_id, parsed) + db.commit() + activity_id = activity.id + # Queue PR calculation + compute_personal_records.delay(activity_id, user_id, parsed) return {"activity_id": activity_id, "status": "ok"} @@ -152,7 +151,7 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str): def compute_personal_records(activity_id: int, user_id: int, parsed: dict): """Calculate personal records for standard distances from this activity.""" from app.services.route_matcher import compute_best_splits, STANDARD_DISTANCES - from app.core.database import AsyncSessionLocal + from app.core.database import SyncSessionLocal from app.models.user import PersonalRecord from sqlalchemy import select from datetime import datetime, timezone @@ -165,93 +164,77 @@ def compute_personal_records(activity_id: int, user_id: int, parsed: dict): best_splits = compute_best_splits(data_points, total_dist) - async def _save(): - async with AsyncSessionLocal() as db: - for label, duration_s in best_splits.items(): - dist_m = next((d for d, l in STANDARD_DISTANCES if l == label), None) - if dist_m is None: - continue + with SyncSessionLocal() as db: + for label, duration_s in best_splits.items(): + dist_m = next((d for d, l in STANDARD_DISTANCES if l == label), None) + if dist_m is None: + continue - # Check existing record - existing = await db.execute( - select(PersonalRecord).where( - PersonalRecord.user_id == user_id, - PersonalRecord.sport_type == sport, - PersonalRecord.distance_m == dist_m, - PersonalRecord.is_current_record == True, - ) + current = db.execute( + select(PersonalRecord).where( + PersonalRecord.user_id == user_id, + PersonalRecord.sport_type == sport, + PersonalRecord.distance_m == dist_m, + PersonalRecord.is_current_record == True, ) - current = existing.scalar_one_or_none() + ).scalar_one_or_none() - if current is None or duration_s < current.duration_s: - if current: - current.is_current_record = False - db.add(PersonalRecord( - user_id=user_id, - activity_id=activity_id, - sport_type=sport, - distance_m=dist_m, - distance_label=label, - duration_s=duration_s, - achieved_at=start_time, - is_current_record=True, - )) - await db.commit() - - run_async(_save()) + if current is None or duration_s < current.duration_s: + if current: + current.is_current_record = False + db.add(PersonalRecord( + user_id=user_id, + activity_id=activity_id, + sport_type=sport, + distance_m=dist_m, + distance_label=label, + duration_s=duration_s, + achieved_at=start_time, + is_current_record=True, + )) + db.commit() @celery_app.task(name="process_garmin_health_zip") def process_garmin_health_zip(zip_path: str, user_id: int): - """ - Process a Garmin Connect data export zip. - Extracts wellness/sleep/HRV CSV files and inserts health metrics. - """ + """Extract wellness/sleep/HRV data from a Garmin Connect export ZIP.""" import zipfile import json - import csv - from pathlib import Path - from app.core.database import AsyncSessionLocal + from app.core.database import SyncSessionLocal from app.models.user import HealthMetric - from sqlalchemy.dialects.postgresql import insert from datetime import datetime, timezone - async def _process(): - async with AsyncSessionLocal() as db: - with zipfile.ZipFile(zip_path) as zf: - names = zf.namelist() + with SyncSessionLocal() as db: + with zipfile.ZipFile(zip_path) as zf: + for name in zf.namelist(): + if "DailyMetrics" not in name or not name.endswith(".json"): + continue + with zf.open(name) as f: + try: + data = json.load(f) + except Exception: + continue - # Parse daily summary JSON files from Garmin export - for name in names: - if "DailyMetrics" in name and name.endswith(".json"): - with zf.open(name) as f: - try: - data = json.load(f) - except Exception: - continue + date_str = data.get("calendarDate") or data.get("date") + if not date_str: + continue - date_str = data.get("calendarDate") or data.get("date") - if not date_str: - continue + try: + date = datetime.fromisoformat(date_str).replace(tzinfo=timezone.utc) + except ValueError: + continue - try: - date = datetime.fromisoformat(date_str).replace(tzinfo=timezone.utc) - except ValueError: - continue + metric = HealthMetric( + user_id=user_id, + date=date, + resting_hr=data.get("restingHeartRate"), + steps=data.get("totalSteps"), + floors_climbed=data.get("floorsAscended"), + active_calories=data.get("activeKilocalories"), + total_calories=data.get("totalKilocalories"), + avg_stress=data.get("averageStressLevel"), + spo2_avg=data.get("avgSpo2"), + ) + db.add(metric) - metric = HealthMetric( - user_id=user_id, - date=date, - resting_hr=data.get("restingHeartRate"), - steps=data.get("totalSteps"), - floors_climbed=data.get("floorsAscended"), - active_calories=data.get("activeKilocalories"), - total_calories=data.get("totalKilocalories"), - avg_stress=data.get("averageStressLevel"), - spo2_avg=data.get("avgSpo2"), - ) - db.add(metric) - - await db.commit() - - run_async(_process()) + db.commit() \ No newline at end of file diff --git a/backend/requirements.txt b/backend/requirements.txt index 0b7b6a3..2788a83 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -21,4 +21,5 @@ polyline==2.0.2 Pillow==10.3.0 aiofiles==23.2.1 python-dateutil==2.9.0 -pytz==2024.1 \ No newline at end of file +pytz==2024.1 +psycopg2-binary==2.9.9 \ No newline at end of file