Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions ddl/migrations/0203_seed_phase_1_challenges.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
-- Seed the challenges catalog rows for Phase 1 challenge processors.
--
-- Values mirror apps/packages/discovery-provider/src/challenges/challenges.json
-- as of this writing. Production may already have these rows (from earlier
-- migrations or seeded another way); we use ON CONFLICT DO UPDATE so the
-- catalog stays aligned with the JSON source-of-truth — matching apps'
-- create_new_challenges.py behavior.
--
-- Phase 1 set:
-- p profile completion (numeric, 7 steps)
-- u track upload (numeric, 3 tracks)
-- fp first playlist (boolean)
-- v connect-verified (boolean)
-- e listen streak (aggregate; currently inactive)
-- p1/p2/p3 play count milestones (numeric, 250/1k/10k)
-- tt/tut/tp trending track/under/playlist (trending)

BEGIN;

INSERT INTO challenges (id, type, amount, active, step_count, starting_block, weekly_pool, cooldown_days) VALUES
('p', 'numeric', '1', true, 7, 0, 25000, 7),
('u', 'numeric', '1', true, 3, 25346436, 25000, 7),
('fp', 'boolean', '2', true, NULL, 28350000, 25000, 7),
('v', 'boolean', '5', true, NULL, 0, 25000, NULL),
('e', 'aggregate', '1', false, 2147483647, 116023891,25000, NULL),
('p1', 'numeric', '25', true, 250, 0, 2147483647, 7),
('p2', 'numeric', '100', true, 1000, 0, 2147483647, 7),
('p3', 'numeric', '1000', true, 10000, 0, 2147483647, 7),
('tt', 'trending', '1000', true, NULL, 25346436, 100000, NULL),
('tut', 'trending', '1000', true, NULL, 25346436, 100000, NULL),
('tp', 'trending', '100', true, NULL, 25346436, 10000, NULL)
ON CONFLICT (id) DO UPDATE SET
type = EXCLUDED.type,
amount = EXCLUDED.amount,
active = EXCLUDED.active,
step_count = EXCLUDED.step_count,
starting_block = EXCLUDED.starting_block,
weekly_pool = EXCLUDED.weekly_pool,
cooldown_days = EXCLUDED.cooldown_days;

COMMIT;
5 changes: 5 additions & 0 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ func (ci *CoreIndexer) startParityJobs(ctx context.Context) {

jobs.NewUpdateDelistStatusesJob(ci.Config, ci.pool).
ScheduleEvery(ctx, 5*time.Minute)

// Reconcile derived challenge state from source tables. Per-challenge
// scanners live in api/jobs/challenges/.
jobs.NewIndexChallengesJob(ci.Config, ci.pool).
ScheduleEvery(ctx, 30*time.Second)
}

func (ci *CoreIndexer) run(ctx context.Context) error {
Expand Down
69 changes: 69 additions & 0 deletions jobs/challenges/first_playlist.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package challenges

import (
"context"
"fmt"

"github.com/jackc/pgx/v5"
)

// FirstPlaylistProcessor implements challenge "fp" — boolean: the user has
// created at least one playlist.
// Mirrors apps/packages/discovery-provider/src/challenges/first_playlist_challenge.py
// (Python just sets is_complete=true when an event fires; we derive it from
// playlists table state directly).
type FirstPlaylistProcessor struct{}

func (p *FirstPlaylistProcessor) ChallengeID() string { return "fp" }

func (p *FirstPlaylistProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error {
c, ok, err := LoadChallenge(ctx, tx, p.ChallengeID())
if err != nil {
return fmt.Errorf("load challenge: %w", err)
}
if !ok || !c.Active {
return nil
}
startingBlock := int32(0)
if c.StartingBlock != nil {
startingBlock = *c.StartingBlock
}
amount := c.AmountInt()

// Find every user with at least one non-deleted playlist at or after
// the starting block. Boolean challenges complete in a single step
// (step_count is null/0 — we treat current_step_count=1, step=1).
rows, err := tx.Query(ctx, `
SELECT DISTINCT playlist_owner_id
FROM playlists
WHERE is_current = true
AND is_delete = false
AND blocknumber >= $1
`, startingBlock)
if err != nil {
return fmt.Errorf("scan playlists: %w", err)
}
var userIDs []int64
for rows.Next() {
var userID int64
if err := rows.Scan(&userID); err != nil {
rows.Close()
return err
}
userIDs = append(userIDs, userID)
}
rows.Close()
if err := rows.Err(); err != nil {
return err
}

for _, userID := range userIDs {
if err := UpsertUserChallenge(ctx, tx,
p.ChallengeID(), SpecifierFromUserID(userID),
userID, 1, 1, amount,
); err != nil {
return fmt.Errorf("upsert: %w", err)
}
}
return nil
}
28 changes: 28 additions & 0 deletions jobs/challenges/first_playlist_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package challenges

import (
"fmt"
"testing"

"api.audius.co/database"
"github.com/stretchr/testify/assert"
)

func TestFirstPlaylist_CompletesOnAnyPlaylist(t *testing.T) {
pool := withChallengesDB(t)
database.Seed(pool, database.FixtureMap{
"blocks": {{"blockhash": "blk_28350001", "number": 28350001}},
"users": {{"user_id": 100, "wallet": "0x100"}, {"user_id": 101, "wallet": "0x101"}},
"playlists": {{"playlist_id": 1, "playlist_owner_id": 100, "blocknumber": 28350001}},
})

runProcessor(t, pool, &FirstPlaylistProcessor{})

r1, ok := queryUserChallenge(t, pool, "fp", fmt.Sprintf("%x", 100))
if assert.True(t, ok) {
assert.True(t, r1.IsComplete)
assert.Equal(t, int32(2), r1.Amount, "amount=2 per challenges.json")
}
_, ok = queryUserChallenge(t, pool, "fp", fmt.Sprintf("%x", 101))
assert.False(t, ok, "user 101 has no playlist; no row")
}
237 changes: 237 additions & 0 deletions jobs/challenges/listen_streak.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
package challenges

import (
"context"
"errors"
"fmt"
"time"

"github.com/jackc/pgx/v5"
)

// ListenStreakProcessor implements challenge "e" — daily listen streak.
// Mirrors apps' listen_streak_endless_challenge.py.
//
// State lives in `challenge_listen_streak` (last_listen_date, listen_streak).
// Rules (per apps):
// - First listen: streak = 1, last_listen_date = play timestamp.
// - Subsequent listen >= 16h after last_listen_date: advance streak by 1.
// - If >= 48h gap: reset streak to 1.
// - Otherwise (<16h): ignore.
//
// user_challenges rows have specifier "<hex_user_id><YYYYMMDDHH>". When the
// streak crosses 7 days the row is completed; subsequent "endless" rows are
// minted at amount=1 each.
//
// NOTE: in challenges.json this challenge is currently active=false, so the
// processor is a no-op at present. Reconcile still does the right thing if
// it gets enabled.
type ListenStreakProcessor struct{}

func (p *ListenStreakProcessor) ChallengeID() string { return "e" }

const (
listenStreakNextWindow = 16 * time.Hour
listenStreakBrokenWindow = 48 * time.Hour
listenStreakTarget = int32(7)
)

const listenStreakCheckpoint = "challenges:e:last_play_id"

// listenStreakPlay is one row from the plays scan.
type listenStreakPlay struct {
id int64
userID int64
createdAt time.Time
}

func (p *ListenStreakProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error {
c, ok, err := LoadChallenge(ctx, tx, p.ChallengeID())
if err != nil {
return fmt.Errorf("load challenge: %w", err)
}
if !ok || !c.Active {
return nil
}
amount := c.AmountInt()

prev, err := readCheckpointInt(ctx, tx, listenStreakCheckpoint)
if err != nil {
return fmt.Errorf("read checkpoint: %w", err)
}

// Pull new plays ordered by id. We deliberately use id ordering rather
// than created_at because id is monotonic per insert and matches the
// invariant apps' Python pipeline assumes (plays are processed in
// arrival order).
rows, err := tx.Query(ctx, `
SELECT id, user_id, created_at
FROM plays
WHERE id > $1 AND user_id IS NOT NULL
ORDER BY id ASC
LIMIT 50000
`, prev)
if err != nil {
return fmt.Errorf("scan plays: %w", err)
}
var plays []listenStreakPlay
for rows.Next() {
var r listenStreakPlay
if err := rows.Scan(&r.id, &r.userID, &r.createdAt); err != nil {
rows.Close()
return err
}
plays = append(plays, r)
}
rows.Close()
if err := rows.Err(); err != nil {
return err
}
if len(plays) == 0 {
return nil
}

// Apply transitions in-process. We load existing streak state for
// users with new plays in one go.
userIDs := uniqueUserIDs(plays)
type streakState struct {
lastListen *time.Time
streak int32
}
state := make(map[int64]*streakState, len(userIDs))

srows, err := tx.Query(ctx, `
SELECT user_id, last_listen_date, listen_streak
FROM challenge_listen_streak
WHERE user_id = ANY($1)
`, userIDs)
if err != nil {
return fmt.Errorf("load streak state: %w", err)
}
for srows.Next() {
var uid int64
var last *time.Time
var streak int32
if err := srows.Scan(&uid, &last, &streak); err != nil {
srows.Close()
return err
}
state[uid] = &streakState{lastListen: last, streak: streak}
}
srows.Close()
if err := srows.Err(); err != nil {
return err
}

// Walk plays in order, applying the transition rules per user. Each
// play either advances or resets a streak — we batch the writes after
// the simulation.
type advance struct {
newStreak int32
whenLogged time.Time
}
advancesByUser := make(map[int64][]advance)

for _, pl := range plays {
s, ok := state[pl.userID]
if !ok {
s = &streakState{lastListen: nil, streak: 0}
state[pl.userID] = s
}
if s.lastListen == nil {
s.streak = 1
t := pl.createdAt
s.lastListen = &t
advancesByUser[pl.userID] = append(advancesByUser[pl.userID], advance{1, pl.createdAt})
continue
}
gap := pl.createdAt.Sub(*s.lastListen)
if gap < listenStreakNextWindow {
continue // too soon, ignore
}
if gap >= listenStreakBrokenWindow {
s.streak = 1
} else {
s.streak++
}
t := pl.createdAt
s.lastListen = &t
advancesByUser[pl.userID] = append(advancesByUser[pl.userID], advance{s.streak, pl.createdAt})
}

// Write updated streak state per user.
for userID, s := range state {
if _, err := tx.Exec(ctx, `
INSERT INTO challenge_listen_streak (user_id, last_listen_date, listen_streak)
VALUES ($1, $2, $3)
ON CONFLICT (user_id) DO UPDATE SET
last_listen_date = EXCLUDED.last_listen_date,
listen_streak = EXCLUDED.listen_streak
`, userID, s.lastListen, s.streak); err != nil {
return fmt.Errorf("upsert streak state: %w", err)
}
}

// For each advance, mint or update the relevant user_challenge row.
// Specifier format follows apps' new post-cutover format:
// first 7 days: "<hex>:YYYYMMDDHH" of *new streak boundary*
// endless : same, one row per day after 7
for userID, advances := range advancesByUser {
for _, a := range advances {
specifier := fmt.Sprintf("%x%s", userID, a.whenLogged.UTC().Format("2006010215"))
// In the first-7-day window, current_step_count = streak (1..7),
// is_complete when streak >= 7. After 7, this row is the
// endless +1-per-day reward (step_count = 1, amount = 1).
var stepCount int32 = listenStreakTarget
report := a.newStreak
if a.newStreak > listenStreakTarget {
stepCount = 1
report = 1
}
if err := UpsertUserChallenge(ctx, tx,
p.ChallengeID(), specifier, userID, report, stepCount, amount,
); err != nil {
return fmt.Errorf("upsert listen-streak user_challenge: %w", err)
}
}
}

// Advance checkpoint to the last play id we processed.
if err := writeCheckpointInt(ctx, tx, listenStreakCheckpoint, plays[len(plays)-1].id); err != nil {
return fmt.Errorf("save checkpoint: %w", err)
}
return nil
}

func uniqueUserIDs(plays []listenStreakPlay) []int64 {
seen := make(map[int64]struct{}, len(plays))
out := make([]int64, 0, len(plays))
for _, p := range plays {
if _, ok := seen[p.userID]; ok {
continue
}
seen[p.userID] = struct{}{}
out = append(out, p.userID)
}
return out
}

// readCheckpointInt reads a named integer checkpoint from indexing_checkpoints,
// returning 0 if absent.
func readCheckpointInt(ctx context.Context, tx pgx.Tx, name string) (int64, error) {
var v int64
err := tx.QueryRow(ctx, "SELECT last_checkpoint FROM indexing_checkpoints WHERE tablename = $1", name).Scan(&v)
if errors.Is(err, pgx.ErrNoRows) {
return 0, nil
}
return v, err
}

func writeCheckpointInt(ctx context.Context, tx pgx.Tx, name string, value int64) error {
_, err := tx.Exec(ctx, `
INSERT INTO indexing_checkpoints (tablename, last_checkpoint)
VALUES ($1, $2)
ON CONFLICT (tablename) DO UPDATE SET last_checkpoint = EXCLUDED.last_checkpoint
`, name, value)
return err
}
Loading
Loading