-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
🎉 Source Slack: Implement OAuth support with OAuth authenticator #6570
Changes from all commits
a390ec7
33feb3e
4e498e4
8b9b942
e41c1fd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
{ | ||
"credentials": { | ||
"option_title": "Default OAuth2.0 authorization", | ||
"client_id": "fake-client-id", | ||
"client_secret": "fake-client-secret", | ||
"refresh_token": "fake-refresh-token", | ||
"access_token": "fake-access-token" | ||
}, | ||
"start_date": "2021-07-22T20:00:00Z", | ||
"lookback_window": 2, | ||
"join_channels": true | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,9 +13,8 @@ | |
from airbyte_cdk.sources import AbstractSource | ||
from airbyte_cdk.sources.streams import Stream | ||
from airbyte_cdk.sources.streams.http import HttpStream | ||
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator | ||
from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator, TokenAuthenticator | ||
from pendulum import DateTime, Period | ||
from slack_sdk import WebClient | ||
|
||
|
||
class SlackStream(HttpStream, ABC): | ||
|
@@ -107,7 +106,7 @@ def parse_response(self, response: requests.Response, stream_slice: Mapping[str, | |
yield {"member_id": member_id, "channel_id": stream_slice["channel_id"]} | ||
|
||
def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: | ||
channels_stream = Channels(authenticator=self.authenticator) | ||
channels_stream = Channels(authenticator=self._session.auth) | ||
for channel_record in channels_stream.read_records(sync_mode=SyncMode.full_refresh): | ||
yield {"channel_id": channel_record["id"]} | ||
|
||
|
@@ -188,7 +187,7 @@ def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwarg | |
yield from super().read_records(stream_slice=stream_slice, **kwargs) | ||
else: | ||
# if channel is not provided, then get channels and read accordingly | ||
channels = Channels(authenticator=self.authenticator) | ||
channels = Channels(authenticator=self._session.auth) | ||
for channel_record in channels.read_records(sync_mode=SyncMode.full_refresh): | ||
stream_slice["channel"] = channel_record["id"] | ||
yield from super().read_records(stream_slice=stream_slice, **kwargs) | ||
|
@@ -221,7 +220,7 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite | |
""" | ||
|
||
stream_state = stream_state or {} | ||
channels_stream = Channels(authenticator=self.authenticator) | ||
channels_stream = Channels(authenticator=self._session.auth) | ||
|
||
if self.cursor_field in stream_state: | ||
# Since new messages can be posted to threads continuously after the parent message has been posted, we get messages from the latest date | ||
|
@@ -232,7 +231,7 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite | |
# If there is no state i.e: this is the first sync then there is no use for lookback, just get messages from the default start date | ||
messages_start_date = pendulum.from_timestamp(self._start_ts) | ||
|
||
messages_stream = ChannelMessages(authenticator=self.authenticator, default_start_date=messages_start_date) | ||
messages_stream = ChannelMessages(authenticator=self._session.auth, default_start_date=messages_start_date) | ||
|
||
for message_chunk in messages_stream.stream_slices(stream_state={self.cursor_field: messages_start_date.timestamp()}): | ||
self.logger.info(f"Syncing replies {message_chunk}") | ||
|
@@ -276,7 +275,7 @@ def path(self, **kwargs) -> str: | |
return "conversations.join" | ||
|
||
def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: | ||
channels_stream = Channels(authenticator=self.authenticator) | ||
channels_stream = Channels(authenticator=self._session.auth) | ||
for channel in channels_stream.read_records(sync_mode=SyncMode.full_refresh): | ||
yield {"channel": channel["id"], "channel_name": channel["name"]} | ||
|
||
|
@@ -285,16 +284,41 @@ def request_body_json(self, stream_slice: Mapping = None, **kwargs) -> Optional[ | |
|
||
|
||
class SourceSlack(AbstractSource): | ||
def _get_authenticator(self, config: Mapping[str, Any]): | ||
# Added to maintain backward compatibility with previous versions | ||
if "api_token" in config: | ||
return TokenAuthenticator(config["api_token"]) | ||
|
||
credentials = config.get("credentials") | ||
credentials_title = credentials.get("option_title") | ||
if credentials_title == "Default OAuth2.0 authorization": | ||
yevhenii-ldv marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# We can get `refresh_token` only if the token rotation function is enabled for the Slack Oauth Application. | ||
# If it is disabled, then we use the generated `access_token`, which acts without expiration. | ||
# https://api.slack.com/authentication/rotation | ||
if credentials.get("refresh_token", "").strip(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should use the option name, as that is the only thing guaranteed to be uniquely present There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is a fork here that needs to be handled. If Token rotation is enabled for the application, then in the response to the |
||
return Oauth2Authenticator( | ||
token_refresh_endpoint="https://slack.com/api/oauth.v2.access", | ||
client_id=credentials["client_id"], | ||
client_secret=credentials["client_secret"], | ||
refresh_token=credentials["refresh_token"], | ||
) | ||
return TokenAuthenticator(credentials["access_token"]) | ||
elif credentials_title == "API Token Credentials": | ||
return TokenAuthenticator(credentials["api_token"]) | ||
else: | ||
raise Exception(f"No supported option_title: {credentials_title} specified. See spec.json for references") | ||
|
||
def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]: | ||
slack_client = WebClient(token=config["api_token"]) | ||
users = slack_client.users_list(limit=1).get("members", []) | ||
if len(users) > 0: | ||
try: | ||
authenticator = self._get_authenticator(config) | ||
users_stream = Users(authenticator=authenticator) | ||
next(users_stream.read_records(SyncMode.full_refresh)) | ||
return True, None | ||
else: | ||
return False, "There are no users in the given Slack instance" | ||
except Exception: | ||
return False, "There are no users in the given Slack instance or your token is incorrect" | ||
|
||
def streams(self, config: Mapping[str, Any]) -> List[Stream]: | ||
authenticator = TokenAuthenticator(config["api_token"]) | ||
authenticator = self._get_authenticator(config) | ||
default_start_date = pendulum.parse(config["start_date"]) | ||
threads_lookback_window = pendulum.Duration(days=config["lookback_window"]) | ||
|
||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -4,21 +4,15 @@ | |||||
"$schema": "http://json-schema.org/draft-07/schema#", | ||||||
"title": "Slack Spec", | ||||||
"type": "object", | ||||||
"required": ["api_token", "start_date", "lookback_window", "join_channels"], | ||||||
"additionalProperties": false, | ||||||
"required": ["start_date", "lookback_window", "join_channels"], | ||||||
"additionalProperties": true, | ||||||
"properties": { | ||||||
"api_token": { | ||||||
"type": "string", | ||||||
"title": "API Token", | ||||||
"description": "A slack bot token. See the <a href=\"https://docs.airbyte.io/integrations/sources/slack\">docs</a> for instructions on how to generate it.", | ||||||
"airbyte_secret": true | ||||||
}, | ||||||
"start_date": { | ||||||
"type": "string", | ||||||
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$", | ||||||
"description": "UTC date and time in the format 2017-01-25T00:00:00Z. Any data before this date will not be replicated.", | ||||||
"examples": ["2017-01-25T00:00:00Z"], | ||||||
"title": "Start date" | ||||||
"title": "Start Date" | ||||||
}, | ||||||
"lookback_window": { | ||||||
"type": "integer", | ||||||
|
@@ -31,7 +25,84 @@ | |||||
"default": true, | ||||||
"title": "Join all channels", | ||||||
"description": "Whether to join all channels or to sync data only from channels the bot is already in. If false, you'll need to manually add the bot to all the channels from which you'd like to sync messages. " | ||||||
}, | ||||||
"credentials": { | ||||||
"title": "Authentication mechanism", | ||||||
"description": "Choose how to authenticate into Slack", | ||||||
"type": "object", | ||||||
"oneOf": [ | ||||||
{ | ||||||
"type": "object", | ||||||
"title": "Sign in via Slack (OAuth)", | ||||||
"required": [ | ||||||
"access_token", | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. refresh token is the required one since it can be used to generate access tokens
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Comment above |
||||||
"client_id", | ||||||
"client_secret", | ||||||
"option_title" | ||||||
], | ||||||
"properties": { | ||||||
"option_title": { | ||||||
"type": "string", | ||||||
"const": "Default OAuth2.0 authorization" | ||||||
}, | ||||||
"client_id": { | ||||||
"title": "Client ID", | ||||||
"description": "Slack client_id. See our <a href=\"https://docs.airbyte.io/integrations/sources/slack\">docs</a> if you need help finding this id.", | ||||||
"type": "string", | ||||||
"examples": ["slack-client-id-example"] | ||||||
}, | ||||||
"client_secret": { | ||||||
"title": "Client Secret", | ||||||
"description": "Slack client_secret. See our <a href=\"https://docs.airbyte.io/integrations/sources/slack\">docs</a> if you need help finding this secret.", | ||||||
"type": "string", | ||||||
"examples": ["slack-client-secret-example"], | ||||||
"airbyte_secret": true | ||||||
}, | ||||||
"access_token": { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i suggest we remove the access token since it expires anyways, and just rely on refresh token There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. comment above |
||||||
"title": "Access token", | ||||||
"description": "Slack access_token. See our <a href=\"https://docs.airbyte.io/integrations/sources/slack\">docs</a> if you need help generating the token.", | ||||||
"type": "string", | ||||||
"examples": ["slack-access-token-example"], | ||||||
"airbyte_secret": true | ||||||
}, | ||||||
"refresh_token": { | ||||||
"title": "Refresh token", | ||||||
"description": "Slack refresh_token. See our <a href=\"https://docs.airbyte.io/integrations/sources/slack\">docs</a> if you need help generating the token.", | ||||||
"type": "string", | ||||||
"examples": ["slack-refresh-token-example"], | ||||||
"airbyte_secret": true | ||||||
} | ||||||
}, | ||||||
"order": 0 | ||||||
}, | ||||||
{ | ||||||
"type": "object", | ||||||
"title": "API Token", | ||||||
"required": ["api_token", "option_title"], | ||||||
"properties": { | ||||||
"option_title": { | ||||||
"type": "string", | ||||||
"const": "API Token Credentials" | ||||||
}, | ||||||
"api_token": { | ||||||
"type": "string", | ||||||
"title": "API Token", | ||||||
"description": "A Slack bot token. See the <a href=\"https://docs.airbyte.io/integrations/sources/slack\">docs</a> for instructions on how to generate it.", | ||||||
"airbyte_secret": true | ||||||
} | ||||||
}, | ||||||
"order": 1 | ||||||
} | ||||||
] | ||||||
} | ||||||
} | ||||||
}, | ||||||
"authSpecification": { | ||||||
"auth_type": "oauth2.0", | ||||||
"oauth2Specification": { | ||||||
"rootObject": ["credentials", 0], | ||||||
"oauthFlowInitParameters": [["client_id"], ["client_secret"]], | ||||||
"oauthFlowOutputParameters": [["access_token"], ["refresh_token"]] | ||||||
} | ||||||
} | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be backwards compatible with the previous config format to prevent a breaking change for current users
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added backward compatibility, but for this it was necessary to remove
credentials
fromrequired
.@avida described this issue in more detail in this comment.