-
Notifications
You must be signed in to change notification settings - Fork 7
Issue139/monetdbe append #167
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
914acd1
2e6509c
bcd5547
c1a5f44
d7cbde9
3422937
29993be
d022499
9d640e1
a53e21b
80e3857
7131ecf
bd93fb4
c674d6d
3772f27
e47266d
8b0d6c2
8377c4b
2b2ba8e
8653534
b82f2a1
c16b604
1008080
3c30f57
81be071
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,10 +6,9 @@ | |
| from collections import namedtuple | ||
|
|
||
| import numpy as np | ||
|
|
||
| from monetdbe._lowlevel import ffi, lib | ||
| from monetdbe import exceptions | ||
| from monetdbe._cffi.convert import make_string, monet_c_type_map, extract, numpy_monetdb_map, precision_warning | ||
| from monetdbe._cffi.convert import make_string, monet_c_type_map, extract, numpy_monetdb_map, precision_warning, timestamp_to_date, get_null_value | ||
| from monetdbe._cffi.convert.bind import monetdbe_decimal_to_bte, monetdbe_decimal_to_sht, monetdbe_decimal_to_int, monetdbe_decimal_to_lng, prepare_bind | ||
| from monetdbe._cffi.errors import check_error | ||
| from monetdbe._cffi.types_ import monetdbe_result, monetdbe_database, monetdbe_column, monetdbe_statement | ||
|
|
@@ -33,28 +32,27 @@ def result_fetch_numpy(result: monetdbe_result) -> Mapping[str, np.ndarray]: | |
| name = make_string(rcol.name) | ||
| type_info = monet_c_type_map[rcol.type] | ||
|
|
||
| np_mask = np.ma.nomask # type: ignore[attr-defined] | ||
| # for non float/int we for now first make a numpy object array which we then convert to the right numpy type | ||
| if type_info.numpy_type.type == np.object_: | ||
| np_col: np.ndarray = np.array([extract(rcol, r) for r in range(result.nrows)]) | ||
| values = [extract(rcol, r) for r in range(result.nrows)] | ||
| np_col: np.ndarray = np.array(values) | ||
| np_mask = np.array([v is None for v in values]) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To me, this seems almost the same except for an extra list comprehension. my guess is you did this to get rid of the
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added an issue for this #168 |
||
| if rcol.type == lib.monetdbe_str: | ||
| np_col = np_col.astype(str) | ||
| elif rcol.type == lib.monetdbe_date: | ||
| np_col = np_col.astype('datetime64[D]') # type: ignore | ||
| elif rcol.type == lib.monetdbe_time: | ||
| warn("Not converting column with type column since no proper numpy equivalent") | ||
| elif rcol.type == lib.monetdbe_timestamp: | ||
| np_col = np_col.astype('datetime64[ns]') # type: ignore | ||
| np_col = np_col.astype('datetime64[ms]') # type: ignore | ||
| else: | ||
| buffer_size = result.nrows * type_info.numpy_type.itemsize # type: ignore | ||
| c_buffer = ffi.buffer(rcol.data, buffer_size) | ||
| np_col = np.frombuffer(c_buffer, dtype=type_info.numpy_type) # type: ignore | ||
| np_mask = np_col == get_null_value(rcol) | ||
|
|
||
| if type_info.null_value: | ||
| mask = np_col == type_info.null_value | ||
| else: | ||
| mask = np.ma.nomask # type: ignore[attr-defined] | ||
|
|
||
| masked: np.ndarray = np.ma.masked_array(np_col, mask=mask) | ||
| masked: np.ndarray = np.ma.masked_array(np_col, mask=np_mask) | ||
|
|
||
| result_dict[name] = masked | ||
| return result_dict | ||
|
|
@@ -248,7 +246,6 @@ def append(self, table: str, data: Mapping[str, np.ndarray], schema: str = 'sys' | |
| """ | ||
| Directly append an array structure | ||
| """ | ||
|
|
||
| self._switch() | ||
| n_columns = len(data) | ||
| existing_columns = list(self.get_columns(schema=schema, table=table)) | ||
|
|
@@ -259,31 +256,49 @@ def append(self, table: str, data: Mapping[str, np.ndarray], schema: str = 'sys' | |
| raise exceptions.ProgrammingError(error) | ||
|
|
||
| work_columns = ffi.new(f'monetdbe_column * [{n_columns}]') | ||
| work_objs = [] | ||
| # cffi_objects assists to keep all in-memory native data structure alive during the execution of this call | ||
| cffi_objects = [] | ||
| cffi_objects = list() | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think in an other PR or a previous commit i merged |
||
| for column_num, (column_name, existing_type) in enumerate(existing_columns): | ||
| column_values = data[column_name] | ||
| work_column = ffi.new('monetdbe_column *') | ||
| type_info = numpy_monetdb_map(column_values.dtype) | ||
|
|
||
| # try to convert the values if types don't match | ||
| if type_info.c_type != existing_type: | ||
| precision_warning(type_info.c_type, existing_type) | ||
| to_numpy_type = monet_c_type_map[existing_type].numpy_type | ||
| try: | ||
| column_values = column_values.astype(to_numpy_type) | ||
| type_info = numpy_monetdb_map(column_values.dtype) | ||
| except Exception as e: | ||
| existing_type_string = monet_c_type_map[existing_type].c_string_type | ||
| error = f"Can't convert '{type_info.c_string_type}' " \ | ||
| f"to type '{existing_type_string}' for column '{column_name}': {e} " | ||
| raise ValueError(error) | ||
| if type_info.c_type == lib.monetdbe_timestamp and existing_type == lib.monetdbe_date and np.issubdtype(column_values.dtype, np.datetime64): | ||
| """ | ||
| We are going to cast to a monetdbe_date and | ||
| consider monetdbe_timestamp as a 'base type' to signal this. | ||
| """ | ||
| type_info = timestamp_to_date() | ||
| else: | ||
| precision_warning(type_info.c_type, existing_type) | ||
| to_numpy_type = monet_c_type_map[existing_type].numpy_type | ||
| try: | ||
| column_values = column_values.astype(to_numpy_type) | ||
| type_info = numpy_monetdb_map(column_values.dtype) | ||
| except Exception as e: | ||
| existing_type_string = monet_c_type_map[existing_type].c_string_type | ||
| error = f"Can't convert '{type_info.c_string_type}' " \ | ||
| f"to type '{existing_type_string}' for column '{column_name}': {e} " | ||
| raise ValueError(error) | ||
|
|
||
| work_column.type = type_info.c_type | ||
| work_column.count = column_values.shape[0] | ||
| work_column.name = ffi.new('char[]', column_name.encode()) | ||
| if type_info.numpy_type.kind == 'U': | ||
| if type_info.numpy_type.kind == 'M': | ||
| t = ffi.new('monetdbe_data_timestamp[]', work_column.count) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just a comment, no action required, but this function is getting a bit messy, we should split it up in the future. |
||
| cffi_objects.append(t) | ||
| unit = np.datetime_data(column_values.dtype)[0].encode() | ||
| p = ffi.from_buffer("int64_t*", column_values) | ||
|
|
||
| lib.initialize_timestamp_array_from_numpy(self._monetdbe_database, t, work_column.count, p, unit, existing_type) | ||
| work_column.data = t | ||
| elif type_info.numpy_type.kind == 'U': | ||
| # first massage the numpy array of unicode into a matrix of null terminated rows of bytes. | ||
| m = ffi.from_buffer("bool*", column_values.mask) if np.ma.isMaskedArray(column_values) else 0 # type: ignore[attr-defined] | ||
| cffi_objects.append(m) | ||
| v = np.char.encode(column_values).view('b').reshape((work_column.count, -1)) | ||
| v = np.c_[v, np.zeros(work_column.count, dtype=np.int8)] | ||
| stride_length = v.shape[1] | ||
|
|
@@ -292,12 +307,14 @@ def append(self, table: str, data: Mapping[str, np.ndarray], schema: str = 'sys' | |
| cffi_objects.append(t) | ||
| p = ffi.from_buffer("char*", v) | ||
| cffi_objects.append(p) | ||
| lib.initialize_string_array_from_numpy(t, work_column.count, p, stride_length) | ||
| lib.initialize_string_array_from_numpy(t, work_column.count, p, stride_length, ffi.cast("bool*", m)) | ||
| work_column.data = t | ||
| else: | ||
| work_column.data = ffi.from_buffer(f"{type_info.c_string_type}*", column_values) | ||
| p = ffi.from_buffer(f"{type_info.c_string_type}*", column_values) | ||
| cffi_objects.append(p) | ||
| work_column.data = p | ||
| work_columns[column_num] = work_column | ||
| cffi_objects.append(work_column) | ||
| work_objs.append(work_column) | ||
| check_error(lib.monetdbe_append(self._monetdbe_database, schema.encode(), | ||
| table.encode(), work_columns, n_columns)) | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if you removed that to fix mypy complaining, numpy added new type annotations, and in master i fixed the type annotation for the null field (an optional
np.floatableI believe)