Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions openapi/Swarm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2428,3 +2428,57 @@ paths:
$ref: "SwarmCommon.yaml#/components/responses/400"
default:
description: Default response.

"/pubsub/{topic}":
get:
summary: Connect to a pubsub topic via WebSocket
description: |
Opens a WebSocket connection to a pubsub topic. The connection acts as either a publisher (read+write)
or subscriber (read-only) depending on the presence of GSOC headers.

**WebSocket protocol:**
- Inbound (client → node, publisher only): raw SOC payload `[sig:65B][span:4B][payload:N B]`
- Outbound (node → client): raw SOC payload `[sig:65B][span:4B][payload:N B]`
tags:
- Pubsub
parameters:
- in: path
name: topic
schema:
type: string
required: true
description: Topic identifier (hex-encoded address or arbitrary string to be hashed)
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPubsubPeer"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPubsubGsocEthAddress"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPubsubGsocTopic"
- in: header
name: swarm-keep-alive
schema:
type: integer
required: false
description: WebSocket ping period in seconds (default: 60)
responses:
"101":
description: WebSocket upgrade successful
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
"500":
$ref: "SwarmCommon.yaml#/components/responses/500"

"/pubsub/":
get:
summary: List all pubsub topics
description: Returns a list of all active pubsub topics this node is participating in (as broker or subscriber)
tags:
- Pubsub
responses:
"200":
description: List of pubsub topics
content:
application/json:
schema:
$ref: "SwarmCommon.yaml#/components/schemas/PubsubTopicListResponse"
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
"500":
$ref: "SwarmCommon.yaml#/components/responses/500"
49 changes: 49 additions & 0 deletions openapi/SwarmCommon.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,31 @@ components:
required: false
description: "Indicates which feed version was resolved (v1 or v2)"

PubsubTopicInfo:
type: object
properties:
topicAddress:
type: string
description: "Hex-encoded topic address"
mode:
type: integer
description: "Pubsub mode identifier"
role:
type: string
description: "Role of this node: 'broker' or 'subscriber'"
connections:
type: array
items:
type: string
description: "List of connected peer overlays"

PubsubTopicListResponse:
type: object
properties:
topics:
type: array
items:
$ref: "#/components/schemas/PubsubTopicInfo"

parameters:
GasPriceParameter:
Expand Down Expand Up @@ -1279,6 +1304,30 @@ components:
required: false
description: "ACT history Unix timestamp"

SwarmPubsubPeer:
in: header
name: swarm-pubsub-peer
schema:
type: string
required: true
description: "Multiaddress of the broker peer to connect to for pubsub"

SwarmPubsubGsocEthAddress:
in: header
name: swarm-pubsub-gsoc-eth-address
schema:
$ref: "#/components/schemas/HexString"
required: false
description: "GSOC owner Ethereum address (20 bytes, hex-encoded) for publisher role. Required together with swarm-pubsub-gsoc-topic to upgrade to publisher."

SwarmPubsubGsocTopic:
in: header
name: swarm-pubsub-gsoc-topic
schema:
$ref: "#/components/schemas/HexString"
required: false
description: "GSOC topic identifier (hex) for publisher role. Required together with swarm-pubsub-gsoc-eth-address to upgrade to publisher."

responses:
"200":
description: Success
Expand Down
8 changes: 8 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/postage"
"github.com/ethersphere/bee/v2/pkg/postage/postagecontract"
"github.com/ethersphere/bee/v2/pkg/pss"
"github.com/ethersphere/bee/v2/pkg/pubsub"
"github.com/ethersphere/bee/v2/pkg/resolver"
"github.com/ethersphere/bee/v2/pkg/resolver/client/ens"
"github.com/ethersphere/bee/v2/pkg/resolver/multiresolver"
Expand Down Expand Up @@ -94,6 +95,9 @@ const (
SwarmActTimestampHeader = "Swarm-Act-Timestamp"
SwarmActPublisherHeader = "Swarm-Act-Publisher"
SwarmActHistoryAddressHeader = "Swarm-Act-History-Address"
SwarmPubsubPeerHeader = "Swarm-Pubsub-Peer"
SwarmPubsubGsocEthAddressHeader = "Swarm-Pubsub-Gsoc-Eth-Address"
SwarmPubsubGsocTopicHeader = "Swarm-Pubsub-Gsoc-Topic"

ImmutableHeader = "Immutable"
GasPriceHeader = "Gas-Price"
Expand Down Expand Up @@ -187,6 +191,7 @@ type Service struct {

topologyDriver topology.Driver
p2p p2p.DebugService
pubsubSvc *pubsub.Service
accounting accounting.Interface
chequebook chequebook.Service
pseudosettle settlement.Interface
Expand Down Expand Up @@ -270,6 +275,7 @@ type ExtraOptions struct {
SyncStatus func() (bool, error)
NodeStatus *status.Service
PinIntegrity PinIntegrity
PubsubService *pubsub.Service
}

func New(
Expand Down Expand Up @@ -361,6 +367,7 @@ func (s *Service) Configure(signer crypto.Signer, tracer *tracing.Tracer, o Opti
s.lightNodes = e.LightNodes
s.pseudosettle = e.Pseudosettle
s.blockTime = e.BlockTime
s.pubsubSvc = e.PubsubService

s.statusSem = semaphore.NewWeighted(1)
s.postageSem = semaphore.NewWeighted(1)
Expand Down Expand Up @@ -589,6 +596,7 @@ func (s *Service) corsHandler(h http.Handler) http.Handler {
SwarmRedundancyStrategyHeader, SwarmRedundancyFallbackModeHeader, SwarmChunkRetrievalTimeoutHeader, SwarmLookAheadBufferSizeHeader,
SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, SwarmSocSignatureHeader, SwarmOnlyRootChunk, GasPriceHeader, GasLimitHeader, ImmutableHeader,
SwarmActHeader, SwarmActTimestampHeader, SwarmActPublisherHeader, SwarmActHistoryAddressHeader,
SwarmPubsubPeerHeader, SwarmPubsubGsocEthAddressHeader, SwarmPubsubGsocTopicHeader,
}
allowedHeadersStr := strings.Join(allowedHeaders, ", ")

Expand Down
144 changes: 144 additions & 0 deletions pkg/api/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright 2026 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package api

import (
"context"
"encoding/hex"
"net/http"
"time"

"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/pubsub"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
ma "github.com/multiformats/go-multiaddr"
)

func (s *Service) pubsubWsHandler(w http.ResponseWriter, r *http.Request) {
logger := s.logger.WithName("pubsub").Build()

paths := struct {
Topic string `map:"topic" validate:"required"`
}{}
if response := s.mapStructure(mux.Vars(r), &paths); response != nil {
response("invalid path params", logger, w)
return
}

var topicAddr [32]byte
if decoded, err := hex.DecodeString(paths.Topic); err == nil && len(decoded) == swarm.HashSize {
copy(topicAddr[:], decoded)
} else {
h := swarm.NewHasher()
_, _ = h.Write([]byte(paths.Topic))
copy(topicAddr[:], h.Sum(nil))
}

// Required header: underlay multiaddr
peerHeader := r.Header.Get(SwarmPubsubPeerHeader)
if peerHeader == "" {
jsonhttp.BadRequest(w, "missing Swarm-Pubsub-Peer header")
return
}
underlay, err := ma.NewMultiaddr(peerHeader)
if err != nil {
logger.Debug("invalid peer multiaddr", "value", peerHeader, "error", err)
jsonhttp.BadRequest(w, "invalid Swarm-Pubsub-Peer header")
return
}

// Optional headers: GSOC fields for Publisher upgrade
var connectOpts pubsub.ConnectOptions

gsocEthAddrHex := r.Header.Get(SwarmPubsubGsocEthAddressHeader)
gsocTopicHex := r.Header.Get(SwarmPubsubGsocTopicHeader)
if gsocEthAddrHex != "" && gsocTopicHex != "" {
gsocOwner, err := hex.DecodeString(gsocEthAddrHex)
if err != nil {
jsonhttp.BadRequest(w, "invalid Swarm-Pubsub-Gsoc-Eth-Address header")
return
}
gsocID, err := hex.DecodeString(gsocTopicHex)
if err != nil {
jsonhttp.BadRequest(w, "invalid Swarm-Pubsub-Gsoc-Topic header")
return
}
connectOpts.GsocOwner = gsocOwner
connectOpts.GsocID = gsocID
connectOpts.ReadWrite = true
}

headers := struct {
KeepAlive time.Duration `map:"Swarm-Keep-Alive"`
}{}
if response := s.mapStructure(r.Header, &headers); response != nil {
response("invalid header params", logger, w)
return
}

if s.beeMode == DevMode {
logger.Warning("pubsub endpoint is disabled in dev mode")
jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation)
return
}

// Connect to broker peer
ctx, cancel := context.WithCancel(context.Background())
subscriberConn, err := s.pubsubSvc.Connect(ctx, underlay, topicAddr, pubsub.ModeGSOCEphemeral, connectOpts)
if err != nil {
cancel()
logger.Debug("pubsub connect failed", "error", err)
jsonhttp.InternalServerError(w, "pubsub connect failed")
return
}

// Upgrade to WebSocket
upgrader := websocket.Upgrader{
ReadBufferSize: swarm.ChunkWithSpanSize,
WriteBufferSize: swarm.ChunkWithSpanSize,
CheckOrigin: s.checkOrigin,
}

conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
cancel()
_ = subscriberConn.Stream.Close()
logger.Debug("websocket upgrade failed", "error", err)
logger.Error(nil, "websocket upgrade failed")
jsonhttp.InternalServerError(w, "upgrade failed")
return
}

pingPeriod := headers.KeepAlive * time.Second
if pingPeriod == 0 {
pingPeriod = time.Minute
}

isPublisher := connectOpts.ReadWrite

s.wsWg.Add(1)
go func() {
pubsub.ListeningWs(ctx, conn, pubsub.WsOptions{PingPeriod: pingPeriod, Cancel: cancel}, logger, subscriberConn, isPublisher)
_ = conn.Close()
subscriberConn.Cancel()
s.wsWg.Done()
}()
}

func (s *Service) pubsubListHandler(w http.ResponseWriter, r *http.Request) {
if s.pubsubSvc == nil {
jsonhttp.NotFound(w, "pubsub service not available")
return
}

topics := s.pubsubSvc.Topics()
jsonhttp.OK(w, struct {
Topics []pubsub.TopicInfo `json:"topics"`
}{
Topics: topics,
})
}
10 changes: 10 additions & 0 deletions pkg/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,16 @@ func (s *Service) mountAPI() {
),
})

handle("/pubsub/{topic}", web.ChainHandlers(
web.FinalHandlerFunc(s.pubsubWsHandler),
))

handle("/pubsub/", web.ChainHandlers(
web.FinalHandler(jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.pubsubListHandler),
}),
))

handle("/pss/subscribe/{topic}", http.HandlerFunc(s.pssWsHandler))

handle("/gsoc/subscribe/{address}", web.ChainHandlers(
Expand Down
10 changes: 10 additions & 0 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/pricer"
"github.com/ethersphere/bee/v2/pkg/pricing"
"github.com/ethersphere/bee/v2/pkg/pss"
"github.com/ethersphere/bee/v2/pkg/pubsub"
"github.com/ethersphere/bee/v2/pkg/puller"
"github.com/ethersphere/bee/v2/pkg/pullsync"
"github.com/ethersphere/bee/v2/pkg/pusher"
Expand Down Expand Up @@ -192,6 +193,8 @@ type Options struct {
WarmupTime time.Duration
WelcomeMessage string
WhitelistedWithdrawalAddress []string
PubsubBrokerMode bool
PubsubMaxConnections int
}

const (
Expand Down Expand Up @@ -665,6 +668,7 @@ func NewBee(
Nonce: nonce,
ValidateOverlay: chainEnabled,
Registry: registry,
PubsubReservedStreamSlots: o.PubsubMaxConnections,
})
if err != nil {
return nil, fmt.Errorf("p2p service: %w", err)
Expand Down Expand Up @@ -737,6 +741,11 @@ func NewBee(
return nil, fmt.Errorf("init batch service: %w", err)
}

pubsubSvc := pubsub.New(p2ps, logger, o.PubsubBrokerMode, o.PubsubMaxConnections)
if err = p2ps.AddProtocol(pubsubSvc.Protocol()); err != nil {
return nil, fmt.Errorf("pubsub protocol: %w", err)
}

// Construct protocols.
pingPong := pingpong.New(p2ps, logger, tracer)

Expand Down Expand Up @@ -1266,6 +1275,7 @@ func NewBee(
SyncStatus: syncStatusFn,
NodeStatus: nodeStatus,
PinIntegrity: localStore.PinIntegrity(),
PubsubService: pubsubSvc,
}

if o.APIAddr != "" {
Expand Down
Loading
Loading