Skip to content

Commit bbbf2d5

Browse files
author
kifile
committed
feat: Add nacos registry support.
1 parent ae74a99 commit bbbf2d5

File tree

7 files changed

+201
-1
lines changed

7 files changed

+201
-1
lines changed

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ Issues = "https://github.com/apache/dubbo/issues"
7070
zookeeper = [
7171
"kazoo>=2.10.0",
7272
]
73+
nacos = [
74+
"nacos-sdk-python>=2.0.6",
75+
]
7376

7477
### Hatch settings ###
7578
[tool.hatch.version]

src/dubbo/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,6 @@
1717
from .bootstrap import Dubbo
1818
from .client import Client
1919
from .server import Server
20+
from .__about__ import __version__
2021

21-
__all__ = ["Dubbo", "Client", "Server"]
22+
__all__ = ["Dubbo", "Client", "Server", "__version__"]

src/dubbo/configs.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,7 @@ class RegistryConfig(AbstractConfig):
491491
"_load_balance",
492492
"_group",
493493
"_version",
494+
"_namespace",
494495
]
495496

496497
def __init__(
@@ -503,6 +504,7 @@ def __init__(
503504
load_balance: Optional[str] = None,
504505
group: Optional[str] = None,
505506
version: Optional[str] = None,
507+
namespace: Optional[str] = None,
506508
):
507509
"""
508510
Initialize the registry configuration.
@@ -522,6 +524,8 @@ def __init__(
522524
:type group: Optional[str]
523525
:param version: The version of the registry.
524526
:type version: Optional[str]
527+
:param namespace: The namespace of the registry.
528+
:type namespace: Optional[str]
525529
"""
526530
super().__init__()
527531

@@ -533,6 +537,7 @@ def __init__(
533537
self._load_balance = load_balance
534538
self._group = group
535539
self._version = version
540+
self._namespace = namespace
536541

537542
@property
538543
def protocol(self) -> str:
@@ -677,6 +682,24 @@ def version(self, version: str) -> None:
677682
:type version: str
678683
"""
679684
self._version = version
685+
686+
@property
687+
def namespace(self) -> Optional[str]:
688+
"""
689+
Get the namespace of the registry.
690+
:return: The namespace of the registry.
691+
:rtype: Optional[str]
692+
"""
693+
return self._namespace
694+
695+
@namespace.setter
696+
def namespace(self, namespace: str) -> None:
697+
"""
698+
Set the namespace of the registry.
699+
:param namespace: The namespace of the registry.
700+
:type namespace: str
701+
"""
702+
self._namespace = namespace
680703

681704
def to_url(self) -> URL:
682705
"""
@@ -687,10 +710,13 @@ def to_url(self) -> URL:
687710
parameters = {}
688711
if self.load_balance:
689712
parameters[registry_constants.LOAD_BALANCE_KEY] = self.load_balance
713+
if self.namespace:
714+
parameters[registry_constants.NAMESPACE] = self.namespace
690715
if self.group:
691716
parameters[config_constants.GROUP] = self.group
692717
if self.version:
693718
parameters[config_constants.VERSION] = self.version
719+
694720

695721
return URL(
696722
scheme=self.protocol,
@@ -721,6 +747,7 @@ def from_url(cls, url: Union[str, URL]) -> "RegistryConfig":
721747
load_balance=url.parameters.get(registry_constants.LOAD_BALANCE_KEY),
722748
group=url.parameters.get(config_constants.GROUP),
723749
version=url.parameters.get(config_constants.VERSION),
750+
namespace=url.parameters.get(registry_constants.NAMESPACE),
724751
)
725752

726753

src/dubbo/constants/registry_constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,4 @@
2222

2323

2424
LOAD_BALANCE_KEY = "loadbalance"
25+
NAMESPACE = "namespace"

src/dubbo/extension/registries.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class ExtendedRegistry:
5454
interface=RegistryFactory,
5555
impls={
5656
"zookeeper": "dubbo.registry.zookeeper.zk_registry.ZookeeperRegistryFactory",
57+
"nacos": "dubbo.registry.nacos.nacos_registry.NacosRegistryFactory",
5758
},
5859
)
5960

src/dubbo/registry/nacos/__init__.py

Whitespace-only changes.
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
import dubbo
2+
3+
from nacos import NacosClient
4+
from nacos.timer import NacosTimer, NacosTimerManager
5+
6+
from dubbo.constants import common_constants, registry_constants
7+
from dubbo.registry import NotifyListener, Registry, RegistryFactory
8+
from dubbo.url import URL
9+
from dubbo.loggers import loggerFactory
10+
11+
_LOGGER = loggerFactory.get_logger()
12+
13+
DEFAULT_APPLICATION = "DEFAULT"
14+
15+
16+
class NacosSubscriber:
17+
"""
18+
Nacos instance subscriber
19+
"""
20+
21+
def __init__(
22+
self, nacos_client: NacosClient, service_name: str, listener: NotifyListener
23+
):
24+
self._nacos_client = nacos_client
25+
self._service_name = service_name
26+
self._listener = listener
27+
self._timer_manager = NacosTimerManager()
28+
self._subscribed = False
29+
30+
def refresh_instances(self):
31+
"""
32+
Refresh nacos instances
33+
"""
34+
if not self._subscribed:
35+
return
36+
37+
try:
38+
instances = self._nacos_client.list_naming_instance(self._service_name)
39+
hosts = instances["hosts"]
40+
urls = [
41+
URL(scheme="tri", host=h["ip"], port=h["port"])
42+
for h in hosts
43+
if h["enabled"]
44+
]
45+
self._listener.notify(urls=urls)
46+
except Exception as e:
47+
_LOGGER.error("nacos subscriber refresh_instance failed: %s", e)
48+
49+
def subscribe(self):
50+
"""
51+
Start timer to watch instances
52+
"""
53+
if not self._timer_manager.all_timers().get("refresh_instances"):
54+
self._timer_manager.add_timer(
55+
NacosTimer("refresh_instances", self.refresh_instances, interval=7)
56+
)
57+
self._timer_manager.execute()
58+
self._subscribed = True
59+
self.refresh_instances()
60+
61+
def unsubscribe(self):
62+
self._subscribed = False
63+
64+
65+
def _init_nacos_client(url: URL) -> NacosClient:
66+
server_address = f"{url.host}:{url.port if url.port else 8848}"
67+
parameters = url.parameters
68+
69+
endpoint = parameters.get("endpoint")
70+
namespace = parameters.get(registry_constants.NAMESPACE)
71+
username = url.username
72+
password = url.password
73+
74+
return NacosClient(
75+
server_addresses=server_address,
76+
endpoint=endpoint,
77+
namespace=namespace,
78+
username=username,
79+
password=password,
80+
)
81+
82+
83+
def _build_nacos_service_name(url: URL):
84+
service_name = url.parameters.get(common_constants.SERVICE_KEY)
85+
return f"providers:{service_name}::"
86+
87+
88+
class NacosRegistry(Registry):
89+
90+
def __init__(self, url: URL):
91+
self._url = url
92+
self._nacos_client: NacosClient = _init_nacos_client(url)
93+
self._service_subscriber_mapping = {}
94+
95+
def _service_subscriber(
96+
self, service_name: str, listener: NotifyListener
97+
) -> NacosSubscriber:
98+
if service_name not in self._service_subscriber_mapping:
99+
self._service_subscriber_mapping[service_name] = NacosSubscriber(
100+
self._nacos_client, service_name=service_name, listener=listener
101+
)
102+
103+
return self._service_subscriber_mapping[service_name]
104+
105+
def register(self, url: URL) -> None:
106+
ip = url.host
107+
port = url.port
108+
109+
nacos_service_name = _build_nacos_service_name(url)
110+
111+
metadata = {
112+
"side": "provider",
113+
"release": f"{dubbo.__version__}_py",
114+
"protocol": "tri",
115+
"application": DEFAULT_APPLICATION,
116+
"category": "providers",
117+
"enabled": "true",
118+
"disabled": "false",
119+
}
120+
self._nacos_client.add_naming_instance(
121+
nacos_service_name,
122+
ip,
123+
port,
124+
DEFAULT_APPLICATION,
125+
metadata=metadata,
126+
heartbeat_interval=1,
127+
)
128+
129+
def unregister(self, url: URL) -> None:
130+
ip = url.host
131+
port = url.port
132+
nacos_service_name = _build_nacos_service_name(url)
133+
134+
self._nacos_client.remove_naming_instance(
135+
nacos_service_name, ip=ip, port=port, cluster_name=DEFAULT_APPLICATION
136+
)
137+
138+
def subscribe(self, url: URL, listener: NotifyListener) -> None:
139+
nacos_service_name = _build_nacos_service_name(url)
140+
141+
subscriber = self._service_subscriber(nacos_service_name, listener)
142+
subscriber.subscribe()
143+
144+
def unsubscribe(self, url: URL, listener: NotifyListener) -> None:
145+
nacos_service_name = _build_nacos_service_name(url)
146+
147+
subscriber = self._service_subscriber(nacos_service_name, listener)
148+
subscriber.unsubscribe()
149+
listener.notify([])
150+
151+
def lookup(self, url: URL):
152+
pass
153+
154+
def get_url(self) -> URL:
155+
return self._url
156+
157+
def is_available(self) -> bool:
158+
return self._nacos_client is not None
159+
160+
def destroy(self) -> None:
161+
pass
162+
163+
164+
class NacosRegistryFactory(RegistryFactory):
165+
166+
def get_registry(self, url: URL) -> Registry:
167+
return NacosRegistry(url)

0 commit comments

Comments
 (0)