Kafka watcher component in network mapper #82

Merged
merged 29 commits into main from amitlicht/kafka_watcher on Mar 22, 2023
Changes from all commits (29 commits)
bc0c859
kafka-watcher skeleton
amitlicht Mar 12, 2023
2a8b340
Add report kafka watcher results to network mapper
amitlicht Mar 14, 2023
2960545
report results from watcher to mapper
amitlicht Mar 14, 2023
c2807a5
Build + test fixes
amitlicht Mar 14, 2023
71d94e3
Support multiple kafka servers
amitlicht Mar 14, 2023
99eb500
Use in-cluster config
amitlicht Mar 14, 2023
71cd344
Merge branch 'main' into amitlicht/kafka_watcher
amitlicht Mar 15, 2023
d610f95
Remove CN resolving from otterize identities
amitlicht Mar 15, 2023
7fb8b98
Use record read time as its last seen
amitlicht Mar 16, 2023
8ae5672
Mapper: store & API by intents with types and topics etc
amitlicht Mar 16, 2023
e5cea63
Fix schema.graphql wrongly ignored in diffs
amitlicht Mar 16, 2023
ad05858
Tests fixes
amitlicht Mar 16, 2023
564973e
Fix intents holder not preserving topics
amitlicht Mar 19, 2023
089931e
Test fix
amitlicht Mar 19, 2023
64a5d27
Fix intent type on upload to cloud
amitlicht Mar 19, 2023
3312479
Move go generate to lint CI
amitlicht Mar 19, 2023
9ce7903
Fix store key to be consistent (no pointer)
amitlicht Mar 19, 2023
627199e
Run go generate
amitlicht Mar 19, 2023
ff64faf
rename client -> mapperclient
amitlicht Mar 21, 2023
eb5ad31
rename client -> mapperclient
amitlicht Mar 21, 2023
b75faaf
CR fixes - move to lo maps to functions
amitlicht Mar 21, 2023
f95ced6
CR fixes
amitlicht Mar 21, 2023
2582191
CR fixes
amitlicht Mar 21, 2023
a57adb7
Add kafka watcher (experimental) to readme
amitlicht Mar 21, 2023
502ed25
Move kafka-watcher source code to exp
amitlicht Mar 22, 2023
f0650e0
Move docker files to /build
amitlicht Mar 22, 2023
7b677a3
Add missing retag_image_as_latest kafka-watcher
amitlicht Mar 22, 2023
47fcf8c
Fix go generate
amitlicht Mar 22, 2023
2b96e9a
Fix docker files to work with correct src paths
amitlicht Mar 22, 2023
3 changes: 1 addition & 2 deletions .gitattributes
@@ -1,5 +1,4 @@
*_gen.go linguist-generated=true
generated.go linguist-generated=true
go.mod linguist-generated=true
go.sum linguist-generated=true
schema.graphql linguist-generated=true
go.sum linguist-generated=true
4 changes: 3 additions & 1 deletion .github/workflows/build.yaml
@@ -26,6 +26,7 @@ jobs:
service:
- mapper
- sniffer
- kafka-watcher

steps:
- name: Checkout
@@ -61,7 +62,7 @@
uses: docker/build-push-action@v2
with:
context: src/
file: src/${{ matrix.service }}.Dockerfile
file: build/${{ matrix.service }}.Dockerfile
tags: ${{ env.REGISTRY }}:${{ matrix.service }}-${{ github.sha }}
push: true
network: host
@@ -100,3 +101,4 @@ jobs:
retag_image_as_latest() { MANIFEST=$(aws ecr batch-get-image --repository-name ${{ env.REPOSITORY_NAME }} --image-ids imageTag="$1-${{ github.sha }}" --query "images[].imageManifest" --output text); if [ -z "$MANIFEST" ]; then echo Manifest not found; exit 1; fi; OUTPUT=$(aws ecr put-image --repository-name ${{ env.REPOSITORY_NAME }} --image-tag "$1-latest" --image-manifest "$MANIFEST" 2>&1 || true); if echo $OUTPUT | grep 'An error occurred' >/dev/null && ! echo $OUTPUT | grep ImageAlreadyExistsException >/dev/null; then echo $OUTPUT; exit 1; fi; }
retag_image_as_latest mapper
retag_image_as_latest sniffer
retag_image_as_latest kafka-watcher
11 changes: 11 additions & 0 deletions .github/workflows/golangci-lint.yml
@@ -21,6 +21,17 @@ jobs:
- uses: actions/checkout@v3
- name: Install dependencies
run: sudo apt update && sudo apt install libpcap-dev # required for the linter to be able to lint github.com/google/gopacket
- name: go get
run: go get .
working-directory: src
- name: go generate
run: go generate ./...
working-directory: src
- name: go vet
run: go vet ./...
working-directory: src/
- name: check git diff
run: git diff --exit-code
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
14 changes: 13 additions & 1 deletion .github/workflows/release-tag.yaml
@@ -67,4 +67,16 @@ jobs:
network: host
platforms: linux/amd64,linux/arm64
build-args:
SOURCE_IMAGE=${{ env.REGISTRY }}:sniffer-${{ github.sha }}
SOURCE_IMAGE=${{ env.REGISTRY }}:sniffer-${{ github.sha }}

- name: Push to Docker Hub - Kafka Watcher
uses: docker/build-push-action@v2
with:
context: .github/workflows
file: .github/workflows/release.Dockerfile
tags: otterize/network-mapper-kafka-watcher:latest,otterize/network-mapper-kafka-watcher:${{ github.ref_name }}
push: true
network: host
platforms: linux/amd64,linux/arm64
build-args:
SOURCE_IMAGE=${{ env.REGISTRY }}:kafka-watcher-${{ github.sha }}
3 changes: 2 additions & 1 deletion README.md
@@ -92,8 +92,9 @@ For more platforms, see [the installation guide](https://docs.otterize.com/k8s-i
## How does the network mapper work?
The Otterize network mapper creates a map of in-cluster traffic by capturing DNS traffic and inspecting active connections, then resolving the IP addresses participating in those connections to their pods and crawling up the pod's ownership chain until it reaches the root object. The network mapper continues building the network map as long as it's deployed.
### Components
- Sniffer: the sniffer is deployed to each node, and is responsible for capturing node-local DNS traffic and inspecting open connections.
- Mapper: the mapper is deployed once, and resolves service names using the Kubernetes API with traffic information reported by the sniffers.
- Sniffer: the sniffer is deployed to each node, and is responsible for capturing node-local DNS traffic and inspecting open connections.
- Kafka watcher (experimental): deployed once to your cluster, and is responsible for capturing Kafka server logs and reporting them through the network mapper.

### Service name resolution
Service names are resolved in one of two ways:
26 changes: 26 additions & 0 deletions build/kafka-watcher.Dockerfile
@@ -0,0 +1,26 @@
FROM --platform=linux/amd64 golang:1.19-alpine as buildenv
RUN apk add --no-cache ca-certificates git protoc
RUN apk add build-base libpcap-dev
WORKDIR /src

# restore dependencies
COPY go.mod go.sum ./
RUN go mod download

COPY . .

FROM buildenv as test
RUN go test ./exp/kafka-watcher/...

FROM test as builder
ARG TARGETOS
ARG TARGETARCH
RUN CGO_ENABLED=0 GOOS=$TARGETOS GOARCH=$TARGETARCH go build -o /main ./exp/kafka-watcher/cmd

# Use distroless as minimal base image to package the manager binary
# Refer to https://github.com/GoogleContainerTools/distroless for more details
FROM gcr.io/distroless/static:nonroot
COPY --from=builder /main /main
USER 65532:65532

ENTRYPOINT ["/main"]
1 change: 0 additions & 1 deletion src/mapper.Dockerfile → build/mapper.Dockerfile
@@ -8,7 +8,6 @@ COPY go.mod go.sum ./
RUN go mod download

COPY . .
RUN go generate ./mapper/...

FROM buildenv as test
# install dependencies for "envtest" package
1 change: 0 additions & 1 deletion src/sniffer.Dockerfile → build/sniffer.Dockerfile
@@ -8,7 +8,6 @@ COPY go.mod go.sum ./
RUN go mod download

COPY . .
RUN go generate ./sniffer/...

FROM buildenv as test
RUN go test ./sniffer/... && echo dep > /dep
52 changes: 52 additions & 0 deletions src/exp/kafka-watcher/cmd/main.go
@@ -0,0 +1,52 @@
package main

import (
"context"
"fmt"
"github.com/otterize/network-mapper/src/exp/kafka-watcher/pkg/config"
"github.com/otterize/network-mapper/src/exp/kafka-watcher/pkg/logwatcher"
"github.com/otterize/network-mapper/src/exp/kafka-watcher/pkg/mapperclient"
"k8s.io/apimachinery/pkg/types"
"strings"

"github.com/sirupsen/logrus"
"github.com/spf13/viper"
)

func parseKafkaServers(serverNames []string) ([]types.NamespacedName, error) {
var servers []types.NamespacedName
for _, serverName := range serverNames {
nameParts := strings.Split(serverName, ".")
if len(nameParts) != 2 {
return nil, fmt.Errorf("error parsing server pod name %s - should be formatted as 'name.namespace'", serverName)
}
servers = append(servers, types.NamespacedName{
Name: nameParts[0],
Namespace: nameParts[1],
})
}
return servers, nil
}

func main() {
if viper.GetBool(config.DebugKey) {
logrus.SetLevel(logrus.DebugLevel)
}

kafkaServers, err := parseKafkaServers(viper.GetStringSlice(config.KafkaServersKey))
if err != nil {
panic(err)
}
mapperClient := mapperclient.NewMapperClient(viper.GetString(config.MapperApiUrlKey))
w, err := logwatcher.NewWatcher(
mapperClient,
kafkaServers,
)
if err != nil {
panic(err)
}

if err := w.RunForever(context.Background()); err != nil {
panic(err)
}
}
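For reference, each entry in the kafka-servers list is expected to follow the name.namespace convention enforced by parseKafkaServers above. A minimal standalone sketch of that mapping (parseKafkaServers itself is unexported; the pod name and namespace below are hypothetical):

```go
package main

import (
	"fmt"
	"strings"

	"k8s.io/apimachinery/pkg/types"
)

func main() {
	// Hypothetical kafka-servers entry: pod "my-kafka-0" in namespace "kafka-ns".
	serverName := "my-kafka-0.kafka-ns"

	parts := strings.Split(serverName, ".")
	if len(parts) != 2 {
		// parseKafkaServers rejects anything that is not exactly 'name.namespace'.
		panic("expected 'name.namespace'")
	}

	server := types.NamespacedName{Name: parts[0], Namespace: parts[1]}
	fmt.Println(server) // prints "kafka-ns/my-kafka-0"
}
```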
35 changes: 35 additions & 0 deletions src/exp/kafka-watcher/pkg/config/config.go
@@ -0,0 +1,35 @@
package config

import (
"github.com/spf13/viper"
"strings"
"time"
)

const (
EnvPrefix = "OTTERIZE"
MapperApiUrlKey = "mapper-api-url"
MapperApiUrlDefault = "http://mapper:9090/query"
ReportIntervalKey = "report-interval"
ReportIntervalDefault = 10 * time.Second
CallsTimeoutKey = "calls-timeout"
CallsTimeoutDefault = 5 * time.Second
CooldownIntervalKey = "cooldown-interval"
CooldownIntervalDefault = 10 * time.Second
DebugKey = "debug"
DebugDefault = false

KafkaServersKey = "kafka-servers"
)

func init() {
viper.SetDefault(MapperApiUrlKey, MapperApiUrlDefault)
viper.SetDefault(ReportIntervalKey, ReportIntervalDefault)
viper.SetDefault(CallsTimeoutKey, CallsTimeoutDefault)
viper.SetDefault(CooldownIntervalKey, CooldownIntervalDefault)
viper.SetDefault(DebugKey, DebugDefault)
viper.SetDefault(KafkaServersKey, []string{})
viper.SetEnvPrefix(EnvPrefix)
viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))
viper.AutomaticEnv()
}
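Because config.go sets the OTTERIZE env prefix and replaces dashes with underscores, every key above can be overridden through environment variables. A minimal sketch, assuming the module path from this PR and viper's default whitespace splitting for string-slice values read from the environment:

```go
package main

import (
	"fmt"
	"os"

	"github.com/otterize/network-mapper/src/exp/kafka-watcher/pkg/config"
	"github.com/spf13/viper"
)

func main() {
	// Hypothetical values: mapper-api-url and kafka-servers map to
	// OTTERIZE_MAPPER_API_URL and OTTERIZE_KAFKA_SERVERS.
	os.Setenv("OTTERIZE_MAPPER_API_URL", "http://mapper.mapper-ns:9090/query")
	os.Setenv("OTTERIZE_KAFKA_SERVERS", "kafka-0.kafka-ns kafka-1.kafka-ns")

	fmt.Println(viper.GetString(config.MapperApiUrlKey))      // http://mapper.mapper-ns:9090/query
	fmt.Println(viper.GetStringSlice(config.KafkaServersKey)) // [kafka-0.kafka-ns kafka-1.kafka-ns]
}
```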
158 changes: 158 additions & 0 deletions src/exp/kafka-watcher/pkg/logwatcher/watcher.go
@@ -0,0 +1,158 @@
package logwatcher

import (
"bufio"
"context"
"errors"
"github.com/oriser/regroup"
"github.com/otterize/network-mapper/src/exp/kafka-watcher/pkg/config"
mapperclient2 "github.com/otterize/network-mapper/src/exp/kafka-watcher/pkg/mapperclient"
"github.com/samber/lo"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"math"
"sync"
"time"
)

// AclAuthorizerRegex matches & decodes AclAuthorizer log records.
// Sample log record for reference:
// [2023-03-12 13:51:55,904] INFO Principal = User:2.5.4.45=#13206331373734376636373865323137613636346130653335393130326638303662,CN=myclient.otterize-tutorial-kafka-mtls,O=SPIRE,C=US is Denied Operation = Describe from host = 10.244.0.27 on resource = Topic:LITERAL:mytopic for request = Metadata with resourceRefCount = 1 (kafka.authorizer.logger)
var AclAuthorizerRegex = regroup.MustCompile(
`^\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d+\] [A-Z]+ Principal = \S+ is (?P<access>\S+) Operation = (?P<operation>\S+) from host = (?P<host>\S+) on resource = Topic:LITERAL:(?P<topic>.+) for request = \S+ with resourceRefCount = \d+ \(kafka\.authorizer\.logger\)$`,
)

type AuthorizerRecord struct {
Server types.NamespacedName
Access string `regroup:"access"`
Operation string `regroup:"operation"`
Host string `regroup:"host"`
Topic string `regroup:"topic"`
}

type SeenRecordsStore map[AuthorizerRecord]time.Time

type Watcher struct {
clientset *kubernetes.Clientset
mu sync.Mutex
seen SeenRecordsStore
mapperClient mapperclient2.MapperClient
kafkaServers []types.NamespacedName
}

func NewWatcher(mapperClient mapperclient2.MapperClient, kafkaServers []types.NamespacedName) (*Watcher, error) {
conf, err := rest.InClusterConfig()
if err != nil {
return nil, err
}

cs, err := kubernetes.NewForConfig(conf)
if err != nil {
return nil, err
}

w := &Watcher{
clientset: cs,
mu: sync.Mutex{},
seen: SeenRecordsStore{},
mapperClient: mapperClient,
kafkaServers: kafkaServers,
}

return w, nil
}

func (w *Watcher) processLogRecord(kafkaServer types.NamespacedName, record string) {
r := AuthorizerRecord{
Server: kafkaServer,
}
if err := AclAuthorizerRegex.MatchToTarget(record, &r); errors.Is(err, &regroup.NoMatchFoundError{}) {
return
} else if err != nil {
logrus.Errorf("Error matching authorizer regex: %s", err)
return
}

w.mu.Lock()
defer w.mu.Unlock()
w.seen[r] = time.Now()
}

func (w *Watcher) WatchOnce(ctx context.Context, kafkaServer types.NamespacedName) error {
podLogOpts := corev1.PodLogOptions{
Follow: true,
SinceSeconds: lo.ToPtr(int64(math.Ceil(viper.GetDuration(config.CooldownIntervalKey).Seconds()))),
}
req := w.clientset.CoreV1().Pods(kafkaServer.Namespace).GetLogs(kafkaServer.Name, &podLogOpts)
reader, err := req.Stream(ctx)
if err != nil {
return err
}

defer reader.Close()

s := bufio.NewScanner(reader)
s.Split(bufio.ScanLines)
for s.Scan() {
w.processLogRecord(kafkaServer, s.Text())
}

return nil
}

func (w *Watcher) WatchForever(ctx context.Context, kafkaServer types.NamespacedName) {
log := logrus.WithField("pod", kafkaServer)
cooldownPeriod := viper.GetDuration(config.CooldownIntervalKey)
for {
log.Info("Watching logs")
err := w.WatchOnce(ctx, kafkaServer)
if err != nil {
log.WithError(err).Error("Error watching logs")
}
log.Infof("Watcher stopped, will retry after cooldown period (%s)...", cooldownPeriod)
time.Sleep(cooldownPeriod)
}
}

func (w *Watcher) Flush() SeenRecordsStore {
w.mu.Lock()
defer w.mu.Unlock()
r := w.seen
w.seen = SeenRecordsStore{}
return r
}

func (w *Watcher) ReportResults(ctx context.Context) error {
records := w.Flush()
logrus.Infof("Reporting %d records", len(records))

results := lo.MapToSlice(records, func(r AuthorizerRecord, t time.Time) mapperclient2.KafkaMapperResult {
return mapperclient2.KafkaMapperResult{
SrcIp: r.Host,
ServerPodName: r.Server.Name,
ServerNamespace: r.Server.Namespace,
Topic: r.Topic,
Operation: r.Operation,
LastSeen: t,
}
})

return w.mapperClient.ReportKafkaMapperResults(ctx, mapperclient2.KafkaMapperResults{Results: results})
}

func (w *Watcher) RunForever(ctx context.Context) error {
for _, kafkaServer := range w.kafkaServers {
go w.WatchForever(ctx, kafkaServer)
}

for {
time.Sleep(viper.GetDuration(config.ReportIntervalKey))
if err := w.ReportResults(ctx); err != nil {
logrus.WithError(err).Errorf("Failed reporting watcher results to mapper")
}
}
}
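To make the authorizer record format concrete, here is a standalone sketch that runs the same regular expression over a log line in the sample format from the comment above (the principal is shortened for readability; the struct and variable names are illustrative only):

```go
package main

import (
	"fmt"

	"github.com/oriser/regroup"
)

type authorizerRecord struct {
	Access    string `regroup:"access"`
	Operation string `regroup:"operation"`
	Host      string `regroup:"host"`
	Topic     string `regroup:"topic"`
}

func main() {
	// Same pattern as AclAuthorizerRegex in watcher.go.
	re := regroup.MustCompile(`^\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d+\] [A-Z]+ Principal = \S+ is (?P<access>\S+) Operation = (?P<operation>\S+) from host = (?P<host>\S+) on resource = Topic:LITERAL:(?P<topic>.+) for request = \S+ with resourceRefCount = \d+ \(kafka\.authorizer\.logger\)$`)

	line := `[2023-03-12 13:51:55,904] INFO Principal = User:CN=myclient.otterize-tutorial-kafka-mtls,O=SPIRE,C=US is Denied Operation = Describe from host = 10.244.0.27 on resource = Topic:LITERAL:mytopic for request = Metadata with resourceRefCount = 1 (kafka.authorizer.logger)`

	r := authorizerRecord{}
	if err := re.MatchToTarget(line, &r); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", r) // {Access:Denied Operation:Describe Host:10.244.0.27 Topic:mytopic}
}
```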
28 changes: 28 additions & 0 deletions src/exp/kafka-watcher/pkg/mapperclient/client.go
@@ -0,0 +1,28 @@
package mapperclient

import (
"context"
"github.com/Khan/genqlient/graphql"
"net/http"
)

type MapperClient interface {
ReportKafkaMapperResults(ctx context.Context, results KafkaMapperResults) error
}

type mapperClientImpl struct {
mapperAddress string
gqlClient graphql.Client
}

func NewMapperClient(mapperAddress string) MapperClient {
return &mapperClientImpl{
mapperAddress: mapperAddress,
gqlClient: graphql.NewClient(mapperAddress, http.DefaultClient),
}
}

func (c *mapperClientImpl) ReportKafkaMapperResults(ctx context.Context, results KafkaMapperResults) error {
_, err := reportKafkaMapperResults(ctx, c.gqlClient, results)
return err
}
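For completeness, a minimal sketch of reporting a single record through this client. KafkaMapperResult and KafkaMapperResults are the genqlient-generated types already used in watcher.go; the mapper URL, pod name, and namespace below are hypothetical:

```go
package main

import (
	"context"
	"time"

	"github.com/otterize/network-mapper/src/exp/kafka-watcher/pkg/mapperclient"
)

func main() {
	client := mapperclient.NewMapperClient("http://mapper:9090/query")

	results := mapperclient.KafkaMapperResults{
		Results: []mapperclient.KafkaMapperResult{
			{
				SrcIp:           "10.244.0.27",
				ServerPodName:   "my-kafka-0", // hypothetical Kafka pod
				ServerNamespace: "kafka-ns",   // hypothetical namespace
				Topic:           "mytopic",
				Operation:       "Describe",
				LastSeen:        time.Now(),
			},
		},
	}

	if err := client.ReportKafkaMapperResults(context.Background(), results); err != nil {
		panic(err)
	}
}
```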