# tap_dynamodb/__init__.py
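"""Singer tap for Amazon DynamoDB.

In discovery mode the tap writes a catalog of available tables to stdout; in
sync mode it replicates each selected stream, optionally applying a DynamoDB
scan filter supplied through the stream's catalog metadata, and logs a
per-stream summary table when done.
"""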
import json
import sys
import time

from terminaltables import AsciiTable

import singer
from singer import metadata

from tap_dynamodb.discover import discover_streams
from tap_dynamodb.dynamodb import setup_aws_client
from tap_dynamodb.sync import sync_stream

LOGGER = singer.get_logger()

REQUIRED_CONFIG_KEYS = ["region_name"]


def do_discover(config):
    LOGGER.info("Starting discover")
    streams = discover_streams(config)
    if not streams:
        raise Exception("No streams found")
    catalog = {"streams": streams}
    json.dump(catalog, sys.stdout, indent=2)
    LOGGER.info("Finished discover")


def stream_is_selected(mdata):
    # The empty tuple is the top-level (stream) breadcrumb in a Singer
    # metadata map; 'selected' there marks the whole stream for syncing.
    return mdata.get((), {}).get('selected', False)


def do_sync(config, catalog, state):
    LOGGER.info('Starting sync.')

    counts = {}
    sync_times = {}
    for stream in catalog['streams']:
        start_time = time.time()
        stream_name = stream['tap_stream_id']
        mdata = metadata.to_map(stream['metadata'])
        if not stream_is_selected(mdata):
            LOGGER.info("%s: Skipping - not selected", stream_name)
            continue

        singer.write_state(state)
        key_properties = metadata.get(mdata, (), 'table-key-properties')
        singer.write_schema(stream_name, stream['schema'], key_properties)

        # An optional scan filter may be supplied via stream metadata;
        # ExpressionAttributeValues arrives as a JSON string and is parsed here.
        filter_expression = metadata.get(mdata, (), 'FilterExpression')
        filter_value = metadata.get(mdata, (), 'ExpressionAttributeValues')
        stream_config = config
        if filter_expression and filter_value:
            scan_params = {'FilterExpression': filter_expression,
                           'ExpressionAttributeValues': json.loads(filter_value)}
            # Merge into a per-stream copy so one stream's filter does not
            # carry over to later streams.
            stream_config = {**config, **scan_params}
            LOGGER.info("Applying scan_params: %s for stream: %s", str(scan_params), stream_name)

        LOGGER.info("%s: Starting sync", stream_name)
        counts[stream_name] = sync_stream(stream_config, state, stream)
        sync_times[stream_name] = time.time() - start_time
        LOGGER.info("%s: Completed sync (%s rows)", stream_name, counts[stream_name])

    LOGGER.info(get_sync_summary(catalog, counts, sync_times))
    LOGGER.info('Done syncing.')
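
# Illustrative metadata entry that activates the filtered scan in do_sync
# (expression and values are hypothetical; note that ExpressionAttributeValues
# is stored as a JSON string and parsed with json.loads above):
#
#     {"breadcrumb": [],
#      "metadata": {
#        "FilterExpression": "account_id = :id",
#        "ExpressionAttributeValues": "{\":id\": {\"S\": \"12345\"}}"}}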


def get_sync_summary(catalog, counts, times):
    headers = [['table name',
                'replication method',
                'total records',
                'write speed']]

    rows = []
    for stream_id, stream_count in counts.items():
        stream = [x for x in catalog['streams'] if x['tap_stream_id'] == stream_id][0]
        md_map = metadata.to_map(stream['metadata'])
        replication_method = metadata.get(md_map, (), 'replication-method')

        stream_time = times[stream_id]
        if stream_time == 0:
            stream_time = 0.000001  # guard against division by zero
        row = [stream_id,
               replication_method,
               '{} records'.format(stream_count),
               '{:.1f} records/second'.format(stream_count / stream_time)]
        rows.append(row)

    data = headers + rows
    table = AsciiTable(data, title='Sync Summary')

    return '\n\n' + table.table
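
# Roughly what the logged summary looks like (numbers are illustrative):
#
#     +Sync Summary--+--------------------+---------------+----------------------+
#     | table name   | replication method | total records | write speed          |
#     +--------------+--------------------+---------------+----------------------+
#     | my_table     | FULL_TABLE         | 1000 records  | 250.0 records/second |
#     +--------------+--------------------+---------------+----------------------+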


@singer.utils.handle_top_exception(LOGGER)
def main():
    args = singer.utils.parse_args(REQUIRED_CONFIG_KEYS)
    config = args.config

    # TODO Is this the right way to do this? It seems bad
    # if not config.get('use_local_dynamo'):
    #     setup_aws_client(config)

    if args.discover:
        do_discover(args.config)
    elif args.catalog:
        do_sync(config, args.catalog.to_dict(), args.state)


if __name__ == '__main__':
    main()
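
# Typical Singer invocations, assuming the package exposes a console entry
# point named tap-dynamodb (config.json must at least contain "region_name"):
#
#     tap-dynamodb --config config.json --discover > catalog.json
#     tap-dynamodb --config config.json --catalog catalog.json --state state.json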