Skip to content

Commit 8596fde

Browse files
committed
Added operation logs
1 parent 34c6090 commit 8596fde

File tree

1 file changed

+46
-15
lines changed

1 file changed

+46
-15
lines changed

src/main/java/org/embulk/output/kintone/KintonePageOutput.java

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.kintone.client.model.record.RadioButtonFieldValue;
2525
import com.kintone.client.model.record.Record;
2626
import com.kintone.client.model.record.RecordForUpdate;
27+
import com.kintone.client.model.record.RecordRevision;
2728
import com.kintone.client.model.record.RichTextFieldValue;
2829
import com.kintone.client.model.record.SingleLineTextFieldValue;
2930
import com.kintone.client.model.record.TimeFieldValue;
@@ -59,6 +60,24 @@
5960
import org.slf4j.LoggerFactory;
6061

6162
public class KintonePageOutput implements TransactionalPageOutput {
63+
private enum Operation {
64+
INSERT("added"),
65+
UPDATE("updated");
66+
final String processed;
67+
68+
Operation(String processed) {
69+
this.processed = processed;
70+
}
71+
72+
void compute(Map<Operation, Integer> results, int i) {
73+
results.compute(this, (k, v) -> v == null ? i : v + i);
74+
}
75+
76+
void info(Map<Operation, Integer> results) {
77+
LOGGER.info("Number of records {}: {}", processed, results.get(this));
78+
}
79+
}
80+
6281
private static final Logger LOGGER =
6382
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
6483
private static final List<String> RETRYABLE_ERROR_CODES =
@@ -131,12 +150,17 @@ public TaskReport commit() {
131150
return CONFIG_MAPPER_FACTORY.newTaskReport();
132151
}
133152

134-
private void insert(List<Record> records) {
135-
executeWithRetry(() -> client.get().record().addRecords(task.getAppId(), records), records);
153+
private void insert(List<Record> records, Map<Operation, Integer> results) {
154+
List<Long> list =
155+
executeWithRetry(() -> client.get().record().addRecords(task.getAppId(), records), records);
156+
Operation.INSERT.compute(results, list.size());
136157
}
137158

138-
private void update(List<RecordForUpdate> records) {
139-
executeWithRetry(() -> client.get().record().updateRecords(task.getAppId(), records), records);
159+
private void update(List<RecordForUpdate> records, Map<Operation, Integer> results) {
160+
List<RecordRevision> list =
161+
executeWithRetry(
162+
() -> client.get().record().updateRecords(task.getAppId(), records), records);
163+
Operation.UPDATE.compute(results, list.size());
140164
}
141165

142166
private <T> T executeWithRetry(Supplier<T> operation) {
@@ -215,20 +239,22 @@ public void insertPage(Page page) {
215239
task.getPreferNulls(),
216240
task.getIgnoreNulls(),
217241
task.getReduceKeyName().orElse(null));
242+
Map<Operation, Integer> results = new TreeMap<>();
218243
while (reader.nextRecord()) {
219244
Record record = new Record();
220245
visitor.setRecord(record);
221246
reader.getSchema().visitColumns(visitor);
222247
putWrongTypeFields(record);
223248
records.add(record);
224249
if (records.size() == task.getChunkSize()) {
225-
insert(records);
250+
insert(records, results);
226251
records.clear();
227252
}
228253
}
229254
if (!records.isEmpty()) {
230-
insert(records);
255+
insert(records, results);
231256
}
257+
Operation.INSERT.info(results);
232258
}
233259

234260
public void updatePage(Page page) {
@@ -244,6 +270,7 @@ public void updatePage(Page page) {
244270
task.getIgnoreNulls(),
245271
task.getReduceKeyName().orElse(null),
246272
task.getUpdateKeyName().orElse(Id.FIELD));
273+
Map<Operation, Integer> results = new TreeMap<>();
247274
while (reader.nextRecord()) {
248275
Record record = new Record();
249276
IdOrUpdateKey idOrUpdateKey = new IdOrUpdateKey();
@@ -259,13 +286,14 @@ public void updatePage(Page page) {
259286
}
260287
records.add(idOrUpdateKey.forUpdate(record));
261288
if (records.size() == task.getChunkSize()) {
262-
update(records);
289+
update(records, results);
263290
records.clear();
264291
}
265292
}
266293
if (!records.isEmpty()) {
267-
update(records);
294+
update(records, results);
268295
}
296+
Operation.UPDATE.info(results);
269297
}
270298

271299
public void insertOrUpdatePage(Page page) {
@@ -281,6 +309,7 @@ public void insertOrUpdatePage(Page page) {
281309
task.getIgnoreNulls(),
282310
task.getReduceKeyName().orElse(null),
283311
task.getUpdateKeyName().orElse(Id.FIELD));
312+
Map<Operation, Integer> results = new TreeMap<>();
284313
while (reader.nextRecord()) {
285314
Record record = new Record();
286315
IdOrUpdateKey idOrUpdateKey = new IdOrUpdateKey();
@@ -291,17 +320,19 @@ public void insertOrUpdatePage(Page page) {
291320
records.add(record);
292321
idOrUpdateKeys.add(idOrUpdateKey);
293322
if (records.size() == UPSERT_BATCH_SIZE) {
294-
insertOrUpdate(records, idOrUpdateKeys);
323+
insertOrUpdate(records, idOrUpdateKeys, results);
295324
records.clear();
296325
idOrUpdateKeys.clear();
297326
}
298327
}
299328
if (!records.isEmpty()) {
300-
insertOrUpdate(records, idOrUpdateKeys);
329+
insertOrUpdate(records, idOrUpdateKeys, results);
301330
}
331+
results.keySet().forEach(operation -> operation.info(results));
302332
}
303333

304-
private void insertOrUpdate(List<Record> records, List<IdOrUpdateKey> idOrUpdateKeys) {
334+
private void insertOrUpdate(
335+
List<Record> records, List<IdOrUpdateKey> idOrUpdateKeys, Map<Operation, Integer> results) {
305336
if (records.size() != idOrUpdateKeys.size()) {
306337
throw new RuntimeException("records.size() != idOrUpdateKeys.size()");
307338
}
@@ -343,18 +374,18 @@ private void insertOrUpdate(List<Record> records, List<IdOrUpdateKey> idOrUpdate
343374
insertRecords.add(record);
344375
}
345376
if (insertRecords.size() == task.getChunkSize()) {
346-
insert(insertRecords);
377+
insert(insertRecords, results);
347378
insertRecords.clear();
348379
} else if (updateRecords.size() == task.getChunkSize()) {
349-
update(updateRecords);
380+
update(updateRecords, results);
350381
updateRecords.clear();
351382
}
352383
}
353384
if (!insertRecords.isEmpty()) {
354-
insert(insertRecords);
385+
insert(insertRecords, results);
355386
}
356387
if (!updateRecords.isEmpty()) {
357-
update(updateRecords);
388+
update(updateRecords, results);
358389
}
359390
}
360391

0 commit comments

Comments
 (0)