diff --git a/Cargo.lock b/Cargo.lock index e3fe910..a0e5b87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1036,6 +1036,7 @@ dependencies = [ "diesel_derives", "downcast-rs 2.0.2", "libsqlite3-sys", + "r2d2", "sqlite-wasm-rs", "time", ] @@ -3714,6 +3715,17 @@ version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" +[[package]] +name = "r2d2" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" +dependencies = [ + "log", + "parking_lot", + "scheduled-thread-pool", +] + [[package]] name = "rand" version = "0.9.4" @@ -4140,6 +4152,15 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "scheduled-thread-pool" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" +dependencies = [ + "parking_lot", +] + [[package]] name = "schemars" version = "0.8.22" diff --git a/crates/libcalibre/Cargo.toml b/crates/libcalibre/Cargo.toml index 0482ba1..c1d4acb 100644 --- a/crates/libcalibre/Cargo.toml +++ b/crates/libcalibre/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] chrono = { version = "0.4.31", features = ["serde"] } -diesel = { version = "2.2.4", features = ["sqlite", "chrono", "returning_clauses_for_sqlite_3_35"] } +diesel = { version = "2.2.4", features = ["sqlite", "chrono", "r2d2", "returning_clauses_for_sqlite_3_35"] } diesel_migrations = { version = "2.2.0", features = ["sqlite"] } epub = "2.1.1" mobi = "0.8.0" diff --git a/crates/libcalibre/src/library.rs b/crates/libcalibre/src/library.rs index 45972ee..1431045 100644 --- a/crates/libcalibre/src/library.rs +++ b/crates/libcalibre/src/library.rs @@ -204,8 +204,7 @@ pub struct TagSummary { impl Library { pub fn new(db_path: ValidDbPath) -> Result { - let conn = establish_connection(&db_path.database_path) - .map_err(|_| CalibreError::LibraryNotInitialized)?; + let conn = establish_connection(&db_path.database_path)?; Ok(Self { db_path, conn }) } diff --git a/crates/libcalibre/src/persistence.rs b/crates/libcalibre/src/persistence.rs index 324565d..c6a39b8 100644 --- a/crates/libcalibre/src/persistence.rs +++ b/crates/libcalibre/src/persistence.rs @@ -1,7 +1,11 @@ +use std::time::Duration; + +use diesel::connection::SimpleConnection; use diesel::prelude::*; +use diesel::r2d2::{ConnectionManager, CustomizeConnection, Pool}; use diesel::sql_query; -use diesel::sql_types::Text; +use crate::error::CalibreError; use crate::sorting; /// Converts book title to sortable format for SQL. @@ -84,24 +88,185 @@ pub fn register_triggers(conn: &mut SqliteConnection) -> Result<(), diesel::resu Ok(()) } -pub fn establish_connection(db_path: &str) -> Result { - // Setup custom SQL functions. Required because Calibre does this. - // See: https://github.com/kovidgoyal/calibre/blob/7f3ccb333d906f5867636dd0dc4700b495e5ae6f/src/calibre/library/database.py#L55-L70 - define_sql_function!(fn title_sort(title: Text) -> Text); - define_sql_function!(fn uuid4() -> Text); - define_sql_function!(fn author_to_author_sort(name: Text) -> Text); +// Custom SQL functions Calibre relies on. Declared at module scope (rather than +// inside the establishing fn) so they can be (re)registered on any connection — +// a single owned one or every checkout from a pool. +// See: https://github.com/kovidgoyal/calibre/blob/7f3ccb333d906f5867636dd0dc4700b495e5ae6f/src/calibre/library/database.py#L55-L70 +define_sql_function!(fn title_sort(title: diesel::sql_types::Text) -> diesel::sql_types::Text); +define_sql_function!(fn uuid4() -> diesel::sql_types::Text); +define_sql_function!(fn author_to_author_sort(name: diesel::sql_types::Text) -> diesel::sql_types::Text); + +/// Register Calibre's custom SQL functions (`title_sort`, `uuid4`, +/// `author_to_author_sort`) on `conn`. These are per-connection, so they must +/// be registered on every connection — including each one a pool hands out. +pub fn register_sql_functions(conn: &mut SqliteConnection) -> Result<(), diesel::result::Error> { + title_sort_utils::register_impl(conn, sort_book_title)?; + uuid4_utils::register_impl(conn, || uuid::Uuid::new_v4().to_string())?; + author_to_author_sort_utils::register_impl(conn, sort_author_name)?; + Ok(()) +} + +/// Apply the PRAGMAs libcalibre wants on every SQLite connection. +/// +/// Run in order as individual statements: `busy_timeout` goes first so the +/// subsequent WAL switch (which may briefly need a write lock) waits instead of +/// failing with `SQLITE_BUSY`. WAL plus a non-zero busy timeout removes the +/// spurious `SQLITE_BUSY` errors that the DB-write + `metadata.opf`-write pair +/// per book edit can otherwise provoke. +/// +/// `foreign_keys = ON` is future-proofing: Calibre's schema (including the +/// custom-column tables) declares no FK constraints today, so enabling +/// enforcement changes nothing for existing libraries. +/// +/// On `:memory:` and other temp databases SQLite silently keeps its in-memory +/// journal (reporting `journal_mode` = `memory`); this is not an error. +/// +/// NOTE: WAL coordinates concurrent access only between connections **on the +/// same host** (via the `-shm` file). A library on a network/cloud filesystem +/// is unsupported under WAL. +fn apply_pragmas(conn: &mut SqliteConnection) -> Result<(), diesel::result::Error> { + conn.batch_execute( + "PRAGMA busy_timeout = 3000;\ + PRAGMA journal_mode = WAL;\ + PRAGMA synchronous = NORMAL;\ + PRAGMA foreign_keys = ON;\ + PRAGMA wal_autocheckpoint = 1000;", + ) +} - let mut connection = diesel::SqliteConnection::establish(db_path).or(Err(()))?; +/// Per-connection setup: PRAGMAs plus the custom SQL functions. Both are +/// scoped to a single SQLite connection, so every connection — owned or +/// pooled — needs them. +/// +/// Triggers are deliberately NOT registered here: they are persistent database +/// objects (stored in `sqlite_master`), and re-running their DROP/CREATE DDL +/// from every connection would both serialize checkouts on the write lock and +/// open a window where a concurrent insert fires no trigger. Callers register +/// triggers once per database instead (see [`establish_connection`] and +/// [`create_write_pool`]). +/// +/// Function registration is best-effort (matching the historical behaviour); +/// PRAGMA failures propagate. +fn prepare_connection(conn: &mut SqliteConnection) -> Result<(), diesel::result::Error> { + apply_pragmas(conn)?; // Register SQL function implementations. Ignore any errors. - let _ = title_sort_utils::register_impl(&mut connection, sort_book_title); - let _ = uuid4_utils::register_impl(&mut connection, || uuid::Uuid::new_v4().to_string()); - let _ = author_to_author_sort_utils::register_impl(&mut connection, sort_author_name); + let _ = register_sql_functions(conn); + + Ok(()) +} + +pub fn establish_connection(db_path: &str) -> Result { + let mut connection = SqliteConnection::establish(db_path).map_err(CalibreError::database)?; + prepare_connection(&mut connection)?; - // Register triggers for data integrity and automatic field generation + // Register triggers for data integrity and automatic field generation. + // Best-effort: a fresh or non-Calibre database may lack the `books` table + // the triggers attach to, and that must not fail opening the connection. register_triggers(&mut connection) .map_err(|e| eprintln!("Failed to register triggers: {}", e)) .ok(); Ok(connection) } + +/// An r2d2 pool of Calibre connections. Every connection it hands out has the +/// PRAGMAs applied and the custom SQL functions registered. +pub type CalibrePool = Pool>; + +/// Applies libcalibre's per-connection setup (PRAGMAs + custom SQL functions) +/// to every connection the pool creates — SQLite scopes both to a single +/// connection, so the pool's first connection is not enough. Read-only +/// customizers additionally set `PRAGMA query_only`, making writes through a +/// read pool a hard error rather than a documentation violation. +#[derive(Debug)] +struct CalibreConnectionCustomizer { + read_only: bool, +} + +impl CustomizeConnection for CalibreConnectionCustomizer { + fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> { + prepare_connection(conn).map_err(diesel::r2d2::Error::QueryError)?; + if self.read_only { + conn.batch_execute("PRAGMA query_only = ON") + .map_err(diesel::r2d2::Error::QueryError)?; + } + Ok(()) + } +} + +/// Build the single-writer pool for the Calibre database at `db_path`. +/// +/// The pool holds exactly **one** connection: SQLite has one write lock, so a +/// multi-writer pool would only queue on it (and collapses under contention — +/// ~20× in published benchmarks). Callers that need parallel reads pair this +/// with [`create_read_pool`]. The desktop app keeps using the single owned +/// connection from [`establish_connection`]. +/// +/// Calibre's triggers are (re)registered here, once, through the writer — +/// they persist in the database, so per-checkout DDL is both unnecessary and +/// racy. +pub fn create_write_pool(db_path: &str) -> Result { + let manager = ConnectionManager::::new(db_path); + let pool = Pool::builder() + .max_size(1) + .connection_customizer(Box::new(CalibreConnectionCustomizer { read_only: false })) + .build(manager) + .map_err(CalibreError::database)?; + + let mut conn = pool.get().map_err(CalibreError::database)?; + register_triggers(&mut conn) + .map_err(|e| eprintln!("Failed to register triggers: {}", e)) + .ok(); + + Ok(pool) +} + +/// Build a pool of `max_size` **read-only** connections (`PRAGMA query_only`) +/// for the Calibre database at `db_path`. Attempting to write through one of +/// these connections fails; route writes through [`create_write_pool`]. +/// +/// Read connections never run trigger DDL — triggers live in the database +/// itself and are registered by the write path. +pub fn create_read_pool(db_path: &str, max_size: u32) -> Result { + let manager = ConnectionManager::::new(db_path); + Pool::builder() + .max_size(max_size) + .connection_customizer(Box::new(CalibreConnectionCustomizer { read_only: true })) + .build(manager) + .map_err(CalibreError::database) +} + +/// True for the errors SQLite raises when a required lock is held elsewhere +/// (`SQLITE_BUSY` "database is locked" / `SQLITE_LOCKED` "database table is +/// locked"). +fn is_busy_error(error: &diesel::result::Error) -> bool { + matches!( + error, + diesel::result::Error::DatabaseError(_, info) if info.message().contains("locked") + ) +} + +/// Run `op`, retrying (with short linear backoff) when SQLite reports the +/// database is locked. `busy_timeout` already makes connections wait for the +/// lock, but a busy error can still surface — most notably when a transaction +/// tries to upgrade from read to write while another writer is active, which +/// SQLite fails immediately to avoid deadlock. Wrap short write transactions +/// in this when they must coexist with other writers. +/// +/// Non-busy errors and the final busy error (after `max_attempts`) propagate. +pub fn retry_on_busy( + max_attempts: u32, + mut op: impl FnMut() -> Result, +) -> Result { + let mut attempt = 1; + loop { + match op() { + Err(e) if is_busy_error(&e) && attempt < max_attempts => { + std::thread::sleep(Duration::from_millis(10 * u64::from(attempt))); + attempt += 1; + } + result => return result, + } + } +} diff --git a/crates/libcalibre/tests/connection_pragmas_test.rs b/crates/libcalibre/tests/connection_pragmas_test.rs new file mode 100644 index 0000000..1afef55 --- /dev/null +++ b/crates/libcalibre/tests/connection_pragmas_test.rs @@ -0,0 +1,388 @@ +// Tests for connection PRAGMAs (WAL + busy_timeout, ...), the r2d2 pool paths, +// and multi-connection contention behaviour. +// See CDL-18: libcalibre must enable WAL + busy_timeout on every connection and +// offer pooled paths whose connections carry the same setup (custom SQL +// functions + PRAGMAs, with triggers registered once per database) the single +// owned connection does. + +use std::path::PathBuf; +use std::sync::mpsc; +use std::time::Duration; + +use diesel::connection::SimpleConnection; +use diesel::prelude::*; +use diesel::sql_query; +use diesel::sql_types::{Integer, Text}; +use libcalibre::persistence::{ + create_read_pool, create_write_pool, establish_connection, retry_on_busy, +}; + +#[derive(QueryableByName)] +struct JournalMode { + #[diesel(sql_type = Text)] + journal_mode: String, +} + +#[derive(QueryableByName)] +struct BusyTimeout { + #[diesel(sql_type = Integer)] + timeout: i32, +} + +#[derive(QueryableByName)] +struct TextResult { + #[diesel(sql_type = Text)] + result: String, +} + +#[derive(QueryableByName)] +struct CountResult { + #[diesel(sql_type = Integer)] + count: i32, +} + +fn journal_mode(conn: &mut SqliteConnection) -> String { + sql_query("PRAGMA journal_mode") + .get_result::(conn) + .unwrap() + .journal_mode + .to_lowercase() +} + +fn busy_timeout(conn: &mut SqliteConnection) -> i32 { + sql_query("PRAGMA busy_timeout") + .get_result::(conn) + .unwrap() + .timeout +} + +fn book_count(conn: &mut SqliteConnection) -> i32 { + sql_query("SELECT COUNT(*) AS count FROM books") + .get_result::(conn) + .unwrap() + .count +} + +/// Copy the empty Calibre library fixture (a real Calibre `metadata.db`, with +/// the `books` table the triggers attach to) into a temp dir and return its +/// path. The `TempDir` is returned so the caller keeps it alive. +fn fixture_db() -> (tempfile::TempDir, PathBuf) { + let temp = tempfile::tempdir().unwrap(); + let db_path = temp.path().join("metadata.db"); + let fixture = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("tests") + .join("fixtures") + .join("empty_library") + .join("metadata.db"); + std::fs::copy(&fixture, &db_path).unwrap(); + (temp, db_path) +} + +// ============================================================================= +// PRAGMAs on the single-connection path +// ============================================================================= + +/// A new file-backed connection reports WAL journal mode + a non-zero busy +/// timeout, and the `-wal`/`-shm` sidecar files appear next to `metadata.db`. +#[test] +fn establish_connection_enables_wal_and_busy_timeout() { + let (_temp, db_path) = fixture_db(); + let db_path_str = db_path.to_str().unwrap(); + + let mut conn = establish_connection(db_path_str).unwrap(); + + assert_eq!(journal_mode(&mut conn), "wal"); + assert_eq!(busy_timeout(&mut conn), 3000); + + // WAL sidecar files exist alongside the database while a connection is open. + let wal = PathBuf::from(format!("{db_path_str}-wal")); + let shm = PathBuf::from(format!("{db_path_str}-shm")); + assert!(wal.exists(), "expected WAL file at {wal:?}"); + assert!(shm.exists(), "expected shared-memory file at {shm:?}"); +} + +/// In-memory databases can't use WAL; the PRAGMA must not error, it just leaves +/// the journal mode as `memory`. (The existing `:memory:` tests rely on this.) +#[test] +fn establish_connection_on_memory_db_does_not_error() { + let mut conn = establish_connection(":memory:").unwrap(); + // SQLite keeps the in-memory journal rather than switching to WAL. + assert_eq!(journal_mode(&mut conn), "memory"); + assert_eq!(busy_timeout(&mut conn), 3000); +} + +/// The connection error is surfaced, not swallowed into `()`. +#[test] +fn establish_connection_reports_the_underlying_error() { + let Err(error) = establish_connection("/nonexistent-dir/metadata.db") else { + panic!("opening a database in a nonexistent directory must fail"); + }; + let message = error.to_string(); + assert!( + message.to_lowercase().contains("unable to open"), + "expected SQLite's open error in {message:?}" + ); +} + +// ============================================================================= +// Pools +// ============================================================================= + +/// A connection obtained from the write pool has the PRAGMAs applied and the +/// custom SQL functions + triggers registered — the same setup as the +/// single-conn path. +#[test] +fn write_pool_connection_has_pragmas_functions_and_triggers() { + let (_temp, db_path) = fixture_db(); + let pool = create_write_pool(db_path.to_str().unwrap()).unwrap(); + let mut conn = pool.get().unwrap(); + + // PRAGMAs applied on checkout. + assert_eq!(journal_mode(&mut conn), "wal"); + assert_eq!(busy_timeout(&mut conn), 3000); + + // Custom SQL function registered and callable. + let sorted: TextResult = sql_query("SELECT title_sort('The Great Book') AS result") + .get_result(&mut conn) + .unwrap(); + assert_eq!(sorted.result, "Great Book, The"); + + // Triggers registered against the real `books` table. + let triggers: CountResult = sql_query( + "SELECT COUNT(*) AS count FROM sqlite_master \ + WHERE type = 'trigger' \ + AND name IN ('books_insert_trg', 'books_update_trg', 'books_delete_trg')", + ) + .get_result(&mut conn) + .unwrap(); + assert_eq!(triggers.count, 3); +} + +/// End-to-end check that the pooled connection's trigger + functions cooperate: +/// inserting a bare book row fires `books_insert_trg`, which calls the +/// registered `title_sort()` and `uuid4()` functions to populate `sort`/`uuid`. +#[test] +fn write_pool_insert_trigger_populates_sort_and_uuid() { + let (_temp, db_path) = fixture_db(); + let pool = create_write_pool(db_path.to_str().unwrap()).unwrap(); + let mut conn = pool.get().unwrap(); + + sql_query("INSERT INTO books (title) VALUES ('The Hobbit')") + .execute(&mut conn) + .unwrap(); + + #[derive(QueryableByName)] + struct BookRow { + #[diesel(sql_type = Text)] + sort: String, + #[diesel(sql_type = Text)] + uuid: String, + } + + let row: BookRow = sql_query("SELECT sort, uuid FROM books WHERE title = 'The Hobbit'") + .get_result(&mut conn) + .unwrap(); + + assert_eq!(row.sort, "Hobbit, The"); + assert_eq!(row.uuid.len(), 36, "uuid4() should produce a 36-char UUID"); +} + +/// The write pool holds exactly one connection — SQLite has one write lock, +/// and a multi-writer pool would only queue on it. +#[test] +fn write_pool_is_single_connection() { + let (_temp, db_path) = fixture_db(); + let pool = create_write_pool(db_path.to_str().unwrap()).unwrap(); + assert_eq!(pool.max_size(), 1); +} + +/// Read-pool connections have the PRAGMAs + functions, can read, but writing +/// through them is a hard error (`PRAGMA query_only`), not a convention. +#[test] +fn read_pool_connection_reads_but_cannot_write() { + let (_temp, db_path) = fixture_db(); + let pool = create_read_pool(db_path.to_str().unwrap(), 2).unwrap(); + let mut conn = pool.get().unwrap(); + + assert_eq!(journal_mode(&mut conn), "wal"); + assert_eq!(busy_timeout(&mut conn), 3000); + + // Functions are registered — read queries may call them too. + let sorted: TextResult = sql_query("SELECT title_sort('A Study in Scarlet') AS result") + .get_result(&mut conn) + .unwrap(); + assert_eq!(sorted.result, "Study in Scarlet, A"); + + // Reads work. + assert_eq!(book_count(&mut conn), 0); + + // Writes are rejected. + let write_attempt = sql_query("INSERT INTO books (title) VALUES ('Nope')").execute(&mut conn); + let message = write_attempt.unwrap_err().to_string(); + assert!( + message.contains("readonly"), + "expected a readonly-database error, got {message:?}" + ); +} + +// ============================================================================= +// Multi-connection contention +// ============================================================================= + +/// Readers are not blocked by an in-flight write transaction: while the write +/// pool's connection holds an open IMMEDIATE transaction with an uncommitted +/// insert, a read-pool connection still reads (and sees the pre-transaction +/// snapshot). After commit, a fresh read sees the new row. +#[test] +fn reader_is_not_blocked_by_open_write_transaction() { + let (_temp, db_path) = fixture_db(); + let db_path_str = db_path.to_str().unwrap(); + let write_pool = create_write_pool(db_path_str).unwrap(); + let read_pool = create_read_pool(db_path_str, 1).unwrap(); + + let mut writer = write_pool.get().unwrap(); + writer + .immediate_transaction::<_, diesel::result::Error, _>(|conn| { + sql_query("INSERT INTO books (title) VALUES ('Uncommitted')").execute(conn)?; + + // Mid-transaction: a reader on another connection is neither + // blocked nor shown the uncommitted row. + let mut reader = read_pool.get().unwrap(); + assert_eq!(book_count(&mut reader), 0); + + Ok(()) + }) + .unwrap(); + + let mut reader = read_pool.get().unwrap(); + assert_eq!(book_count(&mut reader), 1); +} + +/// `busy_timeout` is what turns concurrent writers from instant SQLITE_BUSY +/// failures into short waits. One thread holds the write lock for ~300ms; a +/// second writer with the default 3000ms timeout succeeds, while a control +/// connection with the timeout zeroed fails immediately. +#[test] +fn busy_timeout_lets_second_writer_wait_out_the_lock() { + let (_temp, db_path) = fixture_db(); + let db_path_str = db_path.to_str().unwrap().to_string(); + + // Open the contending connections BEFORE the lock is taken: + // establish_connection itself writes (trigger DDL), so opening one while + // the lock is held would just wait out the holder via busy_timeout and + // leave nothing to contend with. + let mut impatient = establish_connection(&db_path_str).unwrap(); + impatient.batch_execute("PRAGMA busy_timeout = 0").unwrap(); + let mut patient = establish_connection(&db_path_str).unwrap(); + + // `lock_held` fires once the background thread owns the write lock; + // the thread then keeps it for ~300ms before committing. + let (lock_held, lock_held_rx) = mpsc::channel(); + let holder = std::thread::spawn({ + let db_path_str = db_path_str.clone(); + move || { + let mut conn = establish_connection(&db_path_str).unwrap(); + conn.immediate_transaction::<_, diesel::result::Error, _>(|conn| { + sql_query("INSERT INTO books (title) VALUES ('Held Lock')").execute(conn)?; + lock_held.send(()).unwrap(); + std::thread::sleep(Duration::from_millis(300)); + Ok(()) + }) + .unwrap(); + } + }); + + lock_held_rx.recv().unwrap(); + + // Control: with busy_timeout disabled, the second writer fails immediately + // instead of waiting — proving the lock really is held and that it's the + // busy_timeout PRAGMA doing the work in the assertion below. + let failure = impatient + .immediate_transaction::<_, diesel::result::Error, _>(|conn| { + sql_query("INSERT INTO books (title) VALUES ('Impatient')").execute(conn)?; + Ok(()) + }) + .unwrap_err(); + assert!( + failure.to_string().contains("locked"), + "expected a database-locked error, got {failure:?}" + ); + + // With the default 3000ms busy_timeout, the same write waits out the + // ~300ms lock and succeeds. + patient + .immediate_transaction::<_, diesel::result::Error, _>(|conn| { + sql_query("INSERT INTO books (title) VALUES ('Patient')").execute(conn)?; + Ok(()) + }) + .unwrap(); + + holder.join().unwrap(); + + let mut conn = establish_connection(&db_path_str).unwrap(); + assert_eq!(book_count(&mut conn), 2, "Held Lock + Patient rows"); +} + +/// `retry_on_busy` re-runs an operation that hits the write lock until it +/// succeeds. Uses a zero busy_timeout so every attempt fails fast while the +/// lock is held, making the retry loop (not the timeout) do the work. +#[test] +fn retry_on_busy_retries_until_lock_is_released() { + let (_temp, db_path) = fixture_db(); + let db_path_str = db_path.to_str().unwrap().to_string(); + + // Opened before the lock is taken — see the busy_timeout test for why. + let mut conn = establish_connection(&db_path_str).unwrap(); + conn.batch_execute("PRAGMA busy_timeout = 0").unwrap(); + + let (lock_held, lock_held_rx) = mpsc::channel(); + let holder = std::thread::spawn({ + let db_path_str = db_path_str.clone(); + move || { + let mut conn = establish_connection(&db_path_str).unwrap(); + conn.immediate_transaction::<_, diesel::result::Error, _>(|conn| { + sql_query("INSERT INTO books (title) VALUES ('Held Lock')").execute(conn)?; + lock_held.send(()).unwrap(); + std::thread::sleep(Duration::from_millis(200)); + Ok(()) + }) + .unwrap(); + } + }); + + lock_held_rx.recv().unwrap(); + + let mut attempts = 0; + retry_on_busy(100, || { + attempts += 1; + conn.immediate_transaction::<_, diesel::result::Error, _>(|conn| { + sql_query("INSERT INTO books (title) VALUES ('Retried')").execute(conn)?; + Ok(()) + }) + }) + .unwrap(); + + assert!( + attempts > 1, + "lock was held ~200ms with no busy_timeout; the first attempt(s) must have failed" + ); + + holder.join().unwrap(); + assert_eq!(book_count(&mut conn), 2, "Held Lock + Retried rows"); +} + +/// Non-busy errors propagate immediately instead of being retried. +#[test] +fn retry_on_busy_does_not_retry_real_errors() { + let (_temp, db_path) = fixture_db(); + let mut conn = establish_connection(db_path.to_str().unwrap()).unwrap(); + + let mut attempts = 0; + let result = retry_on_busy(5, || { + attempts += 1; + sql_query("SELECT * FROM no_such_table").execute(&mut conn) + }); + + assert!(result.is_err()); + assert_eq!(attempts, 1, "a non-busy error must not be retried"); +}