This repository was archived by the owner on Feb 23, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdb.py
62 lines (56 loc) · 1.88 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import logging
from typing import Optional, Union

import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.sql.expression import ClauseElement
from tqdm import tqdm
def sql2df(dsn_or_engin: Union[str, Engine],
           sql: Union[ClauseElement, str],
           debug: bool = False,
           chunksize: Optional[int] = None,
           **kwargs) -> pd.DataFrame:
    """Run a SQL query and return its result as a single pandas.DataFrame

    Args:
        dsn_or_engin: a database URL string or an existing SQLAlchemy Engine.
            When a URL string is given, an engine is created for this call
            only and disposed before returning; an engine supplied by the
            caller is left untouched.
        sql: raw SQL string or SQLAlchemy clause element to execute
        debug: when true, log the statement through ``logging.info`` before
            running it. Clause elements are compiled with literal binds;
            plain strings are logged as given.
        chunksize: when truthy, read the result in chunks of this many rows
            and concatenate them into one frame
        kwargs: extra parameters passed to pandas.read_sql

    Returns:
        pandas.DataFrame
    """
    owns_engine = isinstance(dsn_or_engin, str)
    if owns_engine:
        # Use server side cursors by passing server_side_cursors=True
        # NOTE(review): this create_engine flag is dialect-specific; confirm
        # the SQLAlchemy version/dialect in use still accepts it.
        engine = create_engine(dsn_or_engin, server_side_cursors=True)
    else:
        engine = dsn_or_engin

    try:
        if debug:
            if isinstance(sql, str):
                # A plain string has no .compile(); log it verbatim.
                statement = sql
            else:
                statement = sql.compile(
                    engine, compile_kwargs={'literal_binds': True})
            logging.info(statement)

        with engine.connect() as conn:
            if chunksize:
                # Drain the chunk iterator while the connection is still open.
                chunks = list(
                    pd.read_sql(sql, conn, chunksize=chunksize, **kwargs))
                if chunks:
                    return pd.concat(chunks)
            # No chunking requested, or the chunked read produced nothing:
            # run a plain read.
            return pd.read_sql(sql, conn, **kwargs)
    finally:
        # Only tear down the pool of an engine this call created itself.
        if owns_engine:
            engine.dispose()
def iter_2_df(iterator,
              chunk_size: int = 0,
              func=None,
              **kwargs) -> pd.DataFrame:
    """Collect the records of an iterator into one pandas.DataFrame

    Records are buffered and converted to intermediate DataFrames every
    ``chunk_size`` input records, and the intermediate frames are
    concatenated at the end. This trades memory against efficiency.
    An iterator that produces nothing gives an empty pandas.DataFrame.

    Args:
        iterator: an iterator of records
        chunk_size: number of input records consumed per intermediate
            DataFrame, 0 means no chunk. The count is taken on input
            records, before ``func`` is applied.
        func: generator to transform each record; every item it yields
            becomes one buffered record
        kwargs: extra parameters passed to tqdm.tqdm

    Returns:
        pandas.DataFrame
    """
    pending = []
    chunks = []
    for position, item in enumerate(tqdm(iterator, **kwargs)):
        if func:
            pending.extend(func(item))
        else:
            pending.append(item)
        # Flush the buffer once the last slot of the current chunk is filled.
        if chunk_size and (position % chunk_size + 1 == chunk_size):
            chunks.append(pd.DataFrame(pending))
            pending = []

    # Whatever is left over after the last full chunk.
    if pending:
        chunks.append(pd.DataFrame(pending))

    if not chunks:
        return pd.DataFrame()
    return pd.concat(chunks)