forked from simonw/sqlite-utils
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_enable_counts.py
187 lines (173 loc) · 6.28 KB
/
test_enable_counts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
from sqlite_utils import Database
from sqlite_utils import cli
from click.testing import CliRunner
import pytest
def test_enable_counts_specific_table(fresh_db):
    """Check that ``enable_counts()`` on one table keeps ``_counts`` in step.

    The test fills ``foo`` with ten rows, enables counts on it, and then
    verifies three things: the two triggers that were created, the initial
    ``_counts`` row, and that later inserts and deletes are reflected in
    that row.
    """
    table = fresh_db["foo"]

    def cached_rows():
        # Snapshot of every row currently stored in the _counts table.
        return list(fresh_db["_counts"].rows)

    assert fresh_db.table_names() == []
    for number in range(10):
        table.insert({"name": f"item {number}"})
    assert fresh_db.table_names() == ["foo"]
    assert table.count == 10

    # Switch on count tracking for this table only.
    table.enable_counts()

    # NOTE(review): the comparison below is an exact string match, so the
    # whitespace inside these SQL literals has to agree with whatever
    # enable_counts() generates -- confirm against the library output.
    expected_triggers = {
        "foo_counts_insert": (
            "CREATE TRIGGER [foo_counts_insert] AFTER INSERT ON [foo]\n"
            "BEGIN\n"
            " INSERT OR REPLACE INTO [_counts]\n"
            " VALUES (\n 'foo',\n"
            " COALESCE(\n"
            " (SELECT count FROM [_counts] WHERE [table] = 'foo'),\n"
            " 0\n"
            " ) + 1\n"
            " );\n"
            "END"
        ),
        "foo_counts_delete": (
            "CREATE TRIGGER [foo_counts_delete] AFTER DELETE ON [foo]\n"
            "BEGIN\n"
            " INSERT OR REPLACE INTO [_counts]\n"
            " VALUES (\n"
            " 'foo',\n"
            " COALESCE(\n"
            " (SELECT count FROM [_counts] WHERE [table] = 'foo'),\n"
            " 0\n"
            " ) - 1\n"
            " );\n"
            "END"
        ),
    }
    assert table.triggers_dict == expected_triggers
    assert fresh_db.table_names() == ["foo", "_counts"]
    assert cached_rows() == [{"count": 10, "table": "foo"}]

    # Five more inserts should each bump the cached count via the trigger.
    for number in range(10, 15):
        table.insert({"name": f"item {number}"})
    assert table.count == 15
    assert cached_rows() == [{"count": 15, "table": "foo"}]

    # A partial delete, then a delete of everything that is left.
    table.delete_where("rowid < 7")
    assert table.count == 9
    assert cached_rows() == [{"count": 9, "table": "foo"}]
    table.delete_where()
    assert table.count == 0
    assert cached_rows() == [{"count": 0, "table": "foo"}]
def test_enable_counts_all_tables(fresh_db):
    """Check that ``fresh_db.enable_counts()`` records a count for every table.

    ``foo`` also has full-text search enabled first, so the ``foo_fts*``
    tables that exist afterwards are expected to get ``_counts`` rows too.
    """
    foo_table = fresh_db["foo"]
    bar_table = fresh_db["bar"]
    foo_table.insert({"name": "Cleo"})
    bar_table.insert({"name": "Cleo"})
    foo_table.enable_fts(["name"])

    fresh_db.enable_counts()

    expected_tables = {
        "foo",
        "bar",
        "foo_fts",
        "foo_fts_data",
        "foo_fts_idx",
        "foo_fts_docsize",
        "foo_fts_config",
        "_counts",
    }
    assert set(fresh_db.table_names()) == expected_tables

    # Row order matters here: the comparison is against a list, not a set.
    expected_counts = [
        {"count": 1, "table": "foo"},
        {"count": 1, "table": "bar"},
        {"count": 3, "table": "foo_fts_data"},
        {"count": 1, "table": "foo_fts_idx"},
        {"count": 1, "table": "foo_fts_docsize"},
        {"count": 1, "table": "foo_fts_config"},
    ]
    assert list(fresh_db["_counts"].rows) == expected_counts
@pytest.fixture
def counts_db_path(tmpdir):
    """Create ``test.db`` under ``tmpdir`` and return its path as a string.

    The database ends up with one row in ``foo``, two rows in ``bar`` and
    one row in ``baz``; every row is ``{"name": "bar"}``.
    """
    db_file = str(tmpdir / "test.db")
    database = Database(db_file)
    # "bar" appears twice on purpose so that table holds two rows.
    for table_name in ("foo", "bar", "bar", "baz"):
        database[table_name].insert({"name": "bar"})
    return db_file
@pytest.mark.parametrize(
    "extra_args,expected_triggers",
    [
        # No table arguments: triggers are expected on foo, bar and baz.
        (
            [],
            [
                "foo_counts_insert",
                "foo_counts_delete",
                "bar_counts_insert",
                "bar_counts_delete",
                "baz_counts_insert",
                "baz_counts_delete",
            ],
        ),
        # Naming a single table: only that table's triggers are expected.
        (
            ["bar"],
            [
                "bar_counts_insert",
                "bar_counts_delete",
            ],
        ),
    ],
)
def test_cli_enable_counts(counts_db_path, extra_args, expected_triggers):
    """Check the triggers created by the ``enable-counts`` CLI command.

    ``extra_args`` holds the optional table names appended after the
    database path; ``expected_triggers`` is the ordered list of trigger
    names that should exist once the command has run.
    """
    db = Database(counts_db_path)
    # The fixture database starts out with no triggers at all.
    assert list(db.triggers_dict) == []
    result = CliRunner().invoke(cli.cli, ["enable-counts", counts_db_path] + extra_args)
    # Attach the captured CLI output so a non-zero exit code is diagnosable
    # straight from the test failure report.
    assert result.exit_code == 0, result.output
    assert list(db.triggers_dict) == expected_triggers
def test_uses_counts_after_enable_counts(counts_db_path):
    """Check which SQL ``.count`` runs before and after ``db.enable_counts()``.

    Every statement executed inside the tracer block is recorded. Before
    counts are enabled, ``db["foo"].count`` is expected to issue a
    ``count(*)`` query; afterwards it is expected to read from ``_counts``.
    """
    db = Database(counts_db_path)
    queries = []

    def record(sql, parameters):
        # Tracer callback: remember each statement with its parameters.
        queries.append((sql, parameters))

    # Building the expectations runs no SQL, so doing it up front does not
    # disturb the recorded trace.
    view_names_query = ("select name from sqlite_master where type = 'view'", None)
    expected_before = [
        view_names_query,
        ("select count(*) from [foo]", []),
    ]
    expected_after = [
        (
            "CREATE TABLE IF NOT EXISTS [_counts](\n [table] TEXT PRIMARY KEY,\n count INTEGER DEFAULT 0\n);",
            None,
        ),
        ("select name from sqlite_master where type = 'table'", None),
    ]
    expected_after.extend([view_names_query] * 4)
    for table_name in ("foo", "bar", "baz"):
        expected_after.append(
            ("select sql from sqlite_master where name = ?", (table_name,))
        )
        expected_after.append(("SELECT quote(:value)", {"value": table_name}))
    expected_after.extend(
        [
            ("select sql from sqlite_master where name = ?", ("_counts",)),
            view_names_query,
            ("select [table], count from _counts where [table] in (?)", ["foo"]),
        ]
    )

    with db.tracer(record):
        assert db["foo"].count == 1
        assert queries == expected_before

        # Start a fresh log for the second phase (same list object, so the
        # callback keeps appending to it).
        queries.clear()
        assert not db.use_counts_table
        db.enable_counts()
        assert db.use_counts_table
        assert db["foo"].count == 1
        assert queries == expected_after
def test_reset_counts(counts_db_path):
    """Check that ``db.reset_counts()`` repairs a wrong value in ``_counts``.

    The cached count for ``foo`` is overwritten by hand, which the test
    confirms is then reported by both ``cached_counts()`` and ``.count``.
    After ``reset_counts()`` the original values are expected again.
    """
    db = Database(counts_db_path)
    for table_name in ("foo", "bar"):
        db[table_name].enable_counts()

    true_counts = {"foo": 1, "bar": 2}
    assert db.cached_counts() == true_counts

    # Overwrite foo's cached count with a value that does not match reality.
    db["_counts"].update("foo", {"count": 3})
    assert db.cached_counts() == {"foo": 3, "bar": 2}
    assert db["foo"].count == 3

    # Resetting should bring the cached values back to the true counts.
    db.reset_counts()
    assert db.cached_counts() == true_counts
    assert db["foo"].count == 1
def test_reset_counts_cli(counts_db_path):
    """Check that the ``reset-counts`` CLI command repairs ``_counts``.

    Counts are enabled for ``foo`` and ``bar``, the cached value for
    ``foo`` is overwritten by hand, and the command is then expected to
    restore the original cached counts.
    """
    db = Database(counts_db_path)
    db["foo"].enable_counts()
    db["bar"].enable_counts()
    assert db.cached_counts() == {"foo": 1, "bar": 2}
    # Corrupt the value so the command has something to fix
    db["_counts"].update("foo", {"count": 3})
    result = CliRunner().invoke(cli.cli, ["reset-counts", counts_db_path])
    # Attach the captured CLI output so a non-zero exit code is diagnosable
    # straight from the test failure report.
    assert result.exit_code == 0, result.output
    assert db.cached_counts() == {"foo": 1, "bar": 2}