Optimize logging calls #30

Merged

merged 1 commit into from Jun 23, 2023
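The changes below wrap each f-string logger.debug(...) call in a logger.isEnabledFor(logging.DEBUG) check, so the f-string (and the potentially expensive repr of protobuf messages and Arrow tables interpolated into it) is only built when DEBUG logging is actually active. A minimal sketch of the pattern; the helper name is illustrative and not part of the PR:

    import logging

    logger = logging.getLogger(__name__)

    def guarded_debug_example(result):
        # Unguarded, logger.debug(f"nearest, results: {result}") would format the
        # message (including str(result)) on every call, even with DEBUG disabled.
        # Guarded, the formatting cost is only paid when DEBUG logging is enabled.
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f"nearest, results: {result}")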
54 changes: 40 additions & 14 deletions btrdb/conn.py
@@ -110,20 +110,20 @@ def __init__(self, addrportstr, apikey=None):
else:
self.channel = grpc.insecure_channel(addrportstr, chan_ops)
if apikey is not None:

class AuthCallDetails(grpc.ClientCallDetails):
def __init__(self, apikey, client_call_details):
metadata = []
if client_call_details.metadata is not None:
metadata = list(client_call_details.metadata)
metadata.append(
('authorization', "Bearer " + apikey)
)
metadata.append(("authorization", "Bearer " + apikey))
self.method = client_call_details.method
self.timeout = client_call_details.timeout
self.credentials = client_call_details.credentials
self.wait_for_ready = client_call_details.wait_for_ready
self.compression = client_call_details.compression
self.metadata = metadata

class AuthorizationInterceptor(
grpc.UnaryUnaryClientInterceptor,
grpc.UnaryStreamClientInterceptor,
@@ -132,28 +132,53 @@ class AuthorizationInterceptor(
):
def __init__(self, apikey):
self.apikey = apikey
def intercept_unary_unary(self, continuation, client_call_details, request):
return continuation(AuthCallDetails(self.apikey, client_call_details), request)
def intercept_unary_stream(self, continuation, client_call_details, request):
return continuation(AuthCallDetails(self.apikey, client_call_details), request)
def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
return continuation(AuthCallDetails(self.apikey, client_call_details), request)
def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
return continuation(AuthCallDetails(self.apikey, client_call_details), request)

def intercept_unary_unary(
self, continuation, client_call_details, request
):
return continuation(
AuthCallDetails(self.apikey, client_call_details), request
)

def intercept_unary_stream(
self, continuation, client_call_details, request
):
return continuation(
AuthCallDetails(self.apikey, client_call_details), request
)

def intercept_stream_unary(
self, continuation, client_call_details, request_iterator
):
return continuation(
AuthCallDetails(self.apikey, client_call_details),
request_iterator,
)

def intercept_stream_stream(
self, continuation, client_call_details, request_iterator
):
return continuation(
AuthCallDetails(self.apikey, client_call_details),
request_iterator,
)

self.channel = grpc.intercept_channel(
self.channel,
AuthorizationInterceptor(apikey),
)


def _is_arrow_enabled(info):
info = {
"majorVersion": info.majorVersion,
"minorVersion": info.minorVersion,
}
major = info.get("majorVersion", -1)
minor = info.get("minorVersion", -1)
logger.debug(f"major version: {major}")
logger.debug(f"minor version: {minor}")
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"major version: {major}")
logger.debug(f"minor version: {minor}")
if major >= 5 and minor >= 30:
return True
else:
Expand All @@ -169,7 +194,8 @@ def __init__(self, endpoint):
self.ep = endpoint
self._executor = ThreadPoolExecutor()
self._ARROW_ENABLED = True # _is_arrow_enabled(self.ep.info())
logger.debug(f"ARROW ENABLED: {self._ARROW_ENABLED}")
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"ARROW ENABLED: {self._ARROW_ENABLED}")

def query(self, stmt, params=[]):
"""
13 changes: 9 additions & 4 deletions btrdb/endpoint.py
@@ -274,14 +274,19 @@ def lookupStreams(self, collection, isCollectionPrefix, tags, annotations):

@error_handler
def nearest(self, uu, time, version, backward):
logger.debug(f"nearest function params: {uu}\t{time}\t{version}\t{backward}")
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
f"nearest function params: {uu}\t{time}\t{version}\t{backward}"
)
params = btrdb_pb2.NearestParams(
uuid=uu.bytes, time=time, versionMajor=version, backward=backward
)
logger.debug(f"params from nearest: {params}")
logger.debug(f"uuid: {uu}")
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"params from nearest: {params}")
logger.debug(f"uuid: {uu}")
result = self.stub.Nearest(params)
logger.debug(f"nearest, results: {result}")
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"nearest, results: {result}")
check_proto_stat(result.stat)
return result.value, result.versionMajor

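The btrdb/stream.py changes below take a slightly different approach: the DEBUG check is evaluated once at import time and cached in a module-level IS_DEBUG flag, which hot-path call sites test instead of re-querying the logger on every iteration. A minimal sketch of that trade-off; the function name is illustrative, not from the PR:

    import logging

    logger = logging.getLogger(__name__)

    # Evaluated once at import time; level changes made later via
    # logging.basicConfig() or logger.setLevel() are not reflected in this flag.
    IS_DEBUG = logger.isEnabledFor(logging.DEBUG)

    def log_slice_example(tab):
        # Cheap boolean check on the hot path instead of a per-call
        # isEnabledFor() lookup through the logger hierarchy.
        if IS_DEBUG:
            logger.debug(f"Table Slice: {tab}")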
45 changes: 30 additions & 15 deletions btrdb/stream.py
@@ -46,6 +46,7 @@
## Module Variables
##########################################################################
logger = logging.getLogger(__name__)
IS_DEBUG = logger.isEnabledFor(logging.DEBUG)
INSERT_BATCH_SIZE = 50000
MINIMUM_TIME = -(16 << 56)
MAXIMUM_TIME = (48 << 56) - 1
@@ -511,7 +512,8 @@ def arrow_insert(self, data: pa.Table, merge: str = "never") -> int:
chunksize = INSERT_BATCH_SIZE
assert isinstance(data, pa.Table)
tmp_table = data.rename_columns(["time", "value"])
logger.debug(f"tmp_table schema: {tmp_table.schema}")
if IS_DEBUG:
logger.debug(f"tmp_table schema: {tmp_table.schema}")
new_schema = pa.schema(
[
(pa.field("time", pa.timestamp(unit="ns", tz="UTC"), nullable=False)),
@@ -536,7 +538,8 @@ def arrow_insert(self, data: pa.Table, merge: str = "never") -> int:
# Process the batches as needed
version = []
for tab in table_slices:
logger.debug(f"Table Slice: {tab}")
if IS_DEBUG:
logger.debug(f"Table Slice: {tab}")
feather_bytes = _table_slice_to_feather_bytes(table_slice=tab)
version.append(
self._btrdb.ep.arrowInsertValues(
@@ -730,7 +733,8 @@ def values(self, start, end, version=0):
materialized = []
start = to_nanoseconds(start)
end = to_nanoseconds(end)
logger.debug(f"For stream - {self.uuid} - {self.name}")
if IS_DEBUG:
logger.debug(f"For stream - {self.uuid} - {self.name}")
point_windows = self._btrdb.ep.rawValues(self._uuid, start, end, version)
for point_list, version in point_windows:
for point in point_list:
@@ -883,7 +887,8 @@ def arrow_aligned_windows(
_arrow_not_impl_str.format("arrow_aligned_windows")
)

logger.debug(f"For stream - {self.uuid} - {self.name}")
if IS_DEBUG:
logger.debug(f"For stream - {self.uuid} - {self.name}")
start = to_nanoseconds(start)
end = to_nanoseconds(end)
arr_bytes = self._btrdb.ep.arrowAlignedWindows(
@@ -892,8 +897,9 @@ def arrow_aligned_windows(
# exhausting the generator from above
bytes_materialized = list(arr_bytes)

logger.debug(f"Length of materialized list: {len(bytes_materialized)}")
logger.debug(f"materialized bytes[0:1]: {bytes_materialized[0:1]}")
if IS_DEBUG:
logger.debug(f"Length of materialized list: {len(bytes_materialized)}")
logger.debug(f"materialized bytes[0:1]: {bytes_materialized[0:1]}")
# ignore versions for now
materialized_table = _materialize_stream_as_table(bytes_materialized)
stream_names = [
@@ -1005,8 +1011,9 @@ def arrow_windows(
# exhausting the generator from above
bytes_materialized = list(arr_bytes)

logger.debug(f"Length of materialized list: {len(bytes_materialized)}")
logger.debug(f"materialized bytes[0:1]: {bytes_materialized[0:1]}")
if IS_DEBUG:
logger.debug(f"Length of materialized list: {len(bytes_materialized)}")
logger.debug(f"materialized bytes[0:1]: {bytes_materialized[0:1]}")
# ignore versions for now
materialized = _materialize_stream_as_table(bytes_materialized)
stream_names = [
@@ -1042,11 +1049,15 @@ def nearest(self, time, version, backward=False):

"""
try:
logger.debug(f"checking nearest for: {self.uuid}\t\t{time}\t\t{version}")
if IS_DEBUG:
logger.debug(
f"checking nearest for: {self.uuid}\t\t{time}\t\t{version}"
)
rp, version = self._btrdb.ep.nearest(
self._uuid, to_nanoseconds(time), version, backward
)
logger.debug(f"Nearest for stream: {self.uuid} - {rp}")
if IS_DEBUG:
logger.debug(f"Nearest for stream: {self.uuid} - {rp}")
except BTrDBError as exc:
if not isinstance(exc, NoSuchPoint):
raise
@@ -1223,7 +1234,8 @@ def earliest(self):
params = self._params_from_filters()
start = params.get("start", MINIMUM_TIME)
versions = self.versions()
logger.debug(f"versions: {versions}")
if IS_DEBUG:
logger.debug(f"versions: {versions}")
earliest_points_gen = self._btrdb._executor.map(
lambda s: s.nearest(start, version=versions.get(s.uuid, 0), backward=False),
self._streams,
@@ -1842,8 +1854,9 @@ def _arrow_multivalues(self, period_ns: int):
# exhausting the generator from above
bytes_materialized = list(arr_bytes)

logger.debug(f"Length of materialized list: {len(bytes_materialized)}")
logger.debug(f"materialized bytes[0:1]: {bytes_materialized[0:1]}")
if IS_DEBUG:
logger.debug(f"Length of materialized list: {len(bytes_materialized)}")
logger.debug(f"materialized bytes[0:1]: {bytes_materialized[0:1]}")
data = _materialize_stream_as_table(bytes_materialized)
return data

@@ -1919,9 +1932,11 @@ def _materialize_stream_as_table(arrow_bytes):
for b, _ in arrow_bytes:
with pa.ipc.open_stream(b) as reader:
schema = reader.schema
logger.debug(f"schema: {schema}")
if IS_DEBUG:
logger.debug(f"schema: {schema}")
table_list.append(reader.read_all())
logger.debug(f"table list: {table_list}")
if IS_DEBUG:
logger.debug(f"table list: {table_list}")
table = pa.concat_tables(table_list)
return table
