Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async for crate #595

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions aiocrate/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright 2022 Amethyst Reese
# Licensed under the MIT license

"""asyncio bridge to the standard sqlite3 module"""

from sqlite3 import ( # pylint: disable=redefined-builtin
DatabaseError,
Error,
IntegrityError,
NotSupportedError,
OperationalError,
paramstyle,
ProgrammingError,
register_adapter,
register_converter,
Row,
sqlite_version,
sqlite_version_info,
Warning,
)

__author__ = "Amethyst Reese"
__version__ = '0.0.1'
from crate.client.connection import connect, Connection, Cursor

__all__ = [
"__version__",
"paramstyle",
"register_adapter",
"register_converter",
"sqlite_version",
"sqlite_version_info",
"connect",
"Connection",
"Cursor",
"Row",
"Warning",
"Error",
"DatabaseError",
"IntegrityError",
"ProgrammingError",
"OperationalError",
"NotSupportedError",
]
203 changes: 203 additions & 0 deletions aiocrate/cursor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@


from crate.client.exceptions import ProgrammingError
from distutils.version import StrictVersion
import warnings

BULK_INSERT_MIN_VERSION = StrictVersion("0.42.0")


class Cursor(object):
"""
not thread-safe by intention
should not be shared between different threads
"""
lastrowid = None # currently not supported

def __init__(self, connection, cursor):
self.arraysize = 1
self.connection = connection
self._closed = False
self._result = None
self.rows = None
self._cursor = cursor

def execute(self, sql, parameters=None, bulk_parameters=None):
"""
Prepare and execute a database operation (query or command).
"""
if self.connection._closed:
raise ProgrammingError("Connection closed")

if self._closed:
raise ProgrammingError("Cursor closed")

self._result = self.connection.client.sql(sql, parameters,
bulk_parameters)
if "rows" in self._result:
self.rows = iter(self._result["rows"])

def executemany(self, sql, seq_of_parameters):
"""
Prepare a database operation (query or command) and then execute it
against all parameter sequences or mappings found in the sequence
``seq_of_parameters``.
"""
row_counts = []
durations = []
if self.connection.lowest_server_version >= BULK_INSERT_MIN_VERSION:
self.execute(sql, bulk_parameters=seq_of_parameters)
for result in self._result.get('results', []):
if result.get('rowcount') > -1:
row_counts.append(result.get('rowcount'))
if self.duration > -1:
durations.append(self.duration)
else:
for params in seq_of_parameters:
self.execute(sql, parameters=params)
if self.rowcount > -1:
row_counts.append(self.rowcount)
if self.duration > -1:
durations.append(self.duration)
self._result = {
"rowcount": sum(row_counts) if row_counts else -1,
"duration": sum(durations) if durations else -1,
"rows": [],
"cols": self._result.get("cols", []),
"results": self._result.get("results")
}
self.rows = iter(self._result["rows"])
return self._result["results"]

def fetchone(self):
"""
Fetch the next row of a query result set, returning a single sequence,
or None when no more data is available.
Alias for ``next()``.
"""
try:
return self.next()
except StopIteration:
return None

def __iter__(self):
"""
support iterator interface:
http://legacy.python.org/dev/peps/pep-0249/#iter

This iterator is shared. Advancing this iterator will advance other
iterators created from this cursor.
"""
warnings.warn("DB-API extension cursor.__iter__() used")
return self

def fetchmany(self, count=None):
"""
Fetch the next set of rows of a query result, returning a sequence of
sequences (e.g. a list of tuples). An empty sequence is returned when
no more rows are available.
"""
if count is None:
count = self.arraysize
if count == 0:
return self.fetchall()
result = []
for i in range(count):
try:
result.append(self.next())
except StopIteration:
pass
return result

def fetchall(self):
"""
Fetch all (remaining) rows of a query result, returning them as a
sequence of sequences (e.g. a list of tuples). Note that the cursor's
arraysize attribute can affect the performance of this operation.
"""
result = []
iterate = True
while iterate:
try:
result.append(self.next())
except StopIteration:
iterate = False
return result

def close(self):
"""
Close the cursor now
"""
self._closed = True
self._result = None

def setinputsizes(self, sizes):
"""
Not supported method.
"""
pass

def setoutputsize(self, size, column=None):
"""
Not supported method.
"""
pass

@property
def rowcount(self):
"""
This read-only attribute specifies the number of rows that the last
.execute*() produced (for DQL statements like ``SELECT``) or affected
(for DML statements like ``UPDATE`` or ``INSERT``).
"""
if (self._closed or not self._result or "rows" not in self._result):
return -1
return self._result.get("rowcount", -1)

def next(self):
"""
Return the next row of a query result set, respecting if cursor was
closed.
"""
if self.rows is None:
raise ProgrammingError(
"No result available. " +
"execute() or executemany() must be called first."
)
elif not self._closed:
return next(self.rows)
else:
raise ProgrammingError("Cursor closed")

__next__ = next

@property
def description(self):
"""
This read-only attribute is a sequence of 7-item sequences.
"""
if self._closed:
return

description = []
for col in self._result["cols"]:
description.append((col,
None,
None,
None,
None,
None,
None))
return tuple(description)

@property
def duration(self):
"""
This read-only attribute specifies the server-side duration of a query
in milliseconds.
"""
if self._closed or \
not self._result or \
"duration" not in self._result:
return -1
return self._result.get("duration", 0)
80 changes: 80 additions & 0 deletions aiocrate/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# -*- coding: utf-8; -*-
#
# Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
# license agreements. See the NOTICE file distributed with this work for
# additional information regarding copyright ownership. Crate licenses
# this file to you under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. You may
# obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# However, if you have executed another commercial license agreement
# with Crate these terms will supersede the license and you may use the
# software solely pursuant to the terms of the relevant commercial agreement.

from setuptools import setup, find_packages
import os
import re


requirements = [
'urllib3>=1.9',
]


def read(path):
with open(os.path.join(os.path.dirname(__file__), path)) as f:
return f.read()


#versionf_content = read("src/crate/client/__init__.py")
#version_rex = r'^__version__ = [\'"]([^\'"]*)[\'"]$'
#m = re.search(version_rex, versionf_content, re.M)
#if m:
# version = m.group(1)
#else:
# raise RuntimeError('Unable to find version string')

setup(
name='aiocrate',
version='0.0.1',
#package_dir={'': 'src'},
description='CrateDB Python Client',
long_description='',
long_description_content_type='text/x-rst',
platforms=['any'],
license='Apache License 2.0',
keywords='crate db api sqlalchemy',
extras_require=dict(
test=['zope.testing',
'zc.customdoctests>=1.0.1'],
sqlalchemy=['sqlalchemy>=1.0,<=1.4.50', 'geojson>=2.5.0']
),
python_requires='>=3.4',
install_requires=requirements,
package_data={'': ['*.txt']},
classifiers=[
'Development Status :: 5 - Production/Stable',
'Intended Audience :: Developers',
'License :: OSI Approved :: Apache Software License',
'Operating System :: OS Independent',
'Programming Language :: Python',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: Implementation :: CPython',
'Programming Language :: Python :: Implementation :: PyPy',
'Topic :: Database'
],
)
Empty file added aiocrate/tests/__init__.py
Empty file.
49 changes: 49 additions & 0 deletions aiocrate/tests/async_connection_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from unittest import TestCase
from sqlalchemy.ext.asyncio import create_async_engine
import sqlalchemy as sa


class SqlAlchemyAsyncConnectionTest(TestCase):

def setUp(self):
#self.async_engine = create_async_engine('crate+aiocrate://')
#self.connection = self.engine.connect()
pass

# def test_default_sqllite_connection(self):
# async_engine = create_async_engine('sqlite+aiosqlite://')
# import pdb;pdb.set_trace()
# conn = async_engine.raw_connection()
# self.assertEqual("<Connection <Client ['http://127.0.0.1:4200']>>",
# repr(conn.connection))

def test_default_connection(self):
async_engine = create_async_engine('sqlite+aiosqlite://')
#import pdb;pdb.set_trace()
#engine = sa.create_engine('crate://')
#import pdb;pdb.set_trace()
engine = sa.create_engine('crate+aiocrate://')
import pdb;pdb.set_trace()
async_engine = create_async_engine('crate+aiocrate://')

conn = engine.raw_connection()
self.assertEqual("<Connection <Client ['http://127.0.0.1:4200']>>",
repr(conn.connection))

# def test_connection_server(self):
# async_engine = create_async_engine("crate+aiocrate://otherhost:19201")
# conn = async_engine.raw_connection()
# self.assertEqual("<Connection <Client ['http://otherhost:19201']>>",
# repr(conn.connection))
#
# def test_connection_multiple_server(self):
# async_engine = create_async_engine(
# "crate+aiocrate://", connect_args={
# 'servers': ['localhost:4201', 'localhost:4202']
# }
# )
# conn = async_engine.raw_connection()
# self.assertEqual(
# "<Connection <Client ['http://localhost:4201', " +
# "'http://localhost:4202']>>",
# repr(conn.connection))
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def read(path):
extras_require=dict(
test=['zope.testing',
'zc.customdoctests>=1.0.1'],
sqlalchemy=['sqlalchemy>=1.0,<1.4', 'geojson>=2.5.0']
sqlalchemy=['sqlalchemy>=1.0,<=1.4.50', 'geojson>=2.5.0']
),
python_requires='>=3.4',
install_requires=requirements,
Expand Down