Skip to content

Commit f10c822

Browse files
committed
Optimize REMOVE operations in initial sync.
1 parent d857d07 commit f10c822

File tree

1 file changed

+30
-11
lines changed

1 file changed

+30
-11
lines changed

crates/core/src/operations.rs

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,23 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
7979
insert_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
8080

8181
// language=SQLite
82-
let bucket_statement = db.prepare_v2("INSERT OR IGNORE INTO ps_buckets(name) VALUES(?)")?;
82+
let bucket_statement = db.prepare_v2(
83+
"INSERT INTO ps_buckets(name)
84+
VALUES(?)
85+
ON CONFLICT DO UPDATE
86+
SET last_applied_op = last_applied_op
87+
RETURNING last_applied_op",
88+
)?;
8389
bucket_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
84-
bucket_statement.exec()?;
90+
bucket_statement.step()?;
91+
92+
// This is an optimization for initial sync - we can avoid persisting individual REMOVE
93+
// operations when last_applied_op = 0.
94+
// We do still need to do the "supersede_statement" step for this case, since a REMOVE
95+
// operation can supersede another PUT operation we're syncing at the same time.
96+
let mut is_empty = bucket_statement.column_int64(0)? == 0;
97+
98+
bucket_statement.reset()?;
8599

86100
let mut last_op: Option<i64> = None;
87101
let mut add_checksum: i32 = 0;
@@ -96,24 +110,28 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
96110

97111
last_op = Some(op_id);
98112

113+
let mut key: String = "".to_string();
114+
99115
if op == "PUT" || op == "REMOVE" {
100-
let key: String;
101116
if let (Ok(object_type), Ok(object_id)) = (object_type.as_ref(), object_id.as_ref()) {
102117
let subkey = iterate_statement.column_text(6).unwrap_or("null");
103-
key = format!("{}/{}/{}", &object_type, &object_id, subkey);
104-
supersede_statement.bind_text(2, &key, sqlite::Destructor::STATIC)?;
118+
let populated_key = format!("{}/{}/{}", &object_type, &object_id, subkey);
119+
120+
supersede_statement.bind_text(2, &populated_key, sqlite::Destructor::STATIC)?;
105121
supersede_statement.exec()?;
106-
} else {
107-
key = String::from("");
122+
123+
key = populated_key;
108124
}
125+
}
109126

127+
if op == "PUT" || (op == "REMOVE" && !is_empty) {
110128
let opi = if op == "PUT" { 3 } else { 4 };
111129
insert_statement.bind_int64(2, op_id)?;
112130
insert_statement.bind_int(3, opi)?;
113-
if key == "" {
114-
insert_statement.bind_null(4)?;
115-
} else {
131+
if key != "" {
116132
insert_statement.bind_text(4, &key, sqlite::Destructor::STATIC)?;
133+
} else {
134+
insert_statement.bind_null(4)?;
117135
}
118136

119137
if let (Ok(object_type), Ok(object_id)) = (object_type, object_id) {
@@ -131,7 +149,7 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
131149

132150
insert_statement.bind_int(8, checksum)?;
133151
insert_statement.exec()?;
134-
} else if op == "MOVE" {
152+
} else if op == "MOVE" || (is_empty && op == "REMOVE") {
135153
add_checksum = add_checksum.wrapping_add(checksum);
136154
} else if op == "CLEAR" {
137155
// Any remaining PUT operations should get an implicit REMOVE
@@ -151,6 +169,7 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
151169
clear_statement2.exec()?;
152170

153171
add_checksum = 0;
172+
is_empty = true;
154173
}
155174
}
156175

0 commit comments

Comments
 (0)