Skip to content

Rft: adapt nacos registry and zookeeper registry #169

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 31 additions & 16 deletions dubbo/src/directory/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::{
svc::NewService,
StdError,
};
use dubbo_base::Url;
use dubbo_logger::tracing::debug;
use futures_core::ready;
use futures_util::future;
Expand Down Expand Up @@ -162,7 +163,11 @@ where
let (tx, rx) = channel(Self::MAX_DIRECTORY_BUFFER_SIZE);

tokio::spawn(async move {
let receiver = registry.subscribe(service_name).await;
// todo use dubbo url model generate subscribe url
// category:serviceInterface:version:group
let consumer_url = format!("consumer://{}/{}", "127.0.0.1:8888", service_name);
let subscribe_url = Url::from_url(&consumer_url).unwrap();
let receiver = registry.subscribe(subscribe_url).await;
debug!("discover start!");
match receiver {
Err(_e) => {
Expand Down Expand Up @@ -217,22 +222,32 @@ where
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
loop {
let pin_discover = Pin::new(&mut self.discover);
let change = ready!(pin_discover.poll_discover(cx))
.transpose()
.map_err(|e| e.into())?;
match change {
Some(Change::Remove(key)) => {
debug!("remove key: {}", key);
self.directory.remove(&key);
}
Some(Change::Insert(key, _)) => {
debug!("insert key: {}", key);
let invoker = self.new_invoker.new_service(key.clone());
self.directory.insert(key, invoker);

match pin_discover.poll_discover(cx) {
Poll::Pending => {
if self.directory.is_empty() {
return Poll::Pending;
} else {
return Poll::Ready(Ok(()));
}
}
None => {
debug!("stream closed");
return Poll::Ready(Ok(()));
Poll::Ready(change) => {
let change = change.transpose().map_err(|e| e.into())?;
match change {
Some(Change::Remove(key)) => {
debug!("remove key: {}", key);
self.directory.remove(&key);
}
Some(Change::Insert(key, _)) => {
debug!("insert key: {}", key);
let invoker = self.new_invoker.new_service(key.clone());
self.directory.insert(key, invoker);
}
None => {
debug!("stream closed");
return Poll::Ready(Ok(()));
}
}
}
}
}
Expand Down
11 changes: 6 additions & 5 deletions dubbo/src/framework.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ use std::{
use crate::{
protocol::{BoxExporter, Protocol},
registry::{
n_registry::{ArcRegistry, Registry},
protocol::RegistryProtocol,
types::{Registries, RegistriesOperation},
BoxRegistry, Registry,
},
};
use dubbo_base::Url;
Expand Down Expand Up @@ -60,14 +60,14 @@ impl Dubbo {
self
}

pub fn add_registry(mut self, registry_key: &str, registry: BoxRegistry) -> Self {
pub fn add_registry(mut self, registry_key: &str, registry: ArcRegistry) -> Self {
if self.registries.is_none() {
self.registries = Some(Arc::new(Mutex::new(HashMap::new())));
}
self.registries
.as_ref()
.unwrap()
.insert(registry_key.to_string(), Arc::new(Mutex::new(registry)));
.insert(registry_key.to_string(), registry);
self
}

Expand Down Expand Up @@ -130,12 +130,13 @@ impl Dubbo {
async_vec.push(exporter);
//TODO multiple registry
if self.registries.is_some() {
self.registries
let _ = self
.registries
.as_ref()
.unwrap()
.default_registry()
.register(url.clone())
.unwrap();
.await;
}
}
}
Expand Down
79 changes: 39 additions & 40 deletions dubbo/src/registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,47 +17,46 @@

#![allow(unused_variables, dead_code, missing_docs)]
pub mod integration;
pub mod memory_registry;
pub mod n_registry;
pub mod protocol;
pub mod types;

use std::{
fmt::{Debug, Formatter},
sync::Arc,
};

use dubbo_base::Url;

pub type RegistryNotifyListener = Arc<dyn NotifyListener + Send + Sync + 'static>;
pub trait Registry {
fn register(&mut self, url: Url) -> Result<(), crate::StdError>;
fn unregister(&mut self, url: Url) -> Result<(), crate::StdError>;

fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), crate::StdError>;
fn unsubscribe(
&self,
url: Url,
listener: RegistryNotifyListener,
) -> Result<(), crate::StdError>;
}

pub trait NotifyListener {
fn notify(&self, event: ServiceEvent);
fn notify_all(&self, event: ServiceEvent);
}

#[derive(Debug)]
pub struct ServiceEvent {
pub key: String,
pub action: String,
pub service: Vec<Url>,
}

pub type BoxRegistry = Box<dyn Registry + Send + Sync>;

impl Debug for BoxRegistry {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str("BoxRegistry")
}
}
// use std::{
// fmt::{Debug, Formatter},
// sync::Arc,
// };

// use dubbo_base::Url;

// pub type RegistryNotifyListener = Arc<dyn NotifyListener + Send + Sync + 'static>;
// pub trait Registry {
// fn register(&mut self, url: Url) -> Result<(), crate::StdError>;
// fn unregister(&mut self, url: Url) -> Result<(), crate::StdError>;

// fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), crate::StdError>;
// fn unsubscribe(
// &self,
// url: Url,
// listener: RegistryNotifyListener,
// ) -> Result<(), crate::StdError>;
// }

// pub trait NotifyListener {
// fn notify(&self, event: ServiceEvent);
// fn notify_all(&self, event: ServiceEvent);
// }

// #[derive(Debug)]
// pub struct ServiceEvent {
// pub key: String,
// pub action: String,
// pub service: Vec<Url>,
// }

// pub type BoxRegistry = Box<dyn Registry + Send + Sync>;

// impl Debug for BoxRegistry {
// fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
// f.write_str("BoxRegistry")
// }
// }
133 changes: 109 additions & 24 deletions dubbo/src/registry/n_registry.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,30 @@
use std::sync::Arc;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};

use async_trait::async_trait;
use dubbo_base::Url;
use tokio::sync::mpsc::{channel, Receiver};
use thiserror::Error;
use tokio::sync::{
mpsc::{self, Receiver},
Mutex,
};
use tower::discover::Change;

use crate::StdError;

type DiscoverStream = Receiver<Result<Change<String, ()>, StdError>>;
pub type ServiceChange = Change<String, ()>;
pub type DiscoverStream = Receiver<Result<ServiceChange, StdError>>;
pub type BoxRegistry = Box<dyn Registry + Send + Sync>;

#[async_trait]
pub trait Registry {
async fn register(&self, url: Url) -> Result<(), StdError>;

async fn unregister(&self, url: Url) -> Result<(), StdError>;

// todo service_name change to url
async fn subscribe(&self, service_name: String) -> Result<DiscoverStream, StdError>;
async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError>;

async fn unsubscribe(&self, url: Url) -> Result<(), StdError>;
}
Expand All @@ -27,13 +35,19 @@ pub struct ArcRegistry {
}

pub enum RegistryComponent {
NacosRegistry,
NacosRegistry(ArcRegistry),
ZookeeperRegistry,
StaticRegistry(StaticRegistry),
}

pub struct StaticServiceValues {
listeners: Vec<mpsc::Sender<Result<ServiceChange, StdError>>>,
urls: HashSet<String>,
}

#[derive(Default)]
pub struct StaticRegistry {
urls: Vec<Url>,
urls: Mutex<HashMap<String, StaticServiceValues>>,
}

impl ArcRegistry {
Expand All @@ -54,8 +68,8 @@ impl Registry for ArcRegistry {
self.inner.unregister(url).await
}

async fn subscribe(&self, service_name: String) -> Result<DiscoverStream, StdError> {
self.inner.subscribe(service_name).await
async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
self.inner.subscribe(url).await
}

async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
Expand All @@ -73,11 +87,11 @@ impl Registry for RegistryComponent {
todo!()
}

async fn subscribe(&self, service_name: String) -> Result<DiscoverStream, StdError> {
async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
match self {
RegistryComponent::NacosRegistry => todo!(),
RegistryComponent::NacosRegistry(registry) => registry.subscribe(url).await,
RegistryComponent::ZookeeperRegistry => todo!(),
RegistryComponent::StaticRegistry(registry) => registry.subscribe(service_name).await,
RegistryComponent::StaticRegistry(registry) => registry.subscribe(url).await,
}
}

Expand All @@ -88,31 +102,102 @@ impl Registry for RegistryComponent {

impl StaticRegistry {
pub fn new(urls: Vec<Url>) -> Self {
Self { urls }
let mut map = HashMap::with_capacity(urls.len());

for url in urls {
let service_name = url.get_service_name();
let static_values = map
.entry(service_name)
.or_insert_with(|| StaticServiceValues {
listeners: Vec::new(),
urls: HashSet::new(),
});
let url = url.to_string();
static_values.urls.insert(url.clone());
}

Self {
urls: Mutex::new(map),
}
}
}

#[async_trait]
impl Registry for StaticRegistry {
async fn register(&self, url: Url) -> Result<(), StdError> {
todo!()
let service_name = url.get_service_name();
let mut lock = self.urls.lock().await;

let static_values = lock
.entry(service_name)
.or_insert_with(|| StaticServiceValues {
listeners: Vec::new(),
urls: HashSet::new(),
});
let url = url.to_string();
static_values.urls.insert(url.clone());

static_values.listeners.retain(|listener| {
let ret = listener.try_send(Ok(ServiceChange::Insert(url.clone(), ())));
ret.is_ok()
});

Ok(())
}

async fn unregister(&self, url: Url) -> Result<(), StdError> {
todo!()
}

async fn subscribe(&self, service_name: String) -> Result<DiscoverStream, StdError> {
let (tx, rx) = channel(self.urls.len());
for url in self.urls.iter() {
let change = Ok(Change::Insert(url.to_url(), ()));
tx.send(change).await?;
let service_name = url.get_service_name();
let mut lock = self.urls.lock().await;

match lock.get_mut(&service_name) {
None => Ok(()),
Some(static_values) => {
let url = url.to_string();
static_values.urls.remove(&url);
static_values.listeners.retain(|listener| {
let ret = listener.try_send(Ok(ServiceChange::Remove(url.clone())));
ret.is_ok()
});
if static_values.urls.is_empty() {
lock.remove(&service_name);
}
Ok(())
}
}
}

Ok(rx)
async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
let service_name = url.get_service_name();

let change_rx = {
let mut lock = self.urls.lock().await;
let static_values = lock
.entry(service_name)
.or_insert_with(|| StaticServiceValues {
listeners: Vec::new(),
urls: HashSet::new(),
});

let (tx, change_rx) = mpsc::channel(64);
static_values.listeners.push(tx);

for url in static_values.urls.iter() {
static_values.listeners.retain(|listener| {
let ret = listener.try_send(Ok(ServiceChange::Insert(url.clone(), ())));
ret.is_ok()
});
}
change_rx
};

Ok(change_rx)
}

async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
todo!()
Ok(())
}
}

#[derive(Error, Debug)]
#[error("static registry error: {0}")]
struct StaticRegistryError(String);
Loading