Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the way we resolve services DNS to identities by going directly from the targetRef of endpoints instead of going through IP addresses #229

Merged
merged 6 commits into from
Jul 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ permissions:
# pull-requests: read

jobs:
golangci:
name: golangci-lint
runs-on: ubuntu-20.04
vet:
# run vet in a separate job to avoid conflicts with golangci-lint pkg-cache
name: vet
runs-on: ubuntu-latest
steps:
- uses: actions/setup-go@v3
with:
Expand All @@ -31,6 +32,16 @@ jobs:
working-directory: src/
- name: check git diff
run: git diff --exit-code
golangci:
name: golangci-lint
runs-on: ubuntu-20.04
steps:
- uses: actions/setup-go@v3
with:
go-version: '1.21.3'
- uses: actions/checkout@v3
- name: Install dependencies
run: sudo apt update && sudo apt install libpcap-dev # required for the linter to be able to lint github.com/google/gopacket
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
Expand All @@ -41,7 +52,7 @@ jobs:
working-directory: src

# Optional: golangci-lint command line arguments.
args: --timeout 5m
args: --timeout 5m --out-format github-actions

# Optional: show only new issues if it's a pull request. The default value is `false`.
# only-new-issues: true
Expand Down
26 changes: 16 additions & 10 deletions src/mapper/pkg/kubefinder/kubefinder.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (k *KubeFinder) ResolveIstioWorkloadToPod(ctx context.Context, workload str
return &podList.Items[0], nil
}

func (k *KubeFinder) ResolveServiceAddressToIps(ctx context.Context, fqdn string) ([]string, types.NamespacedName, error) {
func (k *KubeFinder) ResolveServiceAddressToPods(ctx context.Context, fqdn string) ([]corev1.Pod, types.NamespacedName, error) {
clusterDomain := viper.GetString(config.ClusterDomainKey)
if !strings.HasSuffix(fqdn, clusterDomain) {
return nil, types.NamespacedName{}, errors.Errorf("address %s is not in the cluster", fqdn)
Expand All @@ -364,22 +364,28 @@ func (k *KubeFinder) ResolveServiceAddressToIps(ctx context.Context, fqdn string
}
namespace := fqdnWithoutClusterDomainParts[len(fqdnWithoutClusterDomainParts)-2]
serviceName := fqdnWithoutClusterDomainParts[len(fqdnWithoutClusterDomainParts)-3]
endpoints := &corev1.Endpoints{}
service := &corev1.Service{}
serviceNamespacedName := types.NamespacedName{Name: serviceName, Namespace: namespace}
err := k.client.Get(ctx, serviceNamespacedName, endpoints)
err := k.client.Get(ctx, serviceNamespacedName, service)
if err != nil {
return nil, types.NamespacedName{}, errors.Wrap(err)
}
ips := make([]string, 0)
for _, subset := range endpoints.Subsets {
for _, address := range subset.Addresses {
ips = append(ips, address.IP)
}
pods, err := k.ResolveServiceToPods(ctx, service)
if err != nil {
return nil, types.NamespacedName{}, errors.Wrap(err)
}
return ips, serviceNamespacedName, nil

return pods, serviceNamespacedName, nil
case "pod":
// for address format of pods: 172-17-0-3.default.pod.cluster.local
return []string{strings.ReplaceAll(fqdnWithoutClusterDomainParts[0], "-", ".")}, types.NamespacedName{}, nil
ip := strings.ReplaceAll(fqdnWithoutClusterDomainParts[0], "-", ".")
pod, err := k.ResolveIPToPod(ctx, ip)
if err != nil {
return make([]corev1.Pod, 0), types.NamespacedName{}, errors.Wrap(err)
}

return []corev1.Pod{*pod}, types.NamespacedName{}, nil

default:
return nil, types.NamespacedName{}, errors.Errorf("cannot resolve k8s address %s, type %s not supported", fqdn, fqdnWithoutClusterDomainParts[len(fqdnWithoutClusterDomainParts)-1])
}
Expand Down
24 changes: 15 additions & 9 deletions src/mapper/pkg/kubefinder/kubefinder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/otterize/network-mapper/src/shared/testbase"
"github.com/samber/lo"
"github.com/stretchr/testify/suite"
corev1 "k8s.io/api/core/v1"
"testing"
Expand Down Expand Up @@ -38,35 +39,40 @@ func (s *KubeFinderTestSuite) TestResolveIpToPod() {
}

func (s *KubeFinderTestSuite) TestResolveServiceAddressToIps() {
_, _, err := s.kubeFinder.ResolveServiceAddressToIps(context.Background(), "www.google.com")
_, _, err := s.kubeFinder.ResolveServiceAddressToPods(context.Background(), "www.google.com")
s.Require().Error(err)

_, _, err = s.kubeFinder.ResolveServiceAddressToIps(context.Background(), fmt.Sprintf("svc-service1.%s.svc.cluster.local", s.TestNamespace))
_, _, err = s.kubeFinder.ResolveServiceAddressToPods(context.Background(), fmt.Sprintf("svc-service1.%s.svc.cluster.local", s.TestNamespace))
s.Require().Error(err)

podIp0 := "1.1.1.1"
podIp1 := "1.1.1.2"
podIp2 := "1.1.1.3"
s.Require().NoError(s.Mgr.GetClient().List(context.Background(), &corev1.EndpointsList{})) // Workaround: make the client start caching Endpoints, so when we do "WaitForCacheSync" it will actually sync the cache
s.AddDeploymentWithService("service0", []string{podIp0}, map[string]string{"app": "service0"}, "10.0.0.10")
s.AddDeploymentWithService("service1", []string{podIp1, podIp2}, map[string]string{"app": "service1"}, "10.0.0.11")
_, _, retPods := s.AddDeploymentWithService("service1", []string{podIp1, podIp2}, map[string]string{"app": "service1"}, "10.0.0.11")
s.Require().True(s.Mgr.GetCache().WaitForCacheSync(context.Background()))

ips, service, err := s.kubeFinder.ResolveServiceAddressToIps(context.Background(), fmt.Sprintf("svc-service1.%s.svc.cluster.local", s.TestNamespace))
pods, service, err := s.kubeFinder.ResolveServiceAddressToPods(context.Background(), fmt.Sprintf("svc-service1.%s.svc.cluster.local", s.TestNamespace))
s.Require().NoError(err)
s.Require().Equal("svc-service1", service.Name)
s.Require().ElementsMatch(ips, []string{podIp1, podIp2})
s.Require().ElementsMatch(lo.Map(pods, func(p corev1.Pod, _ int) string { return p.Status.PodIP }), lo.Map(retPods, func(p *corev1.Pod, _ int) string { return p.Status.PodIP }))

// make sure we don't fail on the longer forms of k8s service addresses, listed on this page: https://kubernetes.io/docs/concepts/services-networking/dns-pod-service
ips, service, err = s.kubeFinder.ResolveServiceAddressToIps(context.Background(), fmt.Sprintf("4-4-4-4.svc-service1.%s.svc.cluster.local", s.TestNamespace))
pods, service, err = s.kubeFinder.ResolveServiceAddressToPods(context.Background(), fmt.Sprintf("4-4-4-4.svc-service1.%s.svc.cluster.local", s.TestNamespace))
s.Require().Equal("svc-service1", service.Name)
s.Require().NoError(err)
s.Require().ElementsMatch(ips, []string{podIp1, podIp2})
s.Require().ElementsMatch(lo.Map(pods, func(p corev1.Pod, _ int) string { return p.Status.PodIP }), lo.Map(retPods, func(p *corev1.Pod, _ int) string { return p.Status.PodIP }))

ips, service, err = s.kubeFinder.ResolveServiceAddressToIps(context.Background(), fmt.Sprintf("4-4-4-4.%s.pod.cluster.local", s.TestNamespace))
_, _, err = s.kubeFinder.ResolveServiceAddressToPods(context.Background(), fmt.Sprintf("4-4-4-4.%s.pod.cluster.local", s.TestNamespace))
s.Require().Error(err)
s.Require().True(s.Mgr.GetCache().WaitForCacheSync(context.Background()))

_, pods4444 := s.AddDeployment("depl", []string{"4.4.4.4"}, map[string]string{"app": "4444"})
pods, service, err = s.kubeFinder.ResolveServiceAddressToPods(context.Background(), fmt.Sprintf("4-4-4-4.%s.pod.cluster.local", s.TestNamespace))
s.Require().NoError(err)
s.Require().Empty(service)
s.Require().ElementsMatch(ips, []string{"4.4.4.4"})
s.Require().ElementsMatch(lo.Map(pods, func(p corev1.Pod, _ int) string { return p.Status.PodIP }), lo.Map(pods4444, func(p *corev1.Pod, _ int) string { return p.Status.PodIP }))
}

func TestKubeFinderTestSuite(t *testing.T) {
Expand Down
31 changes: 11 additions & 20 deletions src/mapper/pkg/resolvers/schema.helpers.resolvers.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (r *Resolver) handleDNSCaptureResultsAsKubernetesPods(ctx context.Context,

func (r *Resolver) resolveOtterizeIdentityForDestinationAddress(ctx context.Context, dest model.Destination) (*model.OtterizeServiceIdentity, bool, error) {
destAddress := dest.Destination
ips, serviceName, err := r.kubeFinder.ResolveServiceAddressToIps(ctx, destAddress)
pods, serviceName, err := r.kubeFinder.ResolveServiceAddressToPods(ctx, destAddress)
if err != nil {
logrus.WithError(err).Warningf("Could not resolve service address %s", destAddress)
// Intentionally no error return
Expand All @@ -282,30 +282,21 @@ func (r *Resolver) resolveOtterizeIdentityForDestinationAddress(ctx context.Cont
}, true, nil
}

if len(ips) == 0 {
logrus.Debugf("Service address %s is currently not backed by any pod, ignoring", destAddress)
return nil, false, nil
}
// Resolving the IP of the service's endpoints!
destPod, err := r.kubeFinder.ResolveIPToPod(ctx, ips[0])
if err != nil {
if errors.Is(err, kubefinder.ErrFoundMoreThanOnePod) {
logrus.WithError(err).Debugf("Ip %s belongs to more than one pod, ignoring", ips[0])
} else {
logrus.WithError(err).Debugf("Could not resolve %s to pod", ips[0])
filteredPods := lo.Filter(pods, func(pod corev1.Pod, _ int) bool {
lastCreationTimeForUsToTrustIt := dest.LastSeen
if lo.IsEmpty(serviceName) {
// In this case the DNS was a "pod" DNS - which contains an IP address - and is therefore less reliable.
lastCreationTimeForUsToTrustIt = lastCreationTimeForUsToTrustIt.Add(viper.GetDuration(config.TimeServerHasToLiveBeforeWeTrustItKey))
}
return nil, false, nil
}
return lastCreationTimeForUsToTrustIt.After(pod.CreationTimestamp.Time) && pod.DeletionTimestamp == nil
})

if destPod.CreationTimestamp.After(dest.LastSeen) {
logrus.Debugf("Pod %s was created after capture time %s, ignoring", destPod.Name, dest.LastSeen)
if len(filteredPods) == 0 {
logrus.Debugf("Service address %s is currently not backed by any valid pod, ignoring", destAddress)
return nil, false, nil
}

if destPod.DeletionTimestamp != nil {
logrus.Debugf("Pod %s is being deleted, ignoring", destPod.Name)
return nil, false, nil
}
destPod := &filteredPods[0]

dstService, err := r.serviceIdResolver.ResolvePodToServiceIdentity(ctx, destPod)
if err != nil {
Expand Down
Loading