From 9ae52202df278336367e74f114894bbf228c36ad Mon Sep 17 00:00:00 2001 From: SimFG Date: Tue, 23 Jul 2024 22:49:33 +0800 Subject: [PATCH] enhance: init the hook when creating the proxy object Signed-off-by: SimFG --- internal/distributed/proxy/httpserver/handler_v2_test.go | 8 ++++++++ internal/proxy/hook_interceptor.go | 6 ++---- internal/proxy/proxy.go | 4 ++++ internal/querycoordv2/utils/util.go | 3 ++- 4 files changed, 16 insertions(+), 5 deletions(-) diff --git a/internal/distributed/proxy/httpserver/handler_v2_test.go b/internal/distributed/proxy/httpserver/handler_v2_test.go index 85f7989e22752..5cd092339cb99 100644 --- a/internal/distributed/proxy/httpserver/handler_v2_test.go +++ b/internal/distributed/proxy/httpserver/handler_v2_test.go @@ -159,6 +159,7 @@ func TestHTTPWrapper(t *testing.T) { func TestGrpcWrapper(t *testing.T) { getTestCases := []rawTestCase{} getTestCasesNeedAuth := []rawTestCase{} + proxy.SetMockAPIHook("", nil) needAuthPrefix := "/auth" ginHandler := gin.Default() app := ginHandler.Group("") @@ -374,6 +375,7 @@ func TestTimeout(t *testing.T) { func TestDatabaseWrapper(t *testing.T) { postTestCases := []requestBodyTestCase{} mp := mocks.NewMockProxy(t) + proxy.SetMockAPIHook("", nil) mp.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return(&milvuspb.ListDatabasesResponse{ Status: &StatusSuccess, DbNames: []string{DefaultCollectionName, "exist"}, @@ -466,6 +468,7 @@ func TestDatabaseWrapper(t *testing.T) { func TestCreateCollection(t *testing.T) { postTestCases := []requestBodyTestCase{} mp := mocks.NewMockProxy(t) + proxy.SetMockAPIHook("", nil) mp.EXPECT().CreateCollection(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Times(11) mp.EXPECT().CreateIndex(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Times(6) mp.EXPECT().LoadCollection(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Times(6) @@ -693,6 +696,7 @@ func initHTTPServerV2(proxy types.ProxyComponent, needAuth bool) *gin.Engine { func TestMethodGet(t *testing.T) { paramtable.Init() mp := mocks.NewMockProxy(t) + proxy.SetMockAPIHook("", nil) mp.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&milvuspb.ShowCollectionsResponse{ Status: &StatusSuccess, }, nil).Once() @@ -918,6 +922,7 @@ var commonErrorStatus = &commonpb.Status{ func TestMethodDelete(t *testing.T) { paramtable.Init() mp := mocks.NewMockProxy(t) + proxy.SetMockAPIHook("", nil) mp.EXPECT().DropCollection(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Once() mp.EXPECT().DropPartition(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Once() mp.EXPECT().DeleteCredential(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Once() @@ -969,6 +974,7 @@ func TestMethodDelete(t *testing.T) { func TestMethodPost(t *testing.T) { paramtable.Init() mp := mocks.NewMockProxy(t) + proxy.SetMockAPIHook("", nil) mp.EXPECT().CreateCollection(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Once() mp.EXPECT().RenameCollection(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Once() mp.EXPECT().LoadCollection(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Twice() @@ -1077,6 +1083,7 @@ func TestMethodPost(t *testing.T) { func TestDML(t *testing.T) { paramtable.Init() mp := mocks.NewMockProxy(t) + proxy.SetMockAPIHook("", nil) mp.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{ CollectionName: DefaultCollectionName, Schema: generateCollectionSchema(schemapb.DataType_Int64), @@ -1198,6 +1205,7 @@ func TestDML(t *testing.T) { func TestSearchV2(t *testing.T) { paramtable.Init() mp := mocks.NewMockProxy(t) + proxy.SetMockAPIHook("", nil) mp.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{ CollectionName: DefaultCollectionName, Schema: generateCollectionSchema(schemapb.DataType_Int64), diff --git a/internal/proxy/hook_interceptor.go b/internal/proxy/hook_interceptor.go index 18dc28bfddf4d..c1e05316455dc 100644 --- a/internal/proxy/hook_interceptor.go +++ b/internal/proxy/hook_interceptor.go @@ -92,10 +92,8 @@ func UnaryServerHookInterceptor() grpc.UnaryServerInterceptor { func HookInterceptor(ctx context.Context, req any, userName, fullMethod string, handler grpc.UnaryHandler) (interface{}, error) { if hoo == nil { - if hookError := initHook(); hookError != nil { - logger.Error("hook error", zap.String("path", Params.ProxyCfg.SoPath.GetValue()), zap.Error(hookError)) - hoo = defaultHook{} - } + log.Warn("hook is not initialized") + return nil, errors.New("hook is not initialized") } var ( newCtx context.Context diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 74c9c980f5dc2..7d243be6d8008 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -148,6 +148,10 @@ func NewProxy(ctx context.Context, factory dependency.Factory) (*Proxy, error) { } node.UpdateStateCode(commonpb.StateCode_Abnormal) expr.Register("proxy", node) + if hookError := initHook(); hookError != nil { + log.Warn("hook error", zap.String("path", Params.ProxyCfg.SoPath.GetValue()), zap.Error(hookError)) + hoo = defaultHook{} + } logutil.Logger(ctx).Debug("create a new Proxy instance", zap.Any("state", node.stateCode.Load())) return node, nil } diff --git a/internal/querycoordv2/utils/util.go b/internal/querycoordv2/utils/util.go index 58c40a5511257..3401262e500ef 100644 --- a/internal/querycoordv2/utils/util.go +++ b/internal/querycoordv2/utils/util.go @@ -20,12 +20,13 @@ import ( "context" "fmt" + "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" - "go.uber.org/zap" ) // In a replica, a shard is available, if and only if: