
Develop #12


Merged: 14 commits, Apr 19, 2021
597 changes: 0 additions & 597 deletions poetry.lock

This file was deleted.

46 changes: 30 additions & 16 deletions pyproject.toml
@@ -1,9 +1,12 @@
[tool.poetry]
name = "sage-engine"
version = "2.2.0"
description = "Sage: a SPARQL query engine for public Linked Data providers"
version = "2.3.0"
description = "Sage: a preemptive SPARQL query engine for online Knowledge Graphs"
repository = "https://github.com/sage-org/sage-engine"
authors = [ "Thomas Minier <tminier01@gmail.com>" ]
authors = [ "Thomas Minier <tminier01@gmail.com>",
"Julien Aimonier-Davat <julien.aimonier-davat@univ-nantes.fr>",
"Pascal Molli <pascal.molli@univ-nantes.fr>",
"Hala Skaf <hala.skaf@univ-nantes.fr>" ]
keywords = [ "rdf", "sparql", "query engine" ]
classifiers = [
"Topic :: Database :: Database Engines/Servers",
@@ -20,35 +23,46 @@ exclude = [ "tests" ]

[tool.poetry.scripts]
sage = "sage.cli.http_server:start_sage_server"
sage-query = "sage.cli.commons:sage_query"
sage-debug = "sage.cli.debug:sage_query_debug"
sage-grpc = "sage.cli.grpc_server:start_grpc_server"
sage-postgres-init = "sage.cli.postgres:init_postgres"
sage-postgres-index = "sage.cli.postgres:index_postgres"
sage-postgres-put = "sage.cli.postgres:put_postgres"
sage-sqlite-init = "sage.cli.sqlite:init_sqlite"
sage-sqlite-index = "sage.cli.sqlite:index_sqlite"
sage-sqlite-put = "sage.cli.sqlite:put_sqlite"
sage-hbase-init = "sage.cli.hbase:init_hbase"
sage-hbase-put = "sage.cli.hbase:put_hbase"

[tool.poetry.dependencies]
python = "^3.7"
uvloop = "0.14.0"
PyYAML = "5.1.2"
rdflib = "4.2.2"
rdflib-jsonld = "0.4.0"
protobuf = "3.11.0"
click = "7.0"
fastapi = "0.44.1"
uvicorn = "0.10.8"
grpcio = "^1.26"
uvloop = "0.15.2"
PyYAML = "5.4.1"
rdflib = "5.0.0"
rdflib-jsonld = "0.5.0"
protobuf = "3.15.7"
click = "7.1.2"
fastapi = "0.63.0"
uvicorn = "0.13.4"
grpcio = "^1.36"
coloredlogs="15.0"
pylru="^1.0"
# optional dependencies
pybind11 = { version = "2.2.4", optional = true }
hdt = { version = "2.3", optional = true }
psycopg2-binary = { version = "2.7.7", optional = true }
psycopg2-binary = { version = "2.8.6", optional = true }
happybase = { version = "1.2.0", optional = true }

[tool.poetry.extras]
hdt = ["pybind11", "hdt"]
postgres = ["psycopg2-binary"]
hbase = ["happybase"]

[tool.poetry.dev-dependencies]
pytest = "^5.3"
pytest-asyncio = "^0.10.0"
requests = "^2.22"
pytest = "^6.2"
pytest-asyncio = "^0.14"
requests = "^2.25"
sphinx = "^2.3"
sphinx_rtd_theme = "^0.4.3"

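The optional backends declared in [tool.poetry.extras] (hdt, postgres, hbase) are only importable when the matching extra is installed. Below is a minimal sketch of how calling code can guard those imports; the None fallbacks are an illustrative convention, not part of the project API.

try:
    from hdt import HDTDocument  # needs the "hdt" extra (pybind11 + hdt)
except ImportError:
    HDTDocument = None

try:
    import psycopg2  # needs the "postgres" extra (psycopg2-binary)
except ImportError:
    psycopg2 = None

try:
    import happybase  # needs the "hbase" extra
except ImportError:
    happybase = None
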
10 changes: 5 additions & 5 deletions sage/cli/commons.py
@@ -35,7 +35,7 @@ def sage_query(entrypoint, default_graph_uri, query, file, format, limit):

# prepare query headers
headers = {
"accept": "application/sparql-results+json",
"accept": "text/html",
"content-type": "application/json",
"next": None
}
@@ -52,10 +52,10 @@ def sage_query(entrypoint, default_graph_uri, query, file, format, limit):
while has_next and count < limit:
response = requests.post(entrypoint, headers=headers, data=dumps(payload))
json_response = response.json()
has_next = json_response["head"]['hasNext']
payload["next"] = json_response["head"]["next"]
for bindings in json_response["results"]['bindings']:
print(bindings)
has_next = json_response['next']
payload['next'] = json_response['next']
for bindings in json_response['bindings']:
print(str(bindings))
count += 1
if count >= limit:
break
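
The updated loop reads the continuation token and the solution mappings from top-level 'next' and 'bindings' keys instead of the nested SPARQL-JSON structure. Here is a standalone pagination sketch against that flattened response shape; the payload field names 'query' and 'defaultGraph' are assumptions for illustration, not taken from the server API.

import requests
from json import dumps

def fetch_bindings(entrypoint, query, default_graph_uri, limit=100):
    # Follow the server-issued 'next' token until the query completes
    # or the client-side limit is reached.
    headers = {"accept": "text/html", "content-type": "application/json", "next": None}
    payload = {"query": query, "defaultGraph": default_graph_uri, "next": None}
    results, has_next = [], True
    while has_next and len(results) < limit:
        response = requests.post(entrypoint, headers=headers, data=dumps(payload))
        body = response.json()
        has_next = body["next"] is not None
        payload["next"] = body["next"]
        results.extend(body["bindings"])
    return results[:limit]
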
102 changes: 102 additions & 0 deletions sage/cli/debug.py
@@ -0,0 +1,102 @@
# debug.py
# Author: Thomas MINIER - MIT License 2017-2019
# Author: Pascal Molli - MIT License 2017-2019

from rdflib import BNode, Literal, URIRef, Variable
from rdflib import Graph

from starlette.testclient import TestClient
from tests.http.utils import post_sparql

from sage.http_server.server import run_app
from sage.query_engine.optimizer.query_parser import parse_query
from sage.database.core.yaml_config import load_config
from sage.query_engine.sage_engine import SageEngine

import click
import requests
from json import dumps
from math import inf
from sys import exit
import json
import asyncio
import uvloop

import coloredlogs
import logging
from time import time

# install logger
coloredlogs.install(level='INFO', fmt='%(asctime)s - %(levelname)s %(message)s')
logger = logging.getLogger(__name__)


# (results, saved, done, _) = await engine.execute(scan, 10e7)
async def execute(engine, iterator, limit):
# try:
while iterator.has_next():
value = await iterator.next()
print(value)
# except:
# print("error in debug/execute")

# try:
# (results, saved, done, _) = await engine.execute(iterator, 10e7)
# for r in results:
# print(str(r))
# except StopAsyncIteration:
# pass


@click.command()
@click.argument("config_file")
@click.argument("default_graph_uri")
@click.option("-q", "--query", type=str, default=None, help="SPARQL query to execute (passed in command-line)")
@click.option("-f", "--file", type=str, default=None, help="File containing a SPARQL query to execute")
@click.option("-l", "--limit", type=int, default=None, help="Maximum number of solutions bindings to fetch, similar to the SPARQL LIMIT modifier.")
def sage_query_debug(config_file, default_graph_uri, query, file, limit):
"""
Debug a SPARQL query on an embedded Sage server.

Example usage: sage-debug config.yaml http://example.org/swdf-postgres -f queries/spo.sparql
"""
# assert that we have a query to evaluate
if query is None and file is None:
print("Error: you must specificy a query to execute, either with --query or --file. See sage-query --help for more informations.")
exit(1)

# set the root logger to DEBUG so that engine internals are visible while debugging
# logging.basicConfig(level=logging.WARNING)
logging.basicConfig(level=logging.DEBUG)

if limit is None:
limit = inf

# load query from file if required
if file is not None:
with open(file) as query_file:
query = query_file.read()

dataset = load_config(config_file)
if dataset is None:
print("config file {config_file} not found")
exit(1)
graph = dataset.get_graph(default_graph_uri)
if graph is None:
print("RDF Graph not found:"+default_graph_uri)
exit(1)
engine = SageEngine()
context = dict()
context['quantum'] = 1000000
context['max_results'] = 1000000
context['start_timestamp'] = time()
iterator, cards = parse_query(query, dataset, default_graph_uri, context)
loop = asyncio.get_event_loop()
loop.run_until_complete(execute(engine, iterator, limit))
loop.close()


if __name__ == '__main__':
sage_query_debug()
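
Because sage_query_debug is a Click command, it can also be exercised without installing the console script, for example through Click's test runner. A minimal sketch follows; the configuration file and graph URI are placeholders.

from click.testing import CliRunner
from sage.cli.debug import sage_query_debug

# Invoke the command exactly as the sage-debug script would.
runner = CliRunner()
result = runner.invoke(sage_query_debug, [
    "config.yaml", "http://example.org/swdf-postgres",
    "-q", "SELECT * WHERE { ?s ?p ?o } LIMIT 10",
    "-l", "10"
])
print(result.exit_code)
print(result.output)
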
128 changes: 128 additions & 0 deletions sage/cli/hbase.py
@@ -0,0 +1,128 @@
# hbase.py
# Author: Thomas MINIER - MIT License 2019
import happybase
import click
import coloredlogs
import logging

from sys import exit
from os.path import isfile
from yaml import load
from hdt import HDTDocument
from time import time
from sage.database.hbase.utils import build_row_key
from sage.cli.utils import load_graph, get_nb_triples
from sage.cli.parsers import ParserFactory, ParseError

coloredlogs.install(level='INFO', fmt='%(asctime)s - %(levelname)s %(message)s')
logger = logging.getLogger(__name__)


@click.command()
@click.argument("config")
@click.argument("graph_name")
def init_hbase(config, graph_name):
"""
Initialize the RDF Graph GRAPH_NAME with an Apache HBase backend, as described in the configuration file CONFIG.
"""
# load graph from config file
graph, kind = load_graph(config, graph_name, logger, backends=['hbase'])

thrift_port = graph['thrift_port'] if 'thrift_port' in graph else 9090

logger.info("Connexion to the HBase server...")
connection = happybase.Connection(graph['thrift_host'], protocol="compact", transport="framed", port=thrift_port, table_prefix=graph_name)
logger.info("Connected to the HBase server !")

# create HBase tables
families = {'rdf': dict()}
logger.info("Creating HBase tables for RDF Graph '{}'...".format(graph_name))
connection.create_table('spo', families)
connection.create_table('pos', families)
connection.create_table('osp', families)
logger.info("RDF Graph '{}' successfully created in HBase".format(graph_name))
connection.close()


@click.command()
@click.argument("rdf_file")
@click.argument("config")
@click.argument("graph_name")
@click.option("-f", "--format", type=click.Choice(["nt", "hdt"]),
default="nt", show_default=True,
help="Format of the input file. Supported: nt (N-triples) and hdt (HDT).")
@click.option("-b", "--batch-size", type=int, default=1000, show_default=True,
help="Batch size used for batch loading")
def put_hbase(rdf_file, config, graph_name, format, batch_size):
"""
Insert RDF triples from the file RDF_FILE into the RDF Graph GRAPH_NAME, described in the configuration file CONFIG. The graph must use the Apache HBase backend.
"""
# load graph from config file
graph, kind = load_graph(config, graph_name, logger, backends=['hbase'])

thrift_port = graph['thrift_port'] if 'thrift_port' in graph else 9090

logger.info("Connexion to the HBase server...")
connection = happybase.Connection(graph['thrift_host'], protocol="compact", transport="framed", port=thrift_port, table_prefix=graph_name)
logger.info("Connected to the HBase server !")

spo_batch = connection.table('spo').batch(batch_size=batch_size)
pos_batch = connection.table('pos').batch(batch_size=batch_size)
osp_batch = connection.table('osp').batch(batch_size=batch_size)

logger.info("Reading RDF source file...")
nb_triples = get_nb_triples(rdf_file, format)
logger.info(f"Found ~{nb_triples} RDF triples to ingest.")

start = time()
inserted = 0
dropped = 0

with click.progressbar(length=nb_triples, label=f"Inserting RDF triples 0/{nb_triples} - {dropped} triples dropped.") as bar:

def on_bucket(bucket):
nonlocal inserted, dropped
for (s, p, o) in bucket:
columns = {
b'rdf:subject': s.encode('utf-8'),
b'rdf:predicate': p.encode('utf-8'),
b'rdf:object': o.encode('utf-8')
}
spo_key = build_row_key(s, p, o)
pos_key = build_row_key(p, o, s)
osp_key = build_row_key(o, s, p)
spo_batch.put(spo_key, columns)
pos_batch.put(pos_key, columns)
osp_batch.put(osp_key, columns)
inserted = inserted + len(bucket)
bar.label = f"Inserting RDF triples {inserted}/{nb_triples} - {dropped} triples dropped."
bar.update(len(bucket))

def on_error(error):
nonlocal dropped, inserted
if isinstance(error, ParseError):
logger.warning(error)
dropped = dropped + 1
bar.label = f"Inserting RDF triples {inserted}/{nb_triples} - {dropped} triples dropped."
bar.update(0)
else:
logger.error(error)
exit(1)

def on_complete():
nonlocal start
# send last batch
spo_batch.send()
pos_batch.send()
osp_batch.send()
logger.info(f"RDF triples ingestion successfully completed in {time() - start}s")
logger.info("Committing and cleaning up...")
connection.close()
logger.info(f"RDF data from file '{rdf_file}' successfully inserted into RDF graph '{graph_name}'")

logger.info("Starting RDF triples ingestion...")
parser = ParserFactory.create_parser(format, batch_size)
parser.on_bucket = on_bucket
parser.on_error = on_error
parser.on_complete = on_complete
parser.parsefile(rdf_file)
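
put_hbase writes every triple under three row keys so that any triple pattern can be answered by a prefix scan over the spo, pos, or osp table. The sketch below illustrates a lookup under that assumption; the connection settings, graph name, and the subject-only prefix built with empty strings are all assumptions for illustration.

import happybase
from sage.database.hbase.utils import build_row_key

# Assumes row keys sort lexicographically by their first component, so a
# subject-only prefix selects every triple of that subject in the 'spo' table.
connection = happybase.Connection("localhost", protocol="compact",
                                  transport="framed", port=9090,
                                  table_prefix="example_graph")
spo = connection.table('spo')
prefix = build_row_key("http://example.org/Alice", "", "")
for key, columns in spo.scan(row_prefix=prefix):
    print(columns[b'rdf:subject'], columns[b'rdf:predicate'], columns[b'rdf:object'])
connection.close()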