Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ftr: add pass through proxy factory #1081

Merged
merged 14 commits into from
Apr 3, 2021
5 changes: 4 additions & 1 deletion common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ const (
PROTOCOL_KEY = "protocol"
PATH_SEPARATOR = "/"
//DUBBO_KEY = "dubbo"
SSL_ENABLED_KEY = "ssl-enabled"
SSL_ENABLED_KEY = "ssl-enabled"
ParameterTypeKey = "parameter-type-names" // ParameterType key used in dapr, to tranfer rsp param type
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe follow the codeSSL_ENABLED_KEY .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

)

const (
Expand Down Expand Up @@ -199,6 +200,8 @@ const (
// default deregister critical server after
DEFAULT_DEREGISTER_TIME = "20s"
DEREGISTER_AFTER = "consul-deregister-critical-service-after"
// PassThroughProxyFactoryKey is key of proxy factory with raw data input service
PassThroughProxyFactoryKey = "dubbo-raw"
)

const (
Expand Down
124 changes: 124 additions & 0 deletions common/proxy/proxy_factory/pass_through.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package proxy_factory

import (
"context"
"reflect"
)

import (
perrors "github.com/pkg/errors"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/proxy"
"github.com/apache/dubbo-go/protocol"
)

func init() {
extension.SetProxyFactory(constant.PassThroughProxyFactoryKey, NewPassThroughProxyFactory)
}

// PassThroughProxyFactory is the factory of PassThroughProxyInvoker
type PassThroughProxyFactory struct {
}

// NewPassThroughProxyFactory returns a proxy factory instance
func NewPassThroughProxyFactory(_ ...proxy.Option) proxy.ProxyFactory {
return &PassThroughProxyFactory{}
}

// GetProxy gets a proxy
func (factory *PassThroughProxyFactory) GetProxy(invoker protocol.Invoker, url *common.URL) *proxy.Proxy {
return factory.GetAsyncProxy(invoker, nil, url)
}

// GetAsyncProxy gets a async proxy
func (factory *PassThroughProxyFactory) GetAsyncProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *proxy.Proxy {
//create proxy
attachments := map[string]string{}
attachments[constant.ASYNC_KEY] = url.GetParam(constant.ASYNC_KEY, "false")
return proxy.NewProxy(invoker, callBack, attachments)
}

// GetInvoker gets a invoker
func (factory *PassThroughProxyFactory) GetInvoker(url *common.URL) protocol.Invoker {
return &PassThroughProxyInvoker{
ProxyInvoker: &ProxyInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
},
}
}

// PassThroughProxyInvoker is a invoker struct, it calls service with specific method 'Serivce' and params:
// Service(method string, argsTypes []string, args [][]byte, attachment map[string]interface{})
// PassThroughProxyInvoker pass through raw invocation data and method name to service, which will deal with them.
type PassThroughProxyInvoker struct {
*ProxyInvoker
}

// Invoke is used to call service method by invocation
func (pi *PassThroughProxyInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
result := &protocol.RPCResult{}
result.SetAttachments(invocation.Attachments())
url := getProviderURL(pi.GetUrl())

arguments := invocation.Arguments()
srv := common.ServiceMap.GetServiceByServiceKey(url.Protocol, url.ServiceKey())

var args [][]byte
if len(arguments) == 0 {
args = [][]byte{}
} else {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can reduce a if-else branch by

if len(arguments) > 0 {
	args = make([][]byte, 0, len(arguments))
	for _, arg := range arguments {
		if v, ok := arg.([]byte); ok {
			args = append(args, v)
		} else {
			result.Err = perrors.New("the param type is not []byte")
			return result
		}
	}
}

args = make([][]byte, 0, len(arguments))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the same comment as line 93: pls using gxbytes.AcquireBytes instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gxbytes.AcquireBytes() is used to get specific make([]byte, 0, size), not to make [][]byte.

for _, arg := range arguments {
if v, ok := arg.([]byte); ok {
args = append(args, v)
} else {
result.Err = perrors.New("the param type is not []byte")
return result
}
}
}
method := srv.Method()["Service"]

in := []reflect.Value{srv.Rcvr()}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Init with length in := make([]reflect.Value, 5).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

in = append(in, reflect.ValueOf(invocation.MethodName()))
in = append(in, reflect.ValueOf(invocation.Attachment(constant.ParameterTypeKey)))
in = append(in, reflect.ValueOf(args))
in = append(in, reflect.ValueOf(invocation.Attachments()))

returnValues := method.Method().Func.Call(in)

var retErr interface{}
replyv := returnValues[0]
retErr = returnValues[1].Interface()

if retErr != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe more appropriate.

	if retErr != nil {
		result.SetError(retErr.(error))
                return result
	}
	if replyv.IsValid() && (replyv.Kind() != reflect.Ptr || replyv.Kind() == reflect.Ptr && replyv.Elem().IsValid()) {
		result.SetResult(replyv.Interface())
	}
	return result

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed!

result.SetError(retErr.(error))
} else {
if replyv.IsValid() && (replyv.Kind() != reflect.Ptr || replyv.Kind() == reflect.Ptr && replyv.Elem().IsValid()) {
result.SetResult(replyv.Interface())
}
}
return result
}
61 changes: 61 additions & 0 deletions common/proxy/proxy_factory/pass_through_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package proxy_factory

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import (
"fmt"
"testing"
)

import (
"github.com/stretchr/testify/assert"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
)

func TestPassThroughProxyFactoryGetProxy(t *testing.T) {
proxyFactory := NewPassThroughProxyFactory()
url := common.NewURLWithOptions()
proxy := proxyFactory.GetProxy(protocol.NewBaseInvoker(url), url)
assert.NotNil(t, proxy)
}

type TestPassThroughProxyFactoryAsync struct {
}

func (u *TestPassThroughProxyFactoryAsync) CallBack(res common.CallbackResponse) {
fmt.Println("CallBack res:", res)
}

func TestPassThroughProxyFactoryGetAsyncProxy(t *testing.T) {
proxyFactory := NewPassThroughProxyFactory()
url := common.NewURLWithOptions()
async := &TestPassThroughProxyFactoryAsync{}
proxy := proxyFactory.GetAsyncProxy(protocol.NewBaseInvoker(url), async.CallBack, url)
assert.NotNil(t, proxy)
}

func TestPassThroughProxyFactoryGetInvoker(t *testing.T) {
proxyFactory := NewPassThroughProxyFactory()
url := common.NewURLWithOptions()
invoker := proxyFactory.GetInvoker(url)
assert.True(t, invoker.IsAvailable())
}
2 changes: 1 addition & 1 deletion registry/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var (
reserveParams = []string{
"application", "codec", "exchanger", "serialization", "cluster", "connections", "deprecated", "group",
"loadbalance", "mock", "path", "timeout", "token", "version", "warmup", "weight", "timestamp", "dubbo",
"release", "interface",
"release", "interface", "registry.role",
}
)

Expand Down