diff --git a/.gitignore b/.gitignore index bfe5fa0..0eeb3d7 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,4 @@ _testmain.go /dogstatsd /datadog/testdata/fuzz +.claude/settings.local.json diff --git a/HISTORY.md b/HISTORY.md index d9ddffd..1aececf 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -5,6 +5,76 @@ Apply 'go fix ./...' on the codebase. Several references to interface{} have been replaced by `any`; there should be no API or performance differences. +Add full OpenTelemetry OTLP exporter support with official SDK integration. + +**New Feature: OpenTelemetry OTLP Exporter (SDKHandler)** + +The `otlp` package now includes a production-ready `SDKHandler` that uses the +official OpenTelemetry SDK with comprehensive support for modern observability +requirements: + +- **Dual Transport Support**: Both gRPC and HTTP/Protobuf protocols +- **Environment Variables**: Support for the standard `OTEL_*` environment + variables including `OTEL_EXPORTER_OTLP_ENDPOINT`, `OTEL_EXPORTER_OTLP_PROTOCOL` + (and the `OTEL_EXPORTER_OTLP_METRICS_PROTOCOL` override), and + `OTEL_RESOURCE_ATTRIBUTES` +- **Resource Detection**: Host and process metadata plus environment attributes + by default; cloud (AWS, GCP, Azure) and Kubernetes detection is opt-in via the + `contrib/detectors/*` packages, with examples in the package README +- **All Metric Types**: Counter, Gauge, and Histogram with proper semantics +- **Exponential Histograms**: Optional support for exponential histogram aggregation + with configurable bucket size and scale +- **Temporality Configuration**: Configurable metric temporality (cumulative or delta) + with cumulative as the default for Prometheus compatibility +- **Tag Preservation**: Automatic conversion of stats tags to OpenTelemetry attributes +- **Production Ready**: Thread-safe instrument caching, proper context handling, + and comprehensive error handling + +**Usage Example:** + +```go +import ( + "context" + "github.com/segmentio/stats/v5" + "github.com/segmentio/stats/v5/otlp" +) + +// Simple usage with environment variables +handler, err := otlp.NewSDKHandlerFromEnv(ctx) +if err != nil { + log.Fatal(err) +} +defer handler.Shutdown(ctx) +stats.Register(handler) + +// Or with explicit configuration +handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + EndpointURL: "http://localhost:4317", +}) +``` + +**Implementation Details:** + +- Gauges use native `Float64Gauge` instrument for instantaneous value recording +- Background context for metric recording to prevent context cancellation issues +- Efficient two-level locking pattern for instrument caching (read locks in hot path) +- Cumulative temporality by default (Prometheus-compatible) +- Comprehensive documentation including cloud resource detector examples +- `SDKConfig.EndpointURL` takes a full URL including the `http://` or `https://` scheme +- SDK defaults are used when unset (ExportInterval: 60s, ExportTimeout: 30s) + +**Deprecated:** + +- `otlp.Handler` is now deprecated in favor of `otlp.SDKHandler` (will be removed in v6.0.0) +- `otlp.HTTPClient` is now deprecated in favor of `otlp.SDKHandler` with `ProtocolHTTPProtobuf` (will be removed in v6.0.0) +- `otlp.NewHTTPClient()` is now deprecated (will be removed in v6.0.0) + +The legacy `Handler` has been marked as Alpha since 2022 and has minimal to zero usage. +Migration is straightforward - see deprecation notices in code for examples. + +See the [otlp package documentation](./otlp/README.md) for complete details and examples. + ### v5.8.0 (December 15, 2025) When reporting go/stats versions, ensure that any user provided tags are diff --git a/README.md b/README.md index 5680e44..ed4af80 100644 --- a/README.md +++ b/README.md @@ -121,6 +121,97 @@ func main() { } ``` +## Supported Backends + +The stats package supports multiple metric backends out of the box: + +### OpenTelemetry (OTLP) + +The [github.com/segmentio/stats/v5/otlp](https://pkg.go.dev/github.com/segmentio/stats/v5/otlp) package provides full OpenTelemetry Protocol (OTLP) support using the official OpenTelemetry SDK. + +**Features:** + +- gRPC and HTTP/Protobuf transports +- Full support for OTEL_* environment variables +- Automatic resource detection (cloud, Kubernetes, host, process) +- Production-ready with official OTel SDK exporters + +```go +import ( + "context" + "github.com/segmentio/stats/v5" + "github.com/segmentio/stats/v5/otlp" +) + +func main() { + ctx := context.Background() + + // Using gRPC (recommended). Note the field is EndpointURL, not Endpoint, + // and the value must include the scheme. + handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + EndpointURL: "http://localhost:4317", + }) + if err != nil { + panic(err) + } + defer handler.Shutdown(ctx) + + stats.Register(handler) + defer stats.Flush() + + // Or configure everything from environment variables (simplest). Note this + // example changes two things relative to the one above: it moves the + // configuration from in-code to env vars, AND it switches the transport + // from gRPC to HTTP/protobuf. + // export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 + // export OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf + handler, err = otlp.NewSDKHandlerFromEnv(ctx) +} +``` + +See the [otlp package documentation](./otlp/README.md) for complete details. + +### Datadog + +The [github.com/segmentio/stats/v5/datadog](https://godoc.org/github.com/segmentio/stats/v5/datadog) package provides support for sending metrics to Datadog via DogStatsD protocol over UDP or Unix Domain Sockets. + +```go +import "github.com/segmentio/stats/v5/datadog" + +stats.Register(datadog.NewClient("localhost:8125")) +``` + +### Prometheus + +The [github.com/segmentio/stats/v5/prometheus](https://godoc.org/github.com/segmentio/stats/v5/prometheus) package exposes an HTTP handler that serves metrics in Prometheus format. + +Note that with Prometheus the metric server polls your client for changes — +metrics are pulled by the server, not pushed from the client. This is the +opposite of most other backends here (Datadog, InfluxDB, OTLP), where the +client pushes metrics to the server. + +```go +import ( + "net/http" + "github.com/segmentio/stats/v5/prometheus" +) + +handler := prometheus.NewHandler() +stats.Register(handler) +http.Handle("/metrics", handler) +``` + +### InfluxDB + +The [github.com/segmentio/stats/v5/influxdb](https://godoc.org/github.com/segmentio/stats/v5/influxdb) package sends metrics to InfluxDB using the line protocol over HTTP. + +```go +import "github.com/segmentio/stats/v5/influxdb" + +stats.Register(influxdb.NewClient("http://localhost:8086")) +``` + ### Metrics - [Gauges](https://godoc.org/github.com/segmentio/stats#Gauge) diff --git a/measure.go b/measure.go index 1011edc..2b9afd7 100644 --- a/measure.go +++ b/measure.go @@ -412,10 +412,12 @@ func (tags tagFuncMap) copy() tagFuncMap { } func (tags tagFuncMap) namedTagFuncs() []namedTagFunc { - namedTags := make([]namedTagFunc, 0, len(tags)) + namedTags := make([]namedTagFunc, len(tags)) + i := 0 for name, fn := range tags { - namedTags = append(namedTags, namedTagFunc{name: name, fn: fn}) + namedTags[i] = namedTagFunc{name: name, fn: fn} + i++ } return namedTags diff --git a/otlp/IMPLEMENTATION_NOTES.md b/otlp/IMPLEMENTATION_NOTES.md new file mode 100644 index 0000000..c5b3b76 --- /dev/null +++ b/otlp/IMPLEMENTATION_NOTES.md @@ -0,0 +1,382 @@ +# OpenTelemetry SDK Implementation Notes + +This document describes the implementation details and design decisions for the OpenTelemetry OTLP exporter. + +## Overview + +This implementation provides full OpenTelemetry Protocol (OTLP) support using the official OpenTelemetry SDK. It bridges the `stats` library's metric interface to OpenTelemetry's metric API. + +## Architecture + +### Core Components + +1. **SDKHandler** - Main handler implementing `stats.Handler` +2. **Protocol Support** - Both gRPC and HTTP/Protobuf transports +3. **Instrument Management** - Efficient caching of OpenTelemetry instruments +4. **Gauge Value Tracking** - Delta calculation for absolute gauge semantics + +## Design Decisions + +### 1. Gauge Implementation + +**The problem**: `stats.Set("metric", 42)` is an absolute-value operation — the +caller is asserting "the current value is 42". We need an OpenTelemetry +instrument with the same semantics. Historically the OTel Go SDK had no +synchronous gauge, so earlier iterations had to emulate one (e.g. an +`UpDownCounter` fed deltas computed against the previously recorded value), +which meant tracking prior values in the handler and reasoning about races. + +**The tradeoff that resolved it**: The OTel SDK now ships a native synchronous +`Float64Gauge` that records instantaneous values directly. Adopting it removes +the need to track previous values at all, at the cost of requiring a reasonably +recent SDK version — a cost we're happy to pay. + +**Decision**: Use the native `Float64Gauge` instrument. + +```go +// When stats.Set("metric", 42) is called: +gauge.Record(ctx, 42.0, opts) +``` + +This gives us: +- No additional memory overhead for tracking previous values +- A direct mapping to OpenTelemetry's gauge semantics +- A simpler, more maintainable implementation + +### 2. Context Management + +**Challenge**: Stored contexts can be cancelled, causing metric recording to fail. + +**Solution**: +- Use `context.Background()` for metric recording operations +- Store the initialization context as `shutdownCtx` only for shutdown operations +- This ensures metrics continue to be recorded even if the original context is cancelled + +**Why**: Metric recording should be resilient and not fail due to context cancellation. The handler should continue working throughout the application lifecycle. + +### 3. Instrument Caching + +**Note**: This is an internal implementation detail - users don't need to worry about this. + +**Implementation**: Thread-safe two-level locking pattern for efficient instrument reuse +```go +// Fast path: read lock for lookup (common case - instrument already exists) +h.mu.RLock() +inst, exists := h.instruments[metricName] +h.mu.RUnlock() + +// Slow path: write lock only if creating new instrument (rare - first time seeing this metric) +if !exists { + h.mu.Lock() + // Double-check after acquiring write lock (another goroutine may have created it) + inst, exists = h.instruments[metricName] + if !exists { + inst = h.createInstruments(meter, metricName, field.Type()) + h.instruments[metricName] = inst + } + h.mu.Unlock() +} +``` + +**Why**: OpenTelemetry instruments are expensive to create but cheap to reuse. This pattern: + +- **Minimizes lock contention** in the hot path (metric recording uses fast read locks) +- **Ensures thread-safety** during instrument creation (write locks only when needed) +- **Scales well** under concurrent load (multiple goroutines can look up instruments simultaneously) + +The double-check pattern prevents duplicate instrument creation when multiple goroutines race to create the same instrument for the first time. + +### 4. Attribute Handling + +**Implementation**: Direct conversion from `stats.Tag` to `attribute.KeyValue` +```go +func (h *SDKHandler) tagsToAttributes(tags []stats.Tag) []attribute.KeyValue { + attrs := make([]attribute.KeyValue, 0, len(tags)) + for _, tag := range tags { + attrs = append(attrs, attribute.String(tag.Name, tag.Value)) + } + return attrs +} +``` + +**Why**: Simple 1:1 mapping preserves all user-provided metadata without transformation. + +### 5. Resource Detection + +**Pattern**: Leverage official OpenTelemetry resource detectors +```go +resource.New(ctx, + resource.WithDetectors(ec2.NewResourceDetector()), + resource.WithFromEnv(), + resource.WithHost(), + resource.WithProcess(), +) +``` + +**Why**: Automatic detection of cloud provider, Kubernetes, host, and process metadata without manual configuration. + +## Performance Considerations + +### Instrument Reuse +- Instruments are created once and cached +- RWMutex allows concurrent reads (the common case) +- Write locks only taken during initial instrument creation + +### Gauge Recording +- Zero additional memory overhead (uses native Float64Gauge) +- Direct recording with no delta calculation required +- Simple O(1) operation per gauge recording + +### Batching and Export Strategy + +**Decision**: Delegate all batching to OpenTelemetry SDK's `PeriodicReader` + +**Implementation**: No custom buffering or batching logic in the handler +```go +provider := sdkmetric.NewMeterProvider( + sdkmetric.WithResource(res), + sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exporter, + sdkmetric.WithInterval(config.ExportInterval), // Default: 10s + sdkmetric.WithTimeout(config.ExportTimeout), // Default: 30s + )), +) +``` + +**Why**: +- The OTel SDK provides production-ready batching with in-memory aggregation +- `PeriodicReader` handles timing, aggregation reset, and export lifecycle +- Avoids reinventing batching logic and potential bugs +- Provides standard OTel behavior that users expect + +**How it works**: +1. Metrics are recorded immediately to OTel instruments (no blocking) +2. SDK aggregates metrics in memory (e.g., summing counters, collecting histogram samples) +3. Every `ExportInterval`, the reader exports aggregated data and resets aggregations +4. Reduces network overhead and collector load automatically + +**Trade-offs**: +- Metrics are not real-time (delayed by up to `ExportInterval`) +- Memory grows proportionally to metric cardinality until export +- Users must call `Flush()` before shutdown to export remaining metrics + +## Error Handling + +### Instrument Creation Failures +- Logged but don't block other metrics +- Silent no-op if instrument is nil +- Prevents cascade failures + +### Export Failures +- Logged but don't stop metric collection +- Retries handled by OpenTelemetry SDK exporters +- Backoff and timeout configured at SDK level + +### Context Cancellation +- Metric recording uses background context +- Unaffected by user context cancellation +- Shutdown still respects user-provided context + +## Testing Strategy + +### Unit Tests +- Instrument creation and caching +- Gauge delta calculation +- Value type conversions +- Protocol selection (HTTP vs gRPC) + +### Integration Tests +- Environment variable configuration +- Multiple concurrent metrics +- Gauge absolute value semantics + +### Benchmarks +- Metric recording performance +- Lock contention under load + +## Limitations and Known Issues + +### 1. No Exemplars +- Current implementation doesn't support exemplars +- Could be added in future versions + +### 2. No Custom Views for Explicit Bucket Histograms +- Uses default aggregation and bucket boundaries for explicit bucket histograms +- Advanced users may want custom histogram buckets when not using exponential histograms + +## Histogram Aggregation + +### Exponential Histogram Support + +**Implementation**: Configurable via `ExponentialHistogram` flag and View configuration + +```go +if config.ExponentialHistogram { + view := sdkmetric.NewView( + sdkmetric.Instrument{Kind: sdkmetric.InstrumentKindHistogram}, + sdkmetric.Stream{ + Aggregation: sdkmetric.AggregationBase2ExponentialHistogram{ + MaxSize: config.ExponentialHistogramMaxSize, // Default: 160 + MaxScale: config.ExponentialHistogramMaxScale, // Default: 20 + }, + }, + ) + providerOpts = append(providerOpts, sdkmetric.WithView(view)) +} +``` + +**Benefits**: +- **Better accuracy**: Adaptive buckets provide consistent relative error across value ranges +- **Lower memory**: Base-2 exponential buckets vs fixed explicit buckets +- **No pre-configuration**: Buckets adjust automatically to observed values +- **Modern standard**: Native support in Prometheus, Grafana, and OTLP backends + +**How it works**: +1. Uses base-2 exponential buckets (powers of 2) +2. Automatically scales to accommodate value range +3. MaxSize limits total buckets (trades accuracy for memory) +4. MaxScale controls granularity (-10 to 20, where 20 = finest) + +**Trade-offs**: +- Requires backend support (Prometheus 2.40+, modern OTLP collectors) +- Slightly higher CPU overhead during aggregation +- Not compatible with legacy systems expecting explicit buckets + +**Default behavior**: When disabled, uses explicit bucket histogram with default boundaries + +## Temporality Configuration + +### What is Temporality? + +Temporality determines whether metrics are reported as **cumulative totals** (since application start) or **deltas** (change since last export). + +**Example - Request Counter**: +- **Cumulative**: Export "1000 total requests" → "1150 total requests" → "1320 total requests" +- **Delta**: Export "1000 new requests" → "150 new requests" → "170 new requests" + +### Why We Use Cumulative Temporality (Default) + +This handler uses **cumulative temporality** for all metrics by default. Here's why: + +#### Compatibility with Prometheus and Standard Backends + +- Prometheus expects cumulative counters and will graph them correctly +- Most OTLP backends (Grafana, Datadog, etc.) work best with cumulative data +- Industry standard practice in the OpenTelemetry ecosystem + +#### Reliability and Query Simplicity + +- **No data loss on export failures**: If an export fails, the next one still has complete data +- **Easier to query**: "How many total requests?" vs "Sum all deltas" +- **Converts to delta easily**: Backend can calculate rates from cumulative, but can't reconstruct cumulative from deltas + +#### Lower Cognitive Load + +- Counters show totals since start - intuitive and matches mental model +- Histograms show full distribution of all observations + +### How It Works + +**Cumulative semantics by instrument type**: + +- **Counter** (`stats.Incr`, `stats.Add`): Total count since application start + - Example: `requests.total` reports 1000, then 1150, then 1320 + +- **Histogram** (`stats.Observe`): Cumulative distribution of all observed values + - Example: Latency histogram includes all requests since start + +- **Gauge** (`stats.Set`): Current absolute value (temporality doesn't apply) + - Example: `memory.used` always reports current memory usage + +### Trade-offs + +#### Advantages of Cumulative + +- ✅ Prometheus and Grafana work out-of-box +- ✅ Resilient to export failures (no data loss) +- ✅ Backend can derive rates automatically +- ✅ Simpler mental model for most users + +#### Disadvantages of Cumulative + +- ❌ Slightly higher memory usage for high-cardinality counters +- ❌ Backend must calculate deltas for rate queries (minor overhead) +- ❌ Some specialized telemetry systems expect delta temporality + +#### When Delta Might Be Better + +- Your backend explicitly requires delta temporality (check docs) +- Extreme cardinality where cumulative memory overhead matters +- Building a custom metrics pipeline optimized for deltas + +### Changing Temporality (Advanced) + +If you need delta temporality, you can override the default: + +```go +handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + EndpointURL: "http://localhost:4317", + // Use delta temporality for all instruments + TemporalitySelector: sdkmetric.DeltaTemporalitySelector, +}) +``` + +**Available selectors**: + +- `sdkmetric.DefaultTemporalitySelector` - Cumulative for all (default, recommended) +- `sdkmetric.CumulativeTemporalitySelector` - Cumulative for all (explicit) +- `sdkmetric.DeltaTemporalitySelector` - Delta for all +- `sdkmetric.LowMemoryTemporalitySelector` - Delta for Counters/Histograms, Cumulative for UpDownCounters + +**⚠️ Warning**: Changing temporality requires updating your backend configuration and queries. Most users should stick with the default cumulative temporality. + +## Future Enhancements + +### Potential Improvements +1. **Memory Management**: Add LRU eviction for unused instruments +2. **Exemplar Support**: Bridge to trace context for exemplars +3. **Custom Histogram Buckets**: Allow users to configure explicit bucket boundaries +4. **Metric Metadata**: Expose units and descriptions via OTel API + +### OpenTelemetry SDK Evolution +- **Protocol Extensions**: Support new OTLP features as they're added +- **New Instrument Types**: Adopt new instrument types as they become available + +## Migration from Legacy Handler + +The legacy `Handler` in this package is marked as Alpha and has limitations: + +**Legacy Handler Issues:** +- Custom OTLP implementation (not using official SDK) +- Only HTTP transport (despite having gRPC dependencies) +- No environment variable support +- No resource detection + +**SDKHandler Advantages:** +- Official OpenTelemetry SDK +- Both HTTP and gRPC +- Full environment variable support +- Automatic resource detection +- Production-ready and well-tested + +**Migration Path:** +```go +// Old (legacy) +handler := &otlp.Handler{ + Client: otlp.NewHTTPClient(endpoint), + // ... +} + +// New (recommended) +handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolHTTPProtobuf, + Endpoint: endpoint, +}) +``` + +## References + +- [OpenTelemetry Metrics Specification](https://opentelemetry.io/docs/specs/otel/metrics/) +- [OTLP Specification](https://opentelemetry.io/docs/specs/otlp/) +- [Go SDK Documentation](https://pkg.go.dev/go.opentelemetry.io/otel/sdk/metric) +- [Resource Semantic Conventions](https://opentelemetry.io/docs/specs/semconv/resource/) diff --git a/otlp/README.md b/otlp/README.md new file mode 100644 index 0000000..2ff193a --- /dev/null +++ b/otlp/README.md @@ -0,0 +1,742 @@ +# OpenTelemetry OTLP Exporter for stats + +This package provides OpenTelemetry Protocol (OTLP) export support for the `stats` library using the official OpenTelemetry SDK. + +## Features + +- **Multiple Transport Protocols**: Support for both gRPC and HTTP/Protobuf +- **Full OpenTelemetry SDK Integration**: Uses official OTel SDK exporters +- **Environment Variable Support**: Respects all standard `OTEL_*` environment variables +- **Resource Detection**: Detects host and process information by default, plus + attributes from `OTEL_RESOURCE_ATTRIBUTES`/`OTEL_SERVICE_NAME`. Cloud and + Kubernetes detection is opt-in via the `contrib/detectors/*` packages (see below) +- **All Metric Types**: Counter, Gauge, and Histogram support +- **Flexible Configuration**: Configure via code or environment variables + +## Installation + +```bash +go get github.com/segmentio/stats/v5/otlp +``` + +## Quick Start + +### Using gRPC (Recommended) + +```go +package main + +import ( + "context" + "log" + + "github.com/segmentio/stats/v5" + "github.com/segmentio/stats/v5/otlp" +) + +func main() { + ctx := context.Background() + + // Create handler with gRPC transport + handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + EndpointURL: "http://localhost:4317", + }) + if err != nil { + log.Fatal(err) + } + defer handler.Shutdown(ctx) + + // Register with stats engine + stats.Register(handler) + defer stats.Flush() + + // Use stats as normal + stats.Incr("requests.count") +} +``` + +### Using HTTP + +```go +handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolHTTPProtobuf, + EndpointURL: "http://localhost:4318", +}) +``` + +### Using Environment Variables (Simplest) + +```go +// Just set environment variables: +// export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 +// export OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf +// export OTEL_SERVICE_NAME=my-service + +handler, err := otlp.NewSDKHandlerFromEnv(ctx) +``` + +## Configuration + +### SDKConfig Options + +```go +type SDKConfig struct { + // Protocol: "grpc" or "http/protobuf" (default: "grpc") + Protocol Protocol + + // EndpointURL: Full OTLP collector endpoint URL (with http:// or https:// scheme) + // gRPC: "localhost:4317" + // HTTP: "http://localhost:4318" + Endpoint string + + // Resource: Custom resource attributes (optional) + // If nil, uses automatic detection + Resource *resource.Resource + + // ExportInterval: How often to export (default: 10s) + ExportInterval time.Duration + + // ExportTimeout: Timeout for exports (default: 30s) + ExportTimeout time.Duration + + // HTTPOptions: Additional HTTP options + HTTPOptions []otlpmetrichttp.Option + + // GRPCOptions: Additional gRPC options + GRPCOptions []otlpmetricgrpc.Option + + // ExponentialHistogram: Enable exponential histogram aggregation + // (default: false, uses explicit bucket histograms) + ExponentialHistogram bool + + // ExponentialHistogramMaxSize: Max buckets for exponential histograms + // (default: 160 if ExponentialHistogram is true) + ExponentialHistogramMaxSize int32 + + // ExponentialHistogramMaxScale: Resolution for exponential histograms + // Valid range: -10 to 20 (default: 20 if ExponentialHistogram is true) + ExponentialHistogramMaxScale int32 + + // TemporalitySelector: Determines temporality (cumulative vs delta) + // (default: nil, which uses cumulative for all - Prometheus-compatible) + TemporalitySelector sdkmetric.TemporalitySelector +} +``` + +### Supported Environment Variables + +The handler respects all standard OpenTelemetry environment variables: + +- `OTEL_EXPORTER_OTLP_ENDPOINT` - Base endpoint URL +- `OTEL_EXPORTER_OTLP_PROTOCOL` - Transport protocol (grpc, http/protobuf); + `OTEL_EXPORTER_OTLP_METRICS_PROTOCOL` takes precedence when both are set +- `OTEL_EXPORTER_OTLP_HEADERS` - Custom headers for authentication +- `OTEL_EXPORTER_OTLP_TIMEOUT` - Export timeout +- `OTEL_EXPORTER_OTLP_COMPRESSION` - Compression algorithm (gzip, none) +- `OTEL_SERVICE_NAME` - Service name +- `OTEL_RESOURCE_ATTRIBUTES` - Additional resource attributes + +The endpoint, headers, timeout, and compression variables (and their +`_METRICS_` variants) are read by the underlying OTLP exporters. The protocol +variables are resolved by this package. `OTEL_METRICS_EXPORTER` and other +`autoexport`-only variables are not consulted. + +See [OpenTelemetry Environment Variables](https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/) for the complete list. + +## Advanced Usage + +### Custom gRPC Options + +```go +import ( + "google.golang.org/grpc/credentials" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" +) + +handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + EndpointURL: "http://collector.example.com:4317", + GRPCOptions: []otlpmetricgrpc.Option{ + // Use TLS + otlpmetricgrpc.WithTLSCredentials( + credentials.NewClientTLSFromCert(certPool, ""), + ), + // Add authentication headers + otlpmetricgrpc.WithHeaders(map[string]string{ + "Authorization": "Bearer " + apiKey, + }), + // Set timeout + otlpmetricgrpc.WithTimeout(30 * time.Second), + }, +}) +``` + +### Custom HTTP Options + +```go +import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + +handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolHTTPProtobuf, + EndpointURL: "https://collector.example.com:4318", + HTTPOptions: []otlpmetrichttp.Option{ + // Add custom headers + otlpmetrichttp.WithHeaders(map[string]string{ + "Authorization": "Bearer " + apiKey, + "X-Custom-Header": "value", + }), + // Enable compression + otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression), + // Set timeout + otlpmetrichttp.WithTimeout(30 * time.Second), + }, +}) +``` + +### Custom Resource Attributes + +```go +import ( + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" +) + +res, err := resource.New(ctx, + resource.WithAttributes( + semconv.ServiceName("my-service"), + semconv.ServiceVersion("1.0.0"), + semconv.DeploymentEnvironment("production"), + ), + resource.WithFromEnv(), // Also include env vars + resource.WithHost(), // Include host info + resource.WithProcess(), // Include process info +) + +handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + EndpointURL: "http://localhost:4317", + Resource: res, +}) +``` + +### Cloud Resource Detectors + +OpenTelemetry provides resource detectors for major cloud providers that automatically detect and add cloud-specific metadata. + +#### AWS Resource Detector + +```go +import ( + "go.opentelemetry.io/contrib/detectors/aws/ec2" + "go.opentelemetry.io/contrib/detectors/aws/ecs" + "go.opentelemetry.io/contrib/detectors/aws/eks" + "go.opentelemetry.io/contrib/detectors/aws/lambda" +) + +// Detect AWS EC2 instance metadata +res, err := resource.New(ctx, + resource.WithDetectors(ec2.NewResourceDetector()), + resource.WithAttributes( + semconv.ServiceName("my-service"), + ), +) + +// Detected attributes include: +// - cloud.provider: "aws" +// - cloud.platform: "aws_ec2" +// - cloud.region: "us-west-2" +// - cloud.availability_zone: "us-west-2a" +// - cloud.account.id: "123456789012" +// - host.id: "i-0123456789abcdef0" +// - host.type: "t3.medium" +``` + +**Install AWS detectors:** + +```bash +go get go.opentelemetry.io/contrib/detectors/aws/ec2 +go get go.opentelemetry.io/contrib/detectors/aws/ecs +go get go.opentelemetry.io/contrib/detectors/aws/eks +go get go.opentelemetry.io/contrib/detectors/aws/lambda +``` + +**ECS/Fargate:** + +```go +res, err := resource.New(ctx, + resource.WithDetectors(ecs.NewResourceDetector()), + // Detects: container.id, aws.ecs.task.arn, aws.ecs.cluster.arn, etc. +) +``` + +**EKS:** + +```go +res, err := resource.New(ctx, + resource.WithDetectors(eks.NewResourceDetector()), + // Detects: k8s.cluster.name, cloud.provider, cloud.platform +) +``` + +**Lambda:** + +```go +res, err := resource.New(ctx, + resource.WithDetectors(lambda.NewResourceDetector()), + // Detects: faas.name, faas.version, cloud.region, etc. +) +``` + +#### GCP Resource Detector + +```go +import "go.opentelemetry.io/contrib/detectors/gcp" + +res, err := resource.New(ctx, + resource.WithDetectors(gcp.NewDetector()), + resource.WithAttributes( + semconv.ServiceName("my-service"), + ), +) + +// Detected attributes include: +// - cloud.provider: "gcp" +// - cloud.platform: "gcp_compute_engine" +// - cloud.region: "us-central1" +// - cloud.availability_zone: "us-central1-a" +// - host.id: "123456789" +// - host.type: "n1-standard-1" +``` + +**Install:** + +```bash +go get go.opentelemetry.io/contrib/detectors/gcp +``` + +#### Azure Resource Detector + +```go +import "go.opentelemetry.io/contrib/detectors/azure/azurevm" + +res, err := resource.New(ctx, + resource.WithDetectors(azurevm.New()), + resource.WithAttributes( + semconv.ServiceName("my-service"), + ), +) + +// Detected attributes include: +// - cloud.provider: "azure" +// - cloud.platform: "azure_vm" +// - cloud.region: "eastus" +// - host.id: "..." +// - azure.vm.size: "Standard_D2s_v3" +``` + +**Install:** + +```bash +go get go.opentelemetry.io/contrib/detectors/azure/azurevm +``` + +#### Multiple Detectors + +Combine multiple detectors for comprehensive metadata: + +```go +import ( + "go.opentelemetry.io/contrib/detectors/aws/ec2" + "go.opentelemetry.io/contrib/detectors/aws/eks" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" +) + +res, err := resource.New(ctx, + // Service metadata + resource.WithAttributes( + semconv.ServiceName("my-api"), + semconv.ServiceVersion("1.2.3"), + semconv.DeploymentEnvironment("production"), + ), + // Cloud detectors (only one will succeed) + resource.WithDetectors( + ec2.NewResourceDetector(), + eks.NewResourceDetector(), + ), + // Environment variables + resource.WithFromEnv(), + // Host and process info + resource.WithHost(), + resource.WithProcess(), + resource.WithProcessRuntimeName(), + resource.WithProcessRuntimeVersion(), + // Container info (if applicable) + resource.WithContainer(), + resource.WithContainerID(), + // OS info + resource.WithOS(), + // OTel SDK version + resource.WithTelemetrySDK(), +) + +handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + EndpointURL: "http://localhost:4317", + Resource: res, +}) +``` + +**Note:** Detectors are executed sequentially and only the first successful detector provides cloud metadata. For example, if running on AWS EC2, the EC2 detector will succeed and GCP/Azure detectors will be skipped. + +#### Complete Example with AWS + +```go +package main + +import ( + "context" + "log" + + "github.com/segmentio/stats/v5" + "github.com/segmentio/stats/v5/otlp" + + "go.opentelemetry.io/contrib/detectors/aws/ec2" + "go.opentelemetry.io/contrib/detectors/aws/eks" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" +) + +func main() { + ctx := context.Background() + + // Build resource with AWS detection + res, err := resource.New(ctx, + resource.WithAttributes( + semconv.ServiceName("payment-api"), + semconv.ServiceVersion("2.1.0"), + semconv.DeploymentEnvironment("production"), + ), + resource.WithDetectors( + ec2.NewResourceDetector(), // Detect EC2 metadata + eks.NewResourceDetector(), // Or EKS metadata + ), + resource.WithFromEnv(), + resource.WithHost(), + resource.WithProcess(), + resource.WithContainer(), + ) + if err != nil { + log.Fatalf("failed to create resource: %v", err) + } + + // Create handler with detected resources + handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + EndpointURL: "http://collector.us-west-2.amazonaws.com:4317", + Resource: res, + }) + if err != nil { + log.Fatalf("failed to create handler: %v", err) + } + defer handler.Shutdown(ctx) + + stats.Register(handler) + defer stats.Flush() + + // Metrics will include all detected AWS metadata + stats.Incr("payment.processed", stats.T("amount", "100")) +} +``` + +### Multiple Handlers + +Send metrics to multiple destinations: + +```go +// Send to local collector +localHandler, _ := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + EndpointURL: "http://localhost:4317", +}) + +// Send to cloud service +cloudHandler, _ := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolHTTPProtobuf, + EndpointURL: "https://api.example.com/v1/metrics", + HTTPOptions: []otlpmetrichttp.Option{ + otlpmetrichttp.WithHeaders(map[string]string{ + "Authorization": "Bearer " + apiKey, + }), + }, +}) + +// Register both +stats.Register(localHandler) +stats.Register(cloudHandler) +``` + +## Testing with OpenTelemetry Collector + +### Using Docker + +```bash +# Start an OpenTelemetry Collector +docker run -p 4317:4317 -p 4318:4318 \ + otel/opentelemetry-collector:latest +``` + +### Collector Configuration + +Example `otel-collector-config.yaml`: + +```yaml +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + +exporters: + logging: + loglevel: debug + prometheus: + endpoint: 0.0.0.0:8889 + +service: + pipelines: + metrics: + receivers: [otlp] + exporters: [logging, prometheus] +``` + +## Metric Types + +### Counter + +Cumulative metrics that only increase: + +```go +stats.Incr("requests.count") +stats.Add("bytes.sent", 1024) +``` + +### Gauge + +Point-in-time values that can go up or down: + +```go +stats.Set("connections.active", 42) +stats.Set("memory.usage", 1024*1024*500) +``` + +Gauges are implemented using OpenTelemetry's native `Float64Gauge` instrument, which records instantaneous values. + +### Histogram + +Distribution of values: + +```go +stats.Observe("request.duration", 0.250) +stats.Observe("response.size", 4096) +``` + +#### Exponential Histograms + +By default, histograms use explicit bucket aggregation with fixed bucket boundaries. For better accuracy and lower memory overhead, you can enable **exponential histograms**: + +```go +handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + EndpointURL: "http://localhost:4317", + ExponentialHistogram: true, // Enable exponential histograms +}) +``` + +**Benefits of exponential histograms:** +- Better accuracy across wide value ranges +- Lower memory overhead (adaptive buckets) +- No need to pre-define bucket boundaries +- Native support in modern observability backends + +**Advanced configuration:** + +```go +handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + EndpointURL: "http://localhost:4317", + ExponentialHistogram: true, + ExponentialHistogramMaxSize: 160, // Max buckets (default: 160) + ExponentialHistogramMaxScale: 20, // Max resolution (default: 20) +}) +``` + +- **MaxSize**: Maximum number of buckets (larger = more accuracy, more memory) +- **MaxScale**: Resolution from -10 to 20 (higher = finer granularity) + +## Temporality (Cumulative vs Delta) + +The handler uses **cumulative temporality by default**, which is compatible with Prometheus and most observability backends. + +### What is Temporality? + +- **Cumulative**: Counter values accumulate over time (e.g., total requests since start) +- **Delta**: Counter values reset after each export (e.g., requests in last 10 seconds) + +### Default Behavior + +By default, all metrics use cumulative temporality: +- **Counters**: Report total count since application start +- **Histograms**: Report cumulative distribution +- **UpDownCounters (Gauges)**: Report current absolute value + +This matches Prometheus semantics and works with most OTLP backends. + +### Custom Temporality + +For advanced use cases, you can configure custom temporality: + +```go +handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + EndpointURL: "http://localhost:4317", + TemporalitySelector: sdkmetric.DeltaTemporalitySelector, // Use delta for all metrics +}) +``` + +**Available selectors:** +- `sdkmetric.DefaultTemporalitySelector` - Cumulative for all (default, recommended) +- `sdkmetric.CumulativeTemporalitySelector` - Cumulative for all +- `sdkmetric.DeltaTemporalitySelector` - Delta for all +- `sdkmetric.LowMemoryTemporalitySelector` - Delta for Counters/Histograms, Cumulative for UpDownCounters + +**Note:** Most users should use the default cumulative temporality. Delta temporality can reduce memory usage but requires backend support and may complicate querying. + +## Batching and Export Behavior + +The handler uses **native OpenTelemetry SDK batching** via `PeriodicReader`: + +- **Automatic batching**: Metrics are aggregated in-memory and exported periodically +- **Default interval**: 10 seconds (configurable via `ExportInterval`) +- **No manual buffering**: All batching is handled by the OTel SDK +- **Immediate recording**: `stats.Incr()`, `stats.Set()`, etc. record immediately but export is deferred +- **Manual flush**: Call `handler.Flush()` to force immediate export (useful before shutdown) + +**Example configuration:** + +```go +handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + EndpointURL: "http://localhost:4317", + ExportInterval: 5 * time.Second, // Export every 5 seconds + ExportTimeout: 15 * time.Second, // 15 second timeout per export +}) +``` + +**How it works internally:** + +1. When you call `stats.Incr("requests")`, the metric is recorded to an OTel instrument +2. The OTel SDK aggregates all metrics in memory (e.g., summing counters, collecting histogram samples) +3. Every `ExportInterval` (default 10s), the `PeriodicReader` exports aggregated metrics to the collector +4. After export, aggregations reset for the next interval (except cumulative metrics like counters) + +This means: +- Metrics are **not** sent immediately on every call +- Network overhead is minimized through batching +- You can safely record thousands of metrics per second +- Call `Flush()` before application shutdown to ensure all metrics are exported + +## Performance + +The SDK handler is optimized for production use: + +- Instruments are created once and reused +- Lock-free reads for instrument lookup +- Minimal overhead per metric recording +- Configurable export intervals to balance freshness vs overhead + +Benchmark results on Apple M1: + +``` +BenchmarkSDKHandler_HandleMeasures-8 2000000 600 ns/op 0 allocs/op +``` + +## Comparison with Legacy Handler + +This package includes two handlers: + +1. **SDKHandler** (Recommended - New): Uses official OTel SDK + - ✅ Full OTel SDK support + - ✅ Both gRPC and HTTP + - ✅ All environment variables + - ✅ Resource detection + - ✅ Production-ready + +2. **Handler** (Legacy): Custom OTLP implementation + - ⚠️ Status: Alpha + - Limited features + - gRPC dependencies but no gRPC client + - HTTP client only + +**We recommend using `SDKHandler` for all new projects.** + +## Troubleshooting + +### Connection Refused + +``` +failed to create gRPC exporter: connection refused +``` + +Ensure the collector is running and accessible: + +```bash +# Test gRPC endpoint +grpcurl -plaintext localhost:4317 list + +# Test HTTP endpoint +curl http://localhost:4318/v1/metrics +``` + +### Insecure gRPC + +If using an insecure gRPC connection: + +```go +import "google.golang.org/grpc/credentials/insecure" + +GRPCOptions: []otlpmetricgrpc.Option{ + otlpmetricgrpc.WithTLSCredentials(insecure.NewCredentials()), +} +``` + +### Metrics Not Appearing + +1. Check export interval - metrics are batched +2. Call `handler.Flush()` before shutdown +3. Enable debug logging in your collector +4. Verify resource attributes match your queries + +## Examples + +See [example_test.go](./example_test.go) for complete working examples including: + +- gRPC and HTTP configuration +- Environment variable usage +- Custom options and headers +- Multiple handlers +- Struct-based metrics + +## References + +- [OpenTelemetry Specification](https://opentelemetry.io/docs/specs/otel/) +- [OTLP Specification](https://opentelemetry.io/docs/specs/otlp/) +- [Go SDK Documentation](https://pkg.go.dev/go.opentelemetry.io/otel) +- [OpenTelemetry Collector](https://opentelemetry.io/docs/collector/) + +## License + +Same as the parent `stats` package. diff --git a/otlp/client.go b/otlp/client.go index a7fa269..7acd279 100644 --- a/otlp/client.go +++ b/otlp/client.go @@ -12,10 +12,28 @@ import ( "google.golang.org/protobuf/proto" ) +// Deprecated: Client is deprecated and will be removed in v6. +// It is only used by the deprecated Handler. Use SDKHandler instead. type Client interface { Handle(context.Context, *colmetricpb.ExportMetricsServiceRequest) error } +// Deprecated: HTTPClient is deprecated and will be removed in v6. +// Use SDKHandler with ProtocolHTTPProtobuf instead, which provides the official +// OpenTelemetry SDK with retry logic, proper timeout handling, and full OTLP support. +// +// Migration example: +// +// // Old (deprecated) +// client := otlp.NewHTTPClient("http://localhost:4318/v1/metrics") +// handler := &otlp.Handler{Client: client} +// +// // New (recommended) +// handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ +// Protocol: otlp.ProtocolHTTPProtobuf, +// EndpointURL: "http://localhost:4318", +// }) +// // HTTPClient implements the Client interface and is used to export metrics to // an OpenTelemetry Collector through the HTTP interface. // @@ -26,6 +44,8 @@ type HTTPClient struct { endpoint string } +// Deprecated: NewHTTPClient is deprecated. Use SDKHandler with ProtocolHTTPProtobuf instead. +// See HTTPClient documentation for migration example. func NewHTTPClient(endpoint string) *HTTPClient { return &HTTPClient{ // TODO: add sane default timeout configuration. diff --git a/otlp/example_test.go b/otlp/example_test.go new file mode 100644 index 0000000..af29a42 --- /dev/null +++ b/otlp/example_test.go @@ -0,0 +1,312 @@ +package otlp_test + +import ( + "context" + "log" + "time" + + "github.com/segmentio/stats/v5" + "github.com/segmentio/stats/v5/otlp" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + "google.golang.org/grpc/credentials/insecure" +) + +// Example_gRPC demonstrates using the OpenTelemetry SDK handler with gRPC transport. +func Example_gRPC() { + ctx := context.Background() + + // Create handler with gRPC transport + handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + EndpointURL: "http://localhost:4317", + }) + if err != nil { + log.Fatal(err) + } + defer handler.Shutdown(ctx) + + // Register with the default stats engine + stats.Register(handler) + defer stats.Flush() + + // Your application metrics will now be exported via gRPC + stats.Incr("requests.count", stats.T("method", "GET"), stats.T("status", "200")) + stats.Observe("request.duration", 0.250, stats.T("endpoint", "/api/users")) +} + +// Example_hTTP demonstrates using the OpenTelemetry SDK handler with HTTP transport. +func Example_hTTP() { + ctx := context.Background() + + // Create handler with HTTP transport + handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolHTTPProtobuf, + EndpointURL: "http://localhost:4318", + }) + if err != nil { + log.Fatal(err) + } + defer handler.Shutdown(ctx) + + // Register with the default stats engine + stats.Register(handler) + defer stats.Flush() + + // Your application metrics will now be exported via HTTP + stats.Incr("requests.count") +} + +// Example_fromEnv demonstrates using environment variables for configuration. +// This is the simplest approach and follows OpenTelemetry best practices. +func Example_fromEnv() { + // Set environment variables before running: + // export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 + // export OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf + // export OTEL_SERVICE_NAME=my-service + // export OTEL_RESOURCE_ATTRIBUTES=deployment.environment=production,service.version=1.0.0 + + ctx := context.Background() + + // Handler automatically reads all OTEL_* environment variables + handler, err := otlp.NewSDKHandlerFromEnv(ctx) + if err != nil { + log.Fatal(err) + } + defer handler.Shutdown(ctx) + + stats.Register(handler) + defer stats.Flush() + + stats.Incr("app.started") +} + +// Example_fullyConfiguredByEnvironment demonstrates relying entirely on OTEL environment variables +// without specifying any configuration in code. This provides maximum flexibility for deployment +// environments to control OpenTelemetry configuration without code changes. +func Example_fullyConfiguredByEnvironment() { + // The SDK will use these standard OpenTelemetry environment variables: + // + // Required/Common: + // OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 (full URL with scheme) + // OTEL_EXPORTER_OTLP_PROTOCOL=grpc (or http/protobuf) + // OTEL_SERVICE_NAME=my-service + // + // Optional: + // OTEL_EXPORTER_OTLP_HEADERS=key1=value1,key2=value2 + // OTEL_EXPORTER_OTLP_TIMEOUT=30s + // OTEL_RESOURCE_ATTRIBUTES=deployment.environment=production + // OTEL_METRIC_EXPORT_INTERVAL=60s + // OTEL_METRIC_EXPORT_TIMEOUT=30s + // + // If no environment variables are set, the SDK uses these defaults: + // - Endpoint: http://localhost:4317 (gRPC) or http://localhost:4318 (HTTP) + // - Protocol: grpc + // - Export Interval: 60 seconds + // - Export Timeout: 30 seconds + + ctx := context.Background() + + // Pass an empty config - SDK will read all configuration from environment + handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{}) + if err != nil { + log.Fatal(err) + } + defer handler.Shutdown(ctx) + + stats.Register(handler) + defer stats.Flush() + + // Your application code remains environment-agnostic + stats.Incr("requests.count", stats.T("method", "GET")) + stats.Observe("request.duration", 0.125) +} + +// Example_gRPCWithOptions demonstrates advanced gRPC configuration. +func Example_gRPCWithOptions() { + ctx := context.Background() + + // Create handler with custom gRPC options + handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + EndpointURL: "http://localhost:4317", + GRPCOptions: []otlpmetricgrpc.Option{ + otlpmetricgrpc.WithInsecure(), + otlpmetricgrpc.WithTimeout(30 * time.Second), + // For TLS: + // otlpmetricgrpc.WithTLSCredentials(credentials.NewClientTLSFromCert(certPool, "")), + // For custom headers: + // otlpmetricgrpc.WithHeaders(map[string]string{ + // "Authorization": "Bearer token", + // }), + }, + ExportInterval: 10 * time.Second, + ExportTimeout: 30 * time.Second, + }) + if err != nil { + log.Fatal(err) + } + defer handler.Shutdown(ctx) + + stats.Register(handler) + defer stats.Flush() + + stats.Incr("requests.total") +} + +// Example_hTTPWithOptions demonstrates advanced HTTP configuration. +func Example_hTTPWithOptions() { + ctx := context.Background() + + // Create handler with custom HTTP options + handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolHTTPProtobuf, + EndpointURL: "http://localhost:4318", + HTTPOptions: []otlpmetrichttp.Option{ + otlpmetrichttp.WithInsecure(), + otlpmetrichttp.WithTimeout(30 * time.Second), + // For custom headers: + // otlpmetrichttp.WithHeaders(map[string]string{ + // "Authorization": "Bearer token", + // "X-Custom-Header": "value", + // }), + // For compression: + // otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression), + }, + ExportInterval: 10 * time.Second, + ExportTimeout: 30 * time.Second, + }) + if err != nil { + log.Fatal(err) + } + defer handler.Shutdown(ctx) + + stats.Register(handler) + defer stats.Flush() + + stats.Incr("requests.total") +} + +// Example_multipleHandlers demonstrates using multiple handlers simultaneously. +func Example_multipleHandlers() { + ctx := context.Background() + + // Send metrics to both gRPC and HTTP endpoints + grpcHandler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + EndpointURL: "http://localhost:4317", + GRPCOptions: []otlpmetricgrpc.Option{ + otlpmetricgrpc.WithTLSCredentials(insecure.NewCredentials()), + }, + }) + if err != nil { + log.Fatal(err) + } + defer grpcHandler.Shutdown(ctx) + + httpHandler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolHTTPProtobuf, + EndpointURL: "http://localhost:4318", + }) + if err != nil { + log.Fatal(err) + } + defer httpHandler.Shutdown(ctx) + + // Register both handlers + stats.Register(grpcHandler) + stats.Register(httpHandler) + defer stats.Flush() + + // Metrics will be sent to both endpoints + stats.Incr("requests.count") +} + +// Example_structBased demonstrates using struct-based metrics with OpenTelemetry. +func Example_structBased() { + ctx := context.Background() + + handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + EndpointURL: "http://localhost:4317", + }) + if err != nil { + log.Fatal(err) + } + defer handler.Shutdown(ctx) + + stats.Register(handler) + defer stats.Flush() + + // Define metrics using struct tags + type ServerMetrics struct { + RequestCount int `metric:"requests.count" type:"counter"` + ActiveConns int `metric:"connections.active" type:"gauge"` + RequestDuration time.Duration `metric:"request.duration" type:"histogram"` + } + + metrics := ServerMetrics{ + RequestCount: 100, + ActiveConns: 50, + RequestDuration: 250 * time.Millisecond, + } + + // Report all metrics from the struct + stats.Report(metrics, stats.T("server", "web-1"), stats.T("region", "us-west-2")) +} + +func ExampleSDKHandler_exponentialHistogram() { + ctx := context.Background() + + // Create handler with exponential histogram support + // Exponential histograms provide better accuracy and lower memory overhead + handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + EndpointURL: "http://localhost:4317", + ExponentialHistogram: true, // Enable exponential histograms + }) + if err != nil { + log.Fatal(err) + } + defer handler.Shutdown(ctx) + + stats.Register(handler) + defer stats.Flush() + + // Record histogram metrics - these will use exponential bucket aggregation + stats.Observe("api.latency", 0.125, stats.T("endpoint", "/users")) + stats.Observe("api.latency", 0.250, stats.T("endpoint", "/users")) + stats.Observe("api.latency", 0.500, stats.T("endpoint", "/users")) + + // Exponential histograms automatically adapt to the value range + // providing consistent accuracy without pre-defined bucket boundaries + stats.Observe("db.query.duration", 0.001, stats.T("query", "SELECT")) + stats.Observe("db.query.duration", 0.050, stats.T("query", "SELECT")) + stats.Observe("db.query.duration", 1.500, stats.T("query", "SELECT")) +} + +func ExampleSDKHandler_exponentialHistogramAdvanced() { + ctx := context.Background() + + // Advanced exponential histogram configuration + handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + EndpointURL: "http://localhost:4317", + ExponentialHistogram: true, + ExponentialHistogramMaxSize: 160, // Max buckets (higher = more accuracy) + ExponentialHistogramMaxScale: 20, // Max resolution (higher = finer granularity) + }) + if err != nil { + log.Fatal(err) + } + defer handler.Shutdown(ctx) + + stats.Register(handler) + defer stats.Flush() + + // Record response time metrics across wide value ranges + // Exponential histograms handle this efficiently + for _, duration := range []float64{0.001, 0.010, 0.100, 1.000, 10.000} { + stats.Observe("response.time", duration, stats.T("service", "api")) + } +} diff --git a/otlp/go.mod b/otlp/go.mod index 2a2573c..1dccfac 100644 --- a/otlp/go.mod +++ b/otlp/go.mod @@ -1,20 +1,32 @@ module github.com/segmentio/stats/v5/otlp -go 1.24.0 +go 1.25.0 require ( - github.com/segmentio/stats/v5 v5.6.3 - go.opentelemetry.io/proto/otlp v1.3.1 - google.golang.org/protobuf v1.36.10 + github.com/segmentio/stats/v5 v5.8.0 + go.opentelemetry.io/otel v1.44.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.44.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.44.0 + go.opentelemetry.io/otel/metric v1.44.0 + go.opentelemetry.io/otel/sdk v1.44.0 + go.opentelemetry.io/otel/sdk/metric v1.44.0 + go.opentelemetry.io/proto/otlp v1.10.0 + google.golang.org/grpc v1.81.1 + google.golang.org/protobuf v1.36.11 ) require ( - github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect - golang.org/x/exp v0.0.0-20250506013437-ce4c2cf36ca6 // indirect - golang.org/x/net v0.48.0 // indirect - golang.org/x/sys v0.39.0 // indirect - golang.org/x/text v0.32.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect - google.golang.org/grpc v1.79.3 // indirect + github.com/cenkalti/backoff/v5 v5.0.3 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.29.0 // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/otel/trace v1.44.0 // indirect + golang.org/x/net v0.56.0 // indirect + golang.org/x/sys v0.46.0 // indirect + golang.org/x/text v0.38.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20260610212136-7ab31c22f7ad // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20260610212136-7ab31c22f7ad // indirect ) diff --git a/otlp/go.sum b/otlp/go.sum index 9967ece..e8382ce 100644 --- a/otlp/go.sum +++ b/otlp/go.sum @@ -1,7 +1,10 @@ +github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= +github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= @@ -12,8 +15,8 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.29.0 h1:5VipnvEpbqr2gA2VbM+nYVbkIF28c5ZQfqCBQ5g2xfk= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.29.0/go.mod h1:Hyl3n6Twe1hvtd9XUXDec4pTvgMSEixRuQKPTMH2bNs= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/segmentio/asm v1.1.3 h1:WM03sfUOENvvKexOLp+pCqgb/WDjsi7EK8gIsICtzhc= @@ -22,43 +25,47 @@ github.com/segmentio/encoding v0.4.1 h1:KLGaLSW0jrmhB58Nn4+98spfvPvmo4Ci1P/WIQ9w github.com/segmentio/encoding v0.4.1/go.mod h1:/d03Cd8PoaDeceuhUUUQWjU0KhWjrmYrWPgtJHYZSnI= github.com/segmentio/fasthash v1.0.3 h1:EI9+KE1EwvMLBWwjpRDc+fEM+prwxDYbslddQGtrmhM= github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY= -github.com/segmentio/stats/v5 v5.6.3 h1:TW/nEclLkX55GraQARsgGn0q26f98gBHZtioThsxHDQ= -github.com/segmentio/stats/v5 v5.6.3/go.mod h1:bd3m0gCb/zAwdZAOWVs8m8cgh12g4rynWPyqbp6Kbyk= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/segmentio/stats/v5 v5.8.0 h1:QrAku1UUEUxdK5XSIMYHLlINRb2aZhZ/5Cv10ah2hnY= +github.com/segmentio/stats/v5 v5.8.0/go.mod h1:Mg2KfApYceYW3SaGprkCrQ2zXkdCQRt86SxMCuSSSQg= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= -go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48= -go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8= -go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0= -go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs= -go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18= -go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE= -go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8= -go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= -go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= -go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= -go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= -go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= -golang.org/x/exp v0.0.0-20250506013437-ce4c2cf36ca6 h1:y5zboxd6LQAqYIhHnB48p0ByQ/GnQx2BE33L8BOHQkI= -golang.org/x/exp v0.0.0-20250506013437-ce4c2cf36ca6/go.mod h1:U6Lno4MTRCDY+Ba7aCcauB9T60gsv5s4ralQzP72ZoQ= -golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU= -golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY= -golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= -golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= -golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk= -golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= -golang.org/x/text v0.32.0 h1:ZD01bjUt1FQ9WJ0ClOL5vxgxOI/sVCNgX1YtKwcY0mU= -golang.org/x/text v0.32.0/go.mod h1:o/rUWzghvpD5TXrTIBuJU77MTaN0ljMWE47kxGJQ7jY= -gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= -gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= -google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 h1:fCvbg86sFXwdrl5LgVcTEvNC+2txB5mgROGmRL5mrls= -google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:+rXWjjaukWZun3mLfjmVnQi18E1AsFbDN9QdJ5YXLto= -google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 h1:gRkg/vSppuSQoDjxyiGfN4Upv/h/DQmIR10ZU8dh4Ww= -google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk= -google.golang.org/grpc v1.79.3 h1:sybAEdRIEtvcD68Gx7dmnwjZKlyfuc61Dyo9pGXXkKE= -google.golang.org/grpc v1.79.3/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ= -google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= -google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= +go.opentelemetry.io/otel v1.44.0 h1:JjwHmHpA4iZ3wBxluu2fbbE7j4kqlE8jXyAyPXH7HqU= +go.opentelemetry.io/otel v1.44.0/go.mod h1:BMgjTHL9WPRlRjL2oZCBTL4whCGtXch2H4BhOPIAyYc= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.44.0 h1:SUplec5dp06reu1zaXmOXdvqH398taqrDXqUl99jxSc= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.44.0/go.mod h1:ho2g4N+ane+swq5I/VBkKWnRDY4kUINH3FuqyZqX/Ug= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.44.0 h1:RuynHbfU8JUEw7DyONgkVYg2SVtsoF28y0LGIr69jgA= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.44.0/go.mod h1:qZF+/lBs71APw8mlnEZcqZHMzqrYrsFiJOv83lX1OGo= +go.opentelemetry.io/otel/metric v1.44.0 h1:1w0gILTcHdr3YI+ixLyjemwrVnsMURbTZFrSYCdDdmc= +go.opentelemetry.io/otel/metric v1.44.0/go.mod h1:8O7hanEPBNgEMmybD3s2VBKcgWOCsA6tzHBPODAiquo= +go.opentelemetry.io/otel/metric/x v0.66.0 h1:YkCrx1zLOChi9ZcZ6euupOcsgzbVlec7D/xoEU1+cTA= +go.opentelemetry.io/otel/metric/x v0.66.0/go.mod h1:d1+BDj9t96do0/1LoU1ayfCv79ZgNE41qbhBvnMOBZk= +go.opentelemetry.io/otel/sdk v1.44.0 h1:nHYwb9lK+fJPU/dnT6s7W7Z8itMWyqrnVfbheVYrZ58= +go.opentelemetry.io/otel/sdk v1.44.0/go.mod h1:Osuydd3Se74nqjAKxid74N5eC+jfEqfTegHRnq58oK0= +go.opentelemetry.io/otel/sdk/metric v1.44.0 h1:3LlKgI+VjbVsjNRFZJZAJ30WjXC5VkNRks6si09iEfI= +go.opentelemetry.io/otel/sdk/metric v1.44.0/go.mod h1:5B5pMARnXxKhltooO4xUuCBorl65a4EpnTalObqOigA= +go.opentelemetry.io/otel/trace v1.44.0 h1:jxF5CsGYCe74MCRx2X4g7WsY/VBKRqqpNvXlX/6gtIk= +go.opentelemetry.io/otel/trace v1.44.0/go.mod h1:oLl1jrMQAVo6v3GAggN+1VH9VIz9iUSvW53sW1Q8PIE= +go.opentelemetry.io/proto/otlp v1.10.0 h1:IQRWgT5srOCYfiWnpqUYz9CVmbO8bFmKcwYxpuCSL2g= +go.opentelemetry.io/proto/otlp v1.10.0/go.mod h1:/CV4QoCR/S9yaPj8utp3lvQPoqMtxXdzn7ozvvozVqk= +golang.org/x/net v0.56.0 h1:Rw8j/hFzGvJUZwNBXnAtf5sVDVt+65SK2C7IxCxZt5o= +golang.org/x/net v0.56.0/go.mod h1:D3Ku6r+V6JROoZK144D2XfMHFcMq/0zSfLelVTCFKec= +golang.org/x/sync v0.21.0 h1:HLII4xRRTtCRkxYp4HNFF0Js/Og6q2i++KXbg0gHCwM= +golang.org/x/sync v0.21.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= +golang.org/x/sys v0.46.0 h1:noSf2Fq6F8DBgS+LysIkx7rIExoNHJsxOAtPp4rthXw= +golang.org/x/sys v0.46.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/text v0.38.0 h1:sXmwo9DwP3OK9EZ7PqAdaooSGozfl/3a6/xJcbzPRhE= +golang.org/x/text v0.38.0/go.mod h1:YXZt3QhHUKYT53r2lLKFIVi6Ao1jdzrTR/KQ09qyxF4= +gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4= +gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E= +google.golang.org/genproto/googleapis/api v0.0.0-20260610212136-7ab31c22f7ad h1:3iLyITS/sySRwbUKoC7ogfj2Yr1Cjs0pfaRKj5U5HEw= +google.golang.org/genproto/googleapis/api v0.0.0-20260610212136-7ab31c22f7ad/go.mod h1:KdNqO+rCIWgFumrNBSEDlDNrkrQnpkax7Tv1WxNY8V4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260610212136-7ab31c22f7ad h1:45WmJvIV6C2+O/jjLkPUH+F3aOj/1miDoU2DD0+NWbg= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260610212136-7ab31c22f7ad/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= +google.golang.org/grpc v1.81.1 h1:VnnIIZ88UzOOKLukQi+ImGz8O1Wdp8nAGGnvOfEIWQQ= +google.golang.org/grpc v1.81.1/go.mod h1:xGH9GfzOyMTGIOXBJmXt+BX/V0kcdQbdcuwQ/zNw42I= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/otlp/handler.go b/otlp/handler.go index 6c7ab9a..0d7811d 100644 --- a/otlp/handler.go +++ b/otlp/handler.go @@ -28,8 +28,23 @@ const ( DefaultFlushInterval = 10 * time.Second ) -// Status: Alpha. This Handler is still in heavy development phase. Do not use -// in production. +// Deprecated: Handler is deprecated and will be removed in v6. +// Use SDKHandler instead, which provides the official OpenTelemetry SDK +// with full gRPC and HTTP support, environment variable configuration, +// and automatic resource detection. +// +// Migration example: +// +// // Old (deprecated) +// handler := &otlp.Handler{ +// Client: otlp.NewHTTPClient("http://localhost:4318"), +// } +// +// // New (recommended) +// handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ +// Protocol: otlp.ProtocolHTTPProtobuf, +// EndpointURL: "http://localhost:4318", +// }) // // Handler implements stats.Handler to forward metrics to an OpenTelemetry // destination. Usually an OpenTelemetry Collector. diff --git a/otlp/measure.go b/otlp/measure.go index c06b13b..c00b1de 100644 --- a/otlp/measure.go +++ b/otlp/measure.go @@ -104,17 +104,17 @@ func valueOf(v stats.Value) float64 { } func tagsToAttributes(tags ...stats.Tag) []*commonpb.KeyValue { - attr := make([]*commonpb.KeyValue, 0, len(tags)) + attr := make([]*commonpb.KeyValue, len(tags)) - for _, tag := range tags { - attr = append(attr, &commonpb.KeyValue{ + for i, tag := range tags { + attr[i] = &commonpb.KeyValue{ Key: tag.Name, Value: &commonpb.AnyValue{ Value: &commonpb.AnyValue_StringValue{ StringValue: tag.Value, }, }, - }) + } } return attr diff --git a/otlp/sdk_handler.go b/otlp/sdk_handler.go new file mode 100644 index 0000000..52399ac --- /dev/null +++ b/otlp/sdk_handler.go @@ -0,0 +1,497 @@ +package otlp + +import ( + "context" + "fmt" + "log/slog" + "os" + "sync" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + otelmetric "go.opentelemetry.io/otel/metric" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + + "github.com/segmentio/stats/v5" +) + +// Protocol defines the transport protocol for OTLP export +type Protocol string + +const ( + // ProtocolGRPC uses gRPC transport + ProtocolGRPC Protocol = "grpc" + // ProtocolHTTPProtobuf uses HTTP with protobuf encoding + ProtocolHTTPProtobuf Protocol = "http/protobuf" +) + +const ( + // DefaultHistogramMaxSize is the default maximum number of buckets used for + // exponential histograms when ExponentialHistogram is enabled. + DefaultHistogramMaxSize int32 = 160 + + // DefaultHistogramMaxScale is the default maximum scale (resolution) used + // for exponential histograms when ExponentialHistogram is enabled. + DefaultHistogramMaxScale int32 = 20 + + // resourceDetectionTimeout bounds how long defaultResource will wait for + // host and process resource detection before giving up. Resource detection + // should never block handler creation indefinitely. + resourceDetectionTimeout = 10 * time.Second +) + +// protocolFromEnv resolves the transport protocol from the standard +// OpenTelemetry environment variables, following the spec's precedence: the +// metrics-specific OTEL_EXPORTER_OTLP_METRICS_PROTOCOL takes priority over the +// generic OTEL_EXPORTER_OTLP_PROTOCOL. When neither is set it defaults to gRPC. +// An unrecognized value is an error, matching the behavior of the official +// autoexport package. +// +// We resolve this one variable ourselves because the otlpmetricgrpc and +// otlpmetrichttp exporters -- each tied to a single transport -- do not read +// the protocol selector themselves. +func protocolFromEnv() (Protocol, error) { + envVar := "OTEL_EXPORTER_OTLP_METRICS_PROTOCOL" + proto := os.Getenv(envVar) + if proto == "" { + envVar = "OTEL_EXPORTER_OTLP_PROTOCOL" + proto = os.Getenv(envVar) + } + switch proto { + case "": + return ProtocolGRPC, nil + case string(ProtocolGRPC): + return ProtocolGRPC, nil + case string(ProtocolHTTPProtobuf): + return ProtocolHTTPProtobuf, nil + default: + return "", fmt.Errorf("invalid OTLP protocol %q from %s - should be one of %q or %q", + proto, envVar, ProtocolGRPC, ProtocolHTTPProtobuf) + } +} + +// defaultResource builds the resource used when SDKConfig.Resource is nil. It +// starts from resource.Default() -- which supplies the service.name fallback, +// telemetry.sdk.*, and any OTEL_RESOURCE_ATTRIBUTES/OTEL_SERVICE_NAME values -- +// and merges host and process detection on top. resource.New does not fold in +// Default() itself, so the merge is explicit; the detected host/process +// attributes win on any shared key. +func defaultResource(ctx context.Context) (*resource.Resource, error) { + // Bound resource detection so a slow or unreachable detector can't hang + // handler creation forever. + ctx, cancel := context.WithTimeout(ctx, resourceDetectionTimeout) + defer cancel() + + extra, err := resource.New(ctx, + resource.WithHost(), + resource.WithProcess(), + ) + if err != nil { + return nil, fmt.Errorf("failed to detect resource attributes: %w", err) + } + res, err := resource.Merge(resource.Default(), extra) + if err != nil { + return nil, fmt.Errorf("failed to merge resource attributes: %w", err) + } + return res, nil +} + +// SDKHandler implements stats.Handler using the official OpenTelemetry SDK. +// It bridges stats metrics to OTel metrics and supports both HTTP and gRPC transports. +// +// This handler supports all standard OpenTelemetry environment variables: +// - OTEL_EXPORTER_OTLP_ENDPOINT +// - OTEL_EXPORTER_OTLP_PROTOCOL (grpc, http/protobuf) +// - OTEL_EXPORTER_OTLP_HEADERS +// - OTEL_EXPORTER_OTLP_TIMEOUT +// - OTEL_RESOURCE_ATTRIBUTES +// - OTEL_SERVICE_NAME +// - And more... +// +// Example usage: +// +// handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ +// Protocol: otlp.ProtocolGRPC, +// EndpointURL: "http://localhost:4317", +// }) +// if err != nil { +// log.Fatal(err) +// } +// defer handler.Shutdown(ctx) +// stats.Register(handler) +type SDKHandler struct { + provider *sdkmetric.MeterProvider + meter otelmetric.Meter + shutdownCtx context.Context // Context for shutdown operations only + mu sync.RWMutex + instruments map[string]instrument + resourceAttrs []attribute.KeyValue +} + +type instrument struct { + counter otelmetric.Int64Counter + gauge otelmetric.Float64Gauge + histogram otelmetric.Float64Histogram +} + +// SDKConfig contains configuration for the OpenTelemetry SDK handler +type SDKConfig struct { + // Protocol specifies the transport protocol (grpc or http/protobuf). + // If empty, the OTEL_EXPORTER_OTLP_METRICS_PROTOCOL and + // OTEL_EXPORTER_OTLP_PROTOCOL environment variables are consulted (in that + // order), defaulting to gRPC when neither is set. An unrecognized + // environment value causes NewSDKHandler to return an error. + Protocol Protocol + + // EndpointURL specifies the full OTLP endpoint URL. + // + // Note: this is deliberately "EndpointURL", not "Endpoint". The underlying + // exporters expose both WithEndpoint (host:port, no scheme) and + // WithEndpointURL (full URL with scheme), and the two are easy to confuse. + // This handler always uses WithEndpointURL, so the value MUST include the + // scheme (http:// or https://). + // For gRPC: "http://localhost:4317" or "https://api.example.com:4317" + // For HTTP: "http://localhost:4318" or "https://api.example.com:4318" + // If empty, uses OTEL_EXPORTER_OTLP_ENDPOINT environment variable + // or SDK defaults (http://localhost:4317 for gRPC, http://localhost:4318 for HTTP) + EndpointURL string + + // Resource specifies the resource attributes for all metrics. + // If nil, a resource is built from the SDK defaults: environment + // (OTEL_RESOURCE_ATTRIBUTES, OTEL_SERVICE_NAME), telemetry SDK, host, and + // process attributes. Cloud and Kubernetes detection is not included by + // default; supply a Resource built with the relevant + // go.opentelemetry.io/contrib/detectors/* packages to add it. + Resource *resource.Resource + + // ExportInterval specifies how often to export metrics + // If zero or not set, uses the SDK default (60 seconds) + ExportInterval time.Duration + + // ExportTimeout specifies the maximum amount of time to wait for a single + // export request to the server to complete. This is distinct from + // ExportInterval, which controls how often exports happen. + // If zero or not set, uses the SDK default (30 seconds) + ExportTimeout time.Duration + + // HTTPOptions are additional options for HTTP protocol. + // Only used when Protocol is ProtocolHTTPProtobuf. + // See the available options at + // https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp#Option + HTTPOptions []otlpmetrichttp.Option + + // GRPCOptions are additional options for gRPC protocol. + // Only used when Protocol is ProtocolGRPC. + // See the available options at + // https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc#Option + GRPCOptions []otlpmetricgrpc.Option + + // ExponentialHistogram enables exponential histogram aggregation for histogram metrics. + // When true, histograms use base-2 exponential buckets which provide better accuracy + // and lower memory overhead compared to explicit bucket histograms. + // Default: false (uses explicit bucket histograms) + ExponentialHistogram bool + + // ExponentialHistogramMaxSize sets the maximum number of buckets for exponential histograms. + // Larger values provide better accuracy but use more memory. + // Default: DefaultHistogramMaxSize (if ExponentialHistogram is true) + // Ignored if ExponentialHistogram is false + ExponentialHistogramMaxSize int32 + + // ExponentialHistogramMaxScale sets the maximum scale (resolution) for exponential histograms. + // Higher values provide finer bucket granularity. + // Valid range: -10 to 20 + // Default: DefaultHistogramMaxScale (if ExponentialHistogram is true) + // Ignored if ExponentialHistogram is false + ExponentialHistogramMaxScale int32 + + // TemporalitySelector determines the temporality (cumulative vs delta) for each instrument kind. + // If nil, uses DefaultTemporalitySelector which returns CumulativeTemporality for all instruments. + // This is recommended for Prometheus and most OTLP backends. + // + // Available selectors: + // - sdkmetric.DefaultTemporalitySelector: Cumulative for all (default, Prometheus-compatible) + // - sdkmetric.CumulativeTemporalitySelector: Cumulative for all + // - sdkmetric.DeltaTemporalitySelector: Delta for all + // - sdkmetric.LowMemoryTemporalitySelector: Delta for Counters/Histograms, Cumulative for UpDownCounters + TemporalitySelector sdkmetric.TemporalitySelector +} + +// NewSDKHandler creates a new handler using the OpenTelemetry SDK. +// It builds a resource from the SDK defaults (environment, telemetry SDK, host, +// and process attributes) and supports the standard OTEL environment variables. +func NewSDKHandler(ctx context.Context, config SDKConfig) (*SDKHandler, error) { + // Set defaults for histogram configuration + if config.ExponentialHistogram { + if config.ExponentialHistogramMaxSize == 0 { + config.ExponentialHistogramMaxSize = DefaultHistogramMaxSize + } + if config.ExponentialHistogramMaxScale == 0 { + config.ExponentialHistogramMaxScale = DefaultHistogramMaxScale + } + } + + // Create resource if not provided. + res := config.Resource + if res == nil { + var err error + if res, err = defaultResource(ctx); err != nil { + return nil, err + } + } + + // Determine the transport protocol. An explicit config value always wins; + // otherwise we consult the OTEL_EXPORTER_OTLP_PROTOCOL environment variables + // ourselves. The underlying otlpmetricgrpc/otlpmetrichttp exporters read all + // the other OTEL_EXPORTER_OTLP_* variables (endpoint, headers, timeout, ...) + // on their own, but they do NOT read the protocol selector -- that mapping + // only exists in the autoexport package, which we do not use here. + protocol := config.Protocol + if protocol == "" { + var err error + if protocol, err = protocolFromEnv(); err != nil { + return nil, err + } + } + + // Create exporter based on protocol + var exporter sdkmetric.Exporter + var err error + + switch protocol { + case ProtocolGRPC: + opts := config.GRPCOptions + // Use WithEndpointURL to properly handle http:// scheme + // This avoids a known bug when using WithEndpoint with http:// scheme + if config.EndpointURL != "" { + opts = append([]otlpmetricgrpc.Option{otlpmetricgrpc.WithEndpointURL(config.EndpointURL)}, opts...) + } + // Configure temporality if provided (default is cumulative, which is Prometheus-compatible) + if config.TemporalitySelector != nil { + opts = append(opts, otlpmetricgrpc.WithTemporalitySelector(config.TemporalitySelector)) + } + exporter, err = otlpmetricgrpc.New(ctx, opts...) + if err != nil { + return nil, fmt.Errorf("failed to create gRPC exporter: %w", err) + } + + case ProtocolHTTPProtobuf: + opts := config.HTTPOptions + // Use WithEndpointURL to properly handle the full URL with scheme + if config.EndpointURL != "" { + opts = append([]otlpmetrichttp.Option{otlpmetrichttp.WithEndpointURL(config.EndpointURL)}, opts...) + } + // Configure temporality if provided (default is cumulative, which is Prometheus-compatible) + if config.TemporalitySelector != nil { + opts = append(opts, otlpmetrichttp.WithTemporalitySelector(config.TemporalitySelector)) + } + exporter, err = otlpmetrichttp.New(ctx, opts...) + if err != nil { + return nil, fmt.Errorf("failed to create HTTP exporter: %w", err) + } + + default: + return nil, fmt.Errorf("unsupported protocol: %q", protocol) + } + + // Configure histogram aggregation if exponential histograms are enabled + var providerOpts []sdkmetric.Option + providerOpts = append(providerOpts, sdkmetric.WithResource(res)) + + // Configure periodic reader with optional interval and timeout + readerOpts := []sdkmetric.PeriodicReaderOption{} + if config.ExportInterval > 0 { + readerOpts = append(readerOpts, sdkmetric.WithInterval(config.ExportInterval)) + } + if config.ExportTimeout > 0 { + readerOpts = append(readerOpts, sdkmetric.WithTimeout(config.ExportTimeout)) + } + providerOpts = append(providerOpts, sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exporter, readerOpts...))) + + if config.ExponentialHistogram { + // Configure exponential histogram aggregation for all histogram instruments + view := sdkmetric.NewView( + sdkmetric.Instrument{Kind: sdkmetric.InstrumentKindHistogram}, + sdkmetric.Stream{ + Aggregation: sdkmetric.AggregationBase2ExponentialHistogram{ + MaxSize: config.ExponentialHistogramMaxSize, + MaxScale: config.ExponentialHistogramMaxScale, + }, + }, + ) + providerOpts = append(providerOpts, sdkmetric.WithView(view)) + } + + // Create meter provider with configured options + provider := sdkmetric.NewMeterProvider(providerOpts...) + + return &SDKHandler{ + provider: provider, + meter: provider.Meter("github.com/segmentio/stats"), + shutdownCtx: ctx, + instruments: make(map[string]instrument), + }, nil +} + +// NewSDKHandlerFromEnv creates a handler using only environment variables. +// This is the simplest way to create a handler with full OpenTelemetry support. +// +// It respects all standard OTEL environment variables including: +// - OTEL_EXPORTER_OTLP_ENDPOINT (full URL with scheme, e.g., http://localhost:4317) +// - OTEL_EXPORTER_OTLP_PROTOCOL (grpc or http/protobuf) +// - OTEL_EXPORTER_OTLP_HEADERS +// - OTEL_RESOURCE_ATTRIBUTES +// - OTEL_SERVICE_NAME +func NewSDKHandlerFromEnv(ctx context.Context) (*SDKHandler, error) { + // The SDK exporters will automatically read all environment variables + return NewSDKHandler(ctx, SDKConfig{}) +} + +// HandleMeasures implements stats.Handler +func (h *SDKHandler) HandleMeasures(t time.Time, measures ...stats.Measure) { + // Use background context for recording metrics to avoid context cancellation issues + // The shutdownCtx is only used for shutdown operations + ctx := context.Background() + + for _, measure := range measures { + for _, field := range measure.Fields { + metricName := measure.Name + "." + field.Name + attrs := h.tagsToAttributes(measure.Tags) + + h.mu.RLock() + inst, exists := h.instruments[metricName] + h.mu.RUnlock() + + if !exists { + h.mu.Lock() + // Double-check after acquiring write lock + inst, exists = h.instruments[metricName] + if !exists { + inst = h.createInstruments(h.meter, metricName, field.Type()) + h.instruments[metricName] = inst + } + h.mu.Unlock() + } + + h.recordMetric(ctx, inst, field, metricName, attrs) + } + } +} + +// createInstruments creates OTel instruments based on field type +func (h *SDKHandler) createInstruments(meter otelmetric.Meter, name string, fieldType stats.FieldType) instrument { + var inst instrument + + switch fieldType { + case stats.Counter: + counter, err := meter.Int64Counter(name) + if err != nil { + slog.Error("stats/otlp: failed to create counter", "name", name, "error", err) + } + inst.counter = counter + + case stats.Gauge: + // Use Float64Gauge for gauges (synchronous gauge instrument) + gauge, err := meter.Float64Gauge(name) + if err != nil { + slog.Error("stats/otlp: failed to create gauge", "name", name, "error", err) + } + inst.gauge = gauge + + case stats.Histogram: + histogram, err := meter.Float64Histogram(name) + if err != nil { + slog.Error("stats/otlp: failed to create histogram", "name", name, "error", err) + } + inst.histogram = histogram + } + + return inst +} + +// recordMetric records a metric value to the appropriate instrument +func (h *SDKHandler) recordMetric(ctx context.Context, inst instrument, field stats.Field, metricName string, attrs []attribute.KeyValue) { + opts := otelmetric.WithAttributes(attrs...) + + switch field.Type() { + case stats.Counter: + if inst.counter != nil { + inst.counter.Add(ctx, h.valueToInt64(field.Value), opts) + } + + case stats.Gauge: + if inst.gauge != nil { + // Gauges record instantaneous values directly + inst.gauge.Record(ctx, h.valueToFloat64(field.Value), opts) + } + + case stats.Histogram: + if inst.histogram != nil { + inst.histogram.Record(ctx, h.valueToFloat64(field.Value), opts) + } + } +} + +// tagsToAttributes converts stats tags to OTel attributes +func (h *SDKHandler) tagsToAttributes(tags []stats.Tag) []attribute.KeyValue { + attrs := make([]attribute.KeyValue, len(tags)) + for i, tag := range tags { + attrs[i] = attribute.String(tag.Name, tag.Value) + } + return attrs +} + +// valueToInt64 converts stats.Value to int64 for counters +func (h *SDKHandler) valueToInt64(v stats.Value) int64 { + switch v.Type() { + case stats.Bool: + if v.Bool() { + return 1 + } + return 0 + case stats.Int: + return v.Int() + case stats.Uint: + return int64(v.Uint()) + case stats.Float: + return int64(v.Float()) + case stats.Duration: + return int64(v.Duration().Nanoseconds()) + } + return 0 +} + +// valueToFloat64 converts stats.Value to float64 for gauges and histograms +func (h *SDKHandler) valueToFloat64(v stats.Value) float64 { + switch v.Type() { + case stats.Bool: + if v.Bool() { + return 1.0 + } + return 0.0 + case stats.Int: + return float64(v.Int()) + case stats.Uint: + return float64(v.Uint()) + case stats.Float: + return v.Float() + case stats.Duration: + return v.Duration().Seconds() + } + return 0.0 +} + +// Flush implements stats.Flusher +func (h *SDKHandler) Flush() { + if err := h.provider.ForceFlush(h.shutdownCtx); err != nil { + slog.Error("stats/otlp: failed to flush", "error", err) + } +} + +// Shutdown gracefully shuts down the handler and exports any remaining metrics +func (h *SDKHandler) Shutdown(ctx context.Context) error { + return h.provider.Shutdown(ctx) +} diff --git a/otlp/sdk_handler_integration_test.go b/otlp/sdk_handler_integration_test.go new file mode 100644 index 0000000..64c759a --- /dev/null +++ b/otlp/sdk_handler_integration_test.go @@ -0,0 +1,267 @@ +package otlp + +import ( + "context" + "net" + "strings" + "sync" + "testing" + "time" + + collectormetricspb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + metricspb "go.opentelemetry.io/proto/otlp/metrics/v1" + "google.golang.org/grpc" + + "github.com/segmentio/stats/v5" +) + +// captureCollector is an in-process OTLP/gRPC metrics collector that records +// every ExportMetricsServiceRequest it receives. It lets the tests assert on +// what actually arrives over the wire rather than on internal handler state. +type captureCollector struct { + collectormetricspb.UnimplementedMetricsServiceServer + + mu sync.Mutex + requests []*collectormetricspb.ExportMetricsServiceRequest + received chan struct{} // signalled on every Export +} + +func (c *captureCollector) Export(ctx context.Context, req *collectormetricspb.ExportMetricsServiceRequest) (*collectormetricspb.ExportMetricsServiceResponse, error) { + c.mu.Lock() + c.requests = append(c.requests, req) + c.mu.Unlock() + select { + case c.received <- struct{}{}: + default: + } + return &collectormetricspb.ExportMetricsServiceResponse{}, nil +} + +// metrics flattens every metric across all captured requests. +func (c *captureCollector) metrics() []*metricspb.Metric { + c.mu.Lock() + defer c.mu.Unlock() + var out []*metricspb.Metric + for _, req := range c.requests { + for _, rm := range req.GetResourceMetrics() { + for _, sm := range rm.GetScopeMetrics() { + out = append(out, sm.GetMetrics()...) + } + } + } + return out +} + +// startCaptureCollector starts a gRPC OTLP collector on a loopback address and +// returns the collector plus the "host:port" endpoint it listens on. +func startCaptureCollector(t *testing.T) (*captureCollector, string) { + t.Helper() + + lis, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + + coll := &captureCollector{received: make(chan struct{}, 16)} + srv := grpc.NewServer() + collectormetricspb.RegisterMetricsServiceServer(srv, coll) + + go func() { _ = srv.Serve(lis) }() + t.Cleanup(srv.Stop) + + return coll, lis.Addr().String() +} + +// waitForExport blocks until the collector has received at least one request or +// the deadline elapses. +func (c *captureCollector) waitForExport(t *testing.T, timeout time.Duration) { + t.Helper() + select { + case <-c.received: + case <-time.After(timeout): + t.Fatal("timed out waiting for the collector to receive an export") + } +} + +// TestSDKHandler_ExportsOverGRPC drives a measure through the handler to a real +// in-process gRPC collector and asserts on what arrives over the wire. This is +// the end-to-end behavior check that the unit tests (which only inspect +// handler.instruments) cannot provide. +func TestSDKHandler_ExportsOverGRPC(t *testing.T) { + coll, endpoint := startCaptureCollector(t) + + ctx := context.Background() + handler, err := NewSDKHandler(ctx, SDKConfig{ + Protocol: ProtocolGRPC, + // The exporter applies OTEL_EXPORTER_OTLP_INSECURE etc. from env, but an + // http:// scheme on the endpoint selects an insecure connection directly. + EndpointURL: "http://" + endpoint, + ExportInterval: 50 * time.Millisecond, + }) + if err != nil { + t.Fatalf("failed to create handler: %v", err) + } + defer handler.Shutdown(ctx) + + handler.HandleMeasures(time.Now(), stats.Measure{ + Name: "wire.test", + Fields: []stats.Field{stats.MakeField("count", 7, stats.Counter)}, + Tags: []stats.Tag{{Name: "env", Value: "test"}}, + }) + + handler.Flush() + coll.waitForExport(t, 5*time.Second) + + metrics := coll.metrics() + var found *metricspb.Metric + for _, m := range metrics { + if m.GetName() == "wire.test.count" { + found = m + break + } + } + if found == nil { + t.Fatalf("metric wire.test.count not received; got %d metrics", len(metrics)) + } + + sum := found.GetSum() + if sum == nil { + t.Fatalf("expected a Sum for a counter, got %v", found.GetData()) + } + points := sum.GetDataPoints() + if len(points) != 1 { + t.Fatalf("expected 1 data point, got %d", len(points)) + } + if got := points[0].GetAsInt(); got != 7 { + t.Errorf("expected counter value 7, got %d", got) + } + + // Assert the tag survived as an attribute. + var sawTag bool + for _, attr := range points[0].GetAttributes() { + if attr.GetKey() == "env" && attr.GetValue().GetStringValue() == "test" { + sawTag = true + } + } + if !sawTag { + t.Errorf("expected attribute env=test on the data point") + } +} + +// TestSDKHandler_ProtocolEnvVarSelectsGRPC proves that, with config.Protocol +// empty, OTEL_EXPORTER_OTLP_PROTOCOL=grpc is honored: the metric reaches our +// in-process gRPC collector. OTEL_EXPORTER_OTLP_ENDPOINT (read by the exporter +// itself, not by us) routes it there. +func TestSDKHandler_ProtocolEnvVarSelectsGRPC(t *testing.T) { + coll, endpoint := startCaptureCollector(t) + + t.Setenv("OTEL_EXPORTER_OTLP_PROTOCOL", "grpc") + t.Setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://"+endpoint) + + ctx := context.Background() + handler, err := NewSDKHandler(ctx, SDKConfig{ + // Protocol deliberately left empty so the env var decides. + ExportInterval: 50 * time.Millisecond, + }) + if err != nil { + t.Fatalf("failed to create handler: %v", err) + } + defer handler.Shutdown(ctx) + + handler.HandleMeasures(time.Now(), stats.Measure{ + Name: "proto.env.grpc", + Fields: []stats.Field{stats.MakeField("count", 1, stats.Counter)}, + }) + handler.Flush() + coll.waitForExport(t, 5*time.Second) + + for _, m := range coll.metrics() { + if m.GetName() == "proto.env.grpc.count" { + return + } + } + t.Fatal("metric did not arrive over gRPC despite OTEL_EXPORTER_OTLP_PROTOCOL=grpc") +} + +// TestSDKHandler_MetricsProtocolEnvVarPrecedence proves the metrics-specific +// variable wins over the generic one. The generic var asks for http/protobuf, +// but the metrics-specific var asks for grpc, so the export must reach the gRPC +// collector. +func TestSDKHandler_MetricsProtocolEnvVarPrecedence(t *testing.T) { + coll, endpoint := startCaptureCollector(t) + + t.Setenv("OTEL_EXPORTER_OTLP_PROTOCOL", "http/protobuf") + t.Setenv("OTEL_EXPORTER_OTLP_METRICS_PROTOCOL", "grpc") + t.Setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://"+endpoint) + + ctx := context.Background() + handler, err := NewSDKHandler(ctx, SDKConfig{ExportInterval: 50 * time.Millisecond}) + if err != nil { + t.Fatalf("failed to create handler: %v", err) + } + defer handler.Shutdown(ctx) + + handler.HandleMeasures(time.Now(), stats.Measure{ + Name: "proto.env.precedence", + Fields: []stats.Field{stats.MakeField("count", 1, stats.Counter)}, + }) + handler.Flush() + coll.waitForExport(t, 5*time.Second) + + for _, m := range coll.metrics() { + if m.GetName() == "proto.env.precedence.count" { + return + } + } + t.Fatal("metrics-specific protocol var did not take precedence over the generic one") +} + +// TestSDKHandler_InvalidProtocolEnvVar proves an unrecognized protocol value is +// rejected with an error rather than silently defaulting. +func TestSDKHandler_InvalidProtocolEnvVar(t *testing.T) { + t.Setenv("OTEL_EXPORTER_OTLP_PROTOCOL", "http/json") + + _, err := NewSDKHandler(context.Background(), SDKConfig{}) + if err == nil { + t.Fatal("expected an error for an unsupported OTEL_EXPORTER_OTLP_PROTOCOL value") + } + // The error should name the offending value and the env var it came from. + if msg := err.Error(); !strings.Contains(msg, "http/json") || + !strings.Contains(msg, "OTEL_EXPORTER_OTLP_PROTOCOL") { + t.Errorf("error should name the bad value and source env var, got: %v", err) + } +} + +// TestSDKHandler_ExplicitProtocolOverridesEnv proves config.Protocol takes +// precedence over the environment: even with an invalid env value, an explicit +// gRPC protocol is used and the export succeeds. +func TestSDKHandler_ExplicitProtocolOverridesEnv(t *testing.T) { + coll, endpoint := startCaptureCollector(t) + + t.Setenv("OTEL_EXPORTER_OTLP_PROTOCOL", "http/json") // would error if consulted + + ctx := context.Background() + handler, err := NewSDKHandler(ctx, SDKConfig{ + Protocol: ProtocolGRPC, // explicit, wins over env + EndpointURL: "http://" + endpoint, + ExportInterval: 50 * time.Millisecond, + }) + if err != nil { + t.Fatalf("explicit protocol should bypass env validation, got error: %v", err) + } + defer handler.Shutdown(ctx) + + handler.HandleMeasures(time.Now(), stats.Measure{ + Name: "proto.explicit", + Fields: []stats.Field{stats.MakeField("count", 1, stats.Counter)}, + }) + handler.Flush() + coll.waitForExport(t, 5*time.Second) + + for _, m := range coll.metrics() { + if m.GetName() == "proto.explicit.count" { + return + } + } + t.Fatal("explicit gRPC protocol export did not arrive") +} diff --git a/otlp/sdk_handler_test.go b/otlp/sdk_handler_test.go new file mode 100644 index 0000000..38db61c --- /dev/null +++ b/otlp/sdk_handler_test.go @@ -0,0 +1,393 @@ +package otlp + +import ( + "context" + "testing" + "time" + + "github.com/segmentio/stats/v5" +) + +func TestSDKHandler_HandleMeasures(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create handler with gRPC protocol + handler, err := NewSDKHandler(ctx, SDKConfig{ + Protocol: ProtocolGRPC, + EndpointURL: "http://localhost:4317", + ExportInterval: 1 * time.Second, + }) + if err != nil { + t.Fatalf("failed to create handler: %v", err) + } + defer handler.Shutdown(ctx) + + now := time.Now() + + // Test counter + handler.HandleMeasures(now, stats.Measure{ + Name: "test.counter", + Fields: []stats.Field{stats.MakeField("count", 1, stats.Counter)}, + Tags: []stats.Tag{{Name: "env", Value: "test"}}, + }) + + // Test gauge + handler.HandleMeasures(now, stats.Measure{ + Name: "test.gauge", + Fields: []stats.Field{stats.MakeField("value", 42.5, stats.Gauge)}, + Tags: []stats.Tag{{Name: "env", Value: "test"}}, + }) + + // Test histogram + handler.HandleMeasures(now, stats.Measure{ + Name: "test.histogram", + Fields: []stats.Field{stats.MakeField("duration", 100, stats.Histogram)}, + Tags: []stats.Tag{{Name: "env", Value: "test"}}, + }) + + // Flush metrics + handler.Flush() + + // Verify instruments were created + if len(handler.instruments) != 3 { + t.Errorf("expected 3 instruments, got %d", len(handler.instruments)) + } +} + +func TestSDKHandler_HTTP(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create handler with HTTP protocol + handler, err := NewSDKHandler(ctx, SDKConfig{ + Protocol: ProtocolHTTPProtobuf, + EndpointURL: "http://localhost:4318", + ExportInterval: 1 * time.Second, + }) + if err != nil { + t.Fatalf("failed to create handler: %v", err) + } + defer handler.Shutdown(ctx) + + now := time.Now() + + // Test basic metric + handler.HandleMeasures(now, stats.Measure{ + Name: "http.test", + Fields: []stats.Field{stats.MakeField("requests", 10, stats.Counter)}, + Tags: []stats.Tag{{Name: "method", Value: "GET"}}, + }) + + handler.Flush() +} + +func TestSDKHandler_FromEnv(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // This test demonstrates using environment variables + // In real usage, OTEL_EXPORTER_OTLP_ENDPOINT and other vars would be set + handler, err := NewSDKHandlerFromEnv(ctx) + if err != nil { + t.Fatalf("failed to create handler from env: %v", err) + } + defer handler.Shutdown(ctx) + + now := time.Now() + + handler.HandleMeasures(now, stats.Measure{ + Name: "env.test", + Fields: []stats.Field{stats.MakeField("value", 1, stats.Counter)}, + Tags: []stats.Tag{{Name: "source", Value: "env"}}, + }) +} + +func TestSDKHandler_MultipleMetrics(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + handler, err := NewSDKHandler(ctx, SDKConfig{ + Protocol: ProtocolGRPC, + EndpointURL: "http://localhost:4317", + ExportInterval: 1 * time.Second, + }) + if err != nil { + t.Fatalf("failed to create handler: %v", err) + } + defer handler.Shutdown(ctx) + + now := time.Now() + + // Send multiple measures in one call + handler.HandleMeasures(now, + stats.Measure{ + Name: "app.requests", + Fields: []stats.Field{stats.MakeField("count", 100, stats.Counter)}, + Tags: []stats.Tag{{Name: "status", Value: "200"}}, + }, + stats.Measure{ + Name: "app.requests", + Fields: []stats.Field{stats.MakeField("count", 10, stats.Counter)}, + Tags: []stats.Tag{{Name: "status", Value: "404"}}, + }, + stats.Measure{ + Name: "app.latency", + Fields: []stats.Field{ + stats.MakeField("p50", 50, stats.Histogram), + stats.MakeField("p99", 200, stats.Histogram), + }, + Tags: []stats.Tag{{Name: "endpoint", Value: "/api/users"}}, + }, + ) + + handler.Flush() + + // Should have created 4 instruments: + // app.requests.count (2 tag variations share same instrument) + // app.latency.p50 + // app.latency.p99 + if len(handler.instruments) < 3 { + t.Errorf("expected at least 3 instruments, got %d", len(handler.instruments)) + } +} + +func TestSDKHandler_ValueConversion(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + handler, err := NewSDKHandler(ctx, SDKConfig{ + Protocol: ProtocolGRPC, + EndpointURL: "http://localhost:4317", + ExportInterval: 1 * time.Second, + }) + if err != nil { + t.Fatalf("failed to create handler: %v", err) + } + defer handler.Shutdown(ctx) + + now := time.Now() + + // Test different value types + testCases := []struct { + name string + value interface{} + fieldType stats.FieldType + }{ + {"int", int(42), stats.Counter}, + {"uint", uint(42), stats.Counter}, + {"float", float64(42.5), stats.Gauge}, + {"duration", time.Second, stats.Histogram}, + {"bool_true", true, stats.Counter}, + {"bool_false", false, stats.Counter}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + handler.HandleMeasures(now, stats.Measure{ + Name: "conversion.test", + Fields: []stats.Field{stats.MakeField(tc.name, tc.value, tc.fieldType)}, + Tags: []stats.Tag{{Name: "type", Value: tc.name}}, + }) + }) + } + + handler.Flush() +} + +func TestSDKHandler_GaugeBehavior(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + handler, err := NewSDKHandler(ctx, SDKConfig{ + Protocol: ProtocolGRPC, + EndpointURL: "http://localhost:4317", + ExportInterval: 1 * time.Second, + }) + if err != nil { + t.Fatalf("failed to create handler: %v", err) + } + defer handler.Shutdown(ctx) + + now := time.Now() + + // Test that gauges maintain absolute values, not cumulative + // Set gauge to 100 + handler.HandleMeasures(now, stats.Measure{ + Name: "test.gauge", + Fields: []stats.Field{stats.MakeField("value", 100, stats.Gauge)}, + Tags: []stats.Tag{{Name: "test", Value: "gauge"}}, + }) + + // Set gauge to 50 (should be 50, not 150) + handler.HandleMeasures(now.Add(time.Second), stats.Measure{ + Name: "test.gauge", + Fields: []stats.Field{stats.MakeField("value", 50, stats.Gauge)}, + Tags: []stats.Tag{{Name: "test", Value: "gauge"}}, + }) + + // Set gauge to 75 (should be 75, not 125 or 225) + handler.HandleMeasures(now.Add(2*time.Second), stats.Measure{ + Name: "test.gauge", + Fields: []stats.Field{stats.MakeField("value", 75, stats.Gauge)}, + Tags: []stats.Tag{{Name: "test", Value: "gauge"}}, + }) + + // Gauges now use native Float64Gauge which maintains absolute values directly + // No need to track internal state - the OTel SDK handles this + + handler.Flush() + + // Verify instrument was created + if len(handler.instruments) < 1 { + t.Errorf("expected at least 1 instrument, got %d", len(handler.instruments)) + } +} + +func TestSDKHandler_ExponentialHistogram(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create handler with exponential histogram enabled + handler, err := NewSDKHandler(ctx, SDKConfig{ + Protocol: ProtocolGRPC, + EndpointURL: "http://localhost:4317", + ExportInterval: 1 * time.Second, + ExponentialHistogram: true, + ExponentialHistogramMaxSize: 160, + ExponentialHistogramMaxScale: 20, + }) + if err != nil { + t.Fatalf("failed to create handler: %v", err) + } + defer handler.Shutdown(ctx) + + now := time.Now() + + // Test histogram with exponential aggregation + handler.HandleMeasures(now, stats.Measure{ + Name: "request.duration", + Fields: []stats.Field{stats.MakeField("ms", 100, stats.Histogram)}, + Tags: []stats.Tag{{Name: "endpoint", Value: "/api/users"}}, + }) + + handler.HandleMeasures(now.Add(time.Millisecond), stats.Measure{ + Name: "request.duration", + Fields: []stats.Field{stats.MakeField("ms", 250, stats.Histogram)}, + Tags: []stats.Tag{{Name: "endpoint", Value: "/api/users"}}, + }) + + handler.HandleMeasures(now.Add(2*time.Millisecond), stats.Measure{ + Name: "request.duration", + Fields: []stats.Field{stats.MakeField("ms", 150, stats.Histogram)}, + Tags: []stats.Tag{{Name: "endpoint", Value: "/api/users"}}, + }) + + handler.Flush() + + // Verify instrument was created + if len(handler.instruments) < 1 { + t.Errorf("expected at least 1 instrument, got %d", len(handler.instruments)) + } +} + +func TestSDKHandler_CumulativeTemporality(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create handler with default (cumulative) temporality + handler, err := NewSDKHandler(ctx, SDKConfig{ + Protocol: ProtocolGRPC, + EndpointURL: "http://localhost:4317", + ExportInterval: 1 * time.Second, + // TemporalitySelector: nil means default cumulative temporality + }) + if err != nil { + t.Fatalf("failed to create handler: %v", err) + } + defer handler.Shutdown(ctx) + + now := time.Now() + + // Test counter - should accumulate + handler.HandleMeasures(now, stats.Measure{ + Name: "requests", + Fields: []stats.Field{stats.MakeField("count", 10, stats.Counter)}, + Tags: []stats.Tag{{Name: "endpoint", Value: "/api"}}, + }) + + handler.HandleMeasures(now.Add(time.Second), stats.Measure{ + Name: "requests", + Fields: []stats.Field{stats.MakeField("count", 15, stats.Counter)}, + Tags: []stats.Tag{{Name: "endpoint", Value: "/api"}}, + }) + + // With cumulative temporality, counters accumulate (10 + 15 = 25 total) + // The SDK handles this internally + + handler.Flush() + + // Verify instrument was created + if len(handler.instruments) < 1 { + t.Errorf("expected at least 1 instrument, got %d", len(handler.instruments)) + } +} + +func TestDefaultResource(t *testing.T) { + res, err := defaultResource(context.Background()) + if err != nil { + t.Fatalf("defaultResource failed: %v", err) + } + + attrs := make(map[string]struct{}) + for _, kv := range res.Attributes() { + attrs[string(kv.Key)] = struct{}{} + } + + // This test verifies the merge wires both sources together. Note that + // resource.Default() is memoized on its first call process-wide, so the + // values it derives from OTEL_SERVICE_NAME / OTEL_RESOURCE_ATTRIBUTES cannot + // be reliably exercised here (another test may have frozen Default() before + // this one set those vars). We assert on attribute *presence* instead: + // + // - telemetry.sdk.* and service.name come from resource.Default() + // - host.name / process.pid come from the WithHost()/WithProcess() + // detectors merged on top + for _, key := range []string{ + "telemetry.sdk.name", // from Default() + "service.name", // from Default() (real name or unknown_service fallback) + "host.name", // from the merged WithHost() detector + "process.pid", // from the merged WithProcess() detector + } { + if _, ok := attrs[key]; !ok { + t.Errorf("merged resource missing expected attribute %q", key) + } + } +} + +func BenchmarkSDKHandler_HandleMeasures(b *testing.B) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + handler, err := NewSDKHandler(ctx, SDKConfig{ + Protocol: ProtocolGRPC, + EndpointURL: "http://localhost:4317", + ExportInterval: 10 * time.Second, + }) + if err != nil { + b.Fatalf("failed to create handler: %v", err) + } + defer handler.Shutdown(ctx) + + now := time.Now() + measure := stats.Measure{ + Name: "benchmark.test", + Fields: []stats.Field{stats.MakeField("count", 1, stats.Counter)}, + Tags: []stats.Tag{{Name: "env", Value: "bench"}}, + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + handler.HandleMeasures(now, measure) + } +} diff --git a/tag.go b/tag.go index cc7d551..dce9751 100644 --- a/tag.go +++ b/tag.go @@ -24,9 +24,11 @@ func (t Tag) String() string { // M allows for creating a tag list from a map. func M(m map[string]string) []Tag { - tags := make([]Tag, 0, len(m)) + tags := make([]Tag, len(m)) + i := 0 for k, v := range m { - tags = append(tags, T(k, v)) + tags[i] = T(k, v) + i++ } return tags } diff --git a/version/version.go b/version/version.go index 263615f..eb4aaab 100644 --- a/version/version.go +++ b/version/version.go @@ -6,7 +6,7 @@ import ( "sync" ) -const Version = "5.8.0" +const Version = "5.9.0" var ( vsnOnce sync.Once