11package org .embulk .output .bigquery_java ;
22
3- import java .io .FileInputStream ;
4- import java .io .IOException ;
5- import java .io .OutputStream ;
6- import java .util .ArrayList ;
7- import java .util .Collections ;
8- import java .util .List ;
9- import java .util .Optional ;
10- import java .nio .channels .Channels ;
11- import java .nio .file .Files ;
12- import java .nio .file .Path ;
13- import java .util .UUID ;
14-
15- import com .google .cloud .bigquery .*;
3+ import com .google .auth .oauth2 .ServiceAccountCredentials ;
4+ import com .google .cloud .bigquery .BigQuery ;
5+ import com .google .cloud .bigquery .BigQueryException ;
6+ import com .google .cloud .bigquery .BigQueryOptions ;
7+ import com .google .cloud .bigquery .Clustering ;
8+ import com .google .cloud .bigquery .CopyJobConfiguration ;
9+ import com .google .cloud .bigquery .Dataset ;
10+ import com .google .cloud .bigquery .DatasetInfo ;
11+ import com .google .cloud .bigquery .Field ;
12+ import com .google .cloud .bigquery .FieldValue ;
13+ import com .google .cloud .bigquery .FormatOptions ;
14+ import com .google .cloud .bigquery .Job ;
15+ import com .google .cloud .bigquery .JobId ;
16+ import com .google .cloud .bigquery .JobInfo ;
17+ import com .google .cloud .bigquery .JobStatistics ;
18+ import com .google .cloud .bigquery .LegacySQLTypeName ;
19+ import com .google .cloud .bigquery .QueryJobConfiguration ;
20+ import com .google .cloud .bigquery .StandardSQLTypeName ;
21+ import com .google .cloud .bigquery .StandardTableDefinition ;
22+ import com .google .cloud .bigquery .Table ;
23+ import com .google .cloud .bigquery .TableDataWriteChannel ;
24+ import com .google .cloud .bigquery .TableDefinition ;
25+ import com .google .cloud .bigquery .TableId ;
26+ import com .google .cloud .bigquery .TableInfo ;
27+ import com .google .cloud .bigquery .TableResult ;
28+ import com .google .cloud .bigquery .TimePartitioning ;
29+ import com .google .cloud .bigquery .WriteChannelConfiguration ;
30+ import com .google .common .annotations .VisibleForTesting ;
1631import org .embulk .output .bigquery_java .config .BigqueryColumnOption ;
1732import org .embulk .output .bigquery_java .config .BigqueryTimePartitioning ;
1833import org .embulk .output .bigquery_java .config .PluginTask ;
3045import org .embulk .spi .type .TimestampType ;
3146import org .embulk .spi .type .Type ;
3247import org .embulk .spi .util .RetryExecutor ;
33-
34- import static org .embulk .spi .util .RetryExecutor .retryExecutor ;
35-
36- import com .google .auth .oauth2 .ServiceAccountCredentials ;
37- import com .google .common .annotations .VisibleForTesting ;
38-
3948import org .slf4j .Logger ;
4049import org .slf4j .LoggerFactory ;
4150
51+ import java .io .FileInputStream ;
52+ import java .io .IOException ;
53+ import java .io .OutputStream ;
54+ import java .nio .channels .Channels ;
55+ import java .nio .file .Files ;
56+ import java .nio .file .Path ;
57+ import java .util .ArrayList ;
58+ import java .util .Collections ;
59+ import java .util .List ;
60+ import java .util .Optional ;
61+ import java .util .Spliterator ;
62+ import java .util .Spliterators ;
63+ import java .util .UUID ;
64+ import java .util .stream .Collectors ;
65+ import java .util .stream .Stream ;
66+ import java .util .stream .StreamSupport ;
67+
68+ import static org .embulk .spi .util .RetryExecutor .retryExecutor ;
4269
4370public class BigqueryClient {
4471 private final Logger logger = LoggerFactory .getLogger (BigqueryClient .class );
@@ -158,7 +185,7 @@ public TimePartitioning buildTimePartitioning(BigqueryTimePartitioning bigqueryT
158185 return timePartitioningBuilder .build ();
159186 }
160187
161- public JobStatistics .LoadStatistics load (Path loadFile , String table , JobInfo .WriteDisposition writeDestination ) throws BigqueryException {
188+ public JobStatistics .LoadStatistics load (Path loadFile , String table , JobInfo .WriteDisposition writeDisposition ) throws BigqueryException {
162189 String dataset = this .dataset ;
163190 int retries = this .task .getRetries ();
164191 PluginTask task = this .task ;
@@ -191,7 +218,7 @@ public JobStatistics.LoadStatistics call() {
191218 WriteChannelConfiguration writeChannelConfiguration =
192219 WriteChannelConfiguration .newBuilder (tableId )
193220 .setFormatOptions (FormatOptions .json ())
194- .setWriteDisposition (writeDestination )
221+ .setWriteDisposition (writeDisposition )
195222 .setMaxBadRecords (task .getMaxBadRecords ())
196223 .setIgnoreUnknownValues (task .getIgnoreUnknownValues ())
197224 .setSchema (buildSchema (schema , columnOptions ))
@@ -247,7 +274,7 @@ public void onGiveup(Exception firstException, Exception lastException) throws R
247274 public JobStatistics .CopyStatistics copy (String sourceTable ,
248275 String destinationTable ,
249276 String destinationDataset ,
250- JobInfo .WriteDisposition writeDestination ) throws BigqueryException {
277+ JobInfo .WriteDisposition writeDisposition ) throws BigqueryException {
251278 String dataset = this .dataset ;
252279 int retries = this .task .getRetries ();
253280
@@ -265,7 +292,7 @@ public JobStatistics.CopyStatistics call() {
265292 TableId srcTableId = TableId .of (dataset , sourceTable );
266293
267294 CopyJobConfiguration copyJobConfiguration = CopyJobConfiguration .newBuilder (destTableId , srcTableId )
268- .setWriteDisposition (writeDestination )
295+ .setWriteDisposition (writeDisposition )
269296 .build ();
270297
271298 Job job = bigquery .create (JobInfo .newBuilder (copyJobConfiguration ).setJobId (JobId .of (jobId )).build ());
@@ -308,6 +335,167 @@ public void onGiveup(Exception firstException, Exception lastException) throws R
308335 }
309336 }
310337
338+ public JobStatistics .QueryStatistics merge (String sourceTable , String targetTable , List <String > mergeKeys , List <String > mergeRule ) {
339+ StringBuilder sb = new StringBuilder ();
340+ sb .append ("MERGE " );
341+ sb .append (quoteIdentifier (dataset ));
342+ sb .append ("." );
343+ sb .append (quoteIdentifier (targetTable ));
344+ sb .append (" T" );
345+ sb .append (" USING " );
346+ sb .append (quoteIdentifier (dataset ));
347+ sb .append ("." );
348+ sb .append (quoteIdentifier (sourceTable ));
349+ sb .append (" S" );
350+ sb .append (" ON " );
351+ appendMergeKeys (sb , mergeKeys .isEmpty () ? getMergeKeys (targetTable ) : mergeKeys );
352+ sb .append (" WHEN MATCHED THEN" );
353+ sb .append (" UPDATE SET " );
354+ appendMergeRule (sb , mergeRule , schema );
355+ sb .append (" WHEN NOT MATCHED THEN" );
356+ sb .append (" INSERT (" );
357+ appendColumns (sb , schema );
358+ sb .append (") VALUES (" );
359+ appendColumns (sb , schema );
360+ sb .append (")" );
361+ String query = sb .toString ();
362+ logger .info (String .format ("embulk-output-bigquery: Execute query... %s" , query ));
363+ return executeQuery (query );
364+ }
365+
366+ private List <String > getMergeKeys (String table ) {
367+ String query =
368+ "SELECT" +
369+ " KCU.COLUMN_NAME " +
370+ "FROM " +
371+ quoteIdentifier (dataset ) + ".INFORMATION_SCHEMA.KEY_COLUMN_USAGE KCU " +
372+ "JOIN " +
373+ quoteIdentifier (dataset ) + ".INFORMATION_SCHEMA.TABLE_CONSTRAINTS TC " +
374+ "ON" +
375+ " KCU.CONSTRAINT_CATALOG = TC.CONSTRAINT_CATALOG AND" +
376+ " KCU.CONSTRAINT_SCHEMA = TC.CONSTRAINT_SCHEMA AND" +
377+ " KCU.CONSTRAINT_NAME = TC.CONSTRAINT_NAME AND" +
378+ " KCU.TABLE_CATALOG = TC.TABLE_CATALOG AND" +
379+ " KCU.TABLE_SCHEMA = TC.TABLE_SCHEMA AND" +
380+ " KCU.TABLE_NAME = TC.TABLE_NAME " +
381+ "WHERE" +
382+ " TC.TABLE_NAME = '" + table + "' AND" +
383+ " TC.CONSTRAINT_TYPE = 'PRIMARY KEY' " +
384+ "ORDER BY" +
385+ " KCU.ORDINAL_POSITION" ;
386+ return stream (runQuery (query ).iterateAll ())
387+ .flatMap (BigqueryClient ::stream )
388+ .map (FieldValue ::getStringValue )
389+ .collect (Collectors .toList ());
390+ }
391+
392+ public TableResult runQuery (String query ) {
393+ int retries = task .getRetries ();
394+ try {
395+ return retryExecutor ()
396+ .withRetryLimit (retries )
397+ .withInitialRetryWait (2 * 1000 )
398+ .withMaxRetryWait (10 * 1000 )
399+ .runInterruptible (new RetryExecutor .Retryable <TableResult >() {
400+ @ Override
401+ public TableResult call () throws Exception {
402+ QueryJobConfiguration configuration =
403+ QueryJobConfiguration .newBuilder (query )
404+ .setUseLegacySql (false )
405+ .build ();
406+ String job = String .format ("embulk_query_job_%s" , UUID .randomUUID ());
407+ JobId .Builder builder = JobId .newBuilder ().setJob (job );
408+ if (location != null ){
409+ builder .setLocation (location );
410+ }
411+ return bigquery .query (configuration , builder .build ());
412+ }
413+ @ Override
414+ public boolean isRetryableException (Exception exception ) {
415+ return exception instanceof BigqueryBackendException
416+ || exception instanceof BigqueryRateLimitExceededException
417+ || exception instanceof BigqueryInternalException ;
418+ }
419+ @ Override
420+ public void onRetry (Exception exception , int retryCount , int retryLimit , int retryWait ) {
421+ String message = String .format ("embulk-output-bigquery: Query job failed. Retrying %d/%d after %d seconds. Message: %s" ,
422+ retryCount , retryLimit , retryWait / 1000 , exception .getMessage ());
423+ if (retryCount % retries == 0 ) {
424+ logger .warn (message , exception );
425+ } else {
426+ logger .warn (message );
427+ }
428+ }
429+ @ Override
430+ public void onGiveup (Exception firstException , Exception lastException ) {
431+ logger .error ("embulk-output-bigquery: Give up retrying for Query job" );
432+ }
433+ });
434+ } catch (RetryExecutor .RetryGiveupException e ) {
435+ if (e .getCause () instanceof BigqueryException ) {
436+ throw (BigqueryException ) e .getCause ();
437+ }
438+ throw new RuntimeException (e );
439+ } catch (InterruptedException e ) {
440+ throw new BigqueryException ("interrupted" );
441+ }
442+ }
443+
444+ private static <T > Stream <T > stream (Iterable <T > iterable ) {
445+ return StreamSupport .stream (Spliterators .spliteratorUnknownSize (iterable .iterator (), Spliterator .ORDERED ), false );
446+ }
447+
448+ private static StringBuilder appendMergeKeys (StringBuilder sb , List <String > mergeKeys ) {
449+ if (mergeKeys .isEmpty ()) {
450+ throw new RuntimeException ("merge key or primary key is required" );
451+ }
452+ for (int i = 0 ; i < mergeKeys .size (); i ++) {
453+ if (i != 0 ) { sb .append (" AND " ); }
454+ String mergeKey = quoteIdentifier (mergeKeys .get (i ));
455+ sb .append ("T." );
456+ sb .append (mergeKey );
457+ sb .append (" = S." );
458+ sb .append (mergeKey );
459+ }
460+ return sb ;
461+ }
462+
463+ private static StringBuilder appendMergeRule (StringBuilder sb , List <String > mergeRule , Schema schema ) {
464+ return mergeRule .isEmpty () ? appendMergeRule (sb , schema ) : appendMergeRule (sb , mergeRule );
465+ }
466+
467+ private static StringBuilder appendMergeRule (StringBuilder sb , List <String > mergeRule ) {
468+ for (int i = 0 ; i < mergeRule .size (); i ++) {
469+ if (i != 0 ) { sb .append (", " ); }
470+ sb .append (mergeRule .get (i ));
471+ }
472+ return sb ;
473+ }
474+
475+ private static StringBuilder appendMergeRule (StringBuilder sb , Schema schema ) {
476+ for (int i = 0 ; i < schema .getColumnCount (); i ++) {
477+ if (i != 0 ) { sb .append (", " ); }
478+ String column = quoteIdentifier (schema .getColumnName (i ));
479+ sb .append ("T." );
480+ sb .append (column );
481+ sb .append (" = S." );
482+ sb .append (column );
483+ }
484+ return sb ;
485+ }
486+
487+ private static StringBuilder appendColumns (StringBuilder sb , Schema schema ) {
488+ for (int i = 0 ; i < schema .getColumnCount (); i ++) {
489+ if (i != 0 ) { sb .append (", " ); }
490+ sb .append (quoteIdentifier (schema .getColumnName (i )));
491+ }
492+ return sb ;
493+ }
494+
495+ private static String quoteIdentifier (String identifier ) {
496+ return "`" + identifier + "`" ;
497+ }
498+
311499 public JobStatistics .QueryStatistics executeQuery (String query ) {
312500 int retries = this .task .getRetries ();
313501 String location = this .location ;
@@ -372,7 +560,6 @@ public void onGiveup(Exception firstException, Exception lastException) throws R
372560 }
373561 }
374562
375-
376563 public boolean deleteTable (String table ) {
377564 return deleteTable (table , null );
378565 }
0 commit comments