Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit eb92268

Browse files
authored
Revert "Same-thread frame injection (#695)" (#775)
This reverts commit ebb7ae9.
1 parent 6edda0e commit eb92268

File tree

22 files changed

+527
-979
lines changed

22 files changed

+527
-979
lines changed

Cargo.lock

Lines changed: 111 additions & 140 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sqld/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ hyper = { version = "0.14.23", features = ["http2"] }
3030
hyper-tungstenite = "0.10"
3131
itertools = "0.10.5"
3232
jsonwebtoken = "8.2.0"
33+
memmap = "0.7.0"
3334
mimalloc = { version = "0.1.36", default-features = false }
3435
nix = { version = "0.26.2", features = ["fs"] }
3536
once_cell = "1.17.0"

sqld/assets/test/simple_wallog

-28.2 KB
Binary file not shown.

sqld/build.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
77
std::env::set_var("PROTOC", protobuf_src::protoc());
88

99
let mut config = Config::new();
10-
config.bytes([".wal_log"]);
10+
config.bytes([".wal_log", ".proxy.ProgramReq.namespace"]);
1111
tonic_build::configure()
1212
.protoc_arg("--experimental_allow_proto3_optional")
1313
.type_attribute(".proxy", "#[cfg_attr(test, derive(arbitrary::Arbitrary))]")

sqld/proto/replication_log.proto

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,12 @@ message LogOffset {
88
message HelloRequest {}
99

1010
message HelloResponse {
11-
string log_id = 3;
11+
/// Uuid of the current generation
12+
string generation_id = 1;
13+
/// First frame_no in the current generation
14+
uint64 generation_start_index = 2;
15+
/// Uuid of the database being replicated
16+
string database_id = 3;
1217
}
1318

1419
message Frame {

sqld/src/error.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,6 @@ pub enum Error {
8181
ConflictingRestoreParameters,
8282
#[error("Failed to fork database: {0}")]
8383
Fork(#[from] ForkError),
84-
#[error("Fatal replication error")]
85-
FatalReplicationError,
8684
}
8785

8886
trait ResponseError: std::error::Error {
@@ -134,7 +132,6 @@ impl IntoResponse for Error {
134132
LoadDumpExistingDb => self.format_err(StatusCode::BAD_REQUEST),
135133
ConflictingRestoreParameters => self.format_err(StatusCode::BAD_REQUEST),
136134
Fork(e) => e.into_response(),
137-
FatalReplicationError => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
138135
}
139136
}
140137
}

sqld/src/namespace/fork.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use tokio::time::Duration;
1010
use tokio_stream::StreamExt;
1111

1212
use crate::database::PrimaryDatabase;
13-
use crate::replication::frame::FrameBorrowed;
13+
use crate::replication::frame::Frame;
1414
use crate::replication::primary::frame_stream::FrameStream;
1515
use crate::replication::{LogReadError, ReplicationLogger};
1616
use crate::{BLOCKING_RT, LIBSQL_PAGE_SIZE};
@@ -41,7 +41,7 @@ impl From<tokio::task::JoinError> for ForkError {
4141
}
4242
}
4343

44-
async fn write_frame(frame: &FrameBorrowed, temp_file: &mut tokio::fs::File) -> Result<()> {
44+
async fn write_frame(frame: Frame, temp_file: &mut tokio::fs::File) -> Result<()> {
4545
let page_no = frame.header().page_no;
4646
let page_pos = (page_no - 1) as usize * LIBSQL_PAGE_SIZE as usize;
4747
temp_file.seek(SeekFrom::Start(page_pos as u64)).await?;
@@ -128,7 +128,7 @@ impl ForkTask<'_> {
128128
match res {
129129
Ok(frame) => {
130130
next_frame_no = next_frame_no.max(frame.header().frame_no + 1);
131-
write_frame(&frame, &mut data_file).await?;
131+
write_frame(frame, &mut data_file).await?;
132132
}
133133
Err(LogReadError::SnapshotRequired) => {
134134
let snapshot = loop {
@@ -147,7 +147,7 @@ impl ForkTask<'_> {
147147
for frame in iter {
148148
let frame = frame.map_err(ForkError::LogRead)?;
149149
next_frame_no = next_frame_no.max(frame.header().frame_no + 1);
150-
write_frame(&frame, &mut data_file).await?;
150+
write_frame(frame, &mut data_file).await?;
151151
}
152152
}
153153
Err(LogReadError::Ahead) => {

sqld/src/namespace/mod.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -561,27 +561,30 @@ impl Namespace<ReplicaDatabase> {
561561
DatabaseConfigStore::load(&db_path).context("Could not load database config")?,
562562
);
563563

564+
let mut join_set = JoinSet::new();
564565
let replicator = Replicator::new(
565566
db_path.clone(),
566567
config.channel.clone(),
567568
config.uri.clone(),
568569
name.clone(),
570+
&mut join_set,
569571
reset,
570572
)
571573
.await?;
572-
let applied_frame_no_receiver = replicator.current_frame_no_notifier.subscribe();
573-
let mut join_set = JoinSet::new();
574-
join_set.spawn(replicator.run());
574+
575+
let applied_frame_no_receiver = replicator.current_frame_no_notifier.clone();
575576

576577
let stats = make_stats(
577578
&db_path,
578579
&mut join_set,
579580
config.stats_sender.clone(),
580581
name.clone(),
581-
applied_frame_no_receiver.clone(),
582+
replicator.current_frame_no_notifier.clone(),
582583
)
583584
.await?;
584585

586+
join_set.spawn(replicator.run());
587+
585588
let connection_maker = MakeWriteProxyConn::new(
586589
db_path.clone(),
587590
config.extensions.clone(),

sqld/src/replication/frame.rs

Lines changed: 38 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
use std::alloc::Layout;
1+
use std::borrow::Cow;
22
use std::fmt;
3-
use std::mem::size_of;
4-
use std::ops::{Deref, DerefMut};
3+
use std::mem::{size_of, transmute};
4+
use std::ops::Deref;
55

6-
use bytemuck::{bytes_of, from_bytes, Pod, Zeroable};
7-
use bytes::Bytes;
6+
use bytemuck::{bytes_of, pod_read_unaligned, try_from_bytes, Pod, Zeroable};
7+
use bytes::{Bytes, BytesMut};
88

99
use crate::LIBSQL_PAGE_SIZE;
1010

@@ -28,18 +28,10 @@ pub struct FrameHeader {
2828
}
2929

3030
#[derive(Clone, serde::Serialize, serde::Deserialize)]
31-
/// The shared version of a replication frame.
31+
/// The owned version of a replication frame.
3232
/// Cloning this is cheap.
3333
pub struct Frame {
34-
inner: Bytes,
35-
}
36-
37-
impl TryFrom<&[u8]> for Frame {
38-
type Error = anyhow::Error;
39-
40-
fn try_from(data: &[u8]) -> Result<Self, Self::Error> {
41-
Ok(FrameMut::try_from(data)?.into())
42-
}
34+
data: Bytes,
4335
}
4436

4537
impl fmt::Debug for Frame {
@@ -51,129 +43,64 @@ impl fmt::Debug for Frame {
5143
}
5244
}
5345

54-
/// Owned version of a frame, on the heap
55-
pub struct FrameMut {
56-
inner: Box<FrameBorrowed>,
57-
}
58-
59-
impl TryFrom<&[u8]> for FrameMut {
60-
type Error = anyhow::Error;
61-
62-
fn try_from(data: &[u8]) -> Result<Self, Self::Error> {
63-
anyhow::ensure!(
64-
data.len() == size_of::<FrameBorrowed>(),
65-
"invalid frame size"
66-
);
67-
// frames are relatively large (~4ko), we want to avoid allocating them on the stack and
68-
// then copying them to the heap, and instead copy them to the heap directly.
69-
let inner = unsafe {
70-
let layout = Layout::new::<FrameBorrowed>();
71-
let ptr = std::alloc::alloc(layout);
72-
ptr.copy_from(data.as_ptr(), data.len());
73-
Box::from_raw(ptr as *mut FrameBorrowed)
74-
};
75-
76-
Ok(Self { inner })
77-
}
78-
}
46+
impl Frame {
47+
/// size of a single frame
48+
pub const SIZE: usize = size_of::<FrameHeader>() + LIBSQL_PAGE_SIZE as usize;
7949

80-
impl From<FrameMut> for Frame {
81-
fn from(value: FrameMut) -> Self {
82-
// transmute the FrameBorrowed into a Box<[u8; _]>. This is safe because the alignment of
83-
// [u8] divides the alignment of FrameBorrowed
84-
let data = unsafe {
85-
Vec::from_raw_parts(
86-
Box::into_raw(value.inner) as *mut u8,
87-
size_of::<FrameBorrowed>(),
88-
size_of::<FrameBorrowed>(),
89-
)
90-
};
91-
92-
Self {
93-
inner: Bytes::from(data),
94-
}
95-
}
96-
}
50+
pub fn from_parts(header: &FrameHeader, data: &[u8]) -> Self {
51+
assert_eq!(data.len(), LIBSQL_PAGE_SIZE as usize);
52+
let mut buf = BytesMut::with_capacity(Self::SIZE);
53+
buf.extend_from_slice(bytes_of(header));
54+
buf.extend_from_slice(data);
9755

98-
impl From<FrameBorrowed> for FrameMut {
99-
fn from(inner: FrameBorrowed) -> Self {
100-
Self {
101-
inner: Box::new(inner),
102-
}
56+
Self { data: buf.freeze() }
10357
}
104-
}
10558

106-
impl Frame {
107-
pub fn from_parts(header: &FrameHeader, data: &[u8]) -> Self {
108-
FrameBorrowed::from_parts(header, data).into()
59+
pub fn try_from_bytes(data: Bytes) -> anyhow::Result<Self> {
60+
anyhow::ensure!(data.len() == Self::SIZE, "invalid frame size");
61+
Ok(Self { data })
10962
}
11063

11164
pub fn bytes(&self) -> Bytes {
112-
self.inner.clone()
113-
}
114-
}
115-
116-
impl From<FrameBorrowed> for Frame {
117-
fn from(value: FrameBorrowed) -> Self {
118-
FrameMut::from(value).into()
65+
self.data.clone()
11966
}
12067
}
12168

12269
/// The borrowed version of Frame
123-
#[repr(C)]
124-
#[derive(Pod, Zeroable, Copy, Clone)]
70+
#[repr(transparent)]
12571
pub struct FrameBorrowed {
126-
header: FrameHeader,
127-
page: [u8; LIBSQL_PAGE_SIZE as usize],
72+
data: [u8],
12873
}
12974

13075
impl FrameBorrowed {
131-
/// Returns the bytes for this frame. Includes the header bytes.
132-
pub fn as_slice(&self) -> &[u8] {
133-
bytes_of(self)
134-
}
135-
136-
/// returns this frame's page data.
137-
pub fn page(&self) -> &[u8] {
138-
&self.page
76+
pub fn header(&self) -> Cow<FrameHeader> {
77+
let data = &self.data[..size_of::<FrameHeader>()];
78+
try_from_bytes(data)
79+
.map(Cow::Borrowed)
80+
.unwrap_or_else(|_| Cow::Owned(pod_read_unaligned(data)))
13981
}
14082

141-
pub fn header(&self) -> &FrameHeader {
142-
&self.header
83+
/// Returns the bytes for this frame. Includes the header bytes.
84+
pub fn as_slice(&self) -> &[u8] {
85+
&self.data
14386
}
14487

145-
pub fn header_mut(&mut self) -> &mut FrameHeader {
146-
&mut self.header
88+
pub fn from_bytes(data: &[u8]) -> &Self {
89+
assert_eq!(data.len(), Frame::SIZE);
90+
// SAFETY: &FrameBorrowed is equivalent to &[u8]
91+
unsafe { transmute(data) }
14792
}
14893

149-
pub fn from_parts(header: &FrameHeader, page: &[u8]) -> Self {
150-
assert_eq!(page.len(), LIBSQL_PAGE_SIZE as usize);
151-
152-
FrameBorrowed {
153-
header: *header,
154-
page: page.try_into().unwrap(),
155-
}
94+
/// returns this frame's page data.
95+
pub fn page(&self) -> &[u8] {
96+
&self.data[size_of::<FrameHeader>()..]
15697
}
15798
}
15899

159100
impl Deref for Frame {
160101
type Target = FrameBorrowed;
161102

162103
fn deref(&self) -> &Self::Target {
163-
from_bytes(&self.inner)
164-
}
165-
}
166-
167-
impl Deref for FrameMut {
168-
type Target = FrameBorrowed;
169-
170-
fn deref(&self) -> &Self::Target {
171-
self.inner.as_ref()
172-
}
173-
}
174-
175-
impl DerefMut for FrameMut {
176-
fn deref_mut(&mut self) -> &mut Self::Target {
177-
self.inner.as_mut()
104+
FrameBorrowed::from_bytes(&self.data)
178105
}
179106
}

0 commit comments

Comments
 (0)