Skip to content

Commit

Permalink
Remove some ancient deprecations (#15310)
Browse files Browse the repository at this point in the history
  • Loading branch information
cicdw authored Sep 10, 2024
1 parent 488a0e9 commit 3ffaeb9
Show file tree
Hide file tree
Showing 7 changed files with 4 additions and 274 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from . import _version
from .credentials import ( # noqa
DatabaseCredentials,
ConnectionComponents,
AsyncDriver,
SyncDriver,
Expand Down
218 changes: 3 additions & 215 deletions src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/credentials.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
"""Credential classes used to perform authenticated interactions with SQLAlchemy"""

import warnings
from enum import Enum
from typing import Any, Dict, Optional, Union
from typing import Dict, Optional, Union

from pydantic import AnyUrl, BaseModel, Field, SecretStr
from sqlalchemy.engine import Connection, create_engine
from sqlalchemy.engine.url import URL, make_url
from sqlalchemy.ext.asyncio import AsyncConnection, create_async_engine
from sqlalchemy.pool import NullPool

from prefect.blocks.core import Block
from pydantic import BaseModel, Field, SecretStr
from sqlalchemy.engine.url import URL


class AsyncDriver(Enum):
Expand Down Expand Up @@ -151,209 +145,3 @@ def create_url(self) -> URL:
if url_param is not None
}
)


class DatabaseCredentials(Block):
"""
Block used to manage authentication with a database.
Attributes:
driver: The driver name, e.g. "postgresql+asyncpg"
database: The name of the database to use.
username: The user name used to authenticate.
password: The password used to authenticate.
host: The host address of the database.
port: The port to connect to the database.
query: A dictionary of string keys to string values to be passed to
the dialect and/or the DBAPI upon connect. To specify non-string
parameters to a Python DBAPI directly, use connect_args.
url: Manually create and provide a URL to create the engine,
this is useful for external dialects, e.g. Snowflake, because some
of the params, such as "warehouse", is not directly supported in
the vanilla `sqlalchemy.engine.URL.create` method; do not provide
this alongside with other URL params as it will raise a `ValueError`.
connect_args: The options which will be passed directly to the
DBAPI's connect() method as additional keyword arguments.
Example:
Load stored database credentials:
```python
from prefect_sqlalchemy import DatabaseCredentials
database_block = DatabaseCredentials.load("BLOCK_NAME")
```
"""

_block_type_name = "Database Credentials"
_logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/fb3f4debabcda1c5a3aeea4f5b3f94c28845e23e-250x250.png" # noqa
_documentation_url = "https://prefecthq.github.io/prefect-sqlalchemy/credentials/#prefect_sqlalchemy.credentials.DatabaseCredentials" # noqa

driver: Optional[Union[AsyncDriver, SyncDriver, str]] = Field(
default=None, description="The driver name to use."
)
username: Optional[str] = Field(
default=None, description="The user name used to authenticate."
)
password: Optional[SecretStr] = Field(
default=None, description="The password used to authenticate."
)
database: Optional[str] = Field(
default=None, description="The name of the database to use."
)
host: Optional[str] = Field(
default=None, description="The host address of the database."
)
port: Optional[str] = Field(
default=None, description="The port to connect to the database."
)
query: Optional[Dict[str, str]] = Field(
default=None,
description=(
"A dictionary of string keys to string values to be passed to the dialect "
"and/or the DBAPI upon connect. To specify non-string parameters to a "
"Python DBAPI directly, use connect_args."
),
)
url: Optional[AnyUrl] = Field(
default=None,
description=(
"Manually create and provide a URL to create the engine, this is useful "
"for external dialects, e.g. Snowflake, because some of the params, "
"such as 'warehouse', is not directly supported in the vanilla "
"`sqlalchemy.engine.URL.create` method; do not provide this "
"alongside with other URL params as it will raise a `ValueError`."
),
)
connect_args: Optional[Dict[str, Any]] = Field(
default=None,
description=(
"The options which will be passed directly to the DBAPI's connect() "
"method as additional keyword arguments."
),
)

def block_initialization(self):
"""
Initializes the engine.
"""
warnings.warn(
"DatabaseCredentials is now deprecated and will be removed March 2023; "
"please use SqlAlchemyConnector instead.",
DeprecationWarning,
)
if isinstance(self.driver, AsyncDriver):
drivername = self.driver.value
self._driver_is_async = True
elif isinstance(self.driver, SyncDriver):
drivername = self.driver.value
self._driver_is_async = False
else:
drivername = self.driver
self._driver_is_async = drivername in AsyncDriver._value2member_map_

url_params = dict(
drivername=drivername,
username=self.username,
password=self.password.get_secret_value() if self.password else None,
database=self.database,
host=self.host,
port=self.port,
query=self.query,
)
if not self.url:
required_url_keys = ("drivername", "database")
if not all(url_params[key] for key in required_url_keys):
required_url_keys = ("driver", "database")
raise ValueError(
f"If the `url` is not provided, "
f"all of these URL params are required: "
f"{required_url_keys}"
)
self.rendered_url = URL.create(
**{
url_key: url_param
for url_key, url_param in url_params.items()
if url_param is not None
}
) # from params
else:
if any(val for val in url_params.values()):
raise ValueError(
f"The `url` should not be provided "
f"alongside any of these URL params: "
f"{url_params.keys()}"
)
self.rendered_url = make_url(str(self.url))

def get_engine(self) -> Union["Connection", "AsyncConnection"]:
"""
Returns an authenticated engine that can be
used to query from databases.
Returns:
The authenticated SQLAlchemy Connection / AsyncConnection.
Examples:
Create an asynchronous engine to PostgreSQL using URL params.
```python
from prefect import flow
from prefect_sqlalchemy import DatabaseCredentials, AsyncDriver
@flow
def sqlalchemy_credentials_flow():
sqlalchemy_credentials = DatabaseCredentials(
driver=AsyncDriver.POSTGRESQL_ASYNCPG,
username="prefect",
password="prefect_password",
database="postgres"
)
print(sqlalchemy_credentials.get_engine())
sqlalchemy_credentials_flow()
```
Create a synchronous engine to Snowflake using the `url` kwarg.
```python
from prefect import flow
from prefect_sqlalchemy import DatabaseCredentials, AsyncDriver
@flow
def sqlalchemy_credentials_flow():
url = (
"snowflake://<user_login_name>:<password>"
"@<account_identifier>/<database_name>"
"?warehouse=<warehouse_name>"
)
sqlalchemy_credentials = DatabaseCredentials(url=url)
print(sqlalchemy_credentials.get_engine())
sqlalchemy_credentials_flow()
```
"""
engine_kwargs = dict(
url=self.rendered_url,
connect_args=self.connect_args or {},
poolclass=NullPool,
)
if self._driver_is_async:
engine = create_async_engine(**engine_kwargs)
else:
engine = create_engine(**engine_kwargs)
return engine

class Config:
"""Configuration of pydantic."""

# Support serialization of the 'URL' type
arbitrary_types_allowed = True
json_encoders = {URL: lambda u: u.render_as_string()}

def dict(self, *args, **kwargs) -> Dict:
"""
Convert to a dictionary.
"""
# Support serialization of the 'URL' type
d = super().dict(*args, **kwargs)
d["rendered_url"] = SecretStr(
self.rendered_url.render_as_string(hide_password=False)
)
return d
17 changes: 1 addition & 16 deletions src/prefect/client/orchestration.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@
FlowRunPolicy,
Log,
Parameter,
QueueFilter,
TaskRunPolicy,
TaskRunResult,
Variable,
Expand Down Expand Up @@ -994,7 +993,6 @@ async def decrement_v1_concurrency_slots(
async def create_work_queue(
self,
name: str,
tags: Optional[List[str]] = None,
description: Optional[str] = None,
is_paused: Optional[bool] = None,
concurrency_limit: Optional[int] = None,
Expand All @@ -1006,8 +1004,6 @@ async def create_work_queue(
Args:
name: a unique name for the work queue
tags: DEPRECATED: an optional list of tags to filter on; only work scheduled with these tags
will be included in the queue. This option will be removed on 2023-02-23.
description: An optional description for the work queue.
is_paused: Whether or not the work queue is paused.
concurrency_limit: An optional concurrency limit for the work queue.
Expand All @@ -1021,18 +1017,7 @@ async def create_work_queue(
Returns:
The created work queue
"""
if tags:
warnings.warn(
(
"The use of tags for creating work queue filters is deprecated."
" This option will be removed on 2023-02-23."
),
DeprecationWarning,
)
filter = QueueFilter(tags=tags)
else:
filter = None
create_model = WorkQueueCreate(name=name, filter=filter)
create_model = WorkQueueCreate(name=name, filter=None)
if description is not None:
create_model.description = description
if is_paused is not None:
Expand Down
2 changes: 0 additions & 2 deletions src/prefect/logging/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,6 @@ def emit(self, record: logging.LogRecord):
return # Respect the global settings toggle
if not getattr(record, "send_to_api", True):
return # Do not send records that have opted out
if not getattr(record, "send_to_orion", True):
return # Backwards compatibility

log = self.prepare(record)
APILogWorker.instance().send(log)
Expand Down
18 changes: 0 additions & 18 deletions src/prefect/logging/loggers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import io
import logging
import sys
import warnings
from builtins import print
from contextlib import contextmanager
from functools import lru_cache
Expand Down Expand Up @@ -34,23 +33,6 @@ class PrefectLogAdapter(logging.LoggerAdapter):

def process(self, msg, kwargs):
kwargs["extra"] = {**(self.extra or {}), **(kwargs.get("extra") or {})}

from prefect._internal.compatibility.deprecated import (
PrefectDeprecationWarning,
generate_deprecation_message,
)

if "send_to_orion" in kwargs["extra"]:
warnings.warn(
generate_deprecation_message(
'The "send_to_orion" option',
start_date="May 2023",
help='Use "send_to_api" instead.',
),
PrefectDeprecationWarning,
stacklevel=4,
)

return (msg, kwargs)

def getChild(
Expand Down
4 changes: 0 additions & 4 deletions tests/client/test_prefect_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1701,10 +1701,6 @@ async def test_read_nonexistant_work_queue(self, prefect_client):
with pytest.raises(prefect.exceptions.ObjectNotFound):
await prefect_client.read_work_queue_by_name("foo")

async def test_create_work_queue_with_tags_deprecated(self, prefect_client):
with pytest.deprecated_call():
await prefect_client.create_work_queue(name="test-queue", tags=["a"])

async def test_get_runs_from_queue_includes(self, prefect_client, deployment):
wq_1 = await prefect_client.read_work_queue_by_name(name="wq")
wq_2 = await prefect_client.create_work_queue(name="wq2")
Expand Down
18 changes: 0 additions & 18 deletions tests/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import prefect.logging.configuration
import prefect.settings
from prefect import flow, task
from prefect._internal.compatibility.deprecated import PrefectDeprecationWarning
from prefect._internal.concurrency.api import create_call, from_sync
from prefect.context import FlowRunContext, TaskRunContext
from prefect.exceptions import MissingContextError
Expand Down Expand Up @@ -423,23 +422,6 @@ def test_does_not_send_logs_that_opt_out(self, logger, mock_log_worker, task_run

mock_log_worker.instance().send.assert_not_called()

def test_does_not_send_logs_that_opt_out_deprecated(
self, logger, mock_log_worker, task_run
):
with TaskRunContext.model_construct(task_run=task_run):
with pytest.warns(
PrefectDeprecationWarning,
match=(
'The "send_to_orion" option has been deprecated. It will not be'
' available in new releases after Nov 2023. Use "send_to_api" instead.'
),
):
PrefectLogAdapter(logger, extra={}).info(
"test", extra={"send_to_orion": False}
)

mock_log_worker.instance().send.assert_not_called()

def test_does_not_send_logs_when_handler_is_disabled(
self, logger, mock_log_worker, task_run
):
Expand Down

0 comments on commit 3ffaeb9

Please sign in to comment.