Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions ddl/migrations/0201_backfill_missing_reward_disbursements.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
BEGIN;

-- One-shot recovery of challenge_disbursements rows that never made it into
-- sol_reward_disbursements. Two historical loss sources contributed:
--
-- 1. Migration 0152's INNER JOIN on user_bank_accounts dropped rows whose
-- authoring user no longer had a current user_bank_accounts entry. Those
-- rows were silently excluded from the original backfill.
--
-- 2. Before the indexer.go:113 swallowed-error fix, the live Go indexer
-- could drop reward_manager EvaluateAttestations transactions whenever
-- ProcessTransaction returned an error, without surfacing the failure
-- to the retry queue or pausing the slot checkpoint.
--
-- This migration recovers only the rows we can reconstruct from current
-- relational state: a current users row plus an indexed AUDIO sol_claimable
-- account. Rows whose user record no longer exists are intentionally skipped;
-- they would need on-chain signature replay (via program.Indexer) to recover.
INSERT INTO sol_reward_disbursements
(signature, instruction_index, amount, slot, user_bank, challenge_id, specifier, recipient_eth_address, created_at)
SELECT
cd.signature,
0 AS instruction_index,
cd.amount::bigint,
cd.slot,
sca.account AS user_bank,
cd.challenge_id,
cd.specifier,
LOWER(u.wallet) AS recipient_eth_address,
cd.created_at
FROM challenge_disbursements cd
LEFT JOIN sol_reward_disbursements rd
ON rd.challenge_id = cd.challenge_id
AND rd.specifier = cd.specifier
JOIN users u
ON u.user_id = cd.user_id
AND u.is_current = TRUE
JOIN LATERAL (
-- A user can have multiple sol_claimable_accounts rows (one per on-chain
-- Create instruction over time). Pick the latest as the active user_bank.
SELECT account
FROM sol_claimable_accounts
WHERE ethereum_address = u.wallet
AND mint = '9LzCMqDgTKYz9Drzqnpgee3SGa89up3a247ypMj2xrqM'
ORDER BY slot DESC
LIMIT 1
) sca ON TRUE
WHERE rd.signature IS NULL
ON CONFLICT (signature, instruction_index) DO NOTHING;

COMMIT;
252 changes: 252 additions & 0 deletions jobs/checkpoint_gap_backfill.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
package jobs

import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"sort"
"sync"
"time"

"api.audius.co/config"
"api.audius.co/database"
"api.audius.co/logging"
"github.com/jackc/pgx/v5"
"go.uber.org/zap"
)

// Backfillable is satisfied by any indexer that can replay transactions for
// its subscribed addresses across a slot range.
type Backfillable interface {
Backfill(ctx context.Context, fromSlot, toSlot uint64) error
}

// CheckpointGapBackfillJob scans sol_slot_checkpoints for each registered
// backfillable indexer, identifies uncovered slot ranges (gaps left by
// subscription downtime or earlier-version processing failures), and dispatches
// the corresponding indexer's Backfill method to recover them.
type CheckpointGapBackfillJob struct {
pool database.DbPool
backfillers map[string]Backfillable
logger *zap.Logger
minGapSize uint64
maxGapPerRun int

mutex sync.Mutex
isRunning bool
}

type CheckpointGapBackfillJobConfig struct {
Pool database.DbPool
Backfillers map[string]Backfillable
// MinGapSize ignores gaps smaller than this many slots. Tiny gaps are
// common during normal reconnects and aren't worth a full backfill pass.
MinGapSize uint64
// MaxGapsPerRun caps how many gaps a single tick will process, so a sudden
// flood of gaps can't pin the RPC for hours.
MaxGapsPerRun int
}

func NewCheckpointGapBackfillJob(cfg config.Config, jobCfg CheckpointGapBackfillJobConfig) *CheckpointGapBackfillJob {
logger := logging.NewZapLogger(cfg).Named("CheckpointGapBackfillJob")
if jobCfg.MinGapSize == 0 {
jobCfg.MinGapSize = 2500 // matches common.MAX_SLOT_GAP — anything smaller is a tolerable reconnect
}
if jobCfg.MaxGapsPerRun == 0 {
jobCfg.MaxGapsPerRun = 8
}
return &CheckpointGapBackfillJob{
pool: jobCfg.Pool,
backfillers: jobCfg.Backfillers,
logger: logger,
minGapSize: jobCfg.MinGapSize,
maxGapPerRun: jobCfg.MaxGapsPerRun,
}
}

func (j *CheckpointGapBackfillJob) ScheduleEvery(ctx context.Context, duration time.Duration) *CheckpointGapBackfillJob {
go func() {
ticker := time.NewTicker(duration)
defer ticker.Stop()
for {
select {
case <-ticker.C:
j.Run(ctx)
case <-ctx.Done():
j.logger.Info("Job schedule shutting down")
return
}
}
}()
return j
}

func (j *CheckpointGapBackfillJob) Run(ctx context.Context) {
j.logger.Info("Job started")
if err := j.run(ctx); err != nil {
j.logger.Error("Job run failed", zap.Error(err))
} else {
j.logger.Info("Job completed successfully")
}
}

func (j *CheckpointGapBackfillJob) run(ctx context.Context) error {
j.mutex.Lock()
if j.isRunning {
j.mutex.Unlock()
return fmt.Errorf("job is already running")
}
j.isRunning = true
j.mutex.Unlock()
defer func() {
j.mutex.Lock()
j.isRunning = false
j.mutex.Unlock()
}()

names := make([]string, 0, len(j.backfillers))
for n := range j.backfillers {
names = append(names, n)
}
sort.Strings(names)

for _, name := range names {
bf := j.backfillers[name]
gaps, err := j.findGaps(ctx, name)
if err != nil {
j.logger.Error("failed to find gaps", zap.String("name", name), zap.Error(err))
continue
}
if len(gaps) == 0 {
j.logger.Debug("no gaps detected", zap.String("name", name))
continue
}
processed := 0
for _, gap := range gaps {
if processed >= j.maxGapPerRun {
j.logger.Info("reached max gaps per run, deferring remaining",
zap.String("name", name),
zap.Int("remaining", len(gaps)-processed),
)
break
}
j.logger.Info("backfilling checkpoint gap",
zap.String("name", name),
zap.Uint64("fromSlot", gap.from),
zap.Uint64("toSlot", gap.to),
zap.Uint64("size", gap.to-gap.from+1),
)
if err := bf.Backfill(ctx, gap.from, gap.to); err != nil {
j.logger.Error("backfill failed",
zap.String("name", name),
zap.Uint64("fromSlot", gap.from),
zap.Uint64("toSlot", gap.to),
zap.Error(err),
)
continue
}
if err := j.markGapFilled(ctx, name, gap); err != nil {
j.logger.Error("failed to record gap-fill checkpoint",
zap.String("name", name),
zap.Uint64("fromSlot", gap.from),
zap.Uint64("toSlot", gap.to),
zap.Error(err),
)
}
processed++
}
}
return nil
}

type slotRange struct {
from uint64
to uint64
}

func (j *CheckpointGapBackfillJob) findGaps(ctx context.Context, name string) ([]slotRange, error) {
rows, err := j.pool.Query(ctx, `
SELECT from_slot, to_slot
FROM sol_slot_checkpoints
WHERE name = $1
ORDER BY from_slot ASC
`, name)
if err != nil {
return nil, fmt.Errorf("query checkpoints: %w", err)
}
defer rows.Close()

var ranges []slotRange
for rows.Next() {
var from, to int64
if err := rows.Scan(&from, &to); err != nil {
return nil, fmt.Errorf("scan checkpoint row: %w", err)
}
if to < from {
continue
}
ranges = append(ranges, slotRange{from: uint64(from), to: uint64(to)})
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("iterate checkpoint rows: %w", err)
}

merged := mergeRanges(ranges)
gaps := make([]slotRange, 0)
for i := 1; i < len(merged); i++ {
prev := merged[i-1]
cur := merged[i]
if cur.from <= prev.to+1 {
continue
}
gap := slotRange{from: prev.to + 1, to: cur.from - 1}
if gap.to-gap.from+1 < j.minGapSize {
continue
}
gaps = append(gaps, gap)
}
return gaps, nil
}

// mergeRanges collapses overlapping/adjacent slot ranges. Caller passes
// ranges sorted by `from`.
func mergeRanges(ranges []slotRange) []slotRange {
if len(ranges) == 0 {
return nil
}
merged := make([]slotRange, 0, len(ranges))
cur := ranges[0]
for i := 1; i < len(ranges); i++ {
next := ranges[i]
if next.from <= cur.to+1 {
if next.to > cur.to {
cur.to = next.to
}
continue
}
merged = append(merged, cur)
cur = next
}
merged = append(merged, cur)
return merged
}

// markGapFilled inserts a checkpoint row claiming coverage for a backfilled gap,
// so subsequent runs of this job see the slot range as covered.
func (j *CheckpointGapBackfillJob) markGapFilled(ctx context.Context, name string, gap slotRange) error {
subscription := fmt.Sprintf(`{"type":"gap-backfill","name":%q,"fromSlot":%d,"toSlot":%d}`, name, gap.from, gap.to)
sum := sha256.Sum256([]byte(subscription))
hash := hex.EncodeToString(sum[:])
_, err := j.pool.Exec(ctx, `
INSERT INTO sol_slot_checkpoints (name, from_slot, to_slot, subscription, subscription_hash)
VALUES (@name, @from_slot, @to_slot, @subscription, @subscription_hash)
`, pgx.NamedArgs{
"name": name,
"from_slot": gap.from,
"to_slot": gap.to,
"subscription": subscription,
"subscription_hash": hash,
})
return err
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package indexer
package program

import (
"context"
Expand Down Expand Up @@ -206,8 +206,13 @@ func (s *Backfiller) backfillAddressTransactions(ctx context.Context, address so
// Add the lookup table accounts to the message accounts
tx = common.ResolveLookupTables(ctx, s.rpcClient, tx, txRes.Meta)

// Process the transaction
s.processor.ProcessTransaction(ctx, txRes.Slot, txRes.Meta, tx, txRes.BlockTime.Time())
// Process the transaction. Errors used to be silently discarded here,
// which let intermittent failures advance the cursor without anyone noticing.
// Log at minimum so it's observable in retrospect; we still advance lastIndexedSig
// to keep forward progress on permanently-broken signatures.
if err := s.processor.ProcessTransaction(ctx, txRes.Slot, txRes.Meta, tx, txRes.BlockTime.Time()); err != nil {
logger.Error("failed to process transaction during backfill", zap.Error(err))
}

lastIndexedSig = sig.Signature

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package indexer
package program

import (
"context"
Expand Down
21 changes: 19 additions & 2 deletions solana/indexer/program/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,20 @@ func (d *Indexer) Start(ctx context.Context) {
}
}

// Backfill replays transactions for this indexer's subscribed programs across
// the given inclusive slot range, routing each through ProcessTransaction.
// Used by the gap-detection job to recover slots the live subscription missed.
func (d *Indexer) Backfill(ctx context.Context, fromSlot, toSlot uint64) error {
bf := &Backfiller{
rpcClient: d.rpcClient,
pool: d.pool,
processor: d,
transactionCache: d.transactionCache,
logger: d.logger,
}
return bf.Start(ctx, fromSlot, toSlot)
}

func (d *Indexer) HandleUpdate(ctx context.Context, msg *pb.SubscribeUpdate) error {
// Handle slot updates
slotUpdate := msg.GetSlot()
Expand Down Expand Up @@ -109,8 +123,11 @@ func (d *Indexer) HandleUpdate(ctx context.Context, msg *pb.SubscribeUpdate) err
// Add the lookup table accounts to the message accounts
tx = common.ResolveLookupTables(ctx, d.rpcClient, tx, txRes.Meta)

// Process the transaction
d.ProcessTransaction(ctx, txRes.Slot, txRes.Meta, tx, txRes.BlockTime.Time())
// Process the transaction. Surface any error so HandleUpdate's caller
// can route the message onto the retry queue (see subscribe()).
if err := d.ProcessTransaction(ctx, txRes.Slot, txRes.Meta, tx, txRes.BlockTime.Time()); err != nil {
return fmt.Errorf("failed to process transaction %s: %w", txSig.String(), err)
}

return nil
}
Expand Down
15 changes: 15 additions & 0 deletions solana/indexer/solana_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,21 @@ func (s *SolanaIndexer) Start(ctx context.Context) error {
balanceHistoryJob.ScheduleEvery(balanceHistoryCtx, 1*time.Hour)
go balanceHistoryJob.Run(balanceHistoryCtx)

backfillers := make(map[string]jobs.Backfillable)
for name, idx := range s.indexers {
if bf, ok := idx.(jobs.Backfillable); ok {
backfillers[name] = bf
}
}
if len(backfillers) > 0 {
gapJob := jobs.NewCheckpointGapBackfillJob(s.config, jobs.CheckpointGapBackfillJobConfig{
Pool: s.pool,
Backfillers: backfillers,
})
gapCtx := context.WithoutCancel(ctx)
gapJob.ScheduleEvery(gapCtx, 1*time.Hour)
}

for _, indexer := range s.indexers {
go indexer.Start(ctx)
}
Expand Down
Loading