Skip to content

Commit

Permalink
Merge pull request #364 from shipyardapp/jr/integ-533-microsoft-power…
Browse files Browse the repository at this point in the history
…-bi-oauth-support

Merged into Main
  • Loading branch information
johnathan-rodriguez authored Aug 13, 2024
2 parents 75c96d3 + 5a9d68f commit b3404f7
Show file tree
Hide file tree
Showing 12 changed files with 681 additions and 357 deletions.
237 changes: 177 additions & 60 deletions shipyard_blueprints/microsoft-power-bi/poetry.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion shipyard_blueprints/microsoft-power-bi/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "shipyard-microsoft-power-bi"
version = "0.1.2"
version = "0.2.0a0"
description = ""
authors = ["johnathan-rodriguez <johnathan.rodriguez@shipyardapp.com>"]
readme = "README.md"
Expand All @@ -14,6 +14,7 @@ shipyard-templates = "^0.9.0"

[tool.poetry.group.dev.dependencies]
python-dotenv = "^1.0.0"
pytest = "^8.3.2"

[build-system]
requires = ["poetry-core"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
from .microsoft_power_bi import MicrosoftPowerBiClient
from . import microsoft_power_bi_utils
from .cli import authtest
from .cli import trigger_sync
from shipyard_microsoft_power_bi.microsoft_power_bi import MicrosoftPowerBiClient
Empty file.
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import os
import sys

from shipyard_microsoft_power_bi import MicrosoftPowerBiClient
from shipyard_templates.shipyard_logger import ShipyardLogger

from shipyard_microsoft_power_bi import MicrosoftPowerBiClient

logger = ShipyardLogger.get_logger()
logger.setLevel("AUTHTEST")

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
import sys
import argparse
from shipyard_microsoft_power_bi import MicrosoftPowerBiClient
import sys

from shipyard_templates import ShipyardLogger
from shipyard_templates.exit_code_exception import ExitCodeException

from shipyard_microsoft_power_bi import MicrosoftPowerBiClient, utils

logger = ShipyardLogger.get_logger()


def get_args():
parser = argparse.ArgumentParser()
parser.add_argument("--client-id", dest="client_id", required=True)
parser.add_argument("--client-secret", dest="client_secret", required=True)
parser.add_argument("--tenant-id", dest="tenant_id", required=True)
parser.add_argument("--client-id", dest="client_id", required=False)
parser.add_argument("--client-secret", dest="client_secret", required=False)
parser.add_argument("--tenant-id", dest="tenant_id", required=False)
parser.add_argument("--group-id", dest="group_id", required=True)
parser.add_argument("--object-type", dest="object_type", required=True)
parser.add_argument("--object-id", dest="object_id", required=True)
Expand All @@ -20,61 +25,13 @@ def get_args():
return parser.parse_args()


def validate_args(args):
"""
Handle datatype conversions and validations for arguments
@param args: Namespace object containing arguments
@return: Tuple containing wait_for_completion and wait_time
"""

if args.poke_interval is None or args.poke_interval == "":
wait_time = 1
else:
wait_time = int(args.poke_interval) * 60

if wait_time < 0:
raise ExitCodeException(
"Error: poke-interval must be greater than 0",
MicrosoftPowerBiClient.EXIT_CODE_INVALID_INPUT,
)
elif wait_time > 60:
raise ExitCodeException(
"Error: poke-interval must be less than or equal to 60",
MicrosoftPowerBiClient.EXIT_CODE_INVALID_INPUT,
)

wait_for_completion = None
if type(args.wait_for_completion) is str:
if args.wait_for_completion.upper() == "TRUE":
wait_for_completion = True
elif args.wait_for_completion.upper() == "FALSE":
wait_for_completion = False

elif type(args.wait_for_completion) is bool:
wait_for_completion = args.wait_for_completion

return wait_for_completion, wait_time


def main():
try: # Initialize client to ensure client.logger is available
try:
args = get_args()
wait_for_completion, wait_time = utils.validate_args(args)

client = MicrosoftPowerBiClient(
client_id=args.client_id,
client_secret=args.client_secret,
tenant_id=args.tenant_id,
)
except ExitCodeException as e:
print(e)
sys.exit(e.exit_code)
except Exception as e:
print(f"Unexpected error: {e}")
sys.exit(MicrosoftPowerBiClient.EXIT_CODE_UNKNOWN_ERROR)

try:
wait_for_completion, wait_time = validate_args(args)
credentials = utils.get_credential_group(args)
client = MicrosoftPowerBiClient(**credentials)

client.refresh(
object_type=args.object_type,
Expand All @@ -84,11 +41,11 @@ def main():
wait_time=wait_time,
)
except ExitCodeException as e:
client.logger.error(e)
logger.error(e)
sys.exit(e.exit_code)
except Exception as e:
client.logger.error(f"Unexpected error: {e}")
sys.exit(client.EXIT_CODE_UNKNOWN_ERROR)
logger.error(f"Unexpected error: {e}")
sys.exit(MicrosoftPowerBiClient.EXIT_CODE_UNKNOWN_ERROR)


if __name__ == "__main__":
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import json
import time
from json import JSONDecodeError

from requests import request
from shipyard_templates import DataVisualization, ExitCodeException, ShipyardLogger

from shipyard_microsoft_power_bi import microsoft_power_bi_utils as bi_utils
from shipyard_microsoft_power_bi import utils

logger = ShipyardLogger.get_logger()

Expand All @@ -25,7 +27,12 @@ class MicrosoftPowerBiClient(DataVisualization):
EXIT_CODE_DATAFLOW_REFRESH_ALREADY_IN_PROGRESS = 102
EXIT_CODE_UNKNOWN_REFRESH_JOB_STATUS = 103

def __init__(self, client_id: str, client_secret: str, tenant_id: str, **kwargs):
FAILED_JOB_STATUSES = {"Failed", "PartialComplete"}
ONGOING_JOB_STATUSES = {"Queued", "InProgress", "Unknown"}
COMPLETE_JOB_STATUSES = {"Completed", "Success"}

def __init__(self, client_id: str = None, client_secret: str = None, tenant_id: str = None, access_token=None,
username=None, password=None, **kwargs):
"""
Initializes the MicrosoftPowerBiClient with the provided credentials.
Expand All @@ -36,11 +43,52 @@ def __init__(self, client_id: str, client_secret: str, tenant_id: str, **kwargs)
**kwargs: Additional keyword arguments passed to the superclass.
"""

super().__init__(**kwargs)
self.access_token = None
self.password = password
self.username = username
self.client_id = client_id
self.client_secret = client_secret
self.tenant_id = tenant_id
self._access_token = access_token

@property
def access_token(self):
if self._access_token is not None:
return self._access_token
if self.username and self.password:
try:
results = utils.generate_token_from_un_pw(client_id=self.client_id, tenant_id=self.tenant_id,
username=self.username, password=self.password)
self._set_access_token(results)
except Exception as e:
logger.error(
f"Failed to generate token using username/password: {e}"
)
if self.client_id and self.client_secret:
try:
results = utils.generate_access_token_from_client(client_id=self.client_id, tenant_id=self.tenant_id,
client_secret=self.client_secret)
self._set_access_token(results)
except Exception as e:
logger.error(
f"Failed to generate token using client credentials: {e}"
)

if not self._access_token:
raise ExitCodeException("Failed to generate access token", self.EXIT_CODE_INVALID_CREDENTIALS)
return self._access_token

@access_token.setter
def access_token(self, access_token):
self._access_token = access_token

def _set_access_token(self, result):
if "access_token" not in result:
raise ExitCodeException(
f"Failed to connect to Power BI using basic authentication. Error: {result}",
self.EXIT_CODE_INVALID_CREDENTIALS
)
self.access_token = result["access_token"]
logger.info("Successfully connected to Power BI")

def _request(self, endpoint: str, method: str = "GET", **kwargs):
"""
Expand All @@ -52,7 +100,7 @@ def _request(self, endpoint: str, method: str = "GET", **kwargs):
@return: The response from the request.
"""
if self.access_token is None:
self.access_token = bi_utils.generate_access_token(self)
self.access_token = utils.generate_access_token(self)

logger.debug(f"Attempting to {method} {endpoint}")

Expand Down Expand Up @@ -80,7 +128,62 @@ def _request(self, endpoint: str, method: str = "GET", **kwargs):
logger.debug("No Content")
return response
else:
bi_utils.handle_error_response(self, response)

self._handle_error_response(response)

def _handle_error_response(self, response):
try:
logger.debug(response.text)
response_details = response.json()
error_info = response_details.get("error", {})

except JSONDecodeError:
logger.warning("Response was not JSON...handling as text")
error_info = {"message": response.text} if response.text else {}

if error_info.get("code") == "DailyDataflowRefreshLimitExceeded":
raise ExitCodeException(
error_info.get(
"message",
f"<Daily Dataflow Refresh Limit Exceeded> \n {response.text}",
),
self.EXIT_CODE_RATE_LIMIT,
)
if response.status_code == 401:
raise ExitCodeException(
error_info.get("message", f"<Unauthorized> \n {response.text}"),
self.EXIT_CODE_INVALID_CREDENTIALS,
)
elif response.status_code == 403:
raise ExitCodeException(
error_info.get("message", f"<Forbidden> \n {response.text}"),
self.EXIT_CODE_INVALID_CREDENTIALS,
)
elif response.status_code == 404:
raise ExitCodeException(
error_info.get("message", f"<Not Found\n {response.text}"),
self.EXIT_CODE_INVALID_INPUT,
)
elif response.status_code == 400:
raise ExitCodeException(
error_info.get("message", f"<Bad Request\n {response.text}"),
self.EXIT_CODE_BAD_REQUEST,
)
elif response.status_code == 405:
raise ExitCodeException(
error_info.get("message", f"<Method Not Allowed\n {response.text}"),
self.EXIT_CODE_BAD_REQUEST,
)
elif response.status_code == 429:
raise ExitCodeException(
error_info.get("message", f"<Too Many Requests\n {response.text}"),
self.EXIT_CODE_RATE_LIMIT,
)
else:
raise ExitCodeException(
f"Unknown Error<{response.status_code}>: {response.text}",
self.EXIT_CODE_UNKNOWN_ERROR,
)

def connect(self):
"""
Expand All @@ -89,7 +192,7 @@ def connect(self):
@return: 1 if the connection fails, 0 otherwise.
"""
try:
bi_utils.generate_access_token(self)
self.access_token
except ExitCodeException as e:
logger.authtest(e)
return 1
Expand Down Expand Up @@ -148,9 +251,7 @@ def refresh_dataset(
logger.info("Dataset refresh triggered")
if wait_for_completion:
request_id = response.headers.get("RequestId")
bi_utils.wait_for_dataset_refresh_completion(
self, group_id, dataset_id, request_id, wait_time=wait_time
)
self.wait_for_dataset_refresh_completion(group_id, dataset_id, request_id, wait_time=wait_time)
else:
return response

Expand Down Expand Up @@ -180,9 +281,7 @@ def refresh_dataflow(
data=json.dumps(data),
)
if wait_for_completion:
bi_utils.wait_for_dataflow_refresh_completion(
self, group_id, dataflow_id, wait_time=wait_time
)
self.wait_for_dataflow_refresh_completion(group_id, dataflow_id, wait_time=wait_time)
else:
logger.info("Dataflow refresh triggered")
return response
Expand Down Expand Up @@ -287,3 +386,73 @@ def get_dataflows(self, group_id: str):
return self._request(
f"{self.BASE_URL}/groups/{group_id}/dataflows", method="GET"
)

def wait_for_dataflow_refresh_completion(
self, group_id, dataflow_id, wait_time=60):
logger.info("Waiting for refresh to complete")
job_status = "Unknown"

while job_status in self.ONGOING_JOB_STATUSES:
transactions = self.get_dataflow_transactions(group_id, dataflow_id)
sorted_transactions = sorted(transactions["value"], key=lambda x: x["startTime"], reverse=True)

if len(sorted_transactions) == 0:
raise ExitCodeException(
f"No transactions found for dataflow {dataflow_id}",
self.EXIT_CODE_INVALID_INPUT,
)

job_status = sorted_transactions[0].get("status") # Get the latest transaction status

if job_status in self.COMPLETE_JOB_STATUSES:
logger.info(f"Job completed with status {job_status}")
logger.info("Refresh completed")
return

if job_status in self.FAILED_JOB_STATUSES:
raise ExitCodeException(
f"Dataflow refresh failed with status {job_status}",
self.EXIT_CODE_FAILED_REFRESH_JOB,
)

elif job_status in self.ONGOING_JOB_STATUSES:
logger.info(
f"Job currently in {job_status}. Waiting {wait_time} seconds before checking again."
)
time.sleep(wait_time)
else:
raise ExitCodeException(
f"Unknown job status {job_status}",
self.EXIT_CODE_UNKNOWN_REFRESH_JOB_STATUS,
)

def wait_for_dataset_refresh_completion(self, group_id, dataset_id, request_id, wait_time=60):
logger.info("Waiting for refresh to complete")
job_status = "Unknown"

while job_status in self.ONGOING_JOB_STATUSES:
job_status = self.check_recent_dataset_refresh_by_request_id(
group_id, dataset_id, request_id
).get("status")

if job_status in self.COMPLETE_JOB_STATUSES:
logger.info(f"Job completed with status {job_status}")
logger.info("Refresh completed")
return

if job_status in self.FAILED_JOB_STATUSES:
raise ExitCodeException(
f"Dataset refresh failed with status {job_status}",
self.EXIT_CODE_FAILED_REFRESH_JOB,
)

elif job_status in self.ONGOING_JOB_STATUSES:
logger.info(
f"Job currently in {job_status}. Waiting {wait_time} seconds before checking again."
)
time.sleep(wait_time)
else:
raise ExitCodeException(
f"Unknown job status {job_status}",
self.EXIT_CODE_UNKNOWN_REFRESH_JOB_STATUS,
)
Loading

0 comments on commit b3404f7

Please sign in to comment.