Skip to content

Commit

Permalink
server: Fix result set serialization in wire protocol
Browse files Browse the repository at this point in the history
Fix data row serialization for the PostgreSQL wire protocol:

- Send column values as part of a data row instead a data row per
  column.
- Fix column name and type information in row description.

Refs #87
  • Loading branch information
penberg authored and mergify[bot] committed Dec 1, 2022
1 parent 9b96301 commit d3107d2
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 20 deletions.
7 changes: 7 additions & 0 deletions libsql-server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion libsql-server/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ bytes = "1.2.1"
clap = { version = "4.0.23", features = [ "derive" ] }
crossbeam = "0.8.2"
futures = "0.3.25"
hex = "0.4.3"
parking_lot = "0.12.1"
pgwire = "0.5.0"
pin-project-lite = "0.2.9"
postgres-protocol = "0.6.4"
rusqlite = { version = "0.28.0", features = [ "buildtime_bindgen" ] }
rusqlite = { version = "0.28.0", features = [ "buildtime_bindgen", "column_decltype" ] }
smallvec = "1.10.0"
sqlparser = "0.27.0"
tokio = { version = "1.21.2", features = ["full"] }
Expand Down
20 changes: 9 additions & 11 deletions libsql-server/server/src/coordinator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::time::{Duration, Instant};
use anyhow::Result;
use crossbeam::channel::Sender;
use futures::stream::FuturesUnordered;
use rusqlite::types::Value;
use tokio::task::JoinHandle;

use crate::coordinator::query::{ErrorCode, QueryError, QueryResponse, QueryResult};
Expand Down Expand Up @@ -81,23 +82,20 @@ impl Worker {
fn perform_oneshot(&self, stmts: &Statements) -> QueryResult {
let mut result = vec![];
let mut prepared = self.db_conn.prepare(&stmts.stmts)?;
let col_names: Vec<String> = prepared
.column_names()
let columns: Vec<(String, Option<String>)> = prepared
.columns()
.iter()
.map(|s| s.to_string())
.map(|col| (col.name().into(), col.decl_type().map(str::to_lowercase)))
.collect();
//FIXME(sarna): the code below was ported as-is,
// but once we switch to gathering whole rows in the result vector
// instead of single values, Statement::query_map is a more convenient
// interface (it also implements Iter).
let mut rows = prepared.query([])?;
while let Some(row) = rows.next()? {
for (i, name) in col_names.iter().enumerate() {
result.push(format!("{} = {}", name, row.get::<usize, String>(i)?));
let mut row_ = vec![];
for (i, _) in columns.iter().enumerate() {
row_.push(row.get::<usize, Value>(i)?);
}
result.push(row_);
}

Ok(QueryResponse::ResultSet(result))
Ok(QueryResponse::ResultSet(columns, result))
}

fn handle_transaction(&self, job: Job) {
Expand Down
3 changes: 2 additions & 1 deletion libsql-server/server/src/coordinator/query.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::coordinator::scheduler::ClientId;
use rusqlite::types::Value;

pub type QueryResult = Result<QueryResponse, QueryError>;

#[derive(Debug)]
pub enum QueryResponse {
Ack,
ResultSet(Vec<String>),
ResultSet(Vec<(String, Option<String>)>, Vec<Vec<Value>>),
}

#[derive(Debug)]
Expand Down
46 changes: 41 additions & 5 deletions libsql-server/server/src/postgres/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use pgwire::messages::response::{ReadyForQuery, READY_STATUS_IDLE};
use pgwire::messages::startup::SslRequest;
use pgwire::messages::PgWireBackendMessage;
use pgwire::tokio::PgWireMessageServerCodec;
use rusqlite::types::Value;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio_util::codec::Framed;

Expand All @@ -25,14 +26,49 @@ impl SimpleQueryHandler for SimpleHandler {
// TODO: find a way to prevent unecessary clones.
match &self.0 {
Ok(resp) => match resp {
QueryResponse::ResultSet(rows) => {
let data_row_stream = stream::iter(rows.clone().into_iter()).map(|r| {
let mut encoder = TextDataRowEncoder::new(1);
encoder.append_field(Some(&r))?;
QueryResponse::ResultSet(col_names, rows) => {
let nr_cols = col_names.len();
let field_infos = col_names
.iter()
.map(move |(name, ty)| {
let ty = match ty {
Some(ty) => match ty.as_str() {
"integer" => Type::INT8,
"real" => Type::NUMERIC,
"text" => Type::VARCHAR,
"blob" => Type::BYTEA,
_ => Type::UNKNOWN,
},
None => Type::UNKNOWN,
};
FieldInfo::new(name.into(), None, None, ty)
})
.collect();
let data_row_stream = stream::iter(rows.clone().into_iter()).map(move |row| {
let mut encoder = TextDataRowEncoder::new(nr_cols);
for col in &row {
match col {
Value::Null => {
encoder.append_field(None::<&u8>)?;
}
Value::Integer(i) => {
encoder.append_field(Some(&i))?;
}
Value::Real(f) => {
encoder.append_field(Some(&f))?;
}
Value::Text(t) => {
encoder.append_field(Some(&t))?;
}
Value::Blob(b) => {
encoder.append_field(Some(&hex::encode(b)))?;
}
}
}
encoder.finish()
});
return Ok(vec![Response::Query(text_query_response(
vec![FieldInfo::new("row".into(), None, None, Type::VARCHAR)],
field_infos,
data_row_stream,
))]);
}
Expand Down
8 changes: 6 additions & 2 deletions libsql-server/testing/server/ruby/postgresql_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@

it "queries tables" do
conn = PG.connect(host: "127.0.0.1", port: 5000)
conn.exec("INSERT INTO users VALUES ('me', 'my_pass')")
conn.exec("CREATE TABLE IF NOT EXISTS users (username TEXT, pass TEXT)")
conn.exec("DELETE FROM users")
conn.exec("INSERT INTO users VALUES ('me', 'my_pass')")
conn.exec("SELECT * FROM users") do |results|
puts results
results.each do |row|
expect(row["username"]).to eq("me")
expect(row["pass"]).to eq("my_pass")
end
end
end
end

0 comments on commit d3107d2

Please sign in to comment.