diff --git a/.mdl_style.rb b/.mdl_style.rb index ad9dbe0..7ea5620 100644 --- a/.mdl_style.rb +++ b/.mdl_style.rb @@ -5,3 +5,11 @@ rule 'MD007', :indent => 3 rule "MD029", style => "one" + +# Keep-a-Changelog (https://keepachangelog.com) uses repeated `### Added`, +# `### Fixed`, `### Security` headings under each `## [version]` heading by +# design. MD024 with the default config flags those as duplicates. +# allow_different_nesting permits same-text headings as long as they sit +# under distinct parent headings — which is exactly the Keep-a-Changelog +# shape, and still catches genuine duplicates within the same section. +rule "MD024", :allow_different_nesting => true diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e29d4d..d9cbf01 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,48 @@ adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] -## [2.0.1] — 2026-05-05 +### Added + +- **Structured logging on the dist backend.** New `WithDistLogger(*slog.Logger)` + option wires a structured logger into the dist backend's background + loops (heartbeat, hint replay, rebalance, merkle sync) and operational + error surfaces (HTTP listener bind failures, serve-goroutine exits, + failed migrations during rebalance, dropped hints, peer state + transitions). Library default is silent — `WithDistLogger` not called + installs a `slog.DiscardHandler` so the dist backend never writes to + stderr unless the caller opts in. Every record is pre-bound with + `component=dist_memory` and `node_id=` attributes for grep/filter. + Phase A.1 of the production-readiness work. +- **OpenTelemetry tracing on the dist backend.** New + `WithDistTracerProvider(trace.TracerProvider)` option opens spans on + every public `Get` / `Set` / `Remove`, with child spans + (`dist.replicate.set` / `dist.replicate.remove`) per peer during + fan-out. Span attributes include `cache.key.length`, + `dist.consistency`, `dist.owners.count`, `dist.acks`, `cache.hit`, + and `peer.id`. Cache key *values* are intentionally never recorded + on spans — keys can be PII (user IDs, session tokens). Library + default is a no-op tracer (`noop.NewTracerProvider`), so spans cost + nothing unless the caller opts in. New `ConsistencyLevel.String()` + method renders consistency levels human-readably for log/span attrs. + Phase A.2 of the production-readiness work. +- **OpenTelemetry metrics on the dist backend.** New + `WithDistMeterProvider(metric.MeterProvider)` option registers an + observable instrument for every field on `DistMetrics` — counters + for cumulative totals (`dist.write.attempts`, `dist.forward.*`, + `dist.hinted.*`, `dist.merkle.syncs`, `dist.rebalance.*`, etc.), + gauges for current state (`dist.members.alive`, + `dist.tombstones.active`, `dist.hinted.bytes`, last-operation + latencies in nanoseconds, etc.). A single registered callback + observes all instruments from one `Metrics()` snapshot per + collection cycle, so there is no per-operation overhead beyond the + existing atomic counters. Names use the `dist.` prefix so a + Prometheus exporter renders them under a single subsystem. + `Stop` unregisters the callback so the SDK does not invoke it + against a stopped backend. Library default is a no-op meter, so + metrics cost nothing unless the caller opts in. Phase A.3 of the + production-readiness work. + +## [0.5.0] — 2026-05-05 ### Security @@ -25,7 +66,7 @@ adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). in via the new `DistHTTPAuth.AllowAnonymousInbound` field. 
All other configurations (`Token`-only, `Token+ServerVerify`, `Token+ClientSign`, `ServerVerify`-only) are unaffected. Reported by the post-tag - security review; addressed before any v2.0.0 public announcement. + security review; addressed before any v0.5.0 public announcement. ### Added @@ -34,7 +75,7 @@ adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). - `sentinel.ErrInsecureAuthConfig` — surfaced from `NewDistMemory` when the auth policy would silently disable inbound enforcement. -## [2.0.0] — 2026-05-04 +## [0.4.3] — 2026-05-04 A modernization release. The headline themes: @@ -86,7 +127,6 @@ RFCs that informed the design decisions live under [docs/rfcs/](docs/rfcs/). ### Performance Measurements on Apple M4 Pro, `go test -bench`, `count=5`, benchstat. -Full release snapshot captured in [bench-v2.0.0.txt](bench-v2.0.0.txt). - **Per-shard atomic `Count`.** `BenchmarkConcurrentMap_Count`: 53 → ~10 ns/op. `_CountParallel`: 1181 → ~13 ns/op. Eliminates the @@ -186,5 +226,5 @@ Worth surfacing for contributors: [RFC document](docs/rfcs/0001-backend-owned-eviction.md) preserves the measurement and the lessons. -[Unreleased]: https://github.com/hyp3rd/hypercache/compare/v2.0.0...HEAD -[2.0.0]: https://github.com/hyp3rd/hypercache/releases/tag/v2.0.0 +Unreleased: +Released: [0.5.0](https://github.com/hyp3rd/hypercache/releases/tag/v0.5.0) diff --git a/README.md b/README.md index 56e4899..cb3eb67 100644 --- a/README.md +++ b/README.md @@ -137,7 +137,7 @@ Available algorithm names you can pass to `WithEvictionAlgorithm`: Note: ARC is experimental and isn’t included in the default registry. If you choose to use it, register it manually or enable it explicitly in your build. -#### Sharded eviction (default since v2.0.0) +#### Sharded eviction (default since v0.5.0) The configured algorithm is wrapped by a 32-shard router (`pkg/eviction/sharded.go`) that uses the same key hash as `ConcurrentMap` — so a key's data shard and eviction shard line up. This eliminates the global mutex contention single-instance algorithms (LRU/LFU/Clock/CAWOLFU) suffer from. Total capacity is honored within ±32 (one slot of slack per shard), and items evict per-shard rather than in strict global LRU/LFU order. @@ -263,7 +263,7 @@ Limitations / not yet implemented: - Compression on the wire. - Persistence / durability (out of scope presently). -#### Transport hardening (since v2.0.0) +#### Transport hardening (since v0.5.0) The dist HTTP server and the auto-created HTTP client share a single configuration surface — apply the same option to every node in the cluster. @@ -347,7 +347,7 @@ Test helpers `AddPeer` and `RemovePeer` simulate join / leave events that trigge | Advanced versioning (HLC/vector) | Planned | | Client SDK (direct routing) | Planned | | Tracing spans | Planned | -| Security (TLS/auth) | Done (since v2.0.0; see "Transport hardening") | +| Security (TLS/auth) | Done (since v0.5.0; see "Transport hardening") | | Compression | Planned | | Persistence | Out of scope (current phase) | | Chaos / fault injection | Planned | diff --git a/__examples/observability/observability.go b/__examples/observability/observability.go index 3bf011a..614b304 100644 --- a/__examples/observability/observability.go +++ b/__examples/observability/observability.go @@ -34,7 +34,8 @@ func main() { tracer := trace.NewNoopTracerProvider().Tracer("hypercache/examples") // Apply OTel tracing and metrics middleware. 
-	svc = hypercache.ApplyMiddleware(svc,
+	svc = hypercache.ApplyMiddleware(
+		svc,
 		func(next hypercache.Service) hypercache.Service {
 			return middleware.NewOTelTracingMiddleware(next, tracer, middleware.WithCommonAttributes(
 				attribute.String("component", "hypercache"),
diff --git a/__examples/service/service.go b/__examples/service/service.go
index 369ba31..b8d3584 100644
--- a/__examples/service/service.go
+++ b/__examples/service/service.go
@@ -37,7 +37,8 @@ func main() {
 	logger := log.Default()
 
 	// apply middleware in the same order as you want to execute them
-	svc = hypercache.ApplyMiddleware(svc,
+	svc = hypercache.ApplyMiddleware(
+		svc,
 		// middleware.YourMiddleware,
 		func(next hypercache.Service) hypercache.Service {
 			return middleware.NewLoggingMiddleware(next, logger)
diff --git a/cspell.config.yaml b/cspell.config.yaml
index 12dbb77..63cca55 100644
--- a/cspell.config.yaml
+++ b/cspell.config.yaml
@@ -132,6 +132,8 @@ words:
   - longbridgeapp
   - maxmemory
   - memprofile
+  - metricdata
+  - metricnoop
   - Merkle
   - mfinal
   - Mgmt
@@ -146,6 +148,7 @@
   - noctx
   - noinlineerr
   - nolint
+  - nolintlint
   - nonamedreturns
   - nosec
   - NOVENDOR
@@ -162,6 +165,7 @@
   - Repls
   - rerr
   - sarif
+  - sdkmetric
   - sectools
   - securego
   - sess
@@ -180,6 +184,7 @@
   - thelper
   - toplevel
   - tparallel
+  - tracetest
   - traefik
   - ugorji
   - unmarshals
diff --git a/docs/rfcs/0001-backend-owned-eviction.md b/docs/rfcs/0001-backend-owned-eviction.md
index 7370430..7803e1d 100644
--- a/docs/rfcs/0001-backend-owned-eviction.md
+++ b/docs/rfcs/0001-backend-owned-eviction.md
@@ -253,7 +253,7 @@ Per the RFC's own discipline (`Reject A and revisit if any criterion fails`):
    "slower on Get, semantically-correct LRU." Default stays legacy.
 1. **Do not pursue Option A2** (co-located locks) — the win Option A
    would have justified A2 isn't there to amortize the bigger refactor.
-1. **The "Get does not touch LRU" semantic gap is a separate concern**
+1. 
**The "Get does not touch LRU" semantic gap is a separate concern** that could be addressed inside the legacy path (have HyperCache.Get call `evictionAlgorithm.Get(key)`) at similar cost to the Item-aware Touch — i.e., the cost is fundamental to "real LRU", not specific diff --git a/go.mod b/go.mod index a14c7c7..bc66b0a 100644 --- a/go.mod +++ b/go.mod @@ -13,12 +13,16 @@ require ( github.com/ugorji/go/codec v1.3.1 go.opentelemetry.io/otel v1.43.0 go.opentelemetry.io/otel/metric v1.43.0 + go.opentelemetry.io/otel/sdk v1.43.0 + go.opentelemetry.io/otel/sdk/metric v1.43.0 go.opentelemetry.io/otel/trace v1.43.0 ) require ( github.com/andybalholm/brotli v1.2.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/gofiber/schema v1.7.1 // indirect github.com/gofiber/utils/v2 v2.0.4 // indirect github.com/google/uuid v1.6.0 // indirect @@ -27,15 +31,14 @@ require ( github.com/mattn/go-isatty v0.0.22 // indirect github.com/philhofer/fwd v1.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/rogpeppe/go-internal v1.14.1 // indirect github.com/tinylib/msgp v1.6.4 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasthttp v1.71.0 // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.uber.org/atomic v1.11.0 // indirect golang.org/x/crypto v0.50.0 // indirect golang.org/x/net v0.53.0 // indirect golang.org/x/sys v0.43.0 // indirect golang.org/x/text v0.36.0 // indirect - gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 987a387..b03d265 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,7 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fxamacker/cbor/v2 v2.9.1 h1:2rWm8B193Ll4VdjsJY28jxs70IdDsHRWgQYAI80+rMQ= github.com/fxamacker/cbor/v2 v2.9.1/go.mod h1:vM4b+DJCtHn+zz7h3FFp/hDAI9WNWCsZj23V5ytsSxQ= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= @@ -34,11 +35,8 @@ github.com/klauspost/compress v1.18.6 h1:2jupLlAwFm95+YDR+NwD2MEfFO9d4z4Prjl1XXD github.com/klauspost/compress v1.18.6/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ= github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE= github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= -github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= @@ -77,10 +75,16 @@ go.opentelemetry.io/otel v1.43.0 
h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I= go.opentelemetry.io/otel v1.43.0/go.mod h1:JuG+u74mvjvcm8vj8pI5XiHy1zDeoCS2LB1spIq7Ay0= go.opentelemetry.io/otel/metric v1.43.0 h1:d7638QeInOnuwOONPp4JAOGfbCEpYb+K6DVWvdxGzgM= go.opentelemetry.io/otel/metric v1.43.0/go.mod h1:RDnPtIxvqlgO8GRW18W6Z/4P462ldprJtfxHxyKd2PY= +go.opentelemetry.io/otel/sdk v1.43.0 h1:pi5mE86i5rTeLXqoF/hhiBtUNcrAGHLKQdhg4h4V9Dg= +go.opentelemetry.io/otel/sdk v1.43.0/go.mod h1:P+IkVU3iWukmiit/Yf9AWvpyRDlUeBaRg6Y+C58QHzg= +go.opentelemetry.io/otel/sdk/metric v1.43.0 h1:S88dyqXjJkuBNLeMcVPRFXpRw2fuwdvfCGLEo89fDkw= +go.opentelemetry.io/otel/sdk/metric v1.43.0/go.mod h1:C/RJtwSEJ5hzTiUz5pXF1kILHStzb9zFlIEe85bhj6A= go.opentelemetry.io/otel/trace v1.43.0 h1:BkNrHpup+4k4w+ZZ86CZoHHEkohws8AY+WTX09nk+3A= go.opentelemetry.io/otel/trace v1.43.0/go.mod h1:/QJhyVBUUswCphDVxq+8mld+AvhXZLhe+8WVFxiFff0= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI= golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q= golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= diff --git a/pkg/backend/dist_http_server.go b/pkg/backend/dist_http_server.go index 051e3d1..7752e95 100644 --- a/pkg/backend/dist_http_server.go +++ b/pkg/backend/dist_http_server.go @@ -4,6 +4,7 @@ import ( "context" "crypto/subtle" "crypto/tls" + "log/slog" "net" "net/http" "strconv" @@ -47,6 +48,12 @@ type distHTTPServer struct { // use, TLS handshake failure on accept) instead of having them // silently swallowed. serveErr atomic.Pointer[error] + // logger is the structured logger inherited from the parent + // DistMemory. Used to surface serve-goroutine errors that previously + // only landed in serveErr (LastServeError accessor) — operators + // running with a configured logger now see them in their log stream + // at the moment of failure, not just on demand. + logger *slog.Logger } // DistHTTPAuth configures authentication for the dist HTTP server @@ -482,8 +489,18 @@ func (s *distHTTPServer) listen(ctx context.Context) error { if serveErr != nil { // Stash so operators can read it via LastServeError(); a // listener that crashed silently is the worst kind of - // production bug. + // production bug. Also surface to the structured logger when + // configured so the failure shows up in the operator's log + // stream at the moment it happens, not just on demand. 
s.serveErr.Store(&serveErr) + + if s.logger != nil { + s.logger.Error( + "dist HTTP serve goroutine exited", + slog.String("addr", s.addr), + slog.Any("err", serveErr), + ) + } } }() diff --git a/pkg/backend/dist_memory.go b/pkg/backend/dist_memory.go index 97679fc..19a4bc8 100644 --- a/pkg/backend/dist_memory.go +++ b/pkg/backend/dist_memory.go @@ -8,20 +8,33 @@ import ( "errors" "hash" "hash/fnv" + "log/slog" "math/big" "slices" + "strconv" "strings" "sync" "sync/atomic" "time" "github.com/hyp3rd/ewrap" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/metric" + metricnoop "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace/noop" "github.com/hyp3rd/hypercache/internal/cluster" "github.com/hyp3rd/hypercache/internal/sentinel" cache "github.com/hyp3rd/hypercache/pkg/cache/v2" ) +// distTracerName is the OpenTelemetry instrumentation-library name for +// dist-backend spans. Stable, package-qualified — operators can grep +// span streams by it without depending on individual span names. +const distTracerName = "github.com/hyp3rd/hypercache/pkg/backend" + // Internal tuning constants. const ( defaultDistShardCount = 8 // default number of shards @@ -157,6 +170,34 @@ type DistMemory struct { // stopped guards Stop() against double-invocation (idempotent shutdown). stopped atomic.Bool + + // tracer is the OpenTelemetry tracer used to create spans on the + // public Get/Set/Remove ops and on replication fan-out. Defaults + // to noop.NewTracerProvider so library code emits no spans unless + // the caller opts in via WithDistTracerProvider. The noop tracer's + // Span values are zero-allocation, so the always-on instrumentation + // is cheap on the hot path when tracing is disabled. + tracer trace.Tracer + + // meter is the OpenTelemetry meter used to register observable + // counters/gauges that mirror the in-process distMetrics atomics. + // Defaults to noop.NewMeterProvider so library code emits no + // metrics unless the caller opts in via WithDistMeterProvider. + // metricRegistration retains the registration handle returned by + // meter.RegisterCallback so Stop can unregister cleanly — without + // it the SDK would keep invoking the callback after shutdown. + meter metric.Meter + metricRegistration metric.Registration + + // logger is the structured logger used by background loops + // (heartbeat, hint replay, rebalance, gossip, merkle sync) and + // error surfaces (transport bind failures, sync errors, dropped + // hints). Defaults to a no-op handler writing to io.Discard so + // library code does not write to stderr unless the caller opts + // in via WithDistLogger. All log lines are pre-bound with + // `node_id` so operators can grep/filter without the call sites + // having to weave the ID through every record. + logger *slog.Logger } const ( @@ -196,6 +237,22 @@ const ( ConsistencyAll ) +// String returns the human-readable form for logs and span attributes. +// Unknown values render as `consistency()` rather than panicking +// so a corrupted/forwards-compatible value still produces useful telemetry. +func (l ConsistencyLevel) String() string { + switch l { + case ConsistencyOne: + return "ONE" + case ConsistencyQuorum: + return "QUORUM" + case ConsistencyAll: + return "ALL" + default: + return "consistency(" + strconv.Itoa(int(l)) + ")" + } +} + // WithDistReadConsistency sets read consistency (default ONE). 
func WithDistReadConsistency(l ConsistencyLevel) DistMemoryOption { return func(dm *DistMemory) { dm.readConsistency = l } @@ -433,6 +490,12 @@ func (dm *DistMemory) SyncWith(ctx context.Context, nodeID string) error { remoteTree, err := transport.FetchMerkle(ctx, nodeID) if err != nil { + dm.logger.Warn( + "merkle sync fetch failed", + slog.String("peer_id", nodeID), + slog.Any("err", err), + ) + return err } @@ -625,6 +688,88 @@ func WithDistHTTPAuth(auth DistHTTPAuth) DistMemoryOption { return func(dm *DistMemory) { dm.httpAuth = auth } } +// WithDistLogger supplies a structured logger for the dist backend's +// background loops (heartbeat, hint replay, rebalance, gossip, merkle +// auto-sync) and operational error surfaces (HTTP listener failures, +// transport errors, dropped hints). The supplied logger is wrapped with +// `node_id` and `component=dist_memory` attributes before use, so call +// sites do not need to weave the node ID through every record. +// +// Pass slog.Default() to inherit the application's logger, or supply a +// custom *slog.Logger with the desired level / handler. Zero-value (no +// option call) keeps the dist backend silent — the default uses an +// io.Discard handler, which means library code never writes to stderr +// unless the caller opts in. +// +// nil is treated as "no change" — useful when callers conditionally +// build options. +func WithDistLogger(logger *slog.Logger) DistMemoryOption { + return func(dm *DistMemory) { + if logger != nil { + dm.logger = logger + } + } +} + +// WithDistTracerProvider supplies an OpenTelemetry TracerProvider for +// the dist backend. When set, every public Get/Set/Remove call opens a +// span (`dist.get` / `dist.set` / `dist.remove`) carrying consistency +// level and key length attributes; replication fan-out adds child spans +// (`dist.replicate.set` / `dist.replicate.remove`) per peer so operators +// can see where time is spent under load. +// +// Span attributes intentionally omit the cache key value — keys can be +// PII (user IDs, session tokens). Only `cache.key.length` is recorded. +// Callers needing the key value should add their own outer span before +// invoking the dist backend. +// +// Pass otel.GetTracerProvider() to inherit the application's globally +// registered provider, or supply a custom *sdktrace.TracerProvider to +// route dist spans to a dedicated exporter. nil is treated as "no +// change" — useful for conditional option building. +// +// Library default (no option call) installs a no-op tracer, so library +// code emits no spans unless the caller opts in. +func WithDistTracerProvider(tp trace.TracerProvider) DistMemoryOption { + return func(dm *DistMemory) { + if tp != nil { + dm.tracer = tp.Tracer(distTracerName) + } + } +} + +// WithDistMeterProvider supplies an OpenTelemetry MeterProvider for the +// dist backend. When set, NewDistMemory registers an observable +// instrument for every field on DistMetrics — counters for cumulative +// totals (writes, forwards, hints, rebalance batches, etc.), gauges for +// current state (active tombstones, hint queue size, alive/suspect/dead +// member counts) and last-operation latencies (merkle build/diff/fetch +// nanoseconds, last rebalance/auto-sync duration). Instrument names use +// the `dist.` prefix so a Prometheus exporter can route them under a +// dedicated subsystem. +// +// A single registered callback drives all instruments: on each +// collection cycle it takes one Metrics() snapshot and observes every +// instrument from that snapshot. 
There is no per-operation overhead +// when a real meter is configured beyond the existing atomic counters +// the dist backend already maintains. +// +// Pass otel.GetMeterProvider() to inherit the application's globally +// registered provider, or supply a custom MeterProvider built via the +// otel/sdk/metric package (typically wrapping a Prometheus exporter or +// OTLP pipeline). nil is treated as "no change" — useful for +// conditional option building. +// +// Library default (no option call) installs a no-op meter, so library +// code emits no metrics unless the caller opts in. +func WithDistMeterProvider(mp metric.MeterProvider) DistMemoryOption { + return func(dm *DistMemory) { + if mp != nil { + dm.meter = mp.Meter(distTracerName) + } + } +} + // NewDistMemory creates a new DistMemory backend. func NewDistMemory(ctx context.Context, opts ...DistMemoryOption) (IBackend[DistMemory], error) { // Derive a server-lifetime context from the caller's ctx so that: @@ -659,6 +804,8 @@ func NewDistMemory(ctx context.Context, opts ...DistMemoryOption) (IBackend[Dist return nil, authErr } + dm.installTelemetryDefaults() + dm.ensureShardConfig() dm.initMembershipIfNeeded() // Pass the lifecycle ctx to subsystems that capture it (HTTP handlers, @@ -790,84 +937,56 @@ func (dm *DistMemory) Count(_ context.Context) int { // Get fetches item. func (dm *DistMemory) Get(ctx context.Context, key string) (*cache.Item, bool) { - start := time.Now() - defer func() { - if dm.latency != nil { - dm.latency.observe(opGet, time.Since(start)) - } - }() - - if dm.readConsistency == ConsistencyOne { // fast local path - if it, ok := dm.shardFor(key).items.GetCopy(key); ok { - return it, true - } - } + ctx, span := dm.tracer.Start( + ctx, "dist.get", + trace.WithAttributes( + attribute.Int("cache.key.length", len(key)), + attribute.String("dist.consistency", dm.readConsistency.String()), + ), + ) + defer span.End() - owners := dm.lookupOwners(key) - if len(owners) == 0 { - return nil, false - } + start := time.Now() + item, hit := dm.getImpl(ctx, key) - if dm.readConsistency == ConsistencyOne { - return dm.getOne(ctx, key, owners) - } + span.SetAttributes(attribute.Bool("cache.hit", hit)) - if dm.parallelReads { - return dm.getWithConsistencyParallel(ctx, key, owners) + if dm.latency != nil { + dm.latency.observe(opGet, time.Since(start)) } - return dm.getWithConsistency(ctx, key, owners) + return item, hit } // Set stores item. 
func (dm *DistMemory) Set(ctx context.Context, item *cache.Item) error { - err := item.Valid() - if err != nil { - return err + validateErr := item.Valid() + if validateErr != nil { + return validateErr } - start := time.Now() - defer func() { - if dm.latency != nil { - dm.latency.observe(opSet, time.Since(start)) - } - }() - - dm.metrics.writeAttempts.Add(1) - - owners := dm.lookupOwners(item.Key) - if len(owners) == 0 { - return sentinel.ErrNotOwner - } + ctx, span := dm.tracer.Start( + ctx, "dist.set", + trace.WithAttributes( + attribute.Int("cache.key.length", len(item.Key)), + attribute.String("dist.consistency", dm.writeConsistency.String()), + ), + ) + defer span.End() - if owners[0] != dm.localNode.ID { // attempt forward; may promote - proceedAsPrimary, ferr := dm.handleForwardPrimary(ctx, owners, item) - if ferr != nil { - return ferr - } + start := time.Now() - if !proceedAsPrimary { // forwarded successfully; nothing else to do - return nil - } + err := dm.setImpl(ctx, item, span) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) } - // primary path: assign version & timestamp - item.Version = dm.versionCounter.Add(1) - item.Origin = string(dm.localNode.ID) - item.LastUpdated = time.Now() - dm.applySet(ctx, item, false) - - acks := 1 + dm.replicateTo(ctx, item, owners[1:]) - dm.metrics.writeAcks.Add(int64(acks)) - - needed := dm.requiredAcks(len(owners), dm.writeConsistency) - if acks < needed { - dm.metrics.writeQuorumFailures.Add(1) - - return sentinel.ErrQuorumFailed + if dm.latency != nil { + dm.latency.observe(opSet, time.Since(start)) } - return nil + return err } // --- Consistency helper methods. --- @@ -888,36 +1007,25 @@ func (dm *DistMemory) List(_ context.Context, _ ...IFilter) ([]*cache.Item, erro // Remove deletes keys. func (dm *DistMemory) Remove(ctx context.Context, keys ...string) error { - start := time.Now() - defer func() { - if dm.latency != nil { - dm.latency.observe(opRemove, time.Since(start)) - } - }() - - for _, key := range keys { - if dm.ownsKeyInternal(key) { // primary path - dm.applyRemove(ctx, key, true) - - continue - } - - transport := dm.loadTransport() - if transport == nil { // non-owner without transport - return sentinel.ErrNotOwner - } + ctx, span := dm.tracer.Start( + ctx, "dist.remove", + trace.WithAttributes(attribute.Int("dist.keys.count", len(keys))), + ) + defer span.End() - owners := dm.ring.Lookup(key) - if len(owners) == 0 { - continue - } + start := time.Now() - dm.metrics.forwardRemove.Add(1) + err := dm.removeImpl(ctx, keys) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } - _ = transport.ForwardRemove(ctx, string(owners[0]), key, true) + if dm.latency != nil { + dm.latency.observe(opRemove, time.Since(start)) } - return nil + return err } // Clear wipes all shards. @@ -1246,6 +1354,21 @@ func (dm *DistMemory) Stop(ctx context.Context) error { dm.rebalanceStopCh = nil } + // Unregister the OTel metric callback before tearing down the HTTP + // server so the SDK does not invoke a callback against a + // half-stopped DistMemory (Metrics() reads membership which is + // still safe, but the snapshot would be misleading). Errors are + // logged, not propagated — Stop is idempotent and the unregister + // path is best-effort. 
+ if dm.metricRegistration != nil { + err := dm.metricRegistration.Unregister() + if err != nil { + dm.logger.Error("dist meter: callback unregister failed", slog.Any("err", err)) + } + + dm.metricRegistration = nil + } + if dm.httpServer != nil { err := dm.httpServer.stop(ctx) // best-effort @@ -1917,7 +2040,19 @@ func (dm *DistMemory) migrateIfNeeded(ctx context.Context, item *cache.Item) { dm.metrics.rebalancedKeys.Add(1) dm.metrics.rebalancedPrimary.Add(1) - _ = transport.ForwardSet(ctx, string(owners[0]), item, true) + // Fire-and-forget forwarding: failures are dropped silently today + // (Phase B will introduce a retry queue). Logging is the minimum + // surface so operators can correlate vanished keys with transport + // failures during rolling deploys. + migrationErr := transport.ForwardSet(ctx, string(owners[0]), item, true) + if migrationErr != nil { + dm.logger.Warn( + "rebalance migration forward failed", + slog.String("key", item.Key), + slog.String("new_primary", string(owners[0])), + slog.Any("err", migrationErr), + ) + } // Update originalPrimary so we don't recount repeatedly. sh := dm.shardFor(item.Key) @@ -2077,14 +2212,28 @@ func (dm *DistMemory) tryStartHTTP(ctx context.Context) { server := newDistHTTPServer(dm.nodeAddr, limits, dm.httpAuth) server.ctx = dm.lifeCtx // handler-side cancellation tied to Stop + server.logger = dm.logger err := server.start(ctx, dm) - if err != nil { // best-effort + if err != nil { // best-effort, but the operator must see this + dm.logger.Error( + "dist HTTP listener bind failed", + slog.String("addr", dm.nodeAddr), + slog.Any("err", err), + ) + return } dm.httpServer = server + dm.logger.Info( + "dist HTTP listener started", + slog.String("addr", dm.nodeAddr), + slog.Bool("tls", limits.TLSConfig != nil), + slog.Bool("auth", dm.httpAuth.inboundConfigured()), + ) + resolver := dm.makePeerURLResolver(limits) dm.storeTransport(NewDistHTTPTransportWithAuth(limits, dm.httpAuth, resolver)) @@ -2353,7 +2502,12 @@ func (dm *DistMemory) replicateTo(ctx context.Context, item *cache.Item, replica continue } - err := transport.ForwardSet(ctx, string(oid), item, false) + // Reuse the per-peer child-span helper so the primary-path + // fan-out emits the same `dist.replicate.set` spans that + // applySet's incoming-replica fan-out emits — operators see a + // uniform trace tree regardless of which path drove the + // replication. + err := dm.replicateSetWithSpan(ctx, transport, oid, item) if err == nil { acks++ @@ -2609,6 +2763,13 @@ func (dm *DistMemory) processHint(ctx context.Context, nodeID string, entry hint dm.metrics.hintedDropped.Add(1) + dm.logger.Warn( + "hint dropped after replay error", + slog.String("peer_id", nodeID), + slog.String("key", entry.item.Key), + slog.Any("err", err), + ) + return 1 } @@ -2992,6 +3153,508 @@ func (dm *DistMemory) ownsKeyInternal(key string) bool { return len(owners) == 0 // empty ring => owner } +// getImpl is the business logic for Get, separated so the public Get +// stays a thin tracing/latency wrapper. The split keeps the tracing +// wrapper to a literal `defer span.End()`, avoids named returns, and +// still lets the wrapper set `cache.hit` after the impl returns. 
+func (dm *DistMemory) getImpl(ctx context.Context, key string) (*cache.Item, bool) { + if dm.readConsistency == ConsistencyOne { // fast local path + if it, ok := dm.shardFor(key).items.GetCopy(key); ok { + return it, true + } + } + + owners := dm.lookupOwners(key) + if len(owners) == 0 { + return nil, false + } + + if dm.readConsistency == ConsistencyOne { + return dm.getOne(ctx, key, owners) + } + + if dm.parallelReads { + return dm.getWithConsistencyParallel(ctx, key, owners) + } + + return dm.getWithConsistency(ctx, key, owners) +} + +// setImpl is the business logic for Set. Takes the active span as a +// param so it can attach owners.count / acks attributes mid-flight; +// returns the operation error for the wrapper to record on the span. +func (dm *DistMemory) setImpl(ctx context.Context, item *cache.Item, span trace.Span) error { + dm.metrics.writeAttempts.Add(1) + + owners := dm.lookupOwners(item.Key) + if len(owners) == 0 { + return sentinel.ErrNotOwner + } + + span.SetAttributes(attribute.Int("dist.owners.count", len(owners))) + + if owners[0] != dm.localNode.ID { // attempt forward; may promote + proceedAsPrimary, ferr := dm.handleForwardPrimary(ctx, owners, item) + if ferr != nil { + return ferr + } + + if !proceedAsPrimary { // forwarded successfully; nothing else to do + return nil + } + } + + // primary path: assign version & timestamp + item.Version = dm.versionCounter.Add(1) + item.Origin = string(dm.localNode.ID) + item.LastUpdated = time.Now() + dm.applySet(ctx, item, false) + + acks := 1 + dm.replicateTo(ctx, item, owners[1:]) + dm.metrics.writeAcks.Add(int64(acks)) + + span.SetAttributes(attribute.Int("dist.acks", acks)) + + needed := dm.requiredAcks(len(owners), dm.writeConsistency) + if acks < needed { + dm.metrics.writeQuorumFailures.Add(1) + + return sentinel.ErrQuorumFailed + } + + return nil +} + +// removeImpl is the business logic for Remove. +func (dm *DistMemory) removeImpl(ctx context.Context, keys []string) error { + for _, key := range keys { + if dm.ownsKeyInternal(key) { // primary path + dm.applyRemove(ctx, key, true) + + continue + } + + transport := dm.loadTransport() + if transport == nil { // non-owner without transport + return sentinel.ErrNotOwner + } + + owners := dm.ring.Lookup(key) + if len(owners) == 0 { + continue + } + + dm.metrics.forwardRemove.Add(1) + + _ = transport.ForwardRemove(ctx, string(owners[0]), key, true) + } + + return nil +} + +// distMetricSpec describes one OTel observable instrument backed by a +// field on DistMetrics. The kind selects between cumulative-counter and +// gauge semantics (OTel exporters render them differently); `get` reads +// the value from a DistMetrics snapshot. +type distMetricSpec struct { + name string + desc string + unit string + counter bool // true = ObservableCounter, false = ObservableGauge + get func(DistMetrics) int64 +} + +// OTel UCUM-compatible unit annotations for the dist metrics. Pulled +// out as constants so adjacent table entries don't repeat the literal +// (and trip the goconst linter). +const ( + unitOp = "{op}" + unitProbe = "{probe}" + unitTransition = "{transition}" + unitResolution = "{resolution}" + unitHint = "{hint}" + unitBytes = "By" + unitSync = "{sync}" + unitKey = "{key}" + unitTick = "{tick}" + unitTombstone = "{tombstone}" + unitAck = "{ack}" + unitBatch = "{batch}" + unitEvent = "{event}" + unitNanos = "ns" + unitNode = "{node}" + unitVersion = "{version}" +) + +// distMetricSpecs is the registry of every DistMetrics field exposed +// via OpenTelemetry. 
Names are dot-separated and `dist.`-prefixed so a +// Prometheus exporter renders them as `dist_` under a single +// subsystem. New atomics on distMetrics MUST be mirrored here — the +// JSON snapshot at /dist/metrics is the human-facing surface, OTel is +// the production-pipeline surface, and divergence between them creates +// confusing operator handoffs. +// +//nolint:gochecknoglobals // package-level table is the registration source of truth; alternatives hurt readability +var distMetricSpecs = []distMetricSpec{ + // --- Routing / forwarding (cumulative) --- + { + name: "dist.forward.get", unit: unitOp, counter: true, + desc: "Get requests forwarded to a remote owner", + get: func(m DistMetrics) int64 { return m.ForwardGet }, + }, + { + name: "dist.forward.set", unit: unitOp, counter: true, + desc: "Set requests forwarded to a remote primary", + get: func(m DistMetrics) int64 { return m.ForwardSet }, + }, + { + name: "dist.forward.remove", unit: unitOp, counter: true, + desc: "Remove requests forwarded to a remote primary", + get: func(m DistMetrics) int64 { return m.ForwardRemove }, + }, + { + name: "dist.replica.fanout.set", unit: unitOp, counter: true, + desc: "Replica fan-out attempts on Set", + get: func(m DistMetrics) int64 { return m.ReplicaFanoutSet }, + }, + { + name: "dist.replica.fanout.remove", unit: unitOp, counter: true, + desc: "Replica fan-out attempts on Remove", + get: func(m DistMetrics) int64 { return m.ReplicaFanoutRemove }, + }, + { + name: "dist.read.repair", unit: unitOp, counter: true, + desc: "Read-repair operations triggered by stale-replica reads", + get: func(m DistMetrics) int64 { return m.ReadRepair }, + }, + { + name: "dist.replica.get.miss", unit: unitOp, counter: true, + desc: "Replica Get returned not-found for a key the primary holds", + get: func(m DistMetrics) int64 { return m.ReplicaGetMiss }, + }, + { + name: "dist.read.primary_promote", unit: unitOp, counter: true, + desc: "Read path promoted to next owner after primary unreachable", + get: func(m DistMetrics) int64 { return m.ReadPrimaryPromote }, + }, + + // --- Heartbeat / membership transitions (cumulative) --- + { + name: "dist.heartbeat.success", unit: unitProbe, counter: true, + desc: "Successful heartbeat probes", + get: func(m DistMetrics) int64 { return m.HeartbeatSuccess }, + }, + { + name: "dist.heartbeat.failure", unit: unitProbe, counter: true, + desc: "Failed heartbeat probes", + get: func(m DistMetrics) int64 { return m.HeartbeatFailure }, + }, + { + name: "dist.nodes.suspect", unit: unitTransition, counter: true, + desc: "Cumulative peer transitions to suspect state", + get: func(m DistMetrics) int64 { return m.NodesSuspect }, + }, + { + name: "dist.nodes.dead", unit: unitTransition, counter: true, + desc: "Cumulative peer transitions to dead state", + get: func(m DistMetrics) int64 { return m.NodesDead }, + }, + { + name: "dist.nodes.removed", unit: unitTransition, counter: true, + desc: "Cumulative peer prunes from membership", + get: func(m DistMetrics) int64 { return m.NodesRemoved }, + }, + + // --- Versioning (cumulative) --- + { + name: "dist.version.conflicts", unit: unitResolution, counter: true, + desc: "Version-conflict resolutions (later version replaced earlier)", + get: func(m DistMetrics) int64 { return m.VersionConflicts }, + }, + { + name: "dist.version.tie_breaks", unit: unitResolution, counter: true, + desc: "Version-conflict tie-breaks decided by origin", + get: func(m DistMetrics) int64 { return m.VersionTieBreaks }, + }, + + // --- Hinted handoff 
(cumulative + bytes gauge) --- + { + name: "dist.hinted.queued", unit: unitHint, counter: true, + desc: "Hints queued for unreachable peers", + get: func(m DistMetrics) int64 { return m.HintedQueued }, + }, + { + name: "dist.hinted.replayed", unit: unitHint, counter: true, + desc: "Hints successfully replayed", + get: func(m DistMetrics) int64 { return m.HintedReplayed }, + }, + { + name: "dist.hinted.expired", unit: unitHint, counter: true, + desc: "Hints expired before delivery", + get: func(m DistMetrics) int64 { return m.HintedExpired }, + }, + { + name: "dist.hinted.dropped", unit: unitHint, counter: true, + desc: "Hints dropped after replay error (non-recoverable)", + get: func(m DistMetrics) int64 { return m.HintedDropped }, + }, + { + name: "dist.hinted.global_dropped", unit: unitHint, counter: true, + desc: "Hints dropped due to global queue caps (count/bytes)", + get: func(m DistMetrics) int64 { return m.HintedGlobalDropped }, + }, + { + name: "dist.hinted.bytes", unit: unitBytes, counter: false, + desc: "Approximate total bytes currently queued in hints", + get: func(m DistMetrics) int64 { return m.HintedBytes }, + }, + + // --- Anti-entropy (Merkle) --- + { + name: "dist.merkle.syncs", unit: unitSync, counter: true, + desc: "Completed Merkle sync operations", + get: func(m DistMetrics) int64 { return m.MerkleSyncs }, + }, + { + name: "dist.merkle.keys_pulled", unit: unitKey, counter: true, + desc: "Keys applied during Merkle sync", + get: func(m DistMetrics) int64 { return m.MerkleKeysPulled }, + }, + { + name: "dist.merkle.last_build_ns", unit: unitNanos, counter: false, + desc: "Duration of last Merkle tree build", + get: func(m DistMetrics) int64 { return m.MerkleBuildNanos }, + }, + { + name: "dist.merkle.last_diff_ns", unit: unitNanos, counter: false, + desc: "Duration of last Merkle diff computation", + get: func(m DistMetrics) int64 { return m.MerkleDiffNanos }, + }, + { + name: "dist.merkle.last_fetch_ns", unit: unitNanos, counter: false, + desc: "Duration of last remote Merkle tree fetch", + get: func(m DistMetrics) int64 { return m.MerkleFetchNanos }, + }, + { + name: "dist.auto_sync.loops", unit: unitTick, counter: true, + desc: "Auto-sync ticks executed", + get: func(m DistMetrics) int64 { return m.AutoSyncLoops }, + }, + { + name: "dist.auto_sync.last_ns", unit: unitNanos, counter: false, + desc: "Duration of last auto-sync loop", + get: func(m DistMetrics) int64 { return m.LastAutoSyncNanos }, + }, + + // --- Tombstones --- + { + name: "dist.tombstones.active", unit: unitTombstone, counter: false, + desc: "Approximate active tombstones across all shards", + get: func(m DistMetrics) int64 { return m.TombstonesActive }, + }, + { + name: "dist.tombstones.purged", unit: unitTombstone, counter: true, + desc: "Cumulative tombstones purged by sweeper", + get: func(m DistMetrics) int64 { return m.TombstonesPurged }, + }, + + // --- Writes / quorum --- + { + name: "dist.write.attempts", unit: unitOp, counter: true, + desc: "Total Set operations attempted", + get: func(m DistMetrics) int64 { return m.WriteAttempts }, + }, + { + name: "dist.write.acks", unit: unitAck, counter: true, + desc: "Cumulative Set replica acks (includes primary)", + get: func(m DistMetrics) int64 { return m.WriteAcks }, + }, + { + name: "dist.write.quorum_failures", unit: unitOp, counter: true, + desc: "Set operations that failed quorum", + get: func(m DistMetrics) int64 { return m.WriteQuorumFailures }, + }, + + // --- Rebalance --- + { + name: "dist.rebalance.keys", unit: unitKey, counter: true, + 
desc: "Keys migrated during rebalance", + get: func(m DistMetrics) int64 { return m.RebalancedKeys }, + }, + { + name: "dist.rebalance.batches", unit: unitBatch, counter: true, + desc: "Rebalance batches processed", + get: func(m DistMetrics) int64 { return m.RebalanceBatches }, + }, + { + name: "dist.rebalance.throttle", unit: unitEvent, counter: true, + desc: "Rebalance throttle events (concurrency cap reached)", + get: func(m DistMetrics) int64 { return m.RebalanceThrottle }, + }, + { + name: "dist.rebalance.last_ns", unit: unitNanos, counter: false, + desc: "Duration of last rebalance scan", + get: func(m DistMetrics) int64 { return m.RebalanceLastNanos }, + }, + { + name: "dist.rebalance.replica_diff", unit: unitKey, counter: true, + desc: "Keys pushed to newly added replicas (replica-only diff)", + get: func(m DistMetrics) int64 { return m.RebalancedReplicaDiff }, + }, + { + name: "dist.rebalance.replica_diff_throttle", unit: unitEvent, counter: true, + desc: "Replica-diff scans that exited early due to per-tick limit", + get: func(m DistMetrics) int64 { return m.RebalanceReplicaDiffThrottle }, + }, + { + name: "dist.rebalance.primary", unit: unitKey, counter: true, + desc: "Keys whose primary ownership changed", + get: func(m DistMetrics) int64 { return m.RebalancedPrimary }, + }, + + // --- Membership state (gauges) --- + { + name: "dist.members.alive", unit: unitNode, counter: false, + desc: "Currently alive members in the cluster", + get: func(m DistMetrics) int64 { return m.MembersAlive }, + }, + { + name: "dist.members.suspect", unit: unitNode, counter: false, + desc: "Currently suspect members in the cluster", + get: func(m DistMetrics) int64 { return m.MembersSuspect }, + }, + { + name: "dist.members.dead", unit: unitNode, counter: false, + desc: "Currently dead members in the cluster", + get: func(m DistMetrics) int64 { return m.MembersDead }, + }, + { + name: "dist.membership.version", unit: unitVersion, counter: false, + desc: "Membership version (monotonic, increments on changes)", + get: func(m DistMetrics) int64 { return int64(m.MembershipVersion) }, + }, +} + +// installTelemetryDefaults wires the no-op fallbacks for logger, +// tracer, and meter so every call site downstream can use them without +// nil-checks; binds node identity onto the logger so call sites don't +// have to re-attach it on every record; and registers the OTel metric +// callback. Extracted from NewDistMemory to keep that constructor +// under the function-length lint cap. +func (dm *DistMemory) installTelemetryDefaults() { + if dm.logger == nil { + dm.logger = slog.New(slog.DiscardHandler) + } + + dm.logger = dm.logger.With( + slog.String("component", "dist_memory"), + slog.String("node_id", dm.nodeID), + ) + + if dm.tracer == nil { + dm.tracer = noop.NewTracerProvider().Tracer(distTracerName) + } + + if dm.meter == nil { + dm.meter = metricnoop.NewMeterProvider().Meter(distTracerName) + } + + dm.setupOTelMetrics() +} + +// distMetricInstruments holds the OTel observable instruments created +// for each distMetricSpec. Indexed by spec position so the callback +// can pair each spec back with its instrument; counter/gauge maps are +// disjoint. +type distMetricInstruments struct { + counters map[int]metric.Int64ObservableCounter + gauges map[int]metric.Int64ObservableGauge + instruments []metric.Observable +} + +// createInstrument turns one spec into the appropriate observable +// instrument and stashes it on `inst`. 
Errors are logged and the spec +// is skipped — registration glue must never abort cache startup. +func (dm *DistMemory) createInstrument(idx int, spec distMetricSpec, inst *distMetricInstruments) { + if spec.counter { + counter, err := dm.meter.Int64ObservableCounter(spec.name, + metric.WithDescription(spec.desc), metric.WithUnit(spec.unit)) + if err != nil { + dm.logger.Error("dist meter: counter registration failed", + slog.String("metric", spec.name), slog.Any("err", err)) + + return + } + + inst.counters[idx] = counter + inst.instruments = append(inst.instruments, counter) + + return + } + + gauge, err := dm.meter.Int64ObservableGauge(spec.name, + metric.WithDescription(spec.desc), metric.WithUnit(spec.unit)) + if err != nil { + dm.logger.Error("dist meter: gauge registration failed", + slog.String("metric", spec.name), slog.Any("err", err)) + + return + } + + inst.gauges[idx] = gauge + inst.instruments = append(inst.instruments, gauge) +} + +// setupOTelMetrics registers every distMetricSpec as an observable +// instrument and wires a single callback that observes all of them +// from one Metrics() snapshot per collection cycle. Failures are +// logged (not returned) because metric registration is library-side +// glue: the cache must remain functional even if the meter pipeline +// rejects an instrument. +func (dm *DistMemory) setupOTelMetrics() { + inst := &distMetricInstruments{ + counters: make(map[int]metric.Int64ObservableCounter, len(distMetricSpecs)), + gauges: make(map[int]metric.Int64ObservableGauge, len(distMetricSpecs)), + instruments: make([]metric.Observable, 0, len(distMetricSpecs)), + } + + for i, spec := range distMetricSpecs { + dm.createInstrument(i, spec, inst) + } + + if len(inst.instruments) == 0 { + return + } + + reg, regErr := dm.meter.RegisterCallback( + func(_ context.Context, observer metric.Observer) error { + snapshot := dm.Metrics() + + for i, spec := range distMetricSpecs { + if c, ok := inst.counters[i]; ok { + observer.ObserveInt64(c, spec.get(snapshot)) + + continue + } + + if g, ok := inst.gauges[i]; ok { + observer.ObserveInt64(g, spec.get(snapshot)) + } + } + + return nil + }, + inst.instruments..., + ) + if regErr != nil { + dm.logger.Error("dist meter: callback registration failed", slog.Any("err", regErr)) + + return + } + + dm.metricRegistration = reg +} + // applySet stores item locally and optionally replicates to other owners. // replicate indicates whether replication fan-out should occur (false for replica writes). func (dm *DistMemory) applySet(ctx context.Context, item *cache.Item, replicate bool) { @@ -3017,10 +3680,32 @@ func (dm *DistMemory) applySet(ctx context.Context, item *cache.Item, replicate continue } - _ = transport.ForwardSet(ctx, string(oid), item, false) + _ = dm.replicateSetWithSpan(ctx, transport, oid, item) } } +// replicateSetWithSpan opens a child span around a single replica +// ForwardSet so operators can attribute fan-out latency per peer, and +// returns the underlying transport error for callers that count acks +// or queue hints. Used by both applySet's incoming-replica fan-out and +// the primary Set's replicateTo loop, so the trace tree shape is the +// same regardless of which path drove the replication. 
+func (dm *DistMemory) replicateSetWithSpan(ctx context.Context, transport DistTransport, oid cluster.NodeID, item *cache.Item) error { + ctx, span := dm.tracer.Start( + ctx, "dist.replicate.set", + trace.WithAttributes(attribute.String("peer.id", string(oid))), + ) + defer span.End() + + err := transport.ForwardSet(ctx, string(oid), item, false) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return err +} + // recordOriginalPrimary stores first-seen primary owner for key. func (dm *DistMemory) recordOriginalPrimary(sh *distShard, key string) { if sh == nil || sh.originalPrimary == nil || dm.ring == nil { @@ -3102,7 +3787,24 @@ func (dm *DistMemory) applyRemove(ctx context.Context, key string, replicate boo continue } - _ = transport.ForwardRemove(ctx, string(oid), key, false) + dm.replicateRemoveWithSpan(ctx, transport, oid, key) + } +} + +// replicateRemoveWithSpan mirrors replicateSetWithSpan for tombstone +// fan-out — keeps the trace tree symmetric so a Set followed by a +// Remove has the same shape under tracing. +func (dm *DistMemory) replicateRemoveWithSpan(ctx context.Context, transport DistTransport, oid cluster.NodeID, key string) { + ctx, span := dm.tracer.Start( + ctx, "dist.replicate.remove", + trace.WithAttributes(attribute.String("peer.id", string(oid))), + ) + defer span.End() + + err := transport.ForwardRemove(ctx, string(oid), key, false) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) } } @@ -3153,6 +3855,13 @@ func (dm *DistMemory) evaluateLiveness(ctx context.Context, now time.Time, node if dm.membership.Remove(node.ID) { dm.metrics.nodesRemoved.Add(1) dm.metrics.nodesDead.Add(1) + + dm.logger.Warn( + "peer pruned (dead)", + slog.String("peer_id", string(node.ID)), + slog.String("peer_addr", node.Address), + slog.Duration("elapsed_since_seen", elapsed), + ) } return @@ -3161,6 +3870,12 @@ func (dm *DistMemory) evaluateLiveness(ctx context.Context, now time.Time, node if dm.hbSuspectAfter > 0 && elapsed > dm.hbSuspectAfter && node.State == cluster.NodeAlive { // suspect dm.membership.Mark(node.ID, cluster.NodeSuspect) dm.metrics.nodesSuspect.Add(1) + + dm.logger.Info( + "peer marked suspect (timeout)", + slog.String("peer_id", string(node.ID)), + slog.Duration("elapsed_since_seen", elapsed), + ) } transport := dm.loadTransport() @@ -3179,6 +3894,12 @@ func (dm *DistMemory) evaluateLiveness(ctx context.Context, now time.Time, node if node.State == cluster.NodeAlive { // escalate dm.membership.Mark(node.ID, cluster.NodeSuspect) dm.metrics.nodesSuspect.Add(1) + + dm.logger.Info( + "peer marked suspect (probe failed)", + slog.String("peer_id", string(node.ID)), + slog.Any("err", err), + ) } return diff --git a/pkg/backend/dist_memory_test_helpers.go b/pkg/backend/dist_memory_test_helpers.go index d5b9b4c..faf9309 100644 --- a/pkg/backend/dist_memory_test_helpers.go +++ b/pkg/backend/dist_memory_test_helpers.go @@ -31,6 +31,7 @@ func (dm *DistMemory) EnableHTTPForTest(ctx context.Context) { server := newDistHTTPServer(dm.nodeAddr, limits, dm.httpAuth) server.ctx = dm.lifeCtx // handler-side cancellation tied to Stop + server.logger = dm.logger err := server.start(ctx, dm) if err != nil { diff --git a/pkg/middleware/otel_tracing.go b/pkg/middleware/otel_tracing.go index 9d37bbb..9d72a81 100644 --- a/pkg/middleware/otel_tracing.go +++ b/pkg/middleware/otel_tracing.go @@ -56,7 +56,8 @@ func (mw OTelTracingMiddleware) Set(ctx context.Context, key string, value any, ctx, span := mw.startSpan( 
ctx, "hypercache.Set", attribute.Int(attrs.AttrKeyLength, len(key)), - attribute.Int64(attrs.AttrExpirationMS, expiration.Milliseconds())) + attribute.Int64(attrs.AttrExpirationMS, expiration.Milliseconds()), + ) defer span.End() err := mw.next.Set(ctx, key, value, expiration) @@ -72,7 +73,8 @@ func (mw OTelTracingMiddleware) GetOrSet(ctx context.Context, key string, value ctx, span := mw.startSpan( ctx, "hypercache.GetOrSet", attribute.Int(attrs.AttrKeyLength, len(key)), - attribute.Int64(attrs.AttrExpirationMS, expiration.Milliseconds())) + attribute.Int64(attrs.AttrExpirationMS, expiration.Milliseconds()), + ) defer span.End() v, err := mw.next.GetOrSet(ctx, key, value, expiration) diff --git a/pkg/stats/histogramcollector_test.go b/pkg/stats/histogramcollector_test.go index a457e7b..a12bc59 100644 --- a/pkg/stats/histogramcollector_test.go +++ b/pkg/stats/histogramcollector_test.go @@ -268,7 +268,7 @@ func TestHistogramStatsCollector_GetStatsSnapshotIsolated(t *testing.T) { // keeps memory usage flat under sustained recording. The previous // implementation appended forever and would grow unbounded. // -//nolint:revive +//nolint:nolintlint,revive func TestHistogramStatsCollector_NoMemoryLeak(t *testing.T) { t.Parallel() diff --git a/tests/dist_http_auth_test.go b/tests/dist_http_auth_test.go index 565714f..88a85db 100644 --- a/tests/dist_http_auth_test.go +++ b/tests/dist_http_auth_test.go @@ -30,7 +30,8 @@ func newAuthDistNode(t *testing.T, auth backend.DistHTTPAuth) *backend.DistMemor ctx := context.Background() addr := AllocatePort(t) - bi, err := backend.NewDistMemory(ctx, + bi, err := backend.NewDistMemory( + ctx, backend.WithDistNode("auth-test", addr), backend.WithDistReplication(1), backend.WithDistHTTPAuth(auth), @@ -193,7 +194,8 @@ func TestDistHTTPAuth_AcceptsValidToken(t *testing.T) { func newAuthReplicatedNode(t *testing.T, id, addr string, seeds []string, auth backend.DistHTTPAuth) *backend.DistMemory { t.Helper() - bi, err := backend.NewDistMemory(context.Background(), + bi, err := backend.NewDistMemory( + context.Background(), backend.WithDistNode(id, addr), backend.WithDistSeeds(seeds), backend.WithDistReplication(2), @@ -286,7 +288,8 @@ func TestDistHTTPAuth_RejectsClientSignOnlyConfig(t *testing.T) { addr := AllocatePort(t) - bi, err := backend.NewDistMemory(context.Background(), + bi, err := backend.NewDistMemory( + context.Background(), backend.WithDistNode("auth-reject", addr), backend.WithDistReplication(1), backend.WithDistHTTPAuth(backend.DistHTTPAuth{ diff --git a/tests/dist_http_lifecycle_test.go b/tests/dist_http_lifecycle_test.go index c0a8656..610b39b 100644 --- a/tests/dist_http_lifecycle_test.go +++ b/tests/dist_http_lifecycle_test.go @@ -23,7 +23,8 @@ func TestDistMemory_HandlerCtxCancelsOnStop(t *testing.T) { ctx := context.Background() addr := AllocatePort(t) - bi, err := backend.NewDistMemory(ctx, + bi, err := backend.NewDistMemory( + ctx, backend.WithDistNode("life-test", addr), backend.WithDistReplication(1), ) diff --git a/tests/dist_http_limits_test.go b/tests/dist_http_limits_test.go index 99c5e86..001a5c1 100644 --- a/tests/dist_http_limits_test.go +++ b/tests/dist_http_limits_test.go @@ -27,7 +27,8 @@ func TestDistHTTPServer_RejectsOversizedBody(t *testing.T) { ctx := context.Background() addr := AllocatePort(t) - bi, err := backend.NewDistMemory(ctx, + bi, err := backend.NewDistMemory( + ctx, backend.WithDistNode("oversized-server", addr), backend.WithDistReplication(1), backend.WithDistHTTPLimits(backend.DistHTTPLimits{BodyLimit: 
tinyBodyLimit}), @@ -123,7 +124,8 @@ func TestDistHTTPLimits_DefaultsApply(t *testing.T) { ctx := context.Background() addr := AllocatePort(t) - bi, err := backend.NewDistMemory(ctx, + bi, err := backend.NewDistMemory( + ctx, backend.WithDistNode("default-limits", addr), backend.WithDistReplication(1), ) @@ -160,7 +162,8 @@ func TestDistHTTPServer_StopRespectsCanceledContext(t *testing.T) { ctx := context.Background() addr := AllocatePort(t) - bi, err := backend.NewDistMemory(ctx, + bi, err := backend.NewDistMemory( + ctx, backend.WithDistNode("stop-test", addr), backend.WithDistReplication(1), ) diff --git a/tests/dist_http_tls_test.go b/tests/dist_http_tls_test.go index 1d1337c..8f6e21a 100644 --- a/tests/dist_http_tls_test.go +++ b/tests/dist_http_tls_test.go @@ -76,7 +76,8 @@ func generateTLSConfig(t *testing.T) *tls.Config { func newTLSNode(t *testing.T, id, addr string, seeds []string, tlsConfig *tls.Config) *backend.DistMemory { t.Helper() - bi, err := backend.NewDistMemory(context.Background(), + bi, err := backend.NewDistMemory( + context.Background(), backend.WithDistNode(id, addr), backend.WithDistSeeds(seeds), backend.WithDistReplication(2), @@ -161,7 +162,8 @@ func TestDistHTTPTLS_PlaintextPeerRejected(t *testing.T) { tlsConfig := generateTLSConfig(t) addr := AllocatePort(t) - bi, err := backend.NewDistMemory(context.Background(), + bi, err := backend.NewDistMemory( + context.Background(), backend.WithDistNode("tls-only", addr), backend.WithDistReplication(1), backend.WithDistHTTPLimits(backend.DistHTTPLimits{TLSConfig: tlsConfig}), diff --git a/tests/dist_logging_test.go b/tests/dist_logging_test.go new file mode 100644 index 0000000..e30e135 --- /dev/null +++ b/tests/dist_logging_test.go @@ -0,0 +1,189 @@ +package tests + +import ( + "context" + "log/slog" + "sync" + "testing" + + "github.com/hyp3rd/hypercache/pkg/backend" +) + +// captureStore is the shared backing storage for captureHandler clones +// produced via WithAttrs / WithGroup. slog calls WithAttrs to install +// the component/node_id attrs bound in NewDistMemory; that returned +// clone still needs to write into the original test's records slice, +// not its own. Sharing the store via a pointer is the simplest way to +// keep that invariant. +type captureStore struct { + mu sync.Mutex + records []slog.Record +} + +// captureHandler is a minimal slog.Handler that records every emitted +// record into a shared captureStore. Used to assert that DistMemory +// emits the expected structured-log events without text scraping. +type captureHandler struct { + store *captureStore + attrs []slog.Attr +} + +func newCaptureHandler() *captureHandler { return &captureHandler{store: &captureStore{}} } + +func (*captureHandler) Enabled(_ context.Context, _ slog.Level) bool { return true } + +func (h *captureHandler) Handle(_ context.Context, r slog.Record) error { + // Re-attach the WithAttrs-prefixed attrs onto the record so the + // caller can assert against them — slog routes With(...) calls + // through WithAttrs and those attrs would otherwise live only on + // the cloned handler, not the captured Record. + cloned := r.Clone() + for _, a := range h.attrs { + cloned.AddAttrs(a) + } + + h.store.mu.Lock() + defer h.store.mu.Unlock() + + h.store.records = append(h.store.records, cloned) + + return nil +} + +func (h *captureHandler) WithAttrs(attrs []slog.Attr) slog.Handler { + merged := make([]slog.Attr, 0, len(h.attrs)+len(attrs)) + + merged = append(merged, h.attrs...) + merged = append(merged, attrs...) 
+ + return &captureHandler{store: h.store, attrs: merged} +} + +func (h *captureHandler) WithGroup(_ string) slog.Handler { return h } + +// snapshot returns a copy of the recorded events under lock. +func (h *captureHandler) snapshot() []slog.Record { + h.store.mu.Lock() + defer h.store.mu.Unlock() + + out := make([]slog.Record, len(h.store.records)) + copy(out, h.store.records) + + return out +} + +// recordHasAttr is a small helper for the asserts below. slog.Record's +// Attrs is iterator-style (closure), so a one-line check needs a helper. +func recordHasAttr(r slog.Record, key, want string) bool { + found := false + + r.Attrs(func(a slog.Attr) bool { + if a.Key == key && a.Value.String() == want { + found = true + + return false + } + + return true + }) + + return found +} + +// TestDistMemory_LoggerEmitsListenerStart is the contract test for Phase +// A.1 logging: when a caller supplies a logger via WithDistLogger, the +// dist backend must emit a structured `dist HTTP listener started` +// record carrying the node identity bound at construction. +// +// Picking the listener-start event is deliberate — it fires on every +// successful node start, so the test is deterministic. Other log sites +// (peer suspect, hint dropped, migration failure) take the same plumbing +// and would only test the same wiring twice. +func TestDistMemory_LoggerEmitsListenerStart(t *testing.T) { + t.Parallel() + + addr := AllocatePort(t) + handler := newCaptureHandler() + logger := slog.New(handler) + + bi, err := backend.NewDistMemory( + context.Background(), + backend.WithDistNode("log-test-A", addr), + backend.WithDistReplication(1), + backend.WithDistLogger(logger), + ) + if err != nil { + t.Fatalf("new dist memory: %v", err) + } + + dm, ok := bi.(*backend.DistMemory) + if !ok { + t.Fatalf("expected *backend.DistMemory, got %T", bi) + } + + StopOnCleanup(t, dm) + + records := handler.snapshot() + + var startRec *slog.Record + + for i := range records { + if records[i].Message == "dist HTTP listener started" { + startRec = &records[i] + + break + } + } + + if startRec == nil { + t.Fatalf("expected `dist HTTP listener started` record; got %d records", len(records)) + } + + if startRec.Level != slog.LevelInfo { + t.Fatalf("expected Info level, got %v", startRec.Level) + } + + // node_id and component are bound at construction via .With(...) — + // recordHasAttr checks the per-record attrs, but slog routes the + // With-bound attrs through WithAttrs on the handler. The capture + // handler re-attaches them in Handle so the assertion below is + // against the merged set. + if !recordHasAttr(*startRec, "node_id", "log-test-A") { + t.Fatalf("expected node_id=log-test-A attr on record, got attrs missing it") + } + + if !recordHasAttr(*startRec, "component", "dist_memory") { + t.Fatalf("expected component=dist_memory attr on record") + } + + if !recordHasAttr(*startRec, "addr", addr) { + t.Fatalf("expected addr=%s attr on record", addr) + } +} + +// TestDistMemory_LoggerDefaultIsSilent confirms the zero-value path +// (no WithDistLogger) does not write anywhere — important because +// library code must not pollute the application's stderr by default. +// We assert this indirectly by constructing without a logger and +// verifying construction succeeds (the discard handler is a no-op). 
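+//
+// For contrast, a caller that does want logs opts in with a real
+// handler. A minimal sketch (handler choice is illustrative; any
+// slog.Handler works):
+//
+//	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
+//	bi, err := backend.NewDistMemory(ctx,
+//		backend.WithDistNode("node-1", addr),
+//		backend.WithDistReplication(1),
+//		backend.WithDistLogger(logger),
+//	)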
+func TestDistMemory_LoggerDefaultIsSilent(t *testing.T) { + t.Parallel() + + addr := AllocatePort(t) + + bi, err := backend.NewDistMemory( + context.Background(), + backend.WithDistNode("log-default", addr), + backend.WithDistReplication(1), + ) + if err != nil { + t.Fatalf("new dist memory: %v", err) + } + + dm, ok := bi.(*backend.DistMemory) + if !ok { + t.Fatalf("expected *backend.DistMemory, got %T", bi) + } + + StopOnCleanup(t, dm) +} diff --git a/tests/dist_metrics_test.go b/tests/dist_metrics_test.go new file mode 100644 index 0000000..a57181e --- /dev/null +++ b/tests/dist_metrics_test.go @@ -0,0 +1,215 @@ +package tests + +import ( + "context" + "testing" + "time" + + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + + "github.com/hyp3rd/hypercache/pkg/backend" + cache "github.com/hyp3rd/hypercache/pkg/cache/v2" +) + +// recordingMeterProvider builds a meter provider backed by a manual +// reader so the test can synchronously trigger collection and inspect +// the produced metrics. Mirrors recordingTracerProvider in the tracing +// test — same pattern, separate signal. +func recordingMeterProvider() (*sdkmetric.MeterProvider, *sdkmetric.ManualReader) { + reader := sdkmetric.NewManualReader() + mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) + + return mp, reader +} + +// firstInt64DataPoint pulls the first data point from a Sum or Gauge +// aggregation. Returns 0,false when the aggregation is some other type +// (e.g. Histogram) or has no points yet. Extracted as a free helper so +// findMetricInt64 stays under the cognitive-complexity cap. +func firstInt64DataPoint(data metricdata.Aggregation) (int64, bool) { + switch agg := data.(type) { + case metricdata.Sum[int64]: + if len(agg.DataPoints) > 0 { + return agg.DataPoints[0].Value, true + } + + case metricdata.Gauge[int64]: + if len(agg.DataPoints) > 0 { + return agg.DataPoints[0].Value, true + } + + default: + // Other aggregations (Histogram, ExponentialHistogram, etc.) + // are not produced by the dist backend today — fall through. + } + + return 0, false +} + +// findMetricInt64 walks a collected ResourceMetrics looking for the +// first data point of an Int64-typed metric matching name. Returns +// (value, true) when found, (0, false) otherwise. +func findMetricInt64(rm metricdata.ResourceMetrics, name string) (int64, bool) { + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + if m.Name != name { + continue + } + + return firstInt64DataPoint(m.Data) + } + } + + return 0, false +} + +// TestDistMemory_MetricsExportsObservableCounters is the contract test +// for Phase A.3: when a meter provider is supplied, a Set + Get +// produces an observable `dist.write.attempts` counter that reflects +// the work done. Counter must increment monotonically. +// +// We exercise the most-load-bearing counter (write attempts) here; +// other 40+ instruments share the same registration mechanism, so a +// single working counter validates the whole table. 
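+//
+// (Monotonic here means cumulative: assuming the metric callback
+// observes a running total, a second reader.Collect after further
+// Sets should report a strictly larger dist.write.attempts, never a
+// reset-to-zero delta.)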
+func TestDistMemory_MetricsExportsObservableCounters(t *testing.T) { + t.Parallel() + + mp, reader := recordingMeterProvider() + addr := AllocatePort(t) + + bi, err := backend.NewDistMemory( + context.Background(), + backend.WithDistNode("metric-A", addr), + backend.WithDistReplication(1), + backend.WithDistMeterProvider(mp), + ) + if err != nil { + t.Fatalf("new dist memory: %v", err) + } + + dm, ok := bi.(*backend.DistMemory) + if !ok { + t.Fatalf("expected *backend.DistMemory, got %T", bi) + } + + StopOnCleanup(t, dm) + + ctx := context.Background() + + for i := range 3 { + setErr := dm.Set(ctx, &cache.Item{ + Key: "metric-key", + Value: []byte{byte(i)}, + Version: uint64(i + 1), + Origin: "metric-A", + LastUpdated: time.Now(), + }) + if setErr != nil { + t.Fatalf("set: %v", setErr) + } + } + + var rm metricdata.ResourceMetrics + + collectErr := reader.Collect(ctx, &rm) + if collectErr != nil { + t.Fatalf("collect: %v", collectErr) + } + + got, found := findMetricInt64(rm, "dist.write.attempts") + if !found { + t.Fatalf("expected `dist.write.attempts` counter; not exported") + } + + if got != 3 { + t.Fatalf("expected dist.write.attempts=3 after 3 Sets, got %d", got) + } +} + +// TestDistMemory_MetricsExportsObservableGauges asserts the gauge path +// works alongside counters: `dist.members.alive` reflects current +// cluster state. The standalone DistMemory we build here has membership +// of 1 (itself), so the gauge must report 1 — proves the membership +// snapshot path inside the metric callback runs and surfaces live +// values, not just at-construction state. +func TestDistMemory_MetricsExportsObservableGauges(t *testing.T) { + t.Parallel() + + mp, reader := recordingMeterProvider() + addr := AllocatePort(t) + + bi, err := backend.NewDistMemory( + context.Background(), + backend.WithDistNode("metric-gauge-A", addr), + backend.WithDistReplication(1), + backend.WithDistMeterProvider(mp), + ) + if err != nil { + t.Fatalf("new dist memory: %v", err) + } + + dm, ok := bi.(*backend.DistMemory) + if !ok { + t.Fatalf("expected *backend.DistMemory, got %T", bi) + } + + StopOnCleanup(t, dm) + + var rm metricdata.ResourceMetrics + + collectErr := reader.Collect(context.Background(), &rm) + if collectErr != nil { + t.Fatalf("collect: %v", collectErr) + } + + got, found := findMetricInt64(rm, "dist.members.alive") + if !found { + t.Fatalf("expected `dist.members.alive` gauge; not exported") + } + + if got != 1 { + t.Fatalf("expected dist.members.alive=1 (self only), got %d", got) + } +} + +// TestDistMemory_MetricsDefaultIsNoop confirms the zero-value path +// (no WithDistMeterProvider) produces no metrics — symmetric with the +// tracing default-noop test. Asserted by constructing without a meter +// provider and verifying the cache still works. 
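+//
+// The opt-in counterpart hands WithDistMeterProvider a provider wired
+// to a real reader. A minimal sketch, with the exporter choice purely
+// illustrative (stdoutmetric is
+// go.opentelemetry.io/otel/exporters/stdout/stdoutmetric):
+//
+//	exp, _ := stdoutmetric.New()
+//	mp := sdkmetric.NewMeterProvider(
+//		sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exp)),
+//	)
+//	backend.WithDistMeterProvider(mp)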
+func TestDistMemory_MetricsDefaultIsNoop(t *testing.T) { + t.Parallel() + + addr := AllocatePort(t) + + bi, err := backend.NewDistMemory( + context.Background(), + backend.WithDistNode("metric-default", addr), + backend.WithDistReplication(1), + ) + if err != nil { + t.Fatalf("new dist memory: %v", err) + } + + dm, ok := bi.(*backend.DistMemory) + if !ok { + t.Fatalf("expected *backend.DistMemory, got %T", bi) + } + + StopOnCleanup(t, dm) + + setErr := dm.Set(context.Background(), &cache.Item{ + Key: "default-metric-key", + Value: []byte("v"), + Version: 1, + Origin: "metric-default", + LastUpdated: time.Now(), + }) + if setErr != nil { + t.Fatalf("set: %v", setErr) + } + + if it, ok := dm.Get(context.Background(), "default-metric-key"); !ok || it == nil { + t.Fatalf("Get returned nothing for the key we just set") + } +} diff --git a/tests/dist_tracing_test.go b/tests/dist_tracing_test.go new file mode 100644 index 0000000..2d0b31a --- /dev/null +++ b/tests/dist_tracing_test.go @@ -0,0 +1,275 @@ +package tests + +import ( + "context" + "testing" + "time" + + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + + "github.com/hyp3rd/hypercache/pkg/backend" + cache "github.com/hyp3rd/hypercache/pkg/cache/v2" +) + +// recordingTracerProvider builds an in-memory tracer provider whose spans +// can be inspected via the returned recorder. Used by the assertions +// below to verify DistMemory emits the expected span tree without +// running an exporter. +func recordingTracerProvider() (*sdktrace.TracerProvider, *tracetest.SpanRecorder) { + rec := tracetest.NewSpanRecorder() + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(rec)) + + return tp, rec +} + +// findSpan returns the first recorded span with the given name, or nil +// if none was emitted. Spans accumulate in the recorder across the +// whole test, so `findSpan` collapses the search behind a name match +// for readable assertions. +func findSpan(rec *tracetest.SpanRecorder, name string) sdktrace.ReadOnlySpan { + for _, s := range rec.Ended() { + if s.Name() == name { + return s + } + } + + return nil +} + +// spanAttr returns the string form of the named attribute on span, or +// the empty string when not present. The string-only return is +// deliberate — the assertions below check well-known attribute keys +// we emit ourselves, not arbitrary user attrs, so a typed accessor +// would just expand the helper without buying anything. +func spanAttr(span sdktrace.ReadOnlySpan, key string) string { + if span == nil { + return "" + } + + for _, a := range span.Attributes() { + if string(a.Key) == key { + return a.Value.Emit() + } + } + + return "" +} + +// newSingleNodeTracedDist is the constructor shared by the per-op +// tracing tests below. Replication=1 keeps the trace tree shallow so +// per-op assertions don't have to filter out fan-out spans — those are +// exercised by TestDistMemory_TracingEmitsReplicationChildSpans. 
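+//
+// (With a single owner there is no non-local peer to replicate to, so
+// no dist.replicate.* child spans should show up in the recorder; every
+// span these tests inspect comes from the public operation itself.)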
+func newSingleNodeTracedDist(t *testing.T, id string) (*backend.DistMemory, *tracetest.SpanRecorder) { + t.Helper() + + tp, rec := recordingTracerProvider() + addr := AllocatePort(t) + + bi, err := backend.NewDistMemory( + context.Background(), + backend.WithDistNode(id, addr), + backend.WithDistReplication(1), + backend.WithDistTracerProvider(tp), + ) + if err != nil { + t.Fatalf("new dist memory: %v", err) + } + + dm, ok := bi.(*backend.DistMemory) + if !ok { + t.Fatalf("expected *backend.DistMemory, got %T", bi) + } + + StopOnCleanup(t, dm) + + return dm, rec +} + +// TestDistMemory_TracingEmitsSetSpan is half of the Phase A.2 contract: +// when a tracer provider is supplied, Set emits a `dist.set` span +// carrying the configured write-consistency attribute. Split from the +// Get assertions so each test stays narrow. +func TestDistMemory_TracingEmitsSetSpan(t *testing.T) { + t.Parallel() + + dm, rec := newSingleNodeTracedDist(t, "trace-set-A") + + err := dm.Set(context.Background(), &cache.Item{ + Key: "trace-key", + Value: []byte("v"), + Version: 1, + Origin: "trace-set-A", + LastUpdated: time.Now(), + }) + if err != nil { + t.Fatalf("set: %v", err) + } + + setSpan := findSpan(rec, "dist.set") + if setSpan == nil { + t.Fatal("expected a `dist.set` span; none recorded") + } + + if got := spanAttr(setSpan, "dist.consistency"); got != "QUORUM" { + t.Fatalf("expected dist.set consistency=QUORUM, got %q", got) + } +} + +// TestDistMemory_TracingEmitsGetSpansHitAndMiss is the other half of +// the Phase A.2 contract: Get emits one span per call carrying +// cache.hit, recording true on hit and false on miss. Both branches +// must fire so operators can distinguish hit-rate by traced +// dimensions, not just by aggregated metric counters. +func TestDistMemory_TracingEmitsGetSpansHitAndMiss(t *testing.T) { + t.Parallel() + + dm, rec := newSingleNodeTracedDist(t, "trace-get-A") + ctx := context.Background() + + setErr := dm.Set(ctx, &cache.Item{ + Key: "trace-key", + Value: []byte("v"), + Version: 1, + Origin: "trace-get-A", + LastUpdated: time.Now(), + }) + if setErr != nil { + t.Fatalf("set: %v", setErr) + } + + if it, ok := dm.Get(ctx, "trace-key"); !ok || it == nil { + t.Fatalf("Get returned nothing for the key we just set") + } + + if it, ok := dm.Get(ctx, "missing-key"); ok || it != nil { + t.Fatalf("Get returned a value for a key we never set") + } + + hitFound, missFound := false, false + + for _, s := range rec.Ended() { + if s.Name() != "dist.get" { + continue + } + + switch spanAttr(s, "cache.hit") { + case "true": + hitFound = true + case "false": + missFound = true + default: + // Span without a cache.hit attribute — should not happen + // in practice; treat as a contract violation. + t.Fatalf("dist.get span missing cache.hit attribute") + } + } + + if !hitFound || !missFound { + t.Fatalf("expected one hit + one miss across get spans (hit=%v miss=%v)", hitFound, missFound) + } +} + +// TestDistMemory_TracingEmitsReplicationChildSpans verifies that +// applySet's fan-out emits a `dist.replicate.set` child span per peer. +// Uses the deterministic in-process cluster helper (replication=2) +// instead of the HTTP transport so ring/membership setup is not racy +// — the previous HTTP+seeds variant of this test occasionally missed +// the fan-out window. 
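+//
+// With two nodes and replication factor 2 the primary has exactly one
+// non-local owner, so a single dist.replicate.set span is the expected
+// fan-out; findSpan's first-match lookup is therefore sufficient.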
+func TestDistMemory_TracingEmitsReplicationChildSpans(t *testing.T) { + t.Parallel() + + tp, rec := recordingTracerProvider() + ctx := context.Background() + + dc := SetupInProcessClusterRF(t, 2, 2, backend.WithDistTracerProvider(tp)) + + // Issue the Set on whichever node is the primary for our key — we + // pick a key, look up the primary, and write through that node so + // the fan-out path is exercised on the same DistMemory whose tracer + // we're recording. + const key = "rep-trace-key" + + owners := dc.Ring.Lookup(key) + if len(owners) < 2 { + t.Fatalf("expected ring to return >=2 owners for replication=2, got %d", len(owners)) + } + + var primary *backend.DistMemory + + for _, n := range dc.Nodes { + if n.LocalNodeID() == owners[0] { + primary = n + + break + } + } + + if primary == nil { + t.Fatalf("could not locate primary node for owners[0]=%s", owners[0]) + } + + err := primary.Set(ctx, &cache.Item{ + Key: key, + Value: []byte("v"), + Version: 1, + Origin: string(primary.LocalNodeID()), + LastUpdated: time.Now(), + }) + if err != nil { + t.Fatalf("set: %v", err) + } + + replicateSpan := findSpan(rec, "dist.replicate.set") + if replicateSpan == nil { + t.Fatal("expected at least one `dist.replicate.set` child span; none recorded") + } + + if got := spanAttr(replicateSpan, "peer.id"); got == "" { + t.Fatalf("expected peer.id attr on replication span, got empty") + } +} + +// TestDistMemory_TracingDefaultIsNoop confirms the zero-value path +// (no WithDistTracerProvider) does not emit spans — important so +// library code never produces telemetry the operator didn't ask for. +// Asserted indirectly: the recorder never sees this DistMemory's spans +// because we never wired it up, and the cache still works. +func TestDistMemory_TracingDefaultIsNoop(t *testing.T) { + t.Parallel() + + addr := AllocatePort(t) + + bi, err := backend.NewDistMemory( + context.Background(), + backend.WithDistNode("trace-default", addr), + backend.WithDistReplication(1), + ) + if err != nil { + t.Fatalf("new dist memory: %v", err) + } + + dm, ok := bi.(*backend.DistMemory) + if !ok { + t.Fatalf("expected *backend.DistMemory, got %T", bi) + } + + StopOnCleanup(t, dm) + + ctx := context.Background() + + setErr := dm.Set(ctx, &cache.Item{ + Key: "default-key", + Value: []byte("v"), + Version: 1, + Origin: "trace-default", + LastUpdated: time.Now(), + }) + if setErr != nil { + t.Fatalf("set: %v", setErr) + } + + if it, ok := dm.Get(ctx, "default-key"); !ok || it == nil { + t.Fatalf("Get returned nothing for the key we just set") + } +} diff --git a/tests/hypercache_distmemory_failure_recovery_test.go b/tests/hypercache_distmemory_failure_recovery_test.go index 97a9a51..2844ec5 100644 --- a/tests/hypercache_distmemory_failure_recovery_test.go +++ b/tests/hypercache_distmemory_failure_recovery_test.go @@ -60,7 +60,8 @@ func assertNodeFailureMetrics(t *testing.T, m backend.DistMetrics) { func TestDistFailureRecovery(t *testing.T) { //nolint:paralleltest // mutates shared transport ctx := context.Background() - dc := SetupInProcessClusterRF(t, 2, 2, + dc := SetupInProcessClusterRF( + t, 2, 2, backend.WithDistReplication(2), backend.WithDistHeartbeat(15*time.Millisecond, 40*time.Millisecond, 90*time.Millisecond), backend.WithDistHintTTL(2*time.Minute), diff --git a/tests/hypercache_distmemory_heartbeat_sampling_test.go b/tests/hypercache_distmemory_heartbeat_sampling_test.go index 0d21778..78466e4 100644 --- a/tests/hypercache_distmemory_heartbeat_sampling_test.go +++ 
b/tests/hypercache_distmemory_heartbeat_sampling_test.go @@ -81,7 +81,8 @@ func TestHeartbeatSamplingAndTransitions(t *testing.T) { //nolint:paralleltest / // under shuffle. Previous values (interval=15ms, dead=90ms) were tight // enough that under heavy parallel test load the heartbeat goroutine could // starve and never advance the dead transition within deadline. - b1 := newDistPeerNode(t, membership, transport, "n1", + b1 := newDistPeerNode( + t, membership, transport, "n1", backend.WithDistHeartbeat(80*time.Millisecond, 320*time.Millisecond, 640*time.Millisecond), backend.WithDistHeartbeatSample(0), ) diff --git a/tests/hypercache_distmemory_heartbeat_test.go b/tests/hypercache_distmemory_heartbeat_test.go index b35d566..4cd75fb 100644 --- a/tests/hypercache_distmemory_heartbeat_test.go +++ b/tests/hypercache_distmemory_heartbeat_test.go @@ -155,7 +155,8 @@ func TestDistMemoryHeartbeatLiveness(t *testing.T) { //nolint:paralleltest // mu membership := cluster.NewMembership(ring) transport := backend.NewInProcessTransport() - b1 := newDistPeerNode(t, membership, transport, "n1", + b1 := newDistPeerNode( + t, membership, transport, "n1", backend.WithDistHeartbeat(interval, suspectAfter, deadAfter), ) b2 := newDistPeerNode(t, membership, transport, "n2") diff --git a/tests/hypercache_distmemory_hint_caps_test.go b/tests/hypercache_distmemory_hint_caps_test.go index b7afeae..7e88e22 100644 --- a/tests/hypercache_distmemory_hint_caps_test.go +++ b/tests/hypercache_distmemory_hint_caps_test.go @@ -21,7 +21,8 @@ func TestHintGlobalCaps(t *testing.T) { //nolint:paralleltest n1 := cluster.NewNode("", "n1") n2 := cluster.NewNode("", "n2") - b1i, _ := backend.NewDistMemory(ctx, + b1i, _ := backend.NewDistMemory( + ctx, backend.WithDistMembership(membership, n1), backend.WithDistTransport(transport), backend.WithDistReplication(2), @@ -32,7 +33,8 @@ func TestHintGlobalCaps(t *testing.T) { //nolint:paralleltest backend.WithDistHintMaxTotal(3), // very small global caps backend.WithDistHintMaxBytes(64), ) - b2i, _ := backend.NewDistMemory(ctx, + b2i, _ := backend.NewDistMemory( + ctx, backend.WithDistMembership(membership, n2), backend.WithDistTransport(transport), backend.WithDistReplication(2), diff --git a/tests/hypercache_distmemory_hinted_handoff_test.go b/tests/hypercache_distmemory_hinted_handoff_test.go index 702b080..f61f1d5 100644 --- a/tests/hypercache_distmemory_hinted_handoff_test.go +++ b/tests/hypercache_distmemory_hinted_handoff_test.go @@ -58,7 +58,8 @@ func newHintedHandoffNode(t *testing.T, m *cluster.Membership, id string, baseOp opts := make([]backend.DistMemoryOption, 0, len(baseOpts)+2) opts = append(opts, baseOpts...) - opts = append(opts, + opts = append( + opts, backend.WithDistNode(id, id), backend.WithDistMembership(m, cluster.NewNode(id, id)), ) diff --git a/tests/hypercache_distmemory_stale_quorum_test.go b/tests/hypercache_distmemory_stale_quorum_test.go index 2398b6a..1c5cd01 100644 --- a/tests/hypercache_distmemory_stale_quorum_test.go +++ b/tests/hypercache_distmemory_stale_quorum_test.go @@ -26,7 +26,8 @@ func pickNonAheadRequester(dc *DistCluster, ahead *backend.DistMemory) *backend. 
func TestDistMemoryStaleQuorum(t *testing.T) { t.Parallel() - dc := SetupInProcessCluster(t, 3, + dc := SetupInProcessCluster( + t, 3, backend.WithDistReadConsistency(backend.ConsistencyQuorum), ) diff --git a/tests/hypercache_distmemory_tiebreak_test.go b/tests/hypercache_distmemory_tiebreak_test.go index 1d228eb..f49a3d4 100644 --- a/tests/hypercache_distmemory_tiebreak_test.go +++ b/tests/hypercache_distmemory_tiebreak_test.go @@ -14,7 +14,8 @@ import ( func TestDistMemoryVersionTieBreak(t *testing.T) { //nolint:paralleltest // mutates shared transport const interval = 5 * time.Millisecond - dc := SetupInProcessCluster(t, 3, + dc := SetupInProcessCluster( + t, 3, backend.WithDistReplication(3), backend.WithDistHeartbeat(interval, 0, 0), backend.WithDistReadConsistency(backend.ConsistencyQuorum), diff --git a/tests/hypercache_distmemory_versioning_test.go b/tests/hypercache_distmemory_versioning_test.go index f99b637..efd6505 100644 --- a/tests/hypercache_distmemory_versioning_test.go +++ b/tests/hypercache_distmemory_versioning_test.go @@ -54,7 +54,8 @@ func findOrderedThreeOwnerKeyPrefix(node *backend.DistMemory, want []cluster.Nod func TestDistMemoryVersioningQuorum(t *testing.T) { //nolint:paralleltest // mutates shared transport const interval = 10 * time.Millisecond - dc := SetupInProcessCluster(t, 3, + dc := SetupInProcessCluster( + t, 3, backend.WithDistReplication(3), backend.WithDistHeartbeat(interval, 0, 0), backend.WithDistReadConsistency(backend.ConsistencyQuorum), diff --git a/tests/hypercache_distmemory_write_quorum_test.go b/tests/hypercache_distmemory_write_quorum_test.go index 9a71ea9..4c84545 100644 --- a/tests/hypercache_distmemory_write_quorum_test.go +++ b/tests/hypercache_distmemory_write_quorum_test.go @@ -17,7 +17,8 @@ import ( func TestWriteQuorumSuccess(t *testing.T) { t.Parallel() - dc := SetupInProcessClusterRF(t, 3, 2, + dc := SetupInProcessClusterRF( + t, 3, 2, backend.WithDistReplication(2), backend.WithDistWriteConsistency(backend.ConsistencyQuorum), ) @@ -43,7 +44,8 @@ func TestWriteQuorumSuccess(t *testing.T) { func TestWriteQuorumFailure(t *testing.T) { t.Parallel() - dc := SetupInProcessCluster(t, 3, + dc := SetupInProcessCluster( + t, 3, backend.WithDistReplication(3), backend.WithDistWriteConsistency(backend.ConsistencyAll), backend.WithDistHintTTL(time.Minute), diff --git a/tests/hypercache_http_merkle_test.go b/tests/hypercache_http_merkle_test.go index d2fbf18..3b4b77b 100644 --- a/tests/hypercache_http_merkle_test.go +++ b/tests/hypercache_http_merkle_test.go @@ -24,7 +24,8 @@ func newHTTPMerkleNode(t *testing.T, membership *cluster.Membership, id, addr st node := cluster.NewNode("", addr) - bi, err := backend.NewDistMemory(context.Background(), + bi, err := backend.NewDistMemory( + context.Background(), backend.WithDistMembership(membership, node), backend.WithDistNode(id, addr), backend.WithDistMerkleChunkSize(2), diff --git a/tests/hypercache_mgmt_dist_test.go b/tests/hypercache_mgmt_dist_test.go index 77bcb56..437059e 100644 --- a/tests/hypercache_mgmt_dist_test.go +++ b/tests/hypercache_mgmt_dist_test.go @@ -41,7 +41,8 @@ func TestManagementHTTPDistMemory(t *testing.T) { //nolint:paralleltest // mgmt t.Fatalf("NewConfig: %v", err) } - cfg.HyperCacheOptions = append(cfg.HyperCacheOptions, + cfg.HyperCacheOptions = append( + cfg.HyperCacheOptions, hypercache.WithManagementHTTP[backend.DistMemory]("127.0.0.1:0"), ) cfg.DistMemoryOptions = []backend.DistMemoryOption{ diff --git a/tests/integration/dist_phase1_test.go 
b/tests/integration/dist_phase1_test.go index 7e298de..58cef1f 100644 --- a/tests/integration/dist_phase1_test.go +++ b/tests/integration/dist_phase1_test.go @@ -37,7 +37,8 @@ func allocatePort(tb testing.TB) string { func makePhase1Node(ctx context.Context, t *testing.T, id, addr string, seeds []string) *backend.DistMemory { t.Helper() - bm, err := backend.NewDistMemory(ctx, + bm, err := backend.NewDistMemory( + ctx, backend.WithDistNode(id, addr), backend.WithDistSeeds(seeds), backend.WithDistReplication(3), diff --git a/tests/management_http_test.go b/tests/management_http_test.go index 1870151..9bfc687 100644 --- a/tests/management_http_test.go +++ b/tests/management_http_test.go @@ -40,7 +40,8 @@ func TestManagementHTTP_BasicEndpoints(t *testing.T) { t.Fatalf("NewConfig: %v", err) } - cfg.HyperCacheOptions = append(cfg.HyperCacheOptions, + cfg.HyperCacheOptions = append( + cfg.HyperCacheOptions, hypercache.WithEvictionInterval[backend.InMemory](0), hypercache.WithManagementHTTP[backend.InMemory]("127.0.0.1:0"), ) diff --git a/tests/merkle_node_helper.go b/tests/merkle_node_helper.go index 345cd1b..03a7016 100644 --- a/tests/merkle_node_helper.go +++ b/tests/merkle_node_helper.go @@ -16,7 +16,8 @@ import ( func newMerkleNode(t *testing.T, transport *backend.InProcessTransport, id string) *backend.DistMemory { t.Helper() - bi, err := backend.NewDistMemory(context.Background(), + bi, err := backend.NewDistMemory( + context.Background(), backend.WithDistNode(id, AllocatePort(t)), backend.WithDistReplication(1), backend.WithDistMerkleChunkSize(2),