Merged
Changes from all commits
4 changes: 3 additions & 1 deletion client-py/SessionExample.py
@@ -178,7 +178,9 @@
)

# execute sql query statement
with session.execute_query_statement("select * from root.sg_test_01.d_01") as session_data_set:
with session.execute_query_statement(
"select * from root.sg_test_01.d_01"
) as session_data_set:
session_data_set.set_fetch_size(1024)
while session_data_set.has_next():
print(session_data_set.next())
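With the vectorized conversion added in this PR, the same query can also be pulled straight into a pandas DataFrame. A minimal sketch, assuming a connected session as elsewhere in this example and that the module-level helper stays importable from iotdb.utils.SessionDataSet (see the change to that file below):

# hedged sketch: fetch the same query result as a pandas DataFrame
from iotdb.utils.SessionDataSet import resultset_to_pandas

with session.execute_query_statement(
    "select * from root.sg_test_01.d_01"
) as session_data_set:
    df = resultset_to_pandas(session_data_set)
print(df)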
3 changes: 2 additions & 1 deletion client-py/iotdb/utils/BitMap.py
@@ -16,13 +16,14 @@
# under the License.
#


class BitMap(object):
BIT_UTIL = [1, 1 << 1, 1 << 2, 1 << 3, 1 << 4, 1 << 5, 1 << 6, 1 << 7]

def __init__(self, size):
self.__size = size
self.bits = []
for i in range (size // 8 + 1):
for i in range(size // 8 + 1):
self.bits.append(0)

def mark(self, position):
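The change here is only formatting, but for context: the constructor allocates size // 8 + 1 bytes so every position gets one bit, and mark() is expected to set that bit through BIT_UTIL. The body of mark() is outside this diff, so the sketch below is an assumption about how the class behaves, not part of the change:

# hedged usage sketch (mark's body is outside this diff; the bit layout is
# inferred from BIT_UTIL and the size // 8 + 1 allocation above)
bm = BitMap(10)                                  # 10 // 8 + 1 == 2 bytes, positions 0-9
bm.mark(3)                                       # presumably bits[3 // 8] |= BIT_UTIL[3 % 8]
print((bm.bits[0] & BitMap.BIT_UTIL[3]) != 0)    # expected True once position 3 is marked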
1 change: 1 addition & 0 deletions client-py/iotdb/utils/IoTDBConstants.py
@@ -41,6 +41,7 @@ class TSEncoding(Enum):
REGULAR = 7
GORILLA = 8


@unique
class Compressor(Enum):
UNCOMPRESSED = 0
138 changes: 137 additions & 1 deletion client-py/iotdb/utils/IoTDBRpcDataSet.py
@@ -17,8 +17,11 @@
#

# for package
import binascii
import logging

import numpy as np
import pandas as pd
from thrift.transport import TTransport
from iotdb.thrift.rpc.TSIService import TSFetchResultsReq, TSCloseOperationReq
from iotdb.utils.IoTDBConstants import TSDataType
@@ -111,7 +114,9 @@ def close(self):
if self.__client is not None:
try:
status = self.__client.closeOperation(
TSCloseOperationReq(self.__session_id, self.__query_id, self.__statement_id)
TSCloseOperationReq(
self.__session_id, self.__query_id, self.__statement_id
)
)
logger.debug(
"close session {}, message: {}".format(
@@ -142,6 +147,137 @@ def has_cached_result(self):
len(self.__query_data_set.time) != 0
)

def _has_next_result_set(self):
if self.has_cached_result():
return True
if self.__empty_resultSet:
return False
if self.fetch_results():
return True
return False

def _to_bitstring(self, b):
return "{:0{}b}".format(int(binascii.hexlify(b), 16), 8 * len(b))

def resultset_to_pandas(self):
result = {}
for column_name in self.__column_name_list:
result[column_name] = None
while self._has_next_result_set():
time_array = np.frombuffer(
self.__query_data_set.time, np.dtype(np.longlong).newbyteorder(">")
)
if time_array.dtype.byteorder == ">":
time_array = time_array.byteswap().newbyteorder("<")
if (
self.get_ignore_timestamp() is None
or self.get_ignore_timestamp() is False
):
if result[IoTDBRpcDataSet.TIMESTAMP_STR] is None:
result[IoTDBRpcDataSet.TIMESTAMP_STR] = time_array
else:
result[IoTDBRpcDataSet.TIMESTAMP_STR] = np.concatenate(
(result[IoTDBRpcDataSet.TIMESTAMP_STR], time_array), axis=0
)
self.__query_data_set.time = []
total_length = len(time_array)

for i in range(len(self.__query_data_set.bitmapList)):
if self.get_ignore_timestamp() is True:
column_name = self.get_column_names()[i]
else:
column_name = self.get_column_names()[i + 1]

location = (
self.__column_ordinal_dict[column_name]
- IoTDBRpcDataSet.START_INDEX
)
if location < 0:
continue
data_type = self.__column_type_deduplicated_list[location]
value_buffer = self.__query_data_set.valueList[location]
value_buffer_len = len(value_buffer)

data_array = None
if data_type == TSDataType.DOUBLE:
data_array = np.frombuffer(
value_buffer, np.dtype(np.double).newbyteorder(">")
)
elif data_type == TSDataType.FLOAT:
data_array = np.frombuffer(
value_buffer, np.dtype(np.float32).newbyteorder(">")
)
elif data_type == TSDataType.BOOLEAN:
data_array = np.frombuffer(value_buffer, np.dtype("?"))
elif data_type == TSDataType.INT32:
data_array = np.frombuffer(
value_buffer, np.dtype(np.int32).newbyteorder(">")
)
elif data_type == TSDataType.INT64:
data_array = np.frombuffer(
value_buffer, np.dtype(np.int64).newbyteorder(">")
)
elif data_type == TSDataType.TEXT:
j = 0
offset = 0
data_array = []
while offset < value_buffer_len:
length = int.from_bytes(
value_buffer[offset : offset + 4],
byteorder="big",
signed=False,
)
offset += 4
value_bytes = value_buffer[offset : offset + length]
value = value_bytes.decode("utf-8")
data_array.append(value)
j += 1
offset += length
data_array = np.array(data_array, dtype=np.object)
else:
raise RuntimeError("unsupported data type {}.".format(data_type))
if data_array.dtype.byteorder == ">":
data_array = data_array.byteswap().newbyteorder("<")
self.__query_data_set.valueList[location] = None

if len(data_array) < total_length:
if data_type == TSDataType.INT32 or data_type == TSDataType.INT64:
tmp_array = np.full(total_length, np.nan, np.float32)
if data_array.dtype == np.int32:
tmp_array = pd.Series(tmp_array).astype("Int32")
else:
tmp_array = pd.Series(tmp_array).astype("Int64")
elif (
data_type == TSDataType.FLOAT or data_type == TSDataType.DOUBLE
):
tmp_array = np.full(total_length, np.nan, data_array.dtype)
elif data_type == TSDataType.BOOLEAN:
tmp_array = np.full(total_length, np.nan, np.float32)
tmp_array = pd.Series(tmp_array).astype("boolean")
elif data_type == TSDataType.TEXT:
tmp_array = np.full(total_length, None, dtype=data_array.dtype)
bitmap_buffer = self.__query_data_set.bitmapList[location]
bitmap_str = self._to_bitstring(bitmap_buffer)
j = 0
for index in range(total_length):
if bitmap_str[index] == "1":
tmp_array[index] = data_array[j]
j += 1
data_array = tmp_array

if result[column_name] is None:
result[column_name] = data_array
else:
result[column_name] = np.concatenate(
(result[column_name], data_array), axis=0
)
for k, v in result.items():
if v is None:
result[k] = []

df = pd.DataFrame(result)
return df

def construct_one_row(self):
# simulating buffer, read 8 bytes from data set and discard first 8 bytes which have been read.
self.__time_bytes = self.__query_data_set.time[:8]
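A note on the null handling in resultset_to_pandas above: when a column has fewer decoded values than there are timestamps, the per-column bitmap from the server says which rows actually carry a value. _to_bitstring expands the raw bytes into a string of '1'/'0' flags and only the '1' positions are filled; sparse INT32/INT64 columns are promoted to pandas' nullable Int32/Int64 dtypes and BOOLEAN to the nullable boolean dtype, so missing rows surface as NA rather than being silently coerced. A small worked example of the bit-string expansion (the input bytes are illustrative, not from the PR):

# worked example of the _to_bitstring logic; the two bytes below are made up
import binascii

def to_bitstring(b):
    return "{:0{}b}".format(int(binascii.hexlify(b), 16), 8 * len(b))

print(to_bitstring(b"\xa0\x01"))   # -> "1010000000000001"
# reading left to right, rows 0, 2 and 15 carry values; every other row keeps
# its NaN / NA / None placeholder in the temporary array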
26 changes: 1 addition & 25 deletions client-py/iotdb/utils/SessionDataSet.py
@@ -147,31 +147,7 @@ def resultset_to_pandas(result_set: SessionDataSet) -> pd.DataFrame:
:param result_set:
:return:
"""
# get column names and fields
column_names = result_set.get_column_names()

value_dict = {}

if "Time" in column_names:
offset = 1
else:
offset = 0

for i in range(len(column_names)):
value_dict[column_names[i]] = []

while result_set.has_next():
record = result_set.next()

if "Time" in column_names:
value_dict["Time"].append(record.get_timestamp())

for col in range(len(record.get_fields())):
field: Field = record.get_fields()[col]

value_dict[column_names[col + offset]].append(get_typed_point(field))

return pd.DataFrame(value_dict)
return result_set.iotdb_rpc_data_set.resultset_to_pandas()


def get_typed_point(field: Field, none_value=None):
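The row-by-row conversion that used to live here (has_next / next / get_fields per record) is replaced by a single delegation to the vectorized decoder added in IoTDBRpcDataSet, so callers should not need to change anything. The two call styles below are the same operation after this change (a sketch; a result set can only be consumed once, so use one or the other):

# module-level helper, kept for backward compatibility
df = resultset_to_pandas(session_data_set)

# equivalent direct call on the underlying rpc data set
# df = session_data_set.iotdb_rpc_data_set.resultset_to_pandas()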
10 changes: 7 additions & 3 deletions client-py/iotdb/utils/Tablet.py
@@ -23,7 +23,9 @@


class Tablet(object):
def __init__(self, device_id, measurements, data_types, values, timestamps, use_new=False):
def __init__(
self, device_id, measurements, data_types, values, timestamps, use_new=False
):
"""
creating a tablet for insertion
for example, considering device: root.sg1.d1
@@ -176,7 +178,9 @@ def get_binary_values(self):
has_none = True

else:
raise RuntimeError("Unsupported data type:" + str(self.__data_types[i]))
raise RuntimeError(
"Unsupported data type:" + str(self.__data_types[i])
)

if has_none:
for i in range(self.__column_number):
@@ -216,7 +220,7 @@ def get_binary_values(self):
offset = 0
for bs in bs_list:
_l = len(bs)
ret[offset:offset + _l] = bs
ret[offset : offset + _l] = bs
offset += _l
return ret

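For context on the reformatted constructor signature, here is a minimal sketch of building a Tablet; the measurement names and values are illustrative, and only the parameter list comes from this diff:

# illustrative only: names, values and row layout are not taken from this PR
from iotdb.utils.IoTDBConstants import TSDataType
from iotdb.utils.Tablet import Tablet

measurements = ["s_01", "s_02"]
data_types = [TSDataType.INT64, TSDataType.DOUBLE]
values = [[1, 1.1], [2, 2.2]]        # assumed one row of values per timestamp
timestamps = [1, 2]
tablet = Tablet("root.sg_test_01.d_01", measurements, data_types, values, timestamps)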