Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ opendal = { version = "0.55.0", features = [
"services-cos",
"services-webhdfs",
] }
clap = { version = "4.6.0", features = ["derive"] }
clap = { version = "4.6.0", features = ["derive", "env"] }
anyhow = "1.0.102"
toml_edit = "0.23.7"
toml = "0.8.23"
Expand Down
109 changes: 16 additions & 93 deletions dragonfly-client-backend/src/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ use tokio_util::io::StreamReader;
use tracing::{debug, error, instrument};
use url::Url;

/// Default region for S3 if not specified.
const DEFAULT_REGION: &str = "us-east-1";

/// Scheme is the scheme of the object storage.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Scheme {
Expand Down Expand Up @@ -330,34 +333,21 @@ impl ObjectStorage {
object_storage: common::v2::ObjectStorage,
timeout: Duration,
) -> ClientResult<Operator> {
// S3 requires the access key id and the secret access key.
let (Some(access_key_id), Some(access_key_secret), Some(region)) = (
&object_storage.access_key_id,
&object_storage.access_key_secret,
&object_storage.region,
) else {
return Err(ClientError::BackendError(Box::new(BackendError {
message: format!(
"{} {}",
self.scheme,
make_need_fields_message!(object_storage {
access_key_id,
access_key_secret,
region
})
),
status_code: None,
header: None,
})));
};

// Initialize the S3 operator with the object storage.
let mut builder = opendal::services::S3::default();
builder = builder
.access_key_id(access_key_id)
.secret_access_key(access_key_secret)
.bucket(&parsed_url.bucket)
.region(region);
builder = builder.bucket(&parsed_url.bucket);

// Configure the credentials using the access key id and access key secret if provided.
if let Some(access_key_id) = object_storage.access_key_id.as_deref() {
builder = builder.access_key_id(access_key_id);
}

if let Some(access_key_secret) = object_storage.access_key_secret.as_deref() {
builder = builder.secret_access_key(access_key_secret);
}

// Configure the region if it is provided. If not provided, use the default region.
builder = builder.region(object_storage.region.as_deref().unwrap_or(DEFAULT_REGION));

// Configure the endpoint if it is provided.
if let Some(endpoint) = object_storage.endpoint.as_deref() {
Expand Down Expand Up @@ -1215,73 +1205,6 @@ mod tests {
)
}

#[test]
fn should_return_error_when_s3_lacks_of_info() {
    // Builds an S3 operator from a (possibly incomplete) credential set and
    // returns the resulting error; panics if operator creation unexpectedly succeeds.
    let operator_error = |object_storage: ObjectStorageInfo| {
        let url: Url = "s3://test-bucket/file".parse().unwrap();
        let parsed_url: ParsedURL = url.try_into().unwrap();

        ObjectStorage::new(Scheme::S3, Arc::new(Config::default()))
            .unwrap()
            .operator(&parsed_url, Some(object_storage), Duration::from_secs(3))
            .expect_err("operator creation should fail when required S3 fields are missing")
    };

    // Each case pairs a partially-filled credential set with the exact error
    // message enumerating the fields that are still missing.
    let cases: Vec<(ObjectStorageInfo, &str)> = vec![
        (
            ObjectStorageInfo::default(),
            "backend error: s3 need access_key_id, access_key_secret, region",
        ),
        (
            ObjectStorageInfo {
                access_key_id: Some("access_key_id".into()),
                ..Default::default()
            },
            "backend error: s3 need access_key_secret, region",
        ),
        (
            ObjectStorageInfo {
                access_key_secret: Some("access_key_secret".into()),
                ..Default::default()
            },
            "backend error: s3 need access_key_id, region",
        ),
        (
            ObjectStorageInfo {
                region: Some("test-region".into()),
                ..Default::default()
            },
            "backend error: s3 need access_key_id, access_key_secret",
        ),
        (
            ObjectStorageInfo {
                access_key_id: Some("access_key_id".into()),
                access_key_secret: Some("access_key_secret".into()),
                ..Default::default()
            },
            "backend error: s3 need region",
        ),
        (
            ObjectStorageInfo {
                access_key_id: Some("access_key_id".into()),
                region: Some("test-region".into()),
                ..Default::default()
            },
            "backend error: s3 need access_key_secret",
        ),
        (
            ObjectStorageInfo {
                access_key_secret: Some("access_key_secret".into()),
                region: Some("test-region".into()),
                ..Default::default()
            },
            "backend error: s3 need access_key_id",
        ),
    ];

    for (object_storage, expected_message) in cases {
        assert_eq!(operator_error(object_storage).to_string(), expected_message);
    }
}

#[test]
fn should_return_error_when_abs_lacks_of_info() {
let test_cases = vec![
Expand Down
16 changes: 15 additions & 1 deletion dragonfly-client/src/bin/dfcache/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,34 +45,39 @@ pub struct ExportCommand {
#[arg(
long = "transfer-from-dfdaemon",
default_value_t = false,
env = "DFCACHE_EXPORT_TRANSFER_FROM_DFDAEMON",
help = "Specify whether to transfer the content of downloading file from dfdaemon's unix domain socket. If it is true, dfcache will call dfdaemon to download the file, and dfdaemon will return the content of downloading file to dfcache via unix domain socket, and dfcache will copy the content to the output path. If it is false, dfdaemon will download the file and hardlink or copy the file to the output path."
)]
transfer_from_dfdaemon: bool,

#[arg(
long = "overwrite",
default_value_t = false,
env = "DFCACHE_EXPORT_OVERWRITE",
help = "Specify whether to overwrite the output file if it already exists. If it is true, dfget will overwrite the output file. If it is false, dfget will return an error if the output file already exists. Cannot be used with `--force-hard-link=true`"
)]
overwrite: bool,

#[arg(
long = "force-hard-link",
default_value_t = false,
env = "DFCACHE_EXPORT_FORCE_HARD_LINK",
help = "Specify whether the download file must be hard linked to the output path. If hard link is failed, download will be failed. If it is false, dfdaemon will copy the file to the output path if hard link is failed."
)]
force_hard_link: bool,

#[arg(
long = "application",
default_value = "",
env = "DFCACHE_EXPORT_APPLICATION",
help = "Caller application which is used for statistics and access control"
)]
application: String,

#[arg(
long = "tag",
default_value = "",
env = "DFCACHE_EXPORT_TAG",
help = "Different tags for the same file will be divided into different persistent cache tasks"
)]
tag: String,
Expand All @@ -88,6 +93,7 @@ pub struct ExportCommand {
long = "timeout",
value_parser= humantime::parse_duration,
default_value = "2h",
env = "DFCACHE_EXPORT_TIMEOUT",
help = "Specify the timeout for exporting a file"
)]
timeout: Duration,
Expand All @@ -103,13 +109,15 @@ pub struct ExportCommand {
short = 'e',
long = "endpoint",
default_value_os_t = dfdaemon::default_download_unix_socket_path(),
env = "DFCACHE_EXPORT_DFDAEMON_ENDPOINT",
help = "Endpoint of dfdaemon's GRPC server"
)]
endpoint: PathBuf,

#[arg(
long,
default_value_t = false,
env = "DFCACHE_EXPORT_NO_PROGRESS",
help = "Specify whether to disable the progress bar display"
)]
no_progress: bool,
Expand All @@ -118,11 +126,17 @@ pub struct ExportCommand {
short = 'l',
long,
default_value = "info",
env = "DFCACHE_EXPORT_LOG_LEVEL",
help = "Specify the logging level [trace, debug, info, warn, error]"
)]
log_level: Level,

#[arg(long, default_value_t = false, help = "Specify whether to print log")]
#[arg(
long,
default_value_t = false,
env = "DFCACHE_EXPORT_CONSOLE",
help = "Specify whether to print log"
)]
console: bool,
}

Expand Down
17 changes: 16 additions & 1 deletion dragonfly-client/src/bin/dfcache/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,34 +44,39 @@ pub struct ImportCommand {

#[arg(
long = "content-for-calculating-task-id",
env = "DFCACHE_IMPORT_CONTENT_FOR_CALCULATING_TASK_ID",
help = "Specify the content used to calculate the persistent cache task ID. If it is set, use its value to calculate the task ID. Otherwise, calculate the persistent cache task ID by computing SHA256 hash of file content. Note: SHA256 computation takes longer for large files."
)]
content_for_calculating_task_id: Option<String>,

#[arg(
long = "persistent-replica-count",
default_value_t = default_dfcache_persistent_replica_count(),
env = "DFCACHE_IMPORT_PERSISTENT_REPLICA_COUNT",
help = "Specify the replica count of the persistent cache task"
)]
persistent_replica_count: u64,

#[arg(
long = "piece-length",
required = false,
env = "DFCACHE_IMPORT_PIECE_LENGTH",
help = "Specify the piece length for downloading file. If the piece length is not specified, the piece length will be calculated according to the file size. Different piece lengths will be divided into different persistent cache tasks. The value needs to be set with human readable format and needs to be greater than or equal to 4mib, for example: 4mib, 1gib"
)]
piece_length: Option<ByteSize>,

#[arg(
long = "application",
required = false,
env = "DFCACHE_IMPORT_APPLICATION",
help = "Different applications for the same url will be divided into different persistent cache tasks"
)]
application: Option<String>,

#[arg(
long = "tag",
required = false,
env = "DFCACHE_IMPORT_TAG",
help = "Different tags for the same file will be divided into different persistent cache tasks"
)]
tag: Option<String>,
Expand All @@ -80,6 +85,7 @@ pub struct ImportCommand {
long = "ttl",
value_parser= humantime::parse_duration,
default_value = "1h",
env = "DFCACHE_IMPORT_TTL",
help = "Specify the ttl of the persistent cache task, maximum is 7d and minimum is 1m"
)]
ttl: Duration,
Expand All @@ -88,6 +94,7 @@ pub struct ImportCommand {
long = "timeout",
value_parser= humantime::parse_duration,
default_value = "30m",
env = "DFCACHE_IMPORT_TIMEOUT",
help = "Specify the timeout for importing a file"
)]
timeout: Duration,
Expand All @@ -96,13 +103,15 @@ pub struct ImportCommand {
short = 'e',
long = "endpoint",
default_value_os_t = dfdaemon::default_download_unix_socket_path(),
env = "DFCACHE_IMPORT_DFDAEMON_ENDPOINT",
help = "Endpoint of dfdaemon's GRPC server"
)]
endpoint: PathBuf,

#[arg(
long,
default_value_t = false,
env = "DFCACHE_IMPORT_NO_PROGRESS",
help = "Specify whether to disable the progress bar display"
)]
no_progress: bool,
Expand All @@ -111,11 +120,17 @@ pub struct ImportCommand {
short = 'l',
long,
default_value = "info",
env = "DFCACHE_IMPORT_LOG_LEVEL",
help = "Specify the logging level [trace, debug, info, warn, error]"
)]
log_level: Level,

#[arg(long, default_value_t = false, help = "Specify whether to print log")]
#[arg(
long,
default_value_t = false,
env = "DFCACHE_IMPORT_CONSOLE",
help = "Specify whether to print log"
)]
console: bool,
}

Expand Down
9 changes: 8 additions & 1 deletion dragonfly-client/src/bin/dfcache/stat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub struct StatCommand {
short = 'e',
long = "endpoint",
default_value_os_t = dfdaemon::default_download_unix_socket_path(),
env = "DFCACHE_STAT_DFDAEMON_ENDPOINT",
help = "Endpoint of dfdaemon's GRPC server"
)]
endpoint: PathBuf,
Expand All @@ -51,11 +52,17 @@ pub struct StatCommand {
short = 'l',
long,
default_value = "info",
env = "DFCACHE_STAT_LOG_LEVEL",
help = "Specify the logging level [trace, debug, info, warn, error]"
)]
log_level: Level,

#[arg(long, default_value_t = false, help = "Specify whether to print log")]
#[arg(
long,
default_value_t = false,
env = "DFCACHE_STAT_CONSOLE",
help = "Specify whether to print log"
)]
console: bool,
}

Expand Down
11 changes: 10 additions & 1 deletion dragonfly-client/src/bin/dfdaemon/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ struct Args {
short = 'c',
long = "config",
default_value_os_t = dfdaemon::default_dfdaemon_config_path(),
env = "DFDAEMON_CONFIG",
help = "Specify config file to use")
]
config: PathBuf,
Expand All @@ -79,25 +80,33 @@ struct Args {
short = 'l',
long,
default_value = "info",
env = "DFDAEMON_LOG_LEVEL",
help = "Specify the logging level [trace, debug, info, warn, error]"
)]
log_level: Level,

#[arg(
long,
default_value_os_t = dfdaemon::default_dfdaemon_log_dir(),
env = "DFDAEMON_LOG_DIR",
help = "Specify the log directory"
)]
log_dir: PathBuf,

#[arg(
long,
default_value_t = 6,
env = "DFDAEMON_LOG_MAX_FILES",
help = "Specify the max number of log files"
)]
log_max_files: usize,

#[arg(long, default_value_t = true, help = "Specify whether to print log")]
#[arg(
long,
default_value_t = true,
env = "DFDAEMON_CONSOLE",
help = "Specify whether to print log"
)]
console: bool,

#[arg(
Expand Down
Loading
Loading