Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/databricks/databricks-sql-go/auth/tokenprovider"
"github.com/databricks/databricks-sql-go/driverctx"
dbsqlerr "github.com/databricks/databricks-sql-go/errors"
"github.com/databricks/databricks-sql-go/internal/agent"
"github.com/databricks/databricks-sql-go/internal/cli_service"
"github.com/databricks/databricks-sql-go/internal/client"
"github.com/databricks/databricks-sql-go/internal/config"
Expand Down Expand Up @@ -95,12 +96,11 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
conn.telemetry = telemetry.InitializeForConnection(ctx, telemetry.TelemetryInitOptions{
Host: c.cfg.Host,
DriverVersion: c.cfg.DriverVersion,
UserAgent: buildUserAgent(c.cfg),
HTTPClient: telemetryClient,
EnableTelemetry: c.cfg.EnableTelemetry,
BatchSize: c.cfg.TelemetryBatchSize,
FlushInterval: c.cfg.TelemetryFlushInterval,
RetryCount: c.cfg.TelemetryRetryCount,
RetryDelay: c.cfg.TelemetryRetryDelay,
})
if conn.telemetry != nil {
log.Debug().Msg("telemetry initialized for connection")
Expand All @@ -117,6 +117,21 @@ func (c *connector) Driver() driver.Driver {
return &databricksDriver{}
}

// buildUserAgent returns the User-Agent header value the driver sends.
//
// The value is assembled in up to three parts:
//
//	<DriverName>/<DriverVersion>            always present
//	 (<UserAgentEntry>)                     only when cfg.UserAgentEntry is non-empty
//	 agent/<product>                        only when agent.Detect reports a product
//
// The format mirrors the one set on the Thrift HTTP client in
// internal/client/client.go so that telemetry, feature-flag, and Thrift
// requests all identify themselves the same way.
func buildUserAgent(cfg *config.Config) string {
	// Base identifier: driver name and version.
	ua := fmt.Sprintf("%s/%s", cfg.DriverName, cfg.DriverVersion)

	// Optional caller-supplied entry, appended in parentheses.
	if entry := cfg.UserAgentEntry; entry != "" {
		ua = fmt.Sprintf("%s (%s)", ua, entry)
	}

	// Optional detected agent product suffix.
	product := agent.Detect()
	if product == "" {
		return ua
	}
	return fmt.Sprintf("%s agent/%s", ua, product)
}

var _ driver.Connector = (*connector)(nil)

type ConnOption func(*config.Config)
Expand Down
73 changes: 35 additions & 38 deletions connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,24 +51,23 @@
HTTPClient: &http.Client{Transport: roundTripper},
}
expectedUserConfig := config.UserConfig{
Host: host,

Check failure on line 54 in connector_test.go

View workflow job for this annotation

GitHub Actions / Lint

File is not `gofmt`-ed with `-s` (gofmt)
Port: port,
Protocol: "https",
AccessToken: accessToken,
Authenticator: &pat.PATAuth{AccessToken: accessToken},
HTTPPath: "/" + httpPath,
MaxRows: maxRows,
QueryTimeout: timeout,
Catalog: catalog,
Schema: schema,
UserAgentEntry: userAgentEntry,
SessionParams: sessionParams,
RetryMax: 10,
RetryWaitMin: 3 * time.Second,
RetryWaitMax: 60 * time.Second,
Transport: roundTripper,
TelemetryRetryCount: -1,
CloudFetchConfig: expectedCloudFetchConfig,
MaxRows: maxRows,
QueryTimeout: timeout,
Catalog: catalog,
Schema: schema,
UserAgentEntry: userAgentEntry,
SessionParams: sessionParams,
RetryMax: 10,
RetryWaitMin: 3 * time.Second,
RetryWaitMax: 60 * time.Second,
Transport: roundTripper,
CloudFetchConfig: expectedCloudFetchConfig,
}
expectedCfg := config.WithDefaults()
expectedCfg.DriverVersion = DriverVersion
Expand Down Expand Up @@ -99,19 +98,18 @@
CloudFetchSpeedThresholdMbps: 0.1,
}
expectedUserConfig := config.UserConfig{
Host: host,
Port: port,
Protocol: "https",
AccessToken: accessToken,
Authenticator: &pat.PATAuth{AccessToken: accessToken},
HTTPPath: "/" + httpPath,
MaxRows: maxRows,
SessionParams: sessionParams,
RetryMax: 4,
RetryWaitMin: 1 * time.Second,
RetryWaitMax: 30 * time.Second,
TelemetryRetryCount: -1,
CloudFetchConfig: expectedCloudFetchConfig,
Host: host,
Port: port,
Protocol: "https",
AccessToken: accessToken,
Authenticator: &pat.PATAuth{AccessToken: accessToken},
HTTPPath: "/" + httpPath,
MaxRows: maxRows,
SessionParams: sessionParams,
RetryMax: 4,
RetryWaitMin: 1 * time.Second,
RetryWaitMax: 30 * time.Second,
CloudFetchConfig: expectedCloudFetchConfig,
}
expectedCfg := config.WithDefaults()
expectedCfg.UserConfig = expectedUserConfig
Expand Down Expand Up @@ -142,19 +140,18 @@
CloudFetchSpeedThresholdMbps: 0.1,
}
expectedUserConfig := config.UserConfig{
Host: host,
Port: port,
Protocol: "https",
AccessToken: accessToken,
Authenticator: &pat.PATAuth{AccessToken: accessToken},
HTTPPath: "/" + httpPath,
MaxRows: maxRows,
SessionParams: sessionParams,
RetryMax: -1,
RetryWaitMin: 0,
RetryWaitMax: 0,
TelemetryRetryCount: -1,
CloudFetchConfig: expectedCloudFetchConfig,
Host: host,
Port: port,
Protocol: "https",
AccessToken: accessToken,
Authenticator: &pat.PATAuth{AccessToken: accessToken},
HTTPPath: "/" + httpPath,
MaxRows: maxRows,
SessionParams: sessionParams,
RetryMax: -1,
RetryWaitMin: 0,
RetryWaitMax: 0,
CloudFetchConfig: expectedCloudFetchConfig,
}
expectedCfg := config.WithDefaults()
expectedCfg.DriverVersion = DriverVersion
Expand Down
37 changes: 37 additions & 0 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,35 @@ type contextKey int

// Context keys used to pass per-request settings through a context.Context.
// The values are assigned with iota, so the declaration order is significant.
const (
// ClientMethod is the context key under which the client method of a
// request is stored (set with context.WithValue(ctx, ClientMethod, m)).
ClientMethod contextKey = iota
// SkipRateLimitRetry, when set to true on the request context, makes
// RetryPolicy treat 429 Too Many Requests as a non-retryable response —
// the request returns the 429 error after a single attempt. The flag
// is narrowly scoped to 429: 503 still retries via the
// isRetryableServerResponse path (which is not method-gated), and any
// other retry decision (generic 5xx / transport error) is governed by
// the request's ClientMethod as usual.
//
// Set by callers that own their own backoff for rate-limited endpoints
// (e.g. the telemetry exporter, whose circuit breaker requires one HTTP
// transaction per call so it can see the 429 directly).
//
// RetryPolicy reads the value with a bool type assertion, so only a
// bool true stored under this key enables the behavior; use
// WithSkipRateLimitRetry to set it.
SkipRateLimitRetry
)

// WithSkipRateLimitRetry derives a child context from ctx that carries the
// SkipRateLimitRetry flag set to true. Requests issued with the returned
// context are not retried by retryablehttp when the server answers
// 429 Too Many Requests; handling the rate-limit response (e.g. via a
// circuit breaker) becomes the caller's responsibility.
//
// Only 429 is affected. Whether generic 5xx responses (500/502/504) and
// transport errors are retried still depends on the ClientMethod stored on
// the context (see nonRetryableClientMethods). A caller that sets no
// ClientMethod (e.g. the telemetry exporter) therefore effectively gets
// one-shot semantics for those failure modes as well, and is expected to
// absorb them with its own circuit breaker.
func WithSkipRateLimitRetry(ctx context.Context) context.Context {
	skipCtx := context.WithValue(ctx, SkipRateLimitRetry, true)
	return skipCtx
}

type clientMethod int

//go:generate go run golang.org/x/tools/cmd/stringer -type=clientMethod -trimprefix=clientMethod
Expand Down Expand Up @@ -712,6 +739,16 @@ func RetryPolicy(ctx context.Context, resp *http.Response, err error) (bool, err
retryAfter = resp.Header.Get("Retry-After")
}

// Callers that own their own rate-limit backoff (e.g. the telemetry
// circuit breaker) opt out of 429 retries via SkipRateLimitRetry so
// each HTTP attempt is a distinct signal to them. 503s still retry
// normally even when this flag is set.
if resp.StatusCode == http.StatusTooManyRequests {
if skip, _ := ctx.Value(SkipRateLimitRetry).(bool); skip {
return false, dbsqlerrint.NewRetryableError(checkErr, retryAfter)
}
}

return true, dbsqlerrint.NewRetryableError(checkErr, retryAfter)
}

Expand Down
35 changes: 35 additions & 0 deletions internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,41 @@ func TestRetryPolicy(t *testing.T) {

})

// SkipRateLimitRetry on the context turns 429 into a non-retryable
// response so the caller can own its own backoff. 5xx and transport
// errors are unaffected — the flag is narrowly scoped to rate-limit
// responses.
//
// We use clientMethodOpenSession (which is a retryable method) so the
// generic 5xx branch fires; the default clientMethodUnknown is itself
// in nonRetryableClientMethods and would suppress 500 retries for an
// unrelated reason.
t.Run("SkipRateLimitRetry only affects 429", func(t *testing.T) {
base := context.WithValue(context.Background(), ClientMethod, clientMethodOpenSession)
ctx := WithSkipRateLimitRetry(base)

resp429 := &http.Response{StatusCode: http.StatusTooManyRequests}
retry, err := RetryPolicy(ctx, resp429, nil)
require.False(t, retry, "429 with SkipRateLimitRetry should not be retried")
require.Error(t, err, "429 should still surface an error")

// 503 is the other code isRetryableServerResponse recognizes; the
// flag should NOT suppress its retry.
resp503 := &http.Response{StatusCode: http.StatusServiceUnavailable}
retry, _ = RetryPolicy(ctx, resp503, nil)
require.True(t, retry, "503 must still retry even with SkipRateLimitRetry")

// 500 takes the generic 5xx branch — also unaffected for a
// retryable client method.
resp500 := &http.Response{StatusCode: http.StatusInternalServerError}
retry, _ = RetryPolicy(ctx, resp500, nil)
require.True(t, retry, "500 must still retry even with SkipRateLimitRetry")

// Without the flag, 429 retries as before.
retry, _ = RetryPolicy(base, resp429, nil)
require.True(t, retry, "default behavior: 429 retries")
})

t.Run("test handling client method errors", func(t *testing.T) {
cases := []struct {
base string
Expand Down
28 changes: 7 additions & 21 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,6 @@
EnableTelemetry ConfigValue[bool]
TelemetryBatchSize int // 0 = use default (100)
TelemetryFlushInterval time.Duration // 0 = use default (5s)
TelemetryRetryCount int // -1 = use default (3); 0 = disable retries; set via telemetry_retry_count
TelemetryRetryDelay time.Duration // 0 = use default (100ms); set via telemetry_retry_delay
Transport http.RoundTripper
UseLz4Compression bool
EnableMetricViewMetadata bool
Expand Down Expand Up @@ -155,8 +153,6 @@
EnableTelemetry: ucfg.EnableTelemetry,
TelemetryBatchSize: ucfg.TelemetryBatchSize,
TelemetryFlushInterval: ucfg.TelemetryFlushInterval,
TelemetryRetryCount: ucfg.TelemetryRetryCount,
TelemetryRetryDelay: ucfg.TelemetryRetryDelay,
}
}

Expand Down Expand Up @@ -195,10 +191,6 @@
// EnableTelemetry defaults to unset (ConfigValue zero value),
// meaning telemetry is controlled by server feature flags.

// TelemetryRetryCount uses -1 as "not set" so that an explicit 0 from the
// DSN (meaning "disable retries") is distinguishable from the default.
ucfg.TelemetryRetryCount = -1

return ucfg
}

Expand Down Expand Up @@ -322,19 +314,13 @@
ucfg.TelemetryFlushInterval = d
}
}
if retryCount, ok, err := params.extractAsInt("telemetry_retry_count"); ok {
if err != nil {
return UserConfig{}, err
}
if retryCount >= 0 {
ucfg.TelemetryRetryCount = retryCount
}
}
if retryDelay, ok := params.extract("telemetry_retry_delay"); ok {
if d, err := time.ParseDuration(retryDelay); err == nil && d > 0 {
ucfg.TelemetryRetryDelay = d
}
}
// telemetry_retry_count and telemetry_retry_delay are accepted for
// backwards compatibility but no longer applied — retries for
// telemetry traffic are owned by the underlying retryable HTTP
// client. Extract and discard the values so they don't fall through
// into session params below.
_, _, _ = params.extractAsInt("telemetry_retry_count")

Check failure on line 322 in internal/config/config.go

View workflow job for this annotation

GitHub Actions / Lint

declaration has 3 blank identifiers (dogsled)
_, _ = params.extract("telemetry_retry_delay")

// for timezone we do a case insensitive key match.
// We use getNoCase because we want to leave timezone in the params so that it will also
Expand Down
Loading
Loading