Skip to content

Commit

Permalink
chore(key-value): convert command to dao (#29344)
Browse files Browse the repository at this point in the history
  • Loading branch information
villebro authored and eschutho committed Jul 24, 2024
1 parent f9ca394 commit a77d4e7
Show file tree
Hide file tree
Showing 36 changed files with 868 additions and 1,163 deletions.
12 changes: 7 additions & 5 deletions superset/commands/dashboard/permalink/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@

from sqlalchemy.exc import SQLAlchemyError

from superset import db
from superset.commands.dashboard.permalink.base import BaseDashboardPermalinkCommand
from superset.commands.key_value.upsert import UpsertKeyValueCommand
from superset.daos.dashboard import DashboardDAO
from superset.daos.key_value import KeyValueDAO
from superset.dashboards.permalink.exceptions import DashboardPermalinkCreateFailedError
from superset.dashboards.permalink.types import DashboardPermalinkState
from superset.key_value.exceptions import (
Expand Down Expand Up @@ -70,14 +71,15 @@ def run(self) -> str:
"state": self.state,
}
user_id = get_user_id()
key = UpsertKeyValueCommand(
entry = KeyValueDAO.upsert_entry(
resource=self.resource,
key=get_deterministic_uuid(self.salt, (user_id, value)),
value=value,
codec=self.codec,
).run()
assert key.id # for type checks
return encode_permalink_key(key=key.id, salt=self.salt)
)
db.session.flush()
assert entry.id # for type checks
return encode_permalink_key(key=entry.id, salt=self.salt)

def validate(self) -> None:
pass
9 changes: 2 additions & 7 deletions superset/commands/dashboard/permalink/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

from superset.commands.dashboard.exceptions import DashboardNotFoundError
from superset.commands.dashboard.permalink.base import BaseDashboardPermalinkCommand
from superset.commands.key_value.get import GetKeyValueCommand
from superset.daos.dashboard import DashboardDAO
from superset.daos.key_value import KeyValueDAO
from superset.dashboards.permalink.exceptions import DashboardPermalinkGetFailedError
from superset.dashboards.permalink.types import DashboardPermalinkValue
from superset.key_value.exceptions import (
Expand All @@ -43,12 +43,7 @@ def run(self) -> Optional[DashboardPermalinkValue]:
self.validate()
try:
key = decode_permalink_id(self.key, salt=self.salt)
command = GetKeyValueCommand(
resource=self.resource,
key=key,
codec=self.codec,
)
value: Optional[DashboardPermalinkValue] = command.run()
value = KeyValueDAO.get_value(self.resource, key, self.codec)
if value:
DashboardDAO.get_by_id_or_slug(value["dashboardId"])
return value
Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,28 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import logging
import uuid
from typing import Any

from flask import current_app

from superset.commands.base import BaseCommand
from superset.distributed_lock.utils import get_key
from superset.key_value.types import JsonKeyValueCodec, KeyValueResource

logger = logging.getLogger(__name__)
stats_logger = current_app.config["STATS_LOGGER"]


class BaseDistributedLockCommand(BaseCommand):
key: uuid.UUID
codec = JsonKeyValueCodec()
resource = KeyValueResource.LOCK

def __init__(self, namespace: str, params: dict[str, Any] | None = None):
self.key = get_key(namespace, **(params or {}))

def validate(self) -> None:
pass
64 changes: 64 additions & 0 deletions superset/commands/distributed_lock/create.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import logging
from datetime import datetime, timedelta
from functools import partial

from flask import current_app
from sqlalchemy.exc import SQLAlchemyError

from superset.commands.distributed_lock.base import BaseDistributedLockCommand
from superset.daos.key_value import KeyValueDAO
from superset.exceptions import CreateKeyValueDistributedLockFailedException
from superset.key_value.exceptions import (
KeyValueCodecEncodeException,
KeyValueUpsertFailedError,
)
from superset.key_value.types import KeyValueResource
from superset.utils.decorators import on_error, transaction

logger = logging.getLogger(__name__)
stats_logger = current_app.config["STATS_LOGGER"]


class CreateDistributedLock(BaseDistributedLockCommand):
lock_expiration = timedelta(seconds=30)

def validate(self) -> None:
pass

@transaction(
on_error=partial(
on_error,
catches=(
KeyValueCodecEncodeException,
KeyValueUpsertFailedError,
SQLAlchemyError,
),
reraise=CreateKeyValueDistributedLockFailedException,
),
)
def run(self) -> None:
KeyValueDAO.delete_expired_entries(self.resource)
KeyValueDAO.create_entry(
resource=KeyValueResource.LOCK,
value={"value": True},
codec=self.codec,
key=self.key,
expires_on=datetime.now() + self.lock_expiration,
)
Original file line number Diff line number Diff line change
Expand Up @@ -14,49 +14,36 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import logging
from datetime import datetime
from functools import partial

from sqlalchemy import and_
from flask import current_app
from sqlalchemy.exc import SQLAlchemyError

from superset import db
from superset.commands.base import BaseCommand
from superset.commands.distributed_lock.base import BaseDistributedLockCommand
from superset.daos.key_value import KeyValueDAO
from superset.exceptions import DeleteKeyValueDistributedLockFailedException
from superset.key_value.exceptions import KeyValueDeleteFailedError
from superset.key_value.models import KeyValueEntry
from superset.key_value.types import KeyValueResource
from superset.utils.decorators import on_error, transaction

logger = logging.getLogger(__name__)
stats_logger = current_app.config["STATS_LOGGER"]


class DeleteExpiredKeyValueCommand(BaseCommand):
resource: KeyValueResource

def __init__(self, resource: KeyValueResource):
"""
Delete all expired key-value pairs
:param resource: the resource (dashboard, chart etc)
:return: was the entry deleted or not
"""
self.resource = resource

@transaction(on_error=partial(on_error, reraise=KeyValueDeleteFailedError))
def run(self) -> None:
self.delete_expired()

class DeleteDistributedLock(BaseDistributedLockCommand):
def validate(self) -> None:
pass

def delete_expired(self) -> None:
(
db.session.query(KeyValueEntry)
.filter(
and_(
KeyValueEntry.resource == self.resource.value,
KeyValueEntry.expires_on <= datetime.now(),
)
)
.delete()
)
@transaction(
on_error=partial(
on_error,
catches=(
KeyValueDeleteFailedError,
SQLAlchemyError,
),
reraise=DeleteKeyValueDistributedLockFailedException,
),
)
def run(self) -> None:
KeyValueDAO.delete_entry(self.resource, self.key)
45 changes: 45 additions & 0 deletions superset/commands/distributed_lock/get.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import logging
from typing import cast

from flask import current_app

from superset.commands.distributed_lock.base import BaseDistributedLockCommand
from superset.daos.key_value import KeyValueDAO
from superset.distributed_lock.types import LockValue

logger = logging.getLogger(__name__)
stats_logger = current_app.config["STATS_LOGGER"]


class GetDistributedLock(BaseDistributedLockCommand):
def validate(self) -> None:
pass

def run(self) -> LockValue | None:
entry = KeyValueDAO.get_entry(
resource=self.resource,
key=self.key,
)
if not entry or entry.is_expired():
return None

return cast(LockValue, self.codec.decode(entry.value))
16 changes: 7 additions & 9 deletions superset/commands/explore/permalink/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@

from sqlalchemy.exc import SQLAlchemyError

from superset import db
from superset.commands.explore.permalink.base import BaseExplorePermalinkCommand
from superset.commands.key_value.create import CreateKeyValueCommand
from superset.daos.key_value import KeyValueDAO
from superset.explore.permalink.exceptions import ExplorePermalinkCreateFailedError
from superset.explore.utils import check_access as check_chart_access
from superset.key_value.exceptions import (
Expand Down Expand Up @@ -65,15 +66,12 @@ def run(self) -> str:
"datasource": self.datasource,
"state": self.state,
}
command = CreateKeyValueCommand(
resource=self.resource,
value=value,
codec=self.codec,
)
key = command.run()
if key.id is None:
entry = KeyValueDAO.create_entry(self.resource, value, self.codec)
db.session.flush()
key = entry.id
if key is None:
raise ExplorePermalinkCreateFailedError("Unexpected missing key id")
return encode_permalink_key(key=key.id, salt=self.salt)
return encode_permalink_key(key=key, salt=self.salt)

def validate(self) -> None:
pass
8 changes: 2 additions & 6 deletions superset/commands/explore/permalink/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from superset.commands.dataset.exceptions import DatasetNotFoundError
from superset.commands.explore.permalink.base import BaseExplorePermalinkCommand
from superset.commands.key_value.get import GetKeyValueCommand
from superset.daos.key_value import KeyValueDAO
from superset.explore.permalink.exceptions import ExplorePermalinkGetFailedError
from superset.explore.permalink.types import ExplorePermalinkValue
from superset.explore.utils import check_access as check_chart_access
Expand All @@ -44,11 +44,7 @@ def run(self) -> Optional[ExplorePermalinkValue]:
self.validate()
try:
key = decode_permalink_id(self.key, salt=self.salt)
value: Optional[ExplorePermalinkValue] = GetKeyValueCommand(
resource=self.resource,
key=key,
codec=self.codec,
).run()
value = KeyValueDAO.get_value(self.resource, key, self.codec)
if value:
chart_id: Optional[int] = value.get("chartId")
# keep this backward compatible for old permalinks
Expand Down
Loading

0 comments on commit a77d4e7

Please sign in to comment.