forked from python/cpython
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsqlite3.py
140 lines (110 loc) · 3.99 KB
/
sqlite3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import os
import sqlite3
from pathlib import Path
from contextlib import suppress, closing
from collections.abc import MutableMapping
BUILD_TABLE = """
CREATE TABLE IF NOT EXISTS Dict (
key BLOB UNIQUE NOT NULL,
value BLOB NOT NULL
)
"""
GET_SIZE = "SELECT COUNT (key) FROM Dict"
LOOKUP_KEY = "SELECT value FROM Dict WHERE key = CAST(? AS BLOB)"
STORE_KV = "REPLACE INTO Dict (key, value) VALUES (CAST(? AS BLOB), CAST(? AS BLOB))"
DELETE_KEY = "DELETE FROM Dict WHERE key = CAST(? AS BLOB)"
ITER_KEYS = "SELECT key FROM Dict"
class error(OSError):
pass
_ERR_CLOSED = "DBM object has already been closed"
_ERR_REINIT = "DBM object does not support reinitialization"
def _normalize_uri(path):
path = Path(path)
uri = path.absolute().as_uri()
while "//" in uri:
uri = uri.replace("//", "/")
return uri
class _Database(MutableMapping):
def __init__(self, path, /, *, flag, mode):
if hasattr(self, "_cx"):
raise error(_ERR_REINIT)
path = os.fsdecode(path)
match flag:
case "r":
flag = "ro"
case "w":
flag = "rw"
case "c":
flag = "rwc"
Path(path).touch(mode=mode, exist_ok=True)
case "n":
flag = "rwc"
Path(path).unlink(missing_ok=True)
Path(path).touch(mode=mode)
case _:
raise ValueError("Flag must be one of 'r', 'w', 'c', or 'n', "
f"not {flag!r}")
# We use the URI format when opening the database.
uri = _normalize_uri(path)
uri = f"{uri}?mode={flag}"
try:
self._cx = sqlite3.connect(uri, autocommit=True, uri=True)
except sqlite3.Error as exc:
raise error(str(exc))
# This is an optimization only; it's ok if it fails.
with suppress(sqlite3.OperationalError):
self._cx.execute("PRAGMA journal_mode = wal")
if flag == "rwc":
self._execute(BUILD_TABLE)
def _execute(self, *args, **kwargs):
if not self._cx:
raise error(_ERR_CLOSED)
try:
return closing(self._cx.execute(*args, **kwargs))
except sqlite3.Error as exc:
raise error(str(exc))
def __len__(self):
with self._execute(GET_SIZE) as cu:
row = cu.fetchone()
return row[0]
def __getitem__(self, key):
with self._execute(LOOKUP_KEY, (key,)) as cu:
row = cu.fetchone()
if not row:
raise KeyError(key)
return row[0]
def __setitem__(self, key, value):
self._execute(STORE_KV, (key, value))
def __delitem__(self, key):
with self._execute(DELETE_KEY, (key,)) as cu:
if not cu.rowcount:
raise KeyError(key)
def __iter__(self):
try:
with self._execute(ITER_KEYS) as cu:
for row in cu:
yield row[0]
except sqlite3.Error as exc:
raise error(str(exc))
def close(self):
if self._cx:
self._cx.close()
self._cx = None
def keys(self):
return list(super().keys())
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
def open(filename, /, flag="r", mode=0o666):
"""Open a dbm.sqlite3 database and return the dbm object.
The 'filename' parameter is the name of the database file.
The optional 'flag' parameter can be one of ...:
'r' (default): open an existing database for read only access
'w': open an existing database for read/write access
'c': create a database if it does not exist; open for read/write access
'n': always create a new, empty database; open for read/write access
The optional 'mode' parameter is the Unix file access mode of the database;
only used when creating a new database. Default: 0o666.
"""
return _Database(filename, flag=flag, mode=mode)