Skip to content

Commit

Permalink
Impl GetReplicas in Proxy
Browse files Browse the repository at this point in the history
See also: #16298

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
  • Loading branch information
XuanYang-cn committed Apr 6, 2022
1 parent 543ec4a commit 7f81c29
Show file tree
Hide file tree
Showing 19 changed files with 3,928 additions and 992 deletions.
2,218 changes: 1,995 additions & 223 deletions internal/core/src/pb/milvus.pb.cc

Large diffs are not rendered by default.

1,099 changes: 1,098 additions & 1 deletion internal/core/src/pb/milvus.pb.h

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions internal/distributed/proxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,10 @@ func (s *Server) GetImportState(ctx context.Context, req *milvuspb.GetImportStat
return s.proxy.GetImportState(ctx, req)
}

func (s *Server) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) {
return s.proxy.GetReplicas(ctx, req)
}

// Check is required by gRPC healthy checking
func (s *Server) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
ret := &grpc_health_v1.HealthCheckResponse{
Expand Down
6 changes: 5 additions & 1 deletion internal/distributed/proxy/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func (m *MockQueryCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetric
return nil, nil
}

func (m *MockQueryCoord) GetReplicas(ctx context.Context, req *querypb.GetReplicasRequest) (*querypb.GetReplicasResponse, error) {
func (m *MockQueryCoord) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) {
return nil, nil
}

Expand Down Expand Up @@ -708,6 +708,10 @@ func (m *MockProxy) GetImportState(ctx context.Context, req *milvuspb.GetImportS
return nil, nil
}

func (m *MockProxy) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) {
return nil, nil
}

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func Test_NewServer(t *testing.T) {
ctx := context.Background()
Expand Down
4 changes: 2 additions & 2 deletions internal/distributed/querycoord/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
}

// GetReplicas gets the replicas of a certain collection.
func (c *Client) GetReplicas(ctx context.Context, req *querypb.GetReplicasRequest) (*querypb.GetReplicasResponse, error) {
func (c *Client) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
Expand All @@ -312,7 +312,7 @@ func (c *Client) GetReplicas(ctx context.Context, req *querypb.GetReplicasReques
if err != nil || ret == nil {
return nil, err
}
return ret.(*querypb.GetReplicasResponse), err
return ret.(*milvuspb.GetReplicasResponse), err
}

// GetShardLeaders gets the shard leaders of a certain collection.
Expand Down
2 changes: 1 addition & 1 deletion internal/distributed/querycoord/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
}

// GetReplicas returns the shard leaders of a certain collection.
func (s *Server) GetReplicas(ctx context.Context, req *querypb.GetReplicasRequest) (*querypb.GetReplicasResponse, error) {
func (s *Server) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) {
return s.queryCoord.GetReplicas(ctx, req)
}

Expand Down
4 changes: 2 additions & 2 deletions internal/distributed/querycoord/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type MockQueryCoord struct {
channelResp *querypb.CreateQueryChannelResponse
infoResp *querypb.GetSegmentInfoResponse
metricResp *milvuspb.GetMetricsResponse
replicasResp *querypb.GetReplicasResponse
replicasResp *milvuspb.GetReplicasResponse
shardLeadersResp *querypb.GetShardLeadersResponse
}

Expand Down Expand Up @@ -144,7 +144,7 @@ func (m *MockQueryCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetric
return m.metricResp, m.err
}

func (m *MockQueryCoord) GetReplicas(ctx context.Context, req *querypb.GetReplicasRequest) (*querypb.GetReplicasResponse, error) {
func (m *MockQueryCoord) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) {
return m.replicasResp, m.err
}

Expand Down
32 changes: 32 additions & 0 deletions internal/proto/milvus.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ service MilvusService {
rpc GetFlushState(GetFlushStateRequest) returns (GetFlushStateResponse) {}
rpc GetPersistentSegmentInfo(GetPersistentSegmentInfoRequest) returns (GetPersistentSegmentInfoResponse) {}
rpc GetQuerySegmentInfo(GetQuerySegmentInfoRequest) returns (GetQuerySegmentInfoResponse) {}
rpc GetReplicas(GetReplicasRequest) returns (GetReplicasResponse) {}

rpc Dummy(DummyRequest) returns (DummyResponse) {}

Expand Down Expand Up @@ -821,6 +822,37 @@ message GetImportStateResponse {
repeated common.KeyValuePair infos = 5; // more informations about the task, progress percent, file path, failed reason, etc.
}

message GetReplicasRequest {
common.MsgBase base = 1;
int64 collectionID = 2;
bool with_shard_nodes = 3;
}

message GetReplicasResponse {
common.Status status = 1;
repeated ReplicaInfo replicas = 2;
}

message ReplicaInfo { // ReplicaGroup
int64 replicaID = 1;
int64 collectionID = 2;
repeated int64 partition_ids = 3; // empty indicates to load collection
repeated ShardReplica shard_replicas = 4;
repeated int64 node_ids = 5; // include leaders
}

message ShardReplica {
int64 leaderID = 1;
string leader_addr = 2; // IP:port
string dm_channel_name = 3;
// optional, DO NOT save it in meta, set it only for GetReplicas()
// if with_shard_nodes is true
repeated int64 node_ids = 4;
}



service ProxyService {
rpc RegisterLink(RegisterLinkRequest) returns (RegisterLinkResponse) {}
}

Loading

0 comments on commit 7f81c29

Please sign in to comment.