Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ def _create_client_application(
SystemAssignedManagedIdentity(),
http_client=Session(),
)

else:
authority = MsalAuth._resolve_authority(self._msal_configuration, tenant_id)

Expand Down Expand Up @@ -233,6 +234,23 @@ def _create_client_application(
"thumbprint": thumbprint,
"private_key": private_key,
}
elif self._msal_configuration.AUTH_TYPE == AuthTypes.federated_credentials:
assert self._msal_configuration.FEDERATED_CLIENT_ID
mi_client = ManagedIdentityClient(
SystemAssignedManagedIdentity(), # TODO
http_client=Session(),
)
mi_token = mi_client.acquire_token_for_client(
resource=self._msal_configuration.FEDERATED_CLIENT_ID
)
if "access_token" not in mi_token:
logger.error(
f"Failed to acquire token for federated credentials: {mi_token}"
)
raise ValueError(
authentication_errors.FailedToAcquireToken.format(str(mi_token))
)
self._client_credential_cache = mi_token["access_token"]
Comment on lines 237 to 253
Copy link

Copilot AI Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new auth type introduces new behavior in _create_client_application, but there are existing pytest suites for MsalAuth and tenant resolution. Please add tests that cover the AuthTypes.federated_credentials branch (including failure paths when token acquisition fails), so regressions are caught early.

Copilot uses AI. Check for mistakes.
else:
logger.error(
f"Unsupported authentication type: {self._msal_configuration.AUTH_TYPE}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,11 @@ def __init__(
"ApplicationOptions.storage is required and was not configured.",
stack_info=True,
)
raise ApplicationError("""
raise ApplicationError(
"""
The `ApplicationOptions.storage` property is required and was not configured.
""")
"""
)

if options.long_running_messages and (
not options.adapter or not options.bot_app_id
Expand All @@ -131,10 +133,12 @@ def __init__(
"ApplicationOptions.long_running_messages requires an adapter and bot_app_id.",
stack_info=True,
)
raise ApplicationError("""
raise ApplicationError(
"""
The `ApplicationOptions.long_running_messages` property is unavailable because
no adapter or `bot_app_id` was configured.
""")
"""
)

if options.adapter:
self._adapter = options.adapter
Expand Down Expand Up @@ -176,10 +180,12 @@ def adapter(self) -> ChannelServiceAdapter:
"AgentApplication.adapter(): self._adapter is not configured.",
stack_info=True,
)
raise ApplicationError("""
raise ApplicationError(
"""
The AgentApplication.adapter property is unavailable because it was
not configured when creating the AgentApplication.
""")
"""
)

return self._adapter

Expand All @@ -197,10 +203,12 @@ def auth(self) -> Authorization:
"AgentApplication.auth(): self._auth is not configured.",
stack_info=True,
)
raise ApplicationError("""
raise ApplicationError(
"""
The `AgentApplication.auth` property is unavailable because
no Auth options were configured.
""")
"""
)

return self._auth

Expand Down Expand Up @@ -584,10 +592,12 @@ async def sign_in_success(context: TurnContext, state: TurnState, connection_id:
f"Failed to register sign-in success handler for route handler {func.__name__}",
stack_info=True,
)
raise ApplicationError("""
raise ApplicationError(
"""
The `AgentApplication.on_sign_in_success` method is unavailable because
no Auth options were configured.
""")
"""
)
return func

def on_sign_in_failure(
Expand Down Expand Up @@ -618,10 +628,12 @@ async def sign_in_failure(context: TurnContext, state: TurnState, connection_id:
f"Failed to register sign-in failure handler for route handler {func.__name__}",
stack_info=True,
)
raise ApplicationError("""
raise ApplicationError(
"""
The `AgentApplication.on_sign_in_failure` method is unavailable because
no Auth options were configured.
""")
"""
)
return func

def error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class AgentAuthConfiguration:
CONNECTION_NAME: Optional[str]
SCOPES: Optional[list[str]]
AUTHORITY: Optional[str]
FEDERATED_CLIENT_ID: Optional[str]
ALT_BLUEPRINT_ID: Optional[str]
ANONYMOUS_ALLOWED: bool = False

Expand Down Expand Up @@ -69,6 +70,7 @@ def __init__(
self.CONNECTION_NAME = connection_name or kwargs.get("CONNECTIONNAME", None)
self.SCOPES = scopes or kwargs.get("SCOPES", None)
self.ALT_BLUEPRINT_ID = kwargs.get("ALT_BLUEPRINT_NAME", None)
self.FEDERATED_CLIENT_ID = kwargs.get("FEDERATEDCLIENTID", None)
self.ANONYMOUS_ALLOWED = anonymous_allowed or kwargs.get(
"ANONYMOUS_ALLOWED", False
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ class AuthTypes(str, Enum):
client_secret = "ClientSecret"
user_managed_identity = "UserManagedIdentity"
system_managed_identity = "SystemManagedIdentity"
federated_credentials = "FederatedCredentials"
Loading