Skip to content

Commit 7b046a9

Browse files
committed
Encode additional parameters in Component.data and store in 1 table
1 parent 72cbed1 commit 7b046a9

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+662
-680
lines changed

.github/workflows/ci_code.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ jobs:
103103
104104
- name: Unit Testing
105105
run: |
106+
sqlite3 test.db "create table t(f int); drop table t;"
106107
make unit_testing pytest_arguments="--cov=superduper --cov-report=xml" SUPERDUPER_CONFIG=test/configs/${{ matrix.config }}
107108
108109
- name: Usecase Testing

.github/workflows/ci_plugins.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ jobs:
9595
- name: Plugin Testing
9696
run: |
9797
export PYTHONPATH=./
98+
if [ "${{ matrix.plugin }}" = "sql" ]; then
99+
sqlite3 test.db "create table t(f int); drop table t;"
100+
fi
98101
if [ -d "plugins/${{ matrix.plugin }}/plugin_test" ]; then
99102
pytest --cov=superduper --cov-report=xml plugins/${{ matrix.plugin }}/plugin_test
100103
else
@@ -103,6 +106,9 @@ jobs:
103106
104107
- name: Optionally run the base testing
105108
run: |
109+
if [ "${{ matrix.plugin }}" = "sql" ]; then
110+
sqlite3 test.db "create table t(f int); drop table t;"
111+
fi
106112
SUPERDUPER_CONFIG="plugins/${{ matrix.plugin }}/plugin_test/config.yaml"
107113
if [ -f "$SUPERDUPER_CONFIG" ]; then
108114
echo "Running the base testing..."

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1616
- Add assertion to verify directory copy in FileSystemArtifactStore
1717
- Batch the Qdrant requests and add a retry to the config of Qdrant
1818
- Add use_component_cache to config
19+
- Save data in the `Component` table instead of in individual tables
1920

2021
### Bug fixes
2122

plugins/qdrant/superduper_qdrant/qdrant.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,6 @@ def _do_scroll(offset):
160160

161161
return ids
162162

163-
164163
def _create_collection(self):
165164
measure = (
166165
self.measure.name

plugins/sql/plugin_test/config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
data_backend: sqlite://
1+
data_backend: sqlite://./test.db
22
auto_schema: false
33
force_apply: true
44
json_native: false

plugins/sql/plugin_test/test_query.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import typing as t
12
from test.utils.setup.fake_data import add_listeners, add_models, add_random_data
23

34
import numpy as np
@@ -44,7 +45,7 @@ def test_renamings(db):
4445
add_random_data(db, n=5)
4546
add_models(db)
4647
add_listeners(db)
47-
t = db["documents"]
48+
t = db["documentz"]
4849
listener_uuid = [db.load('Listener', k).outputs for k in db.show("Listener")][0]
4950
q = t.select("id", "x", "y").outputs(listener_uuid.split('__', 1)[-1])
5051
data = q.execute()
@@ -62,13 +63,13 @@ def test_serialize_query(db):
6263

6364
def test_get_data(db):
6465
add_random_data(db, n=5)
65-
db["documents"].limit(2)
66-
db.metadata.get_component("Table", "documents")
66+
db["documentz"].limit(2)
67+
db.metadata.get_component("Table", "documentz")
6768

6869

6970
def test_insert_select(db):
7071
add_random_data(db, n=5)
71-
q = db["documents"].select("id", "x", "y").limit(2)
72+
q = db["documentz"].select("id", "x", "y").limit(2)
7273
r = q.execute()
7374

7475
assert len(r) == 2
@@ -77,7 +78,7 @@ def test_insert_select(db):
7778

7879
def test_filter(db):
7980
add_random_data(db, n=5)
80-
t = db["documents"]
81+
t = db["documentz"]
8182
q = t.select("id", "y")
8283
r = q.execute()
8384
ys = [x["y"] for x in r]
@@ -88,17 +89,18 @@ def test_filter(db):
8889
assert len(r) == uq[1][0]
8990

9091

91-
class documents(Base):
92+
class documents_plugin(Base):
93+
primary_id: t.ClassVar[str] = 'id'
9294
this: 'str'
9395

9496

9597
def test_select_using_ids(db):
96-
db.create(documents)
98+
db.create(documents_plugin)
9799

98-
table = db["documents"]
100+
table = db["documents_plugin"]
99101
table.insert([{"this": f"is a test {i}", "id": str(i)} for i in range(4)])
100102

101-
basic_select = db['documents'].select()
103+
basic_select = db['documents_plugin'].select()
102104

103105
assert len(basic_select.execute()) == 4
104106
assert len(basic_select.subset(['1', '2'])) == 2
@@ -112,16 +114,16 @@ def my_func(this: str):
112114

113115
my_func = ObjectModel('my_func', object=my_func)
114116

115-
db.create(documents)
117+
db.create(documents_plugin)
116118

117-
table = db["documents"]
119+
table = db["documents_plugin"]
118120
table.insert([{"this": f"is a test {i}", "id": str(i)} for i in range(4)])
119121

120122
listener = Listener(
121123
'test',
122124
model=my_func,
123125
key='this',
124-
select=db['documents'].select(),
126+
select=db['documents_plugin'].select(),
125127
)
126128
db.apply(listener)
127129

plugins/sql/superduper_sql/data_backend.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,10 @@ class SQLDatabackend(IbisDataBackend):
359359

360360
def __init__(self, uri, plugin, flavour=None):
361361
super().__init__(uri, plugin, flavour)
362-
self._create_sqlalchemy_engine()
362+
if 'sqlite://./' in uri:
363+
self._create_sqlalchemy_engine(uri.replace('./', '//'))
364+
else:
365+
self._create_sqlalchemy_engine(uri)
363366
self.sm = sessionmaker(bind=self.alchemy_engine)
364367

365368
@property
@@ -374,6 +377,8 @@ def update(self, table, condition, key, value):
374377
with self.sm() as session:
375378
metadata = MetaData()
376379

380+
assert table in self.list_tables()
381+
377382
metadata.reflect(bind=session.bind)
378383
table = Table(table, metadata, autoload_with=session.bind)
379384

@@ -422,16 +427,16 @@ def delete(self, table, condition):
422427
except NoSuchTableError:
423428
raise exceptions.NotFound("Table", table)
424429

425-
def _create_sqlalchemy_engine(self):
430+
def _create_sqlalchemy_engine(self, uri):
426431
with self.connection_manager.get_connection() as conn:
427-
self.alchemy_engine = create_engine(self.uri, creator=lambda: conn.con)
432+
self.alchemy_engine = create_engine(uri, creator=lambda: conn.con)
428433
if not self._test_engine():
429434
logging.warn(
430435
"Unable to reuse the ibis connection "
431436
"to create the SQLAlchemy engine. "
432437
"Creating a new connection with the URI."
433438
)
434-
self.alchemy_engine = create_engine(self.uri)
439+
self.alchemy_engine = create_engine(uri)
435440

436441
def _test_engine(self):
437442
"""Test the engine."""

superduper/backends/base/data_backend.py

Lines changed: 46 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import copy
12
import functools
23
import hashlib
34
import json
@@ -643,11 +644,7 @@ def delete(self, table, condition):
643644
:param table: The table to delete from.
644645
:param condition: The condition to delete.
645646
"""
646-
r_table = self._get_with_component_identifier('Table', table)
647-
if r_table is None:
648-
raise exceptions.NotFound("Table", table)
649-
650-
if not r_table['is_component']:
647+
if table not in {'Deployment', 'Component'}:
651648
pid = self.primary_id(table)
652649
if pid in condition:
653650
docs = self.get_many(table, condition[pid])
@@ -660,13 +657,16 @@ def delete(self, table, condition):
660657
if 'uuid' in condition:
661658
docs = self.get_many(table, '*', condition['uuid'])
662659
elif 'identifier' in condition:
663-
docs = self.get_many(table, condition['identifier'], '*')
660+
assert 'component' in condition
661+
docs = self.get_many(
662+
table, condition['component'], condition['identifier'], '*'
663+
)
664664
docs = self._do_filter(docs, condition)
665665
else:
666-
docs = self.get_many(table, '*', '*')
666+
docs = self.get_many(table, '*', '*', '*')
667667
docs = self._do_filter(docs, condition)
668668
for r in docs:
669-
del self[table, r['identifier'], r['uuid']]
669+
del self[table, r['component'], r['identifier'], r['uuid']]
670670

671671
def drop_table(self, table):
672672
"""Drop data from table.
@@ -735,9 +735,7 @@ def replace(self, table, condition, r):
735735
:param condition: The condition to update.
736736
:param r: The document to replace.
737737
"""
738-
r_table = self._get_with_component_identifier('Table', table)
739-
740-
if not r_table['is_component']:
738+
if table not in {'Component', 'Deployment'}:
741739
pid = self.primary_id(table)
742740
if pid in condition:
743741
docs = self.get_many(table, condition[pid])
@@ -749,18 +747,21 @@ def replace(self, table, condition, r):
749747
self[table, s[pid]] = r
750748
else:
751749
if 'uuid' in condition:
752-
s = self.get_many(table, '*', condition['uuid'])[0]
753-
self[table, s['identifier'], condition['uuid']] = r
750+
s = self.get_many(table, '*', '*', condition['uuid'])[0]
751+
self[table, s['component'], s['identifier'], condition['uuid']] = r
754752
elif 'identifier' in condition:
755-
docs = self.get_many(table, condition['identifier'], '*')
753+
assert 'component' in condition
754+
docs = self.get_many(
755+
table, condition['component'], condition['identifier'], '*'
756+
)
756757
docs = self._do_filter(docs, condition)
757758
for s in docs:
758-
self[table, s['identifier'], s['uuid']] = r
759+
self[table, s['component'], s['identifier'], s['uuid']] = r
759760
else:
760-
docs = self.get_many(table, '*', '*')
761+
docs = self.get_many(table, '*', '*', '*')
761762
docs = self._do_filter(docs, condition)
762763
for s in docs:
763-
self[table, s['identifier'], s['uuid']] = r
764+
self[table, s['component'], s['identifier'], s['uuid']] = r
764765

765766
def update(self, table, condition, key, value):
766767
"""Update data in the database.
@@ -770,9 +771,7 @@ def update(self, table, condition, key, value):
770771
:param key: The key to update.
771772
:param value: The value to update.
772773
"""
773-
r_table = self._get_with_component_identifier('Table', table)
774-
775-
if not r_table['is_component']:
774+
if table not in {'Component', 'Deployment'}:
776775
pid = self.primary_id(table)
777776
if pid in condition:
778777
docs = self.get_many(table, '*', condition[pid]) + self.get_many(
@@ -788,19 +787,21 @@ def update(self, table, condition, key, value):
788787
if 'uuid' in condition:
789788
s = self.get_many(table, '*', condition['uuid'])[0]
790789
s[key] = value
791-
self[table, s['identifier'], condition['uuid']] = s
790+
self[table, s['component'], s['identifier'], condition['uuid']] = s
792791
elif 'identifier' in condition:
793-
docs = self.get_many(table, condition['identifier'], '*')
792+
docs = self.get_many(
793+
table, condition['component'], condition['identifier'], '*'
794+
)
794795
docs = self._do_filter(docs, condition)
795796
for s in docs:
796797
s[key] = value
797-
self[table, s['identifier'], s['uuid']] = s
798+
self[table, s['component'], s['identifier'], s['uuid']] = s
798799
else:
799-
docs = self.get_many(table, '*', '*')
800+
docs = self.get_many(table, '*', '*', '*')
800801
docs = self._do_filter(docs, condition)
801802
for s in docs:
802803
s[key] = value
803-
self[table, s['identifier'], s['uuid']] = s
804+
self[table, s['component'], s['identifier'], s['uuid']] = s
804805

805806
@abstractmethod
806807
def keys(self, *pattern) -> t.List[t.Tuple[str, str, str]]:
@@ -882,9 +883,11 @@ def insert(self, table, documents):
882883
except exceptions.NotFound:
883884
pid = None
884885

885-
if ('uuid' == pid or not pid) and "uuid" in documents[0]:
886+
if table in {'Component', 'Deployment'}:
886887
for r in documents:
887-
self[table, r['identifier'], r['uuid']] = r
888+
self[table, r['component'], r['identifier'], r['uuid']] = copy.deepcopy(
889+
r
890+
)
888891
ids.append(r['uuid'])
889892
elif pid:
890893
pid = self.primary_id(table)
@@ -944,13 +947,7 @@ def do_test(r):
944947
return False
945948
return True
946949

947-
tables = self.get_many('Table', query.table, '*')
948-
if not tables:
949-
raise exceptions.NotFound("Table", query.table)
950-
951-
is_component = max(tables, key=lambda x: x['version'])['is_component']
952-
953-
if not is_component:
950+
if query.table not in {'Component', 'Deployment'}:
954951
pid = self.primary_id(query.table)
955952

956953
if pid in filter_kwargs:
@@ -966,41 +963,47 @@ def do_test(r):
966963
else:
967964

968965
if not filter_kwargs:
969-
keys = self.keys(query.table, '*', '*')
966+
keys = self.keys(query.table, '*', '*', '*')
970967
docs = [self[k] for k in keys]
971968
elif set(filter_kwargs.keys()) == {'uuid'}:
972-
keys = self.keys(query.table, '*', filter_kwargs['uuid']['value'])
969+
keys = self.keys(query.table, '*', '*', filter_kwargs['uuid']['value'])
973970
docs = [self[k] for k in keys]
974971
elif set(filter_kwargs.keys()) == {'identifier'}:
975972
assert filter_kwargs['identifier']['op'] == '=='
976-
977973
keys = self.keys(query.table, filter_kwargs['identifier']['value'], '*')
978974
docs = [self[k] for k in keys]
979-
elif set(filter_kwargs.keys()) == {'identifier', 'uuid'}:
975+
elif set(filter_kwargs.keys()) == {'component', 'identifier', 'uuid'}:
980976
assert filter_kwargs['identifier']['op'] == '=='
981977
assert filter_kwargs['uuid']['op'] == '=='
978+
assert filter_kwargs['component']['op'] == '=='
982979

983980
r = self[
984981
query.table,
982+
filter_kwargs['component']['value'],
985983
filter_kwargs['identifier']['value'],
986984
filter_kwargs['uuid']['value'],
987985
]
988986
if r is None:
989987
docs = []
990988
else:
991989
docs = [r]
992-
elif set(filter_kwargs.keys()) == {'identifier', 'version'}:
990+
elif set(filter_kwargs.keys()) == {'component', 'identifier', 'version'}:
993991
assert filter_kwargs['identifier']['op'] == '=='
992+
assert filter_kwargs['component']['op'] == '=='
994993
assert filter_kwargs['version']['op'] == '=='
995994

996-
keys = self.keys(query.table, filter_kwargs['identifier']['value'], '*')
995+
keys = self.keys(
996+
query.table,
997+
filter_kwargs['component']['value'],
998+
filter_kwargs['identifier']['value'],
999+
'*',
1000+
)
9971001
docs = [self[k] for k in keys]
9981002
docs = [
9991003
r for r in docs if r['version'] == filter_kwargs['version']['value']
10001004
]
10011005
else:
1002-
1003-
keys = self.keys(query.table, '*', '*')
1006+
keys = self.keys(query.table, '*', '*', '*')
10041007
docs = [self[k] for k in keys]
10051008
docs = [r for r in docs if do_test(r)]
10061009

@@ -1011,4 +1014,4 @@ def do_test(r):
10111014
cols = query.decomposition.select.args
10121015
for i, r in enumerate(docs):
10131016
docs[i] = {k: v for k, v in r.items() if k in cols}
1014-
return docs
1017+
return [copy.deepcopy(r) for r in docs]

0 commit comments

Comments
 (0)