Skip to content

Commit d857d07

Browse files
committed
Avoid writing MOVE operations.
1 parent a6c6e62 commit d857d07

File tree

1 file changed

+56
-80
lines changed

1 file changed

+56
-80
lines changed

crates/core/src/operations.rs

Lines changed: 56 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -4,26 +4,27 @@ use alloc::vec::Vec;
44
use serde::{Deserialize, Deserializer, Serialize};
55
use serde_json as json;
66

7+
use crate::error::{PSResult, SQLiteError};
78
use sqlite_nostd as sqlite;
89
use sqlite_nostd::{Connection, ResultCode};
9-
use crate::error::{SQLiteError, PSResult};
1010

1111
use crate::ext::SafeManagedStmt;
1212
use crate::sync_types::{BucketChecksum, Checkpoint, StreamingSyncLine};
1313
use crate::util::*;
1414

1515
// Run inside a transaction
16-
pub fn insert_operation(
17-
db: *mut sqlite::sqlite3, data: &str) -> Result<(), SQLiteError> {
16+
pub fn insert_operation(db: *mut sqlite::sqlite3, data: &str) -> Result<(), SQLiteError> {
1817
// language=SQLite
19-
let statement = db.prepare_v2("\
18+
let statement = db.prepare_v2(
19+
"\
2020
SELECT
2121
json_extract(e.value, '$.bucket') as bucket,
2222
json_extract(e.value, '$.data') as data,
2323
json_extract(e.value, '$.has_more') as has_more,
2424
json_extract(e.value, '$.after') as after,
2525
json_extract(e.value, '$.next_after') as next_after
26-
FROM json_each(json_extract(?, '$.buckets')) e")?;
26+
FROM json_each(json_extract(?, '$.buckets')) e",
27+
)?;
2728
statement.bind_text(1, data, sqlite::Destructor::STATIC)?;
2829

2930
while statement.step()? == ResultCode::ROW {
@@ -39,9 +40,14 @@ FROM json_each(json_extract(?, '$.buckets')) e")?;
3940
Ok(())
4041
}
4142

42-
pub fn insert_bucket_operations(db: *mut sqlite::sqlite3, bucket: &str, data: &str) -> Result<(), SQLiteError> {
43+
pub fn insert_bucket_operations(
44+
db: *mut sqlite::sqlite3,
45+
bucket: &str,
46+
data: &str,
47+
) -> Result<(), SQLiteError> {
4348
// language=SQLite
44-
let iterate_statement = db.prepare_v2("\
49+
let iterate_statement = db.prepare_v2(
50+
"\
4551
SELECT
4652
json_extract(e.value, '$.op_id') as op_id,
4753
json_extract(e.value, '$.op') as op,
@@ -50,32 +56,35 @@ SELECT
5056
json_extract(e.value, '$.checksum') as checksum,
5157
json_extract(e.value, '$.data') as data,
5258
json_extract(e.value, '$.subkey') as subkey
53-
FROM json_each(?) e")?;
59+
FROM json_each(?) e",
60+
)?;
5461
iterate_statement.bind_text(1, data, sqlite::Destructor::STATIC)?;
5562

5663
// language=SQLite
57-
let supersede_statement = db.prepare_v2("\
64+
let supersede_statement = db.prepare_v2(
65+
"\
5866
UPDATE ps_oplog SET
5967
superseded = 1,
6068
op = 2,
6169
data = NULL
6270
WHERE ps_oplog.superseded = 0
6371
AND unlikely(ps_oplog.bucket = ?1)
64-
AND ps_oplog.key = ?2")?;
72+
AND ps_oplog.key = ?2",
73+
)?;
6574
supersede_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
6675

6776
// language=SQLite
6877
let insert_statement = db.prepare_v2("\
69-
INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, superseded) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)")?;
78+
INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, superseded) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 0)")?;
7079
insert_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
7180

7281
// language=SQLite
7382
let bucket_statement = db.prepare_v2("INSERT OR IGNORE INTO ps_buckets(name) VALUES(?)")?;
7483
bucket_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
7584
bucket_statement.exec()?;
7685

77-
let mut first_op: Option<i64> = None;
7886
let mut last_op: Option<i64> = None;
87+
let mut add_checksum: i32 = 0;
7988

8089
while iterate_statement.step()? == ResultCode::ROW {
8190
let op_id = iterate_statement.column_int64(0)?;
@@ -86,11 +95,8 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
8695
let op_data = iterate_statement.column_text(5);
8796

8897
last_op = Some(op_id);
89-
if first_op.is_none() {
90-
first_op = Some(op_id);
91-
}
9298

93-
if op == "PUT" || op == "REMOVE" || op == "MOVE" {
99+
if op == "PUT" || op == "REMOVE" {
94100
let key: String;
95101
if let (Ok(object_type), Ok(object_id)) = (object_type.as_ref(), object_id.as_ref()) {
96102
let subkey = iterate_statement.column_text(6).unwrap_or("null");
@@ -101,8 +107,7 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
101107
key = String::from("");
102108
}
103109

104-
let superseded = if op == "MOVE" { 1 } else { 0 };
105-
let opi = if op == "MOVE" { 2 } else if op == "PUT" { 3 } else { 4 };
110+
let opi = if op == "PUT" { 3 } else { 4 };
106111
insert_statement.bind_int64(2, op_id)?;
107112
insert_statement.bind_int(3, opi)?;
108113
if key == "" {
@@ -125,8 +130,9 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
125130
}
126131

127132
insert_statement.bind_int(8, checksum)?;
128-
insert_statement.bind_int(9, superseded)?;
129133
insert_statement.exec()?;
134+
} else if op == "MOVE" {
135+
add_checksum = add_checksum.wrapping_add(checksum);
130136
} else if op == "CLEAR" {
131137
// Any remaining PUT operations should get an implicit REMOVE
132138
// language=SQLite
@@ -137,77 +143,59 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
137143
// And we need to re-apply all of those.
138144
// We also replace the checksum with the checksum of the CLEAR op.
139145
// language=SQLite
140-
let clear_statement2 = db.prepare_v2("UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1 WHERE name = ?2")?;
146+
let clear_statement2 = db.prepare_v2(
147+
"UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1 WHERE name = ?2",
148+
)?;
141149
clear_statement2.bind_text(2, bucket, sqlite::Destructor::STATIC)?;
142150
clear_statement2.bind_int(1, checksum)?;
143151
clear_statement2.exec()?;
152+
153+
add_checksum = 0;
144154
}
145155
}
146156

147157
if let Some(last_op) = &last_op {
148158
// language=SQLite
149-
let statement = db.prepare_v2("UPDATE ps_buckets SET last_op = ?1 WHERE name = ?2")?;
150-
statement.bind_text(2, bucket, sqlite::Destructor::STATIC)?;
151-
statement.bind_int64(1, *last_op)?;
152-
statement.exec()?;
153-
}
154-
155-
156-
// Compact superseded ops immediately
157-
if let (Some(first_op), Some(last_op)) = (&first_op, &last_op) {
158-
// language=SQLite
159-
let statement = db.prepare_v2("UPDATE ps_buckets
160-
SET add_checksum = add_checksum + (SELECT IFNULL(SUM(hash), 0)
161-
FROM ps_oplog AS oplog
162-
WHERE superseded = 1
163-
AND oplog.bucket = ?1
164-
AND oplog.op_id >= ?2
165-
AND oplog.op_id <= ?3)
166-
WHERE ps_buckets.name = ?1")?;
159+
let statement = db.prepare_v2(
160+
"UPDATE ps_buckets
161+
SET last_op = ?2,
162+
add_checksum = add_checksum + ?3
163+
WHERE name = ?1",
164+
)?;
167165
statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
168-
statement.bind_int64(2, *first_op)?;
169-
statement.bind_int64(3, *last_op)?;
170-
statement.exec()?;
166+
statement.bind_int64(2, *last_op)?;
167+
statement.bind_int(3, add_checksum)?;
171168

172-
// language=SQLite
173-
let statement = db.prepare_v2("DELETE
174-
FROM ps_oplog
175-
WHERE superseded = 1
176-
AND bucket = ?
177-
AND op_id >= ?
178-
AND op_id <= ?")?;
179-
statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
180-
statement.bind_int64(2, *first_op)?;
181-
statement.bind_int64(3, *last_op)?;
182169
statement.exec()?;
183170
}
184171

185172
Ok(())
186173
}
187174

188-
pub fn clear_remove_ops(
189-
db: *mut sqlite::sqlite3, _data: &str) -> Result<(), SQLiteError> {
190-
175+
pub fn clear_remove_ops(db: *mut sqlite::sqlite3, _data: &str) -> Result<(), SQLiteError> {
191176
// language=SQLite
192-
let statement = db.prepare_v2(
193-
"SELECT name, last_applied_op FROM ps_buckets WHERE pending_delete = 0")?;
177+
let statement =
178+
db.prepare_v2("SELECT name, last_applied_op FROM ps_buckets WHERE pending_delete = 0")?;
194179

195180
// language=SQLite
196-
let update_statement = db.prepare_v2("UPDATE ps_buckets
181+
let update_statement = db.prepare_v2(
182+
"UPDATE ps_buckets
197183
SET add_checksum = add_checksum + (SELECT IFNULL(SUM(hash), 0)
198184
FROM ps_oplog AS oplog
199185
WHERE (superseded = 1 OR op != 3)
200186
AND oplog.bucket = ?1
201187
AND oplog.op_id <= ?2)
202-
WHERE ps_buckets.name = ?1")?;
188+
WHERE ps_buckets.name = ?1",
189+
)?;
203190

204191
// language=SQLite
205-
let delete_statement = db.prepare_v2("DELETE
192+
let delete_statement = db.prepare_v2(
193+
"DELETE
206194
FROM ps_oplog
207195
WHERE (superseded = 1 OR op != 3)
208196
AND bucket = ?1
209-
AND op_id <= ?2")?;
210-
197+
AND op_id <= ?2",
198+
)?;
211199

212200
while statement.step()? == ResultCode::ROW {
213201
// Note: Each iteration here may be run in a separate transaction.
@@ -228,10 +216,7 @@ pub fn clear_remove_ops(
228216
Ok(())
229217
}
230218

231-
232-
pub fn delete_pending_buckets(
233-
db: *mut sqlite::sqlite3, _data: &str) -> Result<(), SQLiteError> {
234-
219+
pub fn delete_pending_buckets(db: *mut sqlite::sqlite3, _data: &str) -> Result<(), SQLiteError> {
235220
// language=SQLite
236221
let statement = db.prepare_v2(
237222
"DELETE FROM ps_oplog WHERE bucket IN (SELECT name FROM ps_buckets WHERE pending_delete = 1 AND last_applied_op = last_op AND last_op >= target_op)")?;
@@ -244,31 +229,27 @@ pub fn delete_pending_buckets(
244229
Ok(())
245230
}
246231

247-
248-
pub fn delete_bucket(
249-
db: *mut sqlite::sqlite3, name: &str) -> Result<(), SQLiteError> {
250-
232+
pub fn delete_bucket(db: *mut sqlite::sqlite3, name: &str) -> Result<(), SQLiteError> {
251233
let id = gen_uuid();
252234
let new_name = format!("$delete_{}_{}", name, id.hyphenated().to_string());
253235

254236
// language=SQLite
255237
let statement = db.prepare_v2(
256-
"UPDATE ps_oplog SET op=4, data=NULL, bucket=?1 WHERE op=3 AND superseded=0 AND bucket=?2")?;
238+
"UPDATE ps_oplog SET op=4, data=NULL, bucket=?1 WHERE op=3 AND superseded=0 AND bucket=?2",
239+
)?;
257240
statement.bind_text(1, &new_name, sqlite::Destructor::STATIC)?;
258241
statement.bind_text(2, &name, sqlite::Destructor::STATIC)?;
259242
statement.exec()?;
260243

261244
// Rename bucket
262245
// language=SQLite
263-
let statement = db.prepare_v2(
264-
"UPDATE ps_oplog SET bucket=?1 WHERE bucket=?2")?;
246+
let statement = db.prepare_v2("UPDATE ps_oplog SET bucket=?1 WHERE bucket=?2")?;
265247
statement.bind_text(1, &new_name, sqlite::Destructor::STATIC)?;
266248
statement.bind_text(2, name, sqlite::Destructor::STATIC)?;
267249
statement.exec()?;
268250

269251
// language=SQLite
270-
let statement = db.prepare_v2(
271-
"DELETE FROM ps_buckets WHERE name = ?1")?;
252+
let statement = db.prepare_v2("DELETE FROM ps_buckets WHERE name = ?1")?;
272253
statement.bind_text(1, name, sqlite::Destructor::STATIC)?;
273254
statement.exec()?;
274255

@@ -281,13 +262,8 @@ pub fn delete_bucket(
281262
Ok(())
282263
}
283264

284-
285-
pub fn stream_operation(
286-
db: *mut sqlite::sqlite3, data: &str) -> Result<(), SQLiteError> {
287-
265+
pub fn stream_operation(db: *mut sqlite::sqlite3, data: &str) -> Result<(), SQLiteError> {
288266
let line: StreamingSyncLine = serde_json::from_str(data)?;
289267

290268
Ok(())
291269
}
292-
293-

0 commit comments

Comments
 (0)