Skip to content

Feat(cluster): Cluster Policy Impl #146

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 46 commits into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
2192217
feat(dubbo): add unix feature
yang20150702 Dec 7, 2022
ea06ca1
Rft: replace feature with target_os cfg
yang20150702 Dec 8, 2022
f172c25
Merge branch 'apache:main' into main
yang20150702 Dec 8, 2022
d40ea72
Merge branch 'apache:main' into main
yang20150702 Dec 9, 2022
5f5b31f
Rft(dubbo): add ClientBuilder for client
yang20150702 Dec 9, 2022
176cb57
Rftï(dubbo-build): add build api for client
yang20150702 Dec 9, 2022
b4db021
style(examples): cargo fmt
yang20150702 Dec 9, 2022
e9e0a67
Rft: move connection from client to transport mod
yang20150702 Dec 20, 2022
e67e59e
Rft(dubbo): add default timeout for client
yang20150702 Dec 26, 2022
7ab187f
Merge branch 'apache:main' into main
yang20150702 Dec 26, 2022
6e8c95e
Merge branch 'apache:main' into main
yang20150702 Dec 27, 2022
e066505
Ftr: add serverBuilder for Server, support multiple ways to start server
yang20150702 Dec 27, 2022
bd4be4e
Rft(examples): update yaml
yang20150702 Dec 27, 2022
80f3502
refactor(dubbo): update invoker trait
yang20150702 Jan 16, 2023
1022b8e
Merge branch 'main' into main
yang20150702 Jan 20, 2023
f403481
Merge branch 'apache:main' into main
yang20150702 Jan 28, 2023
fb6d30c
Merge branch 'apache:main' into main
yang20150702 Feb 9, 2023
00a90ff
Merge branch 'apache:main' into main
yang20150702 Feb 15, 2023
25903fa
Merge branch 'apache:main' into main
yang20150702 Feb 16, 2023
a03b0b5
Merge branch 'apache:main' into main
yang20150702 Feb 20, 2023
41d2ddd
Merge branch 'apache:main' into main
yang20150702 Apr 11, 2023
cfd1652
Merge branch 'apache:main' into main
yang20150702 May 10, 2023
0fcb29f
refactor(cluster): add Cluster MockImpl
yang20150702 May 11, 2023
2cd76b6
refactor(triple): use ClientBuilder to init Cluster ability
yang20150702 May 11, 2023
3cba7dc
Update builder.rs
yang20150702 May 16, 2023
1e7285e
Update triple.rs
yang20150702 May 16, 2023
2c567a7
Update mod.rs
yang20150702 May 16, 2023
695a880
refactor(triple): rm unused var in clientBuilder
yang20150702 May 16, 2023
d2cb383
fix logical error of get_protocol_or_default (#137)
G-XD May 16, 2023
54181bf
Feat/cluster (#138)
yang20150702 May 16, 2023
e9a9935
refactor(dubbo): delete some codes
yang20150702 May 16, 2023
1c79aee
refactor(cluster): rm some duplicate codes
yang20150702 May 18, 2023
06cb969
refactor(registry): rm unused import
yang20150702 May 18, 2023
18a6140
refactor(triple): use two build func for different usage
yang20150702 May 18, 2023
86de421
Merge branch 'main' into feat/cluster
yang20150702 May 18, 2023
7dbe5f3
style: cargo fmt --all
yang20150702 May 18, 2023
58c6f7d
refactor(cluster): rm registryWrapper
yang20150702 May 19, 2023
157b6c4
refactor(cluster): delete print
yang20150702 May 19, 2023
aa554fe
chore(dubbo): upgrade hyper version in cargo.toml
yang20150702 May 20, 2023
254fe59
refactor(cluster): comment the logic of clone body
yang20150702 May 22, 2023
efb87b4
Rft: add Cluster to ClientBuilder (#142)
yang20150702 May 22, 2023
b24e730
Rft(triple): remove Clone of Invoker
yang20150702 Jul 29, 2023
6999bbb
Rft(cluster): use ready_cache to manage Invokers, add ready_cache in …
yang20150702 Jul 29, 2023
0a5ee22
Rft(protocol): use interface Inheritance to redesign Invoker
yang20150702 Jul 29, 2023
ebfe5c1
Merge branch 'apache:main' into main
yang20150702 Jul 29, 2023
29f83c4
Merge branch 'main' into feat/cluster
yang20150702 Jul 29, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions config/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,26 @@ impl ProtocolRetrieve for ProtocolConfig {
} else {
let result = self.get_protocol(protocol_key);
if let Some(..) = result {
panic!("default triple base dose not defined.")
} else {
result.unwrap()
} else {
panic!("default triple base dose not defined.")
}
}
}
}

#[cfg(test)]
mod tests {

use super::{ProtocolConfig, ProtocolRetrieve};

#[test]
#[should_panic(expected = "default triple base dose not defined")]
pub fn test_get_invalid_protocol() {
let config = ProtocolConfig::default();

let _ = config.get_protocol_or_default("");

()
}
}
5 changes: 0 additions & 5 deletions dubbo-build/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,6 @@ pub fn generate<T: Service>(
}
}

pub fn with_cluster(mut self, invoker: ClusterInvoker) -> Self {
self.inner = self.inner.with_cluster(invoker);
self
}

#methods

}
Expand Down
7 changes: 4 additions & 3 deletions dubbo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ repository = "https://github.com/apache/dubbo-rust.git"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
hyper = { version = "0.14.19", features = ["full"] }
hyper = { version = "0.14.26", features = ["full"] }
http = "0.2"
tower-service.workspace = true
http-body = "0.4.4"
tower = { workspace = true, features = ["timeout"] }
tower = { workspace = true, features = ["timeout", "ready-cache"] }
futures-util = "0.3.23"
futures-core ="0.3.23"
argh = "0.1"
Expand All @@ -33,7 +33,8 @@ futures.workspace = true
axum = "0.5.9"
async-stream = "0.3"
flate2 = "1.0"
aws-smithy-http = "0.54.1"
aws-smithy-http = "0.55.2"
dyn-clone = "1.0.11"
itertools.workspace = true
urlencoding.workspace = true
lazy_static.workspace = true
Expand Down
69 changes: 19 additions & 50 deletions dubbo/src/cluster/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,21 @@ use std::{
};

use crate::{
codegen::TripleInvoker,
invocation::{Invocation, RpcInvocation},
registry::{memory_registry::MemoryNotifyListener, BoxRegistry, RegistryWrapper},
protocol::BoxInvoker,
registry::{memory_registry::MemoryNotifyListener, BoxRegistry},
};
use dubbo_base::Url;
use dubbo_logger::tracing;

use crate::cluster::Directory;

/// Directory.
///
/// [Directory Service](http://en.wikipedia.org/wiki/Directory_service)
pub trait Directory: Debug + DirectoryClone {
fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<Url>;
}

pub trait DirectoryClone {
fn clone_box(&self) -> Box<dyn Directory>;
}

impl<T> DirectoryClone for T
where
T: 'static + Directory + Clone,
{
fn clone_box(&self) -> Box<dyn Directory> {
Box::new(self.clone())
}
}

impl Clone for Box<dyn Directory> {
fn clone(&self) -> Box<dyn Directory> {
self.clone_box()
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct StaticDirectory {
uri: http::Uri,
}
Expand All @@ -78,51 +60,36 @@ impl StaticDirectory {
}

impl Directory for StaticDirectory {
fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<Url> {
fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
let url = Url::from_url(&format!(
"tri://{}:{}/{}",
self.uri.host().unwrap(),
self.uri.port().unwrap(),
invocation.get_target_service_unique_name(),
))
.unwrap();
vec![url]
}
}

impl DirectoryClone for StaticDirectory {
fn clone_box(&self) -> Box<dyn Directory> {
Box::new(StaticDirectory {
uri: self.uri.clone(),
})
let invoker = Box::new(TripleInvoker::new(url));
vec![invoker]
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct RegistryDirectory {
registry: RegistryWrapper,
registry: Arc<BoxRegistry>,
service_instances: Arc<RwLock<HashMap<String, Vec<Url>>>>,
}

impl RegistryDirectory {
pub fn new(registry: BoxRegistry) -> RegistryDirectory {
RegistryDirectory {
registry: RegistryWrapper {
registry: Some(registry),
},
registry: Arc::new(registry),
service_instances: Arc::new(RwLock::new(HashMap::new())),
}
}
}

impl DirectoryClone for RegistryDirectory {
fn clone_box(&self) -> Box<dyn Directory> {
todo!()
}
}

impl Directory for RegistryDirectory {
fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<Url> {
fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
let service_name = invocation.get_target_service_unique_name();

let url = Url::from_url(&format!(
Expand All @@ -132,9 +99,6 @@ impl Directory for RegistryDirectory {
.unwrap();

self.registry
.registry
.as_ref()
.expect("msg")
.subscribe(
url,
Arc::new(MemoryNotifyListener {
Expand All @@ -149,6 +113,11 @@ impl Directory for RegistryDirectory {
.expect("service_instances.read");
let binding = Vec::new();
let url_vec = map.get(&service_name).unwrap_or(&binding);
url_vec.to_vec()
// url_vec.to_vec()
let mut invokers: Vec<BoxInvoker> = vec![];
for item in url_vec.iter() {
invokers.push(Box::new(TripleInvoker::new(item.clone())));
}
invokers
}
}
133 changes: 94 additions & 39 deletions dubbo/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,54 @@
* limitations under the License.
*/

use std::{sync::Arc, task::Poll};
use std::{fmt::Debug, sync::Arc, task::Poll};

use aws_smithy_http::body::SdkBody;
use dubbo_base::Url;
use futures_util::TryFutureExt;
use tower::ready_cache::ReadyCache;
use tower_service::Service;

use crate::{empty_body, protocol::BoxInvoker};
use crate::{
invocation::RpcInvocation,
protocol::{triple::triple_invoker::TripleInvoker, BoxInvoker, Invoker},
};

pub mod directory;
pub mod loadbalance;
pub mod support;

pub trait Directory {
fn list(&self, meta: String) -> Vec<BoxInvoker>;
fn is_empty(&self) -> bool;
pub trait Directory: Debug {
fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker>;
}

type BoxDirectory = Box<dyn Directory>;
type BoxDirectory = Box<dyn Directory + Send + Sync>;

pub trait Cluster {
fn join(&self, dir: BoxDirectory) -> BoxInvoker;
}

#[derive(Debug, Default)]
pub struct MockCluster {}

impl Cluster for MockCluster {
fn join(&self, dir: BoxDirectory) -> BoxInvoker {
Box::new(FailoverCluster::new(dir))
}
}

// 在Cluster上进行缓存Service
#[derive(Debug)]
pub struct FailoverCluster {
dir: Arc<BoxDirectory>,
caches: ReadyCache<usize, BoxInvoker, http::Request<SdkBody>>,
}

impl FailoverCluster {
pub fn new(dir: BoxDirectory) -> FailoverCluster {
Self { dir: Arc::new(dir) }
Self {
dir: Arc::new(dir),
caches: ReadyCache::default(),
}
}
}

Expand All @@ -59,43 +82,75 @@ impl Service<http::Request<SdkBody>> for FailoverCluster {
}

fn call(&mut self, req: http::Request<SdkBody>) -> Self::Future {
println!("req: {}", req.body().content_length().unwrap());
let clone_body = req.body().try_clone().unwrap();
let mut clone_req = http::Request::builder()
.uri(req.uri().clone())
.method(req.method().clone());
*clone_req.headers_mut().unwrap() = req.headers().clone();
let r = clone_req.body(clone_body).unwrap();
let invokers = self.dir.list("service_name".to_string());
for mut invoker in invokers {
let fut = async move {
let res = invoker.call(r).await;
return res;
};
return Box::pin(fut);
// let clone_body = req.body().try_clone().unwrap();
// let mut clone_req = http::Request::builder()
// .uri(req.uri().clone())
// .method(req.method().clone());
// *clone_req.headers_mut().unwrap() = req.headers().clone();
// let r = clone_req.body(clone_body).unwrap();
let invokers = self.dir.list(
RpcInvocation::default()
.with_service_unique_name("hello".to_string())
.into(),
);
let mut i: usize = 0;
for invoker in invokers {
self.caches.push(i, invoker);
i += 1;
}
Box::pin(async move {
Ok(http::Response::builder()
.status(200)
.header("grpc-status", "12")
.header("content-type", "application/grpc")
.body(empty_body())
.unwrap())
})

Box::pin(self.caches.call_ready_index(0, req).map_err(Into::into))
}
}

pub struct MockDirectory {}
impl Invoker<http::Request<SdkBody>> for FailoverCluster {
fn get_url(&self) -> dubbo_base::Url {
Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap()
}
}

impl Directory for MockDirectory {
fn list(&self, _meta: String) -> Vec<BoxInvoker> {
// tracing::info!("MockDirectory: {}", meta);
// let u = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap();
// vec![Box::new(TripleInvoker::new(u))]
todo!()
#[derive(Debug, Default)]
pub struct MockDirectory {
// router_chain: RouterChain,
}

impl MockDirectory {
pub fn new() -> MockDirectory {
Self {
// router_chain: RouterChain::default(),
}
}
}

fn is_empty(&self) -> bool {
false
impl Directory for MockDirectory {
fn list(&self, _invo: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
// tracing::info!("MockDirectory: {}", meta);
let u = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap();
vec![Box::new(TripleInvoker::new(u))]
// self.router_chain.route(u, invo);
}
}

// #[derive(Debug, Default)]
// pub struct RouterChain {
// router: HashMap<String, BoxRouter>,
// invokers: Arc<Vec<BoxInvoker>>,
// }

// impl RouterChain {
// pub fn route(&mut self, url: Url, invo: Arc<RpcInvocation>) -> Arc<Vec<BoxInvoker>> {
// let r = self.router.get("mock").unwrap();
// r.route(self.invokers.clone(), url, invo)
// }
// }

// pub trait Router: Debug {
// fn route(
// &self,
// invokers: Arc<Vec<BoxInvoker>>,
// url: Url,
// invo: Arc<RpcInvocation>,
// ) -> Arc<Vec<BoxInvoker>>;
// }

// pub type BoxRouter = Box<dyn Router + Sync + Send>;
Loading