Kdxf flush #7

Merged · 3 commits · Dec 14, 2023

Changes from all commits
go.mod (2 changes: 1 addition & 1 deletion)
@@ -212,7 +212,7 @@ replace (
github.com/bketelsen/crypt => github.com/bketelsen/crypt v0.0.4 // Fix security alert for core-os/etcd
github.com/dgrijalva/jwt-go => github.com/golang-jwt/jwt v3.2.2+incompatible // Fix security alert for jwt-go 3.2.0
github.com/go-kit/kit => github.com/go-kit/kit v0.1.0
//github.com/milvus-io/milvus-proto/go-api/v2 => github.com/milvus-io/milvus-proto/go-api/v2 v2.2.12-0.20231201024302-ea1c36bb9546
github.com/milvus-io/milvus-proto/go-api/v2 => github.com/wayblink/milvus-proto/go-api/v2 v2.0.0-20231213063050-7886c3ad3699
github.com/streamnative/pulsarctl => github.com/xiaofan-luan/pulsarctl v0.5.1
github.com/tecbot/gorocksdb => github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b // indirect
)
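Context for the swap above: a go.mod replace directive reroutes every import of the left-hand module path to the right-hand module, so the code in this PR keeps importing github.com/milvus-io/milvus-proto/go-api/v2 while actually building against the wayblink fork, which presumably carries the proto changes this PR depends on (the FlushChannels RPC and the flush-ts fields used below). A minimal illustration of the mechanism, with hypothetical module names:

module example.com/app

require github.com/upstream/lib v1.2.0

// Every `import "github.com/upstream/lib"` in this module now resolves
// to the fork; import paths in source files stay unchanged.
replace github.com/upstream/lib => github.com/fork/lib v1.2.1-patched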
go.sum (4 changes: 2 additions & 2 deletions)
@@ -508,8 +508,6 @@ github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyex
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.2.12-0.20231201024302-ea1c36bb9546 h1:UQkHkAizP+uXmmEnae/J7G1TxxE59T8LyDLubJziCSo=
github.com/milvus-io/milvus-proto/go-api/v2 v2.2.12-0.20231201024302-ea1c36bb9546/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/milvus-io/pulsar-client-go v0.6.8 h1:fZdZH73aPRszu2fazyeeahQEz34tyn1Pt9EkqJmV100=
github.com/milvus-io/pulsar-client-go v0.6.8/go.mod h1:oFIlYIk23tamkSLttw849qphmMIpHY8ztEBWDWJW+sc=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
@@ -715,6 +713,8 @@ github.com/uber/jaeger-lib v2.4.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6
github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU=
github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/wayblink/milvus-proto/go-api/v2 v2.0.0-20231213063050-7886c3ad3699 h1:43MGVVwZ/Vb7CteoDyZ33FEJMGs6FH55lOmpTezyY5U=
github.com/wayblink/milvus-proto/go-api/v2 v2.0.0-20231213063050-7886c3ad3699/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xiaofan-luan/pulsarctl v0.5.1 h1:2V+IWFarElzcln5WBbU3VNu3zC8Q7RS6rMpVs9oUfLg=
internal/datacoord/cluster.go (24 changes: 23 additions & 1 deletion)
@@ -19,7 +19,6 @@ package datacoord
import (
"context"
"fmt"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
@@ -114,6 +113,29 @@ func (c *Cluster) Flush(ctx context.Context, nodeID int64, channel string,
return nil
}

func (c *Cluster) FlushChannels(ctx context.Context, nodeID int64, flushTs Timestamp, channels []string) error {
if len(channels) == 0 {
return nil
}

for _, channel := range channels {
if !c.channelManager.Match(nodeID, channel) {
return fmt.Errorf("channel %s is not watched on node %d", channel, nodeID)
}
}

req := &datapb.FlushChannelsRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithSourceID(Params.DataCoordCfg.GetNodeID()),
commonpbutil.WithTargetID(nodeID),
),
FlushTs: flushTs,
Channels: channels,
}

return c.sessionManager.FlushChannels(ctx, nodeID, req)
}
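The datapb.FlushChannelsRequest message itself is defined in the proto layer rather than in this diff. Inferred from the setters and getters used above, its generated Go shape is roughly the following (a sketch; field order and anything not visible in the usage above is an assumption):

// Sketch of the generated struct behind datapb.FlushChannelsRequest,
// inferred from its usage in Cluster.FlushChannels above.
type FlushChannelsRequest struct {
    Base     *commonpb.MsgBase // routing: source DataCoord and target DataNode IDs
    FlushTs  uint64            // watermark: data consumed at or before this ts must be synced
    Channels []string          // vchannels on the target node to flush
}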

// Import sends import requests to DataNodes whose ID==nodeID.
func (c *Cluster) Import(ctx context.Context, nodeID int64, it *datapb.ImportTaskRequest) {
c.sessionManager.Import(ctx, nodeID, it)
internal/datacoord/mock_test.go (4 changes: 4 additions & 0 deletions)
@@ -216,6 +216,10 @@ type mockDataNodeClient struct {
compactionResp *commonpb.Status
}

func (c *mockDataNodeClient) FlushChannels(ctx context.Context, req *datapb.FlushChannelsRequest) (*commonpb.Status, error) {
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
}

func newMockDataNodeClient(id int64, ch chan interface{}) (*mockDataNodeClient, error) {
return &mockDataNodeClient{
id: id,
internal/datacoord/server_test.go (63 changes: 60 additions & 3 deletions)
@@ -3742,7 +3742,7 @@ func TestGetFlushState(t *testing.T) {
},
}
svr.stateCode.Store(commonpb.StateCode_Healthy)
resp, err := svr.GetFlushState(context.TODO(), &milvuspb.GetFlushStateRequest{SegmentIDs: []int64{1, 2}})
resp, err := svr.GetFlushState(context.TODO(), &datapb.GetFlushStateRequest{SegmentIDs: []int64{1, 2}})
assert.Nil(t, err)
assert.EqualValues(t, &milvuspb.GetFlushStateResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
@@ -3773,7 +3773,7 @@ }
}
svr.stateCode.Store(commonpb.StateCode_Healthy)

resp, err := svr.GetFlushState(context.TODO(), &milvuspb.GetFlushStateRequest{SegmentIDs: []int64{1, 2}})
resp, err := svr.GetFlushState(context.TODO(), &datapb.GetFlushStateRequest{SegmentIDs: []int64{1, 2}})
assert.Nil(t, err)
assert.EqualValues(t, &milvuspb.GetFlushStateResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
@@ -3804,13 +3804,70 @@ }
}
svr.stateCode.Store(commonpb.StateCode_Healthy)

resp, err := svr.GetFlushState(context.TODO(), &milvuspb.GetFlushStateRequest{SegmentIDs: []int64{1, 2}})
resp, err := svr.GetFlushState(context.TODO(), &datapb.GetFlushStateRequest{SegmentIDs: []int64{1, 2}})
assert.Nil(t, err)
assert.EqualValues(t, &milvuspb.GetFlushStateResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
Flushed: true,
}, resp)
})

t.Run("channel unflushed", func(t *testing.T) {
meta, err := newMemoryMeta()
assert.NoError(t, err)
svr := newTestServerWithMeta(t, nil, meta)
defer closeTestServer(t, svr)

var (
vchannel = "ch1"
collection = int64(0)
)

svr.channelManager = &ChannelManager{
store: &ChannelStore{
channelsInfo: map[int64]*NodeChannelInfo{
1: {NodeID: 1, Channels: []*channel{{Name: vchannel, CollectionID: collection}}},
},
},
}

err = svr.meta.UpdateChannelCheckpoint(vchannel, &internalpb.MsgPosition{
MsgID: []byte{1},
Timestamp: 10,
})
assert.NoError(t, err)

resp, err := svr.GetFlushState(context.Background(), &datapb.GetFlushStateRequest{
FlushTs: 11,
CollectionID: collection,
})
assert.NoError(t, err)
assert.EqualValues(t, &milvuspb.GetFlushStateResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
Flushed: false,
}, resp)
})

t.Run("no channels", func(t *testing.T) {
meta, err := newMemoryMeta()
assert.NoError(t, err)
svr := newTestServerWithMeta(t, nil, meta)
defer closeTestServer(t, svr)

var (
collection = int64(0)
)

resp, err := svr.GetFlushState(context.Background(), &datapb.GetFlushStateRequest{
FlushTs: 11,
CollectionID: collection,
})
assert.NoError(t, err)
assert.EqualValues(t, &milvuspb.GetFlushStateResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
Flushed: true,
}, resp)
})
}

func TestGetFlushAllState(t *testing.T) {
internal/datacoord/services.go (51 changes: 44 additions & 7 deletions)
@@ -129,25 +129,47 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F
flushSegmentIDs := make([]UniqueID, 0, len(segments))
for _, segment := range segments {
if segment != nil &&
(segment.GetState() == commonpb.SegmentState_Flushed ||
segment.GetState() == commonpb.SegmentState_Flushing) &&
isFlushState(segment.GetState()) &&
!sealedSegmentsIDDict[segment.GetID()] {
flushSegmentIDs = append(flushSegmentIDs, segment.GetID())
}
}

err = retry.Do(ctx, func() error {
for _, channelInfo := range s.channelManager.GetChannels() {
nodeID := channelInfo.NodeID
channels := lo.Filter(channelInfo.Channels, func(channel *channel, _ int) bool {
return channel.CollectionID == req.GetCollectionID()
})
channelNames := lo.Map(channels, func(channel *channel, _ int) string {
return channel.Name
})
err = s.cluster.FlushChannels(ctx, nodeID, ts, channelNames)
if err != nil {
return err
}
}
return nil
}, retry.Attempts(60))
if err != nil {
resp.Status.Reason = fmt.Sprintf("failed to flush channels for collection %d: %s", req.CollectionID, err)
return resp, nil
}

log.Info("flush response with segments",
zap.Int64("collectionID", req.GetCollectionID()),
zap.Int64s("sealSegments", sealedSegmentIDs),
zap.Int64s("flushSegments", flushSegmentIDs),
zap.Int64("timeOfSeal", timeOfSeal.Unix()),
zap.Time("flushTs", tsoutil.PhysicalTime(ts)),
zap.Any("channelCps", resp.GetChannelCps()))
resp.Status.ErrorCode = commonpb.ErrorCode_Success
resp.DbID = req.GetDbID()
resp.CollectionID = req.GetCollectionID()
resp.SegmentIDs = sealedSegmentIDs
resp.TimeOfSeal = timeOfSeal.Unix()
resp.FlushSegmentIDs = flushSegmentIDs
resp.FlushTs = ts
return resp, nil
}
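The fan-out above uses two generic helpers from github.com/samber/lo: Filter keeps only the channels that belong to the flushed collection, and Map projects them to their names for the FlushChannels RPC; retry.Do then re-runs the whole fan-out (up to 60 attempts) until every owning node accepts the request. For readers unfamiliar with the two lo calls, a self-contained illustration of their semantics (the data is made up):

package main

import (
    "fmt"

    "github.com/samber/lo"
)

type channel struct {
    Name         string
    CollectionID int64
}

func main() {
    chans := []*channel{
        {Name: "dml_0", CollectionID: 1},
        {Name: "dml_1", CollectionID: 2},
        {Name: "dml_2", CollectionID: 1},
    }
    // Keep only channels of collection 1, mirroring the Flush fan-out above.
    mine := lo.Filter(chans, func(ch *channel, _ int) bool { return ch.CollectionID == 1 })
    // Project to channel names, mirroring channel -> channel.Name above.
    names := lo.Map(mine, func(ch *channel, _ int) string { return ch.Name })
    fmt.Println(names) // [dml_0 dml_2]
}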

@@ -1272,11 +1294,13 @@ func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsReq
}

// GetFlushState gets the flush state of multiple segments
func (s *Server) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
func (s *Server) GetFlushState(ctx context.Context, req *datapb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
log := log.Ctx(ctx).With(zap.Int64("collection", req.GetCollectionID()),
zap.Time("flushTs", tsoutil.PhysicalTime(req.GetFlushTs()))).
WithRateGroup("dc.GetFlushState", 1, 60)
resp := &milvuspb.GetFlushStateResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}}
if s.isClosed() {
log.Warn("DataCoord receive GetFlushState request, server closed",
zap.Int64s("segmentIDs", req.GetSegmentIDs()), zap.Int("len", len(req.GetSegmentIDs())))
log.Warn("DataCoord receive GetFlushState request, server closed")
setNotServingStatus(resp.Status, s.GetStateCode())
return resp, nil
}
@@ -1301,15 +1325,28 @@ func (s *Server) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateR
unflushed = append(unflushed, sid)
}

resp.Status.ErrorCode = commonpb.ErrorCode_Success
resp.ChannelCps = channelCPs

for _, channelCP := range channelCPs {
if channelCP.GetTimestamp() < req.GetFlushTs() {
resp.Flushed = false
log.RatedInfo(10, "GetFlushState: channel unflushed",
zap.String("channel", channelCP.GetChannelName()),
zap.Time("CP", tsoutil.PhysicalTime(channelCP.GetTimestamp())),
zap.Duration("lag", tsoutil.PhysicalTime(req.GetFlushTs()).Sub(tsoutil.PhysicalTime(channelCP.GetTimestamp()))))
return resp, nil
}
}

if len(unflushed) != 0 {
log.Info("DataCoord receive GetFlushState request, Flushed is false", zap.Int64s("segmentIDs", unflushed), zap.Int("len", len(unflushed)))
resp.Flushed = false
} else {
log.Info("DataCoord receive GetFlushState request, Flushed is true", zap.Int64s("segmentIDs", req.GetSegmentIDs()), zap.Int("len", len(req.GetSegmentIDs())))
resp.Flushed = true
}
resp.Status.ErrorCode = commonpb.ErrorCode_Success
resp.ChannelCps = channelCPs

return resp, nil
}
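Together with Flush now returning FlushTs, this gives callers a simple wait-for-flush protocol: issue Flush, then poll GetFlushState with the returned ts until every channel checkpoint of the collection has passed it. A hypothetical polling helper built on the two RPCs shown in this diff (the client type, poll interval, and surrounding imports of context, time, and datapb are assumptions, not part of this PR):

// waitForFlush polls GetFlushState until the collection's channel
// checkpoints pass flushTs. Sketch only; error handling trimmed.
func waitForFlush(ctx context.Context, dc datapb.DataCoordClient, collID int64, flushTs uint64) error {
    ticker := time.NewTicker(200 * time.Millisecond)
    defer ticker.Stop()
    for {
        resp, err := dc.GetFlushState(ctx, &datapb.GetFlushStateRequest{
            CollectionID: collID,
            FlushTs:      flushTs,
        })
        if err != nil {
            return err
        }
        if resp.GetFlushed() {
            return nil
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
        }
    }
}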

internal/datacoord/session_manager.go (22 changes: 22 additions & 0 deletions)
@@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"go.uber.org/zap"
)

@@ -291,6 +292,27 @@ func (c *SessionManager) getClient(ctx context.Context, nodeID int64) (types.Dat
return session.GetOrCreateClient(ctx)
}

func (c *SessionManager) FlushChannels(ctx context.Context, nodeID int64, req *datapb.FlushChannelsRequest) error {
log := log.Ctx(ctx).With(zap.Int64("nodeID", nodeID),
zap.Time("flushTs", tsoutil.PhysicalTime(req.GetFlushTs())),
zap.Strings("channels", req.GetChannels()))
cli, err := c.getClient(ctx, nodeID)
if err != nil {
log.Warn("failed to get client", zap.Error(err))
return err
}

log.Info("SessionManager.FlushChannels start")
resp, err := cli.FlushChannels(ctx, req)
err = VerifyResponse(resp, err)
if err != nil {
log.Warn("SessionManager.FlushChannels failed", zap.Error(err))
return err
}
log.Info("SessionManager.FlushChannels successfully")
return nil
}
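FlushChannels follows the same RPC pattern as the other session calls: resolve the cached DataNode client, issue the request, and fold the returned status and transport error together with VerifyResponse. That helper is defined elsewhere in datacoord; its convention amounts to roughly the following paraphrase (not the exact implementation; assumes errors, fmt, and commonpb in scope):

// Paraphrase of the VerifyResponse convention used above: treat the call
// as failed if the transport errored, the response is missing, or the
// returned status is not Success.
func verifyStatus(status *commonpb.Status, rpcErr error) error {
    if rpcErr != nil {
        return rpcErr
    }
    if status == nil {
        return errors.New("response status is nil")
    }
    if status.GetErrorCode() != commonpb.ErrorCode_Success {
        return fmt.Errorf("rpc failed: %s", status.GetReason())
    }
    return nil
}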

// Close release sessions
func (c *SessionManager) Close() {
c.sessions.Lock()
internal/datanode/channel_meta.go (40 changes: 34 additions & 6 deletions)
@@ -96,7 +96,12 @@ type Channel interface {

// getTotalMemorySize returns the sum of memory sizes of segments.
getTotalMemorySize() int64
forceToSync()

setIsHighMemory(b bool)
getIsHighMemory() bool

getFlushTs() Timestamp
setFlushTs(ts Timestamp)

close()
}
@@ -111,7 +116,15 @@ type ChannelMeta struct {
segMu sync.RWMutex
segments map[UniqueID]*Segment

needToSync *atomic.Bool
// isHighMemory is intended to trigger the syncing of segments
// when a segment's buffer consumes a significant amount of memory.
isHighMemory *atomic.Bool

// flushTs is intended to trigger:
// 1. the syncing of segments when consumed ts exceeds flushTs;
// 2. the updating of channelCP when channelCP exceeds flushTs.
flushTs *atomic.Uint64

syncPolicies []segmentSyncPolicy

metaService *metaService
Expand All @@ -133,11 +146,14 @@ func newChannel(channelName string, collID UniqueID, schema *schemapb.Collection

segments: make(map[UniqueID]*Segment),

needToSync: atomic.NewBool(false),
isHighMemory: atomic.NewBool(false),
flushTs: atomic.NewUint64(math.MaxUint64),

syncPolicies: []segmentSyncPolicy{
syncPeriodically(),
syncMemoryTooHigh(),
syncCPLagTooBehind(),
syncSegmentsAtTs(),
},

metaService: metaService,
@@ -283,7 +299,7 @@ func (c *ChannelMeta) listSegmentIDsToSync(ts Timestamp) []UniqueID {

segIDsToSync := typeutil.NewUniqueSet()
for _, policy := range c.syncPolicies {
segments := policy(validSegs, ts, c.needToSync)
segments := policy(validSegs, c, ts)
for _, segID := range segments {
segIDsToSync.Insert(segID)
}
@@ -895,8 +911,20 @@ func (c *ChannelMeta) evictHistoryDeleteBuffer(segmentID UniqueID, endPos *inter
log.Warn("cannot find segment when evictHistoryDeleteBuffer", zap.Int64("segmentID", segmentID))
}

func (c *ChannelMeta) forceToSync() {
c.needToSync.Store(true)
func (c *ChannelMeta) setIsHighMemory(b bool) {
c.isHighMemory.Store(b)
}

func (c *ChannelMeta) getIsHighMemory() bool {
return c.isHighMemory.Load()
}

func (c *ChannelMeta) getFlushTs() Timestamp {
return c.flushTs.Load()
}

func (c *ChannelMeta) setFlushTs(ts Timestamp) {
c.flushTs.Store(ts)
}
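Note the policy signature change in listSegmentIDsToSync: policies now receive the Channel itself (policy(validSegs, c, ts)), which is what lets the newly registered syncSegmentsAtTs() read the flushTs watermark set above. Its body lies outside this diff; a minimal sketch of what such a policy plausibly looks like under that signature (an assumption-labeled illustration, not the PR's exact code):

// Hypothetical sketch of syncSegmentsAtTs: once a flush has been requested
// (flushTs set) and the channel has consumed up to it, sync every segment.
func syncSegmentsAtTs() segmentSyncPolicy {
    return func(segments []*Segment, c Channel, ts Timestamp) []UniqueID {
        flushTs := c.getFlushTs()
        if flushTs == math.MaxUint64 || ts < flushTs {
            return nil // no flush pending, or channel not yet caught up to flushTs
        }
        segIDs := make([]UniqueID, 0, len(segments))
        for _, seg := range segments {
            segIDs = append(segIDs, seg.segmentID)
        }
        return segIDs
    }
}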

func (c *ChannelMeta) getTotalMemorySize() int64 {