diff --git a/pkg/bigint/bigint.go b/pkg/bigint/bigint.go
index ce03a464600..c83cfffed2d 100644
--- a/pkg/bigint/bigint.go
+++ b/pkg/bigint/bigint.go
@@ -23,18 +23,25 @@ func (i *BigInt) MarshalJSON() ([]byte, error) {
 }
 
 func (i *BigInt) UnmarshalJSON(b []byte) error {
-	var val string
-	err := json.Unmarshal(b, &val)
-	if err != nil {
-		return err
-	}
-
 	if i.Int == nil {
 		i.Int = new(big.Int)
 	}
 
-	i.SetString(val, 10)
+	var val string
+	if err := json.Unmarshal(b, &val); err == nil {
+		if _, ok := i.SetString(val, 10); !ok {
+			return fmt.Errorf("bigint: invalid decimal string %q", val)
+		}
+		return nil
+	}
+	var num json.Number
+	if err := json.Unmarshal(b, &num); err != nil {
+		return err
+	}
+	if _, ok := i.SetString(num.String(), 10); !ok {
+		return fmt.Errorf("bigint: invalid json number %q", num.String())
+	}
 
 	return nil
 }
 
@@ -58,5 +65,9 @@ func (i *BigInt) UnmarshalBinary(data []byte) error {
 		return fmt.Errorf("bigint: UnmarshalBinary called with empty data")
 	}
 	i.Int = new(big.Int)
-	return i.GobDecode(data)
+	if err := i.GobDecode(data); err != nil {
+		// fallback: try JSON
+		return i.UnmarshalJSON(data)
+	}
+	return nil
 }
diff --git a/pkg/bigint/bigint_test.go b/pkg/bigint/bigint_test.go
index 1b6cdf824e3..8d63a50497b 100644
--- a/pkg/bigint/bigint_test.go
+++ b/pkg/bigint/bigint_test.go
@@ -117,3 +117,32 @@ func TestMarshaling(t *testing.T) {
 		t.Error("Wrongly marshaled data")
 	}
 }
+
+func TestUnmarshalJSONAcceptsStringAndNumber(t *testing.T) {
+	t.Parallel()
+
+	tests := []struct {
+		name  string
+		input string
+		want  string
+	}{
+		{name: "string", input: `"123456789"`, want: "123456789"},
+		{name: "number", input: `123456789`, want: "123456789"},
+		{name: "negative number", input: `-42`, want: "-42"},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+
+			var got bigint.BigInt
+			if err := got.UnmarshalJSON([]byte(tc.input)); err != nil {
+				t.Fatalf("UnmarshalJSON: %v", err)
+			}
+
+			if got.String() != tc.want {
+				t.Fatalf("got %s, want %s", got.String(), tc.want)
+			}
+		})
+	}
+}
diff --git a/pkg/pushsync/metrics.go b/pkg/pushsync/metrics.go
index 758a6f072a5..d8fba31a29d 100644
--- a/pkg/pushsync/metrics.go
+++ b/pkg/pushsync/metrics.go
@@ -29,6 +29,9 @@ type metrics struct {
 	ShallowReceiptDepth *prometheus.CounterVec
 	ShallowReceipt      prometheus.Counter
 	OverdraftRefresh    prometheus.Counter
+	WantSelf            *prometheus.CounterVec
+	StoreReason         *prometheus.CounterVec
+	WantSelfOutOfDepth  prometheus.Counter
 }
 
 func newMetrics() metrics {
@@ -153,6 +156,30 @@ func newMetrics() metrics {
 			Name:      "overdraft_refresh",
 			Help:      "Total number of times peers were skipped due to overdraft, requiring a wait to refresh balance.",
 		}),
+		WantSelf: prometheus.NewCounterVec(
+			prometheus.CounterOpts{
+				Namespace: m.Namespace,
+				Subsystem: subsystem,
+				Name:      "want_self_total",
+				Help:      "Total number of times pushsync concluded that the local node should store the chunk.",
+			},
+			[]string{"cause", "origin"},
+		),
+		StoreReason: prometheus.NewCounterVec(
+			prometheus.CounterOpts{
+				Namespace: m.Namespace,
+				Subsystem: subsystem,
+				Name:      "store_reason_total",
+				Help:      "Total number of times chunks were stored locally by pushsync, partitioned by reason.",
+			},
+			[]string{"reason"},
+		),
+		WantSelfOutOfDepth: prometheus.NewCounter(prometheus.CounterOpts{
+			Namespace: m.Namespace,
+			Subsystem: subsystem,
+			Name:      "want_self_out_of_depth_total",
+			Help:      "Total number of times a chunk was stored via ErrWantSelf with proximity strictly below the storage radius.",
+		}),
 	}
 }
 
diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go
index 05c4aef3d41..dd05f3a1552 100644
--- a/pkg/pushsync/pushsync.go
+++ b/pkg/pushsync/pushsync.go
@@ -56,6 +56,13 @@
 	maxPushErrors = 32
 )
 
+const (
+	wantSelfCauseClosestIsSelf = "closest_is_self"
+	wantSelfCauseNoPeerLeft    = "no_peer_left"
+	storeReasonWithinAOR       = "within_aor"
+	storeReasonWantSelf        = "want_self"
+)
+
 var (
 	ErrNoPush            = errors.New("could not push chunk")
 	ErrOutOfDepthStoring = errors.New("storing outside of the neighborhood")
@@ -293,13 +300,18 @@
 	}
 
 	if ps.topologyDriver.IsReachable() && swarm.Proximity(ps.address.Bytes(), chunkAddress.Bytes()) >= rad {
-		stored, reason = true, "is within AOR"
+		stored, reason = true, storeReasonWithinAOR
+		ps.metrics.StoreReason.WithLabelValues(reason).Inc()
 		return store(ctx)
 	}
 
 	switch receipt, err := ps.pushToClosest(ctx, chunk, false); {
 	case errors.Is(err, topology.ErrWantSelf):
-		stored, reason = true, "want self"
+		stored, reason = true, storeReasonWantSelf
+		ps.metrics.StoreReason.WithLabelValues(reason).Inc()
+		if swarm.Proximity(ps.address.Bytes(), chunkAddress.Bytes()) < rad {
+			ps.metrics.WantSelfOutOfDepth.Inc()
+		}
 		return store(ctx)
 	case err == nil:
 		ps.metrics.Forwarder.Inc()
@@ -422,6 +434,7 @@
 			if cac.Valid(ch) {
 				go ps.unwrap(ch)
 			}
+			ps.incWantSelf(wantSelfCauseNoPeerLeft, origin)
 			return nil, topology.ErrWantSelf
 		}
 		ps.logger.Debug("no peers left", "chunk_address", ch.Address(), "error", err)
@@ -513,16 +526,23 @@
 
 	peer, err := ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{Reachable: true, Healthy: true}, skipList...)
 	if errors.Is(err, topology.ErrNotFound) {
-		peer, err := ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{Reachable: true}, skipList...)
+		peer, err = ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{Reachable: true}, skipList...)
 		if errors.Is(err, topology.ErrNotFound) {
-			return ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{}, skipList...)
+			peer, err = ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{}, skipList...)
 		}
-		return peer, err
+	}
+
+	if errors.Is(err, topology.ErrWantSelf) {
+		ps.incWantSelf(wantSelfCauseClosestIsSelf, origin)
 	}
 
 	return peer, err
 }
 
+func (ps *PushSync) incWantSelf(cause string, origin bool) {
+	ps.metrics.WantSelf.WithLabelValues(cause, strconv.FormatBool(origin)).Inc()
+}
+
 func (ps *PushSync) push(parentCtx context.Context, resultChan chan<- receiptResult, peer swarm.Address, ch swarm.Chunk, action accounting.Action) {
 	// here we use a background timeout context because we do not want another push attempt to cancel this one
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTTL)