Skip to content

Commit

Permalink
enhance: [2.3] init the hook when creating the proxy object (#34936)
Browse files Browse the repository at this point in the history
- issue: #34885
- pr: #34887

Signed-off-by: SimFG <bang.fu@zilliz.com>
  • Loading branch information
SimFG authored Jul 24, 2024
1 parent a6ac268 commit fe786ff
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 5 deletions.
8 changes: 8 additions & 0 deletions internal/distributed/proxy/httpserver/handler_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func TestHTTPWrapper(t *testing.T) {
func TestGrpcWrapper(t *testing.T) {
getTestCases := []rawTestCase{}
getTestCasesNeedAuth := []rawTestCase{}
proxy.SetMockAPIHook("", nil)
needAuthPrefix := "/auth"
ginHandler := gin.Default()
app := ginHandler.Group("")
Expand Down Expand Up @@ -374,6 +375,7 @@ func TestTimeout(t *testing.T) {
func TestDatabaseWrapper(t *testing.T) {
postTestCases := []requestBodyTestCase{}
mp := mocks.NewMockProxy(t)
proxy.SetMockAPIHook("", nil)
mp.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return(&milvuspb.ListDatabasesResponse{
Status: &StatusSuccess,
DbNames: []string{DefaultCollectionName, "exist"},
Expand Down Expand Up @@ -466,6 +468,7 @@ func TestDatabaseWrapper(t *testing.T) {
func TestCreateCollection(t *testing.T) {
postTestCases := []requestBodyTestCase{}
mp := mocks.NewMockProxy(t)
proxy.SetMockAPIHook("", nil)
mp.EXPECT().CreateCollection(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Times(11)
mp.EXPECT().CreateIndex(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Times(6)
mp.EXPECT().LoadCollection(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Times(6)
Expand Down Expand Up @@ -693,6 +696,7 @@ func initHTTPServerV2(proxy types.ProxyComponent, needAuth bool) *gin.Engine {
func TestMethodGet(t *testing.T) {
paramtable.Init()
mp := mocks.NewMockProxy(t)
proxy.SetMockAPIHook("", nil)
mp.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&milvuspb.ShowCollectionsResponse{
Status: &StatusSuccess,
}, nil).Once()
Expand Down Expand Up @@ -918,6 +922,7 @@ var commonErrorStatus = &commonpb.Status{
func TestMethodDelete(t *testing.T) {
paramtable.Init()
mp := mocks.NewMockProxy(t)
proxy.SetMockAPIHook("", nil)
mp.EXPECT().DropCollection(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Once()
mp.EXPECT().DropPartition(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Once()
mp.EXPECT().DeleteCredential(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Once()
Expand Down Expand Up @@ -969,6 +974,7 @@ func TestMethodDelete(t *testing.T) {
func TestMethodPost(t *testing.T) {
paramtable.Init()
mp := mocks.NewMockProxy(t)
proxy.SetMockAPIHook("", nil)
mp.EXPECT().CreateCollection(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Once()
mp.EXPECT().RenameCollection(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Once()
mp.EXPECT().LoadCollection(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Twice()
Expand Down Expand Up @@ -1077,6 +1083,7 @@ func TestMethodPost(t *testing.T) {
func TestDML(t *testing.T) {
paramtable.Init()
mp := mocks.NewMockProxy(t)
proxy.SetMockAPIHook("", nil)
mp.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{
CollectionName: DefaultCollectionName,
Schema: generateCollectionSchema(schemapb.DataType_Int64),
Expand Down Expand Up @@ -1198,6 +1205,7 @@ func TestDML(t *testing.T) {
func TestSearchV2(t *testing.T) {
paramtable.Init()
mp := mocks.NewMockProxy(t)
proxy.SetMockAPIHook("", nil)
mp.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{
CollectionName: DefaultCollectionName,
Schema: generateCollectionSchema(schemapb.DataType_Int64),
Expand Down
6 changes: 2 additions & 4 deletions internal/proxy/hook_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,8 @@ func UnaryServerHookInterceptor() grpc.UnaryServerInterceptor {

func HookInterceptor(ctx context.Context, req any, userName, fullMethod string, handler grpc.UnaryHandler) (interface{}, error) {
if hoo == nil {
if hookError := initHook(); hookError != nil {
logger.Error("hook error", zap.String("path", Params.ProxyCfg.SoPath.GetValue()), zap.Error(hookError))
hoo = defaultHook{}
}
log.Warn("hook is not initialized")
return nil, errors.New("hook is not initialized")
}
var (
newCtx context.Context
Expand Down
4 changes: 4 additions & 0 deletions internal/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ func NewProxy(ctx context.Context, factory dependency.Factory) (*Proxy, error) {
}
node.UpdateStateCode(commonpb.StateCode_Abnormal)
expr.Register("proxy", node)
if hookError := initHook(); hookError != nil {
log.Warn("hook error", zap.String("path", Params.ProxyCfg.SoPath.GetValue()), zap.Error(hookError))
hoo = defaultHook{}
}
logutil.Logger(ctx).Debug("create a new Proxy instance", zap.Any("state", node.stateCode.Load()))
return node, nil
}
Expand Down
3 changes: 2 additions & 1 deletion internal/querycoordv2/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ import (
"context"
"fmt"

"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"go.uber.org/zap"
)

// In a replica, a shard is available, if and only if:
Expand Down

0 comments on commit fe786ff

Please sign in to comment.