Commit e64c27a

Merge remote-tracking branch 'origin/master'

gtfierro committed Feb 21, 2024
2 parents 7d3966f + 713e49c
Showing 5 changed files with 254 additions and 228 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/builds.yml
@@ -17,7 +17,7 @@ jobs:
     steps:
       - uses: actions/checkout@v4
       - name: Set up Python ${{ matrix.python-version }}
-        uses: actions/setup-python@v4
+        uses: actions/setup-python@v5
         with:
           python-version: ${{ matrix.python-version }}
       - name: Install Poetry
@@ -33,7 +33,7 @@ jobs:
           java-version: '17'
       - name: Load cached venv
         id: cached-poetry-dependencies
-        uses: actions/cache@v3
+        uses: actions/cache@v4
         with:
           path: .venv
           key: venv-${{ runner.os }}-${{ hashFiles('**/poetry.lock') }}
4 changes: 2 additions & 2 deletions brickschema/__init__.py
@@ -15,12 +15,12 @@
 
 has_sqlalchemy = False
 try:
-    from . import rdflib_sqlalchemy
+    import rdflib_sqlalchemy
     has_sqlalchemy = True
 except ImportError as e:
     print(e)
     logging.warning(
-        "sqlalchemy not installed. SQL-backed graph support will not be available."
+        "sqlalchemy not installed. SQL-backed graph support will not be available. Try 'pip install brickschema[persistence]' to install it."
     )
 
 __version__ = "0.2.0"
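For reference, the guarded import above leaves a module-level has_sqlalchemy flag that downstream code can check before relying on SQL-backed graphs. A minimal sketch of that check (it uses only what this file exposes; the 'persistence' extra is taken from the new warning message):

import brickschema

if brickschema.has_sqlalchemy:
    # rdflib_sqlalchemy imported cleanly, so SQL-backed (persistent) graphs
    # are usable, e.g. after `pip install brickschema[persistence]`.
    print("SQL-backed graph support is available")
else:
    # The import failed and the warning above was logged; only regular
    # in-memory graphs are available in this environment.
    print("Falling back to in-memory graphs")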
54 changes: 41 additions & 13 deletions brickschema/persistent.py
@@ -6,7 +6,7 @@
 from contextlib import contextmanager
 from rdflib import ConjunctiveGraph
 from rdflib.graph import BatchAddGraph
-from rdflib import plugin
+from rdflib import plugin, URIRef
 from rdflib.store import Store
 from brickschema.rdflib_sqlalchemy import registerplugins
 from sqlalchemy import text, Row
@@ -55,7 +55,7 @@ def __init__(self, uri: str, *args, **kwargs):
 class Changeset(Graph):
     def __init__(self, graph_name):
         super().__init__()
-        self.name = graph_name
+        self.name = URIRef(graph_name)
         self.uid = uuid.uuid4()
         self.additions = []
         self.deletions = []
@@ -87,7 +87,7 @@ def __init__(self, uri: str, *args, **kwargs):
         """
         To create an in-memory store, use uri="sqlite://"
         """
-        store = plugin.get("SQLAlchemy", Store)(identifier="my_store")
+        store = plugin.get("SQLAlchemy", Store)(identifier=URIRef("my_store"))
         super().__init__(store, *args, **kwargs)
         self.open(uri, create=True)
         self._precommit_hooks = OrderedDict()
@@ -204,7 +204,11 @@ def conn(self):
 
     @contextmanager
     def new_changeset(self, graph_name, ts=None):
+        if not isinstance(graph_name, URIRef):
+            graph_name = URIRef(graph_name)
         namespaces = []
+        buffered_adds = []
+        buffered_removes = []
         with self.conn() as conn:
             transaction_start = time.time()
             cs = Changeset(graph_name)
@@ -220,30 +224,34 @@ def new_changeset(self, graph_name, ts=None):
                     text("INSERT INTO changesets VALUES (:uid, :ts, :graph, :deletion, :triple)").bindparams(
                         uid=str(cs.uid),
                         ts=ts,
-                        graph=graph_name,
+                        graph=str(graph_name),
                         deletion=True,
                         triple=pickle.dumps(triple),
                     )
                 )
-                graph = self.get_context(graph_name)
-                for triple in cs.deletions:
-                    graph.remove(triple)
+                    buffered_removes.append(triple)
+                #graph = self.get_context(graph_name)
+                #for triple in cs.deletions:
+                #    graph.remove(triple)
             if cs.additions:
                 for triple in cs.additions:
                     conn.execute(
                         text("INSERT INTO changesets VALUES (:uid, :ts, :graph, :deletion, :triple)").bindparams(
                             uid=str(cs.uid),
                             ts=ts,
-                            graph=graph_name,
+                            graph=str(graph_name),
                             deletion=False,
                             triple=pickle.dumps(triple),
                         )
                     )
-                with BatchAddGraph(
-                    self.get_context(graph_name), batch_size=10000
-                ) as graph:
-                    for triple in cs.additions:
-                        graph.add(triple)
+                for triple in cs.additions:
+                    buffered_adds.append(triple)
+                # with BatchAddGraph(
+                #     self.get_context(graph_name), batch_size=10000
+                # ) as graph:
+                #     for triple in cs.additions:
+                #         graph.add(triple)
 
             # take care of precommit hooks
             transaction_end = time.time()
@@ -259,12 +267,26 @@ def new_changeset(self, graph_name, ts=None):
             logging.info(
                 f"Committing after {transaction_end - transaction_start} seconds"
             )
+            # add the buffered changes to the graph
+            print([(type(c.identifier), c.identifier) for c in self.contexts()])
+            graph = self.get_context(graph_name)
+            for triple in buffered_removes:
+                print(f"Removing {triple}")
+                graph.remove(triple)
+            with BatchAddGraph(graph, batch_size=10000) as graph:
+                for triple in buffered_adds:
+                    print(f"Adding {triple}")
+                    graph.add(triple)
+            print(f"Self graph has {len(self)} triples")
+            # loop through all of the contexts and print length
             # update namespaces
             for pfx, ns in namespaces:
                 self.bind(pfx, ns)
             for hook in self._postcommit_hooks.values():
                 hook(self)
             self._latest_version = ts
+            for c in self.contexts():
+                print(f"{c.identifier} has {len(c)} triples")
 
     def latest(self, graph):
         return self.get_context(graph)
@@ -280,6 +302,7 @@ def graph_at(self, timestamp=None, graph=None):
             for t in self.get_context(graph).triples((None, None, None)):
                 g.add(t)
         else:
+            # TODO: this doesn't work for some reason
            for t in self.triples((None, None, None)):
                 g.add(t)
         with self.conn() as conn:
@@ -295,7 +318,9 @@ def _graph_at(self, alter_graph, conn, timestamp=None, graph=None):
         if isinstance(timestamp, (dict, Row)):
             timestamp = timestamp["timestamp"]
 
-        print(f"Getting graph at {timestamp}", type(timestamp))
+        print(f"Getting graph {graph} ({type(graph)}) at {timestamp}", type(timestamp))
+        # print # of rows in changesets
+        print(f"Changesets has {len(list(conn.execute(text('SELECT * FROM changesets'))))} rows")
         if graph is not None:
             rows = conn.execute(
                 text("SELECT * FROM changesets WHERE graph = :g AND timestamp > :ts ORDER BY timestamp DESC").bindparams(
@@ -309,9 +334,12 @@ def _graph_at(self, alter_graph, conn, timestamp=None, graph=None):
                 )
             )
         for row in rows.mappings():
+            print(f"Row: {row}")
             triple = pickle.loads(row["triple"])
             if row["is_insertion"]:
+                print(f"Adding {triple}")
                 alter_graph.add((triple[0], triple[1], triple[2]))
             else:
+                print(f"Removing {triple}")
                 alter_graph.remove((triple[0], triple[1], triple[2]))
         return alter_graph