🎉 Amazon SP extra endpoint support #5248

Merged
merged 54 commits into from
Sep 17, 2021
Commits
edb86b4
Add GET_FBA_INVENTORY_AGED_DATA data
htrueman Jul 16, 2021
31f3d32
Add GET_MERCHANT_LISTINGS_ALL_DATA stream support
htrueman Jul 19, 2021
9777d16
Update schemas
htrueman Jul 20, 2021
53524e7
Update configured_catalog.json
htrueman Jul 20, 2021
539dd07
Update connector to airbyte-cdk
htrueman Jul 22, 2021
59af900
Add amazon seller partner test creds
htrueman Jul 22, 2021
b726bca
Update state sample files
htrueman Jul 22, 2021
4780a80
Apply code format
htrueman Jul 22, 2021
ee9b91f
Merge remote-tracking branch 'origin/master' into htrueman/source-ama…
htrueman Jul 23, 2021
0eca787
Update acceptance-test-config.yml
htrueman Jul 23, 2021
26ff528
Add dummy integration test
htrueman Jul 23, 2021
2b08caa
Refactor auth signature.
htrueman Jul 29, 2021
17cbbd0
Merge remote-tracking branch 'origin/master' into htrueman/source-ama…
htrueman Jul 29, 2021
ea341a2
Remove print_function import from auth.py
htrueman Jul 30, 2021
a87d7c5
Refactor source class.
htrueman Jul 30, 2021
de789a7
Add dummy integration test
htrueman Jul 30, 2021
fc1ccd4
Merge remote-tracking branch 'origin/master' into htrueman/source-ama…
htrueman Jul 30, 2021
1a7d817
Typing added.
htrueman Aug 1, 2021
de328df
Add extra streams and schemas
htrueman Aug 3, 2021
359c8f4
Update docs and spec
htrueman Aug 6, 2021
6ef8446
Merge remote-tracking branch 'origin/master' into htrueman/source-ama…
htrueman Aug 6, 2021
900fcb8
Post merge code fixes
htrueman Aug 6, 2021
aa0a5c8
Merge branch 'htrueman/source-amazon-sp-expand-endpoint-support' into…
htrueman Aug 6, 2021
62bc962
Merge remote-tracking branch 'origin/master' into htrueman/source-ama…
htrueman Aug 6, 2021
e831b72
Fix test setup
htrueman Aug 9, 2021
f9c3428
Fix test setup
htrueman Aug 9, 2021
e3469e2
Add sample_state.json
htrueman Aug 9, 2021
bcd4efd
Update reports streams logics.
htrueman Aug 10, 2021
2961a08
Update tests config.
htrueman Aug 11, 2021
a518992
Merge fixes
htrueman Aug 13, 2021
46cff71
Add reports stream slices.
htrueman Aug 17, 2021
4a56620
Post review fixes.
htrueman Aug 17, 2021
fb5c029
Merge remote-tracking branch 'origin/htrueman/source-amazon-sp-change…
htrueman Aug 19, 2021
781912d
Streams update
htrueman Aug 19, 2021
1711c7a
Merge remote-tracking branch 'origin/master' into htrueman/source-ama…
htrueman Aug 25, 2021
54f1cf0
Add reports document retrieval and decrypting.
htrueman Aug 25, 2021
f7f31e0
Add CVS parsing into result rows
htrueman Aug 26, 2021
eabc55e
Update ReportsAmazonSPStream class to be the child of Stream class.
htrueman Sep 1, 2021
c3c06e6
Schema updates
htrueman Sep 2, 2021
60b8258
Source check method updated
htrueman Sep 5, 2021
ff12577
Update ReportsAmazonSPStream retry report logics
htrueman Sep 5, 2021
034e09d
Update check_connection source method
htrueman Sep 10, 2021
1f3cb57
Update reports read_records method.
htrueman Sep 10, 2021
d59e101
Update streams.py
htrueman Sep 10, 2021
4a3daa0
Merge remote-tracking branch 'origin/master' into htrueman/source-ama…
htrueman Sep 14, 2021
3573b18
Update acceptance tests config.
htrueman Sep 14, 2021
aeab6a8
Update report read_records logics
htrueman Sep 14, 2021
b49e49c
Add reports streams rate limit handling logics.
htrueman Sep 15, 2021
ba88b73
Merge remote-tracking branch 'origin/master' into htrueman/source-ama…
htrueman Sep 17, 2021
092d54d
Source Amazon SP: Update reports streams logics. (#5311)
htrueman Sep 17, 2021
c8e8248
Merge branch 'htrueman/source-amazon-sp-extra-endpoint-support' of gi…
htrueman Sep 17, 2021
434ccc2
Bump source version.
htrueman Sep 17, 2021
00b90e1
Mock time.sleep in test_reports_stream_send_request_backoff_exception…
htrueman Sep 17, 2021
44a4329
Acceptance test basic_read test disabled
htrueman Sep 17, 2021
Refactor auth signature.
Update streams.py
htrueman committed Jul 29, 2021
commit 2b08caadfd11764aadb2b8677df787042e5e52d2
@@ -24,92 +24,108 @@

 from __future__ import print_function

-import datetime
 import hashlib
 import hmac
-import logging
 import urllib.parse
-from collections import OrderedDict
+from typing import Any, Mapping

+import pendulum
+import requests
+from airbyte_cdk.sources.streams.http.auth import Oauth2Authenticator
 from requests.auth import AuthBase
 from requests.compat import urlparse

-log = logging.getLogger(__name__)

+class AWSAuthenticator(Oauth2Authenticator):
+    def __init__(self, host: str, *args, **kwargs):
+        super().__init__(*args, **kwargs)

-def sign_msg(key, msg):
-    """ Sign message using key """
-    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
+        self.host = host
+
+    def get_auth_header(self) -> Mapping[str, Any]:
+        return {
+            "host": self.host,
+            "user-agent": "python-requests",
+            "x-amz-access-token": self.get_access_token(),
+            "x-amz-date": pendulum.now("utc").strftime("%Y%m%dT%H%M%SZ"),
+        }


 class AWSSigV4(AuthBase):
-    def __init__(self, service, **kwargs):
+    def __init__(self, service: str, aws_access_key_id: str, aws_secret_access_key: str, aws_session_token: str, region: str):
         self.service = service
-        self.aws_access_key_id = kwargs.get("aws_access_key_id")
-        self.aws_secret_access_key = kwargs.get("aws_secret_access_key")
-        self.aws_session_token = kwargs.get("aws_session_token")
-        if self.aws_access_key_id is None or self.aws_secret_access_key is None:
-            raise KeyError("AWS Access Key ID and Secret Access Key are required")
-        self.region = kwargs.get("region")
-
-    def __call__(self, r):
-        t = datetime.datetime.utcnow()
-        self.amzdate = t.strftime("%Y%m%dT%H%M%SZ")
-        self.datestamp = t.strftime("%Y%m%d")
-        log.debug("Starting authentication with amzdate=%s", self.amzdate)
-        p = urlparse(r.url)
-
-        host = p.hostname
-        uri = urllib.parse.quote(p.path)
+        self.aws_access_key_id = aws_access_key_id
+        self.aws_secret_access_key = aws_secret_access_key
+        self.aws_session_token = aws_session_token
+        self.region = region
+
+    @staticmethod
+    def sign_msg(key, msg):
+        """ Sign message using key """
+        return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
+
+    def get_authorization_header(self, current_ts: pendulum.datetime, prepared_request: requests.PreparedRequest) -> str:
+        url_parsed = urlparse(prepared_request.url)
+        uri = urllib.parse.quote(url_parsed.path)
+        host = url_parsed.hostname
+
+        amz_date = current_ts.strftime("%Y%m%dT%H%M%SZ")
+        datestamp = current_ts.strftime("%Y%m%d")

         # sort query parameters alphabetically
-        if len(p.query) > 0:
-            split_query_parameters = list(map(lambda param: param.split("="), p.query.split("&")))
+        if len(url_parsed.query) > 0:
+            split_query_parameters = list(map(lambda param: param.split("="), url_parsed.query.split("&")))
             ordered_query_parameters = sorted(split_query_parameters, key=lambda param: (param[0], param[1]))
         else:
             ordered_query_parameters = list()

         canonical_querystring = "&".join(map(lambda param: "=".join(param), ordered_query_parameters))

-        headers_to_sign = {"host": host, "x-amz-date": self.amzdate}
-        if self.aws_session_token is not None:
+        headers_to_sign = {"host": host, "x-amz-date": amz_date}
+        if self.aws_session_token:
             headers_to_sign["x-amz-security-token"] = self.aws_session_token

-        ordered_headers = OrderedDict(sorted(headers_to_sign.items(), key=lambda t: t[0]))
+        ordered_headers = dict(sorted(headers_to_sign.items(), key=lambda h: h[0]))
         canonical_headers = "".join(map(lambda h: ":".join(h) + "\n", ordered_headers.items()))
         signed_headers = ";".join(ordered_headers.keys())

-        if r.method == "GET":
+        if prepared_request.method == "GET":
             payload_hash = hashlib.sha256("".encode("utf-8")).hexdigest()
         else:
-            if r.body:
-                payload_hash = hashlib.sha256(r.body.encode("utf-8")).hexdigest()
+            if prepared_request.body:
+                payload_hash = hashlib.sha256(prepared_request.body.encode("utf-8")).hexdigest()
             else:
                 payload_hash = hashlib.sha256("".encode("utf-8")).hexdigest()

-        canonical_request = "\n".join([r.method, uri, canonical_querystring, canonical_headers, signed_headers, payload_hash])
+        canonical_request = "\n".join(
+            [prepared_request.method, uri, canonical_querystring, canonical_headers, signed_headers, payload_hash]
+        )

-        credential_scope = "/".join([self.datestamp, self.region, self.service, "aws4_request"])
+        credential_scope = "/".join([datestamp, self.region, self.service, "aws4_request"])
         string_to_sign = "\n".join(
-            ["AWS4-HMAC-SHA256", self.amzdate, credential_scope, hashlib.sha256(canonical_request.encode("utf-8")).hexdigest()]
+            ["AWS4-HMAC-SHA256", amz_date, credential_scope, hashlib.sha256(canonical_request.encode("utf-8")).hexdigest()]
         )
-        log.debug("String-to-Sign: '%s'", string_to_sign)

-        kDate = sign_msg(("AWS4" + self.aws_secret_access_key).encode("utf-8"), self.datestamp)
-        kRegion = sign_msg(kDate, self.region)
-        kService = sign_msg(kRegion, self.service)
-        kSigning = sign_msg(kService, "aws4_request")
-        signature = hmac.new(kSigning, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
+        datestamp_signed = self.sign_msg(("AWS4" + self.aws_secret_access_key).encode("utf-8"), datestamp)
+        region_signed = self.sign_msg(datestamp_signed, self.region)
+        service_signed = self.sign_msg(region_signed, self.service)
+        aws4_request_signed = self.sign_msg(service_signed, "aws4_request")
+        signature = hmac.new(aws4_request_signed, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()

         authorization_header = "AWS4-HMAC-SHA256 Credential={}/{}, SignedHeaders={}, Signature={}".format(
             self.aws_access_key_id, credential_scope, signed_headers, signature
         )
-        r.headers.update(
+        return authorization_header
+
+    def __call__(self, prepared_request):
+        current_ts = pendulum.now("utc")
+
+        prepared_request.headers.update(
             {
-                "host": host,
-                "x-amz-date": self.amzdate,
-                "Authorization": authorization_header,
+                "host": urlparse(prepared_request.url).hostname,
+                "x-amz-date": current_ts.strftime("%Y%m%dT%H%M%SZ"),
+                "Authorization": self.get_authorization_header(current_ts, prepared_request),
                 "x-amz-security-token": self.aws_session_token,
             }
         )
-        return r
+        return prepared_request
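
For readers following the signing refactor, here is a minimal standalone sketch of the two pure steps in `get_authorization_header`: the chained HMAC key derivation and the sorted canonical query string. It uses only the standard library. The helper names `derive_signing_key` and `canonical_querystring` and the credential values are illustrative assumptions, not part of the PR. Unlike the diff, it splits each parameter with `partition`, so a parameter without `=` is emitted as `key=` rather than raising an error.

```python
import hashlib
import hmac


def sign_msg(key: bytes, msg: str) -> bytes:
    """HMAC-SHA256 of msg under key, returned as raw bytes."""
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()


def derive_signing_key(secret_key: str, datestamp: str, region: str, service: str) -> bytes:
    """Chain four HMACs: date, then region, then service, then "aws4_request"."""
    k_date = sign_msg(("AWS4" + secret_key).encode("utf-8"), datestamp)
    k_region = sign_msg(k_date, region)
    k_service = sign_msg(k_region, service)
    return sign_msg(k_service, "aws4_request")


def canonical_querystring(query: str) -> str:
    """Sort "key=value" pairs by key, then by value, and rejoin them with "&"."""
    if not query:
        return ""
    pairs = []
    for param in query.split("&"):
        name, _, value = param.partition("=")
        pairs.append((name, value))
    return "&".join(f"{name}={value}" for name, value in sorted(pairs))


# Placeholder secret and date (assumptions for illustration only).
key = derive_signing_key("EXAMPLE-SECRET", "20210729", "us-east-1", "execute-api")
# The final signature is a hex HMAC of the string-to-sign under the derived key.
signature = hmac.new(key, b"string-to-sign", hashlib.sha256).hexdigest()
```

Because the date is the first link in the chain, a derived key is only valid for the day it was computed for, which is why the refactor passes a single `current_ts` into both the header and the signature.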
@@ -29,7 +29,7 @@
 from airbyte_cdk.models import SyncMode
 from airbyte_cdk.sources import AbstractSource
 from airbyte_cdk.sources.streams import Stream
-from source_amazon_seller_partner.auth import AWSSigV4
+from source_amazon_seller_partner.auth import AWSAuthenticator, AWSSigV4
 from source_amazon_seller_partner.constants import AWS_ENV, get_marketplaces_enum
 from source_amazon_seller_partner.streams import FbaInventoryReports, FlatFileOrdersReports, MerchantListingsReports, Orders

@@ -43,21 +43,24 @@ def _get_stream_kwargs(self, config: Mapping[str, Any]):
         boto3_client = boto3.client("sts", aws_access_key_id=config["aws_access_key"], aws_secret_access_key=config["aws_secret_key"])
         role = boto3_client.assume_role(RoleArn=config["role_arn"], RoleSessionName="guid")
         role_creds = role["Credentials"]
-        auth = AWSSigV4(
-            "execute-api",
+        aws_sig_v4 = AWSSigV4(
+            service="execute-api",
             aws_access_key_id=role_creds.get("AccessKeyId"),
             aws_secret_access_key=role_creds.get("SecretAccessKey"),
-            region=self.marketplace_values.region,
             aws_session_token=role_creds.get("SessionToken"),
+            region=self.marketplace_values.region,
         )
+        auth = AWSAuthenticator(
+            token_refresh_endpoint="https://api.amazon.com/auth/o2/token",
+            client_secret=config["lwa_client_secret"],
+            client_id=config["lwa_app_id"],
+            refresh_token=config["refresh_token"],
+            host=self.marketplace_values.endpoint[8:],
+        )
         stream_kwargs = {
             "url_base": self.marketplace_values.endpoint,
             "authenticator": auth,
-            "access_token_credentials": {
-                "client_id": config["lwa_app_id"],
-                "client_secret": config["lwa_client_secret"],
-                "refresh_token": config["refresh_token"],
-            },
+            "aws_sig_v4": aws_sig_v4,
             "replication_start_date": config["replication_start_date"],
         }
         return stream_kwargs
@@ -77,10 +80,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
         """

         stream_kwargs = self._get_stream_kwargs(config)
-        streams = [
+        return [
             MerchantListingsReports(**stream_kwargs),
             FlatFileOrdersReports(**stream_kwargs),
             FbaInventoryReports(**stream_kwargs),
             Orders(marketplace_ids=self.marketplace_values.marketplace_id, **stream_kwargs),
         ]
-        return streams
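
One detail in the change above: `host=self.marketplace_values.endpoint[8:]` only yields the hostname when the endpoint starts with exactly `https://` (eight characters). A small sketch of a scheme-independent alternative follows. The helper `host_from_endpoint` is hypothetical, and the endpoint value is an assumption built from the host string that appears in the removed `request_headers` code.

```python
from urllib.parse import urlparse


def host_from_endpoint(endpoint: str) -> str:
    """Return the hostname of an endpoint URL, whatever its scheme or path."""
    hostname = urlparse(endpoint).hostname
    if hostname is None:
        raise ValueError(f"endpoint has no hostname: {endpoint!r}")
    return hostname


# Assumed endpoint value, for illustration only.
endpoint = "https://sellingpartnerapi-na.amazon.com"
sliced = endpoint[8:]  # the approach in the diff; relies on the "https://" prefix
parsed = host_from_endpoint(endpoint)
```

Both approaches agree for an `https://` endpoint without a path. The parsed version would also tolerate an `http://` scheme, a port, or a trailing path.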
@@ -23,24 +23,22 @@
 #

 from abc import ABC, abstractmethod
-from typing import Any, Iterable, List, Mapping, MutableMapping, Optional
+from typing import Any, Iterable, Mapping, MutableMapping, Optional

-import pendulum
 import requests
-from airbyte_cdk.models import SyncMode
 from airbyte_cdk.sources.streams.http import HttpStream
 from source_amazon_seller_partner.auth import AWSSigV4


-class AspStream(HttpStream, ABC):
+class AmazonSPStream(HttpStream, ABC):
     page_size = 100
     data_field = "payload"

-    def __init__(self, url_base: str, authenticator: AWSSigV4, access_token_credentials: dict, replication_start_date: str):
+    def __init__(self, url_base: str, aws_sig_v4: AWSSigV4, replication_start_date: str, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
         self._url_base = url_base
-        self._authenticator = authenticator
-        self._access_token_credentials = access_token_credentials
-        self._session = requests.Session()
+        self._aws_sig_v4 = aws_sig_v4
         self._replication_start_date = replication_start_date

     @property
@@ -84,8 +82,7 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str,
         """
         :return an iterable containing each record in the response
         """
-        records = response.json().get(self.data_field, [])
-        yield from records
+        yield from response.json().get(self.data_field, [])

     def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
         """
@@ -97,70 +94,18 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
             return {self.cursor_field: max(latest_benchmark, current_stream_state[self.cursor_field])}
         return {self.cursor_field: latest_benchmark}

-    def _get_access_token(self) -> str:
-        """
-        Get's the access token
-        :return: access_token str
-        """
-        data = {"grant_type": "refresh_token", **self._access_token_credentials}
-        headers = {"User-Agent": "python-sp-api-0.6.2", "content-type": "application/x-www-form-urlencoded;charset=UTF-8"}
-        res = requests.post("https://api.amazon.com/auth/o2/token", data, headers)
-        return res.json()["access_token"]
-
-    def request_headers(
-        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
-    ) -> Mapping[str, Any]:
-        return {
-            "host": "sellingpartnerapi-na.amazon.com",
-            "user-agent": "python-sp-api-0.6.2",
-            "x-amz-access-token": self._get_access_token(),
-            "x-amz-date": pendulum.now("utc").strftime("%Y%m%dT%H%M%SZ"),
-            "content-type": "application/json",
-        }
-
     def _create_prepared_request(
-        self, path: str, headers: Mapping = None, params: Mapping = None, json: Any = None, auth: AWSSigV4 = None
+        self, path: str, headers: Mapping = None, params: Mapping = None, json: Any = None
     ) -> requests.PreparedRequest:
-        args = {"method": self.http_method, "url": self.url_base + path, "headers": headers, "params": params, "auth": auth}
-
-        if self.http_method.upper() == "POST":
-            args["json"] = json
+        args = {"method": self.http_method, "url": self.url_base + path, "headers": headers, "params": params, "auth": self._aws_sig_v4}

         return self._session.prepare_request(requests.Request(**args))

-    def read_records(
-        self,
-        sync_mode: SyncMode,
-        cursor_field: List[str] = None,
-        stream_slice: Mapping[str, Any] = None,
-        stream_state: Mapping[str, Any] = None,
-    ) -> Iterable[Mapping[str, Any]]:
-        stream_state = stream_state or {}
-        pagination_complete = False
-
-        next_page_token = None
-        while not pagination_complete:
-            request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
-            request = self._create_prepared_request(
-                path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
-                headers=dict(request_headers),
-                params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
-                json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
-                auth=self.authenticator,
-            )
-            request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
-            response = self._send_request(request, request_kwargs)
-            yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)
-
-            next_page_token = self.next_page_token(response)
-            if not next_page_token:
-                pagination_complete = True
-
-        # Always return an empty generator just in case no records were ever yielded
-        yield from []
-
-
-class RecordsBase(AspStream, ABC):
+    def request_headers(self, *args, **kwargs) -> Mapping[str, Any]:
+        return {"content-type": "application/json"}
+
+
+class RecordsBase(AmazonSPStream, ABC):
     primary_key = "reportId"
     cursor_field = "createdTime"
     replication_start_date_field = "createdSince"
@@ -191,7 +136,7 @@ class FbaInventoryReports(RecordsBase):
     name = "GET_FBA_INVENTORY_AGED_DATA"


-class Orders(AspStream):
+class Orders(AmazonSPStream):
     name = "Orders"
     primary_key = "AmazonOrderId"
     cursor_field = "LastUpdateDate"
@@ -218,5 +163,4 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str,
         """
         :return an iterable containing each record in the response
         """
-        records = response.json().get(self.data_field, {}).get(self.name, [])
-        yield from records
+        yield from response.json().get(self.data_field, {}).get(self.name, [])
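
The two `parse_response` variants in this diff differ only in how deep the records sit inside the response body. The sketch below mirrors that with plain dictionaries so it runs without `requests`. The function `extract_records` and both sample bodies are illustrative assumptions. Only the `payload` field, the `Orders` name, and the `reportId` / `AmazonOrderId` keys come from the diff.

```python
from typing import Any, Iterator, Mapping, Optional


def extract_records(
    body: Mapping[str, Any], data_field: str = "payload", name: Optional[str] = None
) -> Iterator[Mapping[str, Any]]:
    """Yield records from a decoded JSON body.

    With name=None the records are the list stored under data_field (base stream).
    With a name they are nested one level deeper, as in the Orders stream.
    """
    if name is None:
        yield from body.get(data_field, [])
    else:
        yield from body.get(data_field, {}).get(name, [])


# Hypothetical response bodies shaped like the two cases in the diff.
reports_body = {"payload": [{"reportId": "1"}, {"reportId": "2"}]}
orders_body = {"payload": {"Orders": [{"AmazonOrderId": "A-1"}]}}

report_rows = list(extract_records(reports_body))
order_rows = list(extract_records(orders_body, name="Orders"))
empty_rows = list(extract_records({}, name="Orders"))
```

Defaulting to `[]` or `{}` at each level means a response without the expected field yields no records instead of raising, which is the behaviour the one-line `yield from` form in the diff preserves.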