Update pandas testing and fix flake8 issues #31

Merged: 15 commits, Jul 19, 2023
10 changes: 5 additions & 5 deletions .github/workflows/release.yaml
@@ -18,9 +18,9 @@ jobs:
os: [ubuntu-latest, macos-latest, windows-latest]

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }} ${{ matrix.os }}
uses: actions/setup-python@v2
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
@@ -39,7 +39,7 @@ jobs:
if: startsWith(github.ref, 'refs/tags/')
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Create Release
id: create_release
uses: actions/create-release@v1
@@ -59,9 +59,9 @@ jobs:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v2
uses: actions/setup-python@v4
with:
python-version: '3.8'
- name: Install dependencies
56 changes: 49 additions & 7 deletions btrdb/conn.py
@@ -26,7 +26,7 @@
import grpc
from grpc._cython.cygrpc import CompressionAlgorithm

from btrdb.exceptions import InvalidOperation, StreamNotFoundError
from btrdb.exceptions import InvalidOperation, StreamNotFoundError, retry
from btrdb.stream import Stream, StreamSet
from btrdb.utils.conversion import to_uuid
from btrdb.utils.general import unpack_stream_descriptor
@@ -179,7 +179,7 @@ def _is_arrow_enabled(info):
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"major version: {major}")
logger.debug(f"minor version: {minor}")
if major >= 5 and minor >= 30:
if major >= 5 and minor >= 29:
return True
else:
return False
@@ -193,11 +193,23 @@ class BTrDB(object):
def __init__(self, endpoint):
self.ep = endpoint
self._executor = ThreadPoolExecutor()
self._ARROW_ENABLED = True # _is_arrow_enabled(self.ep.info())
try:
self._ARROW_ENABLED = _is_arrow_enabled(self.ep.info())
except Exception:
self._ARROW_ENABLED = False
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"ARROW ENABLED: {self._ARROW_ENABLED}")

def query(self, stmt, params=[]):
@retry
def query(
self,
stmt,
params=None,
auto_retry=False,
retries=5,
retry_delay=3,
retry_backoff=4,
):
"""
Performs a SQL query on the database metadata and returns a list of
dictionaries from the resulting cursor.
@@ -227,6 +239,8 @@ def query(self, stmt, params=[]):

See https://btrdb.readthedocs.io/en/latest/ for more info.
"""
if params is None:
params = list()
return [
json.loads(row.decode("utf-8"))
for page in self.ep.sql_query(stmt, params)
@@ -317,7 +331,18 @@ def stream_from_uuid(self, uuid):
"""
return Stream(self, to_uuid(uuid))

def create(self, uuid, collection, tags=None, annotations=None):
@retry
def create(
self,
uuid,
collection,
tags=None,
annotations=None,
auto_retry=False,
retries=5,
retry_delay=3,
retry_backoff=4,
):
"""
Tells BTrDB to create a new stream with UUID `uuid` in `collection` with specified `tags` and `annotations`.

@@ -379,8 +404,17 @@ def list_collections(self, starts_with=""):
"""
return [c for some in self.ep.listCollections(starts_with) for c in some]

@retry
def streams_in_collection(
self, *collection, is_collection_prefix=True, tags=None, annotations=None
self,
*collection,
is_collection_prefix=True,
tags=None,
annotations=None,
auto_retry=False,
retries=5,
retry_delay=3,
retry_backoff=4,
):
"""
Search for streams matching given parameters
@@ -436,7 +470,15 @@ def streams_in_collection(

return result

def collection_metadata(self, prefix):
@retry
def collection_metadata(
self,
prefix,
auto_retry=False,
retries=5,
retry_delay=3,
retry_backoff=4,
):
"""
Gives statistics about metadata for collections that match a
prefix.
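The hunks above add opt-in retry keyword arguments (`auto_retry`, `retries`, `retry_delay`, `retry_backoff`) to `query`, `create`, `streams_in_collection`, and `collection_metadata`. A minimal usage sketch of the new keywords; the connection string, API key, collection name, and SQL statement are illustrative placeholders, not part of this PR:

```python
import btrdb

# Placeholder connection details (not part of this PR).
db = btrdb.connect("192.168.1.100:4410", apikey="my-api-key")

# auto_retry defaults to False, so existing call sites behave as before.
# With auto_retry=True, a failed call is retried up to `retries` times,
# waiting `retry_delay` seconds before the first retry and multiplying the
# wait by `retry_backoff` after each failure (3 s, 12 s, 48 s, ... here).
streams = db.streams_in_collection(
    "sensors/site1",
    auto_retry=True,
    retries=5,
    retry_delay=3,
    retry_backoff=4,
)

# The same keywords work on query(); params now defaults to None instead of
# a shared mutable list and is normalized to a fresh list inside the method.
rows = db.query(
    "SELECT uuid, name FROM streams WHERE collection = $1",
    params=["sensors/site1"],
    auto_retry=True,
)
```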
8 changes: 4 additions & 4 deletions btrdb/endpoint.py
@@ -25,8 +25,8 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import io
import typing
import uuid
import grpc

from btrdb.exceptions import BTrDBError, check_proto_stat, error_handler
from btrdb.grpcinterface import btrdb_pb2, btrdb_pb2_grpc
@@ -35,9 +35,10 @@

try:
import pyarrow as pa
except:
except ImportError:
pa = None


class Endpoint(object):
"""Server endpoint where we make specific requests."""

@@ -155,7 +156,6 @@ def arrowWindows(self, uu, start, end, width, depth, version=0):
with pa.ipc.open_stream(result.arrowBytes) as reader:
yield reader.read_all(), result.versionMajor


@error_handler
def streamInfo(self, uu, omitDescriptor, omitVersion):
params = btrdb_pb2.StreamInfoParams(
@@ -381,7 +381,7 @@ def generateCSV(
yield result.row

@error_handler
def sql_query(self, stmt, params=[]):
def sql_query(self, stmt, params: typing.List):
request = btrdb_pb2.SQLQueryParams(query=stmt, params=params)
for page in self.stub.SQLQuery(request):
check_proto_stat(page.stat)
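For context on the `sql_query` signature change above: a mutable default such as `params=[]` is created once at function definition time and shared across calls, which is why flake8-style checks flag it. A small sketch of the pitfall and the usual fix; the function names here are illustrative, not code from this PR:

```python
def bad_append(item, bucket=[]):
    # The same list object is reused on every call that omits `bucket`.
    bucket.append(item)
    return bucket

print(bad_append(1))  # [1]
print(bad_append(2))  # [1, 2]  <- state leaks between calls


def good_append(item, bucket=None):
    # A fresh list is created per call, mirroring the params=None /
    # params = list() pattern adopted in conn.py above.
    if bucket is None:
        bucket = []
    bucket.append(item)
    return bucket

print(good_append(1))  # [1]
print(good_append(2))  # [2]
```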
49 changes: 49 additions & 0 deletions btrdb/exceptions.py
@@ -12,14 +12,63 @@
##########################################################################

import inspect
import logging
import time
from functools import wraps

from grpc import RpcError

##########################################################################
## Module Variables and Constants
##########################################################################
MAX_RETRIES = 5
INITIAL_RETRY_DELAY = 3 # time to wait before first retry in seconds
RETRY_BACKOFF_FACTOR = (
4 # exponential factor by which retry backoff increases between tries
)


##########################################################################
## Decorators
##########################################################################

# config logging for auto-retry
logging.basicConfig(
format="%(asctime)s:%(levelname)s:%(message)s",
level=logging.INFO,
datefmt="%m/%d/%Y %I:%M:%S",
)


def retry(fn, *args, **kwargs):
"""
decorates functions and retries it in the event of BTrDBErrors or RpcErrors
"""

@wraps(fn)
def retry_func(*args, **kwargs):
# don't retry if arg is set to False
if not kwargs.get("auto_retry"):
return fn(*args, **kwargs)

total_retries = kwargs.get("retries") or MAX_RETRIES
retries = total_retries
delay = kwargs.get("retry_delay") or INITIAL_RETRY_DELAY
backoff = kwargs.get("retry_backoff") or RETRY_BACKOFF_FACTOR
while retries > 0:
try:
return fn(*args, **kwargs)
except (BTrDBError, RpcError) as e:
msg = f"ERROR: {e}, attempting retry {total_retries-retries+1}/{total_retries} in {delay} seconds..."
logging.info(msg)
time.sleep(delay)
retries -= 1
delay *= backoff
err = e
handle_grpc_error(err)

return retry_func


def consume_generator(fn, *args, **kwargs):
# when a generator is passed back to the calling function, it may encounter an error
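To illustrate how the new `retry` decorator behaves on its own, here is a small sketch assuming the `btrdb` package from this branch is importable. The flaky function, its failure pattern, and the attempt counter are hypothetical; the decorator, its keyword arguments, and the default backoff schedule are the ones added above:

```python
from grpc import RpcError

from btrdb.exceptions import retry

attempts = {"n": 0}


@retry
def flaky_call(auto_retry=False, retries=5, retry_delay=3, retry_backoff=4):
    """Hypothetical callable that fails twice before succeeding."""
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RpcError("transient gRPC failure")
    return "ok"


# Without auto_retry=True the first RpcError propagates immediately.
# With it, each failure is logged and the call is retried, waiting
# retry_delay seconds before the first retry and retry_delay * retry_backoff
# thereafter (3 s, 12 s, 48 s, ... with the module defaults above).
print(flaky_call(auto_retry=True, retry_delay=1, retry_backoff=1))  # -> "ok"
```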