Skip to content

Commit

Permalink
Sync nodeport services from each kubernetes node
Browse files Browse the repository at this point in the history
The original plan was to only sync NodePort service instances for the
nodes they were running on. Currently, they are being synced with the
pod's ip address instead, which is not guaranteed to be routable. For
users running in this type of environment, this is causing these services
to be reaped by the anti-entropy mechanism since their health checks fail
due to the unroutability. Additionally, this provides the incorrect
ip address for routing to NodePort services, which makes the sync ineffective.

Given that there is not an easy way to get the node info from a service,
the easiest solution to this is to sync all of the kubernetes nodes
for the NodePort service. This has the benefit of distributing traffic
as Kuberentes expects it for this type of service.
  • Loading branch information
adilyse committed Dec 19, 2018
1 parent 3ad0005 commit 3afc033
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 87 deletions.
33 changes: 30 additions & 3 deletions catalog/from-k8s/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,10 +349,37 @@ func (t *ServiceResource) generateRegistrations(key string) {
t.consulMap[key] = append(t.consulMap[key], &r)
}

// For NodePort services, we create a service instance for each
// endpoint of the service. This way we don't register _every_ K8S
// For NodePort services, we register each K8S
// node as part of the service.
case apiv1.ServiceTypeNodePort, apiv1.ServiceTypeClusterIP:
case apiv1.ServiceTypeNodePort:
// Get all nodes to be able to reference their ip addresses
nodes, err := t.Client.CoreV1().Nodes().List(metav1.ListOptions{})
if err != nil || len(nodes.Items) == 0 {
t.Log.Warn("error getting nodes", "error", err)
return
}

// Create a service instance for each node
for _, node := range nodes.Items {
for _, address := range node.Status.Addresses {
if address.Type == apiv1.NodeExternalIP {
r := baseNode
rs := baseService
r.Service = &rs
r.Service.ID = serviceID(r.Service.Service, address.Address)
r.Service.Address = address.Address
r.Address = address.Address

if node.Name != "" {
r.Node = node.Name
}

t.consulMap[key] = append(t.consulMap[key], &r)
}
}
}

case apiv1.ServiceTypeClusterIP:
if t.endpointsMap == nil {
return
}
Expand Down
160 changes: 76 additions & 84 deletions catalog/from-k8s/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,52 +529,50 @@ func TestServiceResource_nodePort(t *testing.T) {
})
defer closer()

// Insert the service
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{
node1 := "ip-10-11-12-13.ec2.internal"
node2 := "ip-10-11-12-14.ec2.internal"
// Insert the nodes
_, err := client.CoreV1().Nodes().Create(&apiv1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Name: node1,
},

Spec: apiv1.ServiceSpec{
Type: apiv1.ServiceTypeNodePort,
Ports: []apiv1.ServicePort{
apiv1.ServicePort{Name: "http", Port: 80, TargetPort: intstr.FromInt(8080), NodePort: 30000},
apiv1.ServicePort{Name: "rpc", Port: 8500, TargetPort: intstr.FromInt(2000), NodePort: 30001},
Status: apiv1.NodeStatus{
Addresses: []apiv1.NodeAddress{
apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "1.2.3.4"},
},
},
})
require.NoError(err)

_, err = client.CoreV1().Nodes().Create(&apiv1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: node2,
},

Status: apiv1.NodeStatus{
Addresses: []apiv1.NodeAddress{
apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "2.3.4.5"},
},
},
})
require.NoError(err)
require.NoError(err)

// Wait a bit
time.Sleep(300 * time.Millisecond)

node1 := "ip-10-11-12-13.ec2.internal"
node2 := "ip-10-11-12-14.ec2.internal"
// Insert the endpoints
_, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{
// Insert the service
_, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
},

Subsets: []apiv1.EndpointSubset{
apiv1.EndpointSubset{
Addresses: []apiv1.EndpointAddress{
apiv1.EndpointAddress{NodeName: &node1, IP: "1.2.3.4"},
},
Ports: []apiv1.EndpointPort{
apiv1.EndpointPort{Name: "http", Port: 8080},
apiv1.EndpointPort{Name: "rpc", Port: 2000},
},
},

apiv1.EndpointSubset{
Addresses: []apiv1.EndpointAddress{
apiv1.EndpointAddress{NodeName: &node2, IP: "2.3.4.5"},
},
Ports: []apiv1.EndpointPort{
apiv1.EndpointPort{Name: "http", Port: 8080},
apiv1.EndpointPort{Name: "rpc", Port: 2000},
},
Spec: apiv1.ServiceSpec{
Type: apiv1.ServiceTypeNodePort,
Ports: []apiv1.ServicePort{
apiv1.ServicePort{Name: "http", Port: 80, TargetPort: intstr.FromInt(8080), NodePort: 30000},
apiv1.ServicePort{Name: "rpc", Port: 8500, TargetPort: intstr.FromInt(2000), NodePort: 30001},
},
},
})
Expand Down Expand Up @@ -618,31 +616,28 @@ func TestServiceResource_nodePortInitial(t *testing.T) {

node1 := "ip-10-11-12-13.ec2.internal"
node2 := "ip-10-11-12-14.ec2.internal"
// Insert the endpoints
_, err := client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{
// Insert the nodes
_, err := client.CoreV1().Nodes().Create(&apiv1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Name: node1,
},

Subsets: []apiv1.EndpointSubset{
apiv1.EndpointSubset{
Addresses: []apiv1.EndpointAddress{
apiv1.EndpointAddress{NodeName: &node1, IP: "1.2.3.4"},
},
Ports: []apiv1.EndpointPort{
apiv1.EndpointPort{Name: "http", Port: 8080},
apiv1.EndpointPort{Name: "rpc", Port: 2000},
},
Status: apiv1.NodeStatus{
Addresses: []apiv1.NodeAddress{
apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "1.2.3.4"},
},
},
})
require.NoError(err)

apiv1.EndpointSubset{
Addresses: []apiv1.EndpointAddress{
apiv1.EndpointAddress{NodeName: &node2, IP: "2.3.4.5"},
},
Ports: []apiv1.EndpointPort{
apiv1.EndpointPort{Name: "http", Port: 8080},
apiv1.EndpointPort{Name: "rpc", Port: 2000},
},
_, err = client.CoreV1().Nodes().Create(&apiv1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: node2,
},

Status: apiv1.NodeStatus{
Addresses: []apiv1.NodeAddress{
apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "2.3.4.5"},
},
},
})
Expand Down Expand Up @@ -698,18 +693,30 @@ func TestServiceResource_nodePortAnnotatedPort(t *testing.T) {
})
defer closer()

// Insert the service
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{
node1 := "ip-10-11-12-13.ec2.internal"
node2 := "ip-10-11-12-14.ec2.internal"
// Insert the nodes
_, err := client.CoreV1().Nodes().Create(&apiv1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Annotations: map[string]string{annotationServicePort: "rpc"},
Name: node1,
},

Spec: apiv1.ServiceSpec{
Type: apiv1.ServiceTypeNodePort,
Ports: []apiv1.ServicePort{
apiv1.ServicePort{Name: "http", Port: 80, TargetPort: intstr.FromInt(8080), NodePort: 30000},
apiv1.ServicePort{Name: "rpc", Port: 8500, TargetPort: intstr.FromInt(2000), NodePort: 30001},
Status: apiv1.NodeStatus{
Addresses: []apiv1.NodeAddress{
apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "1.2.3.4"},
},
},
})
require.NoError(err)

_, err = client.CoreV1().Nodes().Create(&apiv1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: node2,
},

Status: apiv1.NodeStatus{
Addresses: []apiv1.NodeAddress{
apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "2.3.4.5"},
},
},
})
Expand All @@ -718,33 +725,18 @@ func TestServiceResource_nodePortAnnotatedPort(t *testing.T) {
// Wait a bit
time.Sleep(300 * time.Millisecond)

node1 := "ip-10-11-12-13.ec2.internal"
node2 := "ip-10-11-12-14.ec2.internal"
// Insert the endpoints
_, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{
// Insert the service
_, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Name: "foo",
Annotations: map[string]string{annotationServicePort: "rpc"},
},

Subsets: []apiv1.EndpointSubset{
apiv1.EndpointSubset{
Addresses: []apiv1.EndpointAddress{
apiv1.EndpointAddress{NodeName: &node1, IP: "1.2.3.4"},
},
Ports: []apiv1.EndpointPort{
apiv1.EndpointPort{Name: "http", Port: 8080},
apiv1.EndpointPort{Name: "rpc", Port: 2000},
},
},

apiv1.EndpointSubset{
Addresses: []apiv1.EndpointAddress{
apiv1.EndpointAddress{NodeName: &node2, IP: "2.3.4.5"},
},
Ports: []apiv1.EndpointPort{
apiv1.EndpointPort{Name: "http", Port: 8080},
apiv1.EndpointPort{Name: "rpc", Port: 2000},
},
Spec: apiv1.ServiceSpec{
Type: apiv1.ServiceTypeNodePort,
Ports: []apiv1.ServicePort{
apiv1.ServicePort{Name: "http", Port: 80, TargetPort: intstr.FromInt(8080), NodePort: 30000},
apiv1.ServicePort{Name: "rpc", Port: 8500, TargetPort: intstr.FromInt(2000), NodePort: 30001},
},
},
})
Expand Down

0 comments on commit 3afc033

Please sign in to comment.