diff --git a/pkg/config/config.go b/pkg/config/config.go index 54b86d4697..6c6e1020d4 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -3,6 +3,7 @@ package config import ( "errors" "fmt" + "net/url" "os" "path" "path/filepath" @@ -96,12 +97,33 @@ const ( // FlagDAFiberEnabled enables the Fiber DA client instead of the default JSON-RPC blob client FlagDAFiberEnabled = FlagPrefixEvnode + "da.fiber.enabled" - // FlagDAFiberStateAddress is the gRPC address of the celestia-app node for Fiber state queries - FlagDAFiberStateAddress = FlagPrefixEvnode + "da.fiber.state_address" - // FlagDAFiberBridgeAddress is the gRPC address of the bridge node for Fiber state queries + // FlagDAFiberConsensusAddress is the gRPC address of the celestia-app (consensus) + // node. Feeds both the shared TxClient/CoreAccessor gRPC conn and + // appfibre.Client's internal state-client dial target — they're the same + // physical endpoint. + FlagDAFiberConsensusAddress = FlagPrefixEvnode + "da.fiber.consensus_address" + // FlagDAFiberConsensusTLS enables TLS for the consensus gRPC connection. + FlagDAFiberConsensusTLS = FlagPrefixEvnode + "da.fiber.consensus_tls" + // FlagDAFiberConsensusAuthToken is the auth token (x-token) for the + // consensus gRPC endpoint. Kept separate from bridge_auth_token because + // consensus providers typically issue different credentials from the + // bridge JWT. + FlagDAFiberConsensusAuthToken = FlagPrefixEvnode + "da.fiber.consensus_auth_token" + // FlagDAFiberBridgeAddress is the URL of the celestia-node bridge for + // JSON-RPC access. Must use ws:// or wss:// because blob.Subscribe is a + // channel-returning RPC and only works over WebSocket. FlagDAFiberBridgeAddress = FlagPrefixEvnode + "da.fiber.bridge_address" + // FlagDAFiberBridgeAuthToken is the JWT for the bridge node, passed as + // Authorization: Bearer . + FlagDAFiberBridgeAuthToken = FlagPrefixEvnode + "da.fiber.bridge_auth_token" // FlagDAFiberKeyName is the key name in the keyring to use for signing payment promises FlagDAFiberKeyName = FlagPrefixEvnode + "da.fiber.key_name" + // FlagDAFiberUploadConcurrency caps concurrent FSP upload connections. + // 0 keeps the appfibre.Client default (ProtocolParams.MaxValidatorCount). + FlagDAFiberUploadConcurrency = FlagPrefixEvnode + "da.fiber.upload_concurrency" + // FlagDAFiberDownloadConcurrency caps concurrent FSP download connections. + // 0 keeps the appfibre.Client default (ProtocolParams.ValidatorsForReconstruction()). + FlagDAFiberDownloadConcurrency = FlagPrefixEvnode + "da.fiber.download_concurrency" // P2P configuration flags @@ -285,21 +307,82 @@ type DAConfig struct { // FiberDAConfig contains configuration for the Fiber DA client. // When Enabled is true, the Fiber client is used instead of the default // JSON-RPC blob client for DA operations. +// +// Two physical endpoints are addressed: +// - ConsensusAddress: celestia-app gRPC, used for tx broadcasts (MsgPayForFibre, +// Deposit/Withdraw) and state queries (validator set, chain-id, escrow). +// Internally feeds both CoreGRPCConfig.Addr and appfibre.Client.StateAddress. +// - BridgeAddress: celestia-node JSON-RPC, used for blob.Subscribe (Listen +// path) and as a read-only fallback for Fibre APIs. type FiberDAConfig struct { // Enabled switches the DA backend from the default JSON-RPC blob client // to the Fiber protocol client. Enabled bool `mapstructure:"enabled" yaml:"enabled" comment:"Enable the Fiber DA client for direct validator communication instead of the default JSON-RPC blob client"` - // StateAddress is the gRPC address of the celestia-app node used for - // state queries (validator set, chain ID, promise verification). - StateAddress string `mapstructure:"state_address" yaml:"state_address" comment:"gRPC address of the celestia-app node for Fiber state queries (host:port)"` - // BridgeAddress is the address of the bridge node. - BridgeAddress string `mapstructure:"bridge_address" yaml:"bridge_address" comment:"Bridge Node Address for Fiber"` + + // ConsensusAddress is the gRPC address of the celestia-app node. Used + // for tx broadcasts (MsgPayForFibre, Deposit, Withdraw) and for + // appfibre.Client's state queries (validator set, chain-id, promise + // verification). host:port. + ConsensusAddress string `mapstructure:"consensus_address" yaml:"consensus_address" comment:"gRPC address of the celestia-app node used for tx broadcasts and Fiber state queries (host:port)"` + // ConsensusTLS enables TLS for the consensus gRPC connection. Required + // when talking to a TLS-terminated consensus endpoint. + ConsensusTLS bool `mapstructure:"consensus_tls" yaml:"consensus_tls" comment:"Enable TLS for the consensus gRPC connection"` + // ConsensusAuthToken is the auth token (x-token header) for the + // consensus gRPC endpoint. Empty disables header injection. + ConsensusAuthToken string `mapstructure:"consensus_auth_token" yaml:"consensus_auth_token" comment:"x-token auth for the consensus gRPC endpoint"` + + // BridgeAddress is the URL of the celestia-node bridge for JSON-RPC + // access. Must use ws:// or wss:// — blob.Subscribe is a + // channel-returning RPC and only works over WebSocket. + BridgeAddress string `mapstructure:"bridge_address" yaml:"bridge_address" comment:"celestia-node bridge URL; must be ws:// or wss:// (blob.Subscribe needs WebSocket)"` + // BridgeAuthToken is the JWT for the bridge, sent as + // Authorization: Bearer . + BridgeAuthToken string `mapstructure:"bridge_auth_token" yaml:"bridge_auth_token" comment:"JWT for the celestia-node bridge (Authorization: Bearer header)"` + // KeyringPath is the directory path containing the keyring for signing - // Fiber payment promises. - KeyringPath string `mapstructure:"keyring_path" yaml:"keyring_path" comment:"Path to the keyring directory for Fiber payment promise signing"` + // Fiber payment promises. Consumers (e.g. risotto) may ignore this and + // derive the path from their own RootDir; documented for completeness. + KeyringPath string `mapstructure:"keyring_path" yaml:"keyring_path" comment:"Path to the keyring directory for Fiber payment promise signing (optional; consumers may derive from RootDir)"` // KeyName is the name of the key in the keyring to use for signing. KeyName string `mapstructure:"key_name" yaml:"key_name" comment:"Name of the key in the keyring to use for signing Fiber payment promises"` - // UploadConcurrency limits the number of concurrent upload connections + + // UploadConcurrency caps concurrent FSP upload connections. 0 keeps + // appfibre.Client's default (ProtocolParams.MaxValidatorCount). + UploadConcurrency int `mapstructure:"upload_concurrency" yaml:"upload_concurrency" comment:"Max concurrent FSP upload connections; 0 uses appfibre.Client's protocol default"` + // DownloadConcurrency caps concurrent FSP download connections. 0 keeps + // appfibre.Client's default (ProtocolParams.ValidatorsForReconstruction). + DownloadConcurrency int `mapstructure:"download_concurrency" yaml:"download_concurrency" comment:"Max concurrent FSP download connections; 0 uses appfibre.Client's protocol default"` +} + +// Validate checks that a FiberDAConfig is usable. Only called when enabled; +// a disabled block is always valid because the Fibre client is not built. +func (c *FiberDAConfig) Validate() error { + if !c.Enabled { + return nil + } + if c.ConsensusAddress == "" { + return fmt.Errorf("%s is required when fiber DA is enabled", FlagDAFiberConsensusAddress) + } + if c.BridgeAddress == "" { + return fmt.Errorf("%s is required when fiber DA is enabled", FlagDAFiberBridgeAddress) + } + u, err := url.Parse(c.BridgeAddress) + if err != nil { + return fmt.Errorf("%s: %w", FlagDAFiberBridgeAddress, err) + } + if u.Scheme != "ws" && u.Scheme != "wss" { + return fmt.Errorf( + "%s must use ws:// or wss:// (got %q) — blob.Subscribe requires WebSocket, HTTP JSON-RPC cannot stream channels", + FlagDAFiberBridgeAddress, u.Scheme, + ) + } + if c.UploadConcurrency < 0 { + return fmt.Errorf("%s must be non-negative", FlagDAFiberUploadConcurrency) + } + if c.DownloadConcurrency < 0 { + return fmt.Errorf("%s must be non-negative", FlagDAFiberDownloadConcurrency) + } + return nil } // IsFiberEnabled returns true if the Fiber DA client is configured and enabled. @@ -578,6 +661,10 @@ func (c *Config) Validate() error { return err } + if err := c.DA.Fiber.Validate(); err != nil { + return err + } + return nil } @@ -661,9 +748,14 @@ func AddFlags(cmd *cobra.Command) { // Fiber DA configuration flags cmd.Flags().Bool(FlagDAFiberEnabled, def.DA.Fiber.Enabled, "enable the Fiber DA client for direct validator communication") - cmd.Flags().String(FlagDAFiberStateAddress, def.DA.Fiber.StateAddress, "gRPC address of the celestia-app node for Fiber state queries (host:port)") - cmd.Flags().String(FlagDAFiberBridgeAddress, def.DA.Fiber.BridgeAddress, "json rpc of the bridge node") + cmd.Flags().String(FlagDAFiberConsensusAddress, def.DA.Fiber.ConsensusAddress, "gRPC address of the celestia-app node used for tx broadcasts and Fiber state queries (host:port)") + cmd.Flags().Bool(FlagDAFiberConsensusTLS, def.DA.Fiber.ConsensusTLS, "enable TLS for the consensus gRPC connection") + cmd.Flags().String(FlagDAFiberConsensusAuthToken, def.DA.Fiber.ConsensusAuthToken, "x-token auth for the consensus gRPC endpoint") + cmd.Flags().String(FlagDAFiberBridgeAddress, def.DA.Fiber.BridgeAddress, "celestia-node bridge URL; must be ws:// or wss:// (blob.Subscribe needs WebSocket)") + cmd.Flags().String(FlagDAFiberBridgeAuthToken, def.DA.Fiber.BridgeAuthToken, "JWT for the celestia-node bridge (Authorization: Bearer header)") cmd.Flags().String(FlagDAFiberKeyName, def.DA.Fiber.KeyName, "name of the key in the keyring for signing Fiber payment promises") + cmd.Flags().Int(FlagDAFiberUploadConcurrency, def.DA.Fiber.UploadConcurrency, "max concurrent FSP upload connections; 0 uses the appfibre protocol default") + cmd.Flags().Int(FlagDAFiberDownloadConcurrency, def.DA.Fiber.DownloadConcurrency, "max concurrent FSP download connections; 0 uses the appfibre protocol default") // P2P configuration flags cmd.Flags().String(FlagP2PListenAddress, def.P2P.ListenAddress, "P2P listen address (host:port)") diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index b9120324f4..53203db90d 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -85,8 +85,14 @@ func TestAddFlags(t *testing.T) { // DA Fiber flags assertFlagValue(t, flags, FlagDAFiberEnabled, DefaultConfig().DA.Fiber.Enabled) - assertFlagValue(t, flags, FlagDAFiberStateAddress, DefaultConfig().DA.Fiber.StateAddress) + assertFlagValue(t, flags, FlagDAFiberConsensusAddress, DefaultConfig().DA.Fiber.ConsensusAddress) + assertFlagValue(t, flags, FlagDAFiberConsensusTLS, DefaultConfig().DA.Fiber.ConsensusTLS) + assertFlagValue(t, flags, FlagDAFiberConsensusAuthToken, DefaultConfig().DA.Fiber.ConsensusAuthToken) + assertFlagValue(t, flags, FlagDAFiberBridgeAddress, DefaultConfig().DA.Fiber.BridgeAddress) + assertFlagValue(t, flags, FlagDAFiberBridgeAuthToken, DefaultConfig().DA.Fiber.BridgeAuthToken) assertFlagValue(t, flags, FlagDAFiberKeyName, DefaultConfig().DA.Fiber.KeyName) + assertFlagValue(t, flags, FlagDAFiberUploadConcurrency, DefaultConfig().DA.Fiber.UploadConcurrency) + assertFlagValue(t, flags, FlagDAFiberDownloadConcurrency, DefaultConfig().DA.Fiber.DownloadConcurrency) // P2P flags assertFlagValue(t, flags, FlagP2PListenAddress, DefaultConfig().P2P.ListenAddress) @@ -153,7 +159,7 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagPruningInterval, DefaultConfig().Pruning.Interval.Duration) // Count the number of flags we're explicitly checking - expectedFlagCount := 84 // Update this number if you add more flag checks above + expectedFlagCount := 91 // Update this number if you add more flag checks above // Get the actual number of flags (both regular and persistent) actualFlagCount := 0 diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index f90ba7d375..4eb5f8240e 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -85,10 +85,15 @@ func DefaultConfig() Config { BatchMaxDelay: DurationWrapper{0}, // 0 means use DA BlockTime BatchMinItems: 1, Fiber: FiberDAConfig{ - Enabled: false, - StateAddress: "127.0.0.1:9090", - BridgeAddress: "", - KeyName: "default-fibre", + Enabled: false, + ConsensusAddress: "127.0.0.1:9090", + ConsensusTLS: false, + ConsensusAuthToken: "", + BridgeAddress: "", + BridgeAuthToken: "", + KeyName: "default-fibre", + UploadConcurrency: 0, + DownloadConcurrency: 0, }, }, Instrumentation: DefaultInstrumentationConfig(),