Skip to content

Commit

Permalink
Merge branch 'main' into 3p-licenses
Browse files Browse the repository at this point in the history
  • Loading branch information
mariomac authored Oct 29, 2024
2 parents c0bcf5a + eed03fc commit c82f8a2
Show file tree
Hide file tree
Showing 13 changed files with 188 additions and 86 deletions.
10 changes: 10 additions & 0 deletions .github/codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,13 @@ codecov:
require_ci_to_pass: yes
notify:
wait_for_ci: yes
coverage:
status:
project:
default:
informational: true
patch:
default:
informational: true
github_checks:
annotations: false
18 changes: 17 additions & 1 deletion .github/workflows/publish_dockerhub_main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,30 @@ jobs:
- id: checkout
uses: actions/checkout@v4

- id: push-to-dockerhub
- id: push-beyla-to-dockerhub
uses: grafana/shared-workflows/actions/build-push-to-dockerhub@main
with:
repository: grafana/beyla
context: .
# cache image layers from/to github actions internal cache, for faster building
cache-from: type=gha
cache-to: type=gha,mode=max
platforms: |-
"linux/amd64"
"linux/arm64"
tags: |-
"main"
push: true

- id: push-beyla-k8s-cache-to-dockerhub
uses: grafana/shared-workflows/actions/build-push-to-dockerhub@main
with:
repository: grafana/beyla-k8s-cache
file: k8scache.Dockerfile
context: .
# cache image layers from/to github actions internal cache, for faster building
cache-from: type=gha
cache-to: type=gha,mode=max
platforms: |-
"linux/amd64"
"linux/arm64"
Expand Down
22 changes: 20 additions & 2 deletions .github/workflows/publish_dockerhub_release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
- id: checkout
uses: actions/checkout@v4

- id: push-to-dockerhub
- id: push-beyla-to-dockerhub
uses: grafana/shared-workflows/actions/build-push-to-dockerhub@main
with:
repository: grafana/beyla
Expand All @@ -32,4 +32,22 @@ jobs:
"type=semver,pattern={{major}}"
"type=semver,pattern={{major}}.{{minor}}"
"type=semver,pattern={{major}}.{{minor}}.{{patch}}"
push: true
push: true

- id: push-beyla-k8s-cache-to-dockerhub
uses: grafana/shared-workflows/actions/build-push-to-dockerhub@main
with:
repository: grafana/beyla-k8s-cache
file: k8scache.Dockerfile
context: .
# cache image layers from/to github actions internal cache, for faster building
cache-from: type=gha
cache-to: type=gha,mode=max
platforms: |-
"linux/amd64"
"linux/arm64"
tags: |-
"type=semver,pattern={{major}}"
"type=semver,pattern={{major}}.{{minor}}"
"type=semver,pattern={{major}}.{{minor}}.{{patch}}"
push: true
23 changes: 22 additions & 1 deletion pkg/internal/kube/cache_svc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@ type cacheSvcClient struct {
address string
log *slog.Logger

waitForSubscription chan struct{}
syncTimeout time.Duration
waitForSubscription chan struct{}
waitForSynchronization chan struct{}
}

func (sc *cacheSvcClient) Start(ctx context.Context) {
sc.log = cslog()
sc.waitForSubscription = make(chan struct{})
sc.waitForSynchronization = make(chan struct{})
go func() {
select {
case <-ctx.Done():
Expand All @@ -53,6 +56,17 @@ func (sc *cacheSvcClient) Start(ctx context.Context) {
}
}
}()
sc.log.Info("waiting for K8s metadata synchronization", "timeout", sc.syncTimeout)
select {
case <-sc.waitForSynchronization:
sc.log.Debug("K8s metadata cache service synchronized")
case <-ctx.Done():
sc.log.Debug("context done. Nothing to do")
case <-time.After(sc.syncTimeout):
sc.log.Warn("timed out while waiting for K8s metadata synchronization. Some metadata might be temporarily missing." +
" If this is expected due to the size of your cluster, you might want to increase the timeout via" +
" the BEYLA_KUBE_INFORMERS_SYNC_TIMEOUT configuration option")
}
}

func (sc *cacheSvcClient) connect(ctx context.Context) error {
Expand All @@ -73,12 +87,19 @@ func (sc *cacheSvcClient) connect(ctx context.Context) error {
return fmt.Errorf("could not subscribe: %w", err)
}

unsynced := true
// Receive and print messages.
for {
event, err := stream.Recv()
if err != nil {
return fmt.Errorf("error receiving message: %w", err)
}
// send a notification about the client being synced with the K8s metadata service
// so Beyla can start processing/decorating the received flows and traces
if event.GetType() == informer.EventType_SYNC_FINISHED && unsynced {
close(sc.waitForSynchronization)
unsynced = false
}
sc.BaseNotifier.Notify(event)
}
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/internal/kube/informer_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,11 @@ func (mp *MetadataProvider) initLocalInformers(ctx context.Context) (*meta.Infor
// initRemoteInformerCacheClient connects via gRPC/Protobuf to a remote beyla-k8s-cache service, to avoid that
// each Beyla instance connects to the Kube API informer on each node, which would overload the Kube API
func (mp *MetadataProvider) initRemoteInformerCacheClient(ctx context.Context) *cacheSvcClient {
client := &cacheSvcClient{address: mp.cfg.MetaCacheAddr, BaseNotifier: meta.NewBaseNotifier()}
client := &cacheSvcClient{
address: mp.cfg.MetaCacheAddr,
BaseNotifier: meta.NewBaseNotifier(),
syncTimeout: mp.cfg.SyncTimeout,
}
client.Start(ctx)
return client
}
Expand Down
16 changes: 13 additions & 3 deletions pkg/kubecache/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,20 @@ func (ic *InformersCache) Run(ctx context.Context, opts ...meta.InformerOption)
informer.RegisterEventStreamServiceServer(s, ic)

ic.log.Info("server listening", "port", ic.Port)
if err := s.Serve(lis); err != nil {
return fmt.Errorf("failed to serve: %w", err)

errs := make(chan error, 1)
go func() {
if err := s.Serve(lis); err != nil {
errs <- fmt.Errorf("failed to serve: %w", err)
}
close(errs)
}()
select {
case <-ctx.Done():
return nil
case err := <-errs:
return err
}
return nil
}

// Subscribe method of the generated protobuf definition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"os"
"testing"

"sigs.k8s.io/e2e-framework/pkg/features"

"github.com/grafana/beyla/test/integration/components/docker"
"github.com/grafana/beyla/test/integration/components/kube"
k8s "github.com/grafana/beyla/test/integration/k8s/common"
Expand All @@ -23,7 +21,6 @@ func TestMain(m *testing.M) {
docker.ImageBuild{Tag: "beyla:dev", Dockerfile: k8s.DockerfileBeyla},
docker.ImageBuild{Tag: "testserver:dev", Dockerfile: k8s.DockerfileTestServer},
docker.ImageBuild{Tag: "httppinger:dev", Dockerfile: k8s.DockerfileHTTPPinger},
docker.ImageBuild{Tag: "grpcpinger:dev", Dockerfile: k8s.DockerfilePinger},
docker.ImageBuild{Tag: "beyla-k8s-cache:dev", Dockerfile: k8s.DockerfileBeylaK8sCache},
docker.ImageBuild{Tag: "quay.io/prometheus/prometheus:v2.53.0"},
); err != nil {
Expand All @@ -36,7 +33,6 @@ func TestMain(m *testing.M) {
kube.KindConfig(k8s.PathManifests+"/00-kind.yml"),
kube.LocalImage("beyla:dev"),
kube.LocalImage("testserver:dev"),
kube.LocalImage("grpcpinger:dev"),
kube.LocalImage("httppinger:dev"),
kube.LocalImage("beyla-k8s-cache:dev"),
kube.LocalImage("quay.io/prometheus/prometheus:v2.53.0"),
Expand Down Expand Up @@ -72,10 +68,5 @@ func TestInformersCache_ProcessMetrics(t *testing.T) {
}

func TestInformersCache_NetworkMetrics(t *testing.T) {
// we don't deploy any pinger
// it will reuse internal-pinger from MetricsDecoration_HTTP test
cluster.TestEnv().Test(t, features.New("network flow bytes").
Assess("catches network metrics between connected pods", otel.DoTestNetFlowBytesForExistingConnections).
Feature(),
)
cluster.TestEnv().Test(t, otel.FeatureNetworkFlowBytes())
}
6 changes: 3 additions & 3 deletions test/integration/k8s/manifests/06-beyla-external-informer.yml
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,6 @@ metadata:
namespace: default
labels:
app: k8s-cache
# this label will trigger a deletion of beyla pods before tearing down
# kind, to force Beyla writing the coverage data
teardown: delete
spec:
replicas: 1
selector:
Expand All @@ -130,6 +127,9 @@ spec:
name: k8s-cache
labels:
app: k8s-cache
# this label will trigger a deletion of beyla pods before tearing down
# kind, to force Beyla writing the coverage data
teardown: delete
spec:
# required to let the service accessing the K8s API
serviceAccountName: beyla
Expand Down
17 changes: 1 addition & 16 deletions test/integration/k8s/netolly/k8s_netolly_main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"os"
"testing"

"sigs.k8s.io/e2e-framework/pkg/features"

"github.com/grafana/beyla/test/integration/components/docker"
"github.com/grafana/beyla/test/integration/components/kube"
k8s "github.com/grafana/beyla/test/integration/k8s/common"
Expand Down Expand Up @@ -49,18 +47,5 @@ func TestMain(m *testing.M) {
}

func TestNetworkFlowBytes(t *testing.T) {
pinger := kube.Template[k8s.Pinger]{
TemplateFile: k8s.UninstrumentedPingerManifest,
Data: k8s.Pinger{
PodName: "internal-pinger",
TargetURL: "http://testserver:8080/iping",
},
}
cluster.TestEnv().Test(t, features.New("network flow bytes").
Setup(pinger.Deploy()).
Teardown(pinger.Delete()).
Assess("catches network metrics between connected pods", DoTestNetFlowBytesForExistingConnections).
Assess("catches external traffic", testNetFlowBytesForExternalTraffic).
Feature(),
)
cluster.TestEnv().Test(t, FeatureNetworkFlowBytes())
}
40 changes: 29 additions & 11 deletions test/integration/k8s/netolly/k8s_netolly_network_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"sigs.k8s.io/e2e-framework/pkg/envconf"
"sigs.k8s.io/e2e-framework/pkg/features"

"github.com/grafana/beyla/test/integration/components/kube"
"github.com/grafana/beyla/test/integration/components/prom"
k8s "github.com/grafana/beyla/test/integration/k8s/common"
)

const (
Expand All @@ -26,12 +29,27 @@ const (
var podSubnets = []string{"10.244.0.0/16", "fd00:10:244::/56"}
var svcSubnets = []string{"10.96.0.0/16", "fd00:10:96::/112"}

func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T, _ *envconf.Config) context.Context {
pq := prom.Client{HostPort: prometheusHostPort}
// FeatureNetworkFlowBytes assembles the e2e feature that validates network
// flow byte metrics. It deploys a dedicated, uninstrumented pinger pod as
// traffic source, asserts that flow metrics are captured both between
// connected pods and for external traffic, and deletes the pinger on teardown.
func FeatureNetworkFlowBytes() features.Feature {
	// Dedicated pinger so this feature does not depend on pods deployed
	// by other tests.
	netPinger := kube.Template[k8s.Pinger]{
		TemplateFile: k8s.UninstrumentedPingerManifest,
		Data: k8s.Pinger{
			PodName:   "internal-pinger-net",
			TargetURL: "http://testserver:8080/iping",
		},
	}
	fb := features.New("network flow bytes")
	fb = fb.Setup(netPinger.Deploy())
	fb = fb.Teardown(netPinger.Delete())
	fb = fb.Assess("catches network metrics between connected pods", testNetFlowBytesForExistingConnections)
	fb = fb.Assess("catches external traffic", testNetFlowBytesForExternalTraffic)
	return fb.Feature()
}

func testNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T, _ *envconf.Config) context.Context {
pq := prom.Client{HostPort: prometheusHostPort}
// testing request flows (to testserver as Service)
test.Eventually(t, testTimeout, func(t require.TestingT) {
results, err := pq.Query(`beyla_network_flow_bytes_total{src_name="internal-pinger",dst_name="testserver"}`)
results, err := pq.Query(`beyla_network_flow_bytes_total{src_name="internal-pinger-net",dst_name="testserver"}`)
require.NoError(t, err)
require.NotEmpty(t, results)

Expand All @@ -43,7 +61,7 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T,
assert.Equal(t, "beyla-network-flows", metric["job"])
assert.Equal(t, "my-kube", metric["k8s_cluster_name"])
assert.Equal(t, "default", metric["k8s_src_namespace"])
assert.Equal(t, "internal-pinger", metric["k8s_src_name"])
assert.Equal(t, "internal-pinger-net", metric["k8s_src_name"])
assert.Equal(t, "Pod", metric["k8s_src_owner_type"])
assert.Equal(t, "Pod", metric["k8s_src_type"])
assert.Regexp(t,
Expand All @@ -62,7 +80,7 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T,
})
// testing request flows (to testserver as Pod)
test.Eventually(t, testTimeout, func(t require.TestingT) {
results, err := pq.Query(`beyla_network_flow_bytes_total{src_name="internal-pinger",dst_name=~"testserver-.*"}`)
results, err := pq.Query(`beyla_network_flow_bytes_total{src_name="internal-pinger-net",dst_name=~"testserver-.*"}`)
require.NoError(t, err)
require.NotEmpty(t, results)

Expand All @@ -73,7 +91,7 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T,
assertIsIP(t, metric["dst_address"])
assert.Equal(t, "beyla-network-flows", metric["job"])
assert.Equal(t, "default", metric["k8s_src_namespace"])
assert.Equal(t, "internal-pinger", metric["k8s_src_name"])
assert.Equal(t, "internal-pinger-net", metric["k8s_src_name"])
assert.Equal(t, "Pod", metric["k8s_src_owner_type"])
assert.Equal(t, "Pod", metric["k8s_src_type"])
assert.Regexp(t,
Expand All @@ -97,7 +115,7 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T,

// testing response flows (from testserver Pod)
test.Eventually(t, testTimeout, func(t require.TestingT) {
results, err := pq.Query(`beyla_network_flow_bytes_total{src_name=~"testserver-.*",dst_name="internal-pinger"}`)
results, err := pq.Query(`beyla_network_flow_bytes_total{src_name=~"testserver-.*",dst_name="internal-pinger-net"}`)
require.NoError(t, err)
require.NotEmpty(t, results)

Expand All @@ -116,7 +134,7 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T,
metric["k8s_src_node_name"])
assertIsIP(t, metric["k8s_src_node_ip"])
assert.Equal(t, "default", metric["k8s_dst_namespace"])
assert.Equal(t, "internal-pinger", metric["k8s_dst_name"])
assert.Equal(t, "internal-pinger-net", metric["k8s_dst_name"])
assert.Equal(t, "Pod", metric["k8s_dst_owner_type"])
assert.Equal(t, "Pod", metric["k8s_dst_type"])
assert.Regexp(t,
Expand All @@ -132,7 +150,7 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T,

// testing response flows (from testserver Service)
test.Eventually(t, testTimeout, func(t require.TestingT) {
results, err := pq.Query(`beyla_network_flow_bytes_total{src_name="testserver",dst_name="internal-pinger"}`)
results, err := pq.Query(`beyla_network_flow_bytes_total{src_name="testserver",dst_name="internal-pinger-net"}`)
require.NoError(t, err)
require.NotEmpty(t, results)

Expand All @@ -148,7 +166,7 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T,
assert.Equal(t, "Service", metric["k8s_src_type"])
// services don't have host IP or name
assert.Equal(t, "default", metric["k8s_dst_namespace"])
assert.Equal(t, "internal-pinger", metric["k8s_dst_name"])
assert.Equal(t, "internal-pinger-net", metric["k8s_dst_name"])
assert.Equal(t, "Pod", metric["k8s_dst_owner_type"])
assert.Equal(t, "Pod", metric["k8s_dst_type"])
assert.Regexp(t,
Expand All @@ -162,7 +180,7 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T,
})

// check that there aren't captured flows if there is no communication
results, err := pq.Query(`beyla_network_flow_bytes_total{src_name="internal-pinger",dst_name="otherinstance"}`)
results, err := pq.Query(`beyla_network_flow_bytes_total{src_name="internal-pinger-net",dst_name="otherinstance"}`)
require.NoError(t, err)
require.Empty(t, results)

Expand Down
Loading

0 comments on commit c82f8a2

Please sign in to comment.