Skip to content

Commit

Permalink
update httpserver
Browse files Browse the repository at this point in the history
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
  • Loading branch information
bigsheeper committed Feb 23, 2024
1 parent 7b01562 commit 87492de
Show file tree
Hide file tree
Showing 18 changed files with 892 additions and 623 deletions.
18 changes: 18 additions & 0 deletions internal/distributed/proxy/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,21 @@ func (c *Client) GetDdChannel(ctx context.Context, req *internalpb.GetDdChannelR
return client.GetDdChannel(ctx, req)
})
}

func (c *Client) ImportV2(ctx context.Context, req *internalpb.ImportRequest, opts ...grpc.CallOption) (*internalpb.ImportResponse, error) {
return wrapGrpcCall(ctx, c, func(client proxypb.ProxyClient) (*internalpb.ImportResponse, error) {
return client.ImportV2(ctx, req)
})
}

func (c *Client) GetImportProgress(ctx context.Context, req *internalpb.GetImportProgressRequest, opts ...grpc.CallOption) (*internalpb.GetImportProgressResponse, error) {
return wrapGrpcCall(ctx, c, func(client proxypb.ProxyClient) (*internalpb.GetImportProgressResponse, error) {
return client.GetImportProgress(ctx, req)
})
}

func (c *Client) ListImports(ctx context.Context, req *internalpb.ListImportsRequest, opts ...grpc.CallOption) (*internalpb.ListImportsResponse, error) {
return wrapGrpcCall(ctx, c, func(client proxypb.ProxyClient) (*internalpb.ListImportsResponse, error) {
return client.ListImports(ctx, req)
})
}
29 changes: 29 additions & 0 deletions internal/distributed/proxy/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,3 +433,32 @@ func Test_GetDdChannel(t *testing.T) {
_, err = client.GetDdChannel(ctx, &internalpb.GetDdChannelRequest{})
assert.ErrorIs(t, err, context.DeadlineExceeded)
}

func Test_ImportV2(t *testing.T) {
paramtable.Init()
ctx := context.Background()

client, err := NewClient(ctx, "test", 1)
assert.NoError(t, err)
defer client.Close()

mockProxy := mocks.NewMockProxyClient(t)
mockGrpcClient := mocks.NewMockGrpcClient[proxypb.ProxyClient](t)
mockGrpcClient.EXPECT().Close().Return(nil)
mockGrpcClient.EXPECT().ReCall(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, f func(proxypb.ProxyClient) (interface{}, error)) (interface{}, error) {
return f(mockProxy)
})
client.(*Client).grpcClient = mockGrpcClient

mockProxy.EXPECT().ImportV2(mock.Anything, mock.Anything).Return(&internalpb.ImportResponse{Status: merr.Success()}, nil)
_, err = client.ImportV2(ctx, &internalpb.ImportRequest{})
assert.Nil(t, err)

mockProxy.EXPECT().GetImportProgress(mock.Anything, mock.Anything).Return(&internalpb.GetImportProgressResponse{Status: merr.Success()}, nil)
_, err = client.GetImportProgress(ctx, &internalpb.GetImportProgressRequest{})
assert.Nil(t, err)

mockProxy.EXPECT().ListImports(mock.Anything, mock.Anything).Return(&internalpb.ListImportsResponse{Status: merr.Success()}, nil)
_, err = client.ListImports(ctx, &internalpb.ListImportsRequest{})
assert.Nil(t, err)
}
2 changes: 1 addition & 1 deletion internal/distributed/proxy/httpserver/handler_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -1564,7 +1564,7 @@ func (h *HandlersV2) getImportJobProcess(ctx context.Context, c *gin.Context, an
jobIDGetter := anyReq.(JobIDGetter)
req := &internalpb.GetImportProgressRequest{
DbName: dbName,
JobID: strconv.FormatInt(jobIDGetter.GetJobID(), 10),
JobID: jobIDGetter.GetJobID(),
}
resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.GetImportProgress(reqCtx, req.(*internalpb.GetImportProgressRequest))
Expand Down
14 changes: 7 additions & 7 deletions internal/distributed/proxy/httpserver/handler_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,18 +810,18 @@ func TestMethodPost(t *testing.T) {
mp.EXPECT().ListImports(mock.Anything, mock.Anything).Return(&internalpb.ListImportsResponse{
Status: &StatusSuccess,
JobIDs: []string{"1", "2", "3", "4"},
States: []internalpb.ImportState{
internalpb.ImportState_Pending,
internalpb.ImportState_InProgress,
internalpb.ImportState_Failed,
internalpb.ImportState_Completed,
States: []internalpb.ImportJobState{
internalpb.ImportJobState_Pending,
internalpb.ImportJobState_Importing,
internalpb.ImportJobState_Failed,
internalpb.ImportJobState_Completed,
},
Reasons: []string{"", "", "mock reason", ""},
Progresses: []int64{0, 30, 0, 100},
}, nil).Once()
mp.EXPECT().GetImportProgress(mock.Anything, mock.Anything).Return(&internalpb.GetImportProgressResponse{
Status: &StatusSuccess,
State: internalpb.ImportState_Completed,
State: internalpb.ImportJobState_Completed,
Reason: "",
Progress: 100,
}, nil).Once()
Expand Down Expand Up @@ -903,7 +903,7 @@ func TestMethodPost(t *testing.T) {
`"userName": "` + util.UserRoot + `", "password": "Milvus", "newPassword": "milvus", "roleName": "` + util.RoleAdmin + `",` +
`"roleName": "` + util.RoleAdmin + `", "objectType": "Global", "objectName": "*", "privilege": "*",` +
`"aliasName": "` + DefaultAliasName + `",` +
`"jobID": 1234567890,` +
`"jobID": "1234567890",` +
`"files": [["book.json"]]` +
`}`))
req := httptest.NewRequest(http.MethodPost, testcase.path, bodyReader)
Expand Down
8 changes: 4 additions & 4 deletions internal/distributed/proxy/httpserver/request_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (req *ImportReq) GetCollectionName() string {
}

func (req *ImportReq) GetPartitionName() string {
return req.CollectionName
return req.PartitionName
}

func (req *ImportReq) GetFiles() [][]string {
Expand All @@ -81,10 +81,10 @@ func (req *ImportReq) GetOptions() map[string]string {
}

type JobIDReq struct {
JobID int64 `json:"jobID" binding:"required"`
JobID string `json:"jobID" binding:"required"`
}

func (req *JobIDReq) GetJobID() int64 { return req.JobID }
func (req *JobIDReq) GetJobID() string { return req.JobID }

type QueryReqV2 struct {
DbName string `json:"dbName"`
Expand Down Expand Up @@ -211,7 +211,7 @@ type OptionsGetter interface {
GetOptions() map[string]string
}
type JobIDGetter interface {
GetJobID() int64
GetJobID() string
}

type PasswordReq struct {
Expand Down
12 changes: 12 additions & 0 deletions internal/distributed/proxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1202,3 +1202,15 @@ func (s *Server) AllocTimestamp(ctx context.Context, req *milvuspb.AllocTimestam
func (s *Server) ReplicateMessage(ctx context.Context, req *milvuspb.ReplicateMessageRequest) (*milvuspb.ReplicateMessageResponse, error) {
return s.proxy.ReplicateMessage(ctx, req)
}

func (s *Server) ImportV2(ctx context.Context, req *internalpb.ImportRequest) (*internalpb.ImportResponse, error) {
return s.proxy.ImportV2(ctx, req)
}

func (s *Server) GetImportProgress(ctx context.Context, req *internalpb.GetImportProgressRequest) (*internalpb.GetImportProgressResponse, error) {
return s.proxy.GetImportProgress(ctx, req)
}

func (s *Server) ListImports(ctx context.Context, req *internalpb.ListImportsRequest) (*internalpb.ListImportsResponse, error) {
return s.proxy.ListImports(ctx, req)
}
16 changes: 8 additions & 8 deletions internal/mocks/mock_datacoord.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions internal/mocks/mock_datacoord_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 87492de

Please sign in to comment.