Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dubbo/src/cluster/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ impl Directory for RegistryDirectory {
.expect("msg")
.subscribe(
url,
MemoryNotifyListener {
Arc::new(MemoryNotifyListener {
service_instances: Arc::clone(&self.service_instances),
},
}),
)
.expect("subscribe");

Expand Down
8 changes: 3 additions & 5 deletions dubbo/src/registry/memory_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use tracing::debug;

use crate::common::url::Url;

use super::{NotifyListener, Registry};
use super::{NotifyListener, Registry, RegistryNotifyListener};

// 从url中获取服务注册的元数据
/// rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, s)
Expand All @@ -47,8 +47,6 @@ impl MemoryRegistry {
}

impl Registry for MemoryRegistry {
type NotifyListener = MemoryNotifyListener;

fn register(&mut self, mut url: Url) -> Result<(), crate::StdError> {
// define provider label: ${registry.group}/${service_name}/provider
let registry_group = match url.get_param(REGISTRY_GROUP_KEY) {
Expand Down Expand Up @@ -91,15 +89,15 @@ impl Registry for MemoryRegistry {
fn subscribe(
&self,
url: crate::common::url::Url,
listener: Self::NotifyListener,
listener: RegistryNotifyListener,
) -> Result<(), crate::StdError> {
todo!()
}

fn unsubscribe(
&self,
url: crate::common::url::Url,
listener: Self::NotifyListener,
listener: RegistryNotifyListener,
) -> Result<(), crate::StdError> {
todo!()
}
Expand Down
22 changes: 14 additions & 8 deletions dubbo/src/registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,24 @@ pub mod memory_registry;
pub mod protocol;
pub mod types;

use std::fmt::{Debug, Formatter};
use std::{
fmt::{Debug, Formatter},
sync::Arc,
};

use crate::{common::url::Url, registry::memory_registry::MemoryNotifyListener};
use crate::common::url::Url;

pub type RegistryNotifyListener = Arc<dyn NotifyListener + Send + Sync + 'static>;
pub trait Registry {
type NotifyListener;

fn register(&mut self, url: Url) -> Result<(), crate::StdError>;
fn unregister(&mut self, url: Url) -> Result<(), crate::StdError>;

fn subscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), crate::StdError>;
fn unsubscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), crate::StdError>;
fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), crate::StdError>;
fn unsubscribe(
&self,
url: Url,
listener: RegistryNotifyListener,
) -> Result<(), crate::StdError>;
}

pub trait NotifyListener {
Expand All @@ -47,7 +53,7 @@ pub struct ServiceEvent {
pub service: Vec<Url>,
}

pub type BoxRegistry = Box<dyn Registry<NotifyListener = MemoryNotifyListener> + Send + Sync>;
pub type BoxRegistry = Box<dyn Registry + Send + Sync>;

impl Debug for BoxRegistry {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Expand All @@ -57,7 +63,7 @@ impl Debug for BoxRegistry {

#[derive(Default)]
pub struct RegistryWrapper {
pub registry: Option<Box<dyn Registry<NotifyListener = MemoryNotifyListener>>>,
pub registry: Option<Box<dyn Registry>>,
}

impl Clone for RegistryWrapper {
Expand Down
10 changes: 5 additions & 5 deletions dubbo/src/registry/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ use tracing::info;

use crate::{
common::url::Url,
registry::{memory_registry::MemoryNotifyListener, BoxRegistry, Registry},
registry::{BoxRegistry, Registry},
StdError,
};

use super::RegistryNotifyListener;

pub type SafeRegistry = Arc<Mutex<BoxRegistry>>;
pub type Registries = Arc<Mutex<HashMap<String, SafeRegistry>>>;

Expand Down Expand Up @@ -66,8 +68,6 @@ impl RegistriesOperation for Registries {
}

impl Registry for SafeRegistry {
type NotifyListener = MemoryNotifyListener;

fn register(&mut self, url: Url) -> Result<(), StdError> {
info!("register {}.", url);
self.lock().unwrap().register(url).expect("registry err.");
Expand All @@ -79,12 +79,12 @@ impl Registry for SafeRegistry {
Ok(())
}

fn subscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), StdError> {
fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), StdError> {
self.lock().unwrap().register(url).expect("registry err.");
Ok(())
}

fn unsubscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), StdError> {
fn unsubscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), StdError> {
self.lock().unwrap().register(url).expect("registry err.");
Ok(())
}
Expand Down
12 changes: 7 additions & 5 deletions registry-nacos/src/nacos_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{
use anyhow::anyhow;
use dubbo::{
common::url::Url,
registry::{NotifyListener, Registry, ServiceEvent},
registry::{NotifyListener, Registry, RegistryNotifyListener, ServiceEvent},
};
use nacos_sdk::api::naming::{NamingService, NamingServiceBuilder, ServiceInstance};
use tracing::{error, info, warn};
Expand Down Expand Up @@ -139,8 +139,6 @@ impl NacosRegistry {
}

impl Registry for NacosRegistry {
type NotifyListener = Arc<dyn NotifyListener + Sync + Send + 'static>;

fn register(&mut self, url: Url) -> Result<(), dubbo::StdError> {
let side = url.get_param(SIDE_KEY).unwrap_or_default();
let register_consumer = url
Expand Down Expand Up @@ -205,7 +203,7 @@ impl Registry for NacosRegistry {
Ok(())
}

fn subscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), dubbo::StdError> {
fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), dubbo::StdError> {
let service_name = NacosServiceName::new(&url);
let url_str = url.to_url();

Expand Down Expand Up @@ -252,7 +250,11 @@ impl Registry for NacosRegistry {
Ok(())
}

fn unsubscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), dubbo::StdError> {
fn unsubscribe(
&self,
url: Url,
listener: RegistryNotifyListener,
) -> Result<(), dubbo::StdError> {
let service_name = NacosServiceName::new(&url);
let url_str = url.to_url();
info!("unsubscribe: {}", &url_str);
Expand Down
29 changes: 11 additions & 18 deletions registry-zookeeper/src/zookeeper_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ use dubbo::{
url::Url,
},
registry::{
integration::ClusterRegistryIntegration,
memory_registry::{MemoryNotifyListener, MemoryRegistry},
NotifyListener, Registry, ServiceEvent,
integration::ClusterRegistryIntegration, memory_registry::MemoryRegistry, NotifyListener,
Registry, RegistryNotifyListener, ServiceEvent,
},
StdError,
};
Expand All @@ -61,7 +60,7 @@ impl Watcher for LoggingWatcher {
pub struct ZookeeperRegistry {
root_path: String,
zk_client: Arc<ZooKeeper>,
listeners: RwLock<HashMap<String, Arc<<ZookeeperRegistry as Registry>::NotifyListener>>>,
listeners: RwLock<HashMap<String, RegistryNotifyListener>>,
memory_registry: Arc<Mutex<MemoryRegistry>>,
}

Expand Down Expand Up @@ -103,7 +102,7 @@ impl ZookeeperRegistry {
&self,
path: String,
service_name: String,
listener: Arc<<ZookeeperRegistry as Registry>::NotifyListener>,
listener: RegistryNotifyListener,
) -> ServiceInstancesChangedListener {
let mut service_names = HashSet::new();
service_names.insert(service_name.clone());
Expand Down Expand Up @@ -240,8 +239,6 @@ impl Default for ZookeeperRegistry {
}

impl Registry for ZookeeperRegistry {
type NotifyListener = MemoryNotifyListener;

fn register(&mut self, url: Url) -> Result<(), StdError> {
println!("register url: {}", url);
let zk_path = format!(
Expand All @@ -268,7 +265,7 @@ impl Registry for ZookeeperRegistry {
}

// for consumer to find the changes of providers
fn subscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), StdError> {
fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), StdError> {
let service_name = url.get_service_name();
let zk_path = format!("/{}/{}/{}", DUBBO_KEY, &service_name, PROVIDERS_KEY);
if self
Expand All @@ -281,17 +278,13 @@ impl Registry for ZookeeperRegistry {
return Ok(());
}

let arc_listener = Arc::new(listener);
self.listeners
.write()
.unwrap()
.insert(service_name.to_string(), Arc::clone(&arc_listener));
.insert(service_name.to_string(), listener.clone());

let zk_listener = self.create_listener(
zk_path.clone(),
service_name.to_string(),
Arc::clone(&arc_listener),
);
let zk_listener =
self.create_listener(zk_path.clone(), service_name.to_string(), listener.clone());

let zk_changed_paths = self.zk_client.get_children_w(&zk_path, zk_listener);
let result = match zk_changed_paths {
Expand All @@ -312,15 +305,15 @@ impl Registry for ZookeeperRegistry {
.collect(),
};
info!("notifying {}->{:?}", service_name, result);
arc_listener.notify(ServiceEvent {
listener.notify(ServiceEvent {
key: service_name,
action: String::from("ADD"),
service: result,
});
Ok(())
}

fn unsubscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), StdError> {
fn unsubscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), StdError> {
todo!()
}
}
Expand All @@ -329,7 +322,7 @@ pub struct ServiceInstancesChangedListener {
zk_client: Arc<ZooKeeper>,
path: String,
service_name: String,
listener: Arc<MemoryNotifyListener>,
listener: RegistryNotifyListener,
}

impl Watcher for ServiceInstancesChangedListener {
Expand Down