Skip to content

Commit

Permalink
Add rpc for logcoord/node and add lognode client for proxy
Browse files Browse the repository at this point in the history
Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
  • Loading branch information
aoiasd committed Oct 11, 2023
1 parent 7df0cd9 commit f9990bb
Show file tree
Hide file tree
Showing 15 changed files with 923 additions and 494 deletions.
12 changes: 12 additions & 0 deletions internal/datacoord/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -1594,6 +1594,18 @@ func (s *Server) BroadcastAlteredCollection(ctx context.Context, req *datapb.Alt
return merr.Status(nil), nil
}

func (s *Server) GetChannelDistribution(ctx context.Context, req *datapb.GetChannelDistributionRequest) (*datapb.GetChannelDistributionResponse, error) {
if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
return &datapb.GetChannelDistributionResponse{Status: merr.Status(err)}, nil
}
infos := s.logCoord.GetPChannelInfos()

return &datapb.GetChannelDistributionResponse{
Status: merr.Status(nil),
Infos: infos,
}, nil
}

func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
return &milvuspb.CheckHealthResponse{
Expand Down
6 changes: 6 additions & 0 deletions internal/distributed/datacoord/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,3 +616,9 @@ func (c *Client) ReportDataNodeTtMsgs(ctx context.Context, req *datapb.ReportDat
return client.ReportDataNodeTtMsgs(ctx, req)
})
}

func (c *Client) GetChannelDistribution(ctx context.Context, req *datapb.GetChannelDistributionRequest, opts ...grpc.CallOption) (*datapb.GetChannelDistributionResponse, error) {
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*datapb.GetChannelDistributionResponse, error) {
return client.GetChannelDistribution(ctx, req)
})
}
24 changes: 14 additions & 10 deletions internal/distributed/datacoord/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,6 @@ func (s *Server) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.
return s.dataCoord.GetCompactionStateWithPlans(ctx, req)
}

// WatchChannels starts watch channels by give request
func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error) {
return s.dataCoord.WatchChannels(ctx, req)
}

// GetFlushState gets the flush state of the collection based on the provided flush ts and segment IDs.
func (s *Server) GetFlushState(ctx context.Context, req *datapb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
return s.dataCoord.GetFlushState(ctx, req)
Expand All @@ -377,11 +372,6 @@ func (s *Server) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushAll
return s.dataCoord.GetFlushAllState(ctx, req)
}

// DropVirtualChannel drop virtual channel in datacoord
func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
return s.dataCoord.DropVirtualChannel(ctx, req)
}

// SetSegmentState sets the state of a segment.
func (s *Server) SetSegmentState(ctx context.Context, req *datapb.SetSegmentStateRequest) (*datapb.SetSegmentStateResponse, error) {
return s.dataCoord.SetSegmentState(ctx, req)
Expand Down Expand Up @@ -473,3 +463,17 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde
func (s *Server) ReportDataNodeTtMsgs(ctx context.Context, req *datapb.ReportDataNodeTtMsgsRequest) (*commonpb.Status, error) {
return s.dataCoord.ReportDataNodeTtMsgs(ctx, req)
}

// WatchChannels starts watch channels by give request
func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error) {
return s.dataCoord.WatchChannels(ctx, req)
}

// DropVirtualChannel drop virtual channel in datacoord
func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
return s.dataCoord.DropVirtualChannel(ctx, req)
}

func (s *Server) GetChannelDistribution(ctx context.Context, req *datapb.GetChannelDistributionRequest) (*datapb.GetChannelDistributionResponse, error) {
return s.dataCoord.GetChannelDistribution(ctx, req)
}
6 changes: 6 additions & 0 deletions internal/distributed/lognode/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ func (c *Client) WatchChannel(ctx context.Context, req *logpb.WatchChannelReques
})
}

func (c *Client) UnwatchChannel(ctx context.Context, req *logpb.UnwatchChannelRequest, options ...grpc.CallOption) (*commonpb.Status, error) {
return wrapGrpcCall(ctx, c, func(client logpb.LogNodeClient) (*commonpb.Status, error) {
return client.UnwatchChannel(ctx, req, options...)
})
}

func (c *Client) Insert(ctx context.Context, req *logpb.InsertRequest, options ...grpc.CallOption) (*commonpb.Status, error) {
return wrapGrpcCall(ctx, c, func(client logpb.LogNodeClient) (*commonpb.Status, error) {
return client.Insert(ctx, req, options...)
Expand Down
3 changes: 3 additions & 0 deletions internal/distributed/lognode/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,9 @@ func (s *Server) GetComponentStates(ctx context.Context, req *milvuspb.GetCompon
func (s *Server) WatchChannel(ctx context.Context, req *logpb.WatchChannelRequest) (*commonpb.Status, error) {
return s.lognode.WatchChannel(ctx, req)
}
func (s *Server) UnwatchChannel(ctx context.Context, req *logpb.UnwatchChannelRequest) (*commonpb.Status, error) {
return s.lognode.UnwatchChannel(ctx, req)
}

func (s *Server) Insert(ctx context.Context, req *logpb.InsertRequest) (*commonpb.Status, error) {
return s.lognode.Insert(ctx, req)
Expand Down
21 changes: 16 additions & 5 deletions internal/logcoord/meta/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
type PhysicalChannel struct {
catalog metastore.DataCoordCatalog
name string
status logpb.PChannelState
state logpb.PChannelState

nodeID int64
leaseID uint64
Expand All @@ -45,7 +45,7 @@ func NewPhysicalChannel(name string, catalog metastore.DataCoordCatalog) *Physic
nodeID: -1,
leaseID: 0,
catalog: catalog,
status: logpb.PChannelState_Waitting,
state: logpb.PChannelState_Waitting,
}
}

Expand All @@ -64,7 +64,7 @@ func (c *PhysicalChannel) Assign(ctx context.Context, nodeID int64) error {
}

c.nodeID = nodeID
c.status = logpb.PChannelState_Watching
c.state = logpb.PChannelState_Watching
return nil
}

Expand All @@ -83,7 +83,7 @@ func (c *PhysicalChannel) Unassign(ctx context.Context) error {
}

c.nodeID = -1
c.status = logpb.PChannelState_Waitting
c.state = logpb.PChannelState_Waitting
return nil
}

Expand Down Expand Up @@ -121,11 +121,22 @@ func (c *PhysicalChannel) GetRef() uint64 {
return c.refCnt
}

func (c *PhysicalChannel) GetInfo() *logpb.PChannelInfo {
c.mu.RLock()
defer c.mu.RUnlock()

return &logpb.PChannelInfo{
Name: c.name,
State: c.state,
NodeID: c.nodeID,
}
}

func (c *PhysicalChannel) CheckState(state logpb.PChannelState) bool {
c.mu.RLock()
defer c.mu.RUnlock()

return c.status == state
return c.state == state
}

func (c *PhysicalChannel) IncRef() {
Expand Down
36 changes: 23 additions & 13 deletions internal/logcoord/meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Meta interface {
Init(types.RootCoordClient) error
// physical Channel
GetPChannel(channel string) *PhysicalChannel
GetPChannelInfos() []*logpb.PChannelInfo
GetPChannelNamesBy(filters ...PChannelFilter) []string

UpdateLeaseID(ctx context.Context, channel string) error
Expand All @@ -38,7 +39,6 @@ type Meta interface {
// virtual vhannel
AddVChannel(channels ...string) error
RemoveVChannel(channels ...string) error
// ListVChannelName() []string
}

type PChannelFilter func(*PhysicalChannel) bool
Expand All @@ -59,6 +59,12 @@ func (l *PChannelList) GetNames() []string {
return lo.Keys(*l)
}

func (l *PChannelList) GetInfos() []*logpb.PChannelInfo {
return lo.Map(lo.Values(*l), func(channel *PhysicalChannel, _ int) *logpb.PChannelInfo {
return channel.GetInfo()
})
}

func NewPChannelList(channels []string, catalog metastore.DataCoordCatalog, infos map[string]*logpb.PChannelInfo, leaseIDs map[string]uint64) PChannelList {
list := make(map[string]*PhysicalChannel)

Expand All @@ -79,18 +85,18 @@ func NewPChannelList(channels []string, catalog metastore.DataCoordCatalog, info
return list
}

type ChannelMeta struct {
type MetaImpl struct {
catalog metastore.DataCoordCatalog
channelList PChannelList
}

func NewChannelMeta(catalog metastore.DataCoordCatalog) *ChannelMeta {
return &ChannelMeta{
func NewMetaImpl(catalog metastore.DataCoordCatalog) *MetaImpl {
return &MetaImpl{
catalog: catalog,
}
}

func (m *ChannelMeta) initPChannel(ctx context.Context, channels ...string) error {
func (m *MetaImpl) initPChannel(ctx context.Context, channels ...string) error {
infos, err := m.catalog.ListPChannelInfo(ctx)
if err != nil {
return err
Expand All @@ -105,7 +111,7 @@ func (m *ChannelMeta) initPChannel(ctx context.Context, channels ...string) erro
return nil
}

func (m *ChannelMeta) Init(rc types.RootCoordClient) error {
func (m *MetaImpl) Init(rc types.RootCoordClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -119,23 +125,27 @@ func (m *ChannelMeta) Init(rc types.RootCoordClient) error {
return nil
}

func (m *ChannelMeta) UpdateLeaseID(ctx context.Context, channel string) error {
func (m *MetaImpl) UpdateLeaseID(ctx context.Context, channel string) error {
return m.GetPChannel(channel).UpdateLeaseID(ctx)
}

func (m *ChannelMeta) AssignPChannel(ctx context.Context, channel string, nodeID int64) error {
func (m *MetaImpl) AssignPChannel(ctx context.Context, channel string, nodeID int64) error {
return m.GetPChannel(channel).Assign(ctx, nodeID)
}

func (m *ChannelMeta) UnassignPChannel(ctx context.Context, channel string) error {
func (m *MetaImpl) UnassignPChannel(ctx context.Context, channel string) error {
return m.GetPChannel(channel).Unassign(ctx)
}

func (m *ChannelMeta) GetPChannel(channel string) *PhysicalChannel {
func (m *MetaImpl) GetPChannel(channel string) *PhysicalChannel {
return m.channelList.Get(channel)
}

func (m *ChannelMeta) GetPChannelNamesBy(filters ...PChannelFilter) []string {
func (m *MetaImpl) GetPChannelInfos() []*logpb.PChannelInfo {
return m.channelList.GetInfos()
}

func (m *MetaImpl) GetPChannelNamesBy(filters ...PChannelFilter) []string {
channels := []string{}
filter := func(channel *PhysicalChannel) bool {
for _, filter := range filters {
Expand All @@ -154,15 +164,15 @@ func (m *ChannelMeta) GetPChannelNamesBy(filters ...PChannelFilter) []string {
return channels
}

func (m *ChannelMeta) AddVChannel(channels ...string) error {
func (m *MetaImpl) AddVChannel(channels ...string) error {
for _, channel := range channels {
pchannel := getPChannelName(channel)
m.channelList.Get(pchannel).IncRef()
}
return nil
}

func (m *ChannelMeta) RemoveVChannel(channels ...string) error {
func (m *MetaImpl) RemoveVChannel(channels ...string) error {
for _, channel := range channels {
pchannel := getPChannelName(channel)
m.channelList.Get(pchannel).DecRef()
Expand Down
11 changes: 8 additions & 3 deletions internal/logcoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/milvus-io/milvus/internal/logcoord/meta"
"github.com/milvus-io/milvus/internal/logcoord/session"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/proto/logpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/log"
Expand All @@ -46,7 +47,7 @@ func NewLogCoord(factory msgstream.Factory) *Server {
func (m *Server) Init(etcdSession *sessionutil.Session, catalog metastore.DataCoordCatalog) error {
var err error
m.initOnce.Do(func() {
m.meta = meta.NewChannelMeta(catalog)
m.meta = meta.NewMetaImpl(catalog)
err = m.meta.Init(m.rootCoord)
if err != nil {
return
Expand Down Expand Up @@ -74,18 +75,22 @@ func (m *Server) Stop() {
})
}

func (m *Server) WatchVChannel(channels ...string) error {
func (m *Server) WatchVChannels(channels ...string) error {
err := m.meta.AddVChannel(channels...)
if err != nil {
return err
}
return nil
}

func (m *Server) UnwatchVChannel(channels ...string) error {
func (m *Server) DropVChannels(channels ...string) error {
err := m.meta.RemoveVChannel(channels...)
if err != nil {
return err
}
return nil
}

func (m *Server) GetPChannelInfos() []*logpb.PChannelInfo {
return m.meta.GetPChannelInfos()
}
25 changes: 25 additions & 0 deletions internal/lognode/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,31 @@ func (node *LogNode) WatchChannel(ctx context.Context, req *logpb.WatchChannelRe
return merr.Status(err), nil
}

func (node *LogNode) UnwatchChannel(ctx context.Context, req *logpb.UnwatchChannelRequest) (*commonpb.Status, error) {
log.Debug("received UnwatchChannel Request",
zap.Int64("msgID", req.GetBase().GetMsgID()),
zap.String("pChannel", req.GetPChannel()))

if !node.lifetime.Add(commonpbutil.IsHealthy) {
msg := fmt.Sprintf("log node %d is not ready", paramtable.GetNodeID())
err := merr.WrapErrServiceNotReady(msg)
return merr.Status(err), nil
}
defer node.lifetime.Done()

err := merr.CheckTargetID(req.GetBase())
if err != nil {
log.Warn("target ID not match",
zap.Int64("targetID", req.GetBase().GetTargetID()),
zap.Int64("nodeID", paramtable.GetNodeID()),
)
return merr.Status(err), nil
}

node.loggerManager.RemoveLogger(req.GetPChannel())
return merr.Status(nil), nil
}

func (node *LogNode) Insert(ctx context.Context, req *logpb.InsertRequest) (*commonpb.Status, error) {
log.Debug("received WatchChannel Request",
zap.Int64("msgID", req.GetBase().GetMsgID()),
Expand Down
29 changes: 26 additions & 3 deletions internal/proto/data_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import "milvus.proto";
import "schema.proto";
import "msg.proto";
import "index_coord.proto";
import "log_coord.proto";

// TODO: import google/protobuf/empty.proto
message Empty {}
Expand Down Expand Up @@ -59,9 +60,7 @@ service DataCoord {
rpc GetCompactionState(milvus.GetCompactionStateRequest) returns (milvus.GetCompactionStateResponse) {}
rpc GetCompactionStateWithPlans(milvus.GetCompactionPlansRequest) returns (milvus.GetCompactionPlansResponse) {}

rpc WatchChannels(WatchChannelsRequest) returns (WatchChannelsResponse) {}
rpc GetFlushState(GetFlushStateRequest) returns (milvus.GetFlushStateResponse) {}
rpc DropVirtualChannel(DropVirtualChannelRequest) returns (DropVirtualChannelResponse) {}

rpc SetSegmentState(SetSegmentStateRequest) returns (SetSegmentStateResponse) {}
// https://wiki.lfaidata.foundation/display/MIL/MEP+24+--+Support+bulk+load
Expand Down Expand Up @@ -91,8 +90,14 @@ service DataCoord {
rpc GcConfirm(GcConfirmRequest) returns (GcConfirmResponse) {}

rpc ReportDataNodeTtMsgs(ReportDataNodeTtMsgsRequest) returns (common.Status) {}
}

rpc WatchChannels(WatchChannelsRequest) returns (WatchChannelsResponse) {}

rpc DropVirtualChannel(DropVirtualChannelRequest) returns (DropVirtualChannelResponse) {}

rpc GetChannelDistribution(GetChannelDistributionRequest) returns (GetChannelDistributionResponse) {}
}

service DataNode {
rpc GetComponentStates(milvus.GetComponentStatesRequest) returns (milvus.ComponentStates) {}
rpc GetStatisticsChannel(internal.GetStatisticsChannelRequest) returns (milvus.StringResponse) {}
Expand Down Expand Up @@ -535,6 +540,24 @@ message WatchChannelsResponse {
common.Status status = 1;
}

message DropChannelsRequest {
int64 collectionID = 1;
repeated string channelNames = 2;
}

message DropChannelsResponse {
common.Status status = 1;
}

message GetChannelDistributionRequest{
common.MsgBase base = 1;
}

message GetChannelDistributionResponse{
common.Status status = 1;
repeated log.PChannelInfo infos = 2;
}

message SetSegmentStateRequest {
common.MsgBase base = 1;
int64 segment_id = 2;
Expand Down
Loading

0 comments on commit f9990bb

Please sign in to comment.