Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cassandra exporter implementation #17905

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
unit tests and readme added
  • Loading branch information
yalvac committed Jan 22, 2023
commit b90eee77c661cbb96a9f092d6b8dd27d8c7e14a7
4 changes: 3 additions & 1 deletion cmd/configschema/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ require (
github.com/go-stack/stack v1.8.1 // indirect
github.com/go-zookeeper/zk v1.0.3 // indirect
github.com/gobwas/glob v0.2.3 // indirect
github.com/gocql/gocql v1.3.1 // indirect
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/godbus/dbus/v5 v5.0.6 // indirect
github.com/gofrs/uuid v4.2.0+incompatible // indirect
Expand Down Expand Up @@ -218,6 +219,7 @@ require (
github.com/grobie/gomemcache v0.0.0-20180201122607-1f779c573665 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.0 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
github.com/hashicorp/consul/api v1.18.0 // indirect
github.com/hashicorp/cronexpr v1.1.1 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
Expand Down Expand Up @@ -308,8 +310,8 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/azuredataexplorerexporter v0.69.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/azuremonitorexporter v0.69.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/carbonexporter v0.69.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter v0.69.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/cassandraexporter v0.69.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter v0.69.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/coralogixexporter v0.69.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter v0.69.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/dynatraceexporter v0.69.0 // indirect
Expand Down
5 changes: 5 additions & 0 deletions cmd/configschema/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion cmd/otelcontribcol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ require (
github.com/go-stack/stack v1.8.1 // indirect
github.com/go-zookeeper/zk v1.0.3 // indirect
github.com/gobwas/glob v0.2.3 // indirect
github.com/gocql/gocql v1.3.1 // indirect
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/godbus/dbus/v5 v5.0.6 // indirect
github.com/gofrs/uuid v4.2.0+incompatible // indirect
Expand Down Expand Up @@ -207,6 +208,7 @@ require (
github.com/grobie/gomemcache v0.0.0-20180201122607-1f779c573665 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.0 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
github.com/hashicorp/consul/api v1.18.0 // indirect
github.com/hashicorp/cronexpr v1.1.1 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
Expand Down Expand Up @@ -297,8 +299,8 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/azuredataexplorerexporter v0.69.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/azuremonitorexporter v0.69.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/carbonexporter v0.69.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter v0.69.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/cassandraexporter v0.69.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter v0.69.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/coralogixexporter v0.69.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter v0.69.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/dynatraceexporter v0.69.0 // indirect
Expand Down
5 changes: 5 additions & 0 deletions cmd/otelcontribcol/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 38 additions & 0 deletions exporter/cassandraexporter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Cassandra Exporter

| Status | |
|--------------------------|-----------|
| Stability | [alpha] |
| Supported pipeline types | traces |
| Distributions | [contrib] |

## Configuration options

The following settings can be optionally configured:

- `dsn` The Cassandra server DSN (Data Source Name), for example `127.0.0.1`.
reference: [https://pkg.go.dev/github.com/gocql/gocql](https://pkg.go.dev/github.com/gocql/gocql)
- `keyspace` (default = otel): The keyspace name.
- `trace_table` (default = otel_spans): The table name for traces.

## Example

```yaml
receivers:
examplereceiver:
processors:
batch:
timeout: 5s
send_batch_size: 100000
exporters:
cassandra:
dsn: 127.0.0.1
keyspace: "otel"
trace_table: "otel_spans"
service:
pipelines:
logs:
receivers: [ examplereceiver ]
processors: [ batch ]
exporters: [ cassandra ]
```
3 changes: 3 additions & 0 deletions exporter/cassandraexporter/config.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
package cassandraexporter

type Config struct {
DSN string `mapstructure:"dsn"`
Keyspace string `mapstructure:"keyspace"`
TraceTable string `mapstructure:"trace_table"`
}
69 changes: 69 additions & 0 deletions exporter/cassandraexporter/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cassandraexporter

import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap/confmaptest"
"path/filepath"
"testing"
)

const defaultDSN = "127.0.0.1"

func TestLoadConfig(t *testing.T) {
t.Parallel()

cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)

defaultCfg := createDefaultConfig()
defaultCfg.(*Config).DSN = defaultDSN

tests := []struct {
id component.ID
expected component.Config
}{

{
id: component.NewIDWithName(typeStr, ""),
expected: defaultCfg,
},
}

for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

sub, err := cm.Sub(tt.id.String())
require.NoError(t, err)
require.NoError(t, component.UnmarshalConfig(sub, cfg))

assert.NoError(t, component.ValidateConfig(cfg))
assert.Equal(t, tt.expected, cfg)
})
}
}

func withDefaultConfig(fns ...func(*Config)) *Config {
cfg := createDefaultConfig().(*Config)
for _, fn := range fns {
fn(cfg)
}
return cfg
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ receivers:
endpoint: 0.0.0.0:4317
exporters:
cassandra:
dsn: 127.0.0.1
keyspace: "otel"
trace_table: "otel_spans"

service:
pipelines:
Expand Down
114 changes: 105 additions & 9 deletions exporter/cassandraexporter/exporter_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,137 @@ package cassandraexporter

import (
"context"
"database/sql"
"fmt"
"github.com/gocql/gocql"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
"go.uber.org/zap"
"time"
)

type tracesExporter struct {
client *sql.DB
client *gocql.Session
logger *zap.Logger
cfg *Config
}

func newTracesExporter(logger *zap.Logger, cfg *Config) (*tracesExporter, error) {
initializeKernel(cfg)
cluster := gocql.NewCluster(cfg.DSN)
session, err := cluster.CreateSession()
cluster.Keyspace = cfg.Keyspace
cluster.Consistency = gocql.Quorum

if err != nil {
return nil, err
}

return &tracesExporter{logger: logger, client: session, cfg: cfg}, nil
}

func initializeKernel(cfg *Config) error {
ctx := context.Background()
cluster := gocql.NewCluster("127.0.0.1")
session, _ := cluster.CreateSession()
session.Query(`INSERT INTO tweet (timeline, id, text) VALUES (?, ?, ?)`,
"me", gocql.TimeUUID(), "hello world").WithContext(ctx).Exec()
cluster := gocql.NewCluster(cfg.DSN)
cluster.Consistency = gocql.Quorum
session, err := cluster.CreateSession()
if err != nil {
return err
}

session.Query(parseCreateDatabaseSql(cfg)).WithContext(ctx).Exec()
session.Query(parseCreateLinksTypeSql(cfg)).WithContext(ctx).Exec()
session.Query(parseCreateEventsTypeSql(cfg)).WithContext(ctx).Exec()
session.Query(parseCreateSpanTableSql(cfg)).WithContext(ctx).Exec()

defer session.Close()

return nil
}

func parseCreateSpanTableSql(cfg *Config) string {
return fmt.Sprintf(createSpanTableSQL, cfg.Keyspace, cfg.TraceTable)
}

return &tracesExporter{logger: logger}, nil
func parseCreateEventsTypeSql(cfg *Config) string {
return fmt.Sprintf(createEventTypeSql, cfg.Keyspace)
}

func parseCreateLinksTypeSql(cfg *Config) string {
return fmt.Sprintf(createLinksTypeSql, cfg.Keyspace)
}

func parseCreateDatabaseSql(cfg *Config) string {
return fmt.Sprintf(createDatabaseSQL, cfg.Keyspace)
}

func (e *tracesExporter) Shutdown(_ context.Context) error {
if e.client != nil {
e.client.Close()
}

return nil
}

func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error {
start := time.Now()

fmt.Println("here")
fmt.Println(td.SpanCount())
for i := 0; i < td.ResourceSpans().Len(); i++ {
spans := td.ResourceSpans().At(i)
res := spans.Resource()
resAttr := attributesToMap(res.Attributes())
var serviceName string
if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok {
serviceName = v.Str()
}
for j := 0; j < spans.ScopeSpans().Len(); j++ {
rs := spans.ScopeSpans().At(j).Spans()
for k := 0; k < rs.Len(); k++ {
r := rs.At(k)
spanAttr := attributesToMap(r.Attributes())
status := r.Status()

e.client.Query(fmt.Sprintf(`INSERT INTO %s.%s (timestamp, traceid, spanid, parentspanid, tracestate, spanname, spankind, servicename, resourceattributes, spanattributes, duration, statuscode, statusmessage) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, e.cfg.Keyspace, e.cfg.TraceTable), r.StartTimestamp().AsTime(),
traceutil.TraceIDToHexOrEmptyString(r.TraceID()),
traceutil.SpanIDToHexOrEmptyString(r.SpanID()),
traceutil.SpanIDToHexOrEmptyString(r.ParentSpanID()),
r.TraceState().AsRaw(),
r.Name(),
traceutil.SpanKindStr(r.Kind()),
serviceName,
resAttr,
spanAttr,
r.EndTimestamp().AsTime().Sub(r.StartTimestamp().AsTime()).Nanoseconds(),
traceutil.StatusCodeStr(status.Code()),
status.Message(),
).WithContext(ctx).Exec()
}
}
}

duration := time.Since(start)
e.logger.Info("insert traces", zap.Int("records", td.SpanCount()),
zap.String("cost", duration.String()))
return nil
}

func attributesToMap(attributes pcommon.Map) map[string]string {
m := make(map[string]string, attributes.Len())
attributes.Range(func(k string, v pcommon.Value) bool {
m[k] = v.AsString()
return true
})
return m
}

const (
// language=SQL
createDatabaseSQL = `CREATE KEYSPACE %s with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };`
// language=SQL
createEventTypeSql = `CREATE TYPE IF NOT EXISTS %s.Events (Timestamp Date, Name text, Attributes map<text, text>);`
// language=SQL
createLinksTypeSql = `CREATE TYPE IF NOT EXISTS %s.Links (TraceId text, SpanId text, TraceState text, Attributes map<text, text>);`
// language=SQL
createSpanTableSQL = `CREATE TABLE IF NOT EXISTS %s.%s (TimeStamp DATE,TraceId text, SpanId text, ParentSpanId text, TraceState text, SpanName text, SpanKind text, ServiceName text, ResourceAttributes map<text, text>, SpanAttributes map<text, text>, Duration int,StatusCode text,StatusMessage text, Events frozen<Events>, Links frozen<Links>, PRIMARY KEY (TraceId));`
)
Loading