Skip to content

Commit ea739f0

Browse files
committed
feat(bq2bq): add spillover date via SQL support
1 parent 631527a commit ea739f0

File tree

1 file changed

+44
-14
lines changed

1 file changed

+44
-14
lines changed

task/bq2bq/executor/bumblebee/transformation.py

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ def __init__(self,
2727
dend: datetime,
2828
execution_time: datetime,
2929
dry_run: bool):
30+
self.spillover_query = spillover_query
3031
self.bigquery_service = bigquery_service
3132
self.task_config = task_config
3233
self.sql_query = sql_query
@@ -78,15 +79,26 @@ def transform(self):
7879
elif bq_destination_table.partitioning_type == "DAY":
7980
partition_strategy = timedelta(days=1)
8081

81-
# queries where source data/partition directly map with destination partitions
82-
transformation = MultiPartitionTransformation(self.bigquery_service,
83-
self.task_config,
84-
self.sql_query,
85-
self.dstart, self.dend,
86-
self.dry_run,
87-
localised_execution_time,
88-
partition_strategy,
89-
self.task_config.concurrency)
82+
if self.spillover_query:
83+
transformation = LegacySpilloverTransformation(self.bigquery_service,
84+
self.task_config,
85+
self.sql_query,
86+
self.spillover_query,
87+
self.dstart,
88+
self.dend,
89+
self.dry_run,
90+
localised_execution_time,
91+
partition_strategy)
92+
else:
93+
# queries where source data/partition directly map with destination partitions
94+
transformation = MultiPartitionTransformation(self.bigquery_service,
95+
self.task_config,
96+
self.sql_query,
97+
self.dstart, self.dend,
98+
self.dry_run,
99+
localised_execution_time,
100+
partition_strategy,
101+
self.task_config.concurrency)
90102
else:
91103
raise Exception("unable to generate a transformation for request, unsupported partition strategy")
92104
transformation.transform()
@@ -376,7 +388,11 @@ def transform(self):
376388
# break query file
377389
task_queries = self.task_query.split(OPTIMUS_QUERY_BREAK_MARKER)
378390
if len(task_queries) < len(datetime_list):
379-
raise Exception("query needs to be broken using {}, {} query found, needed {}\n{}".format(OPTIMUS_QUERY_BREAK_MARKER, len(task_queries), len(datetime_list), self.task_query))
391+
raise Exception(
392+
"query needs to be broken using {}, {} query found, needed {}\n{}".format(OPTIMUS_QUERY_BREAK_MARKER,
393+
len(task_queries),
394+
len(datetime_list),
395+
self.task_query))
380396

381397
tasks = []
382398
query_index = 0
@@ -413,45 +429,59 @@ def __init__(self,
413429
sql_query: str,
414430
spillover_query: str,
415431
start_time: datetime,
432+
end_time: datetime,
416433
dry_run: bool,
417-
execution_time: datetime):
434+
execution_time: datetime,
435+
partition_delta: timedelta):
418436
self.bigquery_service = bigquery_service
419437
self.task_config = task_config
420438
self.sql_query = sql_query
421439
self.spillover_query = spillover_query
422440
self.dry_run = dry_run
423441
self.start_time = start_time
442+
self.end_time = end_time
424443
self.execution_time = execution_time
444+
self.partition_delta = partition_delta
425445

426446
self.concurrency = self.task_config.concurrency
427447

428448
def transform(self):
429449
datetime_list = []
430-
default_datetime = [self.start_time]
431-
datetime_list.extend(default_datetime)
450+
# default_datetime = [self.start_time]
451+
# datetime_list.extend(default_datetime)
432452

433453
if self.task_config.use_spillover:
434454
spillover = SpilloverDatetimes(self.bigquery_service,
435455
self.spillover_query,
436456
self.task_config,
437457
self.start_time,
458+
self.end_time,
438459
self.dry_run,
439460
self.execution_time)
440461
spillover_datetimes = spillover.collect_datetimes()
441462
datetime_list.extend(spillover_datetimes)
442463

443464
datetime_list = distinct_list(datetime_list)
444465

466+
execute_for = self.start_time
467+
468+
# tables are partitioned for day
469+
# iterate from start to end for each partition
470+
while execute_for < self.end_time:
471+
execute_for += self.partition_delta
472+
445473
tasks = []
446474
for partition_time in datetime_list:
447475
logger.info("create transformation for partition: {}".format(partition_time))
448476
loader = PartitionLoader(self.bigquery_service, self.task_config.destination_table,
449477
self.task_config.load_method, partition_time)
450478

479+
task_window = WindowFactory.create_window_with_time(partition_time, partition_time + self.partition_delta)
480+
451481
task = PartitionTransformation(self.task_config,
452482
loader,
453483
self.sql_query,
454-
self.window,
484+
task_window,
455485
self.dry_run,
456486
self.execution_time)
457487
tasks.append(task)

0 commit comments

Comments
 (0)