Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions integration/java/pgdog.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ void run() throws Exception {
ResultSet rs = st.executeQuery("SELECT COUNT(*) as count FROM sharded");
rs.next();
assert_equals(rs.getInt("count"), 0);
this.connection.rollback();

st.execute("INSERT INTO sharded (id, value) VALUES (1, 'test1')");
st.execute("INSERT INTO sharded (id, value) VALUES (2, 'test2')");
Expand All @@ -137,6 +138,7 @@ void run() throws Exception {
rs = st.executeQuery("SELECT COUNT(*) as count FROM sharded");
rs.next();
assert_equals(rs.getInt("count"), 0);
this.connection.rollback();

st.execute("INSERT INTO sharded (id, value) VALUES (3, 'test3')");
st.execute("INSERT INTO sharded (id, value) VALUES (4, 'test4')");
Expand Down Expand Up @@ -179,6 +181,7 @@ void run() throws Exception {
ResultSet rs = countStmt.executeQuery();
rs.next();
assert_equals(rs.getInt("count"), 0);
this.connection.rollback();

// Insert records using prepared statements
insertStmt.setInt(1, 1);
Expand Down Expand Up @@ -208,6 +211,7 @@ void run() throws Exception {
rs = countStmt.executeQuery();
rs.next();
assert_equals(rs.getInt("count"), 0);
this.connection.rollback();

// Insert more records and commit
insertStmt.setInt(1, 3);
Expand All @@ -224,6 +228,7 @@ void run() throws Exception {
rs = countStmt.executeQuery();
rs.next();
assert_equals(rs.getInt("count"), 2);
this.connection.rollback();

// Verify committed records
selectStmt.setInt(1, 3);
Expand Down
7 changes: 6 additions & 1 deletion pgdog/src/backend/pool/connection/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,17 @@ impl<'a> Aggregates<'a> {
//
let mut row = DataRow::new();
for (idx, datum) in grouping.columns {
row.insert(idx, datum.encode(self.decoder.format(idx))?);
row.insert(
idx,
datum.encode(self.decoder.format(idx))?,
datum.is_null(),
);
}
for acc in accumulator {
row.insert(
acc.target.column(),
acc.datum.encode(self.decoder.format(acc.target.column()))?,
acc.datum.is_null(),
);
}
rows.push_back(row);
Expand Down
34 changes: 21 additions & 13 deletions pgdog/src/frontend/client/query_engine/end_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,28 @@ impl QueryEngine {
&mut self,
context: &mut QueryEngineContext<'_>,
rollback: bool,
extended: bool,
) -> Result<(), Error> {
let cmd = if rollback {
CommandComplete::new_rollback()
let bytes_sent = if extended {
self.extended_transaction_reply(context, false, rollback)
.await?
} else {
CommandComplete::new_commit()
let cmd = if rollback {
CommandComplete::new_rollback()
} else {
CommandComplete::new_commit()
};
let mut messages = if !context.in_transaction() {
vec![NoticeResponse::from(ErrorResponse::no_transaction()).message()?]
} else {
vec![]
};
messages.push(cmd.message()?.backend());
messages.push(ReadyForQuery::idle().message()?);

context.stream.send_many(&messages).await?
};
let mut messages = if !context.in_transaction() {
vec![NoticeResponse::from(ErrorResponse::no_transaction()).message()?]
} else {
vec![]
};
messages.push(cmd.message()?.backend());
messages.push(ReadyForQuery::idle().message()?);

let bytes_sent = context.stream.send_many(&messages).await?;
self.stats.sent(bytes_sent);
self.begin_stmt = None;
context.transaction = None; // Clear transaction state
Expand All @@ -34,6 +41,7 @@ impl QueryEngine {
context: &mut QueryEngineContext<'_>,
route: &Route,
rollback: bool,
extended: bool,
) -> Result<(), Error> {
let cluster = self.backend.cluster()?;

Expand All @@ -54,7 +62,7 @@ impl QueryEngine {
self.cleanup_backend(context);

// Tell client we finished the transaction.
self.end_not_connected(context, false).await?;
self.end_not_connected(context, false, extended).await?;
} else {
self.execute(context, route).await?;
}
Expand Down Expand Up @@ -105,7 +113,7 @@ mod tests {
let mut engine = QueryEngine::default();
// state copied from client
let mut context = QueryEngineContext::new(&mut client);
let result = engine.end_not_connected(&mut context, false).await;
let result = engine.end_not_connected(&mut context, false, false).await;
assert!(result.is_ok(), "end_transaction should succeed");

assert_eq!(
Expand Down
20 changes: 8 additions & 12 deletions pgdog/src/frontend/client/query_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,39 +134,35 @@ impl QueryEngine {
transaction_type,
extended,
} => {
if *extended {
// Transaction control goes to all shards.
context.cross_shard_disabled = Some(false);
self.execute(context, &route).await?;
} else {
self.start_transaction(context, query.clone(), *transaction_type)
.await?
}
self.start_transaction(context, query.clone(), *transaction_type, *extended)
.await?
}
Command::CommitTransaction { extended } => {
self.set_route = None;

if self.backend.connected() || *extended {
let extended = *extended;
let transaction_route = self.transaction_route(&route)?;
context.client_request.route = Some(transaction_route.clone());
context.cross_shard_disabled = Some(false);
self.end_connected(context, &transaction_route, false)
self.end_connected(context, &transaction_route, false, extended)
.await?;
} else {
self.end_not_connected(context, false).await?
self.end_not_connected(context, false, *extended).await?
}
}
Command::RollbackTransaction { extended } => {
self.set_route = None;

if self.backend.connected() || *extended {
let extended = *extended;
let transaction_route = self.transaction_route(&route)?;
context.client_request.route = Some(transaction_route.clone());
context.cross_shard_disabled = Some(false);
self.end_connected(context, &transaction_route, true)
self.end_connected(context, &transaction_route, true, extended)
.await?;
} else {
self.end_not_connected(context, true).await?
self.end_not_connected(context, true, *extended).await?
}
}
Command::Query(_) => self.execute(context, &route).await?,
Expand Down
55 changes: 47 additions & 8 deletions pgdog/src/frontend/client/query_engine/start_transaction.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
frontend::client::TransactionType,
net::{CommandComplete, Protocol, ReadyForQuery},
net::{BindComplete, CommandComplete, NoticeResponse, ParseComplete, Protocol, ReadyForQuery},
};

use super::*;
Expand All @@ -12,20 +12,59 @@ impl QueryEngine {
context: &mut QueryEngineContext<'_>,
begin: BufferedQuery,
transaction_type: TransactionType,
extended: bool,
) -> Result<(), Error> {
context.transaction = Some(transaction_type);

let bytes_sent = context
.stream
.send_many(&[
CommandComplete::new_begin().message()?.backend(),
ReadyForQuery::in_transaction(context.in_transaction()).message()?,
])
.await?;
let bytes_sent = if extended {
self.extended_transaction_reply(context, true, false)
.await?
} else {
context
.stream
.send_many(&[
CommandComplete::new_begin().message()?.backend(),
ReadyForQuery::in_transaction(context.in_transaction()).message()?,
])
.await?
};

self.stats.sent(bytes_sent);
self.begin_stmt = Some(begin);

Ok(())
}

pub(super) async fn extended_transaction_reply(
&self,
context: &mut QueryEngineContext<'_>,
in_transaction: bool,
rollback: bool,
) -> Result<usize, Error> {
let mut reply = vec![];
for message in context.client_request.iter() {
match message.code() {
'P' => reply.push(ParseComplete.message()?),
'B' => reply.push(BindComplete.message()?),
'D' | 'H' => (),
'E' => reply.push(if in_transaction {
CommandComplete::new_begin().message()?.backend()
} else if !rollback {
CommandComplete::new_commit().message()?.backend()
} else {
CommandComplete::new_rollback().message()?.backend()
}),
'S' => {
if rollback && !context.in_transaction() {
reply
.push(NoticeResponse::from(ErrorResponse::no_transaction()).message()?);
}
reply.push(ReadyForQuery::in_transaction(in_transaction).message()?)
}
c => return Err(Error::UnexpectedMessage(c)),
}
}

Ok(context.stream.send_many(&reply).await?)
}
}
9 changes: 9 additions & 0 deletions pgdog/src/net/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ pub enum Error {
#[error("unexpected payload")]
UnexpectedPayload,

#[error("data type not supported for encoding")]
UnsupportedDataTypeForEncoding,

#[error("CommandComplete contains no row counts")]
CommandCompleteNoRows,

#[error("unexpected replication meta message: {0}")]
UnexpectedReplicationMetaMessage(char),

#[error("unsupported authentication: {0}")]
UnsupportedAuthentication(i32),

Expand Down
27 changes: 27 additions & 0 deletions pgdog/src/net/messages/bind_complete.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
//! BindComplete (B) message.
use super::code;
use super::prelude::*;

#[derive(Debug, Clone)]
pub struct BindComplete;

impl FromBytes for BindComplete {
fn from_bytes(mut bytes: Bytes) -> Result<Self, Error> {
code!(bytes, '2');
let _len = bytes.get_i32();
Ok(Self)
}
}

impl ToBytes for BindComplete {
fn to_bytes(&self) -> Result<Bytes, Error> {
let payload = Payload::named(self.code());
Ok(payload.freeze())
}
}

impl Protocol for BindComplete {
fn code(&self) -> char {
'2'
}
}
2 changes: 1 addition & 1 deletion pgdog/src/net/messages/command_complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl CommandComplete {
.command()
.split(" ")
.last()
.ok_or(Error::UnexpectedPayload)?
.ok_or(Error::CommandCompleteNoRows)?
.parse()
.ok())
}
Expand Down
26 changes: 19 additions & 7 deletions pgdog/src/net/messages/data_row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,18 @@ impl DataRow {

/// Insert column at index. If row is smaller than index,
/// columns will be prefilled with NULLs.
pub fn insert(&mut self, index: usize, value: impl ToDataRowColumn) -> &mut Self {
pub fn insert(
&mut self,
index: usize,
value: impl ToDataRowColumn,
is_null: bool,
) -> &mut Self {
while self.columns.len() <= index {
self.columns.push(Data::null());
}
self.columns[index] = value.to_data_row_column();
let mut data = value.to_data_row_column();
data.is_null = is_null;
self.columns[index] = data;
self
}

Expand Down Expand Up @@ -202,10 +209,15 @@ impl DataRow {
decoder: &'a Decoder,
) -> Result<Option<Column<'a>>, Error> {
if let Some(field) = decoder.rd().field(index) {
if let Some(data) = self.column(index) {
if let Some(data) = self.columns.get(index) {
return Ok(Some(Column {
name: field.name.as_str(),
value: Datum::new(&data, field.data_type(), decoder.format(index))?,
value: Datum::new(
&data.data,
field.data_type(),
decoder.format(index),
data.is_null,
)?,
}));
}
}
Expand All @@ -218,10 +230,10 @@ impl DataRow {
let mut row = vec![];

for (index, field) in rd.fields.iter().enumerate() {
if let Some(data) = self.column(index) {
if let Some(data) = self.columns.get(index) {
row.push(Column {
name: field.name.as_str(),
value: Datum::new(&data, field.data_type(), field.format())?,
value: Datum::new(&data.data, field.data_type(), field.format(), data.is_null)?,
});
}
}
Expand Down Expand Up @@ -302,7 +314,7 @@ mod test {
#[test]
fn test_insert() {
let mut dr = DataRow::new();
dr.insert(4, "test");
dr.insert(4, "test", false);
assert_eq!(dr.columns.len(), 5);
assert_eq!(dr.get::<String>(4, Format::Text).unwrap(), "test");
assert_eq!(dr.get::<String>(0, Format::Text).unwrap(), "");
Expand Down
12 changes: 9 additions & 3 deletions pgdog/src/net/messages/data_types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,13 @@ impl Add for Datum {
}

impl Datum {
pub fn new(bytes: &[u8], data_type: DataType, encoding: Format) -> Result<Self, Error> {
if bytes.is_empty() {
pub fn new(
bytes: &[u8],
data_type: DataType,
encoding: Format,
null: bool,
) -> Result<Self, Error> {
if null {
return Ok(Datum::Null);
}

Expand Down Expand Up @@ -132,7 +137,8 @@ impl Datum {
Datum::Uuid(uuid) => uuid.encode(format),
Datum::Text(s) => s.encode(format),
Datum::Boolean(b) => b.encode(format),
_ => Err(Error::UnexpectedPayload),
Datum::Null => Ok(Bytes::new()),
_ => Err(Error::UnsupportedDataTypeForEncoding),
}
}
}
Expand Down
Loading
Loading