Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -330,17 +330,24 @@ def _get_credentials_using_key_secret_name(self) -> tuple[Credentials, str]:
self._log_debug("Getting connection using JSON key data from GCP secret: %s", self.key_secret_name)

# Use ADC to access GCP Secret Manager.
adc_credentials, adc_project_id = google.auth.default(scopes=self.scopes)
scopes = list(self.scopes) if self.scopes else None
adc_credentials, adc_project_id = google.auth.default(scopes=scopes)
secret_manager_client = _SecretManagerClient(credentials=adc_credentials)

if self.key_secret_name is None:
raise ValueError("The key_secret_name field is None, and we need it for keyfile_dict auth.")
if not secret_manager_client.is_valid_secret_name(self.key_secret_name):
raise AirflowException("Invalid secret name specified for fetching JSON key data.")

project_id = self.key_secret_project_id if self.key_secret_project_id else adc_project_id
if not project_id:
raise AirflowException(
"Project ID could not be determined from default credentials. "
"Please provide `key_secret_project_id` parameter."
)
secret_value = secret_manager_client.get_secret(
secret_id=self.key_secret_name,
project_id=self.key_secret_project_id if self.key_secret_project_id else adc_project_id,
project_id=project_id,
)
if secret_value is None:
raise AirflowException(f"Failed getting value of secret {self.key_secret_name}.")
Expand Down Expand Up @@ -395,14 +402,26 @@ def _get_credentials_using_credential_config_file_and_token_supplier(self):
oidc_issuer_url=self.idp_issuer_url, client_id=self.client_id, client_secret=self.client_secret
)

credentials, project_id = google.auth.load_credentials_from_dict(info=info, scopes=self.scopes)
scopes = list(self.scopes) if self.scopes else None
credentials, project_id = google.auth.load_credentials_from_dict(info=info, scopes=scopes)
if not project_id:
raise AirflowException(
"Project ID could not be determined from default credentials. "
"Please provide `key_secret_project_id` parameter."
)
return credentials, project_id

def _get_credentials_using_adc(self) -> tuple[Credentials, str]:
self._log_info(
"Getting connection using `google.auth.default()` since no explicit credentials are provided."
)
credentials, project_id = google.auth.default(scopes=self.scopes)
scopes = list(self.scopes) if self.scopes else None
credentials, project_id = google.auth.default(scopes=scopes)
if not project_id:
raise AirflowException(
"Project ID could not be determined from default credentials. "
"Please provide `key_secret_project_id` parameter."
)
return credentials, project_id

def _log_info(self, *args, **kwargs) -> None:
Expand Down