Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 18 additions & 14 deletions forester/src/compressible/mint/compressor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,20 +171,24 @@ impl<R: Rpc + Indexer> MintCompressor<R> {
};
}

let mint_state =
match compressor.tracker.accounts().get(&pubkey).map(|r| r.clone()) {
Some(state) => state,
None => {
compressor.tracker.unmark_pending(&[pubkey]);
return CompressionOutcome::Failed {
pubkey,
error: CompressionTaskError::Failed(anyhow::anyhow!(
"mint {} removed from tracker before compression",
pubkey
)),
};
}
};
let mint_state = match compressor
.tracker
.accounts()
.get(&pubkey)
.map(|r| r.clone())
{
Some(state) => state,
None => {
compressor.tracker.unmark_pending(&[pubkey]);
return CompressionOutcome::Failed {
pubkey,
error: CompressionTaskError::Failed(anyhow::anyhow!(
"mint {} removed from tracker before compression",
pubkey
)),
};
}
};

match compressor.compress(&mint_state).await {
Ok(sig) => CompressionOutcome::Compressed {
Expand Down
32 changes: 18 additions & 14 deletions forester/src/compressible/pda/compressor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,20 +187,24 @@ impl<R: Rpc + Indexer> PdaCompressor<R> {
}

// Look up account state from tracker; it may have been removed
let account_state =
match compressor.tracker.accounts().get(&pubkey).map(|r| r.clone()) {
Some(state) => state,
None => {
compressor.tracker.unmark_pending(&[pubkey]);
return CompressionOutcome::Failed {
pubkey,
error: CompressionTaskError::Failed(anyhow::anyhow!(
"account {} removed from tracker before compression",
pubkey
)),
};
}
};
let account_state = match compressor
.tracker
.accounts()
.get(&pubkey)
.map(|r| r.clone())
{
Some(state) => state,
None => {
compressor.tracker.unmark_pending(&[pubkey]);
return CompressionOutcome::Failed {
pubkey,
error: CompressionTaskError::Failed(anyhow::anyhow!(
"account {} removed from tracker before compression",
pubkey
)),
};
}
};

match compressor
.compress(&account_state, &program_config, &cached_config)
Expand Down
17 changes: 17 additions & 0 deletions forester/src/compressible/pda/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,23 @@ impl PdaAccountTracker {
.collect()
}

pub fn get_ready_states_for_program(
&self,
program_id: &Pubkey,
current_slot: u64,
) -> Vec<PdaAccountState> {
let pending = self.pending();
self.accounts()
.iter()
.filter(|entry| {
entry.value().program_id == *program_id
&& entry.value().is_ready_to_compress(current_slot)
&& !pending.contains(entry.key())
})
.map(|entry| entry.value().clone())
.collect()
}

pub fn update_from_account(
&self,
pubkey: Pubkey,
Expand Down
15 changes: 14 additions & 1 deletion forester/src/compressible/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub enum CompressionOutcome {

pub type CompressionOutcomes = Vec<CompressionOutcome>;

pub trait CompressibleState: Send + Sync {
pub trait CompressibleState: Clone + Send + Sync {
fn pubkey(&self) -> &Pubkey;
fn lamports(&self) -> u64;
fn compressible_slot(&self) -> u64;
Expand Down Expand Up @@ -138,6 +138,19 @@ pub trait CompressibleTracker<S: CompressibleState>: Send + Sync {
.map(|entry| *entry.key())
.collect()
}

/// Clone of ready-to-compress states. Prefer `get_ready_to_compress` in
/// production; this helper exists for tests that need full state fields.
fn get_ready_states(&self, current_slot: u64) -> Vec<S> {
let pending = self.pending();
self.accounts()
.iter()
.filter(|entry| {
entry.value().is_ready_to_compress(current_slot) && !pending.contains(entry.key())
})
.map(|entry| entry.value().clone())
.collect()
}
}

/// Allows AccountSubscriber to work with any tracker type.
Expand Down
Loading