diff --git a/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/__init__.py b/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/__init__.py index 6924af4c2666..6b67612fda7f 100644 --- a/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/__init__.py +++ b/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/__init__.py @@ -1,6 +1,5 @@ from . import _version from .credentials import ( # noqa - DatabaseCredentials, ConnectionComponents, AsyncDriver, SyncDriver, diff --git a/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/credentials.py b/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/credentials.py index 0dc0291cbd34..9080bcb8cf2c 100644 --- a/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/credentials.py +++ b/src/integrations/prefect-sqlalchemy/prefect_sqlalchemy/credentials.py @@ -1,16 +1,10 @@ """Credential classes used to perform authenticated interactions with SQLAlchemy""" -import warnings from enum import Enum -from typing import Any, Dict, Optional, Union +from typing import Dict, Optional, Union -from pydantic import AnyUrl, BaseModel, Field, SecretStr -from sqlalchemy.engine import Connection, create_engine -from sqlalchemy.engine.url import URL, make_url -from sqlalchemy.ext.asyncio import AsyncConnection, create_async_engine -from sqlalchemy.pool import NullPool - -from prefect.blocks.core import Block +from pydantic import BaseModel, Field, SecretStr +from sqlalchemy.engine.url import URL class AsyncDriver(Enum): @@ -151,209 +145,3 @@ def create_url(self) -> URL: if url_param is not None } ) - - -class DatabaseCredentials(Block): - """ - Block used to manage authentication with a database. - - Attributes: - driver: The driver name, e.g. "postgresql+asyncpg" - database: The name of the database to use. - username: The user name used to authenticate. - password: The password used to authenticate. - host: The host address of the database. - port: The port to connect to the database. - query: A dictionary of string keys to string values to be passed to - the dialect and/or the DBAPI upon connect. To specify non-string - parameters to a Python DBAPI directly, use connect_args. - url: Manually create and provide a URL to create the engine, - this is useful for external dialects, e.g. Snowflake, because some - of the params, such as "warehouse", is not directly supported in - the vanilla `sqlalchemy.engine.URL.create` method; do not provide - this alongside with other URL params as it will raise a `ValueError`. - connect_args: The options which will be passed directly to the - DBAPI's connect() method as additional keyword arguments. - - Example: - Load stored database credentials: - ```python - from prefect_sqlalchemy import DatabaseCredentials - database_block = DatabaseCredentials.load("BLOCK_NAME") - ``` - """ - - _block_type_name = "Database Credentials" - _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/fb3f4debabcda1c5a3aeea4f5b3f94c28845e23e-250x250.png" # noqa - _documentation_url = "https://prefecthq.github.io/prefect-sqlalchemy/credentials/#prefect_sqlalchemy.credentials.DatabaseCredentials" # noqa - - driver: Optional[Union[AsyncDriver, SyncDriver, str]] = Field( - default=None, description="The driver name to use." - ) - username: Optional[str] = Field( - default=None, description="The user name used to authenticate." - ) - password: Optional[SecretStr] = Field( - default=None, description="The password used to authenticate." - ) - database: Optional[str] = Field( - default=None, description="The name of the database to use." - ) - host: Optional[str] = Field( - default=None, description="The host address of the database." - ) - port: Optional[str] = Field( - default=None, description="The port to connect to the database." - ) - query: Optional[Dict[str, str]] = Field( - default=None, - description=( - "A dictionary of string keys to string values to be passed to the dialect " - "and/or the DBAPI upon connect. To specify non-string parameters to a " - "Python DBAPI directly, use connect_args." - ), - ) - url: Optional[AnyUrl] = Field( - default=None, - description=( - "Manually create and provide a URL to create the engine, this is useful " - "for external dialects, e.g. Snowflake, because some of the params, " - "such as 'warehouse', is not directly supported in the vanilla " - "`sqlalchemy.engine.URL.create` method; do not provide this " - "alongside with other URL params as it will raise a `ValueError`." - ), - ) - connect_args: Optional[Dict[str, Any]] = Field( - default=None, - description=( - "The options which will be passed directly to the DBAPI's connect() " - "method as additional keyword arguments." - ), - ) - - def block_initialization(self): - """ - Initializes the engine. - """ - warnings.warn( - "DatabaseCredentials is now deprecated and will be removed March 2023; " - "please use SqlAlchemyConnector instead.", - DeprecationWarning, - ) - if isinstance(self.driver, AsyncDriver): - drivername = self.driver.value - self._driver_is_async = True - elif isinstance(self.driver, SyncDriver): - drivername = self.driver.value - self._driver_is_async = False - else: - drivername = self.driver - self._driver_is_async = drivername in AsyncDriver._value2member_map_ - - url_params = dict( - drivername=drivername, - username=self.username, - password=self.password.get_secret_value() if self.password else None, - database=self.database, - host=self.host, - port=self.port, - query=self.query, - ) - if not self.url: - required_url_keys = ("drivername", "database") - if not all(url_params[key] for key in required_url_keys): - required_url_keys = ("driver", "database") - raise ValueError( - f"If the `url` is not provided, " - f"all of these URL params are required: " - f"{required_url_keys}" - ) - self.rendered_url = URL.create( - **{ - url_key: url_param - for url_key, url_param in url_params.items() - if url_param is not None - } - ) # from params - else: - if any(val for val in url_params.values()): - raise ValueError( - f"The `url` should not be provided " - f"alongside any of these URL params: " - f"{url_params.keys()}" - ) - self.rendered_url = make_url(str(self.url)) - - def get_engine(self) -> Union["Connection", "AsyncConnection"]: - """ - Returns an authenticated engine that can be - used to query from databases. - - Returns: - The authenticated SQLAlchemy Connection / AsyncConnection. - - Examples: - Create an asynchronous engine to PostgreSQL using URL params. - ```python - from prefect import flow - from prefect_sqlalchemy import DatabaseCredentials, AsyncDriver - - @flow - def sqlalchemy_credentials_flow(): - sqlalchemy_credentials = DatabaseCredentials( - driver=AsyncDriver.POSTGRESQL_ASYNCPG, - username="prefect", - password="prefect_password", - database="postgres" - ) - print(sqlalchemy_credentials.get_engine()) - - sqlalchemy_credentials_flow() - ``` - - Create a synchronous engine to Snowflake using the `url` kwarg. - ```python - from prefect import flow - from prefect_sqlalchemy import DatabaseCredentials, AsyncDriver - - @flow - def sqlalchemy_credentials_flow(): - url = ( - "snowflake://:" - "@/" - "?warehouse=" - ) - sqlalchemy_credentials = DatabaseCredentials(url=url) - print(sqlalchemy_credentials.get_engine()) - - sqlalchemy_credentials_flow() - ``` - """ - engine_kwargs = dict( - url=self.rendered_url, - connect_args=self.connect_args or {}, - poolclass=NullPool, - ) - if self._driver_is_async: - engine = create_async_engine(**engine_kwargs) - else: - engine = create_engine(**engine_kwargs) - return engine - - class Config: - """Configuration of pydantic.""" - - # Support serialization of the 'URL' type - arbitrary_types_allowed = True - json_encoders = {URL: lambda u: u.render_as_string()} - - def dict(self, *args, **kwargs) -> Dict: - """ - Convert to a dictionary. - """ - # Support serialization of the 'URL' type - d = super().dict(*args, **kwargs) - d["rendered_url"] = SecretStr( - self.rendered_url.render_as_string(hide_password=False) - ) - return d diff --git a/src/prefect/client/orchestration.py b/src/prefect/client/orchestration.py index 769a6b097672..635fcd72f4ce 100644 --- a/src/prefect/client/orchestration.py +++ b/src/prefect/client/orchestration.py @@ -94,7 +94,6 @@ FlowRunPolicy, Log, Parameter, - QueueFilter, TaskRunPolicy, TaskRunResult, Variable, @@ -994,7 +993,6 @@ async def decrement_v1_concurrency_slots( async def create_work_queue( self, name: str, - tags: Optional[List[str]] = None, description: Optional[str] = None, is_paused: Optional[bool] = None, concurrency_limit: Optional[int] = None, @@ -1006,8 +1004,6 @@ async def create_work_queue( Args: name: a unique name for the work queue - tags: DEPRECATED: an optional list of tags to filter on; only work scheduled with these tags - will be included in the queue. This option will be removed on 2023-02-23. description: An optional description for the work queue. is_paused: Whether or not the work queue is paused. concurrency_limit: An optional concurrency limit for the work queue. @@ -1021,18 +1017,7 @@ async def create_work_queue( Returns: The created work queue """ - if tags: - warnings.warn( - ( - "The use of tags for creating work queue filters is deprecated." - " This option will be removed on 2023-02-23." - ), - DeprecationWarning, - ) - filter = QueueFilter(tags=tags) - else: - filter = None - create_model = WorkQueueCreate(name=name, filter=filter) + create_model = WorkQueueCreate(name=name, filter=None) if description is not None: create_model.description = description if is_paused is not None: diff --git a/src/prefect/logging/handlers.py b/src/prefect/logging/handlers.py index 53093ac1d49a..cd1997af8bc3 100644 --- a/src/prefect/logging/handlers.py +++ b/src/prefect/logging/handlers.py @@ -138,8 +138,6 @@ def emit(self, record: logging.LogRecord): return # Respect the global settings toggle if not getattr(record, "send_to_api", True): return # Do not send records that have opted out - if not getattr(record, "send_to_orion", True): - return # Backwards compatibility log = self.prepare(record) APILogWorker.instance().send(log) diff --git a/src/prefect/logging/loggers.py b/src/prefect/logging/loggers.py index b5adab6a5d59..724574402860 100644 --- a/src/prefect/logging/loggers.py +++ b/src/prefect/logging/loggers.py @@ -1,7 +1,6 @@ import io import logging import sys -import warnings from builtins import print from contextlib import contextmanager from functools import lru_cache @@ -34,23 +33,6 @@ class PrefectLogAdapter(logging.LoggerAdapter): def process(self, msg, kwargs): kwargs["extra"] = {**(self.extra or {}), **(kwargs.get("extra") or {})} - - from prefect._internal.compatibility.deprecated import ( - PrefectDeprecationWarning, - generate_deprecation_message, - ) - - if "send_to_orion" in kwargs["extra"]: - warnings.warn( - generate_deprecation_message( - 'The "send_to_orion" option', - start_date="May 2023", - help='Use "send_to_api" instead.', - ), - PrefectDeprecationWarning, - stacklevel=4, - ) - return (msg, kwargs) def getChild( diff --git a/tests/client/test_prefect_client.py b/tests/client/test_prefect_client.py index 93592fadaf2a..531484e6b178 100644 --- a/tests/client/test_prefect_client.py +++ b/tests/client/test_prefect_client.py @@ -1701,10 +1701,6 @@ async def test_read_nonexistant_work_queue(self, prefect_client): with pytest.raises(prefect.exceptions.ObjectNotFound): await prefect_client.read_work_queue_by_name("foo") - async def test_create_work_queue_with_tags_deprecated(self, prefect_client): - with pytest.deprecated_call(): - await prefect_client.create_work_queue(name="test-queue", tags=["a"]) - async def test_get_runs_from_queue_includes(self, prefect_client, deployment): wq_1 = await prefect_client.read_work_queue_by_name(name="wq") wq_2 = await prefect_client.create_work_queue(name="wq2") diff --git a/tests/test_logging.py b/tests/test_logging.py index ac7ff4088755..5041c865d26c 100644 --- a/tests/test_logging.py +++ b/tests/test_logging.py @@ -19,7 +19,6 @@ import prefect.logging.configuration import prefect.settings from prefect import flow, task -from prefect._internal.compatibility.deprecated import PrefectDeprecationWarning from prefect._internal.concurrency.api import create_call, from_sync from prefect.context import FlowRunContext, TaskRunContext from prefect.exceptions import MissingContextError @@ -423,23 +422,6 @@ def test_does_not_send_logs_that_opt_out(self, logger, mock_log_worker, task_run mock_log_worker.instance().send.assert_not_called() - def test_does_not_send_logs_that_opt_out_deprecated( - self, logger, mock_log_worker, task_run - ): - with TaskRunContext.model_construct(task_run=task_run): - with pytest.warns( - PrefectDeprecationWarning, - match=( - 'The "send_to_orion" option has been deprecated. It will not be' - ' available in new releases after Nov 2023. Use "send_to_api" instead.' - ), - ): - PrefectLogAdapter(logger, extra={}).info( - "test", extra={"send_to_orion": False} - ) - - mock_log_worker.instance().send.assert_not_called() - def test_does_not_send_logs_when_handler_is_disabled( self, logger, mock_log_worker, task_run ):