-
Notifications
You must be signed in to change notification settings - Fork 3
/
records.py
49 lines (34 loc) · 1.46 KB
/
records.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from typing import Dict
import marshmallow as ma
import pandas as pd
from .base import DataFrameSchemaMeta, DataFrameSchemaOpts
from .converters import dtype_to_field
class RecordsDataFrameSchemaMeta(DataFrameSchemaMeta):
    """Metaclass that derives the fields of a records-style DataFrame schema."""

    @classmethod
    def get_fields(
        mcs, opts: DataFrameSchemaOpts, dict_cls
    ) -> Dict[str, ma.fields.Field]:
        """Build the schema's field mapping from ``opts.dtypes``.

        Args:
            opts: Schema options. ``opts.dtypes`` is read for its ``columns``
                and ``dtypes`` attributes, which are paired up positionally.
            dict_cls: Zero-argument callable producing the mapping to return.

        Returns:
            An empty ``dict_cls()`` when ``opts.dtypes`` is ``None``.
            Otherwise a ``dict_cls()`` holding a single required ``"data"``
            entry: a ``Nested(many=True)`` field over a dynamically created
            ``RecordSchema`` with one field per column.
        """
        # Without dtypes there is nothing to derive fields from.
        if opts.dtypes is None:
            return dict_cls()

        # One marshmallow field per column, converted from the column's dtype.
        record_attrs = {}
        for column, dtype in zip(opts.dtypes.columns, opts.dtypes.dtypes):
            record_attrs[column] = dtype_to_field(dtype)

        # The per-record schema is created at runtime because its fields are
        # only known once the dtypes have been inspected.
        record_schema = type("RecordSchema", (ma.Schema,), record_attrs)

        schema_fields: Dict[str, ma.fields.Field] = dict_cls()
        schema_fields["data"] = ma.fields.Nested(
            record_schema, many=True, required=True
        )
        return schema_fields
class RecordsDataFrameSchema(ma.Schema, metaclass=RecordsDataFrameSchemaMeta):
    """Schema that loads a list of records into a pandas DataFrame."""

    OPTIONS_CLASS = DataFrameSchemaOpts

    @ma.post_load
    def make_df(self, data: dict, **kwargs) -> pd.DataFrame:
        """Turn the loaded payload into a DataFrame.

        Args:
            data: Deserialized payload; its ``"data"`` entry is the sequence
                of records to place in the frame, one row per record.
            **kwargs: Extra keyword arguments passed by the post-load hook;
                unused here.

        Returns:
            A DataFrame whose rows are indexed by each record's position in
            ``data["data"]``, restricted to the columns of
            ``self.opts.dtypes`` and cast to the matching dtypes.
        """
        # Key each record by its position so that it becomes the row label.
        rows_by_position = dict(enumerate(data["data"]))

        expected = self.opts.dtypes
        frame = pd.DataFrame.from_dict(
            rows_by_position, orient="index", columns=expected.columns
        )

        # Cast every column to the dtype declared for it in the options.
        target_dtypes = dict(zip(expected.columns, expected.dtypes))
        return frame.astype(target_dtypes)