Skip to content

Commit d727503

Browse files
authored
object_score: Support Azure Fabric OAuth Provider (#6382)
* Update Azure dependencies and add support for Fabric token authentication * Refactor Azure credential provider to support Fabric token authentication * Refactor Azure credential provider to remove unnecessary print statements and improve token handling * Bump object_store version to 0.11.0 * Refactor Azure credential provider to remove unnecessary print statements and improve token handling
1 parent bc6009f commit d727503

File tree

2 files changed

+199
-3
lines changed

2 files changed

+199
-3
lines changed

object_store/src/azure/builder.rs

Lines changed: 86 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
use crate::azure::client::{AzureClient, AzureConfig};
1919
use crate::azure::credential::{
20-
AzureAccessKey, AzureCliCredential, ClientSecretOAuthProvider, ImdsManagedIdentityProvider,
21-
WorkloadIdentityOAuthProvider,
20+
AzureAccessKey, AzureCliCredential, ClientSecretOAuthProvider, FabricTokenOAuthProvider,
21+
ImdsManagedIdentityProvider, WorkloadIdentityOAuthProvider,
2222
};
2323
use crate::azure::{AzureCredential, AzureCredentialProvider, MicrosoftAzure, STORE};
2424
use crate::client::TokenCredentialProvider;
@@ -172,6 +172,14 @@ pub struct MicrosoftAzureBuilder {
172172
use_fabric_endpoint: ConfigValue<bool>,
173173
/// When set to true, skips tagging objects
174174
disable_tagging: ConfigValue<bool>,
175+
/// Fabric token service url
176+
fabric_token_service_url: Option<String>,
177+
/// Fabric workload host
178+
fabric_workload_host: Option<String>,
179+
/// Fabric session token
180+
fabric_session_token: Option<String>,
181+
/// Fabric cluster identifier
182+
fabric_cluster_identifier: Option<String>,
175183
}
176184

177185
/// Configuration keys for [`MicrosoftAzureBuilder`]
@@ -336,6 +344,34 @@ pub enum AzureConfigKey {
336344
/// - `disable_tagging`
337345
DisableTagging,
338346

347+
/// Fabric token service url
348+
///
349+
/// Supported keys:
350+
/// - `azure_fabric_token_service_url`
351+
/// - `fabric_token_service_url`
352+
FabricTokenServiceUrl,
353+
354+
/// Fabric workload host
355+
///
356+
/// Supported keys:
357+
/// - `azure_fabric_workload_host`
358+
/// - `fabric_workload_host`
359+
FabricWorkloadHost,
360+
361+
/// Fabric session token
362+
///
363+
/// Supported keys:
364+
/// - `azure_fabric_session_token`
365+
/// - `fabric_session_token`
366+
FabricSessionToken,
367+
368+
/// Fabric cluster identifier
369+
///
370+
/// Supported keys:
371+
/// - `azure_fabric_cluster_identifier`
372+
/// - `fabric_cluster_identifier`
373+
FabricClusterIdentifier,
374+
339375
/// Client options
340376
Client(ClientConfigKey),
341377
}
@@ -361,6 +397,10 @@ impl AsRef<str> for AzureConfigKey {
361397
Self::SkipSignature => "azure_skip_signature",
362398
Self::ContainerName => "azure_container_name",
363399
Self::DisableTagging => "azure_disable_tagging",
400+
Self::FabricTokenServiceUrl => "azure_fabric_token_service_url",
401+
Self::FabricWorkloadHost => "azure_fabric_workload_host",
402+
Self::FabricSessionToken => "azure_fabric_session_token",
403+
Self::FabricClusterIdentifier => "azure_fabric_cluster_identifier",
364404
Self::Client(key) => key.as_ref(),
365405
}
366406
}
@@ -406,6 +446,14 @@ impl FromStr for AzureConfigKey {
406446
"azure_skip_signature" | "skip_signature" => Ok(Self::SkipSignature),
407447
"azure_container_name" | "container_name" => Ok(Self::ContainerName),
408448
"azure_disable_tagging" | "disable_tagging" => Ok(Self::DisableTagging),
449+
"azure_fabric_token_service_url" | "fabric_token_service_url" => {
450+
Ok(Self::FabricTokenServiceUrl)
451+
}
452+
"azure_fabric_workload_host" | "fabric_workload_host" => Ok(Self::FabricWorkloadHost),
453+
"azure_fabric_session_token" | "fabric_session_token" => Ok(Self::FabricSessionToken),
454+
"azure_fabric_cluster_identifier" | "fabric_cluster_identifier" => {
455+
Ok(Self::FabricClusterIdentifier)
456+
}
409457
// Backwards compatibility
410458
"azure_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
411459
_ => match s.strip_prefix("azure_").unwrap_or(s).parse() {
@@ -525,6 +573,14 @@ impl MicrosoftAzureBuilder {
525573
}
526574
AzureConfigKey::ContainerName => self.container_name = Some(value.into()),
527575
AzureConfigKey::DisableTagging => self.disable_tagging.parse(value),
576+
AzureConfigKey::FabricTokenServiceUrl => {
577+
self.fabric_token_service_url = Some(value.into())
578+
}
579+
AzureConfigKey::FabricWorkloadHost => self.fabric_workload_host = Some(value.into()),
580+
AzureConfigKey::FabricSessionToken => self.fabric_session_token = Some(value.into()),
581+
AzureConfigKey::FabricClusterIdentifier => {
582+
self.fabric_cluster_identifier = Some(value.into())
583+
}
528584
};
529585
self
530586
}
@@ -561,6 +617,10 @@ impl MicrosoftAzureBuilder {
561617
AzureConfigKey::Client(key) => self.client_options.get_config_value(key),
562618
AzureConfigKey::ContainerName => self.container_name.clone(),
563619
AzureConfigKey::DisableTagging => Some(self.disable_tagging.to_string()),
620+
AzureConfigKey::FabricTokenServiceUrl => self.fabric_token_service_url.clone(),
621+
AzureConfigKey::FabricWorkloadHost => self.fabric_workload_host.clone(),
622+
AzureConfigKey::FabricSessionToken => self.fabric_session_token.clone(),
623+
AzureConfigKey::FabricClusterIdentifier => self.fabric_cluster_identifier.clone(),
564624
}
565625
}
566626

@@ -856,6 +916,30 @@ impl MicrosoftAzureBuilder {
856916

857917
let credential = if let Some(credential) = self.credentials {
858918
credential
919+
} else if let (
920+
Some(fabric_token_service_url),
921+
Some(fabric_workload_host),
922+
Some(fabric_session_token),
923+
Some(fabric_cluster_identifier),
924+
) = (
925+
&self.fabric_token_service_url,
926+
&self.fabric_workload_host,
927+
&self.fabric_session_token,
928+
&self.fabric_cluster_identifier,
929+
) {
930+
// This case should precede the bearer token case because it is more specific and will utilize the bearer token.
931+
let fabric_credential = FabricTokenOAuthProvider::new(
932+
fabric_token_service_url,
933+
fabric_workload_host,
934+
fabric_session_token,
935+
fabric_cluster_identifier,
936+
self.bearer_token.clone(),
937+
);
938+
Arc::new(TokenCredentialProvider::new(
939+
fabric_credential,
940+
self.client_options.client()?,
941+
self.retry_config.clone(),
942+
)) as _
859943
} else if let Some(bearer_token) = self.bearer_token {
860944
static_creds(AzureCredential::BearerToken(bearer_token))
861945
} else if let Some(access_key) = self.access_key {

object_store/src/azure/credential.rs

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::client::{CredentialProvider, TokenProvider};
2222
use crate::util::hmac_sha256;
2323
use crate::RetryConfig;
2424
use async_trait::async_trait;
25-
use base64::prelude::BASE64_STANDARD;
25+
use base64::prelude::{BASE64_STANDARD, BASE64_URL_SAFE_NO_PAD};
2626
use base64::Engine;
2727
use chrono::{DateTime, SecondsFormat, Utc};
2828
use reqwest::header::{
@@ -51,10 +51,15 @@ pub(crate) static BLOB_TYPE: HeaderName = HeaderName::from_static("x-ms-blob-typ
5151
pub(crate) static DELETE_SNAPSHOTS: HeaderName = HeaderName::from_static("x-ms-delete-snapshots");
5252
pub(crate) static COPY_SOURCE: HeaderName = HeaderName::from_static("x-ms-copy-source");
5353
static CONTENT_MD5: HeaderName = HeaderName::from_static("content-md5");
54+
static PARTNER_TOKEN: HeaderName = HeaderName::from_static("x-ms-partner-token");
55+
static CLUSTER_IDENTIFIER: HeaderName = HeaderName::from_static("x-ms-cluster-identifier");
56+
static WORKLOAD_RESOURCE: HeaderName = HeaderName::from_static("x-ms-workload-resource-moniker");
57+
static PROXY_HOST: HeaderName = HeaderName::from_static("x-ms-proxy-host");
5458
pub(crate) const RFC1123_FMT: &str = "%a, %d %h %Y %T GMT";
5559
const CONTENT_TYPE_JSON: &str = "application/json";
5660
const MSI_SECRET_ENV_KEY: &str = "IDENTITY_HEADER";
5761
const MSI_API_VERSION: &str = "2019-08-01";
62+
const TOKEN_MIN_TTL: u64 = 300;
5863

5964
/// OIDC scope used when interacting with OAuth2 APIs
6065
///
@@ -934,6 +939,113 @@ impl AzureCliCredential {
934939
}
935940
}
936941

942+
/// Encapsulates the logic to perform an OAuth token challenge for Fabric
943+
#[derive(Debug)]
944+
pub struct FabricTokenOAuthProvider {
945+
fabric_token_service_url: String,
946+
fabric_workload_host: String,
947+
fabric_session_token: String,
948+
fabric_cluster_identifier: String,
949+
storage_access_token: Option<String>,
950+
token_expiry: Option<u64>,
951+
}
952+
953+
#[derive(Debug, Deserialize)]
954+
struct Claims {
955+
exp: u64,
956+
}
957+
958+
impl FabricTokenOAuthProvider {
959+
/// Create a new [`FabricTokenOAuthProvider`] for an azure backed store
960+
pub fn new(
961+
fabric_token_service_url: impl Into<String>,
962+
fabric_workload_host: impl Into<String>,
963+
fabric_session_token: impl Into<String>,
964+
fabric_cluster_identifier: impl Into<String>,
965+
storage_access_token: Option<String>,
966+
) -> Self {
967+
let (storage_access_token, token_expiry) = match storage_access_token {
968+
Some(token) => match Self::validate_and_get_expiry(&token) {
969+
Some(expiry) if expiry > Self::get_current_timestamp() + TOKEN_MIN_TTL => {
970+
(Some(token), Some(expiry))
971+
}
972+
_ => (None, None),
973+
},
974+
None => (None, None),
975+
};
976+
977+
Self {
978+
fabric_token_service_url: fabric_token_service_url.into(),
979+
fabric_workload_host: fabric_workload_host.into(),
980+
fabric_session_token: fabric_session_token.into(),
981+
fabric_cluster_identifier: fabric_cluster_identifier.into(),
982+
storage_access_token,
983+
token_expiry,
984+
}
985+
}
986+
987+
fn validate_and_get_expiry(token: &str) -> Option<u64> {
988+
let payload = token.split('.').nth(1)?;
989+
let decoded_bytes = BASE64_URL_SAFE_NO_PAD.decode(payload).ok()?;
990+
let decoded_str = str::from_utf8(&decoded_bytes).ok()?;
991+
let claims: Claims = serde_json::from_str(decoded_str).ok()?;
992+
Some(claims.exp)
993+
}
994+
995+
fn get_current_timestamp() -> u64 {
996+
SystemTime::now()
997+
.duration_since(SystemTime::UNIX_EPOCH)
998+
.map_or(0, |d| d.as_secs())
999+
}
1000+
}
1001+
1002+
#[async_trait::async_trait]
1003+
impl TokenProvider for FabricTokenOAuthProvider {
1004+
type Credential = AzureCredential;
1005+
1006+
/// Fetch a token
1007+
async fn fetch_token(
1008+
&self,
1009+
client: &Client,
1010+
retry: &RetryConfig,
1011+
) -> crate::Result<TemporaryToken<Arc<AzureCredential>>> {
1012+
if let Some(storage_access_token) = &self.storage_access_token {
1013+
if let Some(expiry) = self.token_expiry {
1014+
let exp_in = expiry - Self::get_current_timestamp();
1015+
if exp_in > TOKEN_MIN_TTL {
1016+
return Ok(TemporaryToken {
1017+
token: Arc::new(AzureCredential::BearerToken(storage_access_token.clone())),
1018+
expiry: Some(Instant::now() + Duration::from_secs(exp_in)),
1019+
});
1020+
}
1021+
}
1022+
}
1023+
1024+
let query_items = vec![("resource", AZURE_STORAGE_RESOURCE)];
1025+
let access_token: String = client
1026+
.request(Method::GET, &self.fabric_token_service_url)
1027+
.header(&PARTNER_TOKEN, self.fabric_session_token.as_str())
1028+
.header(&CLUSTER_IDENTIFIER, self.fabric_cluster_identifier.as_str())
1029+
.header(&WORKLOAD_RESOURCE, self.fabric_cluster_identifier.as_str())
1030+
.header(&PROXY_HOST, self.fabric_workload_host.as_str())
1031+
.query(&query_items)
1032+
.retryable(retry)
1033+
.idempotent(true)
1034+
.send()
1035+
.await
1036+
.context(TokenRequestSnafu)?
1037+
.text()
1038+
.await
1039+
.context(TokenResponseBodySnafu)?;
1040+
let exp_in = Self::validate_and_get_expiry(&access_token)
1041+
.map_or(3600, |expiry| expiry - Self::get_current_timestamp());
1042+
Ok(TemporaryToken {
1043+
token: Arc::new(AzureCredential::BearerToken(access_token)),
1044+
expiry: Some(Instant::now() + Duration::from_secs(exp_in)),
1045+
})
1046+
}
1047+
}
1048+
9371049
#[async_trait]
9381050
impl CredentialProvider for AzureCliCredential {
9391051
type Credential = AzureCredential;

0 commit comments

Comments
 (0)