|
13 | 13 |
|
14 | 14 | def clear_by_source(engine: Engine, table_name, source):
|
15 | 15 | index_name = table_name + '__s'
|
| 16 | + |
16 | 17 | def func(package):
|
| 18 | + with engine.connect() as conn: |
| 19 | + s = text('create index concurrently ' + |
| 20 | + f'{index_name} on {table_name} (_source)') |
| 21 | + try: |
| 22 | + logger.info('CREATING INDEX') |
| 23 | + conn.execute(s) |
| 24 | + logger.info('DONE CREATING INDEX') |
| 25 | + except ProgrammingError as e: |
| 26 | + logger.error('Failed to create index %s', e) |
| 27 | + s = text(f'delete from {table_name} where _source=:source' |
| 28 | + ).params(source=source) |
| 29 | + try: |
| 30 | + logger.info('DELETING PAST ROWS') |
| 31 | + conn.execute(s) |
| 32 | + logger.info('DONE DELETING') |
| 33 | + except ProgrammingError as e: |
| 34 | + logger.error('Failed to remove rows %s', e) |
17 | 35 | yield package.pkg
|
18 |
| - for i, resource in enumerate(package): |
19 |
| - if i == 0: |
20 |
| - with engine.connect() as conn: |
21 |
| - s = text('create index concurrently ' + |
22 |
| - f'{index_name} on {table_name} (_source)') |
23 |
| - try: |
24 |
| - logger.info('CREATING INDEX') |
25 |
| - conn.execute(s) |
26 |
| - logger.info('DONE CREATING INDEX') |
27 |
| - except ProgrammingError as e: |
28 |
| - logger.error('Failed to create index %s', e) |
29 |
| - s = text(f'delete from {table_name} where _source=:source' |
30 |
| - ).params(source=source) |
31 |
| - try: |
32 |
| - logger.info('DELETING PAST ROWS') |
33 |
| - conn.execute(s) |
34 |
| - logger.info('DONE DELETING') |
35 |
| - except ProgrammingError as e: |
36 |
| - logger.error('Failed to remove rows %s', e) |
37 |
| - yield resource |
| 36 | + yield from package |
38 | 37 |
|
39 | 38 | return func
|
40 | 39 |
|
|
0 commit comments