Skip to content

Commit

Permalink
Merge pull request #13 from ezcater/ag-async-report-polling
Browse files Browse the repository at this point in the history
Adding asyncio to tap to allow for concurrent polling of reports
  • Loading branch information
KAllan357 authored Feb 21, 2018
2 parents 58604d8 + 76e6f9c commit 6bc30fa
Showing 1 changed file with 22 additions and 15 deletions.
37 changes: 22 additions & 15 deletions tap_bing_ads/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#!/usr/bin/env python3

import time
import asyncio
import json
import csv
import sys
import re
import io
from datetime import datetime, timedelta
from datetime import datetime
from zipfile import ZipFile

import singer
Expand Down Expand Up @@ -584,7 +584,7 @@ def type_report_row(row):

row[field_name] = value

def poll_report(client, report_name, start_date, end_date, request_id):
async def poll_report(client, report_name, start_date, end_date, request_id):
download_url = None
with metrics.job_timer('generate_report'):
for i in range(1, MAX_NUM_REPORT_POLLS + 1):
Expand Down Expand Up @@ -613,7 +613,7 @@ def poll_report(client, report_name, start_date, end_date, request_id):
start_date,
end_date))
else:
time.sleep(REPORT_POLL_SLEEP)
await asyncio.sleep(REPORT_POLL_SLEEP)

return download_url

Expand Down Expand Up @@ -642,7 +642,7 @@ def stream_report(stream_name, report_name, url, report_time):
singer.write_record(stream_name, row)
counter.increment()

def sync_report(client, account_id, report_stream):
async def sync_report(client, account_id, report_stream):
report_name = stringcase.pascalcase(report_stream.stream)

report_schema = get_report_schema(client, report_name)
Expand Down Expand Up @@ -702,7 +702,7 @@ def sync_report(client, account_id, report_stream):

request_id = client.SubmitGenerateReport(report_request)

download_url = poll_report(client, report_name, start_date, end_date, request_id)
download_url = await poll_report(client, report_name, start_date, end_date, request_id)

if download_url:
stream_report(report_stream.stream,
Expand All @@ -713,23 +713,26 @@ def sync_report(client, account_id, report_stream):
singer.write_bookmark(STATE, state_key, 'date', end_date.isoformat())
singer.write_state(STATE)

def sync_reports(account_id, catalog):
async def sync_reports(account_id, catalog):
client = create_sdk_client('ReportingService', account_id)

reports_to_sync = filter(lambda x: x.is_selected() and x.stream[-6:] == 'report',
catalog.streams)

for report_stream in reports_to_sync:
sync_report_tasks = [
sync_report(client, account_id, report_stream)
for report_stream in reports_to_sync
]
await asyncio.gather(*sync_report_tasks)

def sync_account_data(account_id, catalog, selected_streams):
async def sync_account_data(account_id, catalog, selected_streams):
LOGGER.info('Syncing core objects')
sync_core_objects(account_id, selected_streams)

LOGGER.info('Syncing reports')
sync_reports(account_id, catalog)
await sync_reports(account_id, catalog)

def do_sync_all_accounts(account_ids, catalog):
async def do_sync_all_accounts(account_ids, catalog):
selected_streams = {}
for stream in filter(lambda x: x.is_selected(), catalog.streams):
selected_streams[stream.tap_stream_id] = stream
Expand All @@ -738,10 +741,13 @@ def do_sync_all_accounts(account_ids, catalog):
LOGGER.info('Syncing Accounts')
sync_accounts_stream(account_ids, selected_streams['accounts'])

for account_id in account_ids:
sync_account_data_tasks = [
sync_account_data(account_id, catalog, selected_streams)
for account_id in account_ids
]
await asyncio.gather(*sync_account_data_tasks)

def main_impl():
async def main_impl():
args = utils.parse_args(REQUIRED_CONFIG_KEYS)

CONFIG.update(args.config)
Expand All @@ -752,14 +758,15 @@ def main_impl():
do_discover(account_ids)
LOGGER.info("Discovery complete")
elif args.catalog:
do_sync_all_accounts(account_ids, args.catalog)
await do_sync_all_accounts(account_ids, args.catalog)
LOGGER.info("Sync Completed")
else:
LOGGER.info("No catalog was provided")

def main():
try:
main_impl()
loop = asyncio.get_event_loop()
loop.run_until_complete(main_impl())
except Exception as exc:
LOGGER.critical(exc)
raise exc
Expand Down

0 comments on commit 6bc30fa

Please sign in to comment.