Skip to content

Commit

Permalink
Fix proxy unable to update cache (milvus-io#16646)
Browse files Browse the repository at this point in the history
Signed-off-by: yangxuan <xuan.yang@zilliz.com>
  • Loading branch information
XuanYang-cn authored Apr 26, 2022
1 parent bc5e9ec commit fbc7fe1
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 6 deletions.
4 changes: 2 additions & 2 deletions internal/distributed/querynode/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (c *Client) ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmen

// Search performs replica search tasks in QueryNode.
func (c *Client) Search(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
ret, err := c.grpcClient.Call(ctx, func(client interface{}) (interface{}, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
Expand All @@ -258,7 +258,7 @@ func (c *Client) Search(ctx context.Context, req *querypb.SearchRequest) (*inter

// Query performs replica query tasks in QueryNode.
func (c *Client) Query(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
ret, err := c.grpcClient.Call(ctx, func(client interface{}) (interface{}, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
Expand Down
7 changes: 5 additions & 2 deletions internal/proxy/task_policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package proxy
import (
"context"
"errors"
"fmt"

qnClient "github.com/milvus-io/milvus/internal/distributed/querynode/client"

Expand Down Expand Up @@ -75,7 +74,11 @@ func roundRobinPolicy(ctx context.Context, getQueryNodePolicy getQueryNodePolicy
}

if current == replicaNum && err != nil {
return fmt.Errorf("no shard leaders available for channel: %s, leaders: %v, err: %s", leaders.GetChannelName(), leaders.GetNodeIds(), err.Error())
log.Warn("no shard leaders available for channel",
zap.String("channel name", leaders.GetChannelName()),
zap.Int64s("leaders", leaders.GetNodeIds()), zap.Error(err))
// needs to return the error from query
return err
}
return nil
}
2 changes: 1 addition & 1 deletion internal/proxy/task_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (t *queryTask) Execute(ctx context.Context) error {
return executeQuery(WithoutCache)
}
if err != nil {
return err
return fmt.Errorf("fail to search on all shard leaders, err=%s", err.Error())
}

log.Info("Query Execute done.",
Expand Down
2 changes: 1 addition & 1 deletion internal/proxy/task_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func (t *searchTask) Execute(ctx context.Context) error {
return executeSearch(WithoutCache)
}
if err != nil {
return err
return fmt.Errorf("fail to search on all shard leaders, err=%s", err.Error())
}

log.Info("Search Execute done.",
Expand Down

0 comments on commit fbc7fe1

Please sign in to comment.