From 8de8fac1f54cba9028bffa928190f23df78163e4 Mon Sep 17 00:00:00 2001 From: nugaon Date: Tue, 14 Apr 2026 16:13:44 +0200 Subject: [PATCH 1/5] feat/pubsub --- pkg/api/api.go | 8 + pkg/api/pubsub.go | 144 ++++++++++++++ pkg/api/router.go | 64 +++++++ pkg/node/node.go | 10 + pkg/p2p/libp2p/libp2p.go | 10 +- pkg/pubsub/mode.go | 210 +++++++++++++++++++++ pkg/pubsub/pubsub.go | 399 +++++++++++++++++++++++++++++++++++++++ pkg/pubsub/ws.go | 109 +++++++++++ 8 files changed, 952 insertions(+), 2 deletions(-) create mode 100644 pkg/api/pubsub.go create mode 100644 pkg/pubsub/mode.go create mode 100644 pkg/pubsub/pubsub.go create mode 100644 pkg/pubsub/ws.go diff --git a/pkg/api/api.go b/pkg/api/api.go index 93595168cc3..d10d39a6339 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -41,6 +41,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/postage" "github.com/ethersphere/bee/v2/pkg/postage/postagecontract" "github.com/ethersphere/bee/v2/pkg/pss" + "github.com/ethersphere/bee/v2/pkg/pubsub" "github.com/ethersphere/bee/v2/pkg/resolver" "github.com/ethersphere/bee/v2/pkg/resolver/client/ens" "github.com/ethersphere/bee/v2/pkg/resolver/multiresolver" @@ -94,6 +95,9 @@ const ( SwarmActTimestampHeader = "Swarm-Act-Timestamp" SwarmActPublisherHeader = "Swarm-Act-Publisher" SwarmActHistoryAddressHeader = "Swarm-Act-History-Address" + SwarmPubsubPeerHeader = "Swarm-Pubsub-Peer" + SwarmPubsubGsocPublicKeyHeader = "Swarm-Pubsub-Gsoc-Public-Key" + SwarmPubsubGsocTopicHeader = "Swarm-Pubsub-Gsoc-Topic" ImmutableHeader = "Immutable" GasPriceHeader = "Gas-Price" @@ -187,6 +191,7 @@ type Service struct { topologyDriver topology.Driver p2p p2p.DebugService + pubsubSvc *pubsub.Service accounting accounting.Interface chequebook chequebook.Service pseudosettle settlement.Interface @@ -270,6 +275,7 @@ type ExtraOptions struct { SyncStatus func() (bool, error) NodeStatus *status.Service PinIntegrity PinIntegrity + PubsubService *pubsub.Service } func New( @@ -361,6 +367,7 @@ func (s *Service) Configure(signer crypto.Signer, tracer *tracing.Tracer, o Opti s.lightNodes = e.LightNodes s.pseudosettle = e.Pseudosettle s.blockTime = e.BlockTime + s.pubsubSvc = e.PubsubService s.statusSem = semaphore.NewWeighted(1) s.postageSem = semaphore.NewWeighted(1) @@ -589,6 +596,7 @@ func (s *Service) corsHandler(h http.Handler) http.Handler { SwarmRedundancyStrategyHeader, SwarmRedundancyFallbackModeHeader, SwarmChunkRetrievalTimeoutHeader, SwarmLookAheadBufferSizeHeader, SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, SwarmSocSignatureHeader, SwarmOnlyRootChunk, GasPriceHeader, GasLimitHeader, ImmutableHeader, SwarmActHeader, SwarmActTimestampHeader, SwarmActPublisherHeader, SwarmActHistoryAddressHeader, + SwarmPubsubPeerHeader, SwarmPubsubGsocPublicKeyHeader, SwarmPubsubGsocTopicHeader, } allowedHeadersStr := strings.Join(allowedHeaders, ", ") diff --git a/pkg/api/pubsub.go b/pkg/api/pubsub.go new file mode 100644 index 00000000000..a97ae52b703 --- /dev/null +++ b/pkg/api/pubsub.go @@ -0,0 +1,144 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package api + +import ( + "context" + "encoding/hex" + "net/http" + "time" + + "github.com/ethersphere/bee/v2/pkg/jsonhttp" + "github.com/ethersphere/bee/v2/pkg/pubsub" + "github.com/ethersphere/bee/v2/pkg/swarm" + "github.com/gorilla/mux" + "github.com/gorilla/websocket" + ma "github.com/multiformats/go-multiaddr" +) + +func (s *Service) pubsubWsHandler(w http.ResponseWriter, r *http.Request) { + logger := s.logger.WithName("pubsub").Build() + + paths := struct { + Topic string `map:"topic" validate:"required"` + }{} + if response := s.mapStructure(mux.Vars(r), &paths); response != nil { + response("invalid path params", logger, w) + return + } + + var topicAddr [32]byte + if decoded, err := hex.DecodeString(paths.Topic); err == nil && len(decoded) == swarm.HashSize { + copy(topicAddr[:], decoded) + } else { + h := swarm.NewHasher() + _, _ = h.Write([]byte(paths.Topic)) + copy(topicAddr[:], h.Sum(nil)) + } + + // Required header: underlay multiaddr + peerHeader := r.Header.Get(SwarmPubsubPeerHeader) + if peerHeader == "" { + jsonhttp.BadRequest(w, "missing Swarm-Pubsub-Peer header") + return + } + underlay, err := ma.NewMultiaddr(peerHeader) + if err != nil { + logger.Debug("invalid peer multiaddr", "value", peerHeader, "error", err) + jsonhttp.BadRequest(w, "invalid Swarm-Pubsub-Peer header") + return + } + + // Optional headers: GSOC fields for Participant upgrade + var connectOpts pubsub.ConnectOptions + + gsocPubKeyHex := r.Header.Get(SwarmPubsubGsocPublicKeyHeader) + gsocTopicHex := r.Header.Get(SwarmPubsubGsocTopicHeader) + if gsocPubKeyHex != "" && gsocTopicHex != "" { + gsocOwner, err := hex.DecodeString(gsocPubKeyHex) + if err != nil { + jsonhttp.BadRequest(w, "invalid Swarm-Pubsub-Gsoc-Public-Key header") + return + } + gsocID, err := hex.DecodeString(gsocTopicHex) + if err != nil { + jsonhttp.BadRequest(w, "invalid Swarm-Pubsub-Gsoc-Topic header") + return + } + connectOpts.GsocOwner = gsocOwner + connectOpts.GsocID = gsocID + connectOpts.ReadWrite = true + } + + headers := struct { + KeepAlive time.Duration `map:"Swarm-Keep-Alive"` + }{} + if response := s.mapStructure(r.Header, &headers); response != nil { + response("invalid header params", logger, w) + return + } + + if s.beeMode == DevMode { + logger.Warning("pubsub endpoint is disabled in dev mode") + jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation) + return + } + + // Connect to broker peer + ctx, cancel := context.WithCancel(context.Background()) + subscriberConn, err := s.pubsubSvc.Connect(ctx, underlay, topicAddr, pubsub.ModeGSOCEphemeral, connectOpts) + if err != nil { + cancel() + logger.Debug("pubsub connect failed", "error", err) + jsonhttp.InternalServerError(w, "pubsub connect failed") + return + } + + // Upgrade to WebSocket + upgrader := websocket.Upgrader{ + ReadBufferSize: swarm.ChunkWithSpanSize, + WriteBufferSize: swarm.ChunkWithSpanSize, + CheckOrigin: s.checkOrigin, + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + cancel() + _ = subscriberConn.Stream.Close() + logger.Debug("websocket upgrade failed", "error", err) + logger.Error(nil, "websocket upgrade failed") + jsonhttp.InternalServerError(w, "upgrade failed") + return + } + + pingPeriod := headers.KeepAlive * time.Second + if pingPeriod == 0 { + pingPeriod = time.Minute + } + + isParticipant := connectOpts.ReadWrite + + s.wsWg.Add(1) + go func() { + pubsub.ListeningWs(ctx, conn, pubsub.WsOptions{PingPeriod: pingPeriod, Cancel: cancel}, logger, subscriberConn, isParticipant) + _ = conn.Close() + subscriberConn.Cancel() + s.wsWg.Done() + }() +} + +func (s *Service) pubsubListHandler(w http.ResponseWriter, r *http.Request) { + if s.pubsubSvc == nil { + jsonhttp.NotFound(w, "pubsub service not available") + return + } + + topics := s.pubsubSvc.Topics() + jsonhttp.OK(w, struct { + Topics []pubsub.TopicInfo `json:"topics"` + }{ + Topics: topics, + }) +} diff --git a/pkg/api/router.go b/pkg/api/router.go index 776def9457d..4614de3f2e2 100644 --- a/pkg/api/router.go +++ b/pkg/api/router.go @@ -364,6 +364,70 @@ func (s *Service) mountAPI() { ), }) + handle("/pubsub/{topic}", web.ChainHandlers( + web.FinalHandlerFunc(s.pubsubWsHandler), + )) + + handle("/pubsub/", web.ChainHandlers( + web.FinalHandler(jsonhttp.MethodHandler{ + "GET": http.HandlerFunc(s.pubsubListHandler), + }), + )) + + handle("/pss/subscribe/{topic}", web.ChainHandlers( + web.FinalHandlerFunc(s.pssWsHandler), + )) + + handle("/tags", web.ChainHandlers( + web.FinalHandler(jsonhttp.MethodHandler{ + "GET": http.HandlerFunc(s.listTagsHandler), + "POST": web.ChainHandlers( + jsonhttp.NewMaxBodyBytesHandler(1024), + web.FinalHandlerFunc(s.createTagHandler), + ), + })), + ) + + handle("/tags/{id}", web.ChainHandlers( + web.FinalHandler(jsonhttp.MethodHandler{ + "GET": http.HandlerFunc(s.getTagHandler), + "DELETE": http.HandlerFunc(s.deleteTagHandler), + "PATCH": web.ChainHandlers( + jsonhttp.NewMaxBodyBytesHandler(1024), + web.FinalHandlerFunc(s.doneSplitHandler), + ), + })), + ) + + handle("/pins", web.ChainHandlers( + web.FinalHandler(jsonhttp.MethodHandler{ + "GET": http.HandlerFunc(s.listPinnedRootHashes), + })), + ) + + handle("/pins/check", web.ChainHandlers( + web.FinalHandler(jsonhttp.MethodHandler{ + "GET": http.HandlerFunc(s.pinIntegrityHandler), + }), + )) + + handle("/pins/{reference}", web.ChainHandlers( + web.FinalHandler(jsonhttp.MethodHandler{ + "GET": http.HandlerFunc(s.getPinnedRootHash), + "POST": http.HandlerFunc(s.pinRootHash), + "DELETE": http.HandlerFunc(s.unpinRootHash), + })), + ) + + handle("/stewardship/{address}", jsonhttp.MethodHandler{ + "GET": web.ChainHandlers( + web.FinalHandlerFunc(s.stewardshipGetHandler), + ), + "PUT": web.ChainHandlers( + web.FinalHandlerFunc(s.stewardshipPutHandler), + ), + }) + handle("/pss/subscribe/{topic}", http.HandlerFunc(s.pssWsHandler)) handle("/gsoc/subscribe/{address}", web.ChainHandlers( diff --git a/pkg/node/node.go b/pkg/node/node.go index 38107628149..63d602f7bc9 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -48,6 +48,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/pricer" "github.com/ethersphere/bee/v2/pkg/pricing" "github.com/ethersphere/bee/v2/pkg/pss" + "github.com/ethersphere/bee/v2/pkg/pubsub" "github.com/ethersphere/bee/v2/pkg/puller" "github.com/ethersphere/bee/v2/pkg/pullsync" "github.com/ethersphere/bee/v2/pkg/pusher" @@ -192,6 +193,8 @@ type Options struct { WarmupTime time.Duration WelcomeMessage string WhitelistedWithdrawalAddress []string + PubsubBrokerMode bool + PubsubMaxConnections int } const ( @@ -665,6 +668,7 @@ func NewBee( Nonce: nonce, ValidateOverlay: chainEnabled, Registry: registry, + PubsubReservedStreamSlots: o.PubsubMaxConnections, }) if err != nil { return nil, fmt.Errorf("p2p service: %w", err) @@ -737,6 +741,11 @@ func NewBee( return nil, fmt.Errorf("init batch service: %w", err) } + pubsubSvc := pubsub.New(p2ps, logger, o.PubsubBrokerMode, o.PubsubMaxConnections) + if err = p2ps.AddProtocol(pubsubSvc.Protocol()); err != nil { + return nil, fmt.Errorf("pubsub protocol: %w", err) + } + // Construct protocols. pingPong := pingpong.New(p2ps, logger, tracer) @@ -1266,6 +1275,7 @@ func NewBee( SyncStatus: syncStatusFn, NodeStatus: nodeStatus, PinIntegrity: localStore.PinIntegrity(), + PubsubService: pubsubSvc, } if o.APIAddr != "" { diff --git a/pkg/p2p/libp2p/libp2p.go b/pkg/p2p/libp2p/libp2p.go index 316cbd6171f..9bdc73143cb 100644 --- a/pkg/p2p/libp2p/libp2p.go +++ b/pkg/p2p/libp2p/libp2p.go @@ -149,6 +149,7 @@ type Options struct { HeadersRWTimeout time.Duration Registry *prometheus.Registry autoTLSCertManager autoTLSCertManager + PubsubReservedStreamSlots int } func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay swarm.Address, addr string, ab addressbook.Putter, storer storage.StateStorer, lightNodes *lightnode.Container, logger log.Logger, tracer *tracing.Tracer, o Options) (s *Service, returnErr error) { @@ -209,11 +210,16 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay } // Tweak certain settings + inboundLimit := rcmgr.LimitVal(IncomingStreamCountLimit - o.PubsubReservedStreamSlots) + if inboundLimit < 0 { + inboundLimit = 0 + } + cfg := rcmgr.PartialLimitConfig{ System: rcmgr.ResourceLimits{ - Streams: IncomingStreamCountLimit + OutgoingStreamCountLimit, + Streams: inboundLimit + OutgoingStreamCountLimit, StreamsOutbound: OutgoingStreamCountLimit, - StreamsInbound: IncomingStreamCountLimit, + StreamsInbound: inboundLimit, }, } diff --git a/pkg/pubsub/mode.go b/pkg/pubsub/mode.go new file mode 100644 index 00000000000..9d0d3d0663d --- /dev/null +++ b/pkg/pubsub/mode.go @@ -0,0 +1,210 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package pubsub + +import ( + "bytes" + "context" + "encoding/binary" + "errors" + "fmt" + "io" + + "github.com/ethersphere/bee/v2/pkg/p2p" + "github.com/ethersphere/bee/v2/pkg/soc" + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +var ErrInvalidSignature = errors.New("pubsub: invalid SOC signature") + +const ( + // P2P headers + HeaderGsocOwner = "pubsub-gsoc-owner" + HeaderGsocID = "pubsub-gsoc-id" +) + +// Mode defines mode-specific behavior for the pubsub protocol. +// Each mode determines its own roles, wire format, and message handling. +type Mode interface { + ID() uint8 + TopicAddress() swarm.Address + Connect(ctx context.Context, p p2p.Streamer, overlay swarm.Address, opts ConnectOptions) (p2p.Stream, error) + + // Broker side - participant + ValidateParticipant(bc *brokerConn, headers p2p.Headers) error + ReadParticipantMessage(stream p2p.Stream) ([]byte, error) + + // Broker side - broadcast + FormatBroadcast(bt *brokerConn, sub *brokerSubscriber, rawMsg []byte) []byte + + // Subscriber/WS side + ReadBrokerMessage(stream p2p.Stream) ([]byte, error) +} + +// --- GSOC Ephemeral Mode (mode 1) --- + +const ( + // Message types (Broker → Subscriber) + MsgTypeHandshake byte = 0x01 + MsgTypeData byte = 0x02 +) + +// GSOCEphemeralMode implements Mode for GSOC ephemeral messaging. +type GSOCEphemeralMode struct { + topicAddress swarm.Address + gsocOwner []byte + gsocID []byte + handshakeHappened bool // true after the first message from broker. sends validation metadata +} + +var _ Mode = (*GSOCEphemeralMode)(nil) + +func NewGSOCEphemeralMode(topicAddress []byte) *GSOCEphemeralMode { + return &GSOCEphemeralMode{ + topicAddress: swarm.NewAddress(topicAddress), + } +} + +func (m *GSOCEphemeralMode) ID() uint8 { return ModeGSOCEphemeral } + +func (m *GSOCEphemeralMode) TopicAddress() swarm.Address { return m.topicAddress.Clone() } + +func (m *GSOCEphemeralMode) Connect(ctx context.Context, p p2p.Streamer, overlay swarm.Address, opts ConnectOptions) (p2p.Stream, error) { + var rw byte + if opts.ReadWrite { + rw = 1 + } + headers := p2p.Headers{ + HeaderTopicAddress: m.topicAddress.Bytes(), + HeaderMode: {m.ID()}, + HeaderReadWrite: {rw}, + } + if len(opts.GsocOwner) > 0 { + headers[HeaderGsocOwner] = opts.GsocOwner + } + if len(opts.GsocID) > 0 { + headers[HeaderGsocID] = opts.GsocID + } + return p.NewStream(ctx, overlay, headers, protocolName, protocolVersion, streamName) +} + +// ValidateParticipant sets SOC parameters on the broker side so it can validate the messages. +func (m *GSOCEphemeralMode) ValidateParticipant(bc *brokerConn, headers p2p.Headers) error { + gsocOwner := headers[HeaderGsocOwner] + gsocID := headers[HeaderGsocID] + + bc.mu.Lock() + m.setGsocParams(gsocOwner, gsocID) + set := m.gsocID != nil + bc.mu.Unlock() + + if !set { + return ErrWrongHeaders + } + return nil +} + +// FormatBroadcast formats a raw participant message for delivery to a subscriber. +// First delivery to each subscriber includes a handshake with SOC identity; subsequent are data-only. +func (m *GSOCEphemeralMode) FormatBroadcast(bt *brokerConn, sub *brokerSubscriber, rawMsg []byte) []byte { + if !m.handshakeHappened { + // Handshake: [1B type=0x01][32B SOC ID][20B owner][65B sig][4B span][NB payload] + msg := make([]byte, 1+IDSize+OwnerSize+len(rawMsg)) + msg[0] = MsgTypeHandshake + copy(msg[1:1+IDSize], m.gsocID) + copy(msg[1+IDSize:1+IDSize+OwnerSize], m.gsocOwner) + copy(msg[1+IDSize+OwnerSize:], rawMsg) + m.handshakeHappened = true + return msg + } + + // Data: [1B type=0x02][65B sig][4B span][NB payload] + msg := make([]byte, 1+len(rawMsg)) + msg[0] = MsgTypeData + copy(msg[1:], rawMsg) + return msg +} + +// ReadParticipantMessage reads [65B sig][4B span][NB payload (max 4KB)] from the stream, +// constructs and validates the SOC chunk and returns that. +func (m *GSOCEphemeralMode) ReadParticipantMessage(stream p2p.Stream) ([]byte, error) { + sig := make([]byte, SigSize) + if _, err := io.ReadFull(stream, sig); err != nil { + return nil, err + } + spanBytes := make([]byte, SpanSize) + if _, err := io.ReadFull(stream, spanBytes); err != nil { + return nil, err + } + span := min(binary.LittleEndian.Uint32(spanBytes), MaxPayload) + + payload := make([]byte, span) + if _, err := io.ReadFull(stream, payload); err != nil { + return nil, err + } + + // Construct SOC chunk with the known topic address: [ID (32B)][sig (65B)][span (4B)][payload] + // and validate whether message is valid + socData := make([]byte, IDSize+SigSize+SpanSize+int(span)) + copy(socData, m.gsocID) + copy(socData[IDSize:], sig) + copy(socData[IDSize+SigSize:], spanBytes) + copy(socData[IDSize+SigSize+SpanSize:], payload) + + if !soc.Valid(swarm.NewChunk(m.topicAddress, socData)) { + return nil, ErrInvalidSignature + } + + return socData[IDSize:], nil +} + +// ReadBrokerMessage reads one broker→subscriber message and verifies it +func (m *GSOCEphemeralMode) ReadBrokerMessage(stream p2p.Stream) ([]byte, error) { + typeBuf := make([]byte, 1) + if _, err := io.ReadFull(stream, typeBuf); err != nil { + return nil, err + } + + switch typeBuf[0] { + case MsgTypeHandshake: + socID := make([]byte, IDSize) + if _, err := io.ReadFull(stream, socID); err != nil { + return nil, fmt.Errorf("read SOC ID: %w", err) + } + ownerAddr := make([]byte, OwnerSize) + if _, err := io.ReadFull(stream, ownerAddr); err != nil { + return nil, fmt.Errorf("read owner addr: %w", err) + } + m.setGsocParams(ownerAddr, socID) + + return m.ReadParticipantMessage(stream) // same as participant message at this point + + case MsgTypeData: + if m.gsocID == nil { + return nil, fmt.Errorf("pubsub: data message before handshake") + } + return m.ReadParticipantMessage(stream) + + default: + return nil, fmt.Errorf("pubsub: unknown message type: 0x%02x", typeBuf[0]) + } +} + +// setGsocParams sets the GSOC recurring parameters so that messages don't need to include them. +func (m *GSOCEphemeralMode) setGsocParams(gsocOwner, gsocID []byte) { + if m.gsocOwner != nil { + return + } + // Verify got socId and address match with topicaddress + addr, err := soc.CreateAddress(gsocID, gsocOwner) + if err != nil || !bytes.Equal(addr.Bytes(), m.topicAddress.Bytes()) { + return + } + + m.gsocOwner = make([]byte, OwnerSize) + copy(m.gsocOwner, gsocOwner) + m.gsocID = make([]byte, IDSize) + copy(m.gsocID, gsocID) +} diff --git a/pkg/pubsub/pubsub.go b/pkg/pubsub/pubsub.go new file mode 100644 index 00000000000..98ce8aa4867 --- /dev/null +++ b/pkg/pubsub/pubsub.go @@ -0,0 +1,399 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package pubsub + +import ( + "context" + "errors" + "fmt" + "io" + "sync" + + "github.com/ethersphere/bee/v2/pkg/crypto" + "github.com/ethersphere/bee/v2/pkg/log" + "github.com/ethersphere/bee/v2/pkg/p2p" + "github.com/ethersphere/bee/v2/pkg/swarm" + ma "github.com/multiformats/go-multiaddr" +) + +const ( + loggerName = "pubsub" + protocolName = "pubsub" + protocolVersion = "1.0.0" + streamName = "msg" + + // p2p stream header keys + HeaderTopicAddress = "pubsub-topic-address" + HeaderMode = "pubsub-mode" + HeaderReadWrite = "pubsub-readwrite" // 1 = read+write (participant), 0 = read-only (subscriber) + + // Mode constants + ModeGSOCEphemeral uint8 = 1 + + // Wire format sizes + SpanSize = 4 // pubsub span: uint32 little-endian + MaxPayload = swarm.ChunkSize + SigSize = swarm.SocSignatureSize + IDSize = swarm.HashSize + OwnerSize = crypto.AddressSize +) + +var ( + ErrBrokerDisabled = errors.New("pubsub: broker mode is disabled") + ErrMaxConnections = errors.New("pubsub: max connections reached") + ErrInvalidHandshake = errors.New("pubsub: handshake verification failed") + ErrWrongHeaders = errors.New("pubsub: wrong required headers") + ErrTopicMismatch = errors.New("pubsub: topic address mismatch") +) + +func newMode(topicAddr [32]byte, modeID uint8) (Mode, error) { + switch modeID { + case ModeGSOCEphemeral: + return NewGSOCEphemeralMode(topicAddr[:]), nil + default: + return nil, fmt.Errorf("pubsub: unknown mode: %d", modeID) + } +} + +// ConnectOptions carries optional mode-specific parameters for Connect. +type ConnectOptions struct { + ReadWrite bool // true = participant (read+write), false = subscriber (read-only) + GsocOwner []byte + GsocID []byte +} + +// TopicInfo describes a topic for the list endpoint. +type TopicInfo struct { + TopicAddress string `json:"topicAddress"` + Mode uint8 `json:"mode"` + Role string `json:"role"` + Connections []string `json:"connections"` +} + +// brokerSubscriber holds a subscriber's stream and outgoing message channel. +type brokerSubscriber struct { + overlay swarm.Address + stream p2p.Stream + outCh chan []byte + cancel context.CancelFunc +} + +// brokerConn manages all connections for a single topic on the broker side. +type brokerConn struct { + mu sync.RWMutex + mode Mode + subscribers map[string]*brokerSubscriber +} + +// SubscriberConn represents the subscriber-side connection to a broker. +type SubscriberConn struct { + Stream p2p.Stream + TopicAddr [32]byte + Mode Mode + Overlay swarm.Address + Cancel context.CancelFunc +} + +// P2P groups the p2p capabilities needed by the pubsub service. +type P2P interface { + p2p.Service + p2p.Streamer +} + +// Service is the pubsub protocol service. +type Service struct { + mu sync.RWMutex + p2p P2P + logger log.Logger + brokerMode bool + maxConns int + brokerConns map[[32]byte]*brokerConn // topic address -> broker connection + subscriberConns map[[32]byte]*SubscriberConn // topic address -> subscriber connection +} + +func New(p2p P2P, logger log.Logger, brokerMode bool, maxConns int) *Service { + s := &Service{ + p2p: p2p, + logger: logger.WithName(loggerName).Register(), + brokerMode: brokerMode, + maxConns: maxConns, + brokerConns: make(map[[32]byte]*brokerConn), + subscriberConns: make(map[[32]byte]*SubscriberConn), + } + return s +} + +// Protocol returns the p2p protocol spec. +func (s *Service) Protocol() p2p.ProtocolSpec { + return p2p.ProtocolSpec{ + Name: protocolName, + Version: protocolVersion, + StreamSpecs: []p2p.StreamSpec{ + { + Name: streamName, + Handler: s.brokerHandler, + }, + }, + } +} + +// Connect establishes a subscriber connection to a broker peer. +func (s *Service) Connect(ctx context.Context, underlay ma.Multiaddr, topicAddr [32]byte, modeID uint8, opts ConnectOptions) (*SubscriberConn, error) { + m, err := newMode(topicAddr, modeID) + if err != nil { + return nil, err + } + + bzzAddr, err := s.p2p.Connect(ctx, []ma.Multiaddr{underlay}) + if err != nil && !errors.Is(err, p2p.ErrAlreadyConnected) { + return nil, fmt.Errorf("connect to peer: %w", err) + } + + stream, err := m.Connect(ctx, s.p2p, bzzAddr.Overlay, opts) + if err != nil { + return nil, fmt.Errorf("open stream: %w", err) + } + + connCtx, cancel := context.WithCancel(ctx) + sc := &SubscriberConn{ + Stream: stream, + TopicAddr: topicAddr, + Mode: m, + Overlay: bzzAddr.Overlay, + Cancel: cancel, + } + + s.mu.Lock() + s.subscriberConns[topicAddr] = sc + s.mu.Unlock() + + go func() { + <-connCtx.Done() + s.mu.Lock() + delete(s.subscriberConns, topicAddr) + s.mu.Unlock() + _ = stream.FullClose() + }() + + return sc, nil +} + +// Topics returns info about all active topics. +func (s *Service) Topics() []TopicInfo { + s.mu.RLock() + defer s.mu.RUnlock() + + var topics []TopicInfo + + for addr, bt := range s.brokerConns { + bt.mu.RLock() + conns := make([]string, 0, len(bt.subscribers)) + for _, sub := range bt.subscribers { + conns = append(conns, sub.overlay.String()) + } + bt.mu.RUnlock() + topics = append(topics, TopicInfo{ + TopicAddress: fmt.Sprintf("%x", addr), + Mode: bt.mode.ID(), + Role: "broker", + Connections: conns, + }) + } + + for addr, sc := range s.subscriberConns { + topics = append(topics, TopicInfo{ + TopicAddress: fmt.Sprintf("%x", addr), + Mode: sc.Mode.ID(), + Role: "subscriber", + Connections: []string{sc.Overlay.String()}, + }) + } + + return topics +} + +// brokerHandler handles incoming streams on the broker side. +func (s *Service) brokerHandler(ctx context.Context, peer p2p.Peer, stream p2p.Stream) error { + if !s.brokerMode { + _ = stream.Reset() + return ErrBrokerDisabled + } + + headers := stream.Headers() + + topicAddrBytes := headers[HeaderTopicAddress] + if len(topicAddrBytes) != IDSize { + _ = stream.Reset() + return ErrWrongHeaders + } + var topicAddr [32]byte + copy(topicAddr[:], topicAddrBytes) + + modeBytes := headers[HeaderMode] + if len(modeBytes) != 1 { + _ = stream.Reset() + return ErrWrongHeaders + } + bc, err := s.getOrCreateBrokerConn(topicAddr, modeBytes[0]) + if err != nil { + _ = stream.Reset() + return err + } + + rwBytes := headers[HeaderReadWrite] + if len(rwBytes) != 1 { + _ = stream.Reset() + return ErrWrongHeaders + } + if rwBytes[0] == 1 { + return s.handleParticipant(ctx, peer, stream, bc, headers) + } + return s.handleSubscriber(ctx, peer, stream, bc) +} + +// registerBrokerConn registers the peer as a broadcast recipient on bc, starts a write goroutine, +// and returns the connection context, its cancel func, and an unregister func to call on exit. +func (s *Service) registerBrokerConn(ctx context.Context, peer p2p.Peer, stream p2p.Stream, bc *brokerConn) (connCtx context.Context, cancel context.CancelFunc, unregister func()) { + connCtx, cancel = context.WithCancel(ctx) + + conn := &brokerSubscriber{ + overlay: peer.Address, + stream: stream, + outCh: make(chan []byte, 256), + cancel: cancel, + } + + overlayKey := peer.Address.String() + bc.mu.Lock() + bc.subscribers[overlayKey] = conn + bc.mu.Unlock() + + go func() { + for { + select { + case <-connCtx.Done(): + return + case msg := <-conn.outCh: + if err := writeRaw(stream, msg); err != nil { + cancel() + return + } + } + } + }() + + unregister = func() { + bc.mu.Lock() + delete(bc.subscribers, overlayKey) + bc.mu.Unlock() + } + return +} + +func (s *Service) handleSubscriber(ctx context.Context, peer p2p.Peer, stream p2p.Stream, bc *brokerConn) error { + bc.mu.RLock() + connCount := len(bc.subscribers) + bc.mu.RUnlock() + if s.maxConns > 0 && connCount >= s.maxConns { + _ = stream.Reset() + return ErrMaxConnections + } + + subCtx, cancel, unregister := s.registerBrokerConn(ctx, peer, stream, bc) + defer cancel() + defer unregister() + + s.logger.Debug("subscriber connected", "peer", peer.Address, "topic", bc.mode.TopicAddress()) + + <-subCtx.Done() + if errors.Is(subCtx.Err(), context.Canceled) { + return nil + } + return subCtx.Err() +} + +func (s *Service) handleParticipant(ctx context.Context, peer p2p.Peer, stream p2p.Stream, bc *brokerConn, headers p2p.Headers) error { + if err := bc.mode.ValidateParticipant(bc, headers); err != nil { + _ = stream.Reset() + return err + } + + partCtx, cancel, unregister := s.registerBrokerConn(ctx, peer, stream, bc) + defer cancel() + defer unregister() + + s.logger.Debug("participant connected", "peer", peer.Address, "topic", bc.mode.TopicAddress()) + + for { + select { + case <-partCtx.Done(): + if errors.Is(partCtx.Err(), context.Canceled) { + return nil + } + return partCtx.Err() + default: + } + + rawMsg, err := bc.mode.ReadParticipantMessage(stream) + if err != nil { + if errors.Is(err, io.EOF) { + return nil + } + return fmt.Errorf("read participant message: %w", err) + } + + s.broadcastToSubscribers(bc, rawMsg) + } +} + +// broadcastToSubscribers sends a message to all subscribers of a topic. +func (s *Service) broadcastToSubscribers(bc *brokerConn, rawMsg []byte) { + bc.mu.Lock() + defer bc.mu.Unlock() + + for _, sub := range bc.subscribers { + msg := bc.mode.FormatBroadcast(bc, sub, rawMsg) + + select { + case sub.outCh <- msg: + default: + s.logger.Warning("subscriber message queue full, dropping message", "peer", sub.overlay) + } + } +} + +func (s *Service) getOrCreateBrokerConn(topicAddr [32]byte, modeID uint8) (*brokerConn, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if bt, ok := s.brokerConns[topicAddr]; ok { + return bt, nil + } + + m, err := newMode(topicAddr, modeID) + if err != nil { + return nil, err + } + + bc := &brokerConn{ + mode: m, + subscribers: make(map[string]*brokerSubscriber), + } + s.brokerConns[topicAddr] = bc + return bc, nil +} + +// writeRaw writes raw bytes to the stream. +func writeRaw(stream p2p.Stream, data []byte) error { + c := 0 + for c < len(data) { + n, err := stream.Write(data[c:]) + if err != nil { + return err + } + c += n + } + return nil +} diff --git a/pkg/pubsub/ws.go b/pkg/pubsub/ws.go new file mode 100644 index 00000000000..1a19967a88d --- /dev/null +++ b/pkg/pubsub/ws.go @@ -0,0 +1,109 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package pubsub + +import ( + "context" + "fmt" + "time" + + "github.com/ethersphere/bee/v2/pkg/log" + "github.com/gorilla/websocket" +) + +type WsOptions struct { + PingPeriod time.Duration + Cancel context.CancelFunc +} + +// ListeningWs bridges a subscriber's p2p stream to a WebSocket connection. +// The Mode on sc.Mode handles all wire-format details: reading broker messages, +// verifying them, and returning the payload to forward to the WebSocket. +// If the subscriber is a Participant, it also reads from the WebSocket +// and writes raw messages to the p2p stream. +func ListeningWs(ctx context.Context, conn *websocket.Conn, options WsOptions, logger log.Logger, sc *SubscriberConn, isParticipant bool) { + var ( + ticker = time.NewTicker(options.PingPeriod) + writeDeadline = options.PingPeriod + time.Second + readDeadline = options.PingPeriod + time.Second + ) + + conn.SetCloseHandler(func(code int, text string) error { + logger.Debug("pubsub ws: client gone", "topic", fmt.Sprintf("%x", sc.TopicAddr), "code", code, "message", text) + return nil + }) + + // If Participant, read from WebSocket and write to p2p stream (send to Broker). + if isParticipant { + go func() { + for { + if err := conn.SetReadDeadline(time.Now().Add(readDeadline)); err != nil { + logger.Debug("pubsub ws: set read deadline failed", "error", err) + break + } + _, p, err := conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { + logger.Debug("pubsub ws: read error", "error", err) + } + break + } + + if err := writeRaw(sc.Stream, p); err != nil { + logger.Debug("pubsub ws: write to p2p stream failed", "error", err) + break + } + } + options.Cancel() + }() + } + + // Read from p2p stream (Broker messages) and forward to WebSocket. + go func() { + for { + select { + case <-ctx.Done(): + return + default: + } + + wsPayload, err := sc.Mode.ReadBrokerMessage(sc.Stream) + if err != nil { + if ctx.Err() == nil { + logger.Debug("pubsub ws: read broker message failed", "error", err) + } + options.Cancel() + return + } + + if err := conn.WriteMessage(websocket.BinaryMessage, wsPayload); err != nil { + logger.Debug("pubsub ws: write to ws failed", "error", err) + options.Cancel() + return + } + } + }() + + defer func() { + ticker.Stop() + _ = conn.Close() + }() + + for { + if err := conn.SetWriteDeadline(time.Now().Add(writeDeadline)); err != nil { + logger.Debug("pubsub ws: set write deadline failed", "error", err) + return + } + select { + case <-ctx.Done(): + _ = conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + case <-ticker.C: + if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { + return + } + } + } +} From e3de0250fd43f2b49a287311a91e6273494ff372 Mon Sep 17 00:00:00 2001 From: nugaon Date: Thu, 16 Apr 2026 09:44:21 +0200 Subject: [PATCH 2/5] refactor: mode id enum type --- pkg/pubsub/mode.go | 9 ++++++--- pkg/pubsub/pubsub.go | 12 ++++++------ 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/pkg/pubsub/mode.go b/pkg/pubsub/mode.go index 9d0d3d0663d..02504890084 100644 --- a/pkg/pubsub/mode.go +++ b/pkg/pubsub/mode.go @@ -25,10 +25,13 @@ const ( HeaderGsocID = "pubsub-gsoc-id" ) +// ModeID identifies a pubsub mode. +type ModeID uint8 + // Mode defines mode-specific behavior for the pubsub protocol. // Each mode determines its own roles, wire format, and message handling. type Mode interface { - ID() uint8 + ID() ModeID TopicAddress() swarm.Address Connect(ctx context.Context, p p2p.Streamer, overlay swarm.Address, opts ConnectOptions) (p2p.Stream, error) @@ -67,7 +70,7 @@ func NewGSOCEphemeralMode(topicAddress []byte) *GSOCEphemeralMode { } } -func (m *GSOCEphemeralMode) ID() uint8 { return ModeGSOCEphemeral } +func (m *GSOCEphemeralMode) ID() ModeID { return ModeGSOCEphemeral } func (m *GSOCEphemeralMode) TopicAddress() swarm.Address { return m.topicAddress.Clone() } @@ -78,7 +81,7 @@ func (m *GSOCEphemeralMode) Connect(ctx context.Context, p p2p.Streamer, overlay } headers := p2p.Headers{ HeaderTopicAddress: m.topicAddress.Bytes(), - HeaderMode: {m.ID()}, + HeaderMode: {byte(m.ID())}, HeaderReadWrite: {rw}, } if len(opts.GsocOwner) > 0 { diff --git a/pkg/pubsub/pubsub.go b/pkg/pubsub/pubsub.go index 98ce8aa4867..ab0557454e1 100644 --- a/pkg/pubsub/pubsub.go +++ b/pkg/pubsub/pubsub.go @@ -30,7 +30,7 @@ const ( HeaderReadWrite = "pubsub-readwrite" // 1 = read+write (participant), 0 = read-only (subscriber) // Mode constants - ModeGSOCEphemeral uint8 = 1 + ModeGSOCEphemeral ModeID = 1 // Wire format sizes SpanSize = 4 // pubsub span: uint32 little-endian @@ -48,7 +48,7 @@ var ( ErrTopicMismatch = errors.New("pubsub: topic address mismatch") ) -func newMode(topicAddr [32]byte, modeID uint8) (Mode, error) { +func newMode(topicAddr [32]byte, modeID ModeID) (Mode, error) { switch modeID { case ModeGSOCEphemeral: return NewGSOCEphemeralMode(topicAddr[:]), nil @@ -67,7 +67,7 @@ type ConnectOptions struct { // TopicInfo describes a topic for the list endpoint. type TopicInfo struct { TopicAddress string `json:"topicAddress"` - Mode uint8 `json:"mode"` + Mode ModeID `json:"mode"` Role string `json:"role"` Connections []string `json:"connections"` } @@ -140,7 +140,7 @@ func (s *Service) Protocol() p2p.ProtocolSpec { } // Connect establishes a subscriber connection to a broker peer. -func (s *Service) Connect(ctx context.Context, underlay ma.Multiaddr, topicAddr [32]byte, modeID uint8, opts ConnectOptions) (*SubscriberConn, error) { +func (s *Service) Connect(ctx context.Context, underlay ma.Multiaddr, topicAddr [32]byte, modeID ModeID, opts ConnectOptions) (*SubscriberConn, error) { m, err := newMode(topicAddr, modeID) if err != nil { return nil, err @@ -236,7 +236,7 @@ func (s *Service) brokerHandler(ctx context.Context, peer p2p.Peer, stream p2p.S _ = stream.Reset() return ErrWrongHeaders } - bc, err := s.getOrCreateBrokerConn(topicAddr, modeBytes[0]) + bc, err := s.getOrCreateBrokerConn(topicAddr, ModeID(modeBytes[0])) if err != nil { _ = stream.Reset() return err @@ -364,7 +364,7 @@ func (s *Service) broadcastToSubscribers(bc *brokerConn, rawMsg []byte) { } } -func (s *Service) getOrCreateBrokerConn(topicAddr [32]byte, modeID uint8) (*brokerConn, error) { +func (s *Service) getOrCreateBrokerConn(topicAddr [32]byte, modeID ModeID) (*brokerConn, error) { s.mu.Lock() defer s.mu.Unlock() From ad500fcaa5a351e26ff9c6416f56bb5e02f11175 Mon Sep 17 00:00:00 2001 From: nugaon Date: Thu, 16 Apr 2026 09:48:00 +0200 Subject: [PATCH 3/5] refactor: rename participant to publisher --- pkg/api/pubsub.go | 6 +++--- pkg/pubsub/mode.go | 20 ++++++++++---------- pkg/pubsub/pubsub.go | 16 ++++++++-------- pkg/pubsub/ws.go | 8 ++++---- 4 files changed, 25 insertions(+), 25 deletions(-) diff --git a/pkg/api/pubsub.go b/pkg/api/pubsub.go index a97ae52b703..edf71e236e4 100644 --- a/pkg/api/pubsub.go +++ b/pkg/api/pubsub.go @@ -51,7 +51,7 @@ func (s *Service) pubsubWsHandler(w http.ResponseWriter, r *http.Request) { return } - // Optional headers: GSOC fields for Participant upgrade + // Optional headers: GSOC fields for Publisher upgrade var connectOpts pubsub.ConnectOptions gsocPubKeyHex := r.Header.Get(SwarmPubsubGsocPublicKeyHeader) @@ -118,11 +118,11 @@ func (s *Service) pubsubWsHandler(w http.ResponseWriter, r *http.Request) { pingPeriod = time.Minute } - isParticipant := connectOpts.ReadWrite + isPublisher := connectOpts.ReadWrite s.wsWg.Add(1) go func() { - pubsub.ListeningWs(ctx, conn, pubsub.WsOptions{PingPeriod: pingPeriod, Cancel: cancel}, logger, subscriberConn, isParticipant) + pubsub.ListeningWs(ctx, conn, pubsub.WsOptions{PingPeriod: pingPeriod, Cancel: cancel}, logger, subscriberConn, isPublisher) _ = conn.Close() subscriberConn.Cancel() s.wsWg.Done() diff --git a/pkg/pubsub/mode.go b/pkg/pubsub/mode.go index 02504890084..0d74777c470 100644 --- a/pkg/pubsub/mode.go +++ b/pkg/pubsub/mode.go @@ -35,9 +35,9 @@ type Mode interface { TopicAddress() swarm.Address Connect(ctx context.Context, p p2p.Streamer, overlay swarm.Address, opts ConnectOptions) (p2p.Stream, error) - // Broker side - participant - ValidateParticipant(bc *brokerConn, headers p2p.Headers) error - ReadParticipantMessage(stream p2p.Stream) ([]byte, error) + // Broker side - publisher + ValidatePublisher(bc *brokerConn, headers p2p.Headers) error + ReadPublisherMessage(stream p2p.Stream) ([]byte, error) // Broker side - broadcast FormatBroadcast(bt *brokerConn, sub *brokerSubscriber, rawMsg []byte) []byte @@ -93,8 +93,8 @@ func (m *GSOCEphemeralMode) Connect(ctx context.Context, p p2p.Streamer, overlay return p.NewStream(ctx, overlay, headers, protocolName, protocolVersion, streamName) } -// ValidateParticipant sets SOC parameters on the broker side so it can validate the messages. -func (m *GSOCEphemeralMode) ValidateParticipant(bc *brokerConn, headers p2p.Headers) error { +// ValidatePublisher sets SOC parameters on the broker side so it can validate the messages. +func (m *GSOCEphemeralMode) ValidatePublisher(bc *brokerConn, headers p2p.Headers) error { gsocOwner := headers[HeaderGsocOwner] gsocID := headers[HeaderGsocID] @@ -109,7 +109,7 @@ func (m *GSOCEphemeralMode) ValidateParticipant(bc *brokerConn, headers p2p.Head return nil } -// FormatBroadcast formats a raw participant message for delivery to a subscriber. +// FormatBroadcast formats a raw publisher message for delivery to a subscriber. // First delivery to each subscriber includes a handshake with SOC identity; subsequent are data-only. func (m *GSOCEphemeralMode) FormatBroadcast(bt *brokerConn, sub *brokerSubscriber, rawMsg []byte) []byte { if !m.handshakeHappened { @@ -130,9 +130,9 @@ func (m *GSOCEphemeralMode) FormatBroadcast(bt *brokerConn, sub *brokerSubscribe return msg } -// ReadParticipantMessage reads [65B sig][4B span][NB payload (max 4KB)] from the stream, +// ReadPublisherMessage reads [65B sig][4B span][NB payload (max 4KB)] from the stream, // constructs and validates the SOC chunk and returns that. -func (m *GSOCEphemeralMode) ReadParticipantMessage(stream p2p.Stream) ([]byte, error) { +func (m *GSOCEphemeralMode) ReadPublisherMessage(stream p2p.Stream) ([]byte, error) { sig := make([]byte, SigSize) if _, err := io.ReadFull(stream, sig); err != nil { return nil, err @@ -182,13 +182,13 @@ func (m *GSOCEphemeralMode) ReadBrokerMessage(stream p2p.Stream) ([]byte, error) } m.setGsocParams(ownerAddr, socID) - return m.ReadParticipantMessage(stream) // same as participant message at this point + return m.ReadPublisherMessage(stream) // same as publisher message at this point case MsgTypeData: if m.gsocID == nil { return nil, fmt.Errorf("pubsub: data message before handshake") } - return m.ReadParticipantMessage(stream) + return m.ReadPublisherMessage(stream) default: return nil, fmt.Errorf("pubsub: unknown message type: 0x%02x", typeBuf[0]) diff --git a/pkg/pubsub/pubsub.go b/pkg/pubsub/pubsub.go index ab0557454e1..d56bbbf3c76 100644 --- a/pkg/pubsub/pubsub.go +++ b/pkg/pubsub/pubsub.go @@ -27,7 +27,7 @@ const ( // p2p stream header keys HeaderTopicAddress = "pubsub-topic-address" HeaderMode = "pubsub-mode" - HeaderReadWrite = "pubsub-readwrite" // 1 = read+write (participant), 0 = read-only (subscriber) + HeaderReadWrite = "pubsub-readwrite" // 1 = read+write (publisher), 0 = read-only (subscriber) // Mode constants ModeGSOCEphemeral ModeID = 1 @@ -59,7 +59,7 @@ func newMode(topicAddr [32]byte, modeID ModeID) (Mode, error) { // ConnectOptions carries optional mode-specific parameters for Connect. type ConnectOptions struct { - ReadWrite bool // true = participant (read+write), false = subscriber (read-only) + ReadWrite bool // true = publisher (read+write), false = subscriber (read-only) GsocOwner []byte GsocID []byte } @@ -248,7 +248,7 @@ func (s *Service) brokerHandler(ctx context.Context, peer p2p.Peer, stream p2p.S return ErrWrongHeaders } if rwBytes[0] == 1 { - return s.handleParticipant(ctx, peer, stream, bc, headers) + return s.handlePublisher(ctx, peer, stream, bc, headers) } return s.handleSubscriber(ctx, peer, stream, bc) } @@ -314,8 +314,8 @@ func (s *Service) handleSubscriber(ctx context.Context, peer p2p.Peer, stream p2 return subCtx.Err() } -func (s *Service) handleParticipant(ctx context.Context, peer p2p.Peer, stream p2p.Stream, bc *brokerConn, headers p2p.Headers) error { - if err := bc.mode.ValidateParticipant(bc, headers); err != nil { +func (s *Service) handlePublisher(ctx context.Context, peer p2p.Peer, stream p2p.Stream, bc *brokerConn, headers p2p.Headers) error { + if err := bc.mode.ValidatePublisher(bc, headers); err != nil { _ = stream.Reset() return err } @@ -324,7 +324,7 @@ func (s *Service) handleParticipant(ctx context.Context, peer p2p.Peer, stream p defer cancel() defer unregister() - s.logger.Debug("participant connected", "peer", peer.Address, "topic", bc.mode.TopicAddress()) + s.logger.Debug("publisher connected", "peer", peer.Address, "topic", bc.mode.TopicAddress()) for { select { @@ -336,12 +336,12 @@ func (s *Service) handleParticipant(ctx context.Context, peer p2p.Peer, stream p default: } - rawMsg, err := bc.mode.ReadParticipantMessage(stream) + rawMsg, err := bc.mode.ReadPublisherMessage(stream) if err != nil { if errors.Is(err, io.EOF) { return nil } - return fmt.Errorf("read participant message: %w", err) + return fmt.Errorf("read publisher message: %w", err) } s.broadcastToSubscribers(bc, rawMsg) diff --git a/pkg/pubsub/ws.go b/pkg/pubsub/ws.go index 1a19967a88d..ac6ed49d230 100644 --- a/pkg/pubsub/ws.go +++ b/pkg/pubsub/ws.go @@ -21,9 +21,9 @@ type WsOptions struct { // ListeningWs bridges a subscriber's p2p stream to a WebSocket connection. // The Mode on sc.Mode handles all wire-format details: reading broker messages, // verifying them, and returning the payload to forward to the WebSocket. -// If the subscriber is a Participant, it also reads from the WebSocket +// If the subscriber is a Publisher, it also reads from the WebSocket // and writes raw messages to the p2p stream. -func ListeningWs(ctx context.Context, conn *websocket.Conn, options WsOptions, logger log.Logger, sc *SubscriberConn, isParticipant bool) { +func ListeningWs(ctx context.Context, conn *websocket.Conn, options WsOptions, logger log.Logger, sc *SubscriberConn, isPublisher bool) { var ( ticker = time.NewTicker(options.PingPeriod) writeDeadline = options.PingPeriod + time.Second @@ -35,8 +35,8 @@ func ListeningWs(ctx context.Context, conn *websocket.Conn, options WsOptions, l return nil }) - // If Participant, read from WebSocket and write to p2p stream (send to Broker). - if isParticipant { + // If Publisher, read from WebSocket and write to p2p stream (send to Broker). + if isPublisher { go func() { for { if err := conn.SetReadDeadline(time.Now().Add(readDeadline)); err != nil { From c3d1fe77b9727f3a11add53502d3fb9337f5d371 Mon Sep 17 00:00:00 2001 From: nugaon Date: Thu, 16 Apr 2026 15:08:58 +0200 Subject: [PATCH 4/5] docs: openapi --- openapi/Swarm.yaml | 54 ++++++++++++++++++++++++++++++++++++++++ openapi/SwarmCommon.yaml | 49 ++++++++++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+) diff --git a/openapi/Swarm.yaml b/openapi/Swarm.yaml index 6bf0e89fc3c..435d2e3fd7d 100644 --- a/openapi/Swarm.yaml +++ b/openapi/Swarm.yaml @@ -2428,3 +2428,57 @@ paths: $ref: "SwarmCommon.yaml#/components/responses/400" default: description: Default response. + + "/pubsub/{topic}": + get: + summary: Connect to a pubsub topic via WebSocket + description: | + Opens a WebSocket connection to a pubsub topic. The connection acts as either a publisher (read+write) + or subscriber (read-only) depending on the presence of GSOC headers. + + **WebSocket protocol:** + - Inbound (client → node, publisher only): raw SOC payload `[sig:65B][span:4B][payload:N B]` + - Outbound (node → client): raw SOC payload `[sig:65B][span:4B][payload:N B]` + tags: + - Pubsub + parameters: + - in: path + name: topic + schema: + type: string + required: true + description: Topic identifier (hex-encoded address or arbitrary string to be hashed) + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPubsubPeer" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPubsubGsocPublicKey" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPubsubGsocTopic" + - in: header + name: swarm-keep-alive + schema: + type: integer + required: false + description: WebSocket ping period in seconds (default: 60) + responses: + "101": + description: WebSocket upgrade successful + "400": + $ref: "SwarmCommon.yaml#/components/responses/400" + "500": + $ref: "SwarmCommon.yaml#/components/responses/500" + + "/pubsub/": + get: + summary: List all pubsub topics + description: Returns a list of all active pubsub topics this node is participating in (as broker or subscriber) + tags: + - Pubsub + responses: + "200": + description: List of pubsub topics + content: + application/json: + schema: + $ref: "SwarmCommon.yaml#/components/schemas/PubsubTopicListResponse" + "400": + $ref: "SwarmCommon.yaml#/components/responses/400" + "500": + $ref: "SwarmCommon.yaml#/components/responses/500" diff --git a/openapi/SwarmCommon.yaml b/openapi/SwarmCommon.yaml index ab39fa48555..8d13c102dea 100644 --- a/openapi/SwarmCommon.yaml +++ b/openapi/SwarmCommon.yaml @@ -1076,6 +1076,31 @@ components: required: false description: "Indicates which feed version was resolved (v1 or v2)" + PubsubTopicInfo: + type: object + properties: + topicAddress: + type: string + description: "Hex-encoded topic address" + mode: + type: integer + description: "Pubsub mode identifier" + role: + type: string + description: "Role of this node: 'broker' or 'subscriber'" + connections: + type: array + items: + type: string + description: "List of connected peer overlays" + + PubsubTopicListResponse: + type: object + properties: + topics: + type: array + items: + $ref: "#/components/schemas/PubsubTopicInfo" parameters: GasPriceParameter: @@ -1279,6 +1304,30 @@ components: required: false description: "ACT history Unix timestamp" + SwarmPubsubPeer: + in: header + name: swarm-pubsub-peer + schema: + type: string + required: true + description: "Multiaddress of the broker peer to connect to for pubsub" + + SwarmPubsubGsocPublicKey: + in: header + name: swarm-pubsub-gsoc-public-key + schema: + $ref: "#/components/schemas/HexString" + required: false + description: "GSOC public key (hex) for publisher role. Required together with swarm-pubsub-gsoc-topic to upgrade to publisher." + + SwarmPubsubGsocTopic: + in: header + name: swarm-pubsub-gsoc-topic + schema: + $ref: "#/components/schemas/HexString" + required: false + description: "GSOC topic identifier (hex) for publisher role. Required together with swarm-pubsub-gsoc-public-key to upgrade to publisher." + responses: "200": description: Success From ecf9be6d74893ac49a49bff5bd794d17cb71acaa Mon Sep 17 00:00:00 2001 From: nugaon Date: Fri, 17 Apr 2026 16:32:51 +0200 Subject: [PATCH 5/5] fix: eth address instead of public key on api --- openapi/Swarm.yaml | 2 +- openapi/SwarmCommon.yaml | 8 +++--- pkg/api/api.go | 4 +-- pkg/api/pubsub.go | 8 +++--- pkg/api/router.go | 54 ---------------------------------------- 5 files changed, 11 insertions(+), 65 deletions(-) diff --git a/openapi/Swarm.yaml b/openapi/Swarm.yaml index 435d2e3fd7d..d5a67ab9af6 100644 --- a/openapi/Swarm.yaml +++ b/openapi/Swarm.yaml @@ -2449,7 +2449,7 @@ paths: required: true description: Topic identifier (hex-encoded address or arbitrary string to be hashed) - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPubsubPeer" - - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPubsubGsocPublicKey" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPubsubGsocEthAddress" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPubsubGsocTopic" - in: header name: swarm-keep-alive diff --git a/openapi/SwarmCommon.yaml b/openapi/SwarmCommon.yaml index 8d13c102dea..d1fd4293642 100644 --- a/openapi/SwarmCommon.yaml +++ b/openapi/SwarmCommon.yaml @@ -1312,13 +1312,13 @@ components: required: true description: "Multiaddress of the broker peer to connect to for pubsub" - SwarmPubsubGsocPublicKey: + SwarmPubsubGsocEthAddress: in: header - name: swarm-pubsub-gsoc-public-key + name: swarm-pubsub-gsoc-eth-address schema: $ref: "#/components/schemas/HexString" required: false - description: "GSOC public key (hex) for publisher role. Required together with swarm-pubsub-gsoc-topic to upgrade to publisher." + description: "GSOC owner Ethereum address (20 bytes, hex-encoded) for publisher role. Required together with swarm-pubsub-gsoc-topic to upgrade to publisher." SwarmPubsubGsocTopic: in: header @@ -1326,7 +1326,7 @@ components: schema: $ref: "#/components/schemas/HexString" required: false - description: "GSOC topic identifier (hex) for publisher role. Required together with swarm-pubsub-gsoc-public-key to upgrade to publisher." + description: "GSOC topic identifier (hex) for publisher role. Required together with swarm-pubsub-gsoc-eth-address to upgrade to publisher." responses: "200": diff --git a/pkg/api/api.go b/pkg/api/api.go index d10d39a6339..748ccc2500b 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -96,7 +96,7 @@ const ( SwarmActPublisherHeader = "Swarm-Act-Publisher" SwarmActHistoryAddressHeader = "Swarm-Act-History-Address" SwarmPubsubPeerHeader = "Swarm-Pubsub-Peer" - SwarmPubsubGsocPublicKeyHeader = "Swarm-Pubsub-Gsoc-Public-Key" + SwarmPubsubGsocEthAddressHeader = "Swarm-Pubsub-Gsoc-Eth-Address" SwarmPubsubGsocTopicHeader = "Swarm-Pubsub-Gsoc-Topic" ImmutableHeader = "Immutable" @@ -596,7 +596,7 @@ func (s *Service) corsHandler(h http.Handler) http.Handler { SwarmRedundancyStrategyHeader, SwarmRedundancyFallbackModeHeader, SwarmChunkRetrievalTimeoutHeader, SwarmLookAheadBufferSizeHeader, SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, SwarmSocSignatureHeader, SwarmOnlyRootChunk, GasPriceHeader, GasLimitHeader, ImmutableHeader, SwarmActHeader, SwarmActTimestampHeader, SwarmActPublisherHeader, SwarmActHistoryAddressHeader, - SwarmPubsubPeerHeader, SwarmPubsubGsocPublicKeyHeader, SwarmPubsubGsocTopicHeader, + SwarmPubsubPeerHeader, SwarmPubsubGsocEthAddressHeader, SwarmPubsubGsocTopicHeader, } allowedHeadersStr := strings.Join(allowedHeaders, ", ") diff --git a/pkg/api/pubsub.go b/pkg/api/pubsub.go index edf71e236e4..e9a6723646b 100644 --- a/pkg/api/pubsub.go +++ b/pkg/api/pubsub.go @@ -54,12 +54,12 @@ func (s *Service) pubsubWsHandler(w http.ResponseWriter, r *http.Request) { // Optional headers: GSOC fields for Publisher upgrade var connectOpts pubsub.ConnectOptions - gsocPubKeyHex := r.Header.Get(SwarmPubsubGsocPublicKeyHeader) + gsocEthAddrHex := r.Header.Get(SwarmPubsubGsocEthAddressHeader) gsocTopicHex := r.Header.Get(SwarmPubsubGsocTopicHeader) - if gsocPubKeyHex != "" && gsocTopicHex != "" { - gsocOwner, err := hex.DecodeString(gsocPubKeyHex) + if gsocEthAddrHex != "" && gsocTopicHex != "" { + gsocOwner, err := hex.DecodeString(gsocEthAddrHex) if err != nil { - jsonhttp.BadRequest(w, "invalid Swarm-Pubsub-Gsoc-Public-Key header") + jsonhttp.BadRequest(w, "invalid Swarm-Pubsub-Gsoc-Eth-Address header") return } gsocID, err := hex.DecodeString(gsocTopicHex) diff --git a/pkg/api/router.go b/pkg/api/router.go index 4614de3f2e2..473ff48f142 100644 --- a/pkg/api/router.go +++ b/pkg/api/router.go @@ -374,60 +374,6 @@ func (s *Service) mountAPI() { }), )) - handle("/pss/subscribe/{topic}", web.ChainHandlers( - web.FinalHandlerFunc(s.pssWsHandler), - )) - - handle("/tags", web.ChainHandlers( - web.FinalHandler(jsonhttp.MethodHandler{ - "GET": http.HandlerFunc(s.listTagsHandler), - "POST": web.ChainHandlers( - jsonhttp.NewMaxBodyBytesHandler(1024), - web.FinalHandlerFunc(s.createTagHandler), - ), - })), - ) - - handle("/tags/{id}", web.ChainHandlers( - web.FinalHandler(jsonhttp.MethodHandler{ - "GET": http.HandlerFunc(s.getTagHandler), - "DELETE": http.HandlerFunc(s.deleteTagHandler), - "PATCH": web.ChainHandlers( - jsonhttp.NewMaxBodyBytesHandler(1024), - web.FinalHandlerFunc(s.doneSplitHandler), - ), - })), - ) - - handle("/pins", web.ChainHandlers( - web.FinalHandler(jsonhttp.MethodHandler{ - "GET": http.HandlerFunc(s.listPinnedRootHashes), - })), - ) - - handle("/pins/check", web.ChainHandlers( - web.FinalHandler(jsonhttp.MethodHandler{ - "GET": http.HandlerFunc(s.pinIntegrityHandler), - }), - )) - - handle("/pins/{reference}", web.ChainHandlers( - web.FinalHandler(jsonhttp.MethodHandler{ - "GET": http.HandlerFunc(s.getPinnedRootHash), - "POST": http.HandlerFunc(s.pinRootHash), - "DELETE": http.HandlerFunc(s.unpinRootHash), - })), - ) - - handle("/stewardship/{address}", jsonhttp.MethodHandler{ - "GET": web.ChainHandlers( - web.FinalHandlerFunc(s.stewardshipGetHandler), - ), - "PUT": web.ChainHandlers( - web.FinalHandlerFunc(s.stewardshipPutHandler), - ), - }) - handle("/pss/subscribe/{topic}", http.HandlerFunc(s.pssWsHandler)) handle("/gsoc/subscribe/{address}", web.ChainHandlers(