Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions telemetry/DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

This document outlines a **telemetry design** for the Databricks SQL Go driver that collects usage metrics and exports them to the Databricks telemetry service. The design leverages Go's `context.Context` and middleware patterns to instrument driver operations without impacting performance.

**Important Note:** Telemetry is **disabled by default** and will be enabled only after full testing and validation are complete.

**Key Objectives:**
- Collect driver usage metrics and performance data
- Export aggregated metrics to Databricks telemetry service
Expand Down Expand Up @@ -1364,9 +1366,10 @@ type Config struct {
}

// DefaultConfig returns default telemetry configuration.
// Note: Telemetry is disabled by default and will be enabled after full testing and validation.
func DefaultConfig() *Config {
return &Config{
Enabled: true,
Enabled: false, // Disabled by default until testing is complete
BatchSize: 100,
FlushInterval: 5 * time.Second,
MaxRetries: 3,
Expand Down Expand Up @@ -1733,11 +1736,11 @@ func BenchmarkInterceptor_Disabled(b *testing.B) {

## 11. Implementation Checklist

### Phase 1: Core Infrastructure
- [ ] Create `telemetry` package structure
- [ ] Implement `config.go` with configuration types
- [ ] Implement `tags.go` with tag definitions and filtering
- [ ] Add unit tests for configuration and tags
### Phase 1: Core Infrastructure ✅ COMPLETED (PECOBLR-1145)
- [x] Create `telemetry` package structure
- [x] Implement `config.go` with configuration types
- [x] Implement `tags.go` with tag definitions and filtering
- [x] Add unit tests for configuration and tags

### Phase 2: Per-Host Management
- [ ] Implement `featureflag.go` with caching and reference counting
Expand Down
71 changes: 71 additions & 0 deletions telemetry/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package telemetry

import (
"strconv"
"time"
)

// Config is the set of tunables for the telemetry subsystem.
type Config struct {
	// Enabled switches telemetry on or off.
	Enabled bool

	// BatchSize is how many metrics are accumulated before a flush.
	BatchSize int

	// FlushInterval is the period between metric flushes.
	FlushInterval time.Duration

	// MaxRetries caps the number of retry attempts.
	MaxRetries int

	// RetryDelay is the base wait between retry attempts.
	RetryDelay time.Duration

	// CircuitBreakerEnabled turns circuit breaker protection on.
	CircuitBreakerEnabled bool

	// CircuitBreakerThreshold is the failure count at which the circuit opens.
	CircuitBreakerThreshold int

	// CircuitBreakerTimeout is how long an open circuit waits before retrying.
	CircuitBreakerTimeout time.Duration
}

// DefaultConfig returns a freshly allocated Config populated with the
// package defaults. Telemetry is off by default and stays off until
// testing and validation are finished.
func DefaultConfig() *Config {
	cfg := new(Config)

	// Off by default until testing is complete.
	cfg.Enabled = false

	// Batching and flushing.
	cfg.BatchSize = 100
	cfg.FlushInterval = 5 * time.Second

	// Retry policy.
	cfg.MaxRetries = 3
	cfg.RetryDelay = 100 * time.Millisecond

	// Circuit breaker.
	cfg.CircuitBreakerEnabled = true
	cfg.CircuitBreakerThreshold = 5
	cfg.CircuitBreakerTimeout = 1 * time.Minute

	return cfg
}

// ParseTelemetryConfig extracts telemetry config from DSN query parameters.
//
// It starts from DefaultConfig and applies overrides for the recognized keys:
//
//   - "telemetry": telemetry is enabled only for the exact values "true" or
//     "1"; any other value present under this key disables it.
//   - "telemetry_batch_size": a positive base-10 integer.
//   - "telemetry_flush_interval": a positive duration in time.ParseDuration
//     syntax, e.g. "10s".
//
// A batch size or flush interval that fails to parse, or that is zero or
// negative, is ignored and the default is kept. Keys other than the ones
// above are not consulted. A nil or empty map yields the defaults.
func ParseTelemetryConfig(params map[string]string) *Config {
	cfg := DefaultConfig()

	if v, ok := params["telemetry"]; ok {
		cfg.Enabled = v == "true" || v == "1"
	}

	if v, ok := params["telemetry_batch_size"]; ok {
		if size, err := strconv.Atoi(v); err == nil && size > 0 {
			cfg.BatchSize = size
		}
	}

	if v, ok := params["telemetry_flush_interval"]; ok {
		// Mirror the batch-size rule and reject non-positive values: a zero
		// or negative interval cannot drive a periodic flush, and
		// time.NewTicker panics when handed one.
		if interval, err := time.ParseDuration(v); err == nil && interval > 0 {
			cfg.FlushInterval = interval
		}
	}

	return cfg
}
187 changes: 187 additions & 0 deletions telemetry/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package telemetry

import (
"testing"
"time"
)

// TestDefaultConfig pins every field value returned by DefaultConfig.
func TestDefaultConfig(t *testing.T) {
	got := DefaultConfig()

	// Telemetry must be off unless explicitly requested.
	if got.Enabled {
		t.Error("Expected telemetry to be disabled by default, got enabled")
	}

	// Batching and flushing defaults.
	if want := 100; got.BatchSize != want {
		t.Errorf("Expected BatchSize 100, got %d", got.BatchSize)
	}
	if want := 5 * time.Second; got.FlushInterval != want {
		t.Errorf("Expected FlushInterval 5s, got %v", got.FlushInterval)
	}

	// Retry defaults.
	if want := 3; got.MaxRetries != want {
		t.Errorf("Expected MaxRetries 3, got %d", got.MaxRetries)
	}
	if want := 100 * time.Millisecond; got.RetryDelay != want {
		t.Errorf("Expected RetryDelay 100ms, got %v", got.RetryDelay)
	}

	// Circuit breaker defaults.
	if enabled := got.CircuitBreakerEnabled; !enabled {
		t.Error("Expected CircuitBreakerEnabled true, got false")
	}
	if want := 5; got.CircuitBreakerThreshold != want {
		t.Errorf("Expected CircuitBreakerThreshold 5, got %d", got.CircuitBreakerThreshold)
	}
	if want := 1 * time.Minute; got.CircuitBreakerTimeout != want {
		t.Errorf("Expected CircuitBreakerTimeout 1m, got %v", got.CircuitBreakerTimeout)
	}
}

// TestParseTelemetryConfig_EmptyParams checks that an empty parameter map
// leaves the defaults in place.
func TestParseTelemetryConfig_EmptyParams(t *testing.T) {
	got := ParseTelemetryConfig(map[string]string{})

	// With no overrides the parsed config should match the defaults.
	if got.Enabled {
		t.Error("Expected telemetry to be disabled by default")
	}
	if want := 100; got.BatchSize != want {
		t.Errorf("Expected BatchSize 100, got %d", got.BatchSize)
	}
}

// TestParseTelemetryConfig_EnabledTrue checks that telemetry=true enables
// telemetry.
func TestParseTelemetryConfig_EnabledTrue(t *testing.T) {
	got := ParseTelemetryConfig(map[string]string{"telemetry": "true"})

	if got.Enabled {
		return
	}
	t.Error("Expected telemetry to be enabled when set to 'true'")
}

// TestParseTelemetryConfig_Enabled1 checks that telemetry=1 enables
// telemetry.
func TestParseTelemetryConfig_Enabled1(t *testing.T) {
	got := ParseTelemetryConfig(map[string]string{"telemetry": "1"})

	if got.Enabled {
		return
	}
	t.Error("Expected telemetry to be enabled when set to '1'")
}

// TestParseTelemetryConfig_EnabledFalse checks that telemetry=false keeps
// telemetry off.
func TestParseTelemetryConfig_EnabledFalse(t *testing.T) {
	got := ParseTelemetryConfig(map[string]string{"telemetry": "false"})

	if !got.Enabled {
		return
	}
	t.Error("Expected telemetry to be disabled when set to 'false'")
}

// TestParseTelemetryConfig_BatchSize checks that a valid batch size
// override is applied.
func TestParseTelemetryConfig_BatchSize(t *testing.T) {
	got := ParseTelemetryConfig(map[string]string{"telemetry_batch_size": "50"})

	if want := 50; got.BatchSize != want {
		t.Errorf("Expected BatchSize 50, got %d", got.BatchSize)
	}
}

// TestParseTelemetryConfig_BatchSizeInvalid checks that a non-numeric
// batch size is ignored.
func TestParseTelemetryConfig_BatchSizeInvalid(t *testing.T) {
	got := ParseTelemetryConfig(map[string]string{"telemetry_batch_size": "invalid"})

	// An unparsable value must leave the default untouched.
	if want := 100; got.BatchSize != want {
		t.Errorf("Expected BatchSize to fallback to 100, got %d", got.BatchSize)
	}
}

// TestParseTelemetryConfig_BatchSizeZero checks that a batch size of zero
// is ignored.
func TestParseTelemetryConfig_BatchSizeZero(t *testing.T) {
	got := ParseTelemetryConfig(map[string]string{"telemetry_batch_size": "0"})

	// Zero is rejected, so the default stays in effect.
	if want := 100; got.BatchSize != want {
		t.Errorf("Expected BatchSize to fallback to 100 when zero, got %d", got.BatchSize)
	}
}

// TestParseTelemetryConfig_BatchSizeNegative checks that a negative batch
// size is ignored.
func TestParseTelemetryConfig_BatchSizeNegative(t *testing.T) {
	got := ParseTelemetryConfig(map[string]string{"telemetry_batch_size": "-10"})

	// Negative values are rejected, so the default stays in effect.
	if want := 100; got.BatchSize != want {
		t.Errorf("Expected BatchSize to fallback to 100 when negative, got %d", got.BatchSize)
	}
}

// TestParseTelemetryConfig_FlushInterval checks that a valid flush interval
// override is applied.
func TestParseTelemetryConfig_FlushInterval(t *testing.T) {
	got := ParseTelemetryConfig(map[string]string{"telemetry_flush_interval": "10s"})

	if want := 10 * time.Second; got.FlushInterval != want {
		t.Errorf("Expected FlushInterval 10s, got %v", got.FlushInterval)
	}
}

// TestParseTelemetryConfig_FlushIntervalInvalid checks that an unparsable
// flush interval is ignored.
func TestParseTelemetryConfig_FlushIntervalInvalid(t *testing.T) {
	got := ParseTelemetryConfig(map[string]string{"telemetry_flush_interval": "invalid"})

	// An unparsable value must leave the default untouched.
	if want := 5 * time.Second; got.FlushInterval != want {
		t.Errorf("Expected FlushInterval to fallback to 5s, got %v", got.FlushInterval)
	}
}

// TestParseTelemetryConfig_MultipleParams checks that several overrides
// supplied together are all applied and that untouched fields keep their
// defaults.
func TestParseTelemetryConfig_MultipleParams(t *testing.T) {
	dsn := make(map[string]string, 3)
	dsn["telemetry"] = "true"
	dsn["telemetry_batch_size"] = "200"
	dsn["telemetry_flush_interval"] = "30s"

	got := ParseTelemetryConfig(dsn)

	if !got.Enabled {
		t.Error("Expected telemetry to be enabled")
	}
	if want := 200; got.BatchSize != want {
		t.Errorf("Expected BatchSize 200, got %d", got.BatchSize)
	}
	if want := 30 * time.Second; got.FlushInterval != want {
		t.Errorf("Expected FlushInterval 30s, got %v", got.FlushInterval)
	}

	// Fields without an override must still hold their defaults.
	if want := 3; got.MaxRetries != want {
		t.Errorf("Expected MaxRetries to remain default 3, got %d", got.MaxRetries)
	}
}
101 changes: 101 additions & 0 deletions telemetry/tags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package telemetry

// Tag keys attached to connection metrics.
const (
	// TagWorkspaceID is the tag key for the workspace ID.
	TagWorkspaceID = "workspace.id"
	// TagSessionID is the tag key for the session ID.
	TagSessionID = "session.id"
	// TagDriverVersion is the tag key for the driver version.
	TagDriverVersion = "driver.version"
	// TagDriverOS is the tag key for the driver's operating system.
	TagDriverOS = "driver.os"
	// TagDriverRuntime is the tag key for the driver's runtime.
	TagDriverRuntime = "driver.runtime"
	// TagServerAddress is the tag key for the server address.
	// Not exported to Databricks.
	TagServerAddress = "server.address"
)

// Tag keys attached to statement metrics.
const (
	// TagStatementID is the tag key for the statement ID.
	TagStatementID = "statement.id"
	// TagResultFormat is the tag key for the result format.
	TagResultFormat = "result.format"
	// TagResultChunkCount is the tag key for the result chunk count.
	TagResultChunkCount = "result.chunk_count"
	// TagResultBytesDownloaded is the tag key for the bytes downloaded.
	TagResultBytesDownloaded = "result.bytes_downloaded"
	// TagCompressionEnabled is the tag key for whether compression is enabled.
	TagCompressionEnabled = "result.compression_enabled"
	// TagPollCount is the tag key for the poll count.
	TagPollCount = "poll.count"
	// TagPollLatency is the tag key for the poll latency.
	TagPollLatency = "poll.latency_ms"
)

// Tag keys attached to error metrics.
const (
	// TagErrorType is the tag key for the error type.
	TagErrorType = "error.type"
	// TagErrorCode is the tag key for the error code.
	TagErrorCode = "error.code"
)

// Tag keys describing driver feature flags.
const (
	// TagFeatureCloudFetch is the tag key for the CloudFetch feature.
	TagFeatureCloudFetch = "feature.cloudfetch"
	// TagFeatureLZ4 is the tag key for the LZ4 feature.
	TagFeatureLZ4 = "feature.lz4"
	// TagFeatureDirectResults is the tag key for the direct results feature.
	TagFeatureDirectResults = "feature.direct_results"
)

// tagExportScope defines where a tag can be exported. It is a bit set:
// individual destinations are single bits and may be OR-ed together.
type tagExportScope int

// Export destinations.
//
// Each flag is spelled out with an explicit type and shift instead of
// `1 << iota`. iota counts ConstSpecs from the top of the block, so it is
// already 1 on the line after exportNone; using it there would make the
// first flag 2 rather than 1. Omitting the type would also leave the flags
// as untyped integer constants rather than tagExportScope values.
const (
	// exportNone means the tag is not exported anywhere.
	exportNone tagExportScope = 0
	// exportLocal allows the tag to be exported locally.
	exportLocal tagExportScope = 1 << 0
	// exportDatabricks allows the tag to be exported to Databricks.
	exportDatabricks tagExportScope = 1 << 1
	// exportAll allows every export destination.
	exportAll = exportLocal | exportDatabricks
)

// tagDefinition describes one metric tag together with the destinations it
// may be exported to.
type tagDefinition struct {
	// name is the tag key.
	name string
	// exportScope is the bit set of allowed export destinations.
	exportScope tagExportScope
	// description is a human-readable summary of the tag.
	description string
	// required flags the tag as mandatory. NOTE(review): nothing visible in
	// this file reads it; confirm where it is enforced.
	required bool
}

// connectionTags lists the tag definitions permitted on connection events.
// A new slice is built on every call.
func connectionTags() []tagDefinition {
	return []tagDefinition{
		{name: TagWorkspaceID, exportScope: exportDatabricks, description: "Databricks workspace ID", required: true},
		{name: TagSessionID, exportScope: exportDatabricks, description: "Connection session ID", required: true},
		{name: TagDriverVersion, exportScope: exportAll, description: "Driver version", required: false},
		{name: TagDriverOS, exportScope: exportAll, description: "Operating system", required: false},
		{name: TagDriverRuntime, exportScope: exportAll, description: "Go runtime version", required: false},
		{name: TagFeatureCloudFetch, exportScope: exportDatabricks, description: "CloudFetch enabled", required: false},
		{name: TagFeatureLZ4, exportScope: exportDatabricks, description: "LZ4 compression enabled", required: false},
		// The server address is the only connection tag kept local.
		{name: TagServerAddress, exportScope: exportLocal, description: "Server address (local only)", required: false},
	}
}

// statementTags lists the tag definitions permitted on statement events.
// A new slice is built on every call.
func statementTags() []tagDefinition {
	return []tagDefinition{
		{name: TagStatementID, exportScope: exportDatabricks, description: "Statement ID", required: true},
		{name: TagSessionID, exportScope: exportDatabricks, description: "Session ID", required: true},
		{name: TagResultFormat, exportScope: exportDatabricks, description: "Result format", required: false},
		{name: TagResultChunkCount, exportScope: exportDatabricks, description: "Chunk count", required: false},
		{name: TagResultBytesDownloaded, exportScope: exportDatabricks, description: "Bytes downloaded", required: false},
		{name: TagCompressionEnabled, exportScope: exportDatabricks, description: "Compression enabled", required: false},
		{name: TagPollCount, exportScope: exportDatabricks, description: "Poll count", required: false},
		{name: TagPollLatency, exportScope: exportDatabricks, description: "Poll latency", required: false},
	}
}

// shouldExportToDatabricks reports whether tagName, on a metric of the given
// metricType, may be sent to Databricks.
//
// Only the metric types "connection" and "statement" are recognized. An
// unknown metric type, or a tag that is not defined for the metric type,
// yields false.
func shouldExportToDatabricks(metricType, tagName string) bool {
	// Pick the definition list that belongs to this metric type.
	var definitions func() []tagDefinition
	switch metricType {
	case "connection":
		definitions = connectionTags
	case "statement":
		definitions = statementTags
	}
	if definitions == nil {
		return false
	}

	// The first definition with a matching name decides the answer.
	for _, def := range definitions() {
		if def.name != tagName {
			continue
		}
		return def.exportScope&exportDatabricks != 0
	}
	return false
}
Loading
Loading