Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions providers/microsoft/azure/docs/operators/powerbi.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,33 @@ To trigger a refresh for the specified dataset from the specified workspace, use
:start-after: [START howto_operator_powerbi_refresh_async]
:end-before: [END howto_operator_powerbi_refresh_async]

.. _howto/operator:PowerBIDatasetListOperator:

PowerBIDatasetListOperator
----------------------------------

To list all available and discoverable datasets from the specified workspace, use the :class:`~airflow.providers.microsoft.azure.operators.powerbi.PowerBIDatasetListOperator`.


.. exampleinclude:: /../tests/system/microsoft/azure/example_powerbi_dataset_list.py
:language: python
:dedent: 0
:start-after: [START howto_operator_powerbi_dataset_list_async]
:end-before: [END howto_operator_powerbi_dataset_list_async]

.. _howto/operator:PowerBIWorkspaceListOperator:

PowerBIWorkspaceListOperator
----------------------------------

To list all available and discoverable workspaces for the tenant, use the :class:`~airflow.providers.microsoft.azure.operators.powerbi.PowerBIWorkspaceListOperator`.

.. exampleinclude:: /../tests/system/microsoft/azure/example_powerbi_workspace_list.py
:language: python
:dedent: 0
:start-after: [START howto_operator_powerbi_workspace_list_async]
:end-before: [END howto_operator_powerbi_workspace_list_async]

Reference
---------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,25 @@ class PowerBIDatasetRefreshException(AirflowException):
"""An exception that indicates a dataset refresh failed to complete."""


class PowerBIWorkspaceListException(AirflowException):
"""An exception that indicates a failure in getting the list of groups (workspaces)."""


class PowerBIDatasetListException(AirflowException):
"""An exception that indicates a failure in getting the list of datasets."""


class PowerBIHook(KiotaRequestAdapterHook):
"""
A async hook to interact with Power BI.

:param conn_id: The Power BI connection id.
:param conn_id: The connection Id to connect to PowerBI.
:param timeout: The HTTP timeout being used by the `KiotaRequestAdapter` (default is None).
When no timeout is specified or set to None then there is no HTTP timeout on each request.
:param proxies: A dict defining the HTTP proxies to be used (default is None).
:param api_version: The API version of the Microsoft Graph API to be used (default is v1).
You can pass an enum named APIVersion which has 2 possible members v1 and beta,
or you can pass a string as `v1.0` or `beta`.
"""

conn_type: str = "powerbi"
Expand Down Expand Up @@ -200,6 +214,40 @@ async def trigger_dataset_refresh(self, *, dataset_id: str, group_id: str) -> st
except AirflowException:
raise PowerBIDatasetRefreshException("Failed to trigger dataset refresh.")

async def get_workspace_list(self) -> list[str]:
"""
Triggers a request to get all available workspaces for the service principal.

:return: List of workspace IDs.
"""
try:
response = await self.run(url="myorg/groups", method="GET")

list_of_workspaces = response.get("value", [])

return [ws["id"] for ws in list_of_workspaces if "id" in ws]

except AirflowException:
raise PowerBIWorkspaceListException("Failed to get workspace ID list.")

async def get_dataset_list(self, *, group_id: str) -> list[str]:
"""
Triggers a request to get all datasets within a group (workspace).

:param group_id: Workspace ID.

:return: List of dataset IDs.
"""
try:
response = await self.run(url=f"myorg/groups/{group_id}/datasets", method="GET")

list_of_datasets = response.get("value", [])

return [ds["id"] for ds in list_of_datasets if "id" in ds]

except AirflowException:
raise PowerBIDatasetListException("Failed to get dataset ID list.")

async def cancel_dataset_refresh(self, dataset_id: str, group_id: str, dataset_refresh_id: str) -> None:
"""
Cancel the dataset refresh.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.hooks.powerbi import PowerBIHook
from airflow.providers.microsoft.azure.triggers.powerbi import PowerBITrigger
from airflow.providers.microsoft.azure.triggers.powerbi import (
PowerBIDatasetListTrigger,
PowerBITrigger,
PowerBIWorkspaceListTrigger,
)

if TYPE_CHECKING:
from msgraph_core import APIVersion
Expand Down Expand Up @@ -167,3 +171,141 @@ def execute_complete(self, context: Context, event: dict[str, str]) -> Any:
)
if event["status"] == "error":
raise AirflowException(event["message"])


class PowerBIWorkspaceListOperator(BaseOperator):
"""
Gets a list of workspaces where the service principal from the connection is assigned as admin.

.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:PowerBIWorkspaceListOperator`

:param conn_id: The connection Id to connect to PowerBI.
:param timeout: The HTTP timeout being used by the `KiotaRequestAdapter`. Default is 1 week (60s * 60m * 24h * 7d).
When no timeout is specified or set to None then there is no HTTP timeout on each request.
:param proxies: A dict defining the HTTP proxies to be used (default is None).
:param api_version: The API version of the Microsoft Graph API to be used (default is v1).
You can pass an enum named APIVersion which has 2 possible members v1 and beta,
or you can pass a string as `v1.0` or `beta`.
"""

def __init__(
self,
*,
conn_id: str = PowerBIHook.default_conn_name,
timeout: float = 60 * 60 * 24 * 7,
proxies: dict | None = None,
api_version: APIVersion | str | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.hook = PowerBIHook(conn_id=conn_id, proxies=proxies, api_version=api_version, timeout=timeout)
self.conn_id = conn_id
self.timeout = timeout

@property
def proxies(self) -> dict | None:
return self.hook.proxies

@property
def api_version(self) -> str | None:
return self.hook.api_version

def execute(self, context: Context):
"""List visible PowerBI Workspaces."""
self.defer(
trigger=PowerBIWorkspaceListTrigger(
conn_id=self.conn_id,
timeout=self.timeout,
proxies=self.proxies,
api_version=self.api_version,
),
method_name=self.execute_complete.__name__,
)

def execute_complete(self, context: Context, event: dict[str, str]) -> Any:
"""
Return immediately - callback for when the trigger fires.

Relies on trigger to throw an exception, otherwise it assumes execution was successful.
"""
if event:
self.xcom_push(
context=context,
key=f"{self.task_id}.powerbi_workspace_ids",
value=event["workspace_ids"],
)
if event["status"] == "error":
raise AirflowException(event["message"])


class PowerBIDatasetListOperator(BaseOperator):
"""
Gets a list of datasets where the service principal from the connection is assigned as admin.

.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:PowerBIDatasetListOperator`

:param conn_id: The connection Id to connect to PowerBI.
:param group_id: The group Id to list discoverable datasets.
:param timeout: The HTTP timeout being used by the `KiotaRequestAdapter`. Default is 1 week (60s * 60m * 24h * 7d).
When no timeout is specified or set to None then there is no HTTP timeout on each request.
:param proxies: A dict defining the HTTP proxies to be used (default is None).
:param api_version: The API version of the Microsoft Graph API to be used (default is v1).
You can pass an enum named APIVersion which has 2 possible members v1 and beta,
or you can pass a string as `v1.0` or `beta`.
"""

def __init__(
self,
*,
group_id: str,
conn_id: str = PowerBIHook.default_conn_name,
timeout: float = 60 * 60 * 24 * 7,
proxies: dict | None = None,
api_version: APIVersion | str | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.hook = PowerBIHook(conn_id=conn_id, proxies=proxies, api_version=api_version, timeout=timeout)
self.conn_id = conn_id
self.group_id = group_id
self.timeout = timeout

@property
def proxies(self) -> dict | None:
return self.hook.proxies

@property
def api_version(self) -> str | None:
return self.hook.api_version

def execute(self, context: Context):
"""List visible PowerBI datasets within group (Workspace)."""
self.defer(
trigger=PowerBIDatasetListTrigger(
conn_id=self.conn_id,
timeout=self.timeout,
proxies=self.proxies,
api_version=self.api_version,
group_id=self.group_id,
),
method_name=self.execute_complete.__name__,
)

def execute_complete(self, context: Context, event: dict[str, str]) -> Any:
"""
Return immediately - callback for when the trigger fires.

Relies on trigger to throw an exception, otherwise it assumes execution was successful.
"""
if event:
self.xcom_push(
context=context,
key=f"{self.task_id}.powerbi_dataset_ids",
value=event["dataset_ids"],
)
if event["status"] == "error":
raise AirflowException(event["message"])
Loading