Skip to content

Commit

Permalink
Fix bug where discovered traffic from a pod wasn't filtered properly …
Browse files Browse the repository at this point in the history
…by its creation timestamp (#243)
  • Loading branch information
omris94 authored Sep 22, 2024
1 parent c7bfa5a commit 35157de
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 7 deletions.
13 changes: 10 additions & 3 deletions src/mapper/pkg/resolvers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
corev1 "k8s.io/api/core/v1"
)

func (r *Resolver) discoverInternalSrcIdentity(ctx context.Context, src model.RecordedDestinationsForSrc) (model.OtterizeServiceIdentity, error) {
func (r *Resolver) discoverInternalSrcIdentity(ctx context.Context, src *model.RecordedDestinationsForSrc) (model.OtterizeServiceIdentity, error) {
svc, ok, err := r.kubeFinder.ResolveIPToControlPlane(ctx, src.SrcIP)
if err != nil {
return model.OtterizeServiceIdentity{}, errors.Errorf("could not resolve %s to service: %w", src.SrcIP, err)
Expand All @@ -34,6 +34,15 @@ func (r *Resolver) discoverInternalSrcIdentity(ctx context.Context, src model.Re
return model.OtterizeServiceIdentity{}, errors.Errorf("found pod %s (by ip %s) doesn't match captured hostname %s, ignoring", srcPod.Name, src.SrcIP, src.SrcHostname)
}

// This function requires "src" to be a pointer.
// If at some point this function will be called with a non-pointer "src"
// It may cause a bug because the function will not be able to modify the "src" object of the caller.
r.filterTargetsAccordingToPodCreationTime(src, srcPod)

return r.resolveInClusterIdentity(ctx, srcPod)
}

func (r *Resolver) filterTargetsAccordingToPodCreationTime(src *model.RecordedDestinationsForSrc, srcPod *corev1.Pod) {
filteredDestinations := make([]model.Destination, 0)
for _, dest := range src.Destinations {
if srcPod.CreationTimestamp.After(dest.LastSeen) {
Expand All @@ -43,8 +52,6 @@ func (r *Resolver) discoverInternalSrcIdentity(ctx context.Context, src model.Re
filteredDestinations = append(filteredDestinations, dest)
}
src.Destinations = filteredDestinations

return r.resolveInClusterIdentity(ctx, srcPod)
}

func (r *Resolver) resolveInClusterIdentity(ctx context.Context, pod *corev1.Pod) (model.OtterizeServiceIdentity, error) {
Expand Down
57 changes: 56 additions & 1 deletion src/mapper/pkg/resolvers/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,7 @@ func (s *ResolverTestSuite) TestIntentsToApiServerDNS() {
Destinations: []test_gql_client.Destination{
{
Destination: fmt.Sprintf("%s.%s.svc.cluster.local", service.GetName(), service.GetNamespace()),
LastSeen: time.Now().Add(time.Minute),
},
},
},
Expand Down Expand Up @@ -1070,7 +1071,7 @@ func (s *ResolverTestSuite) TestIntentsToApiServerSocketScan() {
Destinations: []test_gql_client.Destination{
{
Destination: service.Spec.ClusterIP,
LastSeen: time.Now(),
LastSeen: time.Now().Add(time.Minute),
},
},
},
Expand Down Expand Up @@ -1365,6 +1366,60 @@ func (s *ResolverTestSuite) TestTCPResultsFromExternalToLoadBalancerServiceUsing
s.Require().Equal("8.8.8.8", intents[0].Intent.IP)
}

func (s *ResolverTestSuite) TestResolveOtterizeIdentityFilterSrcDestinationsByCreationTimestamp() {
podIP := "1.1.1.3"
pod := s.AddPod("pod3", podIP, nil, nil)
s.Require().True(s.Mgr.GetCache().WaitForCacheSync(context.Background()))
recorededDestinationForSrc := &model.RecordedDestinationsForSrc{
SrcIP: podIP,
Destinations: []model.Destination{
{
Destination: "target-on-time",
LastSeen: pod.CreationTimestamp.Add(time.Minute),
},
{
Destination: "target-before-time",
LastSeen: pod.CreationTimestamp.Add(-time.Minute),
},
},
}
srcIdentity, err := s.resolver.discoverInternalSrcIdentity(context.Background(), recorededDestinationForSrc)
s.Require().NoError(err)
s.Require().Equal("pod3", srcIdentity.Name)

s.Require().Len(recorededDestinationForSrc.Destinations, 1)
s.Require().Equal("target-on-time", recorededDestinationForSrc.Destinations[0].Destination)

}

func (s *ResolverTestSuite) TestPoop() {
//serviceIP := "10.0.0.10"
podIP := "1.1.1.3"

pod3 := s.AddPod("pod3", podIP, nil, nil)
//s.AddService(serviceName, map[string]string{"app": "test"}, serviceIP, []*v1.Pod{pod3})
s.Require().True(s.Mgr.GetCache().WaitForCacheSync(context.Background()))

pod := &v1.Pod{}
err := s.Mgr.GetClient().Get(context.Background(), types.NamespacedName{Name: pod3.Name, Namespace: pod3.Namespace}, pod)
s.Require().NoError(err)
podlist1 := &v1.PodList{}
err = s.Mgr.GetClient().List(context.Background(), podlist1, client.MatchingFields{"ip": pod.Status.PodIP})
s.Require().NoError(err)
s.Require().Len(podlist1.Items, 1)

err = s.Mgr.GetClient().Delete(context.Background(), pod)
s.Require().NoError(err)

s.Require().True(s.Mgr.GetCache().WaitForCacheSync(context.Background()))

podlist := &v1.PodList{}
err = s.Mgr.GetClient().List(context.Background(), podlist, client.MatchingFields{"ip": pod.Status.PodIP})
s.Require().NoError(err)
s.Require().Empty(podlist.Items)

}

func TestRunSuite(t *testing.T) {
suite.Run(t, new(ResolverTestSuite))
}
6 changes: 3 additions & 3 deletions src/mapper/pkg/resolvers/schema.helpers.resolvers.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func (r *Resolver) handleTCPCaptureResult(ctx context.Context, captureItem model
return errors.Wrap(r.reportIncomingInternetTraffic(ctx, captureItem.SrcIP, captureItem.Destinations))
}

srcSvcIdentity, err := r.discoverInternalSrcIdentity(ctx, captureItem)
srcSvcIdentity, err := r.discoverInternalSrcIdentity(ctx, &captureItem)
if err != nil {
logrus.WithError(err).Debugf("could not discover src identity for '%s'", captureItem.SrcIP)
return nil
Expand Down Expand Up @@ -462,7 +462,7 @@ func (r *Resolver) handleReportCaptureResults(ctx context.Context, results model

var newResults int
for _, captureItem := range results.Results {
srcSvcIdentity, err := r.discoverInternalSrcIdentity(ctx, captureItem)
srcSvcIdentity, err := r.discoverInternalSrcIdentity(ctx, &captureItem)
if err != nil {
logrus.WithError(err).Debugf("could not discover src identity for '%s'", captureItem.SrcIP)
continue
Expand Down Expand Up @@ -499,7 +499,7 @@ func (r *Resolver) handleReportSocketScanResults(ctx context.Context, results mo
return nil
}
for _, socketScanItem := range results.Results {
srcSvcIdentity, err := r.discoverInternalSrcIdentity(ctx, socketScanItem)
srcSvcIdentity, err := r.discoverInternalSrcIdentity(ctx, &socketScanItem)
if err != nil {
logrus.WithError(err).Debugf("could not discover src identity for '%s'", socketScanItem.SrcIP)
continue
Expand Down

0 comments on commit 35157de

Please sign in to comment.