feat: PowerBI - get and show the refresh history list
RadekBuczkowski committed Apr 19, 2024
1 parent 91cc183 commit 007f9e4
Showing 3 changed files with 319 additions and 59 deletions.
72 changes: 62 additions & 10 deletions docs/power_bi/README.md
@@ -118,8 +118,8 @@ The number of minutes can be specified in the optional
"max_minutes_after_last_refresh" parameter (default is 12 hours).

You can also specify the optional "local_timezone_name" parameter to show
the last refresh time of the PowerBI dataset in a local time zone.
It is only used for printing timestamps. The default time zone is UTC.

All parameters can only be specified in the constructor.
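
As an illustration, here is a minimal sketch combining these two parameters.
The check() call and the MyPowerBiClient class are assumed from the
surrounding steps, which are collapsed in this diff:

```python
# Sketch only: check() and MyPowerBiClient are assumed from the collapsed
# surrounding steps of this README, not shown in this diff hunk.
from spetlr.power_bi.PowerBi import PowerBi

client = MyPowerBiClient()
PowerBi(client,
        workspace_name="Finance",
        dataset_name="Invoicing",
        max_minutes_after_last_refresh=24 * 60,
        local_timezone_name="Europe/Copenhagen").check()
```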

@@ -162,7 +162,10 @@ if the refresh succeeded.

If you want to refresh only selected tables in the dataset, you can
specify the optional "table_names" parameter with a list of table names.
If the list is not empty, only the selected tables will be refreshed.
(Note: the PowerBI API does not provide a way to list the available tables
programmatically, as it does for workspaces and datasets.
You have to check the table names visually in PowerBI.)

All parameters can only be specified in the constructor.
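
For illustration, a sketch of triggering a refresh of selected tables only.
The start_refresh() method name and the table names are assumptions; the
method used in this step is collapsed in this diff:

```python
# Sketch only: start_refresh() and the table names below are assumptions,
# not taken from this diff.
from spetlr.power_bi.PowerBi import PowerBi

client = MyPowerBiClient()
PowerBi(client,
        workspace_name="Finance",
        dataset_name="Invoicing",
        table_names=["Customers", "Invoices"]).start_refresh()
```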

@@ -198,13 +201,13 @@ parameter (default is 15 minutes).
If the refresh fails or a time-out occurs, the method raises an exception.

The wait time between calls to the PowerBI API is synchronized with the
average execution time of previous dataset refreshes triggered via the API
(only refreshes of all tables are counted), so that as few requests as
possible are sent to the PowerBI API while the method still finishes as
soon as possible.

If you want to refresh only selected tables in the dataset, you can
specify the optional "table_names" parameter with a list of table names.
If the list is not empty, only the selected tables will be refreshed
(and the previous refresh time will be ignored).

Additionally, you can set the optional "number_of_retries" parameter to
@@ -213,9 +216,9 @@ Default is 0 (no retries). E.g. 1 means two attempts in total.
It is used only when the "timeout_in_seconds" parameter allows it,
so you need to set the "timeout_in_seconds" parameter high enough.

You can also specify the optional "local_timezone_name" parameter to
show the last refresh time of the PowerBI dataset in a local time zone.
It is only used for printing timestamps. The default time zone is UTC.

All parameters can only be specified in the constructor.

@@ -249,6 +252,55 @@ Refresh completed successfully at 2024-02-02 09:02 (local time).
True
```
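
A sketch of a synchronous refresh that also uses the timeout and retry
parameters described above (the parameter values are illustrative only):

```python
# Sketch only: parameter values are illustrative. refresh() blocks until the
# refresh completes, fails, or the timeout is reached.
from spetlr.power_bi.PowerBi import PowerBi

client = MyPowerBiClient()
PowerBi(client,
        workspace_name="Finance",
        dataset_name="Invoicing",
        timeout_in_seconds=20 * 60,
        number_of_retries=1,
        local_timezone_name="Europe/Copenhagen").refresh()
```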

## Step 7: Show and get the refresh history of a given dataset

The show_history() and get_history() methods can be used to inspect the
refresh history of a given dataset. The show_history() method displays the
refresh history as a Pandas data frame, and the get_history() method
returns the same history converted to a Spark data frame.

According to MSDN, "there are always between 20–60 available refresh history
entries for each dataset, depending on the number of refreshes in the last 3 days.
The most recent 60 are kept if they are all less than 3 days old. Entries
more than 3 days old are deleted when there are more than 20 entries."

You can also specify the optional "local_timezone_name" parameter to convert
the refresh times in the data frame to a local time zone. Depending on this
parameter, the names of the time columns in the data frame will have the
suffix "Utc" or "Local".

All of the above parameters can only be specified in the constructor.

```python
# example show and get refresh history
from spetlr.power_bi.PowerBi import PowerBi

client = MyPowerBiClient()
PowerBi(client,
workspace_name="Finance",
dataset_name="Invoicing",
local_timezone_name="Europe/Copenhagen").show_history()

# alternatively:
df = PowerBi(client,
workspace_id="614850c2-3a5c-4d2d-bcaa-d3f20f32a2e0",
dataset_id="b1f0a07e-e348-402c-a2b2-11f3e31181ce",
local_timezone_name="Europe/Copenhagen").get_history()

df.display()

```

# Testing

Due to license restrictions, testing requires a valid PowerBI license.
187 changes: 149 additions & 38 deletions src/spetlr/power_bi/PowerBi.py
@@ -5,11 +5,12 @@
import msal
import pandas as pd
import requests
from pyspark.sql import DataFrame
from pytz import timezone, utc

from spetlr.exceptions import SpetlrException
from spetlr.power_bi.PowerBiClient import PowerBiClient
from spetlr.spark import Spark


class PowerBi:
@@ -95,6 +96,10 @@ def __init__(
self.local_timezone_name = (
local_timezone_name if local_timezone_name is not None else "UTC"
)
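# Precompute the suffix used in history column names and the suffix used
# when printing timestamps, depending on whether the time zone is UTC.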
self.is_utc = self.local_timezone_name.upper() == "UTC"
self.time_column_suffix = "Utc" if self.is_utc else "Local"
self.time_description_suffix = " (UTC)" if self.is_utc else " (local time)"

self.ignore_errors = ignore_errors
self.api_header = None
self.expire_time = 0
@@ -266,71 +271,148 @@ def _connect(self) -> bool:

return True

def _get_refresh_history(
self, newest_only: bool = False
) -> Union[pd.DataFrame, None]:
"""
Return the PowerBI dataset refresh history.
:param bool newest_only: Limit the result to only the latest history row.
:return: data frame with the refresh history if succeeded,
or None if failed (when ignore_errors==True)
:rtype: Pandas data frame
:raises SpetlrException: if failed and ignore_errors==False
"""

if not self._connect():
return None

self.last_status = None
self.last_exception = None
self.last_refresh_utc = None

api_url = (
f"{self.powerbi_url}groups/{self.workspace_id}"
f"/datasets/{self.dataset_id}/refreshes"
)
if newest_only:
# Note: we fetch only the latest refresh record, i.e. top=1
api_url = api_url + "?$top=1"

api_call = requests.get(url=api_url, headers=self.api_header)
if api_call.status_code == 200:
json = api_call.json()
df = pd.DataFrame(
json["value"],
columns=[
"requestId",
"id",
"refreshType",
"status",
"startTime",
"endTime",
"status",
"serviceExceptionJson",
"requestId",
"refreshAttempts",
],
)

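# Spark schema matching the renamed history columns below; get_history()
# uses it when creating a Spark data frame from an empty history.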
self.schema = (
"Id long, RefreshType string, Status string, Seconds long, "
f"StartTime{self.time_column_suffix} timestamp, EndTime{self.time_column_suffix} timestamp, "
"Error string, RequestId string, RefreshAttempts string"
)

if df.empty:
return pd.DataFrame(
{
"Id": pd.Series(dtype="int64"),
"RefreshType": pd.Series(dtype="object"),
"Status": pd.Series(dtype="object"),
"Seconds": pd.Series(dtype="int64"),
("StartTime" + self.time_column_suffix): pd.Series(
dtype="datetime64[ns]"
),
("EndTime" + self.time_column_suffix): pd.Series(
dtype="datetime64[ns]"
),
"Error": pd.Series(dtype="object"),
"RequestId": pd.Series(dtype="object"),
"RefreshAttempts": pd.Series(dtype="object"),
}
)

df.set_index("id")
df["startTime"] = pd.to_datetime(df["startTime"])
df["endTime"] = pd.to_datetime(df["endTime"])
df.insert(
3,
"seconds",
(df["endTime"] - df["startTime"])
.astype("timedelta64[s]")
.astype("int64"),
)
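# Convert the UTC timestamps to the configured time zone and strip the
# time zone info, so the columns hold naive local (or UTC) datetimes.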
zone = timezone(self.local_timezone_name)
df["startTime"] = df["startTime"].dt.tz_convert(zone).dt.tz_localize(None)
df["endTime"] = df["endTime"].dt.tz_convert(zone).dt.tz_localize(None)
df.rename(
columns={
"id": "Id",
"refreshType": "RefreshType",
"status": "Status",
"seconds": "Seconds",
"startTime": ("StartTime" + self.time_column_suffix),
"endTime": ("EndTime" + self.time_column_suffix),
"serviceExceptionJson": "Error",
"requestId": "RequestId",
"refreshAttempts": "RefreshAttempts",
},
inplace=True,
)
return df

elif api_call.status_code == 404:
self._raise_error(
"The specified dataset or workspace cannot be found, "
"or the dataset doesn't have a user with the required permissions!"
)
else:
self._raise_api_error("Failed to fetch refresh history!", api_call)
return None

def _get_last_refresh(self) -> bool:
"""
Gets the latest record in the PowerBI dataset refresh history.
:return: True if succeeded or False if failed (when ignore_errors==True)
:rtype: bool
:raises SpetlrException: if failed and ignore_errors==False
"""

df = self._get_refresh_history()
if df is None:
return False

if not df.empty:
self.last_status = df.Status.iloc[0]
self.last_exception = df.Error.iloc[0]
# Calculate the average duration of all previous completed API refresh
# calls that did not specify any tables
mean = df.loc[
(df["RefreshType"] == "ViaApi") & (df["Status"] == "Completed"),
df.Seconds.name,
].mean()
if pd.isna(mean) or self.table_names:
self.last_duration_in_seconds = 0
else:
self.last_duration_in_seconds = int(mean)
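# The end times in the history frame are naive timestamps in the
# configured time zone, so localize them back and convert to UTC.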
zone = timezone(self.local_timezone_name)
if self.last_status == "Completed":
if self.is_utc:
self.last_refresh_utc = utc.localize(df.EndTimeUtc.iloc[0])
else:
self.last_refresh_utc = zone.localize(
df.EndTimeLocal.iloc[0]
).astimezone(utc)

return True

def _verify_last_refresh(self) -> bool:
"""
@@ -343,18 +425,14 @@ def _verify_last_refresh(self) -> bool:
"""

if self.last_status is None:
self._raise_error("Refresh is still in progress or never triggered!")
print("No refresh was triggered yet.")
elif self.last_status == "Completed":
if self.last_refresh_utc is None:
self._raise_error("Completed at unknown refresh time!")
else:
last_refresh_str = self.last_refresh_utc.astimezone(
timezone(self.local_timezone_name)
).strftime("%Y-%m-%d %H:%M") + (self.time_description_suffix)
min_refresh_time_utc = datetime.now(utc) - timedelta(
minutes=self.max_minutes_after_last_refresh
)
@@ -525,3 +603,36 @@ def refresh(self) -> bool:
break

return self._verify_last_refresh()

def show_history(self) -> None:
"""
Displays the refresh history of a PowerBI dataset.
:return: data frame with the refresh history if succeeded or None if failed (when ignore_errors==True)
:rtype: Pandas data frame
:raises SpetlrException: if failed and ignore_errors==False
"""

df = self._get_refresh_history()
if df is None:
return None
if df.empty:
print("The refresh history list is empty.")
df.display()

def get_history(self) -> Union[DataFrame, None]:
"""
Returns the refresh history of a PowerBI dataset in a Spark data frame.
:return: data frame with the refresh history if succeeded or None if failed (when ignore_errors==True)
:rtype: Spark data frame
:raises SpetlrException: if failed and ignore_errors==False
"""

df = self._get_refresh_history()
if df is None:
return None
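# For an empty history, pass the explicit schema so Spark does not try
# to infer it from an empty data frame.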
if df.empty:
return Spark.get().createDataFrame(df, self.schema)
else:
return Spark.get().createDataFrame(df)
