diff --git a/ddl/migrations/0201_backfill_missing_reward_disbursements.sql b/ddl/migrations/0201_backfill_missing_reward_disbursements.sql new file mode 100644 index 00000000..6cd32358 --- /dev/null +++ b/ddl/migrations/0201_backfill_missing_reward_disbursements.sql @@ -0,0 +1,51 @@ +BEGIN; + +-- One-shot recovery of challenge_disbursements rows that never made it into +-- sol_reward_disbursements. Two historical loss sources contributed: +-- +-- 1. Migration 0152's INNER JOIN on user_bank_accounts dropped rows whose +-- authoring user no longer had a current user_bank_accounts entry. Those +-- rows were silently excluded from the original backfill. +-- +-- 2. Before the indexer.go:113 swallowed-error fix, the live Go indexer +-- could drop reward_manager EvaluateAttestations transactions whenever +-- ProcessTransaction returned an error, without surfacing the failure +-- to the retry queue or pausing the slot checkpoint. +-- +-- This migration recovers only the rows we can reconstruct from current +-- relational state: a current users row plus an indexed AUDIO sol_claimable +-- account. Rows whose user record no longer exists are intentionally skipped; +-- they would need on-chain signature replay (via program.Indexer) to recover. +INSERT INTO sol_reward_disbursements + (signature, instruction_index, amount, slot, user_bank, challenge_id, specifier, recipient_eth_address, created_at) +SELECT + cd.signature, + 0 AS instruction_index, + cd.amount::bigint, + cd.slot, + sca.account AS user_bank, + cd.challenge_id, + cd.specifier, + LOWER(u.wallet) AS recipient_eth_address, + cd.created_at +FROM challenge_disbursements cd +LEFT JOIN sol_reward_disbursements rd + ON rd.challenge_id = cd.challenge_id + AND rd.specifier = cd.specifier +JOIN users u + ON u.user_id = cd.user_id + AND u.is_current = TRUE +JOIN LATERAL ( + -- A user can have multiple sol_claimable_accounts rows (one per on-chain + -- Create instruction over time). Pick the latest as the active user_bank. + SELECT account + FROM sol_claimable_accounts + WHERE ethereum_address = u.wallet + AND mint = '9LzCMqDgTKYz9Drzqnpgee3SGa89up3a247ypMj2xrqM' + ORDER BY slot DESC + LIMIT 1 +) sca ON TRUE +WHERE rd.signature IS NULL +ON CONFLICT (signature, instruction_index) DO NOTHING; + +COMMIT; diff --git a/jobs/checkpoint_gap_backfill.go b/jobs/checkpoint_gap_backfill.go new file mode 100644 index 00000000..c7d94a1a --- /dev/null +++ b/jobs/checkpoint_gap_backfill.go @@ -0,0 +1,252 @@ +package jobs + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "sort" + "sync" + "time" + + "api.audius.co/config" + "api.audius.co/database" + "api.audius.co/logging" + "github.com/jackc/pgx/v5" + "go.uber.org/zap" +) + +// Backfillable is satisfied by any indexer that can replay transactions for +// its subscribed addresses across a slot range. +type Backfillable interface { + Backfill(ctx context.Context, fromSlot, toSlot uint64) error +} + +// CheckpointGapBackfillJob scans sol_slot_checkpoints for each registered +// backfillable indexer, identifies uncovered slot ranges (gaps left by +// subscription downtime or earlier-version processing failures), and dispatches +// the corresponding indexer's Backfill method to recover them. +type CheckpointGapBackfillJob struct { + pool database.DbPool + backfillers map[string]Backfillable + logger *zap.Logger + minGapSize uint64 + maxGapPerRun int + + mutex sync.Mutex + isRunning bool +} + +type CheckpointGapBackfillJobConfig struct { + Pool database.DbPool + Backfillers map[string]Backfillable + // MinGapSize ignores gaps smaller than this many slots. Tiny gaps are + // common during normal reconnects and aren't worth a full backfill pass. + MinGapSize uint64 + // MaxGapsPerRun caps how many gaps a single tick will process, so a sudden + // flood of gaps can't pin the RPC for hours. + MaxGapsPerRun int +} + +func NewCheckpointGapBackfillJob(cfg config.Config, jobCfg CheckpointGapBackfillJobConfig) *CheckpointGapBackfillJob { + logger := logging.NewZapLogger(cfg).Named("CheckpointGapBackfillJob") + if jobCfg.MinGapSize == 0 { + jobCfg.MinGapSize = 2500 // matches common.MAX_SLOT_GAP — anything smaller is a tolerable reconnect + } + if jobCfg.MaxGapsPerRun == 0 { + jobCfg.MaxGapsPerRun = 8 + } + return &CheckpointGapBackfillJob{ + pool: jobCfg.Pool, + backfillers: jobCfg.Backfillers, + logger: logger, + minGapSize: jobCfg.MinGapSize, + maxGapPerRun: jobCfg.MaxGapsPerRun, + } +} + +func (j *CheckpointGapBackfillJob) ScheduleEvery(ctx context.Context, duration time.Duration) *CheckpointGapBackfillJob { + go func() { + ticker := time.NewTicker(duration) + defer ticker.Stop() + for { + select { + case <-ticker.C: + j.Run(ctx) + case <-ctx.Done(): + j.logger.Info("Job schedule shutting down") + return + } + } + }() + return j +} + +func (j *CheckpointGapBackfillJob) Run(ctx context.Context) { + j.logger.Info("Job started") + if err := j.run(ctx); err != nil { + j.logger.Error("Job run failed", zap.Error(err)) + } else { + j.logger.Info("Job completed successfully") + } +} + +func (j *CheckpointGapBackfillJob) run(ctx context.Context) error { + j.mutex.Lock() + if j.isRunning { + j.mutex.Unlock() + return fmt.Errorf("job is already running") + } + j.isRunning = true + j.mutex.Unlock() + defer func() { + j.mutex.Lock() + j.isRunning = false + j.mutex.Unlock() + }() + + names := make([]string, 0, len(j.backfillers)) + for n := range j.backfillers { + names = append(names, n) + } + sort.Strings(names) + + for _, name := range names { + bf := j.backfillers[name] + gaps, err := j.findGaps(ctx, name) + if err != nil { + j.logger.Error("failed to find gaps", zap.String("name", name), zap.Error(err)) + continue + } + if len(gaps) == 0 { + j.logger.Debug("no gaps detected", zap.String("name", name)) + continue + } + processed := 0 + for _, gap := range gaps { + if processed >= j.maxGapPerRun { + j.logger.Info("reached max gaps per run, deferring remaining", + zap.String("name", name), + zap.Int("remaining", len(gaps)-processed), + ) + break + } + j.logger.Info("backfilling checkpoint gap", + zap.String("name", name), + zap.Uint64("fromSlot", gap.from), + zap.Uint64("toSlot", gap.to), + zap.Uint64("size", gap.to-gap.from+1), + ) + if err := bf.Backfill(ctx, gap.from, gap.to); err != nil { + j.logger.Error("backfill failed", + zap.String("name", name), + zap.Uint64("fromSlot", gap.from), + zap.Uint64("toSlot", gap.to), + zap.Error(err), + ) + continue + } + if err := j.markGapFilled(ctx, name, gap); err != nil { + j.logger.Error("failed to record gap-fill checkpoint", + zap.String("name", name), + zap.Uint64("fromSlot", gap.from), + zap.Uint64("toSlot", gap.to), + zap.Error(err), + ) + } + processed++ + } + } + return nil +} + +type slotRange struct { + from uint64 + to uint64 +} + +func (j *CheckpointGapBackfillJob) findGaps(ctx context.Context, name string) ([]slotRange, error) { + rows, err := j.pool.Query(ctx, ` + SELECT from_slot, to_slot + FROM sol_slot_checkpoints + WHERE name = $1 + ORDER BY from_slot ASC + `, name) + if err != nil { + return nil, fmt.Errorf("query checkpoints: %w", err) + } + defer rows.Close() + + var ranges []slotRange + for rows.Next() { + var from, to int64 + if err := rows.Scan(&from, &to); err != nil { + return nil, fmt.Errorf("scan checkpoint row: %w", err) + } + if to < from { + continue + } + ranges = append(ranges, slotRange{from: uint64(from), to: uint64(to)}) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate checkpoint rows: %w", err) + } + + merged := mergeRanges(ranges) + gaps := make([]slotRange, 0) + for i := 1; i < len(merged); i++ { + prev := merged[i-1] + cur := merged[i] + if cur.from <= prev.to+1 { + continue + } + gap := slotRange{from: prev.to + 1, to: cur.from - 1} + if gap.to-gap.from+1 < j.minGapSize { + continue + } + gaps = append(gaps, gap) + } + return gaps, nil +} + +// mergeRanges collapses overlapping/adjacent slot ranges. Caller passes +// ranges sorted by `from`. +func mergeRanges(ranges []slotRange) []slotRange { + if len(ranges) == 0 { + return nil + } + merged := make([]slotRange, 0, len(ranges)) + cur := ranges[0] + for i := 1; i < len(ranges); i++ { + next := ranges[i] + if next.from <= cur.to+1 { + if next.to > cur.to { + cur.to = next.to + } + continue + } + merged = append(merged, cur) + cur = next + } + merged = append(merged, cur) + return merged +} + +// markGapFilled inserts a checkpoint row claiming coverage for a backfilled gap, +// so subsequent runs of this job see the slot range as covered. +func (j *CheckpointGapBackfillJob) markGapFilled(ctx context.Context, name string, gap slotRange) error { + subscription := fmt.Sprintf(`{"type":"gap-backfill","name":%q,"fromSlot":%d,"toSlot":%d}`, name, gap.from, gap.to) + sum := sha256.Sum256([]byte(subscription)) + hash := hex.EncodeToString(sum[:]) + _, err := j.pool.Exec(ctx, ` + INSERT INTO sol_slot_checkpoints (name, from_slot, to_slot, subscription, subscription_hash) + VALUES (@name, @from_slot, @to_slot, @subscription, @subscription_hash) + `, pgx.NamedArgs{ + "name": name, + "from_slot": gap.from, + "to_slot": gap.to, + "subscription": subscription, + "subscription_hash": hash, + }) + return err +} diff --git a/solana/indexer/backfiller.go b/solana/indexer/program/backfiller.go similarity index 94% rename from solana/indexer/backfiller.go rename to solana/indexer/program/backfiller.go index 1f8605f0..b443f1d2 100644 --- a/solana/indexer/backfiller.go +++ b/solana/indexer/program/backfiller.go @@ -1,4 +1,4 @@ -package indexer +package program import ( "context" @@ -206,8 +206,13 @@ func (s *Backfiller) backfillAddressTransactions(ctx context.Context, address so // Add the lookup table accounts to the message accounts tx = common.ResolveLookupTables(ctx, s.rpcClient, tx, txRes.Meta) - // Process the transaction - s.processor.ProcessTransaction(ctx, txRes.Slot, txRes.Meta, tx, txRes.BlockTime.Time()) + // Process the transaction. Errors used to be silently discarded here, + // which let intermittent failures advance the cursor without anyone noticing. + // Log at minimum so it's observable in retrospect; we still advance lastIndexedSig + // to keep forward progress on permanently-broken signatures. + if err := s.processor.ProcessTransaction(ctx, txRes.Slot, txRes.Meta, tx, txRes.BlockTime.Time()); err != nil { + logger.Error("failed to process transaction during backfill", zap.Error(err)) + } lastIndexedSig = sig.Signature diff --git a/solana/indexer/backfiller_test.go b/solana/indexer/program/backfiller_test.go similarity index 99% rename from solana/indexer/backfiller_test.go rename to solana/indexer/program/backfiller_test.go index e5f64622..60dbc7ff 100644 --- a/solana/indexer/backfiller_test.go +++ b/solana/indexer/program/backfiller_test.go @@ -1,4 +1,4 @@ -package indexer +package program import ( "context" diff --git a/solana/indexer/program/indexer.go b/solana/indexer/program/indexer.go index 7da940bf..c59b0cbe 100644 --- a/solana/indexer/program/indexer.go +++ b/solana/indexer/program/indexer.go @@ -70,6 +70,20 @@ func (d *Indexer) Start(ctx context.Context) { } } +// Backfill replays transactions for this indexer's subscribed programs across +// the given inclusive slot range, routing each through ProcessTransaction. +// Used by the gap-detection job to recover slots the live subscription missed. +func (d *Indexer) Backfill(ctx context.Context, fromSlot, toSlot uint64) error { + bf := &Backfiller{ + rpcClient: d.rpcClient, + pool: d.pool, + processor: d, + transactionCache: d.transactionCache, + logger: d.logger, + } + return bf.Start(ctx, fromSlot, toSlot) +} + func (d *Indexer) HandleUpdate(ctx context.Context, msg *pb.SubscribeUpdate) error { // Handle slot updates slotUpdate := msg.GetSlot() @@ -109,8 +123,11 @@ func (d *Indexer) HandleUpdate(ctx context.Context, msg *pb.SubscribeUpdate) err // Add the lookup table accounts to the message accounts tx = common.ResolveLookupTables(ctx, d.rpcClient, tx, txRes.Meta) - // Process the transaction - d.ProcessTransaction(ctx, txRes.Slot, txRes.Meta, tx, txRes.BlockTime.Time()) + // Process the transaction. Surface any error so HandleUpdate's caller + // can route the message onto the retry queue (see subscribe()). + if err := d.ProcessTransaction(ctx, txRes.Slot, txRes.Meta, tx, txRes.BlockTime.Time()); err != nil { + return fmt.Errorf("failed to process transaction %s: %w", txSig.String(), err) + } return nil } diff --git a/solana/indexer/solana_indexer.go b/solana/indexer/solana_indexer.go index 5a95c1d3..48cfc645 100644 --- a/solana/indexer/solana_indexer.go +++ b/solana/indexer/solana_indexer.go @@ -138,6 +138,21 @@ func (s *SolanaIndexer) Start(ctx context.Context) error { balanceHistoryJob.ScheduleEvery(balanceHistoryCtx, 1*time.Hour) go balanceHistoryJob.Run(balanceHistoryCtx) + backfillers := make(map[string]jobs.Backfillable) + for name, idx := range s.indexers { + if bf, ok := idx.(jobs.Backfillable); ok { + backfillers[name] = bf + } + } + if len(backfillers) > 0 { + gapJob := jobs.NewCheckpointGapBackfillJob(s.config, jobs.CheckpointGapBackfillJobConfig{ + Pool: s.pool, + Backfillers: backfillers, + }) + gapCtx := context.WithoutCancel(ctx) + gapJob.ScheduleEvery(gapCtx, 1*time.Hour) + } + for _, indexer := range s.indexers { go indexer.Start(ctx) }