Commit 2354e28

Pass partition_by information through state

judahrand committed Feb 24, 2022
1 parent 67ce7e8

Showing 5 changed files with 34 additions and 17 deletions.
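In outline: each fastsync script previously looked up `partition-by` on its own via utils.get_metadata_for_table; this commit resolves it once in get_bookmark_for_table, carries it on the bookmark dict, and has every BigQuery create_table call read it back with bookmark.get('partition_by'). Short sketches of the pattern follow the relevant hunks below.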
3 changes: 2 additions & 1 deletion pipelinewise/fastsync/commons/utils.py
@@ -158,7 +158,8 @@ def get_bookmark_for_table(table, properties, db_engine, dbname=None):
             bookmark = db_engine.fetch_current_incremental_key_pos(
                 table, replication_key
             )
-
+    if 'partition-by' in table_meta:
+        bookmark['partition_by'] = table_meta['partition-by']
     return bookmark


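The hunk above shows only the tail of get_bookmark_for_table. A minimal runnable sketch of the pattern it adopts, with table_meta and fetch_current_pos as stand-ins for lookups that are not part of this diff:

def get_bookmark_for_table(table_meta: dict, fetch_current_pos) -> dict:
    """Sketch: build a replication bookmark enriched with partition-by."""
    bookmark = {}
    replication_key = table_meta.get('replication-key')
    if replication_key:
        # Stand-in for db_engine.fetch_current_incremental_key_pos(...)
        bookmark = fetch_current_pos(replication_key)

    # The change in this commit: carry partition-by through the bookmark
    # so downstream consumers read it from state instead of re-parsing
    # the tap properties.
    if 'partition-by' in table_meta:
        bookmark['partition_by'] = table_meta['partition-by']
    return bookmark


meta = {'replication-key': 'updated_at', 'partition-by': 'created_at'}
bookmark = get_bookmark_for_table(meta, lambda key: {key: '2022-02-24'})
print(bookmark.get('partition_by'))  # -> created_at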
17 changes: 13 additions & 4 deletions pipelinewise/fastsync/mongodb_to_bigquery.py
@@ -51,8 +51,6 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]:
     tap_id = args.target.get('tap_id')
     archive_load_files = args.target.get('archive_load_files', False)
     dbname = args.tap.get('dbname')
-    partition_by = utils.get_metadata_for_table(
-        table, args.properties, dbname=dbname).get('partition-by')
 
     try:
         filename = 'pipelinewise_fastsync_{}_{}_{}.csv'.format(
@@ -83,7 +81,14 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]:
 
         # Creating temp table in Bigquery
         bigquery.create_schema(target_schema)
-        bigquery.create_table(target_schema, table, bigquery_columns, primary_key, is_temporary=True)
+        bigquery.create_table(
+            target_schema,
+            table,
+            bigquery_columns,
+            primary_key,
+            is_temporary=True,
+            partition_by=bookmark.get('partition_by'),
+        )
 
         # Load into Bigquery table
         bigquery.copy_to_table(
@@ -102,7 +107,11 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]:
 
         # Create target table and swap with the temp table in Bigquery
         bigquery.create_table(
-            target_schema, table, bigquery_columns, primary_key, partition_by=partition_by
+            target_schema,
+            table,
+            bigquery_columns,
+            primary_key,
+            partition_by=bookmark.get('partition_by'),
         )
         bigquery.swap_tables(target_schema, table)
 
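FastSyncTargetBigquery.create_table itself is not shown in this diff; as an illustration only, a partition_by column typically feeds into BigQuery DDL along these lines (rendered here with a hypothetical helper):

def render_create_table(schema: str, table: str, columns: list,
                        partition_by: str = None) -> str:
    """Hypothetical helper: render an (optionally partitioned) CREATE TABLE."""
    ddl = 'CREATE OR REPLACE TABLE `{}.{}` ({})'.format(
        schema, table, ', '.join(columns)
    )
    if partition_by:
        # BigQuery time partitioning on a DATE/TIMESTAMP column.
        ddl += ' PARTITION BY DATE(`{}`)'.format(partition_by)
    return ddl


print(render_create_table('my_schema', 'orders_temp',
                          ['id INT64', 'created_at TIMESTAMP'],
                          partition_by='created_at'))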
11 changes: 7 additions & 4 deletions pipelinewise/fastsync/mysql_to_bigquery.py
@@ -71,8 +71,6 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]:
     bigquery = FastSyncTargetBigquery(args.target, args.transform)
     tap_id = args.target.get('tap_id')
     archive_load_files = args.target.get('archive_load_files', False)
-    partition_by = utils.get_metadata_for_table(
-        table, args.properties).get('partition-by')
 
     try:
         filename = 'pipelinewise_fastsync_{}_{}.csv'.format(
@@ -119,7 +117,7 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]:
             bigquery_columns,
             primary_key,
             is_temporary=True,
-            partition_by=partition_by,
+            partition_by=bookmark.get('partition_by'),
         )
 
         # Load into Bigquery table
@@ -140,7 +138,12 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]:
 
         # Create target table and swap with the temp table in Bigquery
         bigquery.create_table(
-            target_schema, table, bigquery_columns, primary_key, partition_by=partition_by)
+            target_schema,
+            table,
+            bigquery_columns,
+            primary_key,
+            partition_by=bookmark.get('partition_by'),
+        )
         bigquery.swap_tables(target_schema, table)
 
         # Save bookmark to singer state file
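The bookmark assembled above is what gets written out at the "# Save bookmark to singer state file" step, so partition_by now travels alongside the replication position — which is what the commit title means by passing it through state. A hypothetical state entry (the exact key layout is not shown in this diff):

import json

state = {
    'bookmarks': {
        'my_db-orders': {  # hypothetical tap/table key
            'replication_key': 'updated_at',
            'replication_key_value': '2022-02-24T00:00:00',
            'partition_by': 'created_at',
        }
    }
}
print(json.dumps(state, indent=2))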
10 changes: 6 additions & 4 deletions pipelinewise/fastsync/postgres_to_bigquery.py
@@ -71,8 +71,6 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]:
     tap_id = args.target.get('tap_id')
     archive_load_files = args.target.get('archive_load_files', False)
     dbname = args.tap.get('dbname')
-    partition_by = utils.get_metadata_for_table(
-        table, args.properties, dbname=dbname).get('partition-by')
 
     try:
         filename = 'pipelinewise_fastsync_{}_{}_{}.csv'.format(
@@ -121,7 +119,7 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]:
             bigquery_columns,
             primary_key,
             is_temporary=True,
-            partition_by=partition_by,
+            partition_by=bookmark.get('partition_by'),
         )
 
         # Load into Bigquery table
@@ -142,7 +140,11 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]:
 
         # Create target table and swap with the temp table in Bigquery
         bigquery.create_table(
-            target_schema, table, bigquery_columns, primary_key, partition_by=partition_by,
+            target_schema,
+            table,
+            bigquery_columns,
+            primary_key,
+            partition_by=bookmark.get('partition_by'),
         )
         bigquery.swap_tables(target_schema, table)
 
10 changes: 6 additions & 4 deletions pipelinewise/fastsync/s3_csv_to_bigquery.py
@@ -46,8 +46,6 @@ def sync_table(table_name: str, args: Namespace) -> Union[bool, str]:
     bigquery = FastSyncTargetBigquery(args.target, args.transform)
     tap_id = args.target.get('tap_id')
     archive_load_files = args.target.get('archive_load_files', False)
-    partition_by = utils.get_metadata_for_table(
-        table_name, args.properties).get('partition-by')
 
     try:
         filename = utils.gen_export_filename(
@@ -77,7 +75,7 @@ def sync_table(table_name: str, args: Namespace) -> Union[bool, str]:
             primary_key,
             is_temporary=True,
             sort_columns=True,
-            partition_by=partition_by,
+            partition_by=bookmark.get('partition_by'),
         )
 
         # Load into Bigquery table
@@ -97,7 +95,11 @@ def sync_table(table_name: str, args: Namespace) -> Union[bool, str]:
 
         # Create target table and swap with the temp table in Bigquery
        bigquery.create_table(
-            target_schema, table_name, bigquery_columns, primary_key, partition_by=partition_by,
+            target_schema,
+            table_name,
+            bigquery_columns,
+            primary_key,
+            partition_by=bookmark.get('partition_by'),
         )
         bigquery.swap_tables(target_schema, table_name)
 