Implement ability to partition BigQuery tables
judahrand committed Feb 23, 2022
1 parent 89efc0e commit 924640e
Showing 8 changed files with 55 additions and 8 deletions.
1 change: 1 addition & 0 deletions pipelinewise/cli/config.py
@@ -325,6 +325,7 @@ def generate_selection(cls, tap: Dict) -> List[Dict]:
# Add replication_key only if replication_method is INCREMENTAL
'replication_key': table.get('replication_key')
if replication_method == 'INCREMENTAL' else None,
'partition_key': table.get('partition_key'),
}
)
)
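For illustration, a minimal sketch of what this change carries through: a table entry in a tap definition can now name a partition_key, and generate_selection copies it into the generated selection entry (all table and column names below are hypothetical):

    # Hypothetical table entry from a tap definition:
    table = {
        'table_name': 'orders',
        'replication_method': 'INCREMENTAL',
        'replication_key': 'updated_at',
        'partition_key': 'updated_at',  # new: column BigQuery will partition on
    }

    # Resulting selection entry produced by generate_selection:
    selection_entry = {
        'tap_stream_id': 'public-orders',
        'replication_method': 'INCREMENTAL',
        'replication_key': 'updated_at',
        'partition_key': 'updated_at',
    }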
6 changes: 6 additions & 0 deletions pipelinewise/cli/pipelinewise.py
@@ -728,6 +728,12 @@ def make_default_selection(self, schema, selection_file):
]['metadata']['replication-key'] = tap_stream_sel[
'replication_key'
]
if 'partition_key' in tap_stream_sel:
schema['streams'][stream_idx]['metadata'][
stream_table_mdata_idx
]['metadata']['partition-key'] = tap_stream_sel[
'partition_key'
]
else:
self.logger.debug(
'Mark %s tap_stream_id as not selected', tap_stream_id
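A sketch of the table-level (empty-breadcrumb) metadata this writes into the selected stream of the generated properties, assuming the selection entry above; only the relevant keys are shown and the values are illustrative:

    {
        'breadcrumb': [],
        'metadata': {
            'selected': True,
            'replication-method': 'INCREMENTAL',
            'replication-key': 'updated_at',
            'partition-key': 'updated_at',  # added by this commit
        },
    }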
6 changes: 6 additions & 0 deletions pipelinewise/cli/schemas/tap.json
@@ -85,6 +85,9 @@
"LOG_BASED"
]
},
"partition_key": {
"type": "string"
},
"transformations": {
"type": "array",
"items": {
@@ -113,6 +116,9 @@
"replication_key": {
"type": "string"
},
"partition_key": {
"type": "string"
},
"transformations": {
"type": "array",
"items": {
4 changes: 4 additions & 0 deletions pipelinewise/fastsync/commons/target_bigquery.py
@@ -194,6 +194,7 @@ def create_table(
primary_key: Optional[List[str]],
is_temporary: bool = False,
sort_columns=False,
partition_key: Optional[str] = None,
):

table_dict = utils.tablename_to_dict(table_name)
@@ -232,6 +233,9 @@
f'CREATE OR REPLACE TABLE {target_schema}.{target_table} ('
f'{",".join(columns)})'
)
if partition_key:
partition_key = partition_key.lower()
sql = sql + f' PARTITION BY DATE({partition_key})'
if primary_key:
primary_key = [c.lower() for c in primary_key]
sql = sql + f' CLUSTER BY {",".join(primary_key)}'
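A minimal sketch of the DDL string the new branch produces (dataset, table and column names are illustrative; BigQuery requires the partition column to be a DATE, TIMESTAMP or DATETIME for DATE() partitioning):

    columns = ['`id` INT64', '`updated_at` TIMESTAMP']
    primary_key = ['ID']
    partition_key = 'UPDATED_AT'

    sql = f'CREATE OR REPLACE TABLE analytics.orders ({",".join(columns)})'
    if partition_key:
        sql += f' PARTITION BY DATE({partition_key.lower()})'
    if primary_key:
        sql += f' CLUSTER BY {",".join(c.lower() for c in primary_key)}'

    # CREATE OR REPLACE TABLE analytics.orders (`id` INT64,`updated_at` TIMESTAMP)
    #   PARTITION BY DATE(updated_at) CLUSTER BY id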
8 changes: 6 additions & 2 deletions pipelinewise/fastsync/mongodb_to_bigquery.py
@@ -50,9 +50,11 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]:
bigquery = FastSyncTargetBigquery(args.target, args.transform)
tap_id = args.target.get('tap_id')
archive_load_files = args.target.get('archive_load_files', False)
dbname = args.tap.get('dbname')
partition_key = utils.get_metadata_for_table(
table, args.properties, dbname=dbname).get('partition_key')

try:
dbname = args.tap.get('dbname')
filename = 'pipelinewise_fastsync_{}_{}_{}.csv'.format(
dbname, table, time.strftime('%Y%m%d-%H%M%S')
)
@@ -99,7 +101,9 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]:
bigquery.obfuscate_columns(target_schema, table)

# Create target table and swap with the temp table in Bigquery
bigquery.create_table(target_schema, table, bigquery_columns, primary_key)
bigquery.create_table(
target_schema, table, bigquery_columns, primary_key, partition_key=partition_key
)
bigquery.swap_tables(target_schema, table)

# Save bookmark to singer state file
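The same two-step pattern, condensed below, repeats in the MySQL, PostgreSQL and S3 CSV FastSync scripts changed in this commit (the dbname argument is only passed by the MongoDB and PostgreSQL variants):

    # Look up the partition key recorded for this table in the properties file ...
    partition_key = utils.get_metadata_for_table(
        table, args.properties, dbname=dbname).get('partition_key')

    # ... and pass it through to create_table so the target table is partitioned.
    bigquery.create_table(
        target_schema, table, bigquery_columns, primary_key,
        partition_key=partition_key,
    )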
14 changes: 12 additions & 2 deletions pipelinewise/fastsync/mysql_to_bigquery.py
@@ -71,6 +71,8 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]:
bigquery = FastSyncTargetBigquery(args.target, args.transform)
tap_id = args.target.get('tap_id')
archive_load_files = args.target.get('archive_load_files', False)
partition_key = utils.get_metadata_for_table(
table, args.properties).get('partition_key')

try:
filename = 'pipelinewise_fastsync_{}_{}.csv'.format(
@@ -111,7 +113,14 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]:

# Creating temp table in Bigquery
bigquery.create_schema(target_schema)
bigquery.create_table(target_schema, table, bigquery_columns, primary_key, is_temporary=True)
bigquery.create_table(
target_schema,
table,
bigquery_columns,
primary_key,
is_temporary=True,
partition_key=partition_key,
)

# Load into Bigquery table
bigquery.copy_to_table(
@@ -130,7 +139,8 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]:
bigquery.obfuscate_columns(target_schema, table)

# Create target table and swap with the temp table in Bigquery
bigquery.create_table(target_schema, table, bigquery_columns, primary_key)
bigquery.create_table(
target_schema, table, bigquery_columns, primary_key, partition_key=partition_key)
bigquery.swap_tables(target_schema, table)

# Save bookmark to singer state file
17 changes: 14 additions & 3 deletions pipelinewise/fastsync/postgres_to_bigquery.py
@@ -70,9 +70,11 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]:
bigquery = FastSyncTargetBigquery(args.target, args.transform)
tap_id = args.target.get('tap_id')
archive_load_files = args.target.get('archive_load_files', False)
dbname = args.tap.get('dbname')
partition_key = utils.get_metadata_for_table(
table, args.properties, dbname=dbname).get('partition_key')

try:
dbname = args.tap.get('dbname')
filename = 'pipelinewise_fastsync_{}_{}_{}.csv'.format(
dbname, table, time.strftime('%Y%m%d-%H%M%S')
)
@@ -113,7 +115,14 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]:

# Creating temp table in Bigquery
bigquery.create_schema(target_schema)
bigquery.create_table(target_schema, table, bigquery_columns, primary_key, is_temporary=True)
bigquery.create_table(
target_schema,
table,
bigquery_columns,
primary_key,
is_temporary=True,
partition_key=partition_key,
)

# Load into Bigquery table
bigquery.copy_to_table(
@@ -132,7 +141,9 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]:
bigquery.obfuscate_columns(target_schema, table)

# Create target table and swap with the temp table in Bigquery
bigquery.create_table(target_schema, table, bigquery_columns, primary_key)
bigquery.create_table(
target_schema, table, bigquery_columns, primary_key, partition_key=partition_key,
)
bigquery.swap_tables(target_schema, table)

# Save bookmark to singer state file
7 changes: 6 additions & 1 deletion pipelinewise/fastsync/s3_csv_to_bigquery.py
@@ -46,6 +46,8 @@ def sync_table(table_name: str, args: Namespace) -> Union[bool, str]:
bigquery = FastSyncTargetBigquery(args.target, args.transform)
tap_id = args.target.get('tap_id')
archive_load_files = args.target.get('archive_load_files', False)
partition_key = utils.get_metadata_for_table(
table_name, args.properties).get('partition_key')

try:
filename = utils.gen_export_filename(
@@ -75,6 +77,7 @@ def sync_table(table_name: str, args: Namespace) -> Union[bool, str]:
primary_key,
is_temporary=True,
sort_columns=True,
partition_key=partition_key,
)

# Load into Bigquery table
@@ -93,7 +96,9 @@ def sync_table(table_name: str, args: Namespace) -> Union[bool, str]:
bigquery.obfuscate_columns(target_schema, table_name)

# Create target table and swap with the temp table in Bigquery
bigquery.create_table(target_schema, table_name, bigquery_columns, primary_key)
bigquery.create_table(
target_schema, table_name, bigquery_columns, primary_key, partition_key=partition_key,
)
bigquery.swap_tables(target_schema, table_name)

# Get bookmark
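After a sync, the result can be confirmed with the BigQuery client; a minimal sketch, assuming google-cloud-bigquery is installed and using an illustrative project/dataset/table:

    from google.cloud import bigquery

    client = bigquery.Client()
    table = client.get_table('my-project.analytics.orders')
    # Expect daily partitioning on the configured partition_key column, e.g.
    # TimePartitioning(type_='DAY', field='updated_at')
    print(table.time_partitioning)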
