-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
receiver.go
121 lines (92 loc) · 3.29 KB
/
receiver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package azureblobreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureblobreceiver"
import (
"context"
"fmt"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureblobreceiver/internal/metadata"
)
// logsDataConsumer is the logs-side sink handed to the blob event handler;
// newReceiver registers the receiver under this interface via
// setLogsDataConsumer.
type logsDataConsumer interface {
// consumeLogsJSON processes one logs payload supplied as raw bytes and
// reports any failure through the returned error.
consumeLogsJSON(ctx context.Context, json []byte) error
// setNextLogsConsumer sets the consumer.Logs that payloads are forwarded to.
setNextLogsConsumer(nextLogsConsumer consumer.Logs)
}
// tracesDataConsumer is the traces-side sink handed to the blob event handler;
// newReceiver registers the receiver under this interface via
// setTracesDataConsumer.
type tracesDataConsumer interface {
	// consumeTracesJSON processes one traces payload supplied as raw bytes and
	// reports any failure through the returned error.
	consumeTracesJSON(ctx context.Context, json []byte) error
	// setNextTracesConsumer sets the consumer.Traces that payloads are
	// forwarded to.
	setNextTracesConsumer(nextTracesConsumer consumer.Traces)
}
// blobReceiver unmarshals logs and traces payloads and forwards them to the
// next consumers. It provides the methods declared by logsDataConsumer and
// tracesDataConsumer.
type blobReceiver struct {
// blobEventHandler is run by Start and closed by Shutdown.
blobEventHandler blobEventHandler
// logger is taken from the receiver settings in newReceiver; it is not read
// by the methods visible here.
logger *zap.Logger
// logsUnmarshaler decodes the bytes passed to consumeLogsJSON.
logsUnmarshaler plog.Unmarshaler
// tracesUnmarshaler decodes the bytes passed to consumeTracesJSON.
tracesUnmarshaler ptrace.Unmarshaler
// nextLogsConsumer receives decoded logs; while nil, consumeLogsJSON does nothing.
nextLogsConsumer consumer.Logs
// nextTracesConsumer receives decoded traces; while nil, consumeTracesJSON does nothing.
nextTracesConsumer consumer.Traces
// obsrecv brackets each consume call with Start*Op / End*Op.
obsrecv *receiverhelper.ObsReport
}
// Start runs the blob event handler with the supplied context and returns
// whatever error the handler reports. The component host is not used.
func (b *blobReceiver) Start(ctx context.Context, _ component.Host) error {
	return b.blobEventHandler.run(ctx)
}
// Shutdown closes the blob event handler and returns the error it reports.
func (b *blobReceiver) Shutdown(ctx context.Context) error {
return b.blobEventHandler.close(ctx)
}
// setNextLogsConsumer stores the consumer that consumeLogsJSON forwards to.
// Until a non-nil consumer is set, consumeLogsJSON returns nil without doing
// any work.
func (b *blobReceiver) setNextLogsConsumer(nextLogsConsumer consumer.Logs) {
b.nextLogsConsumer = nextLogsConsumer
}
// setNextTracesConsumer stores the consumer that consumeTracesJSON forwards
// to. Until a non-nil consumer is set, consumeTracesJSON returns nil without
// doing any work.
func (b *blobReceiver) setNextTracesConsumer(nextTracesConsumer consumer.Traces) {
b.nextTracesConsumer = nextTracesConsumer
}
// consumeLogsJSON unmarshals a logs payload and forwards it to the next logs
// consumer, bracketing the work with an obsreport logs operation.
//
// It returns nil without doing anything when no logs consumer has been set.
// Otherwise it returns a wrapped unmarshal error, or the error (if any)
// returned by the next consumer.
func (b *blobReceiver) consumeLogsJSON(ctx context.Context, json []byte) error {
	if b.nextLogsConsumer == nil {
		return nil
	}

	logsContext := b.obsrecv.StartLogsOp(ctx)

	logs, err := b.logsUnmarshaler.UnmarshalLogs(json)
	if err != nil {
		err = fmt.Errorf("failed to unmarshal logs: %w", err)
		// Every StartLogsOp needs a matching EndLogsOp; without it the
		// operation is never closed and the failure goes unreported.
		b.obsrecv.EndLogsOp(logsContext, metadata.Type.String(), 0, err)
		return err
	}

	// Take the count before handing the data downstream, since the next
	// consumer may modify it.
	numRecords := logs.LogRecordCount()

	err = b.nextLogsConsumer.ConsumeLogs(logsContext, logs)
	b.obsrecv.EndLogsOp(logsContext, metadata.Type.String(), numRecords, err)
	return err
}
// consumeTracesJSON unmarshals a traces payload and forwards it to the next
// traces consumer, bracketing the work with an obsreport traces operation.
//
// It returns nil without doing anything when no traces consumer has been set.
// Otherwise it returns a wrapped unmarshal error, or the error (if any)
// returned by the next consumer.
func (b *blobReceiver) consumeTracesJSON(ctx context.Context, json []byte) error {
	if b.nextTracesConsumer == nil {
		return nil
	}

	tracesContext := b.obsrecv.StartTracesOp(ctx)

	traces, err := b.tracesUnmarshaler.UnmarshalTraces(json)
	if err != nil {
		err = fmt.Errorf("failed to unmarshal traces: %w", err)
		// Every StartTracesOp needs a matching EndTracesOp; without it the
		// operation is never closed and the failure goes unreported.
		b.obsrecv.EndTracesOp(tracesContext, metadata.Type.String(), 0, err)
		return err
	}

	// Take the count before handing the data downstream, since the next
	// consumer may modify it.
	numSpans := traces.SpanCount()

	err = b.nextTracesConsumer.ConsumeTraces(tracesContext, traces)
	b.obsrecv.EndTracesOp(tracesContext, metadata.Type.String(), numSpans, err)
	return err
}
// newReceiver builds a blobReceiver around the given event handler and
// registers it with that handler as both the logs and the traces data
// consumer. It fails only if the obsreport helper cannot be created.
func newReceiver(set receiver.Settings, blobEventHandler blobEventHandler) (component.Component, error) {
	obsSettings := receiverhelper.ObsReportSettings{
		ReceiverID:             set.ID,
		Transport:              "event",
		ReceiverCreateSettings: set,
	}
	obsrecv, err := receiverhelper.NewObsReport(obsSettings)
	if err != nil {
		return nil, err
	}

	r := &blobReceiver{
		blobEventHandler:  blobEventHandler,
		logger:            set.Logger,
		logsUnmarshaler:   &plog.JSONUnmarshaler{},
		tracesUnmarshaler: &ptrace.JSONUnmarshaler{},
		obsrecv:           obsrecv,
	}

	// The same receiver instance serves both signal types.
	blobEventHandler.setLogsDataConsumer(r)
	blobEventHandler.setTracesDataConsumer(r)

	return r, nil
}