Skip to content

Commit

Permalink
TDL-19620 Refactoring changes (#6)
Browse files Browse the repository at this point in the history
* refactoring for exception classes & helper methods

* directory struct change:improving readability for streams.py

* removed abstract decorator for set_params method

* Updated Circle Ci configurtion file

* Updated the formatting of the function call as per the suggestion

Co-authored-by: Collin Simon <cosimon@users.noreply.github.com>

* formatting changes

Co-authored-by: RushT007 <rtodkar@stitchdata-talend.com>
Co-authored-by: rdeshmukh15 <107538720+rdeshmukh15@users.noreply.github.com>
Co-authored-by: Collin Simon <cosimon@users.noreply.github.com>
Co-authored-by: “rdeshmukh15” <“redeshmukh@talend.com”>
  • Loading branch information
5 people authored Jul 1, 2022
1 parent 8064ad6 commit 022e26e
Show file tree
Hide file tree
Showing 16 changed files with 564 additions and 538 deletions.
43 changes: 33 additions & 10 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -1,19 +1,42 @@
version: 2.1

executors:
docker-executor:
docker:
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:stitch-tap-tester

version: 2
jobs:
build:
executor: docker-executor
docker:
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:stitch-tap-tester
steps:
- run: echo 'CI done'

- checkout
- run:
name: 'Setup virtual env'
command: |
python3 -mvenv /usr/local/share/virtualenvs/tap-dixa
source /usr/local/share/virtualenvs/tap-dixa/bin/activate
pip install -U 'pip<19.2' 'setuptools<51.0.0'
pip install .[dev]
- run:
name: 'JSON Validator'
command: |
source /usr/local/share/virtualenvs/tap-tester/bin/activate
stitch-validate-json tap_dixa/schemas/*.json
- run:
name: 'Pylint'
command: |
source /usr/local/share/virtualenvs/tap-dixa/bin/activate
pip install pylint==2.14.1
pylint tap_dixa --disable C,W,R,no-member
workflows:
version: 2
commit:
jobs:
- build:
context: circleci-user
build_daily:
triggers:
- schedule:
cron: "0 0 * * *"
filters:
branches:
only:
- master
jobs:
- build:
context: circleci-user
5 changes: 2 additions & 3 deletions tap_dixa/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import singer
from singer import utils

from tap_dixa.discover import discover
from tap_dixa.sync import sync

Expand All @@ -15,14 +14,14 @@ def main():

# If discover flag was passed, run discovery mode and dump output to stdout
if args.discover:
catalog = discover()
catalog = discover(args.config)
catalog.dump()
# Otherwise run in sync mode
else:
if args.catalog:
catalog = args.catalog
else:
catalog = discover()
catalog = discover(args.config)
sync(args.config, args.state, catalog)


Expand Down
123 changes: 13 additions & 110 deletions tap_dixa/client.py
Original file line number Diff line number Diff line change
@@ -1,112 +1,17 @@
""" Module providing DixaAPi Client"""
import base64
from enum import Enum


import backoff
import requests
from singer import get_logger

LOGGER = get_logger()


class DixaClientError(Exception):
def __init__(self, message=None, response=None):
super().__init__(message)
self.message = message
self.response = response


class DixaClient5xxError(DixaClientError):
pass


class DixaClient401Error(DixaClientError):
pass


class DixaClient400Error(DixaClientError):
pass


class DixaClient429Error(DixaClientError):
pass


class DixaClient422Error(DixaClientError):
pass


ERROR_CODE_EXCEPTION_MAPPING = {
500: {
'raise_exception': DixaClient5xxError,
'message': 'Server Error',
},
503: {
'raise_exception': DixaClient5xxError,
'message': 'Server Error',
},
401: {
'raise_exception': DixaClient401Error,
'message': 'Invalid or missing credentials'
},
400: {
'raise_exception': DixaClient400Error,
'message': 'Invalid query parameters'
},
429: {
'raise_exception': DixaClient429Error,
'message': 'API limit has been reached'
},
422: {
'raise_exception': DixaClient422Error,
'message': 'Exceeded max allowed 10 csids per request'
},
}


def raise_for_error(resp: requests.Response):
"""
Raises the associated response exception.
Takes in a response object, checks the status code, and throws the associated
exception based on the status code.
:param resp: requests.Response object
"""
try:
resp.raise_for_status()
except (requests.HTTPError, requests.ConnectionError) as error:
try:
error_code = resp.status_code
client_exception = ERROR_CODE_EXCEPTION_MAPPING.get(error_code, {})
exc = client_exception.get('raise_exception', DixaClientError)
message = client_exception.get('message', 'Client Error')

raise exc(message, resp) from None

except (ValueError, TypeError):
raise DixaClientError(error) from None


def retry_after_wait_gen():
"""
Returns a generator that is passed to backoff decorator to indicate how long
to backoff for in seconds.
"""
while True:
sleep_time = 60
LOGGER.info("API rate limit exceeded -- sleeping for %s seconds", sleep_time)
yield sleep_time


class DixaURL(Enum):
"""
Enum representing the Dixa base url API variants.
"""
exports = 'https://exports.dixa.io'
integrations = 'https://dev.dixa.io'
from .exceptions import DixaClient429Error, raise_for_error, retry_after_wait_gen
from .helpers import DixaURL


class Client:
"""DixaClient Class for performing extraction from DixaApi"""

def __init__(self, api_token: str):
self._api_token = api_token
Expand All @@ -123,19 +28,18 @@ def _to_base64(string: str) -> str:
:return: Base64 encoded string
"""
message = f"bearer:{string}"
message_bytes = message.encode('utf-8')
message_bytes = message.encode("utf-8")
base64_bytes = base64.b64encode(message_bytes)
return base64_bytes.decode('utf-8')
return base64_bytes.decode("utf-8")

def _set_auth_header(self):
"""
Sets the corresponding Authorization header based on the base url variant.
"""
if self._base_url == DixaURL.exports.value:
self._headers['Authorization'] = f"Basic {self._to_base64(self._api_token)}"

if self._base_url == DixaURL.integrations.value:
self._headers['Authorization'] = f"{self._api_token}"
if self._base_url == DixaURL.EXPORTS.value:
self._headers["Authorization"] = f"Basic {self._to_base64(self._api_token)}"
elif self._base_url == DixaURL.INTEGRATIONS.value:
self._headers["Authorization"] = f"{self._api_token}"

def _build_url(self, endpoint: str) -> str:
"""
Expand All @@ -150,13 +54,13 @@ def _get(self, url, headers=None, params=None, data=None):
"""
Wraps the _make_request function with a 'GET' method
"""
return self._make_request(url, method='GET', headers=headers, params=params, data=data)
return self._make_request(url, method="GET", headers=headers, params=params, data=data)

def _post(self, url, headers=None, params=None, data=None):
"""
Wraps the _make_request function with a 'POST' method
"""
return self._make_request(url, method='POST', headers=headers, params=params, data=data)
return self._make_request(url, method="POST", headers=headers, params=params, data=data)

@backoff.on_exception(retry_after_wait_gen, DixaClient429Error, jitter=None, max_tries=3)
def _make_request(self, url, method, headers=None, params=None, data=None) -> dict:
Expand All @@ -170,7 +74,6 @@ def _make_request(self, url, method, headers=None, params=None, data=None) -> di
:param data: The data passed to the body of the request
:return: A dictionary representing the response from the API
"""

with self._session as session:
response = session.request(method, url, headers=headers, params=params, data=data)

Expand Down
72 changes: 26 additions & 46 deletions tap_dixa/discover.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,18 @@
""" Module providing disovery method of tap-dixa"""
import json
import os

from singer import metadata
from singer.catalog import Catalog

from tap_dixa.streams import STREAMS

from .helpers import (
_get_key_properties_from_meta,
_get_replication_key_from_meta,
_get_replication_method_from_meta,
get_abs_path
)


def get_abs_path(path):
"""
Gets the absolute path of the provided relative path.
"""
return os.path.join(os.path.dirname(os.path.realpath(__file__)), path)

def _get_key_properties_from_meta(schema_meta):
"""
Retrieves the 'table-key-properties' value from the schema metadata.
"""
return schema_meta[0].get('metadata').get('table-key-properties')

def _get_replication_method_from_meta(schema_meta):
"""
Retrieves the 'forced-replication-method' value from the schema metadata.
"""
return schema_meta[0].get('metadata').get('forced-replication-method')

def _get_replication_key_from_meta(schema_meta):
"""
Retrieves the 'valid-replication-keys' value from the schema metadata.
"""
if _get_replication_method_from_meta(schema_meta) == 'INCREMENTAL':
return schema_meta[0].get('metadata').get('valid-replication-keys')[0]
return None

def get_schemas():
"""
Builds the singer schema and metadata dictionaries.
Expand All @@ -44,27 +23,28 @@ def get_schemas():

for stream_name, stream_object in STREAMS.items():

schema_path = get_abs_path('schemas/{}.json'.format(stream_name))
schema_path = get_abs_path(f"schemas/{stream_name}.json")
with open(schema_path) as file:
schema = json.load(file)

if stream_object.replication_method == 'INCREMENTAL':
if stream_object.replication_method == "INCREMENTAL":
replication_keys = stream_object.valid_replication_keys
else:
replication_keys = None

meta = metadata.get_standard_metadata(
schema=schema,
key_properties=stream_object.key_properties,
replication_method=stream_object.replication_method,
valid_replication_keys=replication_keys,
)
meta = metadata.get_standard_metadata(schema=schema,
key_properties=stream_object.key_properties,
replication_method=stream_object.replication_method,
valid_replication_keys=replication_keys,)

meta = metadata.to_map(meta)

if replication_keys:
for replication_key in replication_keys:
meta = metadata.write(meta, ('properties', replication_key), 'inclusion', 'automatic')
meta = metadata.write(meta,
("properties", replication_key),
"inclusion",
"automatic")

meta = metadata.to_list(meta)

Expand All @@ -74,7 +54,7 @@ def get_schemas():
return schemas, schemas_metadata


def discover():
def discover(config: dict):
"""
Builds the singer catalog for all the streams in the schemas directory.
"""
Expand All @@ -86,15 +66,15 @@ def discover():
schema_meta = schemas_metadata[schema_name]

catalog_entry = {
'stream': schema_name,
'tap_stream_id': schema_name,
'schema': schema,
'key_properties': _get_key_properties_from_meta(schema_meta),
'replication_method': _get_replication_method_from_meta(schema_meta),
'replication_key': _get_replication_key_from_meta(schema_meta),
'metadata': schema_meta
"stream": schema_name,
"tap_stream_id": schema_name,
"schema": schema,
"key_properties": _get_key_properties_from_meta(schema_meta),
"replication_method": _get_replication_method_from_meta(schema_meta),
"replication_key": _get_replication_key_from_meta(schema_meta),
"metadata": schema_meta,
}

streams.append(catalog_entry)

return Catalog.from_dict({'streams': streams})
return Catalog.from_dict({"streams": streams})
Loading

0 comments on commit 022e26e

Please sign in to comment.