Skip to content

Commit

Permalink
add google sheets oauth flow to server + fix auth rootObject type in …
Browse files Browse the repository at this point in the history
…protocol (airbytehq#7131)

* adding google sheets oauth flow to server

* fix oauth type in protocol yaml

* bump sheets version in definitions

* added GDrive scope

* update sheets to master changes

* update protocol incl. cdk

* protocol typing for oauth rootobject

* format
  • Loading branch information
Phlair authored Oct 21, 2021
1 parent bd5d4ee commit d660661
Show file tree
Hide file tree
Showing 16 changed files with 363 additions and 18 deletions.
6 changes: 4 additions & 2 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2023,8 +2023,10 @@ components:
If they were inside a oneOf {'switch': {oneOf: [{client_id...}, {non_oauth_param]}}, rootObject=['switch', 0]
"
type: array
items:
type: string
items: {} # <--- using generic any type. Build fails with oneOf (https://github.com/OpenAPITools/openapi-generator/issues/6161)
example:
- path
- 1
oauthFlowInitParameters:
description:
"Pointers to the fields in the rootObject needed to obtain the initial refresh/access tokens for the OAuth flow.
Expand Down
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.1.30
Updated OAuth2Specification.rootObject type in airbyte_protocol to allow string or int

## 0.1.29
Fix import logger error

Expand Down
4 changes: 2 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from __future__ import annotations

from enum import Enum
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Union

from pydantic import AnyUrl, BaseModel, Extra, Field

Expand Down Expand Up @@ -89,7 +89,7 @@ class OAuth2Specification(BaseModel):
class Config:
extra = Extra.allow

rootObject: Optional[List[str]] = Field(
rootObject: Optional[List[Union[str, int]]] = Field(
None,
description="A list of strings representing a pointer to the root object which contains any oauth parameters in the ConnectorSpecification.\nExamples:\nif oauth parameters were contained inside the top level, rootObject=[] If they were nested inside another object {'credentials': {'app_id' etc...}, rootObject=['credentials'] If they were inside a oneOf {'switch': {oneOf: [{client_id...}, {non_oauth_param]}}, rootObject=['switch', 0] ",
)
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.1.29",
version="0.1.30",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@
- sourceDefinitionId: 71607ba1-c0ac-4799-8049-7f4b90dd50f7
name: Google Sheets
dockerRepository: airbyte/source-google-sheets
dockerImageTag: 0.2.5
dockerImageTag: 0.2.6
documentationUrl: https://docs.airbyte.io/integrations/sources/google-sheets
icon: google-sheets.svg
sourceType: file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from __future__ import annotations

from enum import Enum
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Union

from pydantic import AnyUrl, BaseModel, Extra, Field

Expand Down Expand Up @@ -89,7 +89,7 @@ class OAuth2Specification(BaseModel):
class Config:
extra = Extra.allow

rootObject: Optional[List[str]] = Field(
rootObject: Optional[List[Union[str, int]]] = Field(
None,
description="A list of strings representing a pointer to the root object which contains any oauth parameters in the ConnectorSpecification.\nExamples:\nif oauth parameters were contained inside the top level, rootObject=[] If they were nested inside another object {'credentials': {'app_id' etc...}, rootObject=['credentials'] If they were inside a oneOf {'switch': {oneOf: [{client_id...}, {non_oauth_param]}}, rootObject=['switch', 0] ",
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"api_key": "api-key",
"start_date": "2021-01-01T00:00:00Z"
"api_key": "api-key",
"start_date": "2021-01-01T00:00:00Z"
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
Status,
Type,
)

# helpers
from airbyte_cdk.sources import Source

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.airbyte.oauth.flows.google.GoogleAdsOAuthFlow;
import io.airbyte.oauth.flows.google.GoogleAnalyticsOAuthFlow;
import io.airbyte.oauth.flows.google.GoogleSearchConsoleOAuthFlow;
import io.airbyte.oauth.flows.google.GoogleSheetsOAuthFlow;
import java.util.Map;
import java.util.UUID;

Expand All @@ -29,6 +30,7 @@ public OAuthImplementationFactory(final ConfigRepository configRepository) {
.put("airbyte/source-google-ads", new GoogleAdsOAuthFlow(configRepository))
.put("airbyte/source-google-analytics-v4", new GoogleAnalyticsOAuthFlow(configRepository))
.put("airbyte/source-google-search-console", new GoogleSearchConsoleOAuthFlow(configRepository))
.put("airbyte/source-google-sheets", new GoogleSheetsOAuthFlow(configRepository))
.put("airbyte/source-salesforce", new SalesforceOAuthFlow(configRepository))
.put("airbyte/source-trello", new TrelloOAuthFlow(configRepository))
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.oauth.flows.google;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.airbyte.config.persistence.ConfigRepository;
import java.net.http.HttpClient;
import java.util.function.Supplier;

public class GoogleSheetsOAuthFlow extends GoogleOAuthFlow {

// space-delimited string for multiple scopes, see:
// https://datatracker.ietf.org/doc/html/rfc6749#section-3.3
@VisibleForTesting
static final String SCOPE_URL = "https://www.googleapis.com/auth/spreadsheets.readonly https://www.googleapis.com/auth/drive.readonly";

public GoogleSheetsOAuthFlow(final ConfigRepository configRepository) {
super(configRepository);
}

@VisibleForTesting
GoogleSheetsOAuthFlow(final ConfigRepository configRepository, final HttpClient httpClient, final Supplier<String> stateSupplier) {
super(configRepository, httpClient, stateSupplier);
}

@Override
protected String getScope() {
return SCOPE_URL;
}

@Override
protected String getClientIdUnsafe(final JsonNode config) {
// the config object containing client ID and secret is nested inside the "credentials" object
Preconditions.checkArgument(config.hasNonNull("credentials"));
return super.getClientIdUnsafe(config.get("credentials"));
}

@Override
protected String getClientSecretUnsafe(final JsonNode config) {
// the config object containing client ID and secret is nested inside the "credentials" object
Preconditions.checkArgument(config.hasNonNull("credentials"));
return super.getClientSecretUnsafe(config.get("credentials"));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.oauth.flows.google;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.SourceOAuthParameter;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GoogleSheetsOAuthFlowIntegrationTest {

private static final Logger LOGGER = LoggerFactory.getLogger(GoogleSheetsOAuthFlowIntegrationTest.class);
private static final String REDIRECT_URL = "http://localhost/code";
private static final Path CREDENTIALS_PATH = Path.of("secrets/google_sheets.json");

private ConfigRepository configRepository;
private GoogleSheetsOAuthFlow googleSheetsOAuthFlow;
private HttpServer server;
private ServerHandler serverHandler;

@BeforeEach
public void setup() throws IOException {
if (!Files.exists(CREDENTIALS_PATH)) {
throw new IllegalStateException(
"Must provide path to a oauth credentials file.");
}
configRepository = mock(ConfigRepository.class);
googleSheetsOAuthFlow = new GoogleSheetsOAuthFlow(configRepository);

server = HttpServer.create(new InetSocketAddress(80), 0);
server.setExecutor(null); // creates a default executor
server.start();
serverHandler = new ServerHandler("code");
server.createContext("/code", serverHandler);
}

@AfterEach
void tearDown() {
server.stop(1);
}

@Test
public void testFullGoogleOAuthFlow() throws InterruptedException, ConfigNotFoundException, IOException, JsonValidationException {
int limit = 20;
final UUID workspaceId = UUID.randomUUID();
final UUID definitionId = UUID.randomUUID();
final String fullConfigAsString = new String(Files.readAllBytes(CREDENTIALS_PATH));
final JsonNode credentialsJson = Jsons.deserialize(fullConfigAsString);
when(configRepository.listSourceOAuthParam()).thenReturn(List.of(new SourceOAuthParameter()
.withOauthParameterId(UUID.randomUUID())
.withSourceDefinitionId(definitionId)
.withWorkspaceId(workspaceId)
.withConfiguration(Jsons.jsonNode(Map.of("credentials", ImmutableMap.builder()
.put("client_id", credentialsJson.get("credentials").get("client_id").asText())
.put("client_secret", credentialsJson.get("credentials").get("client_secret").asText())
.build())))));
final String url = googleSheetsOAuthFlow.getSourceConsentUrl(workspaceId, definitionId, REDIRECT_URL);
LOGGER.info("Waiting for user consent at: {}", url);
// TODO: To automate, start a selenium job to navigate to the Consent URL and click on allowing
// access...
while (!serverHandler.isSucceeded() && limit > 0) {
Thread.sleep(1000);
limit -= 1;
}
assertTrue(serverHandler.isSucceeded(), "Failed to get User consent on time");
final Map<String, Object> params = googleSheetsOAuthFlow.completeSourceOAuth(workspaceId, definitionId,
Map.of("code", serverHandler.getParamValue()), REDIRECT_URL);
LOGGER.info("Response from completing OAuth Flow is: {}", params.toString());
assertTrue(params.containsKey("credentials"));
final Map<String, Object> credentials = (Map<String, Object>) params.get("credentials");
assertTrue(credentials.containsKey("refresh_token"));
assertTrue(credentials.get("refresh_token").toString().length() > 0);
assertTrue(credentials.containsKey("access_token"));
assertTrue(credentials.get("access_token").toString().length() > 0);
}

static class ServerHandler implements HttpHandler {

final private String expectedParam;
private String paramValue;
private boolean succeeded;

public ServerHandler(final String expectedParam) {
this.expectedParam = expectedParam;
this.paramValue = "";
this.succeeded = false;
}

public boolean isSucceeded() {
return succeeded;
}

public String getParamValue() {
return paramValue;
}

@Override
public void handle(final HttpExchange t) {
final String query = t.getRequestURI().getQuery();
LOGGER.info("Received query: '{}'", query);
final Map<String, String> data;
try {
data = deserialize(query);
final String response;
if (data != null && data.containsKey(expectedParam)) {
paramValue = data.get(expectedParam);
response = String.format("Successfully extracted %s:\n'%s'\nTest should be continuing the OAuth Flow to retrieve the refresh_token...",
expectedParam, paramValue);
LOGGER.info(response);
t.sendResponseHeaders(200, response.length());
succeeded = true;
} else {
response = String.format("Unable to parse query params from redirected url: %s", query);
t.sendResponseHeaders(500, response.length());
}
final OutputStream os = t.getResponseBody();
os.write(response.getBytes());
os.close();
} catch (final RuntimeException | IOException e) {
LOGGER.error("Failed to parse from body {}", query, e);
}
}

private static Map<String, String> deserialize(final String query) {
if (query == null) {
return null;
}
final Map<String, String> result = new HashMap<>();
for (final String param : query.split("&")) {
final String[] entry = param.split("=");
if (entry.length > 1) {
result.put(entry[0], entry[1]);
} else {
result.put(entry[0], "");
}
}
return result;
}

}

}
Loading

0 comments on commit d660661

Please sign in to comment.