Skip to content

Commit

Permalink
Improve URL creation in the CDK (airbytehq#8513)
Browse files Browse the repository at this point in the history
Signed-off-by: Sergei Solonitcyn <sergei.solonitcyn@zazmic.com>
  • Loading branch information
sergei-solonitcyn authored Dec 5, 2021
1 parent feb4083 commit 6ee922c
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 31 deletions.
3 changes: 2 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import os
from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
from urllib.parse import urljoin

import requests
import vcr
Expand Down Expand Up @@ -239,7 +240,7 @@ def backoff_time(self, response: requests.Response) -> Optional[float]:
def _create_prepared_request(
self, path: str, headers: Mapping = None, params: Mapping = None, json: Any = None, data: Any = None
) -> requests.PreparedRequest:
args = {"method": self.http_method, "url": self.url_base + path, "headers": headers, "params": params}
args = {"method": self.http_method, "url": urljoin(self.url_base, path), "headers": headers, "params": params}
if self.http_method.upper() in BODY_REQUEST_METHODS:
if json and data:
raise RequestBodyException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from abc import ABC, abstractmethod
from io import StringIO
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Union
from urllib.parse import urljoin

import pendulum
import requests
Expand Down Expand Up @@ -50,7 +51,7 @@ def __init__(
):
super().__init__(*args, **kwargs)

self._url_base = url_base
self._url_base = url_base.rstrip("/") + "/"
self._replication_start_date = replication_start_date
self.marketplace_ids = marketplace_ids
self._session.auth = aws_signature
Expand Down Expand Up @@ -139,7 +140,7 @@ class ReportsAmazonSPStream(Stream, ABC):
"""

primary_key = None
path_prefix = f"/reports/{REPORTS_API_VERSION}"
path_prefix = f"reports/{REPORTS_API_VERSION}"
sleep_seconds = 30
data_field = "payload"

Expand All @@ -154,7 +155,7 @@ def __init__(
):
self._authenticator = authenticator
self._session = requests.Session()
self._url_base = url_base
self._url_base = url_base.rstrip("/") + "/"
self._session.auth = aws_signature
self._replication_start_date = replication_start_date
self.marketplace_ids = marketplace_ids
Expand Down Expand Up @@ -195,7 +196,11 @@ def _create_prepared_request(
"""
Override to make http_method configurable per method call
"""
args = {"method": http_method, "url": self.url_base + path, "headers": headers, "params": params}
args = {
"method": http_method,
"url": urljoin(self.url_base, path),
"headers": headers, "params": params
}
if http_method.upper() in BODY_REQUEST_METHODS:
if json and data:
raise RequestBodyException(
Expand Down Expand Up @@ -470,7 +475,7 @@ class Orders(IncrementalAmazonSPStream):
page_size_field = "MaxResultsPerPage"

def path(self, **kwargs) -> str:
return f"/orders/{ORDERS_API_VERSION}/orders"
return f"orders/{ORDERS_API_VERSION}/orders"

def request_params(
self, stream_state: Mapping[str, Any], next_page_token: Mapping[str, Any] = None, **kwargs
Expand Down Expand Up @@ -508,7 +513,7 @@ def __init__(self, *args, **kwargs):
).strftime(self.time_format)

def path(self, **kwargs) -> str:
return f"/vendor/directFulfillment/shipping/{VENDORS_API_VERSION}/shippingLabels"
return f"vendor/directFulfillment/shipping/{VENDORS_API_VERSION}/shippingLabels"

def request_params(
self, stream_state: Mapping[str, Any], next_page_token: Mapping[str, Any] = None, **kwargs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

class AmplitudeStream(HttpStream, ABC):

url_base = "https://amplitude.com/api"
url_base = "https://amplitude.com/api/"
api_version = 2

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
Expand All @@ -31,7 +31,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
yield from respose_data.get(self.name, [])

def path(self, **kwargs) -> str:
return f"/{self.api_version}/{self.name}"
return f"{self.api_version}/{self.name}"


class Cohorts(AmplitudeStream):
Expand Down Expand Up @@ -160,7 +160,7 @@ def request_params(
def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
return f"/{self.api_version}/export"
return f"{self.api_version}/export"


class ActiveUsers(IncrementalAmplitudeStream):
Expand All @@ -177,7 +177,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
yield {"date": date, "statistics": dict(zip(response_data["seriesLabels"], series[i]))}

def path(self, **kwargs) -> str:
return f"/{self.api_version}/users"
return f"{self.api_version}/users"


class AverageSessionLength(IncrementalAmplitudeStream):
Expand All @@ -196,4 +196,4 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
yield {"date": date, "length": series[i]}

def path(self, **kwargs) -> str:
return f"/{self.api_version}/sessions/average"
return f"{self.api_version}/sessions/average"
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator

BASE_URL = "https://www.googleapis.com/webmasters/v3"
BASE_URL = "https://www.googleapis.com/webmasters/v3/"
ROW_LIMIT = 25000


Expand Down Expand Up @@ -67,7 +67,7 @@ def path(
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> str:
return f"/sites/{stream_slice.get('site_url')}"
return f"sites/{stream_slice.get('site_url')}"


class Sitemaps(GoogleSearchConsole):
Expand All @@ -83,7 +83,7 @@ def path(
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> str:
return f"/sites/{stream_slice.get('site_url')}/sitemaps"
return f"sites/{stream_slice.get('site_url')}/sitemaps"


class SearchAnalytics(GoogleSearchConsole, ABC):
Expand All @@ -102,7 +102,7 @@ def path(
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> str:
return f"/sites/{stream_slice.get('site_url')}/searchAnalytics/query"
return f"sites/{stream_slice.get('site_url')}/searchAnalytics/query"

@property
def cursor_field(self) -> Union[str, List[str]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def __init__(self, config: Mapping[str, Any], stream_name: str = None, param: Ma
self.config = config
self.start_date = config["start_date"]
self.window_in_days = config["window_in_days"]
self._url_base = config["domain_url"]
self._url_base = config["domain_url"].rstrip("/") + "/"
self.stream_name = stream_name
self.param = param
self.export_id = export_id
Expand All @@ -40,7 +40,7 @@ def url_base(self) -> str:
return self._url_base

def path(self, **kwargs) -> str:
return f"/rest/v1/{self.name}.json"
return f"rest/v1/{self.name}.json"

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
next_page = response.json().get("nextPageToken")
Expand Down Expand Up @@ -158,7 +158,7 @@ def get_export_status(self, stream_slice):
)

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
return f"/bulk/v1/{self.stream_name}/export/{stream_slice['id']}/file.json"
return f"bulk/v1/{self.stream_name}/export/{stream_slice['id']}/file.json"

def stream_slices(self, sync_mode, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
date_slices = super().stream_slices(sync_mode, stream_state, **kwargs)
Expand Down Expand Up @@ -252,7 +252,7 @@ class MarketoExportCreate(MarketoStream):
http_method = "POST"

def path(self, **kwargs) -> str:
return f"/bulk/v1/{self.stream_name}/export/create.json"
return f"bulk/v1/{self.stream_name}/export/create.json"

def request_body_json(self, **kwargs) -> Optional[Mapping]:
params = {"format": "CSV"}
Expand Down Expand Up @@ -280,7 +280,7 @@ class MarketoExportStart(MarketoStream):
http_method = "POST"

def path(self, **kwargs) -> str:
return f"/bulk/v1/{self.stream_name}/export/{self.export_id}/enqueue.json"
return f"bulk/v1/{self.stream_name}/export/{self.export_id}/enqueue.json"


class MarketoExportStatus(MarketoStream):
Expand All @@ -290,7 +290,7 @@ class MarketoExportStatus(MarketoStream):
"""

def path(self, **kwargs) -> str:
return f"/bulk/v1/{self.stream_name}/export/{self.export_id}/status.json"
return f"bulk/v1/{self.stream_name}/export/{self.export_id}/status.json"

def parse_response(self, response: requests.Response, **kwargs) -> List[str]:
return [response.json()[self.data_field][0]["status"]]
Expand Down Expand Up @@ -384,7 +384,7 @@ class ActivityTypes(MarketoStream):
"""

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
return "/rest/v1/activities/types.json"
return "rest/v1/activities/types.json"


class Programs(IncrementalMarketoStream):
Expand All @@ -401,7 +401,7 @@ def __init__(self, config: Mapping[str, Any]):
self.offset = 0

def path(self, **kwargs) -> str:
return f"/rest/asset/v1/{self.name}.json"
return f"rest/asset/v1/{self.name}.json"

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
data = response.json().get(self.data_field)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class OktaStream(HttpStream, ABC):
def __init__(self, url_base: str, *args, **kwargs):
super().__init__(*args, **kwargs)
# Inject custom url base to the stream
self._url_base = url_base
self._url_base = url_base.rstrip("/") + "/"

@property
def url_base(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def get_auth_header(self) -> Mapping[str, Any]:

class Links(HttpStream, ABC):

url_base = "https://api.short.io/api"
url_base = "https://api.short.io/api/"
limit = 150
primary_key = "id"
before_id = None
Expand Down Expand Up @@ -60,7 +60,7 @@ def path(
next_page_token: Mapping[str, Any] = None,
) -> str:
# Get all the links
return "/links"
return "links"

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@


class TypeformStream(HttpStream, ABC):
url_base = "https://api.typeform.com"
url_base = "https://api.typeform.com/"
# maximum number of entities in API response per single page
limit: int = 200
date_format: str = "YYYY-MM-DDTHH:mm:ss[Z]"
Expand Down Expand Up @@ -55,7 +55,7 @@ def path(
stream_slice: Mapping[str, Any] = None,
next_page_token: Optional[Any] = None,
) -> str:
return "/forms"
return "forms"

def next_page_token(self, response: requests.Response) -> Optional[Any]:
page = self.get_current_page_token(response.url)
Expand Down Expand Up @@ -108,7 +108,7 @@ def path(
stream_slice: Mapping[str, Any] = None,
next_page_token: Optional[Any] = None,
) -> str:
return f"/forms/{stream_slice['form_id']}"
return f"forms/{stream_slice['form_id']}"

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
yield response.json()
Expand Down Expand Up @@ -149,7 +149,7 @@ class Responses(TrimFormsMixin, IncrementalTypeformStream):
limit: int = 1000

def path(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> str:
return f"/forms/{stream_slice['form_id']}/responses"
return f"forms/{stream_slice['form_id']}/responses"

def get_form_id(self, record: Mapping[str, Any]) -> Optional[str]:
"""
Expand Down

0 comments on commit 6ee922c

Please sign in to comment.