Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
275 changes: 275 additions & 0 deletions jobs/reclaim_rent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
package jobs

import (
"context"
"fmt"
"sync"
"time"
_ "time/tzdata" // embed IANA tzdata so time.LoadLocation works on minimal images

"api.audius.co/config"
"api.audius.co/database"
"api.audius.co/logging"
"api.audius.co/solana/spl"
"api.audius.co/solana/spl/programs/claimable_tokens"
"github.com/ethereum/go-ethereum/common"
bin "github.com/gagliardetto/binary"
"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/programs/token"
"github.com/gagliardetto/solana-go/rpc"
"github.com/jackc/pgx/v5"
"go.uber.org/zap"
)

const (
reclaimRentLookback = 7 * 24 * time.Hour
reclaimRentBatchSize = 15
reclaimRentDbPageSize = 1000
)

type ReclaimRentJob struct {
cfg config.Config
pool database.DbPool
rpcClient *rpc.Client
transactionSender *spl.TransactionSender
mints []solana.PublicKey
logger *zap.Logger

mutex sync.Mutex
isRunning bool
}

func NewReclaimRentJob(cfg config.Config, pool database.DbPool) *ReclaimRentJob {
logger := logging.NewZapLogger(cfg).Named("ReclaimRentJob")

var rpcClient *rpc.Client
if len(cfg.SolanaConfig.RpcProviders) > 0 {
rpcClient = rpc.New(cfg.SolanaConfig.RpcProviders[0])
}

var transactionSender *spl.TransactionSender
if len(cfg.SolanaConfig.RpcProviders) > 0 {
transactionSender = spl.NewTransactionSender(cfg.SolanaConfig.FeePayers, cfg.SolanaConfig.RpcProviders)
}

return &ReclaimRentJob{
cfg: cfg,
pool: pool,
rpcClient: rpcClient,
transactionSender: transactionSender,
mints: []solana.PublicKey{
cfg.SolanaConfig.MintAudio,
cfg.SolanaConfig.MintUSDC,
},
logger: logger,
}
}

// ScheduleDailyAt runs the job once per day at hour:minute in the given
// location, starting at the next occurrence after the call.
func (j *ReclaimRentJob) ScheduleDailyAt(ctx context.Context, hour, minute int, location *time.Location) *ReclaimRentJob {
go func() {
for {
now := time.Now().In(location)
next := time.Date(now.Year(), now.Month(), now.Day(), hour, minute, 0, 0, location)
if !next.After(now) {
next = next.Add(24 * time.Hour)
}
j.logger.Info("Next run scheduled", zap.Time("at", next))
timer := time.NewTimer(time.Until(next))
select {
case <-timer.C:
j.Run(ctx)
case <-ctx.Done():
timer.Stop()
j.logger.Info("Job schedule shutting down")
return
}
}
}()
return j
}

// Run executes the job once
func (j *ReclaimRentJob) Run(ctx context.Context) {
j.logger.Info("Job started")
if err := j.run(ctx); err != nil {
j.logger.Error("Job run failed", zap.Error(err))
} else {
j.logger.Info("Job completed successfully")
}
}

// Closes zero-balance claimable token accounts created in the last 7 days
// for the configured AUDIO and USDC mints, returning the rent lamports to the
// fee payer that signs each transaction. Ensures only one instance runs at a time.
func (j *ReclaimRentJob) run(ctx context.Context) error {
j.mutex.Lock()
if j.isRunning {
j.mutex.Unlock()
return fmt.Errorf("job is already running")
}
j.isRunning = true
j.mutex.Unlock()
defer func() {
j.mutex.Lock()
j.isRunning = false
j.mutex.Unlock()
}()

if j.rpcClient == nil || j.transactionSender == nil {
j.logger.Warn("No Solana RPC configured, skipping reclaim_rent")
return nil
}
if len(j.cfg.SolanaConfig.FeePayers) == 0 {
j.logger.Warn("No Solana fee payers configured, skipping reclaim_rent")
return nil
}

for _, mint := range j.mints {
if mint.IsZero() {
continue
}
if err := j.processMint(ctx, mint); err != nil {
j.logger.Error("failed to process mint",
zap.String("mint", mint.String()),
zap.Error(err),
)
}
}
return nil
}

type reclaimRentAccount struct {
Account string `db:"account"`
EthereumAddress string `db:"ethereum_address"`
}

func (j *ReclaimRentJob) processMint(ctx context.Context, mint solana.PublicKey) error {
logger := j.logger.With(zap.String("mint", mint.String()))
logger.Info("Processing mint")

authority, _, err := claimable_tokens.DeriveAuthority(mint)
if err != nil {
return fmt.Errorf("failed to derive authority: %w", err)
}

cutoff := time.Now().Add(-reclaimRentLookback)
offset := 0
totalClosed := 0
for {
accounts, err := j.fetchCandidates(ctx, mint.String(), cutoff, reclaimRentDbPageSize, offset)
if err != nil {
return fmt.Errorf("failed to fetch candidates: %w", err)
}
if len(accounts) == 0 {
break
}
offset += len(accounts)

filtered, err := j.filterOnChain(ctx, accounts)
if err != nil {
logger.Error("filterOnChain failed", zap.Error(err))
continue
}

for i := 0; i < len(filtered); i += reclaimRentBatchSize {
end := i + reclaimRentBatchSize
if end > len(filtered) {
end = len(filtered)
}
batch := filtered[i:end]
sig, err := j.processBatch(ctx, batch, authority)
if err != nil {
logger.Error("processBatch failed",
zap.Error(err),
zap.Int("batch_size", len(batch)),
)
continue
}
logger.Info("Reclaimed batch",
zap.String("signature", sig.String()),
zap.Int("accounts", len(batch)),
)
totalClosed += len(batch)
}
}
logger.Info("Done processing mint", zap.Int("total_closed", totalClosed))
return nil
}

func (j *ReclaimRentJob) fetchCandidates(ctx context.Context, mint string, since time.Time, limit, offset int) ([]reclaimRentAccount, error) {
sql := `
SELECT DISTINCT sca.account, sca.ethereum_address
FROM sol_claimable_accounts sca
JOIN sol_token_account_balances stab ON stab.account = sca.account
WHERE sca.mint = @mint
AND stab.mint = @mint
AND stab.balance = 0
AND stab.created_at > @since
ORDER BY sca.account
LIMIT @limit OFFSET @offset
`
rows, err := j.pool.Query(ctx, sql, pgx.NamedArgs{
"mint": mint,
"since": since,
"limit": limit,
"offset": offset,
})
if err != nil {
return nil, err
}
return pgx.CollectRows(rows, pgx.RowToStructByName[reclaimRentAccount])
}

func (j *ReclaimRentJob) filterOnChain(ctx context.Context, batch []reclaimRentAccount) ([]reclaimRentAccount, error) {
pubkeys := make([]solana.PublicKey, 0, len(batch))
for _, acct := range batch {
pubkeys = append(pubkeys, solana.MustPublicKeyFromBase58(acct.Account))
}
res, err := j.rpcClient.GetMultipleAccountsWithOpts(ctx, pubkeys, &rpc.GetMultipleAccountsOpts{
Encoding: solana.EncodingBase64,
})
if err != nil {
return nil, fmt.Errorf("failed to get accounts: %w", err)
}

filtered := make([]reclaimRentAccount, 0, len(batch))
for i, info := range res.Value {
if info == nil {
continue
}
var ta token.Account
if err := bin.NewBorshDecoder(info.Data.GetBinary()).Decode(&ta); err != nil {
continue
}
if ta.Amount != 0 {
continue
}
filtered = append(filtered, batch[i])
}
return filtered, nil
}

func (j *ReclaimRentJob) processBatch(ctx context.Context, batch []reclaimRentAccount, authority solana.PublicKey) (*solana.Signature, error) {
if len(batch) == 0 {
return nil, nil
}

payer, err := j.transactionSender.GetFeePayer()
if err != nil {
return nil, err
}

builder := solana.NewTransactionBuilder().SetFeePayer(payer.PublicKey())
for _, acct := range batch {
inst := claimable_tokens.NewCloseInstructionBuilder().
SetUserBank(solana.MustPublicKeyFromBase58(acct.Account)).
SetAuthority(authority).
SetDestination(payer.PublicKey()).
SetEthAddress(common.HexToAddress(acct.EthereumAddress))
builder.AddInstruction(inst.Build())
}

return j.transactionSender.SendTransactionWithRetries(ctx, builder, rpc.CommitmentConfirmed, rpc.TransactionOpts{})
}
8 changes: 8 additions & 0 deletions solana/indexer/solana_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,14 @@ func (s *SolanaIndexer) Start(ctx context.Context) error {
gapJob.ScheduleEvery(gapCtx, 1*time.Hour)
}

reclaimRentJob := jobs.NewReclaimRentJob(s.config, s.pool)
reclaimRentCtx := context.WithoutCancel(ctx)
reclaimRentLocation, err := time.LoadLocation("America/Los_Angeles")
if err != nil {
panic(fmt.Errorf("failed to load America/Los_Angeles timezone: %w", err))
}
reclaimRentJob.ScheduleDailyAt(reclaimRentCtx, 12, 0, reclaimRentLocation)

for _, indexer := range s.indexers {
go indexer.Start(ctx)
}
Expand Down
Loading