From d47bf99f0c561638c25d39609cb7b33eb346abb4 Mon Sep 17 00:00:00 2001 From: samikshya-chand_data Date: Tue, 5 May 2026 15:37:35 +0530 Subject: [PATCH 1/4] Telemetry: stop double-retrying, identify traffic, tune breaker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After v1.11.0 enabled telemetry by default via the server feature flag, high-QPS workloads produced excessive 429s on /telemetry-ext. Three issues compounded: 1. Double-retry. The exporter ran its own retry loop on top of the retryablehttp-wrapped HTTP client (internal/client.RetryableClient), which already retries 429/5xx with Retry-After. Result: up to RetryMax * (MaxRetries+1) HTTP attempts per export, all collapsed into one circuit-breaker outcome — so the breaker barely opened. 2. Untraceable in access logs. Telemetry POSTs and feature-flag GETs sent no User-Agent, so 429s were tagged Go-http-client/1.1 and could not be attributed to godatabrickssqlconnector by version. 3. High request volume. FlushInterval=5s, BatchSize=100. Changes: - telemetry/exporter.go: drop the retry loop entirely. doExport now makes a single HTTP request; transient retries (429/5xx, Retry-After) are owned by the underlying retryablehttp client. Each export call → exactly one breaker outcome. - telemetry/exporter.go, telemetry/featureflag.go: set User-Agent header on telemetry POST and feature-flag GET. Built once at the connector site (buildUserAgent in connector.go), mirroring internal/client/client.go format (DriverName/DriverVersion + UserAgentEntry + agent product), and plumbed via TelemetryInitOptions.UserAgent. - telemetry/config.go: FlushInterval 5s → 30s, BatchSize 100 → 200. Remove MaxRetries/RetryDelay from telemetry.Config and TelemetryInitOptions; telemetry_retry_count/_delay DSN params still parse for backwards compat but are no-ops. 
- telemetry/circuitbreaker.go: lower minimumNumberOfCalls 20 → 10 (so low-traffic clients can still trip the breaker on a sustained outage now that each export is one signal), and raise waitDurationInOpenState 30s → 60s (respect typical Retry-After). - Tests: removed obsolete retry/backoff tests; added single-attempt assertion across 4xx/429/5xx; added User-Agent assertions on both endpoints. Co-authored-by: Isaac --- connector.go | 19 ++- telemetry/aggregator_test.go | 12 +- telemetry/benchmark_test.go | 17 ++- telemetry/circuitbreaker.go | 19 ++- telemetry/client.go | 4 +- telemetry/config.go | 52 ++----- telemetry/config_test.go | 127 ++++------------- telemetry/driver_integration.go | 34 ++--- telemetry/exporter.go | 87 ++++-------- telemetry/exporter_test.go | 235 +++++++------------------------- telemetry/featureflag.go | 9 +- telemetry/featureflag_test.go | 44 ++++-- telemetry/integration_test.go | 13 +- telemetry/manager.go | 4 +- telemetry/manager_test.go | 36 ++--- 15 files changed, 234 insertions(+), 478 deletions(-) diff --git a/connector.go b/connector.go index 4d019651..d86d5679 100644 --- a/connector.go +++ b/connector.go @@ -16,6 +16,7 @@ import ( "github.com/databricks/databricks-sql-go/auth/tokenprovider" "github.com/databricks/databricks-sql-go/driverctx" dbsqlerr "github.com/databricks/databricks-sql-go/errors" + "github.com/databricks/databricks-sql-go/internal/agent" "github.com/databricks/databricks-sql-go/internal/cli_service" "github.com/databricks/databricks-sql-go/internal/client" "github.com/databricks/databricks-sql-go/internal/config" @@ -95,12 +96,11 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) { conn.telemetry = telemetry.InitializeForConnection(ctx, telemetry.TelemetryInitOptions{ Host: c.cfg.Host, DriverVersion: c.cfg.DriverVersion, + UserAgent: buildUserAgent(c.cfg), HTTPClient: telemetryClient, EnableTelemetry: c.cfg.EnableTelemetry, BatchSize: c.cfg.TelemetryBatchSize, FlushInterval: 
c.cfg.TelemetryFlushInterval, - RetryCount: c.cfg.TelemetryRetryCount, - RetryDelay: c.cfg.TelemetryRetryDelay, }) if conn.telemetry != nil { log.Debug().Msg("telemetry initialized for connection") @@ -117,6 +117,21 @@ func (c *connector) Driver() driver.Driver { return &databricksDriver{} } +// buildUserAgent constructs the User-Agent header value used by the driver. +// Mirrors the format set on the Thrift HTTP client in +// internal/client/client.go so telemetry, feature-flag, and Thrift requests +// all carry the same identifier. +func buildUserAgent(cfg *config.Config) string { + userAgent := fmt.Sprintf("%s/%s", cfg.DriverName, cfg.DriverVersion) + if cfg.UserAgentEntry != "" { + userAgent = fmt.Sprintf("%s/%s (%s)", cfg.DriverName, cfg.DriverVersion, cfg.UserAgentEntry) + } + if agentProduct := agent.Detect(); agentProduct != "" { + userAgent = fmt.Sprintf("%s agent/%s", userAgent, agentProduct) + } + return userAgent +} + var _ driver.Connector = (*connector)(nil) type ConnOption func(*config.Config) diff --git a/telemetry/aggregator_test.go b/telemetry/aggregator_test.go index f98c44c5..0bc2a566 100644 --- a/telemetry/aggregator_test.go +++ b/telemetry/aggregator_test.go @@ -40,7 +40,7 @@ func TestAggregatorClose_WaitsForInFlightWorkerExports(t *testing.T) { cfg.BatchSize = 1 // one metric per batch → one worker export per metric httpClient := &http.Client{Timeout: 5 * time.Second} - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) agg := newMetricsAggregator(exporter, cfg) ctx := context.Background() @@ -98,7 +98,7 @@ func TestAggregatorClose_DrainsPendingQueueJobsBeforeCancel(t *testing.T) { cfg.BatchSize = 100 // large batch — won't auto-flush on size httpClient := &http.Client{Timeout: 5 * time.Second} - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, 
"test-version", "test-ua", httpClient, cfg) // Use a single-worker aggregator with a tiny queue to make the "pending in queue" // scenario deterministic: we manually call flushUnlocked to enqueue a job. @@ -150,7 +150,7 @@ func TestAggregatorFlushUnlocked_InFlightAddBeforeSend(t *testing.T) { cfg.BatchSize = 1 httpClient := &http.Client{Timeout: 5 * time.Second} - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) agg := newMetricsAggregator(exporter, cfg) ctx := context.Background() @@ -183,7 +183,7 @@ func TestAggregatorClose_SafeToCallMultipleTimes(t *testing.T) { cfg := DefaultConfig() httpClient := &http.Client{Timeout: 5 * time.Second} - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) agg := newMetricsAggregator(exporter, cfg) ctx := context.Background() @@ -225,7 +225,7 @@ func TestAggregatorFlushUnlocked_DropWhenQueueFull(t *testing.T) { httpClient := &http.Client{Timeout: 1 * time.Second} // Use a no-op exporter — we never actually export in this test. - exporter := newTelemetryExporter("http://127.0.0.1:0", "test-version", httpClient, cfg) + exporter := newTelemetryExporter("http://127.0.0.1:0", "test-version", "test-ua", httpClient, cfg) agg := newMetricsAggregator(exporter, cfg) // Cancel the aggregator context immediately so workers stop consuming from the queue. @@ -303,7 +303,7 @@ func TestAggregatorClose_RespectsContextTimeout(t *testing.T) { cfg.BatchSize = 1 httpClient := &http.Client{Timeout: 10 * time.Second} - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) agg := newMetricsAggregator(exporter, cfg) // Record a metric that triggers an immediate flush (terminal op). 
diff --git a/telemetry/benchmark_test.go b/telemetry/benchmark_test.go index ea20e1fe..f42011f9 100644 --- a/telemetry/benchmark_test.go +++ b/telemetry/benchmark_test.go @@ -25,7 +25,7 @@ func BenchmarkInterceptor_Overhead_Enabled(b *testing.B) { cfg.BatchSize = 1000 cfg.FlushInterval = 10 * time.Minute // suppress periodic flush during bench - exporter := newTelemetryExporter("localhost", "test-version", &http.Client{}, cfg) + exporter := newTelemetryExporter("localhost", "test-version", "test-ua", &http.Client{}, cfg) agg := newMetricsAggregator(exporter, cfg) defer agg.close(context.Background()) //nolint:errcheck @@ -44,7 +44,7 @@ func BenchmarkInterceptor_Overhead_Enabled(b *testing.B) { // The delta between Enabled and Disabled is the pure telemetry cost. func BenchmarkInterceptor_Overhead_Disabled(b *testing.B) { cfg := DefaultConfig() - exporter := newTelemetryExporter("localhost", "test-version", &http.Client{}, cfg) + exporter := newTelemetryExporter("localhost", "test-version", "test-ua", &http.Client{}, cfg) agg := newMetricsAggregator(exporter, cfg) defer agg.close(context.Background()) //nolint:errcheck @@ -66,7 +66,7 @@ func BenchmarkAggregator_RecordMetric(b *testing.B) { cfg.BatchSize = 10000 cfg.FlushInterval = 10 * time.Minute - exporter := newTelemetryExporter("localhost", "test-version", &http.Client{}, cfg) + exporter := newTelemetryExporter("localhost", "test-version", "test-ua", &http.Client{}, cfg) agg := newMetricsAggregator(exporter, cfg) defer agg.close(context.Background()) //nolint:errcheck @@ -95,9 +95,8 @@ func BenchmarkExporter_Export(b *testing.B) { defer server.Close() cfg := DefaultConfig() - cfg.MaxRetries = 0 httpClient := &http.Client{Timeout: 5 * time.Second} - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) metrics := make([]*telemetryMetric, 10) for i := range metrics { @@ -136,7 +135,7 @@ func 
BenchmarkConcurrentConnections_PerHostSharing(b *testing.B) { b.ReportAllocs() b.RunParallel(func(pb *testing.PB) { for pb.Next() { - client := manager.getOrCreateClient(host, "test-version", httpClient, cfg) + client := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) if client != nil { _ = manager.releaseClient(host) } @@ -185,7 +184,7 @@ func TestLoadTesting_ConcurrentConnections(t *testing.T) { go func() { defer wg.Done() - client := manager.getOrCreateClient(host, "test-version", httpClient, cfg) + client := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) if client == nil { atomic.AddInt64(&errors, 1) return @@ -237,7 +236,7 @@ func TestGracefulShutdown_ReferenceCountingCleanup(t *testing.T) { // Open 3 connections per host for _, host := range hosts { for i := 0; i < 3; i++ { - if client := manager.getOrCreateClient(host, "test-version", httpClient, cfg); client == nil { + if client := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg); client == nil { t.Fatalf("expected client for host %s", host) } } @@ -288,7 +287,7 @@ func TestGracefulShutdown_FinalFlush(t *testing.T) { cfg.FlushInterval = 10 * time.Minute // prevent auto-flush httpClient := &http.Client{Timeout: 5 * time.Second} - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) agg := newMetricsAggregator(exporter, cfg) ctx := context.Background() diff --git a/telemetry/circuitbreaker.go b/telemetry/circuitbreaker.go index e399f03f..7088afa6 100644 --- a/telemetry/circuitbreaker.go +++ b/telemetry/circuitbreaker.go @@ -62,14 +62,21 @@ type circuitBreakerConfig struct { permittedCallsInHalfOpen int // Number of test calls in half-open state } -// defaultCircuitBreakerConfig returns default configuration matching JDBC. +// defaultCircuitBreakerConfig returns default configuration. 
+// +// Each export call is now a single logical request to /telemetry-ext (the +// retryablehttp layer handles transient retries internally), so each breaker +// call corresponds to one observed outcome. minimumNumberOfCalls is set low +// enough that low-traffic clients can still trip the breaker on a sustained +// outage; waitDurationInOpenState is long enough to respect typical +// Retry-After windows from the server. func defaultCircuitBreakerConfig() circuitBreakerConfig { return circuitBreakerConfig{ - failureRateThreshold: 50, // 50% failure rate - minimumNumberOfCalls: 20, // Minimum sample size - slidingWindowSize: 30, // Keep recent 30 calls - waitDurationInOpenState: 30 * time.Second, - permittedCallsInHalfOpen: 3, // Test with 3 calls + failureRateThreshold: 50, + minimumNumberOfCalls: 10, + slidingWindowSize: 30, + waitDurationInOpenState: 60 * time.Second, + permittedCallsInHalfOpen: 3, } } diff --git a/telemetry/client.go b/telemetry/client.go index 25c68e88..0432a086 100644 --- a/telemetry/client.go +++ b/telemetry/client.go @@ -31,9 +31,9 @@ type telemetryClient struct { } // newTelemetryClient creates a new telemetry client for the given host. 
-func newTelemetryClient(host string, driverVersion string, httpClient *http.Client, cfg *Config) *telemetryClient { +func newTelemetryClient(host string, driverVersion string, userAgent string, httpClient *http.Client, cfg *Config) *telemetryClient { // Create exporter - exporter := newTelemetryExporter(host, driverVersion, httpClient, cfg) + exporter := newTelemetryExporter(host, driverVersion, userAgent, httpClient, cfg) // Create aggregator with exporter aggregator := newMetricsAggregator(exporter, cfg) diff --git a/telemetry/config.go b/telemetry/config.go index 9054cb36..ceb0ac21 100644 --- a/telemetry/config.go +++ b/telemetry/config.go @@ -5,18 +5,6 @@ import ( "net/http" "strconv" "time" - - "github.com/databricks/databricks-sql-go/logger" -) - -const ( - // maxTelemetryRetryCount caps DSN-provided retry count to prevent - // excessive retries from misconfiguration. - maxTelemetryRetryCount = 10 - - // maxTelemetryRetryDelay caps DSN-provided retry delay to prevent - // excessively long backoff from misconfiguration. - maxTelemetryRetryDelay = 30 * time.Second ) // Config holds telemetry configuration. 
@@ -36,12 +24,6 @@ type Config struct { // FlushInterval is how often to flush metrics FlushInterval time.Duration - // MaxRetries is the maximum number of retry attempts - MaxRetries int - - // RetryDelay is the base delay between retries - RetryDelay time.Duration - // CircuitBreakerEnabled enables circuit breaker protection CircuitBreakerEnabled bool @@ -65,10 +47,8 @@ func DefaultConfig() *Config { return &Config{ Enabled: false, EnableTelemetry: nil, // unset — server feature flag decides - BatchSize: 100, - FlushInterval: 5 * time.Second, - MaxRetries: 3, - RetryDelay: 100 * time.Millisecond, + BatchSize: 200, + FlushInterval: 30 * time.Second, CircuitBreakerEnabled: true, CircuitBreakerThreshold: 5, CircuitBreakerTimeout: 1 * time.Minute, @@ -97,26 +77,10 @@ func ParseTelemetryConfig(params map[string]string) *Config { } } - if v, ok := params["telemetry_retry_count"]; ok { - if n, err := strconv.Atoi(v); err == nil && n >= 0 { - if n > maxTelemetryRetryCount { - logger.Debug().Msgf("telemetry: retry_count %d exceeds max %d, clamping", n, maxTelemetryRetryCount) - n = maxTelemetryRetryCount - } - cfg.MaxRetries = n - } - } - - if v, ok := params["telemetry_retry_delay"]; ok { - if d, err := time.ParseDuration(v); err == nil && d > 0 { - if d > maxTelemetryRetryDelay { - logger.Debug().Msgf("telemetry: retry_delay %v exceeds max %v, clamping", d, maxTelemetryRetryDelay) - d = maxTelemetryRetryDelay - } - cfg.RetryDelay = d - } - } - + // Note: telemetry_retry_count and telemetry_retry_delay DSN parameters + // are accepted for backwards compatibility but are no longer applied + // here. Retries are owned by the underlying retryablehttp-wrapped client + // (see internal/client.RetryableClient), which honors Retry-After. return cfg } @@ -126,12 +90,12 @@ func ParseTelemetryConfig(params map[string]string) *Config { // (databricks.partnerplatform.clientConfigsFeatureFlags.enableTelemetryForGoDriver). 
// // In all other cases — explicit opt-out or server flag absent/unreachable — returns false. -func isTelemetryEnabled(ctx context.Context, cfg *Config, host string, driverVersion string, httpClient *http.Client) bool { +func isTelemetryEnabled(ctx context.Context, cfg *Config, host string, driverVersion string, userAgent string, httpClient *http.Client) bool { if cfg.EnableTelemetry != nil { return *cfg.EnableTelemetry } - serverEnabled, err := getFeatureFlagCache().isTelemetryEnabled(ctx, host, driverVersion, httpClient) + serverEnabled, err := getFeatureFlagCache().isTelemetryEnabled(ctx, host, driverVersion, userAgent, httpClient) if err != nil { return false } diff --git a/telemetry/config_test.go b/telemetry/config_test.go index 6806b184..099d90a6 100644 --- a/telemetry/config_test.go +++ b/telemetry/config_test.go @@ -17,17 +17,11 @@ func TestDefaultConfig(t *testing.T) { if cfg.EnableTelemetry != nil { t.Error("Expected EnableTelemetry to be nil (unset) by default") } - if cfg.BatchSize != 100 { - t.Errorf("Expected BatchSize 100, got %d", cfg.BatchSize) - } - if cfg.FlushInterval != 5*time.Second { - t.Errorf("Expected FlushInterval 5s, got %v", cfg.FlushInterval) - } - if cfg.MaxRetries != 3 { - t.Errorf("Expected MaxRetries 3, got %d", cfg.MaxRetries) + if cfg.BatchSize != 200 { + t.Errorf("Expected BatchSize 200, got %d", cfg.BatchSize) } - if cfg.RetryDelay != 100*time.Millisecond { - t.Errorf("Expected RetryDelay 100ms, got %v", cfg.RetryDelay) + if cfg.FlushInterval != 30*time.Second { + t.Errorf("Expected FlushInterval 30s, got %v", cfg.FlushInterval) } if !cfg.CircuitBreakerEnabled { t.Error("Expected CircuitBreakerEnabled true, got false") @@ -75,24 +69,24 @@ func TestParseTelemetryConfig_BatchSize(t *testing.T) { func TestParseTelemetryConfig_BatchSizeInvalid(t *testing.T) { cfg := ParseTelemetryConfig(map[string]string{"telemetry_batch_size": "invalid"}) - if cfg.BatchSize != 100 { - t.Errorf("Expected BatchSize to fallback to 100, got %d", 
cfg.BatchSize) + if cfg.BatchSize != 200 { + t.Errorf("Expected BatchSize to fallback to 200, got %d", cfg.BatchSize) } } func TestParseTelemetryConfig_BatchSizeZero(t *testing.T) { cfg := ParseTelemetryConfig(map[string]string{"telemetry_batch_size": "0"}) - if cfg.BatchSize != 100 { - t.Errorf("Expected BatchSize to fallback to 100 when zero, got %d", cfg.BatchSize) + if cfg.BatchSize != 200 { + t.Errorf("Expected BatchSize to fallback to 200 when zero, got %d", cfg.BatchSize) } } func TestParseTelemetryConfig_BatchSizeNegative(t *testing.T) { cfg := ParseTelemetryConfig(map[string]string{"telemetry_batch_size": "-10"}) - if cfg.BatchSize != 100 { - t.Errorf("Expected BatchSize to fallback to 100 when negative, got %d", cfg.BatchSize) + if cfg.BatchSize != 200 { + t.Errorf("Expected BatchSize to fallback to 200 when negative, got %d", cfg.BatchSize) } } @@ -107,77 +101,22 @@ func TestParseTelemetryConfig_FlushInterval(t *testing.T) { func TestParseTelemetryConfig_FlushIntervalInvalid(t *testing.T) { cfg := ParseTelemetryConfig(map[string]string{"telemetry_flush_interval": "invalid"}) - if cfg.FlushInterval != 5*time.Second { - t.Errorf("Expected FlushInterval to fallback to 5s, got %v", cfg.FlushInterval) - } -} - -func TestParseTelemetryConfig_RetryCount(t *testing.T) { - cfg := ParseTelemetryConfig(map[string]string{"telemetry_retry_count": "5"}) - - if cfg.MaxRetries != 5 { - t.Errorf("Expected MaxRetries 5, got %d", cfg.MaxRetries) - } -} - -func TestParseTelemetryConfig_RetryCountZero(t *testing.T) { - // Zero is valid — it disables retries entirely (unlike batch_size where zero is rejected) - cfg := ParseTelemetryConfig(map[string]string{"telemetry_retry_count": "0"}) - - if cfg.MaxRetries != 0 { - t.Errorf("Expected MaxRetries 0 (disable retries), got %d", cfg.MaxRetries) - } -} - -func TestParseTelemetryConfig_RetryCountInvalid(t *testing.T) { - cfg := ParseTelemetryConfig(map[string]string{"telemetry_retry_count": "invalid"}) - - if cfg.MaxRetries != 3 { 
- t.Errorf("Expected MaxRetries to fallback to 3, got %d", cfg.MaxRetries) - } -} - -func TestParseTelemetryConfig_RetryDelay(t *testing.T) { - cfg := ParseTelemetryConfig(map[string]string{"telemetry_retry_delay": "500ms"}) - - if cfg.RetryDelay != 500*time.Millisecond { - t.Errorf("Expected RetryDelay 500ms, got %v", cfg.RetryDelay) - } -} - -func TestParseTelemetryConfig_RetryDelayInvalid(t *testing.T) { - cfg := ParseTelemetryConfig(map[string]string{"telemetry_retry_delay": "invalid"}) - - if cfg.RetryDelay != 100*time.Millisecond { - t.Errorf("Expected RetryDelay to fallback to 100ms, got %v", cfg.RetryDelay) - } -} - -func TestParseTelemetryConfig_RetryCountExceedsCap(t *testing.T) { - cfg := ParseTelemetryConfig(map[string]string{"telemetry_retry_count": "15"}) - if cfg.MaxRetries != maxTelemetryRetryCount { - t.Errorf("Expected MaxRetries clamped to %d, got %d", maxTelemetryRetryCount, cfg.MaxRetries) - } -} - -func TestParseTelemetryConfig_RetryCountAtCap(t *testing.T) { - cfg := ParseTelemetryConfig(map[string]string{"telemetry_retry_count": "10"}) - if cfg.MaxRetries != 10 { - t.Errorf("Expected MaxRetries 10, got %d", cfg.MaxRetries) - } -} - -func TestParseTelemetryConfig_RetryDelayExceedsCap(t *testing.T) { - cfg := ParseTelemetryConfig(map[string]string{"telemetry_retry_delay": "60s"}) - if cfg.RetryDelay != maxTelemetryRetryDelay { - t.Errorf("Expected RetryDelay clamped to %v, got %v", maxTelemetryRetryDelay, cfg.RetryDelay) + if cfg.FlushInterval != 30*time.Second { + t.Errorf("Expected FlushInterval to fallback to 30s, got %v", cfg.FlushInterval) } } -func TestParseTelemetryConfig_RetryDelayAtCap(t *testing.T) { - cfg := ParseTelemetryConfig(map[string]string{"telemetry_retry_delay": "30s"}) - if cfg.RetryDelay != 30*time.Second { - t.Errorf("Expected RetryDelay 30s, got %v", cfg.RetryDelay) +// telemetry_retry_count and telemetry_retry_delay DSN parameters are accepted +// for backwards compatibility but are no longer applied — retries are 
owned by +// the underlying retryablehttp-wrapped HTTP client. +func TestParseTelemetryConfig_RetryParamsAccepted(t *testing.T) { + cfg := ParseTelemetryConfig(map[string]string{ + "telemetry_retry_count": "5", + "telemetry_retry_delay": "500ms", + }) + // Should parse without error and produce a usable config. + if cfg == nil { + t.Fatal("expected non-nil config") } } @@ -186,8 +125,6 @@ func TestParseTelemetryConfig_AllParams(t *testing.T) { "enableTelemetry": "true", "telemetry_batch_size": "200", "telemetry_flush_interval": "30s", - "telemetry_retry_count": "5", - "telemetry_retry_delay": "250ms", }) if cfg.EnableTelemetry == nil || !*cfg.EnableTelemetry { @@ -199,12 +136,6 @@ func TestParseTelemetryConfig_AllParams(t *testing.T) { if cfg.FlushInterval != 30*time.Second { t.Errorf("Expected FlushInterval 30s, got %v", cfg.FlushInterval) } - if cfg.MaxRetries != 5 { - t.Errorf("Expected MaxRetries 5, got %d", cfg.MaxRetries) - } - if cfg.RetryDelay != 250*time.Millisecond { - t.Errorf("Expected RetryDelay 250ms, got %v", cfg.RetryDelay) - } } // TestIsTelemetryEnabled_ExplicitOptOut: client sets enableTelemetry=false → @@ -216,7 +147,7 @@ func TestIsTelemetryEnabled_ExplicitOptOut(t *testing.T) { })) defer server.Close() - result := isTelemetryEnabled(context.Background(), &Config{EnableTelemetry: boolPtr(false)}, server.URL, "test-version", &http.Client{Timeout: 5 * time.Second}) + result := isTelemetryEnabled(context.Background(), &Config{EnableTelemetry: boolPtr(false)}, server.URL, "test-version", "test-ua", &http.Client{Timeout: 5 * time.Second}) if result { t.Error("Expected telemetry to be disabled when client sets enableTelemetry=false, got enabled") @@ -226,7 +157,7 @@ func TestIsTelemetryEnabled_ExplicitOptOut(t *testing.T) { // TestIsTelemetryEnabled_ExplicitOptIn: client sets enableTelemetry=true → // enabled without any server call (unreachable host proves no network call is made). 
func TestIsTelemetryEnabled_ExplicitOptIn(t *testing.T) { - result := isTelemetryEnabled(context.Background(), &Config{EnableTelemetry: boolPtr(true)}, "http://unreachable-host", "test-version", &http.Client{Timeout: 5 * time.Second}) + result := isTelemetryEnabled(context.Background(), &Config{EnableTelemetry: boolPtr(true)}, "http://unreachable-host", "test-version", "test-ua", &http.Client{Timeout: 5 * time.Second}) if !result { t.Error("Expected telemetry to be enabled when client sets enableTelemetry=true, got disabled") @@ -245,7 +176,7 @@ func TestIsTelemetryEnabled_ServerEnabled(t *testing.T) { flagCache.getOrCreateContext(server.URL) defer flagCache.releaseContext(server.URL) - result := isTelemetryEnabled(context.Background(), &Config{}, server.URL, "test-version", &http.Client{Timeout: 5 * time.Second}) + result := isTelemetryEnabled(context.Background(), &Config{}, server.URL, "test-version", "test-ua", &http.Client{Timeout: 5 * time.Second}) if !result { t.Error("Expected telemetry to be enabled when server flag is true and EnableTelemetry is nil, got disabled") @@ -264,7 +195,7 @@ func TestIsTelemetryEnabled_ServerDisabled(t *testing.T) { flagCache.getOrCreateContext(server.URL) defer flagCache.releaseContext(server.URL) - result := isTelemetryEnabled(context.Background(), &Config{}, server.URL, "test-version", &http.Client{Timeout: 5 * time.Second}) + result := isTelemetryEnabled(context.Background(), &Config{}, server.URL, "test-version", "test-ua", &http.Client{Timeout: 5 * time.Second}) if result { t.Error("Expected telemetry to be disabled when server flag is false and EnableTelemetry is nil, got enabled") @@ -282,7 +213,7 @@ func TestIsTelemetryEnabled_ServerError(t *testing.T) { flagCache.getOrCreateContext(server.URL) defer flagCache.releaseContext(server.URL) - result := isTelemetryEnabled(context.Background(), &Config{}, server.URL, "test-version", &http.Client{Timeout: 5 * time.Second}) + result := isTelemetryEnabled(context.Background(), 
&Config{}, server.URL, "test-version", "test-ua", &http.Client{Timeout: 5 * time.Second}) if result { t.Error("Expected telemetry to be disabled when server errors and EnableTelemetry is nil, got enabled") @@ -295,7 +226,7 @@ func TestIsTelemetryEnabled_ServerUnreachable(t *testing.T) { flagCache.getOrCreateContext("http://localhost:9999") defer flagCache.releaseContext("http://localhost:9999") - result := isTelemetryEnabled(context.Background(), &Config{}, "http://localhost:9999", "test-version", &http.Client{Timeout: 1 * time.Second}) + result := isTelemetryEnabled(context.Background(), &Config{}, "http://localhost:9999", "test-version", "test-ua", &http.Client{Timeout: 1 * time.Second}) if result { t.Error("Expected telemetry to be disabled when server is unreachable and EnableTelemetry is nil, got enabled") diff --git a/telemetry/driver_integration.go b/telemetry/driver_integration.go index f8b32ffc..e33c7537 100644 --- a/telemetry/driver_integration.go +++ b/telemetry/driver_integration.go @@ -16,6 +16,11 @@ type TelemetryInitOptions struct { // DriverVersion is the driver version string. DriverVersion string + // UserAgent is the User-Agent header sent on telemetry export and + // feature-flag requests. Should match the value the Thrift client uses so + // telemetry traffic is attributable to the driver in access logs. + UserAgent string + // HTTPClient is the HTTP client used for both feature-flag checks and // telemetry export. The /telemetry-ext endpoint requires authentication, // so this should be the authenticated driver client. @@ -27,20 +32,11 @@ type TelemetryInitOptions struct { // false — client explicitly opted out EnableTelemetry config.ConfigValue[bool] - // BatchSize is the number of metrics per batch (0 = use default 100). + // BatchSize is the number of metrics per batch (0 = use default). BatchSize int - // FlushInterval is the flush interval (0 = use default 5s). + // FlushInterval is the flush interval (0 = use default). 
FlushInterval time.Duration - - // RetryCount is max retry attempts (-1 = use default 3; 0 = disable retries). - // IMPORTANT: Go's zero-value for int is 0, which disables retries. Callers - // constructing TelemetryInitOptions must set RetryCount = -1 explicitly to - // get the default retry behavior. - RetryCount int - - // RetryDelay is the base delay between retries (0 = use default 100ms). - RetryDelay time.Duration } // InitializeForConnection initializes telemetry for a database connection. @@ -60,25 +56,13 @@ func InitializeForConnection(ctx context.Context, opts TelemetryInitOptions) *In if opts.FlushInterval > 0 { cfg.FlushInterval = opts.FlushInterval } - if opts.RetryCount >= 0 { - cfg.MaxRetries = opts.RetryCount - if cfg.MaxRetries > maxTelemetryRetryCount { - cfg.MaxRetries = maxTelemetryRetryCount - } - } - if opts.RetryDelay > 0 { - cfg.RetryDelay = opts.RetryDelay - if cfg.RetryDelay > maxTelemetryRetryDelay { - cfg.RetryDelay = maxTelemetryRetryDelay - } - } // Get feature flag cache context FIRST (for reference counting) flagCache := getFeatureFlagCache() flagCache.getOrCreateContext(opts.Host) // Check if telemetry should be enabled - enabled := isTelemetryEnabled(ctx, cfg, opts.Host, opts.DriverVersion, opts.HTTPClient) + enabled := isTelemetryEnabled(ctx, cfg, opts.Host, opts.DriverVersion, opts.UserAgent, opts.HTTPClient) if !enabled { flagCache.releaseContext(opts.Host) return nil @@ -86,7 +70,7 @@ func InitializeForConnection(ctx context.Context, opts TelemetryInitOptions) *In // Get or create telemetry client for this host clientMgr := getClientManager() - telemetryClient := clientMgr.getOrCreateClient(opts.Host, opts.DriverVersion, opts.HTTPClient, cfg) + telemetryClient := clientMgr.getOrCreateClient(opts.Host, opts.DriverVersion, opts.UserAgent, opts.HTTPClient, cfg) if telemetryClient == nil { // Client failed to start; release the flag cache ref we incremented above flagCache.releaseContext(opts.Host) diff --git 
a/telemetry/exporter.go b/telemetry/exporter.go index 3ecbf81f..c87933de 100644 --- a/telemetry/exporter.go +++ b/telemetry/exporter.go @@ -24,6 +24,7 @@ const ( type telemetryExporter struct { host string driverVersion string + userAgent string httpClient *http.Client circuitBreaker *circuitBreaker cfg *Config @@ -50,10 +51,11 @@ func ensureHTTPScheme(host string) string { } // newTelemetryExporter creates a new exporter. -func newTelemetryExporter(host string, driverVersion string, httpClient *http.Client, cfg *Config) *telemetryExporter { +func newTelemetryExporter(host string, driverVersion string, userAgent string, httpClient *http.Client, cfg *Config) *telemetryExporter { return &telemetryExporter{ host: host, driverVersion: driverVersion, + userAgent: userAgent, httpClient: httpClient, circuitBreaker: getCircuitBreakerManager().getCircuitBreaker(host), cfg: cfg, @@ -85,78 +87,43 @@ func (e *telemetryExporter) export(ctx context.Context, metrics []*telemetryMetr } } -// doExport performs the actual export with retries and exponential backoff. +// doExport sends one telemetry request. It does NOT retry — retries are +// handled by the underlying retryablehttp-wrapped HTTP client (see +// internal/client.RetryableClient), which already retries 429/5xx with the +// server-provided Retry-After header. Any non-2xx outcome here is therefore +// the *post-retry* result, and is returned to the caller so the circuit +// breaker counts it as one failure per export. 
func (e *telemetryExporter) doExport(ctx context.Context, metrics []*telemetryMetric) error { - // Create telemetry request with base64-encoded logs request, err := createTelemetryRequest(metrics, e.driverVersion) if err != nil { return fmt.Errorf("failed to create telemetry request: %w", err) } - // Serialize request data, err := json.Marshal(request) if err != nil { return fmt.Errorf("failed to marshal request: %w", err) } - // Determine endpoint - hostURL := ensureHTTPScheme(e.host) - endpoint := hostURL + telemetryEndpointPath - - // Retry logic with exponential backoff - maxRetries := e.cfg.MaxRetries - for attempt := 0; attempt <= maxRetries; attempt++ { - // Exponential backoff (except for first attempt) - if attempt > 0 { - backoff := time.Duration(1<= 200 && resp.StatusCode < 300 { - return nil // Success - } + endpoint := ensureHTTPScheme(e.host) + telemetryEndpointPath - // Check if retryable - if !isRetryableStatus(resp.StatusCode) { - return fmt.Errorf("non-retryable status: %d", resp.StatusCode) - } - - if attempt == maxRetries { - return fmt.Errorf("failed after %d retries: status %d", maxRetries, resp.StatusCode) - } + req, err := http.NewRequestWithContext(ctx, "POST", endpoint, bytes.NewReader(data)) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + if e.userAgent != "" { + req.Header.Set("User-Agent", e.userAgent) } - return nil -} + resp, err := e.httpClient.Do(req) + if err != nil { + return fmt.Errorf("telemetry export failed: %w", err) + } + _, _ = io.ReadAll(resp.Body) + resp.Body.Close() //nolint:errcheck,gosec // G104: close after response is read -// isRetryableStatus returns true if HTTP status is retryable. 
-// Retryable statuses: 429 (Too Many Requests), 503 (Service Unavailable), 5xx (Server Errors) -func isRetryableStatus(status int) bool { - return status == 429 || status == 503 || status >= 500 + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + return nil + } + return fmt.Errorf("telemetry export failed: status %d", resp.StatusCode) } diff --git a/telemetry/exporter_test.go b/telemetry/exporter_test.go index 10156f82..008dd377 100644 --- a/telemetry/exporter_test.go +++ b/telemetry/exporter_test.go @@ -16,7 +16,7 @@ func TestNewTelemetryExporter(t *testing.T) { httpClient := &http.Client{Timeout: 5 * time.Second} host := "test-host" - exporter := newTelemetryExporter(host, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(host, "test-version", "test-ua", httpClient, cfg) if exporter.host != host { t.Errorf("Expected host %s, got %s", host, exporter.host) @@ -73,7 +73,7 @@ func TestExport_Success(t *testing.T) { httpClient := &http.Client{Timeout: 5 * time.Second} // Use full server URL for testing - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) metrics := []*telemetryMetric{ { @@ -93,109 +93,64 @@ func TestExport_Success(t *testing.T) { } } -func TestExport_RetryOn5xx(t *testing.T) { - attemptCount := int32(0) - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - count := atomic.AddInt32(&attemptCount, 1) - if count < 3 { - // Fail first 2 attempts - w.WriteHeader(http.StatusInternalServerError) - } else { - // Succeed on 3rd attempt - w.WriteHeader(http.StatusOK) - } - })) - defer server.Close() - - cfg := DefaultConfig() - cfg.MaxRetries = 3 - cfg.RetryDelay = 10 * time.Millisecond - httpClient := &http.Client{Timeout: 5 * time.Second} - - // Use full server URL for testing - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) - - metrics := 
[]*telemetryMetric{ - { - metricType: "connection", - timestamp: time.Now(), - }, - } - - ctx := context.Background() - exporter.export(ctx, metrics) - - // Should have retried and succeeded - if atomic.LoadInt32(&attemptCount) != 3 { - t.Errorf("Expected 3 attempts, got %d", attemptCount) - } -} - -func TestExport_NonRetryable4xx(t *testing.T) { - attemptCount := int32(0) +// TestExport_SetsUserAgent verifies the configured User-Agent is sent on the +// telemetry POST so traffic is attributable in access logs. +func TestExport_SetsUserAgent(t *testing.T) { + const wantUA = "godatabrickssqlconnector/9.9.9" + gotUA := "" server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - atomic.AddInt32(&attemptCount, 1) - w.WriteHeader(http.StatusBadRequest) // 400 is not retryable + gotUA = r.Header.Get("User-Agent") + w.WriteHeader(http.StatusOK) })) defer server.Close() cfg := DefaultConfig() - cfg.MaxRetries = 3 - cfg.RetryDelay = 10 * time.Millisecond httpClient := &http.Client{Timeout: 5 * time.Second} + exporter := newTelemetryExporter(server.URL, "9.9.9", wantUA, httpClient, cfg) - // Use full server URL for testing - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) - - metrics := []*telemetryMetric{ - { - metricType: "connection", - timestamp: time.Now(), - }, - } - - ctx := context.Background() - exporter.export(ctx, metrics) + exporter.export(context.Background(), []*telemetryMetric{{ + metricType: "connection", timestamp: time.Now(), + }}) - // Should only try once (no retries for 4xx) - if atomic.LoadInt32(&attemptCount) != 1 { - t.Errorf("Expected 1 attempt, got %d", attemptCount) + if gotUA != wantUA { + t.Errorf("User-Agent: got %q, want %q", gotUA, wantUA) } } -func TestExport_Retry429(t *testing.T) { - attemptCount := int32(0) - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - count := atomic.AddInt32(&attemptCount, 1) - if count < 2 { - 
w.WriteHeader(http.StatusTooManyRequests) // 429 is retryable - } else { - w.WriteHeader(http.StatusOK) - } - })) - defer server.Close() - - cfg := DefaultConfig() - cfg.MaxRetries = 3 - cfg.RetryDelay = 10 * time.Millisecond - httpClient := &http.Client{Timeout: 5 * time.Second} - - // Use full server URL for testing - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) - - metrics := []*telemetryMetric{ - { - metricType: "connection", - timestamp: time.Now(), - }, +// TestExport_SingleAttemptPerExport asserts that doExport itself never +// retries — a single export is exactly one HTTP transaction. Retries are +// owned by the underlying retryablehttp-wrapped client (not exercised here +// because the test uses a plain *http.Client). Each export → one breaker +// outcome. +func TestExport_SingleAttemptPerExport(t *testing.T) { + cases := []struct { + name string + status int + }{ + {"400", http.StatusBadRequest}, + {"429", http.StatusTooManyRequests}, + {"500", http.StatusInternalServerError}, + {"503", http.StatusServiceUnavailable}, } - ctx := context.Background() - exporter.export(ctx, metrics) - - // Should have retried and succeeded - if atomic.LoadInt32(&attemptCount) != 2 { - t.Errorf("Expected 2 attempts, got %d", attemptCount) + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + attemptCount := int32(0) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&attemptCount, 1) + w.WriteHeader(tc.status) + })) + defer server.Close() + + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", &http.Client{Timeout: 5 * time.Second}, DefaultConfig()) + exporter.export(context.Background(), []*telemetryMetric{{ + metricType: "connection", timestamp: time.Now(), + }}) + + if got := atomic.LoadInt32(&attemptCount); got != 1 { + t.Errorf("status %d: expected 1 attempt at exporter layer, got %d", tc.status, got) + } + }) } } @@ -211,7 +166,7 @@ func 
TestExport_CircuitBreakerOpen(t *testing.T) { httpClient := &http.Client{Timeout: 5 * time.Second} // Use full server URL for testing - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) // Open the circuit breaker by recording failures cb := exporter.circuitBreaker @@ -243,33 +198,6 @@ func TestExport_CircuitBreakerOpen(t *testing.T) { } } -func TestIsRetryableStatus(t *testing.T) { - tests := []struct { - status int - retryable bool - description string - }{ - {200, false, "200 OK is not retryable"}, - {201, false, "201 Created is not retryable"}, - {400, false, "400 Bad Request is not retryable"}, - {401, false, "401 Unauthorized is not retryable"}, - {403, false, "403 Forbidden is not retryable"}, - {404, false, "404 Not Found is not retryable"}, - {429, true, "429 Too Many Requests is retryable"}, - {500, true, "500 Internal Server Error is retryable"}, - {502, true, "502 Bad Gateway is retryable"}, - {503, true, "503 Service Unavailable is retryable"}, - {504, true, "504 Gateway Timeout is retryable"}, - } - - for _, tt := range tests { - result := isRetryableStatus(tt.status) - if result != tt.retryable { - t.Errorf("%s: expected %v, got %v", tt.description, tt.retryable, result) - } - } -} - func TestExport_ErrorSwallowing(t *testing.T) { // Server that always fails server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -278,12 +206,10 @@ func TestExport_ErrorSwallowing(t *testing.T) { defer server.Close() cfg := DefaultConfig() - cfg.MaxRetries = 1 - cfg.RetryDelay = 10 * time.Millisecond httpClient := &http.Client{Timeout: 5 * time.Second} // Use full server URL for testing - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) metrics := []*telemetryMetric{ { @@ -314,12 +240,10 @@ 
func TestExport_ContextCancellation(t *testing.T) { defer server.Close() cfg := DefaultConfig() - cfg.MaxRetries = 3 - cfg.RetryDelay = 50 * time.Millisecond httpClient := &http.Client{Timeout: 5 * time.Second} // Use full server URL for testing - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) metrics := []*telemetryMetric{ { @@ -336,62 +260,3 @@ func TestExport_ContextCancellation(t *testing.T) { exporter.export(ctx, metrics) // If we get here, context cancellation is handled properly } - -func TestExport_ExponentialBackoff(t *testing.T) { - attemptTimes := make([]time.Time, 0) - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - attemptTimes = append(attemptTimes, time.Now()) - // Always fail to test all retries - w.WriteHeader(http.StatusInternalServerError) - })) - defer server.Close() - - cfg := DefaultConfig() - cfg.MaxRetries = 3 - cfg.RetryDelay = 50 * time.Millisecond - httpClient := &http.Client{Timeout: 5 * time.Second} - - // Use full server URL for testing - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) - - metrics := []*telemetryMetric{ - { - metricType: "connection", - timestamp: time.Now(), - }, - } - - ctx := context.Background() - exporter.export(ctx, metrics) - - // Should have 4 attempts (1 initial + 3 retries) - if len(attemptTimes) != 4 { - t.Errorf("Expected 4 attempts, got %d", len(attemptTimes)) - return - } - - // Verify exponential backoff delays - // Attempt 0: immediate - // Attempt 1: +50ms (2^0 * 50ms) - // Attempt 2: +100ms (2^1 * 50ms) - // Attempt 3: +200ms (2^2 * 50ms) - - delay1 := attemptTimes[1].Sub(attemptTimes[0]) - delay2 := attemptTimes[2].Sub(attemptTimes[1]) - delay3 := attemptTimes[3].Sub(attemptTimes[2]) - - // Allow 30ms tolerance for timing variations - tolerance := 30 * time.Millisecond - - if delay1 < 
(50*time.Millisecond-tolerance) || delay1 > (50*time.Millisecond+tolerance) { - t.Errorf("Expected delay1 ~50ms, got %v", delay1) - } - - if delay2 < (100*time.Millisecond-tolerance) || delay2 > (100*time.Millisecond+tolerance) { - t.Errorf("Expected delay2 ~100ms, got %v", delay2) - } - - if delay3 < (200*time.Millisecond-tolerance) || delay3 > (200*time.Millisecond+tolerance) { - t.Errorf("Expected delay3 ~200ms, got %v", delay3) - } -} diff --git a/telemetry/featureflag.go b/telemetry/featureflag.go index 81696baa..d52e1e40 100644 --- a/telemetry/featureflag.go +++ b/telemetry/featureflag.go @@ -86,7 +86,7 @@ func (c *featureFlagCache) releaseContext(host string) { // isTelemetryEnabled checks if telemetry is enabled for the host. // Uses cached value if available and not expired. -func (c *featureFlagCache) isTelemetryEnabled(ctx context.Context, host string, driverVersion string, httpClient *http.Client) (bool, error) { +func (c *featureFlagCache) isTelemetryEnabled(ctx context.Context, host string, driverVersion string, userAgent string, httpClient *http.Client) (bool, error) { c.mu.RLock() flagCtx, exists := c.contexts[host] c.mu.RUnlock() @@ -135,7 +135,7 @@ func (c *featureFlagCache) isTelemetryEnabled(ctx context.Context, host string, flagCtx.mu.Unlock() // Fetch fresh value (outside lock so other readers are not blocked). - enabled, err := fetchFeatureFlag(ctx, host, driverVersion, httpClient) + enabled, err := fetchFeatureFlag(ctx, host, driverVersion, userAgent, httpClient) // Update cache. flagCtx.mu.Lock() @@ -166,7 +166,7 @@ func (c *featureFlagContext) isExpired() bool { } // fetchFeatureFlag fetches the feature flag value from Databricks. 
-func fetchFeatureFlag(ctx context.Context, host string, driverVersion string, httpClient *http.Client) (bool, error) { +func fetchFeatureFlag(ctx context.Context, host string, driverVersion string, userAgent string, httpClient *http.Client) (bool, error) { // Add timeout to context if it doesn't have a deadline if _, hasDeadline := ctx.Deadline(); !hasDeadline { var cancel context.CancelFunc @@ -182,6 +182,9 @@ func fetchFeatureFlag(ctx context.Context, host string, driverVersion string, ht if err != nil { return false, fmt.Errorf("failed to create feature flag request: %w", err) } + if userAgent != "" { + req.Header.Set("User-Agent", userAgent) + } resp, err := httpClient.Do(req) if err != nil { diff --git a/telemetry/featureflag_test.go b/telemetry/featureflag_test.go index 4ffbf07b..6c789410 100644 --- a/telemetry/featureflag_test.go +++ b/telemetry/featureflag_test.go @@ -99,7 +99,7 @@ func TestFeatureFlagCache_IsTelemetryEnabled_Cached(t *testing.T) { ctx.lastFetched = time.Now() // Should return cached value without HTTP call - result, err := cache.isTelemetryEnabled(context.Background(), host, "test-version", nil) + result, err := cache.isTelemetryEnabled(context.Background(), host, "test-version", "test-ua", nil) if err != nil { t.Errorf("Expected no error, got %v", err) } @@ -133,7 +133,7 @@ func TestFeatureFlagCache_IsTelemetryEnabled_Expired(t *testing.T) { // Should fetch fresh value httpClient := &http.Client{} - result, err := cache.isTelemetryEnabled(context.Background(), host, "test-version", httpClient) + result, err := cache.isTelemetryEnabled(context.Background(), host, "test-version", "test-ua", httpClient) if err != nil { t.Errorf("Expected no error, got %v", err) } @@ -158,7 +158,7 @@ func TestFeatureFlagCache_IsTelemetryEnabled_NoContext(t *testing.T) { host := "non-existent-host.databricks.com" // Should return false for non-existent context - result, err := cache.isTelemetryEnabled(context.Background(), host, "test-version", nil) + result, 
err := cache.isTelemetryEnabled(context.Background(), host, "test-version", "test-ua", nil) if err != nil { t.Errorf("Expected no error, got %v", err) } @@ -188,7 +188,7 @@ func TestFeatureFlagCache_IsTelemetryEnabled_ErrorFallback(t *testing.T) { // Should return cached value on error httpClient := &http.Client{} - result, err := cache.isTelemetryEnabled(context.Background(), host, "test-version", httpClient) + result, err := cache.isTelemetryEnabled(context.Background(), host, "test-version", "test-ua", httpClient) if err != nil { t.Errorf("Expected no error (fallback to cache), got %v", err) } @@ -213,7 +213,7 @@ func TestFeatureFlagCache_IsTelemetryEnabled_ErrorNoCache(t *testing.T) { // No cached value, should return error httpClient := &http.Client{} - result, err := cache.isTelemetryEnabled(context.Background(), host, "test-version", httpClient) + result, err := cache.isTelemetryEnabled(context.Background(), host, "test-version", "test-ua", httpClient) if err == nil { t.Error("Expected error when no cache available and fetch fails") } @@ -330,7 +330,7 @@ func TestFetchFeatureFlag_Success(t *testing.T) { host := server.URL // Use full URL for testing httpClient := &http.Client{} - enabled, err := fetchFeatureFlag(context.Background(), host, "test-version", httpClient) + enabled, err := fetchFeatureFlag(context.Background(), host, "test-version", "test-ua", httpClient) if err != nil { t.Errorf("Expected no error, got %v", err) } @@ -339,6 +339,28 @@ func TestFetchFeatureFlag_Success(t *testing.T) { } } +// TestFetchFeatureFlag_SetsUserAgent verifies the configured User-Agent is +// sent on feature-flag GETs so traffic is attributable in access logs. 
+func TestFetchFeatureFlag_SetsUserAgent(t *testing.T) { + const wantUA = "godatabrickssqlconnector/9.9.9" + gotUA := "" + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotUA = r.Header.Get("User-Agent") + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"flags": [], "ttl_seconds": 300}`)) + })) + defer server.Close() + + _, err := fetchFeatureFlag(context.Background(), server.URL, "9.9.9", wantUA, &http.Client{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if gotUA != wantUA { + t.Errorf("User-Agent: got %q, want %q", gotUA, wantUA) + } +} + func TestFetchFeatureFlag_Disabled(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") @@ -350,7 +372,7 @@ func TestFetchFeatureFlag_Disabled(t *testing.T) { host := server.URL // Use full URL for testing httpClient := &http.Client{} - enabled, err := fetchFeatureFlag(context.Background(), host, "test-version", httpClient) + enabled, err := fetchFeatureFlag(context.Background(), host, "test-version", "test-ua", httpClient) if err != nil { t.Errorf("Expected no error, got %v", err) } @@ -370,7 +392,7 @@ func TestFetchFeatureFlag_FlagNotPresent(t *testing.T) { host := server.URL // Use full URL for testing httpClient := &http.Client{} - enabled, err := fetchFeatureFlag(context.Background(), host, "test-version", httpClient) + enabled, err := fetchFeatureFlag(context.Background(), host, "test-version", "test-ua", httpClient) if err != nil { t.Errorf("Expected no error, got %v", err) } @@ -388,7 +410,7 @@ func TestFetchFeatureFlag_HTTPError(t *testing.T) { host := server.URL // Use full URL for testing httpClient := &http.Client{} - _, err := fetchFeatureFlag(context.Background(), host, "test-version", httpClient) + _, err := fetchFeatureFlag(context.Background(), host, "test-version", "test-ua", 
httpClient) if err == nil { t.Error("Expected error for HTTP 500") } @@ -405,7 +427,7 @@ func TestFetchFeatureFlag_InvalidJSON(t *testing.T) { host := server.URL // Use full URL for testing httpClient := &http.Client{} - _, err := fetchFeatureFlag(context.Background(), host, "test-version", httpClient) + _, err := fetchFeatureFlag(context.Background(), host, "test-version", "test-ua", httpClient) if err == nil { t.Error("Expected error for invalid JSON") } @@ -424,7 +446,7 @@ func TestFetchFeatureFlag_ContextCancellation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() // Cancel immediately - _, err := fetchFeatureFlag(ctx, host, "test-version", httpClient) + _, err := fetchFeatureFlag(ctx, host, "test-version", "test-ua", httpClient) if err == nil { t.Error("Expected error for cancelled context") } diff --git a/telemetry/integration_test.go b/telemetry/integration_test.go index 20bd2fc0..b5ea8091 100644 --- a/telemetry/integration_test.go +++ b/telemetry/integration_test.go @@ -48,7 +48,7 @@ func TestIntegration_EndToEnd_WithCircuitBreaker(t *testing.T) { defer server.Close() // Create telemetry client - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) aggregator := newMetricsAggregator(exporter, cfg) defer aggregator.close(context.Background()) //nolint:errcheck @@ -84,7 +84,6 @@ func TestIntegration_CircuitBreakerOpening(t *testing.T) { cfg := DefaultConfig() cfg.FlushInterval = 50 * time.Millisecond - cfg.MaxRetries = 0 // No retries for faster test httpClient := &http.Client{Timeout: 5 * time.Second} requestCount := int32(0) @@ -95,7 +94,7 @@ func TestIntegration_CircuitBreakerOpening(t *testing.T) { })) defer server.Close() - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) aggregator := 
newMetricsAggregator(exporter, cfg) defer aggregator.close(context.Background()) //nolint:errcheck @@ -166,7 +165,7 @@ func TestIntegration_PrivacyCompliance_NoQueryText(t *testing.T) { })) defer server.Close() - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) aggregator := newMetricsAggregator(exporter, cfg) defer aggregator.close(context.Background()) //nolint:errcheck @@ -237,7 +236,7 @@ func TestIntegration_TelemetryEventCorrectnessAllFields(t *testing.T) { })) defer server.Close() - exporter := newTelemetryExporter(server.URL, testDriverVersion, httpClient, cfg) + exporter := newTelemetryExporter(server.URL, testDriverVersion, "test-ua", httpClient, cfg) metric := &telemetryMetric{ metricType: "operation", @@ -404,7 +403,7 @@ func TestIntegration_OperationLatencyMs_ZeroNotOmitted(t *testing.T) { })) defer server.Close() - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) // latencyMs=0 — simulates a CloseOperation that completed in <1ms. metric := &telemetryMetric{ @@ -476,7 +475,7 @@ func TestIntegration_ChunkTotalPresent_DerivedFromChunkCount(t *testing.T) { totalChunksIterated = 32 ) - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) metric := &telemetryMetric{ metricType: "operation", timestamp: time.Now(), diff --git a/telemetry/manager.go b/telemetry/manager.go index 8977e924..375f3d60 100644 --- a/telemetry/manager.go +++ b/telemetry/manager.go @@ -45,13 +45,13 @@ func getClientManager() *clientManager { // getOrCreateClient gets or creates a telemetry client for the host. // Increments reference count. 
-func (m *clientManager) getOrCreateClient(host string, driverVersion string, httpClient *http.Client, cfg *Config) *telemetryClient { +func (m *clientManager) getOrCreateClient(host string, driverVersion string, userAgent string, httpClient *http.Client, cfg *Config) *telemetryClient { m.mu.Lock() defer m.mu.Unlock() holder, exists := m.clients[host] if !exists { - client := newTelemetryClient(host, driverVersion, httpClient, cfg) + client := newTelemetryClient(host, driverVersion, userAgent, httpClient, cfg) if err := client.start(); err != nil { // Failed to start client, don't add to map logger.Logger.Debug().Str("host", host).Err(err).Msg("failed to start telemetry client") diff --git a/telemetry/manager_test.go b/telemetry/manager_test.go index 51461452..3567d3b9 100644 --- a/telemetry/manager_test.go +++ b/telemetry/manager_test.go @@ -29,7 +29,7 @@ func TestClientManager_GetOrCreateClient(t *testing.T) { cfg := DefaultConfig() // First call should create client and increment refCount to 1 - client1 := manager.getOrCreateClient(host, "test-version", httpClient, cfg) + client1 := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) if client1 == nil { t.Fatal("Expected client to be created") } @@ -46,7 +46,7 @@ func TestClientManager_GetOrCreateClient(t *testing.T) { } // Second call should reuse client and increment refCount to 2 - client2 := manager.getOrCreateClient(host, "test-version", httpClient, cfg) + client2 := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) if client2 != client1 { t.Error("Expected to get the same client instance") } @@ -65,8 +65,8 @@ func TestClientManager_GetOrCreateClient_DifferentHosts(t *testing.T) { httpClient := &http.Client{} cfg := DefaultConfig() - client1 := manager.getOrCreateClient(host1, "test-version", httpClient, cfg) - client2 := manager.getOrCreateClient(host2, "test-version", httpClient, cfg) + client1 := manager.getOrCreateClient(host1, "test-version", "test-ua", 
httpClient, cfg) + client2 := manager.getOrCreateClient(host2, "test-version", "test-ua", httpClient, cfg) if client1 == client2 { t.Error("Expected different clients for different hosts") @@ -87,8 +87,8 @@ func TestClientManager_ReleaseClient(t *testing.T) { cfg := DefaultConfig() // Create client with refCount = 2 - manager.getOrCreateClient(host, "test-version", httpClient, cfg) - manager.getOrCreateClient(host, "test-version", httpClient, cfg) + manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) + manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) // First release should decrement to 1 err := manager.releaseClient(host) @@ -151,7 +151,7 @@ func TestClientManager_ConcurrentAccess(t *testing.T) { for i := 0; i < numGoroutines; i++ { go func() { defer wg.Done() - client := manager.getOrCreateClient(host, "test-version", httpClient, cfg) + client := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) if client == nil { t.Error("Expected client to be created") } @@ -207,7 +207,7 @@ func TestClientManager_ConcurrentAccessMultipleHosts(t *testing.T) { wg.Add(1) go func(h string) { defer wg.Done() - _ = manager.getOrCreateClient(h, "test-version", httpClient, cfg) + _ = manager.getOrCreateClient(h, "test-version", "test-ua", httpClient, cfg) }(host) } } @@ -241,7 +241,7 @@ func TestClientManager_ReleaseClientPartial(t *testing.T) { // Create 5 references for i := 0; i < 5; i++ { - manager.getOrCreateClient(host, "test-version", httpClient, cfg) + manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) } // Release 3 references @@ -271,7 +271,7 @@ func TestClientManager_ClientStartCalled(t *testing.T) { httpClient := &http.Client{} cfg := DefaultConfig() - client := manager.getOrCreateClient(host, "test-version", httpClient, cfg) + client := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) if !client.started { t.Error("Expected start() to be called on 
new client") @@ -287,7 +287,7 @@ func TestClientManager_ClientCloseCalled(t *testing.T) { httpClient := &http.Client{} cfg := DefaultConfig() - client := manager.getOrCreateClient(host, "test-version", httpClient, cfg) + client := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) _ = manager.releaseClient(host) if !client.closed { @@ -305,9 +305,9 @@ func TestClientManager_MultipleGetOrCreateSameClient(t *testing.T) { cfg := DefaultConfig() // Get same client multiple times - client1 := manager.getOrCreateClient(host, "test-version", httpClient, cfg) - client2 := manager.getOrCreateClient(host, "test-version", httpClient, cfg) - client3 := manager.getOrCreateClient(host, "test-version", httpClient, cfg) + client1 := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) + client2 := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) + client3 := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) // All should be same instance if client1 != client2 || client2 != client3 { @@ -337,7 +337,7 @@ func TestClientManager_Shutdown(t *testing.T) { // Create clients for multiple hosts clients := make([]*telemetryClient, 0, len(hosts)) for _, host := range hosts { - client := manager.getOrCreateClient(host, "test-version", httpClient, cfg) + client := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) clients = append(clients, client) } @@ -375,9 +375,9 @@ func TestClientManager_ShutdownWithActiveRefs(t *testing.T) { cfg := DefaultConfig() // Create client with multiple references - client := manager.getOrCreateClient(host, "test-version", httpClient, cfg) - manager.getOrCreateClient(host, "test-version", httpClient, cfg) - manager.getOrCreateClient(host, "test-version", httpClient, cfg) + client := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) + manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) + 
manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) holder := manager.clients[host] if holder.refCount != 3 { From d1302078bfcc85acf908966e1cc75d425b801fda Mon Sep 17 00:00:00 2001 From: samikshya-chand_data Date: Wed, 13 May 2026 12:33:58 +0530 Subject: [PATCH 2/4] Telemetry: skip retryablehttp 429 retries, honour Retry-After, jitter first flush MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous commit removed the exporter's inner retry loop, but the retryablehttp layer (RetryMax=4) was still amplifying each telemetry export into up to 5 attempts on a 429 — defeating the breaker because only the final outcome was observed. - internal/client: add WithSkipRateLimitRetry context helper. RetryPolicy treats 429 as non-retryable when the flag is set; 5xx and transport errors are unaffected. Set on telemetry POSTs and feature-flag GETs so each rate-limited request is exactly one HTTP transaction = one breaker signal. - telemetry/circuitbreaker: record server-provided Retry-After deltas from 429 responses; the open-state wait becomes max(waitDurationInOpenState, hint). Hint cleared on the next half-open -> closed transition. - telemetry/aggregator: offset the first flush by a random [0, FlushInterval) so a fleet of clients booted together does not phase-align bursts. 
Co-authored-by: Isaac --- internal/client/client.go | 27 +++++++ internal/client/client_test.go | 35 +++++++++ telemetry/aggregator.go | 16 +++++ telemetry/circuitbreaker.go | 39 +++++++++- telemetry/circuitbreaker_test.go | 118 +++++++++++++++++++++++++++++++ telemetry/exporter.go | 52 +++++++++++--- telemetry/exporter_test.go | 36 ++++++++++ telemetry/featureflag.go | 7 ++ 8 files changed, 319 insertions(+), 11 deletions(-) diff --git a/internal/client/client.go b/internal/client/client.go index 6aca7428..276db8f6 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -45,8 +45,25 @@ type contextKey int const ( ClientMethod contextKey = iota + // SkipRateLimitRetry, when set to true on the request context, makes + // RetryPolicy treat 429 Too Many Requests as a non-retryable response — + // the request returns the 429 error after a single attempt. 5xx and + // transport errors are unaffected and still retried. + // + // Set by callers that own their own backoff for rate-limited endpoints + // (e.g. the telemetry exporter, whose circuit breaker requires one HTTP + // transaction per call so it can see the 429 directly). + SkipRateLimitRetry ) +// WithSkipRateLimitRetry returns a context that disables retryablehttp's +// retry-on-429 behavior for any request issued with it. The caller takes +// responsibility for handling rate-limit responses (e.g. via a circuit +// breaker). 5xx responses and transport errors are still retried. +func WithSkipRateLimitRetry(ctx context.Context) context.Context { + return context.WithValue(ctx, SkipRateLimitRetry, true) +} + type clientMethod int //go:generate go run golang.org/x/tools/cmd/stringer -type=clientMethod -trimprefix=clientMethod @@ -712,6 +729,16 @@ func RetryPolicy(ctx context.Context, resp *http.Response, err error) (bool, err retryAfter = resp.Header.Get("Retry-After") } + // Callers that own their own rate-limit backoff (e.g. 
the telemetry + // circuit breaker) opt out of 429 retries via SkipRateLimitRetry so + // each HTTP attempt is a distinct signal to them. 503s still retry + // normally even when this flag is set. + if resp.StatusCode == http.StatusTooManyRequests { + if skip, _ := ctx.Value(SkipRateLimitRetry).(bool); skip { + return false, dbsqlerrint.NewRetryableError(checkErr, retryAfter) + } + } + return true, dbsqlerrint.NewRetryableError(checkErr, retryAfter) } diff --git a/internal/client/client_test.go b/internal/client/client_test.go index 3c1cf806..471a2994 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -217,6 +217,41 @@ func TestRetryPolicy(t *testing.T) { }) + // SkipRateLimitRetry on the context turns 429 into a non-retryable + // response so the caller can own its own backoff. 5xx and transport + // errors are unaffected — the flag is narrowly scoped to rate-limit + // responses. + // + // We use clientMethodOpenSession (which is a retryable method) so the + // generic 5xx branch fires; the default clientMethodUnknown is itself + // in nonRetryableClientMethods and would suppress 500 retries for an + // unrelated reason. + t.Run("SkipRateLimitRetry only affects 429", func(t *testing.T) { + base := context.WithValue(context.Background(), ClientMethod, clientMethodOpenSession) + ctx := WithSkipRateLimitRetry(base) + + resp429 := &http.Response{StatusCode: http.StatusTooManyRequests} + retry, err := RetryPolicy(ctx, resp429, nil) + require.False(t, retry, "429 with SkipRateLimitRetry should not be retried") + require.Error(t, err, "429 should still surface an error") + + // 503 is the other code isRetryableServerResponse recognizes; the + // flag should NOT suppress its retry. 
+ resp503 := &http.Response{StatusCode: http.StatusServiceUnavailable} + retry, _ = RetryPolicy(ctx, resp503, nil) + require.True(t, retry, "503 must still retry even with SkipRateLimitRetry") + + // 500 takes the generic 5xx branch — also unaffected for a + // retryable client method. + resp500 := &http.Response{StatusCode: http.StatusInternalServerError} + retry, _ = RetryPolicy(ctx, resp500, nil) + require.True(t, retry, "500 must still retry even with SkipRateLimitRetry") + + // Without the flag, 429 retries as before. + retry, _ = RetryPolicy(base, resp429, nil) + require.True(t, retry, "default behavior: 429 retries") + }) + t.Run("test handling client method errors", func(t *testing.T) { cases := []struct { base string diff --git a/telemetry/aggregator.go b/telemetry/aggregator.go index 97c37f64..8d2532f9 100644 --- a/telemetry/aggregator.go +++ b/telemetry/aggregator.go @@ -2,6 +2,7 @@ package telemetry import ( "context" + "math/rand" "sync" "time" @@ -220,7 +221,22 @@ func (agg *metricsAggregator) completeStatement(ctx context.Context, statementID } // flushLoop runs periodic flush in background. +// +// The first flush is offset by a random jitter in [0, flushInterval) so a +// fleet of clients started together (e.g. partner-connector workers booted +// from the same scheduler) does not align all their flushes on the same +// wall-clock phase and produce synchronized request bursts. 
func (agg *metricsAggregator) flushLoop() { + jitter := time.Duration(rand.Int63n(int64(agg.flushInterval))) //nolint:gosec // G404: not used for security + jitterTimer := time.NewTimer(jitter) + defer jitterTimer.Stop() + select { + case <-jitterTimer.C: + agg.flush(agg.ctx) + case <-agg.stopCh: + return + } + agg.flushTimer = time.NewTicker(agg.flushInterval) defer agg.flushTimer.Stop() diff --git a/telemetry/circuitbreaker.go b/telemetry/circuitbreaker.go index 7088afa6..c1a716fb 100644 --- a/telemetry/circuitbreaker.go +++ b/telemetry/circuitbreaker.go @@ -50,6 +50,12 @@ type circuitBreaker struct { // Half-open state tracking halfOpenSuccesses int + // retryAfterHint, if non-zero, overrides waitDurationInOpenState for + // the next open-state interval. Populated from server-provided + // Retry-After headers on 429 responses; cleared when the breaker + // transitions to half-open or closed. + retryAfterHint time.Duration + config circuitBreakerConfig } @@ -101,9 +107,15 @@ func (cb *circuitBreaker) execute(ctx context.Context, fn func() error) error { switch state { case stateOpen: - // Check if wait duration has passed + // Check if wait duration has passed. If the server hinted a + // Retry-After on the failure that opened the breaker, honor the + // larger of (configured wait, server hint). cb.mu.RLock() - shouldRetry := time.Since(cb.lastStateTime) > cb.config.waitDurationInOpenState + wait := cb.config.waitDurationInOpenState + if cb.retryAfterHint > wait { + wait = cb.retryAfterHint + } + shouldRetry := time.Since(cb.lastStateTime) > wait cb.mu.RUnlock() if shouldRetry { @@ -154,8 +166,11 @@ func (cb *circuitBreaker) recordCall(result callResult) { cb.halfOpenSuccesses++ if cb.halfOpenSuccesses >= cb.config.permittedCallsInHalfOpen { - // Enough successes to close circuit + // Enough successes to close circuit. Drop any stale Retry-After + // hint so it doesn't extend a future open interval after the + // server has recovered. 
cb.resetWindowUnlocked() + cb.retryAfterHint = 0 cb.setStateUnlocked(stateClosed) } return @@ -218,6 +233,24 @@ func (cb *circuitBreaker) resetWindowUnlocked() { cb.halfOpenSuccesses = 0 } +// extendOpenStateAtLeast records a server-provided Retry-After hint. The +// next time the breaker is open, it will stay open for at least the given +// duration (and at least waitDurationInOpenState). The hint is cleared on +// the next half-open/closed transition. +// +// Safe to call concurrently from any caller (the exporter parses +// Retry-After on 429 responses and forwards it here). +func (cb *circuitBreaker) extendOpenStateAtLeast(d time.Duration) { + if d <= 0 { + return + } + cb.mu.Lock() + if d > cb.retryAfterHint { + cb.retryAfterHint = d + } + cb.mu.Unlock() +} + // setState transitions to a new state. func (cb *circuitBreaker) setState(newState circuitState) { cb.mu.Lock() diff --git a/telemetry/circuitbreaker_test.go b/telemetry/circuitbreaker_test.go index 92c2ecc2..06602cb9 100644 --- a/telemetry/circuitbreaker_test.go +++ b/telemetry/circuitbreaker_test.go @@ -479,3 +479,121 @@ func TestCircuitBreaker_ContextCancellation(t *testing.T) { t.Errorf("Expected no error, got %v", err) } } + +// TestCircuitBreaker_RetryAfterHintExtendsOpen verifies that a server +// Retry-After hint, recorded while the breaker is open, keeps the breaker +// open at least that long even if waitDurationInOpenState would expire +// sooner. 
+func TestCircuitBreaker_RetryAfterHintExtendsOpen(t *testing.T) { + cfg := circuitBreakerConfig{ + failureRateThreshold: 50, + minimumNumberOfCalls: 10, + slidingWindowSize: 20, + waitDurationInOpenState: 50 * time.Millisecond, + permittedCallsInHalfOpen: 2, + } + cb := newCircuitBreaker(cfg) + ctx := context.Background() + + failFunc := func() error { return errors.New("test error") } + + for i := 0; i < 10; i++ { + _ = cb.execute(ctx, failFunc) + } + if cb.getState() != stateOpen { + t.Fatalf("expected Open after 10 failures, got %v", cb.getState()) + } + + // Server hinted a much longer cool-down than the configured 50ms. + cb.extendOpenStateAtLeast(300 * time.Millisecond) + + // After 100ms, normally the breaker would already permit a half-open + // probe — but the hint should keep it open. + time.Sleep(100 * time.Millisecond) + err := cb.execute(ctx, func() error { return nil }) + if err != ErrCircuitOpen { + t.Fatalf("expected ErrCircuitOpen while hint window still active, got %v", err) + } + + // After the full hint window elapses, the breaker should permit a + // half-open probe. + time.Sleep(250 * time.Millisecond) + if err := cb.execute(ctx, func() error { return nil }); err != nil { + t.Errorf("expected probe to succeed after hint window, got %v", err) + } + if cb.getState() != stateHalfOpen { + t.Errorf("expected HalfOpen after hint window, got %v", cb.getState()) + } +} + +// TestCircuitBreaker_RetryAfterHintClearedOnClose verifies that a stale +// hint does not extend a future open interval once the breaker has +// recovered. +func TestCircuitBreaker_RetryAfterHintClearedOnClose(t *testing.T) { + cfg := circuitBreakerConfig{ + failureRateThreshold: 50, + minimumNumberOfCalls: 4, + slidingWindowSize: 10, + waitDurationInOpenState: 20 * time.Millisecond, + permittedCallsInHalfOpen: 2, + } + cb := newCircuitBreaker(cfg) + ctx := context.Background() + + // Open, record a long hint, then recover via half-open → closed. 
+ failFunc := func() error { return errors.New("test error") } + for i := 0; i < 4; i++ { + _ = cb.execute(ctx, failFunc) + } + cb.extendOpenStateAtLeast(10 * time.Second) // long stale hint + + time.Sleep(cfg.waitDurationInOpenState + 5*time.Millisecond) + // We expect the hint to keep us open here — sanity check. + if err := cb.execute(ctx, func() error { return nil }); err != ErrCircuitOpen { + t.Fatalf("hint should keep breaker open, got %v", err) + } + + // Force close by reaching directly into internal state — simulates a + // successful recovery path where the hint should be dropped. + cb.mu.Lock() + cb.resetWindowUnlocked() + cb.setStateUnlocked(stateHalfOpen) + cb.mu.Unlock() + for i := 0; i < cfg.permittedCallsInHalfOpen; i++ { + _ = cb.execute(ctx, func() error { return nil }) + } + if cb.getState() != stateClosed { + t.Fatalf("expected Closed after enough probe successes, got %v", cb.getState()) + } + + cb.mu.RLock() + hint := cb.retryAfterHint + cb.mu.RUnlock() + if hint != 0 { + t.Errorf("retryAfterHint should be cleared on transition to Closed, got %v", hint) + } +} + +// TestParseRetryAfter exercises the header parser. We deliberately ignore +// HTTP-date form (RFC 7231 §7.1.3): in practice rate-limit responses use +// delta-seconds and under-backing-off is safer than mis-parsing. 
+func TestParseRetryAfter(t *testing.T) { + cases := []struct { + in string + want time.Duration + }{ + {"", 0}, + {"0", 0}, + {"-1", 0}, + {"abc", 0}, + {"5", 5 * time.Second}, + {" 120 ", 120 * time.Second}, + {"Wed, 21 Oct 2015 07:28:00 GMT", 0}, // HTTP-date form — intentionally not parsed + } + for _, c := range cases { + got := parseRetryAfter(c.in) + if got != c.want { + t.Errorf("parseRetryAfter(%q) = %v, want %v", c.in, got, c.want) + } + } +} diff --git a/telemetry/exporter.go b/telemetry/exporter.go index c87933de..9ab44299 100644 --- a/telemetry/exporter.go +++ b/telemetry/exporter.go @@ -7,9 +7,11 @@ import ( "fmt" "io" "net/http" + "strconv" "strings" "time" + "github.com/databricks/databricks-sql-go/internal/client" "github.com/databricks/databricks-sql-go/logger" ) @@ -106,6 +108,11 @@ func (e *telemetryExporter) doExport(ctx context.Context, metrics []*telemetryMe endpoint := ensureHTTPScheme(e.host) + telemetryEndpointPath + // Opt out of retryablehttp's 429 retries — the circuit breaker owns + // the rate-limit backoff and needs one HTTP transaction per call to + // trip on persistent throttling. 5xx/transport retries are unaffected. + ctx = client.WithSkipRateLimitRetry(ctx) + req, err := http.NewRequestWithContext(ctx, "POST", endpoint, bytes.NewReader(data)) if err != nil { return fmt.Errorf("failed to create request: %w", err) @@ -115,15 +122,44 @@ func (e *telemetryExporter) doExport(ctx context.Context, metrics []*telemetryMe req.Header.Set("User-Agent", e.userAgent) } - resp, err := e.httpClient.Do(req) - if err != nil { - return fmt.Errorf("telemetry export failed: %w", err) + // With SkipRateLimitRetry set, retryablehttp returns (resp, err) for a + // 429: a wrapped error AND the actual response. Inspect the response + // before short-circuiting on err so we can read Retry-After. 
+ resp, doErr := e.httpClient.Do(req) + if resp != nil { + _, _ = io.ReadAll(resp.Body) + resp.Body.Close() //nolint:errcheck,gosec // G104: close after response is read + + if resp.StatusCode == http.StatusTooManyRequests { + if hint := parseRetryAfter(resp.Header.Get("Retry-After")); hint > 0 { + e.circuitBreaker.extendOpenStateAtLeast(hint) + } + return fmt.Errorf("telemetry export failed: status %d", resp.StatusCode) + } + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + return nil + } + if doErr == nil { + return fmt.Errorf("telemetry export failed: status %d", resp.StatusCode) + } + } + if doErr != nil { + return fmt.Errorf("telemetry export failed: %w", doErr) } - _, _ = io.ReadAll(resp.Body) - resp.Body.Close() //nolint:errcheck,gosec // G104: close after response is read + return nil +} - if resp.StatusCode >= 200 && resp.StatusCode < 300 { - return nil +// parseRetryAfter parses the Retry-After header per RFC 7231. Only the +// delta-seconds form is honored; HTTP-date is rare in practice for rate +// limiting and we'd rather under-back-off than mis-parse. Returns 0 on +// any failure. +func parseRetryAfter(s string) time.Duration { + if s == "" { + return 0 + } + sec, err := strconv.ParseInt(strings.TrimSpace(s), 10, 64) + if err != nil || sec <= 0 { + return 0 } - return fmt.Errorf("telemetry export failed: status %d", resp.StatusCode) + return time.Duration(sec) * time.Second } diff --git a/telemetry/exporter_test.go b/telemetry/exporter_test.go index 008dd377..b999b433 100644 --- a/telemetry/exporter_test.go +++ b/telemetry/exporter_test.go @@ -231,6 +231,42 @@ func TestExport_ErrorSwallowing(t *testing.T) { // If we get here without panic, error swallowing works } +// TestExport_429RecordsRetryAfter verifies that a 429 with a Retry-After +// header pushes its delta into the per-host circuit breaker so subsequent +// open-state checks respect the server hint instead of the default +// waitDurationInOpenState. 
+func TestExport_429RecordsRetryAfter(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Retry-After", "42") + w.WriteHeader(http.StatusTooManyRequests) + })) + defer server.Close() + + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", + &http.Client{Timeout: 5 * time.Second}, DefaultConfig()) + + // Sanity: a fresh breaker has no hint. + exporter.circuitBreaker.mu.RLock() + if hint := exporter.circuitBreaker.retryAfterHint; hint != 0 { + exporter.circuitBreaker.mu.RUnlock() + t.Fatalf("expected fresh breaker to have no hint, got %v", hint) + } + exporter.circuitBreaker.mu.RUnlock() + + exporter.export(context.Background(), []*telemetryMetric{{ + metricType: "connection", timestamp: time.Now(), + }}) + + exporter.circuitBreaker.mu.RLock() + hint := exporter.circuitBreaker.retryAfterHint + exporter.circuitBreaker.mu.RUnlock() + + if hint != 42*time.Second { + t.Errorf("breaker retryAfterHint after 429 with Retry-After:42: got %v, want %v", + hint, 42*time.Second) + } +} + func TestExport_ContextCancellation(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Slow server diff --git a/telemetry/featureflag.go b/telemetry/featureflag.go index d52e1e40..d570374d 100644 --- a/telemetry/featureflag.go +++ b/telemetry/featureflag.go @@ -8,6 +8,8 @@ import ( "net/http" "sync" "time" + + "github.com/databricks/databricks-sql-go/internal/client" ) const ( @@ -178,6 +180,11 @@ func fetchFeatureFlag(ctx context.Context, host string, driverVersion string, us hostURL := ensureHTTPScheme(host) endpoint := fmt.Sprintf("%s%s%s", hostURL, featureFlagEndpointPath, driverVersion) + // Feature-flag GET shares the same rate-limit group as /telemetry-ext on + // the server side, so a 429 here should also fail fast rather than being + // retried 5× by retryablehttp. 
+ ctx = client.WithSkipRateLimitRetry(ctx) + req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil) if err != nil { return false, fmt.Errorf("failed to create feature flag request: %w", err) From 203e1efa657e3b971775173c8dfeec53773f74b4 Mon Sep 17 00:00:00 2001 From: samikshya-chand_data Date: Wed, 13 May 2026 12:55:55 +0000 Subject: [PATCH 3/4] Telemetry: address PR #354 review findings - Doc accuracy on retry semantics: WithSkipRateLimitRetry only suppresses 429s. Generic 5xx and transport errors are governed by ClientMethod, so telemetry contexts (which set no ClientMethod) get one-shot semantics on those failure modes; 503 still retries via the method-agnostic path. Updated SkipRateLimitRetry godoc and doExport comment to match. - Drop dead TelemetryRetryCount / TelemetryRetryDelay fields from UserConfig. The DSN keys are still consumed (so they do not leak into session params) but no longer stored. Cleaned up assertions in config_test.go and connector_test.go. - Clear retryAfterHint when the breaker transitions Open -> Half-Open, so a stale hint cannot extend a later reopen that did not carry its own Retry-After. Collapsed the two-step lock dance in execute() into one critical section. Updated godoc. - Guard rand.Int63n against a zero flushInterval in flushLoop. - Document the first-caller-wins User-Agent behaviour on the per-host telemetry client singleton, since later connections on the same host reuse the cached client. 
Co-authored-by: Isaac --- connector_test.go | 73 ++++++++++++++++------------------ internal/client/client.go | 16 ++++++-- internal/config/config.go | 28 ++++--------- internal/config/config_test.go | 52 ++++++++---------------- telemetry/aggregator.go | 5 ++- telemetry/circuitbreaker.go | 26 +++++++----- telemetry/exporter.go | 19 ++++++--- telemetry/manager.go | 7 ++++ 8 files changed, 112 insertions(+), 114 deletions(-) diff --git a/connector_test.go b/connector_test.go index d613027f..de3ea603 100644 --- a/connector_test.go +++ b/connector_test.go @@ -57,18 +57,17 @@ func TestNewConnector(t *testing.T) { AccessToken: accessToken, Authenticator: &pat.PATAuth{AccessToken: accessToken}, HTTPPath: "/" + httpPath, - MaxRows: maxRows, - QueryTimeout: timeout, - Catalog: catalog, - Schema: schema, - UserAgentEntry: userAgentEntry, - SessionParams: sessionParams, - RetryMax: 10, - RetryWaitMin: 3 * time.Second, - RetryWaitMax: 60 * time.Second, - Transport: roundTripper, - TelemetryRetryCount: -1, - CloudFetchConfig: expectedCloudFetchConfig, + MaxRows: maxRows, + QueryTimeout: timeout, + Catalog: catalog, + Schema: schema, + UserAgentEntry: userAgentEntry, + SessionParams: sessionParams, + RetryMax: 10, + RetryWaitMin: 3 * time.Second, + RetryWaitMax: 60 * time.Second, + Transport: roundTripper, + CloudFetchConfig: expectedCloudFetchConfig, } expectedCfg := config.WithDefaults() expectedCfg.DriverVersion = DriverVersion @@ -99,19 +98,18 @@ func TestNewConnector(t *testing.T) { CloudFetchSpeedThresholdMbps: 0.1, } expectedUserConfig := config.UserConfig{ - Host: host, - Port: port, - Protocol: "https", - AccessToken: accessToken, - Authenticator: &pat.PATAuth{AccessToken: accessToken}, - HTTPPath: "/" + httpPath, - MaxRows: maxRows, - SessionParams: sessionParams, - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - TelemetryRetryCount: -1, - CloudFetchConfig: expectedCloudFetchConfig, + Host: host, + Port: port, + Protocol: "https", + 
AccessToken: accessToken, + Authenticator: &pat.PATAuth{AccessToken: accessToken}, + HTTPPath: "/" + httpPath, + MaxRows: maxRows, + SessionParams: sessionParams, + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + CloudFetchConfig: expectedCloudFetchConfig, } expectedCfg := config.WithDefaults() expectedCfg.UserConfig = expectedUserConfig @@ -142,19 +140,18 @@ func TestNewConnector(t *testing.T) { CloudFetchSpeedThresholdMbps: 0.1, } expectedUserConfig := config.UserConfig{ - Host: host, - Port: port, - Protocol: "https", - AccessToken: accessToken, - Authenticator: &pat.PATAuth{AccessToken: accessToken}, - HTTPPath: "/" + httpPath, - MaxRows: maxRows, - SessionParams: sessionParams, - RetryMax: -1, - RetryWaitMin: 0, - RetryWaitMax: 0, - TelemetryRetryCount: -1, - CloudFetchConfig: expectedCloudFetchConfig, + Host: host, + Port: port, + Protocol: "https", + AccessToken: accessToken, + Authenticator: &pat.PATAuth{AccessToken: accessToken}, + HTTPPath: "/" + httpPath, + MaxRows: maxRows, + SessionParams: sessionParams, + RetryMax: -1, + RetryWaitMin: 0, + RetryWaitMax: 0, + CloudFetchConfig: expectedCloudFetchConfig, } expectedCfg := config.WithDefaults() expectedCfg.DriverVersion = DriverVersion diff --git a/internal/client/client.go b/internal/client/client.go index 276db8f6..676d1294 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -47,8 +47,11 @@ const ( ClientMethod contextKey = iota // SkipRateLimitRetry, when set to true on the request context, makes // RetryPolicy treat 429 Too Many Requests as a non-retryable response — - // the request returns the 429 error after a single attempt. 5xx and - // transport errors are unaffected and still retried. + // the request returns the 429 error after a single attempt. 
The flag + // is narrowly scoped to 429: 503 still retries via the + // isRetryableServerResponse path (which is not method-gated), and any + // other retry decision (generic 5xx / transport error) is governed by + // the request's ClientMethod as usual. // // Set by callers that own their own backoff for rate-limited endpoints // (e.g. the telemetry exporter, whose circuit breaker requires one HTTP @@ -59,7 +62,14 @@ const ( // WithSkipRateLimitRetry returns a context that disables retryablehttp's // retry-on-429 behavior for any request issued with it. The caller takes // responsibility for handling rate-limit responses (e.g. via a circuit -// breaker). 5xx responses and transport errors are still retried. +// breaker). +// +// Note: this flag only affects 429. Whether 5xx generic responses +// (500/502/504) and transport errors are retried depends on the +// ClientMethod set on the context (see nonRetryableClientMethods). +// Callers that do not set a ClientMethod (e.g. the telemetry exporter) +// effectively get one-shot semantics on those failure modes; their +// circuit breaker is expected to absorb them. 
func WithSkipRateLimitRetry(ctx context.Context) context.Context { return context.WithValue(ctx, SkipRateLimitRetry, true) } diff --git a/internal/config/config.go b/internal/config/config.go index b8be59cb..13fdd840 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -104,8 +104,6 @@ type UserConfig struct { EnableTelemetry ConfigValue[bool] TelemetryBatchSize int // 0 = use default (100) TelemetryFlushInterval time.Duration // 0 = use default (5s) - TelemetryRetryCount int // -1 = use default (3); 0 = disable retries; set via telemetry_retry_count - TelemetryRetryDelay time.Duration // 0 = use default (100ms); set via telemetry_retry_delay Transport http.RoundTripper UseLz4Compression bool EnableMetricViewMetadata bool @@ -155,8 +153,6 @@ func (ucfg UserConfig) DeepCopy() UserConfig { EnableTelemetry: ucfg.EnableTelemetry, TelemetryBatchSize: ucfg.TelemetryBatchSize, TelemetryFlushInterval: ucfg.TelemetryFlushInterval, - TelemetryRetryCount: ucfg.TelemetryRetryCount, - TelemetryRetryDelay: ucfg.TelemetryRetryDelay, } } @@ -195,10 +191,6 @@ func (ucfg UserConfig) WithDefaults() UserConfig { // EnableTelemetry defaults to unset (ConfigValue zero value), // meaning telemetry is controlled by server feature flags. - // TelemetryRetryCount uses -1 as "not set" so that an explicit 0 from the - // DSN (meaning "disable retries") is distinguishable from the default. 
- ucfg.TelemetryRetryCount = -1 - return ucfg } @@ -322,19 +314,13 @@ func ParseDSN(dsn string) (UserConfig, error) { ucfg.TelemetryFlushInterval = d } } - if retryCount, ok, err := params.extractAsInt("telemetry_retry_count"); ok { - if err != nil { - return UserConfig{}, err - } - if retryCount >= 0 { - ucfg.TelemetryRetryCount = retryCount - } - } - if retryDelay, ok := params.extract("telemetry_retry_delay"); ok { - if d, err := time.ParseDuration(retryDelay); err == nil && d > 0 { - ucfg.TelemetryRetryDelay = d - } - } + // telemetry_retry_count and telemetry_retry_delay are accepted for + // backwards compatibility but no longer applied — retries for + // telemetry traffic are owned by the underlying retryable HTTP + // client. Extract and discard the values so they don't fall through + // into session params below. + _, _, _ = params.extractAsInt("telemetry_retry_count") + _, _ = params.extract("telemetry_retry_delay") // for timezone we do a case insensitive key match. // We use getNoCase because we want to leave timezone in the params so that it will also diff --git a/internal/config/config_test.go b/internal/config/config_test.go index b5e52e4c..54756a9c 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -44,7 +44,6 @@ func TestParseConfig(t *testing.T) { RetryMax: 4, RetryWaitMin: 1 * time.Second, RetryWaitMax: 30 * time.Second, - TelemetryRetryCount: -1, CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a", @@ -65,7 +64,6 @@ func TestParseConfig(t *testing.T) { RetryMax: 4, RetryWaitMin: 1 * time.Second, RetryWaitMax: 30 * time.Second, - TelemetryRetryCount: -1, CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a", @@ -85,7 +83,6 @@ func TestParseConfig(t *testing.T) { RetryMax: 4, RetryWaitMin: 1 * time.Second, RetryWaitMax: 30 * time.Second, - TelemetryRetryCount: -1, 
CloudFetchConfig: defCloudConfig, }, wantErr: false, @@ -104,7 +101,6 @@ func TestParseConfig(t *testing.T) { RetryMax: 4, RetryWaitMin: 1 * time.Second, RetryWaitMax: 30 * time.Second, - TelemetryRetryCount: -1, CloudFetchConfig: defCloudConfig, }, wantErr: false, @@ -126,7 +122,6 @@ func TestParseConfig(t *testing.T) { RetryMax: 4, RetryWaitMin: 1 * time.Second, RetryWaitMax: 30 * time.Second, - TelemetryRetryCount: -1, CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a", @@ -149,7 +144,6 @@ func TestParseConfig(t *testing.T) { RetryMax: 4, RetryWaitMin: 1 * time.Second, RetryWaitMax: 30 * time.Second, - TelemetryRetryCount: -1, CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a", @@ -169,7 +163,6 @@ func TestParseConfig(t *testing.T) { RetryMax: 4, RetryWaitMin: 1 * time.Second, RetryWaitMax: 30 * time.Second, - TelemetryRetryCount: -1, CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a", @@ -191,7 +184,6 @@ func TestParseConfig(t *testing.T) { RetryMax: 4, RetryWaitMin: 1 * time.Second, RetryWaitMax: 30 * time.Second, - TelemetryRetryCount: -1, CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123b", @@ -213,7 +205,6 @@ func TestParseConfig(t *testing.T) { RetryMax: 4, RetryWaitMin: 1 * time.Second, RetryWaitMax: 30 * time.Second, - TelemetryRetryCount: -1, CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123b", @@ -235,7 +226,6 @@ func TestParseConfig(t *testing.T) { RetryMax: 4, RetryWaitMin: 1 * time.Second, RetryWaitMax: 30 * time.Second, - TelemetryRetryCount: -1, CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a", @@ -256,7 +246,6 
@@ func TestParseConfig(t *testing.T) { RetryMax: 4, RetryWaitMin: 1 * time.Second, RetryWaitMax: 30 * time.Second, - TelemetryRetryCount: -1, CloudFetchConfig: CloudFetchConfig{ UseCloudFetch: true, MaxDownloadThreads: 10, @@ -282,7 +271,6 @@ func TestParseConfig(t *testing.T) { RetryMax: 4, RetryWaitMin: 1 * time.Second, RetryWaitMax: 30 * time.Second, - TelemetryRetryCount: -1, CloudFetchConfig: CloudFetchConfig{ UseCloudFetch: true, MaxDownloadThreads: 15, @@ -312,7 +300,6 @@ func TestParseConfig(t *testing.T) { RetryMax: 4, RetryWaitMin: 1 * time.Second, RetryWaitMax: 30 * time.Second, - TelemetryRetryCount: -1, CloudFetchConfig: CloudFetchConfig{ UseCloudFetch: true, MaxDownloadThreads: 15, @@ -337,7 +324,6 @@ func TestParseConfig(t *testing.T) { RetryMax: 4, RetryWaitMin: 1 * time.Second, RetryWaitMax: 30 * time.Second, - TelemetryRetryCount: -1, CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:443", @@ -361,7 +347,6 @@ func TestParseConfig(t *testing.T) { RetryMax: 4, RetryWaitMin: 1 * time.Second, RetryWaitMax: 30 * time.Second, - TelemetryRetryCount: -1, CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:443", @@ -420,7 +405,6 @@ func TestParseConfig(t *testing.T) { RetryMax: 4, RetryWaitMin: 1 * time.Second, RetryWaitMax: 30 * time.Second, - TelemetryRetryCount: -1, CloudFetchConfig: defCloudConfig, }, wantURL: "https://:443", @@ -446,7 +430,6 @@ func TestParseConfig(t *testing.T) { RetryMax: 4, RetryWaitMin: 1 * time.Second, RetryWaitMax: 30 * time.Second, - TelemetryRetryCount: -1, CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a", @@ -471,7 +454,6 @@ func TestParseConfig(t *testing.T) { RetryMax: 4, RetryWaitMin: 1 * time.Second, RetryWaitMax: 30 * time.Second, - TelemetryRetryCount: -1, CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a", 
@@ -496,7 +478,6 @@ func TestParseConfig(t *testing.T) { RetryMax: 4, RetryWaitMin: 1 * time.Second, RetryWaitMax: 30 * time.Second, - TelemetryRetryCount: -1, CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a", @@ -550,29 +531,30 @@ func TestParseConfig(t *testing.T) { RetryMax: 4, RetryWaitMin: 1 * time.Second, RetryWaitMax: 30 * time.Second, - TelemetryRetryCount: -1, CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a", wantErr: false, }, { - name: "with telemetry_retry_count=0 (disable retries)", - args: args{dsn: "token:supersecret@example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a?telemetry_retry_count=0"}, + // telemetry_retry_count is accepted for backwards compatibility but + // no longer applied; verify the DSN parses without error and the + // param is not surfaced as a session param. + name: "with telemetry_retry_count=0 (backwards-compat no-op)", + args: args{dsn: "token:supersecret@example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a?telemetry_retry_count=0&telemetry_retry_delay=100ms"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 443, - MaxRows: defaultMaxRows, - Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, - AccessToken: "supersecret", - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - TelemetryRetryCount: 0, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 443, + MaxRows: defaultMaxRows, + Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, + AccessToken: "supersecret", + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * 
time.Second, + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a", wantErr: false, diff --git a/telemetry/aggregator.go b/telemetry/aggregator.go index 8d2532f9..76565129 100644 --- a/telemetry/aggregator.go +++ b/telemetry/aggregator.go @@ -227,7 +227,10 @@ func (agg *metricsAggregator) completeStatement(ctx context.Context, statementID // from the same scheduler) does not align all their flushes on the same // wall-clock phase and produce synchronized request bursts. func (agg *metricsAggregator) flushLoop() { - jitter := time.Duration(rand.Int63n(int64(agg.flushInterval))) //nolint:gosec // G404: not used for security + var jitter time.Duration + if agg.flushInterval > 0 { + jitter = time.Duration(rand.Int63n(int64(agg.flushInterval))) //nolint:gosec // G404: not used for security + } jitterTimer := time.NewTimer(jitter) defer jitterTimer.Stop() select { diff --git a/telemetry/circuitbreaker.go b/telemetry/circuitbreaker.go index c1a716fb..46731779 100644 --- a/telemetry/circuitbreaker.go +++ b/telemetry/circuitbreaker.go @@ -51,9 +51,10 @@ type circuitBreaker struct { halfOpenSuccesses int // retryAfterHint, if non-zero, overrides waitDurationInOpenState for - // the next open-state interval. Populated from server-provided + // the current open-state interval. Populated from server-provided // Retry-After headers on 429 responses; cleared when the breaker - // transitions to half-open or closed. + // transitions out of Open (to half-open after the wait elapses, or + // to closed via half-open success). retryAfterHint time.Duration config circuitBreakerConfig @@ -110,17 +111,22 @@ func (cb *circuitBreaker) execute(ctx context.Context, fn func() error) error { // Check if wait duration has passed. If the server hinted a // Retry-After on the failure that opened the breaker, honor the // larger of (configured wait, server hint). 
- cb.mu.RLock() + cb.mu.Lock() wait := cb.config.waitDurationInOpenState if cb.retryAfterHint > wait { wait = cb.retryAfterHint } shouldRetry := time.Since(cb.lastStateTime) > wait - cb.mu.RUnlock() + if shouldRetry { + // The wait window the hint asked for has elapsed — clear it + // so a later reopen (e.g. half-open failure with no new hint) + // doesn't extend itself by a stale duration. + cb.retryAfterHint = 0 + cb.setStateUnlocked(stateHalfOpen) + } + cb.mu.Unlock() if shouldRetry { - // Transition to half-open - cb.setState(stateHalfOpen) return cb.tryExecute(ctx, fn) } return ErrCircuitOpen @@ -233,10 +239,10 @@ func (cb *circuitBreaker) resetWindowUnlocked() { cb.halfOpenSuccesses = 0 } -// extendOpenStateAtLeast records a server-provided Retry-After hint. The -// next time the breaker is open, it will stay open for at least the given -// duration (and at least waitDurationInOpenState). The hint is cleared on -// the next half-open/closed transition. +// extendOpenStateAtLeast records a server-provided Retry-After hint. While +// the breaker is open, it will stay open for at least the given duration +// (and at least waitDurationInOpenState). The hint is cleared when the +// breaker transitions out of Open. // // Safe to call concurrently from any caller (the exporter parses // Retry-After on 429 responses and forwards it here). diff --git a/telemetry/exporter.go b/telemetry/exporter.go index 9ab44299..af127bba 100644 --- a/telemetry/exporter.go +++ b/telemetry/exporter.go @@ -89,12 +89,19 @@ func (e *telemetryExporter) export(ctx context.Context, metrics []*telemetryMetr } } -// doExport sends one telemetry request. It does NOT retry — retries are -// handled by the underlying retryablehttp-wrapped HTTP client (see -// internal/client.RetryableClient), which already retries 429/5xx with the -// server-provided Retry-After header. 
Any non-2xx outcome here is therefore -// the *post-retry* result, and is returned to the caller so the circuit -// breaker counts it as one failure per export. +// doExport sends one telemetry request. It does NOT loop on retries — +// transient-retry policy is delegated to the underlying retryablehttp-wrapped +// HTTP client (see internal/client.RetryableClient). Telemetry contexts do +// not set a ClientMethod, so per internal/client.RetryPolicy: +// - 429 → suppressed by WithSkipRateLimitRetry below; the circuit breaker +// handles backoff using the server's Retry-After hint. +// - 503 → retried by retryablehttp (isRetryableServerResponse is not gated +// on ClientMethod) using its configured wait policy and Retry-After. +// - generic 5xx (500/502/504) and transport errors → one attempt; the +// circuit breaker counts them as a failure per export. +// Any non-2xx outcome reaching this function is therefore the *post-retry* +// (or single-attempt) result, returned so the breaker observes exactly one +// signal per export call. func (e *telemetryExporter) doExport(ctx context.Context, metrics []*telemetryMetric) error { request, err := createTelemetryRequest(metrics, e.driverVersion) if err != nil { diff --git a/telemetry/manager.go b/telemetry/manager.go index 375f3d60..19a72c20 100644 --- a/telemetry/manager.go +++ b/telemetry/manager.go @@ -45,6 +45,13 @@ func getClientManager() *clientManager { // getOrCreateClient gets or creates a telemetry client for the host. // Increments reference count. +// +// The first caller for a host fixes the User-Agent for the lifetime of the +// host's telemetry client. A later connection on the same host with a +// different WithUserAgentEntry will see its telemetry traffic attributed +// to the first caller's UA in access logs. This is intentional — the +// per-host singleton consolidates telemetry across connections to keep +// the request rate low. 
func (m *clientManager) getOrCreateClient(host string, driverVersion string, userAgent string, httpClient *http.Client, cfg *Config) *telemetryClient { m.mu.Lock() defer m.mu.Unlock() From 472c2569703dbd312ba05d904b15b1f63f991a65 Mon Sep 17 00:00:00 2001 From: samikshya-chand_data Date: Thu, 14 May 2026 03:51:03 +0530 Subject: [PATCH 4/4] Fix lint: gofmt -s, resolve dogsled violation, remove unused setState - gofmt -s on connector_test.go, internal/config/config_test.go, telemetry/exporter.go (struct alignment + missing blank line). - internal/config/config.go: telemetry_retry_count is parsed for backwards compat then discarded, so call extract (2 returns) rather than extractAsInt (3 returns) to satisfy dogsled. - telemetry/circuitbreaker.go: drop the now-unused setState helper. execute() now transitions to half-open via setStateUnlocked under an already-held lock. Co-authored-by: Isaac --- connector_test.go | 12 +- internal/config/config.go | 2 +- internal/config/config_test.go | 542 ++++++++++++++++----------------- telemetry/circuitbreaker.go | 7 - telemetry/exporter.go | 1 + 5 files changed, 279 insertions(+), 285 deletions(-) diff --git a/connector_test.go b/connector_test.go index de3ea603..66f351c9 100644 --- a/connector_test.go +++ b/connector_test.go @@ -51,12 +51,12 @@ func TestNewConnector(t *testing.T) { HTTPClient: &http.Client{Transport: roundTripper}, } expectedUserConfig := config.UserConfig{ - Host: host, - Port: port, - Protocol: "https", - AccessToken: accessToken, - Authenticator: &pat.PATAuth{AccessToken: accessToken}, - HTTPPath: "/" + httpPath, + Host: host, + Port: port, + Protocol: "https", + AccessToken: accessToken, + Authenticator: &pat.PATAuth{AccessToken: accessToken}, + HTTPPath: "/" + httpPath, MaxRows: maxRows, QueryTimeout: timeout, Catalog: catalog, diff --git a/internal/config/config.go b/internal/config/config.go index 13fdd840..6bf45639 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -319,7 +319,7 @@ func 
ParseDSN(dsn string) (UserConfig, error) { // telemetry traffic are owned by the underlying retryable HTTP // client. Extract and discard the values so they don't fall through // into session params below. - _, _, _ = params.extractAsInt("telemetry_retry_count") + _, _ = params.extract("telemetry_retry_count") _, _ = params.extract("telemetry_retry_delay") // for timezone we do a case insensitive key match. diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 54756a9c..ef619cf7 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -33,18 +33,18 @@ func TestParseConfig(t *testing.T) { name: "base case", args: args{dsn: "token:supersecret@example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 443, - MaxRows: defaultMaxRows, - Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, - AccessToken: "supersecret", - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 443, + MaxRows: defaultMaxRows, + Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, + AccessToken: "supersecret", + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a", wantErr: false, @@ -53,18 +53,18 @@ func TestParseConfig(t *testing.T) { name: "with https scheme", args: args{dsn: "https://token:supersecret@example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a"}, //nolint:gosec // G101: test DSN with example password, not a real credential wantCfg: 
UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 443, - MaxRows: defaultMaxRows, - AccessToken: "supersecret", - Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 443, + MaxRows: defaultMaxRows, + AccessToken: "supersecret", + Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a", wantErr: false, @@ -73,17 +73,17 @@ func TestParseConfig(t *testing.T) { name: "with http scheme", args: args{dsn: "http://localhost:8080/sql/1.0/endpoints/12346a5b5b0e123a"}, wantCfg: UserConfig{ - Protocol: "http", - Host: "localhost", - Port: 8080, - MaxRows: defaultMaxRows, - Authenticator: &noop.NoopAuth{}, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "http", + Host: "localhost", + Port: 8080, + MaxRows: defaultMaxRows, + Authenticator: &noop.NoopAuth{}, + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + CloudFetchConfig: defCloudConfig, }, wantErr: false, wantURL: "http://localhost:8080/sql/1.0/endpoints/12346a5b5b0e123a", @@ -92,16 +92,16 @@ func TestParseConfig(t *testing.T) { name: "with localhost", args: args{dsn: "http://localhost:8080"}, wantCfg: UserConfig{ - 
Protocol: "http", - Host: "localhost", - Port: 8080, - Authenticator: &noop.NoopAuth{}, - MaxRows: defaultMaxRows, - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "http", + Host: "localhost", + Port: 8080, + Authenticator: &noop.NoopAuth{}, + MaxRows: defaultMaxRows, + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + CloudFetchConfig: defCloudConfig, }, wantErr: false, wantURL: "http://localhost:8080", @@ -110,19 +110,19 @@ func TestParseConfig(t *testing.T) { name: "with query params", args: args{dsn: "token:supersecret@example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a?timeout=100&maxRows=1000"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 8000, - AccessToken: "supersecret", - Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", - QueryTimeout: 100 * time.Second, - MaxRows: 1000, - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 8000, + AccessToken: "supersecret", + Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + QueryTimeout: 100 * time.Second, + MaxRows: 1000, + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a", wantErr: false, @@ -131,20 +131,20 @@ func TestParseConfig(t *testing.T) { name: "with query params and session params", args: args{dsn: 
"token:supersecret@example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a?timeout=100&maxRows=1000&timezone=America/Vancouver&QUERY_TAGS=team:testing,driver:go"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 8000, - AccessToken: "supersecret", - Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", - QueryTimeout: 100 * time.Second, - MaxRows: 1000, - Location: tz, - SessionParams: map[string]string{"timezone": "America/Vancouver", "QUERY_TAGS": "team:testing,driver:go"}, - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 8000, + AccessToken: "supersecret", + Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + QueryTimeout: 100 * time.Second, + MaxRows: 1000, + Location: tz, + SessionParams: map[string]string{"timezone": "America/Vancouver", "QUERY_TAGS": "team:testing,driver:go"}, + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a", wantErr: false, @@ -153,17 +153,17 @@ func TestParseConfig(t *testing.T) { name: "bare", args: args{dsn: "example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Authenticator: &noop.NoopAuth{}, - Port: 8000, - MaxRows: defaultMaxRows, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Authenticator: &noop.NoopAuth{}, + Port: 8000, + MaxRows: defaultMaxRows, + 
HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a", wantErr: false, @@ -172,19 +172,19 @@ func TestParseConfig(t *testing.T) { name: "with catalog", args: args{dsn: "token:supersecret@example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123b?catalog=default"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 8000, - MaxRows: defaultMaxRows, - AccessToken: "supersecret", - Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123b", - Catalog: "default", - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 8000, + MaxRows: defaultMaxRows, + AccessToken: "supersecret", + Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123b", + Catalog: "default", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123b", wantErr: false, @@ -193,19 +193,19 @@ func TestParseConfig(t *testing.T) { name: "with user agent entry", args: args{dsn: "token:supersecret@example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123b?userAgentEntry=partner-name"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 8000, - MaxRows: defaultMaxRows, - AccessToken: "supersecret", - Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123b", - 
UserAgentEntry: "partner-name", - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 8000, + MaxRows: defaultMaxRows, + AccessToken: "supersecret", + Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123b", + UserAgentEntry: "partner-name", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123b", wantErr: false, @@ -214,19 +214,19 @@ func TestParseConfig(t *testing.T) { name: "with schema", args: args{dsn: "token:supersecret2@example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a?schema=system"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 8000, - MaxRows: defaultMaxRows, - AccessToken: "supersecret2", - Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", - Schema: "system", - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 8000, + MaxRows: defaultMaxRows, + AccessToken: "supersecret2", + Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + Schema: "system", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a", wantErr: false, @@ -235,17 +235,17 @@ func TestParseConfig(t *testing.T) { name: "with 
useCloudFetch", args: args{dsn: "token:supersecret@example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123b?useCloudFetch=true"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 8000, - MaxRows: defaultMaxRows, - AccessToken: "supersecret", - Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123b", - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 8000, + MaxRows: defaultMaxRows, + AccessToken: "supersecret", + Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123b", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, CloudFetchConfig: CloudFetchConfig{ UseCloudFetch: true, MaxDownloadThreads: 10, @@ -260,17 +260,17 @@ func TestParseConfig(t *testing.T) { name: "with useCloudFetch and maxDownloadThreads", args: args{dsn: "token:supersecret@example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123b?useCloudFetch=true&maxDownloadThreads=15"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 8000, - MaxRows: defaultMaxRows, - AccessToken: "supersecret", - Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123b", - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 8000, + MaxRows: defaultMaxRows, + AccessToken: "supersecret", + Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123b", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, 
CloudFetchConfig: CloudFetchConfig{ UseCloudFetch: true, MaxDownloadThreads: 15, @@ -285,21 +285,21 @@ func TestParseConfig(t *testing.T) { name: "with everything", args: args{dsn: "token:supersecret2@example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a?catalog=default&schema=system&userAgentEntry=partner-name&timeout=100&maxRows=1000&ANSI_MODE=true&useCloudFetch=true&maxDownloadThreads=15"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 8000, - AccessToken: "supersecret2", - Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", - QueryTimeout: 100 * time.Second, - MaxRows: 1000, - UserAgentEntry: "partner-name", - Catalog: "default", - Schema: "system", - SessionParams: map[string]string{"ANSI_MODE": "true"}, - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 8000, + AccessToken: "supersecret2", + Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + QueryTimeout: 100 * time.Second, + MaxRows: 1000, + UserAgentEntry: "partner-name", + Catalog: "default", + Schema: "system", + SessionParams: map[string]string{"ANSI_MODE": "true"}, + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, CloudFetchConfig: CloudFetchConfig{ UseCloudFetch: true, MaxDownloadThreads: 15, @@ -314,17 +314,17 @@ func TestParseConfig(t *testing.T) { name: "missing http path", args: args{dsn: "token:supersecret@example.cloud.databricks.com:443"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 443, - MaxRows: defaultMaxRows, - AccessToken: "supersecret", - Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: 
defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 443, + MaxRows: defaultMaxRows, + AccessToken: "supersecret", + Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:443", wantErr: false, @@ -334,20 +334,20 @@ func TestParseConfig(t *testing.T) { name: "missing http path 2", args: args{dsn: "token:supersecret2@example.cloud.databricks.com:443?catalog=default&schema=system&timeout=100&maxRows=1000"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 443, - AccessToken: "supersecret2", - Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, - QueryTimeout: 100 * time.Second, - MaxRows: 1000, - Catalog: "default", - Schema: "system", - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 443, + AccessToken: "supersecret2", + Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, + QueryTimeout: 100 * time.Second, + MaxRows: 1000, + Catalog: "default", + Schema: "system", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:443", wantErr: false, @@ -393,19 +393,19 @@ func TestParseConfig(t *testing.T) { name: "missing host", args: args{dsn: "token:supersecret2@:443?catalog=default&schema=system&timeout=100&maxRows=1000"}, wantCfg: UserConfig{ - Port: 443, - Protocol: "https", - AccessToken: "supersecret2", - Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, - MaxRows: 1000, - QueryTimeout: 100 * time.Second, - Catalog: "default", - Schema: 
"system", - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Port: 443, + Protocol: "https", + AccessToken: "supersecret2", + Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, + MaxRows: 1000, + QueryTimeout: 100 * time.Second, + Catalog: "default", + Schema: "system", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + CloudFetchConfig: defCloudConfig, }, wantURL: "https://:443", wantErr: false, @@ -415,22 +415,22 @@ func TestParseConfig(t *testing.T) { name: "with accessToken param", args: args{dsn: "example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a?catalog=default&schema=system&userAgentEntry=partner-name&timeout=100&maxRows=1000&ANSI_MODE=true&accessToken=supersecret2"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 8000, - AccessToken: "supersecret2", - Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", - QueryTimeout: 100 * time.Second, - MaxRows: 1000, - UserAgentEntry: "partner-name", - Catalog: "default", - Schema: "system", - SessionParams: map[string]string{"ANSI_MODE": "true"}, - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 8000, + AccessToken: "supersecret2", + Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + QueryTimeout: 100 * time.Second, + MaxRows: 1000, + UserAgentEntry: "partner-name", + Catalog: "default", + Schema: "system", + SessionParams: map[string]string{"ANSI_MODE": "true"}, + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + CloudFetchConfig: defCloudConfig, }, wantURL: 
"https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a", wantErr: false, @@ -439,22 +439,22 @@ func TestParseConfig(t *testing.T) { name: "with accessToken param and client id/secret params", args: args{dsn: "example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a?catalog=default&schema=system&userAgentEntry=partner-name&timeout=100&maxRows=1000&ANSI_MODE=true&accessToken=supersecret2&clientId=client_id&clientSecret=client_secret"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 8000, - AccessToken: "supersecret2", - Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", - QueryTimeout: 100 * time.Second, - MaxRows: 1000, - UserAgentEntry: "partner-name", - Catalog: "default", - Schema: "system", - SessionParams: map[string]string{"ANSI_MODE": "true"}, - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 8000, + AccessToken: "supersecret2", + Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + QueryTimeout: 100 * time.Second, + MaxRows: 1000, + UserAgentEntry: "partner-name", + Catalog: "default", + Schema: "system", + SessionParams: map[string]string{"ANSI_MODE": "true"}, + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a", wantErr: false, @@ -463,22 +463,22 @@ func TestParseConfig(t *testing.T) { name: "authType unknown with accessTokenParam", args: args{dsn: 
"example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a?authType=unknown&catalog=default&schema=system&userAgentEntry=partner-name&timeout=100&maxRows=1000&ANSI_MODE=true&accessToken=supersecret2&clientId=client_id&clientSecret=client_secret"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 8000, - AccessToken: "supersecret2", - Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", - QueryTimeout: 100 * time.Second, - MaxRows: 1000, - UserAgentEntry: "partner-name", - Catalog: "default", - Schema: "system", - SessionParams: map[string]string{"ANSI_MODE": "true"}, - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 8000, + AccessToken: "supersecret2", + Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + QueryTimeout: 100 * time.Second, + MaxRows: 1000, + UserAgentEntry: "partner-name", + Catalog: "default", + Schema: "system", + SessionParams: map[string]string{"ANSI_MODE": "true"}, + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a", wantErr: false, @@ -517,21 +517,21 @@ func TestParseConfig(t *testing.T) { name: "authType unknown with client id/secret", args: args{dsn: "example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a?authType=unknown&clientId=client_id&clientSecret=client_secret&catalog=default&schema=system&userAgentEntry=partner-name&timeout=100&maxRows=1000&ANSI_MODE=true"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 8000, - Authenticator: m2m.NewAuthenticator("client_id", "client_secret", "example.cloud.databricks.com"), - HTTPPath: 
"/sql/1.0/endpoints/12346a5b5b0e123a", - QueryTimeout: 100 * time.Second, - MaxRows: 1000, - UserAgentEntry: "partner-name", - Catalog: "default", - Schema: "system", - SessionParams: map[string]string{"ANSI_MODE": "true"}, - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 8000, + Authenticator: m2m.NewAuthenticator("client_id", "client_secret", "example.cloud.databricks.com"), + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + QueryTimeout: 100 * time.Second, + MaxRows: 1000, + UserAgentEntry: "partner-name", + Catalog: "default", + Schema: "system", + SessionParams: map[string]string{"ANSI_MODE": "true"}, + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a", wantErr: false, @@ -543,17 +543,17 @@ func TestParseConfig(t *testing.T) { name: "with telemetry_retry_count=0 (backwards-compat no-op)", args: args{dsn: "token:supersecret@example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a?telemetry_retry_count=0&telemetry_retry_delay=100ms"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 443, - MaxRows: defaultMaxRows, - Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, - AccessToken: "supersecret", - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 443, + MaxRows: defaultMaxRows, + Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, + AccessToken: "supersecret", + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * 
time.Second, CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a", diff --git a/telemetry/circuitbreaker.go b/telemetry/circuitbreaker.go index 46731779..b9cedbb5 100644 --- a/telemetry/circuitbreaker.go +++ b/telemetry/circuitbreaker.go @@ -257,13 +257,6 @@ func (cb *circuitBreaker) extendOpenStateAtLeast(d time.Duration) { cb.mu.Unlock() } -// setState transitions to a new state. -func (cb *circuitBreaker) setState(newState circuitState) { - cb.mu.Lock() - defer cb.mu.Unlock() - cb.setStateUnlocked(newState) -} - // setStateUnlocked transitions to a new state without locking. // Caller must hold cb.mu lock. func (cb *circuitBreaker) setStateUnlocked(newState circuitState) { diff --git a/telemetry/exporter.go b/telemetry/exporter.go index af127bba..aae25518 100644 --- a/telemetry/exporter.go +++ b/telemetry/exporter.go @@ -99,6 +99,7 @@ func (e *telemetryExporter) export(ctx context.Context, metrics []*telemetryMetr // on ClientMethod) using its configured wait policy and Retry-After. // - generic 5xx (500/502/504) and transport errors → one attempt; the // circuit breaker counts them as a failure per export. +// // Any non-2xx outcome reaching this function is therefore the *post-retry* // (or single-attempt) result, returned so the breaker observes exactly one // signal per export call.