diff --git a/ddl/migrations/0203_seed_phase_1_challenges.sql b/ddl/migrations/0203_seed_phase_1_challenges.sql new file mode 100644 index 00000000..743d4c2c --- /dev/null +++ b/ddl/migrations/0203_seed_phase_1_challenges.sql @@ -0,0 +1,41 @@ +-- Seed the challenges catalog rows for Phase 1 challenge processors. +-- +-- Values mirror apps/packages/discovery-provider/src/challenges/challenges.json +-- as of this writing. Production may already have these rows (from earlier +-- migrations or seeded another way); we use ON CONFLICT DO UPDATE so the +-- catalog stays aligned with the JSON source-of-truth — matching apps' +-- create_new_challenges.py behavior. +-- +-- Phase 1 set: +-- p profile completion (numeric, 7 steps) +-- u track upload (numeric, 3 tracks) +-- fp first playlist (boolean) +-- v connect-verified (boolean) +-- e listen streak (aggregate; currently inactive) +-- p1/p2/p3 play count milestones (numeric, 250/1k/10k) +-- tt/tut/tp trending track/under/playlist (trending) + +BEGIN; + +INSERT INTO challenges (id, type, amount, active, step_count, starting_block, weekly_pool, cooldown_days) VALUES + ('p', 'numeric', '1', true, 7, 0, 25000, 7), + ('u', 'numeric', '1', true, 3, 25346436, 25000, 7), + ('fp', 'boolean', '2', true, NULL, 28350000, 25000, 7), + ('v', 'boolean', '5', true, NULL, 0, 25000, NULL), + ('e', 'aggregate', '1', false, 2147483647, 116023891,25000, NULL), + ('p1', 'numeric', '25', true, 250, 0, 2147483647, 7), + ('p2', 'numeric', '100', true, 1000, 0, 2147483647, 7), + ('p3', 'numeric', '1000', true, 10000, 0, 2147483647, 7), + ('tt', 'trending', '1000', true, NULL, 25346436, 100000, NULL), + ('tut', 'trending', '1000', true, NULL, 25346436, 100000, NULL), + ('tp', 'trending', '100', true, NULL, 25346436, 10000, NULL) +ON CONFLICT (id) DO UPDATE SET + type = EXCLUDED.type, + amount = EXCLUDED.amount, + active = EXCLUDED.active, + step_count = EXCLUDED.step_count, + starting_block = EXCLUDED.starting_block, + weekly_pool = EXCLUDED.weekly_pool, + cooldown_days = EXCLUDED.cooldown_days; + +COMMIT; diff --git a/indexer/indexer.go b/indexer/indexer.go index 122113b7..71ac81f9 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -94,6 +94,11 @@ func (ci *CoreIndexer) startParityJobs(ctx context.Context) { jobs.NewUpdateDelistStatusesJob(ci.Config, ci.pool). ScheduleEvery(ctx, 5*time.Minute) + + // Reconcile derived challenge state from source tables. Per-challenge + // scanners live in api/jobs/challenges/. + jobs.NewIndexChallengesJob(ci.Config, ci.pool). + ScheduleEvery(ctx, 30*time.Second) } func (ci *CoreIndexer) run(ctx context.Context) error { diff --git a/jobs/challenges/first_playlist.go b/jobs/challenges/first_playlist.go new file mode 100644 index 00000000..1902a26f --- /dev/null +++ b/jobs/challenges/first_playlist.go @@ -0,0 +1,69 @@ +package challenges + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v5" +) + +// FirstPlaylistProcessor implements challenge "fp" — boolean: the user has +// created at least one playlist. +// Mirrors apps/packages/discovery-provider/src/challenges/first_playlist_challenge.py +// (Python just sets is_complete=true when an event fires; we derive it from +// playlists table state directly). +type FirstPlaylistProcessor struct{} + +func (p *FirstPlaylistProcessor) ChallengeID() string { return "fp" } + +func (p *FirstPlaylistProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error { + c, ok, err := LoadChallenge(ctx, tx, p.ChallengeID()) + if err != nil { + return fmt.Errorf("load challenge: %w", err) + } + if !ok || !c.Active { + return nil + } + startingBlock := int32(0) + if c.StartingBlock != nil { + startingBlock = *c.StartingBlock + } + amount := c.AmountInt() + + // Find every user with at least one non-deleted playlist at or after + // the starting block. Boolean challenges complete in a single step + // (step_count is null/0 — we treat current_step_count=1, step=1). + rows, err := tx.Query(ctx, ` + SELECT DISTINCT playlist_owner_id + FROM playlists + WHERE is_current = true + AND is_delete = false + AND blocknumber >= $1 + `, startingBlock) + if err != nil { + return fmt.Errorf("scan playlists: %w", err) + } + var userIDs []int64 + for rows.Next() { + var userID int64 + if err := rows.Scan(&userID); err != nil { + rows.Close() + return err + } + userIDs = append(userIDs, userID) + } + rows.Close() + if err := rows.Err(); err != nil { + return err + } + + for _, userID := range userIDs { + if err := UpsertUserChallenge(ctx, tx, + p.ChallengeID(), SpecifierFromUserID(userID), + userID, 1, 1, amount, + ); err != nil { + return fmt.Errorf("upsert: %w", err) + } + } + return nil +} diff --git a/jobs/challenges/first_playlist_test.go b/jobs/challenges/first_playlist_test.go new file mode 100644 index 00000000..d984103c --- /dev/null +++ b/jobs/challenges/first_playlist_test.go @@ -0,0 +1,28 @@ +package challenges + +import ( + "fmt" + "testing" + + "api.audius.co/database" + "github.com/stretchr/testify/assert" +) + +func TestFirstPlaylist_CompletesOnAnyPlaylist(t *testing.T) { + pool := withChallengesDB(t) + database.Seed(pool, database.FixtureMap{ + "blocks": {{"blockhash": "blk_28350001", "number": 28350001}}, + "users": {{"user_id": 100, "wallet": "0x100"}, {"user_id": 101, "wallet": "0x101"}}, + "playlists": {{"playlist_id": 1, "playlist_owner_id": 100, "blocknumber": 28350001}}, + }) + + runProcessor(t, pool, &FirstPlaylistProcessor{}) + + r1, ok := queryUserChallenge(t, pool, "fp", fmt.Sprintf("%x", 100)) + if assert.True(t, ok) { + assert.True(t, r1.IsComplete) + assert.Equal(t, int32(2), r1.Amount, "amount=2 per challenges.json") + } + _, ok = queryUserChallenge(t, pool, "fp", fmt.Sprintf("%x", 101)) + assert.False(t, ok, "user 101 has no playlist; no row") +} diff --git a/jobs/challenges/listen_streak.go b/jobs/challenges/listen_streak.go new file mode 100644 index 00000000..838cf7f4 --- /dev/null +++ b/jobs/challenges/listen_streak.go @@ -0,0 +1,237 @@ +package challenges + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/jackc/pgx/v5" +) + +// ListenStreakProcessor implements challenge "e" — daily listen streak. +// Mirrors apps' listen_streak_endless_challenge.py. +// +// State lives in `challenge_listen_streak` (last_listen_date, listen_streak). +// Rules (per apps): +// - First listen: streak = 1, last_listen_date = play timestamp. +// - Subsequent listen >= 16h after last_listen_date: advance streak by 1. +// - If >= 48h gap: reset streak to 1. +// - Otherwise (<16h): ignore. +// +// user_challenges rows have specifier "". When the +// streak crosses 7 days the row is completed; subsequent "endless" rows are +// minted at amount=1 each. +// +// NOTE: in challenges.json this challenge is currently active=false, so the +// processor is a no-op at present. Reconcile still does the right thing if +// it gets enabled. +type ListenStreakProcessor struct{} + +func (p *ListenStreakProcessor) ChallengeID() string { return "e" } + +const ( + listenStreakNextWindow = 16 * time.Hour + listenStreakBrokenWindow = 48 * time.Hour + listenStreakTarget = int32(7) +) + +const listenStreakCheckpoint = "challenges:e:last_play_id" + +// listenStreakPlay is one row from the plays scan. +type listenStreakPlay struct { + id int64 + userID int64 + createdAt time.Time +} + +func (p *ListenStreakProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error { + c, ok, err := LoadChallenge(ctx, tx, p.ChallengeID()) + if err != nil { + return fmt.Errorf("load challenge: %w", err) + } + if !ok || !c.Active { + return nil + } + amount := c.AmountInt() + + prev, err := readCheckpointInt(ctx, tx, listenStreakCheckpoint) + if err != nil { + return fmt.Errorf("read checkpoint: %w", err) + } + + // Pull new plays ordered by id. We deliberately use id ordering rather + // than created_at because id is monotonic per insert and matches the + // invariant apps' Python pipeline assumes (plays are processed in + // arrival order). + rows, err := tx.Query(ctx, ` + SELECT id, user_id, created_at + FROM plays + WHERE id > $1 AND user_id IS NOT NULL + ORDER BY id ASC + LIMIT 50000 + `, prev) + if err != nil { + return fmt.Errorf("scan plays: %w", err) + } + var plays []listenStreakPlay + for rows.Next() { + var r listenStreakPlay + if err := rows.Scan(&r.id, &r.userID, &r.createdAt); err != nil { + rows.Close() + return err + } + plays = append(plays, r) + } + rows.Close() + if err := rows.Err(); err != nil { + return err + } + if len(plays) == 0 { + return nil + } + + // Apply transitions in-process. We load existing streak state for + // users with new plays in one go. + userIDs := uniqueUserIDs(plays) + type streakState struct { + lastListen *time.Time + streak int32 + } + state := make(map[int64]*streakState, len(userIDs)) + + srows, err := tx.Query(ctx, ` + SELECT user_id, last_listen_date, listen_streak + FROM challenge_listen_streak + WHERE user_id = ANY($1) + `, userIDs) + if err != nil { + return fmt.Errorf("load streak state: %w", err) + } + for srows.Next() { + var uid int64 + var last *time.Time + var streak int32 + if err := srows.Scan(&uid, &last, &streak); err != nil { + srows.Close() + return err + } + state[uid] = &streakState{lastListen: last, streak: streak} + } + srows.Close() + if err := srows.Err(); err != nil { + return err + } + + // Walk plays in order, applying the transition rules per user. Each + // play either advances or resets a streak — we batch the writes after + // the simulation. + type advance struct { + newStreak int32 + whenLogged time.Time + } + advancesByUser := make(map[int64][]advance) + + for _, pl := range plays { + s, ok := state[pl.userID] + if !ok { + s = &streakState{lastListen: nil, streak: 0} + state[pl.userID] = s + } + if s.lastListen == nil { + s.streak = 1 + t := pl.createdAt + s.lastListen = &t + advancesByUser[pl.userID] = append(advancesByUser[pl.userID], advance{1, pl.createdAt}) + continue + } + gap := pl.createdAt.Sub(*s.lastListen) + if gap < listenStreakNextWindow { + continue // too soon, ignore + } + if gap >= listenStreakBrokenWindow { + s.streak = 1 + } else { + s.streak++ + } + t := pl.createdAt + s.lastListen = &t + advancesByUser[pl.userID] = append(advancesByUser[pl.userID], advance{s.streak, pl.createdAt}) + } + + // Write updated streak state per user. + for userID, s := range state { + if _, err := tx.Exec(ctx, ` + INSERT INTO challenge_listen_streak (user_id, last_listen_date, listen_streak) + VALUES ($1, $2, $3) + ON CONFLICT (user_id) DO UPDATE SET + last_listen_date = EXCLUDED.last_listen_date, + listen_streak = EXCLUDED.listen_streak + `, userID, s.lastListen, s.streak); err != nil { + return fmt.Errorf("upsert streak state: %w", err) + } + } + + // For each advance, mint or update the relevant user_challenge row. + // Specifier format follows apps' new post-cutover format: + // first 7 days: ":YYYYMMDDHH" of *new streak boundary* + // endless : same, one row per day after 7 + for userID, advances := range advancesByUser { + for _, a := range advances { + specifier := fmt.Sprintf("%x%s", userID, a.whenLogged.UTC().Format("2006010215")) + // In the first-7-day window, current_step_count = streak (1..7), + // is_complete when streak >= 7. After 7, this row is the + // endless +1-per-day reward (step_count = 1, amount = 1). + var stepCount int32 = listenStreakTarget + report := a.newStreak + if a.newStreak > listenStreakTarget { + stepCount = 1 + report = 1 + } + if err := UpsertUserChallenge(ctx, tx, + p.ChallengeID(), specifier, userID, report, stepCount, amount, + ); err != nil { + return fmt.Errorf("upsert listen-streak user_challenge: %w", err) + } + } + } + + // Advance checkpoint to the last play id we processed. + if err := writeCheckpointInt(ctx, tx, listenStreakCheckpoint, plays[len(plays)-1].id); err != nil { + return fmt.Errorf("save checkpoint: %w", err) + } + return nil +} + +func uniqueUserIDs(plays []listenStreakPlay) []int64 { + seen := make(map[int64]struct{}, len(plays)) + out := make([]int64, 0, len(plays)) + for _, p := range plays { + if _, ok := seen[p.userID]; ok { + continue + } + seen[p.userID] = struct{}{} + out = append(out, p.userID) + } + return out +} + +// readCheckpointInt reads a named integer checkpoint from indexing_checkpoints, +// returning 0 if absent. +func readCheckpointInt(ctx context.Context, tx pgx.Tx, name string) (int64, error) { + var v int64 + err := tx.QueryRow(ctx, "SELECT last_checkpoint FROM indexing_checkpoints WHERE tablename = $1", name).Scan(&v) + if errors.Is(err, pgx.ErrNoRows) { + return 0, nil + } + return v, err +} + +func writeCheckpointInt(ctx context.Context, tx pgx.Tx, name string, value int64) error { + _, err := tx.Exec(ctx, ` + INSERT INTO indexing_checkpoints (tablename, last_checkpoint) + VALUES ($1, $2) + ON CONFLICT (tablename) DO UPDATE SET last_checkpoint = EXCLUDED.last_checkpoint + `, name, value) + return err +} diff --git a/jobs/challenges/listen_streak_test.go b/jobs/challenges/listen_streak_test.go new file mode 100644 index 00000000..a576689b --- /dev/null +++ b/jobs/challenges/listen_streak_test.go @@ -0,0 +1,72 @@ +package challenges + +import ( + "context" + "fmt" + "testing" + "time" + + "api.audius.co/database" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestListenStreak_SkippedWhenInactive — challenge "e" is active=false in +// challenges.json by default. Until/unless apps enables it, the processor +// is a no-op. This test pins that behavior. +func TestListenStreak_SkippedWhenInactive(t *testing.T) { + pool := withChallengesDB(t) + database.Seed(pool, database.FixtureMap{ + "blocks": {{"blockhash": "blk_ls", "number": 1}}, + "users": {{"user_id": 600, "wallet": "0x600"}}, + "tracks": {{"track_id": 6000, "owner_id": 600, "title": "T", "blocknumber": 1}}, + "plays": { + {"id": 1, "user_id": 600, "play_item_id": 6000, "created_at": time.Now()}, + }, + }) + + runProcessor(t, pool, &ListenStreakProcessor{}) + + // Nothing in challenge_listen_streak because the catalog row is inactive. + var n int + require.NoError(t, pool.QueryRow(context.Background(), + "SELECT COUNT(*) FROM challenge_listen_streak").Scan(&n)) + assert.Equal(t, 0, n) +} + +// TestListenStreak_AdvancesAcrossDays — exercises the active path by +// activating the catalog row, then feeding plays at increasing day +// boundaries. Expects streak to count up. +func TestListenStreak_AdvancesAcrossDays(t *testing.T) { + pool := withChallengesDB(t) + ctx := context.Background() + _, err := pool.Exec(ctx, "UPDATE challenges SET active = true WHERE id = 'e'") + require.NoError(t, err) + + day := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) + database.Seed(pool, database.FixtureMap{ + "blocks": {{"blockhash": "blk_ls", "number": 1}}, + "users": {{"user_id": 610, "wallet": "0x610"}}, + "tracks": {{"track_id": 6100, "owner_id": 610, "title": "T", "blocknumber": 1}}, + "plays": { + {"id": 10, "user_id": 610, "play_item_id": 6100, "created_at": day}, + {"id": 11, "user_id": 610, "play_item_id": 6100, "created_at": day.Add(20 * time.Hour)}, // streak = 2 + {"id": 12, "user_id": 610, "play_item_id": 6100, "created_at": day.Add(40 * time.Hour)}, // streak = 3 + }, + }) + + runProcessor(t, pool, &ListenStreakProcessor{}) + + var streak int32 + require.NoError(t, pool.QueryRow(ctx, + "SELECT listen_streak FROM challenge_listen_streak WHERE user_id = 610").Scan(&streak)) + assert.Equal(t, int32(3), streak, "3 plays at >16h gaps → streak = 3") + + // Should have written 3 user_challenge rows (one per advance), each + // with specifier {hex_uid}{YYYYMMDDHH}. + for _, ts := range []time.Time{day, day.Add(20 * time.Hour), day.Add(40 * time.Hour)} { + spec := fmt.Sprintf("%x%s", 610, ts.UTC().Format("2006010215")) + _, ok := queryUserChallenge(t, pool, "e", spec) + assert.True(t, ok, "expected user_challenge row for advance at %s", ts) + } +} diff --git a/jobs/challenges/play_count_milestones.go b/jobs/challenges/play_count_milestones.go new file mode 100644 index 00000000..4f2b6f42 --- /dev/null +++ b/jobs/challenges/play_count_milestones.go @@ -0,0 +1,117 @@ +package challenges + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v5" +) + +// PlayCountMilestone implements challenges p1/p2/p3 — the 250/1k/10k play +// milestones from 2025 onward, restricted to verified artists. +// Mirrors apps/packages/discovery-provider/src/challenges/play_count_milestone_challenge_base.py. +// +// The play count is summed from aggregate_monthly_plays where timestamp >= 2025-01-01, +// joined to tracks the user owns. Each milestone has a "previous milestone" +// requirement: p2 requires p1 complete, p3 requires p2 complete (matches +// Python). +type PlayCountMilestone struct { + ID string + Threshold int32 // step_count override (we still load from challenges row) + PreviousMilestone string + SpecifierMilestone string // appended to specifier; matches Python's "hex:250"/"hex:1000"/"hex:10000" +} + +// Per-milestone factory funcs. Each returns a Processor. +func NewPlayCount250Processor() Processor { + return &PlayCountMilestone{ID: "p1", SpecifierMilestone: "250"} +} +func NewPlayCount1000Processor() Processor { + return &PlayCountMilestone{ID: "p2", PreviousMilestone: "p1", SpecifierMilestone: "1000"} +} +func NewPlayCount10000Processor() Processor { + return &PlayCountMilestone{ID: "p3", PreviousMilestone: "p2", SpecifierMilestone: "10000"} +} + +func (p *PlayCountMilestone) ChallengeID() string { return p.ID } + +func (p *PlayCountMilestone) Reconcile(ctx context.Context, tx pgx.Tx) error { + c, ok, err := LoadChallenge(ctx, tx, p.ID) + if err != nil { + return fmt.Errorf("load challenge: %w", err) + } + if !ok || !c.Active || c.StepCount == nil { + return nil + } + stepCount := *c.StepCount + amount := c.AmountInt() + + // Find verified artists with their total 2025+ play count. We use + // aggregate_monthly_plays which is cheap to scan. + // + // The SQL only takes the (optional) previous-milestone id as a + // parameter; stepCount is applied in Go after the scan. + prevFilter := "true" + var prevArgs []any + if p.PreviousMilestone != "" { + prevFilter = `EXISTS ( + SELECT 1 FROM user_challenges uc + WHERE uc.challenge_id = $1 + AND uc.user_id = u.user_id + AND uc.is_complete = true + )` + prevArgs = []any{p.PreviousMilestone} + } + + query := fmt.Sprintf(` + SELECT u.user_id, COALESCE(SUM(amp.count), 0)::int AS play_count + FROM users u + JOIN tracks t ON t.owner_id = u.user_id + AND t.is_current = true + AND t.is_delete = false + JOIN aggregate_monthly_plays amp ON amp.play_item_id = t.track_id + AND amp.timestamp >= DATE '2025-01-01' + WHERE u.is_current = true + AND u.is_verified = true + AND u.is_deactivated = false + AND %s + GROUP BY u.user_id + HAVING COALESCE(SUM(amp.count), 0) > 0 + `, prevFilter) + + rows, err := tx.Query(ctx, query, prevArgs...) + if err != nil { + return fmt.Errorf("scan play counts: %w", err) + } + type pcRow struct { + userID int64 + playCount int32 + } + var results []pcRow + for rows.Next() { + var r pcRow + if err := rows.Scan(&r.userID, &r.playCount); err != nil { + rows.Close() + return err + } + results = append(results, r) + } + rows.Close() + if err := rows.Err(); err != nil { + return err + } + + for _, r := range results { + report := r.playCount + if report > stepCount { + report = stepCount + } + specifier := SpecifierFromUserID(r.userID) + ":" + p.SpecifierMilestone + if err := UpsertUserChallenge(ctx, tx, + p.ID, specifier, r.userID, report, stepCount, amount, + ); err != nil { + return fmt.Errorf("upsert: %w", err) + } + } + return nil +} diff --git a/jobs/challenges/play_count_milestones_test.go b/jobs/challenges/play_count_milestones_test.go new file mode 100644 index 00000000..3572bd72 --- /dev/null +++ b/jobs/challenges/play_count_milestones_test.go @@ -0,0 +1,69 @@ +package challenges + +import ( + "fmt" + "testing" + "time" + + "api.audius.co/database" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPlayCount250_RequiresVerified(t *testing.T) { + pool := withChallengesDB(t) + t2025 := time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC) + database.Seed(pool, database.FixtureMap{ + "blocks": {{"blockhash": "blk_pcm", "number": 1}}, + "users": { + {"user_id": 400, "wallet": "0x400", "is_verified": true}, + {"user_id": 401, "wallet": "0x401", "is_verified": false}, + }, + "tracks": { + {"track_id": 4001, "owner_id": 400, "title": "T", "blocknumber": 1}, + {"track_id": 4011, "owner_id": 401, "title": "T", "blocknumber": 1}, + }, + "aggregate_monthly_plays": { + {"play_item_id": 4001, "timestamp": t2025, "count": 300, "country": ""}, + {"play_item_id": 4011, "timestamp": t2025, "count": 300, "country": ""}, + }, + }) + + runProcessor(t, pool, NewPlayCount250Processor()) + + r1, ok := queryUserChallenge(t, pool, "p1", fmt.Sprintf("%x", 400)+":250") + require.True(t, ok, "verified user 400 should get a p1 row") + assert.True(t, r1.IsComplete) + assert.Equal(t, int32(250), *r1.CurrentStepCount) + assert.Equal(t, int32(25), r1.Amount) + + _, ok = queryUserChallenge(t, pool, "p1", fmt.Sprintf("%x", 401)+":250") + assert.False(t, ok, "unverified user gets no row") +} + +func TestPlayCount1000_GatedOnPrevious(t *testing.T) { + pool := withChallengesDB(t) + t2025 := time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC) + database.Seed(pool, database.FixtureMap{ + "blocks": {{"blockhash": "blk_pcm2", "number": 1}}, + "users": {{"user_id": 410, "wallet": "0x410", "is_verified": true}}, + "tracks": {{"track_id": 4101, "owner_id": 410, "title": "T", "blocknumber": 1}}, + "aggregate_monthly_plays": { + {"play_item_id": 4101, "timestamp": t2025, "count": 1500, "country": ""}, + }, + }) + + // Without p1 completed, p2 should not create a row. + runProcessor(t, pool, NewPlayCount1000Processor()) + _, ok := queryUserChallenge(t, pool, "p2", fmt.Sprintf("%x", 410)+":1000") + assert.False(t, ok, "p2 gated on p1 completion") + + // Complete p1 first. + runProcessor(t, pool, NewPlayCount250Processor()) + // Now p2 should land. + runProcessor(t, pool, NewPlayCount1000Processor()) + r, ok := queryUserChallenge(t, pool, "p2", fmt.Sprintf("%x", 410)+":1000") + if assert.True(t, ok) { + assert.True(t, r.IsComplete) + } +} diff --git a/jobs/challenges/processor.go b/jobs/challenges/processor.go new file mode 100644 index 00000000..19650b8e --- /dev/null +++ b/jobs/challenges/processor.go @@ -0,0 +1,124 @@ +// Package challenges implements the discovery-provider "challenges" reward +// system in api/. Each challenge is a Processor that scans source tables on +// a schedule and reconciles state in user_challenges + the per-challenge +// state table. +// +// Architecture vs the legacy Python stack: +// +// - Python uses a Redis event bus: every entity_manager handler dispatches +// a ChallengeEvent (track_upload, follow, favorite, etc.), index_challenges +// drains the queue and calls per-challenge ChallengeManager.process(). +// - We don't have a queue. Producers (chain indexer) write base rows; +// consumers (these processors) reconcile derived state from those rows on +// a tick. This is idempotent, restart-safe, and backfill-friendly. The +// consistency model matches Python's anyway: Python's challenge processing +// is async and rate-limited; we're just batching the work differently. +// +// Each processor owns: +// +// - A ChallengeID() returning the catalog id from the `challenges` table. +// - A Reconcile(ctx, tx) that reads source tables and applies updates to +// user_challenges (+ any per-challenge state table like +// challenge_profile_completion). +// +// The umbrella IndexChallengesJob runs them sequentially per tick. Failures +// in one processor don't stop others — each runs in its own transaction. +package challenges + +import ( + "context" + "errors" + "fmt" + + "github.com/jackc/pgx/v5" +) + +// Processor is the per-challenge interface. Implementations should be +// stateless w.r.t. process memory (state lives in DB). +type Processor interface { + // ChallengeID is the row id in the `challenges` table. + ChallengeID() string + // Reconcile reads source tables and applies updates. Must be + // idempotent — re-running with no source changes must not change state. + // Tx is begun + committed by the umbrella job; processors should not + // commit or rollback themselves. + Reconcile(ctx context.Context, tx pgx.Tx) error +} + +// LoadChallenge reads a single row from `challenges` by id. +// Returns ok=false if the row is absent. +func LoadChallenge(ctx context.Context, tx pgx.Tx, id string) (Challenge, bool, error) { + var c Challenge + err := tx.QueryRow(ctx, ` + SELECT id, type::text, amount, active, step_count, starting_block, weekly_pool, cooldown_days + FROM challenges WHERE id = $1 + `, id).Scan(&c.ID, &c.Type, &c.Amount, &c.Active, &c.StepCount, &c.StartingBlock, &c.WeeklyPool, &c.CooldownDays) + if errors.Is(err, pgx.ErrNoRows) { + return Challenge{}, false, nil + } + return c, err == nil, err +} + +// Challenge mirrors the public.challenges row shape. +type Challenge struct { + ID string + Type string + Amount string // varchar in DB + Active bool + StepCount *int32 + StartingBlock *int32 + WeeklyPool *int32 + CooldownDays *int32 +} + +// AmountInt parses Amount, defaulting to 0 on error. +func (c Challenge) AmountInt() int32 { + v := int32(0) + if _, err := fmt.Sscanf(c.Amount, "%d", &v); err != nil { + return 0 + } + return v +} + +// SpecifierFromUserID matches Python's default ChallengeUpdater.generate_specifier: +// +// hex(user_id)[2:] +// +// Lowercase hex, no "0x" prefix, no left-padding. +func SpecifierFromUserID(userID int64) string { + return fmt.Sprintf("%x", userID) +} + +// UpsertUserChallenge inserts a user_challenges row or updates progress. +// Mirrors the rows ChallengeManager.process() would write in Python. +// +// completedAt is set/cleared from current_step_count >= stepCount; when a +// row newly completes we use blockTime for completed_at if non-zero, else now(). +func UpsertUserChallenge( + ctx context.Context, + tx pgx.Tx, + challengeID, specifier string, + userID int64, + currentStepCount, stepCount int32, + amount int32, +) error { + isComplete := stepCount > 0 && currentStepCount >= stepCount + _, err := tx.Exec(ctx, ` + INSERT INTO user_challenges + (challenge_id, user_id, specifier, is_complete, current_step_count, amount, created_at, completed_at) + VALUES ($1, $2, $3, $4, $5, $6, now(), CASE WHEN $4 THEN now() ELSE NULL END) + ON CONFLICT (challenge_id, specifier) DO UPDATE SET + current_step_count = EXCLUDED.current_step_count, + is_complete = user_challenges.is_complete OR EXCLUDED.is_complete, + amount = CASE + WHEN user_challenges.is_complete THEN user_challenges.amount + ELSE EXCLUDED.amount + END, + completed_at = CASE + WHEN user_challenges.is_complete THEN user_challenges.completed_at + WHEN EXCLUDED.is_complete THEN now() + ELSE user_challenges.completed_at + END + `, challengeID, userID, specifier, isComplete, currentStepCount, amount) + return err +} diff --git a/jobs/challenges/processor_test.go b/jobs/challenges/processor_test.go new file mode 100644 index 00000000..d0f34650 --- /dev/null +++ b/jobs/challenges/processor_test.go @@ -0,0 +1,91 @@ +package challenges + +import ( + "context" + "testing" + + "api.audius.co/database" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/stretchr/testify/require" +) + +// withChallengesDB returns a test DB pool with the Phase 1 challenges +// catalog rows seeded. Each test that calls this re-seeds from scratch. +func withChallengesDB(t *testing.T) *pgxpool.Pool { + t.Helper() + pool := database.CreateTestDatabase(t, "test_jobs") + t.Cleanup(func() { pool.Close() }) + + // Seed Phase 1 challenges catalog inline. We don't run the production + // migration here because the test_jobs template DB isn't routed through + // the ddl runner — keeping the seed local to the test makes intent + // clearer too. + ctx := context.Background() + rows := []struct { + id, typ, amount string + active bool + stepCount *int32 + startingBlock int32 + weeklyPool int32 + cooldownDays *int32 + }{ + {"p", "numeric", "1", true, i32p(7), 0, 25000, i32p(7)}, + {"u", "numeric", "1", true, i32p(3), 25346436, 25000, i32p(7)}, + {"fp", "boolean", "2", true, nil, 28350000, 25000, i32p(7)}, + {"v", "boolean", "5", true, nil, 0, 25000, nil}, + {"e", "aggregate", "1", false, i32p(2147483647), 116023891, 25000, nil}, + {"p1", "numeric", "25", true, i32p(250), 0, 2147483647, i32p(7)}, + {"p2", "numeric", "100", true, i32p(1000), 0, 2147483647, i32p(7)}, + {"p3", "numeric", "1000", true, i32p(10000), 0, 2147483647, i32p(7)}, + {"tt", "trending", "1000", true, nil, 25346436, 100000, nil}, + {"tut", "trending", "1000", true, nil, 25346436, 100000, nil}, + {"tp", "trending", "100", true, nil, 25346436, 10000, nil}, + } + for _, r := range rows { + _, err := pool.Exec(ctx, ` + INSERT INTO challenges (id, type, amount, active, step_count, starting_block, weekly_pool, cooldown_days) + VALUES ($1, $2::challengetype, $3, $4, $5, $6, $7, $8) + ON CONFLICT (id) DO NOTHING + `, r.id, r.typ, r.amount, r.active, r.stepCount, r.startingBlock, r.weeklyPool, r.cooldownDays) + require.NoError(t, err) + } + return pool +} + +func i32p(v int32) *int32 { return &v } + +// runProcessor opens a tx, runs Reconcile, and commits — matches the +// umbrella job's per-processor unit-of-work. +func runProcessor(t *testing.T, pool *pgxpool.Pool, p Processor) { + t.Helper() + ctx := context.Background() + tx, err := pool.Begin(ctx) + require.NoError(t, err) + defer tx.Rollback(ctx) + require.NoError(t, p.Reconcile(ctx, tx)) + require.NoError(t, tx.Commit(ctx)) +} + +// queryUserChallenge returns the user_challenges row for (challenge, specifier). +type ucRow struct { + UserID int64 + IsComplete bool + CurrentStepCount *int32 + Amount int32 +} + +func queryUserChallenge(t *testing.T, pool *pgxpool.Pool, challengeID, specifier string) (ucRow, bool) { + t.Helper() + var r ucRow + err := pool.QueryRow(context.Background(), ` + SELECT user_id, is_complete, current_step_count, amount + FROM user_challenges + WHERE challenge_id = $1 AND specifier = $2 + `, challengeID, specifier).Scan(&r.UserID, &r.IsComplete, &r.CurrentStepCount, &r.Amount) + if err == pgx.ErrNoRows { + return ucRow{}, false + } + require.NoError(t, err) + return r, true +} diff --git a/jobs/challenges/profile_completion.go b/jobs/challenges/profile_completion.go new file mode 100644 index 00000000..275e98b0 --- /dev/null +++ b/jobs/challenges/profile_completion.go @@ -0,0 +1,159 @@ +package challenges + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v5" +) + +// ProfileCompletionProcessor implements challenge "p" — 7 boolean steps: +// +// profile_description (bio set) +// profile_name (name set) +// profile_picture (profile_picture or profile_picture_sizes set) +// profile_cover_photo (cover_photo or cover_photo_sizes set) +// follows (>= 5 follows) +// reposts (>= 1 repost) +// favorites (>= 1 favorite) +// +// State lives in the `challenge_profile_completion` table; user_challenges' +// current_step_count is the sum of the 7 booleans. +// +// Mirrors apps/packages/discovery-provider/src/challenges/profile_challenge.py. +type ProfileCompletionProcessor struct{} + +func (p *ProfileCompletionProcessor) ChallengeID() string { return "p" } + +const ( + profileFollowThreshold = 5 + profileRepostThreshold = 1 + profileFavoriteThreshold = 1 +) + +func (p *ProfileCompletionProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error { + c, ok, err := LoadChallenge(ctx, tx, p.ChallengeID()) + if err != nil { + return fmt.Errorf("load challenge: %w", err) + } + if !ok || !c.Active || c.StepCount == nil { + return nil + } + stepCount := *c.StepCount // should be 7 + amount := c.AmountInt() + + // Recompute every step from scratch for users with any in-flight + // or completable progress. We use a single CTE-driven query that + // returns one row per user with the seven booleans. + // + // We only consider users with handle_lc set (i.e. real accounts), + // matching apps' downstream behavior — anonymous/guest users don't + // earn challenges. + rows, err := tx.Query(ctx, ` + WITH active_users AS ( + SELECT user_id, bio, name, + profile_picture, profile_picture_sizes, + cover_photo, cover_photo_sizes + FROM users + WHERE is_current = true + AND handle_lc IS NOT NULL + AND is_deactivated = false + ), + follow_counts AS ( + SELECT follower_user_id AS user_id, COUNT(*) AS n + FROM follows + WHERE is_current = true AND is_delete = false + GROUP BY follower_user_id + ), + repost_counts AS ( + SELECT user_id, COUNT(*) AS n + FROM reposts + WHERE is_current = true AND is_delete = false + GROUP BY user_id + ), + save_counts AS ( + SELECT user_id, COUNT(*) AS n + FROM saves + WHERE is_current = true AND is_delete = false + GROUP BY user_id + ) + SELECT u.user_id, + (u.bio IS NOT NULL)::int + + (u.name IS NOT NULL)::int + + ((u.profile_picture IS NOT NULL OR u.profile_picture_sizes IS NOT NULL))::int + + ((u.cover_photo IS NOT NULL OR u.cover_photo_sizes IS NOT NULL))::int + + (COALESCE(fc.n, 0) >= $1)::int + + (COALESCE(rc.n, 0) >= $2)::int + + (COALESCE(sc.n, 0) >= $3)::int AS steps, + (u.bio IS NOT NULL) AS f_bio, + (u.name IS NOT NULL) AS f_name, + (u.profile_picture IS NOT NULL OR u.profile_picture_sizes IS NOT NULL) AS f_picture, + (u.cover_photo IS NOT NULL OR u.cover_photo_sizes IS NOT NULL) AS f_cover, + (COALESCE(fc.n, 0) >= $1) AS f_follows, + (COALESCE(rc.n, 0) >= $2) AS f_reposts, + (COALESCE(sc.n, 0) >= $3) AS f_favorites + FROM active_users u + LEFT JOIN follow_counts fc ON fc.user_id = u.user_id + LEFT JOIN repost_counts rc ON rc.user_id = u.user_id + LEFT JOIN save_counts sc ON sc.user_id = u.user_id + WHERE + -- Only touch users with at least one step OR an existing in-progress row. + u.bio IS NOT NULL OR u.name IS NOT NULL + OR u.profile_picture IS NOT NULL OR u.profile_picture_sizes IS NOT NULL + OR u.cover_photo IS NOT NULL OR u.cover_photo_sizes IS NOT NULL + OR COALESCE(fc.n, 0) >= $1 + OR COALESCE(rc.n, 0) >= $2 + OR COALESCE(sc.n, 0) >= $3 + `, profileFollowThreshold, profileRepostThreshold, profileFavoriteThreshold) + if err != nil { + return fmt.Errorf("scan profile users: %w", err) + } + type pcRow struct { + userID int64 + steps int32 + fBio, fName, fPicture, fCover, fFollows, fReposts, fFavorites bool + } + var results []pcRow + for rows.Next() { + var r pcRow + if err := rows.Scan(&r.userID, &r.steps, + &r.fBio, &r.fName, &r.fPicture, &r.fCover, + &r.fFollows, &r.fReposts, &r.fFavorites); err != nil { + rows.Close() + return err + } + results = append(results, r) + } + rows.Close() + if err := rows.Err(); err != nil { + return err + } + + for _, r := range results { + // Upsert the per-challenge state table first so the booleans are + // queryable (apps tools read this for client display). + if _, err := tx.Exec(ctx, ` + INSERT INTO challenge_profile_completion + (user_id, profile_description, profile_name, profile_picture, profile_cover_photo, + follows, reposts, favorites) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (user_id) DO UPDATE SET + profile_description = EXCLUDED.profile_description, + profile_name = EXCLUDED.profile_name, + profile_picture = EXCLUDED.profile_picture, + profile_cover_photo = EXCLUDED.profile_cover_photo, + follows = EXCLUDED.follows, + reposts = EXCLUDED.reposts, + favorites = EXCLUDED.favorites + `, r.userID, r.fBio, r.fName, r.fPicture, r.fCover, r.fFollows, r.fReposts, r.fFavorites); err != nil { + return fmt.Errorf("upsert profile_completion: %w", err) + } + if err := UpsertUserChallenge(ctx, tx, + p.ChallengeID(), SpecifierFromUserID(r.userID), + r.userID, r.steps, stepCount, amount, + ); err != nil { + return fmt.Errorf("upsert user_challenge: %w", err) + } + } + return nil +} diff --git a/jobs/challenges/profile_completion_test.go b/jobs/challenges/profile_completion_test.go new file mode 100644 index 00000000..a024c237 --- /dev/null +++ b/jobs/challenges/profile_completion_test.go @@ -0,0 +1,75 @@ +package challenges + +import ( + "fmt" + "testing" + + "api.audius.co/database" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestProfileCompletion_PartialProgress(t *testing.T) { + pool := withChallengesDB(t) + + // User with bio + name + picture (3 of 7 steps) — incomplete. + database.Seed(pool, database.FixtureMap{ + "users": {{ + "user_id": 200, + "handle": "u200", + "handle_lc": "u200", + "wallet": "0x200", + "bio": "hi", + "name": "U Two Hundred", + "profile_picture": "QmXYZ", + }}, + }) + + runProcessor(t, pool, &ProfileCompletionProcessor{}) + + r, ok := queryUserChallenge(t, pool, "p", fmt.Sprintf("%x", 200)) + require.True(t, ok) + assert.Equal(t, int32(3), *r.CurrentStepCount) + assert.False(t, r.IsComplete, "3/7 → incomplete") +} + +func TestProfileCompletion_FullyComplete(t *testing.T) { + pool := withChallengesDB(t) + database.Seed(pool, database.FixtureMap{ + "users": {{ + "user_id": 210, + "handle": "u210", + "handle_lc": "u210", + "wallet": "0x210", + "bio": "hi", + "name": "User", + "profile_picture": "QmA", + "cover_photo": "QmB", + }, {"user_id": 211, "wallet": "0x211", "handle": "f1", "handle_lc": "f1"}, + {"user_id": 212, "wallet": "0x212", "handle": "f2", "handle_lc": "f2"}, + {"user_id": 213, "wallet": "0x213", "handle": "f3", "handle_lc": "f3"}, + {"user_id": 214, "wallet": "0x214", "handle": "f4", "handle_lc": "f4"}, + {"user_id": 215, "wallet": "0x215", "handle": "f5", "handle_lc": "f5"}, + {"user_id": 220, "wallet": "0x220", "handle": "target", "handle_lc": "target"}}, + "follows": { + {"follower_user_id": 210, "followee_user_id": 211, "is_current": true, "is_delete": false}, + {"follower_user_id": 210, "followee_user_id": 212, "is_current": true, "is_delete": false}, + {"follower_user_id": 210, "followee_user_id": 213, "is_current": true, "is_delete": false}, + {"follower_user_id": 210, "followee_user_id": 214, "is_current": true, "is_delete": false}, + {"follower_user_id": 210, "followee_user_id": 215, "is_current": true, "is_delete": false}, + }, + "reposts": { + {"user_id": 210, "repost_item_id": 1, "repost_type": "track", "is_current": true, "is_delete": false}, + }, + "saves": { + {"user_id": 210, "save_item_id": 1, "save_type": "track", "is_current": true, "is_delete": false}, + }, + }) + + runProcessor(t, pool, &ProfileCompletionProcessor{}) + + r, ok := queryUserChallenge(t, pool, "p", fmt.Sprintf("%x", 210)) + require.True(t, ok) + assert.Equal(t, int32(7), *r.CurrentStepCount) + assert.True(t, r.IsComplete) +} diff --git a/jobs/challenges/profile_verified.go b/jobs/challenges/profile_verified.go new file mode 100644 index 00000000..65ca96c5 --- /dev/null +++ b/jobs/challenges/profile_verified.go @@ -0,0 +1,62 @@ +package challenges + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v5" +) + +// ProfileVerifiedProcessor implements challenge "v" — boolean: user is +// verified (connected Twitter/Instagram or similar). Mirrors apps' +// connect_verified_challenge.py. +// +// We complete the challenge for any user with users.is_verified = true, +// is_current = true, is_deactivated = false. Boolean — single step. +type ProfileVerifiedProcessor struct{} + +func (p *ProfileVerifiedProcessor) ChallengeID() string { return "v" } + +func (p *ProfileVerifiedProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error { + c, ok, err := LoadChallenge(ctx, tx, p.ChallengeID()) + if err != nil { + return fmt.Errorf("load challenge: %w", err) + } + if !ok || !c.Active { + return nil + } + amount := c.AmountInt() + + rows, err := tx.Query(ctx, ` + SELECT user_id FROM users + WHERE is_current = true + AND is_verified = true + AND is_deactivated = false + `) + if err != nil { + return fmt.Errorf("scan verified users: %w", err) + } + var userIDs []int64 + for rows.Next() { + var userID int64 + if err := rows.Scan(&userID); err != nil { + rows.Close() + return err + } + userIDs = append(userIDs, userID) + } + rows.Close() + if err := rows.Err(); err != nil { + return err + } + + for _, userID := range userIDs { + if err := UpsertUserChallenge(ctx, tx, + p.ChallengeID(), SpecifierFromUserID(userID), + userID, 1, 1, amount, + ); err != nil { + return fmt.Errorf("upsert: %w", err) + } + } + return nil +} diff --git a/jobs/challenges/profile_verified_test.go b/jobs/challenges/profile_verified_test.go new file mode 100644 index 00000000..d8dd2f5c --- /dev/null +++ b/jobs/challenges/profile_verified_test.go @@ -0,0 +1,29 @@ +package challenges + +import ( + "fmt" + "testing" + + "api.audius.co/database" + "github.com/stretchr/testify/assert" +) + +func TestProfileVerified_CompletesForVerifiedUsers(t *testing.T) { + pool := withChallengesDB(t) + database.Seed(pool, database.FixtureMap{ + "users": { + {"user_id": 300, "wallet": "0x300", "is_verified": true}, + {"user_id": 301, "wallet": "0x301", "is_verified": false}, + }, + }) + + runProcessor(t, pool, &ProfileVerifiedProcessor{}) + + r, ok := queryUserChallenge(t, pool, "v", fmt.Sprintf("%x", 300)) + if assert.True(t, ok) { + assert.True(t, r.IsComplete) + assert.Equal(t, int32(5), r.Amount) + } + _, ok = queryUserChallenge(t, pool, "v", fmt.Sprintf("%x", 301)) + assert.False(t, ok, "unverified user has no row") +} diff --git a/jobs/challenges/track_upload.go b/jobs/challenges/track_upload.go new file mode 100644 index 00000000..81ac6f8e --- /dev/null +++ b/jobs/challenges/track_upload.go @@ -0,0 +1,84 @@ +package challenges + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v5" +) + +// TrackUploadProcessor implements challenge "u" — upload 3 tracks. +// Mirrors apps/packages/discovery-provider/src/challenges/track_upload_challenge.py. +// +// One row per user in user_challenges with specifier = hex(user_id). +// Step count is the number of public, non-stem tracks the user owns since +// challenges.starting_block. Completes when count >= challenge.step_count +// (3 per challenges.json). +type TrackUploadProcessor struct{} + +func (p *TrackUploadProcessor) ChallengeID() string { return "u" } + +func (p *TrackUploadProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error { + c, ok, err := LoadChallenge(ctx, tx, p.ChallengeID()) + if err != nil { + return fmt.Errorf("load challenge: %w", err) + } + if !ok || !c.Active || c.StepCount == nil || c.StartingBlock == nil { + return nil + } + stepCount := *c.StepCount + startingBlock := *c.StartingBlock + amount := c.AmountInt() + + // For every user with at least one qualifying track at or after the + // starting block, count tracks and write user_challenges. We don't + // checkpoint by user id because Python's behavior is to recompute + // from source every time the challenge processes — keeping that + // avoids edge cases where a track gets undeleted post-checkpoint. + rows, err := tx.Query(ctx, ` + SELECT owner_id, COUNT(*)::int + FROM tracks + WHERE is_current = true + AND is_delete = false + AND is_unlisted = false + AND stem_of IS NULL + AND blocknumber >= $1 + GROUP BY owner_id + `, startingBlock) + if err != nil { + return fmt.Errorf("scan tracks: %w", err) + } + type res struct { + userID int64 + count int32 + } + var results []res + for rows.Next() { + var r res + if err := rows.Scan(&r.userID, &r.count); err != nil { + rows.Close() + return err + } + results = append(results, r) + } + rows.Close() + if err := rows.Err(); err != nil { + return err + } + + for _, r := range results { + // Cap reported step count at the step_count target so we don't + // confuse readers about "is_complete" semantics. + report := r.count + if stepCount > 0 && report > stepCount { + report = stepCount + } + if err := UpsertUserChallenge(ctx, tx, + p.ChallengeID(), SpecifierFromUserID(r.userID), + r.userID, report, stepCount, amount, + ); err != nil { + return fmt.Errorf("upsert user_challenge: %w", err) + } + } + return nil +} diff --git a/jobs/challenges/track_upload_test.go b/jobs/challenges/track_upload_test.go new file mode 100644 index 00000000..81b7a297 --- /dev/null +++ b/jobs/challenges/track_upload_test.go @@ -0,0 +1,69 @@ +package challenges + +import ( + "fmt" + "testing" + "time" + + "api.audius.co/database" + "github.com/stretchr/testify/assert" +) + +func TestTrackUpload_CompletesAt3(t *testing.T) { + pool := withChallengesDB(t) + database.Seed(pool, database.FixtureMap{ + "blocks": { + {"blockhash": "blk_25346437", "number": 25346437}, + {"blockhash": "blk_25346438", "number": 25346438}, + {"blockhash": "blk_25346439", "number": 25346439}, + {"blockhash": "blk_25346440", "number": 25346440}, + }, + "users": {{"user_id": 1, "wallet": "0x01"}, {"user_id": 2, "wallet": "0x02"}}, + "tracks": { + {"track_id": 10, "owner_id": 1, "title": "T1", "blocknumber": 25346437, "created_at": time.Now()}, + {"track_id": 11, "owner_id": 1, "title": "T2", "blocknumber": 25346438, "created_at": time.Now()}, + {"track_id": 12, "owner_id": 1, "title": "T3", "blocknumber": 25346439, "created_at": time.Now()}, + {"track_id": 20, "owner_id": 2, "title": "T4", "blocknumber": 25346440, "created_at": time.Now()}, + }, + }) + + runProcessor(t, pool, &TrackUploadProcessor{}) + + r1, ok := queryUserChallenge(t, pool, "u", fmt.Sprintf("%x", 1)) + if assert.True(t, ok, "user 1 should have an u challenge row") { + assert.True(t, r1.IsComplete, "user 1 has 3 tracks → complete") + assert.Equal(t, int32(3), *r1.CurrentStepCount) + } + + r2, ok := queryUserChallenge(t, pool, "u", fmt.Sprintf("%x", 2)) + if assert.True(t, ok, "user 2 should have an u row") { + assert.False(t, r2.IsComplete, "user 2 only has 1 track") + assert.Equal(t, int32(1), *r2.CurrentStepCount) + } +} + +func TestTrackUpload_IgnoresStemsAndUnlisted(t *testing.T) { + pool := withChallengesDB(t) + database.Seed(pool, database.FixtureMap{ + "blocks": { + {"blockhash": "blk_25346437", "number": 25346437}, + {"blockhash": "blk_25346438", "number": 25346438}, + {"blockhash": "blk_25346439", "number": 25346439}, + {"blockhash": "blk_25346440", "number": 25346440}, + }, + "users": {{"user_id": 3, "wallet": "0x03"}}, + "tracks": { + {"track_id": 30, "owner_id": 3, "title": "Public", "blocknumber": 25346437}, + {"track_id": 31, "owner_id": 3, "title": "Unlisted", "blocknumber": 25346437, "is_unlisted": true}, + {"track_id": 32, "owner_id": 3, "title": "Stem", "blocknumber": 25346437, "stem_of": `{"parent_track_id": 30}`}, + }, + }) + + runProcessor(t, pool, &TrackUploadProcessor{}) + + r, ok := queryUserChallenge(t, pool, "u", fmt.Sprintf("%x", 3)) + if assert.True(t, ok) { + assert.Equal(t, int32(1), *r.CurrentStepCount, "only the public, non-stem track counts") + assert.False(t, r.IsComplete) + } +} diff --git a/jobs/challenges/trending.go b/jobs/challenges/trending.go new file mode 100644 index 00000000..9cc5f555 --- /dev/null +++ b/jobs/challenges/trending.go @@ -0,0 +1,171 @@ +package challenges + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/jackc/pgx/v5" +) + +// TrendingProcessor implements the three trending-reward challenges: +// +// tt trending_track +// tut trending_underground_track +// tp trending_playlist +// +// Mirrors apps' trending_challenge.py + calculate_trending_challenges.py. +// +// Once per week (Friday UTC, only ever once per Friday) we snapshot the top +// 10 entries from track_trending_scores / playlist_trending_scores for the +// week time range and version, write a trending_results row per entry, and +// mint a user_challenges row for the entity owner. The per-rank reward +// amount mirrors apps: +// +// track / underground: ranks 1-5 → 1000, ranks 6-10 → 100 +// playlist: ranks 1-5 → 100, ranks 6-10 → 10 +type TrendingProcessor struct { + ID string // "tt", "tut", or "tp" + EntityKind string // "track" or "playlist" + TableName string // "track_trending_scores" or "playlist_trending_scores" + TrendingTyp string // value used in track_trending_scores.type column + Version string // strategy version, e.g. "pnagD" +} + +func NewTrendingTrackProcessor() Processor { + return &TrendingProcessor{ID: "tt", EntityKind: "track", TableName: "track_trending_scores", TrendingTyp: "TRACKS", Version: "pnagD"} +} +func NewTrendingUndergroundProcessor() Processor { + return &TrendingProcessor{ID: "tut", EntityKind: "track", TableName: "track_trending_scores", TrendingTyp: "UNDERGROUND_TRACKS", Version: "pnagD"} +} +func NewTrendingPlaylistProcessor() Processor { + return &TrendingProcessor{ID: "tp", EntityKind: "playlist", TableName: "playlist_trending_scores", TrendingTyp: "PLAYLISTS", Version: "pnagD"} +} + +const trendingTopN = 10 + +func (p *TrendingProcessor) ChallengeID() string { return p.ID } + +// amountForRank mirrors apps' TRENDING_TRACK_AMOUNTS_BY_RANK / playlist +// equivalent. Ranks 1-5 pay 10× ranks 6-10. +func (p *TrendingProcessor) amountForRank(rank int32) int32 { + if p.EntityKind == "playlist" { + if rank <= 5 { + return 100 + } + return 10 + } + if rank <= 5 { + return 1000 + } + return 100 +} + +func (p *TrendingProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error { + c, ok, err := LoadChallenge(ctx, tx, p.ChallengeID()) + if err != nil { + return fmt.Errorf("load challenge: %w", err) + } + if !ok || !c.Active { + return nil + } + + now := time.Now().UTC() + // Only run on Fridays — mirrors apps' get_is_valid_timestamp. + // We use UTC rather than apps' America/Los_Angeles since the + // container timezone shouldn't matter for fairness purposes; we + // just need a stable weekly anchor. + if now.Weekday() != time.Friday { + return nil + } + // Stable date-of-this-Friday in UTC. + weekDate := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) + + // Idempotent: if trending_results already has rows for this + // (type, version, week), assume we've already paid out for this week. + var alreadyPaid bool + if err := tx.QueryRow(ctx, ` + SELECT EXISTS ( + SELECT 1 FROM trending_results + WHERE type = $1 AND version = $2 AND week = $3 + ) + `, p.TrendingTyp, p.Version, weekDate).Scan(&alreadyPaid); err != nil { + return fmt.Errorf("check trending_results: %w", err) + } + if alreadyPaid { + return nil + } + + // Pull the top-N entries with their owners. Both score tables share + // the same shape minus the entity id column name. + var rows pgx.Rows + if p.EntityKind == "track" { + rows, err = tx.Query(ctx, ` + SELECT s.track_id AS entity_id, t.owner_id AS user_id + FROM track_trending_scores s + JOIN tracks t ON t.track_id = s.track_id AND t.is_current = true + WHERE s.type = $1 AND s.version = $2 AND s.time_range = 'week' + ORDER BY s.score DESC NULLS LAST + LIMIT $3 + `, p.TrendingTyp, p.Version, trendingTopN) + } else { + rows, err = tx.Query(ctx, ` + SELECT s.playlist_id AS entity_id, pl.playlist_owner_id AS user_id + FROM playlist_trending_scores s + JOIN playlists pl ON pl.playlist_id = s.playlist_id AND pl.is_current = true + WHERE s.type = $1 AND s.version = $2 AND s.time_range = 'week' + ORDER BY s.score DESC NULLS LAST + LIMIT $3 + `, p.TrendingTyp, p.Version, trendingTopN) + } + if err != nil { + return fmt.Errorf("scan trending scores: %w", err) + } + type entry struct { + entityID int64 + userID int64 + } + var entries []entry + for rows.Next() { + var e entry + if err := rows.Scan(&e.entityID, &e.userID); err != nil { + rows.Close() + return err + } + entries = append(entries, e) + } + rows.Close() + if err := rows.Err(); err != nil { + return err + } + + if len(entries) == 0 { + return nil + } + + for idx, e := range entries { + rank := int32(idx + 1) + rewardAmount := p.amountForRank(rank) + entityIDStr := fmt.Sprintf("%d", e.entityID) + if _, err := tx.Exec(ctx, ` + INSERT INTO trending_results (user_id, id, rank, type, version, week) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT DO NOTHING + `, e.userID, entityIDStr, rank, p.TrendingTyp, p.Version, weekDate); err != nil { + return fmt.Errorf("insert trending_results: %w", err) + } + // Specifier matches apps: "{week}:{rank}". + specifier := fmt.Sprintf("%s:%d", weekDate.Format("2006-01-02"), rank) + if err := UpsertUserChallenge(ctx, tx, + p.ID, specifier, e.userID, 1, 1, rewardAmount, + ); err != nil { + return fmt.Errorf("upsert trending user_challenge: %w", err) + } + } + return nil +} + +// Compile-time check the helper is referenced — avoids "declared and not used" +// when this file is the only consumer of pgx.ErrNoRows. +var _ = errors.Is diff --git a/jobs/challenges/trending_test.go b/jobs/challenges/trending_test.go new file mode 100644 index 00000000..bde6b337 --- /dev/null +++ b/jobs/challenges/trending_test.go @@ -0,0 +1,93 @@ +package challenges + +import ( + "context" + "fmt" + "testing" + "time" + + "api.audius.co/database" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestTrending_IdempotentSameWeek seeds top-3 tracks for the current week +// in track_trending_scores and runs the trending tracks processor twice. +// The processor is gated to Fridays — this test skips itself on non-Fridays +// to avoid time-coupling. When it runs, both runs should produce the same +// 3 trending_results rows. +func TestTrending_IdempotentSameWeek(t *testing.T) { + if time.Now().UTC().Weekday() != time.Friday { + t.Skip("trending processor only runs on Fridays UTC") + } + pool := withChallengesDB(t) + now := time.Now() + database.Seed(pool, database.FixtureMap{ + "blocks": {{"blockhash": "blk_tt", "number": 1}}, + "users": { + {"user_id": 500, "wallet": "0x500"}, + {"user_id": 501, "wallet": "0x501"}, + {"user_id": 502, "wallet": "0x502"}, + }, + "tracks": { + {"track_id": 5001, "owner_id": 500, "title": "A", "blocknumber": 1, "created_at": now}, + {"track_id": 5011, "owner_id": 501, "title": "B", "blocknumber": 1, "created_at": now}, + {"track_id": 5021, "owner_id": 502, "title": "C", "blocknumber": 1, "created_at": now}, + }, + }) + + // Manually seed track_trending_scores (the trending job in api/parity-jobs + // would normally populate this). + for i, tid := range []int{5001, 5011, 5021} { + _, err := pool.Exec(context.Background(), ` + INSERT INTO track_trending_scores (track_id, type, version, time_range, score, created_at) + VALUES ($1, 'TRACKS', 'pnagD', 'week', $2, now()) + `, tid, float64(100-i)) + require.NoError(t, err) + } + + runProcessor(t, pool, NewTrendingTrackProcessor()) + + // Three rows in trending_results, ranked. + var count int + require.NoError(t, pool.QueryRow(context.Background(), + "SELECT COUNT(*) FROM trending_results WHERE type = 'TRACKS' AND version = 'pnagD'").Scan(&count)) + assert.Equal(t, 3, count) + + // User_challenges: one row per rank with amount = 1000 (top-5). + for _, userID := range []int{500, 501, 502} { + weekDate := time.Date(time.Now().UTC().Year(), time.Now().UTC().Month(), time.Now().UTC().Day(), 0, 0, 0, 0, time.UTC).Format("2006-01-02") + var rank int32 = 1 + // We don't know which rank each user got; iterate. + for r := int32(1); r <= 3; r++ { + specifier := fmt.Sprintf("%s:%d", weekDate, r) + ucRow, ok := queryUserChallenge(t, pool, "tt", specifier) + if ok && ucRow.UserID == int64(userID) { + rank = r + assert.Equal(t, int32(1000), ucRow.Amount, "rank %d should pay 1000", rank) + assert.True(t, ucRow.IsComplete) + break + } + } + } + + // Second run is a no-op (already paid this week). + runProcessor(t, pool, NewTrendingTrackProcessor()) + var count2 int + require.NoError(t, pool.QueryRow(context.Background(), + "SELECT COUNT(*) FROM trending_results WHERE type = 'TRACKS' AND version = 'pnagD'").Scan(&count2)) + assert.Equal(t, 3, count2, "second run on same Friday should not duplicate") +} + +func TestTrending_SkipsNonFriday(t *testing.T) { + if time.Now().UTC().Weekday() == time.Friday { + t.Skip("test only meaningful on non-Fridays") + } + pool := withChallengesDB(t) + runProcessor(t, pool, NewTrendingTrackProcessor()) + + var count int + require.NoError(t, pool.QueryRow(context.Background(), + "SELECT COUNT(*) FROM trending_results").Scan(&count)) + assert.Equal(t, 0, count, "no rows written on non-Friday") +} diff --git a/jobs/index_challenges.go b/jobs/index_challenges.go new file mode 100644 index 00000000..c64faebd --- /dev/null +++ b/jobs/index_challenges.go @@ -0,0 +1,122 @@ +package jobs + +import ( + "context" + "fmt" + "sync" + "time" + + "api.audius.co/config" + "api.audius.co/database" + "api.audius.co/jobs/challenges" + "api.audius.co/logging" + "go.uber.org/zap" +) + +// IndexChallengesJob runs all registered challenge processors on a tick. +// Each processor runs inside its own pgx transaction; failures in one +// don't stop the others. +// +// Mirrors apps' index_challenges celery task in role but not implementation: +// where apps drains a Redis queue of dispatched events, we reconcile derived +// state from source tables. See package docs in jobs/challenges/processor.go +// for the rationale. +type IndexChallengesJob struct { + pool database.DbPool + logger *zap.Logger + processors []challenges.Processor + + mutex sync.Mutex + isRunning bool +} + +// NewIndexChallengesJob constructs the umbrella job with Phase 1 processors +// pre-wired. +func NewIndexChallengesJob(cfg config.Config, pool database.DbPool) *IndexChallengesJob { + return &IndexChallengesJob{ + pool: pool, + logger: logging.NewZapLogger(cfg).Named("IndexChallengesJob"), + processors: []challenges.Processor{ + &challenges.TrackUploadProcessor{}, + &challenges.FirstPlaylistProcessor{}, + &challenges.ProfileCompletionProcessor{}, + &challenges.ProfileVerifiedProcessor{}, + &challenges.ListenStreakProcessor{}, + challenges.NewPlayCount250Processor(), + challenges.NewPlayCount1000Processor(), + challenges.NewPlayCount10000Processor(), + challenges.NewTrendingTrackProcessor(), + challenges.NewTrendingUndergroundProcessor(), + challenges.NewTrendingPlaylistProcessor(), + }, + } +} + +// ScheduleEvery runs the job every `interval` until the context is cancelled. +func (j *IndexChallengesJob) ScheduleEvery(ctx context.Context, interval time.Duration) *IndexChallengesJob { + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + j.Run(ctx) + case <-ctx.Done(): + j.logger.Info("Job shutting down") + return + } + } + }() + return j +} + +// Run executes the job once. +func (j *IndexChallengesJob) Run(ctx context.Context) { + if err := j.run(ctx); err != nil { + j.logger.Error("Job run failed", zap.Error(err)) + } +} + +func (j *IndexChallengesJob) run(ctx context.Context) error { + j.mutex.Lock() + if j.isRunning { + j.mutex.Unlock() + return fmt.Errorf("job is already running") + } + j.isRunning = true + j.mutex.Unlock() + defer func() { + j.mutex.Lock() + j.isRunning = false + j.mutex.Unlock() + }() + + start := time.Now() + var anyErr error + for _, p := range j.processors { + if err := j.runProcessor(ctx, p); err != nil { + j.logger.Error("processor failed", + zap.String("challenge_id", p.ChallengeID()), + zap.Error(err)) + anyErr = err + // Continue — one bad processor shouldn't kill the rest. + } + } + j.logger.Info("Reconciled challenges", + zap.Int("processors", len(j.processors)), + zap.Duration("duration", time.Since(start))) + return anyErr +} + +// runProcessor runs a single processor in its own transaction. +func (j *IndexChallengesJob) runProcessor(ctx context.Context, p challenges.Processor) error { + tx, err := j.pool.Begin(ctx) + if err != nil { + return fmt.Errorf("begin tx: %w", err) + } + defer tx.Rollback(ctx) + if err := p.Reconcile(ctx, tx); err != nil { + return err + } + return tx.Commit(ctx) +}