Skip to content

Distribute Raft Workers Across The Cluster#1370

Open
zonotope wants to merge 51 commits into
mainfrom
feature/distributed-raft-workers
Open

Distribute Raft Workers Across The Cluster#1370
zonotope wants to merge 51 commits into
mainfrom
feature/distributed-raft-workers

Conversation

@zonotope

Copy link
Copy Markdown
Contributor

Splits the leader-only CommitWorker into per-branch Stager tasks, distributes them across all cluster members via rendezvous hashing, and adds a cross-node RPC so follower-staged commits route their ApplyHead propose through the current leader. Net effect: followers no longer sit idle while the leader serializes every transaction; each node owns ~1/N of branches and stages in parallel.

Motivation

The previous architecture ran every per-branch commit worker on the raft leader. Followers were ledger replicas that only saw applies — they didn't stage. On a busy ledger this made the leader the bottleneck for every transaction in the cluster, while CPU and IO on followers went unused. The design goal was to fan staging out without losing the consensus guarantees raft already provides (exactly-once apply, ordered queue per branch).

Architecture

Per-branch staging tasks

CommitWorker (one task on the leader, iterating every branch's queue in turn) is replaced by Stager (one task per active RefKey, polls only its own branch). Each stager owns the same end-to-end pipeline as the old worker — load envelope from CAS, stage the commit, write the commit blob, publish the head advance — scoped to a single branch. Cross-branch concurrency is now the default: branch B's first entry no longer waits for branch A's backlog to drain.

The staging code itself is unchanged; this is a refactor of who calls it, not how it works. The retry loop (MAX_STAGE_ATTEMPTS + exponential backoff + poison fallback), the panic-per-entry boundary (catch_unwind around process_entry), and the StagedReceiptMap side-channel all behave identically.

Node-lifetime supervisor

StagerSupervisor runs on every node (leader and followers alike), spawned at process start instead of being bound to leadership transitions. Each tick (250ms) the supervisor reconciles its running stagers against the desired set: branches in NameServiceState::queues whose rendezvous owner under the current voter set is this node. New owner → spawn a stager; ownership moved away → abort the stager. Shutdown is driven by CancellationToken; the supervisor's select! loop catches the cancel, aborts every running stager, and returns.

The supervisor wakes on either raft.metrics().changed() (membership / leader / term updates, so ownership recomputes within one tick) or a 250ms poll (new branches appearing in the queue map).

Rendezvous-hashing assignment

A new raft::ownership::owner(ref_key, voters) function maps each RefKey to the NodeId that should run its stager. Uses xxh64 with a fixed seed (so every node computes identical scores) and Highest Random Weight selection. Per-call cost is one xxh64 over the composed key bytes plus one xxh64 per voter. Empty voter set returns None; the supervisor treats that as "cluster not yet bootstrapped, claim nothing."

Rendezvous was chosen over plain hash(ref_key) % N because the latter reshuffles ~(N-1)/N of branches on every membership change (adding the 5th node to a 4-node cluster moves ~67% of branches). Rendezvous moves only ~1/(N+1) and only branches that genuinely changed owner — important for rolling-restart deployments.

Voters only, not learners. Learners are full data replicas but typically transient (joining cluster, staging for promotion); promoting one to voter is the operator's signal that it should participate.

Cross-node ApplyHead RPC

When a stager runs on a follower, it can't propose Command::ApplyHead directly — openraft surfaces ForwardToLeader on non-leader proposes. Added RaftNameService::apply_staged_commit(args), a new method exposed at POST /raft/apply_staged_commit (postcard body, no auth, same intra-cluster trust model as the existing openraft RPCs). The follower's publish_commit detects non-leader role, looks up the leader's raft_addr from membership, takes the staged receipt out of its local map, ferries it to the leader via the new endpoint. The leader's handler validates the queue front matches the follower's queue_id, stashes the receipt in the leader's StagedReceiptMap, proposes Command::ApplyHead, returns the outcome.

The endpoint lives on the existing private listener under /raft alongside the openraft RPCs. It is included in RaftIntegration::raft_rpc_router() so any caller using the integration's router (including the multi-node test harness) gets it automatically.

The CommitPublisher trait is untouched. All forwarding logic lives inside the RaftNameService impl — other backends (MemoryNameService, FileNameService, DynamoDbNameService, etc.) are unaffected.

Event-bus reconciliation

AppliedReceipt and its per-op variants are now Clone + Serialize + Deserialize so the typed receipt rides the wire to the leader (lets cross-node-staged transactions resolve with full per-op detail instead of falling back to AppliedReceipt::Minimal).

The state-machine adapter publishes LedgerCommitPublished events on RaftIntegration::event_bus on every ApplyHead apply. The previous local cache event listener subscribed to Fluree's internal LedgerEventBus and only handled LedgerIndexPublished / LedgerRetractedLedgerCommitPublished was silently dropped. In the old architecture this was fine because the leader's stager called finalize_local_state on the leader's Fluree directly, keeping its cache warm. With distributed stagers, the staging node and the queried node are usually different, so non-staging nodes needed an event-driven refresh path.

Two fixes: the existing listener now handles LedgerCommitPublished (merged with the existing LedgerIndexPublished arm — both call LedgerManager::notify); spawn_local_cache_event_listener is made pub and the server spawns a second instance against the raft integration's bus, so commits published by the state-machine adapter reach the cache. Fluree's internal listener stays put (harmless when raft owns publish — its bus simply sees no events).

Lifecycle integration

RaftIntegration now owns the Arc<RaftNameService> it used to construct externally — the integration builds it in new with staged_receipts and with_forwarding(id, http_client) configured, and exposes it via nameservice(). Both apply_staged_commit_router and raft_network::router mount under /raft via raft_rpc_router().

Server lifecycle gains one new field: raft_stager_supervisor: Option<CancellableTaskHandle>. Spawned at startup as a peer to raft_leader_watcher. Shutdown sequence is supervisor → leader watcher → release task so in-flight stagers drain before the leader-only background tasks (indexer, evictor) stop racing on shared state.

LeaderWatcherHandle and StagerSupervisorHandle were structurally identical (a JoinHandle plus a CancellationToken) so they were consolidated into a single CancellableTaskHandle driven by a private spawn_cancellable(future_fn) helper. The two public spawn functions still exist with their distinct docs; only the handle type is shared.

Failure modes

  • Mid-stage abort (ownership moves while a stager is partway through staging): the new owner restarts from scratch. Partial CAS writes on the old node are orphaned but safe (content-addressed; the same bytes would have hashed identically). Same lifecycle pattern as today's leader-loss abort.

  • Transient duplicate stagers during a membership change: ownership decisions on different nodes may briefly disagree because they're computed against different membership_config snapshots. Both nodes may stage the same queue entry. Wasteful but safe — the state machine's queue_id front check serializes ApplyHead, so exactly one apply lands per entry and the loser sees Stale.

  • Cross-node RPC failures (network / unreachable leader): the follower's stager treats the failure as a publish error, cleans up its local stash, and the outer retry loop re-stages. Idempotent because the queue_id check on the state machine guarantees exactly-once apply regardless of how many times we try.

  • Leader change mid-apply_staged_commit: the leader receives the ferried receipt, stashes it, calls client_write. If it steps down before commit, openraft returns ForwardToLeader; my handler takes the stash back and returns NotLeader to the follower. The follower retries against the new leader.

@zonotope zonotope requested review from aaj3f and bplatz June 24, 2026 16:09
}

#[derive(Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TransactApplied embeds Option<TrackingTally>, and TrackingTally has skip_serializing_if fields — those don't round-trip through positional postcard on the apply_staged_commit RPC. Needs a skip-free wire DTO, plus a round-trip test for ApplyStagedCommitArgs (none today).

}

for (_, handle) in stagers.drain() {
handle.abort();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stagers are abort()ed but never awaited here (same at the reassign path ~1040), so an ownership flap can respawn a stager for the same RefKey before the old one yields, racing on the shared staged_receipts. Want the abort_and_await discipline from the leader watcher (reconcile would need to go async).

/// queue front. Poisoning still goes direct to Raft because there's
/// no trait surface for "fail this queue entry."
/// queue front (forwarding to the leader when this stager runs on a
/// follower). Poisoning still goes direct to Raft because there's no

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This bites follower-owned stagers: propose_poison goes through local client_write, which returns ForwardToLeader on a follower → the poison bounces forever and head-of-line-blocks the branch. Deterministic poisons need a leader-forwarded path like apply_staged_commit.


let resp = forwarding
.http_client
.post(&target)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No per-request .timeout(...) here — only the shared client's connect_timeout. The other raft RPCs set one (network.rs / forward.rs); without it a connected-but-stalled leader hangs the stager on this send().await before it ever reaches backoff/retry. Thread a timeout through ForwardingConfig.

"apply_staged_commit stale: queue_id {queue_id} no longer at front \
(current front: {current_front_queue_id:?})"
))),
Err(e) => Err(NameServiceError::storage(format!(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This catch-all flattens every ApplyStagedCommitError into one storage error, so the caller retries them all as transient. Terminal ones (notably InvariantViolated) then spin forever instead of poisoning. Classify off the structured variants here.

/// raft port to peer addresses only.
pub fn apply_staged_commit_router(ns: Arc<RaftNameService>) -> Router {
Router::new()
.route(APPLY_STAGED_COMMIT_PATH, post(handle_apply_staged_commit))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No DefaultBodyLimit on this route, so it falls back to axum's 2 MiB default while the sibling raft RPCs set explicit caps — and an oversize receipt 413s into the retry-forever path. Add an explicit cap (reuse the network config).

Comment thread fluree-db-consensus/src/raft.rs Outdated
//! work locally, writes the commit blob, stashes the typed receipt
//! in [`staged_receipt::StagedReceiptMap`], and proposes
//! [`state_machine::Command::ApplyHead`] via the
//! 3. The leader-only [`commit_worker::StagerSupervisor`] (driven by

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stale now — the StagerSupervisor runs on every node and isn't driven by the leader watcher. The "Submission flow on the leader" framing at the top of this module doc reads the same way.

})
.keys()
.filter(|ref_key| owner(ref_key, &voters) == Some(self.id))
.cloned()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth calling out as a known liveness limitation: owner() runs over the configured voter set (current_voters() reads membership_config), not the set that's actually up. If a voter is down but still in the membership config, every live node computes that dead node as the owner for ~1/N of branches and declines, so those branches have no stager anywhere and their queues stall — even though consensus still has quorum. It clears only when the voter returns or membership is reconfigured to drop it. This is a real reduction in write availability vs. what raft alone tolerates (the old leader-only worker never hit it, since the leader is always live).

Not suggesting the naive fix — filtering by live voters would break the determinism this relies on (nodes have divergent liveness views → owner disagreement → either gaps or double-ownership on the same RefKey). A real fix has to make liveness a consensus fact (e.g. leader-driven membership eviction of an unreachable voter). For this PR a doc note on the limitation is probably enough; flagging so it's a deliberate choice rather than a surprise.

Base automatically changed from feature/raft to main June 24, 2026 18:35
}
Err(err) => {
if self.commit_replicated(ref_key, &commit_id).await {
if self.commit_replicated(&commit_id).await {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commit_replicated proves our ApplyHead landed only when commit_id is a fresh CID. No-op revert/rebase republish the existing head (current_head_id / pre_rebase_head_id), so head == commit_id holds whether or not this entry's ApplyHead actually applied. On a publish error this then returns Ok(()), run() sets last_committed = queue_id, but the front was never popped — snapshot_front keeps returning the same entry and the queue_id <= last guard sleeps the branch forever. Gate the landed-check on the queue front advancing past queue_id, not on head equality.

@bplatz bplatz left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, just added one more comment for a type of failure scenario.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants