Skip to content

Feat/cluster Optimized the Router module #160

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Oct 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 2 additions & 30 deletions application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,33 +25,5 @@ dubbo:
routers:
consumer:
- service: "org.apache.dubbo.sample.tri.Greeter"
url: triple://localhost:20000
protocol: triple
nacos:
addr: "127.0.0.1:8848"
namespace: ""
app: ""
conditions:
- scope: "service"
force: false
runtime: true
enabled: true
key: "org.apache.dubbo.sample.tri.Greeter"
conditions:
- method=greet => port=8889
- scope: "service"
force: true
runtime: true
enabled: true
key: "user.UserService"
conditions:
- method=get_s => port=2003
tags:
force: true
enabled: true
key: shop-detail
tags:
- name: gray
matches:
- key: env
value: gray
url: tri://127.0.0.1:20000
protocol: triple
6 changes: 5 additions & 1 deletion config/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@ use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)]
pub struct ConditionRouterConfig {
#[serde(rename = "configVersion")]
pub config_version: String,
pub scope: String,
pub force: bool,
pub runtime: bool,
pub enabled: bool,
pub key: String,
pub conditions: Vec<String>,
}

#[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)]
pub struct TagRouterConfig {
#[serde(rename = "configVersion")]
pub config_version: String,
pub force: bool,
pub enabled: bool,
pub key: String,
Expand All @@ -28,6 +31,7 @@ pub struct ConsumerConfig {
#[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)]
pub struct Tag {
pub name: String,
#[serde(rename = "match")]
pub matches: Vec<TagMatchRule>,
}

Expand Down
9 changes: 5 additions & 4 deletions dubbo/src/cluster/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use tower::{
ready_cache::ReadyCache,
};

use crate::cluster::Directory;
use crate::{cluster::Directory, codegen::RpcInvocation, invocation::Invocation};

/// Directory.
///
Expand Down Expand Up @@ -68,12 +68,12 @@ impl StaticDirectory {
}

impl Directory for StaticDirectory {
fn list(&self, service_name: String) -> Vec<BoxInvoker> {
fn list(&self, inv: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
let url = Url::from_url(&format!(
"tri://{}:{}/{}",
self.uri.host().unwrap(),
self.uri.port().unwrap(),
service_name,
inv.get_target_service_unique_name(),
))
.unwrap();
let invoker = Box::new(TripleInvoker::new(url));
Expand All @@ -97,7 +97,8 @@ impl RegistryDirectory {
}

impl Directory for RegistryDirectory {
fn list(&self, service_name: String) -> Vec<BoxInvoker> {
fn list(&self, inv: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
let service_name = inv.get_target_service_unique_name();
let url = Url::from_url(&format!(
"triple://{}:{}/{}",
"127.0.0.1", "8888", service_name
Expand Down
67 changes: 27 additions & 40 deletions dubbo/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ use tower::{ready_cache::ReadyCache, ServiceExt};
use tower_service::Service;

use crate::{
cluster::router::{
manager::router_manager::get_global_router_manager, router_chain::RouterChain,
},
codegen::RpcInvocation,
invocation::Invocation,
protocol::{triple::triple_invoker::TripleInvoker, BoxInvoker, Invoker},
Expand All @@ -35,7 +38,7 @@ pub mod loadbalance;
pub mod router;

pub trait Directory: Debug {
fn list(&self, service_name: String) -> Vec<BoxInvoker>;
fn list(&self, inv: Arc<RpcInvocation>) -> Vec<BoxInvoker>;
}

type BoxDirectory = Box<dyn Directory + Send + Sync>;
Expand Down Expand Up @@ -134,7 +137,7 @@ impl Service<http::Request<ClonedBody>> for FailoverCluster {
let inv = inv.unwrap();
let service_name = inv.get_target_service_unique_name();

let invokers = self.dir.list(service_name.clone());
let invokers = self.dir.list(Arc::new(inv.clone()));

Box::pin(async move {
let mut current_req = req;
Expand Down Expand Up @@ -172,55 +175,36 @@ impl Invoker<http::Request<ClonedBody>> for FailoverCluster {

#[derive(Debug, Default)]
pub struct MockDirectory {
// router_chain: RouterChain,
router_chain: RouterChain,
}

impl MockDirectory {
pub fn new() -> MockDirectory {
// let router_chain = get_global_router_manager().read().unwrap().get_router_chain(invocation);
Self {
// router_chain
}
pub fn new(service_name: String) -> MockDirectory {
let router_chain = get_global_router_manager()
.read()
.unwrap()
.get_router_chain(service_name);
Self { router_chain }
}
}

impl Directory for MockDirectory {
fn list(&self, service_name: String) -> Vec<BoxInvoker> {
// tracing::info!("MockDirectory: {}", meta);
fn list(&self, inv: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
let u = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap();
vec![Box::new(TripleInvoker::new(u))]
// self.router_chain.route(u, invo);
let mut urls = vec![u];
// tracing::info!("MockDirectory: {}", meta);
urls = self.router_chain.route(urls, inv);
let mut result = Vec::new();
for url in urls {
result.push(Box::new(TripleInvoker::new(url)) as BoxInvoker);
}
result
}
}


// #[derive(Debug, Default)]
// pub struct RouterChain {
// router: HashMap<String, BoxRouter>,
// invokers: Arc<Vec<BoxInvoker>>,
// }

// impl RouterChain {
// pub fn route(&mut self, url: Url, invo: Arc<RpcInvocation>) -> Arc<Vec<BoxInvoker>> {
// let r = self.router.get("mock").unwrap();
// r.route(self.invokers.clone(), url, invo)
// }
// }

// pub trait Router: Debug {
// fn route(
// &self,
// invokers: Arc<Vec<BoxInvoker>>,
// url: Url,
// invo: Arc<RpcInvocation>,
// ) -> Arc<Vec<BoxInvoker>>;
// }

// pub type BoxRouter = Box<dyn Router + Sync + Send>;

#[cfg(test)]
pub mod tests {
use std::task::Poll;
use std::{sync::Arc, task::Poll};

use bytes::{Buf, BufMut, BytesMut};
use dubbo_base::Url;
Expand Down Expand Up @@ -250,8 +234,11 @@ pub mod tests {
struct MockDirectory;

impl Directory for MockDirectory {
fn list(&self, service_name: String) -> Vec<crate::protocol::BoxInvoker> {
println!("get invoker list for {}", service_name);
fn list(&self, inv: Arc<RpcInvocation>) -> Vec<crate::protocol::BoxInvoker> {
println!(
"get invoker list for {}",
inv.get_target_service_unique_name()
);

vec![
Box::new(MockInvoker(1)),
Expand Down
13 changes: 6 additions & 7 deletions dubbo/src/cluster/router/condition/condition_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,18 @@ pub struct ConditionRouter {
}

impl Router for ConditionRouter {
fn route(&self, invokers: Vec<Url>, url: Url, invo: Arc<RpcInvocation>) -> Vec<Url> {
let mut invokers_result = invokers.clone();
if let Some(routers) = self.application_routers.clone() {
fn route(&self, mut invokers: Vec<Url>, url: Url, invo: Arc<RpcInvocation>) -> Vec<Url> {
if let Some(routers) = &self.application_routers {
for router in &routers.read().unwrap().routers {
invokers_result = router.route(invokers_result, url.clone(), invo.clone())
invokers = router.route(invokers, url.clone(), invo.clone());
}
}
if let Some(routers) = self.service_routers.clone() {
if let Some(routers) = &self.service_routers {
for router in &routers.read().unwrap().routers {
invokers_result = router.route(invokers_result, url.clone(), invo.clone())
invokers = router.route(invokers, url.clone(), invo.clone());
}
}
invokers_result
invokers
}
}

Expand Down
80 changes: 32 additions & 48 deletions dubbo/src/cluster/router/condition/matcher.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::codegen::RpcInvocation;
use regex::Regex;
use std::{collections::HashSet, error::Error, option::Option, sync::Arc};
use std::{collections::HashSet, error::Error, option::Option};

#[derive(Clone, Debug, Default)]
pub struct ConditionMatcher {
Expand All @@ -18,67 +17,52 @@ impl ConditionMatcher {
}
}

pub fn is_match(
&self,
value: Option<String>,
invocation: Arc<RpcInvocation>,
_is_when: bool,
) -> Result<bool, Box<dyn Error>> {
pub fn is_match(&self, value: Option<String>) -> Result<bool, Box<dyn Error>> {
match value {
None => {
// if key does not present in whichever of url, invocation or attachment based on the matcher type, then return false.
Ok(false)
}
None => Ok(false),
Some(val) => {
if !self.matches.is_empty() && self.mismatches.is_empty() {
for match_ in self.matches.iter() {
if self.do_pattern_match(match_, &val, invocation.clone())? {
return Ok(true);
}
}
Ok(false)
} else if !self.mismatches.is_empty() && self.matches.is_empty() {
for mismatch in self.mismatches.iter() {
if self.do_pattern_match(mismatch, &val, invocation.clone())? {
return Ok(false);
}
for match_ in self.matches.iter() {
if self.do_pattern_match(match_, &val) {
return Ok(true);
}
Ok(true)
} else if !self.matches.is_empty() && !self.mismatches.is_empty() {
for mismatch in self.mismatches.iter() {
if self.do_pattern_match(mismatch, &val, invocation.clone())? {
return Ok(false);
}
}
for match_ in self.matches.iter() {
if self.do_pattern_match(match_, &val, invocation.clone())? {
return Ok(true);
}
}
for mismatch in self.mismatches.iter() {
if !self.do_pattern_match(mismatch, &val) {
return Ok(true);
}
Ok(false)
} else {
Ok(false)
}
Ok(false)
}
}
}

pub fn get_matches(&mut self) -> &mut HashSet<String> {
&mut self.matches
}
pub fn get_mismatches(&mut self) -> &mut HashSet<String> {
&mut self.mismatches
}

fn do_pattern_match(
&self,
pattern: &String,
value: &String,
_invocation: Arc<RpcInvocation>,
) -> Result<bool, Box<dyn Error>> {
if pattern.contains("*") {
return Ok(star_matcher(pattern, value));
fn do_pattern_match(&self, pattern: &str, value: &str) -> bool {
if pattern.contains('*') {
return star_matcher(pattern, value);
}

if pattern.contains('~') {
let parts: Vec<&str> = pattern.split('~').collect();

if parts.len() == 2 {
if let (Ok(left), Ok(right), Ok(val)) = (
parts[0].parse::<i32>(),
parts[1].parse::<i32>(),
value.parse::<i32>(),
) {
return range_matcher(val, left, right);
}
}
return false;
}
Ok(pattern.eq(value))
pattern == value
}
}

Expand All @@ -89,6 +73,6 @@ pub fn star_matcher(pattern: &str, input: &str) -> bool {
regex.is_match(input)
}

pub fn _range_matcher(val: i32, min: i32, max: i32) -> bool {
pub fn range_matcher(val: i32, min: i32, max: i32) -> bool {
min <= val && val <= max
}
Loading