From 35157ded0acd5ff7d146715d0558b8965a27c583 Mon Sep 17 00:00:00 2001 From: omris94 <46892443+omris94@users.noreply.github.com> Date: Sun, 22 Sep 2024 17:04:48 +0300 Subject: [PATCH] Fix bug where discovered traffic from a pod wasn't filtered properly by its creation timestamp (#243) --- src/mapper/pkg/resolvers/helpers.go | 13 ++++- src/mapper/pkg/resolvers/resolver_test.go | 57 ++++++++++++++++++- .../pkg/resolvers/schema.helpers.resolvers.go | 6 +- 3 files changed, 69 insertions(+), 7 deletions(-) diff --git a/src/mapper/pkg/resolvers/helpers.go b/src/mapper/pkg/resolvers/helpers.go index 2c805f1c..4a6a883c 100644 --- a/src/mapper/pkg/resolvers/helpers.go +++ b/src/mapper/pkg/resolvers/helpers.go @@ -9,7 +9,7 @@ import ( corev1 "k8s.io/api/core/v1" ) -func (r *Resolver) discoverInternalSrcIdentity(ctx context.Context, src model.RecordedDestinationsForSrc) (model.OtterizeServiceIdentity, error) { +func (r *Resolver) discoverInternalSrcIdentity(ctx context.Context, src *model.RecordedDestinationsForSrc) (model.OtterizeServiceIdentity, error) { svc, ok, err := r.kubeFinder.ResolveIPToControlPlane(ctx, src.SrcIP) if err != nil { return model.OtterizeServiceIdentity{}, errors.Errorf("could not resolve %s to service: %w", src.SrcIP, err) @@ -34,6 +34,15 @@ func (r *Resolver) discoverInternalSrcIdentity(ctx context.Context, src model.Re return model.OtterizeServiceIdentity{}, errors.Errorf("found pod %s (by ip %s) doesn't match captured hostname %s, ignoring", srcPod.Name, src.SrcIP, src.SrcHostname) } + // This function requires "src" to be a pointer. + // If at some point this function will be called with a non-pointer "src" + // It may cause a bug because the function will not be able to modify the "src" object of the caller. + r.filterTargetsAccordingToPodCreationTime(src, srcPod) + + return r.resolveInClusterIdentity(ctx, srcPod) +} + +func (r *Resolver) filterTargetsAccordingToPodCreationTime(src *model.RecordedDestinationsForSrc, srcPod *corev1.Pod) { filteredDestinations := make([]model.Destination, 0) for _, dest := range src.Destinations { if srcPod.CreationTimestamp.After(dest.LastSeen) { @@ -43,8 +52,6 @@ func (r *Resolver) discoverInternalSrcIdentity(ctx context.Context, src model.Re filteredDestinations = append(filteredDestinations, dest) } src.Destinations = filteredDestinations - - return r.resolveInClusterIdentity(ctx, srcPod) } func (r *Resolver) resolveInClusterIdentity(ctx context.Context, pod *corev1.Pod) (model.OtterizeServiceIdentity, error) { diff --git a/src/mapper/pkg/resolvers/resolver_test.go b/src/mapper/pkg/resolvers/resolver_test.go index 2c81a180..423157b2 100644 --- a/src/mapper/pkg/resolvers/resolver_test.go +++ b/src/mapper/pkg/resolvers/resolver_test.go @@ -1019,6 +1019,7 @@ func (s *ResolverTestSuite) TestIntentsToApiServerDNS() { Destinations: []test_gql_client.Destination{ { Destination: fmt.Sprintf("%s.%s.svc.cluster.local", service.GetName(), service.GetNamespace()), + LastSeen: time.Now().Add(time.Minute), }, }, }, @@ -1070,7 +1071,7 @@ func (s *ResolverTestSuite) TestIntentsToApiServerSocketScan() { Destinations: []test_gql_client.Destination{ { Destination: service.Spec.ClusterIP, - LastSeen: time.Now(), + LastSeen: time.Now().Add(time.Minute), }, }, }, @@ -1365,6 +1366,60 @@ func (s *ResolverTestSuite) TestTCPResultsFromExternalToLoadBalancerServiceUsing s.Require().Equal("8.8.8.8", intents[0].Intent.IP) } +func (s *ResolverTestSuite) TestResolveOtterizeIdentityFilterSrcDestinationsByCreationTimestamp() { + podIP := "1.1.1.3" + pod := s.AddPod("pod3", podIP, nil, nil) + s.Require().True(s.Mgr.GetCache().WaitForCacheSync(context.Background())) + recorededDestinationForSrc := &model.RecordedDestinationsForSrc{ + SrcIP: podIP, + Destinations: []model.Destination{ + { + Destination: "target-on-time", + LastSeen: pod.CreationTimestamp.Add(time.Minute), + }, + { + Destination: "target-before-time", + LastSeen: pod.CreationTimestamp.Add(-time.Minute), + }, + }, + } + srcIdentity, err := s.resolver.discoverInternalSrcIdentity(context.Background(), recorededDestinationForSrc) + s.Require().NoError(err) + s.Require().Equal("pod3", srcIdentity.Name) + + s.Require().Len(recorededDestinationForSrc.Destinations, 1) + s.Require().Equal("target-on-time", recorededDestinationForSrc.Destinations[0].Destination) + +} + +func (s *ResolverTestSuite) TestPoop() { + //serviceIP := "10.0.0.10" + podIP := "1.1.1.3" + + pod3 := s.AddPod("pod3", podIP, nil, nil) + //s.AddService(serviceName, map[string]string{"app": "test"}, serviceIP, []*v1.Pod{pod3}) + s.Require().True(s.Mgr.GetCache().WaitForCacheSync(context.Background())) + + pod := &v1.Pod{} + err := s.Mgr.GetClient().Get(context.Background(), types.NamespacedName{Name: pod3.Name, Namespace: pod3.Namespace}, pod) + s.Require().NoError(err) + podlist1 := &v1.PodList{} + err = s.Mgr.GetClient().List(context.Background(), podlist1, client.MatchingFields{"ip": pod.Status.PodIP}) + s.Require().NoError(err) + s.Require().Len(podlist1.Items, 1) + + err = s.Mgr.GetClient().Delete(context.Background(), pod) + s.Require().NoError(err) + + s.Require().True(s.Mgr.GetCache().WaitForCacheSync(context.Background())) + + podlist := &v1.PodList{} + err = s.Mgr.GetClient().List(context.Background(), podlist, client.MatchingFields{"ip": pod.Status.PodIP}) + s.Require().NoError(err) + s.Require().Empty(podlist.Items) + +} + func TestRunSuite(t *testing.T) { suite.Run(t, new(ResolverTestSuite)) } diff --git a/src/mapper/pkg/resolvers/schema.helpers.resolvers.go b/src/mapper/pkg/resolvers/schema.helpers.resolvers.go index 693c7d97..35924020 100644 --- a/src/mapper/pkg/resolvers/schema.helpers.resolvers.go +++ b/src/mapper/pkg/resolvers/schema.helpers.resolvers.go @@ -399,7 +399,7 @@ func (r *Resolver) handleTCPCaptureResult(ctx context.Context, captureItem model return errors.Wrap(r.reportIncomingInternetTraffic(ctx, captureItem.SrcIP, captureItem.Destinations)) } - srcSvcIdentity, err := r.discoverInternalSrcIdentity(ctx, captureItem) + srcSvcIdentity, err := r.discoverInternalSrcIdentity(ctx, &captureItem) if err != nil { logrus.WithError(err).Debugf("could not discover src identity for '%s'", captureItem.SrcIP) return nil @@ -462,7 +462,7 @@ func (r *Resolver) handleReportCaptureResults(ctx context.Context, results model var newResults int for _, captureItem := range results.Results { - srcSvcIdentity, err := r.discoverInternalSrcIdentity(ctx, captureItem) + srcSvcIdentity, err := r.discoverInternalSrcIdentity(ctx, &captureItem) if err != nil { logrus.WithError(err).Debugf("could not discover src identity for '%s'", captureItem.SrcIP) continue @@ -499,7 +499,7 @@ func (r *Resolver) handleReportSocketScanResults(ctx context.Context, results mo return nil } for _, socketScanItem := range results.Results { - srcSvcIdentity, err := r.discoverInternalSrcIdentity(ctx, socketScanItem) + srcSvcIdentity, err := r.discoverInternalSrcIdentity(ctx, &socketScanItem) if err != nil { logrus.WithError(err).Debugf("could not discover src identity for '%s'", socketScanItem.SrcIP) continue