Skip to content

[WIP] Add adapter for CrateDB #106

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ rdflib_sqlalchemy.egg-info
/.eggs/
/dist/
/venv/
.venv*
docs/api
*.sqlite

# JetBrains IDE files
/.idea/

2 changes: 1 addition & 1 deletion pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
norecursedirs = .git env

# Output in color, run doctests
addopts = --color=yes
addopts = -rfEXs --color=yes

testpaths = test
# Run tests from files matching this glob
Expand Down
66 changes: 66 additions & 0 deletions rdflib_sqlalchemy/cratedb_patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql.base import RESERVED_WORDS as POSTGRESQL_RESERVED_WORDS


def cratedb_patch_dialect():
    """
    Monkeypatch the CrateDB SQLAlchemy dialect, when it is installed.

    Two attributes of the classes imported from ``crate.client.sqlalchemy``
    are replaced:

    - ``CrateDDLCompiler.visit_create_index`` is swapped for a function
      that compiles every index creation to the placeholder ``SELECT 1;``.
    - ``CrateDialect.preparer`` is set to ``CrateIdentifierPreparer``.

    If the CrateDB client library cannot be imported, nothing is patched
    and the function returns silently.
    """
    try:
        from crate.client.sqlalchemy import CrateDialect
        from crate.client.sqlalchemy.compiler import CrateDDLCompiler
    except ImportError:
        # CrateDB driver is not available: there is nothing to patch.
        return None

    def visit_create_index(
        self,
        create,
        include_schema=False,
        include_table_schema=True,
        **kw
    ):
        # Emit a harmless statement in place of `CREATE INDEX`.
        # NOTE(review): presumably CrateDB does not accept `CREATE INDEX`;
        # confirm.
        return "SELECT 1;"

    CrateDialect.preparer = CrateIdentifierPreparer
    CrateDDLCompiler.visit_create_index = visit_create_index


def cratedb_polyfill_refresh_after_dml_engine(engine: "sa.engine.Engine"):
    """
    Issue `REFRESH TABLE` after each write on a CrateDB engine.

    Registers an ``after_execute`` listener on `engine` which runs a
    ``REFRESH TABLE ...`` statement after every INSERT, UPDATE, or DELETE.

    Engines whose dialect is not CrateDB are left untouched. The caller
    invokes this function for every engine it creates, and sending the
    CrateDB-specific ``REFRESH TABLE`` statement to any other database
    would make each write fail.

    :param engine: The SQLAlchemy engine to instrument.
    """

    # Only CrateDB understands `REFRESH TABLE`; do not register the
    # listener for any other backend.
    if engine.dialect.name != "crate":
        return

    def receive_after_execute(
        conn: "sa.engine.Connection",
        clauseelement,
        multiparams,
        params,
        execution_options,
        result,
    ):
        """
        Run a `REFRESH TABLE ...` command after each DML operation (INSERT, UPDATE, DELETE).
        """

        # Anything that is not a DML statement needs no refresh. This also
        # covers the `REFRESH TABLE` text clause issued below, so the
        # listener does not re-trigger itself.
        if not isinstance(clauseelement, (sa.sql.Insert, sa.sql.Update, sa.sql.Delete)):
            return

        table = clauseelement.table
        # A join has no single table name to refresh.
        if isinstance(table, sa.sql.Join):
            return

        full_table_name = f'"{table.name}"'
        if table.schema is not None:
            full_table_name = f'"{table.schema}".' + full_table_name
        conn.execute(sa.text(f'REFRESH TABLE {full_table_name};'))

    sa.event.listen(engine, "after_execute", receive_after_execute)


# PostgreSQL's reserved words plus "object", so that identifiers named
# `object` always get quoted.
# NOTE(review): presumably because `object` is a keyword in CrateDB; confirm.
RESERVED_WORDS = set(POSTGRESQL_RESERVED_WORDS) | {"object"}


class CrateIdentifierPreparer(sa.sql.compiler.IdentifierPreparer):
    """
    Identifier preparer installed on the CrateDB dialect.

    Uses the module-level ``RESERVED_WORDS`` set as its list of reserved
    words, and provides helpers for unquoting identifiers and for
    formatting (optionally schema-qualified) type names.
    """

    reserved_words = RESERVED_WORDS

    def _unquote_identifier(self, value):
        """
        Return `value` without its surrounding quotes.

        Values which do not start with the quote character are returned
        unchanged. Otherwise, the first and last characters are dropped
        and escaped quote sequences are replaced.
        """
        if value[0] != self.initial_quote:
            return value
        inner = value[1:-1]
        return inner.replace(self.escape_to_quote, self.escape_quote)

    def format_type(self, type_, use_schema=True):
        """
        Return the quoted name of `type_`, prefixed by its schema if any.

        :param type_: Type object providing a ``name`` attribute.
        :param use_schema: Whether to prefix the name with the schema.
        :raises sqlalchemy.exc.CompileError: When the type has no name.
        """
        if not type_.name:
            raise sa.exc.CompileError("PostgreSQL ENUM type requires a name.")

        name = self.quote(type_.name)
        effective_schema = self.schema_for_object(type_)

        # Without an applicable schema, the bare quoted name is returned.
        if self.omit_schema or not use_schema or effective_schema is None:
            return name

        return self.quote_schema(effective_schema) + "." + name
7 changes: 6 additions & 1 deletion rdflib_sqlalchemy/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,11 @@ def open(self, configuration, create=True):
kwargs = configuration

self.engine = sqlalchemy.create_engine(url, **kwargs)

# CrateDB needs a fix to synchronize write operations.
from rdflib_sqlalchemy.cratedb_patch import cratedb_polyfill_refresh_after_dml_engine
cratedb_polyfill_refresh_after_dml_engine(self.engine)

try:
conn = self.engine.connect()
except OperationalError:
Expand Down Expand Up @@ -307,7 +312,7 @@ def destroy(self, configuration):
Delete all tables and stored data associated with the store.
"""
if self.engine is None:
self.engine = self.open(configuration, create=False)
self.open(configuration, create=False)

with self.engine.begin():
try:
Expand Down
14 changes: 9 additions & 5 deletions rdflib_sqlalchemy/tables.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from sqlalchemy import Column, Table, Index, types
from sqlalchemy import Column, Table, Index, text, types
from sqlalchemy.sql import quoted_name

from rdflib_sqlalchemy.cratedb_patch import cratedb_patch_dialect
from rdflib_sqlalchemy.types import TermType


cratedb_patch_dialect()

MYSQL_MAX_INDEX_LENGTH = 200

TABLE_NAME_TEMPLATES = [
Expand All @@ -25,7 +29,7 @@ def create_asserted_statements_table(interned_id, metadata):
return Table(
"{interned_id}_asserted_statements".format(interned_id=interned_id),
metadata,
Column("id", types.Integer, nullable=False, primary_key=True),
Column("id", types.BigInteger, nullable=False, primary_key=True, server_default=text("NOW()::LONG")),
Column("subject", TermType, nullable=False),
Column("predicate", TermType, nullable=False),
Column("object", TermType, nullable=False),
Expand Down Expand Up @@ -71,7 +75,7 @@ def create_type_statements_table(interned_id, metadata):
return Table(
"{interned_id}_type_statements".format(interned_id=interned_id),
metadata,
Column("id", types.Integer, nullable=False, primary_key=True),
Column("id", types.BigInteger, nullable=False, primary_key=True, server_default=text("NOW()::LONG")),
Column("member", TermType, nullable=False),
Column("klass", TermType, nullable=False),
Column("context", TermType, nullable=False),
Expand Down Expand Up @@ -110,7 +114,7 @@ def create_literal_statements_table(interned_id, metadata):
return Table(
"{interned_id}_literal_statements".format(interned_id=interned_id),
metadata,
Column("id", types.Integer, nullable=False, primary_key=True),
Column("id", types.BigInteger, nullable=False, primary_key=True, server_default=text("NOW()::LONG")),
Column("subject", TermType, nullable=False),
Column("predicate", TermType, nullable=False),
Column("object", TermType),
Expand Down Expand Up @@ -154,7 +158,7 @@ def create_quoted_statements_table(interned_id, metadata):
return Table(
"{interned_id}_quoted_statements".format(interned_id=interned_id),
metadata,
Column("id", types.Integer, nullable=False, primary_key=True),
Column("id", types.BigInteger, nullable=False, primary_key=True, server_default=text("NOW()::LONG")),
Column("subject", TermType, nullable=False),
Column("predicate", TermType, nullable=False),
Column("object", TermType),
Expand Down
66 changes: 66 additions & 0 deletions test/test_sqlalchemy_cratedb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import logging
import os
import unittest

import pytest
try:
import crate # noqa
assert crate # quiets unused import warning
except ImportError:
pytest.skip("crate not installed, skipping CrateDB tests",
allow_module_level=True)

from . import context_case
from . import graph_case


# Only run this module when the `DB` environment variable selects CrateDB.
if os.getenv("DB") != "crate":
    pytest.skip("CrateDB not under test", allow_module_level=True)

# Connection URL of the database under test; `DBURI` overrides the default.
sqlalchemy_url = os.getenv("DBURI", "crate://crate@localhost/")

_logger = logging.getLogger(__name__)


class SQLACrateDBGraphTestCase(graph_case.GraphTestCase):
    """Run the shared graph test cases against the CrateDB URL."""

    storetest = True
    storename = "SQLAlchemy"
    uri = sqlalchemy_url
    create = True

    def setUp(self):
        # Hand the store name and connection URL to the shared fixture.
        super().setUp(uri=self.uri, storename=self.storename)

    def tearDown(self):
        # The shared fixture receives the same URL for tear-down.
        super().tearDown(uri=self.uri)


class SQLACrateDBContextTestCase(context_case.ContextTestCase):
    """Run the shared context test cases against the CrateDB URL."""

    storetest = True
    storename = "SQLAlchemy"
    uri = sqlalchemy_url
    create = True

    def setUp(self):
        # Hand the store name and connection URL to the shared fixture.
        super().setUp(uri=self.uri, storename=self.storename)

    def tearDown(self):
        # The shared fixture receives the same URL for tear-down.
        super().tearDown(uri=self.uri)

    def testLenInMultipleContexts(self):
        # Overrides the inherited test in order to skip it.
        pytest.skip("Known issue.")


# NOTE(review): both classes already define `storetest = True` in their
# class bodies, so these assignments look redundant; confirm before removing.
SQLACrateDBGraphTestCase.storetest = True
SQLACrateDBContextTestCase.storetest = True

# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()