Skip to content

Commit a7c1a9b

Browse files
committed
[Misc] enhance headless service sync with update logic
Signed-off-by: Omer Aplatony <omerap12@gmail.com>
1 parent 418ffc3 commit a7c1a9b

File tree

3 files changed

+146
-8
lines changed

3 files changed

+146
-8
lines changed

pkg/controller/stormservice/sync.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
apps "k8s.io/api/apps/v1"
2525
corev1 "k8s.io/api/core/v1"
2626
apiequality "k8s.io/apimachinery/pkg/api/equality"
27+
errors "k8s.io/apimachinery/pkg/api/errors"
2728
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2829
"k8s.io/klog/v2"
2930
"k8s.io/utils/integer"
@@ -84,16 +85,29 @@ func (r *StormServiceReconciler) syncHeadlessService(ctx context.Context, servic
8485
}
8586

8687
headlessService := &corev1.Service{}
87-
if err := r.Client.Get(ctx, client.ObjectKey{Name: service.Name, Namespace: service.Namespace}, headlessService); client.IgnoreNotFound(err) != nil {
88-
return err
89-
} else if err != nil {
90-
// not found pg, need to create
91-
if err = r.Client.Create(ctx, expectedService); err == nil {
92-
r.EventRecorder.Eventf(service, corev1.EventTypeNormal, HeadlessServiceEventType, "Headless Service(discovery) %s synced", service.Name)
88+
err := r.Client.Get(ctx, client.ObjectKey{Name: service.Name, Namespace: service.Namespace}, headlessService)
89+
if err != nil {
90+
if errors.IsNotFound(err) {
91+
// service doesn't exist, create it
92+
if createErr := r.Client.Create(ctx, expectedService); createErr != nil {
93+
return fmt.Errorf("failed to create headless service: %w", createErr)
94+
}
95+
r.EventRecorder.Eventf(service, corev1.EventTypeNormal, HeadlessServiceEventType, "Headless Service(discovery) %s created", service.Name)
96+
return nil
9397
}
94-
return err
98+
return err // Return other errors immediately
9599
}
96100

101+
if !isServiceEqual(headlessService, expectedService) {
102+
headlessService.Spec = expectedService.Spec
103+
if err := r.Client.Update(ctx, headlessService); err != nil {
104+
return fmt.Errorf("failed to update headless service: %w", err)
105+
}
106+
r.EventRecorder.Eventf(service, corev1.EventTypeNormal, "Updated", "Headless Service updated")
107+
}
108+
109+
r.EventRecorder.Eventf(service, corev1.EventTypeNormal, HeadlessServiceEventType, "Headless Service(discovery) %s updated", service.Name)
110+
97111
return nil
98112
}
99113

pkg/controller/stormservice/sync_test.go

Lines changed: 116 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,21 @@ limitations under the License.
1616

1717
package stormservice
1818

19-
import "testing"
19+
import (
20+
"context"
21+
"reflect"
22+
"testing"
23+
24+
corev1 "k8s.io/api/core/v1"
25+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
"k8s.io/apimachinery/pkg/runtime"
27+
"k8s.io/client-go/tools/record"
28+
"sigs.k8s.io/controller-runtime/pkg/client"
29+
"sigs.k8s.io/controller-runtime/pkg/client/fake"
30+
31+
orchestrationv1alpha1 "github.com/vllm-project/aibrix/api/orchestration/v1alpha1"
32+
"github.com/vllm-project/aibrix/pkg/controller/constants"
33+
)
2034

2135
func TestCalculateReplicas(t *testing.T) {
2236
type args struct {
@@ -112,3 +126,104 @@ func TestCalculateReplicas(t *testing.T) {
112126
}
113127
}
114128
}
129+
130+
func TestSyncHeadlessService(t *testing.T) {
131+
scheme := runtime.NewScheme()
132+
_ = corev1.AddToScheme(scheme)
133+
_ = orchestrationv1alpha1.AddToScheme(scheme)
134+
135+
tests := []struct {
136+
name string
137+
stormService *orchestrationv1alpha1.StormService
138+
existingService *corev1.Service
139+
wantError bool
140+
}{
141+
{
142+
name: "create new headless service",
143+
stormService: &orchestrationv1alpha1.StormService{
144+
ObjectMeta: metav1.ObjectMeta{
145+
Name: "test-storm",
146+
Namespace: "default",
147+
Labels: map[string]string{
148+
"app": "test",
149+
},
150+
},
151+
},
152+
existingService: nil,
153+
wantError: false,
154+
},
155+
{
156+
name: "service already exists",
157+
stormService: &orchestrationv1alpha1.StormService{
158+
ObjectMeta: metav1.ObjectMeta{
159+
Name: "test-storm",
160+
Namespace: "default",
161+
},
162+
},
163+
existingService: &corev1.Service{
164+
ObjectMeta: metav1.ObjectMeta{
165+
Name: "test-storm",
166+
Namespace: "default",
167+
},
168+
Spec: corev1.ServiceSpec{
169+
Type: corev1.ServiceTypeClusterIP,
170+
ClusterIP: corev1.ClusterIPNone,
171+
Selector: map[string]string{}, // empty selector that should be updated
172+
},
173+
},
174+
wantError: false,
175+
},
176+
}
177+
178+
for _, tt := range tests {
179+
t.Run(tt.name, func(t *testing.T) {
180+
var objs []client.Object
181+
if tt.existingService != nil {
182+
objs = append(objs, tt.existingService)
183+
}
184+
185+
fakeClient := fake.NewClientBuilder().
186+
WithScheme(scheme).
187+
WithObjects(objs...).
188+
Build()
189+
190+
r := &StormServiceReconciler{
191+
Client: fakeClient,
192+
EventRecorder: &record.FakeRecorder{},
193+
}
194+
195+
err := r.syncHeadlessService(context.TODO(), tt.stormService)
196+
197+
if (err != nil) != tt.wantError {
198+
t.Errorf("syncHeadlessService() error = %v, wantError %v", err, tt.wantError)
199+
return
200+
}
201+
202+
// Check if service was created/updated
203+
service := &corev1.Service{}
204+
err = fakeClient.Get(context.TODO(), client.ObjectKey{
205+
Name: tt.stormService.Name,
206+
Namespace: tt.stormService.Namespace,
207+
}, service)
208+
209+
if err != nil {
210+
t.Errorf("Failed to get service: %v", err)
211+
return
212+
}
213+
214+
// Verify service properties
215+
if service.Spec.ClusterIP != corev1.ClusterIPNone {
216+
t.Errorf("Expected ClusterIP to be None, got %s", service.Spec.ClusterIP)
217+
}
218+
219+
expectedSelector := map[string]string{constants.StormServiceNameLabelKey: tt.stormService.Name}
220+
if !reflect.DeepEqual(service.Spec.Selector, expectedSelector) {
221+
t.Errorf("Expected selector %v, got %v", expectedSelector, service.Spec.Selector)
222+
}
223+
224+
if service.Spec.Type != corev1.ServiceTypeClusterIP {
225+
t.Errorf("Expected service type ClusterIP, got %v", service.Spec.Type)
226+
}
227+
})
228+
}
229+
}

pkg/controller/stormservice/utils.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package stormservice
1919
import (
2020
"sort"
2121

22+
corev1 "k8s.io/api/core/v1"
23+
apiequality "k8s.io/apimachinery/pkg/api/equality"
2224
intstrutil "k8s.io/apimachinery/pkg/util/intstr"
2325

2426
orchestrationv1alpha1 "github.com/vllm-project/aibrix/api/orchestration/v1alpha1"
@@ -224,3 +226,10 @@ func sortRoleSetByRevision(roleSets []*orchestrationv1alpha1.RoleSet, updatedRev
224226
}
225227
})
226228
}
229+
230+
// isServiceEqual compares two Kubernetes Service objects for equality
231+
func isServiceEqual(a, b *corev1.Service) bool {
232+
return a.Spec.Type == b.Spec.Type &&
233+
apiequality.Semantic.DeepEqual(a.Spec.Selector, b.Spec.Selector) &&
234+
a.Spec.ClusterIP == b.Spec.ClusterIP
235+
}

0 commit comments

Comments
 (0)