Use sync SQLAlchemy in Celery worker - fixes asyncpg connection issues
This commit is contained in:
@@ -1,7 +1,9 @@
|
|||||||
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
|
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
|
||||||
from sqlalchemy.orm import DeclarativeBase
|
from sqlalchemy import create_engine
|
||||||
|
from sqlalchemy.orm import DeclarativeBase, sessionmaker
|
||||||
from app.core.config import settings
|
from app.core.config import settings
|
||||||
|
|
||||||
|
# Async engine for FastAPI
|
||||||
engine = create_async_engine(
|
engine = create_async_engine(
|
||||||
settings.database_url,
|
settings.database_url,
|
||||||
echo=settings.environment == "development",
|
echo=settings.environment == "development",
|
||||||
@@ -15,6 +17,19 @@ AsyncSessionLocal = async_sessionmaker(
|
|||||||
expire_on_commit=False,
|
expire_on_commit=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Sync engine for Celery workers (Celery + asyncio don't mix well)
|
||||||
|
# Convert async URL to sync: postgresql+asyncpg:// → postgresql+psycopg2://
|
||||||
|
sync_url = settings.database_url.replace("postgresql+asyncpg://", "postgresql+psycopg2://")
|
||||||
|
sync_engine = create_engine(
|
||||||
|
sync_url,
|
||||||
|
echo=False,
|
||||||
|
pool_size=5,
|
||||||
|
max_overflow=10,
|
||||||
|
pool_pre_ping=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
SyncSessionLocal = sessionmaker(sync_engine, expire_on_commit=False)
|
||||||
|
|
||||||
|
|
||||||
class Base(DeclarativeBase):
|
class Base(DeclarativeBase):
|
||||||
pass
|
pass
|
||||||
|
|||||||
+158
-175
@@ -1,7 +1,10 @@
|
|||||||
"""
|
"""
|
||||||
Background tasks: activity ingestion, route matching, PR calculation.
|
Background tasks: activity ingestion, route matching, PR calculation.
|
||||||
|
|
||||||
|
Uses synchronous SQLAlchemy because Celery's prefork model doesn't play
|
||||||
|
well with asyncio - each worker process needs its own connection pool,
|
||||||
|
and async pools don't survive process forks.
|
||||||
"""
|
"""
|
||||||
import asyncio
|
|
||||||
from celery import Celery
|
from celery import Celery
|
||||||
from app.core.config import settings
|
from app.core.config import settings
|
||||||
|
|
||||||
@@ -22,23 +25,14 @@ celery_app.conf.update(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def run_async(coro):
|
|
||||||
loop = asyncio.new_event_loop()
|
|
||||||
try:
|
|
||||||
return loop.run_until_complete(coro)
|
|
||||||
finally:
|
|
||||||
loop.close()
|
|
||||||
|
|
||||||
|
|
||||||
@celery_app.task(bind=True, name="process_activity_file")
|
@celery_app.task(bind=True, name="process_activity_file")
|
||||||
def process_activity_file(self, file_path: str, user_id: int, source_type: str):
|
def process_activity_file(self, file_path: str, user_id: int, source_type: str):
|
||||||
"""Parse a FIT/GPX file and insert activity + data points into DB."""
|
"""Parse a FIT/GPX file and insert activity + data points into DB."""
|
||||||
from app.services.fit_parser import parse_fit_file, parse_gpx_file, calculate_hr_zones
|
from app.services.fit_parser import parse_fit_file, parse_gpx_file, calculate_hr_zones
|
||||||
from app.services.route_matcher import compute_best_splits, routes_are_similar
|
from app.core.database import SyncSessionLocal
|
||||||
from app.core.database import AsyncSessionLocal
|
from app.models.user import Activity, ActivityDataPoint, ActivityLap
|
||||||
from app.models.user import Activity, ActivityDataPoint, ActivityLap, PersonalRecord, HealthMetric
|
|
||||||
from sqlalchemy import select
|
from sqlalchemy import select
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime
|
||||||
|
|
||||||
self.update_state(state="PROGRESS", meta={"step": "parsing"})
|
self.update_state(state="PROGRESS", meta={"step": "parsing"})
|
||||||
|
|
||||||
@@ -50,101 +44,106 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise self.retry(exc=e, countdown=10, max_retries=3)
|
raise self.retry(exc=e, countdown=10, max_retries=3)
|
||||||
|
|
||||||
async def _insert():
|
with SyncSessionLocal() as db:
|
||||||
async with AsyncSessionLocal() as db:
|
# Check for duplicate
|
||||||
# Check for duplicate
|
if parsed.get("garmin_activity_id"):
|
||||||
if parsed.get("garmin_activity_id"):
|
existing = db.execute(
|
||||||
existing = await db.execute(
|
select(Activity).where(
|
||||||
select(Activity).where(
|
Activity.garmin_activity_id == parsed["garmin_activity_id"]
|
||||||
Activity.garmin_activity_id == parsed["garmin_activity_id"]
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
if existing.scalar_one_or_none():
|
).scalar_one_or_none()
|
||||||
return None
|
if existing:
|
||||||
|
return {"activity_id": existing.id, "status": "duplicate"}
|
||||||
|
|
||||||
# HR zones
|
hr_zones = calculate_hr_zones(
|
||||||
hr_zones = calculate_hr_zones(
|
parsed.get("data_points", []),
|
||||||
parsed.get("data_points", []),
|
parsed.get("max_heart_rate") or 190
|
||||||
parsed.get("max_heart_rate") or 190
|
)
|
||||||
)
|
|
||||||
|
|
||||||
# Create activity
|
start_time = datetime.fromisoformat(parsed["start_time"]) if parsed.get("start_time") else None
|
||||||
start_time = datetime.fromisoformat(parsed["start_time"]) if parsed.get("start_time") else None
|
|
||||||
|
|
||||||
activity = Activity(
|
activity = Activity(
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
name=parsed["name"],
|
name=parsed["name"],
|
||||||
sport_type=parsed["sport_type"],
|
sport_type=parsed["sport_type"],
|
||||||
start_time=start_time,
|
start_time=start_time,
|
||||||
distance_m=parsed.get("distance_m"),
|
distance_m=parsed.get("distance_m"),
|
||||||
duration_s=parsed.get("duration_s"),
|
duration_s=parsed.get("duration_s"),
|
||||||
elevation_gain_m=parsed.get("elevation_gain_m"),
|
elevation_gain_m=parsed.get("elevation_gain_m"),
|
||||||
elevation_loss_m=parsed.get("elevation_loss_m"),
|
elevation_loss_m=parsed.get("elevation_loss_m"),
|
||||||
avg_heart_rate=parsed.get("avg_heart_rate"),
|
avg_heart_rate=parsed.get("avg_heart_rate"),
|
||||||
max_heart_rate=parsed.get("max_heart_rate"),
|
max_heart_rate=parsed.get("max_heart_rate"),
|
||||||
avg_cadence=parsed.get("avg_cadence"),
|
avg_cadence=parsed.get("avg_cadence"),
|
||||||
avg_power=parsed.get("avg_power"),
|
avg_power=parsed.get("avg_power"),
|
||||||
normalized_power=parsed.get("normalized_power"),
|
normalized_power=parsed.get("normalized_power"),
|
||||||
avg_speed_ms=parsed.get("avg_speed_ms"),
|
avg_speed_ms=parsed.get("avg_speed_ms"),
|
||||||
max_speed_ms=parsed.get("max_speed_ms"),
|
max_speed_ms=parsed.get("max_speed_ms"),
|
||||||
avg_temperature_c=parsed.get("avg_temperature_c"),
|
avg_temperature_c=parsed.get("avg_temperature_c"),
|
||||||
calories=parsed.get("calories"),
|
calories=parsed.get("calories"),
|
||||||
training_stress_score=parsed.get("training_stress_score"),
|
training_stress_score=parsed.get("training_stress_score"),
|
||||||
polyline=parsed.get("polyline"),
|
polyline=parsed.get("polyline"),
|
||||||
bounding_box=parsed.get("bounding_box"),
|
bounding_box=parsed.get("bounding_box"),
|
||||||
source_file=file_path,
|
source_file=file_path,
|
||||||
source_type=parsed.get("source_type"),
|
source_type=parsed.get("source_type"),
|
||||||
hr_zones=hr_zones,
|
hr_zones=hr_zones,
|
||||||
)
|
)
|
||||||
db.add(activity)
|
db.add(activity)
|
||||||
await db.flush()
|
db.flush()
|
||||||
|
|
||||||
# Insert data points in batches
|
# Insert data points in batches - dedupe (activity_id, timestamp) pairs
|
||||||
points = parsed.get("data_points", [])
|
# since composite PK rejects duplicates and Garmin sometimes has same-second readings
|
||||||
batch_size = 500
|
seen = set()
|
||||||
for i in range(0, len(points), batch_size):
|
points = parsed.get("data_points", [])
|
||||||
batch = points[i:i+batch_size]
|
batch = []
|
||||||
db.add_all([
|
for p in points:
|
||||||
ActivityDataPoint(
|
if not p.get("timestamp"):
|
||||||
activity_id=activity.id,
|
continue
|
||||||
timestamp=datetime.fromisoformat(p["timestamp"]) if p.get("timestamp") else None,
|
ts = datetime.fromisoformat(p["timestamp"]) if isinstance(p["timestamp"], str) else p["timestamp"]
|
||||||
latitude=p.get("latitude"),
|
key = (activity.id, ts)
|
||||||
longitude=p.get("longitude"),
|
if key in seen:
|
||||||
altitude_m=p.get("altitude_m"),
|
continue
|
||||||
heart_rate=p.get("heart_rate"),
|
seen.add(key)
|
||||||
cadence=p.get("cadence"),
|
batch.append(ActivityDataPoint(
|
||||||
speed_ms=p.get("speed_ms"),
|
activity_id=activity.id,
|
||||||
power=p.get("power"),
|
timestamp=ts,
|
||||||
temperature_c=p.get("temperature_c"),
|
latitude=p.get("latitude"),
|
||||||
distance_m=p.get("distance_m"),
|
longitude=p.get("longitude"),
|
||||||
)
|
altitude_m=p.get("altitude_m"),
|
||||||
for p in batch
|
heart_rate=p.get("heart_rate"),
|
||||||
])
|
cadence=p.get("cadence"),
|
||||||
|
speed_ms=p.get("speed_ms"),
|
||||||
|
power=p.get("power"),
|
||||||
|
temperature_c=p.get("temperature_c"),
|
||||||
|
distance_m=p.get("distance_m"),
|
||||||
|
))
|
||||||
|
if len(batch) >= 500:
|
||||||
|
db.add_all(batch)
|
||||||
|
db.flush()
|
||||||
|
batch = []
|
||||||
|
if batch:
|
||||||
|
db.add_all(batch)
|
||||||
|
db.flush()
|
||||||
|
|
||||||
# Insert laps
|
# Laps
|
||||||
for lap in parsed.get("laps", []):
|
for lap in parsed.get("laps", []):
|
||||||
ls = datetime.fromisoformat(lap["start_time"]) if lap.get("start_time") else None
|
ls = datetime.fromisoformat(lap["start_time"]) if lap.get("start_time") else None
|
||||||
db.add(ActivityLap(
|
db.add(ActivityLap(
|
||||||
activity_id=activity.id,
|
activity_id=activity.id,
|
||||||
lap_number=lap["lap_number"],
|
lap_number=lap["lap_number"],
|
||||||
start_time=ls,
|
start_time=ls,
|
||||||
duration_s=lap.get("duration_s"),
|
duration_s=lap.get("duration_s"),
|
||||||
distance_m=lap.get("distance_m"),
|
distance_m=lap.get("distance_m"),
|
||||||
avg_heart_rate=lap.get("avg_heart_rate"),
|
avg_heart_rate=lap.get("avg_heart_rate"),
|
||||||
avg_cadence=lap.get("avg_cadence"),
|
avg_cadence=lap.get("avg_cadence"),
|
||||||
avg_speed_ms=lap.get("avg_speed_ms"),
|
avg_speed_ms=lap.get("avg_speed_ms"),
|
||||||
avg_power=lap.get("avg_power"),
|
avg_power=lap.get("avg_power"),
|
||||||
))
|
))
|
||||||
|
|
||||||
await db.commit()
|
db.commit()
|
||||||
return activity.id
|
activity_id = activity.id
|
||||||
|
|
||||||
activity_id = run_async(_insert())
|
|
||||||
|
|
||||||
if activity_id:
|
|
||||||
# Queue PR calculation
|
|
||||||
compute_personal_records.delay(activity_id, user_id, parsed)
|
|
||||||
|
|
||||||
|
# Queue PR calculation
|
||||||
|
compute_personal_records.delay(activity_id, user_id, parsed)
|
||||||
return {"activity_id": activity_id, "status": "ok"}
|
return {"activity_id": activity_id, "status": "ok"}
|
||||||
|
|
||||||
|
|
||||||
@@ -152,7 +151,7 @@ def process_activity_file(self, file_path: str, user_id: int, source_type: str):
|
|||||||
def compute_personal_records(activity_id: int, user_id: int, parsed: dict):
|
def compute_personal_records(activity_id: int, user_id: int, parsed: dict):
|
||||||
"""Calculate personal records for standard distances from this activity."""
|
"""Calculate personal records for standard distances from this activity."""
|
||||||
from app.services.route_matcher import compute_best_splits, STANDARD_DISTANCES
|
from app.services.route_matcher import compute_best_splits, STANDARD_DISTANCES
|
||||||
from app.core.database import AsyncSessionLocal
|
from app.core.database import SyncSessionLocal
|
||||||
from app.models.user import PersonalRecord
|
from app.models.user import PersonalRecord
|
||||||
from sqlalchemy import select
|
from sqlalchemy import select
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
@@ -165,93 +164,77 @@ def compute_personal_records(activity_id: int, user_id: int, parsed: dict):
|
|||||||
|
|
||||||
best_splits = compute_best_splits(data_points, total_dist)
|
best_splits = compute_best_splits(data_points, total_dist)
|
||||||
|
|
||||||
async def _save():
|
with SyncSessionLocal() as db:
|
||||||
async with AsyncSessionLocal() as db:
|
for label, duration_s in best_splits.items():
|
||||||
for label, duration_s in best_splits.items():
|
dist_m = next((d for d, l in STANDARD_DISTANCES if l == label), None)
|
||||||
dist_m = next((d for d, l in STANDARD_DISTANCES if l == label), None)
|
if dist_m is None:
|
||||||
if dist_m is None:
|
continue
|
||||||
continue
|
|
||||||
|
|
||||||
# Check existing record
|
current = db.execute(
|
||||||
existing = await db.execute(
|
select(PersonalRecord).where(
|
||||||
select(PersonalRecord).where(
|
PersonalRecord.user_id == user_id,
|
||||||
PersonalRecord.user_id == user_id,
|
PersonalRecord.sport_type == sport,
|
||||||
PersonalRecord.sport_type == sport,
|
PersonalRecord.distance_m == dist_m,
|
||||||
PersonalRecord.distance_m == dist_m,
|
PersonalRecord.is_current_record == True,
|
||||||
PersonalRecord.is_current_record == True,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
current = existing.scalar_one_or_none()
|
).scalar_one_or_none()
|
||||||
|
|
||||||
if current is None or duration_s < current.duration_s:
|
if current is None or duration_s < current.duration_s:
|
||||||
if current:
|
if current:
|
||||||
current.is_current_record = False
|
current.is_current_record = False
|
||||||
db.add(PersonalRecord(
|
db.add(PersonalRecord(
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
activity_id=activity_id,
|
activity_id=activity_id,
|
||||||
sport_type=sport,
|
sport_type=sport,
|
||||||
distance_m=dist_m,
|
distance_m=dist_m,
|
||||||
distance_label=label,
|
distance_label=label,
|
||||||
duration_s=duration_s,
|
duration_s=duration_s,
|
||||||
achieved_at=start_time,
|
achieved_at=start_time,
|
||||||
is_current_record=True,
|
is_current_record=True,
|
||||||
))
|
))
|
||||||
await db.commit()
|
db.commit()
|
||||||
|
|
||||||
run_async(_save())
|
|
||||||
|
|
||||||
|
|
||||||
@celery_app.task(name="process_garmin_health_zip")
|
@celery_app.task(name="process_garmin_health_zip")
|
||||||
def process_garmin_health_zip(zip_path: str, user_id: int):
|
def process_garmin_health_zip(zip_path: str, user_id: int):
|
||||||
"""
|
"""Extract wellness/sleep/HRV data from a Garmin Connect export ZIP."""
|
||||||
Process a Garmin Connect data export zip.
|
|
||||||
Extracts wellness/sleep/HRV CSV files and inserts health metrics.
|
|
||||||
"""
|
|
||||||
import zipfile
|
import zipfile
|
||||||
import json
|
import json
|
||||||
import csv
|
from app.core.database import SyncSessionLocal
|
||||||
from pathlib import Path
|
|
||||||
from app.core.database import AsyncSessionLocal
|
|
||||||
from app.models.user import HealthMetric
|
from app.models.user import HealthMetric
|
||||||
from sqlalchemy.dialects.postgresql import insert
|
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
async def _process():
|
with SyncSessionLocal() as db:
|
||||||
async with AsyncSessionLocal() as db:
|
with zipfile.ZipFile(zip_path) as zf:
|
||||||
with zipfile.ZipFile(zip_path) as zf:
|
for name in zf.namelist():
|
||||||
names = zf.namelist()
|
if "DailyMetrics" not in name or not name.endswith(".json"):
|
||||||
|
continue
|
||||||
|
with zf.open(name) as f:
|
||||||
|
try:
|
||||||
|
data = json.load(f)
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
|
||||||
# Parse daily summary JSON files from Garmin export
|
date_str = data.get("calendarDate") or data.get("date")
|
||||||
for name in names:
|
if not date_str:
|
||||||
if "DailyMetrics" in name and name.endswith(".json"):
|
continue
|
||||||
with zf.open(name) as f:
|
|
||||||
try:
|
|
||||||
data = json.load(f)
|
|
||||||
except Exception:
|
|
||||||
continue
|
|
||||||
|
|
||||||
date_str = data.get("calendarDate") or data.get("date")
|
try:
|
||||||
if not date_str:
|
date = datetime.fromisoformat(date_str).replace(tzinfo=timezone.utc)
|
||||||
continue
|
except ValueError:
|
||||||
|
continue
|
||||||
|
|
||||||
try:
|
metric = HealthMetric(
|
||||||
date = datetime.fromisoformat(date_str).replace(tzinfo=timezone.utc)
|
user_id=user_id,
|
||||||
except ValueError:
|
date=date,
|
||||||
continue
|
resting_hr=data.get("restingHeartRate"),
|
||||||
|
steps=data.get("totalSteps"),
|
||||||
|
floors_climbed=data.get("floorsAscended"),
|
||||||
|
active_calories=data.get("activeKilocalories"),
|
||||||
|
total_calories=data.get("totalKilocalories"),
|
||||||
|
avg_stress=data.get("averageStressLevel"),
|
||||||
|
spo2_avg=data.get("avgSpo2"),
|
||||||
|
)
|
||||||
|
db.add(metric)
|
||||||
|
|
||||||
metric = HealthMetric(
|
db.commit()
|
||||||
user_id=user_id,
|
|
||||||
date=date,
|
|
||||||
resting_hr=data.get("restingHeartRate"),
|
|
||||||
steps=data.get("totalSteps"),
|
|
||||||
floors_climbed=data.get("floorsAscended"),
|
|
||||||
active_calories=data.get("activeKilocalories"),
|
|
||||||
total_calories=data.get("totalKilocalories"),
|
|
||||||
avg_stress=data.get("averageStressLevel"),
|
|
||||||
spo2_avg=data.get("avgSpo2"),
|
|
||||||
)
|
|
||||||
db.add(metric)
|
|
||||||
|
|
||||||
await db.commit()
|
|
||||||
|
|
||||||
run_async(_process())
|
|
||||||
@@ -22,3 +22,4 @@ Pillow==10.3.0
|
|||||||
aiofiles==23.2.1
|
aiofiles==23.2.1
|
||||||
python-dateutil==2.9.0
|
python-dateutil==2.9.0
|
||||||
pytz==2024.1
|
pytz==2024.1
|
||||||
|
psycopg2-binary==2.9.9
|
||||||
Reference in New Issue
Block a user