Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 New source: Chargebee (prepare connector for publishing) #5067

Merged
merged 14 commits into from
Jul 30, 2021
Merged
Prev Previous commit
Next Next commit
Implement change request
  • Loading branch information
Zirochkaa committed Jul 29, 2021
commit 019d4fbc9132b8dcda709ee70d0badd3e31d714a
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"site": "airbyte-test",
"site_api_key": "<your_api_key>",
"start_date": "2021-06-22T06:57:44Z"
"site_api_key": "fake-api-key-1",
"start_date": "2021-06-22T06:57:44Z",
"product_catalog": "2.0"
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"site": "airbyte-test",
"site_api_key": "<your_api_key>",
"start_date": "2021-06-22T06:57:44Z"
"start_date": "2021-06-22T06:57:44Z",
"product_catalog": "2.0"
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,19 @@
from typing import Any, List, Mapping, Tuple

import chargebee
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from chargebee import APIError

from .streams import AddonStream, CustomerStream, InvoiceStream, OrderStream, PlanStream, SubscriptionStream
from .streams import Addon, Customer, Invoice, Order, Plan, Subscription


class SourceChargebee(AbstractSource):
def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, any]:
# Configure the Chargebee Python SDK
chargebee.configure(api_key=config["site_api_key"], site=config["site"])
try:
subscription_stream = SubscriptionStream(start_date=config["start_date"])
subscription_stream = Subscription(start_date=config["start_date"])
next(subscription_stream.read_records(sync_mode=SyncMode.full_refresh))
return True, None
except Exception as err:
Expand All @@ -52,33 +50,24 @@ def streams(self, config) -> List[Stream]:
chargebee.configure(api_key=config["site_api_key"], site=config["site"])

kwargs = {"start_date": config["start_date"]}
product_catalog_version = 1
try:
plan_stream_records = PlanStream(**kwargs).read_records(sync_mode=SyncMode.full_refresh)
next(plan_stream_records)
except APIError as err:
if err.http_code == requests.codes.BAD_REQUEST and err.error_code == "api_restricted":
product_catalog_version = 2
except StopIteration:
# This means that user has access to `plan` stream, but there is no data.
pass
product_catalog_version = config["product_catalog"]

# Below streams are suitable for both `Product Catalog 1.0` and `Product Catalog 2.0`.
common_streams = [
Zirochkaa marked this conversation as resolved.
Show resolved Hide resolved
CustomerStream(**kwargs),
InvoiceStream(**kwargs),
OrderStream(**kwargs),
SubscriptionStream(**kwargs),
Customer(**kwargs),
Zirochkaa marked this conversation as resolved.
Show resolved Hide resolved
Invoice(**kwargs),
Order(**kwargs),
Subscription(**kwargs),
]

if product_catalog_version == 1:
if product_catalog_version == "1.0":
# Below streams are suitable only for `Product Catalog 1.0`.
product_catalog_v1_streams = [
AddonStream(**kwargs),
PlanStream(**kwargs),
Addon(**kwargs),
Plan(**kwargs),
]
return common_streams + product_catalog_v1_streams

if product_catalog_version == 2:
if product_catalog_version == "2.0":
# Below streams are suitable only for `Product Catalog 2.0`.
return common_streams
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Chargebee Spec",
"type": "object",
"required": ["site", "site_api_key", "start_date"],
"required": ["site", "site_api_key", "start_date", "product_catalog"],
"additionalProperties": false,
"properties": {
"site": {
Expand All @@ -26,6 +26,12 @@
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
"description": "UTC date and time in the format 2021-01-25T00:00:00Z. Any data before this date will not be replicated.",
"examples": ["2021-01-25T00:00:00Z"]
},
"product_catalog": {
"title": "Product Catalog",
"type": "string",
"description": "Product Catalog version of your Chargebee site. Instructions on how to find your version you may find <a href=\"https://apidocs.chargebee.com/docs/api?prod_cat_ver=2\">here</a> under `API Version` section.",
"enum": ["1.0", "2.0"]
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,21 @@
# SOFTWARE.
#

from datetime import datetime, timezone
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional

import backoff
import pendulum
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream
from chargebee.api_error import OperationFailedError
from chargebee.list_result import ListResult
from chargebee.model import Model
from chargebee.models import Addon, Customer, Invoice, Order, Plan, Subscription
from chargebee.models import Addon as AddonModel
from chargebee.models import Customer as CustomerModel
from chargebee.models import Invoice as InvoiceModel
from chargebee.models import Order as OrderModel
from chargebee.models import Plan as PlanModel
from chargebee.models import Subscription as SubscriptionModel

# Backoff params below
# according to Chargebee's guidance on rate limit
Expand All @@ -44,11 +49,12 @@ class ChargebeeStream(Stream):
primary_key = "id"
cursor_field = "updated_at"
page_size = 100
include_deleted = "true"
api: Model = None

def __init__(self, start_date: str):
# Convert `start_date` to timestamp(UTC).
self._start_date = int(datetime.strptime(start_date, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc).timestamp())
self._start_date = pendulum.parse(start_date).int_timestamp

@property
def state_checkpoint_interval(self) -> int:
Expand All @@ -66,7 +72,7 @@ def request_params(
) -> MutableMapping[str, Any]:
params = {
"limit": self.page_size,
"include_deleted": "true",
"include_deleted": self.include_deleted,
"sort_by[asc]": self.cursor_field,
}

Expand All @@ -91,15 +97,13 @@ def parse_response(self, list_result: ListResult, **kwargs) -> Iterable[Mapping]
max_tries=MAX_TRIES,
max_time=MAX_TIME,
)
def _send_request(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> ListResult:
def _send_request(self, **kwargs) -> ListResult:
"""
Just a wrapper to allow @backoff decorator
Reference: https://apidocs.chargebee.com/docs/api/#error_codes_list
"""
kwargs = self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
return self.api.list(kwargs)
params = self.request_params(**kwargs)
return self.api.list(params)

def read_records(
self,
Expand Down Expand Up @@ -142,31 +146,55 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
return state_value


class SubscriptionStream(ChargebeeStream):
class Subscription(ChargebeeStream):
"""
API docs: https://apidocs.chargebee.com/docs/api/subscriptions?prod_cat_ver=2#list_subscriptions
"""

name = "subscription"
api = Subscription
api = SubscriptionModel


class CustomerStream(ChargebeeStream):
class Customer(ChargebeeStream):
"""
API docs: https://apidocs.chargebee.com/docs/api/customers?prod_cat_ver=2#list_customers
"""

name = "customer"
api = Customer
api = CustomerModel


class Invoice(ChargebeeStream):
"""
API docs: https://apidocs.chargebee.com/docs/api/invoices?prod_cat_ver=2#list_invoices
"""

class InvoiceStream(ChargebeeStream):
name = "invoice"
api = Invoice
api = InvoiceModel


class OrderStream(ChargebeeStream):
class Order(ChargebeeStream):
"""
API docs: https://apidocs.chargebee.com/docs/api/orders?prod_cat_ver=2#list_orders
"""

name = "order"
api = Order
api = OrderModel


class Plan(ChargebeeStream):
"""
API docs: https://apidocs.chargebee.com/docs/api/plans?prod_cat_ver=1&lang=curl#list_plans
"""

class PlanStream(ChargebeeStream):
name = "plan"
api = Plan
api = PlanModel


class Addon(ChargebeeStream):
"""
API docs: https://apidocs.chargebee.com/docs/api/addons?prod_cat_ver=1&lang=curl#list_addons
"""

class AddonStream(ChargebeeStream):
name = "addon"
api = Addon
api = AddonModel
23 changes: 20 additions & 3 deletions docs/integrations/sources/chargebee.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,28 @@ This Chargebee source uses the [Chargebee Python Client Library](https://github.

This connector outputs the following streams:

* [Subscription](https://apidocs.chargebee.com/docs/api/subscriptions?prod_cat_ver=2#list_subscriptions)
* [Subscriptions](https://apidocs.chargebee.com/docs/api/subscriptions?prod_cat_ver=2#list_subscriptions)
* [Customers](https://apidocs.chargebee.com/docs/api/customers?prod_cat_ver=2#list_customers)
* [Invoices](https://apidocs.chargebee.com/docs/api/invoices?prod_cat_ver=2#list_invoices)
* [Orders](https://apidocs.chargebee.com/docs/api/orders?prod_cat_ver=2#list_orders)
* [Plans](https://apidocs.chargebee.com/docs/api/plans?prod_cat_ver=1&lang=curl#list_plans)
* [Addons](https://apidocs.chargebee.com/docs/api/addons?prod_cat_ver=1&lang=curl#list_addons)

### Notes

Some streams may depend on Product Catalog version and be accessible only on sites with specific Product Catalog version. This means that we have following streams:

1. presented in both `Product Catalog 1.0` and `Product Catalog 2.0`:
- Subscriptions
- Customers
- Invoices
- Orders

2. presented only in `Product Catalog 1.0`:
- Plans
- Addons

3. presented only in `Product Catalog 2.0` will be added soon.

### Features

Expand All @@ -39,17 +54,19 @@ The Chargebee connector should not run into [Chargebee API](https://apidocs.char
* Chargebee Account;
* `site_api_key` - Chargebee API Key wih the necessary permissions \(described below\);
* `site` - Chargebee site prefix for your instance;
* `start_date` - start date for incremental streams.
* `start_date` - start date for incremental streams;
* `product_catalog` - Product Catalog version of your Chargebee site \(described below\).

### Setup guide

Log into Chargebee and then generate an [API Key](https://apidocs.chargebee.com/docs/api?prod_cat_ver=2#api_authentication).
Then follow [these](https://apidocs.chargebee.com/docs/api?prod_cat_ver=2) instructions, under `API Version` section, on how to find your Product Catalog version.


## Changelog

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| 0.1.2 | 2021-07-29 | [5067](https://github.com/airbytehq/airbyte/pull/5067) | Publish connector ???? |
| 0.1.2 | 2021-07-30 | [5067](https://github.com/airbytehq/airbyte/pull/5067) | Prepare connector for publishing |
| 0.1.1 | 2021-07-07 | [4539](https://github.com/airbytehq/airbyte/pull/4539) | Add entrypoint and bump version for connector |
| 0.1.0 | 2021-06-30 | [3410](https://github.com/airbytehq/airbyte/pull/3410) | New Source: Chargebee |