@@ -519,7 +519,7 @@ def send_list_messages(
         message_creator: Callable[[str], ServiceBusMessage],
     ):
         list_messages = [message_creator(body) for body in messages]
-        sender.send_messages(list_messages)  # type: ignore[arg-type]
+        sender.send_messages(list_messages)

     @staticmethod
     def send_batch_message(
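
For context, azure-servicebus's `ServiceBusSender.send_messages` is documented to accept a single message, a list of messages, or a batch, which is why the list argument needs no cast. A minimal usage sketch, with placeholder connection string and queue name:

    from azure.servicebus import ServiceBusClient, ServiceBusMessage

    # Placeholder connection string and queue name, for illustration only.
    client = ServiceBusClient.from_connection_string("<connection-string>")
    with client.get_queue_sender("<queue-name>") as sender:
        # A plain list of ServiceBusMessage objects is an accepted argument type.
        sender.send_messages([ServiceBusMessage(body) for body in ("a", "b", "c")])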
@@ -211,7 +211,7 @@ def does_collection_exist(self, collection_name: str, database_name: str) -> bool:
             .get_database_client(self.__get_database_name(database_name))
             .query_containers(
                 "SELECT * FROM r WHERE r.id=@id",
-                parameters=[{"name": "@id", "value": collection_name}],  # type: ignore[list-item]
+                parameters=[{"name": "@id", "value": collection_name}],
             )
         )
         if not existing_container:
@@ -238,7 +238,7 @@ def create_collection(
             .get_database_client(self.__get_database_name(database_name))
             .query_containers(
                 "SELECT * FROM r WHERE r.id=@id",
-                parameters=[{"name": "@id", "value": collection_name}],  # type: ignore[list-item]
+                parameters=[{"name": "@id", "value": collection_name}],
             )
         )

@@ -259,7 +259,7 @@ def does_database_exist(self, database_name: str) -> bool:
         existing_database = list(
             self.get_conn().query_databases(
                 "SELECT * FROM r WHERE r.id=@id",
-                parameters=[{"name": "@id", "value": database_name}],  # type: ignore[list-item]
+                parameters=[{"name": "@id", "value": database_name}],
             )
         )
         if not existing_database:
@@ -279,7 +279,7 @@ def create_database(self, database_name: str) -> None:
         existing_database = list(
             self.get_conn().query_databases(
                 "SELECT * FROM r WHERE r.id=@id",
-                parameters=[{"name": "@id", "value": database_name}],  # type: ignore[list-item]
+                parameters=[{"name": "@id", "value": database_name}],
             )
         )
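
All four Cosmos hunks drop `# type: ignore[list-item]` from the same call shape: `parameters` is a plain list of name/value dicts, which the azure-cosmos query methods accept directly. A minimal sketch, with placeholder endpoint, key, and names:

    from azure.cosmos import CosmosClient

    # Placeholder endpoint, key, and names, for illustration only.
    client = CosmosClient("https://<account>.documents.azure.com:443/", credential="<key>")
    database = client.get_database_client("<database-name>")
    containers = list(
        database.query_containers(
            "SELECT * FROM r WHERE r.id=@id",
            parameters=[{"name": "@id", "value": "<collection-name>"}],
        )
    )
    print(bool(containers))  # truthy if the container exists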

@@ -338,7 +338,7 @@ def service_client(self) -> DataLakeServiceClient:
         """Return the DataLakeServiceClient object (cached)."""
         return self.get_conn()

-    def get_conn(self) -> DataLakeServiceClient:  # type: ignore[override]
+    def get_conn(self) -> DataLakeServiceClient:
         """Return the DataLakeServiceClient object."""
         conn = self.get_connection(self.conn_id)
         extra = conn.extra_dejson or {}
@@ -293,18 +293,18 @@ def get_conn(self) -> RequestAdapter:
                 proxies=proxies,
             )
             http_client = GraphClientFactory.create_with_default_middleware(
-                api_version=api_version,  # type: ignore
+                api_version=api_version,
                 client=httpx.AsyncClient(
                     mounts=httpx_proxies,
                     timeout=Timeout(timeout=self.timeout),
                     verify=verify,
                     trust_env=trust_env,
                     base_url=base_url,
                 ),
-                host=host,  # type: ignore
+                host=host,
             )
             auth_provider = AzureIdentityAuthenticationProvider(
-                credentials=credentials,  # type: ignore
+                credentials=credentials,
                 scopes=scopes,
                 allowed_hosts=allowed_hosts,
             )
@@ -360,7 +360,7 @@ def get_credentials(
         self.log.info("MSAL Proxies: %s", msal_proxies)
         if certificate_path or certificate_data:
             return CertificateCredential(
-                tenant_id=tenant_id,  # type: ignore
+                tenant_id=tenant_id,
                 client_id=login,  # type: ignore
                 password=password,
                 certificate_path=certificate_path,
@@ -371,7 +371,7 @@
                 connection_verify=verify,
             )
         return ClientSecretCredential(
-            tenant_id=tenant_id,  # type: ignore
+            tenant_id=tenant_id,
             client_id=login,  # type: ignore
             client_secret=password,  # type: ignore
             authority=authority,
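
In both credential branches only the `tenant_id` ignore is dropped, presumably because that value is now narrowed to `str` upstream; azure-identity's `ClientSecretCredential` expects string arguments for all three identifiers. A minimal sketch with placeholder values:

    from azure.identity import ClientSecretCredential

    # Placeholder tenant/client/secret values, for illustration only.
    credential = ClientSecretCredential(
        tenant_id="<tenant-id>",
        client_id="<client-id>",
        client_secret="<client-secret>",
    )
    # Request a Microsoft Graph token to confirm the credential resolves.
    token = credential.get_token("https://graph.microsoft.com/.default")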
@@ -219,7 +219,7 @@ def get_conn(self) -> BlobServiceClient:
         )

     # TODO: rework the interface as it might also return AsyncContainerClient
-    def _get_container_client(self, container_name: str) -> ContainerClient:  # type: ignore[override]
+    def _get_container_client(self, container_name: str) -> ContainerClient:
         """
         Instantiate a container client.

@@ -621,7 +621,7 @@ async def get_async_conn(self) -> AsyncBlobServiceClient:
         self.blob_service_client = AsyncBlobServiceClient(
             account_url=account_url,
             credential=token_credential,
-            **extra,  # type:ignore[arg-type]
+            **extra,
         )
         return self.blob_service_client

@@ -47,7 +47,7 @@
     from airflow.sdk import BaseOperatorLink
     from airflow.sdk.execution_time.xcom import XCom
 else:
-    from airflow.models import XCom  # type: ignore[no-redef]
+    from airflow.models import XCom
     from airflow.models.baseoperatorlink import BaseOperatorLink  # type: ignore[no-redef]


@@ -320,7 +320,7 @@ def paginate(
         if top and odata_count:
             if len(response.get("value", [])) == top and context:
                 results = operator.pull_xcom(context)
-                skip = sum([len(result["value"]) for result in results]) + top if results else top  # type: ignore
+                skip = sum([len(result["value"]) for result in results]) + top if results else top
                 query_parameters["$skip"] = skip
                 return operator.url, query_parameters
         return response.get("@odata.nextLink"), operator.query_parameters
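
The `$skip` value advances OData paging past everything fetched so far: the combined length of all pages already stored in XCom plus the page just returned. A worked sketch of the arithmetic, assuming each pulled XCom entry holds one page under a "value" key:

    # Two full pages of 100 rows already pulled via XCom, plus the current page.
    top = 100
    results = [{"value": [None] * 100}, {"value": [None] * 100}]
    skip = sum(len(result["value"]) for result in results) + top if results else top
    assert skip == 300  # the next request skips the 300 rows already seen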
@@ -50,7 +50,7 @@ class PowerBILink(BaseOperatorLink):

     def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
         url = (
-            "https://app.powerbi.com"  # type: ignore[attr-defined]
+            "https://app.powerbi.com"
             f"/groups/{operator.group_id}/datasets/{operator.dataset_id}"  # type: ignore[attr-defined]
             "/details?experience=power-bi"
         )
@@ -46,7 +46,7 @@
     from airflow.sdk import BaseOperatorLink
     from airflow.sdk.execution_time.xcom import XCom
 else:
-    from airflow.models import XCom  # type: ignore[no-redef]
+    from airflow.models import XCom
     from airflow.models.baseoperatorlink import BaseOperatorLink  # type: ignore[no-redef]


@@ -98,7 +98,7 @@ def execute(self, context: Context) -> None:

         self.log.info("Dumping Oracle query results to local file")
         conn = oracle_hook.get_conn()
-        cursor = conn.cursor()  # type: ignore[attr-defined]
+        cursor = conn.cursor()
         cursor.execute(self.sql, self.sql_params)

         with TemporaryDirectory(prefix="airflow_oracle_to_azure_op_") as temp:
@@ -108,4 +108,4 @@ def execute(self, context: Context) -> None:
             os.path.join(temp, self.filename), os.path.join(self.azure_data_lake_path, self.filename)
         )
         cursor.close()
-        conn.close()  # type: ignore[attr-defined]
+        conn.close()
@@ -42,7 +42,7 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
         BaseSensorOperator,
     )
 else:
-    from airflow.models import BaseOperator, BaseOperatorLink  # type: ignore[no-redef]
+    from airflow.models import BaseOperator, BaseOperatorLink
     from airflow.sensors.base import BaseSensorOperator  # type: ignore[no-redef]

 if AIRFLOW_V_3_1_PLUS:
@@ -52,7 +52,7 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
     from airflow.sdk import BaseHook
 else:
     from airflow.hooks.base import BaseHook  # type: ignore[attr-defined,no-redef]
-    from airflow.utils.xcom import XCOM_RETURN_KEY  # type: ignore[no-redef]
+    from airflow.utils.xcom import XCOM_RETURN_KEY

 __all__ = [
     "AIRFLOW_V_3_0_PLUS",
@@ -36,7 +36,7 @@
         task_id="workspaces",
         conn_id="powerbi",
         url="myorg/admin/workspaces/modified",
-        result_processor=lambda context, response: list(map(lambda workspace: workspace["id"], response)),  # type: ignore[typeddict-item, index]
+        result_processor=lambda context, response: list(map(lambda workspace: workspace["id"], response)),  # type: ignore[index]
     )
     # [END howto_operator_powerbi_workspaces]

@@ -213,9 +213,7 @@ def test_powerbi_link(self, create_task_instance_of_operator):
         ti.xcom_push(key="powerbi_dataset_refresh_id", value=NEW_REFRESH_REQUEST_ID)
         url = ti.task.operator_extra_links[0].get_link(operator=ti.task, ti_key=ti.key)
         EXPECTED_ITEM_RUN_OP_EXTRA_LINK = (
-            "https://app.powerbi.com"  # type: ignore[attr-defined]
-            f"/groups/{GROUP_ID}/datasets/{DATASET_ID}"  # type: ignore[attr-defined]
-            "/details?experience=power-bi"
+            f"https://app.powerbi.com/groups/{GROUP_ID}/datasets/{DATASET_ID}/details?experience=power-bi"
         )

         assert url == EXPECTED_ITEM_RUN_OP_EXTRA_LINK
@@ -40,7 +40,7 @@ def get_primary_keys(self, table: str, schema: str | None = None) -> list[str] | None:
             """,
             handler=fetch_all_handler,
         )
-        primary_keys = [pk[0] for pk in primary_keys] if primary_keys else []  # type: ignore
+        primary_keys = [pk[0] for pk in primary_keys] if primary_keys else []
         self.log.debug("Primary keys for table '%s': %s", table, primary_keys)
         return primary_keys  # type: ignore
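
`fetch_all_handler` returns rows as sequences, so `pk[0]` extracts the key-column value from each row. A tiny sketch of that reshaping, with made-up row data:

    # Rows as a fetch-all handler might return them: one tuple per key column.
    rows = [("id",), ("tenant_id",)]
    primary_keys = [pk[0] for pk in rows] if rows else []
    assert primary_keys == ["id", "tenant_id"]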

@@ -33,7 +33,7 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
 if AIRFLOW_V_3_0_PLUS:
     from airflow.sdk import BaseOperator
 else:
-    from airflow.models import BaseOperator  # type: ignore[no-redef]
+    from airflow.models import BaseOperator

 if AIRFLOW_V_3_1_PLUS:
     from airflow.sdk import BaseHook
@@ -250,14 +250,10 @@ def run(
             if ps_path is not None:
                 self.log.info("Running command as powershell script: '%s'...", command)
                 encoded_ps = b64encode(command.encode("utf_16_le")).decode("ascii")
-                command_id = winrm_client.run_command(  # type: ignore[attr-defined]
-                    shell_id, f"{ps_path} -encodedcommand {encoded_ps}"
-                )
+                command_id = winrm_client.run_command(shell_id, f"{ps_path} -encodedcommand {encoded_ps}")
             else:
                 self.log.info("Running command: '%s'...", command)
-                command_id = winrm_client.run_command(  # type: ignore[attr-defined]
-                    shell_id, command
-                )
+                command_id = winrm_client.run_command(shell_id, command)

             # See: https://github.com/diyan/pywinrm/blob/master/winrm/protocol.py
             stdout_buffer = []
@@ -271,9 +267,7 @@
                     stderr,
                     return_code,
                     command_done,
-                ) = winrm_client.get_command_output_raw(  # type: ignore[attr-defined]
-                    shell_id, command_id
-                )
+                ) = winrm_client.get_command_output_raw(shell_id, command_id)

                 # Only buffer stdout if we need to so that we minimize memory usage.
                 if return_output:
@@ -285,12 +279,10 @@
                     for line in stderr.decode(output_encoding).splitlines():
                         self.log.warning(line)

-                winrm_client.cleanup_command(  # type: ignore[attr-defined]
-                    shell_id, command_id
-                )
+                winrm_client.cleanup_command(shell_id, command_id)

             return return_code, stdout_buffer, stderr_buffer
         except Exception as e:
             raise AirflowException(f"WinRM operator error: {e}")
         finally:
-            winrm_client.close_shell(shell_id)  # type: ignore[attr-defined]
+            winrm_client.close_shell(shell_id)
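
These WinRM hunks only collapse calls that no longer need their `attr-defined` ignores onto single lines; the underlying pywinrm lifecycle is unchanged. A minimal sketch of that open/run/cleanup/close flow, with placeholder endpoint and credentials (shown with `get_command_output`; the hook above uses the raw variant):

    from winrm.protocol import Protocol

    # Placeholder endpoint and credentials, for illustration only.
    protocol = Protocol(
        endpoint="https://<host>:5986/wsman",
        username="<user>",
        password="<password>",
    )
    shell_id = protocol.open_shell()
    try:
        command_id = protocol.run_command(shell_id, "ipconfig", ["/all"])
        stdout, stderr, return_code = protocol.get_command_output(shell_id, command_id)
        protocol.cleanup_command(shell_id, command_id)
    finally:
        protocol.close_shell(shell_id)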
@@ -33,7 +33,7 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
 if AIRFLOW_V_3_0_PLUS:
     from airflow.sdk import BaseOperator
 else:
-    from airflow.models import BaseOperator  # type: ignore[no-redef]
+    from airflow.models import BaseOperator

 if AIRFLOW_V_3_1_PLUS:
     from airflow.sdk import BaseHook