Skip to content

Commit 2da771e

Browse files
committed
Added upsert by API
1 parent 8596fde commit 2da771e

File tree

60 files changed

+413
-17
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+413
-17
lines changed

README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ kintone output plugin for Embulk stores app records from kintone.
1717
- **basic_auth_password**: kintone basic auth password (string, optional)
1818
- **guest_space_id**: kintone app belongs to guest space, guest space id is required. (integer, optional)
1919
- **mode**: kintone mode (string, required)
20-
- **update_key**: Column name to set update key (string, required if mode is update or insert_or_update)
20+
- **update_key**: Column name to set update key (string, required if mode is update, upsert or insert_or_update)
2121
- **reduce_key**: Key column name to reduce expanded SUBTABLE (string, optional)
2222
- **sort_columns**: List of columns for sorting input records (array of objects, optional)
2323
- **name**: Column name (string, required)
@@ -29,11 +29,13 @@ kintone output plugin for Embulk stores app records from kintone.
2929
- **skip_if_non_existing_id_or_update_key**: The skip policy if the record corresponding to the id or update key does not exist (string `auto`, `never` or `always`, default is `auto`). No effect for insert mode.
3030
- **auto**:
3131
- **update mode**: Skip the record if no id or update key value is specified.
32+
- **upsert mode**: Skip the record if no update key value is specified. Insert the record if no id is specified.
3233
- **insert_or_update mode**: Skip the record if corresponds to the id does not exist or no update key value is specified.
3334
- **never**: Never skip the record even if corresponds to the id or update key does not exist.
3435
- **update mode**: Throw exception if no id or update key value is specified.
36+
- **upsert mode**: Insert the record even if no id or update key value is specified.
3537
- **insert_or_update mode**: Insert the record if corresponds to the id or update key does not exist (also, if no id or update key value is specified).
36-
- **always**: Always skip the record if corresponds to the id or update key does not exist (also, if no id or update key value is specified). update mode and insert_or_update mode will the same behavior (only updated, never inserted).
38+
- **always**: Always skip the record if corresponds to the id or update key does not exist (also, if no id or update key value is specified). update mode, upsert mode and insert_or_update mode will the same behavior (only updated, never inserted).
3739
- **column_options** advanced: a key-value pairs where key is a column name and value is options for the column.
3840
- **field_code**: field code (string, required)
3941
- **type**: field type (string, required). See [this page](https://cybozu.dev/ja/kintone/docs/overview/field-types/#field-type-update) for list of available types. However, following types are not yet supported

src/main/java/org/embulk/output/kintone/KintoneMode.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,22 @@ public void add(Page page, Skip skip, KintonePageOutput output) {
3636
consumer.accept(page);
3737
}
3838
},
39+
UPSERT("upsert") {
40+
@Override
41+
public void validate(PluginTask task, KintoneClient client) {
42+
if (!task.getUpdateKeyName().isPresent() && client.getColumn(Id.FIELD) == null) {
43+
throw new ConfigException("When mode is upsert, require update_key or id column.");
44+
}
45+
client.validateIdOrUpdateKey(task.getUpdateKeyName().orElse(Id.FIELD));
46+
}
47+
48+
@Override
49+
public void add(Page page, Skip skip, KintonePageOutput output) {
50+
Consumer<Page> consumer =
51+
skip == Skip.ALWAYS ? output::insertOrUpdatePage : output::upsertPage;
52+
consumer.accept(page);
53+
}
54+
},
3955
INSERT_OR_UPDATE("insert_or_update") {
4056
@Override
4157
public void validate(PluginTask task, KintoneClient client) {

src/main/java/org/embulk/output/kintone/KintonePageOutput.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,15 @@ private void update(List<RecordForUpdate> records, Map<Operation, Integer> resul
163163
Operation.UPDATE.compute(results, list.size());
164164
}
165165

166+
private void upsert(List<RecordForUpdate> records, Map<Operation, Integer> results) {
167+
List<RecordRevision> list =
168+
executeWithRetry(
169+
() -> client.get().record().updateRecords(task.getAppId(), records, true), records);
170+
list.stream()
171+
.collect(Collectors.groupingBy(RecordRevision::getOperation))
172+
.forEach((key, value) -> Operation.valueOf(key).compute(results, value.size()));
173+
}
174+
166175
private <T> T executeWithRetry(Supplier<T> operation) {
167176
return executeWithRetry(operation, null);
168177
}
@@ -296,6 +305,51 @@ public void updatePage(Page page) {
296305
Operation.UPDATE.info(results);
297306
}
298307

308+
public void upsertPage(Page page) {
309+
Skip skip = task.getSkipIfNonExistingIdOrUpdateKey();
310+
String columnName = task.getUpdateKeyName().orElse(Id.FIELD);
311+
boolean isId = columnName.equals(Id.FIELD);
312+
int increments = 0;
313+
List<RecordForUpdate> records = new ArrayList<>();
314+
reader.setPage(page);
315+
KintoneColumnVisitor visitor =
316+
new KintoneColumnVisitor(
317+
reader,
318+
task.getDerivedColumns(),
319+
task.getColumnOptions(),
320+
task.getPreferNulls(),
321+
task.getIgnoreNulls(),
322+
task.getReduceKeyName().orElse(null),
323+
task.getUpdateKeyName().orElse(Id.FIELD));
324+
Map<Operation, Integer> results = new TreeMap<>();
325+
while (reader.nextRecord()) {
326+
Record record = new Record();
327+
IdOrUpdateKey idOrUpdateKey = new IdOrUpdateKey();
328+
visitor.setRecord(record);
329+
visitor.setIdOrUpdateKey(idOrUpdateKey);
330+
reader.getSchema().visitColumns(visitor);
331+
putWrongTypeFields(record);
332+
if (isId && !idOrUpdateKey.isIdPresent()) {
333+
// Record inserted because no id was specified
334+
idOrUpdateKey.getId().setValue(Long.MAX_VALUE - increments++);
335+
} else if (skip == Skip.AUTO && !isId && !idOrUpdateKey.isUpdateKeyPresent()) {
336+
LOGGER.warn("Record skipped because no update key value was specified");
337+
continue;
338+
} else if (!isId && !idOrUpdateKey.isUpdateKeyPresent()) {
339+
LOGGER.warn("Record inserted though no update key value was specified");
340+
}
341+
records.add(idOrUpdateKey.forUpdate(record));
342+
if (records.size() == task.getChunkSize()) {
343+
upsert(records, results);
344+
records.clear();
345+
}
346+
}
347+
if (!records.isEmpty()) {
348+
upsert(records, results);
349+
}
350+
results.keySet().forEach(operation -> operation.info(results));
351+
}
352+
299353
public void insertOrUpdatePage(Page page) {
300354
List<Record> records = new ArrayList<>();
301355
List<IdOrUpdateKey> idOrUpdateKeys = new ArrayList<>();

src/test/java/org/embulk/output/kintone/KintoneClientTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,27 @@ public void testUpdate() {
5959
assertConfigException("The id column must be 'long'.", id(Types.STRING));
6060
}
6161

62+
@Test
63+
public void testUpsert() {
64+
merge(config("mode: upsert"));
65+
merge(config("update_key: null"));
66+
assertConfigException("When mode is upsert, require update_key or id column.");
67+
merge(config("update_key: non_existing_column"));
68+
assertConfigException("The column 'non_existing_column' for update does not exist.");
69+
merge(config("update_key: non_existing_field"));
70+
assertConfigException("The field 'non_existing_field' for update does not exist.");
71+
merge(config("update_key: invalid_type_field_multi_line_text"));
72+
assertConfigException("The update_key must be 'SINGLE_LINE_TEXT' or 'NUMBER'.");
73+
merge(config("update_key: long_number"));
74+
runWithMockClient(Lazy::get);
75+
merge(config("update_key: string_single_line_text"));
76+
runWithMockClient(Lazy::get);
77+
merge(config("update_key: $id"));
78+
runWithMockClient(Lazy::get, id(Types.LONG));
79+
merge(config("update_key: null"));
80+
assertConfigException("The id column must be 'long'.", id(Types.STRING));
81+
}
82+
6283
@Test
6384
public void testInsertOrUpdate() {
6485
merge(config("mode: insert_or_update"));

src/test/java/org/embulk/output/kintone/KintonePageOutputVerifier.java

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,15 @@
2222
import java.util.Collection;
2323
import java.util.Collections;
2424
import java.util.List;
25+
import java.util.function.BiConsumer;
2526
import java.util.function.Function;
27+
import java.util.function.Predicate;
2628
import java.util.stream.Collectors;
2729
import java.util.stream.IntStream;
2830
import org.embulk.config.TaskReport;
2931
import org.embulk.exec.PooledBufferAllocator;
3032
import org.embulk.output.kintone.record.Id;
33+
import org.embulk.output.kintone.record.Skip;
3134
import org.embulk.spi.BufferAllocator;
3235
import org.embulk.spi.Exec;
3336
import org.embulk.spi.Page;
@@ -40,37 +43,49 @@
4043
public class KintonePageOutputVerifier implements TransactionalPageOutput {
4144
private final TransactionalPageOutput transactionalPageOutput;
4245
private final String domain;
46+
private final KintoneMode mode;
4347
private final String field;
48+
private final Skip skip;
4449
private final List<String> values;
4550
private final List<String> addValues;
4651
private final List<Record> addRecords;
4752
private final List<RecordForUpdate> updateRecords;
53+
private final Predicate<Record> nonNull;
4854

4955
public KintonePageOutputVerifier(
5056
String domain,
57+
KintoneMode mode,
5158
String field,
59+
Skip skip,
5260
List<String> values,
5361
List<String> addValues,
5462
List<Record> addRecords,
55-
List<RecordForUpdate> updateRecords) {
56-
this(null, domain, field, values, addValues, addRecords, updateRecords);
63+
List<RecordForUpdate> updateRecords,
64+
Predicate<Record> nonNull) {
65+
this(null, domain, mode, field, skip, values, addValues, addRecords, updateRecords, nonNull);
5766
}
5867

5968
public KintonePageOutputVerifier(
6069
TransactionalPageOutput transactionalPageOutput,
6170
String domain,
71+
KintoneMode mode,
6272
String field,
73+
Skip skip,
6374
List<String> values,
6475
List<String> addValues,
6576
List<Record> addRecords,
66-
List<RecordForUpdate> updateRecords) {
77+
List<RecordForUpdate> updateRecords,
78+
Predicate<Record> nonNull) {
6779
this.transactionalPageOutput = transactionalPageOutput;
6880
this.domain = domain;
81+
this.mode = mode;
6982
this.field = field;
83+
this.skip = skip;
7084
this.values = values;
7185
this.addValues = addValues;
7286
this.addRecords = addRecords;
7387
this.updateRecords = updateRecords;
88+
this.nonNull = nonNull;
7489
}
7590

7691
@Override
@@ -124,7 +139,7 @@ private void runWithMockClient(MockClient.Runnable runnable) throws Exception {
124139
mockClient.run(runnable);
125140
@SuppressWarnings("unchecked")
126141
ArgumentCaptor<List<Record>> addRecordsArgumentCaptor = ArgumentCaptor.forClass(List.class);
127-
verify(mockRecordClient, atLeast(0)).addRecords(eq(0L), addRecordsArgumentCaptor.capture());
142+
addRecordsVerifier().accept(mockRecordClient, addRecordsArgumentCaptor);
128143
assertRecords(
129144
domain,
130145
addRecordsArgumentCaptor.getAllValues().stream()
@@ -134,8 +149,7 @@ private void runWithMockClient(MockClient.Runnable runnable) throws Exception {
134149
@SuppressWarnings("unchecked")
135150
ArgumentCaptor<List<RecordForUpdate>> updateRecordsArgumentCaptor =
136151
ArgumentCaptor.forClass(List.class);
137-
verify(mockRecordClient, atLeast(0))
138-
.updateRecords(eq(0L), updateRecordsArgumentCaptor.capture());
152+
updateRecordsVerifier().accept(mockRecordClient, updateRecordsArgumentCaptor);
139153
assertRecordForUpdates(
140154
domain,
141155
updateRecordsArgumentCaptor.getAllValues().stream()
@@ -144,6 +158,37 @@ private void runWithMockClient(MockClient.Runnable runnable) throws Exception {
144158
updateRecords);
145159
}
146160

161+
private BiConsumer<RecordClient, ArgumentCaptor<List<Record>>> addRecordsVerifier() {
162+
return mode == KintoneMode.UPSERT && skip != Skip.ALWAYS
163+
? (rc, ac) -> {}
164+
: this::verifyAddRecords;
165+
}
166+
167+
private BiConsumer<RecordClient, ArgumentCaptor<List<RecordForUpdate>>> updateRecordsVerifier() {
168+
return mode == KintoneMode.UPSERT && skip != Skip.ALWAYS
169+
? this::verifyUpsertRecords
170+
: this::verifyUpdateRecords;
171+
}
172+
173+
private void verifyAddRecords(
174+
RecordClient mockRecordClient, ArgumentCaptor<List<Record>> addRecordsArgumentCaptor) {
175+
verify(mockRecordClient, atLeast(0)).addRecords(eq(0L), addRecordsArgumentCaptor.capture());
176+
}
177+
178+
private void verifyUpdateRecords(
179+
RecordClient mockRecordClient,
180+
ArgumentCaptor<List<RecordForUpdate>> updateRecordsArgumentCaptor) {
181+
verify(mockRecordClient, atLeast(0))
182+
.updateRecords(eq(0L), updateRecordsArgumentCaptor.capture());
183+
}
184+
185+
private void verifyUpsertRecords(
186+
RecordClient mockRecordClient,
187+
ArgumentCaptor<List<RecordForUpdate>> updateRecordsArgumentCaptor) {
188+
verify(mockRecordClient, atLeast(0))
189+
.updateRecords(eq(0L), updateRecordsArgumentCaptor.capture(), eq(true));
190+
}
191+
147192
private List<String> getValues() {
148193
if (values.isEmpty() || updateRecords.isEmpty()) {
149194
return values;
@@ -164,13 +209,13 @@ record ->
164209
}
165210

166211
private List<Record> getRecords() {
167-
return updateRecords.stream().map(this::getRecord).collect(Collectors.toList());
212+
return updateRecords.stream().map(this::getRecord).filter(nonNull).collect(Collectors.toList());
168213
}
169214

170215
private Record getRecord(RecordForUpdate updateRecord) {
171216
Long id = updateRecord.getId();
172217
UpdateKey key = updateRecord.getUpdateKey();
173-
return id != null ? getRecord(id) : getRecord(key);
218+
return id != null ? getRecord(id) : key.getField() != null ? getRecord(key) : null;
174219
}
175220

176221
private Record getRecord(Long id) {

src/test/java/org/embulk/output/kintone/MockClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ public void run(Runnable runnable) throws Exception {
9292
.thenReturn(mockGetRecordsByCursorResponseBody);
9393
when(mockRecordClient.addRecords(eq(0L), anyList())).thenReturn(Collections.emptyList());
9494
when(mockRecordClient.updateRecords(eq(0L), anyList())).thenReturn(Collections.emptyList());
95+
when(mockRecordClient.updateRecords(eq(0L), anyList(), eq(true)))
96+
.thenReturn(Collections.emptyList());
9597
com.kintone.client.KintoneClient mockKintoneClient = mock(KintoneClient.class);
9698
when(mockKintoneClient.app()).thenReturn(mockAppClient);
9799
when(mockKintoneClient.record()).thenReturn(mockRecordClient);

0 commit comments

Comments
 (0)