Skip to content

Commit 49df81f

Browse files
authored
feat: edfs stream metrics (#2137)
1 parent 1ae6242 commit 49df81f

39 files changed

+1759
-218
lines changed

demo/pkg/subgraphs/subgraphs.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"github.com/99designs/gqlgen/graphql/playground"
2222
"github.com/nats-io/nats.go"
2323
"github.com/nats-io/nats.go/jetstream"
24+
rmetric "github.com/wundergraph/cosmo/router/pkg/metric"
25+
"github.com/wundergraph/cosmo/router/pkg/pubsub/datasource"
2426
natsPubsub "github.com/wundergraph/cosmo/router/pkg/pubsub/nats"
2527
"golang.org/x/sync/errgroup"
2628

@@ -210,13 +212,17 @@ func New(ctx context.Context, config *Config) (*Subgraphs, error) {
210212

211213
natsPubSubByProviderID := map[string]natsPubsub.Adapter{}
212214

213-
defaultAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test")
215+
defaultAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test", datasource.ProviderOpts{
216+
StreamMetricStore: rmetric.NewNoopStreamMetricStore(),
217+
})
214218
if err != nil {
215219
return nil, fmt.Errorf("failed to create default nats adapter: %w", err)
216220
}
217221
natsPubSubByProviderID["default"] = defaultAdapter
218222

219-
myNatsAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test")
223+
myNatsAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test", datasource.ProviderOpts{
224+
StreamMetricStore: rmetric.NewNoopStreamMetricStore(),
225+
})
220226
if err != nil {
221227
return nil, fmt.Errorf("failed to create my-nats adapter: %w", err)
222228
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package events
2+
3+
import (
4+
"context"
5+
"github.com/redis/go-redis/v9"
6+
"github.com/stretchr/testify/require"
7+
"github.com/twmb/franz-go/pkg/kgo"
8+
"github.com/wundergraph/cosmo/router-tests/testenv"
9+
"net/url"
10+
"testing"
11+
"time"
12+
)
13+
14+
const waitTimeout = time.Second * 30
15+
16+
func ProduceKafkaMessage(t *testing.T, xEnv *testenv.Environment, topicName string, message string) {
17+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
18+
defer cancel()
19+
20+
pErrCh := make(chan error)
21+
22+
xEnv.KafkaClient.Produce(ctx, &kgo.Record{
23+
Topic: xEnv.GetPubSubName(topicName),
24+
Value: []byte(message),
25+
}, func(_ *kgo.Record, err error) {
26+
pErrCh <- err
27+
})
28+
29+
testenv.AwaitChannelWithT(t, waitTimeout, pErrCh, func(t *testing.T, pErr error) {
30+
require.NoError(t, pErr)
31+
})
32+
33+
fErr := xEnv.KafkaClient.Flush(ctx)
34+
require.NoError(t, fErr)
35+
}
36+
37+
func EnsureTopicExists(t *testing.T, xEnv *testenv.Environment, topics ...string) {
38+
// Delete topic for idempotency
39+
deleteCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
40+
defer cancel()
41+
prefixedTopics := make([]string, 0, len(topics))
42+
for _, topic := range topics {
43+
prefixedTopics = append(prefixedTopics, xEnv.GetPubSubName(topic))
44+
}
45+
46+
_, err := xEnv.KafkaAdminClient.DeleteTopics(deleteCtx, prefixedTopics...)
47+
require.NoError(t, err)
48+
49+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
50+
defer cancel()
51+
52+
_, err = xEnv.KafkaAdminClient.CreateTopics(ctx, 1, 1, nil, prefixedTopics...)
53+
require.NoError(t, err)
54+
}
55+
56+
func ProduceRedisMessage(t *testing.T, xEnv *testenv.Environment, topicName string, message string) {
57+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
58+
defer cancel()
59+
60+
parsedURL, err := url.Parse(xEnv.RedisHosts[0])
61+
if err != nil {
62+
t.Fatalf("Failed to parse Redis URL: %v", err)
63+
}
64+
var redisConn redis.UniversalClient
65+
if !xEnv.RedisWithClusterMode {
66+
redisConn = redis.NewClient(&redis.Options{
67+
Addr: parsedURL.Host,
68+
})
69+
} else {
70+
redisConn = redis.NewClusterClient(&redis.ClusterOptions{
71+
Addrs: []string{parsedURL.Host},
72+
})
73+
}
74+
75+
defer func() {
76+
_ = redisConn.Close()
77+
}()
78+
79+
intCmd := redisConn.Publish(ctx, xEnv.GetPubSubName(topicName), message)
80+
require.NoError(t, intCmd.Err())
81+
}

router-tests/events/events_config_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package events_test
1+
package events
22

33
import (
44
"testing"

0 commit comments

Comments
 (0)