Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/kdxf' into kdxf-flush
Browse files Browse the repository at this point in the history
  • Loading branch information
wayblink committed Dec 13, 2023
2 parents 30006dd + 5f5ed54 commit c7aa0a3
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 15 deletions.
68 changes: 64 additions & 4 deletions internal/datacoord/garbage_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"
"time"

"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
Expand Down Expand Up @@ -59,10 +60,24 @@ type garbageCollector struct {
segRefer *SegmentReferenceManager
indexCoord types.IndexCoord

startOnce sync.Once
stopOnce sync.Once
wg sync.WaitGroup
closeCh chan struct{}
startOnce sync.Once
stopOnce sync.Once
wg sync.WaitGroup
closeCh chan struct{}
cmdCh chan gcCmd
pauseUntil time.Time
}

type cmdType int32

const (
pause cmdType = iota + 1
resume
)

type gcCmd struct {
cmdType cmdType
duration time.Duration
}

// newGarbageCollector create garbage collector with meta and option
Expand All @@ -76,6 +91,7 @@ func newGarbageCollector(meta *meta, handler Handler, segRefer *SegmentReference
indexCoord: indexCoord,
option: opt,
closeCh: make(chan struct{}),
cmdCh: make(chan gcCmd),
}
}

Expand All @@ -93,15 +109,59 @@ func (gc *garbageCollector) start() {
}
}

func (gc *garbageCollector) Pause(ctx context.Context, pauseDuration time.Duration) error {
if !gc.option.enabled {
log.Info("garbage collection not enabled")
return nil
}
select {
case gc.cmdCh <- gcCmd{
cmdType: pause,
duration: pauseDuration,
}:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

func (gc *garbageCollector) Resume(ctx context.Context) error {
if !gc.option.enabled {
log.Warn("garbage collection not enabled, cannot resume")
return errors.New("garbage collection not enabled")
}
select {
case gc.cmdCh <- gcCmd{
cmdType: resume,
}:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

// work contains actual looping check logic
func (gc *garbageCollector) work() {
defer gc.wg.Done()
ticker := time.Tick(gc.option.checkInterval)
for {
select {
case <-ticker:
if time.Now().Before(gc.pauseUntil) {
log.Info("garbage collector paused", zap.Time("until", gc.pauseUntil))
continue
}
gc.clearEtcd()
gc.scan()
case cmd := <-gc.cmdCh:
switch cmd.cmdType {
case pause:
log.Info("garbage collection paused", zap.Duration("duration", cmd.duration))
gc.pauseUntil = time.Now().Add(cmd.duration)
case resume:
// reset to zero value
gc.pauseUntil = time.Time{}
}
case <-gc.closeCh:
log.Warn("garbage collector quit")
return
Expand Down
105 changes: 102 additions & 3 deletions internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/management"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/mq/msgstream"
Expand Down Expand Up @@ -311,7 +312,7 @@ func (s *Server) initDataCoord() error {
return err
}

if Params.DataCoordCfg.EnableCompaction {
if Params.DataCoordCfg.GetEnableCompaction() {
s.createCompactionHandler()
s.createCompactionTrigger()
}
Expand Down Expand Up @@ -344,12 +345,110 @@ func (s *Server) Start() error {
}

func (s *Server) startDataCoord() {
if Params.DataCoordCfg.EnableCompaction {
if Params.DataCoordCfg.GetEnableCompaction() {
s.compactionHandler.start()
s.compactionTrigger.start()
}
s.startServerLoop()

management.Register(&management.HTTPHandler{
Path: "/datacoord/garbage_collection/pause",
HandlerFunc: func(w management.ResponseWriter, req *management.Request) {
log.Info("receive garbage collection pause request")
pauseSeconds := req.URL.Query().Get("pause_seconds")
seconds, err := strconv.ParseInt(pauseSeconds, 10, 64)
if err != nil {
w.WriteHeader(400)
w.Write([]byte(fmt.Sprintf(`{"msg": "invalid pause seconds(%v)"}`, pauseSeconds)))
return
}

err = s.garbageCollector.Pause(req.Context(), time.Duration(seconds)*time.Second)
if err != nil {
w.WriteHeader(500)
w.Write([]byte(fmt.Sprintf(`{"msg": "failed to pause garbage collection, %s"}`, err.Error())))
return
}
w.WriteHeader(200)
w.Write([]byte(`{"msg": "OK"}`))
return
},
})
management.Register(&management.HTTPHandler{
Path: "/datacoord/garbage_collection/resume",
HandlerFunc: func(w management.ResponseWriter, req *management.Request) {
log.Info("receive garbage collection resume request")
err := s.garbageCollector.Resume(req.Context())
if err != nil {
w.WriteHeader(500)
w.Write([]byte(fmt.Sprintf(`{"msg": "failed to pause garbage collection, %s"}`, err.Error())))
return
}
w.WriteHeader(200)
w.Write([]byte(`{"msg": "OK"}`))
return
},
})
management.Register(&management.HTTPHandler{
Path: "/datacoord/settings/debug",
HandlerFunc: func(writer management.ResponseWriter, request *management.Request) {
opType := request.URL.Query().Get("type")
settingsKey := request.URL.Query().Get("key")
settingsValue := request.URL.Query().Get("value")
if settingsKey == "" {
writer.WriteHeader(500)
writer.Write([]byte(fmt.Sprintf(`{"msg": "the param 'key' is empty"}`)))
return
}
if opType == "" {
opType = "get"
}
log.Info("debug settings", zap.String("op", opType), zap.String("key", settingsKey), zap.String("value", settingsValue))
if opType == "get" {
var v string
switch settingsKey {
case "enableCompaction":
v = fmt.Sprintf("%v", Params.DataCoordCfg.GetEnableCompaction())
case "enableAutoCompaction":
v = fmt.Sprintf("%v", Params.DataCoordCfg.GetEnableAutoCompaction())
default:
v = "unknown"
}
writer.WriteHeader(200)
writer.Write([]byte(fmt.Sprintf(`{"key": "%s", "value": "%s"}`, settingsKey, v)))
return
} else if opType == "set" {
switch settingsKey {
case "enableCompaction":
b, err := strconv.ParseBool(settingsValue)
if err != nil {
writer.WriteHeader(500)
writer.Write([]byte(fmt.Sprintf(`{"msg": "the param 'value' is invalid"}`)))
return
}
Params.DataCoordCfg.SetEnableCompaction(b)
case "enableAutoCompaction":
b, err := strconv.ParseBool(settingsValue)
if err != nil {
writer.WriteHeader(500)
writer.Write([]byte(fmt.Sprintf(`{"msg": "the param 'value' is invalid"}`)))
return
}
Params.DataCoordCfg.SetEnableAutoCompaction(b)
default:
writer.WriteHeader(500)
writer.Write([]byte(fmt.Sprintf(`{"msg": "the param 'key' is invalid"}`)))
return
}
writer.WriteHeader(200)
writer.Write([]byte(fmt.Sprintf(`{"key": "%s", "value": "%s"}`, settingsKey, settingsValue)))
return
}
writer.WriteHeader(500)
writer.Write([]byte(fmt.Sprintf(`{"msg": "the param 'type' is invalid"}`)))
},
})

// DataCoord (re)starts successfully and starts to collection segment stats
// data from all DataNode.
// This will prevent DataCoord from missing out any important segment stats
Expand Down Expand Up @@ -888,7 +987,7 @@ func (s *Server) Stop() error {
s.garbageCollector.close()
s.stopServerLoop()

if Params.DataCoordCfg.EnableCompaction {
if Params.DataCoordCfg.GetEnableCompaction() {
s.stopCompactionTrigger()
s.stopCompactionHandler()
}
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3361,7 +3361,7 @@ func TestGetRecoveryInfo(t *testing.T) {
}

func TestGetCompactionState(t *testing.T) {
Params.DataCoordCfg.EnableCompaction = true
Params.DataCoordCfg.SetEnableCompaction(true)
t.Run("test get compaction state with new compactionhandler", func(t *testing.T) {
svr := &Server{}
svr.stateCode.Store(commonpb.StateCode_Healthy)
Expand Down Expand Up @@ -3425,7 +3425,7 @@ func TestGetCompactionState(t *testing.T) {
}

func TestManualCompaction(t *testing.T) {
Params.DataCoordCfg.EnableCompaction = true
Params.DataCoordCfg.SetEnableCompaction(true)
t.Run("test manual compaction successfully", func(t *testing.T) {
svr := &Server{allocator: &MockAllocator{}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
Expand Down
9 changes: 5 additions & 4 deletions internal/datacoord/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
s.segmentManager.DropSegment(ctx, req.SegmentID)
s.flushCh <- req.SegmentID

if !req.Importing && Params.DataCoordCfg.EnableCompaction {
if !req.Importing && Params.DataCoordCfg.GetEnableCompaction() {
err = s.compactionTrigger.triggerSingleCompaction(segment.GetCollectionID(), segment.GetPartitionID(),
segmentID, segment.GetInsertChannel())
if err != nil {
Expand Down Expand Up @@ -1115,7 +1115,7 @@ func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompa
return resp, nil
}

if !Params.DataCoordCfg.EnableCompaction {
if !Params.DataCoordCfg.GetEnableCompaction() {
resp.Status.Reason = "compaction disabled"
return resp, nil
}
Expand Down Expand Up @@ -1149,7 +1149,7 @@ func (s *Server) GetCompactionState(ctx context.Context, req *milvuspb.GetCompac
return resp, nil
}

if !Params.DataCoordCfg.EnableCompaction {
if !Params.DataCoordCfg.GetEnableCompaction() {
resp.Status.Reason = "compaction disabled"
return resp, nil
}
Expand Down Expand Up @@ -1188,7 +1188,7 @@ func (s *Server) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.
return resp, nil
}

if !Params.DataCoordCfg.EnableCompaction {
if !Params.DataCoordCfg.GetEnableCompaction() {
resp.Status.Reason = "compaction disabled"
return resp, nil
}
Expand Down Expand Up @@ -1346,6 +1346,7 @@ func (s *Server) GetFlushState(ctx context.Context, req *datapb.GetFlushStateReq
log.Info("DataCoord receive GetFlushState request, Flushed is true", zap.Int64s("segmentIDs", req.GetSegmentIDs()), zap.Int("len", len(req.GetSegmentIDs())))
resp.Flushed = true
}

return resp, nil
}

Expand Down
6 changes: 6 additions & 0 deletions internal/management/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ const (
ListenPortEnvKey = "METRICS_PORT"
)

// Provide alias for native http package
// avoiding import alias when using http package

type ResponseWriter = http.ResponseWriter
type Request = http.Request

type HTTPHandler struct {
Path string
HandlerFunc http.HandlerFunc
Expand Down
16 changes: 14 additions & 2 deletions internal/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -1450,7 +1450,7 @@ type dataCoordConfig struct {
UpdatedTime time.Time

// compaction
EnableCompaction bool
EnableCompaction atomic.Value
EnableAutoCompaction atomic.Value

MinSegmentToMerge int
Expand Down Expand Up @@ -1572,7 +1572,7 @@ func (p *dataCoordConfig) initChannelWatchPrefix() {
}

func (p *dataCoordConfig) initEnableCompaction() {
p.EnableCompaction = p.Base.ParseBool("dataCoord.enableCompaction", false)
p.EnableCompaction.Store(p.Base.ParseBool("dataCoord.enableCompaction", false))
}

func (p *dataCoordConfig) initEnableAutoCompaction() {
Expand Down Expand Up @@ -1668,6 +1668,18 @@ func (p *dataCoordConfig) GetEnableAutoCompaction() bool {
return false
}

func (p *dataCoordConfig) SetEnableCompaction(enable bool) {
p.EnableCompaction.Store(enable)
}

func (p *dataCoordConfig) GetEnableCompaction() bool {
enable := p.EnableCompaction.Load()
if enable != nil {
return enable.(bool)
}
return false
}

func (p *dataCoordConfig) initEnableActiveStandby() {
p.EnableActiveStandby = p.Base.ParseBool("dataCoord.enableActiveStandby", false)
}
Expand Down

0 comments on commit c7aa0a3

Please sign in to comment.