Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose init offset and schedule metrics for ConsumerGroup reconciler #3294

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion control-plane/pkg/reconciler/consumergroup/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"knative.dev/eventing-kafka-broker/control-plane/pkg/security"
)

func (r Reconciler) newAuthConfigOption(ctx context.Context, cg *kafkainternals.ConsumerGroup) (kafka.ConfigOption, error) {
func (r *Reconciler) newAuthConfigOption(ctx context.Context, cg *kafkainternals.ConsumerGroup) (kafka.ConfigOption, error) {
var secret *corev1.Secret

if hasSecretSpecConfig(cg.Spec.Template.Spec.Auth) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"knative.dev/eventing-kafka-broker/control-plane/pkg/autoscaler"
)

func (r Reconciler) autoscalerDefaultsFromConfigMap(ctx context.Context, configMapName string) (*autoscaler.AutoscalerConfig, error) {
func (r *Reconciler) autoscalerDefaultsFromConfigMap(ctx context.Context, configMapName string) (*autoscaler.AutoscalerConfig, error) {

Check warning on line 28 in control-plane/pkg/reconciler/consumergroup/autoscaler_config.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumergroup/autoscaler_config.go#L28

Added line #L28 was not covered by tests
cm, err := r.KubeClient.CoreV1().ConfigMaps(r.SystemNamespace).Get(ctx, configMapName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error while retrieving the %s config map in namespace %s: %+v", configMapName, r.SystemNamespace, err)
Expand Down
100 changes: 85 additions & 15 deletions control-plane/pkg/reconciler/consumergroup/consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@
"fmt"
"math"
"sort"
"time"

"github.com/Shopify/sarama"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
Expand All @@ -36,7 +40,9 @@
"k8s.io/utils/pointer"
eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
"knative.dev/pkg/apis"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
"knative.dev/pkg/reconciler"

sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1"
Expand All @@ -60,8 +66,38 @@
var (
ErrNoSubscriberURI = errors.New("no subscriber URI resolved")
ErrNoDeadLetterSinkURI = errors.New("no dead letter sink URI resolved")

scheduleLatencyStat = stats.Int64("schedule_latency", "Latency of consumer group schedule operations", stats.UnitMilliseconds)
// scheduleDistribution defines the bucket boundaries for the histogram of schedule latency metric.
// Bucket boundaries are 10ms, 100ms, 1s, 10s, 30s and 60s.
scheduleDistribution = view.Distribution(10, 100, 1000, 10000, 30000, 60000)

initializeOffsetsLatencyStat = stats.Int64("initialize_offsets_latency", "Latency of consumer group offsets initialization operations", stats.UnitMilliseconds)
// initializeOffsetsDistribution defines the bucket boundaries for the histogram of initialize offsets latency metric.
// Bucket boundaries are 10ms, 100ms, 1s, 10s, 30s and 60s.
initializeOffsetsDistribution = view.Distribution(10, 100, 1000, 10000, 30000, 60000)
)

func init() {
views := []*view.View{
{
Description: "Latency of consumer group schedule operations",
TagKeys: []tag.Key{controller.NamespaceTagKey},
Measure: scheduleLatencyStat,
Aggregation: scheduleDistribution,
},
{
Description: "Latency of consumer group offsets initialization operations",
TagKeys: []tag.Key{controller.NamespaceTagKey},
Measure: initializeOffsetsLatencyStat,
Aggregation: initializeOffsetsDistribution,
},
}
if err := view.Register(views...); err != nil {
panic(err)

Check warning on line 97 in control-plane/pkg/reconciler/consumergroup/consumergroup.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumergroup/consumergroup.go#L97

Added line #L97 was not covered by tests
}
}

type Scheduler struct {
scheduler.Scheduler
SchedulerConfig
Expand Down Expand Up @@ -103,7 +139,7 @@
DeleteConsumerGroupMetadataCounter *counter.Counter
}

func (r Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event {
func (r *Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event {
if err := r.reconcileInitialOffset(ctx, cg); err != nil {
return cg.MarkInitializeOffsetFailed("InitializeOffset", err)
}
Expand Down Expand Up @@ -150,7 +186,7 @@
return nil
}

func (r Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event {
func (r *Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event {

cg.Spec.Replicas = pointer.Int32(0)
err := r.schedule(ctx, cg) //de-schedule placements
Expand All @@ -159,7 +195,7 @@
cg.Status.Placements = nil

// return an error to 1. update the status. 2. not clear the finalizer
return errors.New("placement list was not empty")
return fmt.Errorf("failed to unschedule consumer group: %w", err)

Check warning on line 198 in control-plane/pkg/reconciler/consumergroup/consumergroup.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumergroup/consumergroup.go#L198

Added line #L198 was not covered by tests
}

// Get consumers associated with the ConsumerGroup.
Expand All @@ -185,7 +221,7 @@
return nil
}

func (r Reconciler) deleteConsumerGroupMetadata(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
func (r *Reconciler) deleteConsumerGroupMetadata(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
saramaSecurityOption, err := r.newAuthConfigOption(ctx, cg)
if err != nil {
return fmt.Errorf("failed to create config options for Kafka cluster auth: %w", err)
Expand Down Expand Up @@ -213,7 +249,7 @@
return nil
}

func (r Reconciler) reconcileConsumers(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
func (r *Reconciler) reconcileConsumers(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {

// Get consumers associated with the ConsumerGroup.
existingConsumers, err := r.ConsumerLister.Consumers(cg.GetNamespace()).List(labels.SelectorFromSet(cg.Spec.Selector))
Expand Down Expand Up @@ -242,7 +278,7 @@
return nil
}

func (r Reconciler) reconcileConsumersInPlacement(ctx context.Context, cg *kafkainternals.ConsumerGroup, pc ConsumersPerPlacement) error {
func (r *Reconciler) reconcileConsumersInPlacement(ctx context.Context, cg *kafkainternals.ConsumerGroup, pc ConsumersPerPlacement) error {

placement := *pc.Placement
consumers := pc.Consumers
Expand Down Expand Up @@ -297,7 +333,7 @@
return nil
}

func (r Reconciler) createConsumer(ctx context.Context, cg *kafkainternals.ConsumerGroup, placement eventingduckv1alpha1.Placement) error {
func (r *Reconciler) createConsumer(ctx context.Context, cg *kafkainternals.ConsumerGroup, placement eventingduckv1alpha1.Placement) error {
c := cg.ConsumerFromTemplate()

c.Name = r.NameGenerator.GenerateName(cg.GetName() + "-")
Expand All @@ -310,7 +346,7 @@
return nil
}

func (r Reconciler) finalizeConsumer(ctx context.Context, consumer *kafkainternals.Consumer) error {
func (r *Reconciler) finalizeConsumer(ctx context.Context, consumer *kafkainternals.Consumer) error {
dOpts := metav1.DeleteOptions{
Preconditions: &metav1.Preconditions{UID: &consumer.UID},
}
Expand All @@ -321,7 +357,10 @@
return nil
}

func (r Reconciler) schedule(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
func (r *Reconciler) schedule(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
startTime := time.Now()
defer recordScheduleLatency(ctx, cg, startTime)

statefulSetScheduler := r.SchedulerFunc(cg.GetUserFacingResourceRef().Kind)

// Ensure Contract configmaps are created before scheduling to avoid having pending pods due to missing
Expand All @@ -348,7 +387,7 @@
Consumers []*kafkainternals.Consumer
}

func (r Reconciler) joinConsumersByPlacement(placements []eventingduckv1alpha1.Placement, consumers []*kafkainternals.Consumer) []ConsumersPerPlacement {
func (r *Reconciler) joinConsumersByPlacement(placements []eventingduckv1alpha1.Placement, consumers []*kafkainternals.Consumer) []ConsumersPerPlacement {
placementConsumers := make([]ConsumersPerPlacement, 0, int(math.Max(float64(len(placements)), float64(len(consumers)))))

// Group consumers by Pod bind.
Expand Down Expand Up @@ -403,7 +442,7 @@
return placementConsumers
}

func (r Reconciler) propagateStatus(cg *kafkainternals.ConsumerGroup) (*apis.Condition, error) {
func (r *Reconciler) propagateStatus(cg *kafkainternals.ConsumerGroup) (*apis.Condition, error) {
consumers, err := r.ConsumerLister.Consumers(cg.GetNamespace()).List(labels.SelectorFromSet(cg.Spec.Selector))
if err != nil {
return nil, fmt.Errorf("failed to list consumers for selector %+v: %w", cg.Spec.Selector, err)
Expand Down Expand Up @@ -435,7 +474,10 @@
return condition, nil
}

func (r Reconciler) reconcileInitialOffset(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
func (r *Reconciler) reconcileInitialOffset(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
startTime := time.Now()
defer recordInitializeOffsetsLatency(ctx, cg, startTime)

if cg.Spec.Template.Spec.Delivery == nil || cg.Spec.Template.Spec.Delivery.InitialOffset == sources.OffsetEarliest {
return nil
}
Expand Down Expand Up @@ -479,7 +521,7 @@
return nil
}

func (r Reconciler) reconcileKedaObjects(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
func (r *Reconciler) reconcileKedaObjects(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {

Check warning on line 524 in control-plane/pkg/reconciler/consumergroup/consumergroup.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumergroup/consumergroup.go#L524

Added line #L524 was not covered by tests
var triggerAuthentication *kedav1alpha1.TriggerAuthentication
var secret *corev1.Secret

Expand Down Expand Up @@ -620,7 +662,7 @@
return nil
}

func (r Reconciler) ensureContractConfigmapsExist(ctx context.Context, scheduler Scheduler) error {
func (r *Reconciler) ensureContractConfigmapsExist(ctx context.Context, scheduler Scheduler) error {
selector := labels.SelectorFromSet(map[string]string{"app": scheduler.StatefulSetName})
pods, err := r.PodLister.
Pods(r.SystemNamespace).
Expand All @@ -644,7 +686,7 @@
return nil
}

func (r Reconciler) ensureContractConfigMapExists(ctx context.Context, p *corev1.Pod, name string) error {
func (r *Reconciler) ensureContractConfigMapExists(ctx context.Context, p *corev1.Pod, name string) error {
// Check if ConfigMap exists in lister cache
_, err := r.ConfigMapLister.ConfigMaps(r.SystemNamespace).Get(name)
// ConfigMap already exists, return
Expand Down Expand Up @@ -684,3 +726,31 @@
_ consumergroup.Interface = &Reconciler{}
_ consumergroup.Finalizer = &Reconciler{}
)

func recordScheduleLatency(ctx context.Context, cg *kafkainternals.ConsumerGroup, startTime time.Time) {
func() {
ctx, err := tag.New(
ctx,
tag.Insert(controller.NamespaceTagKey, cg.Namespace),
)
if err != nil {
return
}

Check warning on line 738 in control-plane/pkg/reconciler/consumergroup/consumergroup.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumergroup/consumergroup.go#L737-L738

Added lines #L737 - L738 were not covered by tests

metrics.Record(ctx, scheduleLatencyStat.M(time.Since(startTime).Milliseconds()))
}()
}

func recordInitializeOffsetsLatency(ctx context.Context, cg *kafkainternals.ConsumerGroup, startTime time.Time) {
func() {
ctx, err := tag.New(
ctx,
tag.Insert(controller.NamespaceTagKey, cg.Namespace),
)
if err != nil {
return
}

Check warning on line 752 in control-plane/pkg/reconciler/consumergroup/consumergroup.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumergroup/consumergroup.go#L751-L752

Added lines #L751 - L752 were not covered by tests

metrics.Record(ctx, initializeOffsetsLatencyStat.M(time.Since(startTime).Milliseconds()))
}()
}
Original file line number Diff line number Diff line change
Expand Up @@ -1644,7 +1644,7 @@ func TestReconcileKind(t *testing.T) {
_, exampleConfig := cm.ConfigMapsFromTestFile(t, configapis.FlagsConfigName)
store.OnConfigChanged(exampleConfig)

r := Reconciler{
r := &Reconciler{
SchedulerFunc: func(s string) Scheduler {
ss := row.OtherTestData[testSchedulerKey].(scheduler.Scheduler)
return Scheduler{
Expand Down Expand Up @@ -1787,7 +1787,7 @@ func TestReconcileKindNoAutoscaler(t *testing.T) {

ctx, _ = kedaclient.With(ctx)

r := Reconciler{
r := &Reconciler{
SchedulerFunc: func(s string) Scheduler {
ss := row.OtherTestData[testSchedulerKey].(scheduler.Scheduler)
return Scheduler{
Expand Down
9 changes: 6 additions & 3 deletions control-plane/pkg/reconciler/consumergroup/evictor.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ type evictor struct {
// newEvictor creates a new evictor.
//
// fields are additional logger fields to be attached to the evictor logger.
func newEvictor(ctx context.Context, fields ...zap.Field) evictor {
return evictor{
func newEvictor(ctx context.Context, fields ...zap.Field) *evictor {
return &evictor{
ctx: ctx,
kubeClient: kubeclient.Get(ctx),
InternalsClient: kafkainternalsclient.Get(ctx).InternalV1alpha1(),
Expand All @@ -60,7 +60,7 @@ func newEvictor(ctx context.Context, fields ...zap.Field) evictor {
}
}

func (e evictor) evict(pod *corev1.Pod, vpod scheduler.VPod, from *eventingduckv1alpha1.Placement) error {
func (e *evictor) evict(pod *corev1.Pod, vpod scheduler.VPod, from *eventingduckv1alpha1.Placement) error {
key := vpod.GetKey()

logger := e.logger.
Expand Down Expand Up @@ -124,6 +124,9 @@ func (e *evictor) disablePodScheduling(logger *zap.Logger, pod *corev1.Pod) erro
_, err := e.kubeClient.CoreV1().
Pods(pod.GetNamespace()).
Update(e.ctx, pod, metav1.UpdateOptions{})
if apierrors.IsNotFound(err) {
return nil
}
if err != nil {
return fmt.Errorf("failed to update pod %s/%s: %w", pod.GetNamespace(), pod.GetName(), err)
}
Expand Down
2 changes: 1 addition & 1 deletion control-plane/pkg/reconciler/consumergroup/evictor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func TestEvictorEvictPodNotFound(t *testing.T) {
e := newEvictor(ctx)
err := e.evict(pod, cg, placement)

require.NotNil(t, err)
require.Nil(t, err)
}
func TestEvictorEvictConsumerGroupNotFound(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ require (
github.com/google/gofuzz v1.2.0
github.com/hashicorp/go-cleanhttp v0.5.2
github.com/kedacore/keda/v2 v2.8.1
go.opencensus.io v0.24.0
knative.dev/eventing v0.38.1-0.20230815095940-29ac3eee64a4
knative.dev/hack v0.0.0-20230815012940-044c02b7a447
knative.dev/pkg v0.0.0-20230815132840-4f651e092853
Expand Down Expand Up @@ -133,7 +134,6 @@ require (
github.com/wavesoftware/go-ensure v1.0.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/stringprep v1.0.3 // indirect
go.opencensus.io v0.24.0 // indirect
go.uber.org/automaxprocs v1.4.0 // indirect
golang.org/x/crypto v0.12.0 // indirect
golang.org/x/mod v0.12.0 // indirect
Expand Down
Loading