Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions authentik/admin/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from django.core.cache import cache
from django.utils.translation import gettext_lazy as _
from django_dramatiq_postgres.middleware import CurrentTask
from dramatiq import actor
from packaging.version import parse
from requests import RequestException
Expand All @@ -13,7 +12,7 @@
from authentik.events.models import Event, EventAction
from authentik.lib.config import CONFIG
from authentik.lib.utils.http import get_http_session
from authentik.tasks.models import Task
from authentik.tasks.middleware import CurrentTask

LOGGER = get_logger()
VERSION_NULL = "0.0.0"
Expand All @@ -35,7 +34,7 @@ def _set_prom_info():

@actor(description=_("Update latest version info."))
def update_latest_version():
self: Task = CurrentTask.get_task()
self = CurrentTask.get_task()
if CONFIG.get_bool("disable_update_check"):
cache.set(VERSION_CACHE_KEY, VERSION_NULL, VERSION_CACHE_TIMEOUT)
self.info("Version check disabled.")
Expand Down
7 changes: 4 additions & 3 deletions authentik/blueprints/v1/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from django.utils.text import slugify
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from django_dramatiq_postgres.middleware import CurrentTask, CurrentTaskNotFound
from django_dramatiq_postgres.middleware import CurrentTaskNotFound
from dramatiq.actor import actor
from dramatiq.middleware import Middleware
from structlog.stdlib import get_logger
Expand All @@ -39,6 +39,7 @@
from authentik.events.utils import sanitize_dict
from authentik.lib.config import CONFIG
from authentik.tasks.apps import PRIORITY_HIGH
from authentik.tasks.middleware import CurrentTask
from authentik.tasks.models import Task
from authentik.tasks.schedules.models import Schedule
from authentik.tenants.models import Tenant
Expand Down Expand Up @@ -155,7 +156,7 @@ def blueprints_find() -> list[BlueprintFile]:
throws=(DatabaseError, ProgrammingError, InternalError),
)
def blueprints_discovery(path: str | None = None):
self: Task = CurrentTask.get_task()
self = CurrentTask.get_task()
count = 0
for blueprint in blueprints_find():
if path and blueprint.path != path:
Expand Down Expand Up @@ -195,7 +196,7 @@ def check_blueprint_v1_file(blueprint: BlueprintFile):
@actor(description=_("Apply single blueprint."))
def apply_blueprint(instance_pk: UUID):
try:
self: Task = CurrentTask.get_task()
self = CurrentTask.get_task()
except CurrentTaskNotFound:
self = Task()
self.set_uid(str(instance_pk))
Expand Down
7 changes: 3 additions & 4 deletions authentik/core/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from django_dramatiq_postgres.middleware import CurrentTask
from dramatiq.actor import actor
from structlog.stdlib import get_logger

Expand All @@ -15,14 +14,14 @@
User,
)
from authentik.lib.utils.db import chunked_queryset
from authentik.tasks.models import Task
from authentik.tasks.middleware import CurrentTask

LOGGER = get_logger()


@actor(description=_("Remove expired objects."))
def clean_expired_models():
self: Task = CurrentTask.get_task()
self = CurrentTask.get_task()
for cls in ExpiringModel.__subclasses__():
cls: ExpiringModel
objects = (
Expand All @@ -37,7 +36,7 @@ def clean_expired_models():

@actor(description=_("Remove temporary users created by SAML Sources."))
def clean_temporary_users():
self: Task = CurrentTask.get_task()
self = CurrentTask.get_task()
_now = datetime.now()
deleted_users = 0
for user in User.objects.filter(**{f"attributes__{USER_ATTRIBUTE_GENERATED}": True}):
Expand Down
5 changes: 2 additions & 3 deletions authentik/crypto/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.x509.base import load_pem_x509_certificate
from django.utils.translation import gettext_lazy as _
from django_dramatiq_postgres.middleware import CurrentTask
from dramatiq.actor import actor
from structlog.stdlib import get_logger

from authentik.crypto.models import CertificateKeyPair
from authentik.lib.config import CONFIG
from authentik.tasks.models import Task
from authentik.tasks.middleware import CurrentTask

LOGGER = get_logger()

Expand All @@ -38,7 +37,7 @@ def ensure_certificate_valid(body: str):

@actor(description=_("Discover, import and update certificates from the filesystem."))
def certificate_discovery():
self: Task = CurrentTask.get_task()
self = CurrentTask.get_task()
certs = {}
private_keys = {}
discovered = 0
Expand Down
7 changes: 3 additions & 4 deletions authentik/enterprise/policies/unique_password/tasks.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
from django.db.models.aggregates import Count
from django.utils.translation import gettext_lazy as _
from django_dramatiq_postgres.middleware import CurrentTask
from dramatiq.actor import actor
from structlog import get_logger

from authentik.enterprise.policies.unique_password.models import (
UniquePasswordPolicy,
UserPasswordHistory,
)
from authentik.tasks.models import Task
from authentik.tasks.middleware import CurrentTask

LOGGER = get_logger()

Expand All @@ -19,7 +18,7 @@
)
)
def check_and_purge_password_history():
self: Task = CurrentTask.get_task()
self = CurrentTask.get_task()

if not UniquePasswordPolicy.objects.exists():
UserPasswordHistory.objects.all().delete()
Expand All @@ -39,7 +38,7 @@ def trim_password_histories():
UniquePasswordPolicy policies.
"""

self: Task = CurrentTask.get_task()
self = CurrentTask.get_task()

# No policy, we'll let the cleanup above do its thing
if not UniquePasswordPolicy.objects.exists():
Expand Down
5 changes: 2 additions & 3 deletions authentik/enterprise/providers/ssf/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from django.http import HttpRequest
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from django_dramatiq_postgres.middleware import CurrentTask
from dramatiq.actor import actor
from requests.exceptions import RequestException
from structlog.stdlib import get_logger
Expand All @@ -20,7 +19,7 @@
from authentik.lib.utils.http import get_http_session
from authentik.lib.utils.time import timedelta_from_string
from authentik.policies.engine import PolicyEngine
from authentik.tasks.models import Task
from authentik.tasks.middleware import CurrentTask

session = get_http_session()
LOGGER = get_logger()
Expand Down Expand Up @@ -74,7 +73,7 @@ def _check_app_access(stream: Stream, event_data: dict) -> bool:

@actor(description=_("Send an SSF event."))
def send_ssf_event(stream_uuid: UUID, event_data: dict[str, Any]):
self: Task = CurrentTask.get_task()
self = CurrentTask.get_task()

stream = Stream.objects.filter(pk=stream_uuid).first()
if not stream:
Expand Down
7 changes: 3 additions & 4 deletions authentik/events/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from django.db.models.query_utils import Q
from django.utils.translation import gettext_lazy as _
from django_dramatiq_postgres.middleware import CurrentTask
from dramatiq.actor import actor
from guardian.shortcuts import get_anonymous_user
from structlog.stdlib import get_logger
Expand All @@ -19,7 +18,7 @@
from authentik.lib.utils.db import chunked_queryset
from authentik.policies.engine import PolicyEngine
from authentik.policies.models import PolicyBinding, PolicyEngineMode
from authentik.tasks.models import Task
from authentik.tasks.middleware import CurrentTask

LOGGER = get_logger()

Expand All @@ -38,7 +37,7 @@ def event_trigger_dispatch(event_uuid: UUID):
)
def event_trigger_handler(event_uuid: UUID, trigger_name: str):
"""Check if policies attached to NotificationRule match event"""
self: Task = CurrentTask.get_task()
self = CurrentTask.get_task()

event: Event = Event.objects.filter(event_uuid=event_uuid).first()
if not event:
Expand Down Expand Up @@ -131,7 +130,7 @@ def gdpr_cleanup(user_pk: int):
@actor(description=_("Cleanup seen notifications and notifications whose event expired."))
def notification_cleanup():
"""Cleanup seen notifications and notifications whose event expired."""
self: Task = CurrentTask.get_task()
self = CurrentTask.get_task()
notifications = Notification.objects.filter(Q(event=None) | Q(seen=True))
amount = notifications.count()
notifications.delete()
Expand Down
12 changes: 6 additions & 6 deletions authentik/lib/sync/outgoing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from django.db.models import Model, QuerySet
from django.db.models.query import Q
from django.utils.text import slugify
from django_dramatiq_postgres.middleware import CurrentTask
from dramatiq.actor import Actor
from dramatiq.composition import group
from dramatiq.errors import Retry
Expand All @@ -22,6 +21,7 @@
from authentik.lib.sync.outgoing.models import OutgoingSyncProvider
from authentik.lib.utils.errors import exception_to_dict
from authentik.lib.utils.reflection import class_to_path, path_to_class
from authentik.tasks.middleware import CurrentTask
from authentik.tasks.models import Task


Expand Down Expand Up @@ -61,7 +61,7 @@ def sync(
provider_pk: int,
sync_objects: Actor[[str, int, int, bool], None],
):
task: Task = CurrentTask.get_task()
task = CurrentTask.get_task()
self.logger = get_logger().bind(
provider_type=class_to_path(self._provider_model),
provider_pk=provider_pk,
Expand Down Expand Up @@ -118,7 +118,7 @@ def sync_objects(
override_dry_run=False,
**filter,
):
task: Task = CurrentTask.get_task()
task = CurrentTask.get_task()
_object_type: type[Model] = path_to_class(object_type)
self.logger = get_logger().bind(
provider_type=class_to_path(self._provider_model),
Expand Down Expand Up @@ -173,7 +173,7 @@ def sync_objects(
except TransientSyncException as exc:
self.logger.warning("failed to sync object", exc=exc, user=obj)
task.warning(
f"Failed to sync {str(obj)} due to " f"transient error: {str(exc)}",
f"Failed to sync {str(obj)} due to transient error: {str(exc)}",
obj=sanitize_item(obj),
exception=exception_to_dict(exc),
)
Expand Down Expand Up @@ -207,7 +207,7 @@ def sync_signal_direct(
provider_pk: int,
raw_op: str,
):
task: Task = CurrentTask.get_task()
task = CurrentTask.get_task()
self.logger = get_logger().bind(
provider_type=class_to_path(self._provider_model),
)
Expand Down Expand Up @@ -281,7 +281,7 @@ def sync_signal_m2m(
action: str,
pk_set: list[int],
):
task: Task = CurrentTask.get_task()
task = CurrentTask.get_task()
self.logger = get_logger().bind(
provider_type=class_to_path(self._provider_model),
)
Expand Down
9 changes: 4 additions & 5 deletions authentik/outposts/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from django.core.cache import cache
from django.utils.text import slugify
from django.utils.translation import gettext_lazy as _
from django_dramatiq_postgres.middleware import CurrentTask
from docker.constants import DEFAULT_UNIX_SOCKET
from dramatiq.actor import actor
from kubernetes.config.incluster_config import SERVICE_TOKEN_FILENAME
Expand Down Expand Up @@ -41,7 +40,7 @@
from authentik.providers.rac.controllers.kubernetes import RACKubernetesController
from authentik.providers.radius.controllers.docker import RadiusDockerController
from authentik.providers.radius.controllers.kubernetes import RadiusKubernetesController
from authentik.tasks.models import Task
from authentik.tasks.middleware import CurrentTask

LOGGER = get_logger()
CACHE_KEY_OUTPOST_DOWN = "goauthentik.io/outposts/teardown/%s"
Expand Down Expand Up @@ -108,7 +107,7 @@ def outpost_service_connection_monitor(connection_pk: Any):
@actor(description=_("Create/update/monitor/delete the deployment of an Outpost."))
def outpost_controller(outpost_pk: str, action: str = "up", from_cache: bool = False):
"""Create/update/monitor/delete the deployment of an Outpost"""
self: Task = CurrentTask.get_task()
self = CurrentTask.get_task()
self.set_uid(outpost_pk)
logs = []
if from_cache:
Expand Down Expand Up @@ -142,7 +141,7 @@ def outpost_token_ensurer():
"""
Periodically ensure that all Outposts have valid Service Accounts and Tokens
"""
self: Task = CurrentTask.get_task()
self = CurrentTask.get_task()
all_outposts = Outpost.objects.all()
for outpost in all_outposts:
_ = outpost.token
Expand All @@ -169,7 +168,7 @@ def outpost_send_update(pk: Any):
@actor(description=_("Checks the local environment and create Service connections."))
def outpost_connection_discovery():
"""Checks the local environment and create Service connections."""
self: Task = CurrentTask.get_task()
self = CurrentTask.get_task()
if not CONFIG.get_bool("outposts.discover"):
self.info("Outpost integration discovery is disabled")
return
Expand Down
5 changes: 2 additions & 3 deletions authentik/providers/oauth2/tasks.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
"""OAuth2 Provider Tasks"""

from django.utils.translation import gettext_lazy as _
from django_dramatiq_postgres.middleware import CurrentTask
from dramatiq.actor import actor
from structlog.stdlib import get_logger

from authentik.lib.utils.http import get_http_session
from authentik.providers.oauth2.models import OAuth2Provider
from authentik.providers.oauth2.utils import create_logout_token
from authentik.tasks.models import Task
from authentik.tasks.middleware import CurrentTask

LOGGER = get_logger()

Expand All @@ -31,7 +30,7 @@ def send_backchannel_logout_request(
Returns:
bool: True if the request was sent successfully, False otherwise
"""
self: Task = CurrentTask.get_task()
self = CurrentTask.get_task()
LOGGER.debug("Sending back-channel logout request", provider_pk=provider_pk, sub=sub)

provider = OAuth2Provider.objects.filter(pk=provider_pk).first()
Expand Down
2 changes: 1 addition & 1 deletion authentik/root/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@
{"max_retries": CONFIG.get_int("worker.task_max_retries") if not TEST else 0},
),
("dramatiq.results.middleware.Results", {"store_results": True}),
("django_dramatiq_postgres.middleware.CurrentTask", {}),
("authentik.tasks.middleware.CurrentTask", {}),
("authentik.tasks.middleware.TenantMiddleware", {}),
("authentik.tasks.middleware.RelObjMiddleware", {}),
("authentik.tasks.middleware.MessagesMiddleware", {}),
Expand Down
5 changes: 2 additions & 3 deletions authentik/sources/kerberos/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@

from django.core.cache import cache
from django.utils.translation import gettext_lazy as _
from django_dramatiq_postgres.middleware import CurrentTask
from dramatiq.actor import actor
from structlog.stdlib import get_logger

from authentik.lib.config import CONFIG
from authentik.lib.sync.outgoing.exceptions import StopSync
from authentik.sources.kerberos.models import KerberosSource
from authentik.sources.kerberos.sync import KerberosSync
from authentik.tasks.models import Task
from authentik.tasks.middleware import CurrentTask

LOGGER = get_logger()
CACHE_KEY_STATUS = "goauthentik.io/sources/kerberos/status/"
Expand All @@ -33,7 +32,7 @@ def kerberos_connectivity_check(pk: str):
description=_("Sync Kerberos source."),
)
def kerberos_sync(pk: str):
self: Task = CurrentTask.get_task()
self = CurrentTask.get_task()
source: KerberosSource = KerberosSource.objects.filter(enabled=True, pk=pk).first()
if not source:
return
Expand Down
Loading
Loading