Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Source Slack: Implement OAuth support with OAuth authenticator #6570

Merged
merged 5 commits into from
Oct 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/publish-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@ jobs:
SALESFORCE_INTEGRATION_TESTS_CREDS: ${{ secrets.SALESFORCE_INTEGRATION_TESTS_CREDS }}
SENDGRID_INTEGRATION_TEST_CREDS: ${{ secrets.SENDGRID_INTEGRATION_TEST_CREDS }}
SHOPIFY_INTEGRATION_TEST_CREDS: ${{ secrets.SHOPIFY_INTEGRATION_TEST_CREDS }}
SLACK_TEST_CREDS: ${{ secrets.SLACK_TEST_CREDS }}
SOURCE_ASANA_TEST_CREDS: ${{ secrets.SOURCE_ASANA_TEST_CREDS }}
SOURCE_OKTA_TEST_CREDS: ${{ secrets.SOURCE_OKTA_TEST_CREDS }}
SOURCE_SLACK_TEST_CREDS: ${{ secrets.SOURCE_SLACK_TEST_CREDS }}
SOURCE_SLACK_OAUTH_TEST_CREDS: ${{ secrets.SOURCE_SLACK_OAUTH_TEST_CREDS }}
SOURCE_US_CENSUS_TEST_CREDS: ${{ secrets.SOURCE_US_CENSUS_TEST_CREDS }}
SMARTSHEETS_TEST_CREDS: ${{ secrets.SMARTSHEETS_TEST_CREDS }}
SOURCE_SNAPCHAT_MARKETING_CREDS: ${{ secrets.SOURCE_SNAPCHAT_MARKETING_CREDS }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ jobs:
SALESFORCE_INTEGRATION_TESTS_CREDS: ${{ secrets.SALESFORCE_INTEGRATION_TESTS_CREDS }}
SENDGRID_INTEGRATION_TEST_CREDS: ${{ secrets.SENDGRID_INTEGRATION_TEST_CREDS }}
SHOPIFY_INTEGRATION_TEST_CREDS: ${{ secrets.SHOPIFY_INTEGRATION_TEST_CREDS }}
SLACK_TEST_CREDS: ${{ secrets.SLACK_TEST_CREDS }}
SOURCE_OKTA_TEST_CREDS: ${{ secrets.SOURCE_OKTA_TEST_CREDS }}
SOURCE_SLACK_TEST_CREDS: ${{ secrets.SOURCE_SLACK_TEST_CREDS }}
SOURCE_SLACK_OAUTH_TEST_CREDS: ${{ secrets.SOURCE_SLACK_OAUTH_TEST_CREDS }}
SOURCE_US_CENSUS_TEST_CREDS: ${{ secrets.SOURCE_US_CENSUS_TEST_CREDS }}
SMARTSHEETS_TEST_CREDS: ${{ secrets.SMARTSHEETS_TEST_CREDS }}
SOURCE_SNAPCHAT_MARKETING_CREDS: ${{ secrets.SOURCE_SNAPCHAT_MARKETING_CREDS }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "c2281cee-86f9-4a86-bb48-d23286b4c7bd",
"name": "Slack",
"dockerRepository": "airbyte/source-slack",
"dockerImageTag": "0.1.11",
"dockerImageTag": "0.1.12",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/slack",
"icon": "slack.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@
- sourceDefinitionId: c2281cee-86f9-4a86-bb48-d23286b4c7bd
name: Slack
dockerRepository: airbyte/source-slack
dockerImageTag: 0.1.11
dockerImageTag: 0.1.12
documentationUrl: https://docs.airbyte.io/integrations/sources/slack
icon: slack.svg
sourceType: api
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-slack/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.11
LABEL io.airbyte.version=0.1.12
LABEL io.airbyte.name=airbyte/source-slack
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ tests:
connection:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "secrets/config_oauth.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
- config_path: "integration_tests/invalid_oauth_config.json"
status: "failed"
discovery:
- config_path: "secrets/config.json"
basic_read:
Expand All @@ -17,9 +21,6 @@ tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
timeout_seconds: 7200
cursor_paths:
channel_messages: ["float_ts"]
threads: ["float_ts"]
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/full_refresh_catalog.json"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
{
"api_token": "invalid_api_token",
"credentials": {
"option_title": "API Token Credentials",
"api_token": "fake-api-token"
},
"start_date": "2022-07-22T20:00:00Z",
"lookback_window": 2,
"join_channels": true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"credentials": {
"option_title": "Default OAuth2.0 authorization",
"client_id": "fake-client-id",
"client_secret": "fake-client-secret",
"refresh_token": "fake-refresh-token",
"access_token": "fake-access-token"
},
"start_date": "2021-07-22T20:00:00Z",
"lookback_window": 2,
"join_channels": true
}
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-slack/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
author="Airbyte",
author_email="contact@airbyte.io",
packages=find_packages(),
install_requires=["airbyte-cdk~=0.1.13", "slack_sdk==3.4.2", "pendulum>=2,<3"],
install_requires=["airbyte-cdk~=0.1", "pendulum>=2,<3"],
package_data={"": ["*.json"]},
extras_require={
"tests": TEST_REQUIREMENTS,
Expand Down
50 changes: 37 additions & 13 deletions airbyte-integrations/connectors/source-slack/source_slack/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator, TokenAuthenticator
from pendulum import DateTime, Period
from slack_sdk import WebClient


class SlackStream(HttpStream, ABC):
Expand Down Expand Up @@ -107,7 +106,7 @@ def parse_response(self, response: requests.Response, stream_slice: Mapping[str,
yield {"member_id": member_id, "channel_id": stream_slice["channel_id"]}

def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
channels_stream = Channels(authenticator=self.authenticator)
channels_stream = Channels(authenticator=self._session.auth)
for channel_record in channels_stream.read_records(sync_mode=SyncMode.full_refresh):
yield {"channel_id": channel_record["id"]}

Expand Down Expand Up @@ -188,7 +187,7 @@ def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwarg
yield from super().read_records(stream_slice=stream_slice, **kwargs)
else:
# if channel is not provided, then get channels and read accordingly
channels = Channels(authenticator=self.authenticator)
channels = Channels(authenticator=self._session.auth)
for channel_record in channels.read_records(sync_mode=SyncMode.full_refresh):
stream_slice["channel"] = channel_record["id"]
yield from super().read_records(stream_slice=stream_slice, **kwargs)
Expand Down Expand Up @@ -221,7 +220,7 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite
"""

stream_state = stream_state or {}
channels_stream = Channels(authenticator=self.authenticator)
channels_stream = Channels(authenticator=self._session.auth)

if self.cursor_field in stream_state:
# Since new messages can be posted to threads continuously after the parent message has been posted, we get messages from the latest date
Expand All @@ -232,7 +231,7 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite
# If there is no state i.e: this is the first sync then there is no use for lookback, just get messages from the default start date
messages_start_date = pendulum.from_timestamp(self._start_ts)

messages_stream = ChannelMessages(authenticator=self.authenticator, default_start_date=messages_start_date)
messages_stream = ChannelMessages(authenticator=self._session.auth, default_start_date=messages_start_date)

for message_chunk in messages_stream.stream_slices(stream_state={self.cursor_field: messages_start_date.timestamp()}):
self.logger.info(f"Syncing replies {message_chunk}")
Expand Down Expand Up @@ -276,7 +275,7 @@ def path(self, **kwargs) -> str:
return "conversations.join"

def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
channels_stream = Channels(authenticator=self.authenticator)
channels_stream = Channels(authenticator=self._session.auth)
for channel in channels_stream.read_records(sync_mode=SyncMode.full_refresh):
yield {"channel": channel["id"], "channel_name": channel["name"]}

Expand All @@ -285,16 +284,41 @@ def request_body_json(self, stream_slice: Mapping = None, **kwargs) -> Optional[


class SourceSlack(AbstractSource):
def _get_authenticator(self, config: Mapping[str, Any]):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be backwards compatible with the previous config format to prevent a breaking change for current users

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added backward compatibility, but for this it was necessary to remove credentials from required.
@avida described this issue in more detail in this comment.

# Added to maintain backward compatibility with previous versions
if "api_token" in config:
return TokenAuthenticator(config["api_token"])

credentials = config.get("credentials")
credentials_title = credentials.get("option_title")
if credentials_title == "Default OAuth2.0 authorization":
yevhenii-ldv marked this conversation as resolved.
Show resolved Hide resolved
# We can get `refresh_token` only if the token rotation function is enabled for the Slack Oauth Application.
# If it is disabled, then we use the generated `access_token`, which acts without expiration.
# https://api.slack.com/authentication/rotation
if credentials.get("refresh_token", "").strip():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should use the option name, as that is the only thing guaranteed to be uniquely present

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a fork here that needs to be handled. If Token rotation is enabled for the application, then in the response to the oauth.v2.access request we will receive both Access token(which expires after some time) and Refresh token(with which we can create a new Access token).
However, if the Token rotation function is disabled for the application, then in this case we will receive only Access token in the response to oauth.v2.access and it will be unlimited.
For this reason, we cannot indicate in the specification that Refresh Token is a required field (since it simply may not exist) and we cannot remove Access Token from required (since it may be unlimited and we will need to use it).

return Oauth2Authenticator(
token_refresh_endpoint="https://slack.com/api/oauth.v2.access",
client_id=credentials["client_id"],
client_secret=credentials["client_secret"],
refresh_token=credentials["refresh_token"],
)
return TokenAuthenticator(credentials["access_token"])
elif credentials_title == "API Token Credentials":
return TokenAuthenticator(credentials["api_token"])
else:
raise Exception(f"No supported option_title: {credentials_title} specified. See spec.json for references")

def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
slack_client = WebClient(token=config["api_token"])
users = slack_client.users_list(limit=1).get("members", [])
if len(users) > 0:
try:
authenticator = self._get_authenticator(config)
users_stream = Users(authenticator=authenticator)
next(users_stream.read_records(SyncMode.full_refresh))
return True, None
else:
return False, "There are no users in the given Slack instance"
except Exception:
return False, "There are no users in the given Slack instance or your token is incorrect"

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
authenticator = TokenAuthenticator(config["api_token"])
authenticator = self._get_authenticator(config)
default_start_date = pendulum.parse(config["start_date"])
threads_lookback_window = pendulum.Duration(days=config["lookback_window"])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,15 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Slack Spec",
"type": "object",
"required": ["api_token", "start_date", "lookback_window", "join_channels"],
"additionalProperties": false,
"required": ["start_date", "lookback_window", "join_channels"],
"additionalProperties": true,
"properties": {
"api_token": {
"type": "string",
"title": "API Token",
"description": "A slack bot token. See the <a href=\"https://docs.airbyte.io/integrations/sources/slack\">docs</a> for instructions on how to generate it.",
"airbyte_secret": true
},
"start_date": {
"type": "string",
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
"description": "UTC date and time in the format 2017-01-25T00:00:00Z. Any data before this date will not be replicated.",
"examples": ["2017-01-25T00:00:00Z"],
"title": "Start date"
"title": "Start Date"
},
"lookback_window": {
"type": "integer",
Expand All @@ -31,7 +25,84 @@
"default": true,
"title": "Join all channels",
"description": "Whether to join all channels or to sync data only from channels the bot is already in. If false, you'll need to manually add the bot to all the channels from which you'd like to sync messages. "
},
"credentials": {
"title": "Authentication mechanism",
"description": "Choose how to authenticate into Slack",
"type": "object",
"oneOf": [
{
"type": "object",
"title": "Sign in via Slack (OAuth)",
"required": [
"access_token",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refresh token is the required one since it can be used to generate access tokens

Suggested change
"access_token",
"refresh_token",

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment above

"client_id",
"client_secret",
"option_title"
],
"properties": {
"option_title": {
"type": "string",
"const": "Default OAuth2.0 authorization"
},
"client_id": {
"title": "Client ID",
"description": "Slack client_id. See our <a href=\"https://docs.airbyte.io/integrations/sources/slack\">docs</a> if you need help finding this id.",
"type": "string",
"examples": ["slack-client-id-example"]
},
"client_secret": {
"title": "Client Secret",
"description": "Slack client_secret. See our <a href=\"https://docs.airbyte.io/integrations/sources/slack\">docs</a> if you need help finding this secret.",
"type": "string",
"examples": ["slack-client-secret-example"],
"airbyte_secret": true
},
"access_token": {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i suggest we remove the access token since it expires anyways, and just rely on refresh token

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment above

"title": "Access token",
"description": "Slack access_token. See our <a href=\"https://docs.airbyte.io/integrations/sources/slack\">docs</a> if you need help generating the token.",
"type": "string",
"examples": ["slack-access-token-example"],
"airbyte_secret": true
},
"refresh_token": {
"title": "Refresh token",
"description": "Slack refresh_token. See our <a href=\"https://docs.airbyte.io/integrations/sources/slack\">docs</a> if you need help generating the token.",
"type": "string",
"examples": ["slack-refresh-token-example"],
"airbyte_secret": true
}
},
"order": 0
},
{
"type": "object",
"title": "API Token",
"required": ["api_token", "option_title"],
"properties": {
"option_title": {
"type": "string",
"const": "API Token Credentials"
},
"api_token": {
"type": "string",
"title": "API Token",
"description": "A Slack bot token. See the <a href=\"https://docs.airbyte.io/integrations/sources/slack\">docs</a> for instructions on how to generate it.",
"airbyte_secret": true
}
},
"order": 1
}
]
}
}
},
"authSpecification": {
"auth_type": "oauth2.0",
"oauth2Specification": {
"rootObject": ["credentials", 0],
"oauthFlowInitParameters": [["client_id"], ["client_secret"]],
"oauthFlowOutputParameters": [["access_token"], ["refresh_token"]]
}
}
}
13 changes: 11 additions & 2 deletions docs/integrations/sources/slack.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

This source can sync data for the [Slack API](https://api.slack.com/). It supports both Full Refresh and Incremental syncs. You can choose if this connector will copy only the new or updated data, or all rows in the tables and columns you set up for replication, every time a sync is run.

This Source Connector is based on a [Singer Tap](https://github.com/airbytehq/tap-slack).

### Output schema

This Source is capable of syncing the following core Streams:
Expand Down Expand Up @@ -46,6 +44,16 @@ The Slack connector should not run into Slack API limitations under normal usage

### Requirements

#### Slack connector can be connected using two types of authentication: OAuth2.0 or API Token

#### Using OAuth2.0 authenticator
* Client ID - issued when you created your app
* Client Secret - issued when you created your app
* Refresh Token - a special kind of token used to obtain a renewed access token

yevhenii-ldv marked this conversation as resolved.
Show resolved Hide resolved
You can get more detailed information about this type of authentication by reading [Slack's documentation about OAuth2.0](https://api.slack.com/authentication/oauth-v2).

#### Using API Token
* Slack API Token

### Setup guide
Expand Down Expand Up @@ -101,6 +109,7 @@ We recommend creating a restricted, read-only key specifically for Airbyte acces

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| 0.1.12 | 2021-10-07 | [6570](https://github.com/airbytehq/airbyte/pull/6570) | Implement OAuth support with OAuth authenticator |
| 0.1.11 | 2021-08-27 | [5830](https://github.com/airbytehq/airbyte/pull/5830) | Fixed sync operations hang forever issue |
| 0.1.10 | 2021-08-27 | [5697](https://github.com/airbytehq/airbyte/pull/5697) | Fixed max retries issue |
| 0.1.9 | 2021-07-20 | [4860](https://github.com/airbytehq/airbyte/pull/4860) | Fixed reading threads issue |
Expand Down
1 change: 1 addition & 0 deletions tools/bin/ci_credentials.sh
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ write_standard_creds source-sendgrid "$SENDGRID_INTEGRATION_TEST_CREDS"
write_standard_creds source-shopify "$SHOPIFY_INTEGRATION_TEST_CREDS"
write_standard_creds source-shortio "$SOURCE_SHORTIO_TEST_CREDS"
write_standard_creds source-slack "$SOURCE_SLACK_TEST_CREDS"
write_standard_creds source-slack "$SOURCE_SLACK_OAUTH_TEST_CREDS" "config_oauth.json"
write_standard_creds source-smartsheets "$SMARTSHEETS_TEST_CREDS"
write_standard_creds source-snapchat-marketing "$SOURCE_SNAPCHAT_MARKETING_CREDS"
write_standard_creds source-snowflake "$SNOWFLAKE_INTEGRATION_TEST_CREDS" "config.json"
Expand Down