@@ -27,6 +27,7 @@ import (
2727 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2828 "k8s.io/apimachinery/pkg/runtime"
2929 "k8s.io/apimachinery/pkg/types"
30+ "k8s.io/apimachinery/pkg/util/intstr"
3031 metaapplyv1 "k8s.io/client-go/applyconfigurations/meta/v1"
3132 "k8s.io/client-go/tools/record"
3233 "k8s.io/klog/v2"
@@ -66,6 +67,7 @@ func NewServiceReconciler(client client.Client, scheme *runtime.Scheme, record r
6667//+kubebuilder:rbac:groups=inference.llmaz.io,resources=services,verbs=get;list;watch;create;update;patch;delete
6768//+kubebuilder:rbac:groups=inference.llmaz.io,resources=services/status,verbs=get;update;patch
6869//+kubebuilder:rbac:groups=inference.llmaz.io,resources=services/finalizers,verbs=update
70+ //+kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete
6971
7072// Reconcile is part of the main kubernetes reconciliation loop which aims to
7173// move the current state of the cluster closer to the desired state.
@@ -97,6 +99,11 @@ func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
9799 return ctrl.Result {}, err
98100 }
99101
102+ // Create a service for the leader pods of the lws for loadbalancing.
103+ if err := CreateServiceIfNotExists (ctx , r .Client , r .Scheme , service ); err != nil {
104+ return ctrl.Result {}, err
105+ }
106+
100107 // Handle status.
101108
102109 workload := & lws.LeaderWorkerSet {}
@@ -299,3 +306,48 @@ func setControllerReferenceForWorkload(owner metav1.Object, lws *applyconfigurat
299306 WithController (true ))
300307 return nil
301308}
309+
310+ func CreateServiceIfNotExists (ctx context.Context , k8sClient client.Client , Scheme * runtime.Scheme , service * inferenceapi.Service ) error {
311+ log := ctrl .LoggerFrom (ctx )
312+ // The load balancing service name.
313+ svcName := service .Name + "-lb"
314+
315+ var svc corev1.Service
316+ if err := k8sClient .Get (ctx , types.NamespacedName {Name : svcName , Namespace : service .Namespace }, & svc ); err != nil {
317+ if client .IgnoreNotFound (err ) != nil {
318+ return err
319+ }
320+ svc = corev1.Service {
321+ ObjectMeta : metav1.ObjectMeta {
322+ Name : svcName ,
323+ Namespace : service .Namespace ,
324+ },
325+ Spec : corev1.ServiceSpec {
326+ Ports : []corev1.ServicePort {
327+ {
328+ Name : "http" ,
329+ Protocol : corev1 .ProtocolTCP ,
330+ Port : modelSource .DEFAULT_BACKEND_PORT ,
331+ TargetPort : intstr .FromInt (modelSource .DEFAULT_BACKEND_PORT ),
332+ },
333+ },
334+ Selector : map [string ]string {
335+ lws .SetNameLabelKey : service .Name ,
336+ // the leader pod.
337+ lws .WorkerIndexLabelKey : "0" ,
338+ },
339+ },
340+ }
341+
342+ // Set the controller owner reference for garbage collection and reconciliation.
343+ if err := ctrl .SetControllerReference (service , & svc , Scheme ); err != nil {
344+ return err
345+ }
346+ // create the service in the cluster
347+ log .V (2 ).Info ("Creating service." )
348+ if err := k8sClient .Create (ctx , & svc ); err != nil {
349+ return err
350+ }
351+ }
352+ return nil
353+ }
0 commit comments