Skip to content

Commit e462f3e

Browse files
authored
Fix null encoding in aggregates. Fix 2pc with extended begin stmts (#447)
* Fix null encoding in aggregates. Fix 2pc with extended begin stmts * Aha!
1 parent febdfb9 commit e462f3e

File tree

12 files changed

+156
-47
lines changed

12 files changed

+156
-47
lines changed

integration/java/pgdog.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ void run() throws Exception {
124124
ResultSet rs = st.executeQuery("SELECT COUNT(*) as count FROM sharded");
125125
rs.next();
126126
assert_equals(rs.getInt("count"), 0);
127+
this.connection.rollback();
127128

128129
st.execute("INSERT INTO sharded (id, value) VALUES (1, 'test1')");
129130
st.execute("INSERT INTO sharded (id, value) VALUES (2, 'test2')");
@@ -137,6 +138,7 @@ void run() throws Exception {
137138
rs = st.executeQuery("SELECT COUNT(*) as count FROM sharded");
138139
rs.next();
139140
assert_equals(rs.getInt("count"), 0);
141+
this.connection.rollback();
140142

141143
st.execute("INSERT INTO sharded (id, value) VALUES (3, 'test3')");
142144
st.execute("INSERT INTO sharded (id, value) VALUES (4, 'test4')");
@@ -179,6 +181,7 @@ void run() throws Exception {
179181
ResultSet rs = countStmt.executeQuery();
180182
rs.next();
181183
assert_equals(rs.getInt("count"), 0);
184+
this.connection.rollback();
182185

183186
// Insert records using prepared statements
184187
insertStmt.setInt(1, 1);
@@ -208,6 +211,7 @@ void run() throws Exception {
208211
rs = countStmt.executeQuery();
209212
rs.next();
210213
assert_equals(rs.getInt("count"), 0);
214+
this.connection.rollback();
211215

212216
// Insert more records and commit
213217
insertStmt.setInt(1, 3);
@@ -224,6 +228,7 @@ void run() throws Exception {
224228
rs = countStmt.executeQuery();
225229
rs.next();
226230
assert_equals(rs.getInt("count"), 2);
231+
this.connection.rollback();
227232

228233
// Verify committed records
229234
selectStmt.setInt(1, 3);

pgdog/src/backend/pool/connection/aggregate.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,12 +147,17 @@ impl<'a> Aggregates<'a> {
147147
//
148148
let mut row = DataRow::new();
149149
for (idx, datum) in grouping.columns {
150-
row.insert(idx, datum.encode(self.decoder.format(idx))?);
150+
row.insert(
151+
idx,
152+
datum.encode(self.decoder.format(idx))?,
153+
datum.is_null(),
154+
);
151155
}
152156
for acc in accumulator {
153157
row.insert(
154158
acc.target.column(),
155159
acc.datum.encode(self.decoder.format(acc.target.column()))?,
160+
acc.datum.is_null(),
156161
);
157162
}
158163
rows.push_back(row);

pgdog/src/frontend/client/query_engine/end_transaction.rs

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,28 @@ impl QueryEngine {
77
&mut self,
88
context: &mut QueryEngineContext<'_>,
99
rollback: bool,
10+
extended: bool,
1011
) -> Result<(), Error> {
11-
let cmd = if rollback {
12-
CommandComplete::new_rollback()
12+
let bytes_sent = if extended {
13+
self.extended_transaction_reply(context, false, rollback)
14+
.await?
1315
} else {
14-
CommandComplete::new_commit()
16+
let cmd = if rollback {
17+
CommandComplete::new_rollback()
18+
} else {
19+
CommandComplete::new_commit()
20+
};
21+
let mut messages = if !context.in_transaction() {
22+
vec![NoticeResponse::from(ErrorResponse::no_transaction()).message()?]
23+
} else {
24+
vec![]
25+
};
26+
messages.push(cmd.message()?.backend());
27+
messages.push(ReadyForQuery::idle().message()?);
28+
29+
context.stream.send_many(&messages).await?
1530
};
16-
let mut messages = if !context.in_transaction() {
17-
vec![NoticeResponse::from(ErrorResponse::no_transaction()).message()?]
18-
} else {
19-
vec![]
20-
};
21-
messages.push(cmd.message()?.backend());
22-
messages.push(ReadyForQuery::idle().message()?);
2331

24-
let bytes_sent = context.stream.send_many(&messages).await?;
2532
self.stats.sent(bytes_sent);
2633
self.begin_stmt = None;
2734
context.transaction = None; // Clear transaction state
@@ -34,6 +41,7 @@ impl QueryEngine {
3441
context: &mut QueryEngineContext<'_>,
3542
route: &Route,
3643
rollback: bool,
44+
extended: bool,
3745
) -> Result<(), Error> {
3846
let cluster = self.backend.cluster()?;
3947

@@ -54,7 +62,7 @@ impl QueryEngine {
5462
self.cleanup_backend(context);
5563

5664
// Tell client we finished the transaction.
57-
self.end_not_connected(context, false).await?;
65+
self.end_not_connected(context, false, extended).await?;
5866
} else {
5967
self.execute(context, route).await?;
6068
}
@@ -105,7 +113,7 @@ mod tests {
105113
let mut engine = QueryEngine::default();
106114
// state copied from client
107115
let mut context = QueryEngineContext::new(&mut client);
108-
let result = engine.end_not_connected(&mut context, false).await;
116+
let result = engine.end_not_connected(&mut context, false, false).await;
109117
assert!(result.is_ok(), "end_transaction should succeed");
110118

111119
assert_eq!(

pgdog/src/frontend/client/query_engine/mod.rs

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -134,39 +134,35 @@ impl QueryEngine {
134134
transaction_type,
135135
extended,
136136
} => {
137-
if *extended {
138-
// Transaction control goes to all shards.
139-
context.cross_shard_disabled = Some(false);
140-
self.execute(context, &route).await?;
141-
} else {
142-
self.start_transaction(context, query.clone(), *transaction_type)
143-
.await?
144-
}
137+
self.start_transaction(context, query.clone(), *transaction_type, *extended)
138+
.await?
145139
}
146140
Command::CommitTransaction { extended } => {
147141
self.set_route = None;
148142

149143
if self.backend.connected() || *extended {
144+
let extended = *extended;
150145
let transaction_route = self.transaction_route(&route)?;
151146
context.client_request.route = Some(transaction_route.clone());
152147
context.cross_shard_disabled = Some(false);
153-
self.end_connected(context, &transaction_route, false)
148+
self.end_connected(context, &transaction_route, false, extended)
154149
.await?;
155150
} else {
156-
self.end_not_connected(context, false).await?
151+
self.end_not_connected(context, false, *extended).await?
157152
}
158153
}
159154
Command::RollbackTransaction { extended } => {
160155
self.set_route = None;
161156

162157
if self.backend.connected() || *extended {
158+
let extended = *extended;
163159
let transaction_route = self.transaction_route(&route)?;
164160
context.client_request.route = Some(transaction_route.clone());
165161
context.cross_shard_disabled = Some(false);
166-
self.end_connected(context, &transaction_route, true)
162+
self.end_connected(context, &transaction_route, true, extended)
167163
.await?;
168164
} else {
169-
self.end_not_connected(context, true).await?
165+
self.end_not_connected(context, true, *extended).await?
170166
}
171167
}
172168
Command::Query(_) => self.execute(context, &route).await?,
Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
22
frontend::client::TransactionType,
3-
net::{CommandComplete, Protocol, ReadyForQuery},
3+
net::{BindComplete, CommandComplete, NoticeResponse, ParseComplete, Protocol, ReadyForQuery},
44
};
55

66
use super::*;
@@ -12,20 +12,59 @@ impl QueryEngine {
1212
context: &mut QueryEngineContext<'_>,
1313
begin: BufferedQuery,
1414
transaction_type: TransactionType,
15+
extended: bool,
1516
) -> Result<(), Error> {
1617
context.transaction = Some(transaction_type);
1718

18-
let bytes_sent = context
19-
.stream
20-
.send_many(&[
21-
CommandComplete::new_begin().message()?.backend(),
22-
ReadyForQuery::in_transaction(context.in_transaction()).message()?,
23-
])
24-
.await?;
19+
let bytes_sent = if extended {
20+
self.extended_transaction_reply(context, true, false)
21+
.await?
22+
} else {
23+
context
24+
.stream
25+
.send_many(&[
26+
CommandComplete::new_begin().message()?.backend(),
27+
ReadyForQuery::in_transaction(context.in_transaction()).message()?,
28+
])
29+
.await?
30+
};
2531

2632
self.stats.sent(bytes_sent);
2733
self.begin_stmt = Some(begin);
2834

2935
Ok(())
3036
}
37+
38+
pub(super) async fn extended_transaction_reply(
39+
&self,
40+
context: &mut QueryEngineContext<'_>,
41+
in_transaction: bool,
42+
rollback: bool,
43+
) -> Result<usize, Error> {
44+
let mut reply = vec![];
45+
for message in context.client_request.iter() {
46+
match message.code() {
47+
'P' => reply.push(ParseComplete.message()?),
48+
'B' => reply.push(BindComplete.message()?),
49+
'D' | 'H' => (),
50+
'E' => reply.push(if in_transaction {
51+
CommandComplete::new_begin().message()?.backend()
52+
} else if !rollback {
53+
CommandComplete::new_commit().message()?.backend()
54+
} else {
55+
CommandComplete::new_rollback().message()?.backend()
56+
}),
57+
'S' => {
58+
if rollback && !context.in_transaction() {
59+
reply
60+
.push(NoticeResponse::from(ErrorResponse::no_transaction()).message()?);
61+
}
62+
reply.push(ReadyForQuery::in_transaction(in_transaction).message()?)
63+
}
64+
c => return Err(Error::UnexpectedMessage(c)),
65+
}
66+
}
67+
68+
Ok(context.stream.send_many(&reply).await?)
69+
}
3170
}

pgdog/src/net/error.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,15 @@ pub enum Error {
2525
#[error("unexpected payload")]
2626
UnexpectedPayload,
2727

28+
#[error("data type not supported for encoding")]
29+
UnsupportedDataTypeForEncoding,
30+
31+
#[error("CommandComplete contains no row counts")]
32+
CommandCompleteNoRows,
33+
34+
#[error("unexpected replication meta message: {0}")]
35+
UnexpectedReplicationMetaMessage(char),
36+
2837
#[error("unsupported authentication: {0}")]
2938
UnsupportedAuthentication(i32),
3039

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
//! BindComplete (B) message.
2+
use super::code;
3+
use super::prelude::*;
4+
5+
#[derive(Debug, Clone)]
6+
pub struct BindComplete;
7+
8+
impl FromBytes for BindComplete {
9+
fn from_bytes(mut bytes: Bytes) -> Result<Self, Error> {
10+
code!(bytes, '2');
11+
let _len = bytes.get_i32();
12+
Ok(Self)
13+
}
14+
}
15+
16+
impl ToBytes for BindComplete {
17+
fn to_bytes(&self) -> Result<Bytes, Error> {
18+
let payload = Payload::named(self.code());
19+
Ok(payload.freeze())
20+
}
21+
}
22+
23+
impl Protocol for BindComplete {
24+
fn code(&self) -> char {
25+
'2'
26+
}
27+
}

pgdog/src/net/messages/command_complete.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ impl CommandComplete {
2828
.command()
2929
.split(" ")
3030
.last()
31-
.ok_or(Error::UnexpectedPayload)?
31+
.ok_or(Error::CommandCompleteNoRows)?
3232
.parse()
3333
.ok())
3434
}

pgdog/src/net/messages/data_row.rs

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -150,11 +150,18 @@ impl DataRow {
150150

151151
/// Insert column at index. If row is smaller than index,
152152
/// columns will be prefilled with NULLs.
153-
pub fn insert(&mut self, index: usize, value: impl ToDataRowColumn) -> &mut Self {
153+
pub fn insert(
154+
&mut self,
155+
index: usize,
156+
value: impl ToDataRowColumn,
157+
is_null: bool,
158+
) -> &mut Self {
154159
while self.columns.len() <= index {
155160
self.columns.push(Data::null());
156161
}
157-
self.columns[index] = value.to_data_row_column();
162+
let mut data = value.to_data_row_column();
163+
data.is_null = is_null;
164+
self.columns[index] = data;
158165
self
159166
}
160167

@@ -202,10 +209,15 @@ impl DataRow {
202209
decoder: &'a Decoder,
203210
) -> Result<Option<Column<'a>>, Error> {
204211
if let Some(field) = decoder.rd().field(index) {
205-
if let Some(data) = self.column(index) {
212+
if let Some(data) = self.columns.get(index) {
206213
return Ok(Some(Column {
207214
name: field.name.as_str(),
208-
value: Datum::new(&data, field.data_type(), decoder.format(index))?,
215+
value: Datum::new(
216+
&data.data,
217+
field.data_type(),
218+
decoder.format(index),
219+
data.is_null,
220+
)?,
209221
}));
210222
}
211223
}
@@ -218,10 +230,10 @@ impl DataRow {
218230
let mut row = vec![];
219231

220232
for (index, field) in rd.fields.iter().enumerate() {
221-
if let Some(data) = self.column(index) {
233+
if let Some(data) = self.columns.get(index) {
222234
row.push(Column {
223235
name: field.name.as_str(),
224-
value: Datum::new(&data, field.data_type(), field.format())?,
236+
value: Datum::new(&data.data, field.data_type(), field.format(), data.is_null)?,
225237
});
226238
}
227239
}
@@ -302,7 +314,7 @@ mod test {
302314
#[test]
303315
fn test_insert() {
304316
let mut dr = DataRow::new();
305-
dr.insert(4, "test");
317+
dr.insert(4, "test", false);
306318
assert_eq!(dr.columns.len(), 5);
307319
assert_eq!(dr.get::<String>(4, Format::Text).unwrap(), "test");
308320
assert_eq!(dr.get::<String>(0, Format::Text).unwrap(), "");

pgdog/src/net/messages/data_types/mod.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,13 @@ impl Add for Datum {
9999
}
100100

101101
impl Datum {
102-
pub fn new(bytes: &[u8], data_type: DataType, encoding: Format) -> Result<Self, Error> {
103-
if bytes.is_empty() {
102+
pub fn new(
103+
bytes: &[u8],
104+
data_type: DataType,
105+
encoding: Format,
106+
null: bool,
107+
) -> Result<Self, Error> {
108+
if null {
104109
return Ok(Datum::Null);
105110
}
106111

@@ -132,7 +137,8 @@ impl Datum {
132137
Datum::Uuid(uuid) => uuid.encode(format),
133138
Datum::Text(s) => s.encode(format),
134139
Datum::Boolean(b) => b.encode(format),
135-
_ => Err(Error::UnexpectedPayload),
140+
Datum::Null => Ok(Bytes::new()),
141+
_ => Err(Error::UnsupportedDataTypeForEncoding),
136142
}
137143
}
138144
}

0 commit comments

Comments
 (0)