Skip to content

feat(jobs): port 5 celery beat tasks from discovery-provider#834

Open
raymondjacobson wants to merge 1 commit into
mainfrom
api/parity-jobs
Open

feat(jobs): port 5 celery beat tasks from discovery-provider#834
raymondjacobson wants to merge 1 commit into
mainfrom
api/parity-jobs

Conversation

@raymondjacobson
Copy link
Copy Markdown
Member

Summary

Ports 5 periodic celery tasks from `apps/packages/discovery-provider` into the existing `api/jobs/` framework. Each job is its own file, follows the `Run` + `ScheduleEvery` + mutex-guarded `isRunning` pattern (matching `record_balance_history.go` / `reclaim_rent.go`), and is scheduled inside `CoreIndexer.Start` next to the existing aggregates calculator.

These fill gaps left after the discovery → go-openaudio cutover. Tables they read/write already exist in api/'s schema (verified against `sql/01_schema.sql`).

Jobs

File Interval Purpose
`index_hourly_play_counts.go` 30s Bucket new plays into `hourly_play_counts`. Powers `GET /v1/metrics/plays`.
`prune_plays.go` 30s DELETE plays older than ~400 days, 50k/run cap.
`index_user_listening_history.go` 5s Merge new plays into per-user `listening_history` JSONB (cap 1000 tracks/user).
`index_trending.go` 10s Score-only port. Refresh `aggregate_interval_plays` + `trending_params` MVs, then run pnagD + AnlGe trending strategy SQL templates verbatim into `track_trending_scores` + `playlist_trending_scores`.
`update_delist_statuses.go` 5min Poll `notifier.audius.co/statuses/{users,tracks}` with delegate-signed Authorization header, apply delists to `users.is_available` / `tracks.is_available` + `is_delete` / `is_deactivated`, advance `delist_status_cursor`.

Plus `delegate_auth.go`: `signedHTTPGet` helper porting apps' `basic_auth_nonce` (ETH personal-sign over keccak'd timestamp, base64'd `Basic :<sig_hex>` header).

Scoping decisions

  1. Trending: score-only. apps' `index_trending.py` also writes trending-mover notifications and calls `index_tastemaker` which dispatches challenge events. Both depend on the challenges/notification systems that aren't ported yet. The scoring half is what makes `/v1/tracks/trending`, `/v1/playlists/trending`, `/v1/tracks/trending/underground` non-empty — that's the user-facing parity.

  2. Trusted notifier hardcoded to `https://notifier.audius.co/\`. apps reads from config; we don't need multi-notifier support today. Easy promotion to a config knob later if needed.

  3. No new config required. `config.Cfg.DelegatePrivateKey` already exists.

Wiring

// indexer/indexer.go — CoreIndexer.Start
ci.startParityJobs(ctx)

`startParityJobs` instantiates each job with `ci.Config` and `ci.pool` and calls `ScheduleEvery`. Each job self-manages its goroutine and exits on `ctx.Done()` — they don't join the errgroup since they're best-effort background work.

Test plan

  • `go build ./...` clean
  • `go vet ./jobs/ ./indexer/` clean
  • `go test ./jobs/` passes against `test_jobs` template DB (api-db-1)
  • `TestHourlyPlayCountsJob_FullPipeline` — buckets, checkpoints, idempotent re-runs.
  • `TestHourlyPlayCountsJob_NoPlays` — no-op on empty plays table.
  • `TestPrunePlaysJob_DeletesOnlyOld` — recent plays survive, old ones go.
  • `TestPrunePlaysJob_RespectsBatchSize` — per-run cap honored.
  • `TestUserListeningHistoryJob_InsertsFirstHistory` — first batch sorts + counts correctly.
  • `TestUserListeningHistoryJob_MergesAcrossRuns` — `play_count` sums across batches.
  • `TestMergeListeningHistory_CapsToLimit` — JSONB capped to limit.
  • `TestTrendingJob_PopulatesScores` — scores land + DELETE-then-INSERT means no dupes on rerun.
  • `TestBasicAuthNonce_Shape` — Authorization header decodes to expected `:<sig_hex>` format.

Out of scope (called out in the parity discussion)

  • `update_aggregates` — separate job in `aggregates_calculator.go` already exists.
  • `index_aggregate_tips`, `index_metrics`, `index_profile_challenge_backfill` — explicitly skipped.
  • `index_challenges` — bigger effort, separate PR.
  • Trending notifications + tastemaker challenges — lands with the challenges work.

🤖 Generated with Claude Code

Adds five periodic jobs that mirror tasks from
apps/packages/discovery-provider's celery beat schedule, plugging gaps
left by the discovery → go-openaudio cutover. Each job follows the
existing api/jobs/ pattern (Run + ScheduleEvery + mutex-guarded
isRunning) and is wired into CoreIndexer.Start.

Jobs:
- index_hourly_play_counts: rolls up plays into hourly_play_counts
  every 30s; powers GET /v1/metrics/plays.
- prune_plays: deletes plays older than ~400 days, 50k/run cap.
- index_user_listening_history: maintains per-user listening_history
  JSONB blob from plays, capped at 1000 tracks/user.
- index_trending (score computation only): refreshes
  aggregate_interval_plays and trending_params MVs, then runs the
  trending-score SQL templates from apps' pnagD + AnlGe strategies
  verbatim into track_trending_scores + playlist_trending_scores.
  Notifications and tastemaker challenge dispatch are intentionally
  deferred — they depend on the challenges system which is a separate
  effort.
- update_delist_statuses: polls notifier.audius.co (hardcoded — apps
  reads from config but we don't need multi-notifier support today)
  for delist updates, applies them to users.is_available /
  tracks.is_available + is_delete, advances per-entity cursors in
  delist_status_cursor. Uses a new signedHTTPGet helper (delegate_auth.go)
  that ports apps' basic_auth_nonce: ETH personal-sign over a keccak'd
  timestamp, base64'd into a "Basic <ts>:<sig_hex>" header.

Tests:
- Pure unit: TestBasicAuthNonce_Shape, TestMergeListeningHistory_CapsToLimit.
- DB-backed (test_jobs template): TestHourlyPlayCountsJob_FullPipeline /
  NoPlays, TestPrunePlaysJob_DeletesOnlyOld / RespectsBatchSize,
  TestUserListeningHistoryJob_InsertsFirstHistory / MergesAcrossRuns,
  TestTrendingJob_PopulatesScores (idempotency check included).

All `go vet` + `go build` clean; `go test ./jobs/` passes against
api-db-1 template DBs.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant