Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions pkg/bigint/bigint.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,25 @@ func (i *BigInt) MarshalJSON() ([]byte, error) {
}

func (i *BigInt) UnmarshalJSON(b []byte) error {
var val string
err := json.Unmarshal(b, &val)
if err != nil {
return err
}

if i.Int == nil {
i.Int = new(big.Int)
}

i.SetString(val, 10)
var val string
if err := json.Unmarshal(b, &val); err == nil {
if _, ok := i.SetString(val, 10); !ok {
return fmt.Errorf("bigint: invalid decimal string %q", val)
}
return nil
}

var num json.Number
if err := json.Unmarshal(b, &num); err != nil {
return err
}
if _, ok := i.SetString(num.String(), 10); !ok {
return fmt.Errorf("bigint: invalid json number %q", num.String())
}
return nil
}

Expand All @@ -58,5 +65,9 @@ func (i *BigInt) UnmarshalBinary(data []byte) error {
return fmt.Errorf("bigint: UnmarshalBinary called with empty data")
}
i.Int = new(big.Int)
return i.GobDecode(data)
if err := i.GobDecode(data); err != nil {
// fallback: try JSON
return i.UnmarshalJSON(data)
}
return nil
}
29 changes: 29 additions & 0 deletions pkg/bigint/bigint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,32 @@ func TestMarshaling(t *testing.T) {
t.Error("Wrongly marshaled data")
}
}

func TestUnmarshalJSONAcceptsStringAndNumber(t *testing.T) {
t.Parallel()

tests := []struct {
name string
input string
want string
}{
{name: "string", input: `"123456789"`, want: "123456789"},
{name: "number", input: `123456789`, want: "123456789"},
{name: "negative number", input: `-42`, want: "-42"},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

var got bigint.BigInt
if err := got.UnmarshalJSON([]byte(tc.input)); err != nil {
t.Fatalf("UnmarshalJSON: %v", err)
}

if got.String() != tc.want {
t.Fatalf("got %s, want %s", got.String(), tc.want)
}
})
}
}
27 changes: 27 additions & 0 deletions pkg/pushsync/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type metrics struct {
ShallowReceiptDepth *prometheus.CounterVec
ShallowReceipt prometheus.Counter
OverdraftRefresh prometheus.Counter
WantSelf *prometheus.CounterVec
StoreReason *prometheus.CounterVec
WantSelfOutOfDepth prometheus.Counter
}

func newMetrics() metrics {
Expand Down Expand Up @@ -153,6 +156,30 @@ func newMetrics() metrics {
Name: "overdraft_refresh",
Help: "Total number of times peers were skipped due to overdraft, requiring a wait to refresh balance.",
}),
WantSelf: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "want_self_total",
Help: "Total number of times pushsync concluded that the local node should store the chunk.",
},
[]string{"cause", "origin"},
),
StoreReason: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "store_reason_total",
Help: "Total number of times chunks were stored locally by pushsync, partitioned by reason.",
},
[]string{"reason"},
),
WantSelfOutOfDepth: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "want_self_out_of_depth_total",
Help: "Total number of times a chunk was stored via ErrWantSelf with proximity strictly below the storage radius.",
}),
}
}

Expand Down
30 changes: 25 additions & 5 deletions pkg/pushsync/pushsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ const (
maxPushErrors = 32
)

const (
wantSelfCauseClosestIsSelf = "closest_is_self"
wantSelfCauseNoPeerLeft = "no_peer_left"
storeReasonWithinAOR = "within_aor"
storeReasonWantSelf = "want_self"
)

var (
ErrNoPush = errors.New("could not push chunk")
ErrOutOfDepthStoring = errors.New("storing outside of the neighborhood")
Expand Down Expand Up @@ -293,13 +300,18 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
}

if ps.topologyDriver.IsReachable() && swarm.Proximity(ps.address.Bytes(), chunkAddress.Bytes()) >= rad {
stored, reason = true, "is within AOR"
stored, reason = true, storeReasonWithinAOR
ps.metrics.StoreReason.WithLabelValues(reason).Inc()
return store(ctx)
}

switch receipt, err := ps.pushToClosest(ctx, chunk, false); {
case errors.Is(err, topology.ErrWantSelf):
stored, reason = true, "want self"
stored, reason = true, storeReasonWantSelf
ps.metrics.StoreReason.WithLabelValues(reason).Inc()
if swarm.Proximity(ps.address.Bytes(), chunkAddress.Bytes()) < rad {
ps.metrics.WantSelfOutOfDepth.Inc()
}
return store(ctx)
case err == nil:
ps.metrics.Forwarder.Inc()
Expand Down Expand Up @@ -422,6 +434,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
if cac.Valid(ch) {
go ps.unwrap(ch)
}
ps.incWantSelf(wantSelfCauseNoPeerLeft, origin)
return nil, topology.ErrWantSelf
}
ps.logger.Debug("no peers left", "chunk_address", ch.Address(), "error", err)
Expand Down Expand Up @@ -513,16 +526,23 @@ func (ps *PushSync) closestPeer(chunkAddress swarm.Address, origin bool, skipLis

peer, err := ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{Reachable: true, Healthy: true}, skipList...)
if errors.Is(err, topology.ErrNotFound) {
peer, err := ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{Reachable: true}, skipList...)
peer, err = ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{Reachable: true}, skipList...)
if errors.Is(err, topology.ErrNotFound) {
return ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{}, skipList...)
peer, err = ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{}, skipList...)
}
return peer, err
}

if errors.Is(err, topology.ErrWantSelf) {
ps.incWantSelf(wantSelfCauseClosestIsSelf, origin)
}

return peer, err
}

func (ps *PushSync) incWantSelf(cause string, origin bool) {
ps.metrics.WantSelf.WithLabelValues(cause, strconv.FormatBool(origin)).Inc()
}

func (ps *PushSync) push(parentCtx context.Context, resultChan chan<- receiptResult, peer swarm.Address, ch swarm.Chunk, action accounting.Action) {
// here we use a background timeout context because we do not want another push attempt to cancel this one
ctx, cancel := context.WithTimeout(context.Background(), defaultTTL)
Expand Down
Loading