60 changes: 60 additions & 0 deletions .claude/board/AGENT_LOG.md
@@ -72,3 +72,63 @@
one work path).

**Commit(s):** _filled in by the committing session_

## 2026-05-29 — kvs-lance time-series view (SoA/Rubicon step 1)
**Branch:** claude/sleepy-cori-aRK2x
**Added:**
- `kvs/lance/timeline.rs` (264 LOC) — `Timeline` (read-only view over Lance
version history) + `TimelineView` (immutable snapshot at one version) +
`VersionInfo{version:u64, timestamp_us:Option<i64>}`. Uses only confirmed
Lance 6.0.0 surface: versions(), checkout_version(), version().version,
scan().project()/filter(). Tombstone-aware reads.
- `kvs/lance/mod.rs` — `Datastore::timeline()` accessor (shares the live
dataset handle, no second open).
- `kvs/mvcc_source.rs` (170 LOC) — `MvccSource` trait + `LocalGeneratedMvcc`,
borrowed verbatim from reverted PR #24 (2a54a32); additive, dead_code-gated
until its consumer (kv-tikv native MVCC / lance version source) lands.
- `kvs/lance/tests.rs` — 2 tests: versions grow+monotone with commits; a
historical TimelineView reads the SoA as it stood (present at write version,
absent before).
**Verify:** `cargo check -p surrealdb-core --features kv-lance` → Finished, 0
errors (6m43s cold). Timeline tests: see commit (run pending at log time).
**Deferred (per user):** thinking-style i4-32 `I4x32::pack/unpack` are todo!()
in lance-graph-contract (carrier glitch) — NOT touched; wiring first.
**Next:** ractor mailbox owns SoA → publishes link onto this timeline (kanban);
EpisodicWitness64; replace BindSpace; wire deprecated→cognitive-shader-driver.

## 2026-05-30 — codex P1 fix: write+delete commit = ONE Lance version (PR #29)
**Branch:** claude/kvs-lance-timeline
**Scope:**
- `kvs/lance/commit_gate.rs`, `kvs/lance/flusher.rs` — `single_lance_commit`
- `kvs/lance/mod.rs` — `build_tombstone_batch_lance` helper
- `kvs/config.rs` — retire dead `delete_via_tombstone_row` flag
- `kvs/lance/tests.rs` — regression test
**Verdict:** PASS

**What was done (max 5 lines):**
- Codex P1 on PR #29 was VALID: a batch with BOTH writes and deletes ran
`merge_insert` (writes) THEN `Dataset::delete` (deletes) = TWO Lance
versions; the intermediate write-before-delete version leaked through
`Timeline::versions()` as a snapshot that was never an atomic commit.
- Fix: fold deletes into tombstone rows (`tombstone=true`) in the SAME
`merge_insert` → exactly ONE version per commit/flush. New
`Transaction::build_tombstone_batch_lance` mirrors `build_write_batch_lance`.
- Read path already filters `tombstone = false` (schema.rs:145,152), so
get/scan/keys stay correct; physical `Dataset::delete` fully removed.
- Retired the never-read `delete_via_tombstone_row` config flag — the fix is
unconditional; a toggle would only re-open the torn-timeline hole.
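
A minimal sketch of the fold described above, using a toy in-memory table in place of Lance. `ToyTable`, `Row`, and `build_merge_source` are hypothetical names invented for this illustration; only the idea (deletes become `tombstone = true` rows in the same upsert, reads filter them out) comes from the log entry.

```rust
use std::collections::BTreeMap;

/// One row of the toy table: a value plus the tombstone flag.
#[derive(Clone, Debug, PartialEq)]
struct Row {
    val: Vec<u8>,
    tombstone: bool,
}

/// Toy versioned table (hypothetical): every `merge_insert` call
/// appends exactly one full snapshot, standing in for one Lance version.
#[derive(Default)]
struct ToyTable {
    versions: Vec<BTreeMap<String, Row>>,
}

impl ToyTable {
    /// Upsert all rows keyed on `key` and record ONE new version.
    fn merge_insert(&mut self, rows: Vec<(String, Row)>) {
        let mut next = self.versions.last().cloned().unwrap_or_default();
        for (key, row) in rows {
            next.insert(key, row);
        }
        self.versions.push(next);
    }

    /// Tombstone-aware read against the latest version.
    fn get(&self, key: &str) -> Option<&[u8]> {
        self.versions
            .last()?
            .get(key)
            .filter(|row| !row.tombstone)
            .map(|row| row.val.as_slice())
    }
}

/// Build the merge source: live rows for writes, tombstone rows for deletes.
fn build_merge_source(
    writes: Vec<(String, Vec<u8>)>,
    deletes: Vec<String>,
) -> Vec<(String, Row)> {
    let live = writes
        .into_iter()
        .map(|(k, v)| (k, Row { val: v, tombstone: false }));
    let dead = deletes
        .into_iter()
        .map(|k| (k, Row { val: Vec::new(), tombstone: true }));
    live.chain(dead).collect()
}
```

Passing a batch with both writes and deletes through `build_merge_source` and then a single `merge_insert` adds one entry to `versions`, and `get` on a deleted key returns `None` even though its tombstone row is still stored.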

**Tests run:**
- `cargo check -p surrealdb-core --features kv-lance` → Finished, 0 errors
- `cargo test … kvs::lance::tests::test_timeline` → 3 passed; 0 failed (incl.
new `test_timeline_write_delete_commit_is_single_atomic_version`)

**Open questions / follow-ups:**
- Tombstone rows now accumulate (one dead row per created-then-deleted key)
until compaction; the background optimizer should GC tombstones past the
retention horizon — queued for the compaction pass.
- NOT run through `cargo +nightly fmt`: the crate is not fmt-clean under
`.rustfmt.toml`'s unstable opts (whole-crate churn across 22+ untouched
files), so hand-formatted to match surrounding `lance/` style.
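
One possible shape for the queued tombstone GC, as a sketch only. It assumes each row records the version at which it was written (`written_at`) and that the retention horizon is expressed as a version number; both are assumptions of this example, not something the current schema or optimizer is confirmed to provide.

```rust
use std::collections::BTreeMap;

/// Toy row (hypothetical layout): `written_at` is the version that wrote it.
#[derive(Clone, Debug, PartialEq)]
struct Row {
    val: Vec<u8>,
    tombstone: bool,
    written_at: u64,
}

/// Drop tombstone rows written strictly before `horizon`.
/// Live rows and tombstones at or after the horizon are kept, so any
/// snapshot still inside the retention window reads the same as before.
/// Returns the number of rows reclaimed.
fn gc_tombstones(table: &mut BTreeMap<String, Row>, horizon: u64) -> usize {
    let before = table.len();
    table.retain(|_, row| !(row.tombstone && row.written_at < horizon));
    before - table.len()
}
```

Keeping recent tombstones matters because a historical view inside the retention window still relies on them to hide a deleted key.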

**Commit(s):** (this commit)
57 changes: 57 additions & 0 deletions .claude/board/EPIPHANIES.md
@@ -57,3 +57,60 @@ guard invariant 1; invariant 2 needs new range-scan tests once

**Cross-ref:** `lance/mod.rs:362-417` (get path), `lance/mod.rs:607-642`
(scan_impl), `lance-backend/README.md` § "Transaction Model".

## 2026-05-29 — kvs-lance Timeline: Lance-native versioning IS the time-series view
**Status:** FINDING
**Scope:** surrealdb/core/src/kvs/lance/{timeline.rs,mod.rs}

The "SurrealDB-as-view-over-Lance" (Rubicon) surface needs no new storage:
Lance 6.0.0 already exposes the full timeline. `Dataset::versions() ->
Vec<Version{version:u64, timestamp:DateTime<Utc>, metadata}>` enumerates the
history; `checkout_version(u64)` pins an immutable snapshot. Confirmed against
fetched lance-6.0.0 source (dataset.rs:202 Version struct; dataset.rs:2000
versions()) AND against in-org usage in lance-graph
crates/lance-graph/src/graph/versioned.rs:432. The new `Timeline` /
`TimelineView` types are read-only BY CONSTRUCTION (they own a checked-out
snapshot, expose no set/del/commit), so "SurrealDB never mutates the leading
store" is a type-system guarantee, not a convention. Per-key time-travel
(`checkout_version` + tombstone-as-data) was already wired in get()/scan_impl();
this only adds the timeline *enumeration* + a read-only view handle. Compiles
clean under `cargo check -p surrealdb-core --features kv-lance` (Finished, 0
errors; the only warnings are never-used on the not-yet-wired consumer side).
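
The "read-only by construction" claim can be illustrated with a toy model. The type and method names (`Timeline`, `TimelineView`, `VersionInfo`, `versions`, `view_at`) follow this log, but the bodies below are a hypothetical in-memory stand-in, not the real `timeline.rs`; the point is only that the view type has no mutating method to call.

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

/// Toy snapshot of the store at one version (stand-in for a checked-out dataset).
type Snapshot = BTreeMap<String, Vec<u8>>;

#[derive(Clone, Copy, Debug, PartialEq)]
struct VersionInfo {
    version: u64,
    timestamp_us: Option<i64>,
}

/// Read-only enumeration over the version history.
struct Timeline {
    history: Vec<(VersionInfo, Arc<Snapshot>)>,
}

/// Immutable snapshot at one version. It exposes getters only:
/// there is no set/del/commit method, so mutation does not type-check.
struct TimelineView {
    info: VersionInfo,
    snapshot: Arc<Snapshot>,
}

impl Timeline {
    /// Enumerate the history in commit order.
    fn versions(&self) -> Vec<VersionInfo> {
        self.history.iter().map(|(info, _)| *info).collect()
    }

    /// Pin a view at `version`, or `None` if that version is unknown.
    fn view_at(&self, version: u64) -> Option<TimelineView> {
        self.history
            .iter()
            .find(|(info, _)| info.version == version)
            .map(|(info, snap)| TimelineView { info: *info, snapshot: Arc::clone(snap) })
    }
}

impl TimelineView {
    fn version(&self) -> u64 {
        self.info.version
    }

    fn get(&self, key: &str) -> Option<&[u8]> {
        self.snapshot.get(key).map(|v| v.as_slice())
    }
}
```

A view pinned before a key was written returns `None` for it, while a view at the write version returns the value, which is the behaviour the two timeline tests check.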

## 2026-05-30 — kvs-lance timeline granularity = write-path-dependent (corrects 2026-05-29)
**Status:** FINDING
**Scope:** surrealdb/core/src/kvs/lance/{timeline.rs,tests.rs}

The 2026-05-29 timeline tests wrongly assumed "1 commit = 1 Lance version".
On the DEFAULT `WritePath::LsmWithWal`, commits land in WAL+memtable and the
background flusher batches them into Lance asynchronously — so the timeline
reflects FLUSH BOUNDARIES, not individual commits (observed: 2 commits → 1
version; a single commit left latest_version unchanged). For per-commit
timeline granularity (which the Rubicon kanban needs — each commit/plan/prune
a distinct entry) the datastore must use `WritePath::LegacyCommitGate`, where
`Transaction::commit` returns only after its own Lance commit lands. Tests
fixed to construct LegacyCommitGate configs; both pass (2/2). The timeline CODE
was correct; the test HARNESS used the wrong write-path. Design consequence:
the ractor/kanban consumer that publishes onto the timeline must run on the
gate path (or call an explicit flush) to get one timeline entry per Rubicon
commit. Cross-ref: config.rs WritePath docs; writepath_legacy_commit_gate_smoke.
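
The granularity difference can be modelled in a few lines. This is a toy sketch: the `WritePath` variant names come from the finding above, while `ToyStore` and its fields are hypothetical and ignore the WAL, the background task, and error handling.

```rust
/// The two write paths named in the finding above.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum WritePath {
    LsmWithWal,
    LegacyCommitGate,
}

/// Toy store (hypothetical): counts how many versions reach the timeline.
struct ToyStore {
    path: WritePath,
    memtable: Vec<(String, Vec<u8>)>,
    versions: u64,
}

impl ToyStore {
    fn new(path: WritePath) -> Self {
        Self { path, memtable: Vec::new(), versions: 0 }
    }

    /// On the gate path a commit lands its own version before returning;
    /// on the LSM path it only buffers and waits for a flush.
    fn commit(&mut self, writes: Vec<(String, Vec<u8>)>) {
        self.memtable.extend(writes);
        if self.path == WritePath::LegacyCommitGate {
            self.flush();
        }
    }

    /// Drain everything buffered so far into ONE new version.
    fn flush(&mut self) {
        if self.memtable.is_empty() {
            return;
        }
        self.memtable.clear();
        self.versions += 1;
    }
}
```

In this model two commits on `LsmWithWal` followed by one flush yield a single version, while the same two commits on `LegacyCommitGate` yield two, matching the observation that drove the test-harness fix.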

## 2026-05-30 — A SurrealDB commit with writes+deletes was TWO Lance versions, not one
**Status:** FINDING
**Scope:** `kvs/lance/commit_gate.rs`, `kvs/lance/flusher.rs`, `kvs/lance/mod.rs`

`single_lance_commit` applied writes via `MergeInsertBuilder::execute_reader`
and deletes via a SEPARATE `Dataset::delete` — each its own native Lance
commit. So any batch carrying both produced two versions: an intermediate
(writes applied, deletes pending) and the final. The datastore write lock hid
the intermediate from live readers, but `Timeline::versions()` enumerates raw
`Dataset::versions()` and surfaced it, letting a replayer `view_at()` a torn
state that never atomically existed. The schema was already built for the fix
(a `tombstone` Boolean column + read predicates filtering `tombstone = false`):
folding deletes as tombstone rows into the same `merge_insert` makes
1 commit = 1 version *structurally*, not by convention. Trade-off accepted:
tombstone rows accumulate until a compaction/GC pass (physical `Dataset::delete`
previously reclaimed that space immediately).

**Cross-ref:** codex P1 on PR #29 (discussion_r3328296248); fix in this
commit; regression `test_timeline_write_delete_commit_is_single_atomic_version`.
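
To make the torn state concrete, here is a toy replay check. It models each version as the set of live keys and compares the old two-step commit with the one-step commit; `two_step`, `one_step`, and `torn_versions` are hypothetical helpers written for this sketch and do not mirror the real regression test.

```rust
use std::collections::BTreeSet;

/// A version is modelled as the set of keys that are live in it.
type Snapshot = BTreeSet<String>;

/// Old behaviour: writes land as one version, deletes as a second one.
fn two_step(history: &mut Vec<Snapshot>, writes: &[&str], deletes: &[&str]) {
    let mut s = history.last().cloned().unwrap_or_default();
    for k in writes {
        s.insert(k.to_string());
    }
    history.push(s.clone()); // intermediate: writes applied, deletes pending
    for k in deletes {
        s.remove(*k);
    }
    history.push(s); // final
}

/// New behaviour: writes and deletes land together as one version.
fn one_step(history: &mut Vec<Snapshot>, writes: &[&str], deletes: &[&str]) {
    let mut s = history.last().cloned().unwrap_or_default();
    for k in writes {
        s.insert(k.to_string());
    }
    for k in deletes {
        s.remove(*k);
    }
    history.push(s);
}

/// Indices of versions in which `a` and `b` are BOTH live, i.e. states
/// that a "write b, delete a" commit should never have exposed.
fn torn_versions(history: &[Snapshot], a: &str, b: &str) -> Vec<usize> {
    history
        .iter()
        .enumerate()
        .filter(|(_, s)| s.contains(a) && s.contains(b))
        .map(|(i, _)| i)
        .collect()
}
```

Starting from a history whose only version holds `old`, a commit that writes `new` and deletes `old` leaves one torn entry under `two_step` and none under `one_step`.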
5 changes: 0 additions & 5 deletions surrealdb/core/src/kvs/config.rs
@@ -336,10 +336,6 @@ pub struct LanceConfig {
     /// `Dataset::checkout(version)`).
     pub versioned: bool,

-    /// Whether to write deletions as explicit tombstone rows
-    /// (in addition to using Lance's native deletion vectors).
-    pub delete_via_tombstone_row: bool,
-
     /// Which write-path to use. See [`WritePath`] for the two
     /// options and their semantics. Defaults to
     /// [`WritePath::LsmWithWal`] — the Sprint AA hot path.
@@ -367,7 +363,6 @@ impl Default for LanceConfig {
     fn default() -> Self {
         Self {
             versioned: true,
-            delete_via_tombstone_row: false,
             write_path: WritePath::default(),
             disable_background_flusher: false,
         }
79 changes: 48 additions & 31 deletions surrealdb/core/src/kvs/lance/commit_gate.rs
@@ -57,7 +57,6 @@ use tokio::sync::{RwLock, mpsc, oneshot};
 use tracing::{trace, warn};

 use super::DatasetHandle;
-use super::schema::KvSchema;
 use crate::kvs::err::{Error, Result};
 use crate::kvs::{Key, Val};

@@ -342,9 +341,23 @@ async fn execute_batch(dataset: &Arc<RwLock<DatasetHandle>>, batch: Vec<Submissi
     }
 }

-/// Issue a single Lance `MergeInsertBuilder::execute_reader` (for writes)
-/// followed by a `Dataset::delete` (for deletes). One dataset version is
-/// produced per call — concurrent submitters in the same batch share it.
+/// Apply a batch's writes **and** deletes as a SINGLE Lance commit.
+///
+/// Writes become live rows (`tombstone = false`); deletes become
+/// tombstone rows (`tombstone = true`). Both are streamed into one
+/// `MergeInsertBuilder::execute_reader` keyed on `key`. `execute_batch`
+/// coalesces every key to exactly one of write/delete, so the merge
+/// source has unique keys and the upsert is well-defined - producing
+/// exactly ONE dataset version per call, which concurrent submitters in
+/// the same batch share.
+///
+/// Folding deletes in as tombstone rows (rather than a separate
+/// `Dataset::delete`) is what keeps the Lance version history aligned
+/// with SurrealDB commit boundaries. The old `merge_insert` +
+/// `Dataset::delete` pair produced *two* versions for a write+delete
+/// batch; the intermediate write-applied/delete-pending version, though
+/// hidden from live readers by the write lock, leaked through
+/// `Timeline::versions()` as a snapshot that was never an atomic commit.
 async fn single_lance_commit(
     dataset: &Arc<RwLock<DatasetHandle>>,
     writes: Vec<(Key, Val)>,
@@ -357,38 +370,42 @@ async fn single_lance_commit(
         return Ok(());
     }

-    let mut ds = dataset.write().await;
-
-    // ── writes ─────────────────────────────────────────────────────────
+    // Build the merge source: live rows for writes, tombstone rows for
+    // deletes. Identical schema, so both stream through one reader.
+    let mut batches: Vec<arrow_array::RecordBatch> = Vec::with_capacity(2);
     if !writes.is_empty() {
-        let batch = super::Transaction::build_write_batch_lance(&writes, version)
-            .map_err(|e| Error::Datastore(format!("lance build batch: {e}")))?;
-        let schema_ref = batch.schema();
-        let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema_ref);
-
-        let arc_ds = Arc::new(ds.inner.clone());
-        let (new_ds, _stats) = MergeInsertBuilder::try_new(arc_ds, vec!["key".into()])
-            .map_err(|e| Error::Datastore(format!("lance merge builder: {e}")))?
-            .when_matched(WhenMatched::UpdateAll)
-            .when_not_matched(WhenNotMatched::InsertAll)
-            .try_build()
-            .map_err(|e| Error::Datastore(format!("lance merge build: {e}")))?
-            .execute_reader(reader)
-            .await
-            .map_err(|e| Error::Datastore(format!("lance merge_insert: {e}")))?;
-
-        ds.inner = Arc::try_unwrap(new_ds).unwrap_or_else(|arc| (*arc).clone());
+        batches.push(
+            super::Transaction::build_write_batch_lance(&writes, version)
+                .map_err(|e| Error::Datastore(format!("lance build batch: {e}")))?,
+        );
     }

-    // ── deletes ────────────────────────────────────────────────────────
     if !deletes.is_empty() {
-        let predicate = KvSchema::build_delete_predicate(&deletes);
-        ds.inner
-            .delete(&predicate)
-            .await
-            .map_err(|e| Error::Datastore(format!("lance delete: {e}")))?;
+        batches.push(
+            super::Transaction::build_tombstone_batch_lance(&deletes, version)
+                .map_err(|e| Error::Datastore(format!("lance build tombstones: {e}")))?,
+        );
     }

+    let schema_ref = batches[0].schema();
+    let reader = arrow_array::RecordBatchIterator::new(
+        batches.into_iter().map(Ok::<_, arrow_schema::ArrowError>).collect::<Vec<_>>(),
+        schema_ref,
+    );
+
+    let mut ds = dataset.write().await;
+    let arc_ds = Arc::new(ds.inner.clone());
+    let (new_ds, _stats) = MergeInsertBuilder::try_new(arc_ds, vec!["key".into()])
+        .map_err(|e| Error::Datastore(format!("lance merge builder: {e}")))?
+        .when_matched(WhenMatched::UpdateAll)
+        .when_not_matched(WhenNotMatched::InsertAll)
+        .try_build()
+        .map_err(|e| Error::Datastore(format!("lance merge build: {e}")))?
+        .execute_reader(reader)
+        .await
+        .map_err(|e| Error::Datastore(format!("lance merge_insert: {e}")))?;
+
+    ds.inner = Arc::try_unwrap(new_ds).unwrap_or_else(|arc| (*arc).clone());
+
     Ok(())
 }

76 changes: 44 additions & 32 deletions surrealdb/core/src/kvs/lance/flusher.rs
@@ -37,7 +37,6 @@ use tracing::{trace, warn};

 use super::DatasetHandle;
 use super::memtable::{Memtable, Op};
-use super::schema::KvSchema;
 use super::wal::Wal;
 use crate::kvs::err::{Error, Result};
 use crate::kvs::{Key, Val};
@@ -244,13 +243,20 @@ async fn do_flush(
     Ok(snapshot.len())
 }

-/// Issue a single Lance `MergeInsertBuilder::execute_reader` for the
-/// writes, followed by `Dataset::delete` for the deletes. One
-/// dataset version is produced per call.
+/// Apply a flush's writes **and** deletes as a SINGLE Lance commit.
 ///
-/// Extracted from the prior `commit_gate::single_lance_commit` —
-/// same shape, no longer routed through the gate because the flusher
-/// is the only writer to Lance now.
+/// Writes become live rows (`tombstone = false`); deletes become
+/// tombstone rows (`tombstone = true`). Both stream into one
+/// `MergeInsertBuilder::execute_reader` keyed on `key`. A memtable
+/// snapshot holds exactly one op per key, so the merge source has unique
+/// keys and the upsert produces exactly ONE dataset version per flush.
+///
+/// Folding deletes in as tombstone rows (rather than a separate
+/// `Dataset::delete`) keeps the Lance version history aligned with flush
+/// boundaries: the old `merge_insert` + `Dataset::delete` pair produced
+/// *two* versions for a write+delete flush, and the intermediate
+/// write-applied/delete-pending version leaked through
+/// `Timeline::versions()` as a snapshot that never atomically existed.
 async fn single_lance_commit(
     dataset: &Arc<RwLock<DatasetHandle>>,
     writes: Vec<(Key, Val)>,
@@ -263,36 +269,42 @@ async fn single_lance_commit(
         return Ok(());
     }

-    let mut ds = dataset.write().await;
-
+    // Build the merge source: live rows for writes, tombstone rows for
+    // deletes. Identical schema, so both stream through one reader.
+    let mut batches: Vec<arrow_array::RecordBatch> = Vec::with_capacity(2);
     if !writes.is_empty() {
-        let batch = super::Transaction::build_write_batch_lance(&writes, version)
-            .map_err(|e| Error::Datastore(format!("lance build batch: {e}")))?;
-        let schema_ref = batch.schema();
-        let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema_ref);
-
-        let arc_ds = Arc::new(ds.inner.clone());
-        let (new_ds, _stats) = MergeInsertBuilder::try_new(arc_ds, vec!["key".into()])
-            .map_err(|e| Error::Datastore(format!("lance merge builder: {e}")))?
-            .when_matched(WhenMatched::UpdateAll)
-            .when_not_matched(WhenNotMatched::InsertAll)
-            .try_build()
-            .map_err(|e| Error::Datastore(format!("lance merge build: {e}")))?
-            .execute_reader(reader)
-            .await
-            .map_err(|e| Error::Datastore(format!("lance merge_insert: {e}")))?;
-
-        ds.inner = Arc::try_unwrap(new_ds).unwrap_or_else(|arc| (*arc).clone());
+        batches.push(
+            super::Transaction::build_write_batch_lance(&writes, version)
+                .map_err(|e| Error::Datastore(format!("lance build batch: {e}")))?,
+        );
     }

     if !deletes.is_empty() {
-        let predicate = KvSchema::build_delete_predicate(&deletes);
-        ds.inner
-            .delete(&predicate)
-            .await
-            .map_err(|e| Error::Datastore(format!("lance delete: {e}")))?;
+        batches.push(
+            super::Transaction::build_tombstone_batch_lance(&deletes, version)
+                .map_err(|e| Error::Datastore(format!("lance build tombstones: {e}")))?,
+        );
     }

+    let schema_ref = batches[0].schema();
+    let reader = arrow_array::RecordBatchIterator::new(
+        batches.into_iter().map(Ok::<_, arrow_schema::ArrowError>).collect::<Vec<_>>(),
+        schema_ref,
+    );
+
+    let mut ds = dataset.write().await;
+    let arc_ds = Arc::new(ds.inner.clone());
+    let (new_ds, _stats) = MergeInsertBuilder::try_new(arc_ds, vec!["key".into()])
+        .map_err(|e| Error::Datastore(format!("lance merge builder: {e}")))?
+        .when_matched(WhenMatched::UpdateAll)
+        .when_not_matched(WhenNotMatched::InsertAll)
+        .try_build()
+        .map_err(|e| Error::Datastore(format!("lance merge build: {e}")))?
+        .execute_reader(reader)
+        .await
+        .map_err(|e| Error::Datastore(format!("lance merge_insert: {e}")))?;
+
+    ds.inner = Arc::try_unwrap(new_ds).unwrap_or_else(|arc| (*arc).clone());
+
     Ok(())
 }
