Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 41 additions & 14 deletions dragonfly-client-util/src/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
pub mod errors;
mod selector;

use crate::digest::is_blob_url;
use crate::http::{headermap_to_hashmap, query_params::default_proxy_rule_filtered_query_params};
use crate::id_generator::{IDGenerator, TaskIDParameter};
use crate::net::format_url;
Expand Down Expand Up @@ -118,18 +119,25 @@ pub struct GetRequest {
/// Default value includes the filtered query params of s3, gcs, oss, obs, cos.
pub filtered_query_params: Vec<String>,

/// content_for_calculating_task_id is the content used to calculate the task id.
/// If content_for_calculating_task_id is set, use its value to calculate the task ID.
/// Otherwise, calculate the task ID based on url, piece_length, tag, application, and filtered_query_params.
/// Content for calculating task id. This is used when the task ID cannot be calculated based
/// on URL and other parameters, such as when the URL contains dynamic query parameters that
/// cannot be filtered out.
pub content_for_calculating_task_id: Option<String>,

/// Enable task id based blob digest. It indicates whether to use the blob digest for task ID calculation
/// when downloading from OCI registries. When enabled for OCI blob URLs (e.g., /v2/<name>/blobs/sha256:<digest>),
/// the task ID is derived from the blob digest rather than the full URL. This enables deduplication across
/// registries - the same blob from different registries shares one task ID, eliminating redundant downloads
/// and storage.
pub enable_task_id_based_blob_digest: bool,

/// Refer to https://github.com/dragonflyoss/api/blob/main/proto/common.proto#L67
pub priority: Option<i32>,

/// timeout is the timeout of the request.
pub timeout: Duration,

/// client_cert is the client certificates for the request.
/// Client cert is the client certificates for the request.
pub client_cert: Option<Vec<CertificateDer<'static>>>,
}

Expand Down Expand Up @@ -427,17 +435,22 @@ impl Proxy {
// Generate task id for selecting seed peer.
let task_id = self
.id_generator
.task_id(match request.content_for_calculating_task_id.as_ref() {
Some(content) => TaskIDParameter::Content(content.clone()),
None => TaskIDParameter::URLBased {
url: request.url.clone(),
piece_length: request.piece_length,
tag: request.tag.clone(),
application: request.application.clone(),
filtered_query_params,
revision: None,
.task_id(
if let Some(content) = request.content_for_calculating_task_id.clone() {
TaskIDParameter::Content(content)
} else if request.enable_task_id_based_blob_digest && is_blob_url(&request.url) {
TaskIDParameter::BlobDigestBased(request.url.clone())
} else {
TaskIDParameter::URLBased {
url: request.url.clone(),
piece_length: request.piece_length,
tag: request.tag.clone(),
application: request.application.clone(),
filtered_query_params,
revision: None,
}
},
})
)
.map_err(|err| Error::Internal(format!("failed to generate task id: {}", err)))?;

// Select seed peers for downloading.
Expand Down Expand Up @@ -602,6 +615,20 @@ impl Proxy {
);
}

headers.insert(
"X-Dragonfly-Enable-Task-ID-Based-Blob-Digest",
request
.enable_task_id_based_blob_digest
.to_string()
.parse()
.map_err(|err| {
Error::InvalidArgument(format!(
"invalid enable task id based blob digest: {}",
err
))
})?,
);

if let Some(priority) = request.priority {
headers.insert(
"X-Dragonfly-Priority",
Expand Down
6 changes: 3 additions & 3 deletions dragonfly-client/src/grpc/dfdaemon_download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,10 +347,10 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
.task
.id_generator
.task_id(
if download.enable_task_id_based_blob_digest && is_blob_url(&download.url) {
TaskIDParameter::BlobDigestBased(download.url.clone())
} else if let Some(content) = download.content_for_calculating_task_id.clone() {
if let Some(content) = download.content_for_calculating_task_id.clone() {
TaskIDParameter::Content(content)
} else if download.enable_task_id_based_blob_digest && is_blob_url(&download.url) {
TaskIDParameter::BlobDigestBased(download.url.clone())
} else {
let revision = download
.hugging_face
Expand Down
6 changes: 3 additions & 3 deletions dragonfly-client/src/grpc/dfdaemon_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,10 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
.task
.id_generator
.task_id(
if download.enable_task_id_based_blob_digest && is_blob_url(&download.url) {
TaskIDParameter::BlobDigestBased(download.url.clone())
} else if let Some(content) = download.content_for_calculating_task_id.clone() {
if let Some(content) = download.content_for_calculating_task_id.clone() {
TaskIDParameter::Content(content)
} else if download.enable_task_id_based_blob_digest && is_blob_url(&download.url) {
TaskIDParameter::BlobDigestBased(download.url.clone())
} else {
let revision = download
.hugging_face
Expand Down
84 changes: 70 additions & 14 deletions dragonfly-client/src/proxy/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ pub const DRAGONFLY_PIECE_LENGTH_HEADER: &str = "X-Dragonfly-Piece-Length";
pub const DRAGONFLY_CONTENT_FOR_CALCULATING_TASK_ID_HEADER: &str =
"X-Dragonfly-Content-For-Calculating-Task-ID";

/// DRAGONFLY_ENABLE_TASK_ID_BASED_BLOB_DIGEST is the header key that controls whether the task ID
/// is calculated from the blob's content digest (e.g., a SHA-256 hash) when downloading from OCI
/// registries.
///
/// When enabled for OCI blob URLs (e.g., /v2/<name>/blobs/sha256:<digest>), the task ID is derived
/// from the blob digest rather than the full URL. This enables deduplication across registries:
/// the same blob from different registries shares one task ID, eliminating redundant downloads
/// and storage.
///
/// The value is read by `get_enable_task_id_based_blob_digest`, which treats a case-insensitive
/// "true" as enabled and any other readable value as disabled.
pub const DRAGONFLY_ENABLE_TASK_ID_BASED_BLOB_DIGEST: &str =
"X-Dragonfly-Enable-Task-ID-Based-Blob-Digest";

/// DRAGONFLY_TASK_DOWNLOAD_FINISHED_HEADER is the response header key to indicate whether the task download finished.
/// When the task download is finished, the response will include this header with the value `"true"`,
/// indicating that the download hit the local cache.
Expand Down Expand Up @@ -114,7 +122,7 @@ pub enum ErrorType {
Dfdaemon,
}

/// ErrorType implements as_str.
/// Error type implements as_str.
impl ErrorType {
pub fn as_str(&self) -> &'static str {
match self {
Expand All @@ -125,7 +133,7 @@ impl ErrorType {
}
}

/// ErrorType implements fmt::Display.
/// Error type implements fmt::Display.
impl fmt::Display for ErrorType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.as_str())
Expand All @@ -136,7 +144,8 @@ impl fmt::Display for ErrorType {
impl FromStr for ErrorType {
type Err = String;

/// from_str parses a string into a ErrorType.
/// Parses a string into an ErrorType. The string must be one of "backend", "proxy", or
/// "dfdaemon".
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"backend" => Ok(ErrorType::Backend),
Expand All @@ -147,23 +156,23 @@ impl FromStr for ErrorType {
}
}

/// get_tag gets the tag from http header.
/// Get X-Dragonfly-Tag header value to determine the tag of the task.
pub fn get_tag(header: &HeaderMap) -> Option<String> {
    // Absent header => `None`; a value that `to_str` rejects is also reported as `None`.
    let raw = header.get(DRAGONFLY_TAG_HEADER)?;
    raw.to_str().ok().map(String::from)
}

/// get_application gets the application from http header.
/// Get X-Dragonfly-Application header value to determine the application of the task.
pub fn get_application(header: &HeaderMap) -> Option<String> {
    // Absent header => `None`; a value that `to_str` rejects is also reported as `None`.
    let raw = header.get(DRAGONFLY_APPLICATION_HEADER)?;
    raw.to_str().ok().map(String::from)
}

/// get_priority gets the priority from http header.
/// Get X-Dragonfly-Priority header value to determine the priority of the task.
pub fn get_priority(header: &HeaderMap) -> i32 {
let default_priority = Priority::Level6 as i32;
match header.get(DRAGONFLY_PRIORITY_HEADER) {
Expand All @@ -184,15 +193,17 @@ pub fn get_priority(header: &HeaderMap) -> i32 {
}
}

/// get_registry gets the custom address of container registry from http header.
/// Get X-Dragonfly-Registry header value to determine the custom address of container registry for
/// downloading.
pub fn get_registry(header: &HeaderMap) -> Option<String> {
    // Absent header => `None`; a value that `to_str` rejects is also reported as `None`.
    let raw = header.get(DRAGONFLY_REGISTRY_HEADER)?;
    raw.to_str().ok().map(String::from)
}

/// get_filters gets the filters from http header.
/// Get X-Dragonfly-Filtered-Query-Params header value to determine the filtered query params for
/// generating task ID.
pub fn get_filtered_query_params(
header: &HeaderMap,
default_filtered_query_params: Vec<String>,
Expand All @@ -209,7 +220,8 @@ pub fn get_filtered_query_params(
}
}

/// get_use_p2p gets the use p2p from http header.
/// Get X-Dragonfly-Use-P2P header value to determine whether to use P2P technology to distribute
/// the content.
pub fn get_use_p2p(header: &HeaderMap) -> bool {
match header.get(DRAGONFLY_USE_P2P_HEADER) {
Some(value) => match value.to_str() {
Expand All @@ -223,7 +235,8 @@ pub fn get_use_p2p(header: &HeaderMap) -> bool {
}
}

/// get_prefetch gets the prefetch from http header.
/// Get X-Dragonfly-Prefetch header value to determine whether to prefetch the entire file for
/// range request.
pub fn get_prefetch(header: &HeaderMap) -> Option<bool> {
match header.get(DRAGONFLY_PREFETCH_HEADER) {
Some(value) => match value.to_str() {
Expand All @@ -237,15 +250,17 @@ pub fn get_prefetch(header: &HeaderMap) -> Option<bool> {
}
}

/// get_output_path gets the output path from http header.
/// Get X-Dragonfly-Output-Path header value to determine the absolute output path for the
/// downloaded file.
pub fn get_output_path(header: &HeaderMap) -> Option<String> {
    // Absent header => `None`; a value that `to_str` rejects is also reported as `None`.
    let raw = header.get(DRAGONFLY_OUTPUT_PATH_HEADER)?;
    raw.to_str().ok().map(String::from)
}

/// get_force_hard_link gets the force hard link from http header.
/// Get X-Dragonfly-Force-Hard-Link header value to determine whether the download file must be
/// hard linked to the output path.
pub fn get_force_hard_link(header: &HeaderMap) -> bool {
match header.get(DRAGONFLY_FORCE_HARD_LINK_HEADER) {
Some(value) => match value.to_str() {
Expand All @@ -259,7 +274,8 @@ pub fn get_force_hard_link(header: &HeaderMap) -> bool {
}
}

/// get_piece_length gets the piece length from http header.
/// Get X-Dragonfly-Piece-Length header value to determine the piece length for downloading the
/// file.
pub fn get_piece_length(header: &HeaderMap) -> Option<ByteSize> {
match header.get(DRAGONFLY_PIECE_LENGTH_HEADER) {
Some(piece_length) => match piece_length.to_str() {
Expand All @@ -279,14 +295,33 @@ pub fn get_piece_length(header: &HeaderMap) -> Option<ByteSize> {
}
}

/// get_content_for_calculating_task_id gets the content for calculating task id from http header.
/// Get X-Dragonfly-Content-For-Calculating-Task-ID header value to determine the content for
/// calculating task ID.
pub fn get_content_for_calculating_task_id(header: &HeaderMap) -> Option<String> {
    // Absent header => `None`; a value that `to_str` rejects is also reported as `None`.
    let raw = header.get(DRAGONFLY_CONTENT_FOR_CALCULATING_TASK_ID_HEADER)?;
    raw.to_str().ok().map(String::from)
}

/// Reads the X-Dragonfly-Enable-Task-ID-Based-Blob-Digest header to decide whether the blob's
/// content digest is used for task ID calculation.
///
/// Returns `default` when the header is absent or its value cannot be converted to a string
/// (the conversion error is logged). Otherwise returns `true` only when the value equals
/// "true", compared ASCII case-insensitively; any other value yields `false`.
pub fn get_enable_task_id_based_blob_digest(header: &HeaderMap, default: bool) -> bool {
    let parsed = header
        .get(DRAGONFLY_ENABLE_TASK_ID_BASED_BLOB_DIGEST)
        .map(|raw| raw.to_str());

    match parsed {
        // Header not supplied: the caller-provided default decides.
        None => default,
        // Header supplied and readable: only "true" enables the feature.
        Some(Ok(text)) => text.eq_ignore_ascii_case("true"),
        // Header supplied but unreadable: log it and fall back to the default.
        Some(Err(err)) => {
            error!(
                "get enable task id based blob digest from header failed: {}",
                err
            );
            default
        }
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -461,4 +496,25 @@ mod tests {
let empty_headers = HeaderMap::new();
assert_eq!(get_registry(&empty_headers), None);
}

#[test]
fn test_get_enable_task_id_based_blob_digest() {
    // An explicit header value wins over the supplied default, in both directions.
    for (raw, default, expected) in [("true", false, true), ("false", true, false)] {
        let mut headers = HeaderMap::new();
        headers.insert(
            DRAGONFLY_ENABLE_TASK_ID_BASED_BLOB_DIGEST,
            HeaderValue::from_static(raw),
        );
        assert_eq!(
            get_enable_task_id_based_blob_digest(&headers, default),
            expected
        );
    }

    // Without the header the supplied default is returned unchanged.
    let empty_headers = HeaderMap::new();
    for default in [true, false] {
        assert_eq!(
            get_enable_task_id_based_blob_digest(&empty_headers, default),
            default
        );
    }
}
}
11 changes: 7 additions & 4 deletions dragonfly-client/src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1190,10 +1190,13 @@ fn make_download_task_request(
actual_piece_length: None,
actual_content_length: None,
actual_piece_count: None,
enable_task_id_based_blob_digest: config
.proxy
.registry_mirror
.enable_task_id_based_blob_digest,
enable_task_id_based_blob_digest: header::get_enable_task_id_based_blob_digest(
&header,
config
.proxy
.registry_mirror
.enable_task_id_based_blob_digest,
),
}),
})
}
Expand Down
Loading