diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py index 8d883287340f4..f90d05cc46db3 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py @@ -6,6 +6,7 @@ import os from abc import ABC, abstractmethod from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union +from urllib.parse import urljoin import requests import vcr @@ -239,7 +240,7 @@ def backoff_time(self, response: requests.Response) -> Optional[float]: def _create_prepared_request( self, path: str, headers: Mapping = None, params: Mapping = None, json: Any = None, data: Any = None ) -> requests.PreparedRequest: - args = {"method": self.http_method, "url": self.url_base + path, "headers": headers, "params": params} + args = {"method": self.http_method, "url": urljoin(self.url_base, path), "headers": headers, "params": params} if self.http_method.upper() in BODY_REQUEST_METHODS: if json and data: raise RequestBodyException( diff --git a/airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/streams.py b/airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/streams.py index 7041fa9e25823..79ef21bc658d4 100644 --- a/airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/streams.py +++ b/airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/streams.py @@ -10,6 +10,7 @@ from abc import ABC, abstractmethod from io import StringIO from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Union +from urllib.parse import urljoin import pendulum import requests @@ -50,7 +51,7 @@ def __init__( ): super().__init__(*args, **kwargs) - self._url_base = url_base + self._url_base = url_base.rstrip("/") + "/" self._replication_start_date = replication_start_date self.marketplace_ids = marketplace_ids self._session.auth = aws_signature @@ -139,7 +140,7 @@ class ReportsAmazonSPStream(Stream, ABC): """ primary_key = None - path_prefix = f"/reports/{REPORTS_API_VERSION}" + path_prefix = f"reports/{REPORTS_API_VERSION}" sleep_seconds = 30 data_field = "payload" @@ -154,7 +155,7 @@ def __init__( ): self._authenticator = authenticator self._session = requests.Session() - self._url_base = url_base + self._url_base = url_base.rstrip("/") + "/" self._session.auth = aws_signature self._replication_start_date = replication_start_date self.marketplace_ids = marketplace_ids @@ -195,7 +196,11 @@ def _create_prepared_request( """ Override to make http_method configurable per method call """ - args = {"method": http_method, "url": self.url_base + path, "headers": headers, "params": params} + args = { + "method": http_method, + "url": urljoin(self.url_base, path), + "headers": headers, "params": params + } if http_method.upper() in BODY_REQUEST_METHODS: if json and data: raise RequestBodyException( @@ -470,7 +475,7 @@ class Orders(IncrementalAmazonSPStream): page_size_field = "MaxResultsPerPage" def path(self, **kwargs) -> str: - return f"/orders/{ORDERS_API_VERSION}/orders" + return f"orders/{ORDERS_API_VERSION}/orders" def request_params( self, stream_state: Mapping[str, Any], next_page_token: Mapping[str, Any] = None, **kwargs @@ -508,7 +513,7 @@ def __init__(self, *args, **kwargs): ).strftime(self.time_format) def path(self, **kwargs) -> str: - return f"/vendor/directFulfillment/shipping/{VENDORS_API_VERSION}/shippingLabels" + return f"vendor/directFulfillment/shipping/{VENDORS_API_VERSION}/shippingLabels" def request_params( self, stream_state: Mapping[str, Any], next_page_token: Mapping[str, Any] = None, **kwargs diff --git a/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py b/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py index 777c3a8e0c800..0ca695e49eaf7 100644 --- a/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py +++ b/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py @@ -20,7 +20,7 @@ class AmplitudeStream(HttpStream, ABC): - url_base = "https://amplitude.com/api" + url_base = "https://amplitude.com/api/" api_version = 2 def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: @@ -31,7 +31,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp yield from respose_data.get(self.name, []) def path(self, **kwargs) -> str: - return f"/{self.api_version}/{self.name}" + return f"{self.api_version}/{self.name}" class Cohorts(AmplitudeStream): @@ -160,7 +160,7 @@ def request_params( def path( self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None ) -> str: - return f"/{self.api_version}/export" + return f"{self.api_version}/export" class ActiveUsers(IncrementalAmplitudeStream): @@ -177,7 +177,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp yield {"date": date, "statistics": dict(zip(response_data["seriesLabels"], series[i]))} def path(self, **kwargs) -> str: - return f"/{self.api_version}/users" + return f"{self.api_version}/users" class AverageSessionLength(IncrementalAmplitudeStream): @@ -196,4 +196,4 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp yield {"date": date, "length": series[i]} def path(self, **kwargs) -> str: - return f"/{self.api_version}/sessions/average" + return f"{self.api_version}/sessions/average" diff --git a/airbyte-integrations/connectors/source-google-search-console/source_google_search_console/streams.py b/airbyte-integrations/connectors/source-google-search-console/source_google_search_console/streams.py index ae4889c35feae..6b448248215a0 100755 --- a/airbyte-integrations/connectors/source-google-search-console/source_google_search_console/streams.py +++ b/airbyte-integrations/connectors/source-google-search-console/source_google_search_console/streams.py @@ -12,7 +12,7 @@ from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator -BASE_URL = "https://www.googleapis.com/webmasters/v3" +BASE_URL = "https://www.googleapis.com/webmasters/v3/" ROW_LIMIT = 25000 @@ -67,7 +67,7 @@ def path( stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, ) -> str: - return f"/sites/{stream_slice.get('site_url')}" + return f"sites/{stream_slice.get('site_url')}" class Sitemaps(GoogleSearchConsole): @@ -83,7 +83,7 @@ def path( stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, ) -> str: - return f"/sites/{stream_slice.get('site_url')}/sitemaps" + return f"sites/{stream_slice.get('site_url')}/sitemaps" class SearchAnalytics(GoogleSearchConsole, ABC): @@ -102,7 +102,7 @@ def path( stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, ) -> str: - return f"/sites/{stream_slice.get('site_url')}/searchAnalytics/query" + return f"sites/{stream_slice.get('site_url')}/searchAnalytics/query" @property def cursor_field(self) -> Union[str, List[str]]: diff --git a/airbyte-integrations/connectors/source-marketo/source_marketo/source.py b/airbyte-integrations/connectors/source-marketo/source_marketo/source.py index c15c36dd207aa..bcd1954355f2e 100644 --- a/airbyte-integrations/connectors/source-marketo/source_marketo/source.py +++ b/airbyte-integrations/connectors/source-marketo/source_marketo/source.py @@ -30,7 +30,7 @@ def __init__(self, config: Mapping[str, Any], stream_name: str = None, param: Ma self.config = config self.start_date = config["start_date"] self.window_in_days = config["window_in_days"] - self._url_base = config["domain_url"] + self._url_base = config["domain_url"].rstrip("/") + "/" self.stream_name = stream_name self.param = param self.export_id = export_id @@ -40,7 +40,7 @@ def url_base(self) -> str: return self._url_base def path(self, **kwargs) -> str: - return f"/rest/v1/{self.name}.json" + return f"rest/v1/{self.name}.json" def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: next_page = response.json().get("nextPageToken") @@ -158,7 +158,7 @@ def get_export_status(self, stream_slice): ) def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: - return f"/bulk/v1/{self.stream_name}/export/{stream_slice['id']}/file.json" + return f"bulk/v1/{self.stream_name}/export/{stream_slice['id']}/file.json" def stream_slices(self, sync_mode, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: date_slices = super().stream_slices(sync_mode, stream_state, **kwargs) @@ -252,7 +252,7 @@ class MarketoExportCreate(MarketoStream): http_method = "POST" def path(self, **kwargs) -> str: - return f"/bulk/v1/{self.stream_name}/export/create.json" + return f"bulk/v1/{self.stream_name}/export/create.json" def request_body_json(self, **kwargs) -> Optional[Mapping]: params = {"format": "CSV"} @@ -280,7 +280,7 @@ class MarketoExportStart(MarketoStream): http_method = "POST" def path(self, **kwargs) -> str: - return f"/bulk/v1/{self.stream_name}/export/{self.export_id}/enqueue.json" + return f"bulk/v1/{self.stream_name}/export/{self.export_id}/enqueue.json" class MarketoExportStatus(MarketoStream): @@ -290,7 +290,7 @@ class MarketoExportStatus(MarketoStream): """ def path(self, **kwargs) -> str: - return f"/bulk/v1/{self.stream_name}/export/{self.export_id}/status.json" + return f"bulk/v1/{self.stream_name}/export/{self.export_id}/status.json" def parse_response(self, response: requests.Response, **kwargs) -> List[str]: return [response.json()[self.data_field][0]["status"]] @@ -384,7 +384,7 @@ class ActivityTypes(MarketoStream): """ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: - return "/rest/v1/activities/types.json" + return "rest/v1/activities/types.json" class Programs(IncrementalMarketoStream): @@ -401,7 +401,7 @@ def __init__(self, config: Mapping[str, Any]): self.offset = 0 def path(self, **kwargs) -> str: - return f"/rest/asset/v1/{self.name}.json" + return f"rest/asset/v1/{self.name}.json" def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: data = response.json().get(self.data_field) diff --git a/airbyte-integrations/connectors/source-okta/source_okta/source.py b/airbyte-integrations/connectors/source-okta/source_okta/source.py index c39afa0eac49f..f9d7ec39e845a 100644 --- a/airbyte-integrations/connectors/source-okta/source_okta/source.py +++ b/airbyte-integrations/connectors/source-okta/source_okta/source.py @@ -21,7 +21,7 @@ class OktaStream(HttpStream, ABC): def __init__(self, url_base: str, *args, **kwargs): super().__init__(*args, **kwargs) # Inject custom url base to the stream - self._url_base = url_base + self._url_base = url_base.rstrip("/") + "/" @property def url_base(self) -> str: diff --git a/airbyte-integrations/connectors/source-shortio/source_shortio/source.py b/airbyte-integrations/connectors/source-shortio/source_shortio/source.py index 2602b0d4b388c..3f6a4bcc99dbb 100644 --- a/airbyte-integrations/connectors/source-shortio/source_shortio/source.py +++ b/airbyte-integrations/connectors/source-shortio/source_shortio/source.py @@ -22,7 +22,7 @@ def get_auth_header(self) -> Mapping[str, Any]: class Links(HttpStream, ABC): - url_base = "https://api.short.io/api" + url_base = "https://api.short.io/api/" limit = 150 primary_key = "id" before_id = None @@ -60,7 +60,7 @@ def path( next_page_token: Mapping[str, Any] = None, ) -> str: # Get all the links - return "/links" + return "links" def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: """ diff --git a/airbyte-integrations/connectors/source-typeform/source_typeform/source.py b/airbyte-integrations/connectors/source-typeform/source_typeform/source.py index 5b01fb7f2ecb6..12708338e457e 100644 --- a/airbyte-integrations/connectors/source-typeform/source_typeform/source.py +++ b/airbyte-integrations/connectors/source-typeform/source_typeform/source.py @@ -20,7 +20,7 @@ class TypeformStream(HttpStream, ABC): - url_base = "https://api.typeform.com" + url_base = "https://api.typeform.com/" # maximum number of entities in API response per single page limit: int = 200 date_format: str = "YYYY-MM-DDTHH:mm:ss[Z]" @@ -55,7 +55,7 @@ def path( stream_slice: Mapping[str, Any] = None, next_page_token: Optional[Any] = None, ) -> str: - return "/forms" + return "forms" def next_page_token(self, response: requests.Response) -> Optional[Any]: page = self.get_current_page_token(response.url) @@ -108,7 +108,7 @@ def path( stream_slice: Mapping[str, Any] = None, next_page_token: Optional[Any] = None, ) -> str: - return f"/forms/{stream_slice['form_id']}" + return f"forms/{stream_slice['form_id']}" def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: yield response.json() @@ -149,7 +149,7 @@ class Responses(TrimFormsMixin, IncrementalTypeformStream): limit: int = 1000 def path(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> str: - return f"/forms/{stream_slice['form_id']}/responses" + return f"forms/{stream_slice['form_id']}/responses" def get_form_id(self, record: Mapping[str, Any]) -> Optional[str]: """