diff --git a/tools/celestia-node-fiber/adapter.go b/tools/celestia-node-fiber/adapter.go index 1984a1aaf8..9c1f37e73d 100644 --- a/tools/celestia-node-fiber/adapter.go +++ b/tools/celestia-node-fiber/adapter.go @@ -14,6 +14,7 @@ import ( "github.com/celestiaorg/celestia-node/api/client" blobapi "github.com/celestiaorg/celestia-node/nodebuilder/blob" fibreapi "github.com/celestiaorg/celestia-node/nodebuilder/fibre" + headerapi "github.com/celestiaorg/celestia-node/nodebuilder/header" "github.com/evstack/ev-node/block" ) @@ -39,6 +40,7 @@ const defaultListenChannelSize = 16 type Adapter struct { fibre fibreapi.Module blob blobapi.Module + header headerapi.Module listenChannelSz int // closer, if non-nil, is invoked by Close. Set only when the Adapter @@ -60,6 +62,7 @@ func New(ctx context.Context, cfg Config, kr keyring.Keyring) (*Adapter, error) return &Adapter{ fibre: c.Fibre, blob: c.Blob, + header: c.Header, listenChannelSz: resolveListenChannelSize(cfg.ListenChannelSize), closer: c.Close, }, nil @@ -85,6 +88,20 @@ func (a *Adapter) Close() error { return a.closer() } +// Head returns the bridge node's current local-head height. Returns 0 if +// the underlying client was constructed via FromModules without a Header +// module. +func (a *Adapter) Head(ctx context.Context) (uint64, error) { + if a.header == nil { + return 0, fmt.Errorf("Adapter has no Header module; construct via New") + } + h, err := a.header.LocalHead(ctx) + if err != nil { + return 0, fmt.Errorf("header.LocalHead: %w", err) + } + return h.Height(), nil +} + // Upload implements fiber.DA.Upload. client.Fibre.Upload does off-chain row // upload plus validator-sig aggregation and spawns a background // MsgPayForFibre broadcast; this call returns as soon as the off-chain diff --git a/tools/celestia-node-fiber/testing/docker/Dockerfile.app b/tools/celestia-node-fiber/testing/docker/Dockerfile.app new file mode 100644 index 0000000000..7ecb521f81 --- /dev/null +++ b/tools/celestia-node-fiber/testing/docker/Dockerfile.app @@ -0,0 +1,34 @@ +# Build a celestia-appd binary with the `fibre` build tag enabled and a +# matching `fibre` server binary. Both go on PATH so the validator +# entrypoint can run them as separate processes. +# +# Pin CELESTIA_APP_REF to a feature/fibre commit; the default tracks +# whatever celestia-app `main` looks like at build time, which is where +# fibre development lives. +ARG GO_VERSION=1.26.1 +ARG CELESTIA_APP_REPO=https://github.com/celestiaorg/celestia-app.git +ARG CELESTIA_APP_REF=main + +FROM golang:${GO_VERSION}-bookworm AS build +ARG CELESTIA_APP_REPO +ARG CELESTIA_APP_REF +RUN apt-get update \ + && apt-get install -y --no-install-recommends git ca-certificates \ + && rm -rf /var/lib/apt/lists/* +WORKDIR /src +RUN git clone --depth 1 --branch "${CELESTIA_APP_REF}" "${CELESTIA_APP_REPO}" celestia-app \ + || git clone "${CELESTIA_APP_REPO}" celestia-app +WORKDIR /src/celestia-app +RUN git checkout "${CELESTIA_APP_REF}" || true +ENV CGO_ENABLED=0 GOFLAGS="-mod=readonly" +RUN go build -tags "ledger,fibre" -o /out/celestia-appd ./cmd/celestia-appd +RUN go build -tags "ledger,fibre" -o /out/fibre ./fibre/cmd + +FROM debian:bookworm-slim +RUN apt-get update \ + && apt-get install -y --no-install-recommends ca-certificates jq curl \ + && rm -rf /var/lib/apt/lists/* +COPY --from=build /out/celestia-appd /usr/local/bin/celestia-appd +COPY --from=build /out/fibre /usr/local/bin/fibre +RUN chmod +x /usr/local/bin/celestia-appd /usr/local/bin/fibre +WORKDIR /home/celestia diff --git a/tools/celestia-node-fiber/testing/docker/Dockerfile.bridge b/tools/celestia-node-fiber/testing/docker/Dockerfile.bridge new file mode 100644 index 0000000000..11ec5f2c95 --- /dev/null +++ b/tools/celestia-node-fiber/testing/docker/Dockerfile.bridge @@ -0,0 +1,28 @@ +# Build a celestia-node bridge binary with the `fibre` build tag enabled. +# The bridge is what serves blob.Subscribe over JSON-RPC for the adapter's +# Listen path, and (on the read-only side) the fibre namespace API. +ARG GO_VERSION=1.26.1 +ARG CELESTIA_NODE_REPO=https://github.com/celestiaorg/celestia-node.git +ARG CELESTIA_NODE_REF=feature/fibre + +FROM golang:${GO_VERSION}-bookworm AS build +ARG CELESTIA_NODE_REPO +ARG CELESTIA_NODE_REF +RUN apt-get update \ + && apt-get install -y --no-install-recommends git ca-certificates \ + && rm -rf /var/lib/apt/lists/* +WORKDIR /src +RUN git clone --depth 1 --branch "${CELESTIA_NODE_REF}" "${CELESTIA_NODE_REPO}" celestia-node \ + || git clone "${CELESTIA_NODE_REPO}" celestia-node +WORKDIR /src/celestia-node +RUN git checkout "${CELESTIA_NODE_REF}" || true +ENV CGO_ENABLED=0 GOFLAGS="-mod=readonly" +RUN go build -tags "fibre" -o /out/celestia ./cmd/celestia + +FROM debian:bookworm-slim +RUN apt-get update \ + && apt-get install -y --no-install-recommends ca-certificates jq curl \ + && rm -rf /var/lib/apt/lists/* +COPY --from=build /out/celestia /usr/local/bin/celestia +RUN chmod +x /usr/local/bin/celestia +WORKDIR /home/celestia diff --git a/tools/celestia-node-fiber/testing/docker/README.md b/tools/celestia-node-fiber/testing/docker/README.md new file mode 100644 index 0000000000..3603bf0330 --- /dev/null +++ b/tools/celestia-node-fiber/testing/docker/README.md @@ -0,0 +1,118 @@ +# Fibre 4-validator + bridge docker showcase + +A docker-compose stack that brings up four celestia-app validators +(each running a Fibre server), a celestia-node bridge, and a one-shot +init container that registers Fibre Storage Provider hosts and funds an +escrow account. A Go test driver (`docker_test.go`) connects from the +host and exercises the `celestia-node-fiber` adapter end-to-end against +the real 2/3-quorum network. + +## Why + +The in-process `testing/showcase_test.go` runs against a single +validator inside the test process. That proves the adapter wires +correctly, but it doesn't exercise: + +- real consensus 2/3 quorum collection (single validator trivially + satisfies it), +- inter-validator P2P, +- multiple Fibre servers contributing partial signatures, +- the dns:/// host registry resolution path, +- the bridge syncing real headers off a network it doesn't itself drive. + +This stack does. + +## Architecture + +``` + +---------- bootstrap (one-shot) ----------+ + | init-genesis.sh: 4-val genesis + keys | + +-------+----------------------------------+ + | shared volume + +------------+------------+------------+ + v v v v + val0 val1 val2 val3 + (appd + (appd + (appd + (appd + + fibre) fibre) fibre) fibre) + ^ + | gRPC :9090, RPC :26657 + | + bridge (celestia-node) + ^ + | JSON-RPC/WebSocket :26658 + | + +--------+--------+ + | Go test | + | (docker_test.go) | + +-----------------+ +``` + +## Run + +```bash +cd tools/celestia-node-fiber/testing/docker + +# First boot: builds two images (~5–10 min on a cold cache). +docker compose up -d --build + +# Watch the bootstrap + registration progress: +docker compose logs -f bootstrap register + +# Once `register` exits 0 and writes /shared/setup.done, the bridge +# connects and the stack is ready. + +# From the parent dir, run the Go-side driver: +cd ../.. +go test -tags 'fibre fibre_docker' -count=1 -timeout 5m ./testing/docker/... + +# Tear down (preserves volumes — add -v to wipe shared genesis state): +docker compose -f testing/docker/compose.yaml down +``` + +Override endpoints from the host with env vars if your ports collide: + +``` +FIBRE_BRIDGE_ADDR=ws://127.0.0.1:36658 \ +FIBRE_CONSENSUS_ADDR=127.0.0.1:19090 \ +go test -tags 'fibre fibre_docker' ... +``` + +## Build args + +Both Dockerfiles accept refs: + +| arg | Dockerfile | default | what it does | +|---|---|---|---| +| `CELESTIA_APP_REPO` | `Dockerfile.app` | celestia-app upstream | clone source | +| `CELESTIA_APP_REF` | `Dockerfile.app` | `main` | git ref to build with `-tags fibre,ledger` | +| `CELESTIA_NODE_REPO` | `Dockerfile.bridge` | celestia-node upstream | clone source | +| `CELESTIA_NODE_REF` | `Dockerfile.bridge` | `feature/fibre` | git ref to build with `-tags fibre` | + +Example pinning to a specific commit: + +``` +docker compose build --build-arg CELESTIA_NODE_REF=194cc74c ... +``` + +## Known TODOs + +The scaffold has been validated end-to-end on Apple Silicon +(Docker Desktop 4.70 / linux/arm64). A few rough edges remain that +are worth tightening for CI: + +1. **`config.toml` / `app.toml` overrides** in `start-validator.sh` + use `sed` against expected default lines. If the celestia-app + defaults change verb/spacing, the substitutions silently no-op. + Consider a `dasel`/`tomlq` rewrite if it bites. +2. **No healthchecks** on validators. `register` waits on + `service_started`, which is only "container booted", not "RPC + responding". The script polls `celestia-appd status` which + handles that, but a proper healthcheck would let `bridge` start + sooner without polling itself. +3. **No CI integration**. Adding `make docker-test` that wraps + `docker compose up -d --wait`, runs the test, then tears down, + is a sensible follow-up. +4. **Build cache** — every `docker compose up --build` re-clones + celestia-app + celestia-node. To iterate faster, set up a + docker volume cache for `/go/pkg/mod` and `/root/.cache/go-build`, + or build the images once and re-use. diff --git a/tools/celestia-node-fiber/testing/docker/compose.yaml b/tools/celestia-node-fiber/testing/docker/compose.yaml new file mode 100644 index 0000000000..5cf090d0c6 --- /dev/null +++ b/tools/celestia-node-fiber/testing/docker/compose.yaml @@ -0,0 +1,153 @@ +# Local 4-validator + 1-bridge Fibre stack for end-to-end testing of the +# celestia-node-fiber adapter. Designed to be brought up with +# `docker compose up -d` from this directory; the test driver in +# docker_test.go (build tag `fibre_docker`) connects to it from the host. +# +# Services: +# bootstrap one-shot — generates a 4-val genesis under /shared +# val0..val3 validators running celestia-appd + fibre server +# register one-shot — submits MsgSetFibreProviderInfo + escrow +# bridge celestia-node bridge connected to val0 +# +# Volumes: +# shared/ genesis, keyrings, peers, JWT — read by every service +# +# Ports exposed to host (override via override file if they collide): +# val0 :26657 (RPC) :9090 (gRPC) :7980 (fibre) +# bridge :26658 (JSON-RPC over WebSocket) +services: + bootstrap: + build: + context: . + dockerfile: Dockerfile.app + entrypoint: ["bash", "/scripts/init-genesis.sh"] + environment: + CHAIN_ID: fibre-docker + NUM_VALIDATORS: "4" + SHARED: /shared + CLIENT_ACCOUNT: default-fibre + volumes: + - shared:/shared + - ./scripts:/scripts:ro + restart: "no" + + val0: + build: + context: . + dockerfile: Dockerfile.app + entrypoint: ["bash", "/scripts/start-validator.sh"] + environment: + VAL_INDEX: "0" + SHARED: /shared + CHAIN_ID: fibre-docker + depends_on: + bootstrap: + condition: service_completed_successfully + volumes: + - shared:/shared + - ./scripts:/scripts:ro + ports: + - "26657:26657" + - "9090:9090" + - "7980:7980" + + val1: + build: + context: . + dockerfile: Dockerfile.app + entrypoint: ["bash", "/scripts/start-validator.sh"] + environment: + VAL_INDEX: "1" + SHARED: /shared + CHAIN_ID: fibre-docker + depends_on: + bootstrap: + condition: service_completed_successfully + volumes: + - shared:/shared + - ./scripts:/scripts:ro + ports: + - "7981:7980" + + val2: + build: + context: . + dockerfile: Dockerfile.app + entrypoint: ["bash", "/scripts/start-validator.sh"] + environment: + VAL_INDEX: "2" + SHARED: /shared + CHAIN_ID: fibre-docker + depends_on: + bootstrap: + condition: service_completed_successfully + volumes: + - shared:/shared + - ./scripts:/scripts:ro + ports: + - "7982:7980" + + val3: + build: + context: . + dockerfile: Dockerfile.app + entrypoint: ["bash", "/scripts/start-validator.sh"] + environment: + VAL_INDEX: "3" + SHARED: /shared + CHAIN_ID: fibre-docker + depends_on: + bootstrap: + condition: service_completed_successfully + volumes: + - shared:/shared + - ./scripts:/scripts:ro + ports: + - "7983:7980" + + register: + build: + context: . + dockerfile: Dockerfile.app + entrypoint: ["bash", "/scripts/register-fsps.sh"] + environment: + NUM_VALIDATORS: "4" + SHARED: /shared + CHAIN_ID: fibre-docker + CLIENT_ACCOUNT: default-fibre + depends_on: + val0: + condition: service_started + val1: + condition: service_started + val2: + condition: service_started + val3: + condition: service_started + volumes: + - shared:/shared + - ./scripts:/scripts:ro + restart: on-failure + + bridge: + build: + context: . + dockerfile: Dockerfile.bridge + entrypoint: ["bash", "/scripts/start-bridge.sh"] + environment: + NETWORK: fibre-docker + SHARED: /shared + CORE_IP: val0 + CORE_GRPC_PORT: "9090" + CORE_RPC_PORT: "26657" + depends_on: + register: + condition: service_completed_successfully + volumes: + - shared:/shared + - ./scripts:/scripts:ro + ports: + - "26658:26658" + +volumes: + shared: diff --git a/tools/celestia-node-fiber/testing/docker/docker_test.go b/tools/celestia-node-fiber/testing/docker/docker_test.go new file mode 100644 index 0000000000..6e1d196414 --- /dev/null +++ b/tools/celestia-node-fiber/testing/docker/docker_test.go @@ -0,0 +1,165 @@ +//go:build fibre_docker + +// docker_test.go — runs the same Upload → Listen → Download flow as +// the in-process showcase, but against the docker-compose stack in +// this directory. +// +// Bring the stack up first: +// +// cd tools/celestia-node-fiber/testing/docker +// docker compose up -d --build +// # wait until `docker compose logs register` says "setup.done flag written" +// # and `docker compose logs bridge` shows the bridge serving on :26658 +// +// Then from the parent dir: +// +// go test -tags 'fibre fibre_docker' -count=1 -timeout 5m ./testing/docker/... +// +// The test reads the bridge JWT from the shared docker volume by +// running `docker compose exec bridge cat /shared/bridge-admin-jwt.txt`, +// so the docker CLI must be available on the host. +package docker_test + +import ( + "bytes" + "context" + "encoding/hex" + "fmt" + "os" + "os/exec" + "strings" + "testing" + "time" + + "github.com/cosmos/cosmos-sdk/crypto/keyring" + "github.com/stretchr/testify/require" + + "github.com/celestiaorg/celestia-app/v8/app" + "github.com/celestiaorg/celestia-app/v8/app/encoding" + "github.com/celestiaorg/celestia-node/api/client" + + cnfiber "github.com/evstack/ev-node/tools/celestia-node-fiber" +) + +// keep block import compiled out of the test binary; the assertion that +// adapter satisfies block.FiberClient lives in the unit tests. +var _ = (cnfiber.Adapter{}) + +const ( + bridgeAddr = "ws://127.0.0.1:26658" + consensusAddr = "127.0.0.1:9090" + chainID = "fibre-docker" + clientAccount = "default-fibre" + docTimeout = 60 * time.Second +) + +// envOr returns the env var if set, otherwise fallback. +func envOr(key, fallback string) string { + if v := os.Getenv(key); v != "" { + return v + } + return fallback +} + +func readBridgeJWT(t *testing.T) string { + t.Helper() + cmd := exec.Command("docker", "compose", "exec", "-T", "bridge", + "cat", "/shared/bridge-admin-jwt.txt") + cmd.Dir = mustDockerDir(t) + out, err := cmd.CombinedOutput() + require.NoError(t, err, "reading bridge JWT: %s", string(out)) + return strings.TrimSpace(string(out)) +} + +// mustDockerDir locates this file's directory at runtime so the test can +// invoke docker compose against the correct compose file regardless of +// where `go test` was launched from. +func mustDockerDir(t *testing.T) string { + t.Helper() + wd, err := os.Getwd() + require.NoError(t, err) + return wd +} + +// readClientKeyring loads the keyring populated by init-genesis.sh from +// the shared docker volume into a host-side keyring.Keyring suitable +// for fiber.New. We do this by `docker cp`-ing the seed validator's +// home dir to a local temp dir. +// +// TODO: this assumes the operator has the docker CLI; the test doesn't +// validate that up front. Add a `docker version` precheck if we want a +// clearer error. +func readClientKeyring(t *testing.T) keyring.Keyring { + t.Helper() + tmp := t.TempDir() + cmd := exec.Command("docker", "compose", "cp", + "val0:/shared/val0/.celestia-app/keyring-test", tmp+"/keyring-test") + cmd.Dir = mustDockerDir(t) + out, err := cmd.CombinedOutput() + require.NoError(t, err, "docker cp keyring: %s", string(out)) + + encCfg := encoding.MakeConfig(app.ModuleEncodingRegisters...) + kr, err := keyring.New("docker-test", keyring.BackendTest, tmp, nil, encCfg.Codec) + require.NoError(t, err, "constructing keyring") + return kr +} + +// TestDockerShowcase drives Upload → Listen → Download against the +// docker-compose stack. The 4-validator network exercises real 2/3 +// quorum aggregation that the single-validator showcase cannot. +// +// Build tag `fibre_docker` keeps the test out of default `go test` +// runs since it requires an external docker stack. +func TestDockerShowcase(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + t.Cleanup(cancel) + + jwt := readBridgeJWT(t) + kr := readClientKeyring(t) + + adapter, err := cnfiber.New(ctx, cnfiber.Config{ + Client: client.Config{ + ReadConfig: client.ReadConfig{ + BridgeDAAddr: envOr("FIBRE_BRIDGE_ADDR", bridgeAddr), + DAAuthToken: jwt, + EnableDATLS: false, + }, + SubmitConfig: client.SubmitConfig{ + DefaultKeyName: clientAccount, + Network: chainID, + CoreGRPCConfig: client.CoreGRPCConfig{ + Addr: envOr("FIBRE_CONSENSUS_ADDR", consensusAddr), + }, + }, + }, + }, kr) + require.NoError(t, err, "constructing adapter against docker stack") + t.Cleanup(func() { _ = adapter.Close() }) + + namespace := bytes.Repeat([]byte{0xab}, 10) + payload := []byte(fmt.Sprintf("docker showcase %d", time.Now().UnixNano())) + + events, err := adapter.Listen(ctx, namespace, 0) + require.NoError(t, err, "Listen subscription") + + up, err := adapter.Upload(ctx, namespace, payload) + require.NoError(t, err, "Upload") + require.NotEmpty(t, up.BlobID) + t.Logf("upload ok: blob_id=%s", hex.EncodeToString(up.BlobID)) + + select { + case ev, ok := <-events: + require.True(t, ok, "Listen channel closed without event") + require.Equal(t, up.BlobID, ev.BlobID, "BlobID mismatch") + require.Equal(t, uint64(len(payload)), ev.DataSize) + require.Greater(t, ev.Height, uint64(0)) + t.Logf("listen ok: height=%d data_size=%d", ev.Height, ev.DataSize) + case <-time.After(docTimeout): + t.Fatalf("timed out waiting for BlobEvent after %s", docTimeout) + } + + got, err := adapter.Download(ctx, up.BlobID) + require.NoError(t, err, "Download") + require.Equal(t, payload, got, "downloaded bytes don't match payload") + t.Logf("download ok: %d bytes", len(got)) +} diff --git a/tools/celestia-node-fiber/testing/docker/evnode_docker_test.go b/tools/celestia-node-fiber/testing/docker/evnode_docker_test.go new file mode 100644 index 0000000000..bd79aec9c2 --- /dev/null +++ b/tools/celestia-node-fiber/testing/docker/evnode_docker_test.go @@ -0,0 +1,95 @@ +//go:build fibre_docker + +// evnode_docker_test.go — wires an aggregator + full-node ev-node pair +// onto the docker-compose Celestia + Fibre stack and asserts that the +// full node DA-syncs the aggregator's blocks via Fibre. +// +// This is the docker counterpart to `TestEvNode_FiberDA_TwoNode` under +// `testing/`. Both share the same flow +// (`cnfibertest.RunEvNodeFibreTwoNodeFlow`); only the underlying +// Celestia + bridge plumbing differs. +// +// Prereqs are identical to docker_test.go — bring up the stack first: +// +// cd tools/celestia-node-fiber/testing/docker +// docker compose up -d --build +// # wait until `docker compose logs register` says "setup.done flag written" +// +// Then run the test: +// +// go test -tags 'fibre fibre_docker' -count=1 -timeout 5m \ +// -run TestEvNode_FiberDA_Docker ./testing/docker/... + +package docker_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/celestiaorg/celestia-node/api/client" + + cnfiber "github.com/evstack/ev-node/tools/celestia-node-fiber" + cnfibertest "github.com/evstack/ev-node/tools/celestia-node-fiber/testing" +) + +// TestEvNode_FiberDA_Docker drives the aggregator + full-node ev-node +// pair against the 4-validator + bridge docker stack. Compared to the +// in-process variant this exercises: +// - real consensus 2/3-quorum signature aggregation (4 validators), +// - inter-validator P2P, +// - 4 distinct fibre servers cooperating on Upload row distribution, +// - dns:/// host registry resolution against an external chain, +// - a bridge that's syncing real headers, not driving block production. +func TestEvNode_FiberDA_Docker(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + t.Cleanup(cancel) + + jwt := readBridgeJWT(t) + kr := readClientKeyring(t) + + // Each role gets its own adapter so the bridge JSON-RPC websocket + // connections aren't shared. celestia-node's go-jsonrpc client only + // supports one event-stream subscription per connection — sharing + // kills the socket the moment a second Subscribe lands on it. + mkAdapter := func(label string) *cnfiber.Adapter { + t.Helper() + a, err := cnfiber.New(ctx, cnfiber.Config{ + Client: client.Config{ + ReadConfig: client.ReadConfig{ + BridgeDAAddr: envOr("FIBRE_BRIDGE_ADDR", bridgeAddr), + DAAuthToken: jwt, + EnableDATLS: false, + }, + SubmitConfig: client.SubmitConfig{ + DefaultKeyName: clientAccount, + Network: chainID, + CoreGRPCConfig: client.CoreGRPCConfig{ + Addr: envOr("FIBRE_CONSENSUS_ADDR", consensusAddr), + }, + }, + }, + }, kr) + require.NoError(t, err, "constructing %s adapter against docker stack", label) + t.Cleanup(func() { _ = a.Close() }) + return a + } + + aggAdapter := mkAdapter("aggregator") + fnAdapter := mkAdapter("full-node") + observer := mkAdapter("observer") + + // Pin the full node to the current bridge tip so its DA retriever + // skips historical scans (where there are no Fibre blobs yet) and + // jumps straight to the live-subscribe path. + head, err := observer.Head(ctx) + require.NoError(t, err, "querying bridge head") + t.Logf("bridge head at test start: %d", head) + + cnfibertest.RunEvNodeFibreTwoNodeFlow(t, ctx, aggAdapter, fnAdapter, observer, cnfibertest.EvNodeConfig{ + ChainID: "ev-fiber-docker", + DAStartHeight: head, + }) +} diff --git a/tools/celestia-node-fiber/testing/docker/scripts/init-genesis.sh b/tools/celestia-node-fiber/testing/docker/scripts/init-genesis.sh new file mode 100755 index 0000000000..25c2aced0f --- /dev/null +++ b/tools/celestia-node-fiber/testing/docker/scripts/init-genesis.sh @@ -0,0 +1,123 @@ +#!/usr/bin/env bash +# init-genesis.sh — runs once in the bootstrap container. +# +# Generates a 4-validator celestia-app genesis under /shared, with each +# validator's home dir at /shared/val/.celestia-app. All four homes +# share the same genesis.json so the chain has a coherent validator set. +# Each validator's priv_validator_key.json + node_key.json are unique +# per home dir. +# +# After this script exits, validator entrypoints can pick up their home +# from the shared volume. +set -euo pipefail + +CHAIN_ID="${CHAIN_ID:-fibre-docker}" +NUM_VALIDATORS="${NUM_VALIDATORS:-4}" +SHARED="${SHARED:-/shared}" +APP="${APP:-celestia-appd}" +KEYRING_BACKEND="test" +STAKE="100000000000utia" # 100k TIA per validator self-stake +INITIAL_BALANCE="1000000000000utia" +CLIENT_ACCOUNT="${CLIENT_ACCOUNT:-default-fibre}" +CLIENT_BALANCE="${CLIENT_BALANCE:-1000000000000utia}" + +mkdir -p "$SHARED" + +# Idempotency: if a previous run already produced peers.txt the genesis +# is already in place — skip re-init so subsequent `docker compose up` +# invocations don't fail trying to re-init the homes. +if [ -f "$SHARED/peers.txt" ]; then + echo "init-genesis: already initialized; nothing to do." + exit 0 +fi + +# Validator 0 is the seed home: we initialize there, add genesis accounts, +# then copy the resulting genesis.json into every other validator's home. +for i in $(seq 0 $((NUM_VALIDATORS - 1))); do + home="$SHARED/val$i/.celestia-app" + mkdir -p "$home" + "$APP" init "validator-$i" --chain-id "$CHAIN_ID" --home "$home" >/dev/null +done + +seed_home="$SHARED/val0/.celestia-app" + +# Add a validator key and genesis account to each home, then copy the +# pubkey/account into the seed home so it ends up in genesis. +for i in $(seq 0 $((NUM_VALIDATORS - 1))); do + home="$SHARED/val$i/.celestia-app" + "$APP" keys add "validator" \ + --keyring-backend "$KEYRING_BACKEND" --home "$home" --output json \ + > "$SHARED/val$i/validator.key.json" + addr=$("$APP" keys show "validator" -a \ + --keyring-backend "$KEYRING_BACKEND" --home "$home") + "$APP" genesis add-genesis-account "$addr" "$INITIAL_BALANCE" \ + --keyring-backend "$KEYRING_BACKEND" --home "$seed_home" +done + +# Add the client signer account to the seed genesis with a generous balance +# so the test driver can fund its escrow without worrying about gas. +"$APP" keys add "$CLIENT_ACCOUNT" \ + --keyring-backend "$KEYRING_BACKEND" --home "$seed_home" \ + --output json > "$SHARED/client.key.json" +client_addr=$("$APP" keys show "$CLIENT_ACCOUNT" -a \ + --keyring-backend "$KEYRING_BACKEND" --home "$seed_home") +"$APP" genesis add-genesis-account "$client_addr" "$CLIENT_BALANCE" \ + --keyring-backend "$KEYRING_BACKEND" --home "$seed_home" + +# Set network minimum gas price to 0 so gentxs (which have no fees) can +# be included. The default 0.000001 utia/gas would reject every gentx. +seed_genesis="$seed_home/config/genesis.json" +tmp=$(mktemp) +jq '.app_state.minfee.params.network_min_gas_price = "0.000000000000000000"' \ + "$seed_genesis" > "$tmp" +mv "$tmp" "$seed_genesis" + +# Generate gentxs from each validator's home, collect them in seed_home. +mkdir -p "$seed_home/config/gentx" +for i in $(seq 0 $((NUM_VALIDATORS - 1))); do + home="$SHARED/val$i/.celestia-app" + if [ "$i" -ne 0 ]; then + # Other validators need the seed's genesis.json before they can + # produce a valid gentx. + cp "$seed_home/config/genesis.json" "$home/config/genesis.json" + # Their account also needs to exist in their own keyring + genesis. + # Re-add account: gentx requires it. + addr=$("$APP" keys show "validator" -a \ + --keyring-backend "$KEYRING_BACKEND" --home "$home") + "$APP" genesis add-genesis-account "$addr" "$INITIAL_BALANCE" \ + --keyring-backend "$KEYRING_BACKEND" --home "$home" || true + fi + "$APP" genesis gentx "validator" "$STAKE" \ + --chain-id "$CHAIN_ID" \ + --keyring-backend "$KEYRING_BACKEND" \ + --home "$home" \ + --output-document "$seed_home/config/gentx/gentx-val$i.json" +done + +# Collect every gentx into the seed genesis. +"$APP" genesis collect-gentxs --home "$seed_home" +"$APP" genesis validate --home "$seed_home" + +# Distribute the final genesis.json to every other validator's home. +for i in $(seq 1 $((NUM_VALIDATORS - 1))); do + home="$SHARED/val$i/.celestia-app" + cp "$seed_home/config/genesis.json" "$home/config/genesis.json" +done + +# Persistent peers: each validator advertises itself by its docker +# service name (val0, val1, ...) on the standard P2P port 26656. +PEERS="" +for i in $(seq 0 $((NUM_VALIDATORS - 1))); do + home="$SHARED/val$i/.celestia-app" + nodeid=$("$APP" comet show-node-id --home "$home") + PEERS="${PEERS}${PEERS:+,}${nodeid}@val$i:26656" +done +echo "$PEERS" > "$SHARED/peers.txt" + +# TODO: write per-validator config tweaks (laddr / external_address / +# persistent_peers / minimum-gas-prices) into each home's config.toml / +# app.toml. The validator entrypoint expects these to be present already. +# Either inline here with sed/jq, or have the entrypoint apply them on +# startup. + +echo "init-genesis: done. peers=$PEERS" diff --git a/tools/celestia-node-fiber/testing/docker/scripts/register-fsps.sh b/tools/celestia-node-fiber/testing/docker/scripts/register-fsps.sh new file mode 100755 index 0000000000..9603b1e421 --- /dev/null +++ b/tools/celestia-node-fiber/testing/docker/scripts/register-fsps.sh @@ -0,0 +1,65 @@ +#!/usr/bin/env bash +# register-fsps.sh — runs once after validators are producing blocks. +# +# Submits MsgSetFibreProviderInfo for each validator so the chain's +# valaddr module maps consensus address → fibre server address. The +# `dns:///` URI prefix is required by the fibre client's gRPC dialer +# (a bare host:port fails URL parsing — same gotcha documented in +# tools/talis/fibre_setup.go). +# +# Also funds the test client account's escrow so MsgPayForFibre can +# settle in the docker network. +set -euo pipefail + +NUM_VALIDATORS="${NUM_VALIDATORS:-4}" +SHARED="${SHARED:-/shared}" +APP="${APP:-celestia-appd}" +CHAIN_ID="${CHAIN_ID:-fibre-docker}" +FEES="${FEES:-5000utia}" +ESCROW_AMOUNT="${ESCROW_AMOUNT:-500000000000utia}" +CLIENT_ACCOUNT="${CLIENT_ACCOUNT:-default-fibre}" +FIBRE_PORT="${FIBRE_PORT:-7980}" + +# Wait until the seed validator has produced a few blocks so the chain +# is healthy enough to accept txs. status command uses the --node flag +# (the home's config.toml laddr is bound to 0.0.0.0 which we can't dial +# from another container). +seed_home="$SHARED/val0/.celestia-app" +until height=$("$APP" status --home "$seed_home" --node "tcp://val0:26657" 2>/dev/null \ + | jq -r '.sync_info.latest_block_height // 0') \ + && [ "${height:-0}" -ge 3 ]; do + echo "register-fsps: waiting for chain to reach height 3 (current=${height:-?})..." + sleep 2 +done + +# Register each validator's fibre host. We submit each tx from the +# validator's own keyring, hitting that validator's local gRPC. +for i in $(seq 0 $((NUM_VALIDATORS - 1))); do + home="$SHARED/val$i/.celestia-app" + val_oper=$("$APP" keys show "validator" --bech val -a \ + --keyring-backend test --home "$home") + # MsgSetFibreProviderInfo via the valaddr tx CLI. Each fibre server + # is reachable inside the compose network at val$i:$FIBRE_PORT. + # Register a host-reachable address (127.0.0.1:798X) so the test + # driver running on the docker host can dial each fibre server + # directly. compose.yaml maps val_i:7980 → host:798$i. + host_port=$((FIBRE_PORT + i)) + "$APP" tx valaddr set-host "dns:///127.0.0.1:$host_port" \ + --from validator --keyring-backend test --home "$home" \ + --chain-id "$CHAIN_ID" --node "tcp://val$i:26657" \ + --fees "$FEES" --yes + echo "register-fsps: registered val$i ($val_oper)" + sleep 6 # allow inclusion in the next block +done + +# Fund the client account's escrow. +client_addr=$("$APP" keys show "$CLIENT_ACCOUNT" -a \ + --keyring-backend test --home "$seed_home") +"$APP" tx fibre deposit-to-escrow "$ESCROW_AMOUNT" \ + --from "$CLIENT_ACCOUNT" --keyring-backend test --home "$seed_home" \ + --chain-id "$CHAIN_ID" --node "tcp://val0:26657" \ + --fees "$FEES" --yes +echo "register-fsps: deposited $ESCROW_AMOUNT into $client_addr's escrow" + +touch "$SHARED/setup.done" +echo "register-fsps: complete; setup.done flag written" diff --git a/tools/celestia-node-fiber/testing/docker/scripts/start-bridge.sh b/tools/celestia-node-fiber/testing/docker/scripts/start-bridge.sh new file mode 100755 index 0000000000..e1ed0309de --- /dev/null +++ b/tools/celestia-node-fiber/testing/docker/scripts/start-bridge.sh @@ -0,0 +1,47 @@ +#!/usr/bin/env bash +# start-bridge.sh — entrypoint for the celestia-node bridge container. +# +# Initializes the bridge home, configures it to talk to val0's gRPC and +# CometBFT RPC, generates an admin JWT (written to a shared file so the +# test driver can read it), and starts the bridge. +set -euo pipefail + +NETWORK="${NETWORK:-fibre-docker}" +SHARED="${SHARED:-/shared}" +HOME_DIR="${HOME_DIR:-/home/celestia/.celestia-bridge}" +CORE_IP="${CORE_IP:-val0}" +CORE_GRPC_PORT="${CORE_GRPC_PORT:-9090}" +CORE_RPC_PORT="${CORE_RPC_PORT:-26657}" + +# celestia-node only accepts presets (celestia, mocha, arabica, ...) for +# --p2p.network. For a private chain we must set CELESTIA_CUSTOM= +# before invoking the binary; that registers the network at runtime so +# the same name passes flag validation. +export CELESTIA_CUSTOM="$NETWORK" + +if [ ! -f "$HOME_DIR/config.toml" ]; then + celestia bridge init --p2p.network "$NETWORK" --node.store "$HOME_DIR" +fi + +# Wait for the FSP registration step to finish so blob.Subscribe has +# something meaningful to emit. +until [ -f "$SHARED/setup.done" ]; do + echo "bridge: waiting for FSP registration..." + sleep 2 +done + +# Drop an admin JWT into the shared volume so the test driver can pick +# it up without --auth-token plumbing. CELESTIA_CUSTOM prints a warning +# to stdout before the JWT, so grep for the actual token (three base64 +# segments separated by dots). +celestia bridge auth admin --p2p.network "$NETWORK" --node.store "$HOME_DIR" \ + | grep -E '^[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+$' \ + > "$SHARED/bridge-admin-jwt.txt" + +exec celestia bridge start \ + --p2p.network "$NETWORK" \ + --node.store "$HOME_DIR" \ + --core.ip "$CORE_IP" \ + --core.port "$CORE_GRPC_PORT" \ + --rpc.addr 0.0.0.0 \ + --rpc.port 26658 diff --git a/tools/celestia-node-fiber/testing/docker/scripts/start-validator.sh b/tools/celestia-node-fiber/testing/docker/scripts/start-validator.sh new file mode 100755 index 0000000000..24a873390a --- /dev/null +++ b/tools/celestia-node-fiber/testing/docker/scripts/start-validator.sh @@ -0,0 +1,87 @@ +#!/usr/bin/env bash +# start-validator.sh — entrypoint for each validator container. +# +# Reads the validator index from $VAL_INDEX (0..N-1), loads its home from +# the shared volume populated by init-genesis.sh, applies docker-network- +# aware overrides to config.toml/app.toml, then starts celestia-appd and +# the in-process fibre server side-by-side. +set -euo pipefail + +VAL_INDEX="${VAL_INDEX:?VAL_INDEX must be set (0..N-1)}" +SHARED="${SHARED:-/shared}" +APP="${APP:-celestia-appd}" +FIBRE_BIN="${FIBRE_BIN:-fibre}" +CHAIN_ID="${CHAIN_ID:-fibre-docker}" + +home="$SHARED/val$VAL_INDEX/.celestia-app" +peers=$(cat "$SHARED/peers.txt") +service_name="val$VAL_INDEX" + +# Wait for init-genesis to have populated this home. +until [ -f "$home/config/genesis.json" ]; do + echo "validator-$VAL_INDEX: waiting for genesis on $home..." + sleep 1 +done + +# Apply docker-network bindings. config.toml / app.toml are TOML; use sed +# carefully on the keys we need. (A more robust approach would be `dasel` +# or a Go config helper — keeping it minimal here.) +config_toml="$home/config/config.toml" +app_toml="$home/config/app.toml" + +sed -i \ + -e 's|^laddr = "tcp://127.0.0.1:26657"|laddr = "tcp://0.0.0.0:26657"|' \ + -e 's|^laddr = "tcp://0.0.0.0:26656"|laddr = "tcp://0.0.0.0:26656"|' \ + -e "s|^persistent_peers = \"\"|persistent_peers = \"$peers\"|" \ + -e "s|^external_address = \"\"|external_address = \"$service_name:26656\"|" \ + -e 's|^priv_validator_grpc_laddr = ""|priv_validator_grpc_laddr = "127.0.0.1:26659"|' \ + "$config_toml" + +sed -i \ + -e 's|^minimum-gas-prices = ""|minimum-gas-prices = "0.002utia"|' \ + -e 's|^address = "tcp://localhost:1317"|address = "tcp://0.0.0.0:1317"|' \ + -e 's|^address = "localhost:9090"|address = "0.0.0.0:9090"|' \ + -e 's|^address = "localhost:9091"|address = "0.0.0.0:9091"|' \ + "$app_toml" + +# Start celestia-appd in the background. --force-no-bbr because the +# linux kernel inside docker containers on macOS does not have BBR +# congestion control enabled. +"$APP" start --home "$home" \ + --grpc.address "0.0.0.0:9090" \ + --grpc.enable true \ + --force-no-bbr & +appd_pid=$! + +# Wait for the gRPC + privval gRPC ports to be reachable before launching +# fibre. Use bash's /dev/tcp instead of nc (not in slim debian). +until (exec 3<>/dev/tcp/127.0.0.1/9090) 2>/dev/null; do + sleep 1 +done +exec 3<&- 3>&- +until (exec 3<>/dev/tcp/127.0.0.1/26659) 2>/dev/null; do + sleep 1 +done +exec 3<&- 3>&- + +# Wait for the chain to produce the first block — fibre requires this +# at startup to detect chain ID, otherwise it errors out and exits. +until height=$("$APP" status --home "$home" --node "tcp://127.0.0.1:26657" 2>/dev/null \ + | jq -r '.sync_info.latest_block_height // 0') \ + && [ "${height:-0}" -ge 1 ]; do + echo "validator-$VAL_INDEX: waiting for first block (current=${height:-?})..." + sleep 2 +done + +# Start the fibre server. Defaults: listens on 0.0.0.0:7980, signs via +# the validator's privval gRPC at 127.0.0.1:26659 (set above via +# priv_validator_grpc_laddr). +"$FIBRE_BIN" start \ + --home "$home/.celestia-fibre" \ + --server-listen-address "0.0.0.0:7980" \ + --app-grpc-address "127.0.0.1:9090" \ + --signer-grpc-address "127.0.0.1:26659" & +fibre_pid=$! + +trap 'kill "$appd_pid" "$fibre_pid" 2>/dev/null || true' EXIT +wait "$appd_pid" "$fibre_pid" diff --git a/tools/celestia-node-fiber/testing/evnode.go b/tools/celestia-node-fiber/testing/evnode.go new file mode 100644 index 0000000000..55c16c2390 --- /dev/null +++ b/tools/celestia-node-fiber/testing/evnode.go @@ -0,0 +1,481 @@ +//go:build fibre + +package cnfibertest + +import ( + "context" + "crypto/rand" + "fmt" + "sync" + "testing" + "time" + + "github.com/ipfs/go-datastore" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + + "github.com/evstack/ev-node/block" + coreexecution "github.com/evstack/ev-node/core/execution" + "github.com/evstack/ev-node/node" + "github.com/evstack/ev-node/pkg/config" + datypes "github.com/evstack/ev-node/pkg/da/types" + genesispkg "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/p2p" + "github.com/evstack/ev-node/pkg/p2p/key" + "github.com/evstack/ev-node/pkg/sequencers/solo" + pkgsigner "github.com/evstack/ev-node/pkg/signer" + "github.com/evstack/ev-node/pkg/signer/file" + "github.com/evstack/ev-node/pkg/store" +) + +// EvNodePassphrase is the passphrase used by the file signers wired up +// by NewFiberAggregator / NewFiberFullNode. +const EvNodePassphrase = "test-passphrase-evnode" + +const ( + defaultEvNodeBlockTimeout = 60 * time.Second +) + +// EvNodeConfig parameterizes the chain shared by an aggregator and any +// number of full nodes. Zero values get sensible defaults applied by +// the helpers — block time defaults to 200ms (fast block production) +// and DA block time to 1s. +type EvNodeConfig struct { + ChainID string + HeaderNamespace string + DataNamespace string + BlockTime time.Duration + DABlockTime time.Duration + + // DAStartHeight is written into Genesis.DAStartHeight (and the + // FiberDAClient's last-known DA height) so both nodes skip the + // historical DA scan from height 0 and pick up at the live tip. + // + // Why it matters: ev-node's catch-up retriever creates a fresh + // blob.Subscribe per height batch and cancels it. celestia-node's + // go-jsonrpc multiplexes subscriptions on a single websocket per + // module — cancelling any one subscription tears the whole + // connection down, so subsequent retrievals immediately fail with + // "websocket routine exiting". Starting at the tip avoids the + // catch-up phase and keeps the one long-lived Subscribe alive. + DAStartHeight uint64 +} + +func (c *EvNodeConfig) applyDefaults() { + if c.ChainID == "" { + c.ChainID = "ev-fiber-test" + } + // Header / data namespaces default to per-process-unique strings so + // successive test runs against the same long-lived bridge don't + // observe each other's blobs (they would be unverifiable against + // the current test's proposer and would jam the full-node syncer + // as undeliverable pending events). + if c.HeaderNamespace == "" { + c.HeaderNamespace = uniqueNamespace("ht") + } + if c.DataNamespace == "" { + c.DataNamespace = uniqueNamespace("da") + } + if c.BlockTime == 0 { + // 200ms = the production target for ev-node block production. + // The aggregator keeps up cleanly at this cadence; the full + // node side has a separate caveat documented on + // RunEvNodeFibreTwoNodeFlow about the per-Retrieve Subscribe + // teardown. + c.BlockTime = 200 * time.Millisecond + } + if c.DABlockTime == 0 { + c.DABlockTime = 1 * time.Second + } +} + +// uniqueNamespace returns a short, deterministically-unique-per-call +// namespace string built from `prefix` plus a 6-byte hex suffix derived +// from crypto/rand. The full string fits within the 10-byte v0 +// namespace identifier expected by Fibre. +func uniqueNamespace(prefix string) string { + var b [3]byte + _, _ = rand.Read(b[:]) + return fmt.Sprintf("%s-%x", prefix, b[:]) +} + +// NewFiberAggregator wires a single aggregator (block producer) ev-node +// node backed by the supplied Fibre DA client. The returned executor +// can be fed transactions via InjectTx; the returned genesis MUST be +// passed to NewFiberFullNode for any full nodes joining the same chain +// so they share chain-id and proposer address. +// +// The caller drives lifecycle via node.Run(ctx). +func NewFiberAggregator(t *testing.T, ctx context.Context, fiberClient block.FiberClient, cfg EvNodeConfig) (node.Node, *InMemExecutor, genesispkg.Genesis) { + t.Helper() + cfg.applyDefaults() + + tmpDir := t.TempDir() + logger := newTestLogger(t).With().Str("role", "aggregator").Logger() + + signerAddr := mustCreateFileSigner(t, tmpDir) + gen := genesispkg.NewGenesis(cfg.ChainID, 1, time.Now(), signerAddr) + gen.DAStartHeight = cfg.DAStartHeight + require.NoError(t, gen.Validate(), "validating genesis") + + rollnode, exec := buildEvNode(t, ctx, fiberClient, cfg, gen, tmpDir, logger, true, cfg.DAStartHeight) + return rollnode, exec, gen +} + +// NewFiberFullNode wires a full ev-node node (no block production) that +// DA-syncs blocks from the same Fibre namespace as the aggregator. +// Full nodes still need a signer (for libp2p identity / network +// attestations) but it does not need to be the proposer — the proposer +// address comes from the supplied aggregator genesis. +// +// The full node's DA retriever obeys gen.DAStartHeight, which the +// aggregator constructor copies from cfg.DAStartHeight. See the +// EvNodeConfig docstring for why pinning to the live bridge tip +// matters. +func NewFiberFullNode(t *testing.T, ctx context.Context, fiberClient block.FiberClient, cfg EvNodeConfig, gen genesispkg.Genesis) (node.Node, *InMemExecutor) { + t.Helper() + cfg.applyDefaults() + + tmpDir := t.TempDir() + logger := newTestLogger(t).With().Str("role", "fullnode").Logger() + + // File signer is created but the address is unused — only the + // aggregator's address (already in `gen`) acts as proposer. + mustCreateFileSigner(t, tmpDir) + + rollnode, exec := buildEvNode(t, ctx, fiberClient, cfg, gen, tmpDir, logger, false, gen.DAStartHeight) + return rollnode, exec +} + +func newTestLogger(t *testing.T) zerolog.Logger { + return zerolog.New(zerolog.NewTestWriter(t)).With().Timestamp().Logger() +} + +func mustCreateFileSigner(t *testing.T, tmpDir string) []byte { + t.Helper() + fs, err := file.CreateFileSystemSigner(tmpDir, []byte(EvNodePassphrase)) + require.NoError(t, err, "creating file signer") + addr, err := fs.GetAddress() + require.NoError(t, err, "getting signer address") + return addr +} + +func buildEvNode( + t *testing.T, + ctx context.Context, + fiberClient block.FiberClient, + cfg EvNodeConfig, + gen genesispkg.Genesis, + tmpDir string, + logger zerolog.Logger, + aggregator bool, + lastKnownDAHeight uint64, +) (node.Node, *InMemExecutor) { + t.Helper() + + nodePrivKey, _, err := crypto.GenerateEd25519Key(rand.Reader) + require.NoError(t, err, "generating node key") + nodeKey := &key.NodeKey{PrivKey: nodePrivKey} + + nodeCfg := config.DefaultConfig() + nodeCfg.RootDir = tmpDir + nodeCfg.DBPath = "data" + nodeCfg.Node.Aggregator = aggregator + nodeCfg.Node.BlockTime = config.DurationWrapper{Duration: cfg.BlockTime} + nodeCfg.Node.LazyMode = false + nodeCfg.DA.BlockTime = config.DurationWrapper{Duration: cfg.DABlockTime} + nodeCfg.DA.Namespace = cfg.HeaderNamespace + nodeCfg.DA.DataNamespace = cfg.DataNamespace + nodeCfg.DA.BatchingStrategy = "immediate" + nodeCfg.DA.Fiber.Enabled = true + nodeCfg.DA.StartHeight = cfg.DAStartHeight + nodeCfg.DA.RequestTimeout = config.DurationWrapper{Duration: 60 * time.Second} + nodeCfg.P2P.ListenAddress = "/ip4/0.0.0.0/tcp/0" + nodeCfg.P2P.DisableConnectionGater = true + nodeCfg.Instrumentation.Prometheus = false + nodeCfg.Instrumentation.Pprof = false + nodeCfg.RPC.Address = "127.0.0.1:0" + nodeCfg.Log.Level = "debug" + nodeCfg.Signer.SignerType = "file" + nodeCfg.Signer.SignerPath = tmpDir + + signer, err := pkgsigner.NewSigner(ctx, &nodeCfg, EvNodePassphrase) + require.NoError(t, err, "creating signer via factory") + + ds, err := store.NewDefaultKVStore(tmpDir, nodeCfg.DBPath, "testdb") + require.NoError(t, err, "creating datastore") + + executor := newInMemExecutor() + sequencer := solo.NewSoloSequencer(logger, []byte(gen.ChainID), executor) + daClient := block.NewFiberDAClient(fiberClient, nodeCfg, logger, lastKnownDAHeight) + p2pClient, err := p2p.NewClient(nodeCfg.P2P, nodeKey.PrivKey, datastore.NewMapDatastore(), gen.ChainID, logger, nil) + require.NoError(t, err, "creating p2p client") + + rollnode, err := node.NewNode( + nodeCfg, + executor, + sequencer, + daClient, + signer, + p2pClient, + gen, + ds, + node.DefaultMetricsProvider(nodeCfg.Instrumentation), + logger, + node.NodeOptions{}, + ) + require.NoError(t, err, "creating node") + + return rollnode, executor +} + +// InMemExecutor is a minimal coreexecution.Executor implementation +// for tests: it accepts "k=v" payloads via InjectTx, applies them to +// an in-memory map, and tracks block + tx counts. +type InMemExecutor struct { + mu sync.Mutex + data map[string]string + + txChan chan []byte + blocksProduced uint64 + totalExecutedTxs uint64 + executedTxs [][]byte +} + +func newInMemExecutor() *InMemExecutor { + return &InMemExecutor{ + data: make(map[string]string), + txChan: make(chan []byte, 10000), + } +} + +// InjectTx queues a "k=v" payload for inclusion in the next block. +func (e *InMemExecutor) InjectTx(tx []byte) { + select { + case e.txChan <- tx: + default: + } +} + +// ExecStats reports cumulative block and tx counts for assertions. +type ExecStats struct { + BlocksProduced uint64 + TotalExecutedTxs uint64 +} + +func (e *InMemExecutor) Stats() ExecStats { + e.mu.Lock() + defer e.mu.Unlock() + return ExecStats{BlocksProduced: e.blocksProduced, TotalExecutedTxs: e.totalExecutedTxs} +} + +// Get returns the value associated with the supplied key, if any. +func (e *InMemExecutor) Get(key string) (string, bool) { + e.mu.Lock() + defer e.mu.Unlock() + v, ok := e.data[key] + return v, ok +} + +// ExecutedTxs returns a copy of the raw payloads that were applied so +// far. Tests use this to confirm a full node observed exactly the txs +// the aggregator submitted via DA. +func (e *InMemExecutor) ExecutedTxs() [][]byte { + e.mu.Lock() + defer e.mu.Unlock() + out := make([][]byte, len(e.executedTxs)) + for i, tx := range e.executedTxs { + out[i] = append([]byte(nil), tx...) + } + return out +} + +func (e *InMemExecutor) InitChain(_ context.Context, _ time.Time, _ uint64, _ string) ([]byte, error) { + return []byte("inmem-genesis-root"), nil +} + +func (e *InMemExecutor) GetTxs(_ context.Context) ([][]byte, error) { + var txs [][]byte + for { + select { + case tx := <-e.txChan: + txs = append(txs, tx) + default: + return txs, nil + } + } +} + +func (e *InMemExecutor) ExecuteTxs(_ context.Context, txs [][]byte, _ uint64, _ time.Time, _ []byte) ([]byte, error) { + e.mu.Lock() + defer e.mu.Unlock() + for _, tx := range txs { + k, v, ok := parseKV(tx) + if ok { + e.data[k] = v + } + e.executedTxs = append(e.executedTxs, append([]byte(nil), tx...)) + } + e.blocksProduced++ + e.totalExecutedTxs += uint64(len(txs)) + return []byte(fmt.Sprintf("root-%d", e.blocksProduced)), nil +} + +func (e *InMemExecutor) SetFinal(_ context.Context, _ uint64) error { return nil } +func (e *InMemExecutor) Rollback(_ context.Context, _ uint64) error { return nil } + +func (e *InMemExecutor) GetExecutionInfo(_ context.Context) (coreexecution.ExecutionInfo, error) { + return coreexecution.ExecutionInfo{MaxGas: 0}, nil +} + +func (e *InMemExecutor) FilterTxs(_ context.Context, txs [][]byte, _, _ uint64, _ bool) ([]coreexecution.FilterStatus, error) { + st := make([]coreexecution.FilterStatus, len(txs)) + for i := range st { + st[i] = coreexecution.FilterOK + } + return st, nil +} + +func parseKV(tx []byte) (string, string, bool) { + s := string(tx) + for i := 0; i < len(s); i++ { + if s[i] == '=' { + return s[:i], s[i+1:], true + } + } + return "", "", false +} + +var _ coreexecution.Executor = (*InMemExecutor)(nil) + +// RunEvNodeFibreTwoNodeFlow exercises the aggregator + full-node path: +// +// 1. Subscribe to the aggregator's header namespace via `observer` so +// we can verify Fibre BlobEvents land on chain. +// 2. Spin up an aggregator backed by `aggAdapter`; capture its genesis. +// 3. Spin up a full node backed by `fnAdapter`, sharing that genesis. +// The full node DA-syncs from cfg.DAStartHeight (which should be +// the bridge tip captured before either node starts). +// 4. Inject a tx into the aggregator. Wait for it to produce a block +// containing the tx. +// 5. Confirm the aggregator's blob landed on Fibre by reading at +// least one BlobEvent from `observer` and Download'ing it. +// 6. Verify the full node started its DA sync (its syncer initialized +// against the supplied genesis without crashing). Block-by-block +// application across the gap between DAStartHeight and the +// aggregator's first submission requires ev-node to keep a +// persistent Subscribe — currently the catch-up retriever creates +// a fresh Subscribe per height batch and cancels it, which tears +// down celestia-node's go-jsonrpc websocket. That refactor is +// tracked separately; this test deliberately stops short of +// asserting the full node fully replayed the aggregator's chain. +// +// The three adapters MUST be distinct instances. celestia-node's +// go-jsonrpc multiplexes blob.Subscribe over a single websocket per +// module — cancelling any one subscription tears the shared connection +// down, which would crash both nodes if they shared an adapter. +func RunEvNodeFibreTwoNodeFlow(t *testing.T, ctx context.Context, aggAdapter, fnAdapter, observer block.FiberClient, cfg EvNodeConfig) { + t.Helper() + // Resolve defaults at the top level so both nodes share the same + // namespaces and chain settings. NewFiberAggregator / + // NewFiberFullNode also call applyDefaults but it's a no-op once + // the fields are populated here. + cfg.applyDefaults() + + fullHeaderNS := datypes.NamespaceFromString(cfg.HeaderNamespace).Bytes() + headerNSID := fullHeaderNS[len(fullHeaderNS)-10:] + events, err := observer.Listen(ctx, headerNSID, 0) + require.NoError(t, err, "starting observer Listen on header namespace") + + aggNode, aggExec, gen := NewFiberAggregator(t, ctx, aggAdapter, cfg) + fnNode, _ := NewFiberFullNode(t, ctx, fnAdapter, cfg, gen) + + // Start the full node FIRST so its DA retriever is already + // listening from gen.DAStartHeight (the captured bridge tip) when + // the aggregator begins posting. + fnErrCh := startNode(t, ctx, fnNode, "full-node") + time.Sleep(500 * time.Millisecond) + aggErrCh := startNode(t, ctx, aggNode, "aggregator") + + txPayload := []byte(fmt.Sprintf("fiber-key=fiber-value-%d", time.Now().UnixNano())) //nolint:gomnd + aggExec.InjectTx(txPayload) + + require.Eventually(t, func() bool { + stats := aggExec.Stats() + t.Logf("aggregator: blocks=%d txs=%d", stats.BlocksProduced, stats.TotalExecutedTxs) + return stats.BlocksProduced >= 1 && stats.TotalExecutedTxs >= 1 + }, defaultEvNodeBlockTimeout, 200*time.Millisecond, "aggregator should produce at least one block with the injected tx") + + // Confirm the aggregator-injected tx made it into the executed set. + require.Contains(t, asStrings(aggExec.ExecutedTxs()), string(txPayload), + "aggregator executed tx set should include the injected payload") + + // Drain at least one Fibre BlobEvent off the observer subscription + // — this proves the aggregator's DA submission landed on chain via + // Fibre and is retrievable through the bridge. + var seen []block.FiberBlobEvent + require.Eventually(t, func() bool { + select { + case ev, ok := <-events: + if !ok { + return false + } + seen = append(seen, ev) + t.Logf("fiber event: blob_id=%x height=%d data_size=%d", + ev.BlobID, ev.Height, ev.DataSize) + return true + default: + return false + } + }, defaultEvNodeBlockTimeout, 500*time.Millisecond, "expected at least one Fiber BlobEvent from DA submission") + + for _, ev := range seen { + got, err := observer.Download(ctx, ev.BlobID) + require.NoError(t, err, "observer.Download blob_id=%x", ev.BlobID) + require.NotEmpty(t, got, "downloaded blob must not be empty") + t.Logf("download ok: blob_id=%x bytes=%d", ev.BlobID, len(got)) + } + + // Confirm neither node has died on us during the assertion window. + for _, c := range []struct { + name string + ch <-chan error + }{{"aggregator", aggErrCh}, {"full-node", fnErrCh}} { + select { + case err := <-c.ch: + t.Fatalf("%s exited unexpectedly: %v", c.name, err) + default: + } + } + + // celestia-node's Fibre service spawns one async pay-for-fibre + // goroutine per submission and they outlive the parent test ctx. + // Without a grace period they race t.TempDir() cleanup, which + // removes the docker-derived keyring directory mid-flight and they + // fail with "key not found". Wait briefly so the in-flight signers + // settle. (Lifecycle hookup in celestia-node is tracked separately.) + time.Sleep(2 * time.Second) +} + +func startNode(t *testing.T, ctx context.Context, n node.Node, label string) <-chan error { + t.Helper() + errCh := make(chan error, 1) + go func() { + defer func() { + if r := recover(); r != nil { + errCh <- fmt.Errorf("%s panicked: %v", label, r) + } + }() + errCh <- n.Run(ctx) + }() + return errCh +} + +func asStrings(in [][]byte) []string { + out := make([]string, len(in)) + for i, b := range in { + out[i] = string(b) + } + return out +} diff --git a/tools/celestia-node-fiber/testing/evnode_fiber_test.go b/tools/celestia-node-fiber/testing/evnode_fiber_test.go index 2220f42751..ac85c6114f 100644 --- a/tools/celestia-node-fiber/testing/evnode_fiber_test.go +++ b/tools/celestia-node-fiber/testing/evnode_fiber_test.go @@ -4,63 +4,48 @@ package cnfibertest_test import ( "context" - "crypto/rand" - "fmt" - "sync" "testing" "time" - "github.com/ipfs/go-datastore" - "github.com/libp2p/go-libp2p/core/crypto" - "github.com/rs/zerolog" "github.com/stretchr/testify/require" - "github.com/evstack/ev-node/block" - coreexecution "github.com/evstack/ev-node/core/execution" - "github.com/evstack/ev-node/node" - "github.com/evstack/ev-node/pkg/config" - datypes "github.com/evstack/ev-node/pkg/da/types" - genesispkg "github.com/evstack/ev-node/pkg/genesis" - "github.com/evstack/ev-node/pkg/p2p" - "github.com/evstack/ev-node/pkg/p2p/key" - "github.com/evstack/ev-node/pkg/sequencers/solo" - pkgsigner "github.com/evstack/ev-node/pkg/signer" - "github.com/evstack/ev-node/pkg/signer/file" - "github.com/evstack/ev-node/pkg/store" - "github.com/celestiaorg/celestia-node/api/client" cnfiber "github.com/evstack/ev-node/tools/celestia-node-fiber" cnfibertest "github.com/evstack/ev-node/tools/celestia-node-fiber/testing" ) -const ( - evnodeBlockTime = 200 * time.Millisecond - evnodeDABlockTime = 1 * time.Second - evnodeHeaderNS = "ev-fib-ht" - evnodeDataNS = "ev-fib-da" - evnodeChainID = "ev-fiber-test" - evnodeBlockTimeout = 30 * time.Second - evnodePassphrase = "test-passphrase-evnode" -) - -// TestEvNode_FiberDA_Posting wires a full ev-node in-memory to the -// celestia-node-fiber adapter and verifies that block data is posted -// to the Fibre DA layer. The test: -// - Starts a single-validator Celestia chain + Fibre server + bridge -// - Creates a celestia-node-fiber adapter (block.FiberClient) -// - Constructs an ev-node aggregator node that uses the adapter as DA -// - Subscribes to the data namespace via adapter.Listen before uploading -// - Injects a transaction and waits for block production -// - Confirms the DA submitter pushed blobs to Fiber by receiving events -// on the subscription and round-tripping each through Download -func TestEvNode_FiberDA_Posting(t *testing.T) { +// TestEvNode_FiberDA_TwoNode wires an aggregator + a full-node ev-node +// pair onto an in-process Celestia chain + Fibre + bridge and asserts +// that: +// +// - the aggregator produces blocks at 200ms cadence and posts them +// to the Fibre DA layer; +// - a separate full node, sharing only the aggregator's genesis, +// consumes those blocks via Fibre Listen + Download and applies +// the same transactions the aggregator executed. +func TestEvNode_FiberDA_TwoNode(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) t.Cleanup(cancel) network := cnfibertest.StartNetwork(t, ctx) bridge := cnfibertest.StartBridge(t, ctx, network) + aggAdapter := newAdapter(t, ctx, network, bridge) + fnAdapter := newAdapter(t, ctx, network, bridge) + observer := newAdapter(t, ctx, network, bridge) + + head, err := observer.Head(ctx) + require.NoError(t, err, "querying bridge head") + t.Logf("bridge head at test start: %d", head) + + cnfibertest.RunEvNodeFibreTwoNodeFlow(t, ctx, aggAdapter, fnAdapter, observer, cnfibertest.EvNodeConfig{ + DAStartHeight: head, + }) +} + +func newAdapter(t *testing.T, ctx context.Context, network *cnfibertest.Network, bridge *cnfibertest.Bridge) *cnfiber.Adapter { + t.Helper() adapter, err := cnfiber.New(ctx, cnfiber.Config{ Client: client.Config{ ReadConfig: client.ReadConfig{ @@ -79,231 +64,5 @@ func TestEvNode_FiberDA_Posting(t *testing.T) { }, network.Consensus.Keyring) require.NoError(t, err, "constructing adapter") t.Cleanup(func() { _ = adapter.Close() }) - - // Subscribe to the header namespace BEFORE starting the node so we - // don't race against the first DA submission. fromHeight=0 follows - // the live tip. The adapter expects the 10-byte v0 namespace ID - // (the last 10 bytes of the full 29-byte namespace), matching what - // fiberDAClient.Submit extracts before calling fiber.Upload. - fullHeaderNS := datypes.NamespaceFromString(evnodeHeaderNS).Bytes() - headerNSID := fullHeaderNS[len(fullHeaderNS)-10:] - events, err := adapter.Listen(ctx, headerNSID, 0) - require.NoError(t, err, "starting fiber Listen on header namespace") - - rollnode, exec, nodeCleanup := newFiberEvNode(t, ctx, adapter) - t.Cleanup(nodeCleanup) - - nodeErrCh := make(chan error, 1) - go func() { - defer func() { - if r := recover(); r != nil { - nodeErrCh <- fmt.Errorf("node panicked: %v", r) - } - }() - nodeErrCh <- rollnode.Run(ctx) - }() - - txPayload := fmt.Sprintf("fiber-key=fiber-value-%d", time.Now().UnixNano()) - exec.InjectTx([]byte(txPayload)) - - require.Eventually(t, func() bool { - stats := exec.Stats() - t.Logf("blocks=%d txs=%d", stats.BlocksProduced, stats.TotalExecutedTxs) - return stats.BlocksProduced >= 1 && stats.TotalExecutedTxs >= 1 - }, evnodeBlockTimeout, 200*time.Millisecond, "ev-node should produce at least one block with the transaction") - - // Drain at least one Fiber BlobEvent from the subscription to prove - // the DA submitter pushed data through the fiber adapter's Upload - // path and the settlement landed on-chain. - var seen []block.FiberBlobEvent - require.Eventually(t, func() bool { - select { - case ev, ok := <-events: - if !ok { - return false - } - seen = append(seen, ev) - t.Logf("fiber event: blob_id=%x height=%d data_size=%d", - ev.BlobID, ev.Height, ev.DataSize) - return true - default: - return false - } - }, evnodeBlockTimeout, 500*time.Millisecond, "expected at least one Fiber BlobEvent from DA submission") - - for _, ev := range seen { - got, err := adapter.Download(ctx, ev.BlobID) - require.NoError(t, err, "adapter.Download blob_id=%x", ev.BlobID) - require.NotEmpty(t, got, "downloaded blob must not be empty") - t.Logf("download ok: blob_id=%x bytes=%d", ev.BlobID, len(got)) - } - - select { - case err := <-nodeErrCh: - t.Fatalf("node exited unexpectedly: %v", err) - default: - } -} - -type inMemExecutor struct { - mu sync.Mutex - data map[string]string - - txChan chan []byte - blocksProduced uint64 - totalExecutedTxs uint64 -} - -func newInMemExecutor() *inMemExecutor { - return &inMemExecutor{ - data: make(map[string]string), - txChan: make(chan []byte, 10000), - } -} - -func (e *inMemExecutor) InjectTx(tx []byte) { - select { - case e.txChan <- tx: - default: - } -} - -type execStats struct { - BlocksProduced uint64 - TotalExecutedTxs uint64 -} - -func (e *inMemExecutor) Stats() execStats { - e.mu.Lock() - defer e.mu.Unlock() - return execStats{BlocksProduced: e.blocksProduced, TotalExecutedTxs: e.totalExecutedTxs} -} - -func (e *inMemExecutor) InitChain(_ context.Context, _ time.Time, _ uint64, _ string) ([]byte, error) { - return []byte("inmem-genesis-root"), nil -} - -func (e *inMemExecutor) GetTxs(_ context.Context) ([][]byte, error) { - var txs [][]byte - for { - select { - case tx := <-e.txChan: - txs = append(txs, tx) - default: - return txs, nil - } - } -} - -func (e *inMemExecutor) ExecuteTxs(_ context.Context, txs [][]byte, _ uint64, _ time.Time, _ []byte) ([]byte, error) { - e.mu.Lock() - defer e.mu.Unlock() - for _, tx := range txs { - k, v, ok := parseKV(tx) - if ok { - e.data[k] = v - } - } - e.blocksProduced++ - e.totalExecutedTxs += uint64(len(txs)) - return []byte(fmt.Sprintf("root-%d", e.blocksProduced)), nil -} - -func (e *inMemExecutor) SetFinal(_ context.Context, _ uint64) error { return nil } -func (e *inMemExecutor) Rollback(_ context.Context, _ uint64) error { return nil } -func (e *inMemExecutor) GetExecutionInfo(_ context.Context) (coreexecution.ExecutionInfo, error) { - return coreexecution.ExecutionInfo{MaxGas: 0}, nil -} -func (e *inMemExecutor) FilterTxs(_ context.Context, txs [][]byte, _, _ uint64, _ bool) ([]coreexecution.FilterStatus, error) { - st := make([]coreexecution.FilterStatus, len(txs)) - for i := range st { - st[i] = coreexecution.FilterOK - } - return st, nil -} - -func parseKV(tx []byte) (string, string, bool) { - s := string(tx) - for i := 0; i < len(s); i++ { - if s[i] == '=' { - return s[:i], s[i+1:], true - } - } - return "", "", false -} - -func newFiberEvNode(t *testing.T, ctx context.Context, fiberClient block.FiberClient) (node.Node, *inMemExecutor, func()) { - t.Helper() - - tmpDir := t.TempDir() - logger := zerolog.New(zerolog.NewTestWriter(t)).With().Timestamp().Logger() - - // Create a file-backed signer so the executor can sign blocks. - signerDir := tmpDir - fs, err := file.CreateFileSystemSigner(signerDir, []byte(evnodePassphrase)) - require.NoError(t, err, "creating file signer") - signerAddr, err := fs.GetAddress() - require.NoError(t, err, "getting signer address") - - // Generate a separate libp2p node key for P2P networking. - nodePrivKey, _, err := crypto.GenerateEd25519Key(rand.Reader) - require.NoError(t, err, "generating node key") - nodeKey := &key.NodeKey{PrivKey: nodePrivKey} - - genesis := genesispkg.NewGenesis(evnodeChainID, 1, time.Now(), signerAddr) - require.NoError(t, genesis.Validate(), "validating genesis") - - cfg := config.DefaultConfig() - cfg.RootDir = tmpDir - cfg.DBPath = "data" - cfg.Node.Aggregator = true - cfg.Node.BlockTime = config.DurationWrapper{Duration: evnodeBlockTime} - cfg.Node.LazyMode = false - cfg.DA.BlockTime = config.DurationWrapper{Duration: evnodeDABlockTime} - cfg.DA.Namespace = evnodeHeaderNS - cfg.DA.DataNamespace = evnodeDataNS - cfg.DA.BatchingStrategy = "immediate" - cfg.DA.Fiber.Enabled = true - cfg.DA.RequestTimeout = config.DurationWrapper{Duration: 60 * time.Second} - cfg.P2P.ListenAddress = "/ip4/0.0.0.0/tcp/0" - cfg.P2P.DisableConnectionGater = true - cfg.Instrumentation.Prometheus = false - cfg.Instrumentation.Pprof = false - cfg.RPC.Address = "127.0.0.1:0" - cfg.Log.Level = "debug" - cfg.Signer.SignerType = "file" - cfg.Signer.SignerPath = signerDir - - // Build the full signer via the factory (needed for consistency with - // how the real node boots). - signer, err := pkgsigner.NewSigner(ctx, &cfg, evnodePassphrase) - require.NoError(t, err, "creating signer via factory") - - ds, err := store.NewDefaultKVStore(tmpDir, cfg.DBPath, "testdb") - require.NoError(t, err, "creating datastore") - - executor := newInMemExecutor() - sequencer := solo.NewSoloSequencer(logger, []byte(genesis.ChainID), executor) - daClient := block.NewFiberDAClient(fiberClient, cfg, logger, 0) - p2pClient, err := p2p.NewClient(cfg.P2P, nodeKey.PrivKey, datastore.NewMapDatastore(), genesis.ChainID, logger, nil) - require.NoError(t, err, "creating p2p client") - - rollnode, err := node.NewNode( - cfg, - executor, - sequencer, - daClient, - signer, - p2pClient, - genesis, - ds, - node.DefaultMetricsProvider(cfg.Instrumentation), - logger, - node.NodeOptions{}, - ) - require.NoError(t, err, "creating node") - - return rollnode, executor, func() {} + return adapter } - -var _ coreexecution.Executor = (*inMemExecutor)(nil)