Skip to content

Commit

Permalink
Bump SQLAlchemy to 2.0, using rdflib-sqlalchemy fork (#109)
Browse files Browse the repository at this point in the history
* use my sqlalchemy fork

* Update use of text

* wrap strings in sqlalchemy.text

* fixing use of sqlalchemy 2.0

* final fixes?
  • Loading branch information
gtfierro authored Feb 12, 2024
1 parent 35edb1d commit 5e1973a
Show file tree
Hide file tree
Showing 12 changed files with 460 additions and 399 deletions.
3 changes: 1 addition & 2 deletions brickschema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
"""

import logging
from . import inference, namespaces, graph
from .graph import Graph, GraphCollection
from .namespaces import bind_prefixes
from . import inference, namespaces

logging.basicConfig(
format="%(asctime)s,%(msecs)03d %(levelname)-7s [%(filename)s:%(lineno)d] %(message)s",
Expand Down
1 change: 0 additions & 1 deletion brickschema/bin/brick_validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
.. _`pySHACL`: https://github.com/RDFLib/pySHACL
"""
import sys
import argparse
from rdflib import Graph
from brickschema.validate import Validator
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import rdflib
from brickschema.inference import HaystackInferenceSession
from brickschema.namespaces import BRICK, A, RDFS
from brickschema.namespaces import BRICK, A
from rdflib import Namespace, Graph
from typer import progressbar

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from pathlib import Path
from typing import Optional, List
from typing import Optional

import typer
from tabulate import tabulate
Expand Down
6 changes: 3 additions & 3 deletions brickschema/brickify/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ def cleaned_value(value, replace_dict: Optional[Dict] = {}):
:param replace_dict: Key-value pairs for regex replacements
:returns: cleaned List|float|string
"""
if type(value) == float:
if isinstance(value, float):
return int(value) if str(value)[-2:] == ".0" else value
if type(value) == list:
if isinstance(value, list):
return [cleaned_value(item, replace_dict) for item in value]
clean_value = value
if type(value) == str:
if isinstance(value, str):
try:
if "." in value:
return float(value)
Expand Down
9 changes: 8 additions & 1 deletion brickschema/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,14 @@ def expand(
self._iterative_expand(og)
return self
elif profile == "owlrl":
self._inferbackend = OWLRLReasonableInferenceSession()
if backend is None:
backend = "reasonable"
if backend == "reasonable":
self._inferbackend = OWLRLReasonableInferenceSession()
elif backend == "allegrograph":
self._inferbackend = OWLRLAllegroInferenceSession()
elif backend == "owlrl":
self._inferbackend = OWLRLNaiveInferenceSession()
elif profile == "vbis":
self._inferbackend = VBISTagInferenceSession(
brick_version=self._brick_version
Expand Down
9 changes: 1 addition & 8 deletions brickschema/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,17 @@
from colorama import init as colorama_init
from colorama import Fore, Style

from pprint import pprint
from rdflib import URIRef
from collections import defaultdict
import dedupe
from .graph import Graph
from .namespaces import BRICK
from dedupe._typing import (
Data,
TrainingData,
RecordDict,
Literal,
RecordID,
)
import sys
from dedupe.core import unique
from dedupe.canonical import getCanonicalRep
from typing import List, Tuple, Dict, Set, Any
import itertools
from typing import List, Tuple, Any

colorama_init()
DEBUG = False
Expand Down
2 changes: 1 addition & 1 deletion brickschema/namespaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
and related ontology namespaces
wrapper class and convenience methods for a Brick graph
"""
from rdflib import Namespace, XSD
from rdflib import Namespace

BRICK11 = Namespace("https://brickschema.org/schema/1.1/Brick#")
TAG11 = Namespace("https://brickschema.org/schema/1.1/BrickTag#")
Expand Down
103 changes: 56 additions & 47 deletions brickschema/persistent.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from rdflib import plugin
from rdflib.store import Store
from rdflib_sqlalchemy import registerplugins
from sqlalchemy import text
from sqlalchemy import text, Row
import pickle
from .graph import Graph, BrickBase

Expand Down Expand Up @@ -95,29 +95,30 @@ def __init__(self, uri: str, *args, **kwargs):

with self.conn() as conn:
conn.execute(text("PRAGMA journal_mode=WAL;"))
conn.execute(changeset_table_defn)
conn.execute(changeset_table_idx)
conn.execute(redo_table_defn)
# conn.execute("PRAGMA synchronous=OFF;")
conn.execute(text(changeset_table_defn))
conn.execute(text(changeset_table_idx))
conn.execute(text(redo_table_defn))

@property
def latest_version(self):
with self.conn() as conn:
rows = conn.execute(
rows = conn.execute(text(
"SELECT id, timestamp from changesets "
"ORDER BY timestamp DESC LIMIT 1"
)
))
res = rows.fetchone()
return res
return res._asdict() if res else None

def version_before(self, ts: str) -> str:
"""Returns the version timestamp most immediately
*before* the given iso8601-formatted timestamp"""
with self.conn() as conn:
rows = conn.execute(
rows = conn.execute(text(
"SELECT timestamp from changesets "
"WHERE timestamp < ? "
"ORDER BY timestamp DESC LIMIT 1",
(ts,),
"WHERE timestamp < :ts "
"ORDER BY timestamp DESC LIMIT 1"),
{"ts": ts}
)
res = rows.fetchone()
return res[0]
Expand All @@ -142,32 +143,30 @@ def undo(self):
self, conn, self.version_before(self.latest_version["timestamp"])
)
conn.execute(
"INSERT INTO redos(id, timestamp, graph, is_insertion, triple) SELECT id, timestamp, graph, is_insertion, triple FROM changesets WHERE id = ?",
(changeset_id,),
text("INSERT INTO redos(id, timestamp, graph, is_insertion, triple) SELECT id, timestamp, graph, is_insertion, triple FROM changesets WHERE id = :id").bindparams(id=changeset_id)
)
conn.execute("DELETE FROM changesets WHERE id = ?", (changeset_id,))
conn.execute(text("DELETE FROM changesets WHERE id = :id").bindparams(id=changeset_id))

def redo(self):
"""
Redoes the most recent changeset.
"""
with self.conn() as conn:
redo_record = conn.execute(
"SELECT * from redos " "ORDER BY timestamp ASC LIMIT 1"
).fetchone()
text("SELECT * from redos " "ORDER BY timestamp ASC LIMIT 1")
).mappings().fetchone()
if redo_record is None:
raise Exception("No changesets to redo")
changeset_id = redo_record["id"]
logger.info(f"Redoing changeset {changeset_id}")
conn.execute(
"INSERT INTO changesets SELECT * FROM redos WHERE id = ?",
(changeset_id,),
text("INSERT INTO changesets SELECT * FROM redos WHERE id = :id").bindparams(id=changeset_id)
)
conn.execute("DELETE FROM redos WHERE id = ?", (changeset_id,))
conn.execute(text("DELETE FROM redos WHERE id = :id").bindparams(id=changeset_id))
self._graph_at(self, conn, redo_record["timestamp"])
for row in conn.execute(
"SELECT * from changesets WHERE id = ?", (changeset_id,)
):
text("SELECT * from changesets WHERE id = :id").bindparams(id=changeset_id)
).mappings():
triple = pickle.loads(row["triple"])
graph = self.get_context(redo_record["graph"])
if row["is_insertion"]:
Expand All @@ -182,15 +181,14 @@ def versions(self, graph=None):
"""
with self.conn() as conn:
if graph is None:
rows = conn.execute(
rows = conn.execute(text(
"SELECT DISTINCT id, graph, timestamp from changesets "
"ORDER BY timestamp DESC"
)
))
else:
rows = conn.execute(
rows = conn.execute(text(
"SELECT DISTINCT id, graph, timestamp from changesets "
"WHERE graph = ? ORDER BY timestamp DESC",
(graph,),
"WHERE graph = :g ORDER BY timestamp DESC").bindparams(g=graph)
)
return list(rows)

Expand All @@ -200,9 +198,9 @@ def add_precommit_hook(self, hook):
def add_postcommit_hook(self, hook):
self._postcommit_hooks[hook.__name__] = hook

@contextmanager
@property
def conn(self):
yield self.store.engine.connect()
return self.store.engine.begin

@contextmanager
def new_changeset(self, graph_name, ts=None):
Expand All @@ -217,24 +215,30 @@ def new_changeset(self, graph_name, ts=None):
# delta. This means that we save the deletions in the changeset as "inserts", and the additions
# as "deletions".
if cs.deletions:
conn.exec_driver_sql(
"INSERT INTO changesets VALUES (?, ?, ?, ?, ?)",
[
(str(cs.uid), ts, graph_name, True, pickle.dumps(triple))
for triple in cs.deletions
],
)
for triple in cs.deletions:
conn.execute(
text("INSERT INTO changesets VALUES (:uid, :ts, :graph, :deletion, :triple)").bindparams(
uid=str(cs.uid),
ts=ts,
graph=graph_name,
deletion=True,
triple=pickle.dumps(triple),
)
)
graph = self.get_context(graph_name)
for triple in cs.deletions:
graph.remove(triple)
if cs.additions:
conn.exec_driver_sql(
"INSERT INTO changesets VALUES (?, ?, ?, ?, ?)",
[
(str(cs.uid), ts, graph_name, False, pickle.dumps(triple))
for triple in cs.additions
],
)
for triple in cs.additions:
conn.execute(
text("INSERT INTO changesets VALUES (:uid, :ts, :graph, :deletion, :triple)").bindparams(
uid=str(cs.uid),
ts=ts,
graph=graph_name,
deletion=False,
triple=pickle.dumps(triple),
)
)
with BatchAddGraph(
self.get_context(graph_name), batch_size=10000
) as graph:
Expand Down Expand Up @@ -288,18 +292,23 @@ def _graph_at(self, alter_graph, conn, timestamp=None, graph=None):
"""
if timestamp is None:
timestamp = datetime.now().isoformat()
if isinstance(timestamp, (dict, Row)):
timestamp = timestamp["timestamp"]

print(f"Getting graph at {timestamp}", type(timestamp))
if graph is not None:
rows = conn.execute(
"SELECT * FROM changesets WHERE graph = ? AND timestamp > ? ORDER BY timestamp DESC",
(graph, timestamp),
text("SELECT * FROM changesets WHERE graph = :g AND timestamp > :ts ORDER BY timestamp DESC").bindparams(
g=graph, ts=timestamp
)
)
else:
rows = conn.execute(
"SELECT * FROM changesets WHERE timestamp > ? ORDER BY timestamp DESC",
(timestamp,),
text("SELECT * FROM changesets WHERE timestamp > :ts ORDER BY timestamp DESC").bindparams(
ts=timestamp
)
)
for row in rows:
for row in rows.mappings():
triple = pickle.loads(row["triple"])
if row["is_insertion"]:
alter_graph.add((triple[0], triple[1], triple[2]))
Expand Down
1 change: 0 additions & 1 deletion brickschema/topquadrant_shacl.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import subprocess
import platform
import pkgutil
import tempfile
import rdflib
from rdflib import OWL, SH
Expand Down
Loading

0 comments on commit 5e1973a

Please sign in to comment.