Skip to content

support json type #62

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions proton_driver/block.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from .reader import read_varint, read_binary_uint8, read_binary_int32
from .varint import write_varint
from .writer import write_binary_uint8, write_binary_int32
from .columns import nestedcolumn
from .columns.util import get_inner_columns_with_types


class BlockInfo(object):
Expand Down Expand Up @@ -172,7 +172,7 @@ def _pure_mutate_dicts_to_rows(
for name, type_ in columns_with_types:
cwt = None
if type_.startswith('nested'):
cwt = nestedcolumn.get_columns_with_types(type_)
cwt = get_inner_columns_with_types('nested', type_)
columns_with_cwt.append((name, cwt))

for i, row in enumerate(data):
Expand Down
11 changes: 10 additions & 1 deletion proton_driver/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ class Client(object):
* ``quota_key`` -- A string to differentiate quotas when the user has
keyed quotas configured on the server.
New in version *0.2.3*.
* ``namedtuple_as_json`` -- Controls deserialization of named tuple
and nested types. When ``True``, these columns
are returned as Python dicts; when ``False``,
they are returned as Python tuples. Default: False.
New in version *0.2.12*.
"""

available_client_settings = (
Expand All @@ -58,7 +63,8 @@ class Client(object):
'use_numpy',
'opentelemetry_traceparent',
'opentelemetry_tracestate',
'quota_key'
'quota_key',
'namedtuple_as_json'
)

def __init__(self, *args, **kwargs):
Expand All @@ -85,6 +91,9 @@ def __init__(self, *args, **kwargs):
),
'quota_key': self.settings.pop(
'quota_key', ''
),
'namedtuple_as_json': self.settings.pop(
'namedtuple_as_json', False
)
}

Expand Down
27 changes: 19 additions & 8 deletions proton_driver/columns/arraycolumn.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,31 @@ class ArrayColumn(Column):
py_types = (list, tuple)

def __init__(self, nested_column, **kwargs):
self.size_column = UInt64Column()
self.init_kwargs = kwargs
self.size_column = UInt64Column(**kwargs)
self.nested_column = nested_column
self._write_depth_0_size = True
super(ArrayColumn, self).__init__(**kwargs)
self.null_value = []

def write_data(self, data, buf):
# Column of Array(T) is stored in "compact" format and passed to server
# wrapped into another Array without size of wrapper array.
self.nested_column = ArrayColumn(self.nested_column)
self.nested_column = ArrayColumn(
self.nested_column, **self.init_kwargs
)
self.nested_column.nullable = self.nullable
self.nullable = False
self._write_depth_0_size = False
self._write(data, buf)

def read_data(self, rows, buf):
self.nested_column = ArrayColumn(self.nested_column)
def read_data(self, n_rows, buf):
self.nested_column = ArrayColumn(
self.nested_column, **self.init_kwargs
)
self.nested_column.nullable = self.nullable
self.nullable = False
return self._read(rows, buf)[0]
return self._read(n_rows, buf)[0]

def _write_sizes(self, value, buf):
nulls_map = []
Expand Down Expand Up @@ -99,14 +105,19 @@ def _write_nulls_data(self, value, buf):
self.nested_column._write_nulls_map(value, buf)

def _write(self, value, buf):
value = self.prepare_items(value)
self._write_sizes(value, buf)
self._write_nulls_data(value, buf)
self._write_data(value, buf)

def read_state_prefix(self, buf):
return self.nested_column.read_state_prefix(buf)
super(ArrayColumn, self).read_state_prefix(buf)

self.nested_column.read_state_prefix(buf)

def write_state_prefix(self, buf):
super(ArrayColumn, self).write_state_prefix(buf)

self.nested_column.write_state_prefix(buf)

def _read(self, size, buf):
Expand Down Expand Up @@ -145,6 +156,6 @@ def _read(self, size, buf):
return tuple(data)


def create_array_column(spec, column_by_spec_getter):
def create_array_column(spec, column_by_spec_getter, column_options):
inner = spec[6:-1]
return ArrayColumn(column_by_spec_getter(inner))
return ArrayColumn(column_by_spec_getter(inner), **column_options)
39 changes: 39 additions & 0 deletions proton_driver/columns/jsoncolumn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from .base import Column
from .stringcolumn import String
from ..reader import read_binary_uint8, read_binary_str
from ..util.compat import json
from ..writer import write_binary_uint8


class JsonColumn(Column):
    """Column handler for the ``json`` type.

    Values are written as JSON text through an inner ``String`` column.
    On read, the type spec of the actual data is taken from the stream and
    reading is delegated to the column built for that spec.
    """

    py_types = (dict, )

    # No NULL value actually
    null_value = {}

    def __init__(self, column_by_spec_getter, **kwargs):
        """Store the column factory and build the inner string column.

        :param column_by_spec_getter: callable invoked as
            ``getter(spec, settings)`` to build a column for a type spec.
        :param kwargs: column options, forwarded unchanged to both the
            inner ``String`` column and the base ``Column``.
        """
        self.column_by_spec_getter = column_by_spec_getter
        self.string_column = String(**kwargs)
        super(JsonColumn, self).__init__(**kwargs)

    def write_state_prefix(self, buf):
        """Write the single-byte state prefix (the value ``1``) to *buf*."""
        # Data is read in binary format but written in text format.
        write_binary_uint8(1, buf)

    def read_items(self, n_items, buf):
        """Read *n_items* values by delegating to the column named in *buf*.

        The stream carries one byte (consumed and ignored here) followed by
        the type spec string of the underlying column.
        """
        read_binary_uint8(buf)
        inner_spec = read_binary_str(buf)

        # Build the real column with named tuples rendered as dicts.
        inner_column = self.column_by_spec_getter(
            inner_spec, {'namedtuple_as_json': True}
        )
        inner_column.read_state_prefix(buf)
        return inner_column.read_data(n_items, buf)

    def write_items(self, items, buf):
        """Write *items* as strings, JSON-encoding every non-``str`` value."""
        serialized = []
        for item in items:
            if not isinstance(item, str):
                item = json.dumps(item)
            serialized.append(item)

        self.string_column.write_items(serialized, buf)


def create_json_column(spec, column_by_spec_getter, column_options):
    """Build a ``JsonColumn``.

    :param spec: column type spec; accepted for signature parity with the
        other column factories but not used here.
    :param column_by_spec_getter: callable the column uses to build the
        underlying column when reading.
    :param column_options: dict of options expanded into keyword arguments
        for the column constructor.
    :return: a new ``JsonColumn`` instance.
    """
    json_column = JsonColumn(column_by_spec_getter, **column_options)
    return json_column
71 changes: 4 additions & 67 deletions proton_driver/columns/nestedcolumn.py
Original file line number Diff line number Diff line change
@@ -1,73 +1,10 @@

from .arraycolumn import create_array_column
from .util import get_inner_spec


def create_nested_column(spec, column_by_spec_getter):
def create_nested_column(spec, column_by_spec_getter, column_options):
return create_array_column(
'array(tuple({}))'.format(','.join(get_nested_columns(spec))),
column_by_spec_getter=column_by_spec_getter
'array(tuple({}))'.format(get_inner_spec('nested', spec)),
column_by_spec_getter, column_options
)


def get_nested_columns(spec):
brackets = 0
column_begin = 0

inner_spec = get_inner_spec(spec)
nested_columns = []
for i, x in enumerate(inner_spec + ','):
if x == ',':
if brackets == 0:
nested_columns.append(inner_spec[column_begin:i])
column_begin = i + 1
elif x == '(':
brackets += 1
elif x == ')':
brackets -= 1
elif x == ' ':
if brackets == 0:
column_begin = i + 1
return nested_columns


def get_columns_with_types(spec):
brackets = 0
prev_comma = 0
prev_space = 0

inner_spec = get_inner_spec(spec)
columns_with_types = []

for i, x in enumerate(inner_spec + ','):
if x == ',':
if brackets == 0:
columns_with_types.append((
inner_spec[prev_comma:prev_space].strip(),
inner_spec[prev_space:i]
))
prev_comma = i + 1
elif x == '(':
brackets += 1
elif x == ')':
brackets -= 1
elif x == ' ':
if brackets == 0:
prev_space = i + 1
return columns_with_types


def get_inner_spec(spec):
brackets = 0
offset = len('nested')
i = offset
for i, ch in enumerate(spec[offset:], offset):
if ch == '(':
brackets += 1

elif ch == ')':
brackets -= 1

if brackets == 0:
break

return spec[offset + 1:i]
23 changes: 19 additions & 4 deletions proton_driver/columns/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
IntervalSecondColumn
)
from .ipcolumn import IPv4Column, IPv6Column
from .jsoncolumn import create_json_column


column_by_type = {c.ch_type: c for c in [
Expand Down Expand Up @@ -64,7 +65,11 @@ def get_column_by_spec(spec, column_options, use_numpy=None):
logger.warning('NumPy support is not implemented for %s. '
'Using generic column', spec)

def create_column_with_options(x):
def create_column_with_options(x, settings=None):
if settings:
client_settings = column_options['context'].client_settings
client_settings.update(settings)
column_options['context'].client_settings = client_settings
return get_column_by_spec(x, column_options, use_numpy=use_numpy)

if spec == 'string' or spec.startswith('fixed_string'):
Expand All @@ -80,13 +85,23 @@ def create_column_with_options(x):
return create_decimal_column(spec, column_options)

elif spec.startswith('array'):
return create_array_column(spec, create_column_with_options)
return create_array_column(
spec, create_column_with_options, column_options
)

elif spec.startswith('tuple'):
return create_tuple_column(spec, create_column_with_options)
return create_tuple_column(
spec, create_column_with_options, column_options
)
elif spec.startswith('json'):
return create_json_column(
spec, create_column_with_options, column_options
)

elif spec.startswith('nested'):
return create_nested_column(spec, create_column_with_options)
return create_nested_column(
spec, create_column_with_options, column_options
)

elif spec.startswith('nullable'):
return create_nullable_column(spec, create_column_with_options)
Expand Down
63 changes: 29 additions & 34 deletions proton_driver/columns/tuplecolumn.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@

from .base import Column
from .util import get_inner_columns_with_types


class TupleColumn(Column):
py_types = (list, tuple)

def __init__(self, nested_columns, **kwargs):
def __init__(self, names, nested_columns, **kwargs):
self.names = names
self.nested_columns = nested_columns
client_settings = kwargs['context'].client_settings
self.namedtuple_as_json = client_settings.get(
'namedtuple_as_json', False
)

super(TupleColumn, self).__init__(**kwargs)
self.null_value = tuple(x.null_value for x in nested_columns)

def write_data(self, items, buf):
items = self.prepare_items(items)
items = list(zip(*items))

for i, x in enumerate(self.nested_columns):
Expand All @@ -20,46 +29,32 @@ def write_items(self, items, buf):

def read_data(self, n_items, buf):
rv = [x.read_data(n_items, buf) for x in self.nested_columns]
return list(zip(*rv))
rv = list(zip(*rv))

if self.names[0] and self.namedtuple_as_json:
return [dict(zip(self.names, x)) for x in rv]
else:
return rv

def read_items(self, n_items, buf):
return self.read_data(n_items, buf)

def read_state_prefix(self, buf):
super(TupleColumn, self).read_state_prefix(buf)

def create_tuple_column(spec, column_by_spec_getter):
brackets = 0
column_begin = 0

inner_spec = get_inner_spec(spec)
nested_columns = []
for i, x in enumerate(inner_spec + ','):
if x == ',':
if brackets == 0:
nested_columns.append(inner_spec[column_begin:i])
column_begin = i + 1
elif x == '(':
brackets += 1
elif x == ')':
brackets -= 1
elif x == ' ':
if brackets == 0:
column_begin = i + 1

return TupleColumn([column_by_spec_getter(x) for x in nested_columns])
for x in self.nested_columns:
x.read_state_prefix(buf)

def write_state_prefix(self, buf):
super(TupleColumn, self).write_state_prefix(buf)

def get_inner_spec(spec):
brackets = 0
offset = len('tuple')
i = offset
for i, ch in enumerate(spec[offset:], offset):
if ch == '(':
brackets += 1
for x in self.nested_columns:
x.write_state_prefix(buf)

elif ch == ')':
brackets -= 1

if brackets == 0:
break
def create_tuple_column(spec, column_by_spec_getter, column_options):
columns_with_types = get_inner_columns_with_types('tuple', spec)
names, types = zip(*columns_with_types)

return spec[offset + 1:i]
return TupleColumn(names, [column_by_spec_getter(x) for x in types],
**column_options)
Loading