Skip to content

Commit 755886b

Browse files
committed
add retain_column_descriptions and retain_column_policy_tags to config
1 parent 9710802 commit 755886b

File tree

5 files changed

+249
-0
lines changed

5 files changed

+249
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ Under construction
6262
| progress_log_interval (x) | float | optional | nil (Disabled) | Progress log interval. The progress log is disabled by nil (default). NOTE: This option may be removed in the future because a filter plugin can achieve the same goal |
6363
| before_load | string | optional | nil | if set, this SQL will be executed before loading all records in append mode. In replace mode, SQL is not executed. |
6464
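| retain_column_descriptions | boolean | optional | false | In replace mode, column descriptions from the existing table are carried over to the columns with the same name in the new table. A description given in `column_options` takes precedence. |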
65+
| retain_column_policy_tags | boolean | optional | false | In replace mode, column policy tags from the existing table are carried over to the columns with the same name in the new table. |
6566

6667

6768
Client or request options

src/main/java/org/embulk/output/bigquery_java/BigqueryClient.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.google.cloud.bigquery.Dataset;
1010
import com.google.cloud.bigquery.DatasetInfo;
1111
import com.google.cloud.bigquery.Field;
12+
import com.google.cloud.bigquery.FieldList;
1213
import com.google.cloud.bigquery.FieldValue;
1314
import com.google.cloud.bigquery.FormatOptions;
1415
import com.google.cloud.bigquery.Job;
@@ -43,6 +44,7 @@
4344
import java.util.stream.Collectors;
4445
import java.util.stream.Stream;
4546
import java.util.stream.StreamSupport;
47+
import javax.validation.constraints.NotNull;
4648
import org.embulk.output.bigquery_java.config.BigqueryColumnOption;
4749
import org.embulk.output.bigquery_java.config.BigqueryTimePartitioning;
4850
import org.embulk.output.bigquery_java.config.PluginTask;
@@ -74,6 +76,7 @@ public class BigqueryClient {
7476
private PluginTask task;
7577
private Schema schema;
7678
private List<BigqueryColumnOption> columnOptions;
79+
private FieldList cachedSrcFields = null;
7780

7881
public BigqueryClient(PluginTask task, Schema schema) {
7982
this.task = task;
@@ -93,6 +96,21 @@ public BigqueryClient(PluginTask task, Schema schema) {
9396
}
9497
}
9598

99+
@NotNull
100+
private FieldList getSrcFields() {
101+
if (cachedSrcFields == null) {
102+
cachedSrcFields = FieldList.of();
103+
Table srcTable = getTable(task.getTable());
104+
if (srcTable != null) {
105+
com.google.cloud.bigquery.Schema srcSchema = srcTable.getDefinition().getSchema();
106+
if (srcSchema != null) {
107+
cachedSrcFields = srcSchema.getFields();
108+
}
109+
}
110+
}
111+
return cachedSrcFields;
112+
}
113+
96114
private static BigQuery getClientWithJsonKey(String key) throws IOException {
97115
return BigQueryOptions.newBuilder()
98116
.setCredentials(ServiceAccountCredentials.fromStream(new FileInputStream(key)))
@@ -672,6 +690,22 @@ protected com.google.cloud.bigquery.Schema buildSchema(
672690
BigqueryUtil.findColumnOption(col.getName(), columnOptions);
673691
Field.Builder fieldBuilder = createFieldBuilder(task, col, columnOption);
674692

693+
if ((task.getMode().equals("replace")
694+
&& (task.getRetainColumnDescriptions() || task.getRetainColumnPolicyTags()))) {
695+
getSrcFields().stream()
696+
.filter(x -> x.getName().equals(col.getName()))
697+
.findFirst()
698+
.ifPresent(
699+
field -> {
700+
if (task.getRetainColumnDescriptions()) {
701+
fieldBuilder.setDescription(field.getDescription());
702+
}
703+
if (task.getRetainColumnPolicyTags()) {
704+
fieldBuilder.setPolicyTags(field.getPolicyTags());
705+
}
706+
});
707+
}
708+
675709
if (columnOption.isPresent()) {
676710
BigqueryColumnOption colOpt = columnOption.get();
677711
if (!colOpt.getMode().isEmpty()) {

src/main/java/org/embulk/output/bigquery_java/config/PluginTask.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,4 +160,12 @@ public interface PluginTask extends Task {
160160
@Config("merge_rule")
161161
@ConfigDefault("null")
162162
Optional<List<String>> getMergeRule();
163+
164+
/**
 * Value of the {@code retain_column_descriptions} option (default {@code false}).
 *
 * <p>When this is true and the mode equals {@code "replace"}, the schema builder copies the
 * description of the same-named field of the existing table onto each column it builds.
 */
@Config("retain_column_descriptions")
165+
@ConfigDefault("false")
166+
Boolean getRetainColumnDescriptions();
167+
168+
/**
 * Value of the {@code retain_column_policy_tags} option (default {@code false}).
 *
 * <p>When this is true and the mode equals {@code "replace"}, the schema builder copies the
 * policy tags of the same-named field of the existing table onto each column it builds.
 */
@Config("retain_column_policy_tags")
169+
@ConfigDefault("false")
170+
Boolean getRetainColumnPolicyTags();
163171
}
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
package org.embulk.output.bigquery_java;
2+
3+
import static org.junit.Assert.assertArrayEquals;
4+
import static org.junit.Assert.assertEquals;
5+
import static org.junit.Assert.assertNull;
6+
7+
import com.google.cloud.bigquery.FieldList;
8+
import com.google.cloud.bigquery.PolicyTags;
9+
import com.google.cloud.bigquery.StandardSQLTypeName;
10+
import java.io.IOException;
11+
import java.lang.reflect.Field;
12+
import java.lang.reflect.InvocationTargetException;
13+
import java.lang.reflect.Method;
14+
import java.util.ArrayList;
15+
import java.util.Arrays;
16+
import java.util.List;
17+
import java.util.function.Function;
18+
import java.util.stream.Collectors;
19+
import org.embulk.config.ConfigSource;
20+
import org.embulk.input.file.LocalFileInputPlugin;
21+
import org.embulk.output.bigquery_java.config.PluginTask;
22+
import org.embulk.parser.csv.CsvParserPlugin;
23+
import org.embulk.spi.FileInputPlugin;
24+
import org.embulk.spi.OutputPlugin;
25+
import org.embulk.spi.ParserPlugin;
26+
import org.embulk.spi.Schema;
27+
import org.embulk.spi.type.Types;
28+
import org.embulk.test.EmbulkTests;
29+
import org.embulk.test.TestingEmbulk;
30+
import org.embulk.util.config.ConfigMapper;
31+
import org.embulk.util.config.ConfigMapperFactory;
32+
import org.junit.Rule;
33+
import org.junit.Test;
34+
import org.junit.rules.TemporaryFolder;
35+
36+
public class TestBigqueryClient {
37+
protected static final ConfigMapperFactory CONFIG_MAPPER_FACTORY =
38+
ConfigMapperFactory.builder().addDefaultModules().build();
39+
40+
protected static final ConfigMapper CONFIG_MAPPER = CONFIG_MAPPER_FACTORY.createConfigMapper();
41+
42+
private static final String BASIC_RESOURCE_PATH =
43+
"/java/org/embulk/output/bigquery_java/bigquery_client/";
44+
45+
private static ConfigSource loadYamlResource(TestingEmbulk embulk, String fileName) {
46+
return embulk.loadYamlResource(BASIC_RESOURCE_PATH + fileName);
47+
}
48+
49+
@Rule
50+
public TestingEmbulk embulk =
51+
TestingEmbulk.builder()
52+
.registerPlugin(OutputPlugin.class, "bigquery_java", BigqueryJavaOutputPlugin.class)
53+
.registerPlugin(FileInputPlugin.class, "file", LocalFileInputPlugin.class)
54+
.registerPlugin(ParserPlugin.class, "csv", CsvParserPlugin.class)
55+
.build();
56+
57+
@Rule public TemporaryFolder testFolder = new TemporaryFolder();
58+
59+
private com.google.cloud.bigquery.Schema invokeTakeoverBuildSchema(
60+
Function<ConfigSource, ConfigSource> setupConfig,
61+
Function<com.google.cloud.bigquery.Field.Builder, com.google.cloud.bigquery.Field>
62+
setupField0,
63+
Function<com.google.cloud.bigquery.Field.Builder, com.google.cloud.bigquery.Field>
64+
setupField1)
65+
throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException,
66+
InvocationTargetException {
67+
ConfigSource testConfig = EmbulkTests.config("EMBULK_OUTPUT_BIGQUERY_TEST_CONFIG");
68+
TestBigqueryJavaOutputPlugin.TestTask testTask =
69+
CONFIG_MAPPER.map(testConfig, TestBigqueryJavaOutputPlugin.TestTask.class);
70+
ConfigSource config = loadYamlResource(embulk, "takeover.yml");
71+
config.set("json_keyfile", testTask.getJsonKeyfile());
72+
config.set("dataset", testTask.getDataset());
73+
config.set("table", testTask.getTable());
74+
PluginTask task = CONFIG_MAPPER.map(setupConfig.apply(config), PluginTask.class);
75+
Schema schema = Schema.builder().add("c0", Types.LONG).add("c1", Types.STRING).build();
76+
BigqueryClient bigqueryClient = new BigqueryClient(task, schema);
77+
Field field = BigqueryClient.class.getDeclaredField("cachedSrcFields");
78+
field.setAccessible(true);
79+
List<com.google.cloud.bigquery.Field> fieldList = new ArrayList<>();
80+
fieldList.add(
81+
setupField0.apply(
82+
com.google.cloud.bigquery.Field.newBuilder("c0", StandardSQLTypeName.INT64)));
83+
fieldList.add(
84+
setupField1.apply(
85+
com.google.cloud.bigquery.Field.newBuilder("c1", StandardSQLTypeName.STRING)));
86+
field.set(bigqueryClient, FieldList.of(fieldList));
87+
Method method = BigqueryClient.class.getDeclaredMethod("buildSchema", Schema.class, List.class);
88+
method.setAccessible(true);
89+
return (com.google.cloud.bigquery.Schema)
90+
method.invoke(bigqueryClient, schema, task.getColumnOptions().orElse(null));
91+
}
92+
93+
private com.google.cloud.bigquery.Schema invokeRetainDescriptionBuildSchema(
94+
String mode, Boolean retainColumnDescriptions, String d0, String d1)
95+
throws IOException, NoSuchFieldException, InvocationTargetException, IllegalAccessException,
96+
NoSuchMethodException {
97+
return invokeTakeoverBuildSchema(
98+
configSource ->
99+
configSource
100+
.set("mode", mode)
101+
.set("retain_column_descriptions", retainColumnDescriptions),
102+
builder -> builder.setDescription(d0).build(),
103+
builder -> builder.setDescription(d1).build());
104+
}
105+
106+
@Test
107+
public void testRetainDescriptionTrue()
108+
throws NoSuchMethodException, NoSuchFieldException, IllegalAccessException,
109+
InvocationTargetException, IOException {
110+
com.google.cloud.bigquery.Schema schema =
111+
invokeRetainDescriptionBuildSchema("replace", true, "prev_c0", "prev_c1");
112+
assertEquals("c0", schema.getFields().get(0).getDescription());
113+
assertEquals("prev_c1", schema.getFields().get(1).getDescription());
114+
}
115+
116+
@Test
117+
public void testRetainDescriptionFalse()
118+
throws NoSuchMethodException, NoSuchFieldException, IllegalAccessException,
119+
InvocationTargetException, IOException {
120+
com.google.cloud.bigquery.Schema schema =
121+
invokeRetainDescriptionBuildSchema("replace", false, "prev_c0", "prev_c1");
122+
assertEquals("c0", schema.getFields().get(0).getDescription());
123+
assertNull(schema.getFields().get(1).getDescription());
124+
}
125+
126+
@Test
127+
public void testRetainDescriptionTrueButNotModeReplace()
128+
throws NoSuchMethodException, NoSuchFieldException, IllegalAccessException,
129+
InvocationTargetException, IOException {
130+
com.google.cloud.bigquery.Schema schema =
131+
invokeRetainDescriptionBuildSchema("insert", true, "prev_c0", "prev_c1");
132+
assertEquals("c0", schema.getFields().get(0).getDescription());
133+
assertNull(schema.getFields().get(1).getDescription());
134+
}
135+
136+
private com.google.cloud.bigquery.Schema invokeRetainPolicyTagsBuildSchema(
137+
String mode, Boolean retainPolicyTags, String[] tags0, String[] tags1)
138+
throws IOException, NoSuchFieldException, InvocationTargetException, IllegalAccessException,
139+
NoSuchMethodException {
140+
List<String> n0 = Arrays.stream(tags0).collect(Collectors.toList());
141+
List<String> n1 = Arrays.stream(tags1).collect(Collectors.toList());
142+
PolicyTags p0 = PolicyTags.newBuilder().setNames(n0).build();
143+
PolicyTags p1 = PolicyTags.newBuilder().setNames(n1).build();
144+
return invokeTakeoverBuildSchema(
145+
configSource ->
146+
configSource.set("mode", mode).set("retain_column_policy_tags", retainPolicyTags),
147+
builder -> builder.setPolicyTags(p0).build(),
148+
builder -> builder.setPolicyTags(p1).build());
149+
}
150+
151+
@Test
152+
public void testRetainColumnPolicyTagsTrue()
153+
throws NoSuchMethodException, NoSuchFieldException, IllegalAccessException,
154+
InvocationTargetException, IOException {
155+
com.google.cloud.bigquery.Schema schema =
156+
invokeRetainPolicyTagsBuildSchema(
157+
"replace", true, new String[] {"c0"}, new String[] {"c10", "c11"});
158+
assertArrayEquals(
159+
new String[] {"c0"},
160+
schema.getFields().get(0).getPolicyTags().getNames().toArray(new String[0]));
161+
assertArrayEquals(
162+
new String[] {"c10", "c11"},
163+
schema.getFields().get(1).getPolicyTags().getNames().toArray(new String[0]));
164+
}
165+
166+
@Test
167+
public void testRetainColumnPolicyTagsFalse()
168+
throws NoSuchMethodException, NoSuchFieldException, IllegalAccessException,
169+
InvocationTargetException, IOException {
170+
com.google.cloud.bigquery.Schema schema =
171+
invokeRetainPolicyTagsBuildSchema(
172+
"replace", false, new String[] {"c0"}, new String[] {"c10", "c11"});
173+
assertNull(schema.getFields().get(0).getPolicyTags());
174+
assertNull(schema.getFields().get(1).getPolicyTags());
175+
}
176+
177+
@Test
178+
public void testRetainColumnPolicyTagsTrueButNotModeReplace()
179+
throws NoSuchMethodException, NoSuchFieldException, IllegalAccessException,
180+
InvocationTargetException, IOException {
181+
com.google.cloud.bigquery.Schema schema =
182+
invokeRetainPolicyTagsBuildSchema(
183+
"insert", true, new String[] {"c0"}, new String[] {"c10", "c11"});
184+
assertNull(schema.getFields().get(0).getPolicyTags());
185+
assertNull(schema.getFields().get(1).getPolicyTags());
186+
}
187+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
type: bigquery_java
2+
mode: replace
3+
auth_method: service_account
4+
json_keyfile: json_key.json
5+
dataset: dataset
6+
table: table
7+
source_format: NEWLINE_DELIMITED_JSON
8+
compression: GZIP
9+
auto_create_dataset: false
10+
auto_create_table: true
11+
path_prefix: /tmp/bq_compress/bq_
12+
column_options:
13+
- name: c0
14+
type: INTEGER
15+
mode: NULLABLE
16+
description: c0
17+
- name: c1
18+
type: STRING
19+
mode: NULLABLE

0 commit comments

Comments
 (0)