Skip to content

Commit ff7e64f

Browse files
committed
Handle auth_method & json_keyfile properly
1 parent 6371036 commit ff7e64f

File tree

8 files changed

+60
-13
lines changed

8 files changed

+60
-13
lines changed
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package org.embulk.output.bigquery_java;
2+
3+
import com.google.auth.Credentials;
4+
import com.google.auth.oauth2.ComputeEngineCredentials;
5+
import com.google.auth.oauth2.GoogleCredentials;
6+
import com.google.auth.oauth2.ServiceAccountCredentials;
7+
import com.google.auth.oauth2.UserCredentials;
8+
import java.io.ByteArrayInputStream;
9+
import java.io.IOException;
10+
import java.io.InputStream;
11+
import org.embulk.config.ConfigException;
12+
import org.embulk.output.bigquery_java.config.PluginTask;
13+
import org.embulk.util.config.units.LocalFile;
14+
15+
public class Auth {
16+
private final String authMethod;
17+
private final LocalFile jsonKeyFile;
18+
19+
public Auth(PluginTask task) {
20+
authMethod = task.getAuthMethod();
21+
jsonKeyFile = task.getJsonKeyfile();
22+
}
23+
24+
public Credentials getCredentials(String... scopes) throws IOException {
25+
return getGoogleCredentials().createScoped(scopes);
26+
}
27+
28+
private GoogleCredentials getGoogleCredentials() throws IOException {
29+
if ("authorized_user".equalsIgnoreCase(authMethod)) {
30+
return UserCredentials.fromStream(getCredentialsStream());
31+
} else if ("service_account".equalsIgnoreCase(authMethod)) {
32+
return ServiceAccountCredentials.fromStream(getCredentialsStream());
33+
} else if ("compute_engine".equalsIgnoreCase(authMethod)) {
34+
return ComputeEngineCredentials.create();
35+
} else if ("application_default".equalsIgnoreCase(authMethod)) {
36+
return GoogleCredentials.getApplicationDefault();
37+
} else {
38+
throw new ConfigException("Unknown auth method: " + authMethod);
39+
}
40+
}
41+
42+
private InputStream getCredentialsStream() {
43+
return new ByteArrayInputStream(jsonKeyFile.getContent());
44+
}
45+
}

src/main/java/org/embulk/output/bigquery_java/BigqueryClient.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package org.embulk.output.bigquery_java;
22

3-
import com.google.auth.oauth2.ServiceAccountCredentials;
3+
import com.google.api.services.bigquery.BigqueryScopes;
44
import com.google.cloud.bigquery.BigQuery;
55
import com.google.cloud.bigquery.BigQueryException;
66
import com.google.cloud.bigquery.BigQueryOptions;
@@ -27,7 +27,6 @@
2727
import com.google.cloud.bigquery.TableResult;
2828
import com.google.cloud.bigquery.TimePartitioning;
2929
import com.google.cloud.bigquery.WriteChannelConfiguration;
30-
import java.io.FileInputStream;
3130
import java.io.IOException;
3231
import java.io.OutputStream;
3332
import java.nio.channels.Channels;
@@ -88,15 +87,15 @@ public BigqueryClient(PluginTask task, Schema schema) {
8887
}
8988
columnOptions = task.getColumnOptions().orElse(Collections.emptyList());
9089
try {
91-
bigquery = getClientWithJsonKey(task.getJsonKeyfile());
90+
bigquery = getBigQueryService();
9291
} catch (IOException e) {
9392
throw new RuntimeException(e);
9493
}
9594
}
9695

97-
private static BigQuery getClientWithJsonKey(String key) throws IOException {
96+
private BigQuery getBigQueryService() throws IOException {
9897
return BigQueryOptions.newBuilder()
99-
.setCredentials(ServiceAccountCredentials.fromStream(new FileInputStream(key)))
98+
.setCredentials(new Auth(task).getCredentials(BigqueryScopes.BIGQUERY))
10099
.build()
101100
.getService();
102101
}

src/main/java/org/embulk/output/bigquery_java/config/PluginTask.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import org.embulk.util.config.Config;
66
import org.embulk.util.config.ConfigDefault;
77
import org.embulk.util.config.Task;
8+
import org.embulk.util.config.units.LocalFile;
89

910
public interface PluginTask extends Task {
1011

@@ -20,7 +21,7 @@ public interface PluginTask extends Task {
2021
String getAuthMethod();
2122

2223
@Config("json_keyfile")
23-
String getJsonKeyfile();
24+
LocalFile getJsonKeyfile();
2425

2526
@Config("dataset")
2627
String getDataset();

src/test/java/org/embulk/output/bigquery_java/TestBigqueryJavaOutputPlugin.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.embulk.util.config.ConfigMapper;
2727
import org.embulk.util.config.ConfigMapperFactory;
2828
import org.embulk.util.config.Task;
29+
import org.embulk.util.config.units.LocalFile;
2930
import org.junit.Rule;
3031
import org.junit.Test;
3132
import org.junit.rules.TemporaryFolder;
@@ -85,7 +86,7 @@ public void testWithTimePartitioning() {
8586

8687
public interface TestTask extends Task {
8788
@Config("json_keyfile")
88-
String getJsonKeyfile();
89+
LocalFile getJsonKeyfile();
8990

9091
@Config("dataset")
9192
String getDataset();

src/test/java/org/embulk/output/bigquery_java/config/TestBigqueryTaskBuilder.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public void setAbortOnError_DefaultMaxBadRecord_True() {
4343
"type: bigquery_java",
4444
"mode: replace",
4545
"auth_method: service_account",
46-
"json_keyfile: json_key.json",
46+
"json_keyfile: { content: \"\" }",
4747
"dataset: dataset",
4848
"table: table",
4949
"source_format: NEWLINE_DELIMITED_JSON",
@@ -71,7 +71,7 @@ public void setFileExt_JSONL_GZIP_JSONL_GZ() {
7171
"type: bigquery_java",
7272
"mode: replace",
7373
"auth_method: service_account",
74-
"json_keyfile: json_key.json",
74+
"json_keyfile: { content: \"\" }",
7575
"dataset: dataset",
7676
"table: table",
7777
"source_format: NEWLINE_DELIMITED_JSON",
@@ -99,7 +99,7 @@ public void clustering() {
9999
"type: bigquery_java",
100100
"mode: replace",
101101
"auth_method: service_account",
102-
"json_keyfile: json_key.json",
102+
"json_keyfile: { content: \"\" }",
103103
"dataset: dataset",
104104
"table: table",
105105
"source_format: NEWLINE_DELIMITED_JSON",

src/test/java/org/embulk/output/bigquery_java/visitor/TestJsonColumnVisitor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.embulk.spi.type.Types;
2121
import org.embulk.util.config.ConfigMapper;
2222
import org.embulk.util.config.ConfigMapperFactory;
23+
import org.embulk.util.config.units.LocalFile;
2324
import org.junit.Test;
2425
import org.msgpack.value.Value;
2526
import org.msgpack.value.impl.ImmutableArrayValueImpl;
@@ -130,7 +131,7 @@ private Object visitColumn(
130131
throws JsonProcessingException {
131132
ConfigSource configSource = CONFIG_MAPPER_FACTORY.newConfigSource();
132133
configSource.set("mode", "replace");
133-
configSource.set("json_keyfile", "test");
134+
configSource.set("json_keyfile", LocalFile.ofContent(""));
134135
configSource.set("dataset", "test");
135136
configSource.set("table", "test");
136137
configSource.set("source_format", "NEWLINE_DELIMITED_JSON");

src/test/resources/java/org/embulk/output/bigquery_java/base.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
type: bigquery_java
22
mode: replace
33
auth_method: service_account
4-
json_keyfile: json_key.json
4+
json_keyfile: { content: "" }
55
dataset: dataset
66
table: table
77
source_format: NEWLINE_DELIMITED_JSON

src/test/resources/java/org/embulk/output/bigquery_java/time_partitioning.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
type: bigquery_java
22
mode: replace
33
auth_method: service_account
4-
json_keyfile: json_key.json
4+
json_keyfile: { content: "" }
55
dataset: dataset
66
table: table
77
source_format: NEWLINE_DELIMITED_JSON

0 commit comments

Comments
 (0)