Skip to content

Commit

Permalink
Introduce /ready endpoint that can be used in a readiness or startup …
Browse files Browse the repository at this point in the history
…probe
  • Loading branch information
weeco authored and amuraru committed Aug 1, 2021
1 parent 032d501 commit 45540a5
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 6 deletions.
10 changes: 5 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,10 @@ func main() {
if err != nil {
logger.Fatal("failed to setup minion service", zap.Error(err))
}
if false {
err = minionSvc.Start(ctx)
if err != nil {
logger.Fatal("failed to start minion service", zap.Error(err))
}

err = minionSvc.Start(ctx)
if err != nil {
logger.Fatal("failed to start minion service", zap.Error(err))
}

// Create end to end testing service
Expand Down Expand Up @@ -106,6 +105,7 @@ func main() {
),
),
)
http.Handle("/ready", minionSvc.HandleIsReady())

// Start HTTP server
address := net.JoinHostPort(cfg.Exporter.Host, strconv.Itoa(cfg.Exporter.Port))
Expand Down
2 changes: 1 addition & 1 deletion minion/offset_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (s *Service) checkIfConsumerLagIsCaughtUp(ctx context.Context) {
}

// 3. Check if high watermarks have been consumed. To avoid a race condition here we will wait some time before
// comparing, so that the consumer has enough time to pass the new high watermarks we just fetched.
// comparing, so that the consumer has enough time to catch up to the new high watermarks we just fetched.
time.Sleep(3 * time.Second)
consumedOffsets := s.storage.getConsumedOffsets()
topicRes := highMarksRes.Topics[0]
Expand Down
26 changes: 26 additions & 0 deletions minion/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package minion

import (
"context"
"encoding/json"
"fmt"
"net/http"
"regexp"
"sync"
"time"
Expand Down Expand Up @@ -92,6 +94,30 @@ func (s *Service) Start(ctx context.Context) error {
return nil
}

func (s *Service) isReady() bool {
if s.Cfg.ConsumerGroups.ScrapeMode == ConsumerGroupScrapeModeAdminAPI {
return true
}

return s.storage.isReady()
}

func (s *Service) HandleIsReady() http.HandlerFunc {
type response struct {
StatusCode int `json:"statusCode"`
}
return func(w http.ResponseWriter, r *http.Request) {
status := http.StatusOK
if !s.isReady() {
status = http.StatusServiceUnavailable
}
res := response{StatusCode: status}
resJson, _ := json.Marshal(res)
w.WriteHeader(status)
w.Write(resJson)
}
}

func (s *Service) ensureCompatibility(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
Expand Down

0 comments on commit 45540a5

Please sign in to comment.