From 5ca16b69fab3148983e69185a0525d59c3b3d0da Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Mon, 18 May 2026 17:53:20 -0700 Subject: [PATCH 1/2] feat(jobs): daily reclaim_rent sweep for zero-balance user banks Adds a ReclaimRentJob to the solana-indexer that, every 24h, closes zero-balance claimable token accounts created in the last 7 days for AUDIO and USDC, returning the rent lamports to the signing fee payer. The 7-day lookback with a 24h cadence provides overlap to ensure no eligible accounts are missed. Reuses the existing solanaRpcProviders, solanaFeePayerKeys, and writeDbUrl configuration via cfg.SolanaConfig and database.DbPool. Skips gracefully when RPC or fee payers are not configured (dev/test). Each batch closes up to 15 accounts per transaction via the existing spl.TransactionSender. Co-Authored-By: Claude Opus 4.7 --- jobs/reclaim_rent.go | 267 +++++++++++++++++++++++++++++++ solana/indexer/solana_indexer.go | 4 + 2 files changed, 271 insertions(+) create mode 100644 jobs/reclaim_rent.go diff --git a/jobs/reclaim_rent.go b/jobs/reclaim_rent.go new file mode 100644 index 00000000..494746ee --- /dev/null +++ b/jobs/reclaim_rent.go @@ -0,0 +1,267 @@ +package jobs + +import ( + "context" + "fmt" + "sync" + "time" + + "api.audius.co/config" + "api.audius.co/database" + "api.audius.co/logging" + "api.audius.co/solana/spl" + "api.audius.co/solana/spl/programs/claimable_tokens" + "github.com/ethereum/go-ethereum/common" + bin "github.com/gagliardetto/binary" + "github.com/gagliardetto/solana-go" + "github.com/gagliardetto/solana-go/programs/token" + "github.com/gagliardetto/solana-go/rpc" + "github.com/jackc/pgx/v5" + "go.uber.org/zap" +) + +const ( + reclaimRentLookback = 7 * 24 * time.Hour + reclaimRentBatchSize = 15 + reclaimRentDbPageSize = 1000 +) + +type ReclaimRentJob struct { + cfg config.Config + pool database.DbPool + rpcClient *rpc.Client + transactionSender *spl.TransactionSender + mints []solana.PublicKey + logger *zap.Logger + + mutex sync.Mutex + isRunning bool +} + +func NewReclaimRentJob(cfg config.Config, pool database.DbPool) *ReclaimRentJob { + logger := logging.NewZapLogger(cfg).Named("ReclaimRentJob") + + var rpcClient *rpc.Client + if len(cfg.SolanaConfig.RpcProviders) > 0 { + rpcClient = rpc.New(cfg.SolanaConfig.RpcProviders[0]) + } + + var transactionSender *spl.TransactionSender + if len(cfg.SolanaConfig.RpcProviders) > 0 { + transactionSender = spl.NewTransactionSender(cfg.SolanaConfig.FeePayers, cfg.SolanaConfig.RpcProviders) + } + + return &ReclaimRentJob{ + cfg: cfg, + pool: pool, + rpcClient: rpcClient, + transactionSender: transactionSender, + mints: []solana.PublicKey{ + cfg.SolanaConfig.MintAudio, + cfg.SolanaConfig.MintUSDC, + }, + logger: logger, + } +} + +// ScheduleEvery runs the job every `duration` until the context is cancelled. +func (j *ReclaimRentJob) ScheduleEvery(ctx context.Context, duration time.Duration) *ReclaimRentJob { + go func() { + ticker := time.NewTicker(duration) + defer ticker.Stop() + for { + select { + case <-ticker.C: + j.Run(ctx) + case <-ctx.Done(): + j.logger.Info("Job schedule shutting down") + return + } + } + }() + return j +} + +// Run executes the job once +func (j *ReclaimRentJob) Run(ctx context.Context) { + j.logger.Info("Job started") + if err := j.run(ctx); err != nil { + j.logger.Error("Job run failed", zap.Error(err)) + } else { + j.logger.Info("Job completed successfully") + } +} + +// Closes zero-balance claimable token accounts created in the last 7 days +// for the configured AUDIO and USDC mints, returning the rent lamports to the +// fee payer that signs each transaction. Ensures only one instance runs at a time. +func (j *ReclaimRentJob) run(ctx context.Context) error { + j.mutex.Lock() + if j.isRunning { + j.mutex.Unlock() + return fmt.Errorf("job is already running") + } + j.isRunning = true + j.mutex.Unlock() + defer func() { + j.mutex.Lock() + j.isRunning = false + j.mutex.Unlock() + }() + + if j.rpcClient == nil || j.transactionSender == nil { + j.logger.Warn("No Solana RPC configured, skipping reclaim_rent") + return nil + } + if len(j.cfg.SolanaConfig.FeePayers) == 0 { + j.logger.Warn("No Solana fee payers configured, skipping reclaim_rent") + return nil + } + + for _, mint := range j.mints { + if mint.IsZero() { + continue + } + if err := j.processMint(ctx, mint); err != nil { + j.logger.Error("failed to process mint", + zap.String("mint", mint.String()), + zap.Error(err), + ) + } + } + return nil +} + +type reclaimRentAccount struct { + Account string `db:"account"` + EthereumAddress string `db:"ethereum_address"` +} + +func (j *ReclaimRentJob) processMint(ctx context.Context, mint solana.PublicKey) error { + logger := j.logger.With(zap.String("mint", mint.String())) + logger.Info("Processing mint") + + authority, _, err := claimable_tokens.DeriveAuthority(mint) + if err != nil { + return fmt.Errorf("failed to derive authority: %w", err) + } + + cutoff := time.Now().Add(-reclaimRentLookback) + offset := 0 + totalClosed := 0 + for { + accounts, err := j.fetchCandidates(ctx, mint.String(), cutoff, reclaimRentDbPageSize, offset) + if err != nil { + return fmt.Errorf("failed to fetch candidates: %w", err) + } + if len(accounts) == 0 { + break + } + offset += len(accounts) + + filtered, err := j.filterOnChain(ctx, accounts) + if err != nil { + logger.Error("filterOnChain failed", zap.Error(err)) + continue + } + + for i := 0; i < len(filtered); i += reclaimRentBatchSize { + end := i + reclaimRentBatchSize + if end > len(filtered) { + end = len(filtered) + } + batch := filtered[i:end] + sig, err := j.processBatch(ctx, batch, authority) + if err != nil { + logger.Error("processBatch failed", + zap.Error(err), + zap.Int("batch_size", len(batch)), + ) + continue + } + logger.Info("Reclaimed batch", + zap.String("signature", sig.String()), + zap.Int("accounts", len(batch)), + ) + totalClosed += len(batch) + } + } + logger.Info("Done processing mint", zap.Int("total_closed", totalClosed)) + return nil +} + +func (j *ReclaimRentJob) fetchCandidates(ctx context.Context, mint string, since time.Time, limit, offset int) ([]reclaimRentAccount, error) { + sql := ` + SELECT DISTINCT sca.account, sca.ethereum_address + FROM sol_claimable_accounts sca + JOIN sol_token_account_balances stab ON stab.account = sca.account + WHERE sca.mint = @mint + AND stab.mint = @mint + AND stab.balance = 0 + AND stab.created_at > @since + ORDER BY sca.account + LIMIT @limit OFFSET @offset + ` + rows, err := j.pool.Query(ctx, sql, pgx.NamedArgs{ + "mint": mint, + "since": since, + "limit": limit, + "offset": offset, + }) + if err != nil { + return nil, err + } + return pgx.CollectRows(rows, pgx.RowToStructByName[reclaimRentAccount]) +} + +func (j *ReclaimRentJob) filterOnChain(ctx context.Context, batch []reclaimRentAccount) ([]reclaimRentAccount, error) { + pubkeys := make([]solana.PublicKey, 0, len(batch)) + for _, acct := range batch { + pubkeys = append(pubkeys, solana.MustPublicKeyFromBase58(acct.Account)) + } + res, err := j.rpcClient.GetMultipleAccountsWithOpts(ctx, pubkeys, &rpc.GetMultipleAccountsOpts{ + Encoding: solana.EncodingBase64, + }) + if err != nil { + return nil, fmt.Errorf("failed to get accounts: %w", err) + } + + filtered := make([]reclaimRentAccount, 0, len(batch)) + for i, info := range res.Value { + if info == nil { + continue + } + var ta token.Account + if err := bin.NewBorshDecoder(info.Data.GetBinary()).Decode(&ta); err != nil { + continue + } + if ta.Amount != 0 { + continue + } + filtered = append(filtered, batch[i]) + } + return filtered, nil +} + +func (j *ReclaimRentJob) processBatch(ctx context.Context, batch []reclaimRentAccount, authority solana.PublicKey) (*solana.Signature, error) { + if len(batch) == 0 { + return nil, nil + } + + payer, err := j.transactionSender.GetFeePayer() + if err != nil { + return nil, err + } + + builder := solana.NewTransactionBuilder().SetFeePayer(payer.PublicKey()) + for _, acct := range batch { + inst := claimable_tokens.NewCloseInstructionBuilder(). + SetUserBank(solana.MustPublicKeyFromBase58(acct.Account)). + SetAuthority(authority). + SetDestination(payer.PublicKey()). + SetEthAddress(common.HexToAddress(acct.EthereumAddress)) + builder.AddInstruction(inst.Build()) + } + + return j.transactionSender.SendTransactionWithRetries(ctx, builder, rpc.CommitmentConfirmed, rpc.TransactionOpts{}) +} diff --git a/solana/indexer/solana_indexer.go b/solana/indexer/solana_indexer.go index 48cfc645..302103e8 100644 --- a/solana/indexer/solana_indexer.go +++ b/solana/indexer/solana_indexer.go @@ -153,6 +153,10 @@ func (s *SolanaIndexer) Start(ctx context.Context) error { gapJob.ScheduleEvery(gapCtx, 1*time.Hour) } + reclaimRentJob := jobs.NewReclaimRentJob(s.config, s.pool) + reclaimRentCtx := context.WithoutCancel(ctx) + reclaimRentJob.ScheduleEvery(reclaimRentCtx, 24*time.Hour) + for _, indexer := range s.indexers { go indexer.Start(ctx) } From 7f4fdee4ff10067e1abcdc4834e30dca90b1f886 Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Mon, 18 May 2026 18:09:36 -0700 Subject: [PATCH 2/2] feat(jobs): run reclaim_rent at 12:00 America/Los_Angeles Replaces the 24h-after-deploy ticker with a daily scheduler that fires at noon Pacific. Recomputes "next noon LA" each iteration so DST transitions don't drift the schedule. Embeds time/tzdata since the runtime image is Alpine, which doesn't ship IANA tzdata by default. Co-Authored-By: Claude Opus 4.7 --- jobs/reclaim_rent.go | 18 +++++++++++++----- solana/indexer/solana_indexer.go | 6 +++++- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/jobs/reclaim_rent.go b/jobs/reclaim_rent.go index 494746ee..5b7496af 100644 --- a/jobs/reclaim_rent.go +++ b/jobs/reclaim_rent.go @@ -5,6 +5,7 @@ import ( "fmt" "sync" "time" + _ "time/tzdata" // embed IANA tzdata so time.LoadLocation works on minimal images "api.audius.co/config" "api.audius.co/database" @@ -64,16 +65,23 @@ func NewReclaimRentJob(cfg config.Config, pool database.DbPool) *ReclaimRentJob } } -// ScheduleEvery runs the job every `duration` until the context is cancelled. -func (j *ReclaimRentJob) ScheduleEvery(ctx context.Context, duration time.Duration) *ReclaimRentJob { +// ScheduleDailyAt runs the job once per day at hour:minute in the given +// location, starting at the next occurrence after the call. +func (j *ReclaimRentJob) ScheduleDailyAt(ctx context.Context, hour, minute int, location *time.Location) *ReclaimRentJob { go func() { - ticker := time.NewTicker(duration) - defer ticker.Stop() for { + now := time.Now().In(location) + next := time.Date(now.Year(), now.Month(), now.Day(), hour, minute, 0, 0, location) + if !next.After(now) { + next = next.Add(24 * time.Hour) + } + j.logger.Info("Next run scheduled", zap.Time("at", next)) + timer := time.NewTimer(time.Until(next)) select { - case <-ticker.C: + case <-timer.C: j.Run(ctx) case <-ctx.Done(): + timer.Stop() j.logger.Info("Job schedule shutting down") return } diff --git a/solana/indexer/solana_indexer.go b/solana/indexer/solana_indexer.go index 302103e8..3a68481d 100644 --- a/solana/indexer/solana_indexer.go +++ b/solana/indexer/solana_indexer.go @@ -155,7 +155,11 @@ func (s *SolanaIndexer) Start(ctx context.Context) error { reclaimRentJob := jobs.NewReclaimRentJob(s.config, s.pool) reclaimRentCtx := context.WithoutCancel(ctx) - reclaimRentJob.ScheduleEvery(reclaimRentCtx, 24*time.Hour) + reclaimRentLocation, err := time.LoadLocation("America/Los_Angeles") + if err != nil { + panic(fmt.Errorf("failed to load America/Los_Angeles timezone: %w", err)) + } + reclaimRentJob.ScheduleDailyAt(reclaimRentCtx, 12, 0, reclaimRentLocation) for _, indexer := range s.indexers { go indexer.Start(ctx)