diff --git a/internal/receiver/databricksreceiver/factory.go b/internal/receiver/databricksreceiver/factory.go index 57be02e8ac..eb5b869fe0 100644 --- a/internal/receiver/databricksreceiver/factory.go +++ b/internal/receiver/databricksreceiver/factory.go @@ -58,6 +58,7 @@ func newReceiverFactory() receiver.CreateMetricsFunc { rmp: databricks.NewRunMetricsProvider(dbrsvc), dbrmp: databricks.MetricsProvider{Svc: dbrsvc}, metricsBuilder: metadata.NewMetricsBuilder(dbrcfg.MetricsBuilderConfig, settings), + resourceBuilder: metadata.NewResourceBuilder(dbrcfg.MetricsBuilderConfig.ResourceAttributes), scmb: spark.ClusterMetricsBuilder{Ssvc: ssvc}, semb: spark.ExtraMetricsBuilder{ Ssvc: ssvc, diff --git a/internal/receiver/databricksreceiver/internal/metadata/generated_metrics.go b/internal/receiver/databricksreceiver/internal/metadata/generated_metrics.go index 5b2703da0c..4890e6d335 100644 --- a/internal/receiver/databricksreceiver/internal/metadata/generated_metrics.go +++ b/internal/receiver/databricksreceiver/internal/metadata/generated_metrics.go @@ -7673,10 +7673,8 @@ func newMetricDatabricksTasksScheduleStatus(cfg MetricConfig) metricDatabricksTa // MetricsBuilder provides an interface for scrapers to report metrics while taking care of all the transformations // required to produce metric representation defined in metadata and user config. type MetricsBuilder struct { - startTime pcommon.Timestamp // start time that will be applied to all recorded data points. - metricsCapacity int // maximum observed number of metrics per resource. - metricsBuffer pmetric.Metrics // accumulates metrics data before emitting. 
- buildInfo component.BuildInfo // contains version information + metricsBuffer pmetric.Metrics + buildInfo component.BuildInfo metricDatabricksJobsActiveTotal metricDatabricksJobsActiveTotal metricDatabricksJobsRunDuration metricDatabricksJobsRunDuration metricDatabricksJobsScheduleStatus metricDatabricksJobsScheduleStatus @@ -7817,6 +7815,8 @@ type MetricsBuilder struct { metricDatabricksSparkTimerLiveListenerBusQueueStreamsListenerProcessingTime metricDatabricksSparkTimerLiveListenerBusQueueStreamsListenerProcessingTime metricDatabricksTasksRunDuration metricDatabricksTasksRunDuration metricDatabricksTasksScheduleStatus metricDatabricksTasksScheduleStatus + startTime pcommon.Timestamp + metricsCapacity int } // metricBuilderOption applies changes to default metrics builder. diff --git a/internal/receiver/databricksreceiver/internal/metadata/generated_resource.go b/internal/receiver/databricksreceiver/internal/metadata/generated_resource.go new file mode 100644 index 0000000000..8bde4e201e --- /dev/null +++ b/internal/receiver/databricksreceiver/internal/metadata/generated_resource.go @@ -0,0 +1,57 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "go.opentelemetry.io/collector/pdata/pcommon" +) + +// ResourceBuilder is a helper struct to build resources predefined in metadata.yaml. +// The ResourceBuilder is not thread-safe and must not be used in multiple goroutines. +type ResourceBuilder struct { + res pcommon.Resource + config ResourceAttributesConfig +} + +// NewResourceBuilder creates a new ResourceBuilder. This method should be called on the start of the application. +func NewResourceBuilder(rac ResourceAttributesConfig) *ResourceBuilder { + return &ResourceBuilder{ + config: rac, + res: pcommon.NewResource(), + } +} + +// SetDatabricksInstanceName sets provided value as "databricks.instance.name" attribute. 
+func (rb *ResourceBuilder) SetDatabricksInstanceName(val string) { + if rb.config.DatabricksInstanceName.Enabled { + rb.res.Attributes().PutStr("databricks.instance.name", val) + } +} + +// SetSparkAppID sets provided value as "spark.app.id" attribute. +func (rb *ResourceBuilder) SetSparkAppID(val string) { + if rb.config.SparkAppID.Enabled { + rb.res.Attributes().PutStr("spark.app.id", val) + } +} + +// SetSparkClusterID sets provided value as "spark.cluster.id" attribute. +func (rb *ResourceBuilder) SetSparkClusterID(val string) { + if rb.config.SparkClusterID.Enabled { + rb.res.Attributes().PutStr("spark.cluster.id", val) + } +} + +// SetSparkClusterName sets provided value as "spark.cluster.name" attribute. +func (rb *ResourceBuilder) SetSparkClusterName(val string) { + if rb.config.SparkClusterName.Enabled { + rb.res.Attributes().PutStr("spark.cluster.name", val) + } +} + +// Emit returns the built resource and resets the internal builder state. +func (rb *ResourceBuilder) Emit() pcommon.Resource { + r := rb.res + rb.res = pcommon.NewResource() + return r +} diff --git a/internal/receiver/databricksreceiver/internal/metadata/generated_resource_test.go b/internal/receiver/databricksreceiver/internal/metadata/generated_resource_test.go new file mode 100644 index 0000000000..f475c1a53e --- /dev/null +++ b/internal/receiver/databricksreceiver/internal/metadata/generated_resource_test.go @@ -0,0 +1,58 @@ +// Code generated by mdatagen. DO NOT EDIT. 
+ +package metadata + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestResourceBuilder(t *testing.T) { + for _, test := range []string{"default", "all_set", "none_set"} { + t.Run(test, func(t *testing.T) { + cfg := loadResourceAttributesConfig(t, test) + rb := NewResourceBuilder(cfg) + rb.SetDatabricksInstanceName("databricks.instance.name-val") + rb.SetSparkAppID("spark.app.id-val") + rb.SetSparkClusterID("spark.cluster.id-val") + rb.SetSparkClusterName("spark.cluster.name-val") + + res := rb.Emit() + assert.Equal(t, 0, rb.Emit().Attributes().Len()) // Second call should return 0 + + switch test { + case "default": + assert.Equal(t, 4, res.Attributes().Len()) + case "all_set": + assert.Equal(t, 4, res.Attributes().Len()) + case "none_set": + assert.Equal(t, 0, res.Attributes().Len()) + return + default: + assert.Failf(t, "unexpected test case: %s", test) + } + + val, ok := res.Attributes().Get("databricks.instance.name") + assert.True(t, ok) + if ok { + assert.EqualValues(t, "databricks.instance.name-val", val.Str()) + } + val, ok = res.Attributes().Get("spark.app.id") + assert.True(t, ok) + if ok { + assert.EqualValues(t, "spark.app.id-val", val.Str()) + } + val, ok = res.Attributes().Get("spark.cluster.id") + assert.True(t, ok) + if ok { + assert.EqualValues(t, "spark.cluster.id-val", val.Str()) + } + val, ok = res.Attributes().Get("spark.cluster.name") + assert.True(t, ok) + if ok { + assert.EqualValues(t, "spark.cluster.name-val", val.Str()) + } + }) + } +} diff --git a/internal/receiver/databricksreceiver/internal/spark/cluster_metrics_builder_test.go b/internal/receiver/databricksreceiver/internal/spark/cluster_metrics_builder_test.go index 8555a7c39a..c5e5c81c40 100644 --- a/internal/receiver/databricksreceiver/internal/spark/cluster_metrics_builder_test.go +++ b/internal/receiver/databricksreceiver/internal/spark/cluster_metrics_builder_test.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" 
"github.com/signalfx/splunk-otel-collector/internal/receiver/databricksreceiver/internal/commontest" + "github.com/signalfx/splunk-otel-collector/internal/receiver/databricksreceiver/internal/metadata" ) func TestStripSparkMetricKey(t *testing.T) { @@ -47,7 +48,8 @@ func TestClusterMetricsBuilder_GeneratedMetrics(t *testing.T) { const expectedCount = 112 testBuilder := commontest.NewTestMetricsBuilder() - built := coreMetrics.Build(testBuilder, pcommon.NewTimestampFromTime(time.Now())) + rb := metadata.NewResourceBuilder(metadata.DefaultResourceAttributesConfig()) + built := coreMetrics.Build(testBuilder, rb, pcommon.NewTimestampFromTime(time.Now()), "my-app-id") pm := pmetric.NewMetrics() for _, metric := range built { metric.ResourceMetrics().MoveAndAppendTo(pm.ResourceMetrics()) diff --git a/internal/receiver/databricksreceiver/internal/spark/extra_metrics_builder_test.go b/internal/receiver/databricksreceiver/internal/spark/extra_metrics_builder_test.go index d74c24d686..65c0ff15a0 100644 --- a/internal/receiver/databricksreceiver/internal/spark/extra_metrics_builder_test.go +++ b/internal/receiver/databricksreceiver/internal/spark/extra_metrics_builder_test.go @@ -24,6 +24,7 @@ import ( "go.uber.org/zap" "github.com/signalfx/splunk-otel-collector/internal/receiver/databricksreceiver/internal/commontest" + "github.com/signalfx/splunk-otel-collector/internal/receiver/databricksreceiver/internal/metadata" ) func TestSparkExtraMetricsBuilder_Executors(t *testing.T) { @@ -32,7 +33,8 @@ func TestSparkExtraMetricsBuilder_Executors(t *testing.T) { require.NoError(t, err) builder := commontest.NewTestMetricsBuilder() - built := execMetrics.Build(builder, pcommon.NewTimestampFromTime(time.Now())) + rb := metadata.NewResourceBuilder(metadata.DefaultResourceAttributesConfig()) + built := execMetrics.Build(builder, rb, pcommon.NewTimestampFromTime(time.Now()), "my-app-id") pm := pmetric.NewMetrics() for _, metrics := range built { 
metrics.ResourceMetrics().MoveAndAppendTo(pm.ResourceMetrics()) @@ -53,7 +55,8 @@ func TestSparkExtraMetricsBuilder_Jobs(t *testing.T) { require.NoError(t, err) builder := commontest.NewTestMetricsBuilder() - built := jobMetrics.Build(builder, pcommon.NewTimestampFromTime(time.Now())) + rb := metadata.NewResourceBuilder(metadata.DefaultResourceAttributesConfig()) + built := jobMetrics.Build(builder, rb, pcommon.NewTimestampFromTime(time.Now()), "my-app-id") pm := pmetric.NewMetrics() for _, metrics := range built { metrics.ResourceMetrics().MoveAndAppendTo(pm.ResourceMetrics()) @@ -77,7 +80,8 @@ func TestSparkExtraMetricsBuilder_Stages(t *testing.T) { require.NoError(t, err) builder := commontest.NewTestMetricsBuilder() - built := stageMetrics.Build(builder, pcommon.NewTimestampFromTime(time.Now())) + rb := metadata.NewResourceBuilder(metadata.DefaultResourceAttributesConfig()) + built := stageMetrics.Build(builder, rb, pcommon.NewTimestampFromTime(time.Now()), "my-app-id") pm := pmetric.NewMetrics() for _, metrics := range built { metrics.ResourceMetrics().MoveAndAppendTo(pm.ResourceMetrics()) diff --git a/internal/receiver/databricksreceiver/internal/spark/resource_metrics.go b/internal/receiver/databricksreceiver/internal/spark/resource_metrics.go index e01db5addc..779f9ba62e 100644 --- a/internal/receiver/databricksreceiver/internal/spark/resource_metrics.go +++ b/internal/receiver/databricksreceiver/internal/spark/resource_metrics.go @@ -124,15 +124,16 @@ func (m *ResourceMetrics) addStageInfo(clstr Cluster, appID string, info StageIn }) } -func (m *ResourceMetrics) Build(builder *metadata.MetricsBuilder, now pcommon.Timestamp, rmo ...metadata.ResourceMetricsOption) []pmetric.Metrics { +func (m *ResourceMetrics) Build(mb *metadata.MetricsBuilder, rb *metadata.ResourceBuilder, now pcommon.Timestamp, instanceName string) []pmetric.Metrics { var out []pmetric.Metrics for rs, metricInfos := range m.m { for _, mi := range metricInfos { - mi.build(builder, rs, now) 
+ mi.build(mb, rs, now) } - rmo = append(rmo, metadata.WithSparkClusterID(rs.cluster.ClusterID)) - rmo = append(rmo, metadata.WithSparkClusterName(rs.cluster.ClusterName)) - out = append(out, builder.Emit(rmo...)) + rb.SetDatabricksInstanceName(instanceName) + rb.SetSparkClusterID(rs.cluster.ClusterID) + rb.SetSparkClusterName(rs.cluster.ClusterName) + out = append(out, mb.Emit(metadata.WithResource(rb.Emit()))) } return out } diff --git a/internal/receiver/databricksreceiver/internal/spark/resource_metrics_test.go b/internal/receiver/databricksreceiver/internal/spark/resource_metrics_test.go index 00a3d5f004..a00d9c40c7 100644 --- a/internal/receiver/databricksreceiver/internal/spark/resource_metrics_test.go +++ b/internal/receiver/databricksreceiver/internal/spark/resource_metrics_test.go @@ -23,6 +23,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "github.com/signalfx/splunk-otel-collector/internal/receiver/databricksreceiver/internal/commontest" + "github.com/signalfx/splunk-otel-collector/internal/receiver/databricksreceiver/internal/metadata" ) func TestSparkDbrMetrics_Append(t *testing.T) { @@ -45,8 +46,9 @@ func TestSparkDbrMetrics_Append(t *testing.T) { }) outerRM.Append(rmSub2) - builder := commontest.NewTestMetricsBuilder() - built := outerRM.Build(builder, pcommon.NewTimestampFromTime(time.Now())) + mb := commontest.NewTestMetricsBuilder() + rb := metadata.NewResourceBuilder(metadata.DefaultResourceAttributesConfig()) + built := outerRM.Build(mb, rb, pcommon.NewTimestampFromTime(time.Now()), "my-app-id") allMetrics := pmetric.NewMetrics() for _, metrics := range built { metrics.ResourceMetrics().CopyTo(allMetrics.ResourceMetrics()) diff --git a/internal/receiver/databricksreceiver/scraper.go b/internal/receiver/databricksreceiver/scraper.go index 1783f8c2b0..752ea3a8e6 100644 --- a/internal/receiver/databricksreceiver/scraper.go +++ b/internal/receiver/databricksreceiver/scraper.go @@ -39,6 +39,7 @@ type scraper struct { dbrsvc 
databricks.Service logger *zap.Logger metricsBuilder *metadata.MetricsBuilder + resourceBuilder *metadata.ResourceBuilder dbrInstanceName string } @@ -60,7 +61,8 @@ func (s scraper) scrape(_ context.Context) (pmetric.Metrics, error) { return pmetric.Metrics{}, fmt.Errorf("scrape failed to add multi job run metrics: %w", err) } - dbrMetrics := s.metricsBuilder.Emit(metadata.WithDatabricksInstanceName(s.dbrInstanceName)) + s.resourceBuilder.SetDatabricksInstanceName(s.dbrInstanceName) + dbrMetrics := s.metricsBuilder.Emit(metadata.WithResource(s.resourceBuilder.Emit())) // spark metrics clusters, err := s.dbrsvc.RunningClusters() @@ -103,7 +105,7 @@ func (s scraper) scrape(_ context.Context) (pmetric.Metrics, error) { out := pmetric.NewMetrics() dbrMetrics.ResourceMetrics().MoveAndAppendTo(out.ResourceMetrics()) - sparkMetrics := allSparkDbrMetrics.Build(s.metricsBuilder, now, metadata.WithDatabricksInstanceName(s.dbrInstanceName)) + sparkMetrics := allSparkDbrMetrics.Build(s.metricsBuilder, s.resourceBuilder, now, s.dbrInstanceName) for _, metric := range sparkMetrics { metric.ResourceMetrics().MoveAndAppendTo(out.ResourceMetrics()) } diff --git a/internal/receiver/databricksreceiver/scraper_test.go b/internal/receiver/databricksreceiver/scraper_test.go index 9e2a025594..715485ffb4 100644 --- a/internal/receiver/databricksreceiver/scraper_test.go +++ b/internal/receiver/databricksreceiver/scraper_test.go @@ -24,6 +24,7 @@ import ( "github.com/signalfx/splunk-otel-collector/internal/receiver/databricksreceiver/internal/commontest" "github.com/signalfx/splunk-otel-collector/internal/receiver/databricksreceiver/internal/databricks" + "github.com/signalfx/splunk-otel-collector/internal/receiver/databricksreceiver/internal/metadata" "github.com/signalfx/splunk-otel-collector/internal/receiver/databricksreceiver/internal/spark" ) @@ -39,6 +40,7 @@ func TestScraper_Success(t *testing.T) { logger: nopLogger, dbrInstanceName: "my-instance", metricsBuilder: 
commontest.NewTestMetricsBuilder(), + resourceBuilder: metadata.NewResourceBuilder(metadata.DefaultResourceAttributesConfig()), rmp: databricks.NewRunMetricsProvider(dbrsvc), dbrmp: databricks.MetricsProvider{Svc: dbrsvc}, scmb: spark.ClusterMetricsBuilder{Ssvc: ssvc}, @@ -71,6 +73,7 @@ func TestScraper_Forbidden(t *testing.T) { logger: nopLogger, dbrInstanceName: "my-instance", metricsBuilder: commontest.NewTestMetricsBuilder(), + resourceBuilder: metadata.NewResourceBuilder(metadata.DefaultResourceAttributesConfig()), rmp: databricks.NewRunMetricsProvider(dbrsvc), dbrmp: databricks.MetricsProvider{Svc: dbrsvc}, scmb: spark.ClusterMetricsBuilder{Ssvc: ssvc}, @@ -94,6 +97,7 @@ func TestScraper_MultiCluster_Forbidden(t *testing.T) { logger: nopLogger, dbrInstanceName: "my-instance", metricsBuilder: commontest.NewTestMetricsBuilder(), + resourceBuilder: metadata.NewResourceBuilder(metadata.DefaultResourceAttributesConfig()), rmp: databricks.NewRunMetricsProvider(dbrsvc), dbrmp: databricks.MetricsProvider{Svc: dbrsvc}, scmb: spark.ClusterMetricsBuilder{Ssvc: ssvc},