Skip to content

Commit

Permalink
Merge pull request #151 from lzp0412/develop
Browse files Browse the repository at this point in the history
support nacos registry
  • Loading branch information
hxmhlt authored Aug 12, 2019
2 parents 046699a + d448aa8 commit 96bf3ea
Show file tree
Hide file tree
Showing 10 changed files with 642 additions and 5 deletions.
12 changes: 12 additions & 0 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,15 @@ const (
ProviderConfigPrefix = "dubbo.provider."
ConsumerConfigPrefix = "dubbo.consumer."
)

const (
NACOS_KEY = "nacos"
NACOS_DEFAULT_ROLETYPE = 3
NACOS_CACHE_DIR_KEY = "cacheDir"
NACOS_LOG_DIR_KEY = "logDir"
NACOS_ENDPOINT = "endpoint"
NACOS_SERVICE_NAME_SEPARATOR = ":"
NACOS_CATEGORY_KEY = "category"
NACOS_PROTOCOL_KEY = "protocol"
NACOS_PATH_KEY = "path"
)
1 change: 1 addition & 0 deletions config/reference_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values {
urlMap.Set(constant.GROUP_KEY, refconfig.Group)
urlMap.Set(constant.VERSION_KEY, refconfig.Version)
urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(refconfig.Generic))
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
//getty invoke async or sync
urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.async))

Expand Down
11 changes: 7 additions & 4 deletions config/registry_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ type RegistryConfig struct {
TimeoutStr string `yaml:"timeout" default:"5s" json:"timeout,omitempty" property:"timeout"` // unit: second
Group string `yaml:"group" json:"group,omitempty" property:"group"`
//for registry
Address string `yaml:"address" json:"address,omitempty" property:"address"`
Username string `yaml:"username" json:"username,omitempty" property:"username"`
Password string `yaml:"password" json:"password,omitempty" property:"password"`
Address string `yaml:"address" json:"address,omitempty" property:"address"`
Username string `yaml:"username" json:"username,omitempty" property:"username"`
Password string `yaml:"password" json:"password,omitempty" property:"password"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
}

func (*RegistryConfig) Prefix() string {
Expand Down Expand Up @@ -109,6 +110,8 @@ func (regconfig *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(int(roleType)))
urlMap.Set(constant.REGISTRY_KEY, regconfig.Protocol)
urlMap.Set(constant.REGISTRY_TIMEOUT_KEY, regconfig.TimeoutStr)

for k, v := range regconfig.Params {
urlMap.Set(k, v)
}
return urlMap
}
1 change: 1 addition & 0 deletions config/service_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values {
urlMap.Set(constant.RETRIES_KEY, strconv.FormatInt(srvconfig.Retries, 10))
urlMap.Set(constant.GROUP_KEY, srvconfig.Group)
urlMap.Set(constant.VERSION_KEY, srvconfig.Version)
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))
//application info
urlMap.Set(constant.APPLICATION_KEY, providerConfig.ApplicationConfig.Name)
urlMap.Set(constant.ORGANIZATION_KEY, providerConfig.ApplicationConfig.Organization)
Expand Down
13 changes: 12 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,26 @@ module github.com/apache/dubbo-go

require (
github.com/Workiva/go-datastructures v1.0.50
github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e // indirect
github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 // indirect
github.com/dubbogo/getty v1.2.2
github.com/dubbogo/gost v1.1.1
github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect
github.com/go-errors/errors v1.0.1 // indirect
github.com/golang/mock v1.3.1
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 // indirect
github.com/jonboulle/clockwork v0.1.0 // indirect
github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570 // indirect
github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f // indirect
github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 // indirect
github.com/magiconair/properties v1.8.1
github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb
github.com/pkg/errors v0.8.1
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec
github.com/stretchr/testify v1.3.0
github.com/tebeka/strftime v0.1.3 // indirect
github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3 // indirect
go.uber.org/atomic v1.4.0
go.uber.org/zap v1.10.0
gopkg.in/yaml.v2 v2.2.2
Expand Down
59 changes: 59 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
github.com/Workiva/go-datastructures v1.0.50 h1:slDmfW6KCHcC7U+LP3DDBbm4fqTwZGn1beOFPfGaLvo=
github.com/Workiva/go-datastructures v1.0.50/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA=
github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e h1:MSuLXx/mveDbpDNhVrcWTMeV4lbYWKcyO4rH+jAxmX0=
github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e/go.mod h1:myCDvQSzCW+wB1WAlocEru4wMGJxy+vlxHdhegi1CDQ=
github.com/aliyun/aliyun-oss-go-sdk v0.0.0-20190307165228-86c17b95fcd5/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8 h1:7zJlM+8bpCAUhv03TZnXkT4MLlLWng1s7An8CLuN73E=
github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8/go.mod h1:LWnndnrFXZmJLAzoyNAPNHSIJ1KOHVkTSsHgC3YYWlo=
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc=
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 h1:D21IyuvjDCshj1/qq+pCNd3VZOAEI9jy6Bi131YlXgI=
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand All @@ -10,23 +16,69 @@ github.com/dubbogo/getty v1.2.2 h1:qDC9WXjxcs5NPvWZz2ruVKBKr2r1Jjm6i0Sq//CQwbE=
github.com/dubbogo/getty v1.2.2/go.mod h1:K4b3MkGLf7T+lMgQNFgpg0dI1Wvv1PTisFs1Psf86kU=
github.com/dubbogo/gost v1.1.1 h1:JCM7vx5edPIjDA5ovJTuzEEXuw2t7xLyrlgi2mi5jHI=
github.com/dubbogo/gost v1.1.1/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg=
github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 h1:Ghm4eQYC0nEPnSJdVkTrXpu9KtoVCSo1hg7mtI7G9KU=
github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239/go.mod h1:Gdwt2ce0yfBxPvZrHkprdPPTTS3N5rwmLE8T22KBXlw=
github.com/go-errors/errors v1.0.1 h1:LUHzmkK3GUKUrL/1gfBUxAHzcev3apQlezX/+O7ma6w=
github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A=
github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s=
github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s=
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 h1:IPJ3dvxmJ4uczJe5YQdrYB16oTJlGSC/OyZDqUk9xX4=
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869/go.mod h1:cJ6Cj7dQo+O6GJNiMx+Pa94qKj+TG8ONdKHgMNIyyag=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/json-iterator/go v1.1.5 h1:gL2yXlmiIo4+t+y32d4WGwOjKGYcGOuyrg46vadswDE=
github.com/json-iterator/go v1.1.5/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570 h1:0iQektZGS248WXmGIYOwRXSQhD4qn3icjMpuxwO7qlo=
github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570/go.mod h1:BLt8L9ld7wVsvEWQbuLrUZnCMnUmLZ+CGDzKtclrTlE=
github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f h1:sgUSP4zdTUZYZgAGGtN5Lxk92rK+JUFOwf+FT99EEI4=
github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f/go.mod h1:UGmTpUd3rjbtfIpwAPrcfmGf/Z1HS95TATB+m57TPB8=
github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 h1:Bvq8AziQ5jFF4BHGAEDSqwPW1NJS3XshxbRCxtjFAZc=
github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042/go.mod h1:TPpsiPUEh0zFL1Snz4crhMlBe60PYxRHr5oFF3rRYg0=
github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb h1:lbmvw8r9W55w+aQgWn35W1nuleRIECMoqUrmwAOAvoI=
github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb/go.mod h1:CEkSvEpoveoYjA81m4HNeYQ0sge0LFGKSEqO3JKHllo=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec h1:6ncX5ko6B9LntYM0YBRXkiSaZMmLYeZ/NWcmeB43mMY=
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a h1:pa8hGb/2YqsZKovtsgrwcDH1RZhVbTKCjLp47XpqCDs=
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/tebeka/strftime v0.1.3 h1:5HQXOqWKYRFfNyBMNVc9z5+QzuBtIXy03psIhtdJYto=
github.com/tebeka/strftime v0.1.3/go.mod h1:7wJm3dZlpr4l/oVK0t1HYIc4rMzQ2XJlOMIUJUJH6XQ=
github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3 h1:kF/7m/ZU+0D4Jj5eZ41Zm3IH/J8OElK1Qtd7tVKAwLk=
github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3/go.mod h1:QDlpd3qS71vYtakd2hmdpqhJ9nwv6mD6A30bQ1BPBFE=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
Expand All @@ -42,9 +94,16 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262 h1:qsl9y/CJx34tuA7QCPNp86JNJe4spst6Ff8MjvPUdPg=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/ini.v1 v1.42.0 h1:7N3gPTt50s8GuLortA00n8AqRTk75qOP98+mTPpgzRk=
gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
199 changes: 199 additions & 0 deletions registry/nacos/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package nacos

import (
"bytes"
"net/url"
"reflect"
"strconv"
"sync"
)

import (
"github.com/nacos-group/nacos-sdk-go/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/model"
"github.com/nacos-group/nacos-sdk-go/vo"
perrors "github.com/pkg/errors"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting"
)

type nacosListener struct {
namingClient naming_client.INamingClient
listenUrl common.URL
events chan *remoting.ConfigChangeEvent
instanceMap map[string]model.Instance
cacheLock sync.Mutex
done chan struct{}
subscribeParam *vo.SubscribeParam
}

func NewNacosListener(url common.URL, namingClient naming_client.INamingClient) (*nacosListener, error) {
listener := &nacosListener{
namingClient: namingClient,
listenUrl: url, events: make(chan *remoting.ConfigChangeEvent, 32),
instanceMap: map[string]model.Instance{},
done: make(chan struct{}),
}
err := listener.startListen()
return listener, err
}

func generateInstance(ss model.SubscribeService) model.Instance {
return model.Instance{
InstanceId: ss.InstanceId,
Ip: ss.Ip,
Port: ss.Port,
ServiceName: ss.ServiceName,
Valid: ss.Valid,
Enable: ss.Enable,
Weight: ss.Weight,
Metadata: ss.Metadata,
ClusterName: ss.ClusterName,
}
}

func generateUrl(instance model.Instance) *common.URL {
if instance.Metadata == nil {
logger.Errorf("nacos instance metadata is empty,instance:%+v", instance)
return nil
}
path := instance.Metadata["path"]
myInterface := instance.Metadata["interface"]
if len(path) == 0 && len(myInterface) == 0 {
logger.Errorf("nacos instance metadata does not have both path key and interface key,instance:%+v", instance)
return nil
}
if len(path) == 0 && len(myInterface) != 0 {
path = "/" + myInterface
}
protocol := instance.Metadata["protocol"]
if len(protocol) == 0 {
logger.Errorf("nacos instance metadata does not have protocol key,instance:%+v", instance)
return nil
}
urlMap := url.Values{}
for k, v := range instance.Metadata {
urlMap.Set(k, v)
}
return common.NewURLWithOptions(
common.WithIp(instance.Ip),
common.WithPort(strconv.Itoa(int(instance.Port))),
common.WithProtocol(protocol),
common.WithParams(urlMap),
common.WithPath(path),
)
}

func (nl *nacosListener) Callback(services []model.SubscribeService, err error) {
if err != nil {
logger.Errorf("nacos subscribe callback error:%s , subscribe:%+v ", err.Error(), nl.subscribeParam)
return
}
nl.cacheLock.Lock()
defer nl.cacheLock.Unlock()
addInstances := make([]model.Instance, 0, len(services))
delInstances := make([]model.Instance, 0, len(services))
updateInstances := make([]model.Instance, 0, len(services))

newInstanceMap := make(map[string]model.Instance, len(services))

for i := range services {
if !services[i].Enable || !services[i].Valid {
// instance is not available,so ignore it
continue
}
host := services[i].Ip + ":" + strconv.Itoa(int(services[i].Port))
instance := generateInstance(services[i])
newInstanceMap[host] = instance
if old, ok := nl.instanceMap[host]; !ok {
//instance is not exsit in cache,add it to cache
addInstances = append(addInstances, instance)
} else {
//instance is not different from cache,update it to cache
if !reflect.DeepEqual(old, instance) {
updateInstances = append(updateInstances, instance)
}
}
}

for host, inst := range nl.instanceMap {
if _, ok := newInstanceMap[host]; !ok {
//cache instance is not exsit in new instance list, remove it from cache
delInstances = append(delInstances, inst)
}
}

nl.instanceMap = newInstanceMap

for i := range addInstances {
newUrl := generateUrl(addInstances[i])
if newUrl != nil {
nl.process(&remoting.ConfigChangeEvent{Value: *newUrl, ConfigType: remoting.EventTypeAdd})
}
}
for i := range delInstances {
newUrl := generateUrl(delInstances[i])
if newUrl != nil {
nl.process(&remoting.ConfigChangeEvent{Value: *newUrl, ConfigType: remoting.EventTypeDel})
}
}

for i := range updateInstances {
newUrl := generateUrl(updateInstances[i])
if newUrl != nil {
nl.process(&remoting.ConfigChangeEvent{Value: *newUrl, ConfigType: remoting.EvnetTypeUpdate})
}
}
}

func getSubscribeName(url common.URL) string {
var buffer bytes.Buffer

buffer.Write([]byte(common.DubboNodes[common.PROVIDER]))
appendParam(&buffer, url, constant.INTERFACE_KEY)
appendParam(&buffer, url, constant.VERSION_KEY)
appendParam(&buffer, url, constant.GROUP_KEY)
return buffer.String()
}

func (nl *nacosListener) startListen() error {
if nl.namingClient == nil {
return perrors.New("nacos naming client stopped")
}
serviceName := getSubscribeName(nl.listenUrl)
nl.subscribeParam = &vo.SubscribeParam{ServiceName: serviceName, SubscribeCallback: nl.Callback}
return nl.namingClient.Subscribe(nl.subscribeParam)
}

func (nl *nacosListener) stopListen() error {
return nl.namingClient.Unsubscribe(nl.subscribeParam)
}

func (nl *nacosListener) process(configType *remoting.ConfigChangeEvent) {
nl.events <- configType
}

func (nl *nacosListener) Next() (*registry.ServiceEvent, error) {
for {
select {
case <-nl.done:
logger.Warnf("nacos listener is close!listenUrl:%+v", nl.listenUrl)
return nil, perrors.New("listener stopped")

case e := <-nl.events:
logger.Debugf("got nacos event %s", e)
return &registry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(common.URL)}, nil
}
}
}

func (nl *nacosListener) Close() {
nl.stopListen()
close(nl.done)
}
Loading

0 comments on commit 96bf3ea

Please sign in to comment.