Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pipebuilder/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pipebuilder"
version = "0.1.0"
version = "0.1.2"
authors = ["Li Yu <li.yu.sh0211@gmail.com>"]
edition = "2018"
license = "Apache-2.0"
Expand Down Expand Up @@ -36,7 +36,7 @@ clap = "3.0.0-beta.2"
etcd-client = "0.7.2"
flurry = "0.3.1"
http = "0.2.5"
pipebuilder_common = { version = "0.1.0", path = "../pipebuilder_common" }
pipebuilder_common = { version = "0.1.2", path = "../pipebuilder_common" }
prost = "0.8"
reqwest = "0.11.4"
serde = { version = "1.0", features = ["derive"] }
Expand Down
53 changes: 39 additions & 14 deletions pipebuilder/src/api/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ mod handlers {
pub async fn build(
mut client: SchedulerClient<Channel>,
mut register: Register,
request: models::BuildRequest,
mut request: models::BuildRequest,
) -> Result<impl warp::Reply, Infallible> {
// validate build request
match validations::validate_build_request(&mut register, &request).await {
Expand All @@ -471,8 +471,9 @@ mod handlers {
};
let namespace = request.namespace.clone();
let id = request.id.clone();
let target_platform = request.target_platform.clone();
// find a builder
let response = match schedule(&mut client, namespace, id).await {
let response = match schedule(&mut client, namespace, id, target_platform.clone()).await {
Ok(response) => response,
Err(err) => return Ok(http_internal_error(err.into())),
};
Expand All @@ -484,6 +485,19 @@ mod handlers {
))))
}
};
// target platform validation
let builder_target_platform = builder_info.target_platform;
match target_platform {
Some(target_platform) => {
if target_platform != builder_target_platform {
return Ok(http_service_unavailable(Failure::new(format!(
"builder target platform miss match '{}' != '{}'",
builder_target_platform, target_platform
))));
}
}
None => request.set_target_platform(builder_target_platform),
};
let builder_id = builder_info.id;
let builder_address = builder_info.address;
info!("scheduled builder ({}, {})", builder_id, builder_address);
Expand Down Expand Up @@ -735,12 +749,13 @@ mod handlers {
let namespace = request.namespace;
let id = request.id;
let version = request.version;
let version_build = register
let build_metadata = register
.get_build_metadata(lease_id, namespace.as_str(), id.as_str(), version)
.await?;
Ok(version_build.map(|b| models::BuildMetadata {
Ok(build_metadata.map(|b| models::BuildMetadata {
id,
version,
target_platform: b.target_platform,
status: b.status,
timestamp: b.timestamp,
builder_id: b.builder_id,
Expand Down Expand Up @@ -773,7 +788,7 @@ mod handlers {
let build_metadatas = register.list_build_metadata(namespace.as_str(), id).await?;
let build_metadatas = build_metadatas
.into_iter()
.map(|(key, version_build)| {
.map(|(key, build_metadata)| {
let id_version = remove_resource_namespace(
key.as_str(),
RESOURCE_BUILD_METADATA,
Expand All @@ -789,11 +804,12 @@ mod handlers {
models::BuildMetadata {
id,
version,
status: version_build.status,
timestamp: version_build.timestamp,
builder_id: version_build.builder_id,
builder_address: version_build.builder_address,
message: version_build.message,
target_platform: build_metadata.target_platform,
status: build_metadata.status,
timestamp: build_metadata.timestamp,
builder_id: build_metadata.builder_id,
builder_address: build_metadata.builder_address,
message: build_metadata.message,
}
})
.collect::<Vec<models::BuildMetadata>>();
Expand Down Expand Up @@ -1494,8 +1510,15 @@ mod handlers {
client: &mut SchedulerClient<Channel>,
namespace: String,
id: String,
target_platform: Option<String>,
) -> pipebuilder_common::Result<ScheduleResponse> {
let response = client.schedule(ScheduleRequest { namespace, id }).await?;
let response = client
.schedule(ScheduleRequest {
namespace,
id,
target_platform,
})
.await?;
Ok(response.into_inner())
}

Expand Down Expand Up @@ -1551,12 +1574,14 @@ mod validations {
register: &mut Register,
request: &models::BuildRequest,
) -> Result<()> {
let target_platform = request.target_platform.as_str();
validate_target_platform(target_platform)?;
let namespace = request.namespace.as_str();
validate_namespace(register, namespace).await?;
let id = request.id.as_str();
validate_project(register, namespace, id).await
validate_project(register, namespace, id).await?;
if let Some(target_platform) = request.target_platform.as_ref() {
validate_target_platform(target_platform)?;
};
Ok(())
}

pub async fn validate_get_build_request(
Expand Down
2 changes: 2 additions & 0 deletions pipebuilder/src/builder/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,10 @@ async fn update(
) -> pipebuilder_common::Result<()> {
let (namespace, id, _, build_version) = build.get_build_meta();
let (builder_id, builder_address) = build.get_builder_meta();
let target_platform = build.get_target_platform();
let now = Utc::now();
let version_build = BuildMetadata::new(
target_platform.to_owned(),
status,
now,
builder_id.to_owned(),
Expand Down
5 changes: 2 additions & 3 deletions pipebuilder/src/cli/commands/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ pub fn build() -> Cmd {
Arg::new("target-platform")
.short('t')
.about("Specify target platform, checkout https://doc.rust-lang.org/nightly/rustc/platform-support.html")
.required(true)
.takes_value(true),
])
}
Expand All @@ -50,13 +49,13 @@ pub async fn exec_build(client: ApiClient, args: &clap::ArgMatches) -> Result<()
.unwrap()
.parse()
.expect("invalid manifest version");
let target_platform = args.value_of("target-platform").unwrap();
let target_platform = args.value_of("target-platform").map(String::from);
let response = do_build::build(
&client,
namespace.to_owned(),
id.to_owned(),
manifest_version,
target_platform.to_owned(),
target_platform,
)
.await?;
print_record(&response);
Expand Down
2 changes: 1 addition & 1 deletion pipebuilder/src/cli/ops/do_build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub(crate) async fn build(
namespace: String,
id: String,
manifest_version: u64,
target_platform: String,
target_platform: Option<String>,
) -> Result<BuildResponse> {
let request = BuildRequest {
namespace,
Expand Down
32 changes: 23 additions & 9 deletions pipebuilder/src/scheduler/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use flurry::HashMap;
use pipebuilder_common::{
deserialize_event,
grpc::schedule::{scheduler_server::Scheduler, BuilderInfo, ScheduleResponse},
hash_distance, log_event, NodeState, NodeStatus, Register,
hash_distance, log_event, NodeState, Register,
};
use std::sync::Arc;
use tonic::Response;
Expand All @@ -26,18 +26,26 @@ impl Scheduler for SchedulerService {
// select builder using consistent hash, build of same app (namespace, id) landed on same builder for compilcation cache hit
let request = request.into_inner();
let request_key = format!("{}/{}", request.namespace, request.id);
let request_target_platform = request.target_platform;
let builders_ref = self.builders.pin();
let mut selected_builder_info: Option<BuilderInfo> = None;
let mut min_hash_distance: u64 = u64::MAX;
for builder in builders_ref.values() {
let builder_key = match builder.status {
NodeStatus::Active => builder.id.to_owned(),
NodeStatus::InActive => continue,
};
if !builder.is_active() {
continue;
}
if let Some(ref target_platform) = request_target_platform {
if !builder.accept_target_platform(target_platform) {
continue;
}
}
let builder_key = builder.id.to_owned();
let distance = hash_distance(&request_key, &builder_key);
if distance < min_hash_distance {
selected_builder_info = Some(Self::builder_info(builder));
min_hash_distance = distance;
selected_builder_info = Self::builder_info(builder);
if selected_builder_info.is_some() {
min_hash_distance = distance;
}
}
}
Ok(Response::new(ScheduleResponse {
Expand Down Expand Up @@ -107,9 +115,15 @@ impl SchedulerService {
Ok(())
}

fn builder_info(state: &NodeState) -> BuilderInfo {
fn builder_info(state: &NodeState) -> Option<BuilderInfo> {
let id = state.id.to_owned();
let address = state.external_address.to_owned();
BuilderInfo { id, address }
state
.get_support_target_platform()
.map(|target_platform| BuilderInfo {
id,
address,
target_platform,
})
}
}
2 changes: 1 addition & 1 deletion pipebuilder_common/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pipebuilder_common"
version = "0.1.0"
version = "0.1.2"
edition = "2018"
authors = ["Li Yu <li.yu.sh0211@gmail.com>"]
license = "Apache-2.0"
Expand Down
4 changes: 4 additions & 0 deletions pipebuilder_common/proto/schedule.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@ message ScheduleRequest {
string namespace = 1;
// project id
string id = 2;
// target platform
optional string target_platform = 3;
}

message BuilderInfo {
// builder id
string id = 1;
// builder address
string address = 2;
// target platform support by builder
string target_platform = 3;
}

message ScheduleResponse {
Expand Down
3 changes: 3 additions & 0 deletions pipebuilder_common/src/api/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ pub(crate) const DISPLAY_COUNT_WIDTH: usize = 12;
pub(crate) const DISPLAY_NAMESPACE_WIDTH: usize = 12;
pub(crate) const DISPLAY_NODE_STATUS_WIDTH: usize = 12;
pub(crate) const DISPLAY_NODE_ROLE_WIDTH: usize = 12;
pub(crate) const DISPLAY_NODE_ARCH_WIDTH: usize = 12;
pub(crate) const DISPLAY_NODE_OS_WIDTH: usize = 12;
pub(crate) const DISPLAY_SIZE_WIDTH: usize = 12;
pub(crate) const DISPLAY_VERSION_WIDTH: usize = 12;
pub(crate) const DISPLAY_ADDRESS_WIDTH: usize = 16;
pub(crate) const DISPLAY_BUILD_TARGET_PLATFORM_WIDTH: usize = 24;
pub(crate) const DISPLAY_TIMESTAMP_WIDTH: usize = 32;
pub(crate) const DISPLAY_MESSAGE_WIDTH: usize = 36;
Loading