Skip to content

Conversation

BeryJu
Copy link
Member

@BeryJu BeryJu commented Sep 21, 2025

Details

REPLACE ME


Checklist

  • Local tests pass (ak test authentik/)
  • The code has been formatted (make lint-fix)

If an API change has been made

  • The API schema has been updated (make gen-build)

If changes to the frontend have been made

  • The code has been formatted (make web)

If applicable

  • The documentation has been updated
  • The documentation has been formatted (make docs)

… expire

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
@BeryJu BeryJu requested a review from a team as a code owner September 21, 2025 21:39
Copy link

netlify bot commented Sep 21, 2025

Deploy Preview for authentik-storybook canceled.

Name Link
🔨 Latest commit 4c0b631
🔍 Latest deploy log https://app.netlify.com/projects/authentik-storybook/deploys/68d070762c785c0008e9976e

Copy link

netlify bot commented Sep 21, 2025

Deploy Preview for authentik-integrations canceled.

Name Link
🔨 Latest commit 4c0b631
🔍 Latest deploy log https://app.netlify.com/projects/authentik-integrations/deploys/68d07076be19150008dde9a2

Copy link

netlify bot commented Sep 21, 2025

Deploy Preview for authentik-docs ready!

Name Link
🔨 Latest commit 4c0b631
🔍 Latest deploy log https://app.netlify.com/projects/authentik-docs/deploys/68d07076db41010008469d79
😎 Deploy Preview https://deploy-preview-16905--authentik-docs.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

Copy link

codecov bot commented Sep 21, 2025

❌ 5 Tests Failed:

Tests completed Failed Passed Skipped
2057 5 2052 2
View the top 3 failed test(s) by shortest run time
authentik.providers.oauth2.tests.test_token.TestToken::test_refresh_token_view_invalid_origin
Stack Traces | 1.17s run time
self = <unittest.case._Outcome object at 0x7f16dc6a4520>
test_case = <authentik.providers.oauth2.tests.test_token.TestToken testMethod=test_refresh_token_view_invalid_origin>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.7........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.providers.oauth2.tests.test_token.TestToken testMethod=test_refresh_token_view_invalid_origin>
result = <TestCaseFunction test_refresh_token_view_invalid_origin>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.7........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.providers.oauth2.tests.test_token.TestToken testMethod=test_refresh_token_view_invalid_origin>
method = <bound method TestToken.test_refresh_token_view_invalid_origin of <authentik.providers.oauth2.tests.test_token.TestToken testMethod=test_refresh_token_view_invalid_origin>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.7........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<authentik.providers.oauth2.tests.test_token.TestToken testMethod=test_refresh_token_view_invalid_origin>,)
kwargs = {}, file = 'system/providers-oauth2.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - OAuth2 Provider - Sc... application the ability to access the authentik API\n        # on behalf of the authorizing user\n        return {}\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.providers.oauth2.tests.test_token.TestToken testMethod=test_refresh_token_view_invalid_origin>

    @apply_blueprint("system/providers-oauth2.yaml")
    def test_refresh_token_view_invalid_origin(self):
        """test request param"""
        provider = OAuth2Provider.objects.create(
            name=generate_id(),
            authorization_flow=create_test_flow(),
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
            signing_key=self.keypair,
        )
        provider.property_mappings.set(
            ScopeMapping.objects.filter(
                managed__in=[
                    "goauthentik..../providers/oauth2/scope-openid",
                    "goauthentik..../providers/oauth2/scope-email",
                    "goauthentik..../providers/oauth2/scope-profile",
                    "goauthentik..../providers/oauth2/scope-offline_access",
                ]
            )
        )
        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
        user = create_test_admin_user()
        token: RefreshToken = RefreshToken.objects.create(
            provider=provider,
            user=user,
            token=generate_id(),
            _id_token=dumps({}),
            auth_time=timezone.now(),
            _scope="offline_access",
        )
>       response = self.client.post(
            reverse("authentik_providers_oauth2:token"),
            data={
                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
                "refresh_token": token.token,
                "redirect_uri": "http://local.invalid",
            },
            HTTP_AUTHORIZATION=f"Basic {header}",
            HTTP_ORIGIN="http://another.invalid",
        )

.../oauth2/tests/test_token.py:280: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.test.client.Client object at 0x7f16d982c870>
path = '......................................./application/o/token/'
data = {'grant_type': 'refresh_token', 'redirect_uri': 'http://local.invalid', 'refresh_token': 'JMbTx7GVcmoNx5aXjKApG6R2YeRIfILGZvqmqSXn'}
content_type = 'multipart/form-data; boundary=BoUnDaRyStRiNg', follow = False
secure = False, headers = None, query_params = None
extra = {'HTTP_AUTHORIZATION': 'Basic d2NYVEYydExzclpYUExXclBWT1lwY3pSbnFrd0paTG5yRHgxRDU1Vjp0eHhYV3NaamdUREVVcmRuc01oQWRjbFdz...emdrZHhzMGM4bFNOa294Zlp4QTNpVk40TmdqSHFnWHpzMnV3OUpHZXpUMUZCRlZLeUt6UUpvcg==', 'HTTP_ORIGIN': 'http://another.invalid'}

    def post(
        self,
        path,
        data=None,
        content_type=MULTIPART_CONTENT,
        follow=False,
        secure=False,
        *,
        headers=None,
        query_params=None,
        **extra,
    ):
        """Request a response from the server using POST."""
        self.extra = extra
        self.headers = headers
>       response = super().post(
            path,
            data=data,
            content_type=content_type,
            secure=secure,
            headers=headers,
            query_params=query_params,
            **extra,
        )

.venv/lib/python3.13.../django/test/client.py:1158: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.test.client.Client object at 0x7f16d982c870>
path = '......................................./application/o/token/'
data = {'grant_type': 'refresh_token', 'redirect_uri': 'http://local.invalid', 'refresh_token': 'JMbTx7GVcmoNx5aXjKApG6R2YeRIfILGZvqmqSXn'}
content_type = 'multipart/form-data; boundary=BoUnDaRyStRiNg', secure = False
headers = None, query_params = None
extra = {'HTTP_AUTHORIZATION': 'Basic d2NYVEYydExzclpYUExXclBWT1lwY3pSbnFrd0paTG5yRHgxRDU1Vjp0eHhYV3NaamdUREVVcmRuc01oQWRjbFdz...emdrZHhzMGM4bFNOa294Zlp4QTNpVk40TmdqSHFnWHpzMnV3OUpHZXpUMUZCRlZLeUt6UUpvcg==', 'HTTP_ORIGIN': 'http://another.invalid'}
post_data = b'--BoUnDaRyStRiNg\r\nContent-Disposition: form-data; name="grant_type"\r\n\r\nrefresh_token\r\n--BoUnDaRyStRiNg\r\nCo...aRyStRiNg\r\nContent-Disposition: form-data; name="redirect_uri"\r\n\r\nhttp://local.invalid\r\n--BoUnDaRyStRiNg--\r\n'

    def post(
        self,
        path,
        data=None,
        content_type=MULTIPART_CONTENT,
        secure=False,
        *,
        headers=None,
        query_params=None,
        **extra,
    ):
        """Construct a POST request."""
        data = self._encode_json({} if data is None else data, content_type)
        post_data = self._encode_data(data, content_type)
    
>       return self.generic(
            "POST",
            path,
            post_data,
            content_type,
            secure=secure,
            headers=headers,
            query_params=query_params,
            **extra,
        )

.venv/lib/python3.13.../django/test/client.py:503: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.test.client.Client object at 0x7f16d982c870>, method = 'POST'
path = '......................................./application/o/token/'
data = b'--BoUnDaRyStRiNg\r\nContent-Disposition: form-data; name="grant_type"\r\n\r\nrefresh_token\r\n--BoUnDaRyStRiNg\r\nCo...aRyStRiNg\r\nContent-Disposition: form-data; name="redirect_uri"\r\n\r\nhttp://local.invalid\r\n--BoUnDaRyStRiNg--\r\n'
content_type = 'multipart/form-data; boundary=BoUnDaRyStRiNg', secure = False
headers = None, query_params = None
extra = {'HTTP_AUTHORIZATION': 'Basic d2NYVEYydExzclpYUExXclBWT1lwY3pSbnFrd0paTG5yRHgxRDU1Vjp0eHhYV3NaamdUREVVcmRuc01oQWRjbFdz...emdrZHhzMGM4bFNOa294Zlp4QTNpVk40TmdqSHFnWHpzMnV3OUpHZXpUMUZCRlZLeUt6UUpvcg==', 'HTTP_ORIGIN': 'http://another.invalid'}
parsed = ParseResult(scheme='', netloc='', path='......................................./application/o/token/', params='', query='', fragment='')
r = {'CONTENT_LENGTH': '317', 'CONTENT_TYPE': 'multipart/form-data; boundary=BoUnDaRyStRiNg', 'HTTP_AUTHORIZATION': 'Basic...HhzMGM4bFNOa294Zlp4QTNpVk40TmdqSHFnWHpzMnV3OUpHZXpUMUZCRlZLeUt6UUpvcg==', 'HTTP_ORIGIN': 'http://another.invalid', ...}
query_string = ''

    def generic(
        self,
        method,
        path,
        data="",
        content_type="application/octet-stream",
        secure=False,
        *,
        headers=None,
        query_params=None,
        **extra,
    ):
        """Construct an arbitrary HTTP request."""
        parsed = urlparse(str(path))  # path can be lazy
        data = force_bytes(data, settings.DEFAULT_CHARSET)
        r = {
            "PATH_INFO": self._get_path(parsed),
            "REQUEST_METHOD": method,
            "SERVER_PORT": "443" if secure else "80",
            "wsgi.url_scheme": "https" if secure else "http",
        }
        if data:
            r.update(
                {
                    "CONTENT_LENGTH": str(len(data)),
                    "CONTENT_TYPE": content_type,
                    "wsgi.input": FakePayload(data),
                }
            )
        if headers:
            extra.update(HttpHeaders.to_wsgi_names(headers))
        if query_params:
            extra["QUERY_STRING"] = urlencode(query_params, doseq=True)
        r.update(extra)
        # If QUERY_STRING is absent or empty, we want to extract it from the URL.
        if not r.get("QUERY_STRING"):
            # WSGI requires latin-1 encoded strings. See get_path_info().
            query_string = parsed[4].encode().decode("iso-8859-1")
            r["QUERY_STRING"] = query_string
>       return self.request(**r)

.venv/lib/python3.13.../django/test/client.py:676: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.test.client.Client object at 0x7f16d982c870>
request = {'CONTENT_LENGTH': '317', 'CONTENT_TYPE': 'multipart/form-data; boundary=BoUnDaRyStRiNg', 'HTTP_AUTHORIZATION': 'Basic...HhzMGM4bFNOa294Zlp4QTNpVk40TmdqSHFnWHpzMnV3OUpHZXpUMUZCRlZLeUt6UUpvcg==', 'HTTP_ORIGIN': 'http://another.invalid', ...}
environ = {'CONTENT_LENGTH': '317', 'CONTENT_TYPE': 'multipart/form-data; boundary=BoUnDaRyStRiNg', 'HTTP_AUTHORIZATION': 'Basic...SZkxZN1Rxc0l4MUdhemdrZHhzMGM4bFNOa294Zlp4QTNpVk40TmdqSHFnWHpzMnV3OUpHZXpUMUZCRlZLeUt6UUpvcg==', 'HTTP_COOKIE': '', ...}
data = {}
on_template_render = functools.partial(<function store_rendered_templates at 0x7f16e6fa4860>, {})
signal_uid = 'template-render-139736111797056'
exception_uid = 'request-exception-139736111797056'
response = <HttpResponseNotAllowed [GET, HEAD, OPTIONS] status_code=405, "text/html; charset=utf-8">

    def request(self, **request):
        """
        Make a generic request. Compose the environment dictionary and pass
        to the handler, return the result of the handler. Assume defaults for
        the query environment, which can be overridden using the arguments to
        the request.
        """
        environ = self._base_environ(**request)
    
        # Curry a data dictionary into an instance of the template renderer
        # callback function.
        data = {}
        on_template_render = partial(store_rendered_templates, data)
        signal_uid = "template-render-%s" % id(request)
        signals.template_rendered.connect(on_template_render, dispatch_uid=signal_uid)
        # Capture exceptions created by the handler.
        exception_uid = "request-exception-%s" % id(request)
        got_request_exception.connect(self.store_exc_info, dispatch_uid=exception_uid)
        try:
            response = self.handler(environ)
        finally:
            signals.template_rendered.disconnect(dispatch_uid=signal_uid)
            got_request_exception.disconnect(dispatch_uid=exception_uid)
        # Check for signaled exceptions.
>       self.check_exception(response)

.venv/lib/python3.13.../django/test/client.py:1092: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.test.client.Client object at 0x7f16d982c870>
response = <HttpResponseNotAllowed [GET, HEAD, OPTIONS] status_code=405, "text/html; charset=utf-8">

    def check_exception(self, response):
        """
        Look for a signaled exception, clear the current context exception
        data, re-raise the signaled exception, and clear the signaled exception
        from the local cache.
        """
        response.exc_info = self.exc_info
        if self.exc_info:
            _, exc_value, _ = self.exc_info
            self.exc_info = None
            if self.raise_request_exception:
>               raise exc_value

.venv/lib/python3.13.../django/test/client.py:805: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

request = <WSGIRequest: POST '......................................./application/o/token/'>

    @wraps(get_response)
    def inner(request):
        try:
>           response = get_response(request)

.venv/lib/python3.13.../core/handlers/exception.py:55: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.test.client.ClientHandler object at 0x7f16dab4def0>
request = <WSGIRequest: POST '......................................./application/o/token/'>

    def _get_response(self, request):
        """
        Resolve and call the view, then apply view, exception, and
        template_response middleware. This method is everything that happens
        inside the request/response middleware.
        """
        response = None
        callback, callback_args, callback_kwargs = self.resolve_request(request)
    
        # Apply view middleware
        for middleware_method in self._view_middleware:
            response = middleware_method(
                request, callback, callback_args, callback_kwargs
            )
            if response:
                break
    
        if response is None:
            wrapped_callback = self.make_view_atomic(callback)
            # If it is an asynchronous view, run it in a subthread.
            if iscoroutinefunction(wrapped_callback):
                wrapped_callback = async_to_sync(wrapped_callback)
            try:
>               response = wrapped_callback(request, *callback_args, **callback_kwargs)

.venv/lib/python3.13.../core/handlers/base.py:197: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

request = <WSGIRequest: POST '......................................./application/o/token/'>, args = (), kwargs = {}
current_scope = <Scope id=0x7f16ec015c40 name=None type=ScopeType.CURRENT>
sentry_scope = <Scope id=0x7f16ebeb3760 name=None type=ScopeType.ISOLATION>

    @functools.wraps(callback)
    def sentry_wrapped_callback(request, *args, **kwargs):
        # type: (Any, *Any, **Any) -> Any
        current_scope = sentry_sdk.get_current_scope()
        if current_scope.transaction is not None:
            current_scope.transaction.update_active_thread()
    
        sentry_scope = sentry_sdk.get_isolation_scope()
        # set the active thread id to the handler thread for sync views
        # this isn't necessary for async views since that runs on main
        if sentry_scope.profile is not None:
            sentry_scope.profile.update_active_thread_id()
    
        with sentry_sdk.start_span(
            op=OP.VIEW_RENDER,
            name=request.resolver_match.view_name,
            origin=DjangoIntegration.origin,
        ):
>           return callback(request, *args, **kwargs)

.venv/lib/python3.13.../integrations/django/views.py:94: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

request = <WSGIRequest: POST '......................................./application/o/token/'>, args = (), kwargs = {}
self = <authentik.providers.oauth2.views.token.TokenView object at 0x7f16dad0ce50>

    def view(request, *args, **kwargs):
        self = cls(**initkwargs)
        self.setup(request, *args, **kwargs)
        if not hasattr(self, "request"):
            raise AttributeError(
                "%s instance has no 'request' attribute. Did you override "
                "setup() and forget to call super()?" % cls.__name__
            )
>       return self.dispatch(request, *args, **kwargs)

.venv/lib/python3.13.../views/generic/base.py:105: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.providers.oauth2.views.token.TokenView object at 0x7f16dad0ce50>
args = (<WSGIRequest: POST '......................................./application/o/token/'>,), kwargs = {}
bound_method = <function TokenView.dispatch at 0x7f16d9881d00>
dec = <function csrf_exempt at 0x7f16e6b99d00>

    def _wrapper(self, *args, **kwargs):
        # bound_method has the signature that 'decorator' expects i.e. no
        # 'self' argument, but it's a closure over self so it can call
        # 'func'. Also, wrap method.__get__() in a function because new
        # attributes can't be set on bound method objects, only on functions.
        bound_method = wraps(method)(partial(method.__get__(self, type(self))))
        for dec in decorators:
            bound_method = dec(bound_method)
>       return bound_method(*args, **kwargs)

.venv/lib/python3.13.../django/utils/decorators.py:48: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

request = <WSGIRequest: POST '......................................./application/o/token/'>, args = (), kwargs = {}

    def _view_wrapper(request, *args, **kwargs):
>       return view_func(request, *args, **kwargs)

.venv/lib/python3.13.../views/decorators/csrf.py:65: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.providers.oauth2.views.token.TokenView object at 0x7f16dad0ce50>
request = <WSGIRequest: POST '......................................./application/o/token/'>, args = (), kwargs = {}

    def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
>       response = super().dispatch(request, *args, **kwargs)

.../oauth2/views/token.py:560: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.providers.oauth2.views.token.TokenView object at 0x7f16dad0ce50>
request = <WSGIRequest: POST '......................................./application/o/token/'>, args = (), kwargs = {}
handler = <bound method TokenView.post of <authentik.providers.oauth2.views.token.TokenView object at 0x7f16dad0ce50>>

    def dispatch(self, request, *args, **kwargs):
        # Try to dispatch to the right method; if a method doesn't exist,
        # defer to the error handler. Also defer to the error handler if the
        # request method isn't on the approved list.
        if request.method.lower() in self.http_method_names:
            handler = getattr(
                self, request.method.lower(), self.http_method_not_allowed
            )
        else:
            handler = self.http_method_not_allowed
>       return handler(request, *args, **kwargs)

.venv/lib/python3.13.../views/generic/base.py:144: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.providers.oauth2.views.token.TokenView object at 0x7f16dad0ce50>
request = <WSGIRequest: POST '......................................./application/o/token/'>

    def post(self, request: HttpRequest) -> HttpResponse:
        """Generate tokens for clients"""
        try:
            with start_span(
                op="authentik.providers.oauth2.post.parse",
            ):
                client_id, client_secret = extract_client_auth(request)
                self.provider = OAuth2Provider.objects.filter(client_id=client_id).first()
                if not self.provider:
                    LOGGER.warning("OAuth2Provider does not exist", client_id=client_id)
                    raise TokenError("invalid_client")
                CTX_AUTH_VIA.set("oauth_client_secret")
                self.params = TokenParams.parse(request, self.provider, client_id, client_secret)
    
            with start_span(
                op="authentik.providers.oauth2.post.response",
            ):
                if self.params.grant_type == GRANT_TYPE_AUTHORIZATION_CODE:
                    LOGGER.debug("Converting authorization code to access token")
                    return TokenResponse(self.create_code_response())
                if self.params.grant_type == GRANT_TYPE_REFRESH_TOKEN:
                    LOGGER.debug("Refreshing refresh token")
>                   return TokenResponse(self.create_refresh_response())

.../oauth2/views/token.py:592: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.providers.oauth2.views.token.TokenView object at 0x7f16dad0ce50>

    def create_refresh_response(self) -> dict[str, Any]:
        """See https://datatracker.ietf..../doc/html/rfc6749#section-6"""
        unauthorized_scopes = set(self.params.scope) - set(self.params.refresh_token.scope)
        if unauthorized_scopes:
            raise TokenError("invalid_scope")
        if SCOPE_OFFLINE_ACCESS not in self.params.scope:
            raise TokenError("invalid_scope")
        now = timezone.now()
        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
        access_token = AccessToken(
            provider=self.provider,
            user=self.params.refresh_token.user,
            expires=access_token_expiry,
            # Keep same scopes as previous token
            scope=self.params.refresh_token.scope,
            auth_time=self.params.refresh_token.auth_time,
            session=self.params.refresh_token.session,
        )
        access_token.id_token = IDToken.new(
            self.provider,
            access_token,
            self.request,
        )
        access_token.save()
    
        res = {
            "access_token": access_token.token,
            "token_type": TOKEN_TYPE,
            "scope": " ".join(access_token.scope),
            "expires_in": int(
                timedelta_from_string(self.provider.access_token_validity).total_seconds()
            ),
            "id_token": access_token.id_token.to_jwt(self.provider),
        }
    
>       if (now() - self.params.refresh_token.expires) < timedelta(hours=1):
E       TypeError: 'datetime.datetime' object is not callable

.../oauth2/views/token.py:697: TypeError
authentik.providers.oauth2.tests.test_token.TestToken::test_refresh_token_revoke
Stack Traces | 1.19s run time
self = <unittest.case._Outcome object at 0x7f16d982b9a0>
test_case = <authentik.providers.oauth2.tests.test_token.TestToken testMethod=test_refresh_token_revoke>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.7........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.providers.oauth2.tests.test_token.TestToken testMethod=test_refresh_token_revoke>
result = <TestCaseFunction test_refresh_token_revoke>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.7........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.providers.oauth2.tests.test_token.TestToken testMethod=test_refresh_token_revoke>
method = <bound method TestToken.test_refresh_token_revoke of <authentik.providers.oauth2.tests.test_token.TestToken testMethod=test_refresh_token_revoke>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.7........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<authentik.providers.oauth2.tests.test_token.TestToken testMethod=test_refresh_token_revoke>,)
kwargs = {}, file = 'system/providers-oauth2.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - OAuth2 Provider - Sc... application the ability to access the authentik API\n        # on behalf of the authorizing user\n        return {}\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.providers.oauth2.tests.test_token.TestToken testMethod=test_refresh_token_revoke>

    @apply_blueprint("system/providers-oauth2.yaml")
    def test_refresh_token_revoke(self):
        """test request param"""
        provider = OAuth2Provider.objects.create(
            name=generate_id(),
            authorization_flow=create_test_flow(),
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")],
            signing_key=self.keypair,
        )
        provider.property_mappings.set(
            ScopeMapping.objects.filter(
                managed__in=[
                    "goauthentik..../providers/oauth2/scope-openid",
                    "goauthentik..../providers/oauth2/scope-email",
                    "goauthentik..../providers/oauth2/scope-profile",
                    "goauthentik..../providers/oauth2/scope-offline_access",
                ]
            )
        )
        # Needs to be assigned to an application for iss to be set
        self.app.provider = provider
        self.app.save()
        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
        user = create_test_admin_user()
        token: RefreshToken = RefreshToken.objects.create(
            provider=provider,
            user=user,
            token=generate_id(),
            _id_token=dumps({}),
            auth_time=timezone.now(),
            _scope="offline_access",
        )
        # Create initial refresh token
>       response = self.client.post(
            reverse("authentik_providers_oauth2:token"),
            data={
                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
                "refresh_token": token.token,
                "redirect_uri": "http://testserver",
            },
            HTTP_AUTHORIZATION=f"Basic {header}",
        )

.../oauth2/tests/test_token.py:343: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.test.client.Client object at 0x7f16d9840050>
path = '............................................./application/o/token/'
data = {'grant_type': 'refresh_token', 'redirect_uri': 'http://testserver', 'refresh_token': 'CrBMKG2gTceHy6kIb42zi2R2fNrqye5xgz5UtLBJ'}
content_type = 'multipart/form-data; boundary=BoUnDaRyStRiNg', follow = False
secure = False, headers = None, query_params = None
extra = {'HTTP_AUTHORIZATION': 'Basic Q202bk1Ec2szNEVHd2Q4U2g5RUFmUWxoVjM3SWN5cVRRNWdZYkY4UTp5eUFTMVpLdm5iWExiYjZPOWhkRXV1RXhr...wdTRBN3AxMzhIdlVabGtZSWphTEkza09rTDNnYUdDTjFiS2lRdXFtQWlxMHdIMnduSzFzakVLT1Q4czNwVFhTR3d0N0RrN21YdFZaYmFJMmszcElxYg=='}

    def post(
        self,
        path,
        data=None,
        content_type=MULTIPART_CONTENT,
        follow=False,
        secure=False,
        *,
        headers=None,
        query_params=None,
        **extra,
    ):
        """Request a response from the server using POST."""
        self.extra = extra
        self.headers = headers
>       response = super().post(
            path,
            data=data,
            content_type=content_type,
            secure=secure,
            headers=headers,
            query_params=query_params,
            **extra,
        )

.venv/lib/python3.13.../django/test/client.py:1158: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.test.client.Client object at 0x7f16d9840050>
path = '............................................./application/o/token/'
data = {'grant_type': 'refresh_token', 'redirect_uri': 'http://testserver', 'refresh_token': 'CrBMKG2gTceHy6kIb42zi2R2fNrqye5xgz5UtLBJ'}
content_type = 'multipart/form-data; boundary=BoUnDaRyStRiNg', secure = False
headers = None, query_params = None
extra = {'HTTP_AUTHORIZATION': 'Basic Q202bk1Ec2szNEVHd2Q4U2g5RUFmUWxoVjM3SWN5cVRRNWdZYkY4UTp5eUFTMVpLdm5iWExiYjZPOWhkRXV1RXhr...wdTRBN3AxMzhIdlVabGtZSWphTEkza09rTDNnYUdDTjFiS2lRdXFtQWlxMHdIMnduSzFzakVLT1Q4czNwVFhTR3d0N0RrN21YdFZaYmFJMmszcElxYg=='}
post_data = b'--BoUnDaRyStRiNg\r\nContent-Disposition: form-data; name="grant_type"\r\n\r\nrefresh_token\r\n--BoUnDaRyStRiNg\r\nCo...UnDaRyStRiNg\r\nContent-Disposition: form-data; name="redirect_uri"\r\n\r\nhttp://testserver\r\n--BoUnDaRyStRiNg--\r\n'

    def post(
        self,
        path,
        data=None,
        content_type=MULTIPART_CONTENT,
        secure=False,
        *,
        headers=None,
        query_params=None,
        **extra,
    ):
        """Construct a POST request."""
        data = self._encode_json({} if data is None else data, content_type)
        post_data = self._encode_data(data, content_type)
    
>       return self.generic(
            "POST",
            path,
            post_data,
            content_type,
            secure=secure,
            headers=headers,
            query_params=query_params,
            **extra,
        )

.venv/lib/python3.13.../django/test/client.py:503: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.test.client.Client object at 0x7f16d9840050>, method = 'POST'
path = '............................................./application/o/token/'
data = b'--BoUnDaRyStRiNg\r\nContent-Disposition: form-data; name="grant_type"\r\n\r\nrefresh_token\r\n--BoUnDaRyStRiNg\r\nCo...UnDaRyStRiNg\r\nContent-Disposition: form-data; name="redirect_uri"\r\n\r\nhttp://testserver\r\n--BoUnDaRyStRiNg--\r\n'
content_type = 'multipart/form-data; boundary=BoUnDaRyStRiNg', secure = False
headers = None, query_params = None
extra = {'HTTP_AUTHORIZATION': 'Basic Q202bk1Ec2szNEVHd2Q4U2g5RUFmUWxoVjM3SWN5cVRRNWdZYkY4UTp5eUFTMVpLdm5iWExiYjZPOWhkRXV1RXhr...wdTRBN3AxMzhIdlVabGtZSWphTEkza09rTDNnYUdDTjFiS2lRdXFtQWlxMHdIMnduSzFzakVLT1Q4czNwVFhTR3d0N0RrN21YdFZaYmFJMmszcElxYg=='}
parsed = ParseResult(scheme='', netloc='', path='............................................./application/o/token/', params='', query='', fragment='')
r = {'CONTENT_LENGTH': '314', 'CONTENT_TYPE': 'multipart/form-data; boundary=BoUnDaRyStRiNg', 'HTTP_AUTHORIZATION': 'Basic...FiS2lRdXFtQWlxMHdIMnduSzFzakVLT1Q4czNwVFhTR3d0N0RrN21YdFZaYmFJMmszcElxYg==', 'PATH_INFO': '............................................./application/o/token/', ...}
query_string = ''

    def generic(
        self,
        method,
        path,
        data="",
        content_type="application/octet-stream",
        secure=False,
        *,
        headers=None,
        query_params=None,
        **extra,
    ):
        """Construct an arbitrary HTTP request."""
        parsed = urlparse(str(path))  # path can be lazy
        data = force_bytes(data, settings.DEFAULT_CHARSET)
        r = {
            "PATH_INFO": self._get_path(parsed),
            "REQUEST_METHOD": method,
            "SERVER_PORT": "443" if secure else "80",
            "wsgi.url_scheme": "https" if secure else "http",
        }
        if data:
            r.update(
                {
                    "CONTENT_LENGTH": str(len(data)),
                    "CONTENT_TYPE": content_type,
                    "wsgi.input": FakePayload(data),
                }
            )
        if headers:
            extra.update(HttpHeaders.to_wsgi_names(headers))
        if query_params:
            extra["QUERY_STRING"] = urlencode(query_params, doseq=True)
        r.update(extra)
        # If QUERY_STRING is absent or empty, we want to extract it from the URL.
        if not r.get("QUERY_STRING"):
            # WSGI requires latin-1 encoded strings. See get_path_info().
            query_string = parsed[4].encode().decode("iso-8859-1")
            r["QUERY_STRING"] = query_string
>       return self.request(**r)

.venv/lib/python3.13.../django/test/client.py:676: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.test.client.Client object at 0x7f16d9840050>
request = {'CONTENT_LENGTH': '314', 'CONTENT_TYPE': 'multipart/form-data; boundary=BoUnDaRyStRiNg', 'HTTP_AUTHORIZATION': 'Basic...FiS2lRdXFtQWlxMHdIMnduSzFzakVLT1Q4czNwVFhTR3d0N0RrN21YdFZaYmFJMmszcElxYg==', 'PATH_INFO': '............................................./application/o/token/', ...}
environ = {'CONTENT_LENGTH': '314', 'CONTENT_TYPE': 'multipart/form-data; boundary=BoUnDaRyStRiNg', 'HTTP_AUTHORIZATION': 'Basic...hTEkza09rTDNnYUdDTjFiS2lRdXFtQWlxMHdIMnduSzFzakVLT1Q4czNwVFhTR3d0N0RrN21YdFZaYmFJMmszcElxYg==', 'HTTP_COOKIE': '', ...}
data = {}
on_template_render = functools.partial(<function store_rendered_templates at 0x7f16e6fa4860>, {})
signal_uid = 'template-render-139736140086272'
exception_uid = 'request-exception-139736140086272'
response = <HttpResponseNotAllowed [GET, HEAD, OPTIONS] status_code=405, "text/html; charset=utf-8">

    def request(self, **request):
        """
        Make a generic request. Compose the environment dictionary and pass
        to the handler, return the result of the handler. Assume defaults for
        the query environment, which can be overridden using the arguments to
        the request.
        """
        environ = self._base_environ(**request)
    
        # Curry a data dictionary into an instance of the template renderer
        # callback function.
        data = {}
        on_template_render = partial(store_rendered_templates, data)
        signal_uid = "template-render-%s" % id(request)
        signals.template_rendered.connect(on_template_render, dispatch_uid=signal_uid)
        # Capture exceptions created by the handler.
        exception_uid = "request-exception-%s" % id(request)
        got_request_exception.connect(self.store_exc_info, dispatch_uid=exception_uid)
        try:
            response = self.handler(environ)
        finally:
            signals.template_rendered.disconnect(dispatch_uid=signal_uid)
            got_request_exception.disconnect(dispatch_uid=exception_uid)
        # Check for signaled exceptions.
>       self.check_exception(response)

.venv/lib/python3.13.../django/test/client.py:1092: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.test.client.Client object at 0x7f16d9840050>
response = <HttpResponseNotAllowed [GET, HEAD, OPTIONS] status_code=405, "text/html; charset=utf-8">

    def check_exception(self, response):
        """
        Look for a signaled exception, clear the current context exception
        data, re-raise the signaled exception, and clear the signaled exception
        from the local cache.
        """
        response.exc_info = self.exc_info
        if self.exc_info:
            _, exc_value, _ = self.exc_info
            self.exc_info = None
            if self.raise_request_exception:
>               raise exc_value

.venv/lib/python3.13.../django/test/client.py:805: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

request = <WSGIRequest: POST '............................................./application/o/token/'>

    @wraps(get_response)
    def inner(request):
        try:
>           response = get_response(request)

.venv/lib/python3.13.../core/handlers/exception.py:55: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.test.client.ClientHandler object at 0x7f16dac2e760>
request = <WSGIRequest: POST '............................................./application/o/token/'>

    def _get_response(self, request):
        """
        Resolve and call the view, then apply view, exception, and
        template_response middleware. This method is everything that happens
        inside the request/response middleware.
        """
        response = None
        callback, callback_args, callback_kwargs = self.resolve_request(request)
    
        # Apply view middleware
        for middleware_method in self._view_middleware:
            response = middleware_method(
                request, callback, callback_args, callback_kwargs
            )
            if response:
                break
    
        if response is None:
            wrapped_callback = self.make_view_atomic(callback)
            # If it is an asynchronous view, run it in a subthread.
            if iscoroutinefunction(wrapped_callback):
                wrapped_callback = async_to_sync(wrapped_callback)
            try:
>               response = wrapped_callback(request, *callback_args, **callback_kwargs)

.venv/lib/python3.13.../core/handlers/base.py:197: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

request = <WSGIRequest: POST '............................................./application/o/token/'>, args = (), kwargs = {}
current_scope = <Scope id=0x7f16ec015c40 name=None type=ScopeType.CURRENT>
sentry_scope = <Scope id=0x7f16ebeb3760 name=None type=ScopeType.ISOLATION>

    @functools.wraps(callback)
    def sentry_wrapped_callback(request, *args, **kwargs):
        # type: (Any, *Any, **Any) -> Any
        current_scope = sentry_sdk.get_current_scope()
        if current_scope.transaction is not None:
            current_scope.transaction.update_active_thread()
    
        sentry_scope = sentry_sdk.get_isolation_scope()
        # set the active thread id to the handler thread for sync views
        # this isn't necessary for async views since that runs on main
        if sentry_scope.profile is not None:
            sentry_scope.profile.update_active_thread_id()
    
        with sentry_sdk.start_span(
            op=OP.VIEW_RENDER,
            name=request.resolver_match.view_name,
            origin=DjangoIntegration.origin,
        ):
>           return callback(request, *args, **kwargs)

.venv/lib/python3.13.../integrations/django/views.py:94: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

request = <WSGIRequest: POST '............................................./application/o/token/'>, args = (), kwargs = {}
self = <authentik.providers.oauth2.views.token.TokenView object at 0x7f16dad0c250>

    def view(request, *args, **kwargs):
        self = cls(**initkwargs)
        self.setup(request, *args, **kwargs)
        if not hasattr(self, "request"):
            raise AttributeError(
                "%s instance has no 'request' attribute. Did you override "
                "setup() and forget to call super()?" % cls.__name__
            )
>       return self.dispatch(request, *args, **kwargs)

.venv/lib/python3.13.../views/generic/base.py:105: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.providers.oauth2.views.token.TokenView object at 0x7f16dad0c250>
args = (<WSGIRequest: POST '............................................./application/o/token/'>,), kwargs = {}
bound_method = <function TokenView.dispatch at 0x7f16d98a6a20>
dec = <function csrf_exempt at 0x7f16e6b99d00>

    def _wrapper(self, *args, **kwargs):
        # bound_method has the signature that 'decorator' expects i.e. no
        # 'self' argument, but it's a closure over self so it can call
        # 'func'. Also, wrap method.__get__() in a function because new
        # attributes can't be set on bound method objects, only on functions.
        bound_method = wraps(method)(partial(method.__get__(self, type(self))))
        for dec in decorators:
            bound_method = dec(bound_method)
>       return bound_method(*args, **kwargs)

.venv/lib/python3.13.../django/utils/decorators.py:48: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

request = <WSGIRequest: POST '............................................./application/o/token/'>, args = (), kwargs = {}

    def _view_wrapper(request, *args, **kwargs):
>       return view_func(request, *args, **kwargs)

.venv/lib/python3.13.../views/decorators/csrf.py:65: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.providers.oauth2.views.token.TokenView object at 0x7f16dad0c250>
request = <WSGIRequest: POST '............................................./application/o/token/'>, args = (), kwargs = {}

    def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
>       response = super().dispatch(request, *args, **kwargs)

.../oauth2/views/token.py:560: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.providers.oauth2.views.token.TokenView object at 0x7f16dad0c250>
request = <WSGIRequest: POST '............................................./application/o/token/'>, args = (), kwargs = {}
handler = <bound method TokenView.post of <authentik.providers.oauth2.views.token.TokenView object at 0x7f16dad0c250>>

    def dispatch(self, request, *args, **kwargs):
        # Try to dispatch to the right method; if a method doesn't exist,
        # defer to the error handler. Also defer to the error handler if the
        # request method isn't on the approved list.
        if request.method.lower() in self.http_method_names:
            handler = getattr(
                self, request.method.lower(), self.http_method_not_allowed
            )
        else:
            handler = self.http_method_not_allowed
>       return handler(request, *args, **kwargs)

.venv/lib/python3.13.../views/generic/base.py:144: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.providers.oauth2.views.token.TokenView object at 0x7f16dad0c250>
request = <WSGIRequest: POST '............................................./application/o/token/'>

    def post(self, request: HttpRequest) -> HttpResponse:
        """Generate tokens for clients"""
        try:
            with start_span(
                op="authentik.providers.oauth2.post.parse",
            ):
                client_id, client_secret = extract_client_auth(request)
                self.provider = OAuth2Provider.objects.filter(client_id=client_id).first()
                if not self.provider:
                    LOGGER.warning("OAuth2Provider does not exist", client_id=client_id)
                    raise TokenError("invalid_client")
                CTX_AUTH_VIA.set("oauth_client_secret")
                self.params = TokenParams.parse(request, self.provider, client_id, client_secret)
    
            with start_span(
                op="authentik.providers.oauth2.post.response",
            ):
                if self.params.grant_type == GRANT_TYPE_AUTHORIZATION_CODE:
                    LOGGER.debug("Converting authorization code to access token")
                    return TokenResponse(self.create_code_response())
                if self.params.grant_type == GRANT_TYPE_REFRESH_TOKEN:
                    LOGGER.debug("Refreshing refresh token")
>                   return TokenResponse(self.create_refresh_response())

.../oauth2/views/token.py:592: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.providers.oauth2.views.token.TokenView object at 0x7f16dad0c250>

    def create_refresh_response(self) -> dict[str, Any]:
        """See https://datatracker.ietf..../doc/html/rfc6749#section-6"""
        unauthorized_scopes = set(self.params.scope) - set(self.params.refresh_token.scope)
        if unauthorized_scopes:
            raise TokenError("invalid_scope")
        if SCOPE_OFFLINE_ACCESS not in self.params.scope:
            raise TokenError("invalid_scope")
        now = timezone.now()
        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
        access_token = AccessToken(
            provider=self.provider,
            user=self.params.refresh_token.user,
            expires=access_token_expiry,
            # Keep same scopes as previous token
            scope=self.params.refresh_token.scope,
            auth_time=self.params.refresh_token.auth_time,
            session=self.params.refresh_token.session,
        )
        access_token.id_token = IDToken.new(
            self.provider,
            access_token,
            self.request,
        )
        access_token.save()
    
        res = {
            "access_token": access_token.token,
            "token_type": TOKEN_TYPE,
            "scope": " ".join(access_token.scope),
            "expires_in": int(
                timedelta_from_string(self.provider.access_token_validity).total_seconds()
            ),
            "id_token": access_token.id_token.to_jwt(self.provider),
        }
    
>       if (now() - self.params.refresh_token.expires) < timedelta(hours=1):
E       TypeError: 'datetime.datetime' object is not callable

.../oauth2/views/token.py:697: TypeError
authentik.providers.oauth2.tests.test_token.TestToken::test_refresh_token_view
Stack Traces | 1.21s run time
self = <unittest.case._Outcome object at 0x7f16dc664bb0>
test_case = <authentik.providers.oauth2.tests.test_token.TestToken testMethod=test_refresh_token_view>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.7........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.providers.oauth2.tests.test_token.TestToken testMethod=test_refresh_token_view>
result = <TestCaseFunction test_refresh_token_view>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.7........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.providers.oauth2.tests.test_token.TestToken testMethod=test_refresh_token_view>
method = <bound method TestToken.test_refresh_token_view of <authentik.providers.oauth2.tests.test_token.TestToken testMethod=test_refresh_token_view>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.7........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<authentik.providers.oauth2.tests.test_token.TestToken testMethod=test_refresh_token_view>,)
kwargs = {}, file = 'system/providers-oauth2.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - OAuth2 Provider - Sc... application the ability to access the authentik API\n        # on behalf of the authorizing user\n        return {}\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.providers.oauth2.tests.test_token.TestToken testMethod=test_refresh_token_view>

    @apply_blueprint("system/providers-oauth2.yaml")
    def test_refresh_token_view(self):
        """test request param"""
        provider = OAuth2Provider.objects.create(
            name=generate_id(),
            authorization_flow=create_test_flow(),
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
            signing_key=self.keypair,
        )
        provider.property_mappings.set(
            ScopeMapping.objects.filter(
                managed__in=[
                    "goauthentik..../providers/oauth2/scope-openid",
                    "goauthentik..../providers/oauth2/scope-email",
                    "goauthentik..../providers/oauth2/scope-profile",
                    "goauthentik..../providers/oauth2/scope-offline_access",
                ]
            )
        )
        # Needs to be assigned to an application for iss to be set
        self.app.provider = provider
        self.app.save()
        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
        user = create_test_admin_user()
        token: RefreshToken = RefreshToken.objects.create(
            provider=provider,
            user=user,
            token=generate_id(),
            _id_token=dumps({}),
            auth_time=timezone.now(),
            _scope="offline_access",
        )
>       response = self.client.post(
            reverse("authentik_providers_oauth2:token"),
            data={
                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
                "refresh_token": token.token,
                "redirect_uri": "http://local.invalid",
            },
            HTTP_AUTHORIZATION=f"Basic {header}",
            HTTP_ORIGIN="http://local.invalid",
        )

.../oauth2/tests/test_token.py:220: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.test.client.Client object at 0x7f16daf136b0>
path = '......................................./application/o/token/'
data = {'grant_type': 'refresh_token', 'redirect_uri': 'http://local.invalid', 'refresh_token': 'INKxRClDZXGXqsxy6qinPsf2RlCYeLkraAkHMtpQ'}
content_type = 'multipart/form-data; boundary=BoUnDaRyStRiNg', follow = False
secure = False, headers = None, query_params = None
extra = {'HTTP_AUTHORIZATION': 'Basic R1FmNGZBSVFpU2VsR1hJZ3A3SWJTOXprSmZUbXRqS2R5UlgyTnVSZDpLSG1rQ1lTUDNCd09IMjBLRVMzYnRPaGVI...BUaGh6a2laQkFrNkVBN3BiOHROM0g2dkt3QXUxUndmM0FEYjZHdGhyNzQxdThsZlVFSklNQUgyVg==', 'HTTP_ORIGIN': 'http://local.invalid'}

    def post(
        self,
        path,
        data=None,
        content_type=MULTIPART_CONTENT,
        follow=False,
        secure=False,
        *,
        headers=None,
        query_params=None,
        **extra,
    ):
        """Request a response from the server using POST."""
        self.extra = extra
        self.headers = headers
>       response = super().post(
            path,
            data=data,
            content_type=content_type,
            secure=secure,
            headers=headers,
            query_params=query_params,
            **extra,
        )

.venv/lib/python3.13.../django/test/client.py:1158: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.test.client.Client object at 0x7f16daf136b0>
path = '......................................./application/o/token/'
data = {'grant_type': 'refresh_token', 'redirect_uri': 'http://local.invalid', 'refresh_token': 'INKxRClDZXGXqsxy6qinPsf2RlCYeLkraAkHMtpQ'}
content_type = 'multipart/form-data; boundary=BoUnDaRyStRiNg', secure = False
headers = None, query_params = None
extra = {'HTTP_AUTHORIZATION': 'Basic R1FmNGZBSVFpU2VsR1hJZ3A3SWJTOXprSmZUbXRqS2R5UlgyTnVSZDpLSG1rQ1lTUDNCd09IMjBLRVMzYnRPaGVI...BUaGh6a2laQkFrNkVBN3BiOHROM0g2dkt3QXUxUndmM0FEYjZHdGhyNzQxdThsZlVFSklNQUgyVg==', 'HTTP_ORIGIN': 'http://local.invalid'}
post_data = b'--BoUnDaRyStRiNg\r\nContent-Disposition: form-data; name="grant_type"\r\n\r\nrefresh_token\r\n--BoUnDaRyStRiNg\r\nCo...aRyStRiNg\r\nContent-Disposition: form-data; name="redirect_uri"\r\n\r\nhttp://local.invalid\r\n--BoUnDaRyStRiNg--\r\n'

    def post(
        self,
        path,
        data=None,
        content_type=MULTIPART_CONTENT,
        secure=False,
        *,
        headers=None,
        query_params=None,
        **extra,
    ):
        """Construct a POST request."""
        data = self._encode_json({} if data is None else data, content_type)
        post_data = self._encode_data(data, content_type)
    
>       return self.generic(
            "POST",
            path,
            post_data,
            content_type,
            secure=secure,
            headers=headers,
            query_params=query_params,
            **extra,
        )

.venv/lib/python3.13.../django/test/client.py:503: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.test.client.Client object at 0x7f16daf136b0>, method = 'POST'
path = '......................................./application/o/token/'
data = b'--BoUnDaRyStRiNg\r\nContent-Disposition: form-data; name="grant_type"\r\n\r\nrefresh_token\r\n--BoUnDaRyStRiNg\r\nCo...aRyStRiNg\r\nContent-Disposition: form-data; name="redirect_uri"\r\n\r\nhttp://local.invalid\r\n--BoUnDaRyStRiNg--\r\n'
content_type = 'multipart/form-data; boundary=BoUnDaRyStRiNg', secure = False
headers = None, query_params = None
extra = {'HTTP_AUTHORIZATION': 'Basic R1FmNGZBSVFpU2VsR1hJZ3A3SWJTOXprSmZUbXRqS2R5UlgyTnVSZDpLSG1rQ1lTUDNCd09IMjBLRVMzYnRPaGVI...BUaGh6a2laQkFrNkVBN3BiOHROM0g2dkt3QXUxUndmM0FEYjZHdGhyNzQxdThsZlVFSklNQUgyVg==', 'HTTP_ORIGIN': 'http://local.invalid'}
parsed = ParseResult(scheme='', netloc='', path='......................................./application/o/token/', params='', query='', fragment='')
r = {'CONTENT_LENGTH': '317', 'CONTENT_TYPE': 'multipart/form-data; boundary=BoUnDaRyStRiNg', 'HTTP_AUTHORIZATION': 'Basic...6a2laQkFrNkVBN3BiOHROM0g2dkt3QXUxUndmM0FEYjZHdGhyNzQxdThsZlVFSklNQUgyVg==', 'HTTP_ORIGIN': 'http://local.invalid', ...}
query_string = ''

    def generic(
        self,
        method,
        path,
        data="",
        content_type="application/octet-stream",
        secure=False,
        *,
        headers=None,
        query_params=None,
        **extra,
    ):
        """Construct an arbitrary HTTP request."""
        parsed = urlparse(str(path))  # path can be lazy
        data = force_bytes(data, settings.DEFAULT_CHARSET)
        r = {
            "PATH_INFO": self._get_path(parsed),
            "REQUEST_METHOD": method,
            "SERVER_PORT": "443" if secure else "80",
            "wsgi.url_scheme": "https" if secure else "http",
        }
        if data:
            r.update(
                {
                    "CONTENT_LENGTH": str(len(data)),
                    "CONTENT_TYPE": content_type,
                    "wsgi.input": FakePayload(data),
                }
            )
        if headers:
            extra.update(HttpHeaders.to_wsgi_names(headers))
        if query_params:
            extra["QUERY_STRING"] = urlencode(query_params, doseq=True)
        r.update(extra)
        # If QUERY_STRING is absent or empty, we want to extract it from the URL.
        if not r.get("QUERY_STRING"):
            # WSGI requires latin-1 encoded strings. See get_path_info().
            query_string = parsed[4].encode().decode("iso-8859-1")
            r["QUERY_STRING"] = query_string
>       return self.request(**r)

.venv/lib/python3.13.../django/test/client.py:676: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.test.client.Client object at 0x7f16daf136b0>
request = {'CONTENT_LENGTH': '317', 'CONTENT_TYPE': 'multipart/form-data; boundary=BoUnDaRyStRiNg', 'HTTP_AUTHORIZATION': 'Basic...6a2laQkFrNkVBN3BiOHROM0g2dkt3QXUxUndmM0FEYjZHdGhyNzQxdThsZlVFSklNQUgyVg==', 'HTTP_ORIGIN': 'http://local.invalid', ...}
environ = {'CONTENT_LENGTH': '317', 'CONTENT_TYPE': 'multipart/form-data; boundary=BoUnDaRyStRiNg', 'HTTP_AUTHORIZATION': 'Basic...0SzZXQndsSk80OFBUaGh6a2laQkFrNkVBN3BiOHROM0g2dkt3QXUxUndmM0FEYjZHdGhyNzQxdThsZlVFSklNQUgyVg==', 'HTTP_COOKIE': '', ...}
data = {}
on_template_render = functools.partial(<function store_rendered_templates at 0x7f16e6fa4860>, {})
signal_uid = 'template-render-139736147502976'
exception_uid = 'request-exception-139736147502976'
response = <HttpResponseNotAllowed [GET, HEAD, OPTIONS] status_code=405, "text/html; charset=utf-8">

    def request(self, **request):
        """
        Make a generic request. Compose the environment dictionary and pass
        to the handler, return the result of the handler. Assume defaults for
        the query environment, which can be overridden using the arguments to
        the request.
        """
        environ = self._base_environ(**request)
    
        # Curry a data dictionary into an instance of the template renderer
        # callback function.
        data = {}
        on_template_render = partial(store_rendered_templates, data)
        signal_uid = "template-render-%s" % id(request)
        signals.template_rendered.connect(on_template_render, dispatch_uid=signal_uid)
        # Capture exceptions created by the handler.
        exception_uid = "request-exception-%s" % id(request)
        got_request_exception.connect(self.store_exc_info, dispatch_uid=exception_uid)
        try:
            response = self.handler(environ)
        finally:
            signals.template_rendered.disconnect(dispatch_uid=signal_uid)
            got_request_exception.disconnect(dispatch_uid=exception_uid)
        # Check for signaled exceptions.
>       self.check_exception(response)

.venv/lib/python3.13.../django/test/client.py:1092: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.test.client.Client object at 0x7f16daf136b0>
response = <HttpResponseNotAllowed [GET, HEAD, OPTIONS] status_code=405, "text/html; charset=utf-8">

    def check_exception(self, response):
        """
        Look for a signaled exception, clear the current context exception
        data, re-raise the signaled exception, and clear the signaled exception
        from the local cache.
        """
        response.exc_info = self.exc_info
        if self.exc_info:
            _, exc_value, _ = self.exc_info
            self.exc_info = None
            if self.raise_request_exception:
>               raise exc_value

.venv/lib/python3.13.../django/test/client.py:805: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

request = <WSGIRequest: POST '......................................./application/o/token/'>

    @wraps(get_response)
    def inner(request):
        try:
>           response = get_response(request)

.venv/lib/python3.13.../core/handlers/exception.py:55: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.test.client.ClientHandler object at 0x7f16daf5d040>
request = <WSGIRequest: POST '......................................./application/o/token/'>

    def _get_response(self, request):
        """
        Resolve and call the view, then apply view, exception, and
        template_response middleware. This method is everything that happens
        inside the request/response middleware.
        """
        response = None
        callback, callback_args, callback_kwargs = self.resolve_request(request)
    
        # Apply view middleware
        for middleware_method in self._view_middleware:
            response = middleware_method(
                request, callback, callback_args, callback_kwargs
            )
            if response:
                break
    
        if response is None:
            wrapped_callback = self.make_view_atomic(callback)
            # If it is an asynchronous view, run it in a subthread.
            if iscoroutinefunction(wrapped_callback):
                wrapped_callback = async_to_sync(wrapped_callback)
            try:
>               response = wrapped_callback(request, *callback_args, **callback_kwargs)

.venv/lib/python3.13.../core/handlers/base.py:197: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

request = <WSGIRequest: POST '......................................./application/o/token/'>, args = (), kwargs = {}
current_scope = <Scope id=0x7f16ec015c40 name=None type=ScopeType.CURRENT>
sentry_scope = <Scope id=0x7f16ebeb3760 name=None type=ScopeType.ISOLATION>

    @functools.wraps(callback)
    def sentry_wrapped_callback(request, *args, **kwargs):
        # type: (Any, *Any, **Any) -> Any
        current_scope = sentry_sdk.get_current_scope()
        if current_scope.transaction is not None:
            current_scope.transaction.update_active_thread()
    
        sentry_scope = sentry_sdk.get_isolation_scope()
        # set the active thread id to the handler thread for sync views
        # this isn't necessary for async views since that runs on main
        if sentry_scope.profile is not None:
            sentry_scope.profile.update_active_thread_id()
    
        with sentry_sdk.start_span(
            op=OP.VIEW_RENDER,
            name=request.resolver_match.view_name,
            origin=DjangoIntegration.origin,
        ):
>           return callback(request, *args, **kwargs)

.venv/lib/python3.13.../integrations/django/views.py:94: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

request = <WSGIRequest: POST '......................................./application/o/token/'>, args = (), kwargs = {}
self = <authentik.providers.oauth2.views.token.TokenView object at 0x7f16db1417b0>

    def view(request, *args, **kwargs):
        self = cls(**initkwargs)
        self.setup(request, *args, **kwargs)
        if not hasattr(self, "request"):
            raise AttributeError(
                "%s instance has no 'request' attribute. Did you override "
                "setup() and forget to call super()?" % cls.__name__
            )
>       return self.dispatch(request, *args, **kwargs)

.venv/lib/python3.13.../views/generic/base.py:105: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.providers.oauth2.views.token.TokenView object at 0x7f16db1417b0>
args = (<WSGIRequest: POST '......................................./application/o/token/'>,), kwargs = {}
bound_method = <function TokenView.dispatch at 0x7f16d99c8180>
dec = <function csrf_exempt at 0x7f16e6b99d00>

    def _wrapper(self, *args, **kwargs):
        # bound_method has the signature that 'decorator' expects i.e. no
        # 'self' argument, but it's a closure over self so it can call
        # 'func'. Also, wrap method.__get__() in a function because new
        # attributes can't be set on bound method objects, only on functions.
        bound_method = wraps(method)(partial(method.__get__(self, type(self))))
        for dec in decorators:
            bound_method = dec(bound_method)
>       return bound_method(*args, **kwargs)

.venv/lib/python3.13.../django/utils/decorators.py:48: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

request = <WSGIRequest: POST '......................................./application/o/token/'>, args = (), kwargs = {}

    def _view_wrapper(request, *args, **kwargs):
>       return view_func(request, *args, **kwargs)

.venv/lib/python3.13.../views/decorators/csrf.py:65: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.providers.oauth2.views.token.TokenView object at 0x7f16db1417b0>
request = <WSGIRequest: POST '......................................./application/o/token/'>, args = (), kwargs = {}

    def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
>       response = super().dispatch(request, *args, **kwargs)

.../oauth2/views/token.py:560: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.providers.oauth2.views.token.TokenView object at 0x7f16db1417b0>
request = <WSGIRequest: POST '......................................./application/o/token/'>, args = (), kwargs = {}
handler = <bound method TokenView.post of <authentik.providers.oauth2.views.token.TokenView object at 0x7f16db1417b0>>

    def dispatch(self, request, *args, **kwargs):
        # Try to dispatch to the right method; if a method doesn't exist,
        # defer to the error handler. Also defer to the error handler if the
        # request method isn't on the approved list.
        if request.method.lower() in self.http_method_names:
            handler = getattr(
                self, request.method.lower(), self.http_method_not_allowed
            )
        else:
            handler = self.http_method_not_allowed
>       return handler(request, *args, **kwargs)

.venv/lib/python3.13.../views/generic/base.py:144: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.providers.oauth2.views.token.TokenView object at 0x7f16db1417b0>
request = <WSGIRequest: POST '......................................./application/o/token/'>

    def post(self, request: HttpRequest) -> HttpResponse:
        """Generate tokens for clients"""
        try:
            with start_span(
                op="authentik.providers.oauth2.post.parse",
            ):
                client_id, client_secret = extract_client_auth(request)
                self.provider = OAuth2Provider.objects.filter(client_id=client_id).first()
                if not self.provider:
                    LOGGER.warning("OAuth2Provider does not exist", client_id=client_id)
                    raise TokenError("invalid_client")
                CTX_AUTH_VIA.set("oauth_client_secret")
                self.params = TokenParams.parse(request, self.provider, client_id, client_secret)
    
            with start_span(
                op="authentik.providers.oauth2.post.response",
            ):
                if self.params.grant_type == GRANT_TYPE_AUTHORIZATION_CODE:
                    LOGGER.debug("Converting authorization code to access token")
                    return TokenResponse(self.create_code_response())
                if self.params.grant_type == GRANT_TYPE_REFRESH_TOKEN:
                    LOGGER.debug("Refreshing refresh token")
>                   return TokenResponse(self.create_refresh_response())

.../oauth2/views/token.py:592: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.providers.oauth2.views.token.TokenView object at 0x7f16db1417b0>

    def create_refresh_response(self) -> dict[str, Any]:
        """See https://datatracker.ietf..../doc/html/rfc6749#section-6"""
        unauthorized_scopes = set(self.params.scope) - set(self.params.refresh_token.scope)
        if unauthorized_scopes:
            raise TokenError("invalid_scope")
        if SCOPE_OFFLINE_ACCESS not in self.params.scope:
            raise TokenError("invalid_scope")
        now = timezone.now()
        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
        access_token = AccessToken(
            provider=self.provider,
            user=self.params.refresh_token.user,
            expires=access_token_expiry,
            # Keep same scopes as previous token
            scope=self.params.refresh_token.scope,
            auth_time=self.params.refresh_token.auth_time,
            session=self.params.refresh_token.session,
        )
        access_token.id_token = IDToken.new(
            self.provider,
            access_token,
            self.request,
        )
        access_token.save()
    
        res = {
            "access_token": access_token.token,
            "token_type": TOKEN_TYPE,
            "scope": " ".join(access_token.scope),
            "expires_in": int(
                timedelta_from_string(self.provider.access_token_validity).total_seconds()
            ),
            "id_token": access_token.id_token.to_jwt(self.provider),
        }
    
>       if (now() - self.params.refresh_token.expires) < timedelta(hours=1):
E       TypeError: 'datetime.datetime' object is not callable

.../oauth2/views/token.py:697: TypeError
tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC::test_authorization_consent_implied
Stack Traces | 234s run time
self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:325: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>,)
kwargs = {}
file = 'default/flow-default-provider-authorization-implicit-consent.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider authorization flow (implicit consent)\nentries:\n- attrs:\n    desi...henticated\n  identifiers:\n    slug: default-provider-authorization-implicit-consent\n  model: authentik_flows.flow\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>,)
kwargs = {}, file = 'system/providers-oauth2.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - OAuth2 Provider - Sc... application the ability to access the authentik API\n        # on behalf of the authorizing user\n        return {}\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint("default/flow-default-provider-authorization-implicit-consent.yaml")
    @apply_blueprint("system/providers-oauth2.yaml")
    @reconcile_app("authentik_crypto")
    def test_authorization_consent_implied(self):
        """test OpenID Provider flow (default authorization flow with implied consent)
        (due to offline_access a consent will still be triggered)"""
        sleep(1)
        # Bootstrap all needed objects
        authorization_flow = Flow.objects.get(
            slug="default-provider-authorization-implicit-consent"
        )
        provider = OAuth2Provider.objects.create(
            name=self.application_slug,
            client_type=ClientTypes.CONFIDENTIAL,
            client_id=self.client_id,
            client_secret=self.client_secret,
            signing_key=create_test_cert(),
            redirect_uris=[
                RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost:9009/auth/callback")
            ],
            authorization_flow=authorization_flow,
        )
        provider.property_mappings.set(
            ScopeMapping.objects.filter(
                scope_name__in=[
                    SCOPE_OPENID,
                    SCOPE_OPENID_EMAIL,
                    SCOPE_OPENID_PROFILE,
                    SCOPE_OFFLINE_ACCESS,
                ]
            )
        )
        app = Application.objects.create(
            name=self.application_slug,
            slug=self.application_slug,
            provider=provider,
        )
        self.setup_client()
    
        self.driver.get("http://localhost:9009")
        self.login()
        self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "ak-flow-executor")))
    
        flow_executor = self.get_shadow_root("ak-flow-executor")
        consent_stage = self.get_shadow_root("ak-stage-consent", flow_executor)
    
        self.assertIn(
            app.name,
            consent_stage.find_element(By.CSS_SELECTOR, "#header-text").text,
        )
        consent_stage.find_element(
            By.CSS_SELECTOR,
            "[type=submit]",
        ).click()
    
>       self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "pre")))

tests/e2e/test_provider_oidc.py:163: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.support.wait.WebDriverWait (session="ad4131bb8ef3a119cc6889c7a5915997")>
method = <function presence_of_element_located.<locals>._predicate at 0x7f88a9aac9a0>
message = ''

    def until(self, method: Callable[[D], Union[Literal[False], T]], message: str = "") -> T:
        """Wait until the method returns a value that is not False.
    
        Calls the method provided with the driver as an argument until the
        return value does not evaluate to ``False``.
    
        Parameters:
        -----------
        method: callable(WebDriver)
            - A callable object that takes a WebDriver instance as an argument.
    
        message: str
            - Optional message for :exc:`TimeoutException`
    
        Return:
        -------
        object: T
            - The result of the last call to `method`
    
        Raises:
        -------
        TimeoutException
            - If 'method' does not return a truthy value within the WebDriverWait
            object's timeout
    
        Example:
        --------
        >>> from selenium.webdriver.common.by import By
        >>> from selenium.webdriver.support.ui import WebDriverWait
        >>> from selenium.webdriver.support import expected_conditions as EC
    
        # Wait until an element is visible on the page
        >>> wait = WebDriverWait(driver, 10)
        >>> element = wait.until(EC.visibility_of_element_located((By.ID, "exampleId")))
        >>> print(element.text)
        """
        screen = None
        stacktrace = None
    
        end_time = time.monotonic() + self._timeout
        while True:
            try:
                value = method(self._driver)
                if value:
                    return value
            except self._ignored_exceptions as exc:
                screen = getattr(exc, "screen", None)
                stacktrace = getattr(exc, "stacktrace", None)
            if time.monotonic() > end_time:
                break
            time.sleep(self._poll)
>       raise TimeoutException(message, screen, stacktrace)
E       selenium.common.exceptions.TimeoutException: Message: 
E       Stacktrace:
E       #0 0x56194066994a <unknown>
E       #1 0x5619400de8a0 <unknown>
E       #2 0x561940130540 <unknown>
E       #3 0x561940130731 <unknown>
E       #4 0x56194017e824 <unknown>
E       #5 0x56194015605d <unknown>
E       #6 0x56194017bc23 <unknown>
E       #7 0x561940155e03 <unknown>
E       #8 0x561940122968 <unknown>
E       #9 0x5619401235e1 <unknown>
E       #10 0x56194062d548 <unknown>
E       #11 0x561940631272 <unknown>
E       #12 0x561940614313 <unknown>
E       #13 0x561940631dc5 <unknown>
E       #14 0x5619405f949f <unknown>
E       #15 0x561940656158 <unknown>
E       #16 0x561940656332 <unknown>
E       #17 0x561940668a53 <unknown>
E       #18 0x7f07ff641aa4 <unknown>
E       #19 0x7f07ff6cea34 __clone

.venv/lib/python3.13.../webdriver/support/wait.py:146: TimeoutException

During handling of the above exception, another exception occurred:

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:325: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>,)
kwargs = {}
file = 'default/flow-default-provider-authorization-implicit-consent.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider authorization flow (implicit consent)\nentries:\n- attrs:\n    desi...henticated\n  identifiers:\n    slug: default-provider-authorization-implicit-consent\n  model: authentik_flows.flow\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>,)
kwargs = {}, file = 'system/providers-oauth2.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - OAuth2 Provider - Sc... application the ability to access the authentik API\n        # on behalf of the authorizing user\n        return {}\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint("default/flow-default-provider-authorization-implicit-consent.yaml")
    @apply_blueprint("system/providers-oauth2.yaml")
    @reconcile_app("authentik_crypto")
    def test_authorization_consent_implied(self):
        """test OpenID Provider flow (default authorization flow with implied consent)
        (due to offline_access a consent will still be triggered)"""
        sleep(1)
        # Bootstrap all needed objects
        authorization_flow = Flow.objects.get(
            slug="default-provider-authorization-implicit-consent"
        )
        provider = OAuth2Provider.objects.create(
            name=self.application_slug,
            client_type=ClientTypes.CONFIDENTIAL,
            client_id=self.client_id,
            client_secret=self.client_secret,
            signing_key=create_test_cert(),
            redirect_uris=[
                RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost:9009/auth/callback")
            ],
            authorization_flow=authorization_flow,
        )
        provider.property_mappings.set(
            ScopeMapping.objects.filter(
                scope_name__in=[
                    SCOPE_OPENID,
                    SCOPE_OPENID_EMAIL,
                    SCOPE_OPENID_PROFILE,
                    SCOPE_OFFLINE_ACCESS,
                ]
            )
        )
        app = Application.objects.create(
            name=self.application_slug,
            slug=self.application_slug,
            provider=provider,
        )
        self.setup_client()
    
        self.driver.get("http://localhost:9009")
        self.login()
        self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "ak-flow-executor")))
    
        flow_executor = self.get_shadow_root("ak-flow-executor")
        consent_stage = self.get_shadow_root("ak-stage-consent", flow_executor)
    
        self.assertIn(
            app.name,
            consent_stage.find_element(By.CSS_SELECTOR, "#header-text").text,
        )
        consent_stage.find_element(
            By.CSS_SELECTOR,
            "[type=submit]",
        ).click()
    
>       self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "pre")))

tests/e2e/test_provider_oidc.py:163: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.support.wait.WebDriverWait (session="a8e4dd4efca5918de932af1f97c20691")>
method = <function presence_of_element_located.<locals>._predicate at 0x7f88a5621d00>
message = ''

    def until(self, method: Callable[[D], Union[Literal[False], T]], message: str = "") -> T:
        """Wait until the method returns a value that is not False.
    
        Calls the method provided with the driver as an argument until the
        return value does not evaluate to ``False``.
    
        Parameters:
        -----------
        method: callable(WebDriver)
            - A callable object that takes a WebDriver instance as an argument.
    
        message: str
            - Optional message for :exc:`TimeoutException`
    
        Return:
        -------
        object: T
            - The result of the last call to `method`
    
        Raises:
        -------
        TimeoutException
            - If 'method' does not return a truthy value within the WebDriverWait
            object's timeout
    
        Example:
        --------
        >>> from selenium.webdriver.common.by import By
        >>> from selenium.webdriver.support.ui import WebDriverWait
        >>> from selenium.webdriver.support import expected_conditions as EC
    
        # Wait until an element is visible on the page
        >>> wait = WebDriverWait(driver, 10)
        >>> element = wait.until(EC.visibility_of_element_located((By.ID, "exampleId")))
        >>> print(element.text)
        """
        screen = None
        stacktrace = None
    
        end_time = time.monotonic() + self._timeout
        while True:
            try:
                value = method(self._driver)
                if value:
                    return value
            except self._ignored_exceptions as exc:
                screen = getattr(exc, "screen", None)
                stacktrace = getattr(exc, "stacktrace", None)
            if time.monotonic() > end_time:
                break
            time.sleep(self._poll)
>       raise TimeoutException(message, screen, stacktrace)
E       selenium.common.exceptions.TimeoutException: Message: 
E       Stacktrace:
E       #0 0x55840092694a <unknown>
E       #1 0x55840039b8a0 <unknown>
E       #2 0x5584003ed540 <unknown>
E       #3 0x5584003ed731 <unknown>
E       #4 0x55840043b824 <unknown>
E       #5 0x55840041305d <unknown>
E       #6 0x558400438c23 <unknown>
E       #7 0x558400412e03 <unknown>
E       #8 0x5584003df968 <unknown>
E       #9 0x5584003e05e1 <unknown>
E       #10 0x5584008ea548 <unknown>
E       #11 0x5584008ee272 <unknown>
E       #12 0x5584008d1313 <unknown>
E       #13 0x5584008eedc5 <unknown>
E       #14 0x5584008b649f <unknown>
E       #15 0x558400913158 <unknown>
E       #16 0x558400913332 <unknown>
E       #17 0x558400925a53 <unknown>
E       #18 0x7fa2df542aa4 <unknown>
E       #19 0x7fa2df5cfa34 __clone

.venv/lib/python3.13.../webdriver/support/wait.py:146: TimeoutException

During handling of the above exception, another exception occurred:

self = <unittest.case._Outcome object at 0x7f88aa136fd0>
test_case = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.7........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>
result = <TestCaseFunction test_authorization_consent_implied>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.7........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>
method = <bound method TestProviderOAuth2OIDC.test_authorization_consent_implied of <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.7........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
            return func(self, *args, **kwargs)
    
        except tuple(exceptions) as exc:
            count += 1
            if count > max_retires:
                logger.debug("Exceeded retry count", exc=exc, test=self)
    
                raise exc
            logger.debug("Retrying on error", exc=exc, test=self)
            self.tearDown()
            self._post_teardown()
            self._pre_setup()
            self.setUp()
>           return wrapper(self, *args, **kwargs)

tests/e2e/utils.py:338: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
            return func(self, *args, **kwargs)
    
        except tuple(exceptions) as exc:
            count += 1
            if count > max_retires:
                logger.debug("Exceeded retry count", exc=exc, test=self)
    
                raise exc
            logger.debug("Retrying on error", exc=exc, test=self)
            self.tearDown()
            self._post_teardown()
            self._pre_setup()
            self.setUp()
>           return wrapper(self, *args, **kwargs)

tests/e2e/utils.py:338: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
            return func(self, *args, **kwargs)
    
        except tuple(exceptions) as exc:
            count += 1
            if count > max_retires:
                logger.debug("Exceeded retry count", exc=exc, test=self)
    
>               raise exc

tests/e2e/utils.py:332: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:325: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>,)
kwargs = {}
file = 'default/flow-default-provider-authorization-implicit-consent.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider authorization flow (implicit consent)\nentries:\n- attrs:\n    desi...henticated\n  identifiers:\n    slug: default-provider-authorization-implicit-consent\n  model: authentik_flows.flow\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>,)
kwargs = {}, file = 'system/providers-oauth2.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - OAuth2 Provider - Sc... application the ability to access the authentik API\n        # on behalf of the authorizing user\n        return {}\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_implied>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint("default/flow-default-provider-authorization-implicit-consent.yaml")
    @apply_blueprint("system/providers-oauth2.yaml")
    @reconcile_app("authentik_crypto")
    def test_authorization_consent_implied(self):
        """test OpenID Provider flow (default authorization flow with implied consent)
        (due to offline_access a consent will still be triggered)"""
        sleep(1)
        # Bootstrap all needed objects
        authorization_flow = Flow.objects.get(
            slug="default-provider-authorization-implicit-consent"
        )
        provider = OAuth2Provider.objects.create(
            name=self.application_slug,
            client_type=ClientTypes.CONFIDENTIAL,
            client_id=self.client_id,
            client_secret=self.client_secret,
            signing_key=create_test_cert(),
            redirect_uris=[
                RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost:9009/auth/callback")
            ],
            authorization_flow=authorization_flow,
        )
        provider.property_mappings.set(
            ScopeMapping.objects.filter(
                scope_name__in=[
                    SCOPE_OPENID,
                    SCOPE_OPENID_EMAIL,
                    SCOPE_OPENID_PROFILE,
                    SCOPE_OFFLINE_ACCESS,
                ]
            )
        )
        app = Application.objects.create(
            name=self.application_slug,
            slug=self.application_slug,
            provider=provider,
        )
        self.setup_client()
    
        self.driver.get("http://localhost:9009")
        self.login()
        self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "ak-flow-executor")))
    
        flow_executor = self.get_shadow_root("ak-flow-executor")
        consent_stage = self.get_shadow_root("ak-stage-consent", flow_executor)
    
        self.assertIn(
            app.name,
            consent_stage.find_element(By.CSS_SELECTOR, "#header-text").text,
        )
        consent_stage.find_element(
            By.CSS_SELECTOR,
            "[type=submit]",
        ).click()
    
>       self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "pre")))

tests/e2e/test_provider_oidc.py:163: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.support.wait.WebDriverWait (session="4587e84bb3613f5c4d5259bb628c90bf")>
method = <function presence_of_element_located.<locals>._predicate at 0x7f88aa622de0>
message = ''

    def until(self, method: Callable[[D], Union[Literal[False], T]], message: str = "") -> T:
        """Wait until the method returns a value that is not False.
    
        Calls the method provided with the driver as an argument until the
        return value does not evaluate to ``False``.
    
        Parameters:
        -----------
        method: callable(WebDriver)
            - A callable object that takes a WebDriver instance as an argument.
    
        message: str
            - Optional message for :exc:`TimeoutException`
    
        Return:
        -------
        object: T
            - The result of the last call to `method`
    
        Raises:
        -------
        TimeoutException
            - If 'method' does not return a truthy value within the WebDriverWait
            object's timeout
    
        Example:
        --------
        >>> from selenium.webdriver.common.by import By
        >>> from selenium.webdriver.support.ui import WebDriverWait
        >>> from selenium.webdriver.support import expected_conditions as EC
    
        # Wait until an element is visible on the page
        >>> wait = WebDriverWait(driver, 10)
        >>> element = wait.until(EC.visibility_of_element_located((By.ID, "exampleId")))
        >>> print(element.text)
        """
        screen = None
        stacktrace = None
    
        end_time = time.monotonic() + self._timeout
        while True:
            try:
                value = method(self._driver)
                if value:
                    return value
            except self._ignored_exceptions as exc:
                screen = getattr(exc, "screen", None)
                stacktrace = getattr(exc, "stacktrace", None)
            if time.monotonic() > end_time:
                break
            time.sleep(self._poll)
>       raise TimeoutException(message, screen, stacktrace)
E       selenium.common.exceptions.TimeoutException: Message: 
E       Stacktrace:
E       #0 0x55f4df4fb94a <unknown>
E       #1 0x55f4def708a0 <unknown>
E       #2 0x55f4defc2540 <unknown>
E       #3 0x55f4defc2731 <unknown>
E       #4 0x55f4df010824 <unknown>
E       #5 0x55f4defe805d <unknown>
E       #6 0x55f4df00dc23 <unknown>
E       #7 0x55f4defe7e03 <unknown>
E       #8 0x55f4defb4968 <unknown>
E       #9 0x55f4defb55e1 <unknown>
E       #10 0x55f4df4bf548 <unknown>
E       #11 0x55f4df4c3272 <unknown>
E       #12 0x55f4df4a6313 <unknown>
E       #13 0x55f4df4c3dc5 <unknown>
E       #14 0x55f4df48b49f <unknown>
E       #15 0x55f4df4e8158 <unknown>
E       #16 0x55f4df4e8332 <unknown>
E       #17 0x55f4df4faa53 <unknown>
E       #18 0x7fb8c278aaa4 <unknown>
E       #19 0x7fb8c2817a34 __clone

.venv/lib/python3.13.../webdriver/support/wait.py:146: TimeoutException
tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC::test_authorization_consent_explicit
Stack Traces | 331s run time
self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:325: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>,)
kwargs = {}
file = 'default/flow-default-provider-authorization-explicit-consent.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider authorization flow (explicit consent)\nentries:\n- attrs:\n    desi...e: !KeyOf default-provider-authorization-consent\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>,)
kwargs = {}, file = 'system/providers-oauth2.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - OAuth2 Provider - Sc... application the ability to access the authentik API\n        # on behalf of the authorizing user\n        return {}\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint("default/flow-default-provider-authorization-explicit-consent.yaml")
    @apply_blueprint("system/providers-oauth2.yaml")
    @reconcile_app("authentik_crypto")
    def test_authorization_consent_explicit(self):
        """test OpenID Provider flow (default authorization flow with explicit consent)"""
        sleep(1)
        # Bootstrap all needed objects
        authorization_flow = Flow.objects.get(
            slug="default-provider-authorization-explicit-consent"
        )
        provider = OAuth2Provider.objects.create(
            name=self.application_slug,
            authorization_flow=authorization_flow,
            client_type=ClientTypes.CONFIDENTIAL,
            client_id=self.client_id,
            client_secret=self.client_secret,
            signing_key=create_test_cert(),
            redirect_uris=[
                RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost:9009/auth/callback")
            ],
        )
        provider.property_mappings.set(
            ScopeMapping.objects.filter(
                scope_name__in=[
                    SCOPE_OPENID,
                    SCOPE_OPENID_EMAIL,
                    SCOPE_OPENID_PROFILE,
                    SCOPE_OFFLINE_ACCESS,
                ]
            )
        )
        app = Application.objects.create(
            name=self.application_slug,
            slug=self.application_slug,
            provider=provider,
        )
        self.setup_client()
    
        self.driver.get("http://localhost:9009")
        self.login()
    
        self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "ak-flow-executor")))
    
        flow_executor = self.get_shadow_root("ak-flow-executor")
        consent_stage = self.get_shadow_root("ak-stage-consent", flow_executor)
    
        self.assertIn(
            app.name,
            consent_stage.find_element(By.CSS_SELECTOR, "#header-text").text,
        )
        consent_stage.find_element(
            By.CSS_SELECTOR,
            "[type=submit]",
        ).click()
    
>       self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "pre")))

tests/e2e/test_provider_oidc.py:237: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.support.wait.WebDriverWait (session="5f486b49b327a8d83848a3390ef73b13")>
method = <function presence_of_element_located.<locals>._predicate at 0x7f88b1be0680>
message = ''

    def until(self, method: Callable[[D], Union[Literal[False], T]], message: str = "") -> T:
        """Wait until the method returns a value that is not False.
    
        Calls the method provided with the driver as an argument until the
        return value does not evaluate to ``False``.
    
        Parameters:
        -----------
        method: callable(WebDriver)
            - A callable object that takes a WebDriver instance as an argument.
    
        message: str
            - Optional message for :exc:`TimeoutException`
    
        Return:
        -------
        object: T
            - The result of the last call to `method`
    
        Raises:
        -------
        TimeoutException
            - If 'method' does not return a truthy value within the WebDriverWait
            object's timeout
    
        Example:
        --------
        >>> from selenium.webdriver.common.by import By
        >>> from selenium.webdriver.support.ui import WebDriverWait
        >>> from selenium.webdriver.support import expected_conditions as EC
    
        # Wait until an element is visible on the page
        >>> wait = WebDriverWait(driver, 10)
        >>> element = wait.until(EC.visibility_of_element_located((By.ID, "exampleId")))
        >>> print(element.text)
        """
        screen = None
        stacktrace = None
    
        end_time = time.monotonic() + self._timeout
        while True:
            try:
                value = method(self._driver)
                if value:
                    return value
            except self._ignored_exceptions as exc:
                screen = getattr(exc, "screen", None)
                stacktrace = getattr(exc, "stacktrace", None)
            if time.monotonic() > end_time:
                break
            time.sleep(self._poll)
>       raise TimeoutException(message, screen, stacktrace)
E       selenium.common.exceptions.TimeoutException: Message: 
E       Stacktrace:
E       #0 0x565293dd594a <unknown>
E       #1 0x56529384a8a0 <unknown>
E       #2 0x56529389c540 <unknown>
E       #3 0x56529389c731 <unknown>
E       #4 0x5652938ea824 <unknown>
E       #5 0x5652938c205d <unknown>
E       #6 0x5652938e7c23 <unknown>
E       #7 0x5652938c1e03 <unknown>
E       #8 0x56529388e968 <unknown>
E       #9 0x56529388f5e1 <unknown>
E       #10 0x565293d99548 <unknown>
E       #11 0x565293d9d272 <unknown>
E       #12 0x565293d80313 <unknown>
E       #13 0x565293d9ddc5 <unknown>
E       #14 0x565293d6549f <unknown>
E       #15 0x565293dc2158 <unknown>
E       #16 0x565293dc2332 <unknown>
E       #17 0x565293dd4a53 <unknown>
E       #18 0x7fcaedc1faa4 <unknown>
E       #19 0x7fcaedcaca34 __clone

.venv/lib/python3.13.../webdriver/support/wait.py:146: TimeoutException

During handling of the above exception, another exception occurred:

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:325: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>,)
kwargs = {}
file = 'default/flow-default-provider-authorization-explicit-consent.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider authorization flow (explicit consent)\nentries:\n- attrs:\n    desi...e: !KeyOf default-provider-authorization-consent\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>,)
kwargs = {}, file = 'system/providers-oauth2.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - OAuth2 Provider - Sc... application the ability to access the authentik API\n        # on behalf of the authorizing user\n        return {}\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint("default/flow-default-provider-authorization-explicit-consent.yaml")
    @apply_blueprint("system/providers-oauth2.yaml")
    @reconcile_app("authentik_crypto")
    def test_authorization_consent_explicit(self):
        """test OpenID Provider flow (default authorization flow with explicit consent)"""
        sleep(1)
        # Bootstrap all needed objects
        authorization_flow = Flow.objects.get(
            slug="default-provider-authorization-explicit-consent"
        )
        provider = OAuth2Provider.objects.create(
            name=self.application_slug,
            authorization_flow=authorization_flow,
            client_type=ClientTypes.CONFIDENTIAL,
            client_id=self.client_id,
            client_secret=self.client_secret,
            signing_key=create_test_cert(),
            redirect_uris=[
                RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost:9009/auth/callback")
            ],
        )
        provider.property_mappings.set(
            ScopeMapping.objects.filter(
                scope_name__in=[
                    SCOPE_OPENID,
                    SCOPE_OPENID_EMAIL,
                    SCOPE_OPENID_PROFILE,
                    SCOPE_OFFLINE_ACCESS,
                ]
            )
        )
        app = Application.objects.create(
            name=self.application_slug,
            slug=self.application_slug,
            provider=provider,
        )
        self.setup_client()
    
        self.driver.get("http://localhost:9009")
        self.login()
    
        self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "ak-flow-executor")))
    
        flow_executor = self.get_shadow_root("ak-flow-executor")
        consent_stage = self.get_shadow_root("ak-stage-consent", flow_executor)
    
        self.assertIn(
            app.name,
            consent_stage.find_element(By.CSS_SELECTOR, "#header-text").text,
        )
        consent_stage.find_element(
            By.CSS_SELECTOR,
            "[type=submit]",
        ).click()
    
>       self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "pre")))

tests/e2e/test_provider_oidc.py:237: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.support.wait.WebDriverWait (session="1c5c76960bcd4c6a33f92d1644a1b1d9")>
method = <function presence_of_element_located.<locals>._predicate at 0x7f88a724bb00>
message = ''

    def until(self, method: Callable[[D], Union[Literal[False], T]], message: str = "") -> T:
        """Wait until the method returns a value that is not False.
    
        Calls the method provided with the driver as an argument until the
        return value does not evaluate to ``False``.
    
        Parameters:
        -----------
        method: callable(WebDriver)
            - A callable object that takes a WebDriver instance as an argument.
    
        message: str
            - Optional message for :exc:`TimeoutException`
    
        Return:
        -------
        object: T
            - The result of the last call to `method`
    
        Raises:
        -------
        TimeoutException
            - If 'method' does not return a truthy value within the WebDriverWait
            object's timeout
    
        Example:
        --------
        >>> from selenium.webdriver.common.by import By
        >>> from selenium.webdriver.support.ui import WebDriverWait
        >>> from selenium.webdriver.support import expected_conditions as EC
    
        # Wait until an element is visible on the page
        >>> wait = WebDriverWait(driver, 10)
        >>> element = wait.until(EC.visibility_of_element_located((By.ID, "exampleId")))
        >>> print(element.text)
        """
        screen = None
        stacktrace = None
    
        end_time = time.monotonic() + self._timeout
        while True:
            try:
                value = method(self._driver)
                if value:
                    return value
            except self._ignored_exceptions as exc:
                screen = getattr(exc, "screen", None)
                stacktrace = getattr(exc, "stacktrace", None)
            if time.monotonic() > end_time:
                break
            time.sleep(self._poll)
>       raise TimeoutException(message, screen, stacktrace)
E       selenium.common.exceptions.TimeoutException: Message: 
E       Stacktrace:
E       #0 0x55555675a94a <unknown>
E       #1 0x5555561cf8a0 <unknown>
E       #2 0x555556221540 <unknown>
E       #3 0x555556221731 <unknown>
E       #4 0x55555626f824 <unknown>
E       #5 0x55555624705d <unknown>
E       #6 0x55555626cc23 <unknown>
E       #7 0x555556246e03 <unknown>
E       #8 0x555556213968 <unknown>
E       #9 0x5555562145e1 <unknown>
E       #10 0x55555671e548 <unknown>
E       #11 0x555556722272 <unknown>
E       #12 0x555556705313 <unknown>
E       #13 0x555556722dc5 <unknown>
E       #14 0x5555566ea49f <unknown>
E       #15 0x555556747158 <unknown>
E       #16 0x555556747332 <unknown>
E       #17 0x555556759a53 <unknown>
E       #18 0x7f85013d0aa4 <unknown>
E       #19 0x7f850145da34 __clone

.venv/lib/python3.13.../webdriver/support/wait.py:146: TimeoutException

During handling of the above exception, another exception occurred:

self = <unittest.case._Outcome object at 0x7f88aaabeea0>
test_case = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.7........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>
result = <TestCaseFunction test_authorization_consent_explicit>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.7........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>
method = <bound method TestProviderOAuth2OIDC.test_authorization_consent_explicit of <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.7........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
            return func(self, *args, **kwargs)
    
        except tuple(exceptions) as exc:
            count += 1
            if count > max_retires:
                logger.debug("Exceeded retry count", exc=exc, test=self)
    
                raise exc
            logger.debug("Retrying on error", exc=exc, test=self)
            self.tearDown()
            self._post_teardown()
            self._pre_setup()
            self.setUp()
>           return wrapper(self, *args, **kwargs)

tests/e2e/utils.py:338: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
            return func(self, *args, **kwargs)
    
        except tuple(exceptions) as exc:
            count += 1
            if count > max_retires:
                logger.debug("Exceeded retry count", exc=exc, test=self)
    
                raise exc
            logger.debug("Retrying on error", exc=exc, test=self)
            self.tearDown()
            self._post_teardown()
            self._pre_setup()
            self.setUp()
>           return wrapper(self, *args, **kwargs)

tests/e2e/utils.py:338: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
            return func(self, *args, **kwargs)
    
        except tuple(exceptions) as exc:
            count += 1
            if count > max_retires:
                logger.debug("Exceeded retry count", exc=exc, test=self)
    
>               raise exc

tests/e2e/utils.py:332: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:325: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>,)
kwargs = {}
file = 'default/flow-default-provider-authorization-explicit-consent.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider authorization flow (explicit consent)\nentries:\n- attrs:\n    desi...e: !KeyOf default-provider-authorization-consent\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>,)
kwargs = {}, file = 'system/providers-oauth2.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - OAuth2 Provider - Sc... application the ability to access the authentik API\n        # on behalf of the authorizing user\n        return {}\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint("default/flow-default-provider-authorization-explicit-consent.yaml")
    @apply_blueprint("system/providers-oauth2.yaml")
    @reconcile_app("authentik_crypto")
    def test_authorization_consent_explicit(self):
        """test OpenID Provider flow (default authorization flow with explicit consent)"""
        sleep(1)
        # Bootstrap all needed objects
        authorization_flow = Flow.objects.get(
            slug="default-provider-authorization-explicit-consent"
        )
        provider = OAuth2Provider.objects.create(
            name=self.application_slug,
            authorization_flow=authorization_flow,
            client_type=ClientTypes.CONFIDENTIAL,
            client_id=self.client_id,
            client_secret=self.client_secret,
            signing_key=create_test_cert(),
            redirect_uris=[
                RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost:9009/auth/callback")
            ],
        )
        provider.property_mappings.set(
            ScopeMapping.objects.filter(
                scope_name__in=[
                    SCOPE_OPENID,
                    SCOPE_OPENID_EMAIL,
                    SCOPE_OPENID_PROFILE,
                    SCOPE_OFFLINE_ACCESS,
                ]
            )
        )
        app = Application.objects.create(
            name=self.application_slug,
            slug=self.application_slug,
            provider=provider,
        )
        self.setup_client()
    
        self.driver.get("http://localhost:9009")
        self.login()
    
        self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "ak-flow-executor")))
    
        flow_executor = self.get_shadow_root("ak-flow-executor")
>       consent_stage = self.get_shadow_root("ak-stage-consent", flow_executor)

tests/e2e/test_provider_oidc.py:226: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_oidc.TestProviderOAuth2OIDC testMethod=test_authorization_consent_explicit>
selector = 'ak-stage-consent'
container = <selenium.webdriver.remote.shadowroot.ShadowRoot (session="bd41e611b00fbdea9c540ea47145adcd", element="f.197037B91AA38E639635C9D216D70522.d.21EF49956096CAC0BD1F70990433864A.e.19")>

    def get_shadow_root(
        self, selector: str, container: WebElement | WebDriver | None = None
    ) -> WebElement:
        """Get shadow root element's inner shadowRoot"""
        if not container:
            container = self.driver
>       shadow_root = container.find_element(By.CSS_SELECTOR, selector)

tests/e2e/utils.py:236: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.shadowroot.ShadowRoot (session="bd41e611b00fbdea9c540ea47145adcd", element="f.197037B91AA38E639635C9D216D70522.d.21EF49956096CAC0BD1F70990433864A.e.19")>
by = 'css selector', value = 'ak-stage-consent'

    def find_element(self, by: str = By.ID, value: str = None):
        """Find an element inside a shadow root given a By strategy and
        locator.
    
        Parameters:
        -----------
        by : selenium.webdriver.common.by.By
            The locating strategy to use. Default is `By.ID`. Supported values include:
            - By.ID: Locate by element ID.
            - By.NAME: Locate by the `name` attribute.
            - By.XPATH: Locate by an XPath expression.
            - By.CSS_SELECTOR: Locate by a CSS selector.
            - By.CLASS_NAME: Locate by the `class` attribute.
            - By.TAG_NAME: Locate by the tag name (e.g., "input", "button").
            - By.LINK_TEXT: Locate a link element by its exact text.
            - By.PARTIAL_LINK_TEXT: Locate a link element by partial text match.
            - RelativeBy: Locate elements relative to a specified root element.
    
        Example:
        --------
        element = driver.find_element(By.ID, 'foo')
    
        Returns:
        -------
        WebElement
            The first matching `WebElement` found on the page.
        """
        if by == By.ID:
            by = By.CSS_SELECTOR
            value = f'[id="{value}"]'
        elif by == By.CLASS_NAME:
            by = By.CSS_SELECTOR
            value = f".{value}"
        elif by == By.NAME:
            by = By.CSS_SELECTOR
            value = f'[name="{value}"]'
    
>       return self._execute(Command.FIND_ELEMENT_FROM_SHADOW_ROOT, {"using": by, "value": value})["value"]

.venv/lib/python3.13.../webdriver/remote/shadowroot.py:79: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.shadowroot.ShadowRoot (session="bd41e611b00fbdea9c540ea47145adcd", element="f.197037B91AA38E639635C9D216D70522.d.21EF49956096CAC0BD1F70990433864A.e.19")>
command = 'findElementFromShadowRoot'
params = {'shadowId': 'f.197037B91AA38E639635C9D216D70522.d.21EF49956096CAC0BD1F70990433864A.e.19', 'using': 'css selector', 'value': 'ak-stage-consent'}

    def _execute(self, command, params=None):
        """Executes a command against the underlying HTML element.
    
        Args:
          command: The name of the command to _execute as a string.
          params: A dictionary of named parameters to send with the command.
    
        Returns:
          The command's JSON response loaded into a dictionary object.
        """
        if not params:
            params = {}
        params["shadowId"] = self._id
>       return self.session.execute(command, params)

.venv/lib/python3.13.../webdriver/remote/shadowroot.py:133: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.webdriver.WebDriver (session="bd41e611b00fbdea9c540ea47145adcd")>
driver_command = 'findElementFromShadowRoot'
params = {'using': 'css selector', 'value': 'ak-stage-consent'}

    def execute(self, driver_command: str, params: dict = None) -> dict:
        """Sends a command to be executed by a command.CommandExecutor.
    
        Parameters:
        -----------
        driver_command : str
            - The name of the command to execute as a string.
    
        params : dict
            - A dictionary of named Parameters to send with the command.
    
        Returns:
        --------
          dict - The command's JSON response loaded into a dictionary object.
        """
        params = self._wrap_value(params)
    
        if self.session_id:
            if not params:
                params = {"sessionId": self.session_id}
            elif "sessionId" not in params:
                params["sessionId"] = self.session_id
    
        response = self.command_executor.execute(driver_command, params)
        if response:
>           self.error_handler.check_response(response)

.venv/lib/python3.13.../webdriver/remote/webdriver.py:448: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <selenium.webdriver.remote.errorhandler.ErrorHandler object at 0x7f88a9fcee00>
response = {'status': 404, 'value': '{"value":{"error":"detached shadow root","message":"detached shadow root: detached shadow ro...unknown>\\n#25 0x56390a96da53 \\u003Cunknown>\\n#26 0x7fcc79fc7aa4 \\u003Cunknown>\\n#27 0x7fcc7a054a34 __clone\\n"}}'}

    def check_response(self, response: Dict[str, Any]) -> None:
        """Checks that a JSON response from the WebDriver does not have an
        error.
    
        :Args:
         - response - The JSON response from the WebDriver server as a dictionary
           object.
    
        :Raises: If the response contains an error message.
        """
        status = response.get("status", None)
        if not status or status == ErrorCode.SUCCESS:
            return
        value = None
        message = response.get("message", "")
        screen: str = response.get("screen", "")
        stacktrace = None
        if isinstance(status, int):
            value_json = response.get("value", None)
            if value_json and isinstance(value_json, str):
                import json
    
                try:
                    value = json.loads(value_json)
                    if len(value) == 1:
                        value = value["value"]
                    status = value.get("error", None)
                    if not status:
                        status = value.get("status", ErrorCode.UNKNOWN_ERROR)
                        message = value.get("value") or value.get("message")
                        if not isinstance(message, str):
                            value = message
                            message = message.get("message")
                    else:
                        message = value.get("message", None)
                except ValueError:
                    pass
    
        exception_class: Type[WebDriverException]
        e = ErrorCode()
        error_codes = [item for item in dir(e) if not item.startswith("__")]
        for error_code in error_codes:
            error_info = getattr(ErrorCode, error_code)
            if isinstance(error_info, list) and status in error_info:
                exception_class = getattr(ExceptionMapping, error_code, WebDriverException)
                break
        else:
            exception_class = WebDriverException
    
        if not value:
            value = response["value"]
        if isinstance(value, str):
            raise exception_class(value)
        if message == "" and "message" in value:
            message = value["message"]
    
        screen = None  # type: ignore[assignment]
        if "screen" in value:
            screen = value["screen"]
    
        stacktrace = None
        st_value = value.get("stackTrace") or value.get("stacktrace")
        if st_value:
            if isinstance(st_value, str):
                stacktrace = st_value.split("\n")
            else:
                stacktrace = []
                try:
                    for frame in st_value:
                        line = frame.get("lineNumber", "")
                        file = frame.get("fileName", "<anonymous>")
                        if line:
                            file = f"{file}:{line}"
                        meth = frame.get("methodName", "<anonymous>")
                        if "className" in frame:
                            meth = f"{frame['className']}.{meth}"
                        msg = "    at %s (%s)"
                        msg = msg % (meth, file)
                        stacktrace.append(msg)
                except TypeError:
                    pass
        if exception_class == UnexpectedAlertPresentException:
            alert_text = None
            if "data" in value:
                alert_text = value["data"].get("text")
            elif "alert" in value:
                alert_text = value["alert"].get("text")
            raise exception_class(message, screen, stacktrace, alert_text)  # type: ignore[call-arg]  # mypy is not smart enough here
>       raise exception_class(message, screen, stacktrace)
E       selenium.common.exceptions.DetachedShadowRootException: Message: detached shadow root: detached shadow root not found
E         (Session info: chrome=140.0.7339.80)
E       Stacktrace:
E       #0 0x56390a96e94a <unknown>
E       #1 0x56390a3e38a0 <unknown>
E       #2 0x56390a3f6eeb <unknown>
E       #3 0x56390a3f5ca2 <unknown>
E       #4 0x56390a3eadf9 <unknown>
E       #5 0x56390a3e902f <unknown>
E       #6 0x56390a3ecd78 <unknown>
E       #7 0x56390a3ece03 <unknown>
E       #8 0x56390a434f75 <unknown>
E       #9 0x56390a435761 <unknown>
E       #10 0x56390a429256 <unknown>
E       #11 0x56390a45b05d <unknown>
E       #12 0x56390a429107 <unknown>
E       #13 0x56390a45b1fe <unknown>
E       #14 0x56390a480c23 <unknown>
E       #15 0x56390a45ae03 <unknown>
E       #16 0x56390a427968 <unknown>
E       #17 0x56390a4285e1 <unknown>
E       #18 0x56390a932548 <unknown>
E       #19 0x56390a936272 <unknown>
E       #20 0x56390a919313 <unknown>
E       #21 0x56390a936dc5 <unknown>
E       #22 0x56390a8fe49f <unknown>
E       #23 0x56390a95b158 <unknown>
E       #24 0x56390a95b332 <unknown>
E       #25 0x56390a96da53 <unknown>
E       #26 0x7fcc79fc7aa4 <unknown>
E       #27 0x7fcc7a054a34 __clone

.venv/lib/python3.13.../webdriver/remote/errorhandler.py:232: DetachedShadowRootException

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

"id_token": access_token.id_token.to_jwt(self.provider),
}

if (now() - self.params.refresh_token.expires) < timedelta(hours=1):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this interval be configurable?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants