Modest performance, address #12647 #12656

Closed · wants to merge 19 commits
Changes from 1 commit
Working on cython issues
kshedden committed Apr 21, 2016
commit 7e156b7e5a96390fa39884d7f1c16e5234ff0bd8
25 changes: 8 additions & 17 deletions pandas/io/sas/sas7bdat.py
@@ -20,7 +20,7 @@
 import numpy as np
 import struct
 import pandas.io.sas.sas_constants as const
-from .saslib import (_rle_decompress, _rdc_decompress, _do_read)
+from pandas.io.sas.saslib import do_read


 class _subheader_pointer(object):
@@ -550,11 +550,14 @@ def read(self, nrows=None):
         nd = (self.column_types == b'd').sum()
         ns = (self.column_types == b's').sum()

-        self._string_chunk = np.empty((ns, nrows), dtype=np.object)
+        self._string_chunk = []
+        for j,ct in enumerate(self.column_types):
+            if ct == b's':
+                self._string_chunk.append([None] * nrows)
         self._byte_chunk = np.empty((nd, 8 * nrows), dtype=np.uint8)

         self._current_row_in_chunk_index = 0
-        _do_read(self, nrows)
+        do_read(self, nrows)

         rslt = self._chunk_to_dataframe()
         if self.index is not None:
@@ -583,16 +586,6 @@ def _read_next_page(self):

         return False

-    def _decompress(self, row_length, page):
-        page = np.frombuffer(page, dtype=np.uint8)
-        if self.compression == const.rle_compression:
-            return _rle_decompress(row_length, page)
-        elif self.compression == const.rdc_compression:
-            return _rdc_decompress(row_length, page)
-        else:
-            raise ValueError("unknown SAS compression method: %s" %
-                             self.compression)
-
     def _chunk_to_dataframe(self):

         n = self._current_row_in_chunk_index
@@ -614,11 +607,9 @@ def _chunk_to_dataframe(self):
                 rslt[name] = epoch + pd.to_timedelta(rslt[name], unit='d')
                 jb += 1
             elif self.column_types[j] == b's':
-                rslt[name] = self._string_chunk[js, :]
-                rslt[name] = rslt[name].apply(lambda x: x.rstrip(b'\x00 '))
+                rslt[name] = pd.Series(self._string_chunk[js], dtype=np.object)
                 if self.convert_text and (self.encoding is not None):
-                    rslt[name] = rslt[name].apply(
-                        lambda x: x.decode(encoding=self.encoding))
+                    rslt[name] = rslt[name].str.decode(self.encoding)
                 if self.blank_missing:
                     ii = rslt[name].str.len() == 0
                     rslt.loc[ii, name] = np.nan
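Together, the two hunks above change the string path: the Cython layer now fills plain Python lists (one per string column), and _chunk_to_dataframe converts each list to a Series, replacing the old per-element .apply calls with vectorized .str methods. A minimal sketch of the new conversion path — the column name, encoding, and sample bytes here are invented for illustration:

    import numpy as np
    import pandas as pd

    # One list per string column, one slot per row, mirroring the new
    # self._string_chunk layout (values arrive already rstrip()-ed).
    string_chunk = [[b"alpha", b"", b"gamma"]]

    rslt = pd.DataFrame()
    name = "col0"                      # hypothetical column name
    rslt[name] = pd.Series(string_chunk[0], dtype=np.object)

    # Vectorized decode replaces the old per-element .apply(lambda ...).
    rslt[name] = rslt[name].str.decode("latin-1")

    # blank_missing: empty strings become NaN.
    ii = rslt[name].str.len() == 0
    rslt.loc[ii, name] = np.nan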
107 changes: 51 additions & 56 deletions pandas/io/sas/saslib.pyx
@@ -8,17 +8,15 @@ import sas_constants as const
 # algorithm. It is partially documented here:
 #
 # https://cran.r-project.org/web/packages/sas7bdat/vignettes/sas7bdat.pdf
-def _rle_decompress(int result_length, np.ndarray[uint8_t, ndim=1] inbuff):
+cdef np.ndarray[uint8_t, ndim=1] rle_decompress(int result_length, np.ndarray[uint8_t, ndim=1] inbuff):

-    cdef uint8_t control_byte
-    cdef uint8_t [:] result = np.zeros(result_length, np.uint8)
-
-    cdef int rpos = 0
-    cdef int ipos = 0
-    cdef int i
-    cdef int nbytes
-    cdef uint8_t x
-    cdef length = len(inbuff)
+    cdef:
+        uint8_t control_byte, x
+        np.ndarray[uint8_t, ndim=1] result = np.zeros(result_length, np.uint8)
+        int rpos = 0
+        int ipos = 0
+        int i, nbytes
+        length = len(inbuff)

     while ipos < length:
         control_byte = inbuff[ipos] & 0xF0
@@ -107,24 +105,22 @@ def _rle_decompress(int result_length, np.ndarray[uint8_t, ndim=1] inbuff):
     if len(result) != result_length:
         print("RLE: %v != %v\n", (len(result), result_length))

-    return np.asarray(result).tostring()
+    return np.asarray(result, dtype=np.uint8)
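For orientation, the RLE scheme works on control bytes: the high nibble (masked with 0xF0 above) selects a command such as "copy the next N literal bytes" or "repeat one byte N times", and the low nibble carries the count. A minimal illustrative decoder for two commands — the command values 0x80 and 0xC0 below are hypothetical stand-ins, not the actual SAS command table, which follows the vignette linked above and is larger:

    def rle_sketch(inbuff, result_length):
        # Illustrative only: 0x80 (copy run) and 0xC0 (fill run) are
        # made-up command values, not the real SAS RLE command table.
        result = bytearray()
        ipos = 0
        while ipos < len(inbuff) and len(result) < result_length:
            control_byte = inbuff[ipos] & 0xF0
            count = inbuff[ipos] & 0x0F
            ipos += 1
            if control_byte == 0x80:     # copy the next (count + 1) literal bytes
                nbytes = count + 1
                result += inbuff[ipos:ipos + nbytes]
                ipos += nbytes
            elif control_byte == 0xC0:   # repeat the next byte (count + 3) times
                result += bytes([inbuff[ipos]]) * (count + 3)
                ipos += 1
            else:
                raise ValueError("command not covered in this sketch")
        return bytes(result)

For example, rle_sketch(b'\x82abc\xc3x', 10) copies the three literals and expands the fill run, yielding b'abcxxxxxx'.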


 # rdc_decompress decompresses data using the Ross Data Compression algorithm:
 #
 # http://collaboration.cmc.ec.gc.ca/science/rpn/biblio/ddj/Website/articles/CUJ/1992/9210/ross/ross.htm
-def _rdc_decompress(int result_length, np.ndarray[uint8_t, ndim=1] inbuff):
+cdef np.ndarray[uint8_t, ndim=1] rdc_decompress(int result_length, np.ndarray[uint8_t, ndim=1] inbuff):

-    cdef uint8_t cmd
-    cdef uint16_t ctrl_bits
-    cdef uint16_t ctrl_mask = 0
-    cdef uint16_t ofs
-    cdef uint16_t cnt
-    cdef int ipos = 0
-    cdef int rpos = 0
-    cdef int k
-
-    cdef uint8_t [:] outbuff = np.zeros(result_length, dtype=np.uint8)
+    cdef:
+        uint8_t cmd, ofs, cnt
+        uint16_t ctrl_bits
+        uint16_t ctrl_mask = 0
+        int ipos = 0
+        int rpos = 0
+        int k
+        np.ndarray[uint8_t, ndim=1] outbuff = np.zeros(result_length, dtype=np.uint8)

     ii = -1
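RDC, by contrast, interleaves a 16-bit control word with the data: each bit says whether the next input byte is a literal (bit clear) or starts a compressed token whose command and count live in its high and low nibbles (bit set — the cmd/cnt variables above). A Python sketch of the control-word bookkeeping as described in the article linked above, with token payload handling elided:

    def rdc_control_sketch(inbuff):
        # Sketch of the control-bit bookkeeping only: every group of 16
        # items shares one big-endian 16-bit control word.
        out = []
        ipos, ctrl_bits, ctrl_mask = 0, 0, 0
        while ipos < len(inbuff) - 1:
            if ctrl_mask == 0:                    # refill the control word
                ctrl_bits = (inbuff[ipos] << 8) + inbuff[ipos + 1]
                ipos += 2
                ctrl_mask = 0x8000
            if ctrl_bits & ctrl_mask == 0:        # clear bit: literal byte
                out.append(inbuff[ipos])
                ipos += 1
            else:                                 # set bit: cmd/cnt token
                cmd = (inbuff[ipos] >> 4) & 0x0F
                cnt = inbuff[ipos] & 0x0F
                out.append((cmd, cnt))            # payload handling elided
                ipos += 1
            ctrl_mask >>= 1                       # consume one control bit
        return out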

@@ -191,24 +187,33 @@ def _rdc_decompress(int result_length, np.ndarray[uint8_t, ndim=1] inbuff):
     if len(outbuff) != result_length:
         raise ValueError("RDC: %v != %v\n", len(outbuff), result_length)

-    return np.asarray(outbuff).tostring()
+    return np.asarray(outbuff, dtype=np.uint8)


-def _do_read(parser, int nrows):
+cdef np.ndarray[uint8_t, ndim=1] decompress(object parser, int row_length, uint8_t[:] page):
+    page = np.frombuffer(page, dtype=np.uint8)
+    if parser.compression == const.rle_compression:
+        return rle_decompress(row_length, page)
+    elif parser.compression == const.rdc_compression:
+        return rdc_decompress(row_length, page)
+    else:
+        raise ValueError("unknown SAS compression method: %s" %
+                         parser.compression)
+
+
+def do_read(object parser, int nrows):
     cdef int i

     for i in range(nrows):
-        done = _readline(parser)
+        done = readline(parser)
         if done:
             break


-def _readline(parser):
+cdef bint readline(object parser):

-    cdef int offset
-    cdef int bit_offset
-    cdef int align_correction
-    cdef int subheader_pointer_length
+    cdef:
+        int offset, bit_offset, align_correction, subheader_pointer_length

     bit_offset = parser._page_bit_offset
     subheader_pointer_length = parser._subheader_pointer_length
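A note on the new signatures: cdef functions such as readline and decompress compile to C-level calls that bypass Python function-call overhead, but they are no longer visible to Python importers, which is why do_read keeps a plain def as the module's entry point (it is the only name sas7bdat.py imports). Schematically, in illustrative Cython not taken from the patch:

    # toy.pyx -- illustrative only
    cdef int twice(int x):     # C-level helper: cheap to call, not importable
        return 2 * x

    def call_twice(int x):     # Python-visible entry point wrapping the helper
        return twice(x)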
@@ -236,9 +241,7 @@ def _readline(parser):
                 parser._current_row_on_page_index])
             process_byte_array_with_data(parser,
                                          current_subheader_pointer.offset,
-                                         current_subheader_pointer.length,
-                                         parser._byte_chunk,
-                                         parser._string_chunk)
+                                         current_subheader_pointer.length)
             return False
     elif parser._current_page_type in const.page_mix_types:
         align_correction = (bit_offset + const.subheader_pointers_offset +
@@ -250,9 +253,7 @@
         offset += (parser._current_page_subheaders_count *
                    subheader_pointer_length)
         offset += parser._current_row_on_page_index * parser.row_length
-        process_byte_array_with_data(parser, offset, parser.row_length,
-                                     parser._byte_chunk,
-                                     parser._string_chunk)
+        process_byte_array_with_data(parser, offset, parser.row_length)
         mn = min(parser.row_count, parser._mix_page_row_count)
         if parser._current_row_on_page_index == mn:
             done = parser._read_next_page()
@@ -266,8 +267,7 @@
                                      const.subheader_pointers_offset +
                                      parser._current_row_on_page_index *
                                      parser.row_length,
-                                     parser.row_length, parser._byte_chunk,
-                                     parser._string_chunk)
+                                     parser.row_length)
         flag = (parser._current_row_on_page_index ==
                 parser._current_page_block_count)
         if flag:
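The three branches above follow the sas7bdat page taxonomy: meta pages reach each row through a subheader pointer, mix pages hold a subheader-pointer table followed by packed rows, and data pages hold packed rows directly after the page header. Reduced to a Python skeleton — attribute and constant names follow the diff, while the offset arithmetic and the helpers page_header_size/pointer_table_size are hypothetical placeholders, not real functions:

    def readline_skeleton(parser):
        # Schematic control flow only; page-advance checks are omitted.
        if parser._current_page_type in const.page_meta_types:
            ptr = parser._current_page_data_subheader_pointers[
                parser._current_row_on_page_index]
            process_byte_array_with_data(parser, ptr.offset, ptr.length)
        elif parser._current_page_type in const.page_mix_types:
            offset = page_header_size(parser) + pointer_table_size(parser)
            offset += parser._current_row_on_page_index * parser.row_length
            process_byte_array_with_data(parser, offset, parser.row_length)
        else:  # data page: packed rows right after the header
            offset = page_header_size(parser)
            offset += parser._current_row_on_page_index * parser.row_length
            process_byte_array_with_data(parser, offset, parser.row_length)
        return False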
@@ -281,25 +281,20 @@
                          parser._current_page_type)


-def process_byte_array_with_data(parser, int offset, int length, uint8_t[:, ::1] byte_chunk,
-                                 object[:, ::1] string_chunk):
+cdef void process_byte_array_with_data(object parser, int offset, int length):

-    cdef int s
-    cdef int j
-    cdef int k
-    cdef int m
-    cdef int start
-    cdef int jb
-    cdef int js
-    cdef int lngt
-
-    cdef long[:] lengths = parser._column_data_lengths
-    cdef long[:] offsets = parser._column_data_offsets
-    cdef char[:] column_types = parser.column_types
+    cdef:
+        int s, j, k, m, start, jb, js, lngt
+        long[:] lengths = parser._column_data_lengths
+        long[:] offsets = parser._column_data_offsets
+        char[:] column_types = parser.column_types
+        uint8_t[:, :] byte_chunk = parser._byte_chunk
+        #object[:, :] string_chunk = parser._string_chunk

-    source = parser._cached_page[offset:offset+length]
+    source = np.frombuffer(parser._cached_page[offset:offset+length], dtype=np.uint8)
     if (parser.compression != "") and (length < parser.row_length):
-        source = parser._decompress(parser.row_length, source)
+        source = decompress(parser, parser.row_length, source)
+        return

     s = 8 * parser._current_row_in_chunk_index
     js = 0
@@ -318,10 +313,10 @@ def process_byte_array_with_data(parser, int offset, int length, uint8_t[:, ::1]
                 byte_chunk[jb, m + k] = source[start + k]
             jb += 1
         elif column_types[j] == b's':
-            string_chunk[js, parser._current_row_in_chunk_index] = bytes(source[start:start+lngt])
+            parser._string_chunk[js][parser._current_row_in_chunk_index] = source[start:(start+lngt)].tostring().rstrip()
             js += 1
         else:
-            raise ValueError("unknown column type: %s" % parser.columns[j].ctype)
+            raise ValueError("unknown column type: %s" % parser.columns[j].ctype)

     parser._current_row_on_page_index += 1
     parser._current_row_in_chunk_index += 1
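The numeric branch above copies each value's raw 8 bytes into one uint8 row of _byte_chunk (allocated as np.empty((nd, 8 * nrows), dtype=np.uint8) in sas7bdat.py). A small sketch of how such a buffer turns back into floats, assuming _chunk_to_dataframe reinterprets each row with a dtype view — little-endian is hard-coded here for illustration, whereas the real reader takes the byte order from the file header:

    import numpy as np

    nrows = 3
    byte_chunk = np.empty((1, 8 * nrows), dtype=np.uint8)

    # Simulate the Cython loop copying 8 raw bytes per numeric value.
    for i, value in enumerate([1.5, -2.0, 3.25]):
        byte_chunk[0, 8 * i:8 * i + 8] = np.frombuffer(
            np.float64(value).tobytes(), dtype=np.uint8)

    # Reinterpret the byte row as float64 without copying.
    print(byte_chunk[0, :].view(dtype="<d"))   # [ 1.5  -2.    3.25]

End to end, these internals are exercised through the public reader, e.g. pd.read_sas(path, format="sas7bdat").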