Commit
Merge pull request #256 from commonsense/gin-index
Use PostgreSQL's GIN index and `jsonb_path_ops` to form queries
jlowryduda authored Apr 8, 2019
2 parents e83ffff + 5caa31e commit 0cbdffb
Showing 22 changed files with 1,537 additions and 1,662 deletions.
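For context, the technique named in the PR title works like this: each edge is stored as a JSONB document, a GIN index built with the jsonb_path_ops operator class supports the JSONB containment operator @>, and lookups are expressed as containment tests against that column. Below is a minimal sketch of that pattern using psycopg2 (which the project already uses); the table name edges_gin matches the CSV loaded in this diff, but the column name data, the index name, and the database name are assumptions, since the schema itself is defined elsewhere in the repository.

import json

import psycopg2

# Sketch only: column, index, and database names are illustrative assumptions.
conn = psycopg2.connect(dbname='conceptnet5')
with conn.cursor() as cur:
    # jsonb_path_ops produces a smaller GIN index than the default jsonb_ops
    # operator class; it supports only the containment operator @>, which is
    # the only operator these queries need.
    cur.execute(
        "CREATE INDEX IF NOT EXISTS edges_gin_data_idx "
        "ON edges_gin USING gin (data jsonb_path_ops)"
    )
    # Match edges whose stored list of 'start' prefixes contains '/c/en/dog'.
    criteria = {'start': ['/c/en/dog']}
    cur.execute(
        "SELECT data FROM edges_gin WHERE data @> %s::jsonb LIMIT 10",
        (json.dumps(criteria),),
    )
    for (data,) in cur.fetchall():
        print(data['uri'])
conn.commit()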
40 changes: 18 additions & 22 deletions Snakefile
@@ -122,13 +122,12 @@ if USE_MORPHOLOGY:
rule all:
input:
DATA + "/assertions/assertions.csv",
DATA + "/psql/edges.csv.gz",
DATA + "/psql/edge_sources.csv.gz",
DATA + "/psql/edge_features.csv.gz",
DATA + "/psql/nodes.csv.gz",
DATA + "/psql/node_prefixes.csv.gz",
DATA + "/psql/sources.csv.gz",
DATA + "/psql/relations.csv.gz",
DATA + "/psql/edges.csv",
DATA + "/psql/edge_features.csv",
DATA + "/psql/edges_gin.shuf.csv",
DATA + "/psql/nodes.csv",
DATA + "/psql/sources.csv",
DATA + "/psql/relations.csv",
DATA + "/psql/done",
DATA + "/stats/languages.txt",
DATA + "/stats/language_edges.txt",
@@ -142,13 +141,12 @@ rule evaluation:

rule webdata:
input:
DATA + "/psql/edges.csv.gz",
DATA + "/psql/edge_sources.csv.gz",
DATA + "/psql/edge_features.csv.gz",
DATA + "/psql/nodes.csv.gz",
DATA + "/psql/node_prefixes.csv.gz",
DATA + "/psql/sources.csv.gz",
DATA + "/psql/relations.csv.gz",
DATA + "/psql/edges.csv",
DATA + "/psql/edge_features.csv",
DATA + "/psql/edges_gin.shuf.csv",
DATA + "/psql/nodes.csv",
DATA + "/psql/sources.csv",
DATA + "/psql/relations.csv",
DATA + "/psql/done",
DATA + "/vectors/mini.h5",

@@ -411,30 +409,28 @@ rule prepare_db:
DATA + "/assertions/assertions.msgpack"
output:
DATA + "/psql/edges.csv",
DATA + "/psql/edge_sources.csv",
DATA + "/psql/edge_features.csv",
temp(DATA + "/psql/edges_gin.csv"),
DATA + "/psql/nodes.csv",
DATA + "/psql/node_prefixes.csv",
DATA + "/psql/sources.csv",
DATA + "/psql/relations.csv"
shell:
"cn5-db prepare_data {input} {DATA}/psql"

rule gzip_db:
rule shuffle_gin:
input:
DATA + "/psql/{name}.csv"
DATA + "/psql/edges_gin.csv"
output:
DATA + "/psql/{name}.csv.gz"
DATA + "/psql/edges_gin.shuf.csv"
shell:
"gzip -c {input} > {output}"
"shuf {input} > {output}"

rule load_db:
input:
DATA + "/psql/edges.csv",
DATA + "/psql/edge_sources.csv",
DATA + "/psql/edge_features.csv",
DATA + "/psql/edges_gin.shuf.csv",
DATA + "/psql/nodes.csv",
DATA + "/psql/node_prefixes.csv",
DATA + "/psql/sources.csv",
DATA + "/psql/relations.csv"
output:
5 changes: 0 additions & 5 deletions conceptnet5/builders/combine_assertions.py
@@ -65,11 +65,6 @@ def make_assertion(line_group):
# on word senses. These don't get merged together, but they should.
uri, rel, start, end, _ = lines[0].split('\t')

# We can't distinguish word senses well enough yet, so only keep them
# up to the part of speech
start = uri_prefix(start, 4)
end = uri_prefix(end, 4)

if not (keep_concept(start) and keep_concept(end)):
return None

5 changes: 0 additions & 5 deletions conceptnet5/db/connection.py
@@ -1,9 +1,5 @@
import psycopg2
import time
import sys
import os
from conceptnet5.db import config
from conceptnet5.util import get_data_filename

_CONNECTIONS = {}

@@ -52,4 +48,3 @@ def check_db_connection(dbname=None):
if dbname is None:
dbname = config.DB_NAME
_get_db_connection_inner(dbname)

247 changes: 182 additions & 65 deletions conceptnet5/db/prepare_data.py
@@ -6,16 +6,28 @@


def write_row(outfile, items):
"""
Write a tab-separated row to a file.
"""
print('\t'.join(sanitize(str(x)) for x in items), file=outfile)


def write_ordered_set(filename, oset):
"""
Write an OrderedSet of strings as a two-column table, containing
the ID and the string value.
"""
with open(filename, 'w', encoding='utf-8') as outfile:
for i, item in enumerate(oset):
print('%d\t%s' % (i, sanitize(item)), file=outfile)


def write_relations(filename, oset):
"""
To create the `relations` table, we need one additional column of
information beyond `write_ordered_set`, which is a boolean of whether
the relation is directed.
"""
with open(filename, 'w', encoding='utf-8') as outfile:
for i, rel in enumerate(oset):
directed_str = 't'
@@ -25,94 +37,199 @@ def write_relations(filename, oset):


def sanitize(text):
return text.replace('\n', '').replace('\t', '').replace('\\', '\\\\')
"""
We're using a very simple approach to writing a CSV (which is actually
tab-separated, but a lot of software considers that a kind of CSV). Our
CSV output will be formatted correctly and understood by psql as long
as we avoid certain characters that would conflict:
- Newlines (which of course separate CSV entries)
- Tabs (which separate our columns)
- Null codepoints (which are not allowed in psql)
We also need to escape every literal backslash, as backslashes are
interpreted by psql.
It's a good question whether this step should be necessary -- names
of concepts shouldn't include control characters, and the few
instances of backslashes in ConceptNet appear to be mistakes.
"""
return (
text.replace('\n', '')
.replace('\t', '')
.replace('\x00', '')
.replace('\\', '\\\\')
)
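As an illustration of the escaping policy described in that docstring (not part of the diff itself), this is how sanitize() behaves, shown doctest-style:

>>> sanitize('a\tb\nc')        # tabs and newlines are dropped outright
'abc'
>>> sanitize('back\\slash')    # one literal backslash becomes an escaped pair
'back\\\\slash'
>>> sanitize('nul\x00byte')    # null codepoints are stripped for psql
'nulbyte'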


def gin_indexable_edge(edge):
"""
Convert an edge into a dictionary that can be matched with the JSONB @>
operator, which tests if one dictionary includes all the information in
another. This operator can be indexed by GIN.
We replace the 'start', 'end', 'rel', and 'dataset' URIs with lists
of their URI prefixes. We query those slots with a single-element list,
which will be a sub-list of the prefix list if it's a match.
As an example, a query for {'start': '/c/en'} will become the GIN
query {'start': ['/c/en']}, which will match indexed edges such as
{
'start': ['/c/en', '/c/en/dog'],
'end': ['/c/en', '/c/en/bark'],
'rel': ['/r/CapableOf'],
...
}
"""
gin_edge = {}
gin_edge['uri'] = edge['uri']
gin_edge['start'] = uri_prefixes(edge['start'])
gin_edge['end'] = uri_prefixes(edge['end'])
gin_edge['rel'] = uri_prefixes(edge['rel'])
gin_edge['dataset'] = uri_prefixes(edge['dataset'])
flat_sources = set()
for source in edge['sources']:
for value in source.values():
flat_sources.update(uri_prefixes(value, min_pieces=3))
gin_edge['sources'] = sorted(flat_sources)
return gin_edge
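To make the prefix-list representation concrete, here is a small sketch (not part of the diff) that runs the docstring's dog/bark example through gin_indexable_edge (the function above) and builds the matching containment query; the edge dictionary, its dataset, and its source are made-up illustrations in the shape that read_msgpack_stream() yields.

import json

edge = {
    'uri': '/a/[/r/CapableOf/,/c/en/dog/,/c/en/bark/]',
    'rel': '/r/CapableOf',
    'start': '/c/en/dog',
    'end': '/c/en/bark',
    'dataset': '/d/conceptnet/4/en',
    'sources': [{'contributor': '/s/contributor/omcs/someone'}],
}

indexed = gin_indexable_edge(edge)
print(indexed['start'])   # ['/c/en', '/c/en/dog'], as in the docstring

# A lookup such as "edges that start at /c/en/dog with relation /r/CapableOf"
# wraps each value in a one-element list, so JSONB containment (@>) matches
# any indexed edge whose prefix lists include those values.
criteria = {'start': '/c/en/dog', 'rel': '/r/CapableOf'}
gin_query = {key: [value] for key, value in criteria.items()}
print(json.dumps(gin_query, sort_keys=True))
# {"rel": ["/r/CapableOf"], "start": ["/c/en/dog"]}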


def assertions_to_sql_csv(msgpack_filename, output_dir):
"""
Scan through the list of assertions (edges that are unique in their
start, end, and relation) and produce CSV files that can be loaded
into PostgreSQL tables.
The columns of these CSV files are unlabeled, but they correspond
to the order of the table columns defined in schema.py.
"""
# Construct the filenames of the CSV files, one per table
output_nodes = output_dir + '/nodes.csv'
output_edges = output_dir + '/edges.csv'
output_relations = output_dir + '/relations.csv'
output_sources = output_dir + '/sources.csv'
output_edge_sources = output_dir + '/edge_sources.csv'
output_node_prefixes = output_dir + '/node_prefixes.csv'
output_features = output_dir + '/edge_features.csv'
output_edges_gin = output_dir + '/edges_gin.csv'

# We can't rely on Postgres to assign IDs, because we need to know the
# IDs to refer to them _before_ they're in Postgres. So we track our own
# unique IDs using OrderedSet.
node_list = OrderedSet()
source_list = OrderedSet()
assertion_list = OrderedSet()
relation_list = OrderedSet()
seen_prefixes = set()

edge_file = open(output_edges, 'w', encoding='utf-8')
edge_source_file = open(output_edge_sources, 'w', encoding='utf-8')
node_prefix_file = open(output_node_prefixes, 'w', encoding='utf-8')
feature_file = open(output_features, 'w', encoding='utf-8')

for assertion in read_msgpack_stream(msgpack_filename):
if assertion['uri'] in assertion_list:
continue
assertion_idx = assertion_list.add(assertion['uri'])
rel_idx = relation_list.add(assertion['rel'])
start_idx = node_list.add(assertion['start'])
end_idx = node_list.add(assertion['end'])

source_indices = []
sources = assertion['sources']
for source in sources:
for sourceval in sorted(source.values()):
source_idx = source_list.add(sourceval)
source_indices.append(source_idx)

jsondata = json.dumps(assertion, ensure_ascii=False, sort_keys=True)
weight = assertion['weight']
write_row(
edge_file,
[assertion_idx, assertion['uri'],
rel_idx, start_idx, end_idx,
weight, jsondata]
)
for node in (assertion['start'], assertion['end'], assertion['dataset']):
write_prefixes(node_prefix_file, seen_prefixes, node_list, node)
for source_idx in sorted(set(source_indices)):
write_row(edge_source_file, [assertion_idx, source_idx])

if assertion['rel'] in SYMMETRIC_RELATIONS:
features = [(0, start_idx), (0, end_idx)]
else:
features = [(1, start_idx), (-1, end_idx)]

for direction, node_idx in features:
write_row(feature_file, [rel_idx, direction, node_idx, assertion_idx])

edge_file.close()
edge_source_file.close()
node_prefix_file.close()

# These are three files that we will write incrementally as we iterate
# through the edges. The syntax restrictions on 'with' leave me with no
# way to format this that satisfies my style checker and auto-formatter.
with open(output_edges, 'w', encoding='utf-8') as edge_file,\
open(output_edges_gin, 'w', encoding='utf-8') as edge_gin_file,\
open(output_features, 'w', encoding='utf-8') as feature_file:
for assertion in read_msgpack_stream(msgpack_filename):
# Assertions are supposed to be unique. If they're not, we should
# find out and the build should fail.
if assertion['uri'] in assertion_list:
raise ValueError("Duplicate assertion: {!r}".format(assertion))

# Get unique IDs for the relation, start, and end, and the assertion
# itself. The relation, start, and end IDs may already exist; this is
# handled by OrderedSet.
assertion_idx = assertion_list.add(assertion['uri'])
rel_idx = relation_list.add(assertion['rel'])
start_idx = node_list.add(assertion['start'])
end_idx = node_list.add(assertion['end'])

# Also get unique IDs for each of the sources listed as contributing
# to this assertion.
source_indices = []
sources = assertion['sources']
for source in sources:
for sourceval in sorted(source.values()):
source_idx = source_list.add(sourceval)
source_indices.append(source_idx)

# Write the edge data to the `edge_file`.
jsondata = json.dumps(assertion, ensure_ascii=False, sort_keys=True)
weight = assertion['weight']
write_row(
edge_file,
[
assertion_idx,
assertion['uri'],
rel_idx,
start_idx,
end_idx,
weight,
jsondata,
],
)

# Convert the edge to the form that we can easily filter using GIN
# indexing, and write that to the `edge_gin_file`.
write_row(
edge_gin_file,
[
assertion_idx,
weight,
json.dumps(
gin_indexable_edge(assertion),
ensure_ascii=False,
sort_keys=True,
),
],
)

# Extract the 'features' (combinations of the relation and one node)
# that are present in the edge. We may need to match the node using
# a prefix of that node, so store the feature separately for each
# prefix.
features = []

# Get the IDs in the node table for each prefix of the nodes
start_p_indices = [
node_list.add(prefix) for prefix in uri_prefixes(assertion['start'], 3)
]
end_p_indices = [
node_list.add(prefix) for prefix in uri_prefixes(assertion['end'], 3)
]

# Write the feature data, the 'direction' (forward, backward, or
# symmetric), and the edge ID to the feature table.
if assertion['rel'] in SYMMETRIC_RELATIONS:
for start_p_idx in start_p_indices:
features.append((0, start_p_idx))
for end_p_idx in end_p_indices:
features.append((0, end_p_idx))
else:
for start_p_idx in start_p_indices:
features.append((1, start_p_idx))
for end_p_idx in end_p_indices:
features.append((-1, end_p_idx))

for direction, node_idx in features:
write_row(feature_file, [rel_idx, direction, node_idx, assertion_idx])

# Write our tables of unique IDs
write_ordered_set(output_nodes, node_list)
write_ordered_set(output_sources, source_list)
write_relations(output_relations, relation_list)


def write_prefixes(prefix_file, seen_prefixes, node_list, node):
for prefix in uri_prefixes(node):
if (node, prefix) not in seen_prefixes:
seen_prefixes.add((node, prefix))
node_idx = node_list.add(node)
prefix_idx = node_list.add(prefix)
write_row(prefix_file, [node_idx, prefix_idx])


def load_sql_csv(connection, input_dir):
"""
Load the CSV files we created into PostgreSQL using the `copy_from`
method, which is the same as the COPY command at the psql command line.
"""
for (filename, tablename) in [
(input_dir + '/relations.csv', 'relations'),
(input_dir + '/nodes.csv', 'nodes'),
(input_dir + '/edges.csv', 'edges'),
(input_dir + '/sources.csv', 'sources'),
(input_dir + '/edge_sources.csv', 'edge_sources'),
(input_dir + '/node_prefixes.csv', 'node_prefixes'),
(input_dir + '/edge_features.csv', 'edge_features')
(input_dir + '/edges_gin.shuf.csv', 'edges_gin'),
(input_dir + '/edge_features.csv', 'edge_features'),
]:
cursor = connection.cursor()
with open(filename, 'rb') as file:
cursor.copy_from(file, tablename)
cursor.close()
with connection.cursor() as cursor:
with open(filename, 'rb') as file:
cursor.copy_from(file, tablename)
connection.commit()