Skip to content

Commit

Permalink
[connector/otlpjson] Add connector's implementations (#34249)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
This is the 2nd PR for the new `otlpjson` connector
(#34208).
Adds the `ConsumeLogs` methods for the logs, metrics and traces
respective connector types.

At the moment, the connector retrieves the otlp json from the incoming
Log's `Body`. This is not configurable yet. It can be decided if that
needs to be.

In addition, there is no attribute/metadata extraction/transfer from the
incoming Log. This can be added in future iterations.

**Link to tracking Issue:** <Issue number if applicable>
#34208
#34239

**Testing:** <Describe what testing was performed and which tests were
added.> Added.

**Documentation:** <Describe the documentation added.> ~

Signed-off-by: ChrsMark <chrismarkou92@gmail.com>
  • Loading branch information
ChrsMark authored Jul 25, 2024
1 parent 1c06ffb commit 3cb736b
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 3 deletions.
27 changes: 27 additions & 0 deletions .chloggen/implement_oltpjson_connector.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: otlpjsonconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add connector's implementations

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34249, 34208]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
97 changes: 97 additions & 0 deletions connector/otlpjsonconnector/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,18 @@ package otlpjsonconnector
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/connector/connectortest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/pdata/testdata"
)

func TestNewFactory(t *testing.T) {
Expand All @@ -28,3 +35,93 @@ func TestNewFactory(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, conn)
}

func TestLogsToLogs(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)

sink := &consumertest.LogsSink{}
conn, err := factory.CreateLogsToLogs(context.Background(),
connectortest.NewNopSettings(), cfg, sink)
require.NoError(t, err)
require.NotNil(t, conn)
assert.False(t, conn.Capabilities().MutatesData)

require.NoError(t, conn.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
assert.NoError(t, conn.Shutdown(context.Background()))
}()

lp := testdata.GenerateLogs(1)
marshaler := &plog.JSONMarshaler{}
b, err := marshaler.MarshalLogs(lp)
require.NoError(t, err)

testLogs := testdata.GenerateLogs(1)
testLogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr(string(b))
assert.NoError(t, conn.ConsumeLogs(context.Background(), testLogs))

time.Sleep(1 * time.Second)
require.Len(t, sink.AllLogs(), 1)
assert.EqualValues(t, lp, sink.AllLogs()[0])
}

func TestLogsToMetrics(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)

sink := &consumertest.MetricsSink{}
conn, err := factory.CreateLogsToMetrics(context.Background(),
connectortest.NewNopSettings(), cfg, sink)
require.NoError(t, err)
require.NotNil(t, conn)
assert.False(t, conn.Capabilities().MutatesData)

require.NoError(t, conn.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
assert.NoError(t, conn.Shutdown(context.Background()))
}()

mt := testdata.GenerateMetrics(1)
marshaler := &pmetric.JSONMarshaler{}
b, err := marshaler.MarshalMetrics(mt)
require.NoError(t, err)

testLogs := testdata.GenerateLogs(1)
testLogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr(string(b))
assert.NoError(t, conn.ConsumeLogs(context.Background(), testLogs))

time.Sleep(1 * time.Second)
require.Len(t, sink.AllMetrics(), 1)
assert.EqualValues(t, mt, sink.AllMetrics()[0])
}

func TestLogsToTraces(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)

sink := &consumertest.TracesSink{}
conn, err := factory.CreateLogsToTraces(context.Background(),
connectortest.NewNopSettings(), cfg, sink)
require.NoError(t, err)
require.NotNil(t, conn)
assert.False(t, conn.Capabilities().MutatesData)

require.NoError(t, conn.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
assert.NoError(t, conn.Shutdown(context.Background()))
}()

td := testdata.GenerateTraces(1)
marshaler := &ptrace.JSONMarshaler{}
b, err := marshaler.MarshalTraces(td)
require.NoError(t, err)

testLogs := testdata.GenerateLogs(1)
testLogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr(string(b))
assert.NoError(t, conn.ConsumeLogs(context.Background(), testLogs))

time.Sleep(1 * time.Second)
require.Len(t, sink.AllTraces(), 1)
assert.EqualValues(t, td, sink.AllTraces()[0])
}
2 changes: 2 additions & 0 deletions connector/otlpjsonconnector/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
go.opentelemetry.io/collector/connector v0.105.1-0.20240717163034-43ed6184f9fe
go.opentelemetry.io/collector/consumer v0.105.1-0.20240717163034-43ed6184f9fe
go.opentelemetry.io/collector/pdata v1.12.1-0.20240716231837-5753a58f712b
go.opentelemetry.io/collector/pdata/testdata v0.105.0
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
)
Expand Down Expand Up @@ -41,6 +42,7 @@ require (
go.opentelemetry.io/collector/config/configtelemetry v0.105.1-0.20240717163034-43ed6184f9fe // indirect
go.opentelemetry.io/collector/featuregate v1.12.1-0.20240716231837-5753a58f712b // indirect
go.opentelemetry.io/collector/internal/globalgates v0.105.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.105.0 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.50.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
Expand Down
20 changes: 19 additions & 1 deletion connector/otlpjsonconnector/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,24 @@ func (c *connectorLogs) Capabilities() consumer.Capabilities {
}

// ConsumeLogs method is called for each instance of a log sent to the connector
func (c *connectorLogs) ConsumeLogs(_ context.Context, _ plog.Logs) error {
func (c *connectorLogs) ConsumeLogs(ctx context.Context, pl plog.Logs) error {
// loop through the levels of logs
logsUnmarshaler := &plog.JSONUnmarshaler{}
for i := 0; i < pl.ResourceLogs().Len(); i++ {
li := pl.ResourceLogs().At(i)
for j := 0; j < li.ScopeLogs().Len(); j++ {
logRecord := li.ScopeLogs().At(j)
for k := 0; k < logRecord.LogRecords().Len(); k++ {
lRecord := logRecord.LogRecords().At(k)
token := lRecord.Body()
var l plog.Logs
l, _ = logsUnmarshaler.UnmarshalLogs([]byte(token.AsString()))
err := c.logsConsumer.ConsumeLogs(ctx, l)
if err != nil {
c.logger.Error("could not extract logs from otlp json", zap.Error(err))
}
}
}
}
return nil
}
21 changes: 20 additions & 1 deletion connector/otlpjsonconnector/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -40,6 +41,24 @@ func (c *connectorMetrics) Capabilities() consumer.Capabilities {
}

// ConsumeLogs method is called for each instance of a log sent to the connector
func (c *connectorMetrics) ConsumeLogs(_ context.Context, _ plog.Logs) error {
func (c *connectorMetrics) ConsumeLogs(ctx context.Context, pl plog.Logs) error {
// loop through the levels of logs
metricsUnmarshaler := &pmetric.JSONUnmarshaler{}
for i := 0; i < pl.ResourceLogs().Len(); i++ {
li := pl.ResourceLogs().At(i)
for j := 0; j < li.ScopeLogs().Len(); j++ {
logRecord := li.ScopeLogs().At(j)
for k := 0; k < logRecord.LogRecords().Len(); k++ {
lRecord := logRecord.LogRecords().At(k)
token := lRecord.Body()
var m pmetric.Metrics
m, _ = metricsUnmarshaler.UnmarshalMetrics([]byte(token.AsString()))
err := c.metricsConsumer.ConsumeMetrics(ctx, m)
if err != nil {
c.logger.Error("could not extract metrics from otlp json", zap.Error(err))
}
}
}
}
return nil
}
21 changes: 20 additions & 1 deletion connector/otlpjsonconnector/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -40,6 +41,24 @@ func (c *connectorTraces) Capabilities() consumer.Capabilities {
}

// ConsumeLogs method is called for each instance of a log sent to the connector
func (c *connectorTraces) ConsumeLogs(_ context.Context, _ plog.Logs) error {
func (c *connectorTraces) ConsumeLogs(ctx context.Context, pl plog.Logs) error {
// loop through the levels of logs
tracesUnmarshaler := &ptrace.JSONUnmarshaler{}
for i := 0; i < pl.ResourceLogs().Len(); i++ {
li := pl.ResourceLogs().At(i)
for j := 0; j < li.ScopeLogs().Len(); j++ {
logRecord := li.ScopeLogs().At(j)
for k := 0; k < logRecord.LogRecords().Len(); k++ {
lRecord := logRecord.LogRecords().At(k)
token := lRecord.Body()
var t ptrace.Traces
t, _ = tracesUnmarshaler.UnmarshalTraces([]byte(token.AsString()))
err := c.tracesConsumer.ConsumeTraces(ctx, t)
if err != nil {
c.logger.Error("could not extract traces from otlp json", zap.Error(err))
}
}
}
}
return nil
}

0 comments on commit 3cb736b

Please sign in to comment.