diff --git a/common/constant/key.go b/common/constant/key.go index da21a3a9e1..6acb2299c4 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -79,6 +79,10 @@ const ( EXECUTE_REJECTED_EXECUTION_HANDLER_KEY = "execute.limit.rejected.handler" PROVIDER_SHUTDOWN_FILTER = "pshutdown" CONSUMER_SHUTDOWN_FILTER = "cshutdown" + SYNC_REPORT_KEY = "sync.report" + RETRY_PERIOD_KEY = "retry.period" + RETRY_TIMES_KEY = "retry.times" + CYCLE_REPORT_KEY = "cycle.report" ) const ( diff --git a/config/instance/metedata_report.go b/config/instance/metedata_report.go index 9cf435bc9d..70e6ffc23d 100644 --- a/config/instance/metedata_report.go +++ b/config/instance/metedata_report.go @@ -28,14 +28,31 @@ import ( ) var ( - instance report.MetadataReport - once sync.Once + instance report.MetadataReport + reportUrl common.URL + once sync.Once ) -// GetMetadataReportInstance ... -func GetMetadataReportInstance(url *common.URL) report.MetadataReport { +// InitMetadataReportInstance will create the metadata report instance by the specified metadata report url +func InitMetadataReportInstance(url *common.URL) report.MetadataReport { once.Do(func() { instance = extension.GetMetadataReportFactory(url.Protocol).CreateMetadataReport(url) + reportUrl = *url }) return instance } + +// GetMetadataReportInstance will return the instance +func GetMetadataReportInstance() report.MetadataReport { + return instance +} + +// GetMetadataReportUrl will return the report instance url +func GetMetadataReportUrl() common.URL { + return reportUrl +} + +// SetMetadataReportUrl will only can be used by unit test to mock url +func SetMetadataReportUrl(url common.URL) { + reportUrl = url +} diff --git a/config/metadata_report_config.go b/config/metadata_report_config.go index 41fb6b4769..95506462a2 100644 --- a/config/metadata_report_config.go +++ b/config/metadata_report_config.go @@ -101,7 +101,7 @@ func startMetadataReport(metadataType string, metadataReportConfig *MetadataRepo } if url, err := metadataReportConfig.ToUrl(); err == nil { - instance.GetMetadataReportInstance(url) + instance.InitMetadataReportInstance(url) } else { return perrors.New("MetadataConfig is invalid!") } diff --git a/go.mod b/go.mod index fe1891ea6e..54e5ebcd93 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/dubbogo/gost v1.8.0 github.com/emicklei/go-restful/v3 v3.0.0 github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect + github.com/go-co-op/gocron v0.1.1 github.com/go-errors/errors v1.0.1 // indirect github.com/go-resty/resty/v2 v2.1.0 github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect diff --git a/go.sum b/go.sum index 73c3da87a0..46235271ab 100644 --- a/go.sum +++ b/go.sum @@ -137,6 +137,8 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-co-op/gocron v0.1.1 h1:OfDmkqkCguFtFMsm6Eaayci3DADLa8pXvdmOlPU/JcU= +github.com/go-co-op/gocron v0.1.1/go.mod h1:Y9PWlYqDChf2Nbgg7kfS+ZsXHDTZbMZYPEQ0MILqH+M= github.com/go-errors/errors v1.0.1 h1:LUHzmkK3GUKUrL/1gfBUxAHzcev3apQlezX/+O7ma6w= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= github.com/go-ini/ini v1.25.4 h1:Mujh4R/dH6YL8bxuISne3xX2+qcQ9p0IxKAP6ExWoUo= @@ -152,6 +154,7 @@ github.com/go-openapi/jsonpointer v0.0.0-20160704185906-46af16f9f7b1/go.mod h1:+ github.com/go-openapi/jsonreference v0.0.0-20160704190145-13c6e3589ad9/go.mod h1:W3Z9FmVs9qj+KR4zFKmDPGiLdk1D9Rlm7cyMvf57TTg= github.com/go-openapi/spec v0.0.0-20160808142527-6aced65f8501/go.mod h1:J8+jY1nAiCcj+friV/PDoE1/3eeccG9LYBs0tYvLOWc= github.com/go-openapi/swag v0.0.0-20160704191624-1d0bd113de87/go.mod h1:DXUve3Dpr1UfpPtxFw+EFuQ41HhCWZfha5jSVRG7C7I= +github.com/go-redis/redis v6.15.5+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-resty/resty/v2 v2.1.0 h1:Z6IefCpUMfnvItVJaJXWv/pMiiD11So35QgwEELsldE= github.com/go-resty/resty/v2 v2.1.0/go.mod h1:dZGr0i9PLlaaTD4H/hoZIDjQ+r6xq8mgbRzHZf7f2J8= github.com/go-sql-driver/mysql v0.0.0-20180618115901-749ddf1598b4 h1:1LlmVz15APoKz9dnm5j2ePptburJlwEH+/v/pUuoxck= @@ -391,10 +394,14 @@ github.com/oklog/run v0.0.0-20180308005104-6934b124db28/go.mod h1:dlhp/R75TPv97u github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo= +github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I= github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME= +github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/opencontainers/go-digest v1.0.0-rc1 h1:WzifXhOVOEOuFYOJAW6aQqW0TooG2iki3E3Ii+WN7gQ= github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= github.com/opencontainers/image-spec v1.0.1 h1:JMemWkRwHx4Zj+fVxWoMCFm/8sYGGrUVojFA6h/TRcI= @@ -526,6 +533,8 @@ golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190628185345-da137c7871d7 h1:rTIdg5QFRR7XCaK4LCjBiPbx8j4DQRpdYMnGn/bJUEU= golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b h1:0mm1VjtFUOIlE1SbDlwjYaDxZVDP2S5ou6y0gSgXHu8= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20170807180024-9a379c6b3e95/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be h1:vEDujvNQGv4jgYKudGeI/+DAX4Jffq6hpD55MmoEvKs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -534,6 +543,8 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/metadata/definition/definition.go b/metadata/definition/definition.go index 8d4a584ee5..18578cb2c1 100644 --- a/metadata/definition/definition.go +++ b/metadata/definition/definition.go @@ -19,6 +19,8 @@ package definition import ( "bytes" + "encoding/json" + "fmt" ) import ( @@ -26,6 +28,11 @@ import ( "github.com/apache/dubbo-go/common/constant" ) +// ServiceDefinition is a interface of service's definition +type ServiceDefiner interface { + ToBytes() ([]byte, error) +} + // ServiceDefinition is the describer of service definition type ServiceDefinition struct { CanonicalName string @@ -34,6 +41,39 @@ type ServiceDefinition struct { Types []TypeDefinition } +func (def ServiceDefinition) ToBytes() ([]byte, error) { + return json.Marshal(def) + +} + +func (def ServiceDefinition) String() string { + var methodStr string + for _, m := range def.Methods { + var paramType string + for _, p := range m.ParameterTypes { + paramType = paramType + fmt.Sprintf("{type:%v}", p) + } + var param string + for _, d := range m.Parameters { + param = param + fmt.Sprintf("{id:%v,type:%v,builderName:%v}", d.Id, d.Type, d.TypeBuilderName) + } + methodStr = methodStr + fmt.Sprintf("{name:%v,parameterTypes:[%v],returnType:%v,params:[%v] }", m.Name, paramType, m.ReturnType, param) + + } + var types string + for _, d := range def.Types { + types = types + fmt.Sprintf("{id:%v,type:%v,builderName:%v}", d.Id, d.Type, d.TypeBuilderName) + } + + return fmt.Sprintf("{canonicalName:%v, codeSource:%v, methods:[%v], types:[%v]}", def.CanonicalName, def.CodeSource, methodStr, types) +} + +// FullServiceDefinition is the describer of service definition with parameters +type FullServiceDefinition struct { + ServiceDefinition + Params map[string]string +} + // MethodDefinition is the describer of method definition type MethodDefinition struct { Name string diff --git a/metadata/definition/definition_test.go b/metadata/definition/definition_test.go new file mode 100644 index 0000000000..958f9324d0 --- /dev/null +++ b/metadata/definition/definition_test.go @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package definition + +import ( + "fmt" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" +) + +func TestBuildServiceDefinition(t *testing.T) { + serviceName := "com.ikurento.user.UserProvider" + group := "group1" + version := "0.0.1" + protocol := "dubbo" + beanName := "UserProvider" + url, err := common.NewURL(fmt.Sprintf( + "%v://127.0.0.1:20000/com.ikurento.user.UserProvider1?anyhost=true&"+ + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ + "environment=dev&interface=%v&ip=192.168.56.1&methods=GetUser&module=dubbogo+user-info+server&org=ikurento.com&"+ + "owner=ZX&pid=1447&revision=0.0.1&side=provider&timeout=3000×tamp=1556509797245&group=%v&version=%v&bean.name=%v", + protocol, serviceName, group, version, beanName)) + assert.NoError(t, err) + _, err = common.ServiceMap.Register(serviceName, protocol, &UserProvider{}) + assert.NoError(t, err) + service := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service())) + sd := BuildServiceDefinition(*service, url) + assert.Equal(t, "{canonicalName:com.ikurento.user.UserProvider, codeSource:, methods:[{name:GetUser,parameterTypes:[{type:slice}],returnType:ptr,params:[] }], types:[]}", sd.String()) +} diff --git a/metadata/definition/mock.go b/metadata/definition/mock.go new file mode 100644 index 0000000000..ca9e125a74 --- /dev/null +++ b/metadata/definition/mock.go @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package definition + +import ( + "context" + "time" +) + +type User struct { + Id string + Name string + Age int32 + Time time.Time +} + +type UserProvider struct { +} + +func (u *UserProvider) GetUser(ctx context.Context, req []interface{}) (*User, error) { + rsp := User{"A001", "Alex Stocks", 18, time.Now()} + return &rsp, nil +} + +func (u *UserProvider) Reference() string { + return "UserProvider" +} + +func (u User) JavaClassName() string { + return "com.ikurento.user.User" +} diff --git a/metadata/identifier/service_metadata_identifier.go b/metadata/identifier/service_metadata_identifier.go index 92c15704db..7cdb55e53d 100644 --- a/metadata/identifier/service_metadata_identifier.go +++ b/metadata/identifier/service_metadata_identifier.go @@ -18,6 +18,7 @@ package identifier import ( + "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" ) @@ -28,6 +29,18 @@ type ServiceMetadataIdentifier struct { BaseMetadataIdentifier } +func NewServiceMetadataIdentifier(url common.URL) *ServiceMetadataIdentifier { + return &ServiceMetadataIdentifier{ + BaseMetadataIdentifier: BaseMetadataIdentifier{ + ServiceInterface: url.Service(), + Version: url.GetParam(constant.VERSION_KEY, ""), + Group: url.GetParam(constant.GROUP_KEY, ""), + Side: url.GetParam(constant.SIDE_KEY, ""), + }, + Protocol: url.Protocol, + } +} + // GetIdentifierKey will return string format as service:Version:Group:Side:Protocol:"revision"+Revision func (mdi *ServiceMetadataIdentifier) GetIdentifierKey() string { return mdi.BaseMetadataIdentifier.getIdentifierKey(mdi.Protocol, constant.KEY_REVISON_PREFIX+mdi.Revision) diff --git a/metadata/identifier/subscribe_metadata_identifier.go b/metadata/identifier/subscribe_metadata_identifier.go index e599fc9e0d..fa35ab79d6 100644 --- a/metadata/identifier/subscribe_metadata_identifier.go +++ b/metadata/identifier/subscribe_metadata_identifier.go @@ -20,7 +20,7 @@ package identifier // SubscriberMetadataIdentifier is inherit baseMetaIdentifier with service params: Revision type SubscriberMetadataIdentifier struct { Revision string - BaseMetadataIdentifier + MetadataIdentifier } // GetIdentifierKey will return string format as service:Version:Group:Side:Revision diff --git a/metadata/report/delegate/delegate_report.go b/metadata/report/delegate/delegate_report.go new file mode 100644 index 0000000000..870625b9cb --- /dev/null +++ b/metadata/report/delegate/delegate_report.go @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package delegate + +import ( + "encoding/json" + "sync" + "time" +) + +import ( + "github.com/go-co-op/gocron" + "go.uber.org/atomic" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config/instance" + "github.com/apache/dubbo-go/metadata/definition" + "github.com/apache/dubbo-go/metadata/identifier" +) + +const ( + // defaultMetadataReportRetryTimes is defined for max times to retry + defaultMetadataReportRetryTimes int64 = 100 + // defaultMetadataReportRetryPeriod is defined for cycle interval to retry, the unit is second + defaultMetadataReportRetryPeriod int64 = 3 + // defaultMetadataReportRetryPeriod is defined for cycle report or not + defaultMetadataReportCycleReport bool = true +) + +// metadataReportRetry is a scheduler for retrying task +type metadataReportRetry struct { + retryPeriod int64 + retryLimit int64 + scheduler *gocron.Scheduler + job *gocron.Job + retryCounter *atomic.Int64 + // if no failed report, wait how many times to run retry task. + retryTimesIfNonFail int64 +} + +// newMetadataReportRetry will create a scheduler for retry task +func newMetadataReportRetry(retryPeriod int64, retryLimit int64, retryFunc func() bool) (*metadataReportRetry, error) { + s1 := gocron.NewScheduler(time.UTC) + + mrr := &metadataReportRetry{ + retryPeriod: retryPeriod, + retryLimit: retryLimit, + scheduler: s1, + retryCounter: atomic.NewInt64(0), + retryTimesIfNonFail: 600, + } + + newJob, err := mrr.scheduler.Every(uint64(mrr.retryPeriod)).Seconds().Do( + func() { + mrr.retryCounter.Inc() + logger.Infof("start to retry task for metadata report. retry times: %v", mrr.retryCounter.Load()) + if mrr.retryCounter.Load() > mrr.retryLimit { + mrr.scheduler.Clear() + } else if retryFunc() && mrr.retryCounter.Load() > mrr.retryTimesIfNonFail { + mrr.scheduler.Clear() // may not interrupt the running job + } + }) + + mrr.job = newJob + return mrr, err +} + +// startRetryTask will make scheduler with retry task run +func (mrr *metadataReportRetry) startRetryTask() { + mrr.scheduler.StartAt(time.Now().Add(500 * time.Millisecond)) + mrr.scheduler.Start() +} + +// MetadataReport is a absolute delegate for MetadataReport +type MetadataReport struct { + reportUrl common.URL + syncReport bool + metadataReportRetry *metadataReportRetry + + failedReports map[*identifier.MetadataIdentifier]interface{} + failedReportsLock sync.RWMutex + + // allMetadataReports store all the metdadata reports records in memory + allMetadataReports map[*identifier.MetadataIdentifier]interface{} + allMetadataReportsLock sync.RWMutex +} + +// NewMetadataReport will create a MetadataReport with initiation +func NewMetadataReport() (*MetadataReport, error) { + url := instance.GetMetadataReportUrl() + bmr := &MetadataReport{ + reportUrl: url, + syncReport: url.GetParamBool(constant.SYNC_REPORT_KEY, false), + failedReports: make(map[*identifier.MetadataIdentifier]interface{}, 4), + allMetadataReports: make(map[*identifier.MetadataIdentifier]interface{}, 4), + } + + mrr, err := newMetadataReportRetry( + url.GetParamInt(constant.RETRY_PERIOD_KEY, defaultMetadataReportRetryPeriod), + url.GetParamInt(constant.RETRY_TIMES_KEY, defaultMetadataReportRetryTimes), + bmr.retry, + ) + + if err != nil { + return nil, err + } + + bmr.metadataReportRetry = mrr + if url.GetParamBool(constant.CYCLE_REPORT_KEY, defaultMetadataReportCycleReport) { + scheduler := gocron.NewScheduler(time.UTC) + _, err := scheduler.Every(1).Day().Do( + func() { + logger.Info("start to publish all metadata.") + bmr.allMetadataReportsLock.RLock() + bmr.doHandlerMetadataCollection(bmr.allMetadataReports) + bmr.allMetadataReportsLock.RUnlock() + + }) + if err != nil { + return nil, err + } + scheduler.StartAt(time.Now().Add(500 * time.Millisecond)) + scheduler.Start() + } + return bmr, nil +} + +// retry will do metadata failed reports collection by call metadata report sdk +func (bmr *MetadataReport) retry() bool { + bmr.failedReportsLock.RLock() + bmr.failedReportsLock.Unlock() + return bmr.doHandlerMetadataCollection(bmr.failedReports) +} + +// StoreProviderMetadata will delegate to call remote metadata's sdk to store provider service definition +func (bmr *MetadataReport) StoreProviderMetadata(identifier *identifier.MetadataIdentifier, definer definition.ServiceDefiner) { + if bmr.syncReport { + bmr.storeMetadataTask(common.PROVIDER, identifier, definer) + } else { + go bmr.storeMetadataTask(common.PROVIDER, identifier, definer) + } +} + +// storeMetadataTask will delegate to call remote metadata's sdk to store +func (bmr *MetadataReport) storeMetadataTask(role int, identifier *identifier.MetadataIdentifier, definer interface{}) { + logger.Infof("store provider metadata. Identifier :%v ; definition: %v .", identifier, definer) + bmr.allMetadataReportsLock.Lock() + bmr.allMetadataReports[identifier] = definer + bmr.allMetadataReportsLock.Unlock() + + bmr.failedReportsLock.Lock() + delete(bmr.failedReports, identifier) + bmr.failedReportsLock.Unlock() + // data is store the json marshaled definition + var ( + data []byte + err error + ) + + defer func() { + if r := recover(); r != nil { + bmr.failedReportsLock.Lock() + bmr.failedReports[identifier] = definer + bmr.failedReportsLock.Unlock() + bmr.metadataReportRetry.startRetryTask() + logger.Errorf("Failed to put provider metadata %v in %v, cause: %v", identifier, string(data), r) + } + }() + + data, err = json.Marshal(definer) + if err != nil { + logger.Errorf("storeProviderMetadataTask error in stage json.Marshal, msg is %v", err) + panic(err) + } + report := instance.GetMetadataReportInstance() + if role == common.PROVIDER { + err = report.StoreProviderMetadata(identifier, string(data)) + } else if role == common.CONSUMER { + err = report.StoreConsumerMetadata(identifier, string(data)) + } + + if err != nil { + logger.Errorf("storeProviderMetadataTask error in stage call metadata report to StoreProviderMetadata, msg is %v", err) + panic(err) + } + +} + +// StoreConsumerMetadata will delegate to call remote metadata's sdk to store consumer side service definition +func (bmr *MetadataReport) StoreConsumerMetadata(identifier *identifier.MetadataIdentifier, definer map[string]string) { + if bmr.syncReport { + bmr.storeMetadataTask(common.CONSUMER, identifier, definer) + } else { + go bmr.storeMetadataTask(common.CONSUMER, identifier, definer) + } +} + +// SaveServiceMetadata will delegate to call remote metadata's sdk to save service metadata +func (bmr *MetadataReport) SaveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier, url common.URL) error { + report := instance.GetMetadataReportInstance() + if bmr.syncReport { + return report.SaveServiceMetadata(identifier, url) + } else { + go report.SaveServiceMetadata(identifier, url) + return nil + } +} + +// RemoveServiceMetadata will delegate to call remote metadata's sdk to remove service metadata +func (bmr *MetadataReport) RemoveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier) error { + report := instance.GetMetadataReportInstance() + if bmr.syncReport { + return report.RemoveServiceMetadata(identifier) + } else { + go report.RemoveServiceMetadata(identifier) + return nil + } +} + +// GetExportedURLs will delegate to call remote metadata's sdk to get exported urls +func (bmr *MetadataReport) GetExportedURLs(identifier *identifier.ServiceMetadataIdentifier) []string { + report := instance.GetMetadataReportInstance() + return report.GetExportedURLs(identifier) +} + +// SaveSubscribedData will delegate to call remote metadata's sdk to save subscribed data +func (bmr *MetadataReport) SaveSubscribedData(identifier *identifier.SubscriberMetadataIdentifier, urls []common.URL) error { + report := instance.GetMetadataReportInstance() + if bmr.syncReport { + return report.SaveSubscribedData(identifier, urls) + } else { + go report.SaveSubscribedData(identifier, urls) + return nil + } +} + +// GetSubscribedURLs will delegate to call remote metadata's sdk to get subscribed urls +func (MetadataReport) GetSubscribedURLs(identifier *identifier.SubscriberMetadataIdentifier) []string { + report := instance.GetMetadataReportInstance() + return report.GetSubscribedURLs(identifier) +} + +// GetServiceDefinition will delegate to call remote metadata's sdk to get service definitions +func (MetadataReport) GetServiceDefinition(identifier *identifier.MetadataIdentifier) string { + report := instance.GetMetadataReportInstance() + return report.GetServiceDefinition(identifier) +} + +// doHandlerMetadataCollection will store metadata to metadata support with given metadataMap +func (bmr *MetadataReport) doHandlerMetadataCollection(metadataMap map[*identifier.MetadataIdentifier]interface{}) bool { + if len(metadataMap) == 0 { + return true + } + for e := range metadataMap { + if common.RoleType(common.PROVIDER).Role() == e.Side { + bmr.StoreProviderMetadata(e, metadataMap[e].(*definition.FullServiceDefinition)) + } else if common.RoleType(common.CONSUMER).Role() == e.Side { + bmr.StoreConsumerMetadata(e, metadataMap[e].(map[string]string)) + } + } + return false +} diff --git a/metadata/report/delegate/delegate_report_test.go b/metadata/report/delegate/delegate_report_test.go new file mode 100644 index 0000000000..b5f8551a1a --- /dev/null +++ b/metadata/report/delegate/delegate_report_test.go @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package delegate + +import ( + "fmt" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config/instance" + "github.com/apache/dubbo-go/metadata/definition" + "github.com/apache/dubbo-go/metadata/identifier" +) + +func TestMetadataReport_MetadataReportRetry(t *testing.T) { + counter := 1 + + retry, err := newMetadataReportRetry(1, 10, func() bool { + counter++ + return true + }) + assert.NoError(t, err) + retry.startRetryTask() + itsTime := time.After(2500 * time.Millisecond) + select { + case <-itsTime: + retry.scheduler.Clear() + assert.Equal(t, counter, 3) + logger.Info("over") + } +} + +func TestMetadataReport_MetadataReportRetryWithLimit(t *testing.T) { + counter := 1 + + retry, err := newMetadataReportRetry(1, 1, func() bool { + counter++ + return true + }) + assert.NoError(t, err) + retry.startRetryTask() + itsTime := time.After(2500 * time.Millisecond) + select { + case <-itsTime: + retry.scheduler.Clear() + assert.Equal(t, counter, 2) + logger.Info("over") + } + +} + +func mockNewMetadataReport(t *testing.T) *MetadataReport { + syncReportKey := "false" + retryPeroidKey := "3" + retryTimesKey := "100" + cycleReportKey := "true" + + url, err := common.NewURL(fmt.Sprintf( + "test://127.0.0.1:20000/?"+constant.SYNC_REPORT_KEY+"=%v&"+constant.RETRY_PERIOD_KEY+"=%v&"+ + constant.RETRY_TIMES_KEY+"=%v&"+constant.CYCLE_REPORT_KEY+"=%v", + syncReportKey, retryPeroidKey, retryTimesKey, cycleReportKey)) + assert.NoError(t, err) + instance.SetMetadataReportUrl(url) + mtr, err := NewMetadataReport() + assert.NoError(t, err) + assert.NotNil(t, mtr) + return mtr +} + +func TestMetadataReport_StoreProviderMetadata(t *testing.T) { + mtr := mockNewMetadataReport(t) + var metadataId = &identifier.MetadataIdentifier{ + Application: "app", + BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{ + ServiceInterface: "com.ikurento.user.UserProvider", + Version: "0.0.1", + Group: "group1", + Side: "provider", + }, + } + + mtr.StoreProviderMetadata(metadataId, getMockDefinition(metadataId, t)) +} + +func getMockDefinition(id *identifier.MetadataIdentifier, t *testing.T) definition.ServiceDefinition { + protocol := "dubbo" + beanName := "UserProvider" + url, err := common.NewURL(fmt.Sprintf( + "%v://127.0.0.1:20000/com.ikurento.user.UserProvider1?anyhost=true&"+ + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ + "environment=dev&interface=%v&ip=192.168.56.1&methods=GetUser&module=dubbogo+user-info+server&org=ikurento.com&"+ + "owner=ZX&pid=1447&revision=0.0.1&side=provider&timeout=3000×tamp=1556509797245&group=%v&version=%v&bean.name=%v", + protocol, id.ServiceInterface, id.Group, id.Version, beanName)) + assert.NoError(t, err) + _, err = common.ServiceMap.Register(id.ServiceInterface, protocol, &definition.UserProvider{}) + assert.NoError(t, err) + service := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service())) + return definition.BuildServiceDefinition(*service, url) +} diff --git a/metadata/report/report.go b/metadata/report/report.go index 81227e0c76..61cdda1f96 100644 --- a/metadata/report/report.go +++ b/metadata/report/report.go @@ -19,18 +19,17 @@ package report import ( "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/metadata/definition" "github.com/apache/dubbo-go/metadata/identifier" ) // MetadataReport is an interface of remote metadata report type MetadataReport interface { - StoreProviderMetadata(*identifier.MetadataIdentifier, *definition.ServiceDefinition) - StoreConsumerMetadata(*identifier.MetadataIdentifier, map[string]string) - SaveServiceMetadata(*identifier.ServiceMetadataIdentifier, *common.URL) - RemoveServiceMetadata(*identifier.ServiceMetadataIdentifier) + StoreProviderMetadata(*identifier.MetadataIdentifier, string) error + StoreConsumerMetadata(*identifier.MetadataIdentifier, string) error + SaveServiceMetadata(*identifier.ServiceMetadataIdentifier, common.URL) error + RemoveServiceMetadata(*identifier.ServiceMetadataIdentifier) error GetExportedURLs(*identifier.ServiceMetadataIdentifier) []string - SaveSubscribedData(*identifier.SubscriberMetadataIdentifier, []*common.URL) + SaveSubscribedData(*identifier.SubscriberMetadataIdentifier, []common.URL) error GetSubscribedURLs(*identifier.SubscriberMetadataIdentifier) []string - GetServiceDefinition(*identifier.MetadataIdentifier) + GetServiceDefinition(*identifier.MetadataIdentifier) string } diff --git a/metadata/service/inmemory/service.go b/metadata/service/inmemory/service.go index c59949401f..df0ec7a4a7 100644 --- a/metadata/service/inmemory/service.go +++ b/metadata/service/inmemory/service.go @@ -17,7 +17,6 @@ package inmemory import ( - "encoding/json" "sync" ) @@ -53,13 +52,13 @@ func NewMetadataService() *MetadataService { } } -// comparator is defined as Comparator for skip list to compare the URL -type comparator common.URL +// Comparator is defined as Comparator for skip list to compare the URL +type Comparator common.URL // Compare is defined as Comparator for skip list to compare the URL -func (c comparator) Compare(comp cm.Comparator) int { +func (c Comparator) Compare(comp cm.Comparator) int { a := common.URL(c).String() - b := common.URL(comp.(comparator)).String() + b := common.URL(comp.(Comparator)).String() switch { case a > b: return 1 @@ -79,7 +78,7 @@ func (mts *MetadataService) addURL(targetMap *sync.Map, url *common.URL) bool { logger.Debug(url.ServiceKey()) if urlSet, loaded = targetMap.LoadOrStore(url.ServiceKey(), skip.New(uint64(0))); loaded { mts.lock.RLock() - wantedUrl := urlSet.(*skip.SkipList).Get(comparator(*url)) + wantedUrl := urlSet.(*skip.SkipList).Get(Comparator(*url)) if len(wantedUrl) > 0 && wantedUrl[0] != nil { mts.lock.RUnlock() return false @@ -88,12 +87,12 @@ func (mts *MetadataService) addURL(targetMap *sync.Map, url *common.URL) bool { } mts.lock.Lock() //double chk - wantedUrl := urlSet.(*skip.SkipList).Get(comparator(*url)) + wantedUrl := urlSet.(*skip.SkipList).Get(Comparator(*url)) if len(wantedUrl) > 0 && wantedUrl[0] != nil { mts.lock.Unlock() return false } - urlSet.(*skip.SkipList).Insert(comparator(*url)) + urlSet.(*skip.SkipList).Insert(Comparator(*url)) mts.lock.Unlock() return true } @@ -102,7 +101,7 @@ func (mts *MetadataService) addURL(targetMap *sync.Map, url *common.URL) bool { func (mts *MetadataService) removeURL(targetMap *sync.Map, url *common.URL) { if value, loaded := targetMap.Load(url.ServiceKey()); loaded { mts.lock.Lock() - value.(*skip.SkipList).Delete(comparator(*url)) + value.(*skip.SkipList).Delete(Comparator(*url)) mts.lock.Unlock() mts.lock.RLock() defer mts.lock.RUnlock() @@ -118,9 +117,9 @@ func (mts *MetadataService) getAllService(services *sync.Map) *skip.SkipList { services.Range(func(key, value interface{}) bool { urls := value.(*skip.SkipList) for i := uint64(0); i < urls.Len(); i++ { - url := common.URL(urls.ByPosition(i).(comparator)) + url := common.URL(urls.ByPosition(i).(Comparator)) if url.GetParam(constant.INTERFACE_KEY, url.Path) != "MetadataService" { - skipList.Insert(comparator(url)) + skipList.Insert(Comparator(url)) } } return true @@ -135,9 +134,9 @@ func (mts *MetadataService) getSpecifiedService(services *sync.Map, serviceKey s if loaded { urls := serviceList.(*skip.SkipList) for i := uint64(0); i < urls.Len(); i++ { - url := common.URL(urls.ByPosition(i).(comparator)) + url := common.URL(urls.ByPosition(i).(Comparator)) if len(protocol) == 0 || url.Protocol == protocol || url.GetParam(constant.PROTOCOL_KEY, "") == protocol { - skipList.Insert(comparator(url)) + skipList.Insert(Comparator(url)) } } } @@ -182,9 +181,9 @@ func (mts *MetadataService) PublishServiceDefinition(url common.URL) error { // //TODO:generate the service definition and store it //} sd := definition.BuildServiceDefinition(*service, url) - data, err := json.Marshal(sd) + data, err := sd.ToBytes() if err != nil { - logger.Errorf("publishProvider getServiceDescriptor error. providerUrl:%v , error: ", url, err) + logger.Errorf("publishProvider getServiceDescriptor error. providerUrl:%v , error:%v ", url, err) } mts.serviceDefinitions.Store(url.ServiceKey(), string(data)) return nil @@ -221,12 +220,12 @@ func (mts *MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) return v.(string), nil } -// Version will return the version of metadata service -func (mts *MetadataService) Version() string { - return "1.0.0" +// RefreshMetadata will always return true because it will be implement by remote service +func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) bool { + return true } // Version will return the version of metadata service -func (mts *MetadataService) Reference() string { - return "MetadataService" +func (mts *MetadataService) Version() string { + return "1.0.0" } diff --git a/metadata/service/inmemory/service_test.go b/metadata/service/inmemory/service_test.go index 9e593db282..fc0410ecca 100644 --- a/metadata/service/inmemory/service_test.go +++ b/metadata/service/inmemory/service_test.go @@ -18,10 +18,8 @@ package inmemory import ( - "context" "fmt" "testing" - "time" ) import ( @@ -33,29 +31,6 @@ import ( "github.com/apache/dubbo-go/metadata/definition" ) -type User struct { - Id string - Name string - Age int32 - Time time.Time -} - -type UserProvider struct { -} - -func (u *UserProvider) GetUser(ctx context.Context, req []interface{}) (*User, error) { - rsp := User{"A001", "Alex Stocks", 18, time.Now()} - return &rsp, nil -} - -func (u *UserProvider) Reference() string { - return "UserProvider" -} - -func (u User) JavaClassName() string { - return "com.ikurento.user.User" -} - func TestMetadataService(t *testing.T) { mts := NewMetadataService() serviceName := "com.ikurento.user.UserProvider" @@ -111,7 +86,7 @@ func TestMetadataService(t *testing.T) { list4, _ := mts.GetSubscribedURLs() assert.Equal(t, uint64(0), list4.Len()) - userProvider := &UserProvider{} + userProvider := &definition.UserProvider{} common.ServiceMap.Register(serviceName, protocol, userProvider) mts.PublishServiceDefinition(u) expected := "{\"CanonicalName\":\"com.ikurento.user.UserProvider\",\"CodeSource\":\"\"," + diff --git a/metadata/service/remote/service.go b/metadata/service/remote/service.go new file mode 100644 index 0000000000..706f2f919c --- /dev/null +++ b/metadata/service/remote/service.go @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package remote + +import ( + "github.com/Workiva/go-datastructures/slice/skip" + "go.uber.org/atomic" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/metadata/definition" + "github.com/apache/dubbo-go/metadata/identifier" + "github.com/apache/dubbo-go/metadata/report/delegate" + "github.com/apache/dubbo-go/metadata/service" + "github.com/apache/dubbo-go/metadata/service/inmemory" +) + +// MetadataService is a implement of metadata service which will delegate the remote metadata report +type MetadataService struct { + service.BaseMetadataService + inMemoryMetadataService *inmemory.MetadataService + exportedRevision atomic.String + subscribedRevision atomic.String + delegateReport *delegate.MetadataReport +} + +// NewMetadataService will create a new remote MetadataService instance +func NewMetadataService() (*MetadataService, error) { + mr, err := delegate.NewMetadataReport() + if err != nil { + return nil, err + } + return &MetadataService{ + inMemoryMetadataService: inmemory.NewMetadataService(), + delegateReport: mr, + }, nil +} + +// setInMemoryMetadataService will replace the in memory metadata service by the specific param +func (mts *MetadataService) setInMemoryMetadataService(metadata *inmemory.MetadataService) { + mts.inMemoryMetadataService = metadata +} + +// ExportURL will be implemented by in memory service +func (mts *MetadataService) ExportURL(url common.URL) (bool, error) { + return true, nil +} + +// UnexportURL +func (mts *MetadataService) UnexportURL(url common.URL) error { + smi := identifier.NewServiceMetadataIdentifier(url) + smi.Revision = mts.exportedRevision.Load() + return mts.delegateReport.RemoveServiceMetadata(smi) +} + +// SubscribeURL will be implemented by in memory service +func (MetadataService) SubscribeURL(url common.URL) (bool, error) { + return true, nil +} + +// UnsubscribeURL will be implemented by in memory service +func (MetadataService) UnsubscribeURL(url common.URL) error { + return nil +} + +// PublishServiceDefinition will call remote metadata's StoreProviderMetadata to store url info and service definition +func (mts *MetadataService) PublishServiceDefinition(url common.URL) error { + interfaceName := url.GetParam(constant.INTERFACE_KEY, "") + isGeneric := url.GetParamBool(constant.GENERIC_KEY, false) + if len(interfaceName) > 0 && !isGeneric { + service := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service())) + sd := definition.BuildServiceDefinition(*service, url) + id := &identifier.MetadataIdentifier{ + BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{ + ServiceInterface: interfaceName, + Version: url.GetParam(constant.VERSION_KEY, ""), + Group: url.GetParam(constant.GROUP_KEY, ""), + }, + } + mts.delegateReport.StoreProviderMetadata(id, sd) + } + logger.Errorf("publishProvider interfaceName is empty . providerUrl:%v ", url) + return nil +} + +// GetExportedURLs will be implemented by in memory service +func (MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) (*skip.SkipList, error) { + return nil, nil +} + +// GetSubscribedURLs will be implemented by in memory service +func (MetadataService) GetSubscribedURLs() (*skip.SkipList, error) { + return nil, nil +} + +// GetServiceDefinition will be implemented by in memory service +func (MetadataService) GetServiceDefinition(interfaceName string, group string, version string) (string, error) { + return "", nil +} + +// GetServiceDefinitionByServiceKey will be implemented by in memory service +func (MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) { + return "", nil +} + +// RefreshMetadata will refresh the exported & subscribed metadata to remote metadata report from the inmemory metadata service +func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) bool { + result := true + if len(exportedRevision) != 0 && exportedRevision != mts.exportedRevision.Load() { + mts.exportedRevision.Store(exportedRevision) + urls, err := mts.inMemoryMetadataService.GetExportedURLs(constant.ANY_VALUE, "", "", "") + if err != nil { + logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v", err) + result = false + } + iterator := urls.Iter(inmemory.Comparator{}) + logger.Infof("urls length = %v", urls.Len()) + for { + if !iterator.Next() { + break + } + url := iterator.Value().(inmemory.Comparator) + id := identifier.NewServiceMetadataIdentifier(common.URL(url)) + id.Revision = mts.exportedRevision.Load() + if err := mts.delegateReport.SaveServiceMetadata(id, common.URL(url)); err != nil { + logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v", err) + result = false + } + + } + } + + if len(subscribedRevision) != 0 && subscribedRevision != mts.subscribedRevision.Load() { + mts.subscribedRevision.Store(subscribedRevision) + urls, err := mts.inMemoryMetadataService.GetSubscribedURLs() + if err != nil { + logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v", err) + result = false + } + if urls != nil && urls.Len() > 0 { + id := &identifier.SubscriberMetadataIdentifier{ + MetadataIdentifier: identifier.MetadataIdentifier{ + Application: config.GetApplicationConfig().Name, + }, + Revision: subscribedRevision, + } + if err := mts.delegateReport.SaveSubscribedData(id, convertUrls(urls)); err != nil { + logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v", err) + result = false + } + } + } + return result +} + +// Version will return the remote service version +func (MetadataService) Version() string { + return "1.0.0" +} + +// convertUrls will convert the skip list to slice +func convertUrls(list *skip.SkipList) []common.URL { + urls := make([]common.URL, list.Len()) + iterator := list.Iter(inmemory.Comparator{}) + for { + if iterator.Value() == nil { + break + } + url := iterator.Value().(inmemory.Comparator) + urls = append(urls, common.URL(url)) + if !iterator.Next() { + break + } + } + return urls +} diff --git a/metadata/service/remote/service_test.go b/metadata/service/remote/service_test.go new file mode 100644 index 0000000000..05378cb8b1 --- /dev/null +++ b/metadata/service/remote/service_test.go @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package remote + +import ( + "fmt" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config/instance" + "github.com/apache/dubbo-go/metadata/definition" + "github.com/apache/dubbo-go/metadata/identifier" + "github.com/apache/dubbo-go/metadata/report" + "github.com/apache/dubbo-go/metadata/report/factory" + "github.com/apache/dubbo-go/metadata/service/inmemory" +) + +var serviceMetadata = make(map[*identifier.ServiceMetadataIdentifier]common.URL, 4) +var subscribedMetadata = make(map[*identifier.SubscriberMetadataIdentifier][]common.URL, 4) + +func getMetadataReportFactory() factory.MetadataReportFactory { + return &metadataReportFactory{} +} + +type metadataReportFactory struct { +} + +func (mrf *metadataReportFactory) CreateMetadataReport(*common.URL) report.MetadataReport { + return &metadataReport{} +} + +type metadataReport struct { +} + +func (metadataReport) StoreProviderMetadata(*identifier.MetadataIdentifier, string) error { + return nil +} + +func (metadataReport) StoreConsumerMetadata(*identifier.MetadataIdentifier, string) error { + return nil +} + +func (mr *metadataReport) SaveServiceMetadata(id *identifier.ServiceMetadataIdentifier, url common.URL) error { + logger.Infof("SaveServiceMetadata , url is %v", url) + serviceMetadata[id] = url + return nil +} + +func (metadataReport) RemoveServiceMetadata(*identifier.ServiceMetadataIdentifier) error { + return nil +} + +func (metadataReport) GetExportedURLs(*identifier.ServiceMetadataIdentifier) []string { + return nil +} + +func (mr *metadataReport) SaveSubscribedData(id *identifier.SubscriberMetadataIdentifier, urls []common.URL) error { + logger.Infof("SaveSubscribedData, , url is %v", urls) + subscribedMetadata[id] = urls + return nil +} + +func (metadataReport) GetSubscribedURLs(*identifier.SubscriberMetadataIdentifier) []string { + return nil +} + +func (metadataReport) GetServiceDefinition(*identifier.MetadataIdentifier) string { + return "" +} + +func TestMetadataService(t *testing.T) { + extension.SetMetadataReportFactory("mock", getMetadataReportFactory) + u, err := common.NewURL(fmt.Sprintf( + "mock://127.0.0.1:20000/?sync.report=true")) + assert.NoError(t, err) + instance.InitMetadataReportInstance(&u) + mts, err := NewMetadataService() + assert.NoError(t, err) + mts.setInMemoryMetadataService(mockInmemoryProc(t)) + mts.RefreshMetadata("0.0.1", "0.0.1") + assert.Equal(t, 1, len(serviceMetadata)) + assert.Equal(t, 1, len(subscribedMetadata)) +} + +func mockInmemoryProc(t *testing.T) *inmemory.MetadataService { + mts := inmemory.NewMetadataService() + serviceName := "com.ikurento.user.UserProvider" + group := "group1" + version := "0.0.1" + protocol := "dubbo" + beanName := "UserProvider" + + u, err := common.NewURL(fmt.Sprintf( + "%v://127.0.0.1:20000/com.ikurento.user.UserProvider1?anyhost=true&"+ + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ + "environment=dev&interface=%v&ip=192.168.56.1&methods=GetUser&module=dubbogo+user-info+server&org=ikurento.com&"+ + "owner=ZX&pid=1447&revision=0.0.1&side=provider&timeout=3000×tamp=1556509797245&group=%v&version=%v&bean.name=%v", + protocol, serviceName, group, version, beanName)) + assert.NoError(t, err) + mts.ExportURL(u) + + mts.SubscribeURL(u) + + userProvider := &definition.UserProvider{} + common.ServiceMap.Register(serviceName, protocol, userProvider) + mts.PublishServiceDefinition(u) + expected := "{\"CanonicalName\":\"com.ikurento.user.UserProvider\",\"CodeSource\":\"\"," + + "\"Methods\":[{\"Name\":\"GetUser\",\"ParameterTypes\":[\"slice\"],\"ReturnType\":\"ptr\"," + + "\"Parameters\":null}],\"Types\":null}" + def1, _ := mts.GetServiceDefinition(serviceName, group, version) + assert.Equal(t, expected, def1) + serviceKey := definition.ServiceDescriperBuild(serviceName, group, version) + def2, _ := mts.GetServiceDefinitionByServiceKey(serviceKey) + assert.Equal(t, expected, def2) + return mts +} diff --git a/metadata/service/service.go b/metadata/service/service.go index bc526c5411..a1b812c0ac 100644 --- a/metadata/service/service.go +++ b/metadata/service/service.go @@ -49,6 +49,8 @@ type MetadataService interface { GetServiceDefinition(interfaceName string, group string, version string) (string, error) // GetServiceDefinition will get the target service info store in metadata by service key GetServiceDefinitionByServiceKey(serviceKey string) (string, error) + // RefreshMetadata will refresh the metadata + RefreshMetadata(exportedRevision string, subscribedRevision string) bool // Version will return the metadata service version Version() string } @@ -61,3 +63,8 @@ type BaseMetadataService struct { func (mts *BaseMetadataService) ServiceName() (string, error) { return config.GetApplicationConfig().Name, nil } + +// Version will return the version of metadata service +func (mts *BaseMetadataService) Reference() string { + return "MetadataService" +}