Distribute Raft Workers Across The Cluster #1370
…ibuted-raft-workers
| } | ||
| #[derive(Debug)] | ||
| #[derive(Clone, Debug, Serialize, Deserialize)] |
TransactApplied embeds Option<TrackingTally>, and TrackingTally has skip_serializing_if fields — those don't round-trip through positional postcard on the apply_staged_commit RPC. Needs a skip-free wire DTO, plus a round-trip test for ApplyStagedCommitArgs (none today).
| } | ||
| for (_, handle) in stagers.drain() { | ||
| handle.abort(); |
Stagers are abort()ed but never awaited here (same at the reassign path ~1040), so an ownership flap can respawn a stager for the same RefKey before the old one yields, racing on the shared staged_receipts. Want the abort_and_await discipline from the leader watcher (reconcile would need to go async).
| /// queue front. Poisoning still goes direct to Raft because there's | ||
| /// no trait surface for "fail this queue entry." | ||
| /// queue front (forwarding to the leader when this stager runs on a | ||
| /// follower). Poisoning still goes direct to Raft because there's no |
This bites follower-owned stagers: propose_poison goes through local client_write, which returns ForwardToLeader on a follower → the poison bounces forever and head-of-line-blocks the branch. Deterministic poisons need a leader-forwarded path like apply_staged_commit.
| let resp = forwarding | ||
| .http_client | ||
| .post(&target) |
No per-request .timeout(...) here — only the shared client's connect_timeout. The other raft RPCs set one (network.rs / forward.rs); without it a connected-but-stalled leader hangs the stager on this send().await before it ever reaches backoff/retry. Thread a timeout through ForwardingConfig.
| "apply_staged_commit stale: queue_id {queue_id} no longer at front \ | ||
| (current front: {current_front_queue_id:?})" | ||
| ))), | ||
| Err(e) => Err(NameServiceError::storage(format!( |
This catch-all flattens every ApplyStagedCommitError into one storage error, so the caller retries them all as transient. Terminal ones (notably InvariantViolated) then spin forever instead of poisoning. Classify off the structured variants here.
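A minimal sketch of what classifying off the structured variants could look like. Only Stale, NotLeader, and InvariantViolated are named in this PR; the Transport variant, the Disposition type, and the classify function are hypothetical names introduced for illustration, and the real error enum may carry different fields.

```rust
/// Hypothetical mirror of the errors a caller of apply_staged_commit might see.
/// `Transport` is an illustrative assumption, not a variant from the PR.
#[derive(Debug)]
pub enum ApplyStagedCommitError {
    Stale { queue_id: u64, current_front_queue_id: Option<u64> },
    NotLeader,
    Transport(String),
    InvariantViolated(String),
}

/// What the stager should do next (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
pub enum Disposition {
    /// The queue front no longer matches; re-read it before deciding anything.
    Resnapshot,
    /// Transient: retry with backoff (leader moved, network blip).
    Retry,
    /// Deterministic failure: poison the entry instead of retrying.
    Poison,
}

/// Map each structured variant to a disposition instead of flattening
/// everything into one retryable storage error.
pub fn classify(err: &ApplyStagedCommitError) -> Disposition {
    match err {
        ApplyStagedCommitError::Stale { .. } => Disposition::Resnapshot,
        ApplyStagedCommitError::NotLeader | ApplyStagedCommitError::Transport(_) => {
            Disposition::Retry
        }
        ApplyStagedCommitError::InvariantViolated(_) => Disposition::Poison,
    }
}
```

The match has no wildcard arm on purpose: adding a new variant then fails to compile until someone decides whether it is transient or terminal.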
| /// raft port to peer addresses only. | ||
| pub fn apply_staged_commit_router(ns: Arc<RaftNameService>) -> Router { | ||
| Router::new() | ||
| .route(APPLY_STAGED_COMMIT_PATH, post(handle_apply_staged_commit)) |
No DefaultBodyLimit on this route, so it falls back to axum's 2 MiB default while the sibling raft RPCs set explicit caps — and an oversize receipt 413s into the retry-forever path. Add an explicit cap (reuse the network config).
| //! work locally, writes the commit blob, stashes the typed receipt | ||
| //! in [`staged_receipt::StagedReceiptMap`], and proposes | ||
| //! [`state_machine::Command::ApplyHead`] via the | ||
| //! 3. The leader-only [`commit_worker::StagerSupervisor`] (driven by |
Stale now — the StagerSupervisor runs on every node and isn't driven by the leader watcher. The "Submission flow on the leader" framing at the top of this module doc reads the same way.
| }) | ||
| .keys() | ||
| .filter(|ref_key| owner(ref_key, &voters) == Some(self.id)) | ||
| .cloned() |
Worth calling out as a known liveness limitation: owner() runs over the configured voter set (current_voters() reads membership_config), not the set that's actually up. If a voter is down but still in the membership config, every live node computes that dead node as the owner for ~1/N of branches and declines, so those branches have no stager anywhere and their queues stall — even though consensus still has quorum. It clears only when the voter returns or membership is reconfigured to drop it. This is a real reduction in write availability vs. what raft alone tolerates (the old leader-only worker never hit it, since the leader is always live).
Not suggesting the naive fix — filtering by live voters would break the determinism this relies on (nodes have divergent liveness views → owner disagreement → either gaps or double-ownership on the same RefKey). A real fix has to make liveness a consensus fact (e.g. leader-driven membership eviction of an unreachable voter). For this PR a doc note on the limitation is probably enough; flagging so it's a deliberate choice rather than a surprise.
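A small sketch of the gap described above, for the doc note. It assumes ownership is computed over the configured voters while only a subset of nodes is actually up. The names unstaged_branches and toy_owner are hypothetical; toy_owner is a deliberately trivial stand-in for raft::ownership::owner, since the point does not depend on which deterministic hash is used.

```rust
use std::collections::BTreeSet;

type NodeId = u64;

/// Branches that no live node will stage: their configured owner is down
/// (or there is no owner at all because the voter set is empty).
/// `owner` stands in for the PR's raft::ownership::owner.
pub fn unstaged_branches<'a>(
    branches: &[&'a str],
    configured_voters: &[NodeId],
    live: &BTreeSet<NodeId>,
    owner: impl Fn(&str, &[NodeId]) -> Option<NodeId>,
) -> Vec<&'a str> {
    branches
        .iter()
        .copied()
        .filter(|b| match owner(*b, configured_voters) {
            // Every live node computes the same owner and declines.
            Some(id) => !live.contains(&id),
            None => true,
        })
        .collect()
}

/// Toy deterministic owner for the demo: byte sum modulo voter count.
/// Not rendezvous hashing; any function all nodes agree on shows the same gap.
pub fn toy_owner(ref_key: &str, voters: &[NodeId]) -> Option<NodeId> {
    if voters.is_empty() {
        return None;
    }
    let sum: u64 = ref_key.bytes().map(u64::from).sum();
    Some(voters[(sum % voters.len() as u64) as usize])
}
```

With voters 1, 2, 3 configured and only 1 and 2 up, raft still has quorum, yet any branch whose owner is 3 shows up in the returned list until membership changes.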
| } | ||
| Err(err) => { | ||
| if self.commit_replicated(ref_key, &commit_id).await { | ||
| if self.commit_replicated(&commit_id).await { |
commit_replicated proves our ApplyHead landed only when commit_id is a fresh CID. No-op revert/rebase republish the existing head (current_head_id / pre_rebase_head_id), so head == commit_id holds whether or not this entry's ApplyHead actually applied. On a publish error this then returns Ok(()), run() sets last_committed = queue_id, but the front was never popped — snapshot_front keeps returning the same entry and the queue_id <= last guard sleeps the branch forever. Gate the landed-check on the queue front advancing past queue_id, not on head equality.
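A minimal sketch of the suggested gate, assuming queue_ids on a branch are assigned in increasing order. The function names and the Option<u64> view of the queue front are hypothetical simplifications, not the PR's types.

```rust
/// Hypothetical view of a branch queue: the queue_id at the front, if any.
pub type QueueFront = Option<u64>;

/// True only when entry `queue_id` is no longer pending: the front has
/// advanced past it, or the queue has drained. This shows the entry was
/// consumed; it does not distinguish "applied" from "poisoned".
pub fn entry_consumed(front: QueueFront, queue_id: u64) -> bool {
    match front {
        None => true,
        Some(front_id) => front_id > queue_id,
    }
}

/// The head-equality check, kept only for contrast. A no-op revert or rebase
/// republishes the existing head, so this can be true before anything applied.
pub fn head_matches(head: &str, commit_id: &str) -> bool {
    head == commit_id
}
```

In the no-op case, head_matches returns true while the entry is still at the front, whereas entry_consumed stays false until the queue actually moves.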
bplatz
left a comment
Looks good, just added one more comment for a type of failure scenario.
Splits the leader-only CommitWorker into per-branch Stager tasks, distributes them across all cluster members via rendezvous hashing, and adds a cross-node RPC so follower-staged commits route their ApplyHead propose through the current leader. Net effect: followers no longer sit idle while the leader serializes every transaction; each node owns ~1/N of branches and stages in parallel.
Motivation
The previous architecture ran every per-branch commit worker on the raft leader. Followers were ledger replicas that only saw applies — they didn't stage. On a busy ledger this made the leader the bottleneck for every transaction in the cluster, while CPU and IO on followers went unused. The design goal was to fan staging out without losing the consensus guarantees raft already provides (exactly-once apply, ordered queue per branch).
Architecture
Per-branch staging tasks
CommitWorker (one task on the leader, iterating every branch's queue in turn) is replaced by Stager (one task per active RefKey, polls only its own branch). Each stager owns the same end-to-end pipeline as the old worker (load envelope from CAS, stage the commit, write the commit blob, publish the head advance), scoped to a single branch. Cross-branch concurrency is now the default: branch B's first entry no longer waits for branch A's backlog to drain.
The staging code itself is unchanged; this is a refactor of who calls it, not how it works. The retry loop (MAX_STAGE_ATTEMPTS + exponential backoff + poison fallback), the panic-per-entry boundary (catch_unwind around process_entry), and the StagedReceiptMap side-channel all behave identically.
Node-lifetime supervisor
StagerSupervisor runs on every node (leader and followers alike), spawned at process start instead of being bound to leadership transitions. Each tick (250ms) the supervisor reconciles its running stagers against the desired set: branches in NameServiceState::queues whose rendezvous owner under the current voter set is this node. New owner → spawn a stager; ownership moved away → abort the stager. Shutdown is driven by CancellationToken; the supervisor's select! loop catches the cancel, aborts every running stager, and returns.
The supervisor wakes on either raft.metrics().changed() (membership / leader / term updates, so ownership recomputes within one tick) or a 250ms poll (new branches appearing in the queue map).
Rendezvous-hashing assignment
A new raft::ownership::owner(ref_key, voters) function maps each RefKey to the NodeId that should run its stager. It uses xxh64 with a fixed seed (so every node computes identical scores) and Highest Random Weight selection. Per-call cost is one xxh64 over the composed key bytes plus one xxh64 per voter. An empty voter set returns None; the supervisor treats that as "cluster not yet bootstrapped, claim nothing."
Rendezvous was chosen over plain hash(ref_key) % N because the latter reshuffles ~N/(N+1) of branches when a node joins an N-node cluster (adding the 5th node to a 4-node cluster moves ~80% of branches). Rendezvous moves only ~1/(N+1), and only branches that genuinely changed owner, which matters for rolling-restart deployments.
Voters only, not learners. Learners are full data replicas but typically transient (joining cluster, staging for promotion); promoting one to voter is the operator's signal that it should participate.
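The assignment described in this section can be sketched as follows. This is an illustration under stated assumptions, not the PR's implementation: it uses a seeded FNV-1a as a dependency-free stand-in for xxh64, takes the ref key as a plain string, and the SEED value and tie-break rule are made up for the example.

```rust
type NodeId = u64;

/// Hypothetical fixed seed; every node must use the same value.
const SEED: u64 = 0x5eed;

/// Seeded FNV-1a 64-bit, used here only as a stand-in for xxh64.
fn fnv1a64(seed: u64, bytes: &[u8]) -> u64 {
    let mut h = 0xcbf2_9ce4_8422_2325u64 ^ seed;
    for b in bytes {
        h ^= u64::from(*b);
        h = h.wrapping_mul(0x0000_0100_0000_01b3);
    }
    h
}

/// Highest Random Weight selection: hash the key once, score every voter
/// against that hash, and pick the highest score. Ties fall to the larger
/// NodeId so the result does not depend on the order of `voters`.
/// An empty voter set yields None.
pub fn owner(ref_key: &str, voters: &[NodeId]) -> Option<NodeId> {
    let key_hash = fnv1a64(SEED, ref_key.as_bytes());
    voters
        .iter()
        .copied()
        .max_by_key(|v| (fnv1a64(key_hash, &v.to_le_bytes()), *v))
}
```

Two properties follow from taking a per-voter maximum: reordering the voter list never changes the owner, and when a node joins, a branch either keeps its owner or moves to the new node, never between two existing nodes.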
Cross-node ApplyHead RPC
When a stager runs on a follower, it can't propose Command::ApplyHead directly: openraft surfaces ForwardToLeader on non-leader proposes. Added RaftNameService::apply_staged_commit(args), a new method exposed at POST /raft/apply_staged_commit (postcard body, no auth, same intra-cluster trust model as the existing openraft RPCs). The follower's publish_commit detects non-leader role, looks up the leader's raft_addr from membership, takes the staged receipt out of its local map, and ferries it to the leader via the new endpoint. The leader's handler validates that the queue front matches the follower's queue_id, stashes the receipt in the leader's StagedReceiptMap, proposes Command::ApplyHead, and returns the outcome.
The endpoint lives on the existing private listener under /raft alongside the openraft RPCs. It is included in RaftIntegration::raft_rpc_router() so any caller using the integration's router (including the multi-node test harness) gets it automatically.
The CommitPublisher trait is untouched. All forwarding logic lives inside the RaftNameService impl; other backends (MemoryNameService, FileNameService, DynamoDbNameService, etc.) are unaffected.
Event-bus reconciliation
AppliedReceipt and its per-op variants are now Clone + Serialize + Deserialize so the typed receipt rides the wire to the leader (lets cross-node-staged transactions resolve with full per-op detail instead of falling back to AppliedReceipt::Minimal).
The state-machine adapter publishes LedgerCommitPublished events on RaftIntegration::event_bus on every ApplyHead apply. The previous local cache event listener subscribed to Fluree's internal LedgerEventBus and only handled LedgerIndexPublished / LedgerRetracted; LedgerCommitPublished was silently dropped. In the old architecture this was fine because the leader's stager called finalize_local_state on the leader's Fluree directly, keeping its cache warm. With distributed stagers, the staging node and the queried node are usually different, so non-staging nodes needed an event-driven refresh path.
Two fixes: the existing listener now handles LedgerCommitPublished (merged with the existing LedgerIndexPublished arm; both call LedgerManager::notify); spawn_local_cache_event_listener is made pub and the server spawns a second instance against the raft integration's bus, so commits published by the state-machine adapter reach the cache. Fluree's internal listener stays put (harmless when raft owns publish: its bus simply sees no events).
Lifecycle integration
RaftIntegration now owns the Arc<RaftNameService> it used to construct externally: the integration builds it in new with staged_receipts and with_forwarding(id, http_client) configured, and exposes it via nameservice(). Both apply_staged_commit_router and raft_network::router mount under /raft via raft_rpc_router().
Server lifecycle gains one new field: raft_stager_supervisor: Option<CancellableTaskHandle>. Spawned at startup as a peer to raft_leader_watcher. Shutdown sequence is supervisor → leader watcher → release task so in-flight stagers drain before the leader-only background tasks (indexer, evictor) stop racing on shared state.
LeaderWatcherHandle and StagerSupervisorHandle were structurally identical (a JoinHandle plus a CancellationToken) so they were consolidated into a single CancellableTaskHandle driven by a private spawn_cancellable(future_fn) helper. The two public spawn functions still exist with their distinct docs; only the handle type is shared.
Failure modes
Mid-stage abort (ownership moves while a stager is partway through staging): the new owner restarts from scratch. Partial CAS writes on the old node are orphaned but safe (content-addressed; the same bytes would have hashed identically). Same lifecycle pattern as today's leader-loss abort.
Transient duplicate stagers during a membership change: ownership decisions on different nodes may briefly disagree because they're computed against different membership_config snapshots. Both nodes may stage the same queue entry. Wasteful but safe: the state machine's queue_id front check serializes ApplyHead, so exactly one apply lands per entry and the loser sees Stale.
Cross-node RPC failures (network / unreachable leader): the follower's stager treats the failure as a publish error, cleans up its local stash, and the outer retry loop re-stages. Idempotent because the queue_id check on the state machine guarantees exactly-once apply regardless of how many times we try.
Leader change mid-apply_staged_commit: the leader receives the ferried receipt, stashes it, calls client_write. If it steps down before commit, openraft returns ForwardToLeader; my handler takes the stash back and returns NotLeader to the follower. The follower retries against the new leader.
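Several of the failure modes above lean on the queue_id front check. A toy model of that check, assuming a per-branch FIFO of queue_ids, might look like the following; BranchState, ApplyOutcome, and apply_head are hypothetical names for illustration, and the real state machine carries far more state.

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq, Eq)]
pub enum ApplyOutcome {
    Applied,
    Stale { current_front: Option<u64> },
}

/// Toy per-branch state: pending queue_ids in order, plus a log of applies.
#[derive(Default)]
pub struct BranchState {
    pub queue: VecDeque<u64>,
    pub applied: Vec<(u64, String)>,
}

impl BranchState {
    /// Apply only if `queue_id` is still at the front; otherwise report Stale
    /// along with whatever is currently at the front.
    pub fn apply_head(&mut self, queue_id: u64, commit_id: &str) -> ApplyOutcome {
        match self.queue.front().copied() {
            Some(front) if front == queue_id => {
                self.queue.pop_front();
                self.applied.push((queue_id, commit_id.to_string()));
                ApplyOutcome::Applied
            }
            current_front => ApplyOutcome::Stale { current_front },
        }
    }
}
```

Because raft applies commands in a single order on every node, two stagers proposing for the same entry are serialized through this check: the first pops the front, and the second finds a different front and gets Stale, so retries and duplicate staging cannot double-apply.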